1use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
2use serde_json::Value as JsonValue;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{mpsc, Arc, Mutex};
5
6#[derive(Debug, Clone, PartialEq, serde::Serialize)]
7pub struct MessageRecord {
8 pub id: String,
9 pub source: String,
10 pub destination: String,
11 pub title: String,
12 pub content: String,
13 pub timestamp: i64,
14 pub direction: String,
15 pub fields: Option<JsonValue>,
16 pub receipt_status: Option<String>,
17}
18
19#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
20pub struct AnnounceRecord {
21 pub id: String,
22 pub peer: String,
23 pub timestamp: i64,
24 pub name: Option<String>,
25 pub name_source: Option<String>,
26 pub first_seen: i64,
27 pub seen_count: u64,
28 pub app_data_hex: Option<String>,
29 #[serde(default)]
30 pub capabilities: Vec<String>,
31 pub rssi: Option<f64>,
32 pub snr: Option<f64>,
33 pub q: Option<f64>,
34 pub stamp_cost: Option<u32>,
35 pub stamp_cost_flexibility: Option<u32>,
36 pub peering_cost: Option<u32>,
37}
38
39pub struct MessagesStore {
40 write_state: Arc<WriteState>,
41 outbound_write_tx: mpsc::Sender<OutboundWriteCommand>,
42 read_conn: Option<Mutex<Connection>>,
43 read_lock_wait_ns_total: AtomicU64,
44 read_ops_total: AtomicU64,
45}
46
47struct WriteState {
48 conn: Mutex<Connection>,
49 message_count_cache: AtomicU64,
50 write_lock_wait_ns_total: AtomicU64,
51 write_ops_total: AtomicU64,
52}
53
54enum OutboundWriteCommand {
55 InsertMessage {
56 record: MessageRecord,
57 reply: mpsc::Sender<rusqlite::Result<()>>,
58 },
59 ResolveReceiptStatus {
60 message_id: String,
61 candidate_status: String,
62 reply: mpsc::Sender<rusqlite::Result<Option<String>>>,
63 },
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub struct MessagesStoreContentionSnapshot {
68 pub read_lock_wait_ns_total: u64,
69 pub read_ops_total: u64,
70 pub write_lock_wait_ns_total: u64,
71 pub write_ops_total: u64,
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub struct MessageStorageStats {
76 pub count: u64,
77 pub bytes: u64,
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct PeerMessageStats {
82 pub outgoing: u64,
83 pub incoming: u64,
84 pub offered: u64,
85 pub unhandled: u64,
86}
87
88impl MessagesStore {
89 const SDK_DOMAIN_SNAPSHOT_KEY: &'static str = "sdk_domains.v1";
90
91 fn is_terminal_receipt_status(status: &str) -> bool {
92 let normalized = status.trim().to_ascii_lowercase();
93 normalized.starts_with("failed")
94 || matches!(normalized.as_str(), "cancelled" | "delivered" | "expired" | "rejected")
95 }
96
97 pub fn in_memory() -> rusqlite::Result<Self> {
98 let conn = Connection::open_in_memory()?;
99 let write_state = Arc::new(WriteState {
100 conn: Mutex::new(conn),
101 message_count_cache: AtomicU64::new(0),
102 write_lock_wait_ns_total: AtomicU64::new(0),
103 write_ops_total: AtomicU64::new(0),
104 });
105 let (outbound_write_tx, outbound_write_rx) = mpsc::channel();
106 let store = Self {
107 write_state: write_state.clone(),
108 outbound_write_tx,
109 read_conn: None,
110 read_lock_wait_ns_total: AtomicU64::new(0),
111 read_ops_total: AtomicU64::new(0),
112 };
113 store.configure_connection()?;
114 store.init_schema()?;
115 store.refresh_message_count_cache()?;
116 Self::spawn_outbound_write_worker(write_state, outbound_write_rx);
117 Ok(store)
118 }
119
120 pub fn open(path: &std::path::Path) -> rusqlite::Result<Self> {
121 let write_conn = Connection::open(path)?;
122 let read_conn = Connection::open_with_flags(
123 path,
124 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI,
125 )?;
126 let write_state = Arc::new(WriteState {
127 conn: Mutex::new(write_conn),
128 message_count_cache: AtomicU64::new(0),
129 write_lock_wait_ns_total: AtomicU64::new(0),
130 write_ops_total: AtomicU64::new(0),
131 });
132 let (outbound_write_tx, outbound_write_rx) = mpsc::channel();
133 let store = Self {
134 write_state: write_state.clone(),
135 outbound_write_tx,
136 read_conn: Some(Mutex::new(read_conn)),
137 read_lock_wait_ns_total: AtomicU64::new(0),
138 read_ops_total: AtomicU64::new(0),
139 };
140 store.configure_connection()?;
141 store.init_schema()?;
142 store.refresh_message_count_cache()?;
143 Self::spawn_outbound_write_worker(write_state, outbound_write_rx);
144 Ok(store)
145 }
146
147 fn refresh_message_count_cache(&self) -> rusqlite::Result<()> {
148 let count: i64 = self.with_read_conn(|conn| {
149 conn.query_row("SELECT COUNT(*) FROM messages", [], |row| row.get(0))
150 })?;
151 self.write_state.message_count_cache.store(count.max(0) as u64, Ordering::Relaxed);
152 Ok(())
153 }
154
155 fn spawn_outbound_write_worker(
156 write_state: Arc<WriteState>,
157 rx: mpsc::Receiver<OutboundWriteCommand>,
158 ) {
159 std::thread::Builder::new()
160 .name("messages-outbound-writer".to_string())
161 .spawn(move || {
162 while let Ok(command) = rx.recv() {
163 match command {
164 OutboundWriteCommand::InsertMessage { record, reply } => {
165 let _ = reply
166 .send(Self::insert_message_direct(write_state.as_ref(), &record));
167 }
168 OutboundWriteCommand::ResolveReceiptStatus {
169 message_id,
170 candidate_status,
171 reply,
172 } => {
173 let _ = reply.send(Self::resolve_receipt_status_direct(
174 write_state.as_ref(),
175 message_id.as_str(),
176 candidate_status.as_str(),
177 ));
178 }
179 }
180 }
181 })
182 .expect("spawn messages outbound writer");
183 }
184
185 fn with_write_conn<T>(
186 &self,
187 f: impl FnOnce(&Connection) -> rusqlite::Result<T>,
188 ) -> rusqlite::Result<T> {
189 let started = std::time::Instant::now();
190 let conn = self.write_state.conn.lock().expect("messages sqlite write mutex poisoned");
191 let waited_ns = started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
192 self.write_state.write_lock_wait_ns_total.fetch_add(waited_ns, Ordering::Relaxed);
193 self.write_state.write_ops_total.fetch_add(1, Ordering::Relaxed);
194 f(&conn)
195 }
196
197 fn with_read_conn<T>(
198 &self,
199 f: impl FnOnce(&Connection) -> rusqlite::Result<T>,
200 ) -> rusqlite::Result<T> {
201 if let Some(conn) = &self.read_conn {
202 let started = std::time::Instant::now();
203 let conn = conn.lock().expect("messages sqlite read mutex poisoned");
204 let waited_ns = started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
205 self.read_lock_wait_ns_total.fetch_add(waited_ns, Ordering::Relaxed);
206 self.read_ops_total.fetch_add(1, Ordering::Relaxed);
207 f(&conn)
208 } else {
209 self.with_write_conn(f)
210 }
211 }
212
213 pub fn contention_snapshot(&self) -> MessagesStoreContentionSnapshot {
214 MessagesStoreContentionSnapshot {
215 read_lock_wait_ns_total: self.read_lock_wait_ns_total.load(Ordering::Relaxed),
216 read_ops_total: self.read_ops_total.load(Ordering::Relaxed),
217 write_lock_wait_ns_total: self
218 .write_state
219 .write_lock_wait_ns_total
220 .load(Ordering::Relaxed),
221 write_ops_total: self.write_state.write_ops_total.load(Ordering::Relaxed),
222 }
223 }
224
225 fn write_lock_and_run<T>(
226 write_state: &WriteState,
227 f: impl FnOnce(&Connection) -> rusqlite::Result<T>,
228 ) -> rusqlite::Result<T> {
229 let started = std::time::Instant::now();
230 let conn = write_state.conn.lock().expect("messages sqlite write mutex poisoned");
231 let waited_ns = started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64;
232 write_state.write_lock_wait_ns_total.fetch_add(waited_ns, Ordering::Relaxed);
233 write_state.write_ops_total.fetch_add(1, Ordering::Relaxed);
234 f(&conn)
235 }
236
237 fn insert_message_direct(
238 write_state: &WriteState,
239 record: &MessageRecord,
240 ) -> rusqlite::Result<()> {
241 let fields_json =
242 record.fields.as_ref().map(|value| serde_json::to_string(value).unwrap_or_default());
243 Self::write_lock_and_run(write_state, |conn| {
244 let inserted = conn.execute(
245 "INSERT INTO messages (id, source, destination, title, content, timestamp, direction, fields, receipt_status)
246 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
247 ON CONFLICT(id) DO NOTHING",
248 params![
249 &record.id,
250 &record.source,
251 &record.destination,
252 &record.title,
253 &record.content,
254 record.timestamp,
255 &record.direction,
256 fields_json,
257 &record.receipt_status,
258 ],
259 )?;
260 if inserted == 0 {
261 conn.execute(
262 "UPDATE messages
263 SET source = ?2,
264 destination = ?3,
265 title = ?4,
266 content = ?5,
267 timestamp = ?6,
268 direction = ?7,
269 fields = ?8,
270 receipt_status = ?9
271 WHERE id = ?1",
272 params![
273 &record.id,
274 &record.source,
275 &record.destination,
276 &record.title,
277 &record.content,
278 record.timestamp,
279 &record.direction,
280 fields_json,
281 &record.receipt_status,
282 ],
283 )?;
284 } else {
285 write_state.message_count_cache.fetch_add(1, Ordering::Relaxed);
286 }
287 Ok(())
288 })
289 }
290
291 fn resolve_receipt_status_direct(
292 write_state: &WriteState,
293 message_id: &str,
294 candidate_status: &str,
295 ) -> rusqlite::Result<Option<String>> {
296 Self::write_lock_and_run(write_state, |conn| {
297 let existing_status = conn
298 .query_row(
299 "SELECT receipt_status FROM messages WHERE id = ?1 LIMIT 1",
300 params![message_id],
301 |row| row.get::<_, Option<String>>(0),
302 )
303 .optional()?
304 .flatten();
305 if let Some(existing_status) = existing_status {
306 if Self::is_terminal_receipt_status(existing_status.as_str()) {
307 return Ok(Some(existing_status));
308 }
309 }
310 conn.execute(
311 "UPDATE messages SET receipt_status = ?1 WHERE id = ?2",
312 params![candidate_status, message_id],
313 )?;
314 Ok(Some(candidate_status.to_string()))
315 })
316 }
317
318 pub fn insert_message(&self, record: &MessageRecord) -> rusqlite::Result<()> {
319 let (reply_tx, reply_rx) = mpsc::channel();
320 self.outbound_write_tx
321 .send(OutboundWriteCommand::InsertMessage { record: record.clone(), reply: reply_tx })
322 .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?;
323 reply_rx.recv().map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?
324 }
325
326 pub fn list_messages(
327 &self,
328 limit: usize,
329 before_ts: Option<i64>,
330 ) -> rusqlite::Result<Vec<MessageRecord>> {
331 self.with_read_conn(|conn| {
332 let mut records = Vec::new();
333 if let Some(ts) = before_ts {
334 let mut stmt = conn.prepare(
335 "SELECT id, source, destination, title, content, timestamp, direction, fields, receipt_status FROM messages WHERE timestamp < ?1 ORDER BY timestamp DESC LIMIT ?2",
336 )?;
337 let mut rows = stmt.query(params![ts, limit as i64])?;
338 while let Some(row) = rows.next()? {
339 let fields_json: Option<String> = row.get(7)?;
340 let fields =
341 fields_json.as_ref().and_then(|value| serde_json::from_str(value).ok());
342 let receipt_status: Option<String> = row.get(8)?;
343 records.push(MessageRecord {
344 id: row.get(0)?,
345 source: row.get(1)?,
346 destination: row.get(2)?,
347 title: row.get(3)?,
348 content: row.get(4)?,
349 timestamp: row.get(5)?,
350 direction: row.get(6)?,
351 fields,
352 receipt_status,
353 });
354 }
355 } else {
356 let mut stmt = conn.prepare(
357 "SELECT id, source, destination, title, content, timestamp, direction, fields, receipt_status FROM messages ORDER BY timestamp DESC LIMIT ?1",
358 )?;
359 let mut rows = stmt.query(params![limit as i64])?;
360 while let Some(row) = rows.next()? {
361 let fields_json: Option<String> = row.get(7)?;
362 let fields =
363 fields_json.as_ref().and_then(|value| serde_json::from_str(value).ok());
364 let receipt_status: Option<String> = row.get(8)?;
365 records.push(MessageRecord {
366 id: row.get(0)?,
367 source: row.get(1)?,
368 destination: row.get(2)?,
369 title: row.get(3)?,
370 content: row.get(4)?,
371 timestamp: row.get(5)?,
372 direction: row.get(6)?,
373 fields,
374 receipt_status,
375 });
376 }
377 }
378 Ok(records)
379 })
380 }
381
382 pub fn get_message(&self, message_id: &str) -> rusqlite::Result<Option<MessageRecord>> {
383 self.with_read_conn(|conn| {
384 let mut stmt = conn.prepare(
385 "SELECT id, source, destination, title, content, timestamp, direction, fields, receipt_status FROM messages WHERE id = ?1 LIMIT 1",
386 )?;
387 stmt.query_row(params![message_id], |row| {
388 let fields_json: Option<String> = row.get(7)?;
389 let fields =
390 fields_json.as_ref().and_then(|value| serde_json::from_str(value).ok());
391 let receipt_status: Option<String> = row.get(8)?;
392 Ok(MessageRecord {
393 id: row.get(0)?,
394 source: row.get(1)?,
395 destination: row.get(2)?,
396 title: row.get(3)?,
397 content: row.get(4)?,
398 timestamp: row.get(5)?,
399 direction: row.get(6)?,
400 fields,
401 receipt_status,
402 })
403 })
404 .optional()
405 })
406 }
407
408 pub fn message_count(&self) -> rusqlite::Result<u64> {
409 Ok(self.write_state.message_count_cache.load(Ordering::Relaxed))
410 }
411
412 pub fn message_storage_stats(&self) -> rusqlite::Result<MessageStorageStats> {
413 self.with_read_conn(|conn| {
414 let count = self.write_state.message_count_cache.load(Ordering::Relaxed);
415 let bytes: Option<i64> = conn.query_row(
416 "SELECT COALESCE(SUM(
417 LENGTH(id) +
418 LENGTH(source) +
419 LENGTH(destination) +
420 LENGTH(title) +
421 LENGTH(content) +
422 LENGTH(direction) +
423 COALESCE(LENGTH(fields), 0) +
424 COALESCE(LENGTH(receipt_status), 0)
425 ), 0) FROM messages",
426 [],
427 |row| row.get(0),
428 )?;
429 Ok(MessageStorageStats { count, bytes: bytes.unwrap_or(0).max(0) as u64 })
430 })
431 }
432
433 pub fn peer_message_stats(&self, peer: &str) -> rusqlite::Result<PeerMessageStats> {
434 self.with_read_conn(|conn| {
435 let (outgoing, incoming, offered, unhandled): (i64, i64, i64, i64) = conn.query_row(
436 "SELECT
437 COALESCE(SUM(CASE WHEN destination = ?1 AND direction = 'out' THEN 1 ELSE 0 END), 0),
438 COALESCE(SUM(CASE WHEN source = ?1 AND direction = 'in' THEN 1 ELSE 0 END), 0),
439 COALESCE(SUM(CASE
440 WHEN destination = ?1
441 AND direction = 'out'
442 AND (
443 receipt_status IS NULL
444 OR TRIM(receipt_status) = ''
445 OR (
446 LOWER(receipt_status) NOT LIKE 'sent%'
447 AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
448 )
449 )
450 THEN 1
451 ELSE 0
452 END), 0),
453 COALESCE(SUM(CASE WHEN source = ?1 AND direction = 'in' AND receipt_status IS NULL THEN 1 ELSE 0 END), 0)
454 FROM messages",
455 params![peer],
456 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
457 )?;
458 Ok(PeerMessageStats {
459 outgoing: outgoing.max(0) as u64,
460 incoming: incoming.max(0) as u64,
461 offered: offered.max(0) as u64,
462 unhandled: unhandled.max(0) as u64,
463 })
464 })
465 }
466
467 pub fn count_message_buckets(&self) -> rusqlite::Result<(u64, u64)> {
468 let (queued, in_flight): (i64, i64) = self.with_read_conn(|conn| {
469 let mut stmt = conn.prepare(
470 "SELECT
471 COALESCE(SUM(CASE
472 WHEN receipt_status IS NULL OR TRIM(receipt_status) = '' THEN 1
473 ELSE 0
474 END), 0) AS queued_count,
475 COALESCE(SUM(CASE
476 WHEN receipt_status IS NOT NULL
477 AND TRIM(receipt_status) <> ''
478 AND LOWER(receipt_status) NOT LIKE 'sent%'
479 AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
480 THEN 1
481 ELSE 0
482 END), 0) AS in_flight_count
483 FROM messages",
484 )?;
485 stmt.query_row([], |row| Ok((row.get(0)?, row.get(1)?)))
486 })?;
487 Ok((queued.max(0) as u64, in_flight.max(0) as u64))
488 }
489
490 pub fn count_outbound_messages(&self) -> rusqlite::Result<u64> {
491 let count: i64 = self.with_read_conn(|conn| {
492 conn.query_row("SELECT COUNT(*) FROM messages WHERE direction = 'out'", [], |row| {
493 row.get(0)
494 })
495 })?;
496 Ok(count.max(0) as u64)
497 }
498
499 pub fn expire_outbound_messages_before(&self, cutoff_ts: i64) -> rusqlite::Result<Vec<String>> {
500 self.with_write_conn(|conn| {
501 let mut stmt = conn.prepare(
502 "SELECT id
503 FROM messages
504 WHERE direction = 'out'
505 AND timestamp < ?1
506 AND (
507 receipt_status IS NULL
508 OR TRIM(receipt_status) = ''
509 OR (
510 LOWER(receipt_status) NOT LIKE 'sent%'
511 AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
512 )
513 )
514 ORDER BY timestamp ASC, id ASC",
515 )?;
516 let mut rows = stmt.query(params![cutoff_ts])?;
517 let mut ids = Vec::new();
518 while let Some(row) = rows.next()? {
519 ids.push(row.get::<_, String>(0)?);
520 }
521 drop(rows);
522 drop(stmt);
523 for message_id in ids.iter() {
524 conn.execute(
525 "UPDATE messages SET receipt_status = 'expired' WHERE id = ?1",
526 params![message_id],
527 )?;
528 }
529 Ok(ids)
530 })
531 }
532
533 pub fn prune_outbound_messages(
534 &self,
535 count: usize,
536 eviction_priority: &str,
537 ) -> rusqlite::Result<Vec<String>> {
538 if count == 0 {
539 return Ok(Vec::new());
540 }
541 self.with_write_conn(|conn| {
542 let collect_ids = |query: &str, remaining: usize| -> rusqlite::Result<Vec<String>> {
543 if remaining == 0 {
544 return Ok(Vec::new());
545 }
546 let mut stmt = conn.prepare(query)?;
547 let mut rows = stmt.query(params![remaining as i64])?;
548 let mut ids = Vec::new();
549 while let Some(row) = rows.next()? {
550 ids.push(row.get::<_, String>(0)?);
551 }
552 Ok(ids)
553 };
554
555 let normalized = eviction_priority.trim().to_ascii_lowercase();
556 let mut ids = if normalized == "terminal_first" {
557 let mut selected = collect_ids(
558 "SELECT id
559 FROM messages
560 WHERE direction = 'out'
561 AND receipt_status IS NOT NULL
562 AND TRIM(receipt_status) <> ''
563 AND (
564 LOWER(receipt_status) LIKE 'sent%'
565 OR LOWER(receipt_status) IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
566 )
567 ORDER BY timestamp ASC, id ASC
568 LIMIT ?1",
569 count,
570 )?;
571 let remaining = count.saturating_sub(selected.len());
572 if remaining > 0 {
573 let mut non_terminal = collect_ids(
574 "SELECT id
575 FROM messages
576 WHERE direction = 'out'
577 AND (
578 receipt_status IS NULL
579 OR TRIM(receipt_status) = ''
580 OR (
581 LOWER(receipt_status) NOT LIKE 'sent%'
582 AND LOWER(receipt_status) NOT IN ('cancelled', 'delivered', 'failed', 'expired', 'rejected')
583 )
584 )
585 ORDER BY timestamp ASC, id ASC
586 LIMIT ?1",
587 remaining,
588 )?;
589 selected.append(&mut non_terminal);
590 }
591 selected
592 } else {
593 collect_ids(
594 "SELECT id
595 FROM messages
596 WHERE direction = 'out'
597 ORDER BY timestamp ASC, id ASC
598 LIMIT ?1",
599 count,
600 )?
601 };
602
603 ids.sort();
604 ids.dedup();
605 for message_id in ids.iter() {
606 conn.execute("DELETE FROM messages WHERE id = ?1", params![message_id])?;
607 }
608 if !ids.is_empty() {
609 self.write_state
610 .message_count_cache
611 .fetch_sub(ids.len().min(u64::MAX as usize) as u64, Ordering::Relaxed);
612 }
613 Ok(ids)
614 })
615 }
616
617 pub fn prune_messages_to_limit_bytes(&self, limit_bytes: u64) -> rusqlite::Result<Vec<String>> {
618 self.with_write_conn(|conn| {
619 let current_bytes: i64 = conn.query_row(
620 "SELECT COALESCE(SUM(
621 LENGTH(id) +
622 LENGTH(source) +
623 LENGTH(destination) +
624 LENGTH(title) +
625 LENGTH(content) +
626 LENGTH(direction) +
627 COALESCE(LENGTH(fields), 0) +
628 COALESCE(LENGTH(receipt_status), 0)
629 ), 0) FROM messages",
630 [],
631 |row| row.get(0),
632 )?;
633 if current_bytes.max(0) as u64 <= limit_bytes {
634 return Ok(Vec::new());
635 }
636
637 let mut stmt = conn.prepare(
638 "SELECT id,
639 LENGTH(id) +
640 LENGTH(source) +
641 LENGTH(destination) +
642 LENGTH(title) +
643 LENGTH(content) +
644 LENGTH(direction) +
645 COALESCE(LENGTH(fields), 0) +
646 COALESCE(LENGTH(receipt_status), 0) AS approx_bytes
647 FROM messages
648 ORDER BY timestamp ASC, id ASC",
649 )?;
650 let mut rows = stmt.query([])?;
651 let mut bytes = current_bytes.max(0) as u64;
652 let mut ids = Vec::new();
653 while let Some(row) = rows.next()? {
654 if bytes <= limit_bytes {
655 break;
656 }
657 let id: String = row.get(0)?;
658 let approx_bytes: i64 = row.get(1)?;
659 ids.push(id);
660 bytes = bytes.saturating_sub(approx_bytes.max(0) as u64);
661 }
662 drop(rows);
663 drop(stmt);
664
665 for message_id in ids.iter() {
666 conn.execute("DELETE FROM messages WHERE id = ?1", params![message_id])?;
667 }
668 if !ids.is_empty() {
669 self.write_state
670 .message_count_cache
671 .fetch_sub(ids.len().min(u64::MAX as usize) as u64, Ordering::Relaxed);
672 }
673 Ok(ids)
674 })
675 }
676
677 pub fn update_receipt_status(&self, message_id: &str, status: &str) -> rusqlite::Result<()> {
678 self.with_write_conn(|conn| {
679 conn.execute(
680 "UPDATE messages SET receipt_status = ?1 WHERE id = ?2",
681 params![status, message_id],
682 )?;
683 Ok(())
684 })
685 }
686
687 pub fn resolve_receipt_status(
688 &self,
689 message_id: &str,
690 candidate_status: &str,
691 ) -> rusqlite::Result<Option<String>> {
692 let (reply_tx, reply_rx) = mpsc::channel();
693 self.outbound_write_tx
694 .send(OutboundWriteCommand::ResolveReceiptStatus {
695 message_id: message_id.to_string(),
696 candidate_status: candidate_status.to_string(),
697 reply: reply_tx,
698 })
699 .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?;
700 reply_rx.recv().map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?
701 }
702
703 pub fn clear_messages(&self) -> rusqlite::Result<()> {
704 self.with_write_conn(|conn| {
705 conn.execute("DELETE FROM messages", [])?;
706 self.write_state.message_count_cache.store(0, Ordering::Relaxed);
707 Ok(())
708 })
709 }
710
711 pub fn insert_announce(&self, record: &AnnounceRecord) -> rusqlite::Result<()> {
712 let capabilities_json = serde_json::to_string(&record.capabilities).unwrap_or_default();
713 self.with_write_conn(|conn| {
714 conn.execute(
715 "INSERT OR REPLACE INTO announces (id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
716 params![
717 &record.id,
718 &record.peer,
719 record.timestamp,
720 &record.name,
721 &record.name_source,
722 record.first_seen,
723 record.seen_count as i64,
724 &record.app_data_hex,
725 capabilities_json,
726 record.rssi,
727 record.snr,
728 record.q,
729 record.stamp_cost,
730 record.stamp_cost_flexibility,
731 record.peering_cost,
732 ],
733 )?;
734 Ok(())
735 })
736 }
737
738 pub fn list_announces(
739 &self,
740 limit: usize,
741 before_ts: Option<i64>,
742 before_id: Option<&str>,
743 ) -> rusqlite::Result<Vec<AnnounceRecord>> {
744 self.with_read_conn(|conn| {
745 let mut records = Vec::new();
746 let parse_row = |row: &rusqlite::Row| -> rusqlite::Result<AnnounceRecord> {
747 let capabilities_json: Option<String> = row.get(8)?;
748 let capabilities = capabilities_json
749 .as_deref()
750 .and_then(|value| serde_json::from_str::<Vec<String>>(value).ok())
751 .unwrap_or_default();
752 let seen_count: i64 = row.get(6)?;
753 Ok(AnnounceRecord {
754 id: row.get(0)?,
755 peer: row.get(1)?,
756 timestamp: row.get(2)?,
757 name: row.get(3)?,
758 name_source: row.get(4)?,
759 first_seen: row.get(5)?,
760 seen_count: seen_count.max(0) as u64,
761 app_data_hex: row.get(7)?,
762 capabilities,
763 rssi: row.get(9)?,
764 snr: row.get(10)?,
765 q: row.get(11)?,
766 stamp_cost: row.get(12)?,
767 stamp_cost_flexibility: row.get(13)?,
768 peering_cost: row.get(14)?,
769 })
770 };
771 if let Some(ts) = before_ts {
772 let query_with_id = "SELECT id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost FROM announces WHERE (timestamp < ?1 OR (timestamp = ?1 AND id < ?2)) ORDER BY timestamp DESC, id DESC LIMIT ?3";
773 let query_without_id = "SELECT id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost FROM announces WHERE timestamp < ?1 ORDER BY timestamp DESC, id DESC LIMIT ?2";
774 if let Some(ann_id) = before_id {
775 let mut stmt = conn.prepare(query_with_id)?;
776 let mut rows = stmt.query(params![ts, ann_id, limit as i64])?;
777 while let Some(row) = rows.next()? {
778 records.push(parse_row(row)?);
779 }
780 } else {
781 let mut stmt = conn.prepare(query_without_id)?;
782 let mut rows = stmt.query(params![ts, limit as i64])?;
783 while let Some(row) = rows.next()? {
784 records.push(parse_row(row)?);
785 }
786 }
787 } else {
788 let mut stmt = conn.prepare(
789 "SELECT id, peer, timestamp, name, name_source, first_seen, seen_count, app_data_hex, capabilities, rssi, snr, q, stamp_cost, stamp_cost_flexibility, peering_cost FROM announces ORDER BY timestamp DESC LIMIT ?1",
790 )?;
791 let mut rows = stmt.query(params![limit as i64])?;
792 while let Some(row) = rows.next()? {
793 records.push(parse_row(row)?);
794 }
795 }
796 Ok(records)
797 })
798 }
799
800 pub fn clear_announces(&self) -> rusqlite::Result<()> {
801 self.with_write_conn(|conn| {
802 conn.execute("DELETE FROM announces", [])?;
803 Ok(())
804 })
805 }
806
807 pub fn put_sdk_domain_snapshot(&self, snapshot: &JsonValue) -> rusqlite::Result<()> {
808 let snapshot_json = serde_json::to_string(snapshot)
809 .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?;
810 self.with_write_conn(|conn| {
811 conn.execute(
812 "INSERT INTO sdk_domain_state (domain, state_json) VALUES (?1, ?2)
813 ON CONFLICT(domain) DO UPDATE SET state_json = excluded.state_json",
814 params![Self::SDK_DOMAIN_SNAPSHOT_KEY, snapshot_json],
815 )?;
816 Ok(())
817 })
818 }
819
820 pub fn get_sdk_domain_snapshot(&self) -> rusqlite::Result<Option<JsonValue>> {
821 let snapshot_json: Option<String> = self.with_read_conn(|conn| {
822 conn.query_row(
823 "SELECT state_json FROM sdk_domain_state WHERE domain = ?1 LIMIT 1",
824 params![Self::SDK_DOMAIN_SNAPSHOT_KEY],
825 |row| row.get(0),
826 )
827 .optional()
828 })?;
829 let Some(snapshot_json) = snapshot_json else {
830 return Ok(None);
831 };
832 let parsed = serde_json::from_str(snapshot_json.as_str()).map_err(|err| {
833 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err))
834 })?;
835 Ok(Some(parsed))
836 }
837
838 pub fn clear_sdk_domain_snapshot(&self) -> rusqlite::Result<()> {
839 self.with_write_conn(|conn| {
840 conn.execute(
841 "DELETE FROM sdk_domain_state WHERE domain = ?1",
842 params![Self::SDK_DOMAIN_SNAPSHOT_KEY],
843 )?;
844 Ok(())
845 })
846 }
847
848 fn configure_connection(&self) -> rusqlite::Result<()> {
849 self.with_write_conn(|conn| {
850 conn.pragma_update(None, "journal_mode", "WAL")?;
851 conn.pragma_update(None, "synchronous", "NORMAL")?;
852 conn.pragma_update(None, "busy_timeout", 5_000i64)?;
853 Ok(())
854 })?;
855 if self.read_conn.is_some() {
856 self.with_read_conn(|conn| {
857 conn.pragma_update(None, "busy_timeout", 5_000i64)?;
858 Ok(())
859 })?;
860 }
861 Ok(())
862 }
863
864 #[cfg(test)]
865 fn busy_timeout_ms(&self) -> rusqlite::Result<i64> {
866 self.with_write_conn(|conn| conn.query_row("PRAGMA busy_timeout", [], |row| row.get(0)))
867 }
868
869 fn init_schema(&self) -> rusqlite::Result<()> {
870 self.with_write_conn(|conn| {
871 conn.execute_batch(
872 "CREATE TABLE IF NOT EXISTS messages (
873 id TEXT PRIMARY KEY,
874 source TEXT NOT NULL,
875 destination TEXT NOT NULL,
876 title TEXT NOT NULL,
877 content TEXT NOT NULL,
878 timestamp INTEGER NOT NULL,
879 direction TEXT NOT NULL,
880 fields TEXT,
881 receipt_status TEXT
882 );
883 CREATE TABLE IF NOT EXISTS announces (
884 id TEXT PRIMARY KEY,
885 peer TEXT NOT NULL,
886 timestamp INTEGER NOT NULL,
887 name TEXT,
888 name_source TEXT,
889 first_seen INTEGER NOT NULL,
890 seen_count INTEGER NOT NULL,
891 app_data_hex TEXT,
892 capabilities TEXT,
893 rssi REAL,
894 snr REAL,
895 q REAL,
896 stamp_cost INTEGER,
897 stamp_cost_flexibility INTEGER,
898 peering_cost INTEGER
899 );
900 CREATE TABLE IF NOT EXISTS sdk_domain_state (
901 domain TEXT PRIMARY KEY,
902 state_json TEXT NOT NULL
903 );
904 CREATE INDEX IF NOT EXISTS idx_messages_timestamp_desc
905 ON messages(timestamp DESC);
906 CREATE INDEX IF NOT EXISTS idx_messages_direction_timestamp_desc
907 ON messages(direction, timestamp DESC);
908 CREATE INDEX IF NOT EXISTS idx_messages_receipt_status
909 ON messages(receipt_status);
910 CREATE INDEX IF NOT EXISTS idx_announces_timestamp_id_desc
911 ON announces(timestamp DESC, id DESC);",
912 )?;
913 let _ = conn.execute("ALTER TABLE messages ADD COLUMN title TEXT", []);
914 let _ = conn.execute("UPDATE messages SET title = '' WHERE title IS NULL", []);
915 let _ = conn.execute("ALTER TABLE messages ADD COLUMN fields TEXT", []);
916 let _ = conn.execute("ALTER TABLE messages ADD COLUMN receipt_status TEXT", []);
917 let _ = conn.execute("ALTER TABLE announces ADD COLUMN name TEXT", []);
918 let _ = conn.execute("ALTER TABLE announces ADD COLUMN name_source TEXT", []);
919 let _ = conn.execute("ALTER TABLE announces ADD COLUMN first_seen INTEGER", []);
920 let _ = conn.execute("ALTER TABLE announces ADD COLUMN seen_count INTEGER", []);
921 let _ = conn.execute("ALTER TABLE announces ADD COLUMN app_data_hex TEXT", []);
922 let _ = conn.execute("ALTER TABLE announces ADD COLUMN capabilities TEXT", []);
923 let _ = conn.execute("ALTER TABLE announces ADD COLUMN rssi REAL", []);
924 let _ = conn.execute("ALTER TABLE announces ADD COLUMN snr REAL", []);
925 let _ = conn.execute("ALTER TABLE announces ADD COLUMN q REAL", []);
926 let _ = conn.execute("ALTER TABLE announces ADD COLUMN stamp_cost INTEGER", []);
927 let _ =
928 conn.execute("ALTER TABLE announces ADD COLUMN stamp_cost_flexibility INTEGER", []);
929 let _ = conn.execute("ALTER TABLE announces ADD COLUMN peering_cost INTEGER", []);
930 Ok(())
931 })
932 }
933}
934
935#[cfg(test)]
936mod tests {
937 use super::*;
938 use serde_json::json;
939
940 fn outbound_message(id: &str, timestamp: i64, receipt_status: Option<&str>) -> MessageRecord {
941 MessageRecord {
942 id: id.to_string(),
943 source: "src".to_string(),
944 destination: "dst".to_string(),
945 title: "title".to_string(),
946 content: "body".to_string(),
947 timestamp,
948 direction: "out".to_string(),
949 fields: None,
950 receipt_status: receipt_status.map(ToString::to_string),
951 }
952 }
953
954 #[test]
955 fn sdk_domain_snapshot_roundtrip() {
956 let store = MessagesStore::in_memory().expect("in-memory store");
957 let initial = store.get_sdk_domain_snapshot().expect("query snapshot");
958 assert!(initial.is_none(), "snapshot should be absent before first write");
959
960 let snapshot = json!({
961 "topics": [{ "topic_id": "topic-1" }],
962 "attachments": [],
963 "markers": [],
964 });
965 store.put_sdk_domain_snapshot(&snapshot).expect("persist snapshot");
966
967 let loaded = store.get_sdk_domain_snapshot().expect("load snapshot");
968 assert_eq!(loaded, Some(snapshot));
969 }
970
971 #[test]
972 fn sdk_domain_snapshot_clear_removes_record() {
973 let store = MessagesStore::in_memory().expect("in-memory store");
974 store
975 .put_sdk_domain_snapshot(&json!({ "voice_sessions": [{ "session_id": "voice-1" }] }))
976 .expect("persist snapshot");
977 store.clear_sdk_domain_snapshot().expect("clear snapshot");
978 let loaded = store.get_sdk_domain_snapshot().expect("load snapshot");
979 assert!(loaded.is_none(), "snapshot should be removed after clear");
980 }
981
982 #[test]
983 fn message_count_uses_direct_count() {
984 let store = MessagesStore::in_memory().expect("in-memory store");
985 store.insert_message(&outbound_message("msg-1", 1, None)).expect("insert msg-1");
986 store
987 .insert_message(&outbound_message("msg-2", 2, Some("delivered")))
988 .expect("insert msg-2");
989
990 assert_eq!(store.message_count().expect("count messages"), 2);
991 }
992
993 #[test]
994 fn message_count_cache_ignores_replace_for_existing_id() {
995 let store = MessagesStore::in_memory().expect("in-memory store");
996 store.insert_message(&outbound_message("msg-1", 1, None)).expect("insert original");
997 store
998 .insert_message(&outbound_message("msg-1", 2, Some("delivered")))
999 .expect("replace existing");
1000
1001 assert_eq!(store.message_count().expect("count messages"), 1);
1002 }
1003
1004 #[test]
1005 fn configure_connection_sets_busy_timeout() {
1006 let store = MessagesStore::in_memory().expect("in-memory store");
1007 let busy_timeout_ms = store.busy_timeout_ms().expect("query busy_timeout");
1008 assert_eq!(busy_timeout_ms, 5_000);
1009 }
1010
1011 #[test]
1012 fn expire_outbound_messages_marks_non_terminal_records() {
1013 let store = MessagesStore::in_memory().expect("in-memory store");
1014 store
1015 .insert_message(&outbound_message("out-non-terminal", 10, None))
1016 .expect("insert non-terminal");
1017 store
1018 .insert_message(&outbound_message("out-terminal", 10, Some("delivered")))
1019 .expect("insert terminal");
1020 let expired = store.expire_outbound_messages_before(11).expect("expire outbound");
1021 assert_eq!(expired, vec!["out-non-terminal".to_string()]);
1022 let non_terminal = store
1023 .get_message("out-non-terminal")
1024 .expect("load non-terminal")
1025 .expect("non-terminal exists");
1026 assert_eq!(non_terminal.receipt_status.as_deref(), Some("expired"));
1027 let terminal =
1028 store.get_message("out-terminal").expect("load terminal").expect("terminal exists");
1029 assert_eq!(terminal.receipt_status.as_deref(), Some("delivered"));
1030 }
1031
1032 #[test]
1033 fn prune_outbound_messages_terminal_first_prefers_terminal_records() {
1034 let store = MessagesStore::in_memory().expect("in-memory store");
1035 store
1036 .insert_message(&outbound_message("msg-terminal-old", 1, Some("sent: direct")))
1037 .expect("insert terminal old");
1038 store
1039 .insert_message(&outbound_message("msg-non-terminal", 2, None))
1040 .expect("insert non-terminal");
1041 store
1042 .insert_message(&outbound_message("msg-terminal-new", 3, Some("delivered")))
1043 .expect("insert terminal new");
1044
1045 let pruned = store.prune_outbound_messages(2, "terminal_first").expect("prune outbound");
1046 assert_eq!(pruned.len(), 2);
1047 assert!(pruned.iter().any(|id| id == "msg-terminal-old"));
1048 assert!(pruned.iter().any(|id| id == "msg-terminal-new"));
1049 assert!(
1050 store.get_message("msg-non-terminal").expect("load non-terminal").is_some(),
1051 "non-terminal record should remain when terminal records satisfy prune count"
1052 );
1053 assert_eq!(store.message_count().expect("count after prune"), 1);
1054 }
1055
1056 #[test]
1057 fn clear_messages_resets_message_count_cache() {
1058 let store = MessagesStore::in_memory().expect("in-memory store");
1059 store.insert_message(&outbound_message("msg-1", 1, None)).expect("insert msg-1");
1060 store
1061 .insert_message(&outbound_message("msg-2", 2, Some("delivered")))
1062 .expect("insert msg-2");
1063
1064 store.clear_messages().expect("clear messages");
1065
1066 assert_eq!(store.message_count().expect("count after clear"), 0);
1067 }
1068
1069 #[test]
1070 fn prune_messages_to_limit_bytes_removes_oldest_messages() {
1071 let store = MessagesStore::in_memory().expect("in-memory store");
1072 let mut first = outbound_message("msg-1", 1, None);
1073 first.content = "a".repeat(128);
1074 let mut second = outbound_message("msg-2", 2, None);
1075 second.content = "b".repeat(128);
1076 store.insert_message(&first).expect("insert first");
1077 store.insert_message(&second).expect("insert second");
1078
1079 let before = store.message_storage_stats().expect("stats before");
1080 let pruned =
1081 store.prune_messages_to_limit_bytes(before.bytes.saturating_sub(64)).expect("prune");
1082
1083 assert_eq!(pruned, vec!["msg-1".to_string()]);
1084 let remaining = store.list_messages(10, None).expect("remaining");
1085 assert_eq!(remaining.len(), 1);
1086 assert_eq!(remaining[0].id, "msg-2");
1087 }
1088
1089 #[test]
1090 fn peer_message_stats_reports_incoming_and_outgoing_counts() {
1091 let store = MessagesStore::in_memory().expect("in-memory store");
1092 let mut outbound = outbound_message("msg-out", 1, None);
1093 outbound.destination = "peer-a".to_string();
1094 let inbound = MessageRecord {
1095 id: "msg-in".to_string(),
1096 source: "peer-a".to_string(),
1097 destination: "local".to_string(),
1098 title: "title".to_string(),
1099 content: "body".to_string(),
1100 timestamp: 2,
1101 direction: "in".to_string(),
1102 fields: None,
1103 receipt_status: None,
1104 };
1105 store.insert_message(&outbound).expect("insert outbound");
1106 store.insert_message(&inbound).expect("insert inbound");
1107
1108 let stats = store.peer_message_stats("peer-a").expect("peer stats");
1109 assert_eq!(stats.outgoing, 1);
1110 assert_eq!(stats.incoming, 1);
1111 assert_eq!(stats.offered, 1);
1112 assert_eq!(stats.unhandled, 1);
1113 }
1114
1115 #[test]
1116 fn resolve_receipt_status_updates_non_terminal_message() {
1117 let store = MessagesStore::in_memory().expect("in-memory store");
1118 store.insert_message(&outbound_message("msg-1", 1, None)).expect("insert message");
1119
1120 let resolved =
1121 store.resolve_receipt_status("msg-1", "sent: direct").expect("resolve status");
1122
1123 assert_eq!(resolved.as_deref(), Some("sent: direct"));
1124 assert_eq!(
1125 store
1126 .get_message("msg-1")
1127 .expect("load message")
1128 .expect("message exists")
1129 .receipt_status
1130 .as_deref(),
1131 Some("sent: direct")
1132 );
1133 }
1134
1135 #[test]
1136 fn resolve_receipt_status_preserves_terminal_status() {
1137 let store = MessagesStore::in_memory().expect("in-memory store");
1138 store
1139 .insert_message(&outbound_message("msg-1", 1, Some("delivered")))
1140 .expect("insert delivered message");
1141
1142 let resolved =
1143 store.resolve_receipt_status("msg-1", "sent: direct").expect("resolve status");
1144
1145 assert_eq!(resolved.as_deref(), Some("delivered"));
1146 assert_eq!(
1147 store
1148 .get_message("msg-1")
1149 .expect("load message")
1150 .expect("message exists")
1151 .receipt_status
1152 .as_deref(),
1153 Some("delivered")
1154 );
1155 }
1156}