1use std::collections::{HashMap, HashSet};
3use std::time::{Duration, Instant};
4
5use serde_json::{Value, json};
6
7use imessage_core::dates::unix_ms_to_apple;
8use imessage_core::events;
9use imessage_db::imessage::entities::Message;
10use imessage_db::imessage::repository::MessageRepository;
11use imessage_db::imessage::types::{SortOrder, UpdatedMessageQueryParams};
12use imessage_serializers::config::{AttachmentSerializerConfig, MessageSerializerConfig};
13use imessage_serializers::message::serialize_message;
14
15#[derive(Debug, Clone)]
17pub struct WatcherEvent {
18 pub event_type: String,
19 pub data: Value,
20}
21
/// Snapshot of the message fields that `is_message_updated` compares between
/// polls.
///
/// Absent (`None`) values on the source message are stored as `0` / `false`
/// by `cache_message_state`.
#[derive(Clone, Debug)]
struct MessageState {
    /// Creation date of the message (`0` when absent).
    date_created: i64,
    /// Delivery flag copied from the message.
    is_delivered: bool,
    /// Delivery date (`0` when absent).
    date_delivered: i64,
    /// Read date (`0` when absent).
    date_read: i64,
    /// Edit date (`0` when absent).
    date_edited: i64,
    /// Retraction date (`0` when absent).
    date_retracted: i64,
    /// Whether the recipient was notified (`false` when absent).
    did_notify_recipient: bool,
    /// When this snapshot was stored; used for TTL-based eviction.
    cache_time: Instant,
}
34
/// Last read-state observed for a chat, kept so that `poll_chat_reads` only
/// reports a chat again when its last-read timestamp moves forward.
#[derive(Clone, Debug)]
struct ChatState {
    /// The chat's `last_read_message_timestamp` at the time it was cached
    /// (`0` when the chat reported none).
    last_read_message_timestamp: i64,
    /// When this entry was stored; used for TTL-based eviction.
    cache_time: Instant,
}
41
42pub struct PollerState {
44 seen_guids: HashMap<String, Instant>,
47 message_states: HashMap<String, MessageState>,
49 chat_states: HashMap<String, ChatState>,
51 cache_ttl: Duration,
53}
54
55impl PollerState {
56 pub fn new() -> Self {
57 Self {
58 seen_guids: HashMap::new(),
59 message_states: HashMap::new(),
60 chat_states: HashMap::new(),
61 cache_ttl: Duration::from_secs(3600),
62 }
63 }
64
65 pub fn trim_caches(&mut self) {
67 let now = Instant::now();
68 self.seen_guids
69 .retain(|_, ts| now.duration_since(*ts) < self.cache_ttl);
70 self.message_states
71 .retain(|_, v| now.duration_since(v.cache_time) < self.cache_ttl);
72 self.chat_states
73 .retain(|_, v| now.duration_since(v.cache_time) < self.cache_ttl);
74 }
75}
76
77impl Default for PollerState {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83pub fn poll_messages(
88 repo: &MessageRepository,
89 state: &mut PollerState,
90 after_unix_ms: i64,
91) -> Vec<WatcherEvent> {
92 let mut events_out = Vec::new();
93
94 let messages = match repo.get_updated_messages(&UpdatedMessageQueryParams {
98 after: Some(after_unix_ms),
99 with_chats: true,
100 with_attachments: true,
101 include_created: true,
102 limit: 1000,
103 sort: SortOrder::Asc,
104 ..Default::default()
105 }) {
106 Ok(msgs) => {
107 tracing::debug!(
108 "poll_messages: query returned {} messages (after_unix_ms={})",
109 msgs.len(),
110 after_unix_ms
111 );
112 msgs
113 }
114 Err(e) => {
115 tracing::warn!("Failed to poll messages: {e}");
116 return events_out;
117 }
118 };
119
120 let mut group_change_rowids = HashSet::new();
122 for msg in &messages {
123 let item_type = msg.item_type;
124 if (1..=3).contains(&item_type) && is_text_empty(&msg.text) {
125 let key = format!("group-change-{}", msg.rowid);
126 if state.seen_guids.contains_key(&key) {
127 continue;
128 }
129 state.seen_guids.insert(key, Instant::now());
130 group_change_rowids.insert(msg.rowid);
131
132 let group_action = msg.group_action_type;
133 let event_type = match (item_type, group_action) {
134 (1, 0) => events::PARTICIPANT_ADDED,
135 (1, 1) => events::PARTICIPANT_REMOVED,
136 (2, _) => events::GROUP_NAME_CHANGE,
137 (3, 0) => events::PARTICIPANT_LEFT,
138 (3, 1) => events::GROUP_ICON_CHANGED,
139 (3, 2) => events::GROUP_ICON_REMOVED,
140 _ => continue,
141 };
142
143 let msg_config = MessageSerializerConfig {
144 load_chat_participants: true,
145 include_chats: true,
146 ..Default::default()
147 };
148 let att_config = AttachmentSerializerConfig::default();
149 let data = serialize_message(msg, &msg_config, &att_config, true);
150
151 events_out.push(WatcherEvent {
152 event_type: event_type.to_string(),
153 data,
154 });
155 }
156 }
157
158 let mut relevant_count = 0u32;
160 let mut new_count = 0u32;
161 let mut seen_count = 0u32;
162 let mut skipped_group = 0u32;
163 let mut skipped_irrelevant = 0u32;
164 for msg in &messages {
165 if group_change_rowids.contains(&msg.rowid) {
166 skipped_group += 1;
167 continue;
168 }
169
170 if !is_message_relevant(msg, after_unix_ms) {
172 skipped_irrelevant += 1;
173 continue;
174 }
175 relevant_count += 1;
176
177 let guid = &msg.guid;
178
179 if !state.seen_guids.contains_key(guid) {
181 new_count += 1;
182 state.seen_guids.insert(guid.clone(), Instant::now());
184 cache_message_state(&mut state.message_states, msg);
185
186 let msg_config = MessageSerializerConfig {
187 load_chat_participants: true,
188 include_chats: true,
189 ..Default::default()
190 };
191 let att_config = AttachmentSerializerConfig::default();
192 let data = serialize_message(msg, &msg_config, &att_config, true);
193
194 events_out.push(WatcherEvent {
195 event_type: events::NEW_MESSAGE.to_string(),
196 data,
197 });
198 } else {
199 seen_count += 1;
200 if let Some(prev) = state.message_states.get(guid) {
202 if is_message_updated(msg, prev) {
203 cache_message_state(&mut state.message_states, msg);
204
205 let msg_config = MessageSerializerConfig {
206 load_chat_participants: false,
207 include_chats: false,
208 ..Default::default()
209 };
210 let att_config = AttachmentSerializerConfig::default();
211 let data = serialize_message(msg, &msg_config, &att_config, true);
212
213 events_out.push(WatcherEvent {
214 event_type: events::MESSAGE_UPDATED.to_string(),
215 data,
216 });
217 }
218 } else {
219 cache_message_state(&mut state.message_states, msg);
221 }
222 }
223 }
224
225 tracing::debug!(
226 "poll_messages: total={}, skipped_group={skipped_group}, skipped_irrelevant={skipped_irrelevant}, \
227 relevant={relevant_count}, new={new_count}, seen={seen_count}, events={}",
228 messages.len(),
229 events_out.len()
230 );
231
232 events_out
233}
234
235pub fn poll_chat_reads(
237 repo: &MessageRepository,
238 state: &mut PollerState,
239 after_unix_ms: i64,
240) -> Vec<WatcherEvent> {
241 let mut events_out = Vec::new();
242
243 let after_apple = unix_ms_to_apple(after_unix_ms);
244 let chats = match repo.get_chats_read_since(after_apple) {
245 Ok(c) => c,
246 Err(e) => {
247 tracing::warn!("Failed to poll chat reads: {e}");
248 return events_out;
249 }
250 };
251
252 for chat in &chats {
253 let ts = chat.last_read_message_timestamp.unwrap_or(0);
254 let guid = &chat.guid;
255
256 let should_emit = match state.chat_states.get(guid) {
257 Some(prev) => ts > prev.last_read_message_timestamp,
258 None => true,
259 };
260
261 if should_emit {
262 state.chat_states.insert(
263 guid.clone(),
264 ChatState {
265 last_read_message_timestamp: ts,
266 cache_time: Instant::now(),
267 },
268 );
269
270 events_out.push(WatcherEvent {
271 event_type: events::CHAT_READ_STATUS_CHANGED.to_string(),
272 data: json!({
273 "chatGuid": guid,
274 "read": true,
275 }),
276 });
277 }
278 }
279
280 events_out
281}
282
/// Returns `true` when `text` is `None` or contains only whitespace.
fn is_text_empty(text: &Option<String>) -> bool {
    match text {
        None => true,
        Some(body) => body.trim().is_empty(),
    }
}
290
291fn is_message_relevant(msg: &Message, after_unix_ms: i64) -> bool {
296 let check_date =
297 |val: Option<i64>| -> bool { val.is_some_and(|ms| ms > 0 && ms >= after_unix_ms) };
298
299 check_date(msg.date)
300 || check_date(msg.date_delivered)
301 || check_date(msg.date_read)
302 || check_date(msg.date_edited)
303 || check_date(msg.date_retracted)
304}
305
306fn is_message_updated(msg: &Message, prev: &MessageState) -> bool {
310 let date_created = msg.date.unwrap_or(0);
311 if date_created > prev.date_created {
312 return true;
313 }
314
315 let date_delivered = msg.date_delivered.unwrap_or(0);
316 if date_delivered > prev.date_delivered {
317 return true;
318 }
319
320 if msg.is_delivered != prev.is_delivered {
321 return true;
322 }
323
324 let date_read = msg.date_read.unwrap_or(0);
325 if date_read > prev.date_read {
326 return true;
327 }
328
329 let date_edited = msg.date_edited.unwrap_or(0);
330 if date_edited > prev.date_edited {
331 return true;
332 }
333
334 let date_retracted = msg.date_retracted.unwrap_or(0);
335 if date_retracted > prev.date_retracted {
336 return true;
337 }
338
339 let did_notify = msg.did_notify_recipient.unwrap_or(false);
340 if did_notify != prev.did_notify_recipient {
341 return true;
342 }
343
344 false
345}
346
347fn cache_message_state(states: &mut HashMap<String, MessageState>, msg: &Message) {
351 states.insert(
352 msg.guid.clone(),
353 MessageState {
354 date_created: msg.date.unwrap_or(0),
355 is_delivered: msg.is_delivered,
356 date_delivered: msg.date_delivered.unwrap_or(0),
357 date_read: msg.date_read.unwrap_or(0),
358 date_edited: msg.date_edited.unwrap_or(0),
359 date_retracted: msg.date_retracted.unwrap_or(0),
360 did_notify_recipient: msg.did_notify_recipient.unwrap_or(false),
361 cache_time: Instant::now(),
362 },
363 );
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed state starts with all three caches empty.
    #[test]
    fn new_poller_state_is_empty() {
        let PollerState {
            seen_guids,
            message_states,
            chat_states,
            ..
        } = PollerState::new();

        assert!(seen_guids.is_empty());
        assert!(message_states.is_empty());
        assert!(chat_states.is_empty());
    }

    /// `None`, empty and whitespace-only text are empty; real text is not.
    #[test]
    fn is_text_empty_checks() {
        let empty_inputs = [None, Some("".to_string()), Some("  ".to_string())];
        for input in &empty_inputs {
            assert!(is_text_empty(input));
        }

        let non_empty = Some("hello".to_string());
        assert!(!is_text_empty(&non_empty));
    }

    /// Item type 1 with group action 0 maps to the participant-added event.
    #[test]
    fn group_change_mapping() {
        let mapped = match (1_i64, 0_i64) {
            (1, 0) => events::PARTICIPANT_ADDED,
            (1, 1) => events::PARTICIPANT_REMOVED,
            (2, _) => events::GROUP_NAME_CHANGE,
            (3, 0) => events::PARTICIPANT_LEFT,
            (3, 1) => events::GROUP_ICON_CHANGED,
            (3, 2) => events::GROUP_ICON_REMOVED,
            _ => "",
        };

        assert_eq!(mapped, "participant-added");
    }

    /// With a zero TTL, trimming evicts an entry cached one second ago.
    #[test]
    fn cache_trim_removes_old_entries() {
        let mut state = PollerState::new();
        state.cache_ttl = Duration::from_millis(0);

        let stale = ChatState {
            last_read_message_timestamp: 100,
            cache_time: Instant::now() - Duration::from_secs(1),
        };
        state.chat_states.insert("test".to_string(), stale);

        state.trim_caches();
        assert!(state.chat_states.is_empty());
    }
}