1pub mod resolver;
2pub mod tasks;
3
4use std::{
5 any::Any,
6 collections::VecDeque,
7 fmt::{Debug, Formatter},
8 marker::PhantomData,
9};
10
11use imap_next::{
12 client::{Client as ClientNext, CommandHandle, Error, Event},
13 imap_types::{
14 auth::AuthenticateData,
15 command::{Command, CommandBody},
16 core::{Tag, TagGenerator},
17 response::{
18 Bye, CommandContinuationRequest, Data, Greeting, Response, Status, StatusBody, Tagged,
19 },
20 },
21 Interrupt, State,
22};
23use thiserror::Error;
24use tracing::trace;
25
26pub trait Task: Send + 'static {
33 type Output: Any + Send;
37
38 fn command_body(&self) -> CommandBody<'static>;
42
43 fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>> {
45 Some(data)
47 }
48
49 fn process_untagged(
51 &mut self,
52 status_body: StatusBody<'static>,
53 ) -> Option<StatusBody<'static>> {
54 Some(status_body)
56 }
57
58 fn process_continuation_request(
60 &mut self,
61 continuation: CommandContinuationRequest<'static>,
62 ) -> Option<CommandContinuationRequest<'static>> {
63 Some(continuation)
65 }
66
67 fn process_continuation_request_authenticate(
69 &mut self,
70 continuation: CommandContinuationRequest<'static>,
71 ) -> Result<AuthenticateData<'static>, CommandContinuationRequest<'static>> {
72 Err(continuation)
74 }
75
76 fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>> {
78 Some(bye)
80 }
81
82 fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output;
86}
87
88pub struct Scheduler {
90 pub client_next: ClientNext,
91 waiting_tasks: TaskMap,
92 active_tasks: TaskMap,
93 pub tag_generator: TagGenerator,
94}
95
96impl Scheduler {
97 pub fn new(client_next: ClientNext) -> Self {
99 Self {
100 client_next,
101 waiting_tasks: Default::default(),
102 active_tasks: Default::default(),
103 tag_generator: TagGenerator::new(),
104 }
105 }
106
107 pub fn enqueue_task<T>(&mut self, task: T) -> TaskHandle<T>
109 where
110 T: Task,
111 {
112 let tag = self.tag_generator.generate();
113
114 let command = {
115 let body = task.command_body();
116 Command {
117 tag: tag.clone(),
118 body,
119 }
120 };
121
122 trace!(?command, "enqueue task");
123
124 let handle = self.client_next.enqueue_command(command);
125
126 self.waiting_tasks.push_back(handle, tag, Box::new(task));
127
128 TaskHandle::new(handle)
129 }
130
131 pub fn enqueue_input(&mut self, bytes: &[u8]) {
132 self.client_next.enqueue_input(bytes);
133 }
134
135 pub fn progress(&mut self) -> Result<SchedulerEvent, Interrupt<SchedulerError>> {
137 loop {
138 let event = match self.client_next.next() {
139 Ok(event) => event,
140 Err(Interrupt::Io(io)) => return Err(Interrupt::Io(io)),
141 Err(Interrupt::Error(err)) => {
142 if let Error::MalformedMessage { discarded_bytes } = &err {
144 let mut cmd = discarded_bytes.declassify().split(|c| c == &b' ').skip(2);
145 if let Some(cmd) = cmd.next() {
146 if cmd.eq_ignore_ascii_case(b"FETCH") {
147 let fetch = String::from_utf8_lossy(discarded_bytes.declassify());
148 tracing::warn!(?fetch, "skipping invalid fetch");
149 continue;
150 }
151 }
152 }
153
154 return Err(Interrupt::Error(SchedulerError::Flow(err)));
155 }
156 };
157
158 match event {
159 Event::GreetingReceived { greeting } => {
160 return Ok(SchedulerEvent::GreetingReceived(greeting));
161 }
162 Event::CommandSent { handle, .. } => {
163 let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap();
165 self.active_tasks.push_back(handle, tag, task);
166 }
167 Event::CommandRejected { handle, status, .. } => {
168 let body = match status {
169 Status::Tagged(Tagged { body, .. }) => body,
170 _ => unreachable!(),
171 };
172
173 let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap();
175
176 let output = Some(task.process_tagged(body));
177
178 return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
179 }
180 Event::AuthenticateStarted { handle } => {
181 let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap();
182 self.active_tasks.push_back(handle, tag, task);
183 }
184 Event::AuthenticateContinuationRequestReceived {
185 handle,
186 continuation_request,
187 } => {
188 let task = self.active_tasks.get_task_by_handle_mut(handle).unwrap();
189
190 let continuation =
191 task.process_continuation_request_authenticate(continuation_request);
192
193 match continuation {
194 Ok(data) => {
195 self.client_next.set_authenticate_data(data).unwrap();
196 }
197 Err(continuation) => {
198 return Ok(SchedulerEvent::Unsolicited(
199 Response::CommandContinuationRequest(continuation),
200 ));
201 }
202 }
203 }
204 Event::AuthenticateStatusReceived { handle, status, .. } => {
205 let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap();
206
207 let body = match status {
208 Status::Untagged(_) => unreachable!(),
209 Status::Tagged(tagged) => tagged.body,
210 Status::Bye(_) => unreachable!(),
211 };
212
213 let output = Some(task.process_tagged(body));
214
215 return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
216 }
217 Event::DataReceived { data } => {
218 if let Some(data) =
219 trickle_down(data, self.active_tasks.tasks_mut(), |task, data| {
220 task.process_data(data)
221 })
222 {
223 return Ok(SchedulerEvent::Unsolicited(Response::Data(data)));
224 }
225 }
226 Event::ContinuationRequestReceived {
227 continuation_request,
228 } => {
229 if let Some(continuation) = trickle_down(
230 continuation_request,
231 self.active_tasks.tasks_mut(),
232 |task, continuation_request| {
233 task.process_continuation_request(continuation_request)
234 },
235 ) {
236 return Ok(SchedulerEvent::Unsolicited(
237 Response::CommandContinuationRequest(continuation),
238 ));
239 }
240 }
241 Event::StatusReceived { status } => match status {
242 Status::Untagged(body) => {
243 if let Some(body) =
244 trickle_down(body, self.active_tasks.tasks_mut(), |task, body| {
245 task.process_untagged(body)
246 })
247 {
248 return Ok(SchedulerEvent::Unsolicited(Response::Status(
249 Status::Untagged(body),
250 )));
251 }
252 }
253 Status::Bye(bye) => {
254 if let Some(bye) =
255 trickle_down(bye, self.active_tasks.tasks_mut(), |task, bye| {
256 task.process_bye(bye)
257 })
258 {
259 return Ok(SchedulerEvent::Unsolicited(Response::Status(Status::Bye(
260 bye,
261 ))));
262 }
263 }
264 Status::Tagged(Tagged { tag, body }) => {
265 let Some((handle, _, task)) = self.active_tasks.remove_by_tag(&tag) else {
266 return Err(Interrupt::Error(
267 SchedulerError::UnexpectedTaggedResponse(Tagged { tag, body }),
268 ));
269 };
270
271 let output = Some(task.process_tagged(body));
272
273 return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
274 }
275 },
276 Event::IdleCommandSent { handle, .. } => {
277 let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap();
279 self.active_tasks.push_back(handle, tag, task);
280 }
281 Event::IdleAccepted { .. } => {
282 println!("IDLE accepted!");
283 }
284 Event::IdleRejected { handle, status, .. } => {
285 let body = match status {
286 Status::Tagged(Tagged { body, .. }) => body,
287 _ => unreachable!(),
288 };
289
290 let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap();
292
293 let output = Some(task.process_tagged(body));
294
295 return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output }));
296 }
297 Event::IdleDoneSent { .. } => {
298 println!("IDLE done!");
299 }
300 }
301 }
302 }
303}
304
305impl State for Scheduler {
306 type Event = SchedulerEvent;
307 type Error = SchedulerError;
308
309 fn enqueue_input(&mut self, bytes: &[u8]) {
310 self.enqueue_input(bytes);
311 }
312
313 fn next(&mut self) -> Result<Self::Event, Interrupt<Self::Error>> {
314 self.progress()
315 }
316}
317
318#[derive(Default)]
319struct TaskMap {
320 tasks: VecDeque<(CommandHandle, Tag<'static>, Box<dyn TaskAny>)>,
321}
322
323impl TaskMap {
324 fn push_back(&mut self, handle: CommandHandle, tag: Tag<'static>, task: Box<dyn TaskAny>) {
325 self.tasks.push_back((handle, tag, task));
326 }
327
328 fn get_task_by_handle_mut(&mut self, handle: CommandHandle) -> Option<&mut Box<dyn TaskAny>> {
329 self.tasks
330 .iter_mut()
331 .find_map(|(current_handle, _, task)| (handle == *current_handle).then_some(task))
332 }
333
334 fn tasks_mut(&mut self) -> impl Iterator<Item = &mut Box<dyn TaskAny>> {
335 self.tasks.iter_mut().map(|(_, _, task)| task)
336 }
337
338 fn remove_by_handle(
339 &mut self,
340 handle: CommandHandle,
341 ) -> Option<(CommandHandle, Tag<'static>, Box<dyn TaskAny>)> {
342 let index = self
343 .tasks
344 .iter()
345 .position(|(current_handle, _, _)| handle == *current_handle)?;
346 self.tasks.remove(index)
347 }
348
349 fn remove_by_tag(
350 &mut self,
351 tag: &Tag,
352 ) -> Option<(CommandHandle, Tag<'static>, Box<dyn TaskAny>)> {
353 let index = self
354 .tasks
355 .iter()
356 .position(|(_, current_tag, _)| tag == current_tag)?;
357 self.tasks.remove(index)
358 }
359}
360
361#[derive(Debug)]
362pub enum SchedulerEvent {
363 GreetingReceived(Greeting<'static>),
364 TaskFinished(TaskToken),
365 Unsolicited(Response<'static>),
366}
367
368#[derive(Debug, Error)]
369pub enum SchedulerError {
370 #[error("flow error")]
372 Flow(#[from] Error),
373 #[error("unexpected tag in command completion result")]
381 UnexpectedTaggedResponse(Tagged<'static>),
382 #[error("unexpected BYE response")]
383 UnexpectedByeResponse(Bye<'static>),
384}
385
386#[derive(Eq)]
387pub struct TaskHandle<T: Task> {
388 handle: CommandHandle,
389 _t: PhantomData<T>,
390}
391
392impl<T: Task> TaskHandle<T> {
393 fn new(handle: CommandHandle) -> Self {
394 Self {
395 handle,
396 _t: Default::default(),
397 }
398 }
399
400 pub fn resolve(&self, token: &mut TaskToken) -> Option<T::Output> {
404 if token.handle != self.handle {
405 return None;
406 }
407
408 let output = token.output.take()?;
409 let output = output.downcast::<T::Output>().unwrap();
410
411 Some(*output)
412 }
413}
414
415impl<T: Task> Debug for TaskHandle<T> {
416 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
417 f.debug_struct("TaskHandle")
418 .field("handle", &self.handle)
419 .finish()
420 }
421}
422
423impl<T: Task> Clone for TaskHandle<T> {
424 fn clone(&self) -> Self {
425 *self
426 }
427}
428
429impl<T: Task> Copy for TaskHandle<T> {}
430
431impl<T: Task> PartialEq for TaskHandle<T> {
432 fn eq(&self, other: &Self) -> bool {
433 self.handle == other.handle
434 }
435}
436
437#[derive(Debug)]
438pub struct TaskToken {
439 handle: CommandHandle,
440 output: Option<Box<dyn Any + Send>>,
441}
442
/// Offers `trickle` to each consumer in turn until one of them keeps it.
///
/// For every consumer, `f` is called with the consumer and the current
/// value. If `f` returns `Some`, that value is offered to the next consumer;
/// if it returns `None`, the value was consumed and no further consumer is
/// visited.
///
/// Returns `Some` with whatever is left after all consumers passed the value
/// on (including when there are no consumers at all), or `None` when a
/// consumer kept it.
///
/// The callback is `FnMut`, so it may carry state between invocations;
/// callers that pass an `Fn` closure keep working unchanged.
fn trickle_down<T, F, I>(trickle: T, mut consumers: I, mut f: F) -> Option<T>
where
    I: Iterator,
    F: FnMut(&mut I::Item, T) -> Option<T>,
{
    // `try_fold` stops at the first `None`, leaving later consumers untouched.
    consumers.try_fold(trickle, |remaining, mut consumer| f(&mut consumer, remaining))
}
467
468trait TaskAny: Send {
475 fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>>;
476
477 fn process_untagged(&mut self, status_body: StatusBody<'static>)
478 -> Option<StatusBody<'static>>;
479
480 fn process_continuation_request(
481 &mut self,
482 continuation_request: CommandContinuationRequest<'static>,
483 ) -> Option<CommandContinuationRequest<'static>>;
484
485 fn process_continuation_request_authenticate(
486 &mut self,
487 continuation_request: CommandContinuationRequest<'static>,
488 ) -> Result<AuthenticateData<'static>, CommandContinuationRequest<'static>>;
489
490 fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>>;
491
492 fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send>;
493}
494
495impl<T> TaskAny for T
496where
497 T: Task,
498{
499 fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>> {
500 T::process_data(self, data)
501 }
502
503 fn process_untagged(
504 &mut self,
505 status_body: StatusBody<'static>,
506 ) -> Option<StatusBody<'static>> {
507 T::process_untagged(self, status_body)
508 }
509
510 fn process_continuation_request(
511 &mut self,
512 continuation_request: CommandContinuationRequest<'static>,
513 ) -> Option<CommandContinuationRequest<'static>> {
514 T::process_continuation_request(self, continuation_request)
515 }
516
517 fn process_continuation_request_authenticate(
518 &mut self,
519 continuation_request: CommandContinuationRequest<'static>,
520 ) -> Result<AuthenticateData<'static>, CommandContinuationRequest<'static>> {
521 T::process_continuation_request_authenticate(self, continuation_request)
522 }
523
524 fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>> {
525 T::process_bye(self, bye)
526 }
527
528 fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send> {
530 Box::new(T::process_tagged(*self, status_body))
531 }
532}
533
#[cfg(test)]
mod tests {
    use static_assertions::assert_impl_all;

    use super::Scheduler;

    // Compile-time check that a `Scheduler` can be moved to another thread.
    assert_impl_all!(Scheduler: Send);
}