use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use chrono::Utc;
use futures::stream::{self, StreamExt};
use tokio_util::sync::CancellationToken;
use crate::config::{HarvestConfig, PortalType};
use crate::models::NewDataset;
use crate::progress::{HarvestEvent, ProgressReporter, SilentReporter};
use crate::sync::{
AtomicSyncStats, ContentHashDetector, DeltaDetector, SyncOutcome, SyncResult, SyncStatus,
};
use crate::traits::{DatasetStore, PortalClient, PortalClientFactory};
use crate::{AppError, BatchHarvestSummary, PortalEntry, PortalHarvestResult, SyncStats};
/// A dataset that passed delta detection as `Created` or `Updated` and is waiting to
/// be upserted, together with the outcome to record once the write has been attempted.
struct PreProcessedDataset {
    /// The converted dataset row to write.
    dataset: NewDataset,
    /// Outcome decided by the delta detector (`Created` or `Updated`).
    outcome: SyncOutcome,
}
/// Strategy used by a single portal sync; its label is stored with the sync status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncMode {
    /// The whole portal was enumerated.
    Full,
    /// Only datasets modified since the last recorded sync were fetched.
    Incremental,
}

impl SyncMode {
    /// Lowercase label passed to `record_sync_status`.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Incremental => "incremental",
            Self::Full => "full",
        }
    }
}
/// Shared state handed to `preprocess_dataset` for every dataset of one sync run.
///
/// Counters and the seen-id list are `Arc`-wrapped, so clones of the context update the
/// same totals. Cloning also copies `existing_hashes` in full.
#[derive(Clone)]
struct PreprocessContext<D: DeltaDetector> {
    /// Stored content hash (if any) per `original_id`, as loaded by `get_hashes_for_portal`.
    existing_hashes: HashMap<String, Option<String>>,
    /// `original_id`s observed during this run; used for the stale sweep after a full sync.
    all_seen_ids: Arc<Mutex<Vec<String>>>,
    /// Running per-outcome tallies.
    stats: Arc<AtomicSyncStats>,
    /// Datasets accounted for so far; drives progress reporting.
    processed_count: Arc<AtomicUsize>,
    /// Decides whether a dataset must be (re)written.
    delta_detector: D,
}
/// Appends `id` to the shared list of dataset ids observed during this sync.
///
/// A poisoned mutex is recovered instead of ignored. Silently dropping the id here
/// would make the post-sync stale sweep treat a live dataset as missing from the portal.
fn record_seen_id(seen_ids: &Mutex<Vec<String>>, id: String) {
    seen_ids
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .push(id);
}

/// Converts one raw portal record into a [`NewDataset`] and classifies it with the
/// context's delta detector.
///
/// Returns `Some` only for `Created`/`Updated` datasets, which still have to be
/// upserted. Their stats and processed count are recorded by the caller.
/// `Unchanged`, `Failed` and `Skipped` datasets are tallied here and yield `None`.
///
/// Every dataset except `Failed`/`Skipped` ones is added to `ctx.all_seen_ids`, so a
/// full-sync stale sweep does not treat it as gone.
fn preprocess_dataset<C: PortalClient, D: DeltaDetector>(
    portal_data: C::PortalData,
    portal_url: &str,
    url_template: Option<&str>,
    language: &str,
    ctx: &PreprocessContext<D>,
) -> Option<PreProcessedDataset> {
    let new_dataset = C::into_new_dataset(portal_data, portal_url, url_template, language);
    let decision = ctx.delta_detector.needs_reprocessing(
        ctx.existing_hashes.get(&new_dataset.original_id),
        &new_dataset.content_hash,
    );
    match decision.outcome {
        SyncOutcome::Unchanged => {
            ctx.stats.record(SyncOutcome::Unchanged);
            record_seen_id(&ctx.all_seen_ids, new_dataset.original_id);
            ctx.processed_count.fetch_add(1, Ordering::Relaxed);
            None
        }
        SyncOutcome::Updated | SyncOutcome::Created => {
            record_seen_id(&ctx.all_seen_ids, new_dataset.original_id.clone());
            Some(PreProcessedDataset {
                dataset: new_dataset,
                outcome: decision.outcome,
            })
        }
        SyncOutcome::Failed | SyncOutcome::Skipped => {
            // Not added to the seen list; a non-zero failed/skipped count already
            // disables the stale sweep for this run.
            ctx.stats.record(decision.outcome);
            ctx.processed_count.fetch_add(1, Ordering::Relaxed);
            None
        }
    }
}
/// The work list chosen by `determine_sync_plan` for one portal sync.
// `Full` is not produced by `determine_sync_plan`/`full_sync_plan`, so the lint is
// silenced to keep the fetch-by-id path compiling without warnings.
#[allow(dead_code)]
enum SyncPlan<PD: Send> {
    /// Fetch every dataset individually by id.
    Full {
        ids: Vec<String>,
    },
    /// Stream every dataset page by page; `estimated_total` is 0 when unknown.
    FullBulkStream {
        estimated_total: usize,
    },
    /// Process the already-fetched datasets modified since the last sync.
    Incremental {
        datasets: Vec<PD>,
    },
}

impl<PD: Send> SyncPlan<PD> {
    /// Datasets the plan expects to process (only an estimate for `FullBulkStream`).
    fn len(&self) -> usize {
        match *self {
            Self::Incremental { ref datasets } => datasets.len(),
            Self::FullBulkStream { estimated_total } => estimated_total,
            Self::Full { ref ids } => ids.len(),
        }
    }
}
/// Per-call parameters for a single portal sync.
#[derive(Debug, Clone, Copy)]
pub struct SyncOptions<'a> {
    /// Portal base URL; also the key used for sync status, stored hashes and stale marking.
    pub portal_url: &'a str,
    /// Optional template forwarded to `PortalClient::into_new_dataset`.
    pub url_template: Option<&'a str>,
    /// Language (e.g. `"en"`) forwarded to the client factory and dataset conversion.
    pub language: &'a str,
    /// Skip incremental detection and enumerate the whole portal.
    pub force_full_sync: bool,
    /// Which portal client implementation the factory should build.
    pub portal_type: PortalType,
    /// Optional profile forwarded to the portal client factory.
    pub profile: Option<&'a str>,
    /// Optional SPARQL endpoint forwarded to the portal client factory.
    pub sparql_endpoint: Option<&'a str>,
}
/// Harvests dataset metadata from open-data portals into a [`DatasetStore`].
///
/// Generic over the storage backend `S`, the factory `F` that builds portal clients,
/// and the change-detection strategy `D` (content hashes by default).
pub struct HarvestService<S, F, D = ContentHashDetector>
where
    S: DatasetStore,
    F: PortalClientFactory,
    D: DeltaDetector,
{
    store: S,
    portal_factory: F,
    delta_detector: D,
    // Harvest settings (dry run, batch size, concurrency, force-full-sync, ...).
    config: HarvestConfig,
}
/// Field-wise clone; available when every generic component is itself `Clone`.
impl<S, F, D> Clone for HarvestService<S, F, D>
where
    S: DatasetStore + Clone,
    F: PortalClientFactory + Clone,
    D: DeltaDetector + Clone,
{
    fn clone(&self) -> Self {
        let Self {
            store,
            portal_factory,
            delta_detector,
            config,
        } = self;
        Self {
            store: store.clone(),
            portal_factory: portal_factory.clone(),
            delta_detector: delta_detector.clone(),
            config: config.clone(),
        }
    }
}
/// Constructors for the default, content-hash based change detection.
impl<S, F> HarvestService<S, F, ContentHashDetector>
where
    S: DatasetStore,
    F: PortalClientFactory,
{
    /// Builds a service using [`HarvestConfig::default`].
    pub fn new(store: S, portal_factory: F) -> Self {
        Self::with_config(store, portal_factory, HarvestConfig::default())
    }

    /// Builds a service using the supplied harvest configuration.
    pub fn with_config(store: S, portal_factory: F, config: HarvestConfig) -> Self {
        let delta_detector = ContentHashDetector;
        Self {
            store,
            portal_factory,
            delta_detector,
            config,
        }
    }
}
impl<S, F, D> HarvestService<S, F, D>
where
S: DatasetStore,
F: PortalClientFactory,
D: DeltaDetector,
{
pub fn with_delta_detector(
store: S,
portal_factory: F,
delta_detector: D,
config: HarvestConfig,
) -> Self {
Self {
store,
portal_factory,
delta_detector,
config,
}
}
/// Syncs `portal_url` with defaults: no URL template, `"en"`, silent progress,
/// the default portal type, and no profile or SPARQL endpoint.
pub async fn sync_portal(&self, portal_url: &str) -> Result<SyncStats, AppError> {
    let stats = self
        .sync_portal_with_progress(
            portal_url,
            None,
            "en",
            &SilentReporter,
            PortalType::default(),
            None,
            None,
        )
        .await?;
    Ok(stats)
}
/// Syncs one portal while reporting progress and returns only the final stats.
///
/// A fresh token is used, so the sync cannot be cancelled. The service-level
/// `force_full_sync` setting applies.
// Positional public API; the argument list is kept for compatibility with callers.
#[allow(clippy::too_many_arguments)]
pub async fn sync_portal_with_progress<R: ProgressReporter>(
    &self,
    portal_url: &str,
    url_template: Option<&str>,
    language: &str,
    reporter: &R,
    portal_type: PortalType,
    profile: Option<&str>,
    sparql_endpoint: Option<&str>,
) -> Result<SyncStats, AppError> {
    let options = SyncOptions {
        portal_url,
        url_template,
        language,
        force_full_sync: self.config.force_full_sync,
        portal_type,
        profile,
        sparql_endpoint,
    };
    self.sync_portal_with_progress_cancellable_internal(
        options,
        reporter,
        CancellationToken::new(),
    )
    .await
    .map(|result| result.stats)
}
/// Chooses between an incremental and a full sync for `portal_url`.
///
/// A full (streaming) plan is used when:
/// - it is forced,
/// - the portal has no recorded last sync time, or
/// - `search_modified_since` fails.
///
/// Otherwise the modified datasets are fetched up front and reported.
///
/// # Errors
/// Propagates errors from `get_last_sync_time` and from building the full plan.
async fn determine_sync_plan<R: ProgressReporter>(
    &self,
    portal_url: &str,
    portal_client: &F::Client,
    reporter: &R,
    force_full_sync: bool,
) -> Result<(SyncMode, SyncPlan<<F::Client as PortalClient>::PortalData>), AppError> {
    if force_full_sync {
        tracing::info!(portal = portal_url, "Force full sync requested");
        return self
            .full_sync_plan(portal_url, portal_client, reporter)
            .await;
    }
    let Some(since) = self.store.get_last_sync_time(portal_url).await? else {
        tracing::info!(portal = portal_url, "First sync, using full sync");
        return self
            .full_sync_plan(portal_url, portal_client, reporter)
            .await;
    };
    tracing::info!(
        portal = portal_url,
        last_sync = %since,
        "Attempting incremental sync"
    );
    let datasets = match portal_client.search_modified_since(since).await {
        Ok(found) => found,
        Err(e) => {
            tracing::warn!(
                portal = portal_url,
                error = %e,
                "Incremental sync failed, falling back to full sync"
            );
            return self
                .full_sync_plan(portal_url, portal_client, reporter)
                .await;
        }
    };
    let count = datasets.len();
    tracing::info!(
        portal = portal_url,
        modified_count = count,
        "Incremental sync: found {} modified datasets",
        count
    );
    reporter.report(HarvestEvent::PortalDatasetsFound { count });
    Ok((SyncMode::Incremental, SyncPlan::Incremental { datasets }))
}
/// Builds a page-by-page streaming full-sync plan.
///
/// The portal's dataset count is used as the progress total. An unavailable count is
/// treated as 0, which means "unknown"; in that case no `PortalDatasetsFound` event
/// is emitted.
async fn full_sync_plan<R: ProgressReporter>(
    &self,
    portal_url: &str,
    portal_client: &F::Client,
    reporter: &R,
) -> Result<(SyncMode, SyncPlan<<F::Client as PortalClient>::PortalData>), AppError> {
    let estimated_total = portal_client.dataset_count().await.unwrap_or(0);
    if estimated_total == 0 {
        tracing::info!(
            portal = portal_url,
            "Full sync: streaming datasets (total unknown)"
        );
    } else {
        tracing::info!(
            portal = portal_url,
            count = estimated_total,
            "Full sync: streaming {} datasets page-by-page",
            estimated_total
        );
        reporter.report(HarvestEvent::PortalDatasetsFound {
            count: estimated_total,
        });
    }
    let plan = SyncPlan::FullBulkStream { estimated_total };
    Ok((SyncMode::Full, plan))
}
/// Harvests every portal in `portals`, one after another, without progress output.
pub async fn batch_harvest(&self, portals: &[&PortalEntry]) -> BatchHarvestSummary {
    let silent = &SilentReporter;
    self.batch_harvest_with_progress(portals, silent).await
}
/// Harvests every portal in `portals` while reporting progress; the batch cannot be
/// cancelled because a fresh token is used.
pub async fn batch_harvest_with_progress<R: ProgressReporter>(
    &self,
    portals: &[&PortalEntry],
    reporter: &R,
) -> BatchHarvestSummary {
    let never_cancelled = CancellationToken::new();
    self.batch_harvest_with_progress_cancellable(portals, reporter, never_cancelled)
        .await
}
/// Syncs `portal_url` with default options and no progress output, stopping early
/// when `cancel_token` is cancelled.
pub async fn sync_portal_cancellable(
    &self,
    portal_url: &str,
    cancel_token: CancellationToken,
) -> Result<SyncResult, AppError> {
    let options = SyncOptions {
        portal_url,
        url_template: None,
        language: "en",
        force_full_sync: self.config.force_full_sync,
        portal_type: PortalType::default(),
        profile: None,
        sparql_endpoint: None,
    };
    self.sync_portal_with_progress_cancellable_internal(options, &SilentReporter, cancel_token)
        .await
}
/// Syncs one portal with progress reporting and cooperative cancellation, using the
/// service-level `force_full_sync` setting.
///
/// Equivalent to the `_with_options` variant with no per-call full-sync request.
// Positional public API; the argument list is kept for compatibility with callers.
#[allow(clippy::too_many_arguments)]
pub async fn sync_portal_with_progress_cancellable<R: ProgressReporter>(
    &self,
    portal_url: &str,
    url_template: Option<&str>,
    language: &str,
    reporter: &R,
    cancel_token: CancellationToken,
    portal_type: PortalType,
    profile: Option<&str>,
    sparql_endpoint: Option<&str>,
) -> Result<SyncResult, AppError> {
    let request_full_sync = false;
    self.sync_portal_with_progress_cancellable_with_options(
        portal_url,
        url_template,
        language,
        reporter,
        cancel_token,
        request_full_sync,
        portal_type,
        profile,
        sparql_endpoint,
    )
    .await
}
/// Syncs one portal with progress reporting and cooperative cancellation, letting the
/// caller request a full sync for this call.
///
/// The request is OR-ed with the service-level `force_full_sync` setting. It can force
/// a full sync but never suppress one the configuration demands.
// Positional public API; the argument list is kept for compatibility with callers.
#[allow(clippy::too_many_arguments)]
pub async fn sync_portal_with_progress_cancellable_with_options<R: ProgressReporter>(
    &self,
    portal_url: &str,
    url_template: Option<&str>,
    language: &str,
    reporter: &R,
    cancel_token: CancellationToken,
    force_full_sync: bool,
    portal_type: PortalType,
    profile: Option<&str>,
    sparql_endpoint: Option<&str>,
) -> Result<SyncResult, AppError> {
    let options = SyncOptions {
        portal_url,
        url_template,
        language,
        force_full_sync: force_full_sync || self.config.force_full_sync,
        portal_type,
        profile,
        sparql_endpoint,
    };
    self.sync_portal_with_progress_cancellable_internal(options, reporter, cancel_token)
        .await
}
/// Runs one portal sync end to end:
/// 1. plan the sync,
/// 2. fetch or stream the datasets,
/// 3. delta-detect them,
/// 4. upsert changed ones in batches,
/// 5. optionally mark stale datasets,
/// 6. record the sync status.
///
/// Cancellation is cooperative. `cancel_token` is polled between datasets and pages,
/// and batches already upserted are kept.
///
/// With `config.dry_run` set, nothing is written to the store: no upserts, no stale
/// marking and no sync-status rows. Only stats are computed and reported.
///
/// Stale marking runs only after a full sync that was not cancelled and counted no
/// failed or skipped datasets. This keeps a partial listing from marking live
/// datasets stale.
///
/// # Errors
/// Returns an error if:
/// - the portal client cannot be created,
/// - the sync plan cannot be determined, or
/// - existing hashes cannot be loaded.
///
/// Per-dataset, upsert, stale-marking and status-recording failures are logged and
/// counted instead.
async fn sync_portal_with_progress_cancellable_internal<R: ProgressReporter>(
    &self,
    options: SyncOptions<'_>,
    reporter: &R,
    cancel_token: CancellationToken,
) -> Result<SyncResult, AppError> {
    let SyncOptions {
        portal_url,
        url_template,
        language,
        force_full_sync,
        portal_type,
        profile,
        sparql_endpoint,
    } = options;
    let sync_start = Utc::now();
    // Dry runs must leave the store untouched, sync-status rows included, matching the
    // dry-run exit path further down.
    let dry_run = self.config.dry_run;
    if cancel_token.is_cancelled() {
        if !dry_run {
            if let Err(e) = self
                .store
                .record_sync_status(
                    portal_url,
                    sync_start,
                    "unknown",
                    SyncStatus::Cancelled.as_str(),
                    0,
                )
                .await
            {
                tracing::warn!(error = %e, "Failed to record cancelled sync status");
            }
        }
        return Ok(SyncResult::cancelled(SyncStats::default()));
    }
    let portal_client = self.portal_factory.create(
        portal_url,
        portal_type,
        language,
        profile,
        sparql_endpoint,
    )?;
    let (sync_mode, plan) = self
        .determine_sync_plan(portal_url, &portal_client, reporter, force_full_sync)
        .await?;
    if cancel_token.is_cancelled() {
        if !dry_run {
            if let Err(e) = self
                .store
                .record_sync_status(
                    portal_url,
                    sync_start,
                    sync_mode.as_str(),
                    SyncStatus::Cancelled.as_str(),
                    0,
                )
                .await
            {
                tracing::warn!(error = %e, "Failed to record cancelled sync status");
            }
        }
        return Ok(SyncResult::cancelled(SyncStats::default()));
    }
    let existing_hashes = self.store.get_hashes_for_portal(portal_url).await?;
    reporter.report(HarvestEvent::ExistingDatasetsFound {
        count: existing_hashes.len(),
    });
    let total = plan.len();
    // A bulk stream reports 0 when the portal's size is unknown, so only the other
    // plans can be known to be empty before processing starts.
    let is_definitely_empty = total == 0 && !matches!(plan, SyncPlan::FullBulkStream { .. });
    if is_definitely_empty {
        tracing::info!(
            portal = portal_url,
            "No datasets to process (portal up to date)"
        );
        if !dry_run {
            if let Err(e) = self
                .store
                .record_sync_status(
                    portal_url,
                    sync_start,
                    sync_mode.as_str(),
                    SyncStatus::Completed.as_str(),
                    0,
                )
                .await
            {
                tracing::warn!(error = %e, "Failed to record sync status");
            }
        }
        return Ok(SyncResult::completed(SyncStats::default()));
    }
    let stats = Arc::new(AtomicSyncStats::new());
    let all_seen_ids: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let processed_count = Arc::new(AtomicUsize::new(0));
    let last_reported = Arc::new(AtomicUsize::new(0));
    let was_cancelled = Arc::new(std::sync::atomic::AtomicBool::new(false));
    reporter.report(HarvestEvent::PreprocessingStarted { total });
    let report_interval = std::cmp::max(total / 20, 50);
    let url_template_arc: Option<Arc<str>> = url_template.map(Arc::from);
    let language_arc: Arc<str> = Arc::from(language);
    let upsert_batch_size = self.config.upsert_batch_size.max(1);
    match plan {
        SyncPlan::Full { ids } => {
            let ctx = PreprocessContext {
                existing_hashes,
                all_seen_ids: Arc::clone(&all_seen_ids),
                stats: Arc::clone(&stats),
                processed_count: Arc::clone(&processed_count),
                delta_detector: self.delta_detector.clone(),
            };
            let portal_url_arc: Arc<str> = Arc::from(portal_url);
            // A zero concurrency would either stall `buffer_unordered` or mean
            // "unbounded" depending on the futures version; clamp like the batch size.
            let concurrency = self.config.concurrency.max(1);
            // NOTE(review): `ctx.clone()` below copies `existing_hashes` for every id.
            // That is harmless while `determine_sync_plan` never produces
            // `SyncPlan::Full`. Share the map (e.g. behind an `Arc`) before enabling
            // this plan for large portals.
            let pre_processed = stream::iter(ids)
                .map(|id| {
                    let portal_client = portal_client.clone();
                    let portal_url_owned = Arc::clone(&portal_url_arc);
                    let cancel_token = cancel_token.clone();
                    let was_cancelled = Arc::clone(&was_cancelled);
                    let url_template = url_template_arc.clone();
                    let language = Arc::clone(&language_arc);
                    let ctx = ctx.clone();
                    async move {
                        if cancel_token.is_cancelled() {
                            was_cancelled.store(true, Ordering::SeqCst);
                            return None;
                        }
                        let portal_data = match portal_client.get_dataset(&id).await {
                            Ok(data) => data,
                            Err(e) => {
                                tracing::warn!(
                                    portal = portal_url_owned.as_ref(),
                                    dataset_id = id,
                                    error = %e,
                                    "Failed to fetch dataset, skipping"
                                );
                                ctx.stats.record(SyncOutcome::Failed);
                                ctx.processed_count.fetch_add(1, Ordering::Relaxed);
                                return None;
                            }
                        };
                        if cancel_token.is_cancelled() {
                            was_cancelled.store(true, Ordering::SeqCst);
                            return None;
                        }
                        preprocess_dataset::<F::Client, D>(
                            portal_data,
                            &portal_url_owned,
                            url_template.as_deref(),
                            &language,
                            &ctx,
                        )
                    }
                })
                .buffer_unordered(concurrency)
                .take_while(|_| {
                    let is_cancelled = cancel_token.is_cancelled();
                    async move { !is_cancelled }
                })
                .filter_map(|opt| async { opt });
            if dry_run {
                let mut stream = std::pin::pin!(pre_processed);
                while let Some(item) = stream.next().await {
                    stats.record(item.outcome);
                    processed_count.fetch_add(1, Ordering::Relaxed);
                }
                if cancel_token.is_cancelled() {
                    was_cancelled.store(true, Ordering::SeqCst);
                }
            } else {
                let mut batch = Vec::with_capacity(upsert_batch_size);
                let mut stream = std::pin::pin!(pre_processed);
                while let Some(item) = stream.next().await {
                    if cancel_token.is_cancelled() {
                        was_cancelled.store(true, Ordering::SeqCst);
                        break;
                    }
                    batch.push(item);
                    if batch.len() >= upsert_batch_size {
                        let full_batch = std::mem::replace(
                            &mut batch,
                            Vec::with_capacity(upsert_batch_size),
                        );
                        let batch_len = full_batch.len();
                        Self::process_upsert_batch(full_batch, &self.store, &stats).await;
                        Self::report_progress(
                            &processed_count,
                            batch_len,
                            total,
                            report_interval,
                            &last_reported,
                            &stats,
                            reporter,
                        );
                    }
                }
                if !batch.is_empty() {
                    let batch_len = batch.len();
                    Self::process_upsert_batch(batch, &self.store, &stats).await;
                    Self::report_progress(
                        &processed_count,
                        batch_len,
                        total,
                        report_interval,
                        &last_reported,
                        &stats,
                        reporter,
                    );
                }
            }
        }
        SyncPlan::FullBulkStream { estimated_total: _ } => {
            let ctx = PreprocessContext {
                existing_hashes,
                all_seen_ids: Arc::clone(&all_seen_ids),
                stats: Arc::clone(&stats),
                processed_count: Arc::clone(&processed_count),
                delta_detector: self.delta_detector.clone(),
            };
            let url_template_ref = url_template_arc.as_deref();
            let mut page_stream = portal_client.search_all_datasets_stream();
            let mut batch = Vec::with_capacity(upsert_batch_size);
            while let Some(page_result) = page_stream.next().await {
                if cancel_token.is_cancelled() {
                    was_cancelled.store(true, Ordering::SeqCst);
                    break;
                }
                let page = match page_result {
                    Ok(datasets) => datasets,
                    Err(e) => {
                        tracing::warn!(
                            portal = portal_url,
                            error = %e,
                            "Page fetch failed during streaming harvest"
                        );
                        // Counting a failure also disables the stale sweep, since the
                        // listing is incomplete.
                        stats.record(SyncOutcome::Failed);
                        break;
                    }
                };
                for portal_data in page {
                    if cancel_token.is_cancelled() {
                        was_cancelled.store(true, Ordering::SeqCst);
                        break;
                    }
                    if dry_run {
                        if let Some(item) = preprocess_dataset::<F::Client, D>(
                            portal_data,
                            portal_url,
                            url_template_ref,
                            language,
                            &ctx,
                        ) {
                            stats.record(item.outcome);
                            let current = processed_count.fetch_add(1, Ordering::Relaxed) + 1;
                            let last = last_reported.load(Ordering::Relaxed);
                            // `total` may be 0 (unknown) here; do not report every item.
                            if (current >= last + report_interval
                                || (total > 0 && current >= total))
                                && last_reported
                                    .compare_exchange(
                                        last,
                                        current,
                                        Ordering::SeqCst,
                                        Ordering::Relaxed,
                                    )
                                    .is_ok()
                            {
                                let current_stats = stats.to_stats();
                                reporter.report(HarvestEvent::DatasetProcessed {
                                    current,
                                    total,
                                    created: current_stats.created,
                                    updated: current_stats.updated,
                                    unchanged: current_stats.unchanged,
                                    failed: current_stats.failed,
                                    skipped: current_stats.skipped,
                                });
                            }
                        }
                    } else if let Some(item) = preprocess_dataset::<F::Client, D>(
                        portal_data,
                        portal_url,
                        url_template_ref,
                        language,
                        &ctx,
                    ) {
                        batch.push(item);
                        if batch.len() >= upsert_batch_size {
                            let full_batch = std::mem::replace(
                                &mut batch,
                                Vec::with_capacity(upsert_batch_size),
                            );
                            let batch_len = full_batch.len();
                            Self::process_upsert_batch(full_batch, &self.store, &stats).await;
                            Self::report_progress(
                                &processed_count,
                                batch_len,
                                total,
                                report_interval,
                                &last_reported,
                                &stats,
                                reporter,
                            );
                        }
                    }
                }
                // Cancellation seen mid-page: stop before requesting another page.
                if was_cancelled.load(Ordering::SeqCst) {
                    break;
                }
            }
            if !batch.is_empty() {
                let batch_len = batch.len();
                Self::process_upsert_batch(batch, &self.store, &stats).await;
                Self::report_progress(
                    &processed_count,
                    batch_len,
                    total,
                    report_interval,
                    &last_reported,
                    &stats,
                    reporter,
                );
            }
        }
        SyncPlan::Incremental { datasets } => {
            let ctx = PreprocessContext {
                existing_hashes,
                all_seen_ids: Arc::clone(&all_seen_ids),
                stats: Arc::clone(&stats),
                processed_count: Arc::clone(&processed_count),
                delta_detector: self.delta_detector.clone(),
            };
            let url_template_ref = url_template_arc.as_deref();
            if dry_run {
                for portal_data in datasets {
                    if cancel_token.is_cancelled() {
                        was_cancelled.store(true, Ordering::SeqCst);
                        break;
                    }
                    if let Some(item) = preprocess_dataset::<F::Client, D>(
                        portal_data,
                        portal_url,
                        url_template_ref,
                        language,
                        &ctx,
                    ) {
                        stats.record(item.outcome);
                        Self::report_progress(
                            &processed_count,
                            1,
                            total,
                            report_interval,
                            &last_reported,
                            &stats,
                            reporter,
                        );
                    }
                }
            } else {
                let mut batch = Vec::with_capacity(upsert_batch_size);
                for portal_data in datasets {
                    if cancel_token.is_cancelled() {
                        was_cancelled.store(true, Ordering::SeqCst);
                        break;
                    }
                    if let Some(item) = preprocess_dataset::<F::Client, D>(
                        portal_data,
                        portal_url,
                        url_template_ref,
                        language,
                        &ctx,
                    ) {
                        batch.push(item);
                        if batch.len() >= upsert_batch_size {
                            let full_batch = std::mem::replace(
                                &mut batch,
                                Vec::with_capacity(upsert_batch_size),
                            );
                            let batch_len = full_batch.len();
                            Self::process_upsert_batch(full_batch, &self.store, &stats).await;
                            Self::report_progress(
                                &processed_count,
                                batch_len,
                                total,
                                report_interval,
                                &last_reported,
                                &stats,
                                reporter,
                            );
                        }
                    }
                }
                if !batch.is_empty() {
                    let batch_len = batch.len();
                    Self::process_upsert_batch(batch, &self.store, &stats).await;
                    Self::report_progress(
                        &processed_count,
                        batch_len,
                        total,
                        report_interval,
                        &last_reported,
                        &stats,
                        reporter,
                    );
                }
            }
        }
    }
    {
        let preprocess_stats = stats.to_stats();
        reporter.report(HarvestEvent::PreprocessingCompleted {
            changed: preprocess_stats.created + preprocess_stats.updated,
            unchanged: preprocess_stats.unchanged,
            failed: preprocess_stats.failed,
        });
    }
    if dry_run {
        let final_stats = stats.to_stats();
        let is_cancelled = was_cancelled.load(Ordering::SeqCst) || cancel_token.is_cancelled();
        if is_cancelled {
            tracing::info!(
                portal = portal_url,
                created = final_stats.created,
                updated = final_stats.updated,
                unchanged = final_stats.unchanged,
                failed = final_stats.failed,
                "Dry run cancelled — no changes written"
            );
            return Ok(SyncResult::cancelled(final_stats));
        } else {
            tracing::info!(
                portal = portal_url,
                created = final_stats.created,
                updated = final_stats.updated,
                unchanged = final_stats.unchanged,
                failed = final_stats.failed,
                "Dry run complete — no changes written"
            );
            return Ok(SyncResult::completed(final_stats));
        }
    }
    let final_stats = stats.to_stats();
    let is_cancelled = was_cancelled.load(Ordering::SeqCst) || cancel_token.is_cancelled();
    let status = if is_cancelled {
        SyncStatus::Cancelled
    } else {
        SyncStatus::Completed
    };
    if sync_mode == SyncMode::Full
        && !is_cancelled
        && final_stats.failed == 0
        && final_stats.skipped == 0
    {
        // Recover the ids even from a poisoned mutex. An empty list here would make
        // `mark_stale_by_exclusion` treat every dataset of the portal as stale.
        let seen_list = std::mem::take(
            &mut *all_seen_ids
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner),
        );
        match self
            .store
            .mark_stale_by_exclusion(portal_url, &seen_list)
            .await
        {
            Ok(stale_count) if stale_count > 0 => {
                tracing::warn!(
                    portal = portal_url,
                    stale_count,
                    "Marked {} dataset(s) as stale (not found on portal)",
                    stale_count
                );
                reporter.report(HarvestEvent::StaleDetected {
                    count: stale_count as usize,
                });
            }
            Err(e) => {
                tracing::warn!(
                    portal = portal_url,
                    error = %e,
                    "Failed to mark stale datasets (non-fatal)"
                );
            }
            _ => {}
        }
    }
    tracing::info!(
        portal = portal_url,
        status = status.as_str(),
        "Finalizing: recording sync status..."
    );
    // Saturate instead of silently wrapping when the count exceeds the column type.
    let datasets_synced = i32::try_from(final_stats.total()).unwrap_or(i32::MAX);
    if let Err(e) = self
        .store
        .record_sync_status(
            portal_url,
            sync_start,
            sync_mode.as_str(),
            status.as_str(),
            datasets_synced,
        )
        .await
    {
        tracing::warn!(error = %e, "Failed to record sync status");
    }
    if is_cancelled {
        tracing::info!(
            portal = portal_url,
            processed = final_stats.total(),
            "Sync cancelled - partial progress saved"
        );
        Ok(SyncResult::cancelled(final_stats))
    } else {
        Ok(SyncResult::completed(final_stats))
    }
}
/// Adds `batch_len` to `processed_count` and, if due, emits a
/// [`HarvestEvent::DatasetProcessed`] snapshot of `stats`.
///
/// A report is due when at least `report_interval` datasets were processed since the
/// last report, or once the processed count reaches `total`. The `compare_exchange` on
/// `last_reported` lets only one of several racing callers emit a given report.
///
/// Takes plain references; callers holding `Arc`s pass `&arc` and deref coercion
/// applies.
fn report_progress(
    processed_count: &AtomicUsize,
    batch_len: usize,
    total: usize,
    report_interval: usize,
    last_reported: &AtomicUsize,
    stats: &AtomicSyncStats,
    reporter: &impl ProgressReporter,
) {
    let current = processed_count.fetch_add(batch_len, Ordering::Relaxed) + batch_len;
    let last = last_reported.load(Ordering::Relaxed);
    let should_report = current >= last.saturating_add(report_interval) || current >= total;
    if should_report
        && last_reported
            .compare_exchange(last, current, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    {
        let current_stats = stats.to_stats();
        reporter.report(HarvestEvent::DatasetProcessed {
            current,
            total,
            created: current_stats.created,
            updated: current_stats.updated,
            unchanged: current_stats.unchanged,
            failed: current_stats.failed,
            skipped: current_stats.skipped,
        });
    }
}
/// Writes one batch of pre-processed datasets and records each outcome in `stats`.
///
/// Entries that repeat an earlier `(original_id, source_portal)` pair in the same batch
/// are dropped and counted as `Skipped`. If the upsert succeeds, each remaining entry's
/// own outcome is recorded. If it fails, every remaining entry counts as `Failed`.
async fn process_upsert_batch(
    batch: Vec<PreProcessedDataset>,
    store: &S,
    stats: &Arc<AtomicSyncStats>,
) {
    let mut keys_seen = std::collections::HashSet::with_capacity(batch.len());
    let (unique, duplicates): (Vec<PreProcessedDataset>, Vec<PreProcessedDataset>) =
        batch.into_iter().partition(|item| {
            keys_seen.insert((
                item.dataset.original_id.clone(),
                item.dataset.source_portal.clone(),
            ))
        });
    let dup_count = duplicates.len();
    if dup_count > 0 {
        tracing::debug!(dup_count, "Removed duplicate datasets within upsert batch");
        (0..dup_count).for_each(|_| {
            stats.record(SyncOutcome::Skipped);
        });
    }
    let (outcomes, datasets): (Vec<SyncOutcome>, Vec<NewDataset>) = unique
        .into_iter()
        .map(|item| (item.outcome, item.dataset))
        .unzip();
    if let Err(e) = store.batch_upsert(&datasets).await {
        tracing::warn!(
            count = datasets.len(),
            error = %e,
            "Failed to batch upsert datasets"
        );
        (0..datasets.len()).for_each(|_| {
            stats.record(SyncOutcome::Failed);
        });
    } else {
        outcomes.into_iter().for_each(|outcome| {
            stats.record(outcome);
        });
    }
}
/// Harvests every portal in `portals` without progress output, stopping early once
/// `cancel_token` is cancelled.
pub async fn batch_harvest_cancellable(
    &self,
    portals: &[&PortalEntry],
    cancel_token: CancellationToken,
) -> BatchHarvestSummary {
    let reporter = &SilentReporter;
    self.batch_harvest_with_progress_cancellable(portals, reporter, cancel_token)
        .await
}
/// Harvests `portals` one after another, reporting batch- and portal-level events.
///
/// A portal that fails is recorded in the summary and the batch continues. The batch
/// stops when cancellation is observed, either before a portal starts or because a
/// portal sync came back cancelled. In the latter case its partial stats are recorded
/// as a success, `BatchCancelled` is reported, and the loop stops.
///
/// Exactly one terminal event (`BatchCancelled` or `BatchCompleted`) is reported per
/// call.
pub async fn batch_harvest_with_progress_cancellable<R: ProgressReporter>(
    &self,
    portals: &[&PortalEntry],
    reporter: &R,
    cancel_token: CancellationToken,
) -> BatchHarvestSummary {
    let mut summary = BatchHarvestSummary::new();
    let total = portals.len();
    // Set whenever `BatchCancelled` is reported. Re-checking the token after the loop
    // would emit no terminal event at all if cancellation arrived after the last
    // portal had already finished.
    let mut cancelled = false;
    reporter.report(HarvestEvent::BatchStarted {
        total_portals: total,
    });
    for (i, portal) in portals.iter().enumerate() {
        if cancel_token.is_cancelled() {
            reporter.report(HarvestEvent::BatchCancelled {
                completed_portals: i,
                total_portals: total,
            });
            cancelled = true;
            break;
        }
        reporter.report(HarvestEvent::PortalStarted {
            portal_index: i,
            total_portals: total,
            portal_name: &portal.name,
            portal_url: &portal.url,
        });
        match self
            .sync_portal_with_progress_cancellable(
                &portal.url,
                portal.url_template.as_deref(),
                portal.language(),
                reporter,
                cancel_token.clone(),
                portal.portal_type,
                portal.profile(),
                portal.sparql_endpoint(),
            )
            .await
        {
            Ok(result) if result.is_cancelled() => {
                reporter.report(HarvestEvent::PortalCancelled {
                    portal_index: i,
                    total_portals: total,
                    portal_name: &portal.name,
                    stats: &result.stats,
                });
                // Partial progress was persisted, so it is still reported as a success.
                summary.add(PortalHarvestResult::success(
                    portal.name.clone(),
                    portal.url.clone(),
                    result.stats,
                ));
                reporter.report(HarvestEvent::BatchCancelled {
                    completed_portals: i + 1,
                    total_portals: total,
                });
                cancelled = true;
                break;
            }
            Ok(result) => {
                reporter.report(HarvestEvent::PortalCompleted {
                    portal_index: i,
                    total_portals: total,
                    portal_name: &portal.name,
                    stats: &result.stats,
                });
                summary.add(PortalHarvestResult::success(
                    portal.name.clone(),
                    portal.url.clone(),
                    result.stats,
                ));
            }
            Err(e) => {
                let error_str = e.to_string();
                reporter.report(HarvestEvent::PortalFailed {
                    portal_index: i,
                    total_portals: total,
                    portal_name: &portal.name,
                    error: &error_str,
                });
                summary.add(PortalHarvestResult::failure(
                    portal.name.clone(),
                    portal.url.clone(),
                    error_str,
                ));
            }
        }
    }
    if !cancelled {
        reporter.report(HarvestEvent::BatchCompleted { summary: &summary });
    }
    summary
}
}
#[cfg(test)]
mod tests {
    use super::{SyncMode, SyncPlan};
    use crate::config::HarvestConfig;

    #[test]
    fn test_harvest_config_default() {
        let config = HarvestConfig::default();
        assert!(config.concurrency > 0, "concurrency should be positive");
    }

    /// The labels are persisted via `record_sync_status`, so they must stay stable.
    #[test]
    fn test_sync_mode_labels() {
        assert_eq!(SyncMode::Full.as_str(), "full");
        assert_eq!(SyncMode::Incremental.as_str(), "incremental");
    }

    /// `len` drives progress totals and the "definitely empty" early exit.
    #[test]
    fn test_sync_plan_len() {
        let full: SyncPlan<u8> = SyncPlan::Full {
            ids: vec!["a".to_string(), "b".to_string()],
        };
        assert_eq!(full.len(), 2);
        let bulk: SyncPlan<u8> = SyncPlan::FullBulkStream { estimated_total: 42 };
        assert_eq!(bulk.len(), 42);
        let incremental: SyncPlan<u8> = SyncPlan::Incremental {
            datasets: vec![1, 2, 3],
        };
        assert_eq!(incremental.len(), 3);
    }
}