Skip to main content

construct/channels/
imessage.rs

1use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
2use async_trait::async_trait;
3use directories::UserDirs;
4use rusqlite::{Connection, OpenFlags};
5use std::path::Path;
6use tokio::sync::mpsc;
7
8/// Extract plain text from an iMessage `attributedBody` typedstream blob.
9///
10/// Modern macOS (Ventura+) stores message content in `attributedBody` as an
11/// `NSMutableAttributedString` serialized via Apple's typedstream format,
12/// rather than the plain `text` column.
13///
14/// This follows the well-documented marker-based approach used by LangChain,
15/// steipete/imsg, and mac_apt (all MIT-licensed). See:
16/// <https://chrissardegna.com/blog/reverse-engineering-apples-typedstream-format/>
17fn extract_text_from_attributed_body(blob: &[u8]) -> Option<String> {
18    // Find the start-of-text marker: [0x01, 0x2B]
19    // 0x2B is the C-string type tag in Apple's typedstream format.
20    let marker_pos = blob.windows(2).position(|w| w == [0x01, 0x2B])?;
21    let rest = blob.get(marker_pos + 2..)?;
22
23    if rest.is_empty() {
24        return None;
25    }
26
27    // Read variable-length prefix immediately after the marker.
28    // The length determines text extent — we do NOT scan for an end marker,
29    // because byte pairs like [0x86, 0x84] can appear inside valid UTF-8
30    // (e.g. U+2184 LATIN SMALL LETTER REVERSED C encodes to E2 86 84).
31    //
32    //   0x00-0x7F => literal length (1 byte)
33    //   0x81      => next 2 bytes are little-endian u16 length
34    //   0x82      => next 4 bytes are little-endian u32 length
35    //   0x80, 0x83+ are not observed in iMessage typedstreams; reject gracefully.
36    let (length, text_start) = match rest[0] {
37        0x81 if rest.len() >= 3 => {
38            let len = u16::from_le_bytes([rest[1], rest[2]]) as usize;
39            (len, 3)
40        }
41        0x82 if rest.len() >= 5 => {
42            let len = u32::from_le_bytes([rest[1], rest[2], rest[3], rest[4]]) as usize;
43            (len, 5)
44        }
45        b if b <= 0x7F => (b as usize, 1),
46        _ => return None,
47    };
48
49    let text_bytes = rest.get(text_start..text_start + length)?;
50    std::str::from_utf8(text_bytes).ok().map(str::to_owned)
51}
52
53/// Resolve message content from the `text` column with `attributedBody` fallback.
54///
55/// Prefers the plain `text` column when present. Falls back to parsing the
56/// typedstream blob in `attributedBody` (modern macOS). Logs a warning when
57/// `attributedBody` exists but cannot be parsed.
58fn resolve_message_content(rowid: i64, text: Option<String>, body: Option<Vec<u8>>) -> String {
59    text.filter(|t| !t.trim().is_empty())
60        .or_else(|| {
61            let parsed = body.as_deref().and_then(extract_text_from_attributed_body);
62            if parsed.is_none() && body.as_ref().is_some_and(|b| !b.is_empty()) {
63                tracing::warn!(rowid, "failed to parse attributedBody");
64            }
65            parsed
66        })
67        .unwrap_or_default()
68}
69
/// iMessage channel built on the macOS Messages database and `osascript`.
///
/// Incoming messages are discovered by polling the Messages database; replies
/// are sent by running an `AppleScript` through `osascript`.
#[derive(Clone)]
pub struct IMessageChannel {
    /// Sender handles allowed to reach this channel; a `"*"` entry admits anyone.
    allowed_contacts: Vec<String>,
    /// Pause between database polls, in seconds.
    poll_interval_secs: u64,
}

impl IMessageChannel {
    /// Create a channel limited to `allowed_contacts` with a 3-second poll interval.
    pub fn new(allowed_contacts: Vec<String>) -> Self {
        let poll_interval_secs = 3;
        Self {
            allowed_contacts,
            poll_interval_secs,
        }
    }

    /// Whether `sender` may use this channel.
    ///
    /// True when the allowlist contains the wildcard `"*"` or an entry equal to
    /// `sender` ignoring ASCII case. Entries are compared verbatim (no trimming).
    fn is_contact_allowed(&self, sender: &str) -> bool {
        self.allowed_contacts
            .iter()
            .any(|entry| entry == "*" || entry.eq_ignore_ascii_case(sender))
    }
}
95
/// Make a string safe to embed inside a double-quoted `AppleScript` literal.
///
/// Guards against injection by rewriting the characters that could terminate
/// or break the literal:
/// - backslash becomes two backslashes
/// - double quote becomes backslash + double quote
/// - line feed becomes the two characters `\` `n`
/// - carriage return becomes the two characters `\` `r`
///
/// Every other character is copied through unchanged.
fn escape_applescript(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            other => escaped.push(other),
        }
    }
    escaped
}
108
/// Validate that a target looks like a valid phone number or email address.
///
/// This is a defense-in-depth measure to reject obviously malicious targets
/// before they reach `AppleScript` interpolation. Surrounding whitespace is
/// ignored for the purpose of validation.
///
/// Valid patterns:
/// - Phone: starts with `+`, followed only by ASCII digits and the separators
///   space, `-`, `(`, `)` and `.`, with 7 to 15 digits in total
/// - Email: `local@domain` where `local` is non-empty and made of alphanumeric
///   characters or `._+-`, and `domain` is non-empty, contains a `.`, and is
///   made of alphanumeric characters or `.-`
///
/// Anything else (including the empty string) is rejected.
fn is_valid_imessage_target(target: &str) -> bool {
    let target = target.trim();

    // Phone number: +1234567890 or +1 234-567-8900
    if let Some(number) = target.strip_prefix('+') {
        let mut digits = 0usize;
        for c in number.chars() {
            match c {
                '0'..='9' => digits += 1,
                ' ' | '-' | '(' | ')' | '.' => {}
                // Counting digits alone would accept e.g. `+1234567" & quit`;
                // any character outside the phone alphabet is a hard reject.
                _ => return false,
            }
        }
        // 7 is the shortest plausible number; 15 is the upper bound used here.
        return (7..=15).contains(&digits);
    }

    // Email: split at the first `@`; a second `@` lands in the domain and is
    // rejected by the domain character check below.
    let Some((local, domain)) = target.split_once('@') else {
        return false;
    };

    // Local part: non-empty, alphanumeric + common email chars
    let local_valid =
        !local.is_empty() && local.chars().all(|c| c.is_alphanumeric() || "._+-".contains(c));

    // Domain: non-empty, contains a dot, alphanumeric + dots/hyphens
    let domain_valid = !domain.is_empty()
        && domain.contains('.')
        && domain.chars().all(|c| c.is_alphanumeric() || ".-".contains(c));

    local_valid && domain_valid
}
153
154#[async_trait]
155impl Channel for IMessageChannel {
156    fn name(&self) -> &str {
157        "imessage"
158    }
159
160    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
161        // Defense-in-depth: validate target format before any interpolation
162        if !is_valid_imessage_target(&message.recipient) {
163            anyhow::bail!(
164                "Invalid iMessage target: must be a phone number (+1234567890) or email (user@example.com)"
165            );
166        }
167
168        // SECURITY: Escape both message AND target to prevent AppleScript injection
169        // See: CWE-78 (OS Command Injection)
170        let escaped_msg = escape_applescript(&message.content);
171        let escaped_target = escape_applescript(&message.recipient);
172
173        let script = format!(
174            r#"tell application "Messages"
175    set targetService to 1st account whose service type = iMessage
176    set targetBuddy to participant "{escaped_target}" of targetService
177    send "{escaped_msg}" to targetBuddy
178end tell"#
179        );
180
181        let output = tokio::process::Command::new("osascript")
182            .arg("-e")
183            .arg(&script)
184            .output()
185            .await?;
186
187        if !output.status.success() {
188            let stderr = String::from_utf8_lossy(&output.stderr);
189            anyhow::bail!("iMessage send failed: {stderr}");
190        }
191
192        Ok(())
193    }
194
195    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
196        tracing::info!("iMessage channel listening (AppleScript bridge)...");
197
198        // Query the Messages SQLite database for new messages
199        // The database is at ~/Library/Messages/chat.db
200        let db_path = UserDirs::new()
201            .map(|u| u.home_dir().join("Library/Messages/chat.db"))
202            .ok_or_else(|| anyhow::anyhow!("Cannot find home directory"))?;
203
204        if !db_path.exists() {
205            anyhow::bail!(
206                "Messages database not found at {}. Ensure Messages.app is set up and Full Disk Access is granted.",
207                db_path.display()
208            );
209        }
210
211        // Open a persistent read-only connection instead of creating
212        // a new one on every 3-second poll cycle.
213        let path = db_path.to_path_buf();
214        let conn = tokio::task::spawn_blocking(move || -> anyhow::Result<Connection> {
215            Ok(Connection::open_with_flags(
216                &path,
217                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
218            )?)
219        })
220        .await??;
221
222        // Track the last ROWID we've seen (shuttle conn in and out)
223        let (mut conn, initial_rowid) =
224            tokio::task::spawn_blocking(move || -> anyhow::Result<(Connection, i64)> {
225                let rowid = {
226                    let mut stmt =
227                        conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?;
228                    let rowid: Option<i64> = stmt.query_row([], |row| row.get(0))?;
229                    rowid.unwrap_or(0)
230                };
231                Ok((conn, rowid))
232            })
233            .await??;
234        let mut last_rowid = initial_rowid;
235
236        loop {
237            tokio::time::sleep(tokio::time::Duration::from_secs(self.poll_interval_secs)).await;
238
239            let since = last_rowid;
240            let (returned_conn, poll_result) = tokio::task::spawn_blocking(
241                move || -> (Connection, anyhow::Result<Vec<(i64, String, String)>>) {
242                    let result = (|| -> anyhow::Result<Vec<(i64, String, String)>> {
243                        let mut stmt = conn.prepare(
244                            "SELECT m.ROWID, h.id, m.text, m.attributedBody \
245                     FROM message m \
246                     JOIN handle h ON m.handle_id = h.ROWID \
247                     WHERE m.ROWID > ?1 \
248                     AND m.is_from_me = 0 \
249                     AND (m.text IS NOT NULL OR m.attributedBody IS NOT NULL) \
250                     ORDER BY m.ROWID ASC \
251                     LIMIT 20",
252                        )?;
253                        let rows = stmt.query_map([since], |row| {
254                            let rowid = row.get::<_, i64>(0)?;
255                            let sender = row.get::<_, String>(1)?;
256                            let text: Option<String> = row.get(2)?;
257                            let body: Option<Vec<u8>> = row.get(3)?;
258                            Ok((rowid, sender, resolve_message_content(rowid, text, body)))
259                        })?;
260                        let results = rows.collect::<Result<Vec<_>, _>>()?;
261                        Ok(results)
262                    })();
263
264                    (conn, result)
265                },
266            )
267            .await
268            .map_err(|e| anyhow::anyhow!("iMessage poll worker join error: {e}"))?;
269            conn = returned_conn;
270
271            match poll_result {
272                Ok(messages) => {
273                    for (rowid, sender, text) in messages {
274                        if rowid > last_rowid {
275                            last_rowid = rowid;
276                        }
277
278                        if !self.is_contact_allowed(&sender) {
279                            continue;
280                        }
281
282                        if text.trim().is_empty() {
283                            continue;
284                        }
285
286                        let msg = ChannelMessage {
287                            id: rowid.to_string(),
288                            sender: sender.clone(),
289                            reply_target: sender.clone(),
290                            content: text,
291                            channel: "imessage".to_string(),
292                            timestamp: std::time::SystemTime::now()
293                                .duration_since(std::time::UNIX_EPOCH)
294                                .unwrap_or_default()
295                                .as_secs(),
296                            thread_ts: None,
297                            interruption_scope_id: None,
298                            attachments: vec![],
299                        };
300
301                        if tx.send(msg).await.is_err() {
302                            return Ok(());
303                        }
304                    }
305                }
306                Err(e) => {
307                    tracing::warn!("iMessage poll error: {e}");
308                }
309            }
310        }
311    }
312
313    async fn health_check(&self) -> bool {
314        if !cfg!(target_os = "macos") {
315            return false;
316        }
317
318        let db_path = UserDirs::new()
319            .map(|u| u.home_dir().join("Library/Messages/chat.db"))
320            .unwrap_or_default();
321
322        db_path.exists()
323    }
324}
325
326/// Get the current max ROWID from the messages table.
327/// Uses rusqlite with parameterized queries for security (CWE-89 prevention).
328async fn get_max_rowid(db_path: &Path) -> anyhow::Result<i64> {
329    let path = db_path.to_path_buf();
330    let result = tokio::task::spawn_blocking(move || -> anyhow::Result<i64> {
331        let conn = Connection::open_with_flags(
332            &path,
333            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
334        )?;
335        let mut stmt = conn.prepare("SELECT MAX(ROWID) FROM message WHERE is_from_me = 0")?;
336        let rowid: Option<i64> = stmt.query_row([], |row| row.get(0))?;
337        Ok(rowid.unwrap_or(0))
338    })
339    .await??;
340    Ok(result)
341}
342
343/// Fetch messages newer than `since_rowid`.
344/// Uses rusqlite with parameterized queries for security (CWE-89 prevention).
345/// The `since_rowid` parameter is bound safely, preventing SQL injection.
346async fn fetch_new_messages(
347    db_path: &Path,
348    since_rowid: i64,
349) -> anyhow::Result<Vec<(i64, String, String)>> {
350    let path = db_path.to_path_buf();
351    let results =
352        tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<(i64, String, String)>> {
353            let conn = Connection::open_with_flags(
354                &path,
355                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
356            )?;
357            let mut stmt = conn.prepare(
358                "SELECT m.ROWID, h.id, m.text, m.attributedBody \
359             FROM message m \
360             JOIN handle h ON m.handle_id = h.ROWID \
361             WHERE m.ROWID > ?1 \
362             AND m.is_from_me = 0 \
363             AND (m.text IS NOT NULL OR m.attributedBody IS NOT NULL) \
364             ORDER BY m.ROWID ASC \
365             LIMIT 20",
366            )?;
367            let rows = stmt.query_map([since_rowid], |row| {
368                let rowid = row.get::<_, i64>(0)?;
369                let sender = row.get::<_, String>(1)?;
370                let text: Option<String> = row.get(2)?;
371                let body: Option<Vec<u8>> = row.get(3)?;
372                Ok((rowid, sender, resolve_message_content(rowid, text, body)))
373            })?;
374            let results: Vec<_> = rows
375                .collect::<Result<Vec<_>, _>>()?
376                .into_iter()
377                .filter(|(_, _, content)| !content.trim().is_empty())
378                .collect();
379            Ok(results)
380        })
381        .await??;
382    Ok(results)
383}
384
385#[cfg(test)]
386mod tests {
387    use super::*;
388
389    #[test]
390    fn creates_with_contacts() {
391        let ch = IMessageChannel::new(vec!["+1234567890".into()]);
392        assert_eq!(ch.allowed_contacts.len(), 1);
393        assert_eq!(ch.poll_interval_secs, 3);
394    }
395
396    #[test]
397    fn creates_with_empty_contacts() {
398        let ch = IMessageChannel::new(vec![]);
399        assert!(ch.allowed_contacts.is_empty());
400    }
401
402    #[test]
403    fn wildcard_allows_anyone() {
404        let ch = IMessageChannel::new(vec!["*".into()]);
405        assert!(ch.is_contact_allowed("+1234567890"));
406        assert!(ch.is_contact_allowed("random@icloud.com"));
407        assert!(ch.is_contact_allowed(""));
408    }
409
410    #[test]
411    fn specific_contact_allowed() {
412        let ch = IMessageChannel::new(vec!["+1234567890".into(), "user@icloud.com".into()]);
413        assert!(ch.is_contact_allowed("+1234567890"));
414        assert!(ch.is_contact_allowed("user@icloud.com"));
415    }
416
417    #[test]
418    fn unknown_contact_denied() {
419        let ch = IMessageChannel::new(vec!["+1234567890".into()]);
420        assert!(!ch.is_contact_allowed("+9999999999"));
421        assert!(!ch.is_contact_allowed("hacker@evil.com"));
422    }
423
424    #[test]
425    fn contact_case_insensitive() {
426        let ch = IMessageChannel::new(vec!["User@iCloud.com".into()]);
427        assert!(ch.is_contact_allowed("user@icloud.com"));
428        assert!(ch.is_contact_allowed("USER@ICLOUD.COM"));
429    }
430
431    #[test]
432    fn empty_allowlist_denies_all() {
433        let ch = IMessageChannel::new(vec![]);
434        assert!(!ch.is_contact_allowed("+1234567890"));
435        assert!(!ch.is_contact_allowed("anyone"));
436    }
437
438    #[test]
439    fn name_returns_imessage() {
440        let ch = IMessageChannel::new(vec![]);
441        assert_eq!(ch.name(), "imessage");
442    }
443
444    #[test]
445    fn wildcard_among_others_still_allows_all() {
446        let ch = IMessageChannel::new(vec!["+111".into(), "*".into(), "+222".into()]);
447        assert!(ch.is_contact_allowed("totally-unknown"));
448    }
449
450    #[test]
451    fn contact_with_spaces_exact_match() {
452        let ch = IMessageChannel::new(vec!["  spaced  ".into()]);
453        assert!(ch.is_contact_allowed("  spaced  "));
454        assert!(!ch.is_contact_allowed("spaced"));
455    }
456
457    // ══════════════════════════════════════════════════════════
458    // AppleScript Escaping Tests (CWE-78 Prevention)
459    // ══════════════════════════════════════════════════════════
460
461    #[test]
462    fn escape_applescript_double_quotes() {
463        assert_eq!(escape_applescript(r#"hello "world""#), r#"hello \"world\""#);
464    }
465
466    #[test]
467    fn escape_applescript_backslashes() {
468        assert_eq!(escape_applescript(r"path\to\file"), r"path\\to\\file");
469    }
470
471    #[test]
472    fn escape_applescript_mixed() {
473        assert_eq!(
474            escape_applescript(r#"say "hello\" world"#),
475            r#"say \"hello\\\" world"#
476        );
477    }
478
479    #[test]
480    fn escape_applescript_injection_attempt() {
481        // This is the exact attack vector from the security report
482        let malicious = r#"" & do shell script "id" & ""#;
483        let escaped = escape_applescript(malicious);
484        // After escaping, the quotes should be escaped and not break out
485        assert_eq!(escaped, r#"\" & do shell script \"id\" & \""#);
486        // Verify all quotes are now escaped (preceded by backslash)
487        // The escaped string should not have any unescaped quotes (quote not preceded by backslash)
488        let chars: Vec<char> = escaped.chars().collect();
489        for (i, &c) in chars.iter().enumerate() {
490            if c == '"' {
491                // Every quote must be preceded by a backslash
492                assert!(
493                    i > 0 && chars[i - 1] == '\\',
494                    "Found unescaped quote at position {i}"
495                );
496            }
497        }
498    }
499
500    #[test]
501    fn escape_applescript_empty_string() {
502        assert_eq!(escape_applescript(""), "");
503    }
504
505    #[test]
506    fn escape_applescript_no_special_chars() {
507        assert_eq!(escape_applescript("hello world"), "hello world");
508    }
509
510    #[test]
511    fn escape_applescript_unicode() {
512        assert_eq!(escape_applescript("hello 🦀 world"), "hello 🦀 world");
513    }
514
515    #[test]
516    fn escape_applescript_newlines_escaped() {
517        assert_eq!(escape_applescript("line1\nline2"), "line1\\nline2");
518        assert_eq!(escape_applescript("line1\rline2"), "line1\\rline2");
519        assert_eq!(escape_applescript("line1\r\nline2"), "line1\\r\\nline2");
520    }
521
522    // ══════════════════════════════════════════════════════════
523    // Target Validation Tests
524    // ══════════════════════════════════════════════════════════
525
526    #[test]
527    fn valid_phone_number_simple() {
528        assert!(is_valid_imessage_target("+1234567890"));
529    }
530
531    #[test]
532    fn valid_phone_number_with_country_code() {
533        assert!(is_valid_imessage_target("+14155551234"));
534    }
535
536    #[test]
537    fn valid_phone_number_with_spaces() {
538        assert!(is_valid_imessage_target("+1 415 555 1234"));
539    }
540
541    #[test]
542    fn valid_phone_number_with_dashes() {
543        assert!(is_valid_imessage_target("+1-415-555-1234"));
544    }
545
546    #[test]
547    fn valid_phone_number_international() {
548        assert!(is_valid_imessage_target("+447911123456")); // UK
549        assert!(is_valid_imessage_target("+81312345678")); // Japan
550    }
551
552    #[test]
553    fn valid_email_simple() {
554        assert!(is_valid_imessage_target("user@example.com"));
555    }
556
557    #[test]
558    fn valid_email_with_subdomain() {
559        assert!(is_valid_imessage_target("user@mail.example.com"));
560    }
561
562    #[test]
563    fn valid_email_with_plus() {
564        assert!(is_valid_imessage_target("user+tag@example.com"));
565    }
566
567    #[test]
568    fn valid_email_with_dots() {
569        assert!(is_valid_imessage_target("first.last@example.com"));
570    }
571
572    #[test]
573    fn valid_email_icloud() {
574        assert!(is_valid_imessage_target("user@icloud.com"));
575        assert!(is_valid_imessage_target("user@me.com"));
576    }
577
578    #[test]
579    fn invalid_target_empty() {
580        assert!(!is_valid_imessage_target(""));
581        assert!(!is_valid_imessage_target("   "));
582    }
583
584    #[test]
585    fn invalid_target_no_plus_prefix() {
586        // Phone numbers must start with +
587        assert!(!is_valid_imessage_target("1234567890"));
588    }
589
590    #[test]
591    fn invalid_target_too_short_phone() {
592        // Less than 7 digits
593        assert!(!is_valid_imessage_target("+123456"));
594    }
595
596    #[test]
597    fn invalid_target_too_long_phone() {
598        // More than 15 digits
599        assert!(!is_valid_imessage_target("+1234567890123456"));
600    }
601
602    #[test]
603    fn invalid_target_email_no_at() {
604        assert!(!is_valid_imessage_target("userexample.com"));
605    }
606
607    #[test]
608    fn invalid_target_email_no_domain() {
609        assert!(!is_valid_imessage_target("user@"));
610    }
611
612    #[test]
613    fn invalid_target_email_no_local() {
614        assert!(!is_valid_imessage_target("@example.com"));
615    }
616
617    #[test]
618    fn invalid_target_email_no_dot_in_domain() {
619        assert!(!is_valid_imessage_target("user@localhost"));
620    }
621
622    #[test]
623    fn invalid_target_injection_attempt() {
624        // The exact attack vector from the security report
625        assert!(!is_valid_imessage_target(r#"" & do shell script "id" & ""#));
626    }
627
628    #[test]
629    fn invalid_target_applescript_injection() {
630        // Various injection attempts
631        assert!(!is_valid_imessage_target(r#"test" & quit"#));
632        assert!(!is_valid_imessage_target(r"test\ndo shell script"));
633        assert!(!is_valid_imessage_target("test\"; malicious code; \""));
634    }
635
636    #[test]
637    fn invalid_target_special_chars() {
638        assert!(!is_valid_imessage_target("user<script>@example.com"));
639        assert!(!is_valid_imessage_target("user@example.com; rm -rf /"));
640    }
641
642    #[test]
643    fn invalid_target_null_byte() {
644        assert!(!is_valid_imessage_target("user\0@example.com"));
645    }
646
647    #[test]
648    fn invalid_target_newline() {
649        assert!(!is_valid_imessage_target("user\n@example.com"));
650    }
651
652    #[test]
653    fn target_with_leading_trailing_whitespace_trimmed() {
654        // Should trim and validate
655        assert!(is_valid_imessage_target("  +1234567890  "));
656        assert!(is_valid_imessage_target("  user@example.com  "));
657    }
658
659    // ══════════════════════════════════════════════════════════
660    // SQLite/rusqlite Database Tests (CWE-89 Prevention)
661    // ══════════════════════════════════════════════════════════
662
663    /// Helper to create a temporary test database with Messages schema
664    fn create_test_db() -> (tempfile::TempDir, std::path::PathBuf) {
665        let dir = tempfile::tempdir().unwrap();
666        let db_path = dir.path().join("chat.db");
667
668        let conn = Connection::open(&db_path).unwrap();
669
670        // Create minimal schema matching macOS Messages.app
671        conn.execute_batch(
672            "CREATE TABLE handle (
673                ROWID INTEGER PRIMARY KEY,
674                id TEXT NOT NULL
675            );
676            CREATE TABLE message (
677                ROWID INTEGER PRIMARY KEY,
678                handle_id INTEGER,
679                text TEXT,
680                attributedBody BLOB,
681                is_from_me INTEGER DEFAULT 0,
682                FOREIGN KEY (handle_id) REFERENCES handle(ROWID)
683            );",
684        )
685        .unwrap();
686
687        (dir, db_path)
688    }
689
690    #[tokio::test]
691    async fn get_max_rowid_empty_database() {
692        let (_dir, db_path) = create_test_db();
693        let result = get_max_rowid(&db_path).await;
694        assert!(result.is_ok());
695        // Empty table returns 0 (NULL coalesced)
696        assert_eq!(result.unwrap(), 0);
697    }
698
699    #[tokio::test]
700    async fn get_max_rowid_with_messages() {
701        let (_dir, db_path) = create_test_db();
702
703        // Insert test data
704        {
705            let conn = Connection::open(&db_path).unwrap();
706            conn.execute(
707                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
708                [],
709            )
710            .unwrap();
711            conn.execute(
712                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (100, 1, 'Hello', 0)",
713                []
714            ).unwrap();
715            conn.execute(
716                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (200, 1, 'World', 0)",
717                []
718            ).unwrap();
719            // This one is from_me=1, should be ignored
720            conn.execute(
721                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (300, 1, 'Sent', 1)",
722                []
723            ).unwrap();
724        }
725
726        let result = get_max_rowid(&db_path).await.unwrap();
727        // Should return 200, not 300 (ignores is_from_me=1)
728        assert_eq!(result, 200);
729    }
730
731    #[tokio::test]
732    async fn get_max_rowid_nonexistent_database() {
733        let path = std::path::Path::new("/nonexistent/path/chat.db");
734        let result = get_max_rowid(path).await;
735        assert!(result.is_err());
736    }
737
738    #[tokio::test]
739    async fn fetch_new_messages_empty_database() {
740        let (_dir, db_path) = create_test_db();
741        let result = fetch_new_messages(&db_path, 0).await;
742        assert!(result.is_ok());
743        assert!(result.unwrap().is_empty());
744    }
745
746    #[tokio::test]
747    async fn fetch_new_messages_returns_correct_data() {
748        let (_dir, db_path) = create_test_db();
749
750        // Insert test data
751        {
752            let conn = Connection::open(&db_path).unwrap();
753            conn.execute(
754                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
755                [],
756            )
757            .unwrap();
758            conn.execute(
759                "INSERT INTO handle (ROWID, id) VALUES (2, 'user@example.com')",
760                [],
761            )
762            .unwrap();
763            conn.execute(
764                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'First message', 0)",
765                []
766            ).unwrap();
767            conn.execute(
768                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (20, 2, 'Second message', 0)",
769                []
770            ).unwrap();
771        }
772
773        let result = fetch_new_messages(&db_path, 0).await.unwrap();
774        assert_eq!(result.len(), 2);
775        assert_eq!(
776            result[0],
777            (10, "+1234567890".to_string(), "First message".to_string())
778        );
779        assert_eq!(
780            result[1],
781            (
782                20,
783                "user@example.com".to_string(),
784                "Second message".to_string()
785            )
786        );
787    }
788
789    #[tokio::test]
790    async fn fetch_new_messages_filters_by_rowid() {
791        let (_dir, db_path) = create_test_db();
792
793        // Insert test data
794        {
795            let conn = Connection::open(&db_path).unwrap();
796            conn.execute(
797                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
798                [],
799            )
800            .unwrap();
801            conn.execute(
802                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Old message', 0)",
803                []
804            ).unwrap();
805            conn.execute(
806                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (20, 1, 'New message', 0)",
807                []
808            ).unwrap();
809        }
810
811        // Fetch only messages after ROWID 15
812        let result = fetch_new_messages(&db_path, 15).await.unwrap();
813        assert_eq!(result.len(), 1);
814        assert_eq!(result[0].0, 20);
815        assert_eq!(result[0].2, "New message");
816    }
817
818    #[tokio::test]
819    async fn fetch_new_messages_excludes_sent_messages() {
820        let (_dir, db_path) = create_test_db();
821
822        // Insert test data
823        {
824            let conn = Connection::open(&db_path).unwrap();
825            conn.execute(
826                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
827                [],
828            )
829            .unwrap();
830            conn.execute(
831                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Received', 0)",
832                []
833            ).unwrap();
834            conn.execute(
835                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (20, 1, 'Sent by me', 1)",
836                []
837            ).unwrap();
838        }
839
840        let result = fetch_new_messages(&db_path, 0).await.unwrap();
841        assert_eq!(result.len(), 1);
842        assert_eq!(result[0].2, "Received");
843    }
844
845    #[tokio::test]
846    async fn fetch_new_messages_excludes_null_text_and_null_body() {
847        let (_dir, db_path) = create_test_db();
848
849        // Insert test data: one with text, one with neither text nor attributedBody
850        {
851            let conn = Connection::open(&db_path).unwrap();
852            conn.execute(
853                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
854                [],
855            )
856            .unwrap();
857            conn.execute(
858                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Has text', 0)",
859                []
860            ).unwrap();
861            conn.execute(
862                "INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me) VALUES (20, 1, NULL, NULL, 0)",
863                [],
864            )
865            .unwrap();
866        }
867
868        let result = fetch_new_messages(&db_path, 0).await.unwrap();
869        // Message with NULL text AND NULL attributedBody is excluded
870        assert_eq!(result.len(), 1);
871        assert_eq!(result[0].2, "Has text");
872    }
873
874    #[tokio::test]
875    async fn fetch_new_messages_respects_limit() {
876        let (_dir, db_path) = create_test_db();
877
878        // Insert 25 messages (limit is 20)
879        {
880            let conn = Connection::open(&db_path).unwrap();
881            conn.execute(
882                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
883                [],
884            )
885            .unwrap();
886            for i in 1..=25 {
887                conn.execute(
888                    &format!("INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES ({i}, 1, 'Message {i}', 0)"),
889                    []
890                ).unwrap();
891            }
892        }
893
894        let result = fetch_new_messages(&db_path, 0).await.unwrap();
895        assert_eq!(result.len(), 20); // Limited to 20
896        assert_eq!(result[0].0, 1); // First message
897        assert_eq!(result[19].0, 20); // 20th message
898    }
899
900    #[tokio::test]
901    async fn fetch_new_messages_ordered_by_rowid_asc() {
902        let (_dir, db_path) = create_test_db();
903
904        // Insert messages out of order
905        {
906            let conn = Connection::open(&db_path).unwrap();
907            conn.execute(
908                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
909                [],
910            )
911            .unwrap();
912            conn.execute(
913                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (30, 1, 'Third', 0)",
914                []
915            ).unwrap();
916            conn.execute(
917                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'First', 0)",
918                []
919            ).unwrap();
920            conn.execute(
921                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (20, 1, 'Second', 0)",
922                []
923            ).unwrap();
924        }
925
926        let result = fetch_new_messages(&db_path, 0).await.unwrap();
927        assert_eq!(result.len(), 3);
928        assert_eq!(result[0].0, 10);
929        assert_eq!(result[1].0, 20);
930        assert_eq!(result[2].0, 30);
931    }
932
933    #[tokio::test]
934    async fn fetch_new_messages_nonexistent_database() {
935        let path = std::path::Path::new("/nonexistent/path/chat.db");
936        let result = fetch_new_messages(path, 0).await;
937        assert!(result.is_err());
938    }
939
940    #[tokio::test]
941    async fn fetch_new_messages_handles_special_characters() {
942        let (_dir, db_path) = create_test_db();
943
944        // Insert message with special characters (potential SQL injection patterns)
945        {
946            let conn = Connection::open(&db_path).unwrap();
947            conn.execute(
948                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
949                [],
950            )
951            .unwrap();
952            conn.execute(
953                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Hello \"world'' OR 1=1; DROP TABLE message;--', 0)",
954                []
955            ).unwrap();
956        }
957
958        let result = fetch_new_messages(&db_path, 0).await.unwrap();
959        assert_eq!(result.len(), 1);
960        // The special characters should be preserved, not interpreted as SQL
961        assert!(result[0].2.contains("DROP TABLE"));
962    }
963
964    #[tokio::test]
965    async fn fetch_new_messages_handles_unicode() {
966        let (_dir, db_path) = create_test_db();
967
968        {
969            let conn = Connection::open(&db_path).unwrap();
970            conn.execute(
971                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
972                [],
973            )
974            .unwrap();
975            conn.execute(
976                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Hello 🦀 世界 مرحبا', 0)",
977                []
978            ).unwrap();
979        }
980
981        let result = fetch_new_messages(&db_path, 0).await.unwrap();
982        assert_eq!(result.len(), 1);
983        assert_eq!(result[0].2, "Hello 🦀 世界 مرحبا");
984    }
985
986    #[tokio::test]
987    async fn fetch_new_messages_filters_empty_text() {
988        let (_dir, db_path) = create_test_db();
989
990        {
991            let conn = Connection::open(&db_path).unwrap();
992            conn.execute(
993                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
994                [],
995            )
996            .unwrap();
997            conn.execute(
998                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, '', 0)",
999                [],
1000            )
1001            .unwrap();
1002        }
1003
1004        let result = fetch_new_messages(&db_path, 0).await.unwrap();
1005        // Empty-content messages are filtered out
1006        assert!(result.is_empty());
1007    }
1008
1009    #[tokio::test]
1010    async fn fetch_new_messages_negative_rowid_edge_case() {
1011        let (_dir, db_path) = create_test_db();
1012
1013        {
1014            let conn = Connection::open(&db_path).unwrap();
1015            conn.execute(
1016                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
1017                [],
1018            )
1019            .unwrap();
1020            conn.execute(
1021                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Test', 0)",
1022                []
1023            ).unwrap();
1024        }
1025
1026        // Negative rowid should still work (fetch all messages with ROWID > -1)
1027        let result = fetch_new_messages(&db_path, -1).await.unwrap();
1028        assert_eq!(result.len(), 1);
1029    }
1030
1031    #[tokio::test]
1032    async fn fetch_new_messages_large_rowid_edge_case() {
1033        let (_dir, db_path) = create_test_db();
1034
1035        {
1036            let conn = Connection::open(&db_path).unwrap();
1037            conn.execute(
1038                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
1039                [],
1040            )
1041            .unwrap();
1042            conn.execute(
1043                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Test', 0)",
1044                []
1045            ).unwrap();
1046        }
1047
1048        // Very large rowid should return empty (no messages after this)
1049        let result = fetch_new_messages(&db_path, i64::MAX - 1).await.unwrap();
1050        assert!(result.is_empty());
1051    }
1052
1053    // ══════════════════════════════════════════════════════════
1054    // attributedBody / typedstream parsing tests
1055    // ══════════════════════════════════════════════════════════
1056
/// Assemble a minimal typedstream-like blob that wraps `text`.
///
/// Layout: `[header] [class bytes] [0x01, 0x2B] [length-prefix] [utf8]
/// [0x86, 0x84] [0x86, 0x86]`.
///
/// The length prefix is a single byte for lengths up to 0x7F, `0x81` plus a
/// little-endian `u16` for lengths up to 0xFFFF, and `0x82` plus a
/// little-endian `u32` otherwise.
///
/// # Panics
///
/// Panics if the text length does not fit the chosen prefix width (only
/// possible above `u32::MAX` bytes) — acceptable for a test helper.
fn make_attributed_body(text: &str) -> Vec<u8> {
    let payload = text.as_bytes();
    let mut out = Vec::new();

    // Fake "streamtyped" header followed by class-hierarchy bytes; both sit
    // in front of the start-of-text marker.
    out.extend_from_slice(b"\x04\x0bstreamtyped\x81\xe8\x03");
    out.extend_from_slice(b"\x84\x84NSMutableAttributedString\x00");

    // Start-of-text marker.
    out.extend_from_slice(&[0x01, 0x2B]);

    // Variable-width length prefix.
    match payload.len() {
        n @ 0..=0x7F => out.push(u8::try_from(n).unwrap()),
        n @ 0x80..=0xFFFF => {
            out.push(0x81);
            out.extend_from_slice(&u16::try_from(n).unwrap().to_le_bytes());
        }
        n => {
            out.push(0x82);
            out.extend_from_slice(&u32::try_from(n).unwrap().to_le_bytes());
        }
    }

    // Text, end-of-text marker, then trailing attribute bytes.
    out.extend_from_slice(payload);
    out.extend_from_slice(&[0x86, 0x84]);
    out.extend_from_slice(b"\x86\x86");
    out
}
1089
// Real attributedBody blob from macOS chat.db, captured during testing.
// Decodes to: "Testing with imsg installed"
//
// ASCII runs are spelled as byte-character literals so the embedded names
// and text are readable; all other bytes stay in hex.
const REAL_BLOB_TESTING: &[u8] = &[
    // "streamtyped" header
    0x04, 0x0B,
    b's', b't', b'r', b'e', b'a', b'm', b't', b'y', b'p', b'e', b'd',
    0x81, 0xE8, 0x03, 0x84, 0x01, 0x40,
    // "NSAttributedString"
    0x84, 0x84, 0x84, 0x12,
    b'N', b'S', b'A', b't', b't', b'r', b'i', b'b', b'u', b't', b'e', b'd',
    b'S', b't', b'r', b'i', b'n', b'g',
    0x00,
    // "NSObject"
    0x84, 0x84, 0x08,
    b'N', b'S', b'O', b'b', b'j', b'e', b'c', b't',
    0x00, 0x85, 0x92,
    // "NSString"
    0x84, 0x84, 0x84, 0x08,
    b'N', b'S', b'S', b't', b'r', b'i', b'n', b'g',
    0x01, 0x94, 0x84,
    // Start-of-text marker [0x01, 0x2B], then one-byte length 0x1B (27)
    0x01, 0x2B, 0x1B,
    // "Testing with imsg installed"
    b'T', b'e', b's', b't', b'i', b'n', b'g', b' ',
    b'w', b'i', b't', b'h', b' ',
    b'i', b'm', b's', b'g', b' ',
    b'i', b'n', b's', b't', b'a', b'l', b'l', b'e', b'd',
    // Bytes following the text
    0x86, 0x84, 0x02, 0x69, 0x49, 0x01, 0x1B, 0x92,
    // "NSDictionary"
    0x84, 0x84, 0x84, 0x0C,
    b'N', b'S', b'D', b'i', b'c', b't', b'i', b'o', b'n', b'a', b'r', b'y',
    0x00, 0x94, 0x84, 0x01, 0x69, 0x01, 0x92, 0x84, 0x96, 0x96,
    // 0x1D (29) followed by "__kIMMessagePartAttributeName"
    0x1D,
    b'_', b'_', b'k', b'I', b'M', b'M', b'e', b's', b's', b'a', b'g', b'e',
    b'P', b'a', b'r', b't',
    b'A', b't', b't', b'r', b'i', b'b', b'u', b't', b'e',
    b'N', b'a', b'm', b'e',
    0x86, 0x92,
    // "NSNumber"
    0x84, 0x84, 0x84, 0x08,
    b'N', b'S', b'N', b'u', b'm', b'b', b'e', b'r',
    0x00,
    // "NSValue"
    0x84, 0x84, 0x07,
    b'N', b'S', b'V', b'a', b'l', b'u', b'e',
    0x00, 0x94, 0x84, 0x01, 0x2A, 0x84, 0x99, 0x99, 0x00, 0x86, 0x86, 0x86,
];
1108
// Real attributedBody blob from unknownbreaker/MessageBridge (MIT).
// Decodes to: "1"
//
// ASCII runs are spelled as byte-character literals so the embedded names
// and text are readable; all other bytes stay in hex.
const REAL_BLOB_ONE: &[u8] = &[
    // "streamtyped" header
    0x04, 0x0b,
    b's', b't', b'r', b'e', b'a', b'm', b't', b'y', b'p', b'e', b'd',
    0x81, 0xe8, 0x03, 0x84, 0x01, 0x40,
    // "NSAttributedString"
    0x84, 0x84, 0x84, 0x12,
    b'N', b'S', b'A', b't', b't', b'r', b'i', b'b', b'u', b't', b'e', b'd',
    b'S', b't', b'r', b'i', b'n', b'g',
    0x00,
    // "NSObject"
    0x84, 0x84, 0x08,
    b'N', b'S', b'O', b'b', b'j', b'e', b'c', b't',
    0x00, 0x85, 0x92,
    // "NSString"
    0x84, 0x84, 0x84, 0x08,
    b'N', b'S', b'S', b't', b'r', b'i', b'n', b'g',
    0x01, 0x94, 0x84,
    // Start-of-text marker [0x01, 0x2b], one-byte length 0x01, then "1"
    0x01, 0x2b, 0x01,
    b'1',
    // Bytes following the text
    0x86, 0x84, 0x02, 0x69, 0x49, 0x01, 0x01, 0x92,
    // "NSDictionary"
    0x84, 0x84, 0x84, 0x0c,
    b'N', b'S', b'D', b'i', b'c', b't', b'i', b'o', b'n', b'a', b'r', b'y',
    0x00, 0x94, 0x84, 0x01, 0x69, 0x01, 0x92, 0x84, 0x96, 0x96,
    // 0x1d (29) followed by "__kIMMessagePartAttributeName"
    0x1d,
    b'_', b'_', b'k', b'I', b'M', b'M', b'e', b's', b's', b'a', b'g', b'e',
    b'P', b'a', b'r', b't',
    b'A', b't', b't', b'r', b'i', b'b', b'u', b't', b'e',
    b'N', b'a', b'm', b'e',
    0x86, 0x92,
    // "NSNumber"
    0x84, 0x84, 0x84, 0x08,
    b'N', b'S', b'N', b'u', b'm', b'b', b'e', b'r',
    0x00,
    // "NSValue"
    0x84, 0x84, 0x07,
    b'N', b'S', b'V', b'a', b'l', b'u', b'e',
    0x00, 0x94, 0x84, 0x01, 0x2a, 0x84, 0x99, 0x99, 0x00, 0x86, 0x86, 0x86,
];
1125
/// The captured chat.db blob must decode to its known message text.
#[test]
fn extract_real_blob_testing_with_imsg() {
    assert_eq!(
        extract_text_from_attributed_body(REAL_BLOB_TESTING).as_deref(),
        Some("Testing with imsg installed"),
    );
}
1131
/// A real blob carrying a one-character message decodes to "1".
/// Blob source: unknownbreaker/MessageBridge (MIT).
#[test]
fn extract_real_blob_single_char() {
    let decoded = extract_text_from_attributed_body(REAL_BLOB_ONE);
    assert_eq!(decoded.as_deref(), Some("1"));
}
1138
/// Text whose UTF-8 encoding contains the bytes [0x86, 0x84] must be
/// returned whole.
#[test]
fn extract_text_containing_end_marker_bytes() {
    // U+2184 (LATIN SMALL LETTER REVERSED C) is E2 86 84 in UTF-8, so the
    // payload embeds the same byte pair that follows the text in a blob.
    // A parser scanning for that pair would cut the text short; a
    // length-driven parser must not.
    let text = "before\u{2184}after";
    let decoded = extract_text_from_attributed_body(&make_attributed_body(text));
    assert_eq!(decoded.as_deref(), Some(text));
}
1149
/// A marker followed by a zero length prefix yields `Some("")`, not `None`.
#[test]
fn extract_zero_length_returns_empty_string() {
    // A zero length is a valid encoding of an empty string; discarding empty
    // content is left to resolve_message_content(), not to the parser.
    let decoded = extract_text_from_attributed_body(b"\x01\x2B\x00");
    assert_eq!(decoded.as_deref(), Some(""));
}
1159
/// Input without the [0x01, 0x2B] start marker cannot be decoded.
#[test]
fn extract_no_markers_returns_none() {
    let decoded = extract_text_from_attributed_body(b"just some random bytes with no markers");
    assert_eq!(decoded, None);
}
1166
/// A well-formed marker and length wrapping bytes that are not valid UTF-8
/// must be rejected.
#[test]
fn extract_invalid_utf8_returns_none() {
    // Marker, length 4, then FF FE 80 81 — not a valid UTF-8 sequence.
    let decoded = extract_text_from_attributed_body(b"\x01\x2B\x04\xFF\xFE\x80\x81");
    assert_eq!(decoded, None);
}
1173
/// A length prefix that promises more bytes than the blob holds must yield
/// `None`.
#[test]
fn extract_truncated_blob_returns_none() {
    // Prefix 0x1B announces 27 bytes, but only 4 ("Test") follow.
    let decoded = extract_text_from_attributed_body(b"\x01\x2B\x1B\x54\x65\x73\x74");
    assert_eq!(decoded, None);
}
1181
/// Text longer than 127 bytes is encoded with the 0x81 (two-byte length)
/// prefix and must decode intact.
#[test]
fn extract_long_text_two_byte_length() {
    let expected = "A".repeat(200);
    let decoded = extract_text_from_attributed_body(&make_attributed_body(&expected));
    assert_eq!(decoded, Some(expected));
}
1190
/// Exercises the 0x82 branch: a four-byte little-endian `u32` length.
#[test]
fn extract_four_byte_length_prefix() {
    // Hand-built because make_attributed_body only emits 0x82 for text
    // longer than 0xFFFF bytes.
    let mut blob: Vec<u8> = vec![0x01, 0x2B, 0x82]; // start marker + 4-byte length tag
    blob.extend_from_slice(&5_u32.to_le_bytes()); // length = 5
    blob.extend_from_slice(b"Hello");

    let decoded = extract_text_from_attributed_body(&blob);
    assert_eq!(decoded.as_deref(), Some("Hello"));
}
1204
/// Round-trips text on both sides of the prefix-width boundary: 127 is the
/// largest single-byte length, 128 the smallest two-byte length.
#[test]
fn extract_text_boundary_127_to_128() {
    for len in [127, 128] {
        let expected = "X".repeat(len);
        let decoded = extract_text_from_attributed_body(&make_attributed_body(&expected));
        assert_eq!(decoded, Some(expected), "failed at length {len}");
    }
}
1215
1216    #[tokio::test]
1217    async fn fetch_new_messages_reads_attributed_body_fallback() {
1218        let (_dir, db_path) = create_test_db();
1219
1220        {
1221            let conn = Connection::open(&db_path).unwrap();
1222            conn.execute(
1223                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
1224                [],
1225            )
1226            .unwrap();
1227            // Real blob from macOS chat.db — text=NULL, attributedBody present
1228            conn.execute(
1229                "INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me) VALUES (10, 1, NULL, ?1, 0)",
1230                [REAL_BLOB_TESTING.to_vec()],
1231            ).unwrap();
1232        }
1233
1234        let result = fetch_new_messages(&db_path, 0).await.unwrap();
1235        assert_eq!(result.len(), 1);
1236        assert_eq!(result[0].2, "Testing with imsg installed");
1237    }
1238
1239    #[tokio::test]
1240    async fn fetch_new_messages_empty_text_falls_back_to_attributed_body() {
1241        let (_dir, db_path) = create_test_db();
1242
1243        {
1244            let conn = Connection::open(&db_path).unwrap();
1245            conn.execute(
1246                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
1247                [],
1248            )
1249            .unwrap();
1250            // text = '' (empty string, not NULL) with valid attributedBody
1251            conn.execute(
1252                "INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me) VALUES (10, 1, '', ?1, 0)",
1253                [REAL_BLOB_ONE.to_vec()],
1254            ).unwrap();
1255        }
1256
1257        let result = fetch_new_messages(&db_path, 0).await.unwrap();
1258        assert_eq!(result.len(), 1);
1259        assert_eq!(result[0].2, "1");
1260    }
1261
1262    #[tokio::test]
1263    async fn fetch_new_messages_prefers_text_over_attributed_body() {
1264        let (_dir, db_path) = create_test_db();
1265
1266        {
1267            let conn = Connection::open(&db_path).unwrap();
1268            conn.execute(
1269                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
1270                [],
1271            )
1272            .unwrap();
1273            // Both text and attributedBody present — text column wins
1274            conn.execute(
1275                "INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me) VALUES (10, 1, 'Plain text', ?1, 0)",
1276                [REAL_BLOB_ONE.to_vec()],
1277            ).unwrap();
1278        }
1279
1280        let result = fetch_new_messages(&db_path, 0).await.unwrap();
1281        assert_eq!(result.len(), 1);
1282        assert_eq!(result[0].2, "Plain text");
1283    }
1284
1285    #[tokio::test]
1286    async fn fetch_new_messages_mixed_text_and_attributed_body() {
1287        let (_dir, db_path) = create_test_db();
1288
1289        {
1290            let conn = Connection::open(&db_path).unwrap();
1291            conn.execute(
1292                "INSERT INTO handle (ROWID, id) VALUES (1, '+1234567890')",
1293                [],
1294            )
1295            .unwrap();
1296            // Old-style message with text column
1297            conn.execute(
1298                "INSERT INTO message (ROWID, handle_id, text, is_from_me) VALUES (10, 1, 'Legacy message', 0)",
1299                []
1300            ).unwrap();
1301            // Modern message with only attributedBody (real blob)
1302            conn.execute(
1303                "INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me) VALUES (20, 1, NULL, ?1, 0)",
1304                [REAL_BLOB_ONE.to_vec()],
1305            ).unwrap();
1306            // Message with neither (should be excluded)
1307            conn.execute(
1308                "INSERT INTO message (ROWID, handle_id, text, attributedBody, is_from_me) VALUES (30, 1, NULL, NULL, 0)",
1309                [],
1310            ).unwrap();
1311        }
1312
1313        let result = fetch_new_messages(&db_path, 0).await.unwrap();
1314        assert_eq!(result.len(), 2);
1315        assert_eq!(result[0].2, "Legacy message");
1316        assert_eq!(result[1].2, "1");
1317    }
1318}