Skip to main content

imessage_watcher/
listener.rs

1use parking_lot::Mutex;
/// File system watcher for chat.db + WAL with debounce and polling orchestration.
///
/// Flow:
///   1. `notify` watches chat.db and chat.db-wal
///   2. FS events are debounced by 500ms; a 5s fallback timer polls even without events
///   3. On each debounced event or fallback tick, acquires a process lock (Semaphore(1))
///   4. Calls `pollers::poll_messages` and `pollers::poll_chat_reads`
///   5. Emits the resulting events via a tokio broadcast channel
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use notify::{Event, EventKind, RecursiveMode, Watcher};
15use tokio::sync::{Semaphore, broadcast};
16use tracing::{debug, info, warn};
17
18use imessage_db::imessage::repository::MessageRepository;
19
20use crate::pollers::{self, PollerState, WatcherEvent};
21
22/// The iMessage listener watches for file system changes and polls the database.
23pub struct IMessageListener {
24    /// Path to chat.db
25    db_path: PathBuf,
26    /// Broadcast channel for events
27    event_tx: broadcast::Sender<WatcherEvent>,
28    /// Last poll time (Unix ms)
29    last_check: Arc<Mutex<i64>>,
30}
31
32impl IMessageListener {
33    /// Create a new listener.
34    ///
35    /// `db_path` should be `~/Library/Messages/chat.db`.
36    pub fn new(db_path: PathBuf) -> (Self, broadcast::Receiver<WatcherEvent>) {
37        let (tx, rx) = broadcast::channel(256);
38        let now_ms = current_unix_ms();
39
40        (
41            Self {
42                db_path,
43                event_tx: tx,
44                last_check: Arc::new(Mutex::new(now_ms)),
45            },
46            rx,
47        )
48    }
49
50    /// Subscribe to events.
51    pub fn subscribe(&self) -> broadcast::Receiver<WatcherEvent> {
52        self.event_tx.subscribe()
53    }
54
55    /// Start the file watcher. This runs until the returned handle is dropped.
56    ///
57    /// `repo` is the shared MessageRepository (behind a Mutex).
58    pub async fn start(&self, repo: Arc<Mutex<MessageRepository>>) -> anyhow::Result<()> {
59        let db_path = self.db_path.clone();
60        let wal_path = PathBuf::from(format!("{}-wal", db_path.display()));
61
62        info!(
63            "Starting iMessage listener on {} and {}",
64            db_path.display(),
65            wal_path.display()
66        );
67
68        // Initial poll (seed caches, don't emit)
69        let initial_poller_state = {
70            let last_check = *self.last_check.lock();
71            let after = last_check - 60_000; // 60s lookback for initial seed
72            let repo_lock = repo.lock();
73            let mut poller_state = PollerState::new();
74            let _ = pollers::poll_messages(&repo_lock, &mut poller_state, after);
75            let _ = pollers::poll_chat_reads(&repo_lock, &mut poller_state, after);
76            poller_state
77        };
78
79        let event_tx = self.event_tx.clone();
80        let last_check = self.last_check.clone();
81        let process_lock = Arc::new(Semaphore::new(1));
82
83        // Channel for fs events → debounce → poll
84        let (fs_tx, mut fs_rx) = tokio::sync::mpsc::channel::<()>(64);
85
86        // Start the notify watcher
87        let fs_tx_clone = fs_tx.clone();
88        let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
89            match res {
90                Ok(event) => {
91                    // Only care about data changes (modify events)
92                    if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
93                        let _ = fs_tx_clone.try_send(());
94                    }
95                }
96                Err(e) => {
97                    warn!("File watcher error: {e}");
98                }
99            }
100        })?;
101
102        // Watch both files
103        if db_path.exists() {
104            watcher.watch(&db_path, RecursiveMode::NonRecursive)?;
105        }
106        if wal_path.exists() {
107            watcher.watch(&wal_path, RecursiveMode::NonRecursive)?;
108        }
109
110        info!("iMessage listener started, watching for changes");
111
112        // Poller state lives across polls — seeded from initial poll to avoid re-emitting old messages
113        let poller_state = Arc::new(Mutex::new(initial_poller_state));
114
115        // Debounce + poll loop (with periodic fallback every 5s)
116        let mut fallback_interval = tokio::time::interval(Duration::from_secs(5));
117        fallback_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
118        // Skip the first immediate tick
119        fallback_interval.tick().await;
120
121        loop {
122            // Wait for an fs event OR the fallback timer
123            tokio::select! {
124                result = fs_rx.recv() => {
125                    if result.is_none() {
126                        break; // channel closed
127                    }
128                    // Debounce: wait 500ms, draining any additional events
129                    tokio::time::sleep(Duration::from_millis(500)).await;
130                    while fs_rx.try_recv().is_ok() {} // drain
131                }
132                _ = fallback_interval.tick() => {
133                    // Periodic fallback — drain any queued fs events too
134                    while fs_rx.try_recv().is_ok() {}
135                }
136            }
137
138            // Acquire the process lock
139            let _permit = process_lock.acquire().await.unwrap();
140
141            let now = current_unix_ms();
142            let after = {
143                let lc = last_check.lock();
144                let lookback = *lc - 30_000; // 30s lookback
145                // Cap to not exceed now and not be more than 24h old
146                let min_bound = now - 86_400_000;
147                lookback.max(min_bound).min(now)
148            };
149
150            debug!("Polling for changes since {} (now={})", after, now);
151
152            // Run pollers
153            let events = {
154                let repo_lock = repo.lock();
155                let mut ps = poller_state.lock();
156
157                let mut all_events = pollers::poll_messages(&repo_lock, &mut ps, after);
158                all_events.extend(pollers::poll_chat_reads(&repo_lock, &mut ps, after));
159                ps.trim_caches();
160                all_events
161            };
162
163            // Update last check time
164            *last_check.lock() = now;
165
166            // Emit events
167            for event in events {
168                debug!("Emitting event: {}", event.event_type);
169                let _ = event_tx.send(event);
170                // Small delay between events (10ms) to prevent flooding
171                tokio::time::sleep(Duration::from_millis(10)).await;
172            }
173
174            // Brief delay after releasing the lock
175            drop(_permit);
176            tokio::time::sleep(Duration::from_millis(100)).await;
177        }
178
179        Ok(())
180    }
181}
182
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch, and saturates
/// at `i64::MAX` rather than silently truncating if the millisecond count
/// does not fit in an `i64`.
fn current_unix_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// The clock helper should return a present-day timestamp in milliseconds.
    #[test]
    fn current_unix_ms_is_reasonable() {
        let ms = current_unix_ms();
        // Should be after Jan 1, 2024
        assert!(ms > 1_704_067_200_000);
    }

    /// `new` stores the given path and hands back exactly one live receiver.
    #[test]
    fn listener_creation() {
        let db_path = PathBuf::from("/tmp/test-chat.db");
        let (listener, _rx) = IMessageListener::new(db_path.clone());
        assert_eq!(listener.db_path, db_path);
        assert_eq!(listener.event_tx.receiver_count(), 1);
    }

    /// `subscribe` registers an additional receiver on the same channel.
    #[test]
    fn subscriber_receives_channel() {
        let (listener, _rx) = IMessageListener::new(PathBuf::from("/tmp/test.db"));
        assert_eq!(listener.event_tx.receiver_count(), 1);
        let _sub = listener.subscribe();
        assert_eq!(listener.event_tx.receiver_count(), 2);
    }
}