use crate::error::{SyncError, SyncResult};
use crate::local::{ContentDiffEntry, ContentLocalSync};
use crate::models::LocalSyncDirection;
use async_trait::async_trait;
use std::path::Path;
use std::sync::Arc;
use systemprompt_database::DbPool;
use systemprompt_models::{AppPaths, ContentConfigRaw};
use systemprompt_traits::{Job, JobContext, JobResult, ProviderError, ProviderResult};
/// Job that synchronizes content sources between the database and the filesystem.
///
/// The type carries no state: the database pool, application paths and the
/// `direction` / `delete_orphans` / `override_existing` parameters are all read
/// from the job context at execution time.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ContentSyncJob;
#[async_trait]
impl Job for ContentSyncJob {
fn name(&self) -> &'static str {
"content_sync"
}
fn description(&self) -> &'static str {
"Synchronize content between database and filesystem"
}
fn schedule(&self) -> &'static str {
""
}
fn tags(&self) -> Vec<&'static str> {
vec!["content", "sync"]
}
fn enabled(&self) -> bool {
false
}
async fn execute(&self, ctx: &JobContext) -> ProviderResult<JobResult> {
let start_time = std::time::Instant::now();
let db_pool = Arc::clone(ctx.db_pool::<DbPool>().ok_or_else(|| {
ProviderError::Configuration("DbPool not available in job context".into())
})?);
let paths = ctx
.app_paths::<Arc<AppPaths>>()
.ok_or_else(|| {
ProviderError::Configuration("AppPaths not available in job context".into())
})?
.as_ref();
tracing::info!("Content sync job started");
let direction = get_direction_from_params(ctx)
.map_err(|e| ProviderError::InvalidInput(e.to_string()))?;
let delete_orphans = get_bool_param(ctx, "delete_orphans");
let override_existing = get_bool_param(ctx, "override_existing");
let config =
load_content_config(paths).map_err(|e| ProviderError::Configuration(e.to_string()))?;
let services_path = paths.system().services();
let sources: Vec<_> = config
.content_sources
.into_iter()
.filter(|(_, source)| source.enabled)
.filter(|(_, source)| !source.allowed_content_types.contains(&"skill".to_string()))
.collect();
if sources.is_empty() {
let duration_ms = start_time.elapsed().as_millis() as u64;
tracing::warn!("No enabled content sources found");
return Ok(JobResult::success()
.with_message("No enabled content sources")
.with_duration(duration_ms));
}
let sync = ContentLocalSync::new(db_pool);
let mut all_diffs: Vec<ContentDiffEntry> = Vec::new();
for (name, source) in sources {
let source_path = resolve_source_path(&source.path, services_path);
let diff = sync
.calculate_diff(
&source.source_id,
&source_path,
&source.allowed_content_types,
)
.await
.map_err(|e| {
ProviderError::RenderFailed(format!(
"Failed to calculate diff for source {name}: {e}"
))
})?;
all_diffs.push(ContentDiffEntry {
name,
source_id: source.source_id.clone(),
category_id: source.category_id.clone(),
path: source_path,
allowed_content_types: source.allowed_content_types.clone(),
diff,
});
}
let has_changes = all_diffs.iter().any(|e| e.diff.has_changes());
if !has_changes {
let duration_ms = start_time.elapsed().as_millis() as u64;
tracing::info!("Content is in sync - no changes needed");
return Ok(JobResult::success()
.with_message("Content is in sync")
.with_stats(0, 0)
.with_duration(duration_ms));
}
let result = match direction {
LocalSyncDirection::ToDisk => sync
.sync_to_disk(&all_diffs, delete_orphans)
.await
.map_err(|e| ProviderError::RenderFailed(e.to_string()))?,
LocalSyncDirection::ToDatabase => sync
.sync_to_db(&all_diffs, delete_orphans, override_existing)
.await
.map_err(|e| ProviderError::RenderFailed(e.to_string()))?,
};
let duration_ms = start_time.elapsed().as_millis() as u64;
tracing::info!(
direction = %result.direction,
items_synced = result.items_synced,
items_deleted = result.items_deleted,
items_skipped = result.items_skipped,
errors = result.errors.len(),
duration_ms,
"Content sync job completed"
);
Ok(JobResult::success()
.with_stats(result.items_synced as u64, result.errors.len() as u64)
.with_duration(duration_ms))
}
}
/// Reads the `direction` job parameter, falling back to `"to_db"` when absent.
///
/// Accepted spellings: `to_disk` / `to-disk` / `disk` for filesystem-bound sync,
/// and `to_db` / `to-db` / `db` / `to_database` for database-bound sync.
///
/// # Errors
///
/// Returns an invalid-input `SyncError` naming the rejected value otherwise.
fn get_direction_from_params(ctx: &JobContext) -> SyncResult<LocalSyncDirection> {
    let params = ctx.parameters();
    let requested: &str = if let Some(value) = params.get("direction") {
        value.as_str()
    } else {
        "to_db"
    };

    if matches!(requested, "to_disk" | "to-disk" | "disk") {
        return Ok(LocalSyncDirection::ToDisk);
    }
    if matches!(requested, "to_db" | "to-db" | "db" | "to_database") {
        return Ok(LocalSyncDirection::ToDatabase);
    }

    Err(SyncError::invalid_input(format!(
        "Invalid direction '{requested}'. Use 'to_disk' or 'to_db'"
    )))
}
/// Interprets job parameter `key` as a flag.
///
/// Only the exact strings `"true"`, `"1"` and `"yes"` count as set; a missing
/// parameter or any other value yields `false`.
fn get_bool_param(ctx: &JobContext, key: &str) -> bool {
    ctx.parameters()
        .get(key)
        .map(String::as_str)
        .is_some_and(|flag| matches!(flag, "true" | "1" | "yes"))
}
/// Reads and parses the content configuration YAML at `paths.system().content_config()`.
///
/// # Errors
///
/// - `SyncError::MissingConfig` if the file does not exist.
/// - An internal `SyncError` if the file exists but cannot be read (e.g. permissions).
/// - The `SyncError` produced by the `From<serde_yaml::Error>` conversion if the
///   YAML does not deserialize into `ContentConfigRaw`.
fn load_content_config(paths: &AppPaths) -> SyncResult<ContentConfigRaw> {
    let config_path = paths.system().content_config();
    // Read first and classify the failure instead of probing with `exists()`:
    // that avoids a check-then-use race and an extra filesystem call. It also
    // stops unreadable files (e.g. permission denied, which `exists()` reports
    // as `false`) from being misreported as missing.
    let content = match std::fs::read_to_string(&config_path) {
        Ok(content) => content,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Err(SyncError::MissingConfig(format!(
                "Content config not found at: {}",
                config_path.display()
            )));
        }
        Err(e) => {
            return Err(SyncError::internal(format!(
                "Failed to read content config {}: {e}",
                config_path.display()
            )));
        }
    };
    let config: ContentConfigRaw = serde_yaml::from_str(&content)?;
    Ok(config)
}
/// Turns a configured source path into a concrete filesystem path.
///
/// Relative paths are anchored at `services_path`; absolute paths are
/// returned unchanged.
fn resolve_source_path(path: &str, services_path: &Path) -> std::path::PathBuf {
    let candidate = Path::new(path);
    if candidate.is_relative() {
        return services_path.join(candidate);
    }
    candidate.to_path_buf()
}
// Hands a reference to the stateless `ContentSyncJob` unit value to `submit_job!`.
// NOTE(review): presumably this registers the job with a global job inventory
// at compile/link time — confirm in `systemprompt_provider_contracts`.
systemprompt_provider_contracts::submit_job!(&ContentSyncJob);