1use super::DhtClient;
4use super::types::*;
5use super::user_handle::UserHandle;
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{RwLock, broadcast};
12
13pub struct RealtimeSync {
15 _dht_client: DhtClient,
17 event_tx: broadcast::Sender<SyncEvent>,
19 subscriptions: Arc<RwLock<HashMap<ChannelId, Subscription>>>,
21 presence: Arc<RwLock<HashMap<UserHandle, UserPresence>>>,
23 typing: Arc<RwLock<HashMap<ChannelId, Vec<TypingUser>>>>,
25}
26
27impl RealtimeSync {
28 pub async fn new(dht_client: DhtClient) -> Result<Self> {
30 let (event_tx, _) = broadcast::channel(1000);
31
32 Ok(Self {
33 _dht_client: dht_client,
34 event_tx,
35 subscriptions: Arc::new(RwLock::new(HashMap::new())),
36 presence: Arc::new(RwLock::new(HashMap::new())),
37 typing: Arc::new(RwLock::new(HashMap::new())),
38 })
39 }
40
41 pub async fn subscribe_channel(&self, channel_id: ChannelId) -> broadcast::Receiver<SyncEvent> {
43 let mut subs = self.subscriptions.write().await;
44
45 subs.insert(
46 channel_id,
47 Subscription {
48 _channel_id: channel_id,
49 _subscribed_at: Utc::now(),
50 _last_sync: Utc::now(),
51 },
52 );
53
54 self.event_tx.subscribe()
55 }
56
57 pub async fn unsubscribe_channel(&self, channel_id: ChannelId) -> Result<()> {
59 let mut subs = self.subscriptions.write().await;
60 subs.remove(&channel_id);
61 Ok(())
62 }
63
64 pub async fn broadcast_message(&self, message: &EncryptedMessage) -> Result<()> {
66 let event = SyncEvent::NewMessage {
67 message: message.clone(),
68 timestamp: Utc::now(),
69 };
70
71 let _ = self.event_tx.send(event.clone());
73
74 self.sync_to_dht(message.channel_id, event).await?;
76
77 Ok(())
78 }
79
80 pub async fn broadcast_edit(
82 &self,
83 message_id: MessageId,
84 new_content: MessageContent,
85 ) -> Result<()> {
86 let event = SyncEvent::MessageEdited {
87 message_id,
88 new_content,
89 edited_at: Utc::now(),
90 };
91
92 let _ = self.event_tx.send(event.clone());
93 Ok(())
96 }
97
98 pub async fn broadcast_deletion(&self, message_id: MessageId) -> Result<()> {
100 let event = SyncEvent::MessageDeleted {
101 message_id,
102 deleted_at: Utc::now(),
103 };
104
105 let _ = self.event_tx.send(event);
106 Ok(())
107 }
108
109 pub async fn broadcast_reaction(
111 &self,
112 message_id: MessageId,
113 emoji: String,
114 added: bool,
115 ) -> Result<()> {
116 let event = if added {
117 SyncEvent::ReactionAdded {
118 message_id,
119 emoji,
120 timestamp: Utc::now(),
121 }
122 } else {
123 SyncEvent::ReactionRemoved {
124 message_id,
125 emoji,
126 timestamp: Utc::now(),
127 }
128 };
129
130 let _ = self.event_tx.send(event);
131 Ok(())
132 }
133
134 pub async fn broadcast_typing(
136 &self,
137 channel_id: ChannelId,
138 user: UserHandle,
139 is_typing: bool,
140 ) -> Result<()> {
141 let mut typing = self.typing.write().await;
142 let channel_typing = typing.entry(channel_id).or_insert_with(Vec::new);
143
144 if is_typing {
145 let handle = user.clone();
147 if !channel_typing.iter().any(|t| t.user == user) {
148 channel_typing.push(TypingUser {
149 user: handle,
150 started_at: Utc::now(),
151 });
152 }
153 } else {
154 channel_typing.retain(|t| t.user != user);
156 }
157
158 let event = SyncEvent::TypingIndicator {
160 channel_id,
161 user,
162 is_typing,
163 timestamp: Utc::now(),
164 };
165
166 let _ = self.event_tx.send(event);
167 Ok(())
168 }
169
170 pub async fn broadcast_read_receipt(&self, message_id: MessageId) -> Result<()> {
172 let event = SyncEvent::ReadReceipt {
173 message_id,
174 timestamp: Utc::now(),
175 };
176
177 let _ = self.event_tx.send(event);
178 Ok(())
179 }
180
181 pub async fn update_presence(&self, user: UserHandle, status: PresenceStatus) -> Result<()> {
183 let mut presence = self.presence.write().await;
184 presence.insert(
185 user.clone(),
186 UserPresence {
187 identity: user.clone(),
188 status: status.clone(),
189 custom_status: None,
190 last_seen: Some(Utc::now()),
191 typing_in: Vec::new(),
192 device: DeviceType::Desktop,
193 },
194 );
195
196 let event = SyncEvent::PresenceUpdate {
198 user,
199 status,
200 timestamp: Utc::now(),
201 };
202
203 let _ = self.event_tx.send(event);
204 Ok(())
205 }
206
207 pub async fn get_presence(&self, users: Vec<UserHandle>) -> HashMap<UserHandle, UserPresence> {
209 let presence = self.presence.read().await;
210
211 users
212 .into_iter()
213 .filter_map(|handle| presence.get(&handle).map(|p| (handle, p.clone())))
214 .collect()
215 }
216
217 pub async fn sync_channel(&self, channel_id: ChannelId) -> Result<ChannelSyncState> {
219 let _key = format!("channel:sync:{}", channel_id.0);
221
222 let state = ChannelSyncState {
224 channel_id,
225 last_message_id: None,
226 last_sync: Utc::now(),
227 unread_count: 0,
228 mention_count: 0,
229 };
230
231 Ok(state)
232 }
233
234 pub async fn handle_sync_event(&self, event: SyncEvent) -> Result<()> {
236 match &event {
238 SyncEvent::NewMessage { .. } => {
239 tracing::debug!("New message received");
240 }
241 SyncEvent::TypingIndicator {
242 channel_id,
243 user,
244 is_typing,
245 ..
246 } => {
247 let mut typing = self.typing.write().await;
248 let channel_typing = typing.entry(*channel_id).or_insert_with(Vec::new);
249
250 if *is_typing {
251 if !channel_typing.iter().any(|t| t.user == *user) {
252 channel_typing.push(TypingUser {
253 user: user.clone(),
254 started_at: Utc::now(),
255 });
256 }
257 } else {
258 channel_typing.retain(|t| t.user != *user);
259 }
260 }
261 _ => {}
262 }
263
264 let _ = self.event_tx.send(event);
266
267 Ok(())
268 }
269
270 pub async fn cleanup_typing(&self) {
272 let mut typing = self.typing.write().await;
273 let timeout = Utc::now() - chrono::Duration::seconds(10);
274
275 for channel_typing in typing.values_mut() {
276 channel_typing.retain(|t| t.started_at > timeout);
277 }
278 }
279
280 async fn sync_to_dht(&self, channel_id: ChannelId, event: SyncEvent) -> Result<()> {
282 let _key = format!("channel:events:{}", channel_id.0);
283 let _value = serde_json::to_vec(&event)?;
284
285 Ok(())
289 }
290}
291
292#[derive(Debug, Clone, Serialize, Deserialize)]
294pub enum SyncEvent {
295 NewMessage {
296 message: EncryptedMessage,
297 timestamp: DateTime<Utc>,
298 },
299 MessageEdited {
300 message_id: MessageId,
301 new_content: MessageContent,
302 edited_at: DateTime<Utc>,
303 },
304 MessageDeleted {
305 message_id: MessageId,
306 deleted_at: DateTime<Utc>,
307 },
308 ReactionAdded {
309 message_id: MessageId,
310 emoji: String,
311 timestamp: DateTime<Utc>,
312 },
313 ReactionRemoved {
314 message_id: MessageId,
315 emoji: String,
316 timestamp: DateTime<Utc>,
317 },
318 TypingIndicator {
319 channel_id: ChannelId,
320 user: UserHandle,
321 is_typing: bool,
322 timestamp: DateTime<Utc>,
323 },
324 ReadReceipt {
325 message_id: MessageId,
326 timestamp: DateTime<Utc>,
327 },
328 PresenceUpdate {
329 user: UserHandle,
330 status: PresenceStatus,
331 timestamp: DateTime<Utc>,
332 },
333}
334
335#[derive(Debug, Clone)]
337struct Subscription {
338 _channel_id: ChannelId,
339 _subscribed_at: DateTime<Utc>,
340 _last_sync: DateTime<Utc>,
341}
342
343#[derive(Debug, Clone)]
345struct TypingUser {
346 user: UserHandle,
347 started_at: DateTime<Utc>,
348}
349
350#[derive(Debug, Clone, Serialize, Deserialize)]
352pub struct ChannelSyncState {
353 pub channel_id: ChannelId,
354 pub last_message_id: Option<MessageId>,
355 pub last_sync: DateTime<Utc>,
356 pub unread_count: u32,
357 pub mention_count: u32,
358}
359
/// Strategy for resolving a conflict between a local and a remote version.
///
/// NOTE(review): nothing in this file uses the enum, so the per-variant notes
/// below are read off the variant names and should be confirmed against the
/// code that consumes them.
///
/// The enum carries no data, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Presumably: keep the local version.
    UseLocal,
    /// Presumably: take the remote version.
    UseRemote,
    /// Presumably: combine both versions.
    Merge,
    /// Presumably: keep both versions as separate branches.
    Fork,
}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `RealtimeSync` backed by the mock DHT client.
    async fn new_sync() -> RealtimeSync {
        RealtimeSync::new(super::DhtClient::new_mock())
            .await
            .expect("constructing RealtimeSync should not fail")
    }

    #[tokio::test]
    async fn test_sync_creation() {
        let sync = new_sync().await;

        let channel = ChannelId::new();
        let mut rx = sync.subscribe_channel(channel).await;

        // Nothing has been broadcast yet, so the receiver must be empty.
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_typing_indicators() {
        let sync = new_sync().await;
        let channel = ChannelId::new();
        let alice = UserHandle::from("alice");
        let mut rx = sync.subscribe_channel(channel).await;

        sync.broadcast_typing(channel, alice.clone(), true)
            .await
            .unwrap();

        {
            // Check the recorded user, not only that the channel key exists.
            let typing = sync.typing.read().await;
            let entries = typing
                .get(&channel)
                .expect("channel should have a typing entry");
            assert_eq!(entries.len(), 1);
            assert!(entries[0].user == alice);
        }

        // Subscribers must see the matching event.
        assert!(matches!(
            rx.try_recv(),
            Ok(SyncEvent::TypingIndicator {
                is_typing: true,
                ..
            })
        ));
    }

    #[tokio::test]
    async fn test_typing_stop_clears_user() {
        let sync = new_sync().await;
        let channel = ChannelId::new();
        let alice = UserHandle::from("alice");

        // Two start signals must still leave a single entry for the user.
        sync.broadcast_typing(channel, alice.clone(), true)
            .await
            .unwrap();
        sync.broadcast_typing(channel, alice.clone(), true)
            .await
            .unwrap();

        {
            // Scoped so the read guard is gone before the next write.
            let typing = sync.typing.read().await;
            assert_eq!(typing.get(&channel).map(Vec::len), Some(1));
        }

        sync.broadcast_typing(channel, alice.clone(), false)
            .await
            .unwrap();

        let typing = sync.typing.read().await;
        let still_typing = typing
            .get(&channel)
            .map_or(false, |entries| entries.iter().any(|t| t.user == alice));
        assert!(!still_typing);
    }

    #[tokio::test]
    async fn test_presence_update() {
        let sync = new_sync().await;
        let alice = UserHandle::from("alice");

        sync.update_presence(alice.clone(), PresenceStatus::Online)
            .await
            .unwrap();

        // "bob" has no stored presence and must be absent from the result.
        let presence = sync
            .get_presence(vec![alice.clone(), UserHandle::from("bob")])
            .await;

        assert_eq!(presence.len(), 1);
        assert!(matches!(
            presence.get(&alice).map(|p| &p.status),
            Some(PresenceStatus::Online)
        ));
    }
}