use serde::{Deserialize, Serialize};
use super::IntegrationError;
/// One inbound message as returned by `parse_inbound_rows`: the identifiers of
/// the source row plus the body text resolved for it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InboundMessage {
/// Value of the `rowid` column of the row this message was built from.
pub rowid: i64,
/// Value of the `handle_id` column (the query in this file selects `handle.id`).
pub handle_id: String,
/// Body text chosen by `resolve_body`; rows without a body are never emitted.
pub body: String,
/// Raw value of the `date` column, passed through unchanged.
pub date: i64,
}
/// Persisted read position: the highest message row id already handled.
/// Stored as JSON in the file named by `Watermark::path_in`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct Watermark {
/// Row id recorded by the last `persist`; readers query rows strictly above it.
pub last_rowid: i64,
}
impl Watermark {
pub fn new(last_rowid: i64) -> Self {
Self { last_rowid }
}
pub fn path_in(base_dir: &std::path::Path) -> std::path::PathBuf {
base_dir.join("messages-inbound-watermark.json")
}
pub fn load(base_dir: &std::path::Path) -> Result<Option<Self>, IntegrationError> {
let path = Self::path_in(base_dir);
match std::fs::read(&path) {
Ok(bytes) => {
let wm: Watermark = serde_json::from_slice(&bytes).map_err(|e| {
IntegrationError::Backend(format!("watermark json parse: {e}"))
})?;
Ok(Some(wm))
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(IntegrationError::Backend(format!("watermark read: {e}"))),
}
}
pub fn persist(&self, base_dir: &std::path::Path) -> Result<(), IntegrationError> {
std::fs::create_dir_all(base_dir).map_err(|e| {
IntegrationError::Backend(format!("watermark mkdir {}: {e}", base_dir.display()))
})?;
let path = Self::path_in(base_dir);
let bytes = serde_json::to_vec(self)
.map_err(|e| IntegrationError::Backend(format!("watermark serialize: {e}")))?;
atomic_write(&path, &bytes)
}
}
/// Replaces the file at `path` with `bytes` by writing a sibling temp file and
/// renaming it over `path`.
///
/// The temp file is named `<path>.tmp.<pid>.<seq>`, where `seq` comes from a
/// process-wide counter, so two calls in this process never share a temp file.
/// The temp file's contents are synced to disk before the rename so that the
/// renamed file is never observed empty or partial after a crash, and the temp
/// file is removed again if any step fails so failed attempts do not
/// accumulate stray `*.tmp.*` files next to the target.
///
/// # Errors
/// `IntegrationError::Backend` when the temp file cannot be created, written
/// or synced, or when the rename fails.
fn atomic_write(path: &std::path::Path, bytes: &[u8]) -> Result<(), IntegrationError> {
    use std::io::Write;
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQ: AtomicU64 = AtomicU64::new(0);
    let seq = SEQ.fetch_add(1, Ordering::Relaxed);

    // Append to the full path (not just the file name) so the temp file is a
    // sibling of the target and the rename stays within one directory.
    let mut tmp_os = path.as_os_str().to_owned();
    tmp_os.push(format!(".tmp.{}.{}", std::process::id(), seq));
    let tmp = std::path::PathBuf::from(tmp_os);

    let write_tmp = || -> std::io::Result<()> {
        let mut file = std::fs::File::create(&tmp)?;
        file.write_all(bytes)?;
        // Flush data to disk before the rename makes it visible under `path`.
        file.sync_all()
    };
    if let Err(e) = write_tmp() {
        // Best-effort cleanup; the original error is the one worth reporting.
        let _ = std::fs::remove_file(&tmp);
        return Err(IntegrationError::Backend(format!(
            "watermark temp write: {e}"
        )));
    }

    if let Err(e) = std::fs::rename(&tmp, path) {
        let _ = std::fs::remove_file(&tmp);
        return Err(IntegrationError::Backend(format!("watermark rename: {e}")));
    }
    Ok(())
}
/// Largest blob, in bytes, that `decode_attributed_body` will attempt to decode;
/// anything longer is rejected up front without being scanned.
const MAX_ATTRIBUTED_BODY_DECODE_BYTES: usize = 8 * 1024;
/// Extracts the message text from an attributedBody blob.
///
/// Blobs longer than `MAX_ATTRIBUTED_BODY_DECODE_BYTES` are not decoded and
/// yield `None`. Otherwise the `NSString`-marker strategy is tried first and
/// the plain `+` scan is used as a fallback.
pub fn decode_attributed_body(blob: &[u8]) -> Option<String> {
    match blob.len() {
        oversized if oversized > MAX_ATTRIBUTED_BODY_DECODE_BYTES => None,
        _ => decode_via_nsstring_marker(blob).or_else(|| decode_via_plus_scan(blob)),
    }
}
/// Looks for each `NSString` marker in `blob` and, for every occurrence, tries
/// to read a length-prefixed UTF-8 string right after the first `+` byte that
/// follows the marker. Returns the first non-empty string found, or `None`
/// once no further marker exists.
fn decode_via_nsstring_marker(blob: &[u8]) -> Option<String> {
    const MARKER: &[u8] = b"NSString";
    // Index just past the most recently examined marker.
    let mut cursor = 0usize;
    loop {
        let hit = find_subsequence(&blob[cursor..], MARKER)?;
        cursor += hit + MARKER.len();
        let candidate = find_byte(&blob[cursor..], b'+')
            .and_then(|offset| read_length_prefixed_utf8(blob, cursor + offset + 1))
            .filter(|text| !text.is_empty());
        if candidate.is_some() {
            return candidate;
        }
    }
}
fn decode_via_plus_scan(blob: &[u8]) -> Option<String> {
let mut i = 0usize;
while i < blob.len() {
if blob[i] == b'+' {
if let Some(s) = read_length_prefixed_utf8(blob, i + 1) {
if !s.is_empty() && s.chars().all(|c| !c.is_control() || c == '\n' || c == '\t') {
return Some(s);
}
}
}
i += 1;
}
None
}
/// Reads a length-prefixed UTF-8 string from `blob` starting at `pos`.
///
/// The prefix is either a single byte below `0x80` holding the length, or the
/// byte `0x81` followed by a little-endian 16-bit length. Any other prefix
/// byte, a zero length, a payload that runs past the end of `blob`, or a
/// payload that is not valid UTF-8 yields `None`.
fn read_length_prefixed_utf8(blob: &[u8], pos: usize) -> Option<String> {
    let (len, start) = match *blob.get(pos)? {
        0x81 => {
            let low = usize::from(*blob.get(pos + 1)?);
            let high = usize::from(*blob.get(pos + 2)?);
            ((high << 8) | low, pos + 3)
        }
        short @ 0x00..=0x7f => (usize::from(short), pos + 1),
        _ => return None,
    };
    if len == 0 {
        return None;
    }
    let end = start.checked_add(len)?;
    let payload = blob.get(start..end)?;
    std::str::from_utf8(payload).ok().map(str::to_owned)
}
/// Returns the index of the first occurrence of `needle` in `haystack`.
/// An empty needle, or one longer than the haystack, never matches.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 || width > haystack.len() {
        return None;
    }
    let last_start = haystack.len() - width;
    (0..=last_start).find(|&at| &haystack[at..at + width] == needle)
}
/// Returns the index of the first element of `haystack` equal to `byte`.
fn find_byte(haystack: &[u8], byte: u8) -> Option<usize> {
    for (idx, &candidate) in haystack.iter().enumerate() {
        if candidate == byte {
            return Some(idx);
        }
    }
    None
}
pub fn resolve_body(attributed_body_hex: Option<&str>, text: Option<&str>) -> Option<String> {
if let Some(hex) = attributed_body_hex {
if !hex.is_empty() {
if let Some(bytes) = hex_decode(hex) {
if let Some(decoded) = decode_attributed_body(&bytes) {
if !decoded.is_empty() {
return Some(decoded);
}
}
}
}
}
match text {
Some(t) if !t.is_empty() => Some(t.to_string()),
_ => None,
}
}
/// Decodes a string of hexadecimal digit pairs into bytes.
/// Returns `None` for an odd number of input bytes or any non-hex character.
fn hex_decode(hex: &str) -> Option<Vec<u8>> {
    let digits = hex.as_bytes();
    if digits.len() % 2 == 1 {
        return None;
    }
    let mut decoded = Vec::with_capacity(digits.len() / 2);
    for pair in digits.chunks_exact(2) {
        let high = hex_nibble(pair[0])?;
        let low = hex_nibble(pair[1])?;
        decoded.push((high << 4) | low);
    }
    Some(decoded)
}
/// Converts one ASCII hexadecimal digit (`0-9`, `a-f`, `A-F`) to its value
/// `0..=15`; every other byte yields `None`.
fn hex_nibble(b: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly the ASCII hex digits; the result is always
    // below 16, so narrowing to `u8` cannot truncate.
    char::from(b).to_digit(16).map(|value| value as u8)
}
/// Builds the query that lists messages with `ROWID` greater than `min_rowid`
/// and `is_from_me = 0`, joined to their handle and ordered by ascending
/// `ROWID`. The `attributedBody` blob is returned hex-encoded (or NULL).
fn inbound_sql(min_rowid: i64) -> String {
    let select_list = "SELECT message.ROWID AS rowid, \
                       handle.id AS handle_id, \
                       CASE WHEN message.attributedBody IS NULL THEN NULL \
                       ELSE hex(message.attributedBody) END AS attributed_body_hex, \
                       message.text AS text, \
                       message.date AS date";
    let source = "FROM message JOIN handle ON message.handle_id = handle.ROWID";
    let ordering = "ORDER BY message.ROWID ASC;";
    format!(
        "{select_list} {source} \
         WHERE message.ROWID > {min_rowid} AND message.is_from_me = 0 \
         {ordering}"
    )
}
/// Query returning the largest `ROWID` in `message`, or 0 for an empty table.
fn max_rowid_sql() -> &'static str {
    const MAX_ROWID_QUERY: &str = "SELECT COALESCE(MAX(ROWID), 0) AS rowid FROM message;";
    MAX_ROWID_QUERY
}
/// One row of the JSON array parsed by `parse_inbound_rows`, before the body
/// has been resolved. Field names match the column aliases of `inbound_sql`.
#[derive(Debug, Deserialize)]
struct RawInboundRow {
// Message row id.
rowid: i64,
// Sender handle string.
handle_id: String,
// Hex-encoded attributedBody blob, or `None` when the column was NULL.
attributed_body_hex: Option<String>,
// Plain text column, or `None` when it was NULL.
text: Option<String>,
// Raw date column value.
date: i64,
}
/// Parses a JSON array of inbound rows into `InboundMessage`s.
///
/// Empty or whitespace-only input means zero rows. Rows for which
/// `resolve_body` finds no body are dropped from the result.
///
/// # Errors
/// `IntegrationError::Backend` when the input is not valid JSON of the
/// expected row shape.
pub fn parse_inbound_rows(json_stdout: &[u8]) -> Result<Vec<InboundMessage>, IntegrationError> {
    // Skip leading whitespace; input with nothing else in it has no rows.
    let payload = match json_stdout.iter().position(|b| !b.is_ascii_whitespace()) {
        Some(first) => &json_stdout[first..],
        None => return Ok(vec![]),
    };
    let rows: Vec<RawInboundRow> = match serde_json::from_slice(payload) {
        Ok(rows) => rows,
        Err(e) => {
            return Err(IntegrationError::Backend(format!(
                "inbound sqlite json: {e}"
            )));
        }
    };
    let mut messages = Vec::with_capacity(rows.len());
    messages.extend(rows.into_iter().filter_map(|row| {
        let body = resolve_body(row.attributed_body_hex.as_deref(), row.text.as_deref())?;
        Some(InboundMessage {
            rowid: row.rowid,
            handle_id: row.handle_id,
            body,
            date: row.date,
        })
    }));
    Ok(messages)
}
/// Runs `/usr/bin/sqlite3 -json <db_path> <sql>` and returns its stdout bytes.
///
/// # Errors
/// `IntegrationError::Backend` when the process cannot be started, or when it
/// exits unsuccessfully (the message then carries the trimmed stderr).
fn run_sqlite3_json(
    db_path: &std::path::Path,
    sql: &str,
) -> Result<Vec<u8>, IntegrationError> {
    let mut command = std::process::Command::new("/usr/bin/sqlite3");
    command.arg("-json").arg(db_path).arg(sql);
    let finished = match command.output() {
        Ok(finished) => finished,
        Err(e) => return Err(IntegrationError::Backend(format!("inbound sqlite: {e}"))),
    };
    if finished.status.success() {
        return Ok(finished.stdout);
    }
    let stderr = String::from_utf8_lossy(&finished.stderr).trim().to_string();
    Err(IntegrationError::Backend(format!(
        "inbound sqlite failed: {stderr}"
    )))
}
/// Returns the largest message `ROWID` in the database at `db_path`
/// (0 when the query yields no row).
///
/// # Errors
/// Propagates failures from running sqlite3 or parsing its output.
pub fn max_rowid_in_db(db_path: &std::path::Path) -> Result<i64, IntegrationError> {
    run_sqlite3_json(db_path, max_rowid_sql()).and_then(|stdout| parse_max_rowid(&stdout))
}
/// Single-column row shape parsed by `parse_max_rowid`.
#[derive(Debug, Deserialize)]
struct MaxRowidRow {
// Value of the `rowid` column.
rowid: i64,
}
/// Extracts the `rowid` of the first row from a JSON array.
/// Empty or whitespace-only input, or an empty array, yields 0.
///
/// # Errors
/// `IntegrationError::Backend` when the input is not valid JSON of the
/// expected shape.
fn parse_max_rowid(json_stdout: &[u8]) -> Result<i64, IntegrationError> {
    // Skip leading whitespace; input with nothing else in it counts as "no rows".
    let payload = match json_stdout.iter().position(|b| !b.is_ascii_whitespace()) {
        Some(first) => &json_stdout[first..],
        None => return Ok(0),
    };
    match serde_json::from_slice::<Vec<MaxRowidRow>>(payload) {
        Ok(rows) => Ok(rows.first().map_or(0, |row| row.rowid)),
        Err(e) => Err(IntegrationError::Backend(format!("max rowid json: {e}"))),
    }
}
/// Reads the inbound messages with `ROWID` greater than `min_rowid` from the
/// database at `db_path`.
///
/// # Errors
/// Propagates failures from running sqlite3 or parsing its output.
pub fn read_inbound_from_db(
    db_path: &std::path::Path,
    min_rowid: i64,
) -> Result<Vec<InboundMessage>, IntegrationError> {
    let query = inbound_sql(min_rowid);
    run_sqlite3_json(db_path, &query).and_then(|stdout| parse_inbound_rows(&stdout))
}
/// Path of the Messages database under the current user's home directory:
/// `$HOME/Library/Messages/chat.db`. When `HOME` is unset the result is the
/// relative path `Library/Messages/chat.db`.
#[cfg(target_os = "macos")]
pub fn default_chat_db_path() -> std::path::PathBuf {
    let home = std::env::var_os("HOME").unwrap_or_default();
    std::path::Path::new(&home).join("Library/Messages/chat.db")
}
/// Reads inbound messages above `min_rowid` from the default Messages database.
///
/// # Errors
/// Propagates any error from `read_inbound_from_db`.
#[cfg(target_os = "macos")]
pub fn read_inbound_messages(min_rowid: i64) -> Result<Vec<InboundMessage>, IntegrationError> {
    let chat_db = default_chat_db_path();
    read_inbound_from_db(chat_db.as_path(), min_rowid)
}
/// Returns the largest message `ROWID` in the default Messages database.
///
/// # Errors
/// Propagates any error from `max_rowid_in_db`.
#[cfg(target_os = "macos")]
pub fn max_rowid_default_db() -> Result<i64, IntegrationError> {
    let chat_db = default_chat_db_path();
    max_rowid_in_db(chat_db.as_path())
}
// Tests for blob decoding, body resolution, row parsing, watermark persistence
// and the sqlite3-backed readers. The database-backed tests return early when
// /usr/bin/sqlite3 is not present.
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
// Builds a synthetic attributedBody blob around `text`: a `streamtyped`
// header, a run of class-name strings ending in `NSString`, a `+` (0x2b)
// byte, a length prefix, the UTF-8 bytes of `text`, and a fixed trailer.
fn ts_blob(text: &str) -> Vec<u8> {
let tb = text.as_bytes();
let mut out: Vec<u8> = Vec::new();
out.extend_from_slice(&[0x04, 0x0b]);
out.extend_from_slice(b"streamtyped");
out.extend_from_slice(&[
0x81, 0xe8, 0x03, 0x84, 0x01, 0x40, 0x84, 0x84, 0x84, 0x12,
]);
out.extend_from_slice(b"NSAttributedString");
out.extend_from_slice(&[0x00, 0x84, 0x84, 0x08]);
out.extend_from_slice(b"NSObject");
out.extend_from_slice(&[0x00, 0x85, 0x92, 0x84, 0x84, 0x84, 0x0f]);
out.extend_from_slice(b"NSMutableString");
out.extend_from_slice(&[0x01, 0x84, 0x84, 0x08]);
out.extend_from_slice(b"NSString");
// The final 0x2b pushed here is the `+` byte; the length prefix follows it:
// one byte when the text is shorter than 0x80 bytes, otherwise 0x81 plus the
// length as a little-endian 16-bit value.
out.extend_from_slice(&[0x01, 0x95, 0x84, 0x01, 0x2b]); if tb.len() < 0x80 {
out.push(tb.len() as u8);
} else {
out.push(0x81);
out.push((tb.len() & 0xff) as u8);
out.push(((tb.len() >> 8) & 0xff) as u8);
}
out.extend_from_slice(tb);
out.extend_from_slice(&[0x86, 0x84, 0x02, 0x69, 0x49, 0x00, 0x01]);
out
}
// Lowercase hex encoding of `bytes`, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
let mut s = String::with_capacity(bytes.len() * 2);
for b in bytes {
s.push_str(&format!("{b:02x}"));
}
s
}
// A short body (single-byte length prefix) decodes to the original text.
#[test]
fn decode_short_body() {
let blob = ts_blob("A7 approve");
assert_eq!(decode_attributed_body(&blob).as_deref(), Some("A7 approve"));
}
// A body of 0x80 bytes or more exercises the 0x81 + 16-bit length form.
#[test]
fn decode_long_body_uses_u16_length_escape() {
let long = format!("A7 approve {}", "x".repeat(200));
let blob = ts_blob(&long);
assert_eq!(decode_attributed_body(&blob).as_deref(), Some(long.as_str()));
}
// A 64 KiB blob made only of repeated `NSString` markers exceeds the decode
// cap: decoding must return None quickly, and resolve_body must fall back to
// the text column (or to None when there is no text).
#[test]
fn oversized_attributed_body_skips_decode_no_dos_hang() {
let mut blob: Vec<u8> = Vec::with_capacity(64 * 1024);
while blob.len() < 64 * 1024 {
blob.extend_from_slice(b"NSString");
}
assert!(
blob.len() > MAX_ATTRIBUTED_BODY_DECODE_BYTES,
"fixture must exceed the decode cap"
);
let start = std::time::Instant::now();
let decoded = decode_attributed_body(&blob);
let elapsed = start.elapsed();
assert_eq!(decoded, None, "oversized blob must not decode to a body");
assert!(
elapsed < std::time::Duration::from_millis(100),
"decode of an oversized blob must be near-instant (cap engaged), took {elapsed:?}"
);
let hex = hex_encode(&blob);
assert_eq!(
resolve_body(Some(&hex), None),
None,
"oversized blob + NULL text → bodyless row, dropped"
);
assert_eq!(
resolve_body(Some(&hex), Some("approve")).as_deref(),
Some("approve"),
"oversized blob falls back to message.text"
);
}
// With no text column, the body comes from the decoded blob.
#[test]
fn resolve_body_prefers_attributedbody_over_null_text() {
let blob = ts_blob("approve");
let hex = hex_encode(&blob);
let body = resolve_body(Some(&hex), None);
assert_eq!(body.as_deref(), Some("approve"));
assert_ne!(body.as_deref(), Some(""));
}
// Without a blob, a non-empty text is used; empty or missing text gives None.
#[test]
fn resolve_body_falls_back_to_text_when_no_attributedbody() {
assert_eq!(
resolve_body(None, Some("deny")).as_deref(),
Some("deny")
);
assert_eq!(resolve_body(None, Some("")), None);
assert_eq!(resolve_body(None, None), None);
}
// A single JSON row with a hex blob and null text parses into one message
// with every field carried through.
#[test]
fn parse_inbound_rows_decodes_and_drops_bodyless() {
let blob = ts_blob("approve");
let hex = hex_encode(&blob);
let json = format!(
"[{{\"rowid\":5,\"handle_id\":\"+15551234567\",\"attributed_body_hex\":\"{hex}\",\"text\":null,\"date\":700000000000000000}}]"
);
let rows = parse_inbound_rows(json.as_bytes()).unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].rowid, 5);
assert_eq!(rows[0].handle_id, "+15551234567");
assert_eq!(rows[0].body, "approve");
assert_eq!(rows[0].date, 700000000000000000);
}
// Empty and whitespace-only input both parse as zero rows.
#[test]
fn parse_inbound_rows_empty_stdout_is_zero_rows() {
assert!(parse_inbound_rows(b"").unwrap().is_empty());
assert!(parse_inbound_rows(b" \n").unwrap().is_empty());
}
// load() is None before anything is persisted and returns the persisted
// value afterwards.
#[test]
fn watermark_persist_survives_restart() {
let dir = tempfile::tempdir().unwrap();
let base = dir.path();
assert_eq!(Watermark::load(base).unwrap(), None);
let wm = Watermark::new(42);
wm.persist(base).unwrap();
let reloaded = Watermark::load(base).unwrap();
assert_eq!(reloaded, Some(Watermark::new(42)));
assert_eq!(reloaded.unwrap().last_rowid, 42);
}
// After a successful persist only the final file exists; no `.tmp.` file
// is left in the directory.
#[test]
fn watermark_persist_is_atomic_no_temp_left() {
let dir = tempfile::tempdir().unwrap();
let base = dir.path();
Watermark::new(7).persist(base).unwrap();
let entries: Vec<_> = std::fs::read_dir(base)
.unwrap()
.map(|e| e.unwrap().file_name().to_string_lossy().to_string())
.collect();
assert!(
entries.contains(&"messages-inbound-watermark.json".to_string()),
"final watermark file must exist, got {entries:?}"
);
assert!(
!entries.iter().any(|n| n.contains(".tmp.")),
"no temp file should remain, got {entries:?}"
);
}
// Seeds `dir/chat.db` via the sqlite3 CLI with a minimal handle/message
// schema and four message rows: ROWID 1 inbound with text, ROWID 2 from-me,
// ROWID 3 inbound with NULL text and `attr_text` encoded in attributedBody,
// ROWID 4 from-me. Returns the database path.
fn seed_temp_chat_db(dir: &Path, attr_text: &str) -> std::path::PathBuf {
let db = dir.join("chat.db");
let blob_hex = hex_encode(&ts_blob(attr_text));
let schema_and_rows = format!(
"CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
-- (ii) older pre-existing inbound row\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (1, 1, 'old hello', NULL, 0, 600000000000000000);\n\
-- a from-me row interleaved (must be excluded)\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (2, 1, 'my outbound', NULL, 1, 650000000000000000);\n\
-- (i) NULL-text row, body in attributedBody (the row SC-1 depends on)\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (3, 1, NULL, X'{blob_hex}', 0, 700000000000000000);\n\
-- (iii) outbound after the watermark (must be excluded)\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (4, 1, 'me again', NULL, 1, 750000000000000000);\n"
);
let status = std::process::Command::new("/usr/bin/sqlite3")
.arg(&db)
.arg(&schema_and_rows)
.status()
.expect("sqlite3 must be available to seed the fixture");
assert!(status.success(), "fixture seed failed");
db
}
// Seeds `dir/chat.db` with two pairs of identical-text rows, each pair having
// one is_from_me=0 row (ROWIDs 10 and 12) and one is_from_me=1 twin
// (ROWIDs 11 and 13). Single quotes in the prompt are doubled for SQL.
fn seed_solo_self_reply_chat_db(dir: &Path) -> std::path::PathBuf {
let db = dir.join("chat.db");
let prompt = "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`.";
let prompt_sql = prompt.replace('\'', "''");
let schema_and_rows = format!(
"CREATE TABLE handle (ROWID INTEGER PRIMARY KEY, id TEXT);\n\
CREATE TABLE message (ROWID INTEGER PRIMARY KEY, handle_id INTEGER, \
text TEXT, attributedBody BLOB, is_from_me INTEGER, date INTEGER);\n\
INSERT INTO handle (ROWID, id) VALUES (1, '+15551234567');\n\
-- (a) received self-echo of the daemon's own prompt (KEEP)\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (10, 1, '{prompt_sql}', NULL, 0, 800000000000000000);\n\
-- (b) the sent/synced-sent twin of that prompt (DROP)\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (11, 1, '{prompt_sql}', NULL, 1, 800000000000000001);\n\
-- (c) the user's typed `A0 approve` self-reply, in the text column (KEEP)\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (12, 1, 'A0 approve', NULL, 0, 800000000000000002);\n\
-- (d) the sent/synced-sent twin of that reply (DROP)\n\
INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me, date) \
VALUES (13, 1, 'A0 approve', NULL, 1, 800000000000000003);\n"
);
let status = std::process::Command::new("/usr/bin/sqlite3")
.arg(&db)
.arg(&schema_and_rows)
.status()
.expect("sqlite3 must be available to seed the fixture");
assert!(status.success(), "fixture seed failed");
db
}
// Reading from ROWID 0 returns exactly the two is_from_me=0 rows (10 and 12)
// with their bodies intact; the is_from_me=1 twins (11 and 13) are absent.
#[test]
fn solo_self_reply_kept_synced_sent_twins_dropped() {
if !Path::new("/usr/bin/sqlite3").exists() {
eprintln!("skip: /usr/bin/sqlite3 not present");
return;
}
let dir = tempfile::tempdir().unwrap();
let db = seed_solo_self_reply_chat_db(dir.path());
let rows = read_inbound_from_db(&db, 0).unwrap();
assert_eq!(rows.len(), 2, "exactly the two is_from_me=0 rows kept, got {rows:?}");
let prompt = "Approval needed: send wire transfer\nReply `A0 approve` or `A0 deny`.";
assert_eq!(rows[0].rowid, 10);
assert_eq!(rows[0].handle_id, "+15551234567");
assert_eq!(rows[0].body, prompt, "own-prompt echo body kept intact");
assert_eq!(rows[1].rowid, 12);
assert_eq!(rows[1].handle_id, "+15551234567");
assert_eq!(rows[1].body, "A0 approve", "solo self-reply body kept intact");
let returned_rowids: Vec<i64> = rows.iter().map(|r| r.rowid).collect();
assert!(
!returned_rowids.contains(&11),
"the is_from_me=1 prompt twin (rowid 11) must be dropped, got {returned_rowids:?}"
);
assert!(
!returned_rowids.contains(&13),
"the is_from_me=1 reply twin (rowid 13) must be dropped, got {returned_rowids:?}"
);
}
// End to end: with the watermark at 1, only ROWID 3 (NULL text, body in
// attributedBody) is returned; after persisting the new high-water mark and
// reloading it, the same query returns nothing.
#[test]
fn attributedbody_null_text_decode_and_watermark() {
if !Path::new("/usr/bin/sqlite3").exists() {
eprintln!("skip: /usr/bin/sqlite3 not present");
return;
}
let dir = tempfile::tempdir().unwrap();
let db = seed_temp_chat_db(dir.path(), "A7 approve");
let base = dir.path().join("car-home");
Watermark::new(1).persist(&base).unwrap();
let wm = Watermark::load(&base).unwrap().unwrap();
let rows = read_inbound_from_db(&db, wm.last_rowid).unwrap();
assert_eq!(rows.len(), 1, "got {rows:?}");
let row = &rows[0];
assert_eq!(row.rowid, 3);
assert_eq!(row.handle_id, "+15551234567");
assert_eq!(row.body, "A7 approve");
assert_ne!(row.body, "");
assert_eq!(row.date, 700000000000000000);
let new_high = rows.iter().map(|r| r.rowid).max().unwrap();
Watermark::new(new_high).persist(&base).unwrap();
let wm2 = Watermark::load(&base).unwrap().unwrap();
assert_eq!(wm2.last_rowid, 3);
let rows_after = read_inbound_from_db(&db, wm2.last_rowid).unwrap();
assert!(
rows_after.is_empty(),
"row replayed after restart: {rows_after:?}"
);
}
// With no watermark on disk, seeding it from max_rowid_in_db (4 for this
// fixture) means a subsequent read returns no rows.
#[test]
fn fresh_watermark_seeds_to_max_rowid_no_replay() {
if !Path::new("/usr/bin/sqlite3").exists() {
eprintln!("skip: /usr/bin/sqlite3 not present");
return;
}
let dir = tempfile::tempdir().unwrap();
let db = seed_temp_chat_db(dir.path(), "approve");
let base = dir.path().join("car-home");
assert_eq!(Watermark::load(&base).unwrap(), None);
let max = max_rowid_in_db(&db).unwrap();
assert_eq!(max, 4);
Watermark::new(max).persist(&base).unwrap();
let wm = Watermark::load(&base).unwrap().unwrap();
let rows = read_inbound_from_db(&db, wm.last_rowid).unwrap();
assert!(rows.is_empty(), "fresh seed should yield zero rows: {rows:?}");
}
}