imessage_watcher/
listener.rs1use parking_lot::Mutex;
2use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use notify::{Event, EventKind, RecursiveMode, Watcher};
15use tokio::sync::{Semaphore, broadcast};
16use tracing::{debug, info, warn};
17
18use imessage_db::imessage::repository::MessageRepository;
19
20use crate::pollers::{self, PollerState, WatcherEvent};
21
22pub struct IMessageListener {
24 db_path: PathBuf,
26 event_tx: broadcast::Sender<WatcherEvent>,
28 last_check: Arc<Mutex<i64>>,
30}
31
32impl IMessageListener {
33 pub fn new(db_path: PathBuf) -> (Self, broadcast::Receiver<WatcherEvent>) {
37 let (tx, rx) = broadcast::channel(256);
38 let now_ms = current_unix_ms();
39
40 (
41 Self {
42 db_path,
43 event_tx: tx,
44 last_check: Arc::new(Mutex::new(now_ms)),
45 },
46 rx,
47 )
48 }
49
50 pub fn subscribe(&self) -> broadcast::Receiver<WatcherEvent> {
52 self.event_tx.subscribe()
53 }
54
55 pub async fn start(&self, repo: Arc<Mutex<MessageRepository>>) -> anyhow::Result<()> {
59 let db_path = self.db_path.clone();
60 let wal_path = PathBuf::from(format!("{}-wal", db_path.display()));
61
62 info!(
63 "Starting iMessage listener on {} and {}",
64 db_path.display(),
65 wal_path.display()
66 );
67
68 let initial_poller_state = {
70 let last_check = *self.last_check.lock();
71 let after = last_check - 60_000; let repo_lock = repo.lock();
73 let mut poller_state = PollerState::new();
74 let _ = pollers::poll_messages(&repo_lock, &mut poller_state, after);
75 let _ = pollers::poll_chat_reads(&repo_lock, &mut poller_state, after);
76 poller_state
77 };
78
79 let event_tx = self.event_tx.clone();
80 let last_check = self.last_check.clone();
81 let process_lock = Arc::new(Semaphore::new(1));
82
83 let (fs_tx, mut fs_rx) = tokio::sync::mpsc::channel::<()>(64);
85
86 let fs_tx_clone = fs_tx.clone();
88 let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
89 match res {
90 Ok(event) => {
91 if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
93 let _ = fs_tx_clone.try_send(());
94 }
95 }
96 Err(e) => {
97 warn!("File watcher error: {e}");
98 }
99 }
100 })?;
101
102 if db_path.exists() {
104 watcher.watch(&db_path, RecursiveMode::NonRecursive)?;
105 }
106 if wal_path.exists() {
107 watcher.watch(&wal_path, RecursiveMode::NonRecursive)?;
108 }
109
110 info!("iMessage listener started, watching for changes");
111
112 let poller_state = Arc::new(Mutex::new(initial_poller_state));
114
115 let mut fallback_interval = tokio::time::interval(Duration::from_secs(5));
117 fallback_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
118 fallback_interval.tick().await;
120
121 loop {
122 tokio::select! {
124 result = fs_rx.recv() => {
125 if result.is_none() {
126 break; }
128 tokio::time::sleep(Duration::from_millis(500)).await;
130 while fs_rx.try_recv().is_ok() {} }
132 _ = fallback_interval.tick() => {
133 while fs_rx.try_recv().is_ok() {}
135 }
136 }
137
138 let _permit = process_lock.acquire().await.unwrap();
140
141 let now = current_unix_ms();
142 let after = {
143 let lc = last_check.lock();
144 let lookback = *lc - 30_000; let min_bound = now - 86_400_000;
147 lookback.max(min_bound).min(now)
148 };
149
150 debug!("Polling for changes since {} (now={})", after, now);
151
152 let events = {
154 let repo_lock = repo.lock();
155 let mut ps = poller_state.lock();
156
157 let mut all_events = pollers::poll_messages(&repo_lock, &mut ps, after);
158 all_events.extend(pollers::poll_chat_reads(&repo_lock, &mut ps, after));
159 ps.trim_caches();
160 all_events
161 };
162
163 *last_check.lock() = now;
165
166 for event in events {
168 debug!("Emitting event: {}", event.event_type);
169 let _ = event_tx.send(event);
170 tokio::time::sleep(Duration::from_millis(10)).await;
172 }
173
174 drop(_permit);
176 tokio::time::sleep(Duration::from_millis(100)).await;
177 }
178
179 Ok(())
180 }
181}
182
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` instead of silently truncating should the
/// millisecond count ever exceed the `i64` range.
fn current_unix_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// The clock helper must report a time after 2024-01-01T00:00:00Z.
    #[test]
    fn current_unix_ms_is_reasonable() {
        let ms = current_unix_ms();
        assert!(ms > 1_704_067_200_000);
    }

    /// `new` stores the path it was given.
    #[test]
    fn listener_creation() {
        let db_path = PathBuf::from("/tmp/test-chat.db");
        let (listener, _rx) = IMessageListener::new(db_path.clone());
        assert_eq!(listener.db_path, db_path);
    }

    /// `subscribe` attaches an additional receiver to the broadcast channel.
    #[test]
    fn subscriber_receives_channel() {
        let (listener, _rx) = IMessageListener::new(PathBuf::from("/tmp/test.db"));
        let _sub = listener.subscribe();
        // `_rx` from `new` plus `_sub` are both still alive here.
        assert_eq!(listener.event_tx.receiver_count(), 2);
    }
}