Skip to main content

daaki_imap/connection/pipeline/
mod.rs

1//! Pipeline builder for batching multiple pipelinable IMAP commands.
2//!
3//! Only commands that are safe to pipeline per RFC 3501 §5.5 have
4//! methods on [`Pipeline`]. State-changing commands (SELECT, STARTTLS,
5//! IDLE, LOGOUT, etc.) intentionally lack pipeline methods.
6
7use crate::types::{
8    Flag, MailboxAttribute, MailboxName, NotifySetParams, SequenceSet, StoreOperation,
9};
10
11// ============================================================================
12// Pipeline builder with typed handle tuple
13// ============================================================================
14
15use std::any::Any;
16use std::marker::PhantomData;
17
18use crate::error::Error;
19use crate::types::response::{
20    AclEntry, EsearchResponse, ListRightsResponse, MetadataResult, NamespaceResponse,
21    QuotaResource, QuotaRootResponse, ThreadNode, UidRange,
22};
23use crate::types::validated::ParsedUidSet;
24use crate::types::{
25    Capability, Command, CopyResult, ExpungeResult, FetchResponse, MoveResult, StatusItem,
26    StatusResult, StoreResult,
27};
28
29use super::dispatch;
30use super::driver::ConsumerErased;
31use super::MailboxInfo;
32use super::SearchResult;
33
34// ---------------------------------------------------------------------------
35// PipelineError
36// ---------------------------------------------------------------------------
37
38/// Error returned by [`Pipeline::execute`] and [`Pipeline::execute_dynamic`].
39///
40/// Distinguished from [`Error`] because pipeline execution has three
41/// failure classes: per-command errors (embedded in the result tuple as
42/// `Result<T, Error>`), pipeline-level driver errors (encoding failure
43/// that aborts the whole batch), and type-mismatch bugs (internal only).
44#[derive(Debug)]
45pub enum PipelineError {
46    /// The driver task has exited — the command channel is closed.
47    Disconnected,
48    /// The driver returned a pipeline-level error (e.g., encoding
49    /// failure that aborted the entire batch before any bytes were
50    /// written to the wire).
51    Driver(Error),
52    /// Internal error: a consumer returned a type that does not match
53    /// the expected downcast target. This indicates a bug in the
54    /// pipeline builder's command-to-consumer mapping.
55    TypeMismatch {
56        /// Zero-based index of the command whose output could not be
57        /// downcast.
58        index: usize,
59    },
60}
61
62impl std::fmt::Display for PipelineError {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        match self {
65            Self::Disconnected => write!(f, "pipeline: driver task disconnected"),
66            Self::Driver(e) => write!(f, "pipeline: driver error: {e}"),
67            Self::TypeMismatch { index } => {
68                write!(f, "pipeline: type mismatch at command index {index}")
69            }
70        }
71    }
72}
73
74impl std::error::Error for PipelineError {
75    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
76        match self {
77            Self::Driver(e) => Some(e),
78            _ => None,
79        }
80    }
81}
82
83// ---------------------------------------------------------------------------
84// UnfoldTuple
85// ---------------------------------------------------------------------------
86
87/// Trait for converting a `Vec<Result<Box<dyn Any + Send>, Error>>`
88/// (the driver's per-command results) into a flat, typed tuple.
89///
90/// Each impl maps a nested type-level list `(C, (B, (A, ())))` to its
91/// corresponding flat tuple `(Result<A, Error>, Result<B, Error>, Result<C, Error>)`.
92/// The nesting order matches the push order: the first command pushed
93/// is the innermost type, and results are extracted in push order from
94/// index 0 upward.
95///
96/// Impls are provided for 0 through 8 commands.
97pub trait UnfoldTuple {
98    /// The flat tuple type produced by unfolding.
99    type Output;
100
101    /// Consume the results vec and produce the typed tuple.
102    ///
103    /// Returns `Err(PipelineError::TypeMismatch)` if any `Ok` result
104    /// fails to downcast to the expected type. Per-command `Err` values
105    /// are preserved as-is in the tuple elements.
106    fn unfold(
107        results: Vec<Result<Box<dyn Any + Send>, Error>>,
108    ) -> Result<Self::Output, PipelineError>;
109}
110
111/// Downcast a single result entry, preserving per-command errors.
112fn downcast_result<T: Any + Send>(
113    result: Result<Box<dyn Any + Send>, Error>,
114    index: usize,
115) -> Result<Result<T, Error>, PipelineError> {
116    match result {
117        Ok(boxed) => match boxed.downcast::<T>() {
118            Ok(val) => Ok(Ok(*val)),
119            Err(_) => Err(PipelineError::TypeMismatch { index }),
120        },
121        Err(e) => Ok(Err(e)),
122    }
123}
124
125/// Empty pipeline — no commands.
126impl UnfoldTuple for () {
127    type Output = ();
128    fn unfold(results: Vec<Result<Box<dyn Any + Send>, Error>>) -> Result<(), PipelineError> {
129        if !results.is_empty() {
130            return Err(PipelineError::TypeMismatch { index: 0 });
131        }
132        Ok(())
133    }
134}
135
136/// Generate `UnfoldTuple` impls for nested type-level lists up to 8 elements.
137///
138/// Each invocation maps a nested type `(TN, (... (T0, ())))` to a flat
139/// tuple `(Result<T0, Error>, ..., Result<TN, Error>)`.
140macro_rules! impl_unfold_tuple {
141    ($nested:ty, $count:literal, [$($idx:literal : $T:ident),+ $(,)?]) => {
142        impl<$($T: Any + Send),+> UnfoldTuple for $nested {
143            type Output = ($(Result<$T, Error>,)+);
144
145            fn unfold(
146                results: Vec<Result<Box<dyn Any + Send>, Error>>,
147            ) -> Result<Self::Output, PipelineError> {
148                if results.len() != $count {
149                    return Err(PipelineError::TypeMismatch { index: 0 });
150                }
151                let mut iter = results.into_iter();
152                Ok(($(
153                    downcast_result::<$T>(
154                        iter.next().ok_or(PipelineError::TypeMismatch { index: $idx })?,
155                        $idx,
156                    )?,
157                )+))
158            }
159        }
160    };
161}
162
163// 1 command: Accumulated = (A, ())
164impl_unfold_tuple!((A, ()), 1, [0: A]);
165// 2 commands: Accumulated = (B, (A, ()))
166impl_unfold_tuple!((B, (A, ())), 2, [0: A, 1: B]);
167// 3 commands: Accumulated = (C, (B, (A, ())))
168impl_unfold_tuple!((C, (B, (A, ()))), 3, [0: A, 1: B, 2: C]);
169// 4 commands
170impl_unfold_tuple!((D, (C, (B, (A, ())))), 4, [0: A, 1: B, 2: C, 3: D]);
171// 5 commands
172impl_unfold_tuple!((E, (D, (C, (B, (A, ()))))), 5, [0: A, 1: B, 2: C, 3: D, 4: E]);
173// 6 commands
174impl_unfold_tuple!((F, (E, (D, (C, (B, (A, ())))))), 6, [0: A, 1: B, 2: C, 3: D, 4: E, 5: F]);
175// 7 commands
176impl_unfold_tuple!(
177    (G, (F, (E, (D, (C, (B, (A, ()))))))),
178    7,
179    [0: A, 1: B, 2: C, 3: D, 4: E, 5: F, 6: G]
180);
181// 8 commands
182impl_unfold_tuple!(
183    (H, (G, (F, (E, (D, (C, (B, (A, ())))))))),
184    8,
185    [0: A, 1: B, 2: C, 3: D, 4: E, 5: F, 6: G, 7: H]
186);
187
188// ---------------------------------------------------------------------------
189// Pipeline builder
190// ---------------------------------------------------------------------------
191
192/// Typed pipeline builder for batching multiple pipelinable IMAP commands.
193///
194/// Built via [`ImapConnection::pipeline`]. Commands are added via
195/// type-specific methods (e.g., [`fetch`](Self::fetch),
196/// [`noop`](Self::noop)). Each method returns a new `Pipeline` with the
197/// command's output type prepended to the `Accumulated` type-level list.
198///
199/// The accumulated type grows as a nested tuple:
200/// `()` → `(A, ())` → `(B, (A, ()))` → etc.
201///
202/// Call [`execute`](Self::execute) to submit the batch and receive a
203/// flat, typed tuple of per-command results, or
204/// [`execute_dynamic`](Self::execute_dynamic) for a `Vec<Result<...>>`.
205///
206/// # Example (illustrative — no wire activity)
207///
208/// ```text
209/// let pipeline = conn.pipeline()
210///     .noop()
211///     .capability();
212/// // pipeline: Pipeline<'_, (Vec<Capability>, ((), ()))>
213/// // execute() would return (Result<(), Error>, Result<Vec<Capability>, Error>)
214/// ```
215pub struct Pipeline<'conn, Accumulated> {
216    conn: &'conn super::ImapConnection,
217    pending: Vec<Box<dyn ConsumerErased>>,
218    commands: Vec<Command>,
219    _marker: PhantomData<Accumulated>,
220}
221
222impl<'conn> Pipeline<'conn, ()> {
223    /// Create a new empty pipeline for the given connection.
224    pub(in crate::connection) fn new(conn: &'conn super::ImapConnection) -> Self {
225        Self {
226            conn,
227            pending: Vec::new(),
228            commands: Vec::new(),
229            _marker: PhantomData,
230        }
231    }
232}
233
234// ---------------------------------------------------------------------------
235// Pipeline command methods
236// ---------------------------------------------------------------------------
237
238/// Helper macro to generate pipeline command methods.
239///
240/// Each invocation adds a public method on `Pipeline<'conn, T>` that
241/// pushes a `Command` variant and its consumer, returning a new pipeline
242/// with the command's output type prepended to the accumulator.
243macro_rules! pipeline_method {
244    (
245        $(#[$meta:meta])*
246        fn $name:ident($($param:ident : $param_ty:ty),* $(,)?) -> $output:ty;
247        command = $cmd:expr;
248        consumer = $consumer:expr;
249    ) => {
250        $(#[$meta])*
251        pub fn $name(mut self $(, $param: $param_ty)*) -> Pipeline<'conn, ($output, T)> {
252            self.commands.push($cmd);
253            self.pending.push(Box::new($consumer) as Box<dyn ConsumerErased>);
254            Pipeline {
255                conn: self.conn,
256                pending: self.pending,
257                commands: self.commands,
258                _marker: PhantomData,
259            }
260        }
261    };
262}
263
264impl<'conn, T> Pipeline<'conn, T> {
265    // =======================================================================
266    // Any-state commands (RFC 3501 §6.1)
267    // =======================================================================
268
269    pipeline_method! {
270        /// NOOP command (RFC 3501 §6.1.2).
271        fn noop() -> ();
272        command = Command::Noop;
273        consumer = dispatch::TaggedOkConsumer::default();
274    }
275
276    pipeline_method! {
277        /// CAPABILITY command (RFC 3501 §6.1.1).
278        fn capability() -> Vec<Capability>;
279        command = Command::Capability;
280        consumer = dispatch::CapabilityConsumer::default();
281    }
282
283    // =======================================================================
284    // Authenticated-state commands (RFC 3501 §6.3)
285    // =======================================================================
286
287    pipeline_method! {
288        /// LIST command (RFC 3501 §6.3.8).
289        fn list(reference: String, pattern: String) -> Result<Vec<MailboxInfo>, Error>;
290        command = Command::List { reference, pattern };
291        consumer = dispatch::ListConsumer::new();
292    }
293
294    /// LIST with selection/return options (RFC 5258 §3, RFC 9051 §6.3.9).
295    ///
296    /// The consumer's NOTIFY marker filter is derived from the
297    /// `selection_options`: when `SUBSCRIBED` is absent, `\NonExistent`
298    /// / `\NoAccess` responses are treated as NOTIFY markers and
299    /// reclassified as events.
300    pub fn list_extended(
301        mut self,
302        selection_options: Vec<String>,
303        reference: String,
304        patterns: Vec<String>,
305        return_options: Vec<String>,
306    ) -> Pipeline<'conn, (Result<Vec<MailboxInfo>, Error>, T)> {
307        // RFC 5258 §3: filter_extended is true when SUBSCRIBED is NOT
308        // in the selection options — those responses are NOTIFY markers.
309        let filter_extended = !selection_options
310            .iter()
311            .any(|o| o.eq_ignore_ascii_case("SUBSCRIBED"));
312        let consumer =
313            dispatch::ListExtendedConsumer::new(filter_extended, selection_options.clone());
314        self.commands.push(Command::ListExtended {
315            selection_options,
316            reference,
317            patterns,
318            return_options,
319        });
320        self.pending
321            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
322        Pipeline {
323            conn: self.conn,
324            pending: self.pending,
325            commands: self.commands,
326            _marker: PhantomData,
327        }
328    }
329
330    pipeline_method! {
331        /// LIST with STATUS return option (RFC 5819 §2).
332        fn list_status(
333            reference: String,
334            pattern: String,
335            status_items: String,
336        ) -> Result<Vec<(MailboxInfo, Vec<StatusItem>)>, Error>;
337        command = Command::ListStatus { reference, pattern, status_items };
338        consumer = dispatch::ListStatusConsumer::new();
339    }
340
341    pipeline_method! {
342        /// LSUB command (RFC 3501 §6.3.9).
343        fn lsub(reference: String, pattern: String) -> Vec<MailboxInfo>;
344        command = Command::Lsub { reference, pattern };
345        consumer = dispatch::LsubConsumer::default();
346    }
347
348    pipeline_method! {
349        /// CREATE command (RFC 3501 §6.3.3).
350        fn create(mailbox: MailboxName) -> Option<String>;
351        command = Command::Create { mailbox };
352        consumer = dispatch::CreateConsumer::default();
353    }
354
355    pipeline_method! {
356        /// CREATE with USE special-use attributes (RFC 6154 §3).
357        fn create_special_use(
358            mailbox: MailboxName,
359            special_use: Vec<MailboxAttribute>,
360        ) -> Option<String>;
361        command = Command::CreateSpecialUse { mailbox, special_use };
362        consumer = dispatch::CreateConsumer::default();
363    }
364
365    pipeline_method! {
366        /// DELETE command (RFC 3501 §6.3.4).
367        fn delete(mailbox: MailboxName) -> ();
368        command = Command::Delete { mailbox };
369        consumer = dispatch::TaggedOkConsumer::default();
370    }
371
372    pipeline_method! {
373        /// RENAME command (RFC 3501 §6.3.5).
374        fn rename(mailbox: MailboxName, new_name: MailboxName) -> ();
375        command = Command::Rename { mailbox, new_name };
376        consumer = dispatch::TaggedOkConsumer::default();
377    }
378
379    pipeline_method! {
380        /// SUBSCRIBE command (RFC 3501 §6.3.6).
381        fn subscribe(mailbox: MailboxName) -> ();
382        command = Command::Subscribe { mailbox };
383        consumer = dispatch::TaggedOkConsumer::default();
384    }
385
386    pipeline_method! {
387        /// UNSUBSCRIBE command (RFC 3501 §6.3.7).
388        fn unsubscribe(mailbox: MailboxName) -> ();
389        command = Command::Unsubscribe { mailbox };
390        consumer = dispatch::TaggedOkConsumer::default();
391    }
392
393    pipeline_method! {
394        /// STATUS command (RFC 3501 §6.3.10).
395        fn status(mailbox: MailboxName, items: String) -> StatusResult;
396        command = Command::Status { mailbox, items };
397        consumer = dispatch::StatusConsumer::new();
398    }
399
400    pipeline_method! {
401        /// NAMESPACE command (RFC 2342).
402        fn namespace() -> NamespaceResponse;
403        command = Command::Namespace;
404        consumer = dispatch::NamespaceConsumer::default();
405    }
406
407    // =======================================================================
408    // Selected-state commands (RFC 3501 §6.4)
409    // =======================================================================
410
411    pipeline_method! {
412        /// CHECK command (RFC 3501 §6.4.1).
413        fn check() -> ();
414        command = Command::Check;
415        consumer = dispatch::TaggedOkConsumer::default();
416    }
417
418    pipeline_method! {
419        /// EXPUNGE command (RFC 3501 §6.4.3).
420        ///
421        /// Pipelining EXPUNGE before sequence-number-based commands
422        /// creates ambiguity because EXPUNGE renumbers messages
423        /// (RFC 3501 §5.5). Callers must ensure no such ambiguity
424        /// exists in their pipeline.
425        fn expunge() -> ExpungeResult;
426        command = Command::Expunge;
427        consumer = dispatch::ExpungeConsumer::new();
428    }
429
430    pipeline_method! {
431        /// SEARCH command (RFC 3501 §6.4.4).
432        fn search(criteria: String) -> Result<SearchResult, Error>;
433        command = Command::Search { criteria };
434        consumer = dispatch::SearchConsumer::new();
435    }
436
437    pipeline_method! {
438        /// SEARCH RETURN command (RFC 4731 §3.2).
439        fn search_return(
440            criteria: String,
441            return_opts: Vec<String>,
442        ) -> Result<EsearchResponse, Error>;
443        command = Command::SearchReturn { criteria, return_opts };
444        consumer = dispatch::EsearchConsumer::new();
445    }
446
447    pipeline_method! {
448        /// SEARCH RETURN (SAVE) command (RFC 5182 §2).
449        fn search_save(criteria: String) -> Result<(), Error>;
450        command = Command::SearchSave { criteria };
451        consumer = dispatch::SearchSaveConsumer::new();
452    }
453
454    pipeline_method! {
455        /// FETCH command (RFC 3501 §6.4.5).
456        fn fetch(
457            sequence_set: SequenceSet,
458            items: String,
459            changed_since: Option<u64>,
460        ) -> Vec<FetchResponse>;
461        command = Command::Fetch { sequence_set, items, changed_since };
462        consumer = dispatch::FetchConsumer::new();
463    }
464
465    pipeline_method! {
466        /// STORE command (RFC 3501 §6.4.6).
467        fn store(
468            sequence_set: SequenceSet,
469            operation: StoreOperation,
470            flags: Vec<Flag>,
471            unchanged_since: Option<u64>,
472        ) -> StoreResult;
473        command = Command::Store { sequence_set, operation, flags, unchanged_since };
474        consumer = dispatch::StoreConsumer::new();
475    }
476
477    pipeline_method! {
478        /// COPY command (RFC 3501 §6.4.7).
479        fn copy(sequence_set: SequenceSet, mailbox: MailboxName) -> CopyResult;
480        command = Command::Copy { sequence_set, mailbox };
481        consumer = dispatch::CopyConsumer::new();
482    }
483
484    pipeline_method! {
485        /// MOVE command (RFC 6851 §3).
486        fn move_messages(sequence_set: SequenceSet, mailbox: MailboxName) -> MoveResult;
487        command = Command::Move { sequence_set, mailbox };
488        consumer = dispatch::MoveConsumer::new();
489    }
490
491    // =======================================================================
492    // UID variants (RFC 3501 §6.4.8)
493    // =======================================================================
494
495    pipeline_method! {
496        /// UID SEARCH command (RFC 3501 §6.4.4).
497        fn uid_search(criteria: String) -> Result<SearchResult, Error>;
498        command = Command::UidSearch { criteria };
499        consumer = dispatch::SearchConsumer::new();
500    }
501
502    pipeline_method! {
503        /// UID SEARCH RETURN command (RFC 4731 §3.2).
504        fn uid_search_return(
505            criteria: String,
506            return_opts: Vec<String>,
507        ) -> Result<EsearchResponse, Error>;
508        command = Command::UidSearchReturn { criteria, return_opts };
509        consumer = dispatch::EsearchConsumer::new();
510    }
511
512    pipeline_method! {
513        /// UID SEARCH RETURN (SAVE) command (RFC 5182 §2).
514        fn uid_search_save(criteria: String) -> Result<(), Error>;
515        command = Command::UidSearchSave { criteria };
516        consumer = dispatch::SearchSaveConsumer::new();
517    }
518
519    /// UID FETCH command (RFC 3501 §6.4.5, RFC 7162 §3.2.6).
520    ///
521    /// Uses [`FetchVanishedConsumer`](dispatch::FetchVanishedConsumer)
522    /// regardless of the `vanished` flag so that VANISHED responses
523    /// are always captured when the server sends them.
524    ///
525    /// Parses the `sequence_set` into a [`ParsedUidSet`] for defensive
526    /// filtering of `VANISHED (EARLIER)` UIDs (RFC 7162 Section 3.2.6).
527    #[allow(clippy::type_complexity)]
528    pub fn uid_fetch(
529        mut self,
530        sequence_set: SequenceSet,
531        items: String,
532        changed_since: Option<u64>,
533        vanished: bool,
534    ) -> Pipeline<'conn, ((Vec<FetchResponse>, Vec<UidRange>), T)> {
535        // Parse before moving sequence_set into the command.
536        let parsed_set = ParsedUidSet::new(&sequence_set);
537        self.commands.push(Command::UidFetch {
538            sequence_set,
539            items,
540            changed_since,
541            vanished,
542        });
543        self.pending
544            .push(Box::new(dispatch::FetchVanishedConsumer::new(parsed_set))
545                as Box<dyn ConsumerErased>);
546        Pipeline {
547            conn: self.conn,
548            pending: self.pending,
549            commands: self.commands,
550            _marker: PhantomData,
551        }
552    }
553
554    pipeline_method! {
555        /// UID STORE command (RFC 3501 §6.4.6).
556        fn uid_store(
557            sequence_set: SequenceSet,
558            operation: StoreOperation,
559            flags: Vec<Flag>,
560            unchanged_since: Option<u64>,
561        ) -> StoreResult;
562        command = Command::UidStore { sequence_set, operation, flags, unchanged_since };
563        consumer = dispatch::StoreConsumer::new();
564    }
565
566    pipeline_method! {
567        /// UID COPY command (RFC 3501 §6.4.7).
568        fn uid_copy(sequence_set: SequenceSet, mailbox: MailboxName) -> CopyResult;
569        command = Command::UidCopy { sequence_set, mailbox };
570        consumer = dispatch::CopyConsumer::new();
571    }
572
573    pipeline_method! {
574        /// UID MOVE command (RFC 6851 §3).
575        fn uid_move_messages(sequence_set: SequenceSet, mailbox: MailboxName) -> MoveResult;
576        command = Command::UidMove { sequence_set, mailbox };
577        consumer = dispatch::MoveConsumer::new();
578    }
579
580    pipeline_method! {
581        /// UID EXPUNGE command (RFC 4315).
582        fn uid_expunge(sequence_set: SequenceSet) -> ExpungeResult;
583        command = Command::UidExpunge { sequence_set };
584        consumer = dispatch::ExpungeConsumer::new();
585    }
586
587    // =======================================================================
588    // Extension commands
589    // =======================================================================
590
591    pipeline_method! {
592        /// ID command (RFC 2971 §3.1).
593        fn id(params: Vec<(String, Option<String>)>) -> Vec<(String, Option<String>)>;
594        command = Command::Id(params);
595        consumer = dispatch::IdConsumer::default();
596    }
597
598    /// GETMETADATA command (RFC 5464 §4.2).
599    pub fn get_metadata(
600        mut self,
601        mailbox: MailboxName,
602        entries: Vec<String>,
603        max_size: Option<u64>,
604        depth: Option<String>,
605    ) -> Pipeline<'conn, (MetadataResult, T)> {
606        let consumer = dispatch::MetadataConsumer::new(mailbox.as_str().to_owned());
607        self.commands.push(Command::GetMetadata {
608            mailbox,
609            entries,
610            max_size,
611            depth,
612        });
613        self.pending
614            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
615        Pipeline {
616            conn: self.conn,
617            pending: self.pending,
618            commands: self.commands,
619            _marker: PhantomData,
620        }
621    }
622
623    pipeline_method! {
624        /// SETMETADATA command (RFC 5464 §4.3).
625        fn set_metadata(
626            mailbox: MailboxName,
627            entries: Vec<(String, Option<Vec<u8>>)>,
628        ) -> ();
629        command = Command::SetMetadata { mailbox, entries };
630        consumer = dispatch::TaggedOkConsumer::default();
631    }
632
633    pipeline_method! {
634        /// THREAD command (RFC 5256 §3).
635        fn thread(algorithm: String, charset: String, criteria: String) -> Vec<ThreadNode>;
636        command = Command::Thread { algorithm, charset, criteria };
637        consumer = dispatch::ThreadConsumer::default();
638    }
639
640    pipeline_method! {
641        /// UID THREAD command (RFC 5256 §3).
642        fn uid_thread(algorithm: String, charset: String, criteria: String) -> Vec<ThreadNode>;
643        command = Command::UidThread { algorithm, charset, criteria };
644        consumer = dispatch::ThreadConsumer::default();
645    }
646
647    pipeline_method! {
648        /// SORT command (RFC 5256 §2).
649        fn sort(sort_criteria: String, charset: String, criteria: String) -> SearchResult;
650        command = Command::Sort { sort_criteria, charset, criteria };
651        consumer = dispatch::SortConsumer::default();
652    }
653
654    pipeline_method! {
655        /// UID SORT command (RFC 5256 §2).
656        fn uid_sort(sort_criteria: String, charset: String, criteria: String) -> SearchResult;
657        command = Command::UidSort { sort_criteria, charset, criteria };
658        consumer = dispatch::SortConsumer::default();
659    }
660
661    pipeline_method! {
662        /// NOTIFY SET command (RFC 5465 §3).
663        fn notify_set(params: NotifySetParams) -> Result<bool, Error>;
664        command = Command::NotifySet(params);
665        consumer = dispatch::NotifySetConsumer::default();
666    }
667
668    pipeline_method! {
669        /// NOTIFY NONE command (RFC 5465 §3).
670        fn notify_none() -> ();
671        command = Command::NotifyNone;
672        consumer = dispatch::TaggedOkConsumer::default();
673    }
674
675    /// GETQUOTA command (RFC 2087 §4.2).
676    pub fn get_quota(mut self, root: String) -> Pipeline<'conn, (Vec<QuotaResource>, T)> {
677        let consumer = dispatch::QuotaConsumer::new(root.clone());
678        self.commands.push(Command::GetQuota { root });
679        self.pending
680            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
681        Pipeline {
682            conn: self.conn,
683            pending: self.pending,
684            commands: self.commands,
685            _marker: PhantomData,
686        }
687    }
688
689    /// GETQUOTAROOT command (RFC 2087 §4.3).
690    pub fn get_quota_root(
691        mut self,
692        mailbox: MailboxName,
693    ) -> Pipeline<'conn, (QuotaRootResponse, T)> {
694        let consumer = dispatch::QuotaRootConsumer::new(mailbox.as_str().to_owned());
695        self.commands.push(Command::GetQuotaRoot { mailbox });
696        self.pending
697            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
698        Pipeline {
699            conn: self.conn,
700            pending: self.pending,
701            commands: self.commands,
702            _marker: PhantomData,
703        }
704    }
705
706    /// SETQUOTA command (RFC 2087 §4.1).
707    ///
708    /// RFC 2087 §4.1: the server MUST respond with QUOTA and QUOTAROOT
709    /// untagged responses. The consumer captures the QUOTA response for
710    /// the requested root.
711    pub fn set_quota(
712        mut self,
713        root: String,
714        resources: Vec<(String, u64)>,
715    ) -> Pipeline<'conn, (Vec<QuotaResource>, T)> {
716        let consumer = dispatch::QuotaConsumer::new(root.clone());
717        self.commands.push(Command::SetQuota { root, resources });
718        self.pending
719            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
720        Pipeline {
721            conn: self.conn,
722            pending: self.pending,
723            commands: self.commands,
724            _marker: PhantomData,
725        }
726    }
727
728    pipeline_method! {
729        /// SETACL command (RFC 4314 §3.1).
730        fn set_acl(mailbox: MailboxName, identifier: String, rights: String) -> ();
731        command = Command::SetAcl { mailbox, identifier, rights };
732        consumer = dispatch::TaggedOkConsumer::default();
733    }
734
735    pipeline_method! {
736        /// DELETEACL command (RFC 4314 §3.2).
737        fn delete_acl(mailbox: MailboxName, identifier: String) -> ();
738        command = Command::DeleteAcl { mailbox, identifier };
739        consumer = dispatch::TaggedOkConsumer::default();
740    }
741
742    /// GETACL command (RFC 4314 §3.3).
743    pub fn get_acl(mut self, mailbox: MailboxName) -> Pipeline<'conn, (Vec<AclEntry>, T)> {
744        let consumer = dispatch::AclConsumer::new(mailbox.as_str().to_owned());
745        self.commands.push(Command::GetAcl { mailbox });
746        self.pending
747            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
748        Pipeline {
749            conn: self.conn,
750            pending: self.pending,
751            commands: self.commands,
752            _marker: PhantomData,
753        }
754    }
755
756    /// LISTRIGHTS command (RFC 4314 §3.4).
757    pub fn list_rights(
758        mut self,
759        mailbox: MailboxName,
760        identifier: String,
761    ) -> Pipeline<'conn, (ListRightsResponse, T)> {
762        let consumer =
763            dispatch::ListRightsConsumer::new(mailbox.as_str().to_owned(), identifier.clone());
764        self.commands.push(Command::ListRights {
765            mailbox,
766            identifier,
767        });
768        self.pending
769            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
770        Pipeline {
771            conn: self.conn,
772            pending: self.pending,
773            commands: self.commands,
774            _marker: PhantomData,
775        }
776    }
777
778    /// MYRIGHTS command (RFC 4314 §3.5).
779    pub fn my_rights(mut self, mailbox: MailboxName) -> Pipeline<'conn, (String, T)> {
780        let consumer = dispatch::MyRightsConsumer::new(mailbox.as_str().to_owned());
781        self.commands.push(Command::MyRights { mailbox });
782        self.pending
783            .push(Box::new(consumer) as Box<dyn ConsumerErased>);
784        Pipeline {
785            conn: self.conn,
786            pending: self.pending,
787            commands: self.commands,
788            _marker: PhantomData,
789        }
790    }
791}
792
793// ---------------------------------------------------------------------------
794// Pipeline execution
795// ---------------------------------------------------------------------------
796
797impl<Accumulated: UnfoldTuple> Pipeline<'_, Accumulated> {
798    /// Submit the accumulated commands as a single batch and await the
799    /// typed result tuple.
800    ///
801    /// Each element in the returned tuple is `Result<T, Error>` where
802    /// `T` is the command's consumer output type. Per-command errors
803    /// (e.g., a server `NO` response) are captured per-element, not
804    /// as a pipeline-level error.
805    ///
806    /// # Errors
807    ///
808    /// - [`PipelineError::Disconnected`] — the driver task has exited.
809    /// - [`PipelineError::Driver`] — the driver aborted the entire
810    ///   batch (e.g., encoding failure before any bytes were written).
811    /// - [`PipelineError::TypeMismatch`] — internal bug in the
812    ///   command-to-consumer mapping.
813    pub async fn execute(self) -> Result<Accumulated::Output, PipelineError> {
814        let results = self.execute_raw().await?;
815        Accumulated::unfold(results)
816    }
817}
818
819impl<T> Pipeline<'_, T> {
820    /// Submit the accumulated commands and return the raw per-command
821    /// results without typed unfolding.
822    ///
823    /// Each entry in the returned `Vec` corresponds to a command in
824    /// push order. Callers must downcast `Box<dyn Any + Send>` to the
825    /// expected consumer output type.
826    ///
827    /// Prefer [`execute`](Pipeline::execute) for type-safe access.
828    pub async fn execute_dynamic(
829        self,
830    ) -> Result<Vec<Result<Box<dyn Any + Send>, Error>>, PipelineError> {
831        self.execute_raw().await
832    }
833
834    /// Internal: send the pipeline to the driver and await the results.
835    async fn execute_raw(self) -> Result<Vec<Result<Box<dyn Any + Send>, Error>>, PipelineError> {
836        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
837        let dcmd = super::driver::DriverCommand::Pipeline {
838            commands: self.commands,
839            consumers: self.pending,
840            result_tx,
841        };
842        if self.conn.cmd_tx.send(dcmd).await.is_err() {
843            return Err(PipelineError::Disconnected);
844        }
845        match result_rx.await {
846            Ok(Ok(results)) => Ok(results),
847            Ok(Err(e)) => Err(PipelineError::Driver(e)),
848            Err(_) => Err(PipelineError::Disconnected),
849        }
850    }
851}
852
853// ---------------------------------------------------------------------------
854// ImapConnection::pipeline()
855// ---------------------------------------------------------------------------
856
857impl super::ImapConnection {
858    /// Begin building a pipelined command batch (RFC 3501 §5.5).
859    ///
860    /// Returns a [`Pipeline`] builder. Chain command methods to
861    /// accumulate commands, then call [`Pipeline::execute`] to submit
862    /// them all in one batch.
863    ///
864    /// Only commands that are safe to pipeline have methods on
865    /// `Pipeline`. State-changing commands (SELECT, STARTTLS, IDLE,
866    /// LOGOUT, AUTHENTICATE, etc.) are intentionally absent — the
867    /// sealed `Pipelinable` trait enforces this at compile time.
868    pub fn pipeline(&self) -> Pipeline<'_, ()> {
869        Pipeline::new(self)
870    }
871}
872
873// ---------------------------------------------------------------------------
874// Tests
875// ---------------------------------------------------------------------------
876
877#[cfg(test)]
878#[path = "tests.rs"]
879mod tests;