use tokio_util::sync::CancellationToken;
use crate::config::SyncConfig;
use crate::embedding::{EmbeddingService, EmbeddingStats};
use crate::harvest::HarvestService;
use crate::progress::{ProgressReporter, SilentReporter};
use crate::sync::{BatchHarvestSummary, ContentHashDetector, DeltaDetector, SyncResult, SyncStats};
use crate::traits::{DatasetStore, EmbeddingProvider, PortalClientFactory};
use crate::{AppError, PortalEntry, PortalType};
/// Pairs a [`HarvestService`] with an [`EmbeddingService`] so that a single
/// call can harvest a portal and then run the embedding pass.
///
/// Type parameters:
/// - `S`: the dataset store handed to both services.
/// - `E`: the embedding provider used by the embedding service.
/// - `F`: the portal client factory used by the harvest service.
/// - `D`: the delta detector used by the harvest service; defaults to
///   [`ContentHashDetector`].
pub struct HarvestPipeline<
    S: DatasetStore,
    E: EmbeddingProvider,
    F: PortalClientFactory,
    D: DeltaDetector = ContentHashDetector,
> {
    /// Harvest half of the pipeline.
    pub harvest: HarvestService<S, F, D>,
    /// Embedding half of the pipeline.
    pub embedding: EmbeddingService<S, E>,
}
/// A pipeline is cloneable whenever all of its type parameters are; the clone
/// is built by cloning each of the two services.
impl<S, E, F, D> Clone for HarvestPipeline<S, E, F, D>
where
    S: DatasetStore + Clone,
    E: EmbeddingProvider + Clone,
    F: PortalClientFactory + Clone,
    D: DeltaDetector + Clone,
{
    fn clone(&self) -> Self {
        // Destructure so that adding a field to the struct forces this impl
        // to be revisited.
        let Self { harvest, embedding } = self;
        Self {
            harvest: harvest.clone(),
            embedding: embedding.clone(),
        }
    }
}
/// Constructors for pipelines that use the default [`ContentHashDetector`].
impl<S, E, F> HarvestPipeline<S, E, F, ContentHashDetector>
where
    S: DatasetStore + Clone,
    E: EmbeddingProvider,
    F: PortalClientFactory,
{
    /// Builds a pipeline whose two services are configured from `config`.
    ///
    /// The harvest service receives `config.harvest_config()` and the
    /// embedding service receives `config.embedding_service_config()`.
    /// `store` is cloned once so that each service gets its own handle.
    pub fn from_sync_config(store: S, embedding: E, portal_factory: F, config: SyncConfig) -> Self {
        // Derive both per-service configurations up front, harvest first.
        let harvest_settings = config.harvest_config();
        let embedding_settings = config.embedding_service_config();

        // The harvest service takes the cloned store; the original is moved
        // into the embedding service afterwards.
        let harvest_service =
            HarvestService::with_config(store.clone(), portal_factory, harvest_settings);
        let embedding_service =
            EmbeddingService::with_config(store, embedding, embedding_settings);

        Self {
            harvest: harvest_service,
            embedding: embedding_service,
        }
    }

    /// Builds a pipeline using `SyncConfig::default()`.
    pub fn new(store: S, embedding: E, portal_factory: F) -> Self {
        let default_config = SyncConfig::default();
        Self::from_sync_config(store, embedding, portal_factory, default_config)
    }
}
/// Harvest-then-embed entry points, available for any delta detector `D`.
impl<S, E, F, D> HarvestPipeline<S, E, F, D>
where
    S: DatasetStore + Clone,
    E: EmbeddingProvider,
    F: PortalClientFactory,
    D: DeltaDetector,
{
    /// Syncs a single portal with fixed defaults and returns only the sync
    /// statistics.
    ///
    /// Delegates to [`Self::sync_portal_with_progress`] with no URL template,
    /// language `"en"`, a [`SilentReporter`], `PortalType::default()`, and no
    /// profile or SPARQL endpoint. The embedding statistics produced by the
    /// delegate are discarded.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegate returns.
    pub async fn sync_portal(&self, portal_url: &str) -> Result<SyncStats, AppError> {
        self.sync_portal_with_progress(
            portal_url,
            None,
            "en",
            &SilentReporter,
            PortalType::default(),
            None,
            None,
        )
        .await
        .map(|(sync_result, _embedding_stats)| sync_result.stats)
    }

    /// Syncs a single portal, reporting progress through `reporter`.
    ///
    /// Delegates to
    /// [`Self::sync_portal_with_progress_cancellable_with_options`] with a
    /// freshly created [`CancellationToken`] that nothing else holds, and
    /// with `force_full_sync` set to `false`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegate returns.
    // The argument list mirrors the harvest service's signature.
    #[allow(clippy::too_many_arguments)]
    pub async fn sync_portal_with_progress<R: ProgressReporter>(
        &self,
        portal_url: &str,
        url_template: Option<&str>,
        language: &str,
        reporter: &R,
        portal_type: PortalType,
        profile: Option<&str>,
        sparql_endpoint: Option<&str>,
    ) -> Result<(SyncResult, EmbeddingStats), AppError> {
        let private_token = CancellationToken::new();
        let force_full_sync = false;

        self.sync_portal_with_progress_cancellable_with_options(
            portal_url,
            url_template,
            language,
            reporter,
            private_token,
            force_full_sync,
            portal_type,
            profile,
            sparql_endpoint,
        )
        .await
    }

    /// Harvests a single portal and then runs the embedding pass, honouring
    /// `cancel_token` between the two phases.
    ///
    /// All arguments are forwarded unchanged to the harvest service's method
    /// of the same name (with a clone of `cancel_token`). If the token is
    /// cancelled afterwards, or the harvest result reports itself cancelled,
    /// the embedding pass is skipped and `EmbeddingStats::default()` is
    /// returned alongside the harvest result. Otherwise `embed_pending` is
    /// called with `Some(portal_url)` — presumably scoping the pass to this
    /// portal; confirm against `EmbeddingService`.
    ///
    /// # Errors
    ///
    /// Propagates an error from either the harvest phase or the embedding
    /// phase. An embedding error discards the already-obtained harvest
    /// result.
    // The argument list mirrors the harvest service's signature.
    #[allow(clippy::too_many_arguments)]
    pub async fn sync_portal_with_progress_cancellable_with_options<R: ProgressReporter>(
        &self,
        portal_url: &str,
        url_template: Option<&str>,
        language: &str,
        reporter: &R,
        cancel_token: CancellationToken,
        force_full_sync: bool,
        portal_type: PortalType,
        profile: Option<&str>,
        sparql_endpoint: Option<&str>,
    ) -> Result<(SyncResult, EmbeddingStats), AppError> {
        // Phase 1: harvest. The harvest service gets its own handle to the
        // token so the original can still be inspected and reused below.
        let harvest_token = cancel_token.clone();
        let sync_result = self
            .harvest
            .sync_portal_with_progress_cancellable_with_options(
                portal_url,
                url_template,
                language,
                reporter,
                harvest_token,
                force_full_sync,
                portal_type,
                profile,
                sparql_endpoint,
            )
            .await?;

        // Phase 2: embed, unless cancellation was observed on either side.
        let cancelled = cancel_token.is_cancelled() || sync_result.is_cancelled();
        let embed_stats = if cancelled {
            EmbeddingStats::default()
        } else {
            self.embedding
                .embed_pending(Some(portal_url), reporter, cancel_token)
                .await?
        };

        Ok((sync_result, embed_stats))
    }

    /// Harvests every portal in `portals` without progress reporting.
    ///
    /// Delegates to [`Self::batch_harvest_with_progress`] with a
    /// [`SilentReporter`].
    pub async fn batch_harvest(&self, portals: &[&PortalEntry]) -> BatchHarvestSummary {
        let silent = SilentReporter;
        self.batch_harvest_with_progress(portals, &silent).await
    }

    /// Harvests every portal in `portals`, reporting progress through
    /// `reporter`.
    ///
    /// Delegates to [`Self::batch_harvest_with_progress_cancellable`] with a
    /// freshly created [`CancellationToken`] that nothing else holds.
    pub async fn batch_harvest_with_progress<R: ProgressReporter>(
        &self,
        portals: &[&PortalEntry],
        reporter: &R,
    ) -> BatchHarvestSummary {
        let private_token = CancellationToken::new();
        self.batch_harvest_with_progress_cancellable(portals, reporter, private_token)
            .await
    }

    /// Runs the batch harvest and then a single embedding pass, honouring
    /// `cancel_token` between the two phases.
    ///
    /// The harvest summary is always returned. If the token is cancelled once
    /// the harvest finishes, the embedding pass is skipped. Otherwise
    /// `embed_pending` is called with `None` as its first argument; a failure
    /// there is logged as a warning and does not affect the returned summary.
    pub async fn batch_harvest_with_progress_cancellable<R: ProgressReporter>(
        &self,
        portals: &[&PortalEntry],
        reporter: &R,
        cancel_token: CancellationToken,
    ) -> BatchHarvestSummary {
        let summary = self
            .harvest
            .batch_harvest_with_progress_cancellable(portals, reporter, cancel_token.clone())
            .await;

        if cancel_token.is_cancelled() {
            return summary;
        }

        // Best-effort: an embedding failure must not hide the harvest summary.
        let embed_outcome = self
            .embedding
            .embed_pending(None, reporter, cancel_token)
            .await;
        if let Err(e) = embed_outcome {
            tracing::warn!(error = %e, "Post-batch embedding pass failed");
        }

        summary
    }
}