car-integrations 0.30.0

OS-native account-bound integrations (Calendar, Contacts, Mail) for CAR
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
//! Inbound chat.db reader + persisted ROWID watermark (net-new for the
//! iMessage approval transport, Unit 1).
//!
//! The existing `messages/mod.rs` reader only queries the `chat` table (chat
//! summaries). This module is the **net-new inbound read half**: it runs SQL
//! over the `message` table joined to `handle`, returning only rows newer than
//! a persisted high-water `ROWID` and not from the local user (`is_from_me =
//! 0`), with the body decoded from the `attributedBody` typedstream BLOB (the
//! `message.text` column is frequently NULL on modern Messages).
//!
//! Two halves, deliberately split so the verifier can run SC-1/SC-2 headless:
//!
//! - The **pure** logic — `decode_attributed_body` (typedstream → String) and
//!   the watermark JSON store (`Watermark`) — is reachable and tested on ANY
//!   OS. No `#[cfg]` gate.
//! - The **chat.db query** shells out to `/usr/bin/sqlite3 -json` the same way
//!   `list_chats_from_database` does. The production entry points
//!   (`read_inbound_messages` / `max_rowid_default_db`) are
//!   `#[cfg(target_os = "macos")]`-gated; the path-injectable core
//!   (`read_inbound_from_db` / `max_rowid_in_db`) is not. A test seam
//!   (`read_inbound_from_sqlite3`) runs the same SQL path against a temp DB
//!   when `/usr/bin/sqlite3` is present (it is, on the macOS dev host where the
//!   gates run), so SC-1 exercises the real query + real decode end to end.

use serde::{Deserialize, Serialize};

use super::IntegrationError;

/// One inbound iMessage row, projected from `message` JOIN `handle`.
///
/// Values of this type are produced by `parse_inbound_rows`, which drops any
/// row for which no body could be recovered. `body` is therefore always the
/// human-readable message text — decoded from `attributedBody`, or taken from
/// the `text` column when the decode yields nothing — never the raw BLOB bytes
/// and never an empty string.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InboundMessage {
    /// `message.ROWID` — the monotonic high-water key the watermark tracks.
    pub rowid: i64,
    /// `handle.id` — the sender handle (phone number / Apple ID email).
    pub handle_id: String,
    /// Decoded human-readable body (non-empty).
    pub body: String,
    /// `message.date` — Apple epoch nanoseconds, passed through unconverted
    /// (raw; correlation only).
    pub date: i64,
}

/// Persisted high-water ROWID for the inbound reader.
///
/// Serialized as `{"last_rowid": <i64>}` JSON. `Watermark::persist` writes it
/// atomically (temp file + rename) under an INJECTABLE base dir so tests never
/// touch the real `~/.car/`; `Watermark::load` reads it back and reports a
/// missing file as `Ok(None)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct Watermark {
    /// Highest `message.ROWID` already handed out; readers query strictly
    /// above this value.
    pub last_rowid: i64,
}

impl Watermark {
    pub fn new(last_rowid: i64) -> Self {
        Self { last_rowid }
    }

    /// Resolve the watermark JSON file inside `base_dir` (the `~/.car/`
    /// equivalent). The file lives at `<base_dir>/messages-inbound-watermark.json`.
    pub fn path_in(base_dir: &std::path::Path) -> std::path::PathBuf {
        base_dir.join("messages-inbound-watermark.json")
    }

    /// Load the watermark from disk. Returns `Ok(None)` when no file exists
    /// (fresh boot — caller seeds to `MAX(ROWID)`).
    pub fn load(base_dir: &std::path::Path) -> Result<Option<Self>, IntegrationError> {
        let path = Self::path_in(base_dir);
        match std::fs::read(&path) {
            Ok(bytes) => {
                let wm: Watermark = serde_json::from_slice(&bytes).map_err(|e| {
                    IntegrationError::Backend(format!("watermark json parse: {e}"))
                })?;
                Ok(Some(wm))
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(IntegrationError::Backend(format!("watermark read: {e}"))),
        }
    }

    /// Persist the watermark atomically under `base_dir`.
    ///
    /// Mirrors the `car-server-core` `atomic_write_sync` / `supervisor.rs`
    /// `write_json_atomic` shape — `create_dir_all` + write a unique temp file
    /// in the same dir + `rename(2)` (atomic on the same filesystem). NOT a
    /// plain non-atomic `fs::write`; NOT Swift `UserDefaults`. `car-server-core`'s
    /// `atomic_write_sync` is private to that crate and not reachable from
    /// `car-integrations`, so the shape is mirrored locally per the plan.
    pub fn persist(&self, base_dir: &std::path::Path) -> Result<(), IntegrationError> {
        std::fs::create_dir_all(base_dir).map_err(|e| {
            IntegrationError::Backend(format!("watermark mkdir {}: {e}", base_dir.display()))
        })?;
        let path = Self::path_in(base_dir);
        let bytes = serde_json::to_vec(self)
            .map_err(|e| IntegrationError::Backend(format!("watermark serialize: {e}")))?;
        atomic_write(&path, &bytes)
    }
}

/// Crash-safe write: unique temp in the same directory + atomic `rename`.
/// Mirrors `car-server-core::handler::atomic_write_sync` (private to that
/// crate) and `car-registry::supervisor::write_json_atomic`. `rename(2)` is
/// atomic on the same filesystem; the per-call temp name (pid + monotonic seq)
/// means two writers never collide on the temp file.
///
/// The temp file's contents are flushed with `sync_all` before the rename, so
/// the rename can never publish a file whose data has not reached the disk.
/// If the write, the sync, or the rename fails, the temp file is removed
/// (best effort) so failed attempts do not accumulate `*.tmp.<pid>.<seq>`
/// litter next to the destination.
///
/// # Errors
/// `IntegrationError::Backend` when the temp file cannot be created, written
/// or synced (`watermark temp write`), or when the rename fails
/// (`watermark rename`).
fn atomic_write(path: &std::path::Path, bytes: &[u8]) -> Result<(), IntegrationError> {
    use std::io::Write;
    use std::sync::atomic::{AtomicU64, Ordering};
    // Only uniqueness of the counter value matters, so Relaxed is sufficient.
    static SEQ: AtomicU64 = AtomicU64::new(0);
    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
    let mut tmp_os = path.as_os_str().to_owned();
    tmp_os.push(format!(".tmp.{}.{}", std::process::id(), seq));
    let tmp = std::path::PathBuf::from(tmp_os);

    let write_tmp = || -> std::io::Result<()> {
        let mut file = std::fs::File::create(&tmp)?;
        file.write_all(bytes)?;
        // Make the bytes durable before the rename makes them visible.
        file.sync_all()
    };
    if let Err(e) = write_tmp() {
        // Best-effort cleanup; the original failure is the error worth reporting.
        let _ = std::fs::remove_file(&tmp);
        return Err(IntegrationError::Backend(format!(
            "watermark temp write: {e}"
        )));
    }
    if let Err(e) = std::fs::rename(&tmp, path) {
        let _ = std::fs::remove_file(&tmp);
        return Err(IntegrationError::Backend(format!("watermark rename: {e}")));
    }
    Ok(())
}

/// Upper bound, in bytes, on the `attributedBody` blob the decoder will scan.
///
/// `decode_attributed_body` returns `None` without scanning for any blob
/// longer than this, which makes the caller fall back to `message.text`.
/// Approval replies ("approve"/"deny"/"<code> approve") are well under 300
/// bytes; this 8 KiB cap is comfortably above any real reply yet small enough
/// that the O(n^2) marker scanners can never be wedged by a hostile sender's
/// giant blob (which would otherwise stall the single serial poll loop before
/// the allowlist drop downstream).
const MAX_ATTRIBUTED_BODY_DECODE_BYTES: usize = 8 * 1024;

/// Extract the human-readable body from a Messages `attributedBody`
/// typedstream / `NSKeyedArchiver` BLOB.
///
/// Modern Messages keeps the reply body in `message.attributedBody` (a
/// streamtyped/typedstream BLOB) and leaves `message.text` NULL. The body is
/// an `NSString` embedded after the `NSAttributedString` / `NSMutableString` /
/// `NSString` class chain, and is located like this:
///
/// 1. Find the `NSString` class marker.
/// 2. Scan forward to the `+` (0x2b) string-type tag.
/// 3. Read the length prefix: a single byte `< 0x80` is the length; `0x81`
///    means the next two bytes are a little-endian `u16` length.
/// 4. Read that many UTF-8 bytes — that is the body.
///
/// Two strategies are tried in order:
///
/// - the structured parse anchored on the literal `NSString` marker;
/// - a fallback that tries every bare `+` tag, for payloads that reference
///   the class by index instead of repeating the literal token, and keeps the
///   first candidate that decodes to a non-empty valid UTF-8 run.
///
/// Blobs larger than `MAX_ATTRIBUTED_BODY_DECODE_BYTES` are not scanned at
/// all. Both scanners are O(n^2) on a pathological blob stuffed with repeated
/// markers, and the poller decodes every inbound row for any sender BEFORE
/// the downstream allowlist drop, so the cap is the remote-DoS guard. An
/// approval reply's typedstream is far below the cap, so nothing useful is
/// lost.
///
/// Returns `None` when no body can be recovered, which routes the caller
/// through its `message.text` fallback. Zero-dependency hand-parse — no extra
/// crate.
pub fn decode_attributed_body(blob: &[u8]) -> Option<String> {
    if blob.len() <= MAX_ATTRIBUTED_BODY_DECODE_BYTES {
        // Structured parse first; the unanchored `+` scan only runs when the
        // structured parse finds nothing.
        decode_via_nsstring_marker(blob).or_else(|| decode_via_plus_scan(blob))
    } else {
        None
    }
}

/// Structured decode: for each literal `NSString` occurrence, in order, take
/// the first `+` (0x2b string-type tag) that follows it and try to read a
/// length-prefixed UTF-8 string immediately after that tag. The first
/// occurrence that yields a non-empty string wins; `None` once the markers
/// are exhausted.
///
/// Fails safe: a mis-anchor decodes to garbage → the caller's text fallback
/// (and, for an approval reply, `parse_inbound` → Ignore), never a wrong
/// resolve.
fn decode_via_nsstring_marker(blob: &[u8]) -> Option<String> {
    const MARKER: &[u8] = b"NSString";
    let mut cursor = 0usize;
    loop {
        // No further marker means the structured parse has nothing left to try.
        let hit = find_subsequence(&blob[cursor..], MARKER)?;
        let tail_start = cursor + hit + MARKER.len();
        let body = find_byte(&blob[tail_start..], b'+')
            .and_then(|offset| read_length_prefixed_utf8(blob, tail_start + offset + 1))
            .filter(|text| !text.is_empty());
        if body.is_some() {
            return body;
        }
        // Resume the search just past this marker.
        cursor = tail_start;
    }
}

/// Best-effort fallback when the structured `NSString`-anchored parse fails:
/// scan every `+`-tagged candidate and take the first that decodes to a
/// non-empty, mostly-printable UTF-8 run. This is **intentionally lossy** — it
/// has no schema anchor, so a wrong guess is possible. That is acceptable here
/// because a wrong decode for an approval reply parses to `Ignore`
/// (`messaging_orchestrator::parse_inbound`) and so FAILS SAFE: a mis-decoded
/// body never resolves an approval, it is simply dropped. Approval replies
/// ("approve"/"deny"/"<code> approve") are short, so the structured parse above
/// handles them in practice; this only catches odd typedstream layouts.
fn decode_via_plus_scan(blob: &[u8]) -> Option<String> {
    let mut i = 0usize;
    while i < blob.len() {
        if blob[i] == b'+' {
            if let Some(s) = read_length_prefixed_utf8(blob, i + 1) {
                if !s.is_empty() && s.chars().all(|c| !c.is_control() || c == '\n' || c == '\t') {
                    return Some(s);
                }
            }
        }
        i += 1;
    }
    None
}

/// Read a typedstream length-prefixed UTF-8 string whose length byte sits at
/// `pos`.
///
/// Length encodings understood:
/// - a single byte `< 0x80` is the length itself;
/// - `0x81` announces a little-endian `u16` length in the next two bytes.
///
/// Any other leading byte (`>= 0x80` and not `0x81`, which includes the wider
/// length escapes used for very large bodies) is deliberately unsupported:
/// such bodies are far too large to be approval replies, and returning `None`
/// makes the caller fall back to `message.text` (and, failing that, drop the
/// row).
///
/// Returns `None` when `pos` is out of bounds, the length is zero, the
/// announced length runs past the end of `blob`, or the bytes are not valid
/// UTF-8.
fn read_length_prefixed_utf8(blob: &[u8], pos: usize) -> Option<String> {
    let (len, start) = match *blob.get(pos)? {
        0x81 => {
            let lo = usize::from(*blob.get(pos + 1)?);
            let hi = usize::from(*blob.get(pos + 2)?);
            ((hi << 8) | lo, pos + 3)
        }
        short if short < 0x80 => (usize::from(short), pos + 1),
        // Unsupported length escape: out of scope for approvals.
        _ => return None,
    };
    if len == 0 {
        return None;
    }
    let end = start.checked_add(len)?;
    let body = blob.get(start..end)?;
    match std::str::from_utf8(body) {
        Ok(text) => Some(text.to_owned()),
        Err(_) => None,
    }
}

/// Byte-slice substring search: offset of the first occurrence of `needle`
/// in `haystack`, or `None` when `needle` is empty, longer than `haystack`,
/// or simply absent.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    // Last offset at which `needle` could still fit; `None` if it never fits.
    let last_start = haystack.len().checked_sub(needle.len())?;
    (0..=last_start).find(|&start| haystack[start..].starts_with(needle))
}

/// Offset of the first occurrence of `byte` in `haystack`, or `None` when it
/// does not occur.
fn find_byte(haystack: &[u8], byte: u8) -> Option<usize> {
    for (idx, &candidate) in haystack.iter().enumerate() {
        if candidate == byte {
            return Some(idx);
        }
    }
    None
}

/// Resolve a body for a row from its decoded `attributedBody` (hex) and the
/// `text` column. Decodes `attributedBody` first; falls back to `text` only
/// when `text` is non-null and non-empty. Returns `None` when neither yields a
/// body (the caller drops bodyless rows).
pub fn resolve_body(attributed_body_hex: Option<&str>, text: Option<&str>) -> Option<String> {
    if let Some(hex) = attributed_body_hex {
        if !hex.is_empty() {
            if let Some(bytes) = hex_decode(hex) {
                if let Some(decoded) = decode_attributed_body(&bytes) {
                    if !decoded.is_empty() {
                        return Some(decoded);
                    }
                }
            }
        }
    }
    match text {
        Some(t) if !t.is_empty() => Some(t.to_string()),
        _ => None,
    }
}

/// Turn a hex string (either letter case, as produced by SQLite `hex()`) into
/// the bytes it encodes. Returns `None` for an odd number of digits or any
/// non-hex character.
fn hex_decode(hex: &str) -> Option<Vec<u8>> {
    let digits = hex.as_bytes();
    if digits.len() % 2 == 1 {
        return None;
    }
    // Each pair is (high nibble, low nibble); the first bad digit aborts the
    // whole decode.
    digits
        .chunks_exact(2)
        .map(|pair| {
            let hi = hex_nibble(pair[0])?;
            let lo = hex_nibble(pair[1])?;
            Some((hi << 4) | lo)
        })
        .collect()
}

/// Value (0–15) of one ASCII hex digit, accepting `0-9`, `a-f` and `A-F`;
/// `None` for any other byte.
fn hex_nibble(b: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly the ASCII hex digits in either case, and
    // its result is always below 16, so the narrowing conversion cannot fail.
    char::from(b)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}

/// Build the inbound query shared by the production reader and the test seam.
///
/// Each result row carries `rowid`, `handle_id`, `attributed_body_hex` (the
/// BLOB passed through SQLite `hex()`, or NULL when the BLOB is NULL), `text`
/// and `date`. Only rows with `ROWID > min_rowid` and `is_from_me = 0` are
/// selected, ordered by ROWID ascending.
fn inbound_sql(min_rowid: i64) -> String {
    let where_clause =
        format!("WHERE message.ROWID > {min_rowid} AND message.is_from_me = 0");
    // Clauses are joined with single spaces into one statement.
    [
        "SELECT message.ROWID AS rowid,",
        "handle.id AS handle_id,",
        "CASE WHEN message.attributedBody IS NULL THEN NULL",
        "ELSE hex(message.attributedBody) END AS attributed_body_hex,",
        "message.text AS text,",
        "message.date AS date",
        "FROM message",
        "JOIN handle ON message.handle_id = handle.ROWID",
        where_clause.as_str(),
        "ORDER BY message.ROWID ASC;",
    ]
    .join(" ")
}

/// Query returning the current `MAX(ROWID)` of the `message` table as a
/// single `rowid` column, with `0` substituted when the table is empty. Used
/// for the fresh-boot watermark seed.
fn max_rowid_sql() -> &'static str {
    const SEED_QUERY: &str = "SELECT COALESCE(MAX(ROWID), 0) AS rowid FROM message;";
    SEED_QUERY
}

/// One raw row as `/usr/bin/sqlite3 -json` returns it for the `inbound_sql`
/// query (BLOB column hex-encoded by the `hex()` SQL so JSON stays valid).
///
/// Field names match the column aliases in `inbound_sql`, which is what lets
/// the derived `Deserialize` map each JSON object onto this struct.
#[derive(Debug, Deserialize)]
struct RawInboundRow {
    // `message.ROWID`.
    rowid: i64,
    // `handle.id` of the sender.
    handle_id: String,
    // `hex(message.attributedBody)`; `None` when the BLOB column is NULL.
    attributed_body_hex: Option<String>,
    // `message.text`; `None` when the column is NULL.
    text: Option<String>,
    // `message.date`, passed through unchanged.
    date: i64,
}

/// Convert the stdout of `/usr/bin/sqlite3 -json` for the inbound query into
/// typed `InboundMessage`s, resolving each row's body via `resolve_body`.
///
/// Empty or whitespace-only output means zero rows (sqlite3 `-json` prints
/// nothing in that case) and yields an empty vector. Rows with no recoverable
/// body are dropped; the remaining rows keep their input order. Pure —
/// reachable in tests on any OS.
///
/// # Errors
/// `IntegrationError::Backend` when the output is not the expected JSON array
/// of row objects.
pub fn parse_inbound_rows(json_stdout: &[u8]) -> Result<Vec<InboundMessage>, IntegrationError> {
    // Skip leading whitespace; nothing but whitespace means "no rows".
    let payload = match json_stdout.iter().position(|b| !b.is_ascii_whitespace()) {
        Some(offset) => &json_stdout[offset..],
        None => return Ok(Vec::new()),
    };
    let rows: Vec<RawInboundRow> = serde_json::from_slice(payload)
        .map_err(|e| IntegrationError::Backend(format!("inbound sqlite json: {e}")))?;
    let messages = rows
        .into_iter()
        .filter_map(|row| {
            // A row without a recoverable body is skipped, not an error.
            let body = resolve_body(row.attributed_body_hex.as_deref(), row.text.as_deref())?;
            Some(InboundMessage {
                rowid: row.rowid,
                handle_id: row.handle_id,
                body,
                date: row.date,
            })
        })
        .collect();
    Ok(messages)
}

/// Invoke `/usr/bin/sqlite3 -json <db> <sql>` and hand back its stdout bytes.
/// Shared invocation harness, mirroring `list_chats_from_database`; usable
/// wherever `/usr/bin/sqlite3` exists (macOS prod + the macOS dev host where
/// gates run).
///
/// # Errors
/// `IntegrationError::Backend` when the process cannot be spawned, or when it
/// exits unsuccessfully (the message then carries the trimmed stderr).
fn run_sqlite3_json(
    db_path: &std::path::Path,
    sql: &str,
) -> Result<Vec<u8>, IntegrationError> {
    let mut sqlite = std::process::Command::new("/usr/bin/sqlite3");
    sqlite.arg("-json").arg(db_path).arg(sql);
    match sqlite.output() {
        Err(e) => Err(IntegrationError::Backend(format!("inbound sqlite: {e}"))),
        Ok(finished) if finished.status.success() => Ok(finished.stdout),
        Ok(finished) => {
            let stderr = String::from_utf8_lossy(&finished.stderr);
            Err(IntegrationError::Backend(format!(
                "inbound sqlite failed: {}",
                stderr.trim()
            )))
        }
    }
}

/// Current `MAX(ROWID)` of the `message` table in the database at `db_path`.
/// Seeds the watermark when the watermark file is fresh/missing, so no
/// pre-existing text is ever returned as new.
///
/// # Errors
/// `IntegrationError::Backend` when the sqlite3 invocation fails or its
/// output cannot be parsed.
pub fn max_rowid_in_db(db_path: &std::path::Path) -> Result<i64, IntegrationError> {
    run_sqlite3_json(db_path, max_rowid_sql()).and_then(|stdout| parse_max_rowid(&stdout))
}

/// The single-column row produced by the `max_rowid_sql` query, as emitted by
/// `/usr/bin/sqlite3 -json`.
#[derive(Debug, Deserialize)]
struct MaxRowidRow {
    // Matches the `AS rowid` alias in `max_rowid_sql`.
    rowid: i64,
}

/// Pull the `rowid` value out of the `-json` stdout of the `max_rowid_sql`
/// query.
///
/// Empty or whitespace-only output, and an empty JSON array, both yield `0`.
/// When several rows are present only the first is used.
///
/// # Errors
/// `IntegrationError::Backend` when the output is not the expected JSON.
fn parse_max_rowid(json_stdout: &[u8]) -> Result<i64, IntegrationError> {
    // Skip leading whitespace; nothing but whitespace means "no rows".
    let payload = match json_stdout.iter().position(|b| !b.is_ascii_whitespace()) {
        Some(offset) => &json_stdout[offset..],
        None => return Ok(0),
    };
    serde_json::from_slice::<Vec<MaxRowidRow>>(payload)
        .map(|rows| rows.first().map_or(0, |row| row.rowid))
        .map_err(|e| IntegrationError::Backend(format!("max rowid json: {e}")))
}

/// Fetch inbound messages (`is_from_me = 0`) with `ROWID > min_rowid` from
/// the database at `db_path`, ordered by ROWID ascending. Each body is
/// decoded from `attributedBody`, falling back to `text`.
///
/// This is the injectable-path core: every caller (production reader, test
/// seam) supplies the DB path, so nothing here hardcodes
/// `~/Library/Messages/chat.db`.
///
/// # Errors
/// `IntegrationError::Backend` when the sqlite3 invocation fails or its
/// output cannot be parsed.
pub fn read_inbound_from_db(
    db_path: &std::path::Path,
    min_rowid: i64,
) -> Result<Vec<InboundMessage>, IntegrationError> {
    let query = inbound_sql(min_rowid);
    run_sqlite3_json(db_path, &query).and_then(|stdout| parse_inbound_rows(&stdout))
}

/// Default macOS location of the Messages database:
/// `$HOME/Library/Messages/chat.db`. Same resolution as
/// `list_chats_from_database`. macOS only.
///
/// When `HOME` is unset the result is the relative path
/// `Library/Messages/chat.db`.
#[cfg(target_os = "macos")]
pub fn default_chat_db_path() -> std::path::PathBuf {
    let home = std::env::var_os("HOME").unwrap_or_default();
    std::path::Path::new(&home).join("Library/Messages/chat.db")
}

/// Production reader: new inbound messages past `min_rowid` from the user's
/// real Messages library (the `default_chat_db_path` database). macOS-gated
/// (requires Full Disk Access). The orchestrator (Unit 2/4, daemon-side)
/// calls this in-process.
///
/// # Errors
/// Whatever `read_inbound_from_db` reports for the default database.
#[cfg(target_os = "macos")]
pub fn read_inbound_messages(min_rowid: i64) -> Result<Vec<InboundMessage>, IntegrationError> {
    let chat_db = default_chat_db_path();
    read_inbound_from_db(chat_db.as_path(), min_rowid)
}

/// Production fresh-boot seed: `MAX(ROWID)` of the user's real Messages
/// library (the `default_chat_db_path` database). macOS-gated. The watermark
/// is seeded to this on first boot so no pre-existing text is replayed as
/// new.
///
/// # Errors
/// Whatever `max_rowid_in_db` reports for the default database.
#[cfg(target_os = "macos")]
pub fn max_rowid_default_db() -> Result<i64, IntegrationError> {
    let chat_db = default_chat_db_path();
    max_rowid_in_db(chat_db.as_path())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    fn ts_blob(text: &str) -> Vec<u8> {
        // Synthesizes the canonical Messages `attributedBody` typedstream
        // layout: streamtyped header, the NSAttributedString / NSMutableString
        // / NSString class chain, the `+` string tag, a length prefix, the
        // UTF-8 body, and a fixed trailer. This lets the decoder run against
        // the real on-disk shape, including the 0x81+u16 long-length escape
        // used for bodies >= 128 bytes.
        let body = text.as_bytes();

        // One length byte below 0x80; otherwise the 0x81 escape followed by
        // the low byte and then the high byte of the length.
        let length_prefix: Vec<u8> = if body.len() < 0x80 {
            vec![body.len() as u8]
        } else {
            vec![
                0x81,
                (body.len() & 0xff) as u8,
                ((body.len() >> 8) & 0xff) as u8,
            ]
        };

        // The blob is the plain concatenation of these segments, in order.
        let segments: [&[u8]; 14] = [
            &[0x04, 0x0b],
            b"streamtyped",
            &[0x81, 0xe8, 0x03, 0x84, 0x01, 0x40, 0x84, 0x84, 0x84, 0x12],
            b"NSAttributedString",
            &[0x00, 0x84, 0x84, 0x08],
            b"NSObject",
            &[0x00, 0x85, 0x92, 0x84, 0x84, 0x84, 0x0f],
            b"NSMutableString",
            &[0x01, 0x84, 0x84, 0x08],
            b"NSString",
            &[0x01, 0x95, 0x84, 0x01, 0x2b], // `+` string tag
            &length_prefix,
            body,
            &[0x86, 0x84, 0x02, 0x69, 0x49, 0x00, 0x01],
        ];
        segments.concat()
    }

    fn hex_encode(bytes: &[u8]) -> String {
        // Lowercase hex, two digits per input byte, high nibble first.
        const DIGITS: &[u8; 16] = b"0123456789abcdef";
        bytes
            .iter()
            .flat_map(|&byte| {
                [
                    DIGITS[usize::from(byte >> 4)],
                    DIGITS[usize::from(byte & 0x0f)],
                ]
            })
            .map(char::from)
            .collect()
    }

    // ---- Pure decode tests (run on ANY OS) ----

    #[test]
    fn decode_short_body() {
        // A short body goes through the single-byte length prefix of the
        // fixture and must round-trip exactly.
        let decoded = decode_attributed_body(&ts_blob("A7 approve"));
        assert_eq!(decoded.as_deref(), Some("A7 approve"));
    }

    #[test]
    fn decode_long_body_uses_u16_length_escape() {
        // A body well past 128 bytes forces the fixture onto the 0x81 + u16
        // length escape; the decoder must still recover the exact text.
        let mut expected = String::from("A7 approve ");
        expected.push_str(&"x".repeat(200));
        let decoded = decode_attributed_body(&ts_blob(&expected));
        assert_eq!(decoded.as_deref(), Some(expected.as_str()));
    }

    #[test]
    fn oversized_attributed_body_skips_decode_no_dos_hang() {
        // Remote-DoS regression. The pathological input is 64 KiB of
        // back-to-back "NSString" markers, which would keep the O(n^2) marker
        // scanners busy for seconds and wedge the single serial poll loop.
        // The size cap must bypass decoding entirely and return quickly,
        // leaving the text fallback to supply the body.
        const FIXTURE_BYTES: usize = 64 * 1024;
        let blob: Vec<u8> = b"NSString"
            .iter()
            .copied()
            .cycle()
            .take(FIXTURE_BYTES)
            .collect();
        assert!(
            blob.len() > MAX_ATTRIBUTED_BODY_DECODE_BYTES,
            "fixture must exceed the decode cap"
        );

        // Two independent signals that the cap engaged: the result is None,
        // and the call is near-instant (a genuine O(n^2) scan over 64 KiB of
        // markers takes multiple seconds; the cap bounds it to microseconds).
        let timer = std::time::Instant::now();
        let decoded = decode_attributed_body(&blob);
        let elapsed = timer.elapsed();
        assert_eq!(decoded, None, "oversized blob must not decode to a body");
        assert!(
            elapsed < std::time::Duration::from_millis(100),
            "decode of an oversized blob must be near-instant (cap engaged), took {elapsed:?}"
        );

        // Same blob through the reader's row path. With a NULL `text` column
        // there is nothing to fall back to, so resolve_body yields None and
        // the row is dropped: no hang, no raw bytes. (A real reply would
        // populate `text` and fall back to it.)
        let blob_hex = hex_encode(&blob);
        assert_eq!(
            resolve_body(Some(&blob_hex), None),
            None,
            "oversized blob + NULL text → bodyless row, dropped"
        );
        // When `text` is present, it becomes the body.
        let with_fallback = resolve_body(Some(&blob_hex), Some("approve"));
        assert_eq!(
            with_fallback.as_deref(),
            Some("approve"),
            "oversized blob falls back to message.text"
        );
    }

    #[test]
    fn resolve_body_prefers_attributedbody_over_null_text() {
        let encoded = hex_encode(&ts_blob("approve"));
        // NULL text with a populated attributedBody must resolve to the
        // decoded body, and in particular never to the empty string.
        let resolved = resolve_body(Some(&encoded), None);
        assert_eq!(resolved.as_deref(), Some("approve"));
        assert_ne!(resolved.as_deref(), Some(""));
    }

    #[test]
    fn resolve_body_falls_back_to_text_when_no_attributedbody() {
        // No blob at all: the text column is the body.
        let from_text = resolve_body(None, Some("deny"));
        assert_eq!(from_text.as_deref(), Some("deny"));

        // No blob and empty or absent text: no body, so the row is dropped.
        assert_eq!(resolve_body(None, Some("")), None);
        assert_eq!(resolve_body(None, None), None);
    }

    #[test]
    fn parse_inbound_rows_decodes_and_drops_bodyless() {
        // One JSON row with a NULL `text` and the body carried only as a
        // hex-encoded attributedBody blob.
        let blob_hex = hex_encode(&ts_blob("approve"));
        let payload = format!(
            r#"[{{"rowid":5,"handle_id":"+15551234567","attributed_body_hex":"{blob_hex}","text":null,"date":700000000000000000}}]"#
        );

        let parsed = parse_inbound_rows(payload.as_bytes()).unwrap();
        assert_eq!(parsed.len(), 1);

        let only = &parsed[0];
        assert_eq!(only.rowid, 5);
        assert_eq!(only.handle_id, "+15551234567");
        assert_eq!(only.body, "approve");
        assert_eq!(only.date, 700000000000000000);
    }

    #[test]
    fn parse_inbound_rows_empty_stdout_is_zero_rows() {
        // Both truly empty and whitespace-only output mean "no rows".
        for blank in [&b""[..], &b"  \n"[..]] {
            assert!(parse_inbound_rows(blank).unwrap().is_empty());
        }
    }

    // ---- Watermark persistence (run on ANY OS; SC-2) ----

    #[test]
    fn watermark_persist_survives_restart() {
        let scratch = tempfile::tempdir().unwrap();
        let home = scratch.path();

        // Fresh boot: nothing has been persisted yet.
        assert_eq!(Watermark::load(home).unwrap(), None);

        Watermark::new(42).persist(home).unwrap();

        // Simulated restart: no in-memory value survives, so whatever comes
        // back was read purely from disk.
        let restored = Watermark::load(home).unwrap();
        assert_eq!(restored, Some(Watermark::new(42)));
        assert_eq!(restored.unwrap().last_rowid, 42);
    }

    #[test]
    fn watermark_persist_is_atomic_no_temp_left() {
        let scratch = tempfile::tempdir().unwrap();
        let home = scratch.path();
        Watermark::new(7).persist(home).unwrap();

        // After a persist the directory must hold the final file and nothing
        // that looks like a leftover `.tmp.*` temp file.
        let entries: Vec<String> = std::fs::read_dir(home)
            .unwrap()
            .map(|entry| entry.unwrap().file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            entries
                .iter()
                .any(|name| name == "messages-inbound-watermark.json"),
            "final watermark file must exist, got {entries:?}"
        );
        assert!(
            entries.iter().all(|name| !name.contains(".tmp.")),
            "no temp file should remain, got {entries:?}"
        );
    }

    // ---- Test seam: real SQL path over a temp chat.db via /usr/bin/sqlite3 ----

    /// Create `chat.db` under `dir` with the real `message`/`handle` schema by
    /// passing the DDL and INSERTs to `/usr/bin/sqlite3` as one argument. All
    /// rows share one known `handle.id`. Seeded rows, labelled as in the SQL
    /// comments:
    ///   rowid 1 — (ii) older pre-existing inbound row,
    ///   rowid 2 — an interleaved from-me row (must be excluded),
    ///   rowid 3 — (i) NULL-`text` inbound row whose body lives in the
    ///             `attributedBody` blob built from `attr_text`,
    ///   rowid 4 — (iii) outbound `is_from_me = 1` row (must be excluded).
    ///
    /// Returns the db path (kept alive by `dir`). Panics if sqlite3 cannot be
    /// launched or exits unsuccessfully.
    fn seed_temp_chat_db(dir: &Path, attr_text: &str) -> std::path::PathBuf {
        let blob_hex = hex_encode(&ts_blob(attr_text));
        let schema_and_rows = format!(
            "CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
             CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
                 text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
             INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
             -- (ii) older pre-existing inbound row\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (1, 1, 'old hello', NULL, 0, 600000000000000000);\n\
             -- a from-me row interleaved (must be excluded)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (2, 1, 'my outbound', NULL, 1, 650000000000000000);\n\
             -- (i) NULL-text row, body in attributedBody (the row SC-1 depends on)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (3, 1, NULL, X'{blob_hex}', 0, 700000000000000000);\n\
             -- (iii) outbound after the watermark (must be excluded)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (4, 1, 'me again', NULL, 1, 750000000000000000);\n"
        );

        let db = dir.join("chat.db");
        let mut seeder = std::process::Command::new("/usr/bin/sqlite3");
        seeder.arg(&db).arg(&schema_and_rows);
        let seeded = seeder
            .status()
            .expect("sqlite3 must be available to seed the fixture")
            .success();
        assert!(seeded, "fixture seed failed");
        db
    }

    /// SC-D fixture: the single-Apple-ID (solo) self-reply shape.
    ///
    /// Deliberately a separate sibling of `seed_temp_chat_db`. SC-1's
    /// `attributedbody_null_text_decode_and_watermark` hard-asserts
    /// `rows.len() == 1` against that fixture, so adding rows there would
    /// break SC-1.
    ///
    /// Probe provenance: on a single-Apple-ID account the on-device probe
    /// showed that BOTH the daemon's own outbound prompt AND the user's typed
    /// reply land in `chat.db` twice: once with `is_from_me = 1` (the
    /// sent/synced-sent copy) and once with `is_from_me = 0` (a received
    /// self-echo). The typed reply body sits in the `text` column, NOT in
    /// `attributedBody`. The literal probe handle and rowids carry no
    /// behavioral weight (the filter keys only on `is_from_me` and ROWID
    /// ordering), so per scope-call C1 this fixture uses the standard test
    /// handle and synthetic monotonic rowids. NO literal probe phone number or
    /// rowids appear here.
    ///
    /// Four rows are seeded on the SAME handle, all past a watermark of 0:
    ///   (a) rowid 10, `is_from_me = 0` echo of the daemon's own prompt — KEPT,
    ///   (b) rowid 11, its `is_from_me = 1` synced-sent twin            — DROPPED,
    ///   (c) rowid 12, `is_from_me = 0` `A0 approve` self-reply         — KEPT,
    ///   (d) rowid 13, its `is_from_me = 1` synced-sent twin            — DROPPED.
    ///
    /// Returns the db path under `dir`. Panics if sqlite3 cannot be launched
    /// or exits unsuccessfully.
    fn seed_solo_self_reply_chat_db(dir: &Path) -> std::path::PathBuf {
        // The daemon's own outbound prompt body (the exact `outbound_body`
        // shape: a multi-token line that echoes back with `is_from_me = 0` on
        // a solo account), with single quotes doubled so it can sit inside a
        // SQL string literal.
        let prompt_sql =
            "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`."
                .replace('\'', "''");
        let schema_and_rows = format!(
            "CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
             CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
                 text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
             INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
             -- (a) received self-echo of the daemon's own prompt (KEEP)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (10, 1, '{prompt_sql}', NULL, 0, 800000000000000000);\n\
             -- (b) the sent/synced-sent twin of that prompt (DROP)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (11, 1, '{prompt_sql}', NULL, 1, 800000000000000001);\n\
             -- (c) the user's typed `A0 approve` self-reply, in the text column (KEEP)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (12, 1, 'A0 approve', NULL, 0, 800000000000000002);\n\
             -- (d) the sent/synced-sent twin of that reply (DROP)\n\
             INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
                 VALUES (13, 1, 'A0 approve', NULL, 1, 800000000000000003);\n"
        );

        let db = dir.join("chat.db");
        let mut seeder = std::process::Command::new("/usr/bin/sqlite3");
        seeder.arg(&db).arg(&schema_and_rows);
        let seeded = seeder
            .status()
            .expect("sqlite3 must be available to seed the fixture")
            .success();
        assert!(seeded, "fixture seed failed");
        db
    }

    #[test]
    fn solo_self_reply_kept_synced_sent_twins_dropped() {
        // SC-D (Wall 1 pinned). The `is_from_me = 0` filter (read.rs:319) must
        // KEEP the solo user's two `is_from_me = 0` rows, the own-prompt
        // self-echo and the `A0 approve` reply, with bodies intact, and must
        // DROP both `is_from_me = 1` synced-sent twins. It is a keep-direction
        // regression tripwire: it fails as soon as a later edit narrows or
        // handle-scopes the filter so that the solo self-reply gets dropped.
        // Skipped when sqlite3 is absent, per existing precedent.
        if !Path::new("/usr/bin/sqlite3").exists() {
            eprintln!("skip: /usr/bin/sqlite3 not present");
            return;
        }
        let scratch = tempfile::tempdir().unwrap();
        let db = seed_solo_self_reply_chat_db(scratch.path());

        // With a watermark of 0, all four seeded rows (rowids 10–13) are
        // candidates.
        let rows = read_inbound_from_db(&db, 0).unwrap();

        // Only the two `is_from_me = 0` rows come back, in ROWID order.
        assert_eq!(rows.len(), 2, "exactly the two is_from_me=0 rows kept, got {rows:?}");
        let (echo, reply) = (&rows[0], &rows[1]);

        // (a) The own-prompt self-echo survives with its multi-token body
        // intact. Wall 2 would Ignore it at the parser, but Wall 1 keeps it.
        let expected_prompt =
            "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`.";
        assert_eq!(echo.rowid, 10);
        assert_eq!(echo.handle_id, "+15551234567");
        assert_eq!(echo.body, expected_prompt, "own-prompt echo body kept intact");

        // (c) The `A0 approve` self-reply survives with its body intact.
        assert_eq!(reply.rowid, 12);
        assert_eq!(reply.handle_id, "+15551234567");
        assert_eq!(reply.body, "A0 approve", "solo self-reply body kept intact");

        // The `is_from_me = 1` synced-sent twins (rowids 11 and 13) are absent.
        let returned_rowids: Vec<i64> = rows.iter().map(|row| row.rowid).collect();
        assert!(
            returned_rowids.iter().all(|&id| id != 11),
            "the is_from_me=1 prompt twin (rowid 11) must be dropped, got {returned_rowids:?}"
        );
        assert!(
            returned_rowids.iter().all(|&id| id != 13),
            "the is_from_me=1 reply twin (rowid 13) must be dropped, got {returned_rowids:?}"
        );
    }

    #[test]
    fn attributedbody_null_text_decode_and_watermark() {
        // SC-1. Only post-watermark `is_from_me = 0` rows are returned (older
        // ones excluded); the NULL-text/attributedBody row decodes to the
        // exact expected string; its fields are correct; and nothing is
        // replayed after a restart from the persisted watermark.
        if !Path::new("/usr/bin/sqlite3").exists() {
            eprintln!("skip: /usr/bin/sqlite3 not present");
            return;
        }
        let scratch = tempfile::tempdir().unwrap();
        let db = seed_temp_chat_db(scratch.path(), "A7 approve");
        let base = scratch.path().join("car-home");

        // Start from a persisted watermark of 1: the older row is already seen.
        Watermark::new(1).persist(&base).unwrap();
        let initial = Watermark::load(&base).unwrap().unwrap();

        let rows = read_inbound_from_db(&db, initial.last_rowid).unwrap();

        // Only rowid 3 (the NULL-text/attributedBody inbound) qualifies: row 1
        // is behind the watermark and rows 2 and 4 are from-me.
        assert_eq!(rows.len(), 1, "got {rows:?}");
        let inbound = &rows[0];
        assert_eq!(inbound.rowid, 3);
        assert_eq!(inbound.handle_id, "+15551234567");
        // The load-bearing assertion: the body is the EXACT decoded string,
        // neither "" nor raw bytes.
        assert_eq!(inbound.body, "A7 approve");
        assert_ne!(inbound.body, "");
        assert_eq!(inbound.date, 700000000000000000);

        // Move the watermark up to the highest rowid just seen and persist it.
        let highest_seen = rows.iter().map(|row| row.rowid).max().unwrap();
        Watermark::new(highest_seen).persist(&base).unwrap();

        // Simulated restart: rebuild the watermark purely from disk, then
        // re-read the unchanged DB. The same row must NOT come back.
        let reloaded = Watermark::load(&base).unwrap().unwrap();
        assert_eq!(reloaded.last_rowid, 3);
        let rows_after = read_inbound_from_db(&db, reloaded.last_rowid).unwrap();
        assert!(
            rows_after.is_empty(),
            "row replayed after restart: {rows_after:?}"
        );
    }

    #[test]
    fn fresh_watermark_seeds_to_max_rowid_no_replay() {
        // SC-2 edge. A fresh or missing watermark is seeded to MAX(ROWID), so
        // the very first read of an unchanged DB returns no new rows.
        if !Path::new("/usr/bin/sqlite3").exists() {
            eprintln!("skip: /usr/bin/sqlite3 not present");
            return;
        }
        let scratch = tempfile::tempdir().unwrap();
        let db = seed_temp_chat_db(scratch.path(), "approve");
        let base = scratch.path().join("car-home");

        // Fresh boot: nothing on disk yet.
        assert_eq!(Watermark::load(&base).unwrap(), None);

        // Seed the watermark from the DB's MAX(ROWID), which is 4 for this
        // fixture.
        let seed = max_rowid_in_db(&db).unwrap();
        assert_eq!(seed, 4);
        Watermark::new(seed).persist(&base).unwrap();

        // The first read after seeding must see nothing on the unchanged DB.
        let seeded = Watermark::load(&base).unwrap().unwrap();
        let rows = read_inbound_from_db(&db, seeded.last_rowid).unwrap();
        assert!(rows.is_empty(), "fresh seed should yield zero rows: {rows:?}");
    }
}