use crate::connection::helpers::inbox_eq;
use crate::connection::NotifyFlags;
use crate::error::Error;
use crate::types::response::{
Capability, ContinuationRequest, ResponseCode, TaggedResponse, UntaggedResponse, UntaggedStatus,
};
use crate::types::validated::MailboxName;
pub(crate) trait Consumer: Send {
type Output: Send + 'static;
fn on_response(
&mut self,
resp: UntaggedResponse,
notify_snapshot: NotifyFlags,
ctx: &ConsumerContext,
);
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
ctx: &ConsumerContext,
) -> Result<Finalized<Self::Output>, Error>;
}
pub(crate) struct Finalized<T> {
pub output: T,
pub reclassified_as_events: Vec<UntaggedResponse>,
}
pub(crate) trait ContinuationConsumer: Consumer {
fn on_continuation(
&mut self,
cont: ContinuationRequest,
ctx: &ConsumerContext,
) -> Result<ContinuationReply, Error>;
}
pub(crate) enum ContinuationReply {
Write(Vec<u8>),
}
pub(crate) struct ConsumerContext<'a> {
pub(in crate::connection) capabilities: &'a [Capability],
pub(in crate::connection) enabled: &'a [String],
pub(in crate::connection) command_target: Option<&'a MailboxName>,
pub(in crate::connection) command_tag: &'a str,
}
impl ConsumerContext<'_> {
pub(crate) fn capabilities(&self) -> &[Capability] {
self.capabilities
}
pub(crate) fn enabled(&self) -> &[String] {
self.enabled
}
pub(crate) fn command_target(&self) -> Option<&MailboxName> {
self.command_target
}
pub(crate) fn command_tag(&self) -> &str {
self.command_tag
}
}
#[derive(Default)]
pub(crate) struct TaggedOkConsumer {
buffered: Vec<UntaggedResponse>,
}
impl Consumer for TaggedOkConsumer {
type Output = ();
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
self.buffered.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<()>, Error> {
tagged.require_ok()?;
Ok(Finalized {
output: (),
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct CapabilityConsumer {
caps: Option<Vec<Capability>>,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for CapabilityConsumer {
type Output = Vec<Capability>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if let UntaggedResponse::Capability(ref c) = resp {
self.caps = Some(c.clone());
} else {
self.buffered.push(resp);
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<Capability>>, Error> {
let tagged = tagged.require_ok()?;
let caps = if let Some(c) = self.caps {
c
} else if let Some(ResponseCode::Capability(c)) = tagged.code {
c
} else {
return Err(Error::Protocol(
"CAPABILITY OK but no capability data in response \
(RFC 3501 Section 6.1.1)"
.into(),
));
};
Ok(Finalized {
output: caps,
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct LogoutConsumer {
saw_bye: bool,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for LogoutConsumer {
type Output = ();
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if matches!(
&resp,
UntaggedResponse::Status {
status: UntaggedStatus::Bye,
..
}
) {
self.saw_bye = true;
}
self.buffered.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<()>, Error> {
if !self.saw_bye {
return Err(Error::Protocol(
"LOGOUT: server did not send mandatory BYE \
(RFC 3501 Section 6.1.3)"
.into(),
));
}
tagged.require_ok()?;
Ok(Finalized {
output: (),
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct CreateConsumer {
buffered: Vec<UntaggedResponse>,
}
impl Consumer for CreateConsumer {
type Output = Option<String>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
self.buffered.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Option<String>>, Error> {
let tagged = tagged.require_ok()?;
let mailbox_id = match tagged.code {
Some(ResponseCode::MailboxId(id)) => Some(id),
_ => None,
};
Ok(Finalized {
output: mailbox_id,
reclassified_as_events: self.buffered,
})
}
}
use crate::types::response::StatusKind;
fn require_ok_auth(tagged: TaggedResponse) -> Result<TaggedResponse, Error> {
match tagged.status {
StatusKind::Ok => Ok(tagged),
StatusKind::No => Err(Error::auth_with_code(tagged.text, tagged.code)),
StatusKind::Bad => Err(Error::bad_with_code(tagged.text, tagged.code)),
}
}
#[derive(Default)]
pub(crate) struct LoginConsumer {
caps_seen: bool,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for LoginConsumer {
type Output = bool;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if matches!(&resp, UntaggedResponse::Capability(_)) {
self.caps_seen = true;
}
self.buffered.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<bool>, Error> {
let tagged = require_ok_auth(tagged)?;
let caps_in_tagged = matches!(&tagged.code, Some(ResponseCode::Capability(_)));
Ok(Finalized {
output: self.caps_seen || caps_in_tagged,
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct AuthenticatePlainConsumer {
encoded: String,
initial_sent: bool,
caps_seen: bool,
buffered: Vec<UntaggedResponse>,
}
impl AuthenticatePlainConsumer {
pub(crate) fn new(encoded: String, sasl_ir_used: bool) -> Self {
Self {
encoded,
initial_sent: sasl_ir_used,
caps_seen: false,
buffered: Vec::new(),
}
}
}
impl Consumer for AuthenticatePlainConsumer {
type Output = bool;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if matches!(&resp, UntaggedResponse::Capability(_)) {
self.caps_seen = true;
}
self.buffered.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<bool>, Error> {
let tagged = require_ok_auth(tagged)?;
let caps_in_tagged = matches!(&tagged.code, Some(ResponseCode::Capability(_)));
Ok(Finalized {
output: self.caps_seen || caps_in_tagged,
reclassified_as_events: self.buffered,
})
}
}
impl ContinuationConsumer for AuthenticatePlainConsumer {
fn on_continuation(
&mut self,
_cont: ContinuationRequest,
_ctx: &ConsumerContext,
) -> Result<ContinuationReply, Error> {
if self.initial_sent {
return Err(Error::Protocol(
"unexpected continuation after PLAIN initial response \
(RFC 4616 Section 2)"
.into(),
));
}
self.initial_sent = true;
let mut bytes = Vec::with_capacity(self.encoded.len() + 2);
bytes.extend_from_slice(self.encoded.as_bytes());
bytes.extend_from_slice(b"\r\n");
Ok(ContinuationReply::Write(bytes))
}
}
pub(crate) struct AuthenticateXoauth2Consumer {
encoded: String,
initial_sent: bool,
caps_seen: bool,
buffered: Vec<UntaggedResponse>,
}
impl AuthenticateXoauth2Consumer {
pub(crate) fn new(encoded: String, sasl_ir_used: bool) -> Self {
Self {
encoded,
initial_sent: sasl_ir_used,
caps_seen: false,
buffered: Vec::new(),
}
}
}
impl Consumer for AuthenticateXoauth2Consumer {
type Output = bool;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if matches!(&resp, UntaggedResponse::Capability(_)) {
self.caps_seen = true;
}
self.buffered.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<bool>, Error> {
let tagged = require_ok_auth(tagged)?;
let caps_in_tagged = matches!(&tagged.code, Some(ResponseCode::Capability(_)));
Ok(Finalized {
output: self.caps_seen || caps_in_tagged,
reclassified_as_events: self.buffered,
})
}
}
impl ContinuationConsumer for AuthenticateXoauth2Consumer {
fn on_continuation(
&mut self,
_cont: ContinuationRequest,
_ctx: &ConsumerContext,
) -> Result<ContinuationReply, Error> {
if self.initial_sent {
Ok(ContinuationReply::Write(b"\r\n".to_vec()))
} else {
self.initial_sent = true;
let mut bytes = Vec::with_capacity(self.encoded.len() + 2);
bytes.extend_from_slice(self.encoded.as_bytes());
bytes.extend_from_slice(b"\r\n");
Ok(ContinuationReply::Write(bytes))
}
}
}
use crate::types::SelectedMailbox;
pub(crate) struct SelectConsumer {
is_examine: bool,
responses: Vec<UntaggedResponse>,
}
impl SelectConsumer {
pub(crate) fn new(is_examine: bool) -> Self {
Self {
is_examine,
responses: Vec::new(),
}
}
}
impl Consumer for SelectConsumer {
type Output = Result<SelectedMailbox, Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
self.responses.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
ctx: &ConsumerContext,
) -> Result<Finalized<Result<SelectedMailbox, Error>>, Error> {
match tagged.status {
StatusKind::No => Ok(Finalized {
output: Err(Error::no_with_code(tagged.text, tagged.code)),
reclassified_as_events: self.responses,
}),
StatusKind::Bad => Ok(Finalized {
output: Err(Error::bad_with_code(tagged.text, tagged.code)),
reclassified_as_events: self.responses,
}),
StatusKind::Ok => {
let read_only = if self.is_examine {
true
} else {
tagged.code.as_ref() == Some(&ResponseCode::ReadOnly)
};
let effective = super::selected_mailbox_effective_responses(&self.responses);
if let Err(e) = validate_select_responses(effective, self.is_examine, ctx) {
return Ok(Finalized {
output: Err(e),
reclassified_as_events: self.responses,
});
}
let result = super::build_selected_mailbox(&self.responses, &tagged, read_only);
let reclassified = reclassify_select_responses(self.responses, ctx);
Ok(Finalized {
output: Ok(result),
reclassified_as_events: reclassified,
})
}
}
}
}
fn reclassify_select_responses(
responses: Vec<UntaggedResponse>,
ctx: &ConsumerContext,
) -> Vec<UntaggedResponse> {
let closed_idx = responses.iter().rposition(|r| {
matches!(
r,
UntaggedResponse::Status {
code: Some(ResponseCode::Closed),
..
}
)
});
let mut reclassified = Vec::new();
let mut consumed_select_list = false;
match closed_idx {
Some(idx) => {
let mut owned = responses;
let post = owned.split_off(idx + 1);
owned.pop();
reclassified = owned;
for r in post {
if !is_select_solicited_response(&r, ctx, &mut consumed_select_list) {
reclassified.push(r);
}
}
}
None => {
for r in responses {
if !is_select_solicited_response(&r, ctx, &mut consumed_select_list) {
reclassified.push(r);
}
}
}
}
reclassified
}
fn is_select_solicited_response(
resp: &UntaggedResponse,
ctx: &ConsumerContext,
consumed_select_list: &mut bool,
) -> bool {
match resp {
UntaggedResponse::Exists(_)
| UntaggedResponse::Recent(_)
| UntaggedResponse::Flags(_)
| UntaggedResponse::Vanished { .. }
| UntaggedResponse::Fetch(_)
| UntaggedResponse::Status { code: Some(_), .. } => true,
UntaggedResponse::List(info) => {
if *consumed_select_list {
return false;
}
if let Some(target) = ctx.command_target() {
if inbox_eq(target.as_str(), info.name.as_str())
&& !super::is_notify_list_event(info, true)
{
*consumed_select_list = true;
return true;
}
}
false
}
_ => false,
}
}
fn validate_select_responses(
effective: &[UntaggedResponse],
is_examine: bool,
ctx: &ConsumerContext,
) -> Result<(), Error> {
let is_rev2 = {
let has_rev2 = ctx.capabilities().contains(&Capability::Imap4Rev2);
let has_rev1 = ctx.capabilities().contains(&Capability::Imap4Rev1);
if has_rev2 && has_rev1 {
ctx.enabled()
.iter()
.any(|e| e.eq_ignore_ascii_case("IMAP4REV2"))
} else {
has_rev2
}
};
let command_name = if is_examine { "EXAMINE" } else { "SELECT" };
let section = match (is_rev2, is_examine) {
(true, false) => "RFC 9051 Section 6.3.2",
(true, true) => "RFC 9051 Section 6.3.3",
(false, false) => "RFC 3501 Section 6.3.1",
(false, true) => "RFC 3501 Section 6.3.2",
};
let mut saw_flags = false;
let mut saw_exists = false;
let mut saw_recent = false;
let mut saw_list = false;
for resp in effective {
match resp {
UntaggedResponse::Flags(_) => saw_flags = true,
UntaggedResponse::Exists(_) => saw_exists = true,
UntaggedResponse::Recent(_) => saw_recent = true,
UntaggedResponse::List(info) => {
if let Some(target) = ctx.command_target() {
if inbox_eq(target.as_str(), info.name.as_str())
&& !super::is_notify_list_event(info, true)
{
saw_list = true;
}
}
}
_ => {}
}
}
if !saw_flags {
return Err(Error::Protocol(format!(
"{command_name} completed without the required FLAGS response ({section})"
)));
}
if !saw_exists {
return Err(Error::Protocol(format!(
"{command_name} completed without the required EXISTS response ({section})"
)));
}
if is_rev2 {
if !saw_list {
return Err(Error::Protocol(format!(
"{command_name} completed without the required LIST response ({section})"
)));
}
} else if !saw_recent {
return Err(Error::Protocol(format!(
"{command_name} completed without the required RECENT response ({section})"
)));
}
Ok(())
}
#[derive(Default)]
pub(crate) struct AppendConsumer {
buffered: Vec<UntaggedResponse>,
code: Option<ResponseCode>,
}
impl Consumer for AppendConsumer {
type Output = Option<(u32, u32)>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Status {
status: UntaggedStatus::Ok,
code: code_opt @ Some(ResponseCode::AppendUid { .. }),
..
} if self.code.is_none() => {
self.code = code_opt;
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Option<(u32, u32)>>, Error> {
let tagged = tagged.require_ok()?;
let code = tagged.code.or(self.code);
let append_uid = match code {
Some(ResponseCode::AppendUid { uid_validity, uids }) => {
uids.first().map(|r| (uid_validity, r.start))
}
_ => None,
};
Ok(Finalized {
output: append_uid,
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct MultiAppendConsumer {
buffered: Vec<UntaggedResponse>,
code: Option<ResponseCode>,
}
impl Consumer for MultiAppendConsumer {
type Output = Vec<(u32, u32)>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Status {
status: UntaggedStatus::Ok,
code: code_opt @ Some(ResponseCode::AppendUid { .. }),
..
} if self.code.is_none() => {
self.code = code_opt;
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<(u32, u32)>>, Error> {
let tagged = tagged.require_ok()?;
let mut results = Vec::new();
let code = tagged.code.or(self.code);
if let Some(ResponseCode::AppendUid { uid_validity, uids }) = code {
for range in &uids {
if let Some(end) = range.end {
for uid in range.start..=end {
results.push((uid_validity, uid));
}
} else {
results.push((uid_validity, range.start));
}
}
}
Ok(Finalized {
output: results,
reclassified_as_events: self.buffered,
})
}
}
use crate::types::FetchResponse;
pub(crate) const DEFAULT_FETCH_WARN_BYTES: usize = 10 * 1024 * 1024;
pub(crate) fn estimate_fetch_response_bytes(fr: &FetchResponse) -> usize {
let mut size: usize = 256;
for bs in &fr.body_sections {
size += bs.data.as_ref().map_or(0, Vec::len);
}
for bin in &fr.binary_sections {
size += bin.data.as_ref().map_or(0, Vec::len);
}
size
}
pub(crate) struct FetchConsumer {
fetches: Vec<FetchResponse>,
buffered: Vec<UntaggedResponse>,
estimated_bytes: usize,
warn_threshold: usize,
warned: bool,
}
impl FetchConsumer {
pub(crate) fn new() -> Self {
Self {
fetches: Vec::new(),
buffered: Vec::new(),
estimated_bytes: 0,
warn_threshold: DEFAULT_FETCH_WARN_BYTES,
warned: false,
}
}
}
impl Consumer for FetchConsumer {
type Output = Vec<FetchResponse>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if let UntaggedResponse::Fetch(fr) = resp {
self.estimated_bytes += estimate_fetch_response_bytes(&fr);
if !self.warned && self.estimated_bytes > self.warn_threshold {
tracing::warn!(
estimated_bytes = self.estimated_bytes,
threshold = self.warn_threshold,
"FETCH response buffer exceeds {} MB — consider \
uid_fetch_streaming for large result sets",
self.warn_threshold / (1024 * 1024),
);
self.warned = true;
}
self.fetches.push(*fr);
} else {
self.buffered.push(resp);
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<FetchResponse>>, Error> {
tagged.require_ok()?;
Ok(Finalized {
output: self.fetches,
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct StreamingFetchConsumer {
tx: tokio::sync::mpsc::Sender<Result<FetchResponse, Error>>,
ambiguous_buffer: Vec<UntaggedResponse>,
}
impl StreamingFetchConsumer {
pub(crate) fn new(tx: tokio::sync::mpsc::Sender<Result<FetchResponse, Error>>) -> Self {
Self {
tx,
ambiguous_buffer: Vec::new(),
}
}
}
impl Consumer for StreamingFetchConsumer {
type Output = ();
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if let UntaggedResponse::Fetch(fr) = resp {
let _ = self.tx.try_send(Ok(*fr));
} else {
self.ambiguous_buffer.push(resp);
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<()>, Error> {
tagged.require_ok()?;
Ok(Finalized {
output: (),
reclassified_as_events: self.ambiguous_buffer,
})
}
}
use crate::types::StoreResult;
pub(crate) struct StoreConsumer {
fetches: Vec<FetchResponse>,
buffered: Vec<UntaggedResponse>,
}
impl StoreConsumer {
pub(crate) fn new() -> Self {
Self {
fetches: Vec::new(),
buffered: Vec::new(),
}
}
}
impl Consumer for StoreConsumer {
type Output = StoreResult;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if let UntaggedResponse::Fetch(fr) = resp {
self.fetches.push(*fr);
} else {
self.buffered.push(resp);
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<StoreResult>, Error> {
let tagged = tagged.require_ok()?;
Ok(Finalized {
output: StoreResult {
fetches: self.fetches,
code: tagged.code,
},
reclassified_as_events: self.buffered,
})
}
}
use crate::types::validated::ParsedUidSet;
use crate::types::UidRange;
pub(crate) struct FetchVanishedConsumer {
fetches: Vec<FetchResponse>,
vanished_uids: Vec<UidRange>,
requested_set: Option<ParsedUidSet>,
dropped_vanished_count: usize,
buffered: Vec<UntaggedResponse>,
estimated_bytes: usize,
warn_threshold: usize,
warned: bool,
}
impl FetchVanishedConsumer {
pub(crate) fn new(requested_set: Option<ParsedUidSet>) -> Self {
Self {
fetches: Vec::new(),
vanished_uids: Vec::new(),
requested_set,
dropped_vanished_count: 0,
buffered: Vec::new(),
estimated_bytes: 0,
warn_threshold: DEFAULT_FETCH_WARN_BYTES,
warned: false,
}
}
}
impl Consumer for FetchVanishedConsumer {
type Output = (Vec<FetchResponse>, Vec<UidRange>);
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Fetch(fr) => {
self.estimated_bytes += estimate_fetch_response_bytes(&fr);
if !self.warned && self.estimated_bytes > self.warn_threshold {
tracing::warn!(
estimated_bytes = self.estimated_bytes,
threshold = self.warn_threshold,
"FETCH response buffer exceeds {} MB — consider \
uid_fetch_streaming for large result sets",
self.warn_threshold / (1024 * 1024),
);
self.warned = true;
}
self.fetches.push(*fr);
}
UntaggedResponse::Vanished {
earlier: true,
uids,
} => {
if let Some(ref set) = self.requested_set {
let (filtered, dropped) = set.intersect_uid_ranges(&uids);
self.dropped_vanished_count += dropped;
self.vanished_uids.extend(filtered);
} else {
self.vanished_uids.extend(uids);
}
}
_ => {
self.buffered.push(resp);
}
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<(Vec<FetchResponse>, Vec<UidRange>)>, Error> {
tagged.require_ok()?;
if self.dropped_vanished_count > 0 {
tracing::debug!(
dropped = self.dropped_vanished_count,
"filtered out-of-set VANISHED (EARLIER) UIDs per RFC 7162 Section 3.2.6",
);
}
Ok(Finalized {
output: (self.fetches, self.vanished_uids),
reclassified_as_events: self.buffered,
})
}
}
use crate::types::{MailboxInfo, StatusItem, StatusResult};
pub(crate) struct ListConsumer {
mailboxes: Vec<MailboxInfo>,
marker_events: Vec<UntaggedResponse>,
buffered: Vec<UntaggedResponse>,
}
impl ListConsumer {
pub(crate) fn new() -> Self {
Self {
mailboxes: Vec::new(),
marker_events: Vec::new(),
buffered: Vec::new(),
}
}
}
impl Consumer for ListConsumer {
type Output = Result<Vec<MailboxInfo>, Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::List(info) => {
if notify_snapshot.list && super::is_notify_list_event(&info, true) {
self.marker_events.push(UntaggedResponse::List(info));
} else {
self.mailboxes.push(info);
}
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Result<Vec<MailboxInfo>, Error>>, Error> {
let mut reclassified = self.marker_events;
reclassified.extend(self.buffered);
match tagged.require_ok() {
Ok(_) => Ok(Finalized {
output: Ok(self.mailboxes),
reclassified_as_events: reclassified,
}),
Err(e) => {
Ok(Finalized {
output: Err(e),
reclassified_as_events: reclassified,
})
}
}
}
}
#[derive(Default)]
pub(crate) struct LsubConsumer {
mailboxes: Vec<MailboxInfo>,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for LsubConsumer {
type Output = Vec<MailboxInfo>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Lsub(info) => {
self.mailboxes.push(info);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<MailboxInfo>>, Error> {
tagged.require_ok()?;
Ok(Finalized {
output: self.mailboxes,
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct ListExtendedConsumer {
filter_extended: bool,
selection_options: Vec<String>,
mailboxes: Vec<MailboxInfo>,
marker_events: Vec<UntaggedResponse>,
mismatch_events: Vec<UntaggedResponse>,
buffered: Vec<UntaggedResponse>,
}
impl ListExtendedConsumer {
pub(crate) fn new(filter_extended: bool, selection_options: Vec<String>) -> Self {
Self {
filter_extended,
selection_options,
mailboxes: Vec::new(),
marker_events: Vec::new(),
mismatch_events: Vec::new(),
buffered: Vec::new(),
}
}
}
impl Consumer for ListExtendedConsumer {
type Output = Result<Vec<MailboxInfo>, Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::List(info) => {
if notify_snapshot.list {
if super::is_notify_list_event(&info, self.filter_extended) {
self.marker_events.push(UntaggedResponse::List(info));
return;
}
let opts: Vec<&str> =
self.selection_options.iter().map(String::as_str).collect();
if super::is_notify_selection_mismatch(&info, &opts) {
self.mismatch_events.push(UntaggedResponse::List(info));
return;
}
}
self.mailboxes.push(info);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Result<Vec<MailboxInfo>, Error>>, Error> {
let mut reclassified = self.marker_events;
reclassified.extend(self.mismatch_events);
reclassified.extend(self.buffered);
match tagged.require_ok() {
Ok(_) => Ok(Finalized {
output: Ok(self.mailboxes),
reclassified_as_events: reclassified,
}),
Err(e) => {
Ok(Finalized {
output: Err(e),
reclassified_as_events: reclassified,
})
}
}
}
}
pub(crate) struct ListStatusConsumer {
results: Vec<(MailboxInfo, Option<Vec<StatusItem>>)>,
pending_status: Vec<(MailboxName, Vec<StatusItem>)>,
marker_events: Vec<UntaggedResponse>,
buffered: Vec<UntaggedResponse>,
}
impl ListStatusConsumer {
pub(crate) fn new() -> Self {
Self {
results: Vec::new(),
pending_status: Vec::new(),
marker_events: Vec::new(),
buffered: Vec::new(),
}
}
}
impl Consumer for ListStatusConsumer {
type Output = Result<Vec<(MailboxInfo, Vec<StatusItem>)>, Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::List(info) => {
if notify_snapshot.list && super::is_notify_list_event(&info, true) {
self.marker_events.push(UntaggedResponse::List(info));
return;
}
self.results.push((info, None));
}
UntaggedResponse::MailboxStatus { mailbox, items } => {
if let Some((_, ref mut status)) = self
.results
.iter_mut()
.find(|(mb, s)| s.is_none() && inbox_eq(mb.name.as_str(), mailbox.as_str()))
{
*status = Some(items);
} else {
self.pending_status.push((mailbox, items));
}
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Result<Vec<(MailboxInfo, Vec<StatusItem>)>, Error>>, Error> {
let mut reclassified = self.marker_events;
reclassified.extend(self.buffered);
if let Err(e) = tagged.require_ok() {
return Ok(Finalized {
output: Err(e),
reclassified_as_events: reclassified,
});
}
let mut results = self.results;
for (decoded, items) in self.pending_status {
if let Some((_, ref mut status)) = results
.iter_mut()
.find(|(mb, s)| s.is_none() && inbox_eq(mb.name.as_str(), decoded.as_str()))
{
*status = Some(items);
}
}
let paired: Vec<(MailboxInfo, Vec<StatusItem>)> = results
.into_iter()
.map(|(mb, status)| {
let items = status.unwrap_or_default();
(mb, items)
})
.collect();
Ok(Finalized {
output: Ok(paired),
reclassified_as_events: reclassified,
})
}
}
pub(crate) struct StatusConsumer {
matching: Vec<(UntaggedResponse, bool)>,
buffered: Vec<UntaggedResponse>,
}
impl StatusConsumer {
pub(crate) fn new() -> Self {
Self {
matching: Vec::new(),
buffered: Vec::new(),
}
}
}
impl Consumer for StatusConsumer {
type Output = StatusResult;
fn on_response(
&mut self,
resp: UntaggedResponse,
notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::MailboxStatus { .. } => {
self.matching.push((resp, notify_snapshot.status));
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
ctx: &ConsumerContext,
) -> Result<Finalized<StatusResult>, Error> {
tagged.require_ok()?;
let mut matching = self.matching;
let Some((last_resp, _)) = matching.pop() else {
let target = ctx
.command_target()
.map_or_else(|| "<unknown>".to_owned(), |t| t.as_str().to_owned());
return Err(Error::Protocol(format!(
"STATUS OK but no matching untagged STATUS response \
for mailbox '{target}' (RFC 3501 Sections 5.2, 6.3.10)"
)));
};
let UntaggedResponse::MailboxStatus {
items: primary_items,
..
} = last_resp
else {
return Err(Error::Protocol(
"internal: matching predicate returned non-MailboxStatus \
variant"
.into(),
));
};
let mut ambiguous = Vec::new();
let mut non_notify_extras: Vec<UntaggedResponse> = Vec::new();
for (resp, had_notify) in matching {
if had_notify {
if let UntaggedResponse::MailboxStatus { items, .. } = resp {
ambiguous.push(items);
}
} else {
non_notify_extras.push(resp);
}
}
let mut reclassified = self.buffered;
reclassified.extend(non_notify_extras);
Ok(Finalized {
output: StatusResult {
items: primary_items,
ambiguous,
},
reclassified_as_events: reclassified,
})
}
}
use crate::connection::SearchResult;
use crate::types::response::{
AclEntry, EsearchResponse, ListRightsResponse, MetadataResult, QuotaResource,
QuotaRootResponse, ThreadNode,
};
use crate::types::{CopyResult, ExpungeResult, MoveResult};
pub(crate) struct QuotaConsumer {
root: String,
result: Option<Vec<QuotaResource>>,
buffered: Vec<UntaggedResponse>,
}
impl QuotaConsumer {
pub(crate) fn new(root: String) -> Self {
Self {
root,
result: None,
buffered: Vec::new(),
}
}
}
impl Consumer for QuotaConsumer {
type Output = Vec<QuotaResource>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Quota { root, resources }
if root == self.root && self.result.is_none() =>
{
self.result = Some(resources);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<QuotaResource>>, Error> {
tagged.require_ok()?;
let resources = self.result.ok_or_else(|| {
Error::Protocol(format!(
"server sent OK but no QUOTA response for root '{}' \
(RFC 2087 Section 4.2)",
self.root,
))
})?;
Ok(Finalized {
output: resources,
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct QuotaRootConsumer {
mailbox: String,
roots: Option<Vec<String>>,
quotas: Vec<(String, Vec<QuotaResource>)>,
buffered: Vec<UntaggedResponse>,
}
impl QuotaRootConsumer {
pub(crate) fn new(mailbox: String) -> Self {
Self {
mailbox,
roots: None,
quotas: Vec::new(),
buffered: Vec::new(),
}
}
}
impl Consumer for QuotaRootConsumer {
type Output = QuotaRootResponse;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::QuotaRoot { mailbox, roots }
if inbox_eq(&self.mailbox, mailbox.as_str()) && self.roots.is_none() =>
{
self.roots = Some(roots);
}
UntaggedResponse::Quota { root, resources } => {
self.quotas.push((root, resources));
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<QuotaRootResponse>, Error> {
tagged.require_ok()?;
let roots = self.roots.ok_or_else(|| {
Error::Protocol(format!(
"server sent OK but no QUOTAROOT response for mailbox '{}' \
(RFC 2087 Section 4.3)",
self.mailbox,
))
})?;
let mut resources: Vec<(String, Vec<QuotaResource>)> = Vec::new();
let mut buffered = self.buffered;
for (root, res) in self.quotas {
if roots.iter().any(|expected| expected == &root) {
resources.push((root, res));
} else {
buffered.push(UntaggedResponse::Quota {
root,
resources: res,
});
}
}
if roots.is_empty() {
return Ok(Finalized {
output: QuotaRootResponse { roots, resources },
reclassified_as_events: buffered,
});
}
if resources.is_empty() {
return Err(Error::Protocol(format!(
"server sent OK but no QUOTA response for QUOTAROOT mailbox \
'{}' (RFC 2087 Section 4.3)",
self.mailbox,
)));
}
Ok(Finalized {
output: QuotaRootResponse { roots, resources },
reclassified_as_events: buffered,
})
}
}
pub(crate) struct AclConsumer {
mailbox: String,
result: Option<Vec<AclEntry>>,
buffered: Vec<UntaggedResponse>,
}
impl AclConsumer {
pub(crate) fn new(mailbox: String) -> Self {
Self {
mailbox,
result: None,
buffered: Vec::new(),
}
}
}
impl Consumer for AclConsumer {
type Output = Vec<AclEntry>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Acl { mailbox, entries }
if inbox_eq(&self.mailbox, mailbox.as_str()) && self.result.is_none() =>
{
self.result = Some(entries);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<AclEntry>>, Error> {
tagged.require_ok()?;
let entries = self.result.ok_or_else(|| {
Error::Protocol(format!(
"server sent OK but no ACL response for mailbox '{}' \
(RFC 4314 Section 3.3)",
self.mailbox,
))
})?;
Ok(Finalized {
output: entries,
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct ListRightsConsumer {
mailbox: String,
identifier: String,
result: Option<ListRightsResponse>,
buffered: Vec<UntaggedResponse>,
}
impl ListRightsConsumer {
pub(crate) fn new(mailbox: String, identifier: String) -> Self {
Self {
mailbox,
identifier,
result: None,
buffered: Vec::new(),
}
}
}
impl Consumer for ListRightsConsumer {
type Output = ListRightsResponse;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::ListRights {
mailbox,
identifier,
required,
optional,
} if inbox_eq(&self.mailbox, mailbox.as_str())
&& identifier == self.identifier
&& self.result.is_none() =>
{
self.result = Some(ListRightsResponse { required, optional });
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<ListRightsResponse>, Error> {
tagged.require_ok()?;
let result = self.result.ok_or_else(|| {
Error::Protocol(format!(
"server sent OK but no LISTRIGHTS response for mailbox '{}' \
and identifier '{}' (RFC 4314 Section 3.4)",
self.mailbox, self.identifier,
))
})?;
Ok(Finalized {
output: result,
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct MyRightsConsumer {
mailbox: String,
result: Option<String>,
buffered: Vec<UntaggedResponse>,
}
impl MyRightsConsumer {
pub(crate) fn new(mailbox: String) -> Self {
Self {
mailbox,
result: None,
buffered: Vec::new(),
}
}
}
impl Consumer for MyRightsConsumer {
type Output = String;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::MyRights { mailbox, rights }
if inbox_eq(&self.mailbox, mailbox.as_str()) && self.result.is_none() =>
{
self.result = Some(rights);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<String>, Error> {
tagged.require_ok()?;
let rights = self.result.ok_or_else(|| {
Error::Protocol(format!(
"server sent OK but no MYRIGHTS response for mailbox '{}' \
(RFC 4314 Section 3.5)",
self.mailbox,
))
})?;
Ok(Finalized {
output: rights,
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct MetadataConsumer {
mailbox: String,
entries: Vec<crate::types::response::MetadataEntry>,
notify_ambiguity: bool,
saw_matching: bool,
buffered: Vec<UntaggedResponse>,
}
impl MetadataConsumer {
pub(crate) fn new(mailbox: String) -> Self {
Self {
mailbox,
entries: Vec::new(),
notify_ambiguity: false,
saw_matching: false,
buffered: Vec::new(),
}
}
}
impl Consumer for MetadataConsumer {
type Output = MetadataResult;
fn on_response(
&mut self,
resp: UntaggedResponse,
notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Metadata { mailbox, entries }
if inbox_eq(&self.mailbox, mailbox.as_str()) =>
{
if notify_snapshot.metadata {
self.notify_ambiguity = true;
}
self.saw_matching = true;
self.entries.extend(entries);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<MetadataResult>, Error> {
tagged.require_ok()?;
if !self.saw_matching {
return Err(Error::Protocol(
"server completed GETMETADATA without the required METADATA \
response for the requested mailbox (RFC 5464 Section 4.2)"
.into(),
));
}
Ok(Finalized {
output: MetadataResult {
entries: self.entries,
notify_ambiguity: self.notify_ambiguity,
},
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct ThreadConsumer {
result: Option<Vec<ThreadNode>>,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for ThreadConsumer {
type Output = Vec<ThreadNode>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Thread(threads) if self.result.is_none() => {
self.result = Some(threads);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<ThreadNode>>, Error> {
tagged.require_ok()?;
let threads = self.result.unwrap_or_default();
Ok(Finalized {
output: threads,
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct SortConsumer {
result: Option<(Vec<u32>, Option<u64>)>,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for SortConsumer {
type Output = SearchResult;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Sort { nums, mod_seq } if self.result.is_none() => {
self.result = Some((nums, mod_seq));
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<SearchResult>, Error> {
tagged.require_ok()?;
let (ids, mod_seq) = self.result.unwrap_or_default();
Ok(Finalized {
output: SearchResult {
ids,
mod_seq,
truncated: false,
},
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct NotifySetConsumer {
saw_overflow: bool,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for NotifySetConsumer {
type Output = Result<bool, Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
if matches!(
&resp,
UntaggedResponse::Status {
code: Some(ResponseCode::NotificationOverflow(_)),
..
}
) {
self.saw_overflow = true;
}
self.buffered.push(resp);
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Result<bool, Error>>, Error> {
match tagged.status {
StatusKind::Ok => {
let overflow = self.saw_overflow
|| matches!(tagged.code, Some(ResponseCode::NotificationOverflow(_)));
Ok(Finalized {
output: Ok(overflow),
reclassified_as_events: self.buffered,
})
}
StatusKind::No => Ok(Finalized {
output: Err(Error::no_with_code(tagged.text, tagged.code)),
reclassified_as_events: self.buffered,
}),
StatusKind::Bad => Ok(Finalized {
output: Err(Error::bad_with_code(tagged.text, tagged.code)),
reclassified_as_events: self.buffered,
}),
}
}
}
pub(crate) struct SearchConsumer {
tag_correlated: Vec<EsearchResponse>,
tagless_esearch: Vec<EsearchResponse>,
search_responses: Vec<(Vec<u32>, Option<u64>)>,
buffered: Vec<UntaggedResponse>,
}
impl SearchConsumer {
pub(crate) fn new() -> Self {
Self {
tag_correlated: Vec::new(),
tagless_esearch: Vec::new(),
search_responses: Vec::new(),
buffered: Vec::new(),
}
}
fn drain_all_into_buffered(&mut self) {
for e in self.tag_correlated.drain(..) {
self.buffered.push(UntaggedResponse::Esearch(e));
}
for e in self.tagless_esearch.drain(..) {
self.buffered.push(UntaggedResponse::Esearch(e));
}
for (uids, mod_seq) in self.search_responses.drain(..) {
self.buffered
.push(UntaggedResponse::Search { uids, mod_seq });
}
}
}
impl Consumer for SearchConsumer {
type Output = Result<SearchResult, Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Esearch(e) if e.tag.as_deref() == Some(ctx.command_tag()) => {
self.tag_correlated.push(e);
}
UntaggedResponse::Esearch(e) if e.tag.is_none() => {
self.tagless_esearch.push(e);
}
UntaggedResponse::Search { uids, mod_seq } => {
self.search_responses.push((uids, mod_seq));
}
other => self.buffered.push(other),
}
}
fn finalize(
mut self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Result<SearchResult, Error>>, Error> {
if let Err(e) = tagged.require_ok() {
self.drain_all_into_buffered();
return Ok(Finalized {
output: Err(e),
reclassified_as_events: self.buffered,
});
}
if let Some(esearch) = self.tag_correlated.first() {
let (ids, truncated) = super::expand_uid_ranges(&esearch.all);
let result = SearchResult {
ids,
mod_seq: esearch.mod_seq,
truncated,
};
let mut buffered = self.buffered;
for e in self.tag_correlated.into_iter().skip(1) {
buffered.push(UntaggedResponse::Esearch(e));
}
for e in self.tagless_esearch {
buffered.push(UntaggedResponse::Esearch(e));
}
for (uids, mod_seq) in self.search_responses {
buffered.push(UntaggedResponse::Search { uids, mod_seq });
}
return Ok(Finalized {
output: Ok(result),
reclassified_as_events: buffered,
});
}
if let Some(esearch) = self.tagless_esearch.first() {
let (ids, truncated) = super::expand_uid_ranges(&esearch.all);
let result = SearchResult {
ids,
mod_seq: esearch.mod_seq,
truncated,
};
let mut buffered = self.buffered;
for e in self.tagless_esearch.into_iter().skip(1) {
buffered.push(UntaggedResponse::Esearch(e));
}
for (uids, mod_seq) in self.search_responses {
buffered.push(UntaggedResponse::Search { uids, mod_seq });
}
return Ok(Finalized {
output: Ok(result),
reclassified_as_events: buffered,
});
}
let mut search_iter = self.search_responses.into_iter();
if let Some((uids, mod_seq)) = search_iter.next() {
let mut buffered = self.buffered;
for (uids, mod_seq) in search_iter {
buffered.push(UntaggedResponse::Search { uids, mod_seq });
}
return Ok(Finalized {
output: Ok(SearchResult {
ids: uids,
mod_seq,
truncated: false,
}),
reclassified_as_events: buffered,
});
}
Ok(Finalized {
output: Err(Error::Protocol(
"SEARCH OK but no untagged SEARCH/ESEARCH response \
(RFC 3501 Section 6.4.4)"
.into(),
)),
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct EsearchConsumer {
result: Option<EsearchResponse>,
buffered: Vec<UntaggedResponse>,
}
impl EsearchConsumer {
pub(crate) fn new() -> Self {
Self {
result: None,
buffered: Vec::new(),
}
}
}
impl Consumer for EsearchConsumer {
type Output = Result<EsearchResponse, Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Esearch(e)
if self.result.is_none()
&& (e.tag.is_none() || e.tag.as_deref() == Some(ctx.command_tag())) =>
{
self.result = Some(e);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Result<EsearchResponse, Error>>, Error> {
let buffered = self.buffered;
if let Err(e) = tagged.require_ok() {
return Ok(Finalized {
output: Err(e),
reclassified_as_events: buffered,
});
}
let output = self.result.ok_or_else(|| {
Error::Protocol(
"SEARCH RETURN OK but no ESEARCH response \
(RFC 4731 Section 3.1)"
.into(),
)
});
Ok(Finalized {
output,
reclassified_as_events: buffered,
})
}
}
pub(crate) struct SearchSaveConsumer {
buffered: Vec<UntaggedResponse>,
}
impl SearchSaveConsumer {
pub(crate) fn new() -> Self {
Self {
buffered: Vec::new(),
}
}
}
impl Consumer for SearchSaveConsumer {
type Output = Result<(), Error>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Search { .. } => {}
UntaggedResponse::Esearch(e)
if e.tag.is_none() || e.tag.as_deref() == Some(ctx.command_tag()) => {}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Result<(), Error>>, Error> {
Ok(Finalized {
output: tagged.require_ok().map(|_| ()),
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct CopyConsumer {
buffered: Vec<UntaggedResponse>,
code: Option<ResponseCode>,
}
impl CopyConsumer {
pub(crate) fn new() -> Self {
Self {
buffered: Vec::new(),
code: None,
}
}
}
impl Consumer for CopyConsumer {
type Output = CopyResult;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Status {
status: UntaggedStatus::Ok,
code: code_opt @ Some(ResponseCode::CopyUid { .. }),
..
} if self.code.is_none() => {
self.code = code_opt;
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<CopyResult>, Error> {
let tagged = tagged.require_ok()?;
Ok(Finalized {
output: CopyResult {
code: tagged.code.or(self.code),
},
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct MoveConsumer {
expunged: Vec<u32>,
vanished: Vec<UidRange>,
buffered: Vec<UntaggedResponse>,
code: Option<ResponseCode>,
}
impl MoveConsumer {
pub(crate) fn new() -> Self {
Self {
expunged: Vec::new(),
vanished: Vec::new(),
buffered: Vec::new(),
code: None,
}
}
}
impl Consumer for MoveConsumer {
type Output = MoveResult;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Expunge(n) => {
self.expunged.push(n);
}
UntaggedResponse::Vanished { uids, .. } => {
self.vanished.extend(uids);
}
UntaggedResponse::Status {
status: UntaggedStatus::Ok,
code: code_opt @ Some(ResponseCode::CopyUid { .. }),
..
} if self.code.is_none() => {
self.code = code_opt;
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
ctx: &ConsumerContext,
) -> Result<Finalized<MoveResult>, Error> {
let tagged = tagged.require_ok()?;
let expunged = if ctx.enabled().iter().any(|e| e == "QRESYNC") {
ExpungeResult::Vanished(self.vanished)
} else {
ExpungeResult::Expunged(self.expunged)
};
Ok(Finalized {
output: MoveResult {
code: tagged.code.or(self.code),
expunged,
},
reclassified_as_events: self.buffered,
})
}
}
pub(crate) struct ExpungeConsumer {
expunged: Vec<u32>,
vanished: Vec<UidRange>,
buffered: Vec<UntaggedResponse>,
}
impl ExpungeConsumer {
pub(crate) fn new() -> Self {
Self {
expunged: Vec::new(),
vanished: Vec::new(),
buffered: Vec::new(),
}
}
}
impl Consumer for ExpungeConsumer {
type Output = ExpungeResult;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Expunge(n) => {
self.expunged.push(n);
}
UntaggedResponse::Vanished { uids, .. } => {
self.vanished.extend(uids);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
ctx: &ConsumerContext,
) -> Result<Finalized<ExpungeResult>, Error> {
tagged.require_ok()?;
let result = if ctx.enabled().iter().any(|e| e == "QRESYNC") {
ExpungeResult::Vanished(self.vanished)
} else {
ExpungeResult::Expunged(self.expunged)
};
Ok(Finalized {
output: result,
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct IdConsumer {
pairs: Option<Vec<(String, Option<String>)>>,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for IdConsumer {
type Output = Vec<(String, Option<String>)>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Id(pairs) if self.pairs.is_none() => {
self.pairs = Some(pairs);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<(String, Option<String>)>>, Error> {
tagged.require_ok()?;
let pairs = self.pairs.ok_or_else(|| {
Error::Protocol("ID OK but no untagged ID response (RFC 2971 Section 3.2)".into())
})?;
Ok(Finalized {
output: pairs,
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct NamespaceConsumer {
namespace: Option<(
Vec<crate::types::NamespaceDescriptor>,
Vec<crate::types::NamespaceDescriptor>,
Vec<crate::types::NamespaceDescriptor>,
)>,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for NamespaceConsumer {
type Output = crate::types::NamespaceResponse;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Namespace {
personal,
other,
shared,
} if self.namespace.is_none() => {
self.namespace = Some((personal, other, shared));
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<crate::types::NamespaceResponse>, Error> {
tagged.require_ok()?;
let (personal, other, shared) = self.namespace.ok_or_else(|| {
Error::Protocol(
"NAMESPACE OK but no untagged NAMESPACE response (RFC 2342 Section 5)".into(),
)
})?;
Ok(Finalized {
output: crate::types::NamespaceResponse {
personal,
other,
shared,
},
reclassified_as_events: self.buffered,
})
}
}
#[derive(Default)]
pub(crate) struct EnableConsumer {
caps: Option<Vec<String>>,
buffered: Vec<UntaggedResponse>,
}
impl Consumer for EnableConsumer {
type Output = Vec<String>;
fn on_response(
&mut self,
resp: UntaggedResponse,
_notify_snapshot: NotifyFlags,
_ctx: &ConsumerContext,
) {
match resp {
UntaggedResponse::Enabled(exts) if self.caps.is_none() => {
self.caps = Some(exts);
}
other => self.buffered.push(other),
}
}
fn finalize(
self: Box<Self>,
tagged: TaggedResponse,
_ctx: &ConsumerContext,
) -> Result<Finalized<Vec<String>>, Error> {
tagged.require_ok()?;
let exts = self.caps.unwrap_or_else(|| {
tracing::warn!(
"server omitted ENABLED response (RFC 5161 Section 3.2) \
— treating as empty"
);
Vec::new()
});
Ok(Finalized {
output: exts,
reclassified_as_events: self.buffered,
})
}
}
#[cfg(test)]
#[path = "dispatch_tests.rs"]
mod tests;