Skip to main content

lxmf_sdk/
messaging.rs

1use serde::{Deserialize, Serialize};
2use std::collections::{HashMap, HashSet};
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
5pub enum PeerState {
6    Connecting,
7    Connected,
8    Disconnected,
9}
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12pub enum MessageMethod {
13    Direct,
14    Opportunistic,
15    Propagated,
16    Resource,
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20pub enum MessageState {
21    Queued,
22    PathRequested,
23    LinkEstablishing,
24    Sending,
25    SentDirect,
26    SentToPropagation,
27    Delivered,
28    Failed,
29    TimedOut,
30    Cancelled,
31    Received,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum MessageDirection {
36    Inbound,
37    Outbound,
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41pub enum SyncPhase {
42    Idle,
43    PathRequested,
44    LinkEstablishing,
45    RequestSent,
46    Receiving,
47    Complete,
48    Failed,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
52pub struct AnnounceRecord {
53    pub destination_hex: String,
54    pub identity_hex: String,
55    pub destination_kind: String,
56    pub app_data: String,
57    pub display_name: Option<String>,
58    pub hops: u8,
59    pub interface_hex: String,
60    pub received_at_ms: u64,
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64pub struct PeerRecord {
65    pub destination_hex: String,
66    pub identity_hex: Option<String>,
67    pub lxmf_destination_hex: Option<String>,
68    pub display_name: Option<String>,
69    pub app_data: Option<String>,
70    pub state: PeerState,
71    pub last_seen_at_ms: u64,
72    pub announce_last_seen_at_ms: Option<u64>,
73    pub lxmf_last_seen_at_ms: Option<u64>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77pub struct ConversationRecord {
78    pub conversation_id: String,
79    pub peer_destination_hex: String,
80    pub peer_display_name: Option<String>,
81    pub last_message_preview: Option<String>,
82    pub last_message_at_ms: u64,
83    pub unread_count: u32,
84    pub last_message_state: Option<MessageState>,
85}
86
87#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
88pub struct MessageRecord {
89    pub message_id_hex: String,
90    pub conversation_id: String,
91    pub direction: MessageDirection,
92    pub destination_hex: String,
93    pub source_hex: Option<String>,
94    pub title: Option<String>,
95    pub body_utf8: String,
96    pub method: MessageMethod,
97    pub state: MessageState,
98    pub detail: Option<String>,
99    pub sent_at_ms: Option<u64>,
100    pub received_at_ms: Option<u64>,
101    pub updated_at_ms: u64,
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
105pub struct SyncStatus {
106    pub phase: SyncPhase,
107    pub active_propagation_node_hex: Option<String>,
108    pub requested_at_ms: Option<u64>,
109    pub completed_at_ms: Option<u64>,
110    pub messages_received: u32,
111    pub detail: Option<String>,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
115pub struct SendMessageRequest {
116    pub destination_hex: String,
117    pub body_utf8: String,
118    pub title: Option<String>,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
122pub struct StoredOutboundMessage {
123    pub request: SendMessageRequest,
124    pub message_id_hex: String,
125}
126
127#[derive(Debug, Clone, Default)]
128pub struct MessagingStore {
129    announce_records: HashMap<String, AnnounceRecord>,
130    message_records: HashMap<String, MessageRecord>,
131    message_order: Vec<String>,
132    outbound_messages: HashMap<String, StoredOutboundMessage>,
133    sync_status: SyncStatus,
134}
135
136impl Default for SyncStatus {
137    fn default() -> Self {
138        Self {
139            phase: SyncPhase::Idle,
140            active_propagation_node_hex: None,
141            requested_at_ms: None,
142            completed_at_ms: None,
143            messages_received: 0,
144            detail: None,
145        }
146    }
147}
148
149impl MessagingStore {
150    pub fn conversation_id_for(destination_hex: &str) -> String {
151        destination_hex.trim().to_ascii_lowercase()
152    }
153
154    pub fn record_announce(&mut self, record: AnnounceRecord) {
155        self.announce_records.insert(record.destination_hex.clone(), record);
156    }
157
158    pub fn list_announces(&self) -> Vec<AnnounceRecord> {
159        let mut records = self.announce_records.values().cloned().collect::<Vec<_>>();
160        records.sort_by(|left, right| right.received_at_ms.cmp(&left.received_at_ms));
161        records
162    }
163
164    pub fn list_peers<'a, I>(&self, connected_destinations: I) -> Vec<PeerRecord>
165    where
166        I: IntoIterator<Item = &'a str>,
167    {
168        let connected = connected_destinations
169            .into_iter()
170            .map(|value| value.to_ascii_lowercase())
171            .collect::<HashSet<_>>();
172
173        let mut app_dest_by_identity = HashMap::<String, String>::new();
174        let mut lxmf_dest_by_identity = HashMap::<String, String>::new();
175        let mut app_records = HashMap::<String, AnnounceRecord>::new();
176        let mut lxmf_records = HashMap::<String, AnnounceRecord>::new();
177
178        for record in self.announce_records.values() {
179            if record.destination_kind == "app" {
180                app_dest_by_identity
181                    .insert(record.identity_hex.clone(), record.destination_hex.clone());
182                app_records.insert(record.destination_hex.clone(), record.clone());
183            } else if record.destination_kind == "lxmf_delivery" {
184                lxmf_dest_by_identity
185                    .insert(record.identity_hex.clone(), record.destination_hex.clone());
186                lxmf_records.insert(record.destination_hex.clone(), record.clone());
187            }
188        }
189
190        let mut peers = Vec::<PeerRecord>::new();
191        for (destination_hex, app_record) in app_records {
192            let identity_hex = Some(app_record.identity_hex.clone());
193            let lxmf_destination_hex = identity_hex
194                .as_ref()
195                .and_then(|identity| lxmf_dest_by_identity.get(identity).cloned());
196            let connected_match = connected.contains(destination_hex.as_str())
197                || lxmf_destination_hex
198                    .as_ref()
199                    .is_some_and(|value| connected.contains(value.as_str()));
200            peers.push(PeerRecord {
201                destination_hex: destination_hex.clone(),
202                identity_hex,
203                lxmf_destination_hex: lxmf_destination_hex.clone(),
204                display_name: lxmf_destination_hex
205                    .as_ref()
206                    .and_then(|value| lxmf_records.get(value))
207                    .and_then(|record| record.display_name.clone()),
208                app_data: Some(app_record.app_data.clone()),
209                state: if connected_match { PeerState::Connected } else { PeerState::Disconnected },
210                last_seen_at_ms: app_record.received_at_ms.max(
211                    lxmf_destination_hex
212                        .as_ref()
213                        .and_then(|value| lxmf_records.get(value))
214                        .map(|record| record.received_at_ms)
215                        .unwrap_or(0),
216                ),
217                announce_last_seen_at_ms: Some(app_record.received_at_ms),
218                lxmf_last_seen_at_ms: lxmf_destination_hex
219                    .as_ref()
220                    .and_then(|value| lxmf_records.get(value))
221                    .map(|record| record.received_at_ms),
222            });
223        }
224
225        for (identity_hex, lxmf_destination_hex) in lxmf_dest_by_identity {
226            if app_dest_by_identity.contains_key(identity_hex.as_str()) {
227                continue;
228            }
229            if let Some(record) = lxmf_records.get(&lxmf_destination_hex) {
230                peers.push(PeerRecord {
231                    destination_hex: lxmf_destination_hex.clone(),
232                    identity_hex: Some(identity_hex),
233                    lxmf_destination_hex: Some(lxmf_destination_hex.clone()),
234                    display_name: record.display_name.clone(),
235                    app_data: None,
236                    state: if connected.contains(lxmf_destination_hex.as_str()) {
237                        PeerState::Connected
238                    } else {
239                        PeerState::Disconnected
240                    },
241                    last_seen_at_ms: record.received_at_ms,
242                    announce_last_seen_at_ms: None,
243                    lxmf_last_seen_at_ms: Some(record.received_at_ms),
244                });
245            }
246        }
247
248        peers.sort_by(|left, right| right.last_seen_at_ms.cmp(&left.last_seen_at_ms));
249        peers
250    }
251
252    pub fn peer_for_identity<'a, I>(
253        &self,
254        identity_hex: &str,
255        connected_destinations: I,
256    ) -> Option<PeerRecord>
257    where
258        I: IntoIterator<Item = &'a str>,
259    {
260        self.list_peers(connected_destinations)
261            .into_iter()
262            .find(|peer| peer.identity_hex.as_deref() == Some(identity_hex))
263    }
264
265    pub fn upsert_message(&mut self, message: MessageRecord) -> bool {
266        let is_new = !self.message_records.contains_key(message.message_id_hex.as_str());
267        self.message_records.insert(message.message_id_hex.clone(), message.clone());
268        if is_new {
269            self.message_order.push(message.message_id_hex);
270        }
271        is_new
272    }
273
274    pub fn get_message(&self, message_id_hex: &str) -> Option<MessageRecord> {
275        self.message_records.get(message_id_hex).cloned()
276    }
277
278    pub fn update_message(
279        &mut self,
280        message_id_hex: &str,
281        state: MessageState,
282        detail: Option<String>,
283        updated_at_ms: u64,
284    ) -> Option<MessageRecord> {
285        let record = self.message_records.get_mut(message_id_hex)?;
286        record.state = state;
287        record.detail = detail;
288        record.updated_at_ms = updated_at_ms;
289        Some(record.clone())
290    }
291
292    pub fn list_messages(&self, conversation_id: Option<&str>) -> Vec<MessageRecord> {
293        let mut out = Vec::<MessageRecord>::new();
294        for message_id_hex in &self.message_order {
295            let Some(record) = self.message_records.get(message_id_hex).cloned() else {
296                continue;
297            };
298            if conversation_id.is_some_and(|value| record.conversation_id != value) {
299                continue;
300            }
301            out.push(record);
302        }
303        out.sort_by(|left, right| {
304            let left_time = left.received_at_ms.or(left.sent_at_ms).unwrap_or(left.updated_at_ms);
305            let right_time =
306                right.received_at_ms.or(right.sent_at_ms).unwrap_or(right.updated_at_ms);
307            left_time.cmp(&right_time)
308        });
309        out
310    }
311
312    pub fn list_conversations<'a, I>(&self, connected_destinations: I) -> Vec<ConversationRecord>
313    where
314        I: IntoIterator<Item = &'a str>,
315    {
316        let peers = self.list_peers(connected_destinations);
317        let mut peer_map = HashMap::<String, PeerRecord>::new();
318        for peer in peers {
319            peer_map.insert(peer.destination_hex.clone(), peer.clone());
320            if let Some(lxmf_destination_hex) = peer.lxmf_destination_hex.clone() {
321                peer_map.insert(lxmf_destination_hex, peer);
322            }
323        }
324
325        let records = self.list_messages(None);
326        let mut by_conversation = HashMap::<String, ConversationRecord>::new();
327        for record in records {
328            let entry =
329                by_conversation.entry(record.conversation_id.clone()).or_insert_with(|| {
330                    ConversationRecord {
331                        conversation_id: record.conversation_id.clone(),
332                        peer_destination_hex: record.destination_hex.clone(),
333                        peer_display_name: peer_map
334                            .get(&record.destination_hex)
335                            .and_then(peer_display_name_for),
336                        last_message_preview: None,
337                        last_message_at_ms: 0,
338                        unread_count: 0,
339                        last_message_state: None,
340                    }
341                });
342
343            let event_time =
344                record.received_at_ms.or(record.sent_at_ms).unwrap_or(record.updated_at_ms);
345            if event_time >= entry.last_message_at_ms {
346                entry.peer_destination_hex = record.destination_hex.clone();
347                entry.peer_display_name =
348                    peer_map.get(&record.destination_hex).and_then(peer_display_name_for);
349                entry.last_message_preview = message_preview(record.body_utf8.as_str());
350                entry.last_message_at_ms = event_time;
351                entry.last_message_state = Some(record.state);
352            }
353            if matches!(record.direction, MessageDirection::Inbound) {
354                entry.unread_count = entry.unread_count.saturating_add(1);
355            }
356        }
357
358        let mut out = by_conversation.into_values().collect::<Vec<_>>();
359        out.sort_by(|left, right| right.last_message_at_ms.cmp(&left.last_message_at_ms));
360        out
361    }
362
363    pub fn store_outbound(&mut self, outbound: StoredOutboundMessage) {
364        self.outbound_messages.insert(outbound.message_id_hex.clone(), outbound);
365    }
366
367    pub fn outbound(&self, message_id_hex: &str) -> Option<StoredOutboundMessage> {
368        self.outbound_messages.get(message_id_hex).cloned()
369    }
370
371    pub fn set_active_propagation_node(&mut self, destination_hex: Option<String>) -> SyncStatus {
372        self.sync_status.active_propagation_node_hex = destination_hex;
373        self.sync_status.clone()
374    }
375
376    pub fn sync_status(&self) -> SyncStatus {
377        self.sync_status.clone()
378    }
379
380    pub fn update_sync_status<F>(&mut self, apply: F) -> SyncStatus
381    where
382        F: FnOnce(&mut SyncStatus),
383    {
384        apply(&mut self.sync_status);
385        self.sync_status.clone()
386    }
387}
388
389fn message_preview(body_utf8: &str) -> Option<String> {
390    let trimmed = body_utf8.trim();
391    if trimmed.is_empty() {
392        return None;
393    }
394    Some(trimmed.chars().take(80).collect())
395}
396
397fn peer_display_name_for(peer: &PeerRecord) -> Option<String> {
398    peer.display_name
399        .clone()
400        .or_else(|| peer.identity_hex.clone())
401        .or_else(|| Some(peer.destination_hex.clone()))
402}
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407
408    #[test]
409    fn peer_projection_merges_app_and_lxmf_announces() {
410        let mut store = MessagingStore::default();
411        store.record_announce(AnnounceRecord {
412            destination_hex: "appdest".into(),
413            identity_hex: "identity".into(),
414            destination_kind: "app".into(),
415            app_data: "R3AKT".into(),
416            display_name: None,
417            hops: 1,
418            interface_hex: "iface".into(),
419            received_at_ms: 10,
420        });
421        store.record_announce(AnnounceRecord {
422            destination_hex: "lxmfdest".into(),
423            identity_hex: "identity".into(),
424            destination_kind: "lxmf_delivery".into(),
425            app_data: "chat".into(),
426            display_name: Some("Alice".into()),
427            hops: 1,
428            interface_hex: "iface".into(),
429            received_at_ms: 20,
430        });
431
432        let peers = store.list_peers(["lxmfdest"]);
433        assert_eq!(peers.len(), 1);
434        assert_eq!(peers[0].destination_hex, "appdest");
435        assert_eq!(peers[0].lxmf_destination_hex.as_deref(), Some("lxmfdest"));
436        assert_eq!(peers[0].display_name.as_deref(), Some("Alice"));
437        assert_eq!(peers[0].state, PeerState::Connected);
438    }
439
440    #[test]
441    fn conversation_projection_uses_lxmf_destination_for_peer_lookup() {
442        let mut store = MessagingStore::default();
443        store.record_announce(AnnounceRecord {
444            destination_hex: "appdest".into(),
445            identity_hex: "identity".into(),
446            destination_kind: "app".into(),
447            app_data: "R3AKT".into(),
448            display_name: None,
449            hops: 1,
450            interface_hex: "iface".into(),
451            received_at_ms: 10,
452        });
453        store.record_announce(AnnounceRecord {
454            destination_hex: "lxmfdest".into(),
455            identity_hex: "identity".into(),
456            destination_kind: "lxmf_delivery".into(),
457            app_data: "chat".into(),
458            display_name: Some("Alice".into()),
459            hops: 1,
460            interface_hex: "iface".into(),
461            received_at_ms: 20,
462        });
463        store.upsert_message(MessageRecord {
464            message_id_hex: "msg".into(),
465            conversation_id: "lxmfdest".into(),
466            direction: MessageDirection::Outbound,
467            destination_hex: "lxmfdest".into(),
468            source_hex: None,
469            title: None,
470            body_utf8: "hello".into(),
471            method: MessageMethod::Direct,
472            state: MessageState::Delivered,
473            detail: None,
474            sent_at_ms: Some(30),
475            received_at_ms: None,
476            updated_at_ms: 30,
477        });
478
479        let conversations = store.list_conversations(std::iter::empty::<&str>());
480        assert_eq!(conversations.len(), 1);
481        assert_eq!(conversations[0].peer_display_name.as_deref(), Some("Alice"));
482    }
483}