Skip to main content

rns_rpc/storage/
messages.rs

1use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
2use serde_json::Value as JsonValue;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{mpsc, Arc, Mutex};
5
/// One stored message, with one field per column of the `messages` table.
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub struct MessageRecord {
    /// Message identifier; inserts resolve conflicts on this column.
    pub id: String,
    /// Sender of the message (matched against a peer for inbound statistics).
    pub source: String,
    /// Recipient of the message (matched against a peer for outbound statistics).
    pub destination: String,
    /// Message title text.
    pub title: String,
    /// Message body text.
    pub content: String,
    /// Message time as an integer; used for ordering and cutoffs (unit not stated here).
    pub timestamp: i64,
    /// Direction tag; the store's queries test for the literals `'out'` and `'in'`.
    pub direction: String,
    /// Optional JSON payload, persisted as serialized JSON text.
    pub fields: Option<JsonValue>,
    /// Optional receipt status string; `None` when no status has been recorded.
    pub receipt_status: Option<String>,
}
18
/// One stored announce, with one field per column of the `announces` table.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct AnnounceRecord {
    /// Announce identifier; also the tie-breaker when paging announces by timestamp.
    pub id: String,
    /// Peer the announce belongs to.
    pub peer: String,
    /// Announce time as an integer (unit not stated here).
    pub timestamp: i64,
    /// Optional display name.
    pub name: Option<String>,
    /// Optional note on where `name` came from (presumably a source tag — confirm with producer).
    pub name_source: Option<String>,
    /// Integer time the peer was first seen (same unit as `timestamp`, presumably — confirm).
    pub first_seen: i64,
    /// Number of times seen; persisted as a signed 64-bit integer.
    pub seen_count: u64,
    /// Optional application data (hex-encoded, judging by the name — confirm with producer).
    pub app_data_hex: Option<String>,
    /// Capability strings, persisted as a JSON array; empty when absent on deserialization.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Optional signal metric (presumably RSSI — confirm with producer).
    pub rssi: Option<f64>,
    /// Optional signal metric (presumably signal-to-noise ratio — confirm with producer).
    pub snr: Option<f64>,
    /// Optional quality metric (meaning not established in this file — confirm with producer).
    pub q: Option<f64>,
    /// Optional stamp cost carried with the announce (semantics defined elsewhere).
    pub stamp_cost: Option<u32>,
    /// Optional stamp cost flexibility carried with the announce (semantics defined elsewhere).
    pub stamp_cost_flexibility: Option<u32>,
    /// Optional peering cost carried with the announce (semantics defined elsewhere).
    pub peering_cost: Option<u32>,
}
38
/// SQLite-backed store for messages and announces.
///
/// Writes go through a mutex-guarded write connection; message inserts and receipt-status
/// resolution are additionally funnelled through a dedicated writer thread. Reads use a
/// separate read-only connection when one exists.
pub struct MessagesStore {
    /// Write connection plus write-side counters, shared with the writer thread.
    write_state: Arc<WriteState>,
    /// Sending half of the command channel consumed by the writer thread.
    outbound_write_tx: mpsc::Sender<OutboundWriteCommand>,
    /// Read-only connection; `None` for in-memory stores, where reads use the write connection.
    read_conn: Option<Mutex<Connection>>,
    /// Total nanoseconds spent waiting for the read mutex.
    read_lock_wait_ns_total: AtomicU64,
    /// Number of operations that went through the read connection.
    read_ops_total: AtomicU64,
}
46
/// Write-side state shared between `MessagesStore` and its writer thread.
struct WriteState {
    /// The read-write SQLite connection, serialised by this mutex.
    conn: Mutex<Connection>,
    /// Cached row count of the `messages` table, adjusted on insert, prune and clear.
    message_count_cache: AtomicU64,
    /// Total nanoseconds spent waiting for the write mutex.
    write_lock_wait_ns_total: AtomicU64,
    /// Number of operations that acquired the write mutex.
    write_ops_total: AtomicU64,
}
53
/// Commands processed one at a time by the writer thread; each carries its own reply channel.
enum OutboundWriteCommand {
    /// Insert `record`, or overwrite the existing row with the same id.
    InsertMessage {
        /// The message to persist.
        record: MessageRecord,
        /// Receives the outcome of the write.
        reply: mpsc::Sender<rusqlite::Result<()>>,
    },
    /// Apply `candidate_status` unless the stored status is already terminal.
    ResolveReceiptStatus {
        /// Id of the message whose status is being resolved.
        message_id: String,
        /// Status to store when the current one is not terminal.
        candidate_status: String,
        /// Receives the status that is in effect after the operation.
        reply: mpsc::Sender<rusqlite::Result<Option<String>>>,
    },
}
65
/// Point-in-time copy of the store's lock-contention counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessagesStoreContentionSnapshot {
    /// Total nanoseconds spent waiting for the read mutex.
    pub read_lock_wait_ns_total: u64,
    /// Number of operations that went through the read connection.
    pub read_ops_total: u64,
    /// Total nanoseconds spent waiting for the write mutex.
    pub write_lock_wait_ns_total: u64,
    /// Number of operations that acquired the write mutex.
    pub write_ops_total: u64,
}
73
/// Size summary of the `messages` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageStorageStats {
    /// Number of messages, taken from the store's cached count.
    pub count: u64,
    /// Approximate payload size: the sum of SQL `LENGTH()` over the text columns.
    pub bytes: u64,
}
79
/// Per-peer message counters computed from the `messages` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PeerMessageStats {
    /// Messages with `direction = 'out'` whose destination is the peer.
    pub outgoing: u64,
    /// Messages with `direction = 'in'` whose source is the peer.
    pub incoming: u64,
    /// Outgoing messages to the peer whose receipt status is NULL, blank, or neither
    /// `sent…` nor one of cancelled / delivered / failed / expired / rejected.
    pub offered: u64,
    /// Incoming messages from the peer whose receipt status is NULL.
    pub unhandled: u64,
}
87
88impl MessagesStore {
    /// Key string `"sdk_domains.v1"`. Its consumers are not visible in this part of the file
    /// (presumably it identifies a persisted SDK domain snapshot — confirm at the use site).
    const SDK_DOMAIN_SNAPSHOT_KEY: &'static str = "sdk_domains.v1";
90
91    fn is_terminal_receipt_status(status: &str) -> bool {
92        let normalized = status.trim().to_ascii_lowercase();
93        normalized.starts_with("failed")
94            || matches!(normalized.as_str(), "cancelled" | "delivered" | "expired" | "rejected")
95    }
96
97    pub fn in_memory() -> rusqlite::Result<Self> {
98        let conn = Connection::open_in_memory()?;
99        let write_state = Arc::new(WriteState {
100            conn: Mutex::new(conn),
101            message_count_cache: AtomicU64::new(0),
102            write_lock_wait_ns_total: AtomicU64::new(0),
103            write_ops_total: AtomicU64::new(0),
104        });
105        let (outbound_write_tx, outbound_write_rx) = mpsc::channel();
106        let store = Self {
107            write_state: write_state.clone(),
108            outbound_write_tx,
109            read_conn: None,
110            read_lock_wait_ns_total: AtomicU64::new(0),
111            read_ops_total: AtomicU64::new(0),
112        };
113        store.configure_connection()?;
114        store.init_schema()?;
115        store.refresh_message_count_cache()?;
116        Self::spawn_outbound_write_worker(write_state, outbound_write_rx);
117        Ok(store)
118    }
119
120    pub fn open(path: &std::path::Path) -> rusqlite::Result<Self> {
121        let write_conn = Connection::open(path)?;
122        let read_conn = Connection::open_with_flags(
123            path,
124            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI,
125        )?;
126        let write_state = Arc::new(WriteState {
127            conn: Mutex::new(write_conn),
128            message_count_cache: AtomicU64::new(0),
129            write_lock_wait_ns_total: AtomicU64::new(0),
130            write_ops_total: AtomicU64::new(0),
131        });
132        let (outbound_write_tx, outbound_write_rx) = mpsc::channel();
133        let store = Self {
134            write_state: write_state.clone(),
135            outbound_write_tx,
136            read_conn: Some(Mutex::new(read_conn)),
137            read_lock_wait_ns_total: AtomicU64::new(0),
138            read_ops_total: AtomicU64::new(0),
139        };
140        store.configure_connection()?;
141        store.init_schema()?;
142        store.refresh_message_count_cache()?;
143        Self::spawn_outbound_write_worker(write_state, outbound_write_rx);
144        Ok(store)
145    }
146
147    fn refresh_message_count_cache(&self) -> rusqlite::Result<()> {
148        let count: i64 = self.with_read_conn(|conn| {
149            conn.query_row("SELECT COUNT(*) FROM messages", [], |row| row.get(0))
150        })?;
151        self.write_state.message_count_cache.store(count.max(0) as u64, Ordering::Relaxed);
152        Ok(())
153    }
154
155    fn spawn_outbound_write_worker(
156        write_state: Arc<WriteState>,
157        rx: mpsc::Receiver<OutboundWriteCommand>,
158    ) {
159        std::thread::Builder::new()
160            .name("messages-outbound-writer".to_string())
161            .spawn(move || {
162                while let Ok(command) = rx.recv() {
163                    match command {
164                        OutboundWriteCommand::InsertMessage { record, reply } => {
165                            let _ = reply
166                                .send(Self::insert_message_direct(write_state.as_ref(), &record));
167                        }
168                        OutboundWriteCommand::ResolveReceiptStatus {
169                            message_id,
170                            candidate_status,
171                            reply,
172                        } => {
173                            let _ = reply.send(Self::resolve_receipt_status_direct(
174                                write_state.as_ref(),
175                                message_id.as_str(),
176                                candidate_status.as_str(),
177                            ));
178                        }
179                    }
180                }
181            })
182            .expect("spawn messages outbound writer");
183    }
184
185    fn with_write_conn<T>(
186        &self,
187        f: impl FnOnce(&Connection) -> rusqlite::Result<T>,
188    ) -> rusqlite::Result<T> {
189        let started = std::time::Instant::now();
190        let conn = self.write_state.conn.lock().expect("messages sqlite write mutex poisoned");
191        let waited_ns = started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
192        self.write_state.write_lock_wait_ns_total.fetch_add(waited_ns, Ordering::Relaxed);
193        self.write_state.write_ops_total.fetch_add(1, Ordering::Relaxed);
194        f(&conn)
195    }
196
197    fn with_read_conn<T>(
198        &self,
199        f: impl FnOnce(&Connection) -> rusqlite::Result<T>,
200    ) -> rusqlite::Result<T> {
201        if let Some(conn) = &self.read_conn {
202            let started = std::time::Instant::now();
203            let conn = conn.lock().expect("messages sqlite read mutex poisoned");
204            let waited_ns = started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
205            self.read_lock_wait_ns_total.fetch_add(waited_ns, Ordering::Relaxed);
206            self.read_ops_total.fetch_add(1, Ordering::Relaxed);
207            f(&conn)
208        } else {
209            self.with_write_conn(f)
210        }
211    }
212
213    pub fn contention_snapshot(&self) -> MessagesStoreContentionSnapshot {
214        MessagesStoreContentionSnapshot {
215            read_lock_wait_ns_total: self.read_lock_wait_ns_total.load(Ordering::Relaxed),
216            read_ops_total: self.read_ops_total.load(Ordering::Relaxed),
217            write_lock_wait_ns_total: self
218                .write_state
219                .write_lock_wait_ns_total
220                .load(Ordering::Relaxed),
221            write_ops_total: self.write_state.write_ops_total.load(Ordering::Relaxed),
222        }
223    }
224
225    fn write_lock_and_run<T>(
226        write_state: &WriteState,
227        f: impl FnOnce(&Connection) -> rusqlite::Result<T>,
228    ) -> rusqlite::Result<T> {
229        let started = std::time::Instant::now();
230        let conn = write_state.conn.lock().expect("messages sqlite write mutex poisoned");
231        let waited_ns = started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
232        write_state.write_lock_wait_ns_total.fetch_add(waited_ns, Ordering::Relaxed);
233        write_state.write_ops_total.fetch_add(1, Ordering::Relaxed);
234        f(&conn)
235    }
236
237    fn insert_message_direct(
238        write_state: &WriteState,
239        record: &MessageRecord,
240    ) -> rusqlite::Result<()> {
241        let fields_json =
242            record.fields.as_ref().map(|value| serde_json::to_string(value).unwrap_or_default());
243        Self::write_lock_and_run(write_state, |conn| {
244            let inserted = conn.execute(
245                "INSERT INTO messages (id, source, destination, title, content, timestamp, direction, fields, receipt_status)
246                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
247                 ON CONFLICT(id) DO NOTHING",
248                params![
249                    &record.id,
250                    &record.source,
251                    &record.destination,
252                    &record.title,
253                    &record.content,
254                    record.timestamp,
255                    &record.direction,
256                    fields_json,
257                    &record.receipt_status,
258                ],
259            )?;
260            if inserted == 0 {
261                conn.execute(
262                    "UPDATE messages
263                     SET source = ?2,
264                         destination = ?3,
265                         title = ?4,
266                         content = ?5,
267                         timestamp = ?6,
268                         direction = ?7,
269                         fields = ?8,
270                         receipt_status = ?9
271                     WHERE id = ?1",
272                    params![
273                        &record.id,
274                        &record.source,
275                        &record.destination,
276                        &record.title,
277                        &record.content,
278                        record.timestamp,
279                        &record.direction,
280                        fields_json,
281                        &record.receipt_status,
282                    ],
283                )?;
284            } else {
285                write_state.message_count_cache.fetch_add(1, Ordering::Relaxed);
286            }
287            Ok(())
288        })
289    }
290
291    fn resolve_receipt_status_direct(
292        write_state: &WriteState,
293        message_id: &str,
294        candidate_status: &str,
295    ) -> rusqlite::Result<Option<String>> {
296        Self::write_lock_and_run(write_state, |conn| {
297            let existing_status = conn
298                .query_row(
299                    "SELECT receipt_status FROM messages WHERE id = ?1 LIMIT 1",
300                    params![message_id],
301                    |row| row.get::<_, Option<String>>(0),
302                )
303                .optional()?
304                .flatten();
305            if let Some(existing_status) = existing_status {
306                if Self::is_terminal_receipt_status(existing_status.as_str()) {
307                    return Ok(Some(existing_status));
308                }
309            }
310            conn.execute(
311                "UPDATE messages SET receipt_status = ?1 WHERE id = ?2",
312                params![candidate_status, message_id],
313            )?;
314            Ok(Some(candidate_status.to_string()))
315        })
316    }
317
318    pub fn insert_message(&self, record: &MessageRecord) -> rusqlite::Result<()> {
319        let (reply_tx, reply_rx) = mpsc::channel();
320        self.outbound_write_tx
321            .send(OutboundWriteCommand::InsertMessage { record: record.clone(), reply: reply_tx })
322            .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?;
323        reply_rx.recv().map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?
324    }
325
326    pub fn list_messages(
327        &self,
328        limit: usize,
329        before_ts: Option<i64>,
330    ) -> rusqlite::Result<Vec<MessageRecord>> {
331        self.with_read_conn(|conn| {
332            let mut records = Vec::new();
333            if let Some(ts) = before_ts {
334                let mut stmt = conn.prepare(
335                    "SELECT id, source, destination, title, content, timestamp, direction, fields, receipt_status FROM messages WHERE timestamp < ?1 ORDER BY timestamp DESC LIMIT ?2",
336                )?;
337                let mut rows = stmt.query(params![ts, limit as i64])?;
338                while let Some(row) = rows.next()? {
339                    let fields_json: Option<String> = row.get(7)?;
340                    let fields =
341                        fields_json.as_ref().and_then(|value| serde_json::from_str(value).ok());
342                    let receipt_status: Option<String> = row.get(8)?;
343                    records.push(MessageRecord {
344                        id: row.get(0)?,
345                        source: row.get(1)?,
346                        destination: row.get(2)?,
347                        title: row.get(3)?,
348                        content: row.get(4)?,
349                        timestamp: row.get(5)?,
350                        direction: row.get(6)?,
351                        fields,
352                        receipt_status,
353                    });
354                }
355            } else {
356                let mut stmt = conn.prepare(
357                    "SELECT id, source, destination, title, content, timestamp, direction, fields, receipt_status FROM messages ORDER BY timestamp DESC LIMIT ?1",
358                )?;
359                let mut rows = stmt.query(params![limit as i64])?;
360                while let Some(row) = rows.next()? {
361                    let fields_json: Option<String> = row.get(7)?;
362                    let fields =
363                        fields_json.as_ref().and_then(|value| serde_json::from_str(value).ok());
364                    let receipt_status: Option<String> = row.get(8)?;
365                    records.push(MessageRecord {
366                        id: row.get(0)?,
367                        source: row.get(1)?,
368                        destination: row.get(2)?,
369                        title: row.get(3)?,
370                        content: row.get(4)?,
371                        timestamp: row.get(5)?,
372                        direction: row.get(6)?,
373                        fields,
374                        receipt_status,
375                    });
376                }
377            }
378            Ok(records)
379        })
380    }
381
382    pub fn get_message(&self, message_id: &str) -> rusqlite::Result<Option<MessageRecord>> {
383        self.with_read_conn(|conn| {
384            let mut stmt = conn.prepare(
385                "SELECT id, source, destination, title, content, timestamp, direction, fields, receipt_status FROM messages WHERE id = ?1 LIMIT 1",
386            )?;
387            stmt.query_row(params![message_id], |row| {
388                let fields_json: Option<String> = row.get(7)?;
389                let fields =
390                    fields_json.as_ref().and_then(|value| serde_json::from_str(value).ok());
391                let receipt_status: Option<String> = row.get(8)?;
392                Ok(MessageRecord {
393                    id: row.get(0)?,
394                    source: row.get(1)?,
395                    destination: row.get(2)?,
396                    title: row.get(3)?,
397                    content: row.get(4)?,
398                    timestamp: row.get(5)?,
399                    direction: row.get(6)?,
400                    fields,
401                    receipt_status,
402                })
403            })
404            .optional()
405        })
406    }
407
408    pub fn message_count(&self) -> rusqlite::Result<u64> {
409        Ok(self.write_state.message_count_cache.load(Ordering::Relaxed))
410    }
411
412    pub fn message_storage_stats(&self) -> rusqlite::Result<MessageStorageStats> {
413        self.with_read_conn(|conn| {
414            let count = self.write_state.message_count_cache.load(Ordering::Relaxed);
415            let bytes: Option<i64> = conn.query_row(
416                "SELECT COALESCE(SUM(
417                    LENGTH(id) +
418                    LENGTH(source) +
419                    LENGTH(destination) +
420                    LENGTH(title) +
421                    LENGTH(content) +
422                    LENGTH(direction) +
423                    COALESCE(LENGTH(fields), 0) +
424                    COALESCE(LENGTH(receipt_status), 0)
425                ), 0) FROM messages",
426                [],
427                |row| row.get(0),
428            )?;
429            Ok(MessageStorageStats { count, bytes: bytes.unwrap_or(0).max(0) as u64 })
430        })
431    }
432
433    pub fn peer_message_stats(&self, peer: &str) -> rusqlite::Result<PeerMessageStats> {
434        self.with_read_conn(|conn| {
435            let (outgoing, incoming, offered, unhandled): (i64, i64, i64, i64) = conn.query_row(
436                "SELECT
437                    COALESCE(SUM(CASE WHEN destination = ?1 AND direction = 'out' THEN 1 ELSE 0 END), 0),
438                    COALESCE(SUM(CASE WHEN source = ?1 AND direction = 'in' THEN 1 ELSE 0 END), 0),
439                    COALESCE(SUM(CASE
440                        WHEN destination = ?1
441                         AND direction = 'out'
442                         AND (
443                            receipt_status IS NULL
444                            OR TRIM(receipt_status) = ''
445                            OR (
446                                LOWER(receipt_status) NOT LIKE 'sent%'
447                                AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
448                            )
449                         )
450                        THEN 1
451                        ELSE 0
452                    END), 0),
453                    COALESCE(SUM(CASE WHEN source = ?1 AND direction = 'in' AND receipt_status IS NULL THEN 1 ELSE 0 END), 0)
454                 FROM messages",
455                params![peer],
456                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
457            )?;
458            Ok(PeerMessageStats {
459                outgoing: outgoing.max(0) as u64,
460                incoming: incoming.max(0) as u64,
461                offered: offered.max(0) as u64,
462                unhandled: unhandled.max(0) as u64,
463            })
464        })
465    }
466
467    pub fn count_message_buckets(&self) -> rusqlite::Result<(u64, u64)> {
468        let (queued, in_flight): (i64, i64) = self.with_read_conn(|conn| {
469            let mut stmt = conn.prepare(
470                "SELECT
471                    COALESCE(SUM(CASE
472                        WHEN receipt_status IS NULL OR TRIM(receipt_status) = '' THEN 1
473                        ELSE 0
474                    END), 0) AS queued_count,
475                    COALESCE(SUM(CASE
476                        WHEN receipt_status IS NOT NULL
477                            AND TRIM(receipt_status) <> ''
478                            AND LOWER(receipt_status) NOT LIKE 'sent%'
479                            AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
480                        THEN 1
481                        ELSE 0
482                    END), 0) AS in_flight_count
483                 FROM messages",
484            )?;
485            stmt.query_row([], |row| Ok((row.get(0)?, row.get(1)?)))
486        })?;
487        Ok((queued.max(0) as u64, in_flight.max(0) as u64))
488    }
489
490    pub fn count_outbound_messages(&self) -> rusqlite::Result<u64> {
491        let count: i64 = self.with_read_conn(|conn| {
492            conn.query_row("SELECT COUNT(*) FROM messages WHERE direction = 'out'", [], |row| {
493                row.get(0)
494            })
495        })?;
496        Ok(count.max(0) as u64)
497    }
498
499    pub fn expire_outbound_messages_before(&self, cutoff_ts: i64) -> rusqlite::Result<Vec<String>> {
500        self.with_write_conn(|conn| {
501            let mut stmt = conn.prepare(
502                "SELECT id
503                 FROM messages
504                 WHERE direction = 'out'
505                   AND timestamp < ?1
506                   AND (
507                        receipt_status IS NULL
508                        OR TRIM(receipt_status) = ''
509                        OR (
510                            LOWER(receipt_status) NOT LIKE 'sent%'
511                            AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
512                        )
513                   )
514                 ORDER BY timestamp ASC, id ASC",
515            )?;
516            let mut rows = stmt.query(params![cutoff_ts])?;
517            let mut ids = Vec::new();
518            while let Some(row) = rows.next()? {
519                ids.push(row.get::<_, String>(0)?);
520            }
521            drop(rows);
522            drop(stmt);
523            for message_id in ids.iter() {
524                conn.execute(
525                    "UPDATE messages SET receipt_status = 'expired' WHERE id = ?1",
526                    params![message_id],
527                )?;
528            }
529            Ok(ids)
530        })
531    }
532
533    pub fn prune_outbound_messages(
534        &self,
535        count: usize,
536        eviction_priority: &str,
537    ) -> rusqlite::Result<Vec<String>> {
538        if count == 0 {
539            return Ok(Vec::new());
540        }
541        self.with_write_conn(|conn| {
542            let collect_ids = |query: &str, remaining: usize| -> rusqlite::Result<Vec<String>> {
543                if remaining == 0 {
544                    return Ok(Vec::new());
545                }
546                let mut stmt = conn.prepare(query)?;
547                let mut rows = stmt.query(params![remaining as i64])?;
548                let mut ids = Vec::new();
549                while let Some(row) = rows.next()? {
550                    ids.push(row.get::<_, String>(0)?);
551                }
552                Ok(ids)
553            };
554
555            let normalized = eviction_priority.trim().to_ascii_lowercase();
556            let mut ids = if normalized == "terminal_first" {
557                let mut selected = collect_ids(
558                    "SELECT id
559                     FROM messages
560                     WHERE direction = 'out'
561                       AND receipt_status IS NOT NULL
562                       AND TRIM(receipt_status) <> ''
563                       AND (
564                            LOWER(receipt_status) LIKE 'sent%'
565                            OR LOWER(receipt_status) IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
566                       )
567                     ORDER BY timestamp ASC, id ASC
568                     LIMIT ?1",
569                    count,
570                )?;
571                let remaining = count.saturating_sub(selected.len());
572                if remaining > 0 {
573                    let mut non_terminal = collect_ids(
574                        "SELECT id
575                         FROM messages
576                         WHERE direction = 'out'
577                           AND (
578                                receipt_status IS NULL
579                                OR TRIM(receipt_status) = ''
580                                OR (
581                                    LOWER(receipt_status) NOT LIKE 'sent%'
582                                    AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
583                                )
584                           )
585                         ORDER BY timestamp ASC, id ASC
586                         LIMIT ?1",
587                        remaining,
588                    )?;
589                    selected.append(&mut non_terminal);
590                }
591                selected
592            } else {
593                collect_ids(
594                    "SELECT id
595                     FROM messages
596                     WHERE direction = 'out'
597                     ORDER BY timestamp ASC, id ASC
598                     LIMIT ?1",
599                    count,
600                )?
601            };
602
603            ids.sort();
604            ids.dedup();
605            for message_id in ids.iter() {
606                conn.execute("DELETE FROM messages WHERE id = ?1", params![message_id])?;
607            }
608            if !ids.is_empty() {
609                self.write_state
610                    .message_count_cache
611                    .fetch_sub(ids.len().min(u64::MAX as usize) as u64, Ordering::Relaxed);
612            }
613            Ok(ids)
614        })
615    }
616
617    pub fn prune_messages_to_limit_bytes(&self, limit_bytes: u64) -> rusqlite::Result<Vec<String>> {
618        self.with_write_conn(|conn| {
619            let current_bytes: i64 = conn.query_row(
620                "SELECT COALESCE(SUM(
621                    LENGTH(id) +
622                    LENGTH(source) +
623                    LENGTH(destination) +
624                    LENGTH(title) +
625                    LENGTH(content) +
626                    LENGTH(direction) +
627                    COALESCE(LENGTH(fields), 0) +
628                    COALESCE(LENGTH(receipt_status), 0)
629                ), 0) FROM messages",
630                [],
631                |row| row.get(0),
632            )?;
633            if current_bytes.max(0) as u64 <= limit_bytes {
634                return Ok(Vec::new());
635            }
636
637            let mut stmt = conn.prepare(
638                "SELECT id,
639                        LENGTH(id) +
640                        LENGTH(source) +
641                        LENGTH(destination) +
642                        LENGTH(title) +
643                        LENGTH(content) +
644                        LENGTH(direction) +
645                        COALESCE(LENGTH(fields), 0) +
646                        COALESCE(LENGTH(receipt_status), 0) AS approx_bytes
647                 FROM messages
648                 ORDER BY timestamp ASC, id ASC",
649            )?;
650            let mut rows = stmt.query([])?;
651            let mut bytes = current_bytes.max(0) as u64;
652            let mut ids = Vec::new();
653            while let Some(row) = rows.next()? {
654                if bytes <= limit_bytes {
655                    break;
656                }
657                let id: String = row.get(0)?;
658                let approx_bytes: i64 = row.get(1)?;
659                ids.push(id);
660                bytes = bytes.saturating_sub(approx_bytes.max(0) as u64);
661            }
662            drop(rows);
663            drop(stmt);
664
665            for message_id in ids.iter() {
666                conn.execute("DELETE FROM messages WHERE id = ?1", params![message_id])?;
667            }
668            if !ids.is_empty() {
669                self.write_state
670                    .message_count_cache
671                    .fetch_sub(ids.len().min(u64::MAX as usize) as u64, Ordering::Relaxed);
672            }
673            Ok(ids)
674        })
675    }
676
677    pub fn update_receipt_status(&self, message_id: &str, status: &str) -> rusqlite::Result<()> {
678        self.with_write_conn(|conn| {
679            conn.execute(
680                "UPDATE messages SET receipt_status = ?1 WHERE id = ?2",
681                params![status, message_id],
682            )?;
683            Ok(())
684        })
685    }
686
687    pub fn resolve_receipt_status(
688        &self,
689        message_id: &str,
690        candidate_status: &str,
691    ) -> rusqlite::Result<Option<String>> {
692        let (reply_tx, reply_rx) = mpsc::channel();
693        self.outbound_write_tx
694            .send(OutboundWriteCommand::ResolveReceiptStatus {
695                message_id: message_id.to_string(),
696                candidate_status: candidate_status.to_string(),
697                reply: reply_tx,
698            })
699            .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?;
700        reply_rx.recv().map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?
701    }
702
703    pub fn clear_messages(&self) -> rusqlite::Result<()> {
704        self.with_write_conn(|conn| {
705            conn.execute("DELETE FROM messages", [])?;
706            self.write_state.message_count_cache.store(0, Ordering::Relaxed);
707            Ok(())
708        })
709    }
710
711    pub fn insert_announce(&self, record: &AnnounceRecord) -> rusqlite::Result<()> {
712        let capabilities_json = serde_json::to_string(&record.capabilities).unwrap_or_default();
713        self.with_write_conn(|conn| {
714            conn.execute(
715                "INSERT OR REPLACE INTO announces (id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
716                params![
717                    &record.id,
718                    &record.peer,
719                    record.timestamp,
720                    &record.name,
721                    &record.name_source,
722                    record.first_seen,
723                    record.seen_count as i64,
724                    &record.app_data_hex,
725                    capabilities_json,
726                    record.rssi,
727                    record.snr,
728                    record.q,
729                    record.stamp_cost,
730                    record.stamp_cost_flexibility,
731                    record.peering_cost,
732                ],
733            )?;
734            Ok(())
735        })
736    }
737
738    pub fn list_announces(
739        &self,
740        limit: usize,
741        before_ts: Option<i64>,
742        before_id: Option<&str>,
743    ) -> rusqlite::Result<Vec<AnnounceRecord>> {
744        self.with_read_conn(|conn| {
745            let mut records = Vec::new();
746            let parse_row = |row: &rusqlite::Row| -> rusqlite::Result<AnnounceRecord> {
747                let capabilities_json: Option<String> = row.get(8)?;
748                let capabilities = capabilities_json
749                    .as_deref()
750                    .and_then(|value| serde_json::from_str::<Vec<String>>(value).ok())
751                    .unwrap_or_default();
752                let seen_count: i64 = row.get(6)?;
753                Ok(AnnounceRecord {
754                    id: row.get(0)?,
755                    peer: row.get(1)?,
756                    timestamp: row.get(2)?,
757                    name: row.get(3)?,
758                    name_source: row.get(4)?,
759                    first_seen: row.get(5)?,
760                    seen_count: seen_count.max(0) as u64,
761                    app_data_hex: row.get(7)?,
762                    capabilities,
763                    rssi: row.get(9)?,
764                    snr: row.get(10)?,
765                    q: row.get(11)?,
766                    stamp_cost: row.get(12)?,
767                    stamp_cost_flexibility: row.get(13)?,
768                    peering_cost: row.get(14)?,
769                })
770            };
771            if let Some(ts) = before_ts {
772                let query_with_id = "SELECT id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost FROM announces WHERE (timestamp < ?1 OR (timestamp = ?1 AND id < ?2)) ORDER BY timestamp DESC, id DESC LIMIT ?3";
773                let query_without_id = "SELECT id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost FROM announces WHERE timestamp < ?1 ORDER BY timestamp DESC, id DESC LIMIT ?2";
774                if let Some(ann_id) = before_id {
775                    let mut stmt = conn.prepare(query_with_id)?;
776                    let mut rows = stmt.query(params![ts, ann_id, limit as i64])?;
777                    while let Some(row) = rows.next()? {
778                        records.push(parse_row(row)?);
779                    }
780                } else {
781                    let mut stmt = conn.prepare(query_without_id)?;
782                    let mut rows = stmt.query(params![ts, limit as i64])?;
783                    while let Some(row) = rows.next()? {
784                        records.push(parse_row(row)?);
785                    }
786                }
787            } else {
788                let mut stmt = conn.prepare(
789                    "SELECT id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost FROM announces ORDER BY timestamp DESC LIMIT ?1",
790                )?;
791                let mut rows = stmt.query(params![limit as i64])?;
792                while let Some(row) = rows.next()? {
793                    records.push(parse_row(row)?);
794                }
795            }
796            Ok(records)
797        })
798    }
799
800    pub fn clear_announces(&self) -> rusqlite::Result<()> {
801        self.with_write_conn(|conn| {
802            conn.execute("DELETE FROM announces", [])?;
803            Ok(())
804        })
805    }
806
807    pub fn put_sdk_domain_snapshot(&self, snapshot: &JsonValue) -> rusqlite::Result<()> {
808        let snapshot_json = serde_json::to_string(snapshot)
809            .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?;
810        self.with_write_conn(|conn| {
811            conn.execute(
812                "INSERT INTO sdk_domain_state (domain, state_json) VALUES (?1, ?2)
813                 ON CONFLICT(domain) DO UPDATE SET state_json = excluded.state_json",
814                params![Self::SDK_DOMAIN_SNAPSHOT_KEY, snapshot_json],
815            )?;
816            Ok(())
817        })
818    }
819
820    pub fn get_sdk_domain_snapshot(&self) -> rusqlite::Result<Option<JsonValue>> {
821        let snapshot_json: Option<String> = self.with_read_conn(|conn| {
822            conn.query_row(
823                "SELECT state_json FROM sdk_domain_state WHERE domain = ?1 LIMIT 1",
824                params![Self::SDK_DOMAIN_SNAPSHOT_KEY],
825                |row| row.get(0),
826            )
827            .optional()
828        })?;
829        let Some(snapshot_json) = snapshot_json else {
830            return Ok(None);
831        };
832        let parsed = serde_json::from_str(snapshot_json.as_str()).map_err(|err| {
833            rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err))
834        })?;
835        Ok(Some(parsed))
836    }
837
838    pub fn clear_sdk_domain_snapshot(&self) -> rusqlite::Result<()> {
839        self.with_write_conn(|conn| {
840            conn.execute(
841                "DELETE FROM sdk_domain_state WHERE domain = ?1",
842                params![Self::SDK_DOMAIN_SNAPSHOT_KEY],
843            )?;
844            Ok(())
845        })
846    }
847
848    fn configure_connection(&self) -> rusqlite::Result<()> {
849        self.with_write_conn(|conn| {
850            conn.pragma_update(None, "journal_mode", "WAL")?;
851            conn.pragma_update(None, "synchronous", "NORMAL")?;
852            conn.pragma_update(None, "busy_timeout", 5_000i64)?;
853            Ok(())
854        })?;
855        if self.read_conn.is_some() {
856            self.with_read_conn(|conn| {
857                conn.pragma_update(None, "busy_timeout", 5_000i64)?;
858                Ok(())
859            })?;
860        }
861        Ok(())
862    }
863
864    #[cfg(test)]
865    fn busy_timeout_ms(&self) -> rusqlite::Result<i64> {
866        self.with_write_conn(|conn| conn.query_row("PRAGMA busy_timeout", [], |row| row.get(0)))
867    }
868
869    fn init_schema(&self) -> rusqlite::Result<()> {
870        self.with_write_conn(|conn| {
871            conn.execute_batch(
872                "CREATE TABLE IF NOT EXISTS messages (
873                    id TEXT PRIMARY KEY,
874                    source TEXT NOT NULL,
875                    destination TEXT NOT NULL,
876                    title TEXT NOT NULL,
877                    content TEXT NOT NULL,
878                    timestamp INTEGER NOT NULL,
879                    direction TEXT NOT NULL,
880                    fields TEXT,
881                    receipt_status TEXT
882                );
883                CREATE TABLE IF NOT EXISTS announces (
884                    id TEXT PRIMARY KEY,
885                    peer TEXT NOT NULL,
886                    timestamp INTEGER NOT NULL,
887                    name TEXT,
888                    name_source TEXT,
889                    first_seen INTEGER NOT NULL,
890                    seen_count INTEGER NOT NULL,
891                    app_data_hex TEXT,
892                    capabilities TEXT,
893                    rssi REAL,
894                    snr REAL,
895                    q REAL,
896                    stamp_cost INTEGER,
897                    stamp_cost_flexibility INTEGER,
898                    peering_cost INTEGER
899                );
900                CREATE TABLE IF NOT EXISTS sdk_domain_state (
901                    domain TEXT PRIMARY KEY,
902                    state_json TEXT NOT NULL
903                );
904                CREATE INDEX IF NOT EXISTS idx_messages_timestamp_desc
905                    ON messages(timestamp DESC);
906                CREATE INDEX IF NOT EXISTS idx_messages_direction_timestamp_desc
907                    ON messages(direction, timestamp DESC);
908                CREATE INDEX IF NOT EXISTS idx_messages_receipt_status
909                    ON messages(receipt_status);
910                CREATE INDEX IF NOT EXISTS idx_announces_timestamp_id_desc
911                    ON announces(timestamp DESC, id DESC);",
912            )?;
913            let _ = conn.execute("ALTER TABLE messages ADD COLUMN title TEXT", []);
914            let _ = conn.execute("UPDATE messages SET title = '' WHERE title IS NULL", []);
915            let _ = conn.execute("ALTER TABLE messages ADD COLUMN fields TEXT", []);
916            let _ = conn.execute("ALTER TABLE messages ADD COLUMN receipt_status TEXT", []);
917            let _ = conn.execute("ALTER TABLE announces ADD COLUMN name TEXT", []);
918            let _ = conn.execute("ALTER TABLE announces ADD COLUMN name_source TEXT", []);
919            let _ = conn.execute("ALTER TABLE announces ADD COLUMN first_seen INTEGER", []);
920            let _ = conn.execute("ALTER TABLE announces ADD COLUMN seen_count INTEGER", []);
921            let _ = conn.execute("ALTER TABLE announces ADD COLUMN app_data_hex TEXT", []);
922            let _ = conn.execute("ALTER TABLE announces ADD COLUMN capabilities TEXT", []);
923            let _ = conn.execute("ALTER TABLE announces ADD COLUMN rssi REAL", []);
924            let _ = conn.execute("ALTER TABLE announces ADD COLUMN snr REAL", []);
925            let _ = conn.execute("ALTER TABLE announces ADD COLUMN q REAL", []);
926            let _ = conn.execute("ALTER TABLE announces ADD COLUMN stamp_cost INTEGER", []);
927            let _ =
928                conn.execute("ALTER TABLE announces ADD COLUMN stamp_cost_flexibility INTEGER", []);
929            let _ = conn.execute("ALTER TABLE announces ADD COLUMN peering_cost INTEGER", []);
930            Ok(())
931        })
932    }
933}
934
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Opens a fresh in-memory store for a single test.
    fn new_store() -> MessagesStore {
        MessagesStore::in_memory().expect("in-memory store")
    }

    /// Builds an outbound message fixture with fixed source/destination/body.
    fn outbound_message(id: &str, timestamp: i64, receipt_status: Option<&str>) -> MessageRecord {
        let receipt_status = receipt_status.map(str::to_owned);
        MessageRecord {
            id: id.to_owned(),
            source: String::from("src"),
            destination: String::from("dst"),
            title: String::from("title"),
            content: String::from("body"),
            timestamp,
            direction: String::from("out"),
            fields: None,
            receipt_status,
        }
    }

    /// Reads back the persisted receipt status of an existing message.
    fn stored_receipt_status(store: &MessagesStore, id: &str) -> Option<String> {
        store.get_message(id).expect("load message").expect("message exists").receipt_status
    }

    #[test]
    fn sdk_domain_snapshot_roundtrip() {
        let store = new_store();
        assert!(
            store.get_sdk_domain_snapshot().expect("query snapshot").is_none(),
            "snapshot should be absent before first write"
        );

        let expected = json!({
            "topics": [{ "topic_id": "topic-1" }],
            "attachments": [],
            "markers": [],
        });
        store.put_sdk_domain_snapshot(&expected).expect("persist snapshot");

        let roundtripped = store.get_sdk_domain_snapshot().expect("load snapshot");
        assert_eq!(roundtripped, Some(expected));
    }

    #[test]
    fn sdk_domain_snapshot_clear_removes_record() {
        let store = new_store();
        let snapshot = json!({ "voice_sessions": [{ "session_id": "voice-1" }] });
        store.put_sdk_domain_snapshot(&snapshot).expect("persist snapshot");

        store.clear_sdk_domain_snapshot().expect("clear snapshot");

        let after_clear = store.get_sdk_domain_snapshot().expect("load snapshot");
        assert!(after_clear.is_none(), "snapshot should be removed after clear");
    }

    #[test]
    fn message_count_uses_direct_count() {
        let store = new_store();
        let first = outbound_message("msg-1", 1, None);
        let second = outbound_message("msg-2", 2, Some("delivered"));
        store.insert_message(&first).expect("insert msg-1");
        store.insert_message(&second).expect("insert msg-2");

        let count = store.message_count().expect("count messages");
        assert_eq!(count, 2);
    }

    #[test]
    fn message_count_cache_ignores_replace_for_existing_id() {
        let store = new_store();
        let original = outbound_message("msg-1", 1, None);
        let replacement = outbound_message("msg-1", 2, Some("delivered"));
        store.insert_message(&original).expect("insert original");
        store.insert_message(&replacement).expect("replace existing");

        let count = store.message_count().expect("count messages");
        assert_eq!(count, 1);
    }

    #[test]
    fn configure_connection_sets_busy_timeout() {
        let store = new_store();
        assert_eq!(store.busy_timeout_ms().expect("query busy_timeout"), 5_000);
    }

    #[test]
    fn expire_outbound_messages_marks_non_terminal_records() {
        let store = new_store();
        let pending = outbound_message("out-non-terminal", 10, None);
        let delivered = outbound_message("out-terminal", 10, Some("delivered"));
        store.insert_message(&pending).expect("insert non-terminal");
        store.insert_message(&delivered).expect("insert terminal");

        let expired = store.expire_outbound_messages_before(11).expect("expire outbound");
        assert_eq!(expired, vec!["out-non-terminal".to_string()]);

        let non_terminal = store
            .get_message("out-non-terminal")
            .expect("load non-terminal")
            .expect("non-terminal exists");
        assert_eq!(non_terminal.receipt_status.as_deref(), Some("expired"));

        let terminal =
            store.get_message("out-terminal").expect("load terminal").expect("terminal exists");
        assert_eq!(terminal.receipt_status.as_deref(), Some("delivered"));
    }

    #[test]
    fn prune_outbound_messages_terminal_first_prefers_terminal_records() {
        let store = new_store();
        let terminal_old = outbound_message("msg-terminal-old", 1, Some("sent: direct"));
        let non_terminal = outbound_message("msg-non-terminal", 2, None);
        let terminal_new = outbound_message("msg-terminal-new", 3, Some("delivered"));
        store.insert_message(&terminal_old).expect("insert terminal old");
        store.insert_message(&non_terminal).expect("insert non-terminal");
        store.insert_message(&terminal_new).expect("insert terminal new");

        let pruned = store.prune_outbound_messages(2, "terminal_first").expect("prune outbound");

        assert_eq!(pruned.len(), 2);
        for expected_id in ["msg-terminal-old", "msg-terminal-new"] {
            assert!(pruned.iter().any(|id| id == expected_id));
        }
        let survivor = store.get_message("msg-non-terminal").expect("load non-terminal");
        assert!(
            survivor.is_some(),
            "non-terminal record should remain when terminal records satisfy prune count"
        );
        assert_eq!(store.message_count().expect("count after prune"), 1);
    }

    #[test]
    fn clear_messages_resets_message_count_cache() {
        let store = new_store();
        let first = outbound_message("msg-1", 1, None);
        let second = outbound_message("msg-2", 2, Some("delivered"));
        store.insert_message(&first).expect("insert msg-1");
        store.insert_message(&second).expect("insert msg-2");

        store.clear_messages().expect("clear messages");

        let count = store.message_count().expect("count after clear");
        assert_eq!(count, 0);
    }

    #[test]
    fn prune_messages_to_limit_bytes_removes_oldest_messages() {
        let store = new_store();
        let first = MessageRecord { content: "a".repeat(128), ..outbound_message("msg-1", 1, None) };
        let second =
            MessageRecord { content: "b".repeat(128), ..outbound_message("msg-2", 2, None) };
        store.insert_message(&first).expect("insert first");
        store.insert_message(&second).expect("insert second");

        // Ask for a budget slightly below current usage so pruning is required.
        let before = store.message_storage_stats().expect("stats before");
        let budget = before.bytes.saturating_sub(64);
        let pruned = store.prune_messages_to_limit_bytes(budget).expect("prune");

        assert_eq!(pruned, vec!["msg-1".to_string()]);
        let remaining = store.list_messages(10, None).expect("remaining");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, "msg-2");
    }

    #[test]
    fn peer_message_stats_reports_incoming_and_outgoing_counts() {
        let store = new_store();
        let outbound = MessageRecord {
            destination: "peer-a".to_string(),
            ..outbound_message("msg-out", 1, None)
        };
        let inbound = MessageRecord {
            source: "peer-a".to_string(),
            destination: "local".to_string(),
            direction: "in".to_string(),
            ..outbound_message("msg-in", 2, None)
        };
        store.insert_message(&outbound).expect("insert outbound");
        store.insert_message(&inbound).expect("insert inbound");

        let stats = store.peer_message_stats("peer-a").expect("peer stats");
        assert_eq!(stats.outgoing, 1);
        assert_eq!(stats.incoming, 1);
        assert_eq!(stats.offered, 1);
        assert_eq!(stats.unhandled, 1);
    }

    #[test]
    fn resolve_receipt_status_updates_non_terminal_message() {
        let store = new_store();
        store.insert_message(&outbound_message("msg-1", 1, None)).expect("insert message");

        let resolved =
            store.resolve_receipt_status("msg-1", "sent: direct").expect("resolve status");

        assert_eq!(resolved.as_deref(), Some("sent: direct"));
        assert_eq!(stored_receipt_status(&store, "msg-1").as_deref(), Some("sent: direct"));
    }

    #[test]
    fn resolve_receipt_status_preserves_terminal_status() {
        let store = new_store();
        store
            .insert_message(&outbound_message("msg-1", 1, Some("delivered")))
            .expect("insert delivered message");

        let resolved =
            store.resolve_receipt_status("msg-1", "sent: direct").expect("resolve status");

        assert_eq!(resolved.as_deref(), Some("delivered"));
        assert_eq!(stored_receipt_status(&store, "msg-1").as_deref(), Some("delivered"));
    }
}