systemprompt_sync/jobs/
content_sync.rs1use crate::local::{ContentDiffEntry, ContentLocalSync};
2use crate::models::LocalSyncDirection;
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use std::path::Path;
6use std::sync::Arc;
7use systemprompt_database::DbPool;
8use systemprompt_models::{AppPaths, ContentConfigRaw};
9use systemprompt_traits::{Job, JobContext, JobResult};
10
/// Job that synchronizes content between the database and the filesystem.
///
/// The type carries no state; everything it needs at run time is read from
/// the `JobContext` handed to `execute`.
#[derive(Clone, Copy, Debug)]
pub struct ContentSyncJob;
13
14#[async_trait]
15impl Job for ContentSyncJob {
16 fn name(&self) -> &'static str {
17 "content_sync"
18 }
19
20 fn description(&self) -> &'static str {
21 "Synchronize content between database and filesystem"
22 }
23
24 fn schedule(&self) -> &'static str {
25 ""
26 }
27
28 fn tags(&self) -> Vec<&'static str> {
29 vec!["content", "sync"]
30 }
31
32 fn enabled(&self) -> bool {
33 false
34 }
35
36 async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
37 let start_time = std::time::Instant::now();
38
39 let db_pool = Arc::clone(
40 ctx.db_pool::<DbPool>()
41 .ok_or_else(|| anyhow::anyhow!("DbPool not available in job context"))?,
42 );
43
44 tracing::info!("Content sync job started");
45
46 let direction = get_direction_from_params(ctx)?;
47 let delete_orphans = get_bool_param(ctx, "delete_orphans");
48 let override_existing = get_bool_param(ctx, "override_existing");
49
50 let config = load_content_config()?;
51 let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
52 let services_path = paths.system().services();
53
54 let sources: Vec<_> = config
55 .content_sources
56 .into_iter()
57 .filter(|(_, source)| source.enabled)
58 .filter(|(_, source)| !source.allowed_content_types.contains(&"skill".to_string()))
59 .collect();
60
61 if sources.is_empty() {
62 let duration_ms = start_time.elapsed().as_millis() as u64;
63 tracing::warn!("No enabled content sources found");
64 return Ok(JobResult::success()
65 .with_message("No enabled content sources")
66 .with_duration(duration_ms));
67 }
68
69 let sync = ContentLocalSync::new(db_pool);
70 let mut all_diffs: Vec<ContentDiffEntry> = Vec::new();
71
72 for (name, source) in sources {
73 let source_path = resolve_source_path(&source.path, services_path);
74
75 let diff = sync
76 .calculate_diff(
77 source.source_id.as_str(),
78 &source_path,
79 &source.allowed_content_types,
80 )
81 .await
82 .context(format!("Failed to calculate diff for source: {}", name))?;
83
84 all_diffs.push(ContentDiffEntry {
85 name,
86 source_id: source.source_id.to_string(),
87 category_id: source.category_id.to_string(),
88 path: source_path,
89 allowed_content_types: source.allowed_content_types.clone(),
90 diff,
91 });
92 }
93
94 let has_changes = all_diffs.iter().any(|e| e.diff.has_changes());
95
96 if !has_changes {
97 let duration_ms = start_time.elapsed().as_millis() as u64;
98 tracing::info!("Content is in sync - no changes needed");
99 return Ok(JobResult::success()
100 .with_message("Content is in sync")
101 .with_stats(0, 0)
102 .with_duration(duration_ms));
103 }
104
105 let result = match direction {
106 LocalSyncDirection::ToDisk => sync.sync_to_disk(&all_diffs, delete_orphans).await?,
107 LocalSyncDirection::ToDatabase => {
108 sync.sync_to_db(&all_diffs, delete_orphans, override_existing)
109 .await?
110 },
111 };
112
113 let duration_ms = start_time.elapsed().as_millis() as u64;
114
115 tracing::info!(
116 direction = %result.direction,
117 items_synced = result.items_synced,
118 items_deleted = result.items_deleted,
119 items_skipped = result.items_skipped,
120 errors = result.errors.len(),
121 duration_ms,
122 "Content sync job completed"
123 );
124
125 Ok(JobResult::success()
126 .with_stats(result.items_synced as u64, result.errors.len() as u64)
127 .with_duration(duration_ms))
128 }
129}
130
131fn get_direction_from_params(ctx: &JobContext) -> Result<LocalSyncDirection> {
132 let params = ctx.parameters();
133 let direction_str = params.get("direction").map_or("to_db", String::as_str);
134
135 match direction_str {
136 "to_disk" | "to-disk" | "disk" => Ok(LocalSyncDirection::ToDisk),
137 "to_db" | "to-db" | "db" | "to_database" => Ok(LocalSyncDirection::ToDatabase),
138 other => anyhow::bail!("Invalid direction '{}'. Use 'to_disk' or 'to_db'", other),
139 }
140}
141
142fn get_bool_param(ctx: &JobContext, key: &str) -> bool {
143 ctx.parameters()
144 .get(key)
145 .is_some_and(|v| v == "true" || v == "1" || v == "yes")
146}
147
148fn load_content_config() -> Result<ContentConfigRaw> {
149 let paths = AppPaths::get().map_err(|e| anyhow::anyhow!("{}", e))?;
150 let config_path = paths.system().content_config();
151
152 if !config_path.exists() {
153 anyhow::bail!("Content config not found at: {}", config_path.display());
154 }
155
156 let content = std::fs::read_to_string(config_path).context("Failed to read content config")?;
157 let config: ContentConfigRaw =
158 serde_yaml::from_str(&content).context("Failed to parse content config")?;
159 Ok(config)
160}
161
/// Resolves a configured source path against the services directory.
///
/// Absolute paths are returned unchanged; relative paths are appended to
/// `services_path`.
fn resolve_source_path(path: &str, services_path: &Path) -> std::path::PathBuf {
    // `Path::join` replaces the base entirely when its argument is absolute,
    // so a single call covers both the absolute and the relative case.
    services_path.join(path)
}
170
171systemprompt_provider_contracts::submit_job!(&ContentSyncJob);