Skip to main content

imessage_watcher/
pollers.rs

//! Database pollers that detect new/updated messages and chat read-status changes.
2use std::collections::{HashMap, HashSet};
3use std::time::{Duration, Instant};
4
5use serde_json::{Value, json};
6
7use imessage_core::dates::unix_ms_to_apple;
8use imessage_core::events;
9use imessage_db::imessage::entities::Message;
10use imessage_db::imessage::repository::MessageRepository;
11use imessage_db::imessage::types::{SortOrder, UpdatedMessageQueryParams};
12use imessage_serializers::config::{AttachmentSerializerConfig, MessageSerializerConfig};
13use imessage_serializers::message::serialize_message;
14
15/// An event emitted by the pollers.
16#[derive(Debug, Clone)]
17pub struct WatcherEvent {
18    pub event_type: String,
19    pub data: Value,
20}
21
/// Snapshot of the message fields compared between polls to decide whether a
/// previously seen message has changed.
///
/// Date fields hold the message's value, or `0` when the message had none.
#[derive(Clone, Debug)]
struct MessageState {
    /// Creation date of the message.
    date_created: i64,
    /// Delivery flag as last observed.
    is_delivered: bool,
    /// Delivery date as last observed.
    date_delivered: i64,
    /// Read date as last observed.
    date_read: i64,
    /// Edit date as last observed.
    date_edited: i64,
    /// Retraction (unsend) date as last observed.
    date_retracted: i64,
    /// Recipient-notified flag as last observed (`false` when absent).
    did_notify_recipient: bool,
    /// When this snapshot was stored; drives TTL-based eviction.
    cache_time: Instant,
}
34
/// Last observed read marker of a chat, kept so that only forward movement of
/// the marker is reported as a read-status change.
#[derive(Clone, Debug)]
struct ChatState {
    /// The chat's last-read-message timestamp as last observed (`0` when absent).
    last_read_message_timestamp: i64,
    /// When this entry was stored; drives TTL-based eviction.
    cache_time: Instant,
}
41
42/// Combined poller state: tracks seen messages, message states, chat states.
43pub struct PollerState {
44    /// GUIDs of messages we've seen (for new vs. update detection).
45    /// Values are insertion timestamps for time-bounded eviction.
46    seen_guids: HashMap<String, Instant>,
47    /// Cached message states for update detection.
48    message_states: HashMap<String, MessageState>,
49    /// Cached chat states for read-status detection.
50    chat_states: HashMap<String, ChatState>,
51    /// Cache TTL (1 hour).
52    cache_ttl: Duration,
53}
54
55impl PollerState {
56    pub fn new() -> Self {
57        Self {
58            seen_guids: HashMap::new(),
59            message_states: HashMap::new(),
60            chat_states: HashMap::new(),
61            cache_ttl: Duration::from_secs(3600),
62        }
63    }
64
65    /// Trim all caches, removing entries older than cache_ttl.
66    pub fn trim_caches(&mut self) {
67        let now = Instant::now();
68        self.seen_guids
69            .retain(|_, ts| now.duration_since(*ts) < self.cache_ttl);
70        self.message_states
71            .retain(|_, v| now.duration_since(v.cache_time) < self.cache_ttl);
72        self.chat_states
73            .retain(|_, v| now.duration_since(v.cache_time) < self.cache_ttl);
74    }
75}
76
77impl Default for PollerState {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83/// Poll for new and updated messages.
84///
85/// `after_unix_ms` is the Unix-ms timestamp to look back from (typically lastCheck - 30s).
86/// Returns a list of events to emit.
87pub fn poll_messages(
88    repo: &MessageRepository,
89    state: &mut PollerState,
90    after_unix_ms: i64,
91) -> Vec<WatcherEvent> {
92    let mut events_out = Vec::new();
93
94    // Query messages updated since `after_unix_ms`.
95    // The SQL has OR clauses for date, date_delivered, date_read, date_edited, date_retracted,
96    // so messages whose creation date is older but were recently delivered/read/edited are caught.
97    let messages = match repo.get_updated_messages(&UpdatedMessageQueryParams {
98        after: Some(after_unix_ms),
99        with_chats: true,
100        with_attachments: true,
101        include_created: true,
102        limit: 1000,
103        sort: SortOrder::Asc,
104        ..Default::default()
105    }) {
106        Ok(msgs) => {
107            tracing::debug!(
108                "poll_messages: query returned {} messages (after_unix_ms={})",
109                msgs.len(),
110                after_unix_ms
111            );
112            msgs
113        }
114        Err(e) => {
115            tracing::warn!("Failed to poll messages: {e}");
116            return events_out;
117        }
118    };
119
120    // Process group changes first (messages with itemType 1-3 and empty text)
121    let mut group_change_rowids = HashSet::new();
122    for msg in &messages {
123        let item_type = msg.item_type;
124        if (1..=3).contains(&item_type) && is_text_empty(&msg.text) {
125            let key = format!("group-change-{}", msg.rowid);
126            if state.seen_guids.contains_key(&key) {
127                continue;
128            }
129            state.seen_guids.insert(key, Instant::now());
130            group_change_rowids.insert(msg.rowid);
131
132            let group_action = msg.group_action_type;
133            let event_type = match (item_type, group_action) {
134                (1, 0) => events::PARTICIPANT_ADDED,
135                (1, 1) => events::PARTICIPANT_REMOVED,
136                (2, _) => events::GROUP_NAME_CHANGE,
137                (3, 0) => events::PARTICIPANT_LEFT,
138                (3, 1) => events::GROUP_ICON_CHANGED,
139                (3, 2) => events::GROUP_ICON_REMOVED,
140                _ => continue,
141            };
142
143            let msg_config = MessageSerializerConfig {
144                load_chat_participants: true,
145                include_chats: true,
146                ..Default::default()
147            };
148            let att_config = AttachmentSerializerConfig::default();
149            let data = serialize_message(msg, &msg_config, &att_config, true);
150
151            events_out.push(WatcherEvent {
152                event_type: event_type.to_string(),
153                data,
154            });
155        }
156    }
157
158    // Now process normal messages (filter by date fields >= after_unix_ms)
159    let mut relevant_count = 0u32;
160    let mut new_count = 0u32;
161    let mut seen_count = 0u32;
162    let mut skipped_group = 0u32;
163    let mut skipped_irrelevant = 0u32;
164    for msg in &messages {
165        if group_change_rowids.contains(&msg.rowid) {
166            skipped_group += 1;
167            continue;
168        }
169
170        // Check if any relevant date field is >= after_unix_ms
171        if !is_message_relevant(msg, after_unix_ms) {
172            skipped_irrelevant += 1;
173            continue;
174        }
175        relevant_count += 1;
176
177        let guid = &msg.guid;
178
179        // Determine if this is a new or updated message
180        if !state.seen_guids.contains_key(guid) {
181            new_count += 1;
182            // New message
183            state.seen_guids.insert(guid.clone(), Instant::now());
184            cache_message_state(&mut state.message_states, msg);
185
186            let msg_config = MessageSerializerConfig {
187                load_chat_participants: true,
188                include_chats: true,
189                ..Default::default()
190            };
191            let att_config = AttachmentSerializerConfig::default();
192            let data = serialize_message(msg, &msg_config, &att_config, true);
193
194            events_out.push(WatcherEvent {
195                event_type: events::NEW_MESSAGE.to_string(),
196                data,
197            });
198        } else {
199            seen_count += 1;
200            // Check if it's actually updated
201            if let Some(prev) = state.message_states.get(guid) {
202                if is_message_updated(msg, prev) {
203                    cache_message_state(&mut state.message_states, msg);
204
205                    let msg_config = MessageSerializerConfig {
206                        load_chat_participants: false,
207                        include_chats: false,
208                        ..Default::default()
209                    };
210                    let att_config = AttachmentSerializerConfig::default();
211                    let data = serialize_message(msg, &msg_config, &att_config, true);
212
213                    events_out.push(WatcherEvent {
214                        event_type: events::MESSAGE_UPDATED.to_string(),
215                        data,
216                    });
217                }
218            } else {
219                // No cached state — cache it now
220                cache_message_state(&mut state.message_states, msg);
221            }
222        }
223    }
224
225    tracing::debug!(
226        "poll_messages: total={}, skipped_group={skipped_group}, skipped_irrelevant={skipped_irrelevant}, \
227         relevant={relevant_count}, new={new_count}, seen={seen_count}, events={}",
228        messages.len(),
229        events_out.len()
230    );
231
232    events_out
233}
234
235/// Poll for chat read-status changes.
236pub fn poll_chat_reads(
237    repo: &MessageRepository,
238    state: &mut PollerState,
239    after_unix_ms: i64,
240) -> Vec<WatcherEvent> {
241    let mut events_out = Vec::new();
242
243    let after_apple = unix_ms_to_apple(after_unix_ms);
244    let chats = match repo.get_chats_read_since(after_apple) {
245        Ok(c) => c,
246        Err(e) => {
247            tracing::warn!("Failed to poll chat reads: {e}");
248            return events_out;
249        }
250    };
251
252    for chat in &chats {
253        let ts = chat.last_read_message_timestamp.unwrap_or(0);
254        let guid = &chat.guid;
255
256        let should_emit = match state.chat_states.get(guid) {
257            Some(prev) => ts > prev.last_read_message_timestamp,
258            None => true,
259        };
260
261        if should_emit {
262            state.chat_states.insert(
263                guid.clone(),
264                ChatState {
265                    last_read_message_timestamp: ts,
266                    cache_time: Instant::now(),
267                },
268            );
269
270            events_out.push(WatcherEvent {
271                event_type: events::CHAT_READ_STATUS_CHANGED.to_string(),
272                data: json!({
273                    "chatGuid": guid,
274                    "read": true,
275                }),
276            });
277        }
278    }
279
280    events_out
281}
282
283// ---------------------------------------------------------------------------
284// Helpers
285// ---------------------------------------------------------------------------
286
/// Return `true` when `text` is absent or contains nothing but whitespace.
fn is_text_empty(text: &Option<String>) -> bool {
    match text.as_deref() {
        Some(body) => body.trim().is_empty(),
        None => true,
    }
}
290
291/// Check if a message has any relevant date field >= the given threshold.
292///
293/// Note: Message date fields are already in Unix ms (converted by the row mapper),
294/// so we compare directly without calling `apple_to_unix_ms`.
295fn is_message_relevant(msg: &Message, after_unix_ms: i64) -> bool {
296    let check_date =
297        |val: Option<i64>| -> bool { val.is_some_and(|ms| ms > 0 && ms >= after_unix_ms) };
298
299    check_date(msg.date)
300        || check_date(msg.date_delivered)
301        || check_date(msg.date_read)
302        || check_date(msg.date_edited)
303        || check_date(msg.date_retracted)
304}
305
306/// Check if a message has changed compared to its cached state.
307///
308/// Note: Message date fields are already in Unix ms (converted by the row mapper).
309fn is_message_updated(msg: &Message, prev: &MessageState) -> bool {
310    let date_created = msg.date.unwrap_or(0);
311    if date_created > prev.date_created {
312        return true;
313    }
314
315    let date_delivered = msg.date_delivered.unwrap_or(0);
316    if date_delivered > prev.date_delivered {
317        return true;
318    }
319
320    if msg.is_delivered != prev.is_delivered {
321        return true;
322    }
323
324    let date_read = msg.date_read.unwrap_or(0);
325    if date_read > prev.date_read {
326        return true;
327    }
328
329    let date_edited = msg.date_edited.unwrap_or(0);
330    if date_edited > prev.date_edited {
331        return true;
332    }
333
334    let date_retracted = msg.date_retracted.unwrap_or(0);
335    if date_retracted > prev.date_retracted {
336        return true;
337    }
338
339    let did_notify = msg.did_notify_recipient.unwrap_or(false);
340    if did_notify != prev.did_notify_recipient {
341        return true;
342    }
343
344    false
345}
346
347/// Save the current state of a message to the cache.
348///
349/// Note: Message date fields are already in Unix ms (converted by the row mapper).
350fn cache_message_state(states: &mut HashMap<String, MessageState>, msg: &Message) {
351    states.insert(
352        msg.guid.clone(),
353        MessageState {
354            date_created: msg.date.unwrap_or(0),
355            is_delivered: msg.is_delivered,
356            date_delivered: msg.date_delivered.unwrap_or(0),
357            date_read: msg.date_read.unwrap_or(0),
358            date_edited: msg.date_edited.unwrap_or(0),
359            date_retracted: msg.date_retracted.unwrap_or(0),
360            did_notify_recipient: msg.did_notify_recipient.unwrap_or(false),
361            cache_time: Instant::now(),
362        },
363    );
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed state holds no cached entries.
    #[test]
    fn new_poller_state_is_empty() {
        let PollerState {
            seen_guids,
            message_states,
            chat_states,
            ..
        } = PollerState::new();

        assert!(seen_guids.is_empty());
        assert!(message_states.is_empty());
        assert!(chat_states.is_empty());
    }

    /// Missing, empty and whitespace-only text all count as empty.
    #[test]
    fn is_text_empty_checks() {
        let cases = [
            (None, true),
            (Some(""), true),
            (Some("  "), true),
            (Some("hello"), false),
        ];

        for (input, expected) in cases {
            let owned = input.map(str::to_owned);
            assert_eq!(is_text_empty(&owned), expected);
        }
    }

    /// Pins the event name behind the (itemType, groupActionType) table.
    #[test]
    fn group_change_mapping() {
        // itemType=1, groupActionType=0 => participant-added
        let mapped = match (1_i64, 0_i64) {
            (1, 0) => events::PARTICIPANT_ADDED,
            (1, 1) => events::PARTICIPANT_REMOVED,
            (2, _) => events::GROUP_NAME_CHANGE,
            (3, 0) => events::PARTICIPANT_LEFT,
            (3, 1) => events::GROUP_ICON_CHANGED,
            (3, 2) => events::GROUP_ICON_REMOVED,
            _ => "",
        };

        assert_eq!(mapped, "participant-added");
    }

    /// With a zero TTL every entry is expired, so trimming empties the cache.
    #[test]
    fn cache_trim_removes_old_entries() {
        let mut state = PollerState::new();
        state.cache_ttl = Duration::from_millis(0); // expire immediately

        let stale = ChatState {
            last_read_message_timestamp: 100,
            cache_time: Instant::now() - Duration::from_secs(1),
        };
        state.chat_states.insert("test".to_string(), stale);

        state.trim_caches();
        assert!(state.chat_states.is_empty());
    }
}
419}