Skip to main content

systemprompt_sync/local/
content_sync.rs

1//! Two-way sync between content stored on disk (markdown + frontmatter) and
2//! the database via the `systemprompt-content` ingestion + repository layers.
3
4use crate::diff::ContentDiffCalculator;
5use crate::error::{SyncError, SyncResult};
6use crate::export::export_content_to_file;
7use crate::models::{ContentDiffResult, LocalSyncDirection, LocalSyncResult};
8use std::path::{Path, PathBuf};
9use systemprompt_content::models::{IngestionOptions, IngestionSource};
10use systemprompt_content::repository::ContentRepository;
11use systemprompt_content::services::IngestionService;
12use systemprompt_database::DbPool;
13use systemprompt_identifiers::{CategoryId, ContentId, SourceId};
14use tracing::info;
15
16#[derive(Debug)]
17pub struct ContentDiffEntry {
18    pub name: String,
19    pub source_id: SourceId,
20    pub category_id: CategoryId,
21    pub path: PathBuf,
22    pub allowed_content_types: Vec<String>,
23    pub diff: ContentDiffResult,
24}
25
26#[derive(Debug)]
27pub struct ContentLocalSync {
28    db: DbPool,
29}
30
31impl ContentLocalSync {
32    pub const fn new(db: DbPool) -> Self {
33        Self { db }
34    }
35
36    pub async fn calculate_diff(
37        &self,
38        source_id: &SourceId,
39        disk_path: &Path,
40        allowed_types: &[String],
41    ) -> SyncResult<ContentDiffResult> {
42        let calculator = ContentDiffCalculator::new(&self.db).map_err(SyncError::internal)?;
43        calculator
44            .calculate_diff(source_id, disk_path, allowed_types)
45            .await
46            .map_err(SyncError::internal)
47    }
48
49    pub async fn sync_to_disk(
50        &self,
51        diffs: &[ContentDiffEntry],
52        delete_orphans: bool,
53    ) -> SyncResult<LocalSyncResult> {
54        let content_repo = ContentRepository::new(&self.db).map_err(SyncError::internal)?;
55        let mut result = LocalSyncResult {
56            direction: LocalSyncDirection::ToDisk,
57            ..Default::default()
58        };
59
60        for entry in diffs {
61            let source_path = &entry.path;
62
63            for item in &entry.diff.modified {
64                match content_repo
65                    .get_by_source_and_slug(&entry.source_id, &item.slug)
66                    .await
67                    .map_err(SyncError::internal)?
68                {
69                    Some(content) => {
70                        export_content_to_file(&content, source_path, &entry.name)?;
71                        result.items_synced += 1;
72                        info!("Exported modified content: {}", item.slug);
73                    },
74                    None => {
75                        result
76                            .errors
77                            .push(format!("Content not found in DB: {}", item.slug));
78                    },
79                }
80            }
81
82            for item in &entry.diff.removed {
83                match content_repo
84                    .get_by_source_and_slug(&entry.source_id, &item.slug)
85                    .await
86                    .map_err(SyncError::internal)?
87                {
88                    Some(content) => {
89                        export_content_to_file(&content, source_path, &entry.name)?;
90                        result.items_synced += 1;
91                        info!("Created content on disk: {}", item.slug);
92                    },
93                    None => {
94                        result
95                            .errors
96                            .push(format!("Content not found in DB: {}", item.slug));
97                    },
98                }
99            }
100
101            if delete_orphans {
102                for item in &entry.diff.added {
103                    let file_path = if entry.name == "blog" {
104                        source_path.join(&item.slug).join("index.md")
105                    } else {
106                        source_path.join(format!("{}.md", item.slug))
107                    };
108
109                    if file_path.exists() {
110                        if entry.name == "blog" {
111                            std::fs::remove_dir_all(source_path.join(&item.slug))?;
112                        } else {
113                            std::fs::remove_file(&file_path)?;
114                        }
115                        result.items_deleted += 1;
116                        info!("Deleted orphan content: {}", item.slug);
117                    }
118                }
119            } else {
120                result.items_skipped += entry.diff.added.len();
121            }
122        }
123
124        Ok(result)
125    }
126
127    pub async fn sync_to_db(
128        &self,
129        diffs: &[ContentDiffEntry],
130        delete_orphans: bool,
131        override_existing: bool,
132    ) -> SyncResult<LocalSyncResult> {
133        let ingestion_service = IngestionService::new(&self.db).map_err(SyncError::internal)?;
134        let content_repo = ContentRepository::new(&self.db).map_err(SyncError::internal)?;
135        let mut result = LocalSyncResult {
136            direction: LocalSyncDirection::ToDatabase,
137            ..Default::default()
138        };
139
140        for entry in diffs {
141            let source_path = &entry.path;
142            let source = IngestionSource::new(&entry.source_id, &entry.name, &entry.category_id);
143            let report = ingestion_service
144                .ingest_directory(
145                    source_path,
146                    &source,
147                    IngestionOptions::default()
148                        .with_recursive(true)
149                        .with_override(override_existing),
150                )
151                .await
152                .map_err(SyncError::internal)?;
153
154            result.items_synced += report.files_processed;
155            result.items_skipped_modified += report.skipped_count;
156
157            for error in report.errors {
158                result.errors.push(error);
159            }
160
161            info!(
162                "Ingested {} files from {}",
163                report.files_processed, entry.name
164            );
165
166            if delete_orphans {
167                for item in &entry.diff.removed {
168                    let content_id = ContentId::new(&item.slug);
169                    content_repo.delete(&content_id).await.map_err(|e| {
170                        SyncError::internal(format!("Failed to delete {}: {e}", item.slug))
171                    })?;
172                    result.items_deleted += 1;
173                    info!("Deleted from DB: {}", item.slug);
174                }
175            } else {
176                result.items_skipped += entry.diff.removed.len();
177            }
178        }
179
180        Ok(result)
181    }
182}