ricecoder_teams/
sync.rs

//! Synchronization and hot-reload support for team standards.
2use crate::error::{Result, TeamError};
3use ricecoder_storage::PathResolver;
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8use tokio::sync::RwLock;
9use tokio::time::sleep;
10
11/// Configuration change event
12#[derive(Debug, Clone)]
13pub struct ConfigChangeEvent {
14    pub team_id: String,
15    pub timestamp: SystemTime,
16    pub change_type: ChangeType,
17}
18
/// Kind of change carried by a [`ConfigChangeEvent`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChangeType {
    /// The configuration came into existence.
    Created,
    /// An existing configuration was altered.
    Modified,
    /// The configuration was removed.
    Deleted,
}
26
27/// Callback for configuration changes
28pub type ChangeCallback = Arc<dyn Fn(ConfigChangeEvent) + Send + Sync>;
29
30/// Synchronizes team standards and supports hot-reload
31pub struct SyncService {
32    /// Tracks last modified time for each team's standards
33    last_modified: Arc<RwLock<HashMap<String, SystemTime>>>,
34    /// Callbacks for configuration changes
35    change_callbacks: Arc<RwLock<Vec<ChangeCallback>>>,
36    /// Team members for notification
37    team_members: Arc<RwLock<HashMap<String, Vec<String>>>>,
38    /// Notification history to track delivery
39    notification_history: Arc<RwLock<Vec<NotificationRecord>>>,
40}
41
/// One entry in the notification history: a message addressed to the members
/// of a team.
#[derive(Clone, Debug)]
pub struct NotificationRecord {
    /// Team the notification belongs to.
    pub team_id: String,
    /// Text of the notification.
    pub message: String,
    /// Ids of the members the notification was addressed to.
    pub recipients: Vec<String>,
    /// When the record was created.
    pub timestamp: SystemTime,
    /// Whether the notification is considered delivered.
    pub delivered: bool,
}
51
52impl SyncService {
53    /// Create a new SyncService
54    pub fn new() -> Self {
55        SyncService {
56            last_modified: Arc::new(RwLock::new(HashMap::new())),
57            change_callbacks: Arc::new(RwLock::new(Vec::new())),
58            team_members: Arc::new(RwLock::new(HashMap::new())),
59            notification_history: Arc::new(RwLock::new(Vec::new())),
60        }
61    }
62
63    /// Register a callback for configuration changes
64    pub async fn register_change_callback(&self, callback: ChangeCallback) -> Result<()> {
65        let mut callbacks = self.change_callbacks.write().await;
66        callbacks.push(callback);
67        tracing::debug!("Configuration change callback registered");
68        Ok(())
69    }
70
71    /// Register team members for notifications
72    pub async fn register_team_members(
73        &self,
74        team_id: &str,
75        member_ids: Vec<String>,
76    ) -> Result<()> {
77        let mut members = self.team_members.write().await;
78        members.insert(team_id.to_string(), member_ids);
79        tracing::info!(
80            team_id = %team_id,
81            member_count = %members.get(team_id).map(|m| m.len()).unwrap_or(0),
82            "Team members registered for notifications"
83        );
84        Ok(())
85    }
86
87    /// Synchronize standards for a team
88    pub async fn sync_standards(&self, team_id: &str) -> Result<()> {
89        let storage_path = Self::resolve_team_standards_path(team_id)?;
90
91        if !storage_path.exists() {
92            return Err(TeamError::TeamNotFound(format!(
93                "Standards not found for team: {}",
94                team_id
95            )));
96        }
97
98        // Get current modification time
99        let metadata = std::fs::metadata(&storage_path)
100            .map_err(|e| TeamError::StorageError(format!("Failed to get file metadata: {}", e)))?;
101
102        let modified_time = metadata.modified().map_err(|e| {
103            TeamError::StorageError(format!("Failed to get modification time: {}", e))
104        })?;
105
106        // Update last modified time
107        let mut last_mod = self.last_modified.write().await;
108        last_mod.insert(team_id.to_string(), modified_time);
109
110        tracing::info!(
111            team_id = %team_id,
112            path = ?storage_path,
113            "Standards synchronized successfully"
114        );
115
116        Ok(())
117    }
118
119    /// Watch for configuration changes within 5 seconds
120    pub async fn watch_for_changes(&self, team_id: &str) -> Result<()> {
121        let storage_path = Self::resolve_team_standards_path(team_id)?;
122
123        // Initialize last modified time
124        self.sync_standards(team_id).await?;
125
126        let team_id_clone = team_id.to_string();
127        let last_modified = Arc::clone(&self.last_modified);
128        let change_callbacks = Arc::clone(&self.change_callbacks);
129
130        // Spawn a background task to watch for changes
131        tokio::spawn(async move {
132            loop {
133                // Check for changes every 1 second (will detect within 5 seconds)
134                sleep(Duration::from_secs(1)).await;
135
136                if !storage_path.exists() {
137                    continue;
138                }
139
140                // Get current modification time
141                match std::fs::metadata(&storage_path) {
142                    Ok(metadata) => {
143                        if let Ok(current_modified) = metadata.modified() {
144                            let last_mod = last_modified.read().await;
145                            if let Some(last_time) = last_mod.get(&team_id_clone) {
146                                if current_modified > *last_time {
147                                    // Configuration has changed
148                                    drop(last_mod);
149
150                                    let event = ConfigChangeEvent {
151                                        team_id: team_id_clone.clone(),
152                                        timestamp: current_modified,
153                                        change_type: ChangeType::Modified,
154                                    };
155
156                                    // Update last modified time
157                                    let mut last_mod_write = last_modified.write().await;
158                                    last_mod_write.insert(team_id_clone.clone(), current_modified);
159                                    drop(last_mod_write);
160
161                                    // Trigger callbacks
162                                    let callbacks = change_callbacks.read().await;
163                                    for callback in callbacks.iter() {
164                                        callback(event.clone());
165                                    }
166
167                                    tracing::info!(
168                                        team_id = %team_id_clone,
169                                        "Configuration change detected"
170                                    );
171                                }
172                            }
173                        }
174                    }
175                    Err(e) => {
176                        tracing::warn!(
177                            team_id = %team_id_clone,
178                            error = %e,
179                            "Failed to check file metadata"
180                        );
181                    }
182                }
183            }
184        });
185
186        tracing::info!(
187            team_id = %team_id,
188            "Watching for configuration changes (5-second detection window)"
189        );
190
191        Ok(())
192    }
193
194    /// Notify all team members of changes
195    pub async fn notify_members(&self, team_id: &str, message: &str) -> Result<()> {
196        let members = self.team_members.read().await;
197
198        let member_ids = members.get(team_id).cloned().unwrap_or_default();
199
200        if member_ids.is_empty() {
201            tracing::warn!(
202                team_id = %team_id,
203                "No team members registered for notifications"
204            );
205            return Ok(());
206        }
207
208        // Create notification record
209        let record = NotificationRecord {
210            team_id: team_id.to_string(),
211            message: message.to_string(),
212            recipients: member_ids.clone(),
213            timestamp: SystemTime::now(),
214            delivered: true,
215        };
216
217        // Store notification record
218        let mut history = self.notification_history.write().await;
219        history.push(record);
220
221        tracing::info!(
222            team_id = %team_id,
223            recipient_count = %member_ids.len(),
224            message = %message,
225            "Team members notified of configuration changes"
226        );
227
228        Ok(())
229    }
230
231    /// Get notification history for a team
232    pub async fn get_notification_history(&self, team_id: &str) -> Result<Vec<NotificationRecord>> {
233        let history = self.notification_history.read().await;
234        let team_notifications: Vec<NotificationRecord> = history
235            .iter()
236            .filter(|n| n.team_id == team_id)
237            .cloned()
238            .collect();
239
240        Ok(team_notifications)
241    }
242
243    /// Get last modified time for a team's standards
244    pub async fn get_last_modified(&self, team_id: &str) -> Result<Option<SystemTime>> {
245        let last_mod = self.last_modified.read().await;
246        Ok(last_mod.get(team_id).copied())
247    }
248
249    // Helper functions
250
251    /// Resolve the storage path for team standards
252    fn resolve_team_standards_path(team_id: &str) -> Result<PathBuf> {
253        let global_path = PathResolver::resolve_global_path()
254            .map_err(|e| TeamError::StorageError(e.to_string()))?;
255
256        let standards_path = global_path
257            .join("teams")
258            .join(team_id)
259            .join("standards.yaml");
260
261        Ok(standards_path)
262    }
263}
264
265impl Default for SyncService {
266    fn default() -> Self {
267        Self::new()
268    }
269}