Skip to main content

car_integrations/messages/
read.rs

1//! Inbound chat.db reader + persisted ROWID watermark (net-new for the
2//! iMessage approval transport, Unit 1).
3//!
4//! The existing `messages/mod.rs` reader only queries the `chat` table (chat
5//! summaries). This module is the **net-new inbound read half**: it runs SQL
6//! over the `message` table joined to `handle`, returning only rows newer than
7//! a persisted high-water `ROWID` and not from the local user (`is_from_me =
8//! 0`), with the body decoded from the `attributedBody` typedstream BLOB (the
9//! `message.text` column is frequently NULL on modern Messages).
10//!
11//! Two halves, deliberately split so the verifier can run SC-1/SC-2 headless:
12//!
13//! - The **pure** logic — `decode_attributed_body` (typedstream → String) and
14//!   the watermark JSON store (`Watermark`) — is reachable and tested on ANY
15//!   OS. No `#[cfg]` gate.
//! - The **chat.db query** (`read_inbound_from_db`, plus the production
//!   wrapper `read_inbound_messages`) shells out to `/usr/bin/sqlite3 -json` the
//!   same way `list_chats_from_database` does. Only the wrappers that resolve
//!   the real `~/Library/Messages/chat.db` path are `#[cfg(target_os =
//!   "macos")]`-gated; `read_inbound_from_db` takes an injectable DB path, so
//!   the tests run the same SQL path against a temp DB when `/usr/bin/sqlite3`
//!   is present (it is, on the macOS dev host where the gates run), and SC-1
//!   exercises the real query + real decode end to end.
23
24use serde::{Deserialize, Serialize};
25
26use super::IntegrationError;
27
/// One inbound iMessage row, projected from `message` JOIN `handle`.
///
/// `body` is the human-readable message text, already decoded from
/// `attributedBody` (or taken from the `text` column when the decode yields
/// nothing). It is never the raw BLOB bytes, and rows for which no body could
/// be recovered are dropped by `parse_inbound_rows` rather than surfaced with
/// an empty `body`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InboundMessage {
    /// `message.ROWID` — the monotonic high-water key compared against the
    /// persisted `Watermark`.
    pub rowid: i64,
    /// `handle.id` — the sender handle (phone number / Apple ID email).
    pub handle_id: String,
    /// Decoded human-readable body.
    pub body: String,
    /// `message.date` — Apple epoch nanoseconds, passed through unconverted
    /// (raw; correlation only).
    pub date: i64,
}
44
/// Persisted high-water ROWID for the inbound reader.
///
/// Serialized as `{"last_rowid": <i64>}` JSON, written atomically (temp +
/// rename) under an INJECTABLE base dir so tests never touch the real
/// `~/.car/`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct Watermark {
    /// Highest `message.ROWID` recorded so far; the inbound query only returns
    /// rows with a strictly greater ROWID.
    pub last_rowid: i64,
}
54
55impl Watermark {
56    pub fn new(last_rowid: i64) -> Self {
57        Self { last_rowid }
58    }
59
60    /// Resolve the watermark JSON file inside `base_dir` (the `~/.car/`
61    /// equivalent). The file lives at `<base_dir>/messages-inbound-watermark.json`.
62    pub fn path_in(base_dir: &std::path::Path) -> std::path::PathBuf {
63        base_dir.join("messages-inbound-watermark.json")
64    }
65
66    /// Load the watermark from disk. Returns `Ok(None)` when no file exists
67    /// (fresh boot — caller seeds to `MAX(ROWID)`).
68    pub fn load(base_dir: &std::path::Path) -> Result<Option<Self>, IntegrationError> {
69        let path = Self::path_in(base_dir);
70        match std::fs::read(&path) {
71            Ok(bytes) => {
72                let wm: Watermark = serde_json::from_slice(&bytes).map_err(|e| {
73                    IntegrationError::Backend(format!("watermark json parse: {e}"))
74                })?;
75                Ok(Some(wm))
76            }
77            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
78            Err(e) => Err(IntegrationError::Backend(format!("watermark read: {e}"))),
79        }
80    }
81
82    /// Persist the watermark atomically under `base_dir`.
83    ///
84    /// Mirrors the `car-server-core` `atomic_write_sync` / `supervisor.rs`
85    /// `write_json_atomic` shape — `create_dir_all` + write a unique temp file
86    /// in the same dir + `rename(2)` (atomic on the same filesystem). NOT a
87    /// plain non-atomic `fs::write`; NOT Swift `UserDefaults`. `car-server-core`'s
88    /// `atomic_write_sync` is private to that crate and not reachable from
89    /// `car-integrations`, so the shape is mirrored locally per the plan.
90    pub fn persist(&self, base_dir: &std::path::Path) -> Result<(), IntegrationError> {
91        std::fs::create_dir_all(base_dir).map_err(|e| {
92            IntegrationError::Backend(format!("watermark mkdir {}: {e}", base_dir.display()))
93        })?;
94        let path = Self::path_in(base_dir);
95        let bytes = serde_json::to_vec(self)
96            .map_err(|e| IntegrationError::Backend(format!("watermark serialize: {e}")))?;
97        atomic_write(&path, &bytes)
98    }
99}
100
101/// Crash-safe write: unique temp in the same directory + atomic `rename`.
102/// Mirrors `car-server-core::handler::atomic_write_sync` (private to that
103/// crate) and `car-registry::supervisor::write_json_atomic`. `rename(2)` is
104/// atomic on the same filesystem; the per-call temp name (pid + monotonic seq)
105/// means two writers never collide on the temp file.
106fn atomic_write(path: &std::path::Path, bytes: &[u8]) -> Result<(), IntegrationError> {
107    use std::sync::atomic::{AtomicU64, Ordering};
108    static SEQ: AtomicU64 = AtomicU64::new(0);
109    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
110    let mut tmp_os = path.as_os_str().to_owned();
111    tmp_os.push(format!(".tmp.{}.{}", std::process::id(), seq));
112    let tmp = std::path::PathBuf::from(tmp_os);
113    std::fs::write(&tmp, bytes)
114        .map_err(|e| IntegrationError::Backend(format!("watermark temp write: {e}")))?;
115    std::fs::rename(&tmp, path)
116        .map_err(|e| IntegrationError::Backend(format!("watermark rename: {e}")))
117}
118
/// Upper bound, in bytes, on the `attributedBody` blob the decoder will scan.
///
/// Approval replies ("approve"/"deny"/"<code> approve") are well under 300
/// bytes; this 8 KiB cap is comfortably above any real reply yet small enough
/// that the O(n^2) marker scanners can never be wedged by a hostile sender's
/// giant blob (which would otherwise stall the single serial poll loop before
/// the allowlist drop downstream). `decode_attributed_body` returns `None` for
/// a blob over this cap, which sends the caller to its `message.text`
/// fallback.
const MAX_ATTRIBUTED_BODY_DECODE_BYTES: usize = 8 * 1024;
127
128/// Decode the human-readable body string out of a Messages `attributedBody`
129/// typedstream / `NSKeyedArchiver` BLOB.
130///
131/// Modern Messages stores the reply body in `message.attributedBody` (a
132/// streamtyped/typedstream BLOB) and leaves `message.text` NULL. The body is
133/// an `NSString` embedded after the `NSAttributedString` / `NSMutableString` /
134/// `NSString` class chain. The reliable, version-stable tell:
135///
136/// 1. Find the `NSString` class marker.
137/// 2. Scan forward to the `+` (0x2b) string-type tag.
138/// 3. Read the length prefix: a single byte `< 0x80` is the length; `0x81`
139///    means the next two bytes are a little-endian `u16` length (the
140///    typedstream "long length" escape).
141/// 4. Read that many UTF-8 bytes — that is the body.
142///
143/// Returns `None` when no body can be recovered (so the caller falls back to
144/// `message.text`). Zero-dependency hand-parse — no extra crate.
145pub fn decode_attributed_body(blob: &[u8]) -> Option<String> {
146    // Remote-DoS guard: both marker scanners are O(n^2) on a pathological blob
147    // full of repeated "NSString"/`+` markers, and the poller decodes EVERY
148    // inbound row's body for ANY sender BEFORE the downstream allowlist drop.
149    // An approval reply's typedstream is well under 300 bytes, so any blob over
150    // this small cap cannot be an approval reply — skip the decode entirely and
151    // fall back to `message.text` (returning `None` routes the caller through
152    // its text-fallback path). This bounds worst-case decode to ~1ms.
153    if blob.len() > MAX_ATTRIBUTED_BODY_DECODE_BYTES {
154        return None;
155    }
156    // Primary: structured parse anchored on the NSString class marker.
157    if let Some(s) = decode_via_nsstring_marker(blob) {
158        return Some(s);
159    }
160    // Fallback: some payloads place the body after a bare `+` tag without a
161    // preceding literal "NSString" token (class already referenced by index).
162    // Scan every `+`-anchored candidate and take the first that decodes to a
163    // non-empty valid UTF-8 run.
164    decode_via_plus_scan(blob)
165}
166
167fn decode_via_nsstring_marker(blob: &[u8]) -> Option<String> {
168    let marker = b"NSString";
169    let mut search_from = 0usize;
170    while let Some(rel) = find_subsequence(&blob[search_from..], marker) {
171        let after_marker = search_from + rel + marker.len();
172        // Anchor on the first 0x2b (`+` string-type tag) after the NSString
173        // marker. Fails safe: a mis-anchor decodes to garbage → the caller's
174        // text fallback (and, for an approval reply, `parse_inbound` → Ignore),
175        // never a wrong resolve.
176        if let Some(plus_rel) = find_byte(&blob[after_marker..], b'+') {
177            let plus_idx = after_marker + plus_rel;
178            if let Some(s) = read_length_prefixed_utf8(blob, plus_idx + 1) {
179                if !s.is_empty() {
180                    return Some(s);
181                }
182            }
183        }
184        search_from = after_marker;
185    }
186    None
187}
188
189/// Best-effort fallback when the structured `NSString`-anchored parse fails:
190/// scan every `+`-tagged candidate and take the first that decodes to a
191/// non-empty, mostly-printable UTF-8 run. This is **intentionally lossy** — it
192/// has no schema anchor, so a wrong guess is possible. That is acceptable here
193/// because a wrong decode for an approval reply parses to `Ignore`
194/// (`messaging_orchestrator::parse_inbound`) and so FAILS SAFE: a mis-decoded
195/// body never resolves an approval, it is simply dropped. Approval replies
196/// ("approve"/"deny"/"<code> approve") are short, so the structured parse above
197/// handles them in practice; this only catches odd typedstream layouts.
198fn decode_via_plus_scan(blob: &[u8]) -> Option<String> {
199    let mut i = 0usize;
200    while i < blob.len() {
201        if blob[i] == b'+' {
202            if let Some(s) = read_length_prefixed_utf8(blob, i + 1) {
203                if !s.is_empty() && s.chars().all(|c| !c.is_control() || c == '\n' || c == '\t') {
204                    return Some(s);
205                }
206            }
207        }
208        i += 1;
209    }
210    None
211}
212
/// Read one typedstream length-prefixed UTF-8 string whose length byte sits at
/// `pos`.
///
/// Length encodings understood:
/// - a first byte `< 0x80` is the length itself;
/// - `0x81` means the following two bytes are a little-endian `u16` length.
///
/// Any other first byte (the wider "long length" escapes, which per the
/// original design notes only appear for very large bodies) is rejected.
/// Such bodies are out of scope for short approval replies, and returning
/// `None` sends the caller to its `message.text` fallback.
///
/// Returns `None` when `pos` is out of range, the length is zero, the declared
/// span runs past the end of `blob`, or the bytes are not valid UTF-8.
fn read_length_prefixed_utf8(blob: &[u8], pos: usize) -> Option<String> {
    let (len, start) = match *blob.get(pos)? {
        0x81 => {
            let lo = usize::from(*blob.get(pos + 1)?);
            let hi = usize::from(*blob.get(pos + 2)?);
            ((hi << 8) | lo, pos + 3)
        }
        short if short < 0x80 => (usize::from(short), pos + 1),
        // Wider length escapes are deliberately unsupported.
        _ => return None,
    };
    if len == 0 {
        return None;
    }
    let end = start.checked_add(len)?;
    let payload = blob.get(start..end)?;
    match std::str::from_utf8(payload) {
        Ok(text) => Some(text.to_owned()),
        Err(_) => None,
    }
}
243
/// Offset of the first occurrence of `needle` inside `haystack`.
///
/// Returns `None` when `needle` is empty, longer than `haystack`, or absent.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 || width > haystack.len() {
        return None;
    }
    haystack
        .windows(width)
        .enumerate()
        .find(|(_, window)| *window == needle)
        .map(|(offset, _)| offset)
}
252
/// Offset of the first byte in `haystack` equal to `byte`, if any.
fn find_byte(haystack: &[u8], byte: u8) -> Option<usize> {
    for (offset, candidate) in haystack.iter().enumerate() {
        if *candidate == byte {
            return Some(offset);
        }
    }
    None
}
256
257/// Resolve a body for a row from its decoded `attributedBody` (hex) and the
258/// `text` column. Decodes `attributedBody` first; falls back to `text` only
259/// when `text` is non-null and non-empty. Returns `None` when neither yields a
260/// body (the caller drops bodyless rows).
261pub fn resolve_body(attributed_body_hex: Option<&str>, text: Option<&str>) -> Option<String> {
262    if let Some(hex) = attributed_body_hex {
263        if !hex.is_empty() {
264            if let Some(bytes) = hex_decode(hex) {
265                if let Some(decoded) = decode_attributed_body(&bytes) {
266                    if !decoded.is_empty() {
267                        return Some(decoded);
268                    }
269                }
270            }
271        }
272    }
273    match text {
274        Some(t) if !t.is_empty() => Some(t.to_string()),
275        _ => None,
276    }
277}
278
279/// Decode a lowercase/uppercase hex string (as produced by SQLite `hex()`)
280/// into bytes. Returns `None` on malformed input.
281fn hex_decode(hex: &str) -> Option<Vec<u8>> {
282    let bytes = hex.as_bytes();
283    if bytes.len() % 2 != 0 {
284        return None;
285    }
286    let mut out = Vec::with_capacity(bytes.len() / 2);
287    let mut i = 0;
288    while i < bytes.len() {
289        let hi = hex_nibble(bytes[i])?;
290        let lo = hex_nibble(bytes[i + 1])?;
291        out.push((hi << 4) | lo);
292        i += 2;
293    }
294    Some(out)
295}
296
/// Value (0–15) of one ASCII hex digit, accepting `0-9`, `a-f` and `A-F`.
/// Any other byte yields `None`.
fn hex_nibble(b: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly the ASCII hex digits in either case and
    // always returns a value below 16, so the conversion back to `u8` is exact.
    char::from(b)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
305
/// Build the inbound-row query used by both the production reader and the
/// tests.
///
/// Projects `rowid`, `handle_id`, `attributed_body_hex` (the BLOB passed
/// through `hex()`, or NULL when the column is NULL), `text` and `date` from
/// `message` joined to `handle`. Only rows with `ROWID > min_rowid` and
/// `is_from_me = 0` are selected, in ascending ROWID order. `min_rowid` is an
/// integer, so interpolating it cannot inject SQL.
fn inbound_sql(min_rowid: i64) -> String {
    const COLUMNS: &str = "message.ROWID AS rowid, \
        handle.id AS handle_id, \
        CASE WHEN message.attributedBody IS NULL THEN NULL \
        ELSE hex(message.attributedBody) END AS attributed_body_hex, \
        message.text AS text, \
        message.date AS date";
    const TABLES: &str = "message JOIN handle ON message.handle_id = handle.ROWID";
    format!(
        "SELECT {COLUMNS} FROM {TABLES} \
         WHERE message.ROWID > {min_rowid} AND message.is_from_me = 0 \
         ORDER BY message.ROWID ASC;"
    )
}
323
/// Query returning the current `MAX(ROWID)` of the `message` table as a single
/// `rowid` column, with an empty table reported as `0` (fresh-boot seed).
fn max_rowid_sql() -> &'static str {
    const QUERY: &str = "SELECT COALESCE(MAX(ROWID), 0) AS rowid FROM message;";
    QUERY
}
328
/// One raw row as `/usr/bin/sqlite3 -json` returns it (BLOB column hex-encoded
/// by the `hex()` SQL above so JSON stays valid). Field names match the column
/// aliases in `inbound_sql`.
#[derive(Debug, Deserialize)]
struct RawInboundRow {
    /// `message.ROWID`.
    rowid: i64,
    /// `handle.id` of the sender.
    handle_id: String,
    /// Hex of `message.attributedBody`; `None` when the column is NULL.
    attributed_body_hex: Option<String>,
    /// `message.text`; `None` when the column is NULL.
    text: Option<String>,
    /// `message.date`, passed through unchanged.
    date: i64,
}
339
340/// Parse the `/usr/bin/sqlite3 -json` stdout for the inbound query into typed
341/// `InboundMessage`s, decoding each body. Rows with no recoverable body are
342/// dropped. Pure — reachable in tests on any OS.
343pub fn parse_inbound_rows(json_stdout: &[u8]) -> Result<Vec<InboundMessage>, IntegrationError> {
344    // sqlite3 -json prints nothing (empty) when there are zero rows.
345    let trimmed = json_stdout
346        .iter()
347        .position(|b| !b.is_ascii_whitespace())
348        .map(|p| &json_stdout[p..])
349        .unwrap_or(&[]);
350    if trimmed.is_empty() {
351        return Ok(vec![]);
352    }
353    let raw: Vec<RawInboundRow> = serde_json::from_slice(trimmed)
354        .map_err(|e| IntegrationError::Backend(format!("inbound sqlite json: {e}")))?;
355    let mut out = Vec::with_capacity(raw.len());
356    for r in raw {
357        let body = match resolve_body(r.attributed_body_hex.as_deref(), r.text.as_deref()) {
358            Some(b) => b,
359            None => continue,
360        };
361        out.push(InboundMessage {
362            rowid: r.rowid,
363            handle_id: r.handle_id,
364            body,
365            date: r.date,
366        });
367    }
368    Ok(out)
369}
370
371/// Run `/usr/bin/sqlite3 -json <db> <sql>` and return stdout. Shared invocation
372/// harness, mirroring `list_chats_from_database`. Available wherever
373/// `/usr/bin/sqlite3` is (macOS prod + the macOS dev host where gates run).
374fn run_sqlite3_json(
375    db_path: &std::path::Path,
376    sql: &str,
377) -> Result<Vec<u8>, IntegrationError> {
378    let output = std::process::Command::new("/usr/bin/sqlite3")
379        .arg("-json")
380        .arg(db_path)
381        .arg(sql)
382        .output()
383        .map_err(|e| IntegrationError::Backend(format!("inbound sqlite: {e}")))?;
384    if !output.status.success() {
385        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
386        return Err(IntegrationError::Backend(format!(
387            "inbound sqlite failed: {stderr}"
388        )));
389    }
390    Ok(output.stdout)
391}
392
393/// Return the current `MAX(ROWID)` of the `message` table in `db_path`.
394/// Used to seed the watermark on a fresh/missing watermark file so no
395/// pre-existing text is ever returned as new.
396pub fn max_rowid_in_db(db_path: &std::path::Path) -> Result<i64, IntegrationError> {
397    let stdout = run_sqlite3_json(db_path, max_rowid_sql())?;
398    parse_max_rowid(&stdout)
399}
400
/// The single row shape produced by `max_rowid_sql` under `sqlite3 -json`.
#[derive(Debug, Deserialize)]
struct MaxRowidRow {
    /// `COALESCE(MAX(ROWID), 0)` of the `message` table.
    rowid: i64,
}
405
406fn parse_max_rowid(json_stdout: &[u8]) -> Result<i64, IntegrationError> {
407    let trimmed = json_stdout
408        .iter()
409        .position(|b| !b.is_ascii_whitespace())
410        .map(|p| &json_stdout[p..])
411        .unwrap_or(&[]);
412    if trimmed.is_empty() {
413        return Ok(0);
414    }
415    let rows: Vec<MaxRowidRow> = serde_json::from_slice(trimmed)
416        .map_err(|e| IntegrationError::Backend(format!("max rowid json: {e}")))?;
417    Ok(rows.first().map(|r| r.rowid).unwrap_or(0))
418}
419
420/// Read inbound messages from `db_path` with `ROWID > min_rowid` and
421/// `is_from_me = 0`. The body of each row is decoded from `attributedBody`
422/// (falling back to `text`). Rows ordered by ROWID ascending. This is the
423/// injectable-path core: any caller (production reader, test seam) supplies the
424/// DB path so nothing hardcodes `~/Library/Messages/chat.db`.
425pub fn read_inbound_from_db(
426    db_path: &std::path::Path,
427    min_rowid: i64,
428) -> Result<Vec<InboundMessage>, IntegrationError> {
429    let stdout = run_sqlite3_json(db_path, &inbound_sql(min_rowid))?;
430    parse_inbound_rows(&stdout)
431}
432
/// Default macOS chat.db location: `$HOME/Library/Messages/chat.db`.
///
/// Same resolution as `list_chats_from_database`. When `HOME` is unset the
/// result is the relative path `Library/Messages/chat.db`. macOS only.
#[cfg(target_os = "macos")]
pub fn default_chat_db_path() -> std::path::PathBuf {
    let home = std::env::var_os("HOME").unwrap_or_default();
    std::path::Path::new(&home).join("Library/Messages/chat.db")
}
441
/// Production reader: new inbound messages past `min_rowid` from the user's
/// real Messages library (the path given by `default_chat_db_path`).
///
/// macOS-gated (requires Full Disk Access). The orchestrator (Unit 2/4,
/// daemon-side) calls this in-process.
#[cfg(target_os = "macos")]
pub fn read_inbound_messages(min_rowid: i64) -> Result<Vec<InboundMessage>, IntegrationError> {
    let chat_db = default_chat_db_path();
    read_inbound_from_db(chat_db.as_path(), min_rowid)
}
449
/// Production fresh-boot seed: `MAX(ROWID)` of the user's real Messages
/// library (the path given by `default_chat_db_path`).
///
/// macOS-gated. Seed the watermark to this value on first boot so that no
/// pre-existing text is replayed as new.
#[cfg(target_os = "macos")]
pub fn max_rowid_default_db() -> Result<i64, IntegrationError> {
    let chat_db = default_chat_db_path();
    max_rowid_in_db(chat_db.as_path())
}
457
458#[cfg(test)]
459mod tests {
460    use super::*;
461    use std::path::Path;
462
463    fn ts_blob(text: &str) -> Vec<u8> {
464        // Reproduce the canonical Messages `attributedBody` typedstream layout
465        // (streamtyped header + NSAttributedString/NSMutableString/NSString
466        // class chain + `+` string tag + length prefix + UTF-8 body), so the
467        // decoder is exercised against the real on-disk shape — including the
468        // 0x81+u16 long-length escape for bodies >= 128 bytes.
469        let tb = text.as_bytes();
470        let mut out: Vec<u8> = Vec::new();
471        out.extend_from_slice(&[0x04, 0x0b]);
472        out.extend_from_slice(b"streamtyped");
473        out.extend_from_slice(&[
474            0x81, 0xe8, 0x03, 0x84, 0x01, 0x40, 0x84, 0x84, 0x84, 0x12,
475        ]);
476        out.extend_from_slice(b"NSAttributedString");
477        out.extend_from_slice(&[0x00, 0x84, 0x84, 0x08]);
478        out.extend_from_slice(b"NSObject");
479        out.extend_from_slice(&[0x00, 0x85, 0x92, 0x84, 0x84, 0x84, 0x0f]);
480        out.extend_from_slice(b"NSMutableString");
481        out.extend_from_slice(&[0x01, 0x84, 0x84, 0x08]);
482        out.extend_from_slice(b"NSString");
483        out.extend_from_slice(&[0x01, 0x95, 0x84, 0x01, 0x2b]); // `+` string tag
484        if tb.len() < 0x80 {
485            out.push(tb.len() as u8);
486        } else {
487            out.push(0x81);
488            out.push((tb.len() & 0xff) as u8);
489            out.push(((tb.len() >> 8) & 0xff) as u8);
490        }
491        out.extend_from_slice(tb);
492        out.extend_from_slice(&[0x86, 0x84, 0x02, 0x69, 0x49, 0x00, 0x01]);
493        out
494    }
495
496    fn hex_encode(bytes: &[u8]) -> String {
497        let mut s = String::with_capacity(bytes.len() * 2);
498        for b in bytes {
499            s.push_str(&format!("{b:02x}"));
500        }
501        s
502    }
503
504    // ---- Pure decode tests (run on ANY OS) ----
505
506    #[test]
507    fn decode_short_body() {
508        let blob = ts_blob("A7 approve");
509        assert_eq!(decode_attributed_body(&blob).as_deref(), Some("A7 approve"));
510    }
511
512    #[test]
513    fn decode_long_body_uses_u16_length_escape() {
514        let long = format!("A7 approve {}", "x".repeat(200));
515        let blob = ts_blob(&long);
516        assert_eq!(decode_attributed_body(&blob).as_deref(), Some(long.as_str()));
517    }
518
    /// Remote-DoS regression for the decode size cap: an oversized blob must be
    /// rejected without scanning, and the row must then resolve purely from
    /// the `text` column.
    #[test]
    fn oversized_attributed_body_skips_decode_no_dos_hang() {
        // Pathological fixture: 64 KiB of back-to-back "NSString" markers with
        // no `+` tag, i.e. the worst case for the O(n^2) marker scanners, which
        // would otherwise tie up the single serial poll loop.
        let mut blob: Vec<u8> = Vec::with_capacity(64 * 1024);
        while blob.len() < 64 * 1024 {
            blob.extend_from_slice(b"NSString");
        }
        assert!(
            blob.len() > MAX_ATTRIBUTED_BODY_DECODE_BYTES,
            "fixture must exceed the decode cap"
        );

        // The decode must short-circuit to None (cap engaged). This is checked
        // both by the result and by wall-clock: only the decode call sits
        // between the two clock reads, and it must finish within 100 ms.
        let start = std::time::Instant::now();
        let decoded = decode_attributed_body(&blob);
        let elapsed = start.elapsed();
        assert_eq!(decoded, None, "oversized blob must not decode to a body");
        assert!(
            elapsed < std::time::Duration::from_millis(100),
            "decode of an oversized blob must be near-instant (cap engaged), took {elapsed:?}"
        );

        // End to end through the reader's row path with a NULL `text` column:
        // resolve_body returns None (blob skipped, no text), so the row is
        // dropped — NOT a hang, NOT raw bytes.
        let hex = hex_encode(&blob);
        assert_eq!(
            resolve_body(Some(&hex), None),
            None,
            "oversized blob + NULL text → bodyless row, dropped"
        );
        // With a text fallback present, the row resolves to the text value.
        assert_eq!(
            resolve_body(Some(&hex), Some("approve")).as_deref(),
            Some("approve"),
            "oversized blob falls back to message.text"
        );
    }
563
564    #[test]
565    fn resolve_body_prefers_attributedbody_over_null_text() {
566        let blob = ts_blob("approve");
567        let hex = hex_encode(&blob);
568        // text NULL, attributedBody populated -> decoded body, never "".
569        let body = resolve_body(Some(&hex), None);
570        assert_eq!(body.as_deref(), Some("approve"));
571        assert_ne!(body.as_deref(), Some(""));
572    }
573
574    #[test]
575    fn resolve_body_falls_back_to_text_when_no_attributedbody() {
576        assert_eq!(
577            resolve_body(None, Some("deny")).as_deref(),
578            Some("deny")
579        );
580        // empty text + no blob -> None (bodyless row dropped).
581        assert_eq!(resolve_body(None, Some("")), None);
582        assert_eq!(resolve_body(None, None), None);
583    }
584
585    #[test]
586    fn parse_inbound_rows_decodes_and_drops_bodyless() {
587        let blob = ts_blob("approve");
588        let hex = hex_encode(&blob);
589        let json = format!(
590            "[{{\"rowid\":5,\"handle_id\":\"+15551234567\",\"attributed_body_hex\":\"{hex}\",\"text\":null,\"date\":700000000000000000}}]"
591        );
592        let rows = parse_inbound_rows(json.as_bytes()).unwrap();
593        assert_eq!(rows.len(), 1);
594        assert_eq!(rows[0].rowid, 5);
595        assert_eq!(rows[0].handle_id, "+15551234567");
596        assert_eq!(rows[0].body, "approve");
597        assert_eq!(rows[0].date, 700000000000000000);
598    }
599
600    #[test]
601    fn parse_inbound_rows_empty_stdout_is_zero_rows() {
602        assert!(parse_inbound_rows(b"").unwrap().is_empty());
603        assert!(parse_inbound_rows(b"  \n").unwrap().is_empty());
604    }
605
606    // ---- Watermark persistence (run on ANY OS; SC-2) ----
607
608    #[test]
609    fn watermark_persist_survives_restart() {
610        let dir = tempfile::tempdir().unwrap();
611        let base = dir.path();
612        // Nothing persisted yet -> fresh boot.
613        assert_eq!(Watermark::load(base).unwrap(), None);
614
615        let wm = Watermark::new(42);
616        wm.persist(base).unwrap();
617
618        // "Restart": reload purely from disk, drop the in-memory value.
619        let reloaded = Watermark::load(base).unwrap();
620        assert_eq!(reloaded, Some(Watermark::new(42)));
621        assert_eq!(reloaded.unwrap().last_rowid, 42);
622    }
623
624    #[test]
625    fn watermark_persist_is_atomic_no_temp_left() {
626        let dir = tempfile::tempdir().unwrap();
627        let base = dir.path();
628        Watermark::new(7).persist(base).unwrap();
629        // Only the final file remains — no leftover .tmp.* temp files.
630        let entries: Vec<_> = std::fs::read_dir(base)
631            .unwrap()
632            .map(|e| e.unwrap().file_name().to_string_lossy().to_string())
633            .collect();
634        assert!(
635            entries.contains(&"messages-inbound-watermark.json".to_string()),
636            "final watermark file must exist, got {entries:?}"
637        );
638        assert!(
639            !entries.iter().any(|n| n.contains(".tmp.")),
640            "no temp file should remain, got {entries:?}"
641        );
642    }
643
644    // ---- Test seam: real SQL path over a temp chat.db via /usr/bin/sqlite3 ----
645
    /// Build `<dir>/chat.db` with minimal `handle` / `message` tables (only the
    /// columns the inbound query reads) and seed one handle (`+15551234567`)
    /// plus four message rows. The (i)/(ii)/(iii) labels below match the SQL
    /// comments inside the seed script:
    ///
    /// - rowid 1 — (ii) an older pre-existing inbound row with a `text` body,
    /// - rowid 2 — an interleaved `is_from_me = 1` row (must be excluded),
    /// - rowid 3 — (i) an inbound NULL-`text` row whose body lives in
    ///   `attributedBody`, built from `attr_text` via `ts_blob`,
    /// - rowid 4 — (iii) a later `is_from_me = 1` row (must be excluded).
    ///
    /// Returns the db path; the caller must keep the directory behind `dir`
    /// alive for as long as the path is used. Panics when `/usr/bin/sqlite3`
    /// cannot be spawned or the seed script fails.
    fn seed_temp_chat_db(dir: &Path, attr_text: &str) -> std::path::PathBuf {
        let db = dir.join("chat.db");
        // The blob is embedded as a SQLite `X'..'` hex literal in the script.
        let blob_hex = hex_encode(&ts_blob(attr_text));
        let schema_and_rows = format!(
            "CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
             CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
                 text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
             INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
             -- (ii) older pre-existing inbound row\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (1, 1, 'old hello', NULL, 0, 600000000000000000);\n\
             -- a from-me row interleaved (must be excluded)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (2, 1, 'my outbound', NULL, 1, 650000000000000000);\n\
             -- (i) NULL-text row, body in attributedBody (the row SC-1 depends on)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (3, 1, NULL, X'{blob_hex}', 0, 700000000000000000);\n\
             -- (iii) outbound after the watermark (must be excluded)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (4, 1, 'me again', NULL, 1, 750000000000000000);\n"
        );
        // Seed through the same sqlite3 binary the reader shells out to.
        let status = std::process::Command::new("/usr/bin/sqlite3")
            .arg(&db)
            .arg(&schema_and_rows)
            .status()
            .expect("sqlite3 must be available to seed the fixture");
        assert!(status.success(), "fixture seed failed");
        db
    }
680
681    /// SC-D fixture — the single-Apple-ID (solo) self-reply shape.
682    ///
683    /// A NEW sibling of `seed_temp_chat_db` (it MUST stay separate: SC-1's
684    /// `attributedbody_null_text_decode_and_watermark` hard-asserts
685    /// `rows.len() == 1` against that fixture, so extending it would break SC-1).
686    ///
687    /// Probe provenance: on a single-Apple-ID account the on-device probe showed
688    /// that BOTH the daemon's own outbound prompt AND the user's typed reply land
689    /// in `chat.db` twice — once `is_from_me = 1` (the sent/synced-sent copy) and
690    /// once `is_from_me = 0` (a received self-echo). The typed reply body sits in
691    /// the `text` column (NOT `attributedBody`). The literal probe handle and
692    /// chat.db rowids carry no behavioral weight (the filter keys only on
693    /// `is_from_me` and ROWID ordering), so this fixture uses the standard test
694    /// handle and synthetic monotonic rowids per scope-call C1 — NO literal probe
695    /// phone number / rowids appear here.
696    ///
697    /// Seeds, on the SAME handle past a watermark of 0, four rows:
698    ///   (a) `is_from_me = 0` echo of the daemon's own-sent prompt body — KEPT,
699    ///   (b) its `is_from_me = 1` synced-sent twin                       — DROPPED,
700    ///   (c) `is_from_me = 0` `A0 approve` self-reply                    — KEPT,
701    ///   (d) its `is_from_me = 1` synced-sent twin                       — DROPPED.
702    fn seed_solo_self_reply_chat_db(dir: &Path) -> std::path::PathBuf {
703        let db = dir.join("chat.db");
704        // The daemon's own outbound prompt body (the exact `outbound_body` shape:
705        // a multi-token line that echoes back `is_from_me = 0` on a solo account).
706        let prompt = "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`.";
707        // Single-quotes escaped for the SQL string literal.
708        let prompt_sql = prompt.replace('\'', "''");
709        let schema_and_rows = format!(
710            "CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
711             CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
712                 text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
713             INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
714             -- (a) received self-echo of the daemon's own prompt (KEEP)\n\
715             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
716                 VALUES (10, 1, '{prompt_sql}', NULL, 0, 800000000000000000);\n\
717             -- (b) the sent/synced-sent twin of that prompt (DROP)\n\
718             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
719                 VALUES (11, 1, '{prompt_sql}', NULL, 1, 800000000000000001);\n\
720             -- (c) the user's typed `A0 approve` self-reply, in the text column (KEEP)\n\
721             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
722                 VALUES (12, 1, 'A0 approve', NULL, 0, 800000000000000002);\n\
723             -- (d) the sent/synced-sent twin of that reply (DROP)\n\
724             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
725                 VALUES (13, 1, 'A0 approve', NULL, 1, 800000000000000003);\n"
726        );
727        let status = std::process::Command::new("/usr/bin/sqlite3")
728            .arg(&db)
729            .arg(&schema_and_rows)
730            .status()
731            .expect("sqlite3 must be available to seed the fixture");
732        assert!(status.success(), "fixture seed failed");
733        db
734    }
735
736    #[test]
737    fn solo_self_reply_kept_synced_sent_twins_dropped() {
738        // SC-D (Wall 1 pinned): the `is_from_me = 0` filter (read.rs:319) KEEPS the
739        // solo user's two `is_from_me = 0` rows — the own-prompt self-echo and the
740        // `A0 approve` reply — with bodies intact, and DROPS both `is_from_me = 1`
741        // synced-sent twins. This is a regression tripwire in the keep-direction:
742        // it fails the moment a future edit narrows or handle-scopes the filter
743        // such that the solo self-reply would be dropped. Skips when sqlite3 is
744        // absent, per existing precedent.
745        if !Path::new("/usr/bin/sqlite3").exists() {
746            eprintln!("skip: /usr/bin/sqlite3 not present");
747            return;
748        }
749        let dir = tempfile::tempdir().unwrap();
750        let db = seed_solo_self_reply_chat_db(dir.path());
751
752        // Watermark 0: all four seeded rows (rowids 10–13) are past it.
753        let rows = read_inbound_from_db(&db, 0).unwrap();
754
755        // Exactly the two `is_from_me = 0` rows survive, in ROWID order.
756        assert_eq!(rows.len(), 2, "exactly the two is_from_me=0 rows kept, got {rows:?}");
757
758        // (a) the own-prompt self-echo is kept, body intact (the multi-token
759        // prompt — Wall 2 would Ignore it at the parser, but Wall 1 keeps the row).
760        let prompt = "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`.";
761        assert_eq!(rows[0].rowid, 10);
762        assert_eq!(rows[0].handle_id, "+15551234567");
763        assert_eq!(rows[0].body, prompt, "own-prompt echo body kept intact");
764
765        // (c) the `A0 approve` self-reply is kept, body intact.
766        assert_eq!(rows[1].rowid, 12);
767        assert_eq!(rows[1].handle_id, "+15551234567");
768        assert_eq!(rows[1].body, "A0 approve", "solo self-reply body kept intact");
769
770        // Neither `is_from_me = 1` synced-sent twin (rowids 11, 13) appears.
771        let returned_rowids: Vec<i64> = rows.iter().map(|r| r.rowid).collect();
772        assert!(
773            !returned_rowids.contains(&11),
774            "the is_from_me=1 prompt twin (rowid 11) must be dropped, got {returned_rowids:?}"
775        );
776        assert!(
777            !returned_rowids.contains(&13),
778            "the is_from_me=1 reply twin (rowid 13) must be dropped, got {returned_rowids:?}"
779        );
780    }
781
782    #[test]
783    fn attributedbody_null_text_decode_and_watermark() {
784        // SC-1: post-watermark is_from_me=0 rows returned (older excluded);
785        // NULL-text/attributedBody row decodes to the exact expected string;
786        // fields correct; no replay after restart from persisted watermark.
787        if !Path::new("/usr/bin/sqlite3").exists() {
788            eprintln!("skip: /usr/bin/sqlite3 not present");
789            return;
790        }
791        let dir = tempfile::tempdir().unwrap();
792        let db = seed_temp_chat_db(dir.path(), "A7 approve");
793
794        // Watermark set to 1 (we have already seen the older row).
795        let base = dir.path().join("car-home");
796        Watermark::new(1).persist(&base).unwrap();
797        let wm = Watermark::load(&base).unwrap().unwrap();
798
799        let rows = read_inbound_from_db(&db, wm.last_rowid).unwrap();
800
801        // Exactly one row: rowid 3 (the NULL-text/attributedBody inbound).
802        // Older row 1 excluded by watermark; from-me rows 2 and 4 excluded.
803        assert_eq!(rows.len(), 1, "got {rows:?}");
804        let row = &rows[0];
805        assert_eq!(row.rowid, 3);
806        assert_eq!(row.handle_id, "+15551234567");
807        // The load-bearing assertion: decoded body is the EXACT string, not ""
808        // and not raw bytes.
809        assert_eq!(row.body, "A7 approve");
810        assert_ne!(row.body, "");
811        assert_eq!(row.date, 700000000000000000);
812
813        // Advance + persist the watermark to the max rowid we just saw.
814        let new_high = rows.iter().map(|r| r.rowid).max().unwrap();
815        Watermark::new(new_high).persist(&base).unwrap();
816
817        // "Restart": reconstruct the watermark purely from disk and re-read the
818        // unchanged DB -> the same row must NOT be replayed.
819        let wm2 = Watermark::load(&base).unwrap().unwrap();
820        assert_eq!(wm2.last_rowid, 3);
821        let rows_after = read_inbound_from_db(&db, wm2.last_rowid).unwrap();
822        assert!(
823            rows_after.is_empty(),
824            "row replayed after restart: {rows_after:?}"
825        );
826    }
827
828    #[test]
829    fn fresh_watermark_seeds_to_max_rowid_no_replay() {
830        // SC-2 edge: fresh/missing watermark seeds to MAX(ROWID) so the first
831        // read of an unchanged DB yields zero new rows.
832        if !Path::new("/usr/bin/sqlite3").exists() {
833            eprintln!("skip: /usr/bin/sqlite3 not present");
834            return;
835        }
836        let dir = tempfile::tempdir().unwrap();
837        let db = seed_temp_chat_db(dir.path(), "approve");
838        let base = dir.path().join("car-home");
839
840        // Fresh boot: no watermark on disk.
841        assert_eq!(Watermark::load(&base).unwrap(), None);
842
843        // Seed to MAX(ROWID).
844        let max = max_rowid_in_db(&db).unwrap();
845        assert_eq!(max, 4);
846        Watermark::new(max).persist(&base).unwrap();
847
848        // First read after fresh-boot seed: zero new rows on the unchanged DB.
849        let wm = Watermark::load(&base).unwrap().unwrap();
850        let rows = read_inbound_from_db(&db, wm.last_rowid).unwrap();
851        assert!(rows.is_empty(), "fresh seed should yield zero rows: {rows:?}");
852    }
853}