use anyhow::Result;
use std::path::{Path, PathBuf};
use systemprompt_database::DbPool;
use systemprompt_models::services::ServicesConfig;
use systemprompt_models::{AppPaths, ContentSourceConfigRaw};
use systemprompt_traits::JobResult;
use crate::services::IngestionService;
use crate::{IngestionOptions, IngestionReport, IngestionSource};
/// Running totals accumulated while the job walks every enabled content source.
///
/// `Default` yields all-zero counters, which is the state a fresh job run starts from.
#[derive(Debug, Default)]
struct IngestionStats {
    /// Sum of `files_processed` over every source report that came back `Ok`.
    processed: u64,
    /// Number of problems seen so far: one per missing source path, one per
    /// source whose ingestion returned `Err`, plus every error listed inside
    /// a successful report.
    errors: u64,
}
/// Runs the content ingestion job over every enabled content source.
///
/// The job builds an [`IngestionService`] from `db_pool`, selects the enabled
/// sources from `services_config`, ingests each of them in turn and reports
/// the aggregated counters together with the elapsed wall-clock time.
///
/// When no source is enabled the job finishes immediately with a success
/// result carrying an explanatory message.
///
/// # Errors
///
/// Returns an error if the ingestion service cannot be created or if
/// processing the sources fails with a propagated error.
pub async fn execute_content_ingestion(
    db_pool: &DbPool,
    services_config: &ServicesConfig,
) -> Result<JobResult> {
    let started = std::time::Instant::now();
    log_job_started();

    let service = create_ingestion_service(db_pool)?;
    let enabled = get_enabled_sources(services_config);

    let outcome = if enabled.is_empty() {
        empty_sources_result(started)
    } else {
        log_processing_sources(enabled.len());
        let totals = process_all_sources(&service, &enabled).await?;
        build_result(started, &totals)
    };

    Ok(outcome)
}
/// Emits the info-level log line that marks the start of the job.
fn log_job_started() {
    tracing::info!("Content ingestion job started");
}
/// Builds the [`IngestionService`] backed by `db_pool`.
///
/// # Errors
///
/// Any failure reported by `IngestionService::new` is rendered into an
/// `anyhow` error prefixed with "Failed to create ingestion service".
fn create_ingestion_service(db_pool: &DbPool) -> Result<IngestionService> {
    let service = IngestionService::new(db_pool)
        .map_err(|cause| anyhow::anyhow!("Failed to create ingestion service: {}", cause))?;
    Ok(service)
}
/// Collects the `(name, config)` pairs of the content sources the job should ingest.
///
/// A source is kept only when its config has `enabled` set and its name does
/// not contain the substring `"skill"`.
// NOTE(review): the "skill" exclusion is presumably because skill sources are
// handled by a different code path — confirm.
fn get_enabled_sources(
    services_config: &ServicesConfig,
) -> Vec<(&String, &ContentSourceConfigRaw)> {
    let configured = &services_config.content.raw.content_sources;

    configured
        .iter()
        .filter(|(_, source_cfg)| source_cfg.enabled)
        .filter(|(source_name, _)| !source_name.contains("skill"))
        .collect()
}
fn empty_sources_result(start_time: std::time::Instant) -> JobResult {
tracing::warn!("No enabled content sources found");
JobResult::success()
.with_message("No enabled content sources")
.with_duration(start_time.elapsed().as_millis() as u64)
}
/// Emits a debug-level log line recording how many sources are about to be
/// processed (`count` is logged as the `sources_count` field).
fn log_processing_sources(count: usize) {
    tracing::debug!(sources_count = count, "Processing content sources");
}
/// Ingests every entry of `sources` one after another and returns the
/// accumulated counters.
///
/// # Errors
///
/// Stops at, and returns, the first error propagated by
/// `process_single_source`; the counters gathered up to that point are
/// discarded.
async fn process_all_sources(
    service: &IngestionService,
    sources: &[(&String, &ContentSourceConfigRaw)],
) -> Result<IngestionStats> {
    let mut totals = IngestionStats {
        processed: 0,
        errors: 0,
    };

    for &(source_name, source_config) in sources.iter() {
        process_single_source(service, source_name, source_config, &mut totals).await?;
    }

    Ok(totals)
}
/// Ingests a single content source and folds the outcome into `stats`.
///
/// The configured path is resolved first. If validation rejects it, the
/// problem is counted as one error and logged, and the source is skipped
/// without failing the job. Otherwise the directory is ingested and the
/// resulting report (or ingestion error) is recorded in `stats`.
///
/// # Errors
///
/// Only a failure to resolve the content path is propagated to the caller.
async fn process_single_source(
    service: &IngestionService,
    name: &str,
    config: &ContentSourceConfigRaw,
    stats: &mut IngestionStats,
) -> Result<()> {
    tracing::debug!(source = %name, "Ingesting source");

    let content_path = resolve_content_path(&config.path)?;

    match validate_source(name, &content_path) {
        Some(problem) => {
            stats.errors += 1;
            log_validation_error(&problem);
        },
        None => {
            let outcome = ingest_source(service, name, &content_path, config).await;
            update_stats_from_report(name, outcome, stats);
        },
    }

    Ok(())
}
/// Turns a configured source path into the path that will be ingested.
///
/// Absolute paths are returned unchanged. Relative paths are joined onto the
/// directory returned by `AppPaths::get()?.system().services()`.
///
/// # Errors
///
/// Fails when `path` is relative and `AppPaths::get()` returns an error.
fn resolve_content_path(path: &str) -> Result<PathBuf> {
    let candidate = Path::new(path);

    if candidate.is_absolute() {
        return Ok(candidate.to_path_buf());
    }

    let app_paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
    Ok(app_paths.system().services().join(candidate))
}
/// Reasons a content source is rejected before ingestion is attempted.
enum ValidationError {
    /// The resolved source path does not exist on disk; carries that path.
    PathNotFound(PathBuf),
}

/// Checks that a source can be ingested, returning the problem if it cannot.
///
/// Currently the only check is that `path` exists. The source name is
/// accepted but not used.
fn validate_source(_name: &str, path: &Path) -> Option<ValidationError> {
    if path.exists() {
        None
    } else {
        Some(ValidationError::PathNotFound(path.to_path_buf()))
    }
}
/// Logs a validation failure at warn level.
fn log_validation_error(err: &ValidationError) {
    // `ValidationError` has a single variant, so this destructuring is
    // irrefutable; adding a variant will force this function to be revisited.
    let ValidationError::PathNotFound(missing) = err;
    tracing::warn!(path = %missing.display(), "Source path not found");
}
/// Ingests the directory at `path` for the source named `source_name`.
///
/// The `override_existing` and `recursive` flags are taken from
/// `config.indexing`; both are `false` when no indexing section is configured.
/// The [`IngestionSource`] is built from the config's `source_id` and
/// `category_id` together with `source_name`.
///
/// Returns whatever `IngestionService::ingest_directory` returns.
async fn ingest_source(
    service: &IngestionService,
    source_name: &str,
    path: &Path,
    config: &ContentSourceConfigRaw,
) -> Result<IngestionReport, crate::ContentError> {
    let (override_existing, recursive) = match &config.indexing {
        Some(indexing) => (indexing.override_existing, indexing.recursive),
        None => (false, false),
    };

    let source = IngestionSource::new(&config.source_id, source_name, &config.category_id);
    let options = IngestionOptions::default()
        .with_override(override_existing)
        .with_recursive(recursive);

    service.ingest_directory(path, &source, options).await
}
/// Folds the outcome of ingesting one source into `stats`.
///
/// A failed ingestion is logged at error level and counted as a single error.
/// A successful one adds the report's `files_processed` to `stats.processed`
/// and the number of entries in its `errors` list to `stats.errors`, then
/// logs each individual error and a summary line for the source.
fn update_stats_from_report(
    name: &str,
    report: Result<IngestionReport, crate::ContentError>,
    stats: &mut IngestionStats,
) {
    let report = match report {
        Ok(report) => report,
        Err(e) => {
            tracing::error!(source = %name, error = %e, "Source ingestion failed");
            stats.errors += 1;
            return;
        },
    };

    stats.processed += report.files_processed as u64;
    stats.errors += report.errors.len() as u64;
    log_ingestion_errors(&report.errors);
    log_source_ingested(name, &report);
}
fn log_ingestion_errors(errors: &[String]) {
for error in errors {
tracing::warn!(error = %error, "Ingestion error");
}
}
/// Logs a debug-level summary for one ingested source: its name, the files
/// found and processed, and how many errors the report lists.
fn log_source_ingested(name: &str, report: &IngestionReport) {
    let error_count = report.errors.len();

    tracing::debug!(
        source = %name,
        files_found = report.files_found,
        files_processed = report.files_processed,
        error_count,
        "Source ingested"
    );
}
fn build_result(start_time: std::time::Instant, stats: &IngestionStats) -> JobResult {
let duration_ms = start_time.elapsed().as_millis() as u64;
tracing::info!(
files_processed = stats.processed,
errors = stats.errors,
duration_ms = duration_ms,
"Content ingestion job completed"
);
JobResult::success()
.with_stats(stats.processed, stats.errors)
.with_duration(duration_ms)
}