Skip to main content

systemprompt_sync/local/
agents_sync.rs

1//! Two-way sync between agents stored on disk (one directory per agent) and
2//! the database via the `systemprompt-agent` ingestion + repository layers.
3
4use crate::diff::AgentsDiffCalculator;
5use crate::error::{SyncError, SyncResult};
6use crate::export::export_agent_to_disk;
7use crate::models::{AgentsDiffResult, LocalSyncDirection, LocalSyncResult};
8use std::path::PathBuf;
9use systemprompt_agent::repository::content::AgentRepository;
10use systemprompt_agent::services::AgentIngestionService;
11use systemprompt_database::DbPool;
12use systemprompt_identifiers::SourceId;
13use tracing::info;
14
15#[derive(Debug)]
16pub struct AgentsLocalSync {
17    db: DbPool,
18    agents_path: PathBuf,
19}
20
21impl AgentsLocalSync {
22    pub const fn new(db: DbPool, agents_path: PathBuf) -> Self {
23        Self { db, agents_path }
24    }
25
26    pub async fn calculate_diff(&self) -> SyncResult<AgentsDiffResult> {
27        let calculator = AgentsDiffCalculator::new(&self.db).map_err(SyncError::internal)?;
28        calculator
29            .calculate_diff(&self.agents_path)
30            .await
31            .map_err(SyncError::internal)
32    }
33
34    pub async fn sync_to_disk(
35        &self,
36        diff: &AgentsDiffResult,
37        delete_orphans: bool,
38    ) -> SyncResult<LocalSyncResult> {
39        let agent_repo = AgentRepository::new(&self.db).map_err(SyncError::internal)?;
40        let mut result = LocalSyncResult {
41            direction: LocalSyncDirection::ToDisk,
42            ..Default::default()
43        };
44
45        for item in &diff.modified {
46            match agent_repo
47                .get_by_agent_id(&item.agent_id)
48                .await
49                .map_err(SyncError::internal)?
50            {
51                Some(agent) => {
52                    export_agent_to_disk(&agent, &self.agents_path)?;
53                    result.items_synced += 1;
54                    info!("Exported modified agent: {}", item.agent_id);
55                },
56                None => {
57                    result
58                        .errors
59                        .push(format!("Agent not found in DB: {}", item.agent_id));
60                },
61            }
62        }
63
64        for item in &diff.removed {
65            match agent_repo
66                .get_by_agent_id(&item.agent_id)
67                .await
68                .map_err(SyncError::internal)?
69            {
70                Some(agent) => {
71                    export_agent_to_disk(&agent, &self.agents_path)?;
72                    result.items_synced += 1;
73                    info!("Created agent on disk: {}", item.agent_id);
74                },
75                None => {
76                    result
77                        .errors
78                        .push(format!("Agent not found in DB: {}", item.agent_id));
79                },
80            }
81        }
82
83        if delete_orphans {
84            for item in &diff.added {
85                let agent_dir = self.agents_path.join(&item.name);
86
87                if agent_dir.exists() {
88                    std::fs::remove_dir_all(&agent_dir)?;
89                    result.items_deleted += 1;
90                    info!("Deleted orphan agent: {}", item.agent_id);
91                }
92            }
93        } else {
94            result.items_skipped += diff.added.len();
95        }
96
97        Ok(result)
98    }
99
100    pub async fn sync_to_db(
101        &self,
102        diff: &AgentsDiffResult,
103        delete_orphans: bool,
104    ) -> SyncResult<LocalSyncResult> {
105        let ingestion_service =
106            AgentIngestionService::new(&self.db).map_err(SyncError::internal)?;
107        let mut result = LocalSyncResult {
108            direction: LocalSyncDirection::ToDatabase,
109            ..Default::default()
110        };
111
112        let source_id = SourceId::new("agents");
113        let report = ingestion_service
114            .ingest_directory(&self.agents_path, source_id, true)
115            .await
116            .map_err(SyncError::internal)?;
117
118        result.items_synced += report.files_processed;
119
120        for error in report.errors {
121            result.errors.push(error);
122        }
123
124        info!("Ingested {} agents", report.files_processed);
125
126        if delete_orphans && !diff.removed.is_empty() {
127            tracing::warn!(
128                count = diff.removed.len(),
129                "Agent deletion from database not supported, skipping orphan removal"
130            );
131        }
132        result.items_skipped += diff.removed.len();
133
134        Ok(result)
135    }
136}