imap_client/tasks/
mod.rs

1pub mod resolver;
2pub mod tasks;
3
4use std::{
5    any::Any,
6    collections::VecDeque,
7    fmt::{Debug, Formatter},
8    marker::PhantomData,
9};
10
11use imap_next::{
12    client::{Client as ClientNext, CommandHandle, Error, Event},
13    imap_types::{
14        auth::AuthenticateData,
15        command::{Command, CommandBody},
16        core::{Tag, TagGenerator},
17        response::{
18            Bye, CommandContinuationRequest, Data, Greeting, Response, Status, StatusBody, Tagged,
19        },
20    },
21    Interrupt, State,
22};
23use thiserror::Error;
24use tracing::trace;
25
26/// Tells how a specific IMAP [`Command`] is processed.
27///
28/// Most `process_` trait methods consume interesting responses (returning `None`),
29/// and move out uninteresting responses (returning `Some(...)`).
30///
31/// If no active task is interested in a given response, we call this response "unsolicited".
32pub trait Task: Send + 'static {
33    /// Output of the task.
34    ///
35    /// Returned in [`Self::process_tagged`].
36    type Output: Any + Send;
37
38    /// Returns the [`CommandBody`] to issue for this task.
39    ///
40    /// Note: The [`Scheduler`] will tag the [`CommandBody`] creating a complete [`Command`].
41    fn command_body(&self) -> CommandBody<'static>;
42
43    /// Process data response.
44    fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>> {
45        // Default: Don't process server data
46        Some(data)
47    }
48
49    /// Process untagged response.
50    fn process_untagged(
51        &mut self,
52        status_body: StatusBody<'static>,
53    ) -> Option<StatusBody<'static>> {
54        // Default: Don't process untagged status
55        Some(status_body)
56    }
57
58    /// Process command continuation request response.
59    fn process_continuation_request(
60        &mut self,
61        continuation: CommandContinuationRequest<'static>,
62    ) -> Option<CommandContinuationRequest<'static>> {
63        // Default: Don't process command continuation request response
64        Some(continuation)
65    }
66
67    /// Process command continuation request response (during authenticate).
68    fn process_continuation_request_authenticate(
69        &mut self,
70        continuation: CommandContinuationRequest<'static>,
71    ) -> Result<AuthenticateData<'static>, CommandContinuationRequest<'static>> {
72        // Default: Don't process command continuation request response (during authenticate)
73        Err(continuation)
74    }
75
76    /// Process bye response.
77    fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>> {
78        // Default: Don't process bye
79        Some(bye)
80    }
81
82    /// Process command completion result response.
83    ///
84    /// The [`Scheduler`] already chooses the corresponding response by tag.
85    fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output;
86}
87
88/// Scheduler managing enqueued tasks and routing incoming responses to active tasks.
89pub struct Scheduler {
90    pub client_next: ClientNext,
91    waiting_tasks: TaskMap,
92    active_tasks: TaskMap,
93    pub tag_generator: TagGenerator,
94}
95
96impl Scheduler {
97    /// Create a new scheduler.
98    pub fn new(client_next: ClientNext) -> Self {
99        Self {
100            client_next,
101            waiting_tasks: Default::default(),
102            active_tasks: Default::default(),
103            tag_generator: TagGenerator::new(),
104        }
105    }
106
107    /// Enqueue a [`Task`].
108    pub fn enqueue_task<T>(&mut self, task: T) -> TaskHandle<T>
109    where
110        T: Task,
111    {
112        let tag = self.tag_generator.generate();
113
114        let command = {
115            let body = task.command_body();
116            Command {
117                tag: tag.clone(),
118                body,
119            }
120        };
121
122        trace!(?command, "enqueue task");
123
124        let handle = self.client_next.enqueue_command(command);
125
126        self.waiting_tasks.push_back(handle, tag, Box::new(task));
127
128        TaskHandle::new(handle)
129    }
130
131    pub fn enqueue_input(&mut self, bytes: &[u8]) {
132        self.client_next.enqueue_input(bytes);
133    }
134
135    /// Progress the connection returning the next event.
136    pub fn progress(&mut self) -> Result<SchedulerEvent, Interrupt<SchedulerError>> {
137        loop {
138            let event = match self.client_next.next() {
139                Ok(event) => event,
140                Err(Interrupt::Io(io)) => return Err(Interrupt::Io(io)),
141                Err(Interrupt::Error(err)) => {
142                    // HACK: skip bad fetches, improve me
143                    if let Error::MalformedMessage { discarded_bytes } = &err {
144                        let mut cmd = discarded_bytes.declassify().split(|c| c == &b' ').skip(2);
145                        if let Some(cmd) = cmd.next() {
146                            if cmd.eq_ignore_ascii_case(b"FETCH") {
147                                let fetch = String::from_utf8_lossy(discarded_bytes.declassify());
148                                tracing::warn!(?fetch, "skipping invalid fetch");
149                                continue;
150                            }
151                        }
152                    }
153
154                    return Err(Interrupt::Error(SchedulerError::Flow(err)));
155                }
156            };
157
158            match event {
159                Event::GreetingReceived { greeting } => {
160                    return Ok(SchedulerEvent::GreetingReceived(greeting));
161                }
162                Event::CommandSent { handle, .. } => {
163                    // This `unwrap` can't fail because `waiting_tasks` contains all unsent `Commands`.
164                    let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap();
165                    self.active_tasks.push_back(handle, tag, task);
166                }
167                Event::CommandRejected { handle, status, .. } => {
168                    let body = match status {
169                        Status::Tagged(Tagged { body, .. }) => body,
170                        _ => unreachable!(),
171                    };
172
173                    // This `unwrap` can't fail because `active_tasks` contains all in-progress `Commands`.
174                    let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap();
175
176                    let output = Some(task.process_tagged(body));
177
178                    return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
179                }
180                Event::AuthenticateStarted { handle } => {
181                    let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap();
182                    self.active_tasks.push_back(handle, tag, task);
183                }
184                Event::AuthenticateContinuationRequestReceived {
185                    handle,
186                    continuation_request,
187                } => {
188                    let task = self.active_tasks.get_task_by_handle_mut(handle).unwrap();
189
190                    let continuation =
191                        task.process_continuation_request_authenticate(continuation_request);
192
193                    match continuation {
194                        Ok(data) => {
195                            self.client_next.set_authenticate_data(data).unwrap();
196                        }
197                        Err(continuation) => {
198                            return Ok(SchedulerEvent::Unsolicited(
199                                Response::CommandContinuationRequest(continuation),
200                            ));
201                        }
202                    }
203                }
204                Event::AuthenticateStatusReceived { handle, status, .. } => {
205                    let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap();
206
207                    let body = match status {
208                        Status::Untagged(_) => unreachable!(),
209                        Status::Tagged(tagged) => tagged.body,
210                        Status::Bye(_) => unreachable!(),
211                    };
212
213                    let output = Some(task.process_tagged(body));
214
215                    return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
216                }
217                Event::DataReceived { data } => {
218                    if let Some(data) =
219                        trickle_down(data, self.active_tasks.tasks_mut(), |task, data| {
220                            task.process_data(data)
221                        })
222                    {
223                        return Ok(SchedulerEvent::Unsolicited(Response::Data(data)));
224                    }
225                }
226                Event::ContinuationRequestReceived {
227                    continuation_request,
228                } => {
229                    if let Some(continuation) = trickle_down(
230                        continuation_request,
231                        self.active_tasks.tasks_mut(),
232                        |task, continuation_request| {
233                            task.process_continuation_request(continuation_request)
234                        },
235                    ) {
236                        return Ok(SchedulerEvent::Unsolicited(
237                            Response::CommandContinuationRequest(continuation),
238                        ));
239                    }
240                }
241                Event::StatusReceived { status } => match status {
242                    Status::Untagged(body) => {
243                        if let Some(body) =
244                            trickle_down(body, self.active_tasks.tasks_mut(), |task, body| {
245                                task.process_untagged(body)
246                            })
247                        {
248                            return Ok(SchedulerEvent::Unsolicited(Response::Status(
249                                Status::Untagged(body),
250                            )));
251                        }
252                    }
253                    Status::Bye(bye) => {
254                        if let Some(bye) =
255                            trickle_down(bye, self.active_tasks.tasks_mut(), |task, bye| {
256                                task.process_bye(bye)
257                            })
258                        {
259                            return Ok(SchedulerEvent::Unsolicited(Response::Status(Status::Bye(
260                                bye,
261                            ))));
262                        }
263                    }
264                    Status::Tagged(Tagged { tag, body }) => {
265                        let Some((handle, _, task)) = self.active_tasks.remove_by_tag(&tag) else {
266                            return Err(Interrupt::Error(
267                                SchedulerError::UnexpectedTaggedResponse(Tagged { tag, body }),
268                            ));
269                        };
270
271                        let output = Some(task.process_tagged(body));
272
273                        return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
274                    }
275                },
276                Event::IdleCommandSent { handle, .. } => {
277                    // This `unwrap` can't fail because `waiting_tasks` contains all unsent `Commands`.
278                    let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap();
279                    self.active_tasks.push_back(handle, tag, task);
280                }
281                Event::IdleAccepted { .. } => {
282                    println!("IDLE accepted!");
283                }
284                Event::IdleRejected { handle, status, .. } => {
285                    let body = match status {
286                        Status::Tagged(Tagged { body, .. }) => body,
287                        _ => unreachable!(),
288                    };
289
290                    // This `unwrap` can't fail because `active_tasks` contains all in-progress `Commands`.
291                    let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap();
292
293                    let output = Some(task.process_tagged(body));
294
295                    return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
296                }
297                Event::IdleDoneSent { .. } => {
298                    println!("IDLE done!");
299                }
300            }
301        }
302    }
303}
304
305impl State for Scheduler {
306    type Event = SchedulerEvent;
307    type Error = SchedulerError;
308
309    fn enqueue_input(&mut self, bytes: &[u8]) {
310        self.enqueue_input(bytes);
311    }
312
313    fn next(&mut self) -> Result<Self::Event, Interrupt<Self::Error>> {
314        self.progress()
315    }
316}
317
318#[derive(Default)]
319struct TaskMap {
320    tasks: VecDeque<(CommandHandle, Tag<'static>, Box<dyn TaskAny>)>,
321}
322
323impl TaskMap {
324    fn push_back(&mut self, handle: CommandHandle, tag: Tag<'static>, task: Box<dyn TaskAny>) {
325        self.tasks.push_back((handle, tag, task));
326    }
327
328    fn get_task_by_handle_mut(&mut self, handle: CommandHandle) -> Option<&mut Box<dyn TaskAny>> {
329        self.tasks
330            .iter_mut()
331            .find_map(|(current_handle, _, task)| (handle == *current_handle).then_some(task))
332    }
333
334    fn tasks_mut(&mut self) -> impl Iterator<Item = &mut Box<dyn TaskAny>> {
335        self.tasks.iter_mut().map(|(_, _, task)| task)
336    }
337
338    fn remove_by_handle(
339        &mut self,
340        handle: CommandHandle,
341    ) -> Option<(CommandHandle, Tag<'static>, Box<dyn TaskAny>)> {
342        let index = self
343            .tasks
344            .iter()
345            .position(|(current_handle, _, _)| handle == *current_handle)?;
346        self.tasks.remove(index)
347    }
348
349    fn remove_by_tag(
350        &mut self,
351        tag: &Tag,
352    ) -> Option<(CommandHandle, Tag<'static>, Box<dyn TaskAny>)> {
353        let index = self
354            .tasks
355            .iter()
356            .position(|(_, current_tag, _)| tag == current_tag)?;
357        self.tasks.remove(index)
358    }
359}
360
361#[derive(Debug)]
362pub enum SchedulerEvent {
363    GreetingReceived(Greeting<'static>),
364    TaskFinished(TaskToken),
365    Unsolicited(Response<'static>),
366}
367
368#[derive(Debug, Error)]
369pub enum SchedulerError {
370    /// Flow error.
371    #[error("flow error")]
372    Flow(#[from] Error),
373    /// Unexpected tag in command completion result.
374    ///
375    /// The scheduler received a tag that cannot be matched to an active command.
376    /// This could be due to a severe implementation error in the scheduler,
377    /// the server, or anything in-between, really.
378    ///
379    /// It's better to halt the execution to avoid damage.
380    #[error("unexpected tag in command completion result")]
381    UnexpectedTaggedResponse(Tagged<'static>),
382    #[error("unexpected BYE response")]
383    UnexpectedByeResponse(Bye<'static>),
384}
385
386#[derive(Eq)]
387pub struct TaskHandle<T: Task> {
388    handle: CommandHandle,
389    _t: PhantomData<T>,
390}
391
392impl<T: Task> TaskHandle<T> {
393    fn new(handle: CommandHandle) -> Self {
394        Self {
395            handle,
396            _t: Default::default(),
397        }
398    }
399
400    /// Try resolving the task invalidating the token.
401    ///
402    /// The token is invalidated iff the return value is `Some`.
403    pub fn resolve(&self, token: &mut TaskToken) -> Option<T::Output> {
404        if token.handle != self.handle {
405            return None;
406        }
407
408        let output = token.output.take()?;
409        let output = output.downcast::<T::Output>().unwrap();
410
411        Some(*output)
412    }
413}
414
415impl<T: Task> Debug for TaskHandle<T> {
416    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
417        f.debug_struct("TaskHandle")
418            .field("handle", &self.handle)
419            .finish()
420    }
421}
422
423impl<T: Task> Clone for TaskHandle<T> {
424    fn clone(&self) -> Self {
425        *self
426    }
427}
428
429impl<T: Task> Copy for TaskHandle<T> {}
430
431impl<T: Task> PartialEq for TaskHandle<T> {
432    fn eq(&self, other: &Self) -> bool {
433        self.handle == other.handle
434    }
435}
436
437#[derive(Debug)]
438pub struct TaskToken {
439    handle: CommandHandle,
440    output: Option<Box<dyn Any + Send>>,
441}
442
443// -------------------------------------------------------------------------------------------------
444
/// Offer `trickle` to each consumer in turn until one of them keeps it.
///
/// `f` is called with a consumer and the value; it returns `None` to keep the value or
/// `Some(value)` to hand it back. As soon as a consumer keeps the value, the remaining
/// consumers are not visited and `None` is returned. If every consumer hands the value
/// back (or there are no consumers), it is returned as `Some`.
fn trickle_down<T, F, I>(trickle: T, consumers: I, f: F) -> Option<T>
where
    I: Iterator,
    F: Fn(&mut I::Item, T) -> Option<T>,
{
    let mut remaining = trickle;

    for mut consumer in consumers {
        match f(&mut consumer, remaining) {
            // Not interested: offer the value to the next consumer.
            Some(handed_back) => remaining = handed_back,
            // Consumed: nothing is left to pass on.
            None => return None,
        }
    }

    Some(remaining)
}
467
468// -------------------------------------------------------------------------------------------------
469
470/// Helper trait that ...
471///
472/// * doesn't have an associated type and uses [`Any`] in [`Self::process_tagged`]
473/// * is an object-safe "subset" of [`Task`]
474trait TaskAny: Send {
475    fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>>;
476
477    fn process_untagged(&mut self, status_body: StatusBody<'static>)
478        -> Option<StatusBody<'static>>;
479
480    fn process_continuation_request(
481        &mut self,
482        continuation_request: CommandContinuationRequest<'static>,
483    ) -> Option<CommandContinuationRequest<'static>>;
484
485    fn process_continuation_request_authenticate(
486        &mut self,
487        continuation_request: CommandContinuationRequest<'static>,
488    ) -> Result<AuthenticateData<'static>, CommandContinuationRequest<'static>>;
489
490    fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>>;
491
492    fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send>;
493}
494
495impl<T> TaskAny for T
496where
497    T: Task,
498{
499    fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>> {
500        T::process_data(self, data)
501    }
502
503    fn process_untagged(
504        &mut self,
505        status_body: StatusBody<'static>,
506    ) -> Option<StatusBody<'static>> {
507        T::process_untagged(self, status_body)
508    }
509
510    fn process_continuation_request(
511        &mut self,
512        continuation_request: CommandContinuationRequest<'static>,
513    ) -> Option<CommandContinuationRequest<'static>> {
514        T::process_continuation_request(self, continuation_request)
515    }
516
517    fn process_continuation_request_authenticate(
518        &mut self,
519        continuation_request: CommandContinuationRequest<'static>,
520    ) -> Result<AuthenticateData<'static>, CommandContinuationRequest<'static>> {
521        T::process_continuation_request_authenticate(self, continuation_request)
522    }
523
524    fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>> {
525        T::process_bye(self, bye)
526    }
527
528    /// Returns [`Any`] instead of [`Task::Output`].
529    fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send> {
530        Box::new(T::process_tagged(*self, status_body))
531    }
532}
533
#[cfg(test)]
mod tests {
    use super::Scheduler;
    use static_assertions::assert_impl_all;

    // Compile-time check that a scheduler can be moved to another thread.
    assert_impl_all!(Scheduler: Send);
}