systemprompt_sync/local/
content_sync.rs1use crate::diff::ContentDiffCalculator;
2use crate::export::export_content_to_file;
3use crate::models::{ContentDiffResult, LocalSyncDirection, LocalSyncResult};
4use anyhow::{Context, Result};
5use std::path::{Path, PathBuf};
6use systemprompt_content::models::{IngestionOptions, IngestionSource};
7use systemprompt_content::repository::ContentRepository;
8use systemprompt_content::services::IngestionService;
9use systemprompt_database::DbPool;
10use systemprompt_identifiers::{CategoryId, ContentId, SourceId};
11use tracing::info;
12
13#[derive(Debug)]
14pub struct ContentDiffEntry {
15 pub name: String,
16 pub source_id: String,
17 pub category_id: String,
18 pub path: PathBuf,
19 pub allowed_content_types: Vec<String>,
20 pub diff: ContentDiffResult,
21}
22
23#[derive(Debug)]
24pub struct ContentLocalSync {
25 db: DbPool,
26}
27
28impl ContentLocalSync {
29 pub const fn new(db: DbPool) -> Self {
30 Self { db }
31 }
32
33 pub async fn calculate_diff(
34 &self,
35 source_id: &str,
36 disk_path: &Path,
37 allowed_types: &[String],
38 ) -> Result<ContentDiffResult> {
39 let calculator = ContentDiffCalculator::new(&self.db)?;
40 calculator
41 .calculate_diff(source_id, disk_path, allowed_types)
42 .await
43 }
44
45 pub async fn sync_to_disk(
46 &self,
47 diffs: &[ContentDiffEntry],
48 delete_orphans: bool,
49 ) -> Result<LocalSyncResult> {
50 let content_repo = ContentRepository::new(&self.db)?;
51 let mut result = LocalSyncResult {
52 direction: LocalSyncDirection::ToDisk,
53 ..Default::default()
54 };
55
56 for entry in diffs {
57 let source_path = &entry.path;
58
59 for item in &entry.diff.modified {
60 let source_id = SourceId::new(&entry.source_id);
61 match content_repo
62 .get_by_source_and_slug(&source_id, &item.slug)
63 .await?
64 {
65 Some(content) => {
66 export_content_to_file(&content, source_path, &entry.name)?;
67 result.items_synced += 1;
68 info!("Exported modified content: {}", item.slug);
69 },
70 None => {
71 result
72 .errors
73 .push(format!("Content not found in DB: {}", item.slug));
74 },
75 }
76 }
77
78 for item in &entry.diff.removed {
79 let source_id = SourceId::new(&entry.source_id);
80 match content_repo
81 .get_by_source_and_slug(&source_id, &item.slug)
82 .await?
83 {
84 Some(content) => {
85 export_content_to_file(&content, source_path, &entry.name)?;
86 result.items_synced += 1;
87 info!("Created content on disk: {}", item.slug);
88 },
89 None => {
90 result
91 .errors
92 .push(format!("Content not found in DB: {}", item.slug));
93 },
94 }
95 }
96
97 if delete_orphans {
98 for item in &entry.diff.added {
99 let file_path = if entry.name == "blog" {
100 source_path.join(&item.slug).join("index.md")
101 } else {
102 source_path.join(format!("{}.md", item.slug))
103 };
104
105 if file_path.exists() {
106 if entry.name == "blog" {
107 std::fs::remove_dir_all(source_path.join(&item.slug))?;
108 } else {
109 std::fs::remove_file(&file_path)?;
110 }
111 result.items_deleted += 1;
112 info!("Deleted orphan content: {}", item.slug);
113 }
114 }
115 } else {
116 result.items_skipped += entry.diff.added.len();
117 }
118 }
119
120 Ok(result)
121 }
122
123 pub async fn sync_to_db(
124 &self,
125 diffs: &[ContentDiffEntry],
126 delete_orphans: bool,
127 override_existing: bool,
128 ) -> Result<LocalSyncResult> {
129 let ingestion_service = IngestionService::new(&self.db)?;
130 let content_repo = ContentRepository::new(&self.db)?;
131 let mut result = LocalSyncResult {
132 direction: LocalSyncDirection::ToDatabase,
133 ..Default::default()
134 };
135
136 for entry in diffs {
137 let source_path = &entry.path;
138 let source_id = SourceId::new(&entry.source_id);
139 let category_id = CategoryId::new(&entry.category_id);
140 let source = IngestionSource::new(&source_id, &entry.name, &category_id);
141 let report = ingestion_service
142 .ingest_directory(
143 source_path,
144 &source,
145 IngestionOptions::default()
146 .with_recursive(true)
147 .with_override(override_existing),
148 )
149 .await?;
150
151 result.items_synced += report.files_processed;
152 result.items_skipped_modified += report.skipped_count;
153
154 for error in report.errors {
155 result.errors.push(error);
156 }
157
158 info!(
159 "Ingested {} files from {}",
160 report.files_processed, entry.name
161 );
162
163 if delete_orphans {
164 for item in &entry.diff.removed {
165 let content_id = ContentId::new(&item.slug);
166 content_repo
167 .delete(&content_id)
168 .await
169 .context(format!("Failed to delete: {}", item.slug))?;
170 result.items_deleted += 1;
171 info!("Deleted from DB: {}", item.slug);
172 }
173 } else {
174 result.items_skipped += entry.diff.removed.len();
175 }
176 }
177
178 Ok(result)
179 }
180}