ricecoder_orchestration/managers/
sync_manager.rs

1//! Synchronization manager for cross-project updates
2
3use crate::error::{OrchestrationError, Result};
4use crate::models::{Operation, Transaction, TransactionState};
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, info};
9use uuid::Uuid;
10
11#[derive(Debug, Clone)]
12pub struct SyncConflict {
13    pub project: String,
14    pub description: String,
15    pub resolution_strategy: ConflictResolution,
16}
17
/// Strategy chosen for resolving a [`SyncConflict`].
///
/// Derives `Hash` in addition to the equality traits so the strategy can be
/// used as a key in hashed collections (e.g. counting conflicts per strategy).
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names; confirm against the code that actually applies a strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictResolution {
    /// Presumably: keep the data already present in the target.
    KeepExisting,
    /// Presumably: overwrite the target with the incoming data.
    UseNew,
    /// Presumably: leave the decision to a human operator.
    Manual,
    /// Presumably: combine existing and incoming data.
    Merge,
}
25
/// One record in the synchronization log.
///
/// Derives `PartialEq`/`Eq` so entries can be compared directly, which keeps
/// assertions over the log short.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncLogEntry {
    /// Time the entry was created; the manager writes an RFC 3339 UTC timestamp.
    pub timestamp: String,
    /// Project the entry refers to; transaction events use `"system"`.
    pub project: String,
    /// Operation identifier, e.g. `"config_sync"` or `"transaction_commit"`.
    pub operation: String,
    /// Outcome label, e.g. `"success"`, `"in_progress"`, `"conflict"` or `"failed"`.
    pub status: String,
    /// Free-form description of what happened.
    pub details: String,
}
34
/// Coordinates transactions, configuration/version synchronization and
/// conflict tracking across projects.
///
/// All state lives in memory behind `tokio` read-write locks.
pub struct SyncManager {
    /// Known transactions, keyed by transaction identifier.
    transactions: Arc<RwLock<HashMap<String, Transaction>>>,
    /// Chronological record of logged operations.
    sync_log: Arc<RwLock<Vec<SyncLogEntry>>>,
    /// Conflicts recorded during synchronization that have not been resolved yet.
    conflicts: Arc<RwLock<Vec<SyncConflict>>>,
}
40
41impl SyncManager {
42    pub fn new() -> Self {
43        debug!("Creating SyncManager");
44        Self {
45            transactions: Arc::new(RwLock::new(HashMap::new())),
46            sync_log: Arc::new(RwLock::new(Vec::new())),
47            conflicts: Arc::new(RwLock::new(Vec::new())),
48        }
49    }
50
51    pub async fn start_transaction(&self, operations: Vec<Operation>) -> Result<String> {
52        let txn_id = Uuid::new_v4().to_string();
53        info!("Starting transaction: {}", txn_id);
54
55        let transaction = Transaction {
56            id: txn_id.clone(),
57            operations,
58            state: TransactionState::Pending,
59        };
60
61        self.transactions
62            .write()
63            .await
64            .insert(txn_id.clone(), transaction);
65
66        self.log_operation(
67            "system",
68            "transaction_start",
69            "success",
70            &format!("Transaction {} started", txn_id),
71        )
72        .await;
73
74        Ok(txn_id)
75    }
76
77    pub async fn get_transaction(&self, txn_id: &str) -> Result<Option<Transaction>> {
78        Ok(self.transactions.read().await.get(txn_id).cloned())
79    }
80
81    pub async fn commit_transaction(&self, txn_id: &str) -> Result<()> {
82        info!("Committing transaction: {}", txn_id);
83
84        {
85            let mut transactions = self.transactions.write().await;
86
87            let transaction = transactions
88                .get_mut(txn_id)
89                .ok_or_else(|| OrchestrationError::TransactionFailed(format!(
90                    "Transaction not found: {}",
91                    txn_id
92                )))?;
93
94            if transaction.state != TransactionState::Pending {
95                return Err(OrchestrationError::TransactionFailed(format!(
96                    "Cannot commit transaction in state: {:?}",
97                    transaction.state
98                )));
99            }
100
101            for operation in &transaction.operations {
102                self.validate_operation(operation).await?;
103            }
104
105            transaction.state = TransactionState::Committed;
106        }
107
108        self.log_operation(
109            "system",
110            "transaction_commit",
111            "success",
112            &format!("Transaction {} committed", txn_id),
113        )
114        .await;
115
116        Ok(())
117    }
118
119    pub async fn rollback_transaction(&self, txn_id: &str) -> Result<()> {
120        info!("Rolling back transaction: {}", txn_id);
121
122        {
123            let mut transactions = self.transactions.write().await;
124
125            let transaction = transactions
126                .get_mut(txn_id)
127                .ok_or_else(|| OrchestrationError::RollbackFailed(format!(
128                    "Transaction not found: {}",
129                    txn_id
130                )))?;
131
132            if transaction.state == TransactionState::RolledBack {
133                return Ok(());
134            }
135
136            for operation in transaction.operations.iter().rev() {
137                self.revert_operation(operation).await?;
138            }
139
140            transaction.state = TransactionState::RolledBack;
141        }
142
143        self.log_operation(
144            "system",
145            "transaction_rollback",
146            "success",
147            &format!("Transaction {} rolled back", txn_id),
148        )
149        .await;
150
151        Ok(())
152    }
153
154    pub async fn sync_configuration(
155        &self,
156        source_project: &str,
157        target_projects: &[String],
158        config_data: serde_json::Value,
159    ) -> Result<()> {
160        info!(
161            "Synchronizing configuration from {} to {} projects",
162            source_project,
163            target_projects.len()
164        );
165
166        for target in target_projects {
167            self.log_operation(
168                target,
169                "config_sync",
170                "in_progress",
171                &format!("Syncing config from {}", source_project),
172            )
173            .await;
174
175            if let Err(e) = self.check_config_conflicts(target, &config_data).await {
176                self.conflicts.write().await.push(SyncConflict {
177                    project: target.clone(),
178                    description: e.to_string(),
179                    resolution_strategy: ConflictResolution::Manual,
180                });
181
182                self.log_operation(
183                    target,
184                    "config_sync",
185                    "conflict",
186                    &format!("Conflict detected: {}", e),
187                )
188                .await;
189
190                continue;
191            }
192
193            self.log_operation(
194                target,
195                "config_sync",
196                "success",
197                "Configuration synchronized",
198            )
199            .await;
200        }
201
202        Ok(())
203    }
204
205    pub async fn sync_version_updates(
206        &self,
207        source_project: &str,
208        new_version: &str,
209        dependent_projects: &[String],
210    ) -> Result<()> {
211        info!(
212            "Synchronizing version {} from {} to {} projects",
213            new_version,
214            source_project,
215            dependent_projects.len()
216        );
217
218        for target in dependent_projects {
219            self.log_operation(
220                target,
221                "version_sync",
222                "in_progress",
223                &format!("Updating to version {} from {}", new_version, source_project),
224            )
225            .await;
226
227            if let Err(e) = self.validate_version_compatibility(target, new_version).await {
228                self.log_operation(
229                    target,
230                    "version_sync",
231                    "failed",
232                    &format!("Version compatibility check failed: {}", e),
233                )
234                .await;
235
236                return Err(e);
237            }
238
239            self.log_operation(
240                target,
241                "version_sync",
242                "success",
243                &format!("Version updated to {}", new_version),
244            )
245            .await;
246        }
247
248        Ok(())
249    }
250
251    pub async fn detect_conflicts(
252        &self,
253        project: &str,
254        incoming_data: &serde_json::Value,
255    ) -> Result<Vec<SyncConflict>> {
256        debug!("Detecting conflicts for project: {}", project);
257
258        let mut detected_conflicts = Vec::new();
259
260        if let Err(e) = self.check_config_conflicts(project, incoming_data).await {
261            detected_conflicts.push(SyncConflict {
262                project: project.to_string(),
263                description: e.to_string(),
264                resolution_strategy: ConflictResolution::Manual,
265            });
266        }
267
268        Ok(detected_conflicts)
269    }
270
271    pub async fn resolve_conflict(
272        &self,
273        conflict: &SyncConflict,
274        strategy: ConflictResolution,
275    ) -> Result<()> {
276        info!(
277            "Resolving conflict in {} using strategy: {:?}",
278            conflict.project, strategy
279        );
280
281        self.log_operation(
282            &conflict.project,
283            "conflict_resolution",
284            "success",
285            &format!("Conflict resolved using strategy: {:?}", strategy),
286        )
287        .await;
288
289        let mut conflicts = self.conflicts.write().await;
290        conflicts.retain(|c| c.project != conflict.project || c.description != conflict.description);
291
292        Ok(())
293    }
294
295    pub async fn get_conflicts(&self) -> Vec<SyncConflict> {
296        self.conflicts.read().await.clone()
297    }
298
299    pub async fn get_sync_log(&self) -> Vec<SyncLogEntry> {
300        self.sync_log.read().await.clone()
301    }
302
303    pub async fn clear_sync_log(&self) {
304        self.sync_log.write().await.clear();
305    }
306
307    async fn log_operation(
308        &self,
309        project: &str,
310        operation: &str,
311        status: &str,
312        details: &str,
313    ) {
314        let entry = SyncLogEntry {
315            timestamp: chrono::Utc::now().to_rfc3339(),
316            project: project.to_string(),
317            operation: operation.to_string(),
318            status: status.to_string(),
319            details: details.to_string(),
320        };
321
322        self.sync_log.write().await.push(entry);
323    }
324
    /// Validates a single operation; called for each operation of a
    /// transaction before it is committed.
    ///
    /// Placeholder: emits a debug trace and accepts every operation.
    async fn validate_operation(&self, _operation: &Operation) -> Result<()> {
        debug!("Validating operation");
        Ok(())
    }
329
    /// Reverts a single operation; called for each operation of a transaction
    /// during rollback.
    ///
    /// Placeholder: emits a debug trace and always succeeds.
    async fn revert_operation(&self, _operation: &Operation) -> Result<()> {
        debug!("Reverting operation");
        Ok(())
    }
334
    /// Checks whether applying `_config_data` to `_project` would conflict.
    ///
    /// Callers treat an `Err` as a detected conflict and use its text as the
    /// conflict description. Placeholder: both arguments are ignored and the
    /// check always passes.
    async fn check_config_conflicts(
        &self,
        _project: &str,
        _config_data: &serde_json::Value,
    ) -> Result<()> {
        Ok(())
    }
342
    /// Checks whether `_project` can accept `_version`.
    ///
    /// Placeholder: both arguments are ignored and the check always passes.
    async fn validate_version_compatibility(
        &self,
        _project: &str,
        _version: &str,
    ) -> Result<()> {
        Ok(())
    }
350}
351
352impl Default for SyncManager {
353    fn default() -> Self {
354        Self::new()
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh manager starts with no conflicts and an empty log.
    #[tokio::test]
    async fn test_sync_manager_creation() {
        let manager = SyncManager::new();
        assert_eq!(manager.get_conflicts().await.len(), 0);
        assert!(manager.get_sync_log().await.is_empty());
    }

    /// Starting a transaction stores it as `Pending` and logs the start.
    #[tokio::test]
    async fn test_start_transaction() {
        let manager = SyncManager::new();
        let txn_id = manager.start_transaction(vec![]).await.unwrap();
        assert!(!txn_id.is_empty());

        let stored = manager.get_transaction(&txn_id).await.unwrap().unwrap();
        assert_eq!(stored.state, TransactionState::Pending);

        let log = manager.get_sync_log().await;
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].project, "system");
        assert_eq!(log[0].operation, "transaction_start");
        assert_eq!(log[0].status, "success");
    }

    #[tokio::test]
    async fn test_commit_transaction() {
        let manager = SyncManager::new();
        let txn_id = manager.start_transaction(vec![]).await.unwrap();
        manager.commit_transaction(&txn_id).await.unwrap();
        let txn = manager.get_transaction(&txn_id).await.unwrap();
        assert_eq!(txn.unwrap().state, TransactionState::Committed);
    }

    /// Committing an id that was never started is an error.
    #[tokio::test]
    async fn test_commit_unknown_transaction_fails() {
        let manager = SyncManager::new();
        assert!(manager.commit_transaction("no-such-txn").await.is_err());
    }

    /// Only `Pending` transactions may be committed, so a second commit fails
    /// and leaves the state untouched.
    #[tokio::test]
    async fn test_commit_twice_fails() {
        let manager = SyncManager::new();
        let txn_id = manager.start_transaction(vec![]).await.unwrap();
        manager.commit_transaction(&txn_id).await.unwrap();
        assert!(manager.commit_transaction(&txn_id).await.is_err());

        let txn = manager.get_transaction(&txn_id).await.unwrap();
        assert_eq!(txn.unwrap().state, TransactionState::Committed);
    }

    #[tokio::test]
    async fn test_rollback_transaction() {
        let manager = SyncManager::new();
        let txn_id = manager.start_transaction(vec![]).await.unwrap();
        manager.rollback_transaction(&txn_id).await.unwrap();
        let txn = manager.get_transaction(&txn_id).await.unwrap();
        assert_eq!(txn.unwrap().state, TransactionState::RolledBack);
    }

    /// Rolling back an id that was never started is an error.
    #[tokio::test]
    async fn test_rollback_unknown_transaction_fails() {
        let manager = SyncManager::new();
        assert!(manager.rollback_transaction("no-such-txn").await.is_err());
    }

    /// A second rollback of the same transaction is accepted as a no-op.
    #[tokio::test]
    async fn test_rollback_is_idempotent() {
        let manager = SyncManager::new();
        let txn_id = manager.start_transaction(vec![]).await.unwrap();
        manager.rollback_transaction(&txn_id).await.unwrap();
        manager.rollback_transaction(&txn_id).await.unwrap();

        let txn = manager.get_transaction(&txn_id).await.unwrap();
        assert_eq!(txn.unwrap().state, TransactionState::RolledBack);
    }

    /// Clearing the log removes previously recorded entries.
    #[tokio::test]
    async fn test_clear_sync_log() {
        let manager = SyncManager::new();
        manager.start_transaction(vec![]).await.unwrap();
        assert!(!manager.get_sync_log().await.is_empty());

        manager.clear_sync_log().await;
        assert!(manager.get_sync_log().await.is_empty());
    }
}
392}