1use serde::{Deserialize, Serialize};
2use std::collections::{HashMap, HashSet};
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
5pub enum PeerState {
6 Connecting,
7 Connected,
8 Disconnected,
9}
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12pub enum MessageMethod {
13 Direct,
14 Opportunistic,
15 Propagated,
16 Resource,
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20pub enum MessageState {
21 Queued,
22 PathRequested,
23 LinkEstablishing,
24 Sending,
25 SentDirect,
26 SentToPropagation,
27 Delivered,
28 Failed,
29 TimedOut,
30 Cancelled,
31 Received,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum MessageDirection {
36 Inbound,
37 Outbound,
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41pub enum SyncPhase {
42 Idle,
43 PathRequested,
44 LinkEstablishing,
45 RequestSent,
46 Receiving,
47 Complete,
48 Failed,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
52pub struct AnnounceRecord {
53 pub destination_hex: String,
54 pub identity_hex: String,
55 pub destination_kind: String,
56 pub app_data: String,
57 pub display_name: Option<String>,
58 pub hops: u8,
59 pub interface_hex: String,
60 pub received_at_ms: u64,
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub struct PeerRecord {
65 pub destination_hex: String,
66 pub identity_hex: Option<String>,
67 pub lxmf_destination_hex: Option<String>,
68 pub display_name: Option<String>,
69 pub app_data: Option<String>,
70 pub state: PeerState,
71 pub last_seen_at_ms: u64,
72 pub announce_last_seen_at_ms: Option<u64>,
73 pub lxmf_last_seen_at_ms: Option<u64>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77pub struct ConversationRecord {
78 pub conversation_id: String,
79 pub peer_destination_hex: String,
80 pub peer_display_name: Option<String>,
81 pub last_message_preview: Option<String>,
82 pub last_message_at_ms: u64,
83 pub unread_count: u32,
84 pub last_message_state: Option<MessageState>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
88pub struct MessageRecord {
89 pub message_id_hex: String,
90 pub conversation_id: String,
91 pub direction: MessageDirection,
92 pub destination_hex: String,
93 pub source_hex: Option<String>,
94 pub title: Option<String>,
95 pub body_utf8: String,
96 pub method: MessageMethod,
97 pub state: MessageState,
98 pub detail: Option<String>,
99 pub sent_at_ms: Option<u64>,
100 pub received_at_ms: Option<u64>,
101 pub updated_at_ms: u64,
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
105pub struct SyncStatus {
106 pub phase: SyncPhase,
107 pub active_propagation_node_hex: Option<String>,
108 pub requested_at_ms: Option<u64>,
109 pub completed_at_ms: Option<u64>,
110 pub messages_received: u32,
111 pub detail: Option<String>,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
115pub struct SendMessageRequest {
116 pub destination_hex: String,
117 pub body_utf8: String,
118 pub title: Option<String>,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
122pub struct StoredOutboundMessage {
123 pub request: SendMessageRequest,
124 pub message_id_hex: String,
125}
126
127#[derive(Debug, Clone, Default)]
128pub struct MessagingStore {
129 announce_records: HashMap<String, AnnounceRecord>,
130 message_records: HashMap<String, MessageRecord>,
131 message_order: Vec<String>,
132 outbound_messages: HashMap<String, StoredOutboundMessage>,
133 sync_status: SyncStatus,
134}
135
136impl Default for SyncStatus {
137 fn default() -> Self {
138 Self {
139 phase: SyncPhase::Idle,
140 active_propagation_node_hex: None,
141 requested_at_ms: None,
142 completed_at_ms: None,
143 messages_received: 0,
144 detail: None,
145 }
146 }
147}
148
149impl MessagingStore {
150 pub fn conversation_id_for(destination_hex: &str) -> String {
151 destination_hex.trim().to_ascii_lowercase()
152 }
153
154 pub fn record_announce(&mut self, record: AnnounceRecord) {
155 self.announce_records.insert(record.destination_hex.clone(), record);
156 }
157
158 pub fn list_announces(&self) -> Vec<AnnounceRecord> {
159 let mut records = self.announce_records.values().cloned().collect::<Vec<_>>();
160 records.sort_by(|left, right| right.received_at_ms.cmp(&left.received_at_ms));
161 records
162 }
163
164 pub fn list_peers<'a, I>(&self, connected_destinations: I) -> Vec<PeerRecord>
165 where
166 I: IntoIterator<Item = &'a str>,
167 {
168 let connected = connected_destinations
169 .into_iter()
170 .map(|value| value.to_ascii_lowercase())
171 .collect::<HashSet<_>>();
172
173 let mut app_dest_by_identity = HashMap::<String, String>::new();
174 let mut lxmf_dest_by_identity = HashMap::<String, String>::new();
175 let mut app_records = HashMap::<String, AnnounceRecord>::new();
176 let mut lxmf_records = HashMap::<String, AnnounceRecord>::new();
177
178 for record in self.announce_records.values() {
179 if record.destination_kind == "app" {
180 app_dest_by_identity
181 .insert(record.identity_hex.clone(), record.destination_hex.clone());
182 app_records.insert(record.destination_hex.clone(), record.clone());
183 } else if record.destination_kind == "lxmf_delivery" {
184 lxmf_dest_by_identity
185 .insert(record.identity_hex.clone(), record.destination_hex.clone());
186 lxmf_records.insert(record.destination_hex.clone(), record.clone());
187 }
188 }
189
190 let mut peers = Vec::<PeerRecord>::new();
191 for (destination_hex, app_record) in app_records {
192 let identity_hex = Some(app_record.identity_hex.clone());
193 let lxmf_destination_hex = identity_hex
194 .as_ref()
195 .and_then(|identity| lxmf_dest_by_identity.get(identity).cloned());
196 let connected_match = connected.contains(destination_hex.as_str())
197 || lxmf_destination_hex
198 .as_ref()
199 .is_some_and(|value| connected.contains(value.as_str()));
200 peers.push(PeerRecord {
201 destination_hex: destination_hex.clone(),
202 identity_hex,
203 lxmf_destination_hex: lxmf_destination_hex.clone(),
204 display_name: lxmf_destination_hex
205 .as_ref()
206 .and_then(|value| lxmf_records.get(value))
207 .and_then(|record| record.display_name.clone()),
208 app_data: Some(app_record.app_data.clone()),
209 state: if connected_match { PeerState::Connected } else { PeerState::Disconnected },
210 last_seen_at_ms: app_record.received_at_ms.max(
211 lxmf_destination_hex
212 .as_ref()
213 .and_then(|value| lxmf_records.get(value))
214 .map(|record| record.received_at_ms)
215 .unwrap_or(0),
216 ),
217 announce_last_seen_at_ms: Some(app_record.received_at_ms),
218 lxmf_last_seen_at_ms: lxmf_destination_hex
219 .as_ref()
220 .and_then(|value| lxmf_records.get(value))
221 .map(|record| record.received_at_ms),
222 });
223 }
224
225 for (identity_hex, lxmf_destination_hex) in lxmf_dest_by_identity {
226 if app_dest_by_identity.contains_key(identity_hex.as_str()) {
227 continue;
228 }
229 if let Some(record) = lxmf_records.get(&lxmf_destination_hex) {
230 peers.push(PeerRecord {
231 destination_hex: lxmf_destination_hex.clone(),
232 identity_hex: Some(identity_hex),
233 lxmf_destination_hex: Some(lxmf_destination_hex.clone()),
234 display_name: record.display_name.clone(),
235 app_data: None,
236 state: if connected.contains(lxmf_destination_hex.as_str()) {
237 PeerState::Connected
238 } else {
239 PeerState::Disconnected
240 },
241 last_seen_at_ms: record.received_at_ms,
242 announce_last_seen_at_ms: None,
243 lxmf_last_seen_at_ms: Some(record.received_at_ms),
244 });
245 }
246 }
247
248 peers.sort_by(|left, right| right.last_seen_at_ms.cmp(&left.last_seen_at_ms));
249 peers
250 }
251
252 pub fn peer_for_identity<'a, I>(
253 &self,
254 identity_hex: &str,
255 connected_destinations: I,
256 ) -> Option<PeerRecord>
257 where
258 I: IntoIterator<Item = &'a str>,
259 {
260 self.list_peers(connected_destinations)
261 .into_iter()
262 .find(|peer| peer.identity_hex.as_deref() == Some(identity_hex))
263 }
264
265 pub fn upsert_message(&mut self, message: MessageRecord) -> bool {
266 let is_new = !self.message_records.contains_key(message.message_id_hex.as_str());
267 self.message_records.insert(message.message_id_hex.clone(), message.clone());
268 if is_new {
269 self.message_order.push(message.message_id_hex);
270 }
271 is_new
272 }
273
274 pub fn get_message(&self, message_id_hex: &str) -> Option<MessageRecord> {
275 self.message_records.get(message_id_hex).cloned()
276 }
277
278 pub fn update_message(
279 &mut self,
280 message_id_hex: &str,
281 state: MessageState,
282 detail: Option<String>,
283 updated_at_ms: u64,
284 ) -> Option<MessageRecord> {
285 let record = self.message_records.get_mut(message_id_hex)?;
286 record.state = state;
287 record.detail = detail;
288 record.updated_at_ms = updated_at_ms;
289 Some(record.clone())
290 }
291
292 pub fn list_messages(&self, conversation_id: Option<&str>) -> Vec<MessageRecord> {
293 let mut out = Vec::<MessageRecord>::new();
294 for message_id_hex in &self.message_order {
295 let Some(record) = self.message_records.get(message_id_hex).cloned() else {
296 continue;
297 };
298 if conversation_id.is_some_and(|value| record.conversation_id != value) {
299 continue;
300 }
301 out.push(record);
302 }
303 out.sort_by(|left, right| {
304 let left_time = left.received_at_ms.or(left.sent_at_ms).unwrap_or(left.updated_at_ms);
305 let right_time =
306 right.received_at_ms.or(right.sent_at_ms).unwrap_or(right.updated_at_ms);
307 left_time.cmp(&right_time)
308 });
309 out
310 }
311
312 pub fn list_conversations<'a, I>(&self, connected_destinations: I) -> Vec<ConversationRecord>
313 where
314 I: IntoIterator<Item = &'a str>,
315 {
316 let peers = self.list_peers(connected_destinations);
317 let mut peer_map = HashMap::<String, PeerRecord>::new();
318 for peer in peers {
319 peer_map.insert(peer.destination_hex.clone(), peer.clone());
320 if let Some(lxmf_destination_hex) = peer.lxmf_destination_hex.clone() {
321 peer_map.insert(lxmf_destination_hex, peer);
322 }
323 }
324
325 let records = self.list_messages(None);
326 let mut by_conversation = HashMap::<String, ConversationRecord>::new();
327 for record in records {
328 let entry =
329 by_conversation.entry(record.conversation_id.clone()).or_insert_with(|| {
330 ConversationRecord {
331 conversation_id: record.conversation_id.clone(),
332 peer_destination_hex: record.destination_hex.clone(),
333 peer_display_name: peer_map
334 .get(&record.destination_hex)
335 .and_then(peer_display_name_for),
336 last_message_preview: None,
337 last_message_at_ms: 0,
338 unread_count: 0,
339 last_message_state: None,
340 }
341 });
342
343 let event_time =
344 record.received_at_ms.or(record.sent_at_ms).unwrap_or(record.updated_at_ms);
345 if event_time >= entry.last_message_at_ms {
346 entry.peer_destination_hex = record.destination_hex.clone();
347 entry.peer_display_name =
348 peer_map.get(&record.destination_hex).and_then(peer_display_name_for);
349 entry.last_message_preview = message_preview(record.body_utf8.as_str());
350 entry.last_message_at_ms = event_time;
351 entry.last_message_state = Some(record.state);
352 }
353 if matches!(record.direction, MessageDirection::Inbound) {
354 entry.unread_count = entry.unread_count.saturating_add(1);
355 }
356 }
357
358 let mut out = by_conversation.into_values().collect::<Vec<_>>();
359 out.sort_by(|left, right| right.last_message_at_ms.cmp(&left.last_message_at_ms));
360 out
361 }
362
363 pub fn store_outbound(&mut self, outbound: StoredOutboundMessage) {
364 self.outbound_messages.insert(outbound.message_id_hex.clone(), outbound);
365 }
366
367 pub fn outbound(&self, message_id_hex: &str) -> Option<StoredOutboundMessage> {
368 self.outbound_messages.get(message_id_hex).cloned()
369 }
370
371 pub fn set_active_propagation_node(&mut self, destination_hex: Option<String>) -> SyncStatus {
372 self.sync_status.active_propagation_node_hex = destination_hex;
373 self.sync_status.clone()
374 }
375
376 pub fn sync_status(&self) -> SyncStatus {
377 self.sync_status.clone()
378 }
379
380 pub fn update_sync_status<F>(&mut self, apply: F) -> SyncStatus
381 where
382 F: FnOnce(&mut SyncStatus),
383 {
384 apply(&mut self.sync_status);
385 self.sync_status.clone()
386 }
387}
388
/// Produces a short preview of a message body: the body with surrounding
/// whitespace removed, cut to at most 80 characters (`char`s, not bytes).
/// Returns `None` when nothing but whitespace is left.
fn message_preview(body_utf8: &str) -> Option<String> {
    const PREVIEW_CHAR_LIMIT: usize = 80;

    let preview: String = body_utf8.trim().chars().take(PREVIEW_CHAR_LIMIT).collect();
    if preview.is_empty() {
        None
    } else {
        Some(preview)
    }
}
396
397fn peer_display_name_for(peer: &PeerRecord) -> Option<String> {
398 peer.display_name
399 .clone()
400 .or_else(|| peer.identity_hex.clone())
401 .or_else(|| Some(peer.destination_hex.clone()))
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an announce for the shared test identity; hop count and
    /// interface are fixed filler values.
    fn announce(
        destination_hex: &str,
        destination_kind: &str,
        app_data: &str,
        display_name: Option<&str>,
        received_at_ms: u64,
    ) -> AnnounceRecord {
        AnnounceRecord {
            destination_hex: destination_hex.to_owned(),
            identity_hex: "identity".to_owned(),
            destination_kind: destination_kind.to_owned(),
            app_data: app_data.to_owned(),
            display_name: display_name.map(str::to_owned),
            hops: 1,
            interface_hex: "iface".to_owned(),
            received_at_ms,
        }
    }

    /// A store holding an app announce and an LXMF announce ("Alice") that
    /// belong to the same identity.
    fn store_with_app_and_lxmf_announce() -> MessagingStore {
        let mut store = MessagingStore::default();
        store.record_announce(announce("appdest", "app", "R3AKT", None, 10));
        store.record_announce(announce("lxmfdest", "lxmf_delivery", "chat", Some("Alice"), 20));
        store
    }

    #[test]
    fn peer_projection_merges_app_and_lxmf_announces() {
        let store = store_with_app_and_lxmf_announce();

        let peers = store.list_peers(["lxmfdest"]);

        assert_eq!(peers.len(), 1);
        let peer = &peers[0];
        assert_eq!(peer.destination_hex, "appdest");
        assert_eq!(peer.lxmf_destination_hex.as_deref(), Some("lxmfdest"));
        assert_eq!(peer.display_name.as_deref(), Some("Alice"));
        assert_eq!(peer.state, PeerState::Connected);
    }

    #[test]
    fn conversation_projection_uses_lxmf_destination_for_peer_lookup() {
        let mut store = store_with_app_and_lxmf_announce();
        // The message is addressed to the LXMF destination, not the app one.
        let outbound_message = MessageRecord {
            message_id_hex: "msg".to_owned(),
            conversation_id: "lxmfdest".to_owned(),
            direction: MessageDirection::Outbound,
            destination_hex: "lxmfdest".to_owned(),
            source_hex: None,
            title: None,
            body_utf8: "hello".to_owned(),
            method: MessageMethod::Direct,
            state: MessageState::Delivered,
            detail: None,
            sent_at_ms: Some(30),
            received_at_ms: None,
            updated_at_ms: 30,
        };
        store.upsert_message(outbound_message);

        let conversations = store.list_conversations(std::iter::empty::<&str>());

        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].peer_display_name.as_deref(), Some("Alice"));
    }
}