saorsa_core/messaging/
sync.rs

1// Real-time message synchronization
2
3use super::DhtClient;
4use super::types::*;
5use super::user_handle::UserHandle;
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12
13/// Real-time sync service for messaging
14pub struct RealtimeSync {
15    /// DHT client for distributed sync
16    _dht_client: DhtClient,
17    /// Event broadcaster
18    event_tx: broadcast::Sender<SyncEvent>,
19    /// Active subscriptions
20    subscriptions: Arc<RwLock<HashMap<ChannelId, Subscription>>>,
21    /// Presence tracker
22    presence: Arc<RwLock<HashMap<UserHandle, UserPresence>>>,
23    /// Typing indicators
24    typing: Arc<RwLock<HashMap<ChannelId, Vec<TypingUser>>>>,
25}
26
27impl RealtimeSync {
28    /// Create new sync service
29    pub async fn new(dht_client: DhtClient) -> Result<Self> {
30        let (event_tx, _) = broadcast::channel(1000);
31
32        Ok(Self {
33            _dht_client: dht_client,
34            event_tx,
35            subscriptions: Arc::new(RwLock::new(HashMap::new())),
36            presence: Arc::new(RwLock::new(HashMap::new())),
37            typing: Arc::new(RwLock::new(HashMap::new())),
38        })
39    }
40
41    /// Subscribe to channel updates
42    pub async fn subscribe_channel(&self, channel_id: ChannelId) -> broadcast::Receiver<SyncEvent> {
43        let mut subs = self.subscriptions.write().await;
44
45        subs.insert(
46            channel_id,
47            Subscription {
48                _channel_id: channel_id,
49                _subscribed_at: Utc::now(),
50                _last_sync: Utc::now(),
51            },
52        );
53
54        self.event_tx.subscribe()
55    }
56
57    /// Unsubscribe from channel
58    pub async fn unsubscribe_channel(&self, channel_id: ChannelId) -> Result<()> {
59        let mut subs = self.subscriptions.write().await;
60        subs.remove(&channel_id);
61        Ok(())
62    }
63
64    /// Broadcast new message
65    pub async fn broadcast_message(&self, message: &EncryptedMessage) -> Result<()> {
66        let event = SyncEvent::NewMessage {
67            message: message.clone(),
68            timestamp: Utc::now(),
69        };
70
71        // Broadcast locally
72        let _ = self.event_tx.send(event.clone());
73
74        // Sync to DHT
75        self.sync_to_dht(message.channel_id, event).await?;
76
77        Ok(())
78    }
79
80    /// Broadcast message edit
81    pub async fn broadcast_edit(
82        &self,
83        message_id: MessageId,
84        new_content: MessageContent,
85    ) -> Result<()> {
86        let event = SyncEvent::MessageEdited {
87            message_id,
88            new_content,
89            edited_at: Utc::now(),
90        };
91
92        let _ = self.event_tx.send(event.clone());
93        // Sync to network
94
95        Ok(())
96    }
97
98    /// Broadcast message deletion
99    pub async fn broadcast_deletion(&self, message_id: MessageId) -> Result<()> {
100        let event = SyncEvent::MessageDeleted {
101            message_id,
102            deleted_at: Utc::now(),
103        };
104
105        let _ = self.event_tx.send(event);
106        Ok(())
107    }
108
109    /// Broadcast reaction change
110    pub async fn broadcast_reaction(
111        &self,
112        message_id: MessageId,
113        emoji: String,
114        added: bool,
115    ) -> Result<()> {
116        let event = if added {
117            SyncEvent::ReactionAdded {
118                message_id,
119                emoji,
120                timestamp: Utc::now(),
121            }
122        } else {
123            SyncEvent::ReactionRemoved {
124                message_id,
125                emoji,
126                timestamp: Utc::now(),
127            }
128        };
129
130        let _ = self.event_tx.send(event);
131        Ok(())
132    }
133
134    /// Broadcast typing indicator
135    pub async fn broadcast_typing(
136        &self,
137        channel_id: ChannelId,
138        user: UserHandle,
139        is_typing: bool,
140    ) -> Result<()> {
141        let mut typing = self.typing.write().await;
142        let channel_typing = typing.entry(channel_id).or_insert_with(Vec::new);
143
144        if is_typing {
145            // Add to typing list
146            let handle = user.clone();
147            if !channel_typing.iter().any(|t| t.user == user) {
148                channel_typing.push(TypingUser {
149                    user: handle,
150                    started_at: Utc::now(),
151                });
152            }
153        } else {
154            // Remove from typing list
155            channel_typing.retain(|t| t.user != user);
156        }
157
158        // Broadcast event
159        let event = SyncEvent::TypingIndicator {
160            channel_id,
161            user,
162            is_typing,
163            timestamp: Utc::now(),
164        };
165
166        let _ = self.event_tx.send(event);
167        Ok(())
168    }
169
170    /// Broadcast read receipt
171    pub async fn broadcast_read_receipt(&self, message_id: MessageId) -> Result<()> {
172        let event = SyncEvent::ReadReceipt {
173            message_id,
174            timestamp: Utc::now(),
175        };
176
177        let _ = self.event_tx.send(event);
178        Ok(())
179    }
180
181    /// Update user presence
182    pub async fn update_presence(&self, user: UserHandle, status: PresenceStatus) -> Result<()> {
183        let mut presence = self.presence.write().await;
184        presence.insert(
185            user.clone(),
186            UserPresence {
187                identity: user.clone(),
188                status: status.clone(),
189                custom_status: None,
190                last_seen: Some(Utc::now()),
191                typing_in: Vec::new(),
192                device: DeviceType::Desktop,
193            },
194        );
195
196        // Broadcast presence update
197        let event = SyncEvent::PresenceUpdate {
198            user,
199            status,
200            timestamp: Utc::now(),
201        };
202
203        let _ = self.event_tx.send(event);
204        Ok(())
205    }
206
207    /// Get current presence for users
208    pub async fn get_presence(&self, users: Vec<UserHandle>) -> HashMap<UserHandle, UserPresence> {
209        let presence = self.presence.read().await;
210
211        users
212            .into_iter()
213            .filter_map(|handle| presence.get(&handle).map(|p| (handle, p.clone())))
214            .collect()
215    }
216
217    /// Sync channel state
218    pub async fn sync_channel(&self, channel_id: ChannelId) -> Result<ChannelSyncState> {
219        // Fetch latest state from DHT
220        let _key = format!("channel:sync:{}", channel_id.0);
221
222        // In production, fetch from DHT
223        let state = ChannelSyncState {
224            channel_id,
225            last_message_id: None,
226            last_sync: Utc::now(),
227            unread_count: 0,
228            mention_count: 0,
229        };
230
231        Ok(state)
232    }
233
234    /// Handle incoming sync events
235    pub async fn handle_sync_event(&self, event: SyncEvent) -> Result<()> {
236        // Process event based on type
237        match &event {
238            SyncEvent::NewMessage { .. } => {
239                tracing::debug!("New message received");
240            }
241            SyncEvent::TypingIndicator {
242                channel_id,
243                user,
244                is_typing,
245                ..
246            } => {
247                let mut typing = self.typing.write().await;
248                let channel_typing = typing.entry(*channel_id).or_insert_with(Vec::new);
249
250                if *is_typing {
251                    if !channel_typing.iter().any(|t| t.user == *user) {
252                        channel_typing.push(TypingUser {
253                            user: user.clone(),
254                            started_at: Utc::now(),
255                        });
256                    }
257                } else {
258                    channel_typing.retain(|t| t.user != *user);
259                }
260            }
261            _ => {}
262        }
263
264        // Broadcast to local subscribers
265        let _ = self.event_tx.send(event);
266
267        Ok(())
268    }
269
270    /// Clean up stale typing indicators
271    pub async fn cleanup_typing(&self) {
272        let mut typing = self.typing.write().await;
273        let timeout = Utc::now() - chrono::Duration::seconds(10);
274
275        for channel_typing in typing.values_mut() {
276            channel_typing.retain(|t| t.started_at > timeout);
277        }
278    }
279
280    /// Sync to DHT network
281    async fn sync_to_dht(&self, channel_id: ChannelId, event: SyncEvent) -> Result<()> {
282        let _key = format!("channel:events:{}", channel_id.0);
283        let _value = serde_json::to_vec(&event)?;
284
285        // In production, publish to DHT
286        // self.dht_client.publish(key, value).await?;
287
288        Ok(())
289    }
290}
291
292/// Sync event types
293#[derive(Debug, Clone, Serialize, Deserialize)]
294pub enum SyncEvent {
295    NewMessage {
296        message: EncryptedMessage,
297        timestamp: DateTime<Utc>,
298    },
299    MessageEdited {
300        message_id: MessageId,
301        new_content: MessageContent,
302        edited_at: DateTime<Utc>,
303    },
304    MessageDeleted {
305        message_id: MessageId,
306        deleted_at: DateTime<Utc>,
307    },
308    ReactionAdded {
309        message_id: MessageId,
310        emoji: String,
311        timestamp: DateTime<Utc>,
312    },
313    ReactionRemoved {
314        message_id: MessageId,
315        emoji: String,
316        timestamp: DateTime<Utc>,
317    },
318    TypingIndicator {
319        channel_id: ChannelId,
320        user: UserHandle,
321        is_typing: bool,
322        timestamp: DateTime<Utc>,
323    },
324    ReadReceipt {
325        message_id: MessageId,
326        timestamp: DateTime<Utc>,
327    },
328    PresenceUpdate {
329        user: UserHandle,
330        status: PresenceStatus,
331        timestamp: DateTime<Utc>,
332    },
333}
334
335/// Channel subscription
336#[derive(Debug, Clone)]
337struct Subscription {
338    _channel_id: ChannelId,
339    _subscribed_at: DateTime<Utc>,
340    _last_sync: DateTime<Utc>,
341}
342
343/// Typing user
344#[derive(Debug, Clone)]
345struct TypingUser {
346    user: UserHandle,
347    started_at: DateTime<Utc>,
348}
349
350/// Channel sync state
351#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct ChannelSyncState {
353    pub channel_id: ChannelId,
354    pub last_message_id: Option<MessageId>,
355    pub last_sync: DateTime<Utc>,
356    pub unread_count: u32,
357    pub mention_count: u32,
358}
359
/// Strategy for resolving a conflict between a local and a remote version
/// during sync.
///
/// The enum is a plain, fieldless value, so it is `Copy` and comparable:
/// callers can pass it by value and test it with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Use local version
    UseLocal,
    /// Use remote version
    UseRemote,
    /// Merge both versions
    Merge,
    /// Create new version
    Fork,
}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sync service backed by the mock DHT client.
    async fn mock_sync() -> RealtimeSync {
        let dht = super::DhtClient::new_mock();
        RealtimeSync::new(dht).await.unwrap()
    }

    /// A freshly subscribed receiver has nothing queued.
    #[tokio::test]
    async fn test_sync_creation() {
        let sync = mock_sync().await;

        let mut rx = sync.subscribe_channel(ChannelId::new()).await;

        // Nothing has been broadcast yet, so a non-blocking receive must fail.
        assert!(rx.try_recv().is_err());
    }

    /// Starting to type creates a typing entry for the channel.
    #[tokio::test]
    async fn test_typing_indicators() {
        let sync = mock_sync().await;
        let channel = ChannelId::new();

        sync.broadcast_typing(channel, UserHandle::from("alice"), true)
            .await
            .unwrap();

        let typing = sync.typing.read().await;
        assert!(typing.contains_key(&channel));
    }

    /// A presence update is visible through `get_presence`.
    #[tokio::test]
    async fn test_presence_update() {
        let sync = mock_sync().await;
        let user = UserHandle::from("alice");

        sync.update_presence(UserHandle::from("alice"), PresenceStatus::Online)
            .await
            .unwrap();

        let presence = sync.get_presence(vec![user.clone()]).await;
        assert!(presence.contains_key(&user));
    }
}