1use std::path::PathBuf;
8use std::sync::Mutex;
9
10use anyhow::{anyhow, Context, Result};
11use pim_core::NodeId;
12use rusqlite::{params, Connection};
13use serde::{Deserialize, Serialize};
14
15use crate::hex_node_id;
16
17const SCHEMA: &str = include_str!("schema.sql");
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "lowercase")]
22pub enum MessageDirection {
23 Sent,
25 Received,
27}
28
29impl MessageDirection {
30 fn as_str(&self) -> &'static str {
31 match self {
32 MessageDirection::Sent => "sent",
33 MessageDirection::Received => "received",
34 }
35 }
36
37 fn from_str(s: &str) -> Result<Self> {
38 match s {
39 "sent" => Ok(Self::Sent),
40 "received" => Ok(Self::Received),
41 other => Err(anyhow!("unknown direction: {other}")),
42 }
43 }
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "lowercase")]
50pub enum MessageStatus {
51 Pending,
53 Sent,
55 Delivered,
57 Read,
59 Failed,
61}
62
63impl MessageStatus {
64 fn as_str(&self) -> &'static str {
65 match self {
66 MessageStatus::Pending => "pending",
67 MessageStatus::Sent => "sent",
68 MessageStatus::Delivered => "delivered",
69 MessageStatus::Read => "read",
70 MessageStatus::Failed => "failed",
71 }
72 }
73
74 fn from_str(s: &str) -> Result<Self> {
75 match s {
76 "pending" => Ok(Self::Pending),
77 "sent" => Ok(Self::Sent),
78 "delivered" => Ok(Self::Delivered),
79 "read" => Ok(Self::Read),
80 "failed" => Ok(Self::Failed),
81 other => Err(anyhow!("unknown status: {other}")),
82 }
83 }
84}
85
/// Kind of acknowledgement, with explicit numeric discriminants.
///
/// NOTE(review): the fixed values look like codes shared with a peer or wire
/// format — confirm before renumbering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckKind {
    /// Numeric code `1`.
    Delivered = 1,
    /// Numeric code `2`.
    Read = 2,
}

impl AckKind {
    /// Maps a numeric code back to its variant.
    ///
    /// Returns `None` for any value that is not the discriminant of a known
    /// variant (i.e. anything other than `1` or `2`).
    pub fn from_u8(v: u8) -> Option<Self> {
        // Compare against the declared discriminants so the mapping cannot
        // drift from the enum definition above.
        [Self::Delivered, Self::Read]
            .iter()
            .copied()
            .find(|kind| *kind as u8 == v)
    }
}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct MessageRecord {
110 pub id: String,
112 pub peer_node_id: String,
114 pub direction: MessageDirection,
116 pub body: String,
118 pub timestamp_ms: i64,
121 pub status: MessageStatus,
123 pub failure_reason: Option<String>,
125 pub delivered_at_ms: Option<i64>,
127 pub read_at_ms: Option<i64>,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct ConversationSummary {
136 pub peer_node_id: String,
138 pub peer_node_id_short: String,
140 pub name: String,
142 pub last_message_preview: Option<String>,
144 pub last_message_ts_ms: Option<i64>,
146 pub unread_count: i64,
148 pub x25519_pubkey: Option<String>,
150}
151
152pub struct MessagingStorage {
156 conn: Mutex<Connection>,
157}
158
159impl MessagingStorage {
160 pub fn open(path: PathBuf) -> Result<Self> {
162 if let Some(parent) = path.parent() {
163 std::fs::create_dir_all(parent)
164 .with_context(|| format!("create messages dir {}", parent.display()))?;
165 }
166
167 let conn = Connection::open(&path)
168 .with_context(|| format!("open messages db at {}", path.display()))?;
169
170 #[cfg(unix)]
171 {
172 use std::os::unix::fs::PermissionsExt;
173 if let Ok(meta) = std::fs::metadata(&path) {
174 let mut perms = meta.permissions();
175 perms.set_mode(0o600);
176 let _ = std::fs::set_permissions(&path, perms);
177 }
178 }
179
180 conn.execute_batch(SCHEMA)
181 .context("apply messages schema")?;
182
183 Ok(Self {
184 conn: Mutex::new(conn),
185 })
186 }
187
188 pub fn insert_message(&self, m: &MessageRecord) -> Result<()> {
192 let conn = self.conn.lock().unwrap();
193 conn.execute(
194 "INSERT OR REPLACE INTO messages \
195 (id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms) \
196 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
197 params![
198 m.id,
199 m.peer_node_id,
200 m.direction.as_str(),
201 m.body,
202 m.timestamp_ms,
203 m.status.as_str(),
204 m.failure_reason,
205 m.delivered_at_ms,
206 m.read_at_ms,
207 ],
208 )?;
209 Ok(())
210 }
211
212 pub fn bump_conversation_after_local_send(
214 &self,
215 peer_id_hex: &str,
216 message_id_hex: &str,
217 ts_ms: i64,
218 body: &str,
219 ) -> Result<()> {
220 let preview = preview_of(body);
221 let conn = self.conn.lock().unwrap();
222 conn.execute(
223 "INSERT INTO conversations_meta (peer_node_id, unread_count, last_read_message_id, \
224 last_message_id, last_message_preview, last_message_ts_ms) \
225 VALUES (?1, 0, NULL, ?2, ?3, ?4) \
226 ON CONFLICT(peer_node_id) DO UPDATE SET \
227 last_message_id = excluded.last_message_id, \
228 last_message_preview = excluded.last_message_preview, \
229 last_message_ts_ms = excluded.last_message_ts_ms",
230 params![peer_id_hex, message_id_hex, preview, ts_ms],
231 )?;
232 Ok(())
233 }
234
235 pub fn bump_conversation_after_remote_receive(
239 &self,
240 peer_id_hex: &str,
241 message_id_hex: &str,
242 ts_ms: i64,
243 body: &str,
244 ) -> Result<ConversationBump> {
245 let preview = preview_of(body);
246 let conn = self.conn.lock().unwrap();
247
248 conn.execute(
249 "INSERT INTO conversations_meta (peer_node_id, unread_count, last_message_id, \
250 last_message_preview, last_message_ts_ms) \
251 VALUES (?1, 1, ?2, ?3, ?4) \
252 ON CONFLICT(peer_node_id) DO UPDATE SET \
253 unread_count = unread_count + 1, \
254 last_message_id = excluded.last_message_id, \
255 last_message_preview = excluded.last_message_preview, \
256 last_message_ts_ms = excluded.last_message_ts_ms",
257 params![peer_id_hex, message_id_hex, preview, ts_ms],
258 )?;
259
260 let unread_count: i64 = conn.query_row(
261 "SELECT unread_count FROM conversations_meta WHERE peer_node_id = ?1",
262 params![peer_id_hex],
263 |row| row.get::<_, i64>(0),
264 )?;
265
266 Ok(ConversationBump {
267 preview,
268 unread_count,
269 })
270 }
271
272 pub fn set_message_status(
275 &self,
276 message_id_hex: &str,
277 status: MessageStatus,
278 delivered_at_ms: Option<i64>,
279 read_at_ms: Option<i64>,
280 ) -> Result<()> {
281 let conn = self.conn.lock().unwrap();
282 match (delivered_at_ms, read_at_ms) {
283 (Some(d), Some(r)) => {
284 conn.execute(
285 "UPDATE messages SET status = ?1, delivered_at_ms = ?2, read_at_ms = ?3 WHERE id = ?4",
286 params![status.as_str(), d, r, message_id_hex],
287 )?;
288 }
289 (Some(d), None) => {
290 conn.execute(
291 "UPDATE messages SET status = ?1, delivered_at_ms = COALESCE(delivered_at_ms, ?2) WHERE id = ?3",
292 params![status.as_str(), d, message_id_hex],
293 )?;
294 }
295 (None, Some(r)) => {
296 conn.execute(
297 "UPDATE messages SET status = ?1, read_at_ms = COALESCE(read_at_ms, ?2) WHERE id = ?3",
298 params![status.as_str(), r, message_id_hex],
299 )?;
300 }
301 (None, None) => {
302 conn.execute(
303 "UPDATE messages SET status = ?1 WHERE id = ?2",
304 params![status.as_str(), message_id_hex],
305 )?;
306 }
307 }
308 Ok(())
309 }
310
311 pub fn set_message_failed(&self, message_id_hex: &str, reason: &str, at_ms: i64) -> Result<()> {
313 let conn = self.conn.lock().unwrap();
314 conn.execute(
315 "UPDATE messages SET status = 'failed', failure_reason = ?1, delivered_at_ms = COALESCE(delivered_at_ms, ?2) WHERE id = ?3",
316 params![reason, at_ms, message_id_hex],
317 )?;
318 Ok(())
319 }
320
321 pub fn history(
323 &self,
324 peer_id_hex: &str,
325 before_ts_ms: Option<i64>,
326 limit: i64,
327 ) -> Result<(Vec<MessageRecord>, bool)> {
328 let conn = self.conn.lock().unwrap();
329 let limit_plus = limit.saturating_add(1).max(2);
330
331 let mut stmt = match before_ts_ms {
332 Some(_) => conn.prepare(
333 "SELECT id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms \
334 FROM messages \
335 WHERE peer_node_id = ?1 AND timestamp_ms < ?2 \
336 ORDER BY timestamp_ms DESC, id DESC \
337 LIMIT ?3",
338 )?,
339 None => conn.prepare(
340 "SELECT id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms \
341 FROM messages \
342 WHERE peer_node_id = ?1 \
343 ORDER BY timestamp_ms DESC, id DESC \
344 LIMIT ?2",
345 )?,
346 };
347
348 let rows = match before_ts_ms {
349 Some(ts) => stmt.query_map(params![peer_id_hex, ts, limit_plus], parse_row)?,
350 None => stmt.query_map(params![peer_id_hex, limit_plus], parse_row)?,
351 };
352
353 let mut out: Vec<MessageRecord> = rows.collect::<Result<Vec<_>, _>>()?;
354 let has_more = out.len() as i64 > limit;
355 if has_more {
356 out.truncate(limit as usize);
357 }
358 Ok((out, has_more))
359 }
360
361 pub fn list_conversations_raw(&self) -> Result<Vec<ConversationSummary>> {
365 let conn = self.conn.lock().unwrap();
366 let mut stmt = conn.prepare(
367 "SELECT peer_node_id, last_message_preview, last_message_ts_ms, unread_count \
368 FROM conversations_meta \
369 ORDER BY last_message_ts_ms DESC NULLS LAST",
370 )?;
371
372 let rows = stmt
373 .query_map([], |row| {
374 let peer_node_id: String = row.get(0)?;
375 let preview: Option<String> = row.get(1)?;
376 let ts: Option<i64> = row.get(2)?;
377 let unread: i64 = row.get(3)?;
378 let short = short_id(&peer_node_id);
379 Ok(ConversationSummary {
380 peer_node_id_short: short.clone(),
381 name: short,
382 peer_node_id,
383 last_message_preview: preview,
384 last_message_ts_ms: ts,
385 unread_count: unread,
386 x25519_pubkey: None,
387 })
388 })?
389 .collect::<Result<Vec<_>, _>>()?;
390
391 Ok(rows)
392 }
393
394 pub fn delete_conversation(&self, peer_id_hex: &str) -> Result<(usize, bool)> {
398 let mut conn = self.conn.lock().unwrap();
399 let tx = conn.transaction()?;
400 let deleted_messages = tx.execute(
401 "DELETE FROM messages WHERE peer_node_id = ?1",
402 params![peer_id_hex],
403 )?;
404 let deleted_meta = tx.execute(
405 "DELETE FROM conversations_meta WHERE peer_node_id = ?1",
406 params![peer_id_hex],
407 )?;
408 tx.commit()?;
409 Ok((deleted_messages, deleted_meta > 0))
410 }
411
412 pub fn delete_all_messages(&self) -> Result<(usize, usize)> {
414 let mut conn = self.conn.lock().unwrap();
415 let tx = conn.transaction()?;
416 let deleted_messages = tx.execute("DELETE FROM messages", [])?;
417 let deleted_meta = tx.execute("DELETE FROM conversations_meta", [])?;
418 tx.commit()?;
419 Ok((deleted_messages, deleted_meta))
420 }
421
422 pub fn mark_read_up_to(&self, peer_id_hex: &str, up_to_ts_ms: i64) -> Result<i64> {
425 let conn = self.conn.lock().unwrap();
426 conn.execute(
427 "UPDATE messages SET status = 'read', read_at_ms = COALESCE(read_at_ms, ?2) \
428 WHERE peer_node_id = ?1 AND direction = 'received' AND timestamp_ms <= ?2 AND status != 'read'",
429 params![peer_id_hex, up_to_ts_ms],
430 )?;
431 conn.execute(
432 "UPDATE conversations_meta SET unread_count = 0 WHERE peer_node_id = ?1",
433 params![peer_id_hex],
434 )?;
435 Ok(0)
436 }
437
438 pub fn delete_conversation_for_peer(&self, peer: &NodeId) -> Result<(usize, bool)> {
443 let hex = hex_node_id(peer);
444 self.delete_conversation(&hex)
445 }
446}
447
/// Result of `bump_conversation_after_remote_receive`.
#[derive(Debug, Clone)]
pub struct ConversationBump {
    /// Preview text that was stored as the conversation's last message.
    pub preview: String,
    /// Unread counter of the conversation after the bump.
    pub unread_count: i64,
}
459
460fn parse_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<MessageRecord> {
461 let direction_str: String = row.get(2)?;
462 let status_str: String = row.get(5)?;
463
464 Ok(MessageRecord {
465 id: row.get(0)?,
466 peer_node_id: row.get(1)?,
467 direction: MessageDirection::from_str(&direction_str)
468 .map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()))?,
469 body: row.get(3)?,
470 timestamp_ms: row.get(4)?,
471 status: MessageStatus::from_str(&status_str)
472 .map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()))?,
473 failure_reason: row.get(6)?,
474 delivered_at_ms: row.get(7)?,
475 read_at_ms: row.get(8)?,
476 })
477}
478
/// Builds the single-line preview stored for a conversation's last message.
///
/// Takes at most the first 80 characters of `body` (Unicode scalar values,
/// not bytes) and replaces every line feed and carriage return with a space,
/// so CRLF line endings do not leave a stray `\r` in the preview.
fn preview_of(body: &str) -> String {
    const PREVIEW_MAX_CHARS: usize = 80;

    // Single pass: truncate and flatten line breaks without an intermediate
    // string.
    body.chars()
        .take(PREVIEW_MAX_CHARS)
        .map(|ch| if matches!(ch, '\n' | '\r') { ' ' } else { ch })
        .collect()
}
483
/// Returns the first eight characters of `hex`, or all of it when it is
/// shorter. Counts characters rather than bytes, so it never splits a
/// multi-byte character.
pub(crate) fn short_id(hex: &str) -> String {
    // Byte offset where the ninth character starts; the whole string if there
    // is no ninth character.
    let cut = hex.char_indices().nth(8).map_or(hex.len(), |(idx, _)| idx);
    hex[..cut].to_owned()
}