Skip to main content

pim_messaging/
storage.rs

1//! SQLite-backed persistence for the messaging plugin.
2//!
3//! All blocking calls are funneled through `tokio::task::spawn_blocking`
4//! by [`crate::service::MessagingService`], so handlers never call into
5//! rusqlite from an async context directly.
6
7use std::path::PathBuf;
8use std::sync::Mutex;
9
10use anyhow::{anyhow, Context, Result};
11use pim_core::NodeId;
12use rusqlite::{params, Connection};
13use serde::{Deserialize, Serialize};
14
15use crate::hex_node_id;
16
17const SCHEMA: &str = include_str!("schema.sql");
18
19/// Direction of a stored message relative to the local node.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "lowercase")]
22pub enum MessageDirection {
23    /// Outbound message originated locally.
24    Sent,
25    /// Inbound message received from a peer.
26    Received,
27}
28
29impl MessageDirection {
30    fn as_str(&self) -> &'static str {
31        match self {
32            MessageDirection::Sent => "sent",
33            MessageDirection::Received => "received",
34        }
35    }
36
37    fn from_str(s: &str) -> Result<Self> {
38        match s {
39            "sent" => Ok(Self::Sent),
40            "received" => Ok(Self::Received),
41            other => Err(anyhow!("unknown direction: {other}")),
42        }
43    }
44}
45
46/// Lifecycle of an outbound message, from local persistence to peer
47/// acknowledgement. Inbound messages start at `Delivered`.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "lowercase")]
50pub enum MessageStatus {
51    /// Persisted locally; not yet handed to the transport layer.
52    Pending,
53    /// Sent on the wire; awaiting recipient ack.
54    Sent,
55    /// Recipient confirmed receipt.
56    Delivered,
57    /// Recipient marked the message as read.
58    Read,
59    /// Send / delivery failed permanently.
60    Failed,
61}
62
63impl MessageStatus {
64    fn as_str(&self) -> &'static str {
65        match self {
66            MessageStatus::Pending => "pending",
67            MessageStatus::Sent => "sent",
68            MessageStatus::Delivered => "delivered",
69            MessageStatus::Read => "read",
70            MessageStatus::Failed => "failed",
71        }
72    }
73
74    fn from_str(s: &str) -> Result<Self> {
75        match s {
76            "pending" => Ok(Self::Pending),
77            "sent" => Ok(Self::Sent),
78            "delivered" => Ok(Self::Delivered),
79            "read" => Ok(Self::Read),
80            "failed" => Ok(Self::Failed),
81            other => Err(anyhow!("unknown status: {other}")),
82        }
83    }
84}
85
86/// Acknowledgement category transmitted on the wire (`messaging.ack` payload).
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88pub enum AckKind {
89    /// Recipient persisted the message — equivalent to `MessageStatus::Delivered`.
90    Delivered = 1,
91    /// Recipient marked the message read in their UI.
92    Read = 2,
93}
94
95impl AckKind {
96    /// Decode the wire-level ack tag. Returns `None` for unknown values.
97    pub fn from_u8(v: u8) -> Option<Self> {
98        match v {
99            1 => Some(Self::Delivered),
100            2 => Some(Self::Read),
101            _ => None,
102        }
103    }
104}
105
106/// One stored message — sent or received. Field ordering matches the JSON
107/// shape exposed via JSON-RPC.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct MessageRecord {
110    /// 32-char lowercase hex (UUIDv4 bytes).
111    pub id: String,
112    /// Peer node id (32-char lowercase hex).
113    pub peer_node_id: String,
114    /// `sent` | `received`.
115    pub direction: MessageDirection,
116    /// Plaintext UTF-8 body.
117    pub body: String,
118    /// Wall-clock at sender (`timestamp_ms` from the wire frame for
119    /// `received`, local clock for `sent`).
120    pub timestamp_ms: i64,
121    /// Lifecycle status.
122    pub status: MessageStatus,
123    /// Last failure reason (only set when `status = Failed`).
124    pub failure_reason: Option<String>,
125    /// Wall-clock when delivery ack was applied.
126    pub delivered_at_ms: Option<i64>,
127    /// Wall-clock when read ack was applied.
128    pub read_at_ms: Option<i64>,
129}
130
131/// Denormalized "conversation" row used to avoid full-table aggregations
132/// on the conversations list query. The `name` and `x25519_pubkey` fields
133/// are populated by the service layer from the daemon's peer directory.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct ConversationSummary {
136    /// Peer node id (32-char lowercase hex).
137    pub peer_node_id: String,
138    /// 8-char prefix for compact UI surfacing (e.g. sidebar lines).
139    pub peer_node_id_short: String,
140    /// Latest friendly name we have for the peer; falls back to short id.
141    pub name: String,
142    /// First few characters of the most recent message body (≤ 80 chars).
143    pub last_message_preview: Option<String>,
144    /// Timestamp of the most recent message (sent or received).
145    pub last_message_ts_ms: Option<i64>,
146    /// Number of received messages since `mark_read`.
147    pub unread_count: i64,
148    /// Cached X25519 static public key (64-char lowercase hex), if known.
149    pub x25519_pubkey: Option<String>,
150}
151
152/// Wraps a single SQLite connection guarded by a `std::sync::Mutex`. The
153/// daemon serializes writes through the mutex, which is acceptable for the
154/// expected message volume (interactive chat).
155pub struct MessagingStorage {
156    conn: Mutex<Connection>,
157}
158
159impl MessagingStorage {
160    /// Open or create the database file and apply the canonical schema.
161    pub fn open(path: PathBuf) -> Result<Self> {
162        if let Some(parent) = path.parent() {
163            std::fs::create_dir_all(parent)
164                .with_context(|| format!("create messages dir {}", parent.display()))?;
165        }
166
167        let conn = Connection::open(&path)
168            .with_context(|| format!("open messages db at {}", path.display()))?;
169
170        #[cfg(unix)]
171        {
172            use std::os::unix::fs::PermissionsExt;
173            if let Ok(meta) = std::fs::metadata(&path) {
174                let mut perms = meta.permissions();
175                perms.set_mode(0o600);
176                let _ = std::fs::set_permissions(&path, perms);
177            }
178        }
179
180        conn.execute_batch(SCHEMA)
181            .context("apply messages schema")?;
182
183        Ok(Self {
184            conn: Mutex::new(conn),
185        })
186    }
187
188    /// Persist a brand-new message row. Caller is responsible for choosing
189    /// the appropriate initial `status` (`Pending` for outbound, `Delivered`
190    /// for inbound).
191    pub fn insert_message(&self, m: &MessageRecord) -> Result<()> {
192        let conn = self.conn.lock().unwrap();
193        conn.execute(
194            "INSERT OR REPLACE INTO messages \
195             (id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms) \
196             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
197            params![
198                m.id,
199                m.peer_node_id,
200                m.direction.as_str(),
201                m.body,
202                m.timestamp_ms,
203                m.status.as_str(),
204                m.failure_reason,
205                m.delivered_at_ms,
206                m.read_at_ms,
207            ],
208        )?;
209        Ok(())
210    }
211
212    /// Refresh the `conversations_meta` row after a local outbound send.
213    pub fn bump_conversation_after_local_send(
214        &self,
215        peer_id_hex: &str,
216        message_id_hex: &str,
217        ts_ms: i64,
218        body: &str,
219    ) -> Result<()> {
220        let preview = preview_of(body);
221        let conn = self.conn.lock().unwrap();
222        conn.execute(
223            "INSERT INTO conversations_meta (peer_node_id, unread_count, last_read_message_id, \
224                last_message_id, last_message_preview, last_message_ts_ms) \
225             VALUES (?1, 0, NULL, ?2, ?3, ?4) \
226             ON CONFLICT(peer_node_id) DO UPDATE SET \
227                last_message_id = excluded.last_message_id, \
228                last_message_preview = excluded.last_message_preview, \
229                last_message_ts_ms = excluded.last_message_ts_ms",
230            params![peer_id_hex, message_id_hex, preview, ts_ms],
231        )?;
232        Ok(())
233    }
234
235    /// Refresh the `conversations_meta` row after an inbound message and
236    /// return the resulting partial summary (the service layer fills in
237    /// `name` and `x25519_pubkey` from the peer directory).
238    pub fn bump_conversation_after_remote_receive(
239        &self,
240        peer_id_hex: &str,
241        message_id_hex: &str,
242        ts_ms: i64,
243        body: &str,
244    ) -> Result<ConversationBump> {
245        let preview = preview_of(body);
246        let conn = self.conn.lock().unwrap();
247
248        conn.execute(
249            "INSERT INTO conversations_meta (peer_node_id, unread_count, last_message_id, \
250                last_message_preview, last_message_ts_ms) \
251             VALUES (?1, 1, ?2, ?3, ?4) \
252             ON CONFLICT(peer_node_id) DO UPDATE SET \
253                unread_count = unread_count + 1, \
254                last_message_id = excluded.last_message_id, \
255                last_message_preview = excluded.last_message_preview, \
256                last_message_ts_ms = excluded.last_message_ts_ms",
257            params![peer_id_hex, message_id_hex, preview, ts_ms],
258        )?;
259
260        let unread_count: i64 = conn.query_row(
261            "SELECT unread_count FROM conversations_meta WHERE peer_node_id = ?1",
262            params![peer_id_hex],
263            |row| row.get::<_, i64>(0),
264        )?;
265
266        Ok(ConversationBump {
267            preview,
268            unread_count,
269        })
270    }
271
272    /// Update the lifecycle status of an existing outbound message and the
273    /// optional delivered/read timestamps.
274    pub fn set_message_status(
275        &self,
276        message_id_hex: &str,
277        status: MessageStatus,
278        delivered_at_ms: Option<i64>,
279        read_at_ms: Option<i64>,
280    ) -> Result<()> {
281        let conn = self.conn.lock().unwrap();
282        match (delivered_at_ms, read_at_ms) {
283            (Some(d), Some(r)) => {
284                conn.execute(
285                    "UPDATE messages SET status = ?1, delivered_at_ms = ?2, read_at_ms = ?3 WHERE id = ?4",
286                    params![status.as_str(), d, r, message_id_hex],
287                )?;
288            }
289            (Some(d), None) => {
290                conn.execute(
291                    "UPDATE messages SET status = ?1, delivered_at_ms = COALESCE(delivered_at_ms, ?2) WHERE id = ?3",
292                    params![status.as_str(), d, message_id_hex],
293                )?;
294            }
295            (None, Some(r)) => {
296                conn.execute(
297                    "UPDATE messages SET status = ?1, read_at_ms = COALESCE(read_at_ms, ?2) WHERE id = ?3",
298                    params![status.as_str(), r, message_id_hex],
299                )?;
300            }
301            (None, None) => {
302                conn.execute(
303                    "UPDATE messages SET status = ?1 WHERE id = ?2",
304                    params![status.as_str(), message_id_hex],
305                )?;
306            }
307        }
308        Ok(())
309    }
310
311    /// Set `status = failed` and persist a human-readable reason.
312    pub fn set_message_failed(&self, message_id_hex: &str, reason: &str, at_ms: i64) -> Result<()> {
313        let conn = self.conn.lock().unwrap();
314        conn.execute(
315            "UPDATE messages SET status = 'failed', failure_reason = ?1, delivered_at_ms = COALESCE(delivered_at_ms, ?2) WHERE id = ?3",
316            params![reason, at_ms, message_id_hex],
317        )?;
318        Ok(())
319    }
320
321    /// Page through a peer's history newest-first.
322    pub fn history(
323        &self,
324        peer_id_hex: &str,
325        before_ts_ms: Option<i64>,
326        limit: i64,
327    ) -> Result<(Vec<MessageRecord>, bool)> {
328        let conn = self.conn.lock().unwrap();
329        let limit_plus = limit.saturating_add(1).max(2);
330
331        let mut stmt = match before_ts_ms {
332            Some(_) => conn.prepare(
333                "SELECT id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms \
334                 FROM messages \
335                 WHERE peer_node_id = ?1 AND timestamp_ms < ?2 \
336                 ORDER BY timestamp_ms DESC, id DESC \
337                 LIMIT ?3",
338            )?,
339            None => conn.prepare(
340                "SELECT id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms \
341                 FROM messages \
342                 WHERE peer_node_id = ?1 \
343                 ORDER BY timestamp_ms DESC, id DESC \
344                 LIMIT ?2",
345            )?,
346        };
347
348        let rows = match before_ts_ms {
349            Some(ts) => stmt.query_map(params![peer_id_hex, ts, limit_plus], parse_row)?,
350            None => stmt.query_map(params![peer_id_hex, limit_plus], parse_row)?,
351        };
352
353        let mut out: Vec<MessageRecord> = rows.collect::<Result<Vec<_>, _>>()?;
354        let has_more = out.len() as i64 > limit;
355        if has_more {
356            out.truncate(limit as usize);
357        }
358        Ok((out, has_more))
359    }
360
361    /// Snapshot of all conversations, sorted by most-recent activity.
362    /// `name` and `x25519_pubkey` are left blank/None — the service
363    /// layer fills them in from the daemon's peer directory.
364    pub fn list_conversations_raw(&self) -> Result<Vec<ConversationSummary>> {
365        let conn = self.conn.lock().unwrap();
366        let mut stmt = conn.prepare(
367            "SELECT peer_node_id, last_message_preview, last_message_ts_ms, unread_count \
368             FROM conversations_meta \
369             ORDER BY last_message_ts_ms DESC NULLS LAST",
370        )?;
371
372        let rows = stmt
373            .query_map([], |row| {
374                let peer_node_id: String = row.get(0)?;
375                let preview: Option<String> = row.get(1)?;
376                let ts: Option<i64> = row.get(2)?;
377                let unread: i64 = row.get(3)?;
378                let short = short_id(&peer_node_id);
379                Ok(ConversationSummary {
380                    peer_node_id_short: short.clone(),
381                    name: short,
382                    peer_node_id,
383                    last_message_preview: preview,
384                    last_message_ts_ms: ts,
385                    unread_count: unread,
386                    x25519_pubkey: None,
387                })
388            })?
389            .collect::<Result<Vec<_>, _>>()?;
390
391        Ok(rows)
392    }
393
394    /// Atomic per-peer wipe of `messages` and `conversations_meta`.
395    /// Returns `(deleted_messages, deleted_conversation)` so the caller
396    /// can report exact counts to the RPC client.
397    pub fn delete_conversation(&self, peer_id_hex: &str) -> Result<(usize, bool)> {
398        let mut conn = self.conn.lock().unwrap();
399        let tx = conn.transaction()?;
400        let deleted_messages = tx.execute(
401            "DELETE FROM messages WHERE peer_node_id = ?1",
402            params![peer_id_hex],
403        )?;
404        let deleted_meta = tx.execute(
405            "DELETE FROM conversations_meta WHERE peer_node_id = ?1",
406            params![peer_id_hex],
407        )?;
408        tx.commit()?;
409        Ok((deleted_messages, deleted_meta > 0))
410    }
411
412    /// Atomic global wipe of every message + conversation row.
413    pub fn delete_all_messages(&self) -> Result<(usize, usize)> {
414        let mut conn = self.conn.lock().unwrap();
415        let tx = conn.transaction()?;
416        let deleted_messages = tx.execute("DELETE FROM messages", [])?;
417        let deleted_meta = tx.execute("DELETE FROM conversations_meta", [])?;
418        tx.commit()?;
419        Ok((deleted_messages, deleted_meta))
420    }
421
422    /// Mark every message at-or-before `up_to_ts_ms` as read for the peer.
423    /// Returns the new `unread_count` (always 0).
424    pub fn mark_read_up_to(&self, peer_id_hex: &str, up_to_ts_ms: i64) -> Result<i64> {
425        let conn = self.conn.lock().unwrap();
426        conn.execute(
427            "UPDATE messages SET status = 'read', read_at_ms = COALESCE(read_at_ms, ?2) \
428             WHERE peer_node_id = ?1 AND direction = 'received' AND timestamp_ms <= ?2 AND status != 'read'",
429            params![peer_id_hex, up_to_ts_ms],
430        )?;
431        conn.execute(
432            "UPDATE conversations_meta SET unread_count = 0 WHERE peer_node_id = ?1",
433            params![peer_id_hex],
434        )?;
435        Ok(0)
436    }
437
438    /// Convenience wrapper used by [`crate::service::MessagingService`]
439    /// when handling an `on_peer_forgotten` lifecycle event. Equivalent
440    /// to [`Self::delete_conversation`] but keyed by `NodeId` instead of
441    /// the hex projection.
442    pub fn delete_conversation_for_peer(&self, peer: &NodeId) -> Result<(usize, bool)> {
443        let hex = hex_node_id(peer);
444        self.delete_conversation(&hex)
445    }
446}
447
448/// Side-effect summary returned by
449/// [`MessagingStorage::bump_conversation_after_remote_receive`]. The
450/// service layer combines this with the peer-directory lookup to build
451/// the full [`ConversationSummary`] published on the event channel.
452#[derive(Debug, Clone)]
453pub struct ConversationBump {
454    /// Truncated preview as stored in `conversations_meta.last_message_preview`.
455    pub preview: String,
456    /// Updated `unread_count` for the conversation.
457    pub unread_count: i64,
458}
459
460fn parse_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<MessageRecord> {
461    let direction_str: String = row.get(2)?;
462    let status_str: String = row.get(5)?;
463
464    Ok(MessageRecord {
465        id: row.get(0)?,
466        peer_node_id: row.get(1)?,
467        direction: MessageDirection::from_str(&direction_str)
468            .map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()))?,
469        body: row.get(3)?,
470        timestamp_ms: row.get(4)?,
471        status: MessageStatus::from_str(&status_str)
472            .map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()))?,
473        failure_reason: row.get(6)?,
474        delivered_at_ms: row.get(7)?,
475        read_at_ms: row.get(8)?,
476    })
477}
478
479fn preview_of(body: &str) -> String {
480    let truncated: String = body.chars().take(80).collect();
481    truncated.replace('\n', " ")
482}
483
484pub(crate) fn short_id(hex: &str) -> String {
485    hex.chars().take(8).collect()
486}