Skip to main content

systemprompt_sync/jobs/
content_sync.rs

1use crate::local::{ContentDiffEntry, ContentLocalSync};
2use crate::models::LocalSyncDirection;
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use std::path::Path;
6use std::sync::Arc;
7use systemprompt_database::DbPool;
8use systemprompt_models::{AppPaths, ContentConfigRaw};
9use systemprompt_traits::{Job, JobContext, JobResult};
10
/// Job that synchronizes content between the database and the filesystem.
///
/// The type carries no state; everything it needs is taken from the job
/// context passed to `execute`.
#[derive(Clone, Copy, Debug)]
pub struct ContentSyncJob;
13
14#[async_trait]
15impl Job for ContentSyncJob {
16    fn name(&self) -> &'static str {
17        "content_sync"
18    }
19
20    fn description(&self) -> &'static str {
21        "Synchronize content between database and filesystem"
22    }
23
24    fn schedule(&self) -> &'static str {
25        ""
26    }
27
28    fn tags(&self) -> Vec<&'static str> {
29        vec!["content", "sync"]
30    }
31
32    fn enabled(&self) -> bool {
33        false
34    }
35
36    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
37        let start_time = std::time::Instant::now();
38
39        let db_pool = Arc::clone(
40            ctx.db_pool::<DbPool>()
41                .ok_or_else(|| anyhow::anyhow!("DbPool not available in job context"))?,
42        );
43
44        tracing::info!("Content sync job started");
45
46        let direction = get_direction_from_params(ctx)?;
47        let delete_orphans = get_bool_param(ctx, "delete_orphans");
48        let override_existing = get_bool_param(ctx, "override_existing");
49
50        let config = load_content_config()?;
51        let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
52        let services_path = paths.system().services();
53
54        let sources: Vec<_> = config
55            .content_sources
56            .into_iter()
57            .filter(|(_, source)| source.enabled)
58            .filter(|(_, source)| !source.allowed_content_types.contains(&"skill".to_string()))
59            .collect();
60
61        if sources.is_empty() {
62            let duration_ms = start_time.elapsed().as_millis() as u64;
63            tracing::warn!("No enabled content sources found");
64            return Ok(JobResult::success()
65                .with_message("No enabled content sources")
66                .with_duration(duration_ms));
67        }
68
69        let sync = ContentLocalSync::new(db_pool);
70        let mut all_diffs: Vec<ContentDiffEntry> = Vec::new();
71
72        for (name, source) in sources {
73            let source_path = resolve_source_path(&source.path, services_path);
74
75            let diff = sync
76                .calculate_diff(
77                    source.source_id.as_str(),
78                    &source_path,
79                    &source.allowed_content_types,
80                )
81                .await
82                .context(format!("Failed to calculate diff for source: {}", name))?;
83
84            all_diffs.push(ContentDiffEntry {
85                name,
86                source_id: source.source_id.to_string(),
87                category_id: source.category_id.to_string(),
88                path: source_path,
89                allowed_content_types: source.allowed_content_types.clone(),
90                diff,
91            });
92        }
93
94        let has_changes = all_diffs.iter().any(|e| e.diff.has_changes());
95
96        if !has_changes {
97            let duration_ms = start_time.elapsed().as_millis() as u64;
98            tracing::info!("Content is in sync - no changes needed");
99            return Ok(JobResult::success()
100                .with_message("Content is in sync")
101                .with_stats(0, 0)
102                .with_duration(duration_ms));
103        }
104
105        let result = match direction {
106            LocalSyncDirection::ToDisk => sync.sync_to_disk(&all_diffs, delete_orphans).await?,
107            LocalSyncDirection::ToDatabase => {
108                sync.sync_to_db(&all_diffs, delete_orphans, override_existing)
109                    .await?
110            },
111        };
112
113        let duration_ms = start_time.elapsed().as_millis() as u64;
114
115        tracing::info!(
116            direction = %result.direction,
117            items_synced = result.items_synced,
118            items_deleted = result.items_deleted,
119            items_skipped = result.items_skipped,
120            errors = result.errors.len(),
121            duration_ms,
122            "Content sync job completed"
123        );
124
125        Ok(JobResult::success()
126            .with_stats(result.items_synced as u64, result.errors.len() as u64)
127            .with_duration(duration_ms))
128    }
129}
130
131fn get_direction_from_params(ctx: &JobContext) -> Result<LocalSyncDirection> {
132    let params = ctx.parameters();
133    let direction_str = params.get("direction").map_or("to_db", String::as_str);
134
135    match direction_str {
136        "to_disk" | "to-disk" | "disk" => Ok(LocalSyncDirection::ToDisk),
137        "to_db" | "to-db" | "db" | "to_database" => Ok(LocalSyncDirection::ToDatabase),
138        other => anyhow::bail!("Invalid direction '{}'. Use 'to_disk' or 'to_db'", other),
139    }
140}
141
142fn get_bool_param(ctx: &JobContext, key: &str) -> bool {
143    ctx.parameters()
144        .get(key)
145        .is_some_and(|v| v == "true" || v == "1" || v == "yes")
146}
147
148fn load_content_config() -> Result<ContentConfigRaw> {
149    let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
150    let config_path = paths.system().content_config();
151
152    if !config_path.exists() {
153        anyhow::bail!("Content config not found at: {}", config_path.display());
154    }
155
156    let content = std::fs::read_to_string(config_path).context("Failed to read content config")?;
157    let config: ContentConfigRaw =
158        serde_yaml::from_str(&content).context("Failed to parse content config")?;
159    Ok(config)
160}
161
/// Resolves a configured source path against the services directory.
///
/// An absolute `path` is returned unchanged; a relative one is appended to
/// `services_path`.
fn resolve_source_path(path: &str, services_path: &Path) -> std::path::PathBuf {
    // `Path::join` replaces the base entirely when its argument is absolute,
    // so a single call covers both the absolute and the relative case.
    services_path.join(path)
}
170
// Passes a reference to `ContentSyncJob` to the provider-contracts `submit_job!` macro.
// NOTE(review): presumably this registers the job for discovery at startup — confirm
// against the macro's documentation.
systemprompt_provider_contracts::submit_job!(&ContentSyncJob);