systemprompt_sync/local/
content_sync.rs1use crate::diff::ContentDiffCalculator;
5use crate::error::{SyncError, SyncResult};
6use crate::export::export_content_to_file;
7use crate::models::{ContentDiffResult, LocalSyncDirection, LocalSyncResult};
8use std::path::{Path, PathBuf};
9use systemprompt_content::models::{IngestionOptions, IngestionSource};
10use systemprompt_content::repository::ContentRepository;
11use systemprompt_content::services::IngestionService;
12use systemprompt_database::DbPool;
13use systemprompt_identifiers::{CategoryId, ContentId, SourceId};
14use tracing::info;
15
/// One content source paired with the diff that was computed for it.
///
/// Slices of these entries are what `ContentLocalSync::sync_to_disk` and
/// `ContentLocalSync::sync_to_db` iterate over.
#[derive(Debug)]
pub struct ContentDiffEntry {
    /// Source name. Forwarded to `export_content_to_file` and to
    /// `IngestionSource::new`; the value `"blog"` selects the
    /// `<slug>/index.md` layout when orphans are removed from disk.
    pub name: String,
    /// Identifier of the source; used together with an item's slug to look
    /// content up in the database.
    pub source_id: SourceId,
    /// Category handed to `IngestionSource::new` when the directory is ingested.
    pub category_id: CategoryId,
    /// Directory on disk for this source: export target, ingestion root and
    /// base path for orphan removal.
    pub path: PathBuf,
    /// Not read by the sync methods in this file; presumably the content-type
    /// filter that was used when `diff` was calculated — confirm with the caller.
    pub allowed_content_types: Vec<String>,
    /// Diff for this source; its `added`, `modified` and `removed` lists drive
    /// both sync directions.
    pub diff: ContentDiffResult,
}
25
/// Synchronises content between a directory on disk and the database.
///
/// Holds only the database pool; repositories, the diff calculator and the
/// ingestion service are constructed from it on each call.
#[derive(Debug)]
pub struct ContentLocalSync {
    /// Pool from which every repository/service used by the sync is built.
    db: DbPool,
}
30
31impl ContentLocalSync {
32 pub const fn new(db: DbPool) -> Self {
33 Self { db }
34 }
35
36 pub async fn calculate_diff(
37 &self,
38 source_id: &SourceId,
39 disk_path: &Path,
40 allowed_types: &[String],
41 ) -> SyncResult<ContentDiffResult> {
42 let calculator = ContentDiffCalculator::new(&self.db).map_err(SyncError::internal)?;
43 calculator
44 .calculate_diff(source_id, disk_path, allowed_types)
45 .await
46 .map_err(SyncError::internal)
47 }
48
49 pub async fn sync_to_disk(
50 &self,
51 diffs: &[ContentDiffEntry],
52 delete_orphans: bool,
53 ) -> SyncResult<LocalSyncResult> {
54 let content_repo = ContentRepository::new(&self.db).map_err(SyncError::internal)?;
55 let mut result = LocalSyncResult {
56 direction: LocalSyncDirection::ToDisk,
57 ..Default::default()
58 };
59
60 for entry in diffs {
61 let source_path = &entry.path;
62
63 for item in &entry.diff.modified {
64 match content_repo
65 .get_by_source_and_slug(&entry.source_id, &item.slug)
66 .await
67 .map_err(SyncError::internal)?
68 {
69 Some(content) => {
70 export_content_to_file(&content, source_path, &entry.name)?;
71 result.items_synced += 1;
72 info!("Exported modified content: {}", item.slug);
73 },
74 None => {
75 result
76 .errors
77 .push(format!("Content not found in DB: {}", item.slug));
78 },
79 }
80 }
81
82 for item in &entry.diff.removed {
83 match content_repo
84 .get_by_source_and_slug(&entry.source_id, &item.slug)
85 .await
86 .map_err(SyncError::internal)?
87 {
88 Some(content) => {
89 export_content_to_file(&content, source_path, &entry.name)?;
90 result.items_synced += 1;
91 info!("Created content on disk: {}", item.slug);
92 },
93 None => {
94 result
95 .errors
96 .push(format!("Content not found in DB: {}", item.slug));
97 },
98 }
99 }
100
101 if delete_orphans {
102 for item in &entry.diff.added {
103 let file_path = if entry.name == "blog" {
104 source_path.join(&item.slug).join("index.md")
105 } else {
106 source_path.join(format!("{}.md", item.slug))
107 };
108
109 if file_path.exists() {
110 if entry.name == "blog" {
111 std::fs::remove_dir_all(source_path.join(&item.slug))?;
112 } else {
113 std::fs::remove_file(&file_path)?;
114 }
115 result.items_deleted += 1;
116 info!("Deleted orphan content: {}", item.slug);
117 }
118 }
119 } else {
120 result.items_skipped += entry.diff.added.len();
121 }
122 }
123
124 Ok(result)
125 }
126
127 pub async fn sync_to_db(
128 &self,
129 diffs: &[ContentDiffEntry],
130 delete_orphans: bool,
131 override_existing: bool,
132 ) -> SyncResult<LocalSyncResult> {
133 let ingestion_service = IngestionService::new(&self.db).map_err(SyncError::internal)?;
134 let content_repo = ContentRepository::new(&self.db).map_err(SyncError::internal)?;
135 let mut result = LocalSyncResult {
136 direction: LocalSyncDirection::ToDatabase,
137 ..Default::default()
138 };
139
140 for entry in diffs {
141 let source_path = &entry.path;
142 let source = IngestionSource::new(&entry.source_id, &entry.name, &entry.category_id);
143 let report = ingestion_service
144 .ingest_directory(
145 source_path,
146 &source,
147 IngestionOptions::default()
148 .with_recursive(true)
149 .with_override(override_existing),
150 )
151 .await
152 .map_err(SyncError::internal)?;
153
154 result.items_synced += report.files_processed;
155 result.items_skipped_modified += report.skipped_count;
156
157 for error in report.errors {
158 result.errors.push(error);
159 }
160
161 info!(
162 "Ingested {} files from {}",
163 report.files_processed, entry.name
164 );
165
166 if delete_orphans {
167 for item in &entry.diff.removed {
168 let content_id = ContentId::new(&item.slug);
169 content_repo.delete(&content_id).await.map_err(|e| {
170 SyncError::internal(format!("Failed to delete {}: {e}", item.slug))
171 })?;
172 result.items_deleted += 1;
173 info!("Deleted from DB: {}", item.slug);
174 }
175 } else {
176 result.items_skipped += entry.diff.removed.len();
177 }
178 }
179
180 Ok(result)
181 }
182}