systemprompt-sync 0.6.0

Cloud sync services for the systemprompt.io AI governance infrastructure: file, database, and crate deployment across governance tenants in the MCP governance pipeline.
//! Scheduled job that synchronises content between disk and database.

use crate::error::{SyncError, SyncResult};
use crate::local::{ContentDiffEntry, ContentLocalSync};
use crate::models::LocalSyncDirection;
use async_trait::async_trait;
use std::path::Path;
use std::sync::Arc;
use systemprompt_database::DbPool;
use systemprompt_models::{AppPaths, ContentConfigRaw};
use systemprompt_traits::{Job, JobContext, JobResult, ProviderError, ProviderResult};

/// Job that synchronises content sources between the filesystem and the database.
///
/// This is a stateless unit struct: the sync direction, flags, and application paths are
/// all read from the [`JobContext`] each time the job executes.
#[derive(Clone, Copy, Debug)]
pub struct ContentSyncJob;

#[async_trait]
impl Job for ContentSyncJob {
    fn name(&self) -> &'static str {
        "content_sync"
    }

    fn description(&self) -> &'static str {
        "Synchronize content between database and filesystem"
    }

    /// Empty schedule string.
    ///
    /// NOTE(review): presumably means "no cron schedule; run only when triggered" — confirm
    /// against the scheduler's interpretation of an empty schedule.
    fn schedule(&self) -> &'static str {
        ""
    }

    fn tags(&self) -> Vec<&'static str> {
        vec!["content", "sync"]
    }

    /// Always `false`.
    ///
    /// NOTE(review): presumably keeps the job out of automatic scheduling — confirm.
    fn enabled(&self) -> bool {
        false
    }

    /// Diffs every enabled, non-skill content source against the database and applies the
    /// changes in the requested direction.
    ///
    /// Job parameters read from the context:
    /// - `direction`: `to_db` (default) or `to_disk`, plus the aliases accepted by
    ///   `get_direction_from_params`.
    /// - `delete_orphans`, `override_existing`: boolean flags (see `get_bool_param`).
    ///
    /// # Errors
    ///
    /// - [`ProviderError::Configuration`] if the database pool or app paths are missing from
    ///   the context, or the content config cannot be loaded.
    /// - [`ProviderError::InvalidInput`] if `direction` is not recognised.
    /// - [`ProviderError::RenderFailed`] if computing a diff or applying the sync fails.
    ///
    /// Per-item sync errors reported in the sync result do not fail the job; they are counted
    /// in the returned stats and logged at warn level.
    async fn execute(&self, ctx: &JobContext) -> ProviderResult<JobResult> {
        let start_time = std::time::Instant::now();

        let db_pool = Arc::clone(ctx.db_pool::<DbPool>().ok_or_else(|| {
            ProviderError::Configuration("DbPool not available in job context".into())
        })?);

        let paths = ctx
            .app_paths::<Arc<AppPaths>>()
            .ok_or_else(|| {
                ProviderError::Configuration("AppPaths not available in job context".into())
            })?
            .as_ref();

        tracing::info!("Content sync job started");

        let direction = get_direction_from_params(ctx)
            .map_err(|e| ProviderError::InvalidInput(e.to_string()))?;
        let delete_orphans = get_bool_param(ctx, "delete_orphans");
        let override_existing = get_bool_param(ctx, "override_existing");

        let config =
            load_content_config(paths).map_err(|e| ProviderError::Configuration(e.to_string()))?;
        let services_path = paths.system().services();

        // Only enabled sources take part; sources that allow the `skill` content type are
        // excluded from this job. `any` compares in place instead of allocating a String.
        let sources: Vec<_> = config
            .content_sources
            .into_iter()
            .filter(|(_, source)| {
                source.enabled && !source.allowed_content_types.iter().any(|t| t == "skill")
            })
            .collect();

        if sources.is_empty() {
            let duration_ms = elapsed_ms(start_time);
            tracing::warn!("No enabled content sources found");
            return Ok(JobResult::success()
                .with_message("No enabled content sources")
                .with_duration(duration_ms));
        }

        let sync = ContentLocalSync::new(db_pool);
        let mut all_diffs: Vec<ContentDiffEntry> = Vec::with_capacity(sources.len());

        for (name, source) in sources {
            let source_path = resolve_source_path(&source.path, services_path);

            let diff = sync
                .calculate_diff(
                    &source.source_id,
                    &source_path,
                    &source.allowed_content_types,
                )
                .await
                .map_err(|e| {
                    ProviderError::RenderFailed(format!(
                        "Failed to calculate diff for source {name}: {e}"
                    ))
                })?;

            // `source` is owned by this iteration, so its fields are moved rather than cloned.
            all_diffs.push(ContentDiffEntry {
                name,
                source_id: source.source_id,
                category_id: source.category_id,
                path: source_path,
                allowed_content_types: source.allowed_content_types,
                diff,
            });
        }

        let has_changes = all_diffs.iter().any(|e| e.diff.has_changes());

        if !has_changes {
            let duration_ms = elapsed_ms(start_time);
            tracing::info!("Content is in sync - no changes needed");
            return Ok(JobResult::success()
                .with_message("Content is in sync")
                .with_stats(0, 0)
                .with_duration(duration_ms));
        }

        let result = match direction {
            LocalSyncDirection::ToDisk => sync
                .sync_to_disk(&all_diffs, delete_orphans)
                .await
                .map_err(|e| ProviderError::RenderFailed(e.to_string()))?,
            LocalSyncDirection::ToDatabase => sync
                .sync_to_db(&all_diffs, delete_orphans, override_existing)
                .await
                .map_err(|e| ProviderError::RenderFailed(e.to_string()))?,
        };

        let duration_ms = elapsed_ms(start_time);

        tracing::info!(
            direction = %result.direction,
            items_synced = result.items_synced,
            items_deleted = result.items_deleted,
            items_skipped = result.items_skipped,
            errors = result.errors.len(),
            duration_ms,
            "Content sync job completed"
        );

        // The job still reports success when individual items failed; make that visible.
        if !result.errors.is_empty() {
            tracing::warn!(
                errors = result.errors.len(),
                "Content sync finished with per-item errors"
            );
        }

        Ok(JobResult::success()
            .with_stats(result.items_synced as u64, result.errors.len() as u64)
            .with_duration(duration_ms))
    }
}

/// Milliseconds elapsed since `start`.
///
/// Saturates at `u64::MAX` instead of silently truncating the `u128` returned by
/// `Duration::as_millis`.
fn elapsed_ms(start: std::time::Instant) -> u64 {
    u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX)
}

/// Parses the `direction` job parameter into a [`LocalSyncDirection`].
///
/// When the parameter is absent, it defaults to `to_db`. Accepted spellings are `to_disk`,
/// `to-disk` and `disk` for syncing to disk, and `to_db`, `to-db`, `db` and `to_database`
/// for syncing to the database. Matching is exact (case-sensitive).
///
/// # Errors
///
/// Returns an invalid-input [`SyncError`] for any other value.
fn get_direction_from_params(ctx: &JobContext) -> SyncResult<LocalSyncDirection> {
    let params = ctx.parameters();
    let requested = match params.get("direction") {
        Some(value) => value.as_str(),
        None => "to_db",
    };

    if matches!(requested, "to_disk" | "to-disk" | "disk") {
        return Ok(LocalSyncDirection::ToDisk);
    }
    if matches!(requested, "to_db" | "to-db" | "db" | "to_database") {
        return Ok(LocalSyncDirection::ToDatabase);
    }

    Err(SyncError::invalid_input(format!(
        "Invalid direction '{requested}'. Use 'to_disk' or 'to_db'"
    )))
}

/// Reads a boolean flag named `key` from the job parameters.
///
/// Returns `true` only when the parameter is present and, after trimming surrounding
/// whitespace, equals `true`, `1`, or `yes` (ASCII case-insensitive). Missing parameters
/// and any other value yield `false`.
fn get_bool_param(ctx: &JobContext, key: &str) -> bool {
    ctx.parameters().get(key).is_some_and(|v| is_truthy(v))
}

/// Returns whether `value` spells an affirmative flag (`true`/`1`/`yes`, any ASCII case,
/// surrounding whitespace ignored).
fn is_truthy(value: &str) -> bool {
    let value = value.trim();
    ["true", "1", "yes"]
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}

/// Reads and parses the content configuration YAML at `paths.system().content_config()`.
///
/// # Errors
///
/// - [`SyncError::MissingConfig`] if the file does not exist.
/// - An internal [`SyncError`] if the file cannot be read, or if its contents cannot be
///   deserialized into [`ContentConfigRaw`]; both messages include the config path.
fn load_content_config(paths: &AppPaths) -> SyncResult<ContentConfigRaw> {
    let config_path = paths.system().content_config();
    let display_path = config_path.display().to_string();

    // Read directly and classify the failure instead of probing with `exists()` first.
    // This avoids a second filesystem lookup and the race where the file changes in between.
    let content = match std::fs::read_to_string(&config_path) {
        Ok(content) => content,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Err(SyncError::MissingConfig(format!(
                "Content config not found at: {display_path}"
            )));
        }
        Err(e) => {
            return Err(SyncError::internal(format!(
                "Failed to read content config {display_path}: {e}"
            )));
        }
    };

    // Attach the path so a malformed config is easy to locate from the job error message.
    serde_yaml::from_str(&content).map_err(|e| {
        SyncError::internal(format!("Failed to parse content config {display_path}: {e}"))
    })
}

/// Resolves a content source path taken from the content config.
///
/// Relative paths are joined onto `services_path`; absolute paths are returned as-is.
fn resolve_source_path(path: &str, services_path: &Path) -> std::path::PathBuf {
    let candidate = Path::new(path);
    if candidate.is_relative() {
        return services_path.join(candidate);
    }
    candidate.to_owned()
}

// Hand a `ContentSyncJob` instance to the `submit_job!` macro from
// `systemprompt_provider_contracts`.
// NOTE(review): presumably this registers the job so the job runner can discover it
// at startup — confirm against the macro definition.
systemprompt_provider_contracts::submit_job!(&ContentSyncJob);