1use super::DhtClient;
4use super::types::*;
5use super::user_handle::UserHandle;
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12
13pub struct RealtimeSync {
15 _dht_client: DhtClient,
17 event_tx: broadcast::Sender<SyncEvent>,
19 subscriptions: Arc<RwLock<HashMap<ChannelId, Subscription>>>,
21 presence: Arc<RwLock<HashMap<UserHandle, UserPresence>>>,
23 typing: Arc<RwLock<HashMap<ChannelId, Vec<TypingUser>>>>,
25}
26
27impl RealtimeSync {
28 pub async fn new(dht_client: DhtClient) -> Result<Self> {
30 let (event_tx, _) = broadcast::channel(1000);
31
32 Ok(Self {
33 _dht_client: dht_client,
34 event_tx,
35 subscriptions: Arc::new(RwLock::new(HashMap::new())),
36 presence: Arc::new(RwLock::new(HashMap::new())),
37 typing: Arc::new(RwLock::new(HashMap::new())),
38 })
39 }
40
41 pub async fn subscribe_channel(&self, channel_id: ChannelId) -> broadcast::Receiver<SyncEvent> {
43 let mut subs = self.subscriptions.write().await;
44
45 subs.insert(
46 channel_id,
47 Subscription {
48 _channel_id: channel_id,
49 _subscribed_at: Utc::now(),
50 _last_sync: Utc::now(),
51 },
52 );
53
54 self.event_tx.subscribe()
55 }
56
57 pub async fn unsubscribe_channel(&self, channel_id: ChannelId) -> Result<()> {
59 let mut subs = self.subscriptions.write().await;
60 subs.remove(&channel_id);
61 Ok(())
62 }
63
64 pub async fn broadcast_message(&self, message: &EncryptedMessage) -> Result<()> {
66 let event = SyncEvent::NewMessage {
67 message: message.clone(),
68 timestamp: Utc::now(),
69 };
70
71 let _ = self.event_tx.send(event.clone());
73
74 self.sync_to_dht(message.channel_id, event).await?;
76
77 Ok(())
78 }
79
80 pub async fn broadcast_edit(
82 &self,
83 message_id: MessageId,
84 new_content: MessageContent,
85 ) -> Result<()> {
86 let event = SyncEvent::MessageEdited {
87 message_id,
88 new_content,
89 edited_at: Utc::now(),
90 };
91
92 let _ = self.event_tx.send(event.clone());
93 Ok(())
96 }
97
98 pub async fn broadcast_deletion(&self, message_id: MessageId) -> Result<()> {
100 let event = SyncEvent::MessageDeleted {
101 message_id,
102 deleted_at: Utc::now(),
103 };
104
105 let _ = self.event_tx.send(event);
106 Ok(())
107 }
108
109 pub async fn broadcast_reaction(
111 &self,
112 message_id: MessageId,
113 emoji: String,
114 added: bool,
115 ) -> Result<()> {
116 let event = if added {
117 SyncEvent::ReactionAdded {
118 message_id,
119 emoji,
120 timestamp: Utc::now(),
121 }
122 } else {
123 SyncEvent::ReactionRemoved {
124 message_id,
125 emoji,
126 timestamp: Utc::now(),
127 }
128 };
129
130 let _ = self.event_tx.send(event);
131 Ok(())
132 }
133
134 pub async fn broadcast_typing(
136 &self,
137 channel_id: ChannelId,
138 user: UserHandle,
139 is_typing: bool,
140 ) -> Result<()> {
141 let mut typing = self.typing.write().await;
142 let channel_typing = typing.entry(channel_id).or_insert_with(Vec::new);
143
144 if is_typing {
145 let handle = user.clone();
147 if !channel_typing.iter().any(|t| t.user == user) {
148 channel_typing.push(TypingUser {
149 user: handle,
150 started_at: Utc::now(),
151 });
152 }
153 } else {
154 channel_typing.retain(|t| t.user != user);
156 }
157
158 let event = SyncEvent::TypingIndicator {
160 channel_id,
161 user,
162 is_typing,
163 timestamp: Utc::now(),
164 };
165
166 let _ = self.event_tx.send(event);
167 Ok(())
168 }
169
170 pub async fn broadcast_read_receipt(&self, message_id: MessageId) -> Result<()> {
172 let event = SyncEvent::ReadReceipt {
173 message_id,
174 timestamp: Utc::now(),
175 };
176
177 let _ = self.event_tx.send(event);
178 Ok(())
179 }
180
181 pub async fn update_presence(&self, user: UserHandle, status: PresenceStatus) -> Result<()> {
183 let mut presence = self.presence.write().await;
184 presence.insert(
185 user.clone(),
186 UserPresence {
187 identity: user.clone(),
188 status: status.clone(),
189 custom_status: None,
190 last_seen: Some(Utc::now()),
191 typing_in: Vec::new(),
192 device: DeviceType::Desktop,
193 },
194 );
195
196 let event = SyncEvent::PresenceUpdate {
198 user,
199 status,
200 timestamp: Utc::now(),
201 };
202
203 let _ = self.event_tx.send(event);
204 Ok(())
205 }
206
207 pub async fn get_presence(
209 &self,
210 users: Vec<UserHandle>,
211 ) -> HashMap<UserHandle, UserPresence> {
212 let presence = self.presence.read().await;
213
214 users
215 .into_iter()
216 .filter_map(|handle| presence.get(&handle).map(|p| (handle, p.clone())))
217 .collect()
218 }
219
220 pub async fn sync_channel(&self, channel_id: ChannelId) -> Result<ChannelSyncState> {
222 let _key = format!("channel:sync:{}", channel_id.0);
224
225 let state = ChannelSyncState {
227 channel_id,
228 last_message_id: None,
229 last_sync: Utc::now(),
230 unread_count: 0,
231 mention_count: 0,
232 };
233
234 Ok(state)
235 }
236
237 pub async fn handle_sync_event(&self, event: SyncEvent) -> Result<()> {
239 match &event {
241 SyncEvent::NewMessage { .. } => {
242 log::debug!("New message received");
243 }
244 SyncEvent::TypingIndicator {
245 channel_id,
246 user,
247 is_typing,
248 ..
249 } => {
250 let mut typing = self.typing.write().await;
251 let channel_typing = typing.entry(*channel_id).or_insert_with(Vec::new);
252
253 if *is_typing {
254 if !channel_typing.iter().any(|t| t.user == *user) {
255 channel_typing.push(TypingUser {
256 user: user.clone(),
257 started_at: Utc::now(),
258 });
259 }
260 } else {
261 channel_typing.retain(|t| t.user != *user);
262 }
263 }
264 _ => {}
265 }
266
267 let _ = self.event_tx.send(event);
269
270 Ok(())
271 }
272
273 pub async fn cleanup_typing(&self) {
275 let mut typing = self.typing.write().await;
276 let timeout = Utc::now() - chrono::Duration::seconds(10);
277
278 for channel_typing in typing.values_mut() {
279 channel_typing.retain(|t| t.started_at > timeout);
280 }
281 }
282
283 async fn sync_to_dht(&self, channel_id: ChannelId, event: SyncEvent) -> Result<()> {
285 let _key = format!("channel:events:{}", channel_id.0);
286 let _value = serde_json::to_vec(&event)?;
287
288 Ok(())
292 }
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
297pub enum SyncEvent {
298 NewMessage {
299 message: EncryptedMessage,
300 timestamp: DateTime<Utc>,
301 },
302 MessageEdited {
303 message_id: MessageId,
304 new_content: MessageContent,
305 edited_at: DateTime<Utc>,
306 },
307 MessageDeleted {
308 message_id: MessageId,
309 deleted_at: DateTime<Utc>,
310 },
311 ReactionAdded {
312 message_id: MessageId,
313 emoji: String,
314 timestamp: DateTime<Utc>,
315 },
316 ReactionRemoved {
317 message_id: MessageId,
318 emoji: String,
319 timestamp: DateTime<Utc>,
320 },
321 TypingIndicator {
322 channel_id: ChannelId,
323 user: UserHandle,
324 is_typing: bool,
325 timestamp: DateTime<Utc>,
326 },
327 ReadReceipt {
328 message_id: MessageId,
329 timestamp: DateTime<Utc>,
330 },
331 PresenceUpdate {
332 user: UserHandle,
333 status: PresenceStatus,
334 timestamp: DateTime<Utc>,
335 },
336}
337
338#[derive(Debug, Clone)]
340struct Subscription {
341 _channel_id: ChannelId,
342 _subscribed_at: DateTime<Utc>,
343 _last_sync: DateTime<Utc>,
344}
345
346#[derive(Debug, Clone)]
348struct TypingUser {
349 user: UserHandle,
350 started_at: DateTime<Utc>,
351}
352
353#[derive(Debug, Clone, Serialize, Deserialize)]
355pub struct ChannelSyncState {
356 pub channel_id: ChannelId,
357 pub last_message_id: Option<MessageId>,
358 pub last_sync: DateTime<Utc>,
359 pub unread_count: u32,
360 pub mention_count: u32,
361}
362
363#[derive(Debug, Clone)]
365pub enum ConflictResolution {
366 UseLocal,
368 UseRemote,
370 Merge,
372 Fork,
374}
375
376#[cfg(test)]
377mod tests {
378 use super::*;
379
380 #[tokio::test]
381 async fn test_sync_creation() {
382 let dht = super::DhtClient::new_mock();
383 let sync = RealtimeSync::new(dht).await.unwrap();
384
385 let channel = ChannelId::new();
386 let mut rx = sync.subscribe_channel(channel).await;
387
388 assert!(rx.try_recv().is_err()); }
391
392 #[tokio::test]
393 async fn test_typing_indicators() {
394 let dht = super::DhtClient::new_mock();
395 let sync = RealtimeSync::new(dht).await.unwrap();
396
397 let channel = ChannelId::new();
398
399 sync
401 .broadcast_typing(channel, UserHandle::from("alice"), true)
402 .await
403 .unwrap();
404
405 let typing = sync.typing.read().await;
406 assert!(typing.get(&channel).is_some());
407 }
408
409 #[tokio::test]
410 async fn test_presence_update() {
411 let dht = super::DhtClient::new_mock();
412 let sync = RealtimeSync::new(dht).await.unwrap();
413
414 sync
415 .update_presence(UserHandle::from("alice"), PresenceStatus::Online)
416 .await
417 .unwrap();
418
419 let user = UserHandle::from("alice");
420 let presence = sync.get_presence(vec![user.clone()]).await;
421
422 assert!(presence.contains_key(&user));
423 }
424}