use std::time::Duration;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use super::driver::{DriverCommand, IdleTermination};
use super::typed_event::TypedEvent;
use super::{IdleEvent, ImapConnection};
use crate::error::Error;
use crate::types::response::{UntaggedResponse, UntaggedStatus};
impl ImapConnection {
pub async fn idle(
&self,
timeout: Duration,
cancel: CancellationToken,
) -> Result<IdleEvent, Error> {
let (done_tx, done_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
let dcmd = DriverCommand::Idle { done_rx, result_tx };
if self.cmd_tx.send(dcmd).await.is_err() {
return Err(self.observe_driver_panic().await);
}
let deadline = tokio::time::Instant::now() + timeout;
let mut result_rx = result_rx;
let idle_event: IdleEvent = loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break IdleEvent::Timeout;
}
tokio::select! {
biased;
() = cancel.cancelled() => break IdleEvent::Cancelled,
maybe_ev = self.next_event(remaining) => {
match maybe_ev {
Ok(Some(ev)) => {
if let Some(e) = typed_event_to_idle_event(ev) {
break e;
}
}
Ok(None) => break IdleEvent::Timeout,
Err(Error::DriverGone) => {
return Err(Error::DriverGone);
}
Err(e) => return Err(e),
}
}
idle_result = &mut result_rx => {
return match idle_result {
Ok(Ok(IdleTermination::ServerTerminated)) => {
Ok(IdleEvent::ServerTerminated)
}
Ok(Ok(IdleTermination::ClientDone)) => {
Ok(IdleEvent::ServerTerminated)
}
Ok(Err(e)) => Err(e),
Err(_) => Err(self.observe_driver_panic().await),
};
}
}
};
let _ = done_tx.send(());
trace!("idle handle: sent DONE signal");
match result_rx.await {
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(e),
Err(_) => return Err(self.observe_driver_panic().await),
}
Ok(idle_event)
}
}
fn typed_event_to_idle_event(ev: TypedEvent) -> Option<IdleEvent> {
match ev {
TypedEvent::Exists(n) => Some(IdleEvent::Exists(n)),
TypedEvent::Recent(n) => Some(IdleEvent::Recent(n)),
TypedEvent::Expunge(n) => Some(IdleEvent::Expunge(n)),
TypedEvent::FetchUpdate(f) => Some(IdleEvent::Fetch(f)),
TypedEvent::Alert(text) => Some(IdleEvent::Alert(text)),
TypedEvent::MailboxEvent(info) => Some(IdleEvent::MailboxEvent(info)),
TypedEvent::Vanished { earlier, uids } => Some(IdleEvent::Vanished { earlier, uids }),
TypedEvent::NotificationOverflow { code, text } => Some(IdleEvent::NotificationOverflow {
code_text: code,
resp_text: text,
}),
TypedEvent::CapabilityChange(_) => {
trace!("idle: suppressing CapabilityChange (handled by state machine)");
None
}
TypedEvent::Bye { code, text } => Some(IdleEvent::Bye { code, text }),
TypedEvent::QueueOverflow {
dropped_count,
since: _,
} => Some(IdleEvent::Alert(format!(
"event queue overflow: {dropped_count} events dropped"
))),
TypedEvent::MetadataChange {} | TypedEvent::ServerMetadataChange {} => {
Some(IdleEvent::ExtensionEvent(
"METADATA change during IDLE".into(),
))
}
TypedEvent::Extension(resp) => untagged_to_idle_event(*resp),
}
}
fn untagged_to_idle_event(resp: UntaggedResponse) -> Option<IdleEvent> {
match resp {
UntaggedResponse::MailboxStatus { mailbox, items } => {
Some(IdleEvent::MailboxStatus { mailbox, items })
}
UntaggedResponse::Metadata { mailbox, entries } => {
Some(IdleEvent::MetadataChange { mailbox, entries })
}
UntaggedResponse::Search { uids, mod_seq: _ } => {
Some(IdleEvent::SearchUpdate(Box::new(
crate::types::EsearchResponse {
tag: None,
uid: false,
all: uids
.into_iter()
.map(crate::types::UidRange::single)
.collect(),
min: None,
max: None,
count: None,
mod_seq: None,
},
)))
}
UntaggedResponse::Esearch(e) => Some(IdleEvent::SearchUpdate(Box::new(e))),
UntaggedResponse::Status {
status,
code: Some(code),
text,
} => Some(IdleEvent::StatusUpdate { status, code, text }),
UntaggedResponse::Status {
status: UntaggedStatus::Ok,
code: None,
text,
} => {
trace!("idle: suppressing informational OK keepalive: {text:?}");
None
}
other => Some(IdleEvent::ExtensionEvent(format!("{other:?}"))),
}
}
#[cfg(test)]
#[path = "idle_tests.rs"]
mod tests;