Skip to main content

pim_messaging/
service.rs

1//! Plugin-internal facade combining storage, events, and ports.
2//!
3//! The daemon's RPC layer (when the `messaging` feature is enabled)
4//! holds an `Arc<MessagingService>` and calls into it directly for the
5//! action API (`send`, `mark_*`, `delete_*`, history queries).
6//!
7//! The plugin's [`crate::plugin::MessagingPlugin`] also holds an
8//! `Arc<MessagingService>`; its `handle_payload` thunk dispatches into
9//! [`Self::handle_incoming_message`] /
10//! [`Self::handle_incoming_message_ack`] when a matching
11//! `ControlFrame::PluginPayload` arrives.
12
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::SystemTime;
16
17use anyhow::{anyhow, Context, Result};
18use bytes::Bytes;
19use pim_core::NodeId;
20use pim_crypto::{e2e_decrypt_in_place, e2e_encrypt};
21use pim_plugin::{ControlSender, IdentitySecrets, PeerDirectory};
22use pim_protocol::ControlFrame;
23use serde::{Deserialize, Serialize};
24use tokio::sync::broadcast;
25use tracing::{debug, warn};
26
27use crate::storage::{
28    AckKind, ConversationSummary, MessageDirection, MessageRecord, MessageStatus, MessagingStorage,
29};
30use crate::wire::{decode_ack, decode_message, encode_ack, encode_message, KIND_ACK, KIND_MESSAGE};
31use crate::{hex16, hex_node_id};
32
/// Maximum plaintext body size accepted by [`MessagingService::send`].
///
/// The limit is 8 KiB and is compared against `String::len`, i.e. it counts
/// UTF-8 bytes rather than characters. `send` checks it before doing
/// anything else, so an oversized body is neither persisted nor sent.
pub const MAX_BODY_BYTES: usize = 8 * 1024;
35
/// Discriminator for `HistoryCleared` — `peer` (one conversation)
/// vs. `all` (everything).
///
/// Serialized with `rename_all = "snake_case"`, so the variants appear on the
/// wire as `"peer"` and `"all"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HistoryScope {
    /// One conversation cleared; `peer_node_id` is set.
    Peer,
    /// All conversations cleared; `peer_node_id` is `None`.
    All,
}
46
/// Event emitted by the messaging subsystem and forwarded over JSON-RPC
/// as `messages.event` notifications.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under the `kind` key (for example `"message_received"`),
/// alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum MessageEvent {
    /// New message arrived from a peer.
    MessageReceived {
        /// The persisted message.
        // NOTE(review): boxed presumably to keep the enum itself small — confirm.
        message: Box<MessageRecord>,
        /// Updated conversation summary (denormalized).
        conversation: Box<ConversationSummary>,
    },
    /// Status of a previously-sent message changed (sent → delivered → read,
    /// or any → failed).
    MessageStatus {
        /// Affected message id (UUIDv4 hex without dashes).
        message_id: String,
        /// Peer the original message was addressed to / received from,
        /// hex-encoded.
        peer_node_id: String,
        /// New status.
        new_status: MessageStatus,
        /// Wall-clock when the transition happened, in milliseconds. The
        /// callers in this module pass `now_ms()` (Unix-epoch based).
        at_ms: i64,
    },
    /// Message history was wiped — `scope: "peer"` carries
    /// `peer_node_id`; `scope: "all"` clears everything.
    HistoryCleared {
        /// Affected peer (set only for `scope: "peer"`).
        peer_node_id: Option<String>,
        /// Whether one conversation or everything was wiped.
        scope: HistoryScope,
        /// Number of message rows the daemon actually deleted.
        deleted_messages: i64,
    },
}
82
/// Plugin-internal service handle.
///
/// Bundles the message database, the event broadcast channel, and the three
/// daemon-supplied ports that the methods on this type call into.
pub struct MessagingService {
    // Message/conversation database; accessed via `spawn_blocking` from the
    // async methods.
    storage: Arc<MessagingStorage>,
    // Sending half of the broadcast channel behind `subscribe()`.
    events_tx: broadcast::Sender<MessageEvent>,
    // Peer lookup port: display names and x25519 public keys by node id.
    peers: Arc<dyn PeerDirectory>,
    // Outbound port used to route `ControlFrame`s to a peer.
    control: Arc<dyn ControlSender>,
    // Source of the local signing seed used when decrypting inbound messages.
    identity: Arc<dyn IdentitySecrets>,
}
91
92impl MessagingService {
93    /// Open the messaging database and bind the daemon-supplied ports.
94    pub fn open(
95        db_path: PathBuf,
96        peers: Arc<dyn PeerDirectory>,
97        control: Arc<dyn ControlSender>,
98        identity: Arc<dyn IdentitySecrets>,
99    ) -> Result<Self> {
100        let storage = Arc::new(MessagingStorage::open(db_path)?);
101        let (events_tx, _rx) = broadcast::channel(256);
102        Ok(Self {
103            storage,
104            events_tx,
105            peers,
106            control,
107            identity,
108        })
109    }
110
111    /// Subscribe to the broadcast event stream. Each subscriber gets every
112    /// future event until they drop the receiver.
113    pub fn subscribe(&self) -> broadcast::Receiver<MessageEvent> {
114        self.events_tx.subscribe()
115    }
116
117    /// Borrow the underlying storage handle (read-mostly use cases like
118    /// `messages.history`).
119    pub fn storage(&self) -> &Arc<MessagingStorage> {
120        &self.storage
121    }
122
123    /// Snapshot of all conversations, sorted by most-recent activity.
124    /// Each summary's `name` and `x25519_pubkey` are filled in from the
125    /// peer directory.
126    pub async fn list_conversations(&self) -> Result<Vec<ConversationSummary>> {
127        let storage = self.storage.clone();
128        let mut rows = tokio::task::spawn_blocking(move || storage.list_conversations_raw())
129            .await
130            .context("storage join")??;
131        for row in rows.iter_mut() {
132            let peer = match parse_node_id(&row.peer_node_id) {
133                Some(p) => p,
134                None => continue,
135            };
136            if let Some(name) = self.peers.lookup_name(&peer).await {
137                if !name.is_empty() {
138                    row.name = name;
139                }
140            }
141            if let Some(x25519) = self.peers.lookup_x25519(&peer).await {
142                row.x25519_pubkey = Some(hex32(&x25519));
143            }
144        }
145        Ok(rows)
146    }
147
148    /// Persist a freshly-sent local message in `pending` status.
149    pub async fn record_local_send(
150        &self,
151        peer: NodeId,
152        message_id: [u8; 16],
153        body: String,
154        timestamp_ms: i64,
155    ) -> Result<MessageRecord> {
156        let storage = self.storage.clone();
157        let peer_id_hex = hex_node_id(&peer);
158        let message_id_hex = hex16(&message_id);
159
160        let record = MessageRecord {
161            id: message_id_hex.clone(),
162            peer_node_id: peer_id_hex.clone(),
163            direction: MessageDirection::Sent,
164            body,
165            timestamp_ms,
166            status: MessageStatus::Pending,
167            failure_reason: None,
168            delivered_at_ms: None,
169            read_at_ms: None,
170        };
171        let record_clone = record.clone();
172        tokio::task::spawn_blocking(move || -> Result<()> {
173            storage.insert_message(&record_clone)?;
174            storage.bump_conversation_after_local_send(
175                &peer_id_hex,
176                &message_id_hex,
177                timestamp_ms,
178                &record_clone.body,
179            )?;
180            Ok(())
181        })
182        .await??;
183
184        Ok(record)
185    }
186
187    /// Move an outbound message from `pending` to `sent`.
188    pub async fn mark_sent(&self, peer: NodeId, message_id: [u8; 16], at_ms: i64) -> Result<()> {
189        self.set_status(peer, message_id, MessageStatus::Sent, None, None, at_ms)
190            .await
191    }
192
193    /// Apply a `delivered` ack from the peer.
194    pub async fn mark_delivered(
195        &self,
196        peer: NodeId,
197        message_id: [u8; 16],
198        at_ms: i64,
199    ) -> Result<()> {
200        self.set_status(
201            peer,
202            message_id,
203            MessageStatus::Delivered,
204            Some(at_ms),
205            None,
206            at_ms,
207        )
208        .await
209    }
210
211    /// Apply a `read` ack from the peer (if/when supported by the UI).
212    pub async fn mark_read(&self, peer: NodeId, message_id: [u8; 16], at_ms: i64) -> Result<()> {
213        self.set_status(
214            peer,
215            message_id,
216            MessageStatus::Read,
217            None,
218            Some(at_ms),
219            at_ms,
220        )
221        .await
222    }
223
224    /// Mark a previously-pending outbound message as failed.
225    pub async fn mark_failed(
226        &self,
227        peer: NodeId,
228        message_id: [u8; 16],
229        reason: String,
230        at_ms: i64,
231    ) -> Result<()> {
232        let storage = self.storage.clone();
233        let id_hex = hex16(&message_id);
234        let peer_hex = hex_node_id(&peer);
235        let storage_id = id_hex.clone();
236        let storage_reason = reason.clone();
237        tokio::task::spawn_blocking(move || -> Result<()> {
238            storage.set_message_failed(&storage_id, &storage_reason, at_ms)
239        })
240        .await??;
241
242        let _ = self.events_tx.send(MessageEvent::MessageStatus {
243            message_id: id_hex,
244            peer_node_id: peer_hex,
245            new_status: MessageStatus::Failed,
246            at_ms,
247        });
248        Ok(())
249    }
250
251    async fn set_status(
252        &self,
253        peer: NodeId,
254        message_id: [u8; 16],
255        status: MessageStatus,
256        delivered_at_ms: Option<i64>,
257        read_at_ms: Option<i64>,
258        at_ms: i64,
259    ) -> Result<()> {
260        let storage = self.storage.clone();
261        let id_hex = hex16(&message_id);
262        let peer_hex = hex_node_id(&peer);
263        let storage_id = id_hex.clone();
264        tokio::task::spawn_blocking(move || -> Result<()> {
265            storage.set_message_status(&storage_id, status, delivered_at_ms, read_at_ms)
266        })
267        .await??;
268
269        let _ = self.events_tx.send(MessageEvent::MessageStatus {
270            message_id: id_hex,
271            peer_node_id: peer_hex,
272            new_status: status,
273            at_ms,
274        });
275        Ok(())
276    }
277
278    /// Atomic per-peer wipe — see [`MessagingStorage::delete_conversation`].
279    pub async fn delete_conversation(&self, peer: NodeId) -> Result<(usize, bool)> {
280        let storage = self.storage.clone();
281        let peer_hex = hex_node_id(&peer);
282        let peer_hex_for_storage = peer_hex.clone();
283        let outcome = tokio::task::spawn_blocking(move || -> Result<(usize, bool)> {
284            storage.delete_conversation(&peer_hex_for_storage)
285        })
286        .await??;
287        let _ = self.events_tx.send(MessageEvent::HistoryCleared {
288            peer_node_id: Some(peer_hex),
289            scope: HistoryScope::Peer,
290            deleted_messages: outcome.0 as i64,
291        });
292        Ok(outcome)
293    }
294
295    /// Atomic global wipe — see [`MessagingStorage::delete_all_messages`].
296    pub async fn delete_all_messages(&self) -> Result<(usize, usize)> {
297        let storage = self.storage.clone();
298        let outcome = tokio::task::spawn_blocking(move || -> Result<(usize, usize)> {
299            storage.delete_all_messages()
300        })
301        .await??;
302        let _ = self.events_tx.send(MessageEvent::HistoryCleared {
303            peer_node_id: None,
304            scope: HistoryScope::All,
305            deleted_messages: outcome.0 as i64,
306        });
307        Ok(outcome)
308    }
309
310    /// Encrypt + send a user message to `peer`. Returns the persisted record
311    /// (with status `pending` initially, transitioning to `sent` once the
312    /// transport accepts it). Bumps to `failed` if no route or no x25519 is
313    /// known yet.
314    pub async fn send(&self, peer: NodeId, body: String) -> Result<MessageRecord> {
315        if body.len() > MAX_BODY_BYTES {
316            return Err(anyhow!("message body exceeds {MAX_BODY_BYTES} bytes"));
317        }
318
319        let recipient_x25519 = match self.peers.lookup_x25519(&peer).await {
320            Some(k) => k,
321            None => {
322                return Err(anyhow!(
323                    "no x25519 public key cached for {peer}; wait until peer comes online and re-issues PeerInfo"
324                ))
325            }
326        };
327
328        let mut id_bytes = [0u8; 16];
329        id_bytes.copy_from_slice(uuid::Uuid::new_v4().as_bytes());
330        let timestamp_ms = now_ms();
331
332        let record = self
333            .record_local_send(peer, id_bytes, body.clone(), timestamp_ms)
334            .await?;
335
336        let ciphertext = e2e_encrypt(body.as_bytes(), &recipient_x25519)
337            .map_err(|e| anyhow!("e2e_encrypt failed: {e}"))?;
338
339        let body_bytes = encode_message(id_bytes, timestamp_ms as u64, &ciphertext);
340        let frame = ControlFrame::PluginPayload {
341            kind: KIND_MESSAGE.into(),
342            body: body_bytes,
343        };
344
345        let sent = self.control.send_routed(peer, frame).await;
346        if sent {
347            let _ = self.mark_sent(peer, id_bytes, now_ms()).await;
348        } else {
349            let _ = self
350                .mark_failed(peer, id_bytes, "no_route".into(), now_ms())
351                .await;
352        }
353
354        Ok(record)
355    }
356
357    /// Decrypt + persist an incoming `messaging.msg` and ack it.
358    pub async fn handle_incoming_message(&self, src: NodeId, body: Bytes) {
359        let decoded = match decode_message(&body) {
360            Ok(d) => d,
361            Err(e) => {
362                warn!(%src, "messaging: malformed messaging.msg: {e}");
363                return;
364            }
365        };
366
367        let identity_seed = self.identity.signing_seed();
368        let mut buffer = decoded.ciphertext;
369        if let Err(e) = e2e_decrypt_in_place(&mut buffer, &identity_seed) {
370            warn!(%src, "messaging: ECIES decrypt failed: {e}");
371            return;
372        }
373        let plaintext = match String::from_utf8(buffer) {
374            Ok(s) => s,
375            Err(_) => {
376                warn!(%src, "messaging: payload not valid UTF-8");
377                return;
378            }
379        };
380
381        let received_at = now_ms();
382        let cached_name = self.peers.lookup_name(&src).await;
383        let cached_x25519 = self.peers.lookup_x25519(&src).await;
384
385        let storage = self.storage.clone();
386        let peer_id_hex = hex_node_id(&src);
387        let message_id_hex = hex16(&decoded.message_id);
388        let timestamp_ms = decoded.timestamp_ms as i64;
389
390        let record = MessageRecord {
391            id: message_id_hex.clone(),
392            peer_node_id: peer_id_hex.clone(),
393            direction: MessageDirection::Received,
394            body: plaintext.clone(),
395            timestamp_ms,
396            status: MessageStatus::Delivered,
397            failure_reason: None,
398            delivered_at_ms: Some(received_at),
399            read_at_ms: None,
400        };
401        let record_clone = record.clone();
402        let preview_source = plaintext;
403
404        let bump_result =
405            tokio::task::spawn_blocking(move || -> Result<crate::storage::ConversationBump> {
406                storage.insert_message(&record_clone)?;
407                storage.bump_conversation_after_remote_receive(
408                    &peer_id_hex,
409                    &message_id_hex,
410                    timestamp_ms,
411                    &preview_source,
412                )
413            })
414            .await;
415
416        let bump = match bump_result {
417            Ok(Ok(b)) => b,
418            Ok(Err(e)) => {
419                warn!(%src, "messaging: storage write failed: {e}");
420                return;
421            }
422            Err(e) => {
423                warn!(%src, "messaging: storage join failed: {e}");
424                return;
425            }
426        };
427
428        let peer_hex = hex_node_id(&src);
429        let short = crate::storage::short_id(&peer_hex);
430        let conversation = ConversationSummary {
431            peer_node_id: peer_hex,
432            peer_node_id_short: short.clone(),
433            name: cached_name
434                .filter(|n| !n.is_empty())
435                .unwrap_or_else(|| short.clone()),
436            last_message_preview: Some(bump.preview),
437            last_message_ts_ms: Some(timestamp_ms),
438            unread_count: bump.unread_count,
439            x25519_pubkey: cached_x25519.map(|k| hex32(&k)),
440        };
441
442        let _ = self.events_tx.send(MessageEvent::MessageReceived {
443            message: Box::new(record),
444            conversation: Box::new(conversation),
445        });
446
447        // Best-effort delivery ack via PluginPayload.
448        let ack = ControlFrame::PluginPayload {
449            kind: KIND_ACK.into(),
450            body: encode_ack(decoded.message_id, AckKind::Delivered as u8),
451        };
452        let _ = self.control.send_routed(src, ack).await;
453        debug!(%src, "messaging: stored received + acked delivered");
454    }
455
456    /// Apply an inbound `messaging.ack` to the local outbound row.
457    pub async fn handle_incoming_message_ack(&self, src: NodeId, body: Bytes) {
458        let decoded = match decode_ack(&body) {
459            Ok(d) => d,
460            Err(e) => {
461                warn!(%src, "messaging: malformed messaging.ack: {e}");
462                return;
463            }
464        };
465        let kind = match AckKind::from_u8(decoded.ack_kind) {
466            Some(k) => k,
467            None => {
468                warn!(%src, "messaging: ignoring ack with unknown kind {}", decoded.ack_kind);
469                return;
470            }
471        };
472        let now = now_ms();
473        match kind {
474            AckKind::Delivered => {
475                let _ = self.mark_delivered(src, decoded.message_id, now).await;
476            }
477            AckKind::Read => {
478                let _ = self.mark_read(src, decoded.message_id, now).await;
479            }
480        }
481    }
482
483    /// Plugin lifecycle: daemon dropped this peer's identity. Wipe any
484    /// per-peer state owned by messaging and emit `HistoryCleared` so
485    /// live UIs flush their per-peer buffers.
486    pub async fn on_peer_forgotten(&self, peer: NodeId) {
487        let storage = self.storage.clone();
488        let peer_hex = hex_node_id(&peer);
489        let peer_hex_for_storage = peer_hex.clone();
490        let outcome = tokio::task::spawn_blocking(move || -> Result<(usize, bool)> {
491            storage.delete_conversation(&peer_hex_for_storage)
492        })
493        .await;
494
495        let outcome = match outcome {
496            Ok(Ok(o)) => o,
497            Ok(Err(e)) => {
498                warn!(%peer, "messaging: on_peer_forgotten storage failed: {e}");
499                return;
500            }
501            Err(e) => {
502                warn!(%peer, "messaging: on_peer_forgotten join failed: {e}");
503                return;
504            }
505        };
506
507        if outcome.0 > 0 || outcome.1 {
508            let _ = self.events_tx.send(MessageEvent::HistoryCleared {
509                peer_node_id: Some(peer_hex),
510                scope: HistoryScope::Peer,
511                deleted_messages: outcome.0 as i64,
512            });
513        }
514    }
515}
516
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returned as `i64` to match the SQLite column type. If the system clock
/// reads earlier than the epoch, `0` is returned instead of an error.
pub fn now_ms() -> i64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
525
/// Encode 32 bytes as a 64-character lowercase hexadecimal string.
///
/// Each byte becomes exactly two digits (high nibble first), so the output
/// is zero-padded. Digits are written straight into one pre-sized buffer
/// rather than formatting a temporary `String` per byte.
fn hex32(bytes: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
533
534fn parse_node_id(hex: &str) -> Option<NodeId> {
535    if hex.len() != 32 {
536        return None;
537    }
538    let mut bytes = [0u8; 16];
539    for i in 0..16 {
540        let pair = &hex[i * 2..i * 2 + 2];
541        bytes[i] = u8::from_str_radix(pair, 16).ok()?;
542    }
543    Some(NodeId::from_bytes(bytes))
544}