1use core::{
62 mem,
63 num::{NonZeroU32, NonZeroU64},
64 sync::atomic::{AtomicBool, Ordering},
65};
66
67use alloc::{
68 collections::{BTreeMap, VecDeque},
69 string::String,
70 sync::Arc,
71 vec,
72 vec::Vec,
73};
74
75use imap_codec::{
76 fragmentizer::Fragmentizer,
77 imap_types::{
78 command::SelectParameter,
79 core::{Atom, Vec1},
80 extensions::enable::CapabilityEnable,
81 fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName},
82 flag::{Flag, FlagFetch},
83 mailbox::Mailbox,
84 response::Capability,
85 sequence::SequenceSet,
86 },
87};
88use log::trace;
89use thiserror::Error;
90
91use crate::{
92 coroutine::*,
93 rfc2177::idle::{ImapIdle, ImapIdleError, ImapIdleOptions, ImapIdleYield},
94 rfc3501::{
95 fetch::{ImapMessageFetch, ImapMessageFetchError, ImapMessageFetchOptions},
96 select::{ImapMailboxSelect, ImapMailboxSelectError, ImapMailboxSelectOptions, SelectData},
97 },
98 rfc5161::enable::{ImapExtensionEnable, ImapExtensionEnableError},
99};
100
101#[derive(Clone, Debug)]
104pub enum ImapMailboxWatchEvent {
105 EnvelopeAdded {
106 uid: NonZeroU32,
107 items: Vec<MessageDataItem<'static>>,
108 },
109 FlagsAdded {
110 uid: NonZeroU32,
111 flags: Vec<Flag<'static>>,
112 },
113 FlagsRemoved {
114 uid: NonZeroU32,
115 flags: Vec<Flag<'static>>,
116 },
117 EnvelopeRemoved {
118 uid: NonZeroU32,
119 },
120}
121
122#[derive(Debug, Error)]
124pub enum ImapMailboxWatchError {
125 #[error("IMAP server does not advertise QRESYNC")]
126 QresyncUnsupported,
127 #[error("IMAP server did not return UIDVALIDITY in SELECT response")]
128 MissingUidValidity,
129 #[error("IMAP server did not return HIGHESTMODSEQ in SELECT response")]
130 MissingHighestModSeq,
131 #[error("Invalid `1:*` sequence set: {0}")]
132 InvalidSequenceSet(String),
133 #[error("IMAP SELECT error")]
134 Select(#[from] ImapMailboxSelectError),
135 #[error("IMAP FETCH error")]
136 Fetch(#[from] ImapMessageFetchError),
137 #[error("IMAP IDLE error")]
138 Idle(#[from] ImapIdleError),
139 #[error("IMAP ENABLE error")]
140 Enable(#[from] ImapExtensionEnableError),
141}
142
143#[derive(Debug)]
145pub enum ImapMailboxWatchYield {
146 WantsRead,
147 WantsWrite(Vec<u8>),
148 Event(ImapMailboxWatchEvent),
149}
150
151enum State {
152 EnableQresync(ImapExtensionEnable),
153 SelectInitial(ImapMailboxSelect),
154 FetchBaseline(ImapMessageFetch),
155 BeginIdle,
156 Idle(ImapIdle),
157 SelectQresync(ImapMailboxSelect),
158 EmitDeltas,
159 Terminal,
160}
161
162pub struct ImapMailboxWatch {
164 state: State,
165 shutdown: Arc<AtomicBool>,
166 idle_done: Arc<AtomicBool>,
167 idle_saw_data: bool,
168 mailbox: Mailbox<'static>,
169 uid_validity: Option<NonZeroU32>,
170 highest_mod_seq: u64,
171 shadow: BTreeMap<NonZeroU32, Vec<Flag<'static>>>,
172 pending: VecDeque<ImapMailboxWatchEvent>,
173}
174
175impl ImapMailboxWatch {
176 pub fn new(
178 capability: &[Capability<'static>],
179 mailbox: Mailbox<'static>,
180 shutdown: Arc<AtomicBool>,
181 ) -> Result<Self, ImapMailboxWatchError> {
182 if !capability.contains(&Capability::QResync) {
183 return Err(ImapMailboxWatchError::QresyncUnsupported);
184 }
185
186 let condstore = CapabilityEnable::CondStore;
189 let qresync = CapabilityEnable::from(
191 Atom::try_from("QRESYNC").expect("`QRESYNC` is a syntactically valid IMAP atom"),
192 );
193 let capabilities =
194 Vec1::try_from(vec![condstore, qresync]).expect("two capabilities is non-empty");
195 let enable = ImapExtensionEnable::new(capabilities);
196
197 Ok(Self {
198 state: State::EnableQresync(enable),
199 shutdown,
200 idle_done: Arc::new(AtomicBool::new(false)),
201 idle_saw_data: false,
202 mailbox,
203 uid_validity: None,
204 highest_mod_seq: 0,
205 shadow: BTreeMap::new(),
206 pending: VecDeque::new(),
207 })
208 }
209
210 fn compute_deltas(&mut self, data: &SelectData) {
211 for uid in &data.vanished_earlier {
212 if self.shadow.remove(uid).is_some() {
213 self.pending
214 .push_back(ImapMailboxWatchEvent::EnvelopeRemoved { uid: *uid });
215 }
216 }
217
218 for fetch in &data.changed {
219 let items_vec: Vec<MessageDataItem<'static>> =
220 fetch.items.clone().into_inner().into_iter().collect();
221 let (uid_opt, new_flags) = extract_uid_flags(&items_vec);
222 let Some(uid) = uid_opt else {
223 continue;
224 };
225
226 match self.shadow.get(&uid).cloned() {
227 None => {
228 self.shadow.insert(uid, new_flags);
229 self.pending
230 .push_back(ImapMailboxWatchEvent::EnvelopeAdded {
231 uid,
232 items: items_vec,
233 });
234 }
235 Some(old_flags) => {
236 let added: Vec<Flag<'static>> = new_flags
237 .iter()
238 .filter(|f| !old_flags.contains(f))
239 .cloned()
240 .collect();
241 let removed: Vec<Flag<'static>> = old_flags
242 .iter()
243 .filter(|f| !new_flags.contains(f))
244 .cloned()
245 .collect();
246 self.shadow.insert(uid, new_flags);
247 if !added.is_empty() {
248 self.pending
249 .push_back(ImapMailboxWatchEvent::FlagsAdded { uid, flags: added });
250 }
251 if !removed.is_empty() {
252 self.pending.push_back(ImapMailboxWatchEvent::FlagsRemoved {
253 uid,
254 flags: removed,
255 });
256 }
257 }
258 }
259 }
260 }
261}
262
263impl ImapCoroutine for ImapMailboxWatch {
264 type Yield = ImapMailboxWatchYield;
265 type Return = Result<(), ImapMailboxWatchError>;
266
267 fn resume(
268 &mut self,
269 fragmentizer: &mut Fragmentizer,
270 mut arg: Option<&[u8]>,
271 ) -> ImapCoroutineState<Self::Yield, Self::Return> {
272 if self.shutdown.load(Ordering::SeqCst) {
273 self.idle_done.store(true, Ordering::SeqCst);
274 }
275
276 loop {
277 let state = mem::replace(&mut self.state, State::Terminal);
278
279 match state {
280 State::EnableQresync(mut enable) => match enable.resume(fragmentizer, arg.take()) {
281 ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
282 self.state = State::EnableQresync(enable);
283 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
284 }
285 ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
286 self.state = State::EnableQresync(enable);
287 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
288 bytes,
289 ));
290 }
291 ImapCoroutineState::Complete(Ok(enabled)) => {
292 trace!("watch: ENABLE OK ({enabled:?})");
293 let parameters = vec![SelectParameter::CondStore];
294 let select = ImapMailboxSelect::new(
295 self.mailbox.clone(),
296 ImapMailboxSelectOptions { parameters },
297 );
298 self.state = State::SelectInitial(select);
299 }
300 ImapCoroutineState::Complete(Err(err)) => {
301 return ImapCoroutineState::Complete(Err(err.into()));
302 }
303 },
304
305 State::SelectInitial(mut select) => match select.resume(fragmentizer, arg.take()) {
306 ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
307 self.state = State::SelectInitial(select);
308 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
309 }
310 ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
311 self.state = State::SelectInitial(select);
312 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
313 bytes,
314 ));
315 }
316 ImapCoroutineState::Complete(Ok(data)) => {
317 let Some(uid_validity) = data.uid_validity else {
318 return ImapCoroutineState::Complete(Err(
319 ImapMailboxWatchError::MissingUidValidity,
320 ));
321 };
322 let Some(highest_mod_seq) = data.highest_mod_seq else {
323 return ImapCoroutineState::Complete(Err(
324 ImapMailboxWatchError::MissingHighestModSeq,
325 ));
326 };
327
328 self.uid_validity = Some(uid_validity);
329 self.highest_mod_seq = highest_mod_seq;
330 trace!(
331 "watch: SELECT OK uidvalidity={} highestmodseq={}",
332 uid_validity.get(),
333 highest_mod_seq,
334 );
335
336 let sequence_set: SequenceSet = match "1:*".try_into() {
337 Ok(s) => s,
338 Err(_) => {
339 return ImapCoroutineState::Complete(Err(
340 ImapMailboxWatchError::InvalidSequenceSet("1:*".into()),
341 ));
342 }
343 };
344 let item_names = MacroOrMessageDataItemNames::MessageDataItemNames(vec![
345 MessageDataItemName::Uid,
346 MessageDataItemName::Flags,
347 ]);
348 let fetch = ImapMessageFetch::new(
349 sequence_set,
350 item_names,
351 ImapMessageFetchOptions::default(),
352 );
353 self.state = State::FetchBaseline(fetch);
354 }
355 ImapCoroutineState::Complete(Err(err)) => {
356 return ImapCoroutineState::Complete(Err(err.into()));
357 }
358 },
359
360 State::FetchBaseline(mut fetch) => match fetch.resume(fragmentizer, arg.take()) {
361 ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
362 self.state = State::FetchBaseline(fetch);
363 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
364 }
365 ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
366 self.state = State::FetchBaseline(fetch);
367 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
368 bytes,
369 ));
370 }
371 ImapCoroutineState::Complete(Ok(data)) => {
372 for (_seq, items) in data {
373 let items_vec = items.into_inner();
374 if let (Some(uid), flags) = extract_uid_flags(&items_vec) {
375 self.shadow.insert(uid, flags);
376 }
377 }
378 trace!(
379 "watch: baseline shadow seeded with {} uids",
380 self.shadow.len(),
381 );
382 self.state = State::BeginIdle;
383 }
384 ImapCoroutineState::Complete(Err(err)) => {
385 return ImapCoroutineState::Complete(Err(err.into()));
386 }
387 },
388
389 State::BeginIdle => {
390 if self.shutdown.load(Ordering::SeqCst) {
391 return ImapCoroutineState::Complete(Ok(()));
392 }
393
394 self.idle_done.store(false, Ordering::SeqCst);
395 self.idle_saw_data = false;
396 let idle = ImapIdle::new(self.idle_done.clone(), ImapIdleOptions::default());
397 self.state = State::Idle(idle);
398 }
399
400 State::Idle(mut idle) => match idle.resume(fragmentizer, arg.take()) {
401 ImapCoroutineState::Yielded(ImapIdleYield::Event(_)) => {
402 trace!("watch: IDLE saw untagged data");
403 self.idle_saw_data = true;
404 self.idle_done.store(true, Ordering::SeqCst);
405 self.state = State::Idle(idle);
406 }
407 ImapCoroutineState::Yielded(ImapIdleYield::WantsRead) => {
408 self.state = State::Idle(idle);
409 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
410 }
411 ImapCoroutineState::Yielded(ImapIdleYield::WantsWrite(bytes)) => {
412 self.state = State::Idle(idle);
413 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
414 bytes,
415 ));
416 }
417 ImapCoroutineState::Complete(Ok(())) => {
418 if self.shutdown.load(Ordering::SeqCst) {
419 return ImapCoroutineState::Complete(Ok(()));
420 }
421
422 if self.idle_saw_data {
423 let uid_validity = self.uid_validity.unwrap();
425 let modseq = NonZeroU64::new(self.highest_mod_seq)
426 .unwrap_or_else(|| NonZeroU64::new(1).expect("1 is non-zero"));
427 let parameters = vec![SelectParameter::QResync {
428 uid_validity,
429 mod_sequence_value: modseq,
430 known_uids: None,
431 seq_match_data: None,
432 }];
433 let select = ImapMailboxSelect::new(
434 self.mailbox.clone(),
435 ImapMailboxSelectOptions { parameters },
436 );
437 self.state = State::SelectQresync(select);
438 } else {
439 trace!("watch: IDLE timed out with no data, restarting");
440 self.state = State::BeginIdle;
441 }
442 }
443 ImapCoroutineState::Complete(Err(err)) => {
444 return ImapCoroutineState::Complete(Err(err.into()));
445 }
446 },
447
448 State::SelectQresync(mut select) => match select.resume(fragmentizer, arg.take()) {
449 ImapCoroutineState::Yielded(ImapYield::WantsRead) => {
450 self.state = State::SelectQresync(select);
451 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsRead);
452 }
453 ImapCoroutineState::Yielded(ImapYield::WantsWrite(bytes)) => {
454 self.state = State::SelectQresync(select);
455 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::WantsWrite(
456 bytes,
457 ));
458 }
459 ImapCoroutineState::Complete(Ok(data)) => {
460 self.compute_deltas(&data);
461 if let Some(new_modseq) = data.highest_mod_seq {
462 self.highest_mod_seq = new_modseq;
463 }
464 self.state = State::EmitDeltas;
465 }
466 ImapCoroutineState::Complete(Err(err)) => {
467 return ImapCoroutineState::Complete(Err(err.into()));
468 }
469 },
470
471 State::EmitDeltas => {
472 if let Some(event) = self.pending.pop_front() {
473 self.state = State::EmitDeltas;
474 return ImapCoroutineState::Yielded(ImapMailboxWatchYield::Event(event));
475 }
476 self.state = State::BeginIdle;
477 }
478
479 State::Terminal => {
480 self.state = State::Terminal;
481 return ImapCoroutineState::Complete(Ok(()));
482 }
483 }
484 }
485 }
486}
487
488fn extract_uid_flags(
491 items: &[MessageDataItem<'static>],
492) -> (Option<NonZeroU32>, Vec<Flag<'static>>) {
493 let mut uid = None;
494 let mut flags = Vec::new();
495 for item in items {
496 match item {
497 MessageDataItem::Uid(u) => uid = Some(*u),
498 MessageDataItem::Flags(fs) => {
499 flags = fs
500 .iter()
501 .filter_map(|f| match f {
502 FlagFetch::Flag(flag) => Some(flag.clone()),
503 _ => None,
504 })
505 .collect();
506 }
507 _ => {}
508 }
509 }
510 (uid, flags)
511}