// ricecoder_orchestration/managers/sync_manager.rs
use crate::error::{OrchestrationError, Result};
4use crate::models::{Operation, Transaction, TransactionState};
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::{debug, info};
9use uuid::Uuid;
10
11#[derive(Debug, Clone)]
12pub struct SyncConflict {
13 pub project: String,
14 pub description: String,
15 pub resolution_strategy: ConflictResolution,
16}
17
/// Strategies that can be attached to, or used to resolve, a [`SyncConflict`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictResolution {
    /// Keep the data that is already present.
    KeepExisting,
    /// Take the incoming data.
    UseNew,
    /// Leave the decision to a manual step.
    Manual,
    /// Combine existing and incoming data.
    Merge,
}
25
/// One record in the synchronization log.
#[derive(Clone, Debug)]
pub struct SyncLogEntry {
    /// When the entry was created, stored as a string.
    pub timestamp: String,
    /// Project the entry refers to.
    pub project: String,
    /// Name of the operation that was logged.
    pub operation: String,
    /// Outcome label for the operation.
    pub status: String,
    /// Free-form details about the operation.
    pub details: String,
}
34
35pub struct SyncManager {
36 transactions: Arc<RwLock<HashMap<String, Transaction>>>,
37 sync_log: Arc<RwLock<Vec<SyncLogEntry>>>,
38 conflicts: Arc<RwLock<Vec<SyncConflict>>>,
39}
40
41impl SyncManager {
42 pub fn new() -> Self {
43 debug!("Creating SyncManager");
44 Self {
45 transactions: Arc::new(RwLock::new(HashMap::new())),
46 sync_log: Arc::new(RwLock::new(Vec::new())),
47 conflicts: Arc::new(RwLock::new(Vec::new())),
48 }
49 }
50
51 pub async fn start_transaction(&self, operations: Vec<Operation>) -> Result<String> {
52 let txn_id = Uuid::new_v4().to_string();
53 info!("Starting transaction: {}", txn_id);
54
55 let transaction = Transaction {
56 id: txn_id.clone(),
57 operations,
58 state: TransactionState::Pending,
59 };
60
61 self.transactions
62 .write()
63 .await
64 .insert(txn_id.clone(), transaction);
65
66 self.log_operation(
67 "system",
68 "transaction_start",
69 "success",
70 &format!("Transaction {} started", txn_id),
71 )
72 .await;
73
74 Ok(txn_id)
75 }
76
77 pub async fn get_transaction(&self, txn_id: &str) -> Result<Option<Transaction>> {
78 Ok(self.transactions.read().await.get(txn_id).cloned())
79 }
80
81 pub async fn commit_transaction(&self, txn_id: &str) -> Result<()> {
82 info!("Committing transaction: {}", txn_id);
83
84 {
85 let mut transactions = self.transactions.write().await;
86
87 let transaction = transactions
88 .get_mut(txn_id)
89 .ok_or_else(|| OrchestrationError::TransactionFailed(format!(
90 "Transaction not found: {}",
91 txn_id
92 )))?;
93
94 if transaction.state != TransactionState::Pending {
95 return Err(OrchestrationError::TransactionFailed(format!(
96 "Cannot commit transaction in state: {:?}",
97 transaction.state
98 )));
99 }
100
101 for operation in &transaction.operations {
102 self.validate_operation(operation).await?;
103 }
104
105 transaction.state = TransactionState::Committed;
106 }
107
108 self.log_operation(
109 "system",
110 "transaction_commit",
111 "success",
112 &format!("Transaction {} committed", txn_id),
113 )
114 .await;
115
116 Ok(())
117 }
118
119 pub async fn rollback_transaction(&self, txn_id: &str) -> Result<()> {
120 info!("Rolling back transaction: {}", txn_id);
121
122 {
123 let mut transactions = self.transactions.write().await;
124
125 let transaction = transactions
126 .get_mut(txn_id)
127 .ok_or_else(|| OrchestrationError::RollbackFailed(format!(
128 "Transaction not found: {}",
129 txn_id
130 )))?;
131
132 if transaction.state == TransactionState::RolledBack {
133 return Ok(());
134 }
135
136 for operation in transaction.operations.iter().rev() {
137 self.revert_operation(operation).await?;
138 }
139
140 transaction.state = TransactionState::RolledBack;
141 }
142
143 self.log_operation(
144 "system",
145 "transaction_rollback",
146 "success",
147 &format!("Transaction {} rolled back", txn_id),
148 )
149 .await;
150
151 Ok(())
152 }
153
154 pub async fn sync_configuration(
155 &self,
156 source_project: &str,
157 target_projects: &[String],
158 config_data: serde_json::Value,
159 ) -> Result<()> {
160 info!(
161 "Synchronizing configuration from {} to {} projects",
162 source_project,
163 target_projects.len()
164 );
165
166 for target in target_projects {
167 self.log_operation(
168 target,
169 "config_sync",
170 "in_progress",
171 &format!("Syncing config from {}", source_project),
172 )
173 .await;
174
175 if let Err(e) = self.check_config_conflicts(target, &config_data).await {
176 self.conflicts.write().await.push(SyncConflict {
177 project: target.clone(),
178 description: e.to_string(),
179 resolution_strategy: ConflictResolution::Manual,
180 });
181
182 self.log_operation(
183 target,
184 "config_sync",
185 "conflict",
186 &format!("Conflict detected: {}", e),
187 )
188 .await;
189
190 continue;
191 }
192
193 self.log_operation(
194 target,
195 "config_sync",
196 "success",
197 "Configuration synchronized",
198 )
199 .await;
200 }
201
202 Ok(())
203 }
204
205 pub async fn sync_version_updates(
206 &self,
207 source_project: &str,
208 new_version: &str,
209 dependent_projects: &[String],
210 ) -> Result<()> {
211 info!(
212 "Synchronizing version {} from {} to {} projects",
213 new_version,
214 source_project,
215 dependent_projects.len()
216 );
217
218 for target in dependent_projects {
219 self.log_operation(
220 target,
221 "version_sync",
222 "in_progress",
223 &format!("Updating to version {} from {}", new_version, source_project),
224 )
225 .await;
226
227 if let Err(e) = self.validate_version_compatibility(target, new_version).await {
228 self.log_operation(
229 target,
230 "version_sync",
231 "failed",
232 &format!("Version compatibility check failed: {}", e),
233 )
234 .await;
235
236 return Err(e);
237 }
238
239 self.log_operation(
240 target,
241 "version_sync",
242 "success",
243 &format!("Version updated to {}", new_version),
244 )
245 .await;
246 }
247
248 Ok(())
249 }
250
251 pub async fn detect_conflicts(
252 &self,
253 project: &str,
254 incoming_data: &serde_json::Value,
255 ) -> Result<Vec<SyncConflict>> {
256 debug!("Detecting conflicts for project: {}", project);
257
258 let mut detected_conflicts = Vec::new();
259
260 if let Err(e) = self.check_config_conflicts(project, incoming_data).await {
261 detected_conflicts.push(SyncConflict {
262 project: project.to_string(),
263 description: e.to_string(),
264 resolution_strategy: ConflictResolution::Manual,
265 });
266 }
267
268 Ok(detected_conflicts)
269 }
270
271 pub async fn resolve_conflict(
272 &self,
273 conflict: &SyncConflict,
274 strategy: ConflictResolution,
275 ) -> Result<()> {
276 info!(
277 "Resolving conflict in {} using strategy: {:?}",
278 conflict.project, strategy
279 );
280
281 self.log_operation(
282 &conflict.project,
283 "conflict_resolution",
284 "success",
285 &format!("Conflict resolved using strategy: {:?}", strategy),
286 )
287 .await;
288
289 let mut conflicts = self.conflicts.write().await;
290 conflicts.retain(|c| c.project != conflict.project || c.description != conflict.description);
291
292 Ok(())
293 }
294
295 pub async fn get_conflicts(&self) -> Vec<SyncConflict> {
296 self.conflicts.read().await.clone()
297 }
298
299 pub async fn get_sync_log(&self) -> Vec<SyncLogEntry> {
300 self.sync_log.read().await.clone()
301 }
302
303 pub async fn clear_sync_log(&self) {
304 self.sync_log.write().await.clear();
305 }
306
307 async fn log_operation(
308 &self,
309 project: &str,
310 operation: &str,
311 status: &str,
312 details: &str,
313 ) {
314 let entry = SyncLogEntry {
315 timestamp: chrono::Utc::now().to_rfc3339(),
316 project: project.to_string(),
317 operation: operation.to_string(),
318 status: status.to_string(),
319 details: details.to_string(),
320 };
321
322 self.sync_log.write().await.push(entry);
323 }
324
325 async fn validate_operation(&self, _operation: &Operation) -> Result<()> {
326 debug!("Validating operation");
327 Ok(())
328 }
329
330 async fn revert_operation(&self, _operation: &Operation) -> Result<()> {
331 debug!("Reverting operation");
332 Ok(())
333 }
334
335 async fn check_config_conflicts(
336 &self,
337 _project: &str,
338 _config_data: &serde_json::Value,
339 ) -> Result<()> {
340 Ok(())
341 }
342
343 async fn validate_version_compatibility(
344 &self,
345 _project: &str,
346 _version: &str,
347 ) -> Result<()> {
348 Ok(())
349 }
350}
351
352impl Default for SyncManager {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager that already holds one pending transaction with no
    /// operations, returning the manager and the transaction id.
    async fn manager_with_pending_transaction() -> (SyncManager, String) {
        let manager = SyncManager::new();
        let txn_id = manager.start_transaction(vec![]).await.unwrap();
        (manager, txn_id)
    }

    /// A freshly created manager holds no conflicts.
    #[tokio::test]
    async fn test_sync_manager_creation() {
        let manager = SyncManager::new();
        let conflicts = manager.get_conflicts().await;
        assert_eq!(conflicts.len(), 0);
    }

    /// Starting a transaction yields a non-empty id.
    #[tokio::test]
    async fn test_start_transaction() {
        let (_manager, txn_id) = manager_with_pending_transaction().await;
        assert!(!txn_id.is_empty());
    }

    /// Committing a pending transaction moves it to `Committed`.
    #[tokio::test]
    async fn test_commit_transaction() {
        let (manager, txn_id) = manager_with_pending_transaction().await;
        manager.commit_transaction(&txn_id).await.unwrap();

        let stored = manager.get_transaction(&txn_id).await.unwrap();
        assert_eq!(stored.unwrap().state, TransactionState::Committed);
    }

    /// Rolling back a pending transaction moves it to `RolledBack`.
    #[tokio::test]
    async fn test_rollback_transaction() {
        let (manager, txn_id) = manager_with_pending_transaction().await;
        manager.rollback_transaction(&txn_id).await.unwrap();

        let stored = manager.get_transaction(&txn_id).await.unwrap();
        assert_eq!(stored.unwrap().state, TransactionState::RolledBack);
    }
}