saorsa_core/messaging/
sync.rs

1// Real-time message synchronization
2
3use super::DhtClient;
4use super::types::*;
5use super::user_handle::UserHandle;
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12
13/// Real-time sync service for messaging
14pub struct RealtimeSync {
15    /// DHT client for distributed sync
16    _dht_client: DhtClient,
17    /// Event broadcaster
18    event_tx: broadcast::Sender<SyncEvent>,
19    /// Active subscriptions
20    subscriptions: Arc<RwLock<HashMap<ChannelId, Subscription>>>,
21    /// Presence tracker
22    presence: Arc<RwLock<HashMap<UserHandle, UserPresence>>>,
23    /// Typing indicators
24    typing: Arc<RwLock<HashMap<ChannelId, Vec<TypingUser>>>>,
25}
26
27impl RealtimeSync {
28    /// Create new sync service
29    pub async fn new(dht_client: DhtClient) -> Result<Self> {
30        let (event_tx, _) = broadcast::channel(1000);
31
32        Ok(Self {
33            _dht_client: dht_client,
34            event_tx,
35            subscriptions: Arc::new(RwLock::new(HashMap::new())),
36            presence: Arc::new(RwLock::new(HashMap::new())),
37            typing: Arc::new(RwLock::new(HashMap::new())),
38        })
39    }
40
41    /// Subscribe to channel updates
42    pub async fn subscribe_channel(&self, channel_id: ChannelId) -> broadcast::Receiver<SyncEvent> {
43        let mut subs = self.subscriptions.write().await;
44
45        subs.insert(
46            channel_id,
47            Subscription {
48                _channel_id: channel_id,
49                _subscribed_at: Utc::now(),
50                _last_sync: Utc::now(),
51            },
52        );
53
54        self.event_tx.subscribe()
55    }
56
57    /// Unsubscribe from channel
58    pub async fn unsubscribe_channel(&self, channel_id: ChannelId) -> Result<()> {
59        let mut subs = self.subscriptions.write().await;
60        subs.remove(&channel_id);
61        Ok(())
62    }
63
64    /// Broadcast new message
65    pub async fn broadcast_message(&self, message: &EncryptedMessage) -> Result<()> {
66        let event = SyncEvent::NewMessage {
67            message: message.clone(),
68            timestamp: Utc::now(),
69        };
70
71        // Broadcast locally
72        let _ = self.event_tx.send(event.clone());
73
74        // Sync to DHT
75        self.sync_to_dht(message.channel_id, event).await?;
76
77        Ok(())
78    }
79
80    /// Broadcast message edit
81    pub async fn broadcast_edit(
82        &self,
83        message_id: MessageId,
84        new_content: MessageContent,
85    ) -> Result<()> {
86        let event = SyncEvent::MessageEdited {
87            message_id,
88            new_content,
89            edited_at: Utc::now(),
90        };
91
92        let _ = self.event_tx.send(event.clone());
93        // Sync to network
94
95        Ok(())
96    }
97
98    /// Broadcast message deletion
99    pub async fn broadcast_deletion(&self, message_id: MessageId) -> Result<()> {
100        let event = SyncEvent::MessageDeleted {
101            message_id,
102            deleted_at: Utc::now(),
103        };
104
105        let _ = self.event_tx.send(event);
106        Ok(())
107    }
108
109    /// Broadcast reaction change
110    pub async fn broadcast_reaction(
111        &self,
112        message_id: MessageId,
113        emoji: String,
114        added: bool,
115    ) -> Result<()> {
116        let event = if added {
117            SyncEvent::ReactionAdded {
118                message_id,
119                emoji,
120                timestamp: Utc::now(),
121            }
122        } else {
123            SyncEvent::ReactionRemoved {
124                message_id,
125                emoji,
126                timestamp: Utc::now(),
127            }
128        };
129
130        let _ = self.event_tx.send(event);
131        Ok(())
132    }
133
134    /// Broadcast typing indicator
135    pub async fn broadcast_typing(
136        &self,
137        channel_id: ChannelId,
138        user: UserHandle,
139        is_typing: bool,
140    ) -> Result<()> {
141        let mut typing = self.typing.write().await;
142        let channel_typing = typing.entry(channel_id).or_insert_with(Vec::new);
143
144        if is_typing {
145            // Add to typing list
146            let handle = user.clone();
147            if !channel_typing.iter().any(|t| t.user == user) {
148                channel_typing.push(TypingUser {
149                    user: handle,
150                    started_at: Utc::now(),
151                });
152            }
153        } else {
154            // Remove from typing list
155            channel_typing.retain(|t| t.user != user);
156        }
157
158        // Broadcast event
159        let event = SyncEvent::TypingIndicator {
160            channel_id,
161            user,
162            is_typing,
163            timestamp: Utc::now(),
164        };
165
166        let _ = self.event_tx.send(event);
167        Ok(())
168    }
169
170    /// Broadcast read receipt
171    pub async fn broadcast_read_receipt(&self, message_id: MessageId) -> Result<()> {
172        let event = SyncEvent::ReadReceipt {
173            message_id,
174            timestamp: Utc::now(),
175        };
176
177        let _ = self.event_tx.send(event);
178        Ok(())
179    }
180
181    /// Update user presence
182    pub async fn update_presence(&self, user: UserHandle, status: PresenceStatus) -> Result<()> {
183        let mut presence = self.presence.write().await;
184        presence.insert(
185            user.clone(),
186            UserPresence {
187                identity: user.clone(),
188                status: status.clone(),
189                custom_status: None,
190                last_seen: Some(Utc::now()),
191                typing_in: Vec::new(),
192                device: DeviceType::Desktop,
193            },
194        );
195
196        // Broadcast presence update
197        let event = SyncEvent::PresenceUpdate {
198            user,
199            status,
200            timestamp: Utc::now(),
201        };
202
203        let _ = self.event_tx.send(event);
204        Ok(())
205    }
206
207    /// Get current presence for users
208    pub async fn get_presence(
209        &self,
210        users: Vec<UserHandle>,
211    ) -> HashMap<UserHandle, UserPresence> {
212        let presence = self.presence.read().await;
213
214        users
215            .into_iter()
216            .filter_map(|handle| presence.get(&handle).map(|p| (handle, p.clone())))
217            .collect()
218    }
219
220    /// Sync channel state
221    pub async fn sync_channel(&self, channel_id: ChannelId) -> Result<ChannelSyncState> {
222        // Fetch latest state from DHT
223        let _key = format!("channel:sync:{}", channel_id.0);
224
225        // In production, fetch from DHT
226        let state = ChannelSyncState {
227            channel_id,
228            last_message_id: None,
229            last_sync: Utc::now(),
230            unread_count: 0,
231            mention_count: 0,
232        };
233
234        Ok(state)
235    }
236
237    /// Handle incoming sync events
238    pub async fn handle_sync_event(&self, event: SyncEvent) -> Result<()> {
239        // Process event based on type
240        match &event {
241            SyncEvent::NewMessage { .. } => {
242                log::debug!("New message received");
243            }
244            SyncEvent::TypingIndicator {
245                channel_id,
246                user,
247                is_typing,
248                ..
249            } => {
250                let mut typing = self.typing.write().await;
251                let channel_typing = typing.entry(*channel_id).or_insert_with(Vec::new);
252
253                if *is_typing {
254                    if !channel_typing.iter().any(|t| t.user == *user) {
255                        channel_typing.push(TypingUser {
256                            user: user.clone(),
257                            started_at: Utc::now(),
258                        });
259                    }
260                } else {
261                    channel_typing.retain(|t| t.user != *user);
262                }
263            }
264            _ => {}
265        }
266
267        // Broadcast to local subscribers
268        let _ = self.event_tx.send(event);
269
270        Ok(())
271    }
272
273    /// Clean up stale typing indicators
274    pub async fn cleanup_typing(&self) {
275        let mut typing = self.typing.write().await;
276        let timeout = Utc::now() - chrono::Duration::seconds(10);
277
278        for channel_typing in typing.values_mut() {
279            channel_typing.retain(|t| t.started_at > timeout);
280        }
281    }
282
283    /// Sync to DHT network
284    async fn sync_to_dht(&self, channel_id: ChannelId, event: SyncEvent) -> Result<()> {
285        let _key = format!("channel:events:{}", channel_id.0);
286        let _value = serde_json::to_vec(&event)?;
287
288        // In production, publish to DHT
289        // self.dht_client.publish(key, value).await?;
290
291        Ok(())
292    }
293}
294
295/// Sync event types
296#[derive(Debug, Clone, Serialize, Deserialize)]
297pub enum SyncEvent {
298    NewMessage {
299        message: EncryptedMessage,
300        timestamp: DateTime<Utc>,
301    },
302    MessageEdited {
303        message_id: MessageId,
304        new_content: MessageContent,
305        edited_at: DateTime<Utc>,
306    },
307    MessageDeleted {
308        message_id: MessageId,
309        deleted_at: DateTime<Utc>,
310    },
311    ReactionAdded {
312        message_id: MessageId,
313        emoji: String,
314        timestamp: DateTime<Utc>,
315    },
316    ReactionRemoved {
317        message_id: MessageId,
318        emoji: String,
319        timestamp: DateTime<Utc>,
320    },
321    TypingIndicator {
322        channel_id: ChannelId,
323        user: UserHandle,
324        is_typing: bool,
325        timestamp: DateTime<Utc>,
326    },
327    ReadReceipt {
328        message_id: MessageId,
329        timestamp: DateTime<Utc>,
330    },
331    PresenceUpdate {
332        user: UserHandle,
333        status: PresenceStatus,
334        timestamp: DateTime<Utc>,
335    },
336}
337
338/// Channel subscription
339#[derive(Debug, Clone)]
340struct Subscription {
341    _channel_id: ChannelId,
342    _subscribed_at: DateTime<Utc>,
343    _last_sync: DateTime<Utc>,
344}
345
346/// Typing user
347#[derive(Debug, Clone)]
348struct TypingUser {
349    user: UserHandle,
350    started_at: DateTime<Utc>,
351}
352
353/// Channel sync state
354#[derive(Debug, Clone, Serialize, Deserialize)]
355pub struct ChannelSyncState {
356    pub channel_id: ChannelId,
357    pub last_message_id: Option<MessageId>,
358    pub last_sync: DateTime<Utc>,
359    pub unread_count: u32,
360    pub mention_count: u32,
361}
362
/// Strategy for resolving a conflict between a local and a remote version.
///
/// The enum is a plain fieldless tag, so it is `Copy` and comparable with
/// `==`; callers do not need a `match` merely to test which strategy was
/// chosen.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Use local version
    UseLocal,
    /// Use remote version
    UseRemote,
    /// Merge both versions
    Merge,
    /// Create new version
    Fork,
}
375
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a sync service on top of the mock DHT client.
    async fn mock_sync() -> RealtimeSync {
        let dht = super::DhtClient::new_mock();
        RealtimeSync::new(dht)
            .await
            .expect("constructing RealtimeSync with a mock DHT should succeed")
    }

    /// A fresh subscriber sees nothing until an event is broadcast, and then
    /// receives exactly that event.
    #[tokio::test]
    async fn test_sync_creation() {
        let sync = mock_sync().await;

        let channel = ChannelId::new();
        let mut rx = sync.subscribe_channel(channel).await;

        // No events yet
        assert!(rx.try_recv().is_err());

        sync.broadcast_typing(channel, UserHandle::from("alice"), true)
            .await
            .unwrap();

        assert!(matches!(
            rx.try_recv(),
            Ok(SyncEvent::TypingIndicator {
                is_typing: true,
                ..
            })
        ));
    }

    /// Typing notifications are de-duplicated per user, and a stop
    /// notification leaves nobody typing in the channel.
    #[tokio::test]
    async fn test_typing_indicators() {
        let sync = mock_sync().await;

        let channel = ChannelId::new();
        let alice = UserHandle::from("alice");

        // Start typing twice; the second notification must not add a duplicate.
        sync.broadcast_typing(channel, alice.clone(), true)
            .await
            .unwrap();
        sync.broadcast_typing(channel, alice.clone(), true)
            .await
            .unwrap();

        {
            let typing = sync.typing.read().await;
            let users = typing
                .get(&channel)
                .expect("channel should have a typing list");
            assert_eq!(users.len(), 1);
            assert!(users.iter().all(|t| t.user == alice));
        } // read guard dropped here so the next call can take the write lock

        // Stop typing
        sync.broadcast_typing(channel, alice, false).await.unwrap();

        let typing = sync.typing.read().await;
        assert_eq!(typing.get(&channel).map_or(0, Vec::len), 0);
    }

    /// A presence update is stored with the given status and is the only
    /// record returned when an unknown user is queried alongside it.
    #[tokio::test]
    async fn test_presence_update() {
        let sync = mock_sync().await;

        let alice = UserHandle::from("alice");
        sync.update_presence(alice.clone(), PresenceStatus::Online)
            .await
            .unwrap();

        let presence = sync
            .get_presence(vec![alice.clone(), UserHandle::from("bob")])
            .await;

        // "bob" never published presence, so only "alice" comes back.
        assert_eq!(presence.len(), 1);
        let record = presence
            .get(&alice)
            .expect("alice should have a presence record");
        assert!(matches!(record.status, PresenceStatus::Online));
        assert!(record.last_seen.is_some());
    }
}
424}