Skip to main content

systemprompt_sync/local/
content_sync.rs

1use crate::diff::ContentDiffCalculator;
2use crate::export::export_content_to_file;
3use crate::models::{ContentDiffResult, LocalSyncDirection, LocalSyncResult};
4use anyhow::{Context, Result};
5use std::path::{Path, PathBuf};
6use systemprompt_content::models::{IngestionOptions, IngestionSource};
7use systemprompt_content::repository::ContentRepository;
8use systemprompt_content::services::IngestionService;
9use systemprompt_database::DbPool;
10use systemprompt_identifiers::{CategoryId, ContentId, SourceId};
11use tracing::info;
12
13#[derive(Debug)]
14pub struct ContentDiffEntry {
15    pub name: String,
16    pub source_id: String,
17    pub category_id: String,
18    pub path: PathBuf,
19    pub allowed_content_types: Vec<String>,
20    pub diff: ContentDiffResult,
21}
22
23#[derive(Debug)]
24pub struct ContentLocalSync {
25    db: DbPool,
26}
27
28impl ContentLocalSync {
29    pub const fn new(db: DbPool) -> Self {
30        Self { db }
31    }
32
33    pub async fn calculate_diff(
34        &self,
35        source_id: &str,
36        disk_path: &Path,
37        allowed_types: &[String],
38    ) -> Result<ContentDiffResult> {
39        let calculator = ContentDiffCalculator::new(&self.db)?;
40        calculator
41            .calculate_diff(source_id, disk_path, allowed_types)
42            .await
43    }
44
45    pub async fn sync_to_disk(
46        &self,
47        diffs: &[ContentDiffEntry],
48        delete_orphans: bool,
49    ) -> Result<LocalSyncResult> {
50        let content_repo = ContentRepository::new(&self.db)?;
51        let mut result = LocalSyncResult {
52            direction: LocalSyncDirection::ToDisk,
53            ..Default::default()
54        };
55
56        for entry in diffs {
57            let source_path = &entry.path;
58
59            for item in &entry.diff.modified {
60                let source_id = SourceId::new(&entry.source_id);
61                match content_repo
62                    .get_by_source_and_slug(&source_id, &item.slug)
63                    .await?
64                {
65                    Some(content) => {
66                        export_content_to_file(&content, source_path, &entry.name)?;
67                        result.items_synced += 1;
68                        info!("Exported modified content: {}", item.slug);
69                    },
70                    None => {
71                        result
72                            .errors
73                            .push(format!("Content not found in DB: {}", item.slug));
74                    },
75                }
76            }
77
78            for item in &entry.diff.removed {
79                let source_id = SourceId::new(&entry.source_id);
80                match content_repo
81                    .get_by_source_and_slug(&source_id, &item.slug)
82                    .await?
83                {
84                    Some(content) => {
85                        export_content_to_file(&content, source_path, &entry.name)?;
86                        result.items_synced += 1;
87                        info!("Created content on disk: {}", item.slug);
88                    },
89                    None => {
90                        result
91                            .errors
92                            .push(format!("Content not found in DB: {}", item.slug));
93                    },
94                }
95            }
96
97            if delete_orphans {
98                for item in &entry.diff.added {
99                    let file_path = if entry.name == "blog" {
100                        source_path.join(&item.slug).join("index.md")
101                    } else {
102                        source_path.join(format!("{}.md", item.slug))
103                    };
104
105                    if file_path.exists() {
106                        if entry.name == "blog" {
107                            std::fs::remove_dir_all(source_path.join(&item.slug))?;
108                        } else {
109                            std::fs::remove_file(&file_path)?;
110                        }
111                        result.items_deleted += 1;
112                        info!("Deleted orphan content: {}", item.slug);
113                    }
114                }
115            } else {
116                result.items_skipped += entry.diff.added.len();
117            }
118        }
119
120        Ok(result)
121    }
122
123    pub async fn sync_to_db(
124        &self,
125        diffs: &[ContentDiffEntry],
126        delete_orphans: bool,
127        override_existing: bool,
128    ) -> Result<LocalSyncResult> {
129        let ingestion_service = IngestionService::new(&self.db)?;
130        let content_repo = ContentRepository::new(&self.db)?;
131        let mut result = LocalSyncResult {
132            direction: LocalSyncDirection::ToDatabase,
133            ..Default::default()
134        };
135
136        for entry in diffs {
137            let source_path = &entry.path;
138            let source_id = SourceId::new(&entry.source_id);
139            let category_id = CategoryId::new(&entry.category_id);
140            let source = IngestionSource::new(&source_id, &entry.name, &category_id);
141            let report = ingestion_service
142                .ingest_directory(
143                    source_path,
144                    &source,
145                    IngestionOptions::default()
146                        .with_recursive(true)
147                        .with_override(override_existing),
148                )
149                .await?;
150
151            result.items_synced += report.files_processed;
152            result.items_skipped_modified += report.skipped_count;
153
154            for error in report.errors {
155                result.errors.push(error);
156            }
157
158            info!(
159                "Ingested {} files from {}",
160                report.files_processed, entry.name
161            );
162
163            if delete_orphans {
164                for item in &entry.diff.removed {
165                    let content_id = ContentId::new(&item.slug);
166                    content_repo
167                        .delete(&content_id)
168                        .await
169                        .context(format!("Failed to delete: {}", item.slug))?;
170                    result.items_deleted += 1;
171                    info!("Deleted from DB: {}", item.slug);
172                }
173            } else {
174                result.items_skipped += entry.diff.removed.len();
175            }
176        }
177
178        Ok(result)
179    }
180}