1use serde::{Deserialize, Serialize};
25
26use super::IntegrationError;
27
28#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34pub struct InboundMessage {
35 pub rowid: i64,
37 pub handle_id: String,
39 pub body: String,
41 pub date: i64,
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
51pub struct Watermark {
52 pub last_rowid: i64,
53}
54
55impl Watermark {
56 pub fn new(last_rowid: i64) -> Self {
57 Self { last_rowid }
58 }
59
60 pub fn path_in(base_dir: &std::path::Path) -> std::path::PathBuf {
63 base_dir.join("messages-inbound-watermark.json")
64 }
65
66 pub fn load(base_dir: &std::path::Path) -> Result<Option<Self>, IntegrationError> {
69 let path = Self::path_in(base_dir);
70 match std::fs::read(&path) {
71 Ok(bytes) => {
72 let wm: Watermark = serde_json::from_slice(&bytes).map_err(|e| {
73 IntegrationError::Backend(format!("watermark json parse: {e}"))
74 })?;
75 Ok(Some(wm))
76 }
77 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
78 Err(e) => Err(IntegrationError::Backend(format!("watermark read: {e}"))),
79 }
80 }
81
82 pub fn persist(&self, base_dir: &std::path::Path) -> Result<(), IntegrationError> {
91 std::fs::create_dir_all(base_dir).map_err(|e| {
92 IntegrationError::Backend(format!("watermark mkdir {}: {e}", base_dir.display()))
93 })?;
94 let path = Self::path_in(base_dir);
95 let bytes = serde_json::to_vec(self)
96 .map_err(|e| IntegrationError::Backend(format!("watermark serialize: {e}")))?;
97 atomic_write(&path, &bytes)
98 }
99}
100
101fn atomic_write(path: &std::path::Path, bytes: &[u8]) -> Result<(), IntegrationError> {
107 use std::sync::atomic::{AtomicU64, Ordering};
108 static SEQ: AtomicU64 = AtomicU64::new(0);
109 let seq = SEQ.fetch_add(1, Ordering::Relaxed);
110 let mut tmp_os = path.as_os_str().to_owned();
111 tmp_os.push(format!(".tmp.{}.{}", std::process::id(), seq));
112 let tmp = std::path::PathBuf::from(tmp_os);
113 std::fs::write(&tmp, bytes)
114 .map_err(|e| IntegrationError::Backend(format!("watermark temp write: {e}")))?;
115 std::fs::rename(&tmp, path)
116 .map_err(|e| IntegrationError::Backend(format!("watermark rename: {e}")))
117}
118
119const MAX_ATTRIBUTED_BODY_DECODE_BYTES: usize = 8 * 1024;
127
128pub fn decode_attributed_body(blob: &[u8]) -> Option<String> {
146 if blob.len() > MAX_ATTRIBUTED_BODY_DECODE_BYTES {
154 return None;
155 }
156 if let Some(s) = decode_via_nsstring_marker(blob) {
158 return Some(s);
159 }
160 decode_via_plus_scan(blob)
165}
166
167fn decode_via_nsstring_marker(blob: &[u8]) -> Option<String> {
168 let marker = b"NSString";
169 let mut search_from = 0usize;
170 while let Some(rel) = find_subsequence(&blob[search_from..], marker) {
171 let after_marker = search_from + rel + marker.len();
172 if let Some(plus_rel) = find_byte(&blob[after_marker..], b'+') {
177 let plus_idx = after_marker + plus_rel;
178 if let Some(s) = read_length_prefixed_utf8(blob, plus_idx + 1) {
179 if !s.is_empty() {
180 return Some(s);
181 }
182 }
183 }
184 search_from = after_marker;
185 }
186 None
187}
188
189fn decode_via_plus_scan(blob: &[u8]) -> Option<String> {
199 let mut i = 0usize;
200 while i < blob.len() {
201 if blob[i] == b'+' {
202 if let Some(s) = read_length_prefixed_utf8(blob, i + 1) {
203 if !s.is_empty() && s.chars().all(|c| !c.is_control() || c == '\n' || c == '\t') {
204 return Some(s);
205 }
206 }
207 }
208 i += 1;
209 }
210 None
211}
212
/// Reads a length-prefixed UTF-8 string starting at `blob[pos]`.
///
/// Length encoding:
/// * a lead byte below `0x80` is the length itself;
/// * `0x81` is followed by a two-byte little-endian length;
/// * any other lead byte is rejected.
///
/// Returns `None` when `pos` is out of range, the length is zero, the payload
/// runs past the end of `blob`, or the payload is not valid UTF-8.
fn read_length_prefixed_utf8(blob: &[u8], pos: usize) -> Option<String> {
    let (len, start) = match *blob.get(pos)? {
        0x81 => {
            let wide = blob.get(pos + 1..pos + 3)?;
            (usize::from(u16::from_le_bytes([wide[0], wide[1]])), pos + 3)
        }
        short if short < 0x80 => (usize::from(short), pos + 1),
        _ => return None,
    };
    if len == 0 {
        return None;
    }
    let end = start.checked_add(len)?;
    let payload = blob.get(start..end)?;
    match std::str::from_utf8(payload) {
        Ok(text) => Some(text.to_owned()),
        Err(_) => None,
    }
}
243
/// Returns the offset of the first occurrence of `needle` in `haystack`.
///
/// An empty `needle`, or one longer than `haystack`, never matches.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let span = needle.len();
    if span == 0 || span > haystack.len() {
        return None;
    }
    let last_start = haystack.len() - span;
    (0..=last_start).find(|&at| &haystack[at..at + span] == needle)
}
252
/// Returns the index of the first element of `haystack` equal to `byte`.
fn find_byte(haystack: &[u8], byte: u8) -> Option<usize> {
    for (at, &candidate) in haystack.iter().enumerate() {
        if candidate == byte {
            return Some(at);
        }
    }
    None
}
256
257pub fn resolve_body(attributed_body_hex: Option<&str>, text: Option<&str>) -> Option<String> {
262 if let Some(hex) = attributed_body_hex {
263 if !hex.is_empty() {
264 if let Some(bytes) = hex_decode(hex) {
265 if let Some(decoded) = decode_attributed_body(&bytes) {
266 if !decoded.is_empty() {
267 return Some(decoded);
268 }
269 }
270 }
271 }
272 }
273 match text {
274 Some(t) if !t.is_empty() => Some(t.to_string()),
275 _ => None,
276 }
277}
278
279fn hex_decode(hex: &str) -> Option<Vec<u8>> {
282 let bytes = hex.as_bytes();
283 if bytes.len() % 2 != 0 {
284 return None;
285 }
286 let mut out = Vec::with_capacity(bytes.len() / 2);
287 let mut i = 0;
288 while i < bytes.len() {
289 let hi = hex_nibble(bytes[i])?;
290 let lo = hex_nibble(bytes[i + 1])?;
291 out.push((hi << 4) | lo);
292 i += 2;
293 }
294 Some(out)
295}
296
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value `0..=15`.
///
/// Any other byte yields `None`.
fn hex_nibble(b: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly the ASCII hex digits; the value is < 16,
    // so narrowing to `u8` is lossless.
    char::from(b).to_digit(16).map(|value| value as u8)
}
305
/// Builds the query that selects inbound rows above `min_rowid`.
///
/// The query joins `message` to `handle`, keeps rows with
/// `message.ROWID > min_rowid` and `is_from_me = 0`, returns the
/// `attributedBody` hex-encoded (or NULL), and orders by ROWID ascending.
/// `min_rowid` is an integer, so splicing it into the text cannot alter the
/// statement's structure.
fn inbound_sql(min_rowid: i64) -> String {
    const BEFORE_ROWID: &str = "SELECT message.ROWID AS rowid, \
        handle.id AS handle_id, \
        CASE WHEN message.attributedBody IS NULL THEN NULL \
        ELSE hex(message.attributedBody) END AS attributed_body_hex, \
        message.text AS text, \
        message.date AS date \
        FROM message \
        JOIN handle ON message.handle_id = handle.ROWID \
        WHERE message.ROWID > ";
    const AFTER_ROWID: &str = " AND message.is_from_me = 0 ORDER BY message.ROWID ASC;";

    let threshold = min_rowid.to_string();
    [BEFORE_ROWID, threshold.as_str(), AFTER_ROWID].concat()
}
323
/// Query returning the largest `message.ROWID`, or 0 when the table is empty.
fn max_rowid_sql() -> &'static str {
    const MAX_ROWID_QUERY: &str = "SELECT COALESCE(MAX(ROWID), 0) AS rowid FROM message;";
    MAX_ROWID_QUERY
}
328
329#[derive(Debug, Deserialize)]
332struct RawInboundRow {
333 rowid: i64,
334 handle_id: String,
335 attributed_body_hex: Option<String>,
336 text: Option<String>,
337 date: i64,
338}
339
340pub fn parse_inbound_rows(json_stdout: &[u8]) -> Result<Vec<InboundMessage>, IntegrationError> {
344 let trimmed = json_stdout
346 .iter()
347 .position(|b| !b.is_ascii_whitespace())
348 .map(|p| &json_stdout[p..])
349 .unwrap_or(&[]);
350 if trimmed.is_empty() {
351 return Ok(vec![]);
352 }
353 let raw: Vec<RawInboundRow> = serde_json::from_slice(trimmed)
354 .map_err(|e| IntegrationError::Backend(format!("inbound sqlite json: {e}")))?;
355 let mut out = Vec::with_capacity(raw.len());
356 for r in raw {
357 let body = match resolve_body(r.attributed_body_hex.as_deref(), r.text.as_deref()) {
358 Some(b) => b,
359 None => continue,
360 };
361 out.push(InboundMessage {
362 rowid: r.rowid,
363 handle_id: r.handle_id,
364 body,
365 date: r.date,
366 });
367 }
368 Ok(out)
369}
370
371fn run_sqlite3_json(
375 db_path: &std::path::Path,
376 sql: &str,
377) -> Result<Vec<u8>, IntegrationError> {
378 let output = std::process::Command::new("/usr/bin/sqlite3")
379 .arg("-json")
380 .arg(db_path)
381 .arg(sql)
382 .output()
383 .map_err(|e| IntegrationError::Backend(format!("inbound sqlite: {e}")))?;
384 if !output.status.success() {
385 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
386 return Err(IntegrationError::Backend(format!(
387 "inbound sqlite failed: {stderr}"
388 )));
389 }
390 Ok(output.stdout)
391}
392
393pub fn max_rowid_in_db(db_path: &std::path::Path) -> Result<i64, IntegrationError> {
397 let stdout = run_sqlite3_json(db_path, max_rowid_sql())?;
398 parse_max_rowid(&stdout)
399}
400
401#[derive(Debug, Deserialize)]
402struct MaxRowidRow {
403 rowid: i64,
404}
405
406fn parse_max_rowid(json_stdout: &[u8]) -> Result<i64, IntegrationError> {
407 let trimmed = json_stdout
408 .iter()
409 .position(|b| !b.is_ascii_whitespace())
410 .map(|p| &json_stdout[p..])
411 .unwrap_or(&[]);
412 if trimmed.is_empty() {
413 return Ok(0);
414 }
415 let rows: Vec<MaxRowidRow> = serde_json::from_slice(trimmed)
416 .map_err(|e| IntegrationError::Backend(format!("max rowid json: {e}")))?;
417 Ok(rows.first().map(|r| r.rowid).unwrap_or(0))
418}
419
420pub fn read_inbound_from_db(
426 db_path: &std::path::Path,
427 min_rowid: i64,
428) -> Result<Vec<InboundMessage>, IntegrationError> {
429 let stdout = run_sqlite3_json(db_path, &inbound_sql(min_rowid))?;
430 parse_inbound_rows(&stdout)
431}
432
/// Returns `$HOME/Library/Messages/chat.db`.
///
/// When `HOME` is unset the result is the relative path
/// `Library/Messages/chat.db`.
#[cfg(target_os = "macos")]
pub fn default_chat_db_path() -> std::path::PathBuf {
    let home = std::env::var_os("HOME").unwrap_or_default();
    std::path::Path::new(&home).join("Library/Messages/chat.db")
}
441
/// Reads inbound messages above `min_rowid` from the default chat database.
///
/// # Errors
/// Same as [`read_inbound_from_db`].
#[cfg(target_os = "macos")]
pub fn read_inbound_messages(min_rowid: i64) -> Result<Vec<InboundMessage>, IntegrationError> {
    let chat_db = default_chat_db_path();
    read_inbound_from_db(chat_db.as_path(), min_rowid)
}
449
/// Returns the highest `message.ROWID` in the default chat database.
///
/// # Errors
/// Same as [`max_rowid_in_db`].
#[cfg(target_os = "macos")]
pub fn max_rowid_default_db() -> Result<i64, IntegrationError> {
    let chat_db = default_chat_db_path();
    max_rowid_in_db(chat_db.as_path())
}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461 use std::path::Path;
462
463 fn ts_blob(text: &str) -> Vec<u8> {
464 let tb = text.as_bytes();
470 let mut out: Vec<u8> = Vec::new();
471 out.extend_from_slice(&[0x04, 0x0b]);
472 out.extend_from_slice(b"streamtyped");
473 out.extend_from_slice(&[
474 0x81, 0xe8, 0x03, 0x84, 0x01, 0x40, 0x84, 0x84, 0x84, 0x12,
475 ]);
476 out.extend_from_slice(b"NSAttributedString");
477 out.extend_from_slice(&[0x00, 0x84, 0x84, 0x08]);
478 out.extend_from_slice(b"NSObject");
479 out.extend_from_slice(&[0x00, 0x85, 0x92, 0x84, 0x84, 0x84, 0x0f]);
480 out.extend_from_slice(b"NSMutableString");
481 out.extend_from_slice(&[0x01, 0x84, 0x84, 0x08]);
482 out.extend_from_slice(b"NSString");
483 out.extend_from_slice(&[0x01, 0x95, 0x84, 0x01, 0x2b]); if tb.len() < 0x80 {
485 out.push(tb.len() as u8);
486 } else {
487 out.push(0x81);
488 out.push((tb.len() & 0xff) as u8);
489 out.push(((tb.len() >> 8) & 0xff) as u8);
490 }
491 out.extend_from_slice(tb);
492 out.extend_from_slice(&[0x86, 0x84, 0x02, 0x69, 0x49, 0x00, 0x01]);
493 out
494 }
495
496 fn hex_encode(bytes: &[u8]) -> String {
497 let mut s = String::with_capacity(bytes.len() * 2);
498 for b in bytes {
499 s.push_str(&format!("{b:02x}"));
500 }
501 s
502 }
503
504 #[test]
507 fn decode_short_body() {
508 let blob = ts_blob("A7 approve");
509 assert_eq!(decode_attributed_body(&blob).as_deref(), Some("A7 approve"));
510 }
511
512 #[test]
513 fn decode_long_body_uses_u16_length_escape() {
514 let long = format!("A7 approve {}", "x".repeat(200));
515 let blob = ts_blob(&long);
516 assert_eq!(decode_attributed_body(&blob).as_deref(), Some(long.as_str()));
517 }
518
519 #[test]
520 fn oversized_attributed_body_skips_decode_no_dos_hang() {
521 let mut blob: Vec<u8> = Vec::with_capacity(64 * 1024);
526 while blob.len() < 64 * 1024 {
527 blob.extend_from_slice(b"NSString");
528 }
529 assert!(
530 blob.len() > MAX_ATTRIBUTED_BODY_DECODE_BYTES,
531 "fixture must exceed the decode cap"
532 );
533
534 let start = std::time::Instant::now();
538 let decoded = decode_attributed_body(&blob);
539 let elapsed = start.elapsed();
540 assert_eq!(decoded, None, "oversized blob must not decode to a body");
541 assert!(
542 elapsed < std::time::Duration::from_millis(100),
543 "decode of an oversized blob must be near-instant (cap engaged), took {elapsed:?}"
544 );
545
546 let hex = hex_encode(&blob);
551 assert_eq!(
552 resolve_body(Some(&hex), None),
553 None,
554 "oversized blob + NULL text → bodyless row, dropped"
555 );
556 assert_eq!(
558 resolve_body(Some(&hex), Some("approve")).as_deref(),
559 Some("approve"),
560 "oversized blob falls back to message.text"
561 );
562 }
563
564 #[test]
565 fn resolve_body_prefers_attributedbody_over_null_text() {
566 let blob = ts_blob("approve");
567 let hex = hex_encode(&blob);
568 let body = resolve_body(Some(&hex), None);
570 assert_eq!(body.as_deref(), Some("approve"));
571 assert_ne!(body.as_deref(), Some(""));
572 }
573
574 #[test]
575 fn resolve_body_falls_back_to_text_when_no_attributedbody() {
576 assert_eq!(
577 resolve_body(None, Some("deny")).as_deref(),
578 Some("deny")
579 );
580 assert_eq!(resolve_body(None, Some("")), None);
582 assert_eq!(resolve_body(None, None), None);
583 }
584
585 #[test]
586 fn parse_inbound_rows_decodes_and_drops_bodyless() {
587 let blob = ts_blob("approve");
588 let hex = hex_encode(&blob);
589 let json = format!(
590 "[{{\"rowid\":5,\"handle_id\":\"+15551234567\",\"attributed_body_hex\":\"{hex}\",\"text\":null,\"date\":700000000000000000}}]"
591 );
592 let rows = parse_inbound_rows(json.as_bytes()).unwrap();
593 assert_eq!(rows.len(), 1);
594 assert_eq!(rows[0].rowid, 5);
595 assert_eq!(rows[0].handle_id, "+15551234567");
596 assert_eq!(rows[0].body, "approve");
597 assert_eq!(rows[0].date, 700000000000000000);
598 }
599
600 #[test]
601 fn parse_inbound_rows_empty_stdout_is_zero_rows() {
602 assert!(parse_inbound_rows(b"").unwrap().is_empty());
603 assert!(parse_inbound_rows(b" \n").unwrap().is_empty());
604 }
605
606 #[test]
609 fn watermark_persist_survives_restart() {
610 let dir = tempfile::tempdir().unwrap();
611 let base = dir.path();
612 assert_eq!(Watermark::load(base).unwrap(), None);
614
615 let wm = Watermark::new(42);
616 wm.persist(base).unwrap();
617
618 let reloaded = Watermark::load(base).unwrap();
620 assert_eq!(reloaded, Some(Watermark::new(42)));
621 assert_eq!(reloaded.unwrap().last_rowid, 42);
622 }
623
624 #[test]
625 fn watermark_persist_is_atomic_no_temp_left() {
626 let dir = tempfile::tempdir().unwrap();
627 let base = dir.path();
628 Watermark::new(7).persist(base).unwrap();
629 let entries: Vec<_> = std::fs::read_dir(base)
631 .unwrap()
632 .map(|e| e.unwrap().file_name().to_string_lossy().to_string())
633 .collect();
634 assert!(
635 entries.contains(&"messages-inbound-watermark.json".to_string()),
636 "final watermark file must exist, got {entries:?}"
637 );
638 assert!(
639 !entries.iter().any(|n| n.contains(".tmp.")),
640 "no temp file should remain, got {entries:?}"
641 );
642 }
643
644 fn seed_temp_chat_db(dir: &Path, attr_text: &str) -> std::path::PathBuf {
652 let db = dir.join("chat.db");
653 let blob_hex = hex_encode(&ts_blob(attr_text));
654 let schema_and_rows = format!(
655 "CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
656 CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
657 text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
658 INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
659 -- (ii) older pre-existing inbound row\n\
660 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
661 VALUES (1, 1, 'old hello', NULL, 0, 600000000000000000);\n\
662 -- a from-me row interleaved (must be excluded)\n\
663 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
664 VALUES (2, 1, 'my outbound', NULL, 1, 650000000000000000);\n\
665 -- (i) NULL-text row, body in attributedBody (the row SC-1 depends on)\n\
666 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
667 VALUES (3, 1, NULL, X'{blob_hex}', 0, 700000000000000000);\n\
668 -- (iii) outbound after the watermark (must be excluded)\n\
669 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
670 VALUES (4, 1, 'me again', NULL, 1, 750000000000000000);\n"
671 );
672 let status = std::process::Command::new("/usr/bin/sqlite3")
673 .arg(&db)
674 .arg(&schema_and_rows)
675 .status()
676 .expect("sqlite3 must be available to seed the fixture");
677 assert!(status.success(), "fixture seed failed");
678 db
679 }
680
681 fn seed_solo_self_reply_chat_db(dir: &Path) -> std::path::PathBuf {
703 let db = dir.join("chat.db");
704 let prompt = "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`.";
707 let prompt_sql = prompt.replace('\'', "''");
709 let schema_and_rows = format!(
710 "CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
711 CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
712 text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
713 INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
714 -- (a) received self-echo of the daemon's own prompt (KEEP)\n\
715 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
716 VALUES (10, 1, '{prompt_sql}', NULL, 0, 800000000000000000);\n\
717 -- (b) the sent/synced-sent twin of that prompt (DROP)\n\
718 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
719 VALUES (11, 1, '{prompt_sql}', NULL, 1, 800000000000000001);\n\
720 -- (c) the user's typed `A0 approve` self-reply, in the text column (KEEP)\n\
721 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
722 VALUES (12, 1, 'A0 approve', NULL, 0, 800000000000000002);\n\
723 -- (d) the sent/synced-sent twin of that reply (DROP)\n\
724 INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
725 VALUES (13, 1, 'A0 approve', NULL, 1, 800000000000000003);\n"
726 );
727 let status = std::process::Command::new("/usr/bin/sqlite3")
728 .arg(&db)
729 .arg(&schema_and_rows)
730 .status()
731 .expect("sqlite3 must be available to seed the fixture");
732 assert!(status.success(), "fixture seed failed");
733 db
734 }
735
736 #[test]
737 fn solo_self_reply_kept_synced_sent_twins_dropped() {
738 if !Path::new("/usr/bin/sqlite3").exists() {
746 eprintln!("skip: /usr/bin/sqlite3 not present");
747 return;
748 }
749 let dir = tempfile::tempdir().unwrap();
750 let db = seed_solo_self_reply_chat_db(dir.path());
751
752 let rows = read_inbound_from_db(&db, 0).unwrap();
754
755 assert_eq!(rows.len(), 2, "exactly the two is_from_me=0 rows kept, got {rows:?}");
757
758 let prompt = "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`.";
761 assert_eq!(rows[0].rowid, 10);
762 assert_eq!(rows[0].handle_id, "+15551234567");
763 assert_eq!(rows[0].body, prompt, "own-prompt echo body kept intact");
764
765 assert_eq!(rows[1].rowid, 12);
767 assert_eq!(rows[1].handle_id, "+15551234567");
768 assert_eq!(rows[1].body, "A0 approve", "solo self-reply body kept intact");
769
770 let returned_rowids: Vec<i64> = rows.iter().map(|r| r.rowid).collect();
772 assert!(
773 !returned_rowids.contains(&11),
774 "the is_from_me=1 prompt twin (rowid 11) must be dropped, got {returned_rowids:?}"
775 );
776 assert!(
777 !returned_rowids.contains(&13),
778 "the is_from_me=1 reply twin (rowid 13) must be dropped, got {returned_rowids:?}"
779 );
780 }
781
782 #[test]
783 fn attributedbody_null_text_decode_and_watermark() {
784 if !Path::new("/usr/bin/sqlite3").exists() {
788 eprintln!("skip: /usr/bin/sqlite3 not present");
789 return;
790 }
791 let dir = tempfile::tempdir().unwrap();
792 let db = seed_temp_chat_db(dir.path(), "A7 approve");
793
794 let base = dir.path().join("car-home");
796 Watermark::new(1).persist(&base).unwrap();
797 let wm = Watermark::load(&base).unwrap().unwrap();
798
799 let rows = read_inbound_from_db(&db, wm.last_rowid).unwrap();
800
801 assert_eq!(rows.len(), 1, "got {rows:?}");
804 let row = &rows[0];
805 assert_eq!(row.rowid, 3);
806 assert_eq!(row.handle_id, "+15551234567");
807 assert_eq!(row.body, "A7 approve");
810 assert_ne!(row.body, "");
811 assert_eq!(row.date, 700000000000000000);
812
813 let new_high = rows.iter().map(|r| r.rowid).max().unwrap();
815 Watermark::new(new_high).persist(&base).unwrap();
816
817 let wm2 = Watermark::load(&base).unwrap().unwrap();
820 assert_eq!(wm2.last_rowid, 3);
821 let rows_after = read_inbound_from_db(&db, wm2.last_rowid).unwrap();
822 assert!(
823 rows_after.is_empty(),
824 "row replayed after restart: {rows_after:?}"
825 );
826 }
827
828 #[test]
829 fn fresh_watermark_seeds_to_max_rowid_no_replay() {
830 if !Path::new("/usr/bin/sqlite3").exists() {
833 eprintln!("skip: /usr/bin/sqlite3 not present");
834 return;
835 }
836 let dir = tempfile::tempdir().unwrap();
837 let db = seed_temp_chat_db(dir.path(), "approve");
838 let base = dir.path().join("car-home");
839
840 assert_eq!(Watermark::load(&base).unwrap(), None);
842
843 let max = max_rowid_in_db(&db).unwrap();
845 assert_eq!(max, 4);
846 Watermark::new(max).persist(&base).unwrap();
847
848 let wm = Watermark::load(&base).unwrap().unwrap();
850 let rows = read_inbound_from_db(&db, wm.last_rowid).unwrap();
851 assert!(rows.is_empty(), "fresh seed should yield zero rows: {rows:?}");
852 }
853}