Skip to main content

async_imap/
client.rs

1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::str;
6
7use async_channel::{self as channel, bounded};
8#[cfg(feature = "runtime-async-std")]
9use async_std::io::{Read, Write, WriteExt};
10use base64::Engine as _;
11use extensions::id::{format_identification, parse_id};
12use extensions::quota::parse_get_quota_root;
13use futures::{io, Stream, StreamExt};
14use imap_proto::{Metadata, RequestId, Response};
15#[cfg(feature = "runtime-tokio")]
16use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt};
17
18use super::authenticator::Authenticator;
19use super::error::{Error, ParseError, Result, ValidateError};
20use super::parse::*;
21use super::types::*;
22use crate::extensions::{self, quota::parse_get_quota};
23use crate::imap_stream::ImapStream;
24
// Wraps `$x` in double quotes, escaping backslashes first and then embedded double quotes so the
// result can be sent as an IMAP quoted string.
macro_rules! quote {
    ($x:expr) => {{
        let escaped = $x.replace(r"\", r"\\").replace("\"", "\\\"");
        format!("\"{}\"", escaped)
    }};
}
30
31/// An authenticated IMAP session providing the usual IMAP commands. This type is what you get from
32/// a succesful login attempt.
33///
34/// Note that the server *is* allowed to unilaterally send things to the client for messages in
35/// a selected mailbox whose status has changed. See the note on [unilateral server responses
36/// in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7). Any such messages are parsed out
37/// and sent on `Session::unsolicited_responses`.
38// Both `Client` and `Session` deref to [`Connection`](struct.Connection.html), the underlying
39// primitives type.
40#[derive(Debug)]
41pub struct Session<T: Read + Write + Unpin + fmt::Debug> {
42    pub(crate) conn: Connection<T>,
43    pub(crate) unsolicited_responses_tx: channel::Sender<UnsolicitedResponse>,
44
45    /// Server responses that are not related to the current command. See also the note on
46    /// [unilateral server responses in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7).
47    pub unsolicited_responses: channel::Receiver<UnsolicitedResponse>,
48}
49
50impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Session<T> {}
51impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Client<T> {}
52impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Connection<T> {}
53
54// Make it possible to access the inner connection and modify its settings, such as read/write
55// timeouts.
56impl<T: Read + Write + Unpin + fmt::Debug> AsMut<T> for Session<T> {
57    fn as_mut(&mut self) -> &mut T {
58        self.conn.stream.as_mut()
59    }
60}
61
62/// An (unauthenticated) handle to talk to an IMAP server.
63///
64/// This is what you get when first
65/// connecting. A succesfull call to [`Client::login`] or [`Client::authenticate`] will return a
66/// [`Session`] instance that provides the usual IMAP methods.
67// Both `Client` and `Session` deref to [`Connection`](struct.Connection.html), the underlying
68// primitives type.
69#[derive(Debug)]
70pub struct Client<T: Read + Write + Unpin + fmt::Debug> {
71    conn: Connection<T>,
72}
73
74/// The underlying primitives type. Both `Client`(unauthenticated) and `Session`(after succesful
75/// login) use a `Connection` internally for the TCP stream primitives.
76#[derive(Debug)]
77pub struct Connection<T: Read + Write + Unpin + fmt::Debug> {
78    pub(crate) stream: ImapStream<T>,
79
80    /// Manages the request ids.
81    pub(crate) request_ids: IdGenerator,
82}
83
84// `Deref` instances are so we can make use of the same underlying primitives in `Client` and
85// `Session`
86impl<T: Read + Write + Unpin + fmt::Debug> Deref for Client<T> {
87    type Target = Connection<T>;
88
89    fn deref(&self) -> &Connection<T> {
90        &self.conn
91    }
92}
93
94impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Client<T> {
95    fn deref_mut(&mut self) -> &mut Connection<T> {
96        &mut self.conn
97    }
98}
99
100impl<T: Read + Write + Unpin + fmt::Debug> Deref for Session<T> {
101    type Target = Connection<T>;
102
103    fn deref(&self) -> &Connection<T> {
104        &self.conn
105    }
106}
107
108impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Session<T> {
109    fn deref_mut(&mut self) -> &mut Connection<T> {
110        &mut self.conn
111    }
112}
113
// Handing the unauthenticated `Client` (a.k.a. `self`) back together with a login error is a
// recurring pattern, so it is abstracted away into this macro: it evaluates to the `Ok` value,
// or returns `Err((error, $self))` from the surrounding function.
//
// Note: 1) `.map_err(|e| (e, self))` or similar would make the closure own `self`, so that is
//          not an option.
//       2) strictly speaking the second parameter is redundant, because the identifier `self`
//          from the surrounding function could be used, but being explicit reads a lot cleaner.
macro_rules! ok_or_unauth_client_err {
    ($r:expr, $self:expr) => {
        match $r {
            Err(err) => return Err((err, $self)),
            Ok(value) => value,
        }
    };
}
129
130impl<T: Read + Write + Unpin + fmt::Debug + Send> Client<T> {
131    /// Creates a new client over the given stream.
132    ///
133    /// This method primarily exists for writing tests that mock the underlying transport, but can
134    /// also be used to support IMAP over custom tunnels.
135    pub fn new(stream: T) -> Client<T> {
136        let stream = ImapStream::new(stream);
137
138        Client {
139            conn: Connection {
140                stream,
141                request_ids: IdGenerator::new(),
142            },
143        }
144    }
145
146    /// Convert this Client into the raw underlying stream.
147    pub fn into_inner(self) -> T {
148        let Self { conn, .. } = self;
149        conn.into_inner()
150    }
151
152    /// Log in to the IMAP server. Upon success a [`Session`](struct.Session.html) instance is
153    /// returned; on error the original `Client` instance is returned in addition to the error.
154    /// This is because `login` takes ownership of `self`, so in order to try again (e.g. after
155    /// prompting the user for credetials), ownership of the original `Client` needs to be
156    /// transferred back to the caller.
157    ///
158    /// ```ignore
159    /// # fn main() -> async_imap::error::Result<()> {
160    /// # async_std::task::block_on(async {
161    ///
162    /// let tls = async_native_tls::TlsConnector::new();
163    /// let client = async_imap::connect(
164    ///     ("imap.example.org", 993),
165    ///     "imap.example.org",
166    ///     tls
167    /// ).await?;
168    ///
169    /// match client.login("user", "pass").await {
170    ///     Ok(s) => {
171    ///         // you are successfully authenticated!
172    ///     },
173    ///     Err((e, orig_client)) => {
174    ///         eprintln!("error logging in: {}", e);
175    ///         // prompt user and try again with orig_client here
176    ///         return Err(e);
177    ///     }
178    /// }
179    ///
180    /// # Ok(())
181    /// # }) }
182    /// ```
183    pub async fn login<U: AsRef<str>, P: AsRef<str>>(
184        mut self,
185        username: U,
186        password: P,
187    ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
188        let u = ok_or_unauth_client_err!(validate_str(username.as_ref()), self);
189        let p = ok_or_unauth_client_err!(validate_str(password.as_ref()), self);
190        ok_or_unauth_client_err!(
191            self.run_command_and_check_ok(&format!("LOGIN {} {}", u, p), None)
192                .await,
193            self
194        );
195
196        Ok(Session::new(self.conn))
197    }
198
199    /// Authenticate with the server using the given custom `authenticator` to handle the server's
200    /// challenge.
201    ///
202    /// ```ignore
203    /// struct OAuth2 {
204    ///     user: String,
205    ///     access_token: String,
206    /// }
207    ///
208    /// impl async_imap::Authenticator for &OAuth2 {
209    ///     type Response = String;
210    ///     fn process(&mut self, _: &[u8]) -> Self::Response {
211    ///         format!(
212    ///             "user={}\x01auth=Bearer {}\x01\x01",
213    ///             self.user, self.access_token
214    ///         )
215    ///     }
216    /// }
217    ///
218    /// # fn main() -> async_imap::error::Result<()> {
219    /// # async_std::task::block_on(async {
220    ///
221    ///     let auth = OAuth2 {
222    ///         user: String::from("me@example.com"),
223    ///         access_token: String::from("<access_token>"),
224    ///     };
225    ///
226    ///     let domain = "imap.example.com";
227    ///     let tls = async_native_tls::TlsConnector::new();
228    ///     let client = async_imap::connect((domain, 993), domain, tls).await?;
229    ///     match client.authenticate("XOAUTH2", &auth).await {
230    ///         Ok(session) => {
231    ///             // you are successfully authenticated!
232    ///         },
233    ///         Err((err, orig_client)) => {
234    ///             eprintln!("error authenticating: {}", err);
235    ///             // prompt user and try again with orig_client here
236    ///             return Err(err);
237    ///         }
238    ///     };
239    /// # Ok(())
240    /// # }) }
241    /// ```
242    pub async fn authenticate<A: Authenticator, S: AsRef<str>>(
243        mut self,
244        auth_type: S,
245        authenticator: A,
246    ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
247        let id = ok_or_unauth_client_err!(
248            self.run_command(&format!("AUTHENTICATE {}", auth_type.as_ref()))
249                .await,
250            self
251        );
252        let session = self.do_auth_handshake(id, authenticator).await?;
253        Ok(session)
254    }
255
256    /// This func does the handshake process once the authenticate command is made.
257    async fn do_auth_handshake<A: Authenticator>(
258        mut self,
259        id: RequestId,
260        mut authenticator: A,
261    ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
262        // explicit match blocks neccessary to convert error to tuple and not bind self too
263        // early (see also comment on `login`)
264        loop {
265            if let Some(res) = self.read_response().await {
266                let res = ok_or_unauth_client_err!(res.map_err(Into::into), self);
267                match res.parsed() {
268                    Response::Continue { information, .. } => {
269                        let challenge = if let Some(text) = information {
270                            ok_or_unauth_client_err!(
271                                base64::engine::general_purpose::STANDARD
272                                    .decode(text.as_ref())
273                                    .map_err(|e| Error::Parse(ParseError::Authentication(
274                                        (*text).to_string(),
275                                        Some(e)
276                                    ))),
277                                self
278                            )
279                        } else {
280                            Vec::new()
281                        };
282                        let raw_response = &mut authenticator.process(&challenge);
283                        let auth_response =
284                            base64::engine::general_purpose::STANDARD.encode(raw_response);
285
286                        ok_or_unauth_client_err!(
287                            self.conn.run_command_untagged(&auth_response).await,
288                            self
289                        );
290                    }
291                    _ => {
292                        ok_or_unauth_client_err!(
293                            self.check_done_ok_from(&id, None, res).await,
294                            self
295                        );
296                        return Ok(Session::new(self.conn));
297                    }
298                }
299            } else {
300                return Err((Error::ConnectionLost, self));
301            }
302        }
303    }
304}
305
306impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
307    unsafe_pinned!(conn: Connection<T>);
308
309    pub(crate) fn get_stream(self: Pin<&mut Self>) -> Pin<&mut ImapStream<T>> {
310        self.conn().stream()
311    }
312
313    // not public, just to avoid duplicating the channel creation code
314    fn new(conn: Connection<T>) -> Self {
315        let (tx, rx) = bounded(100);
316        Session {
317            conn,
318            unsolicited_responses: rx,
319            unsolicited_responses_tx: tx,
320        }
321    }
322
323    /// Selects a mailbox.
324    ///
325    /// The `SELECT` command selects a mailbox so that messages in the mailbox can be accessed.
326    /// Note that earlier versions of this protocol only required the FLAGS, EXISTS, and RECENT
327    /// untagged data; consequently, client implementations SHOULD implement default behavior for
328    /// missing data as discussed with the individual item.
329    ///
330    /// Only one mailbox can be selected at a time in a connection; simultaneous access to multiple
331    /// mailboxes requires multiple connections.  The `SELECT` command automatically deselects any
332    /// currently selected mailbox before attempting the new selection. Consequently, if a mailbox
333    /// is selected and a `SELECT` command that fails is attempted, no mailbox is selected.
334    ///
335    /// Note that the server *is* allowed to unilaterally send things to the client for messages in
336    /// a selected mailbox whose status has changed. See the note on [unilateral server responses
337    /// in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7). This means that if run commands,
338    /// you *may* see additional untagged `RECENT`, `EXISTS`, `FETCH`, and `EXPUNGE` responses.
339    /// You can get them from the `unsolicited_responses` channel of the [`Session`](struct.Session.html).
340    pub async fn select<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
341        // TODO: also note READ/WRITE vs READ-only mode!
342        let id = self
343            .run_command(&format!("SELECT {}", validate_str(mailbox_name.as_ref())?))
344            .await?;
345        let mbox = parse_mailbox(
346            &mut self.conn.stream,
347            self.unsolicited_responses_tx.clone(),
348            id,
349        )
350        .await?;
351
352        Ok(mbox)
353    }
354
355    /// Selects a mailbox with `(CONDSTORE)` parameter as defined in
356    /// [RFC 7162](https://www.rfc-editor.org/rfc/rfc7162.html#section-3.1.8).
357    pub async fn select_condstore<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
358        let id = self
359            .run_command(&format!(
360                "SELECT {} (CONDSTORE)",
361                validate_str(mailbox_name.as_ref())?
362            ))
363            .await?;
364        let mbox = parse_mailbox(
365            &mut self.conn.stream,
366            self.unsolicited_responses_tx.clone(),
367            id,
368        )
369        .await?;
370
371        Ok(mbox)
372    }
373
374    /// The `EXAMINE` command is identical to [`Session::select`] and returns the same output;
375    /// however, the selected mailbox is identified as read-only. No changes to the permanent state
376    /// of the mailbox, including per-user state, will happen in a mailbox opened with `examine`;
377    /// in particular, messagess cannot lose [`Flag::Recent`] in an examined mailbox.
378    pub async fn examine<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
379        let id = self
380            .run_command(&format!("EXAMINE {}", validate_str(mailbox_name.as_ref())?))
381            .await?;
382        let mbox = parse_mailbox(
383            &mut self.conn.stream,
384            self.unsolicited_responses_tx.clone(),
385            id,
386        )
387        .await?;
388
389        Ok(mbox)
390    }
391
392    /// Fetch retreives data associated with a set of messages in the mailbox.
393    ///
394    /// Note that the server *is* allowed to unilaterally include `FETCH` responses for other
395    /// messages in the selected mailbox whose status has changed. See the note on [unilateral
396    /// server responses in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7).
397    ///
398    /// `query` is a list of "data items" (space-separated in parentheses if `>1`). There are three
399    /// "macro items" which specify commonly-used sets of data items, and can be used instead of
400    /// data items.  A macro must be used by itself, and not in conjunction with other macros or
401    /// data items. They are:
402    ///
403    ///  - `ALL`: equivalent to: `(FLAGS INTERNALDATE RFC822.SIZE ENVELOPE)`
404    ///  - `FAST`: equivalent to: `(FLAGS INTERNALDATE RFC822.SIZE)`
405    ///
406    /// The currently defined data items that can be fetched are listen [in the
407    /// RFC](https://tools.ietf.org/html/rfc3501#section-6.4.5), but here are some common ones:
408    ///
409    ///  - `FLAGS`: The flags that are set for this message.
410    ///  - `INTERNALDATE`: The internal date of the message.
411    ///  - `BODY[<section>]`:
412    ///
413    ///    The text of a particular body section.  The section specification is a set of zero or
414    ///    more part specifiers delimited by periods.  A part specifier is either a part number
415    ///    (see RFC) or one of the following: `HEADER`, `HEADER.FIELDS`, `HEADER.FIELDS.NOT`,
416    ///    `MIME`, and `TEXT`.  An empty section specification (i.e., `BODY[]`) refers to the
417    ///    entire message, including the header.
418    ///
419    ///    The `HEADER`, `HEADER.FIELDS`, and `HEADER.FIELDS.NOT` part specifiers refer to the
420    ///    [RFC-2822](https://tools.ietf.org/html/rfc2822) header of the message or of an
421    ///    encapsulated [MIME-IMT](https://tools.ietf.org/html/rfc2046)
422    ///    MESSAGE/[RFC822](https://tools.ietf.org/html/rfc822) message. `HEADER.FIELDS` and
423    ///    `HEADER.FIELDS.NOT` are followed by a list of field-name (as defined in
424    ///    [RFC-2822](https://tools.ietf.org/html/rfc2822)) names, and return a subset of the
425    ///    header.  The subset returned by `HEADER.FIELDS` contains only those header fields with
426    ///    a field-name that matches one of the names in the list; similarly, the subset returned
427    ///    by `HEADER.FIELDS.NOT` contains only the header fields with a non-matching field-name.
428    ///    The field-matching is case-insensitive but otherwise exact.  Subsetting does not
429    ///    exclude the [RFC-2822](https://tools.ietf.org/html/rfc2822) delimiting blank line
430    ///    between the header and the body; the blank line is included in all header fetches,
431    ///    except in the case of a message which has no body and no blank line.
432    ///
433    ///    The `MIME` part specifier refers to the [MIME-IMB](https://tools.ietf.org/html/rfc2045)
434    ///    header for this part.
435    ///
436    ///    The `TEXT` part specifier refers to the text body of the message,
437    ///    omitting the [RFC-2822](https://tools.ietf.org/html/rfc2822) header.
438    ///
439    ///    [`Flag::Seen`] is implicitly set when `BODY` is fetched; if this causes the flags to
440    ///    change, they will generally be included as part of the `FETCH` responses.
441    ///  - `BODY.PEEK[<section>]`: An alternate form of `BODY[<section>]` that does not implicitly
442    ///    set [`Flag::Seen`].
443    ///  - `ENVELOPE`: The envelope structure of the message.  This is computed by the server by
444    ///    parsing the [RFC-2822](https://tools.ietf.org/html/rfc2822) header into the component
445    ///    parts, defaulting various fields as necessary.
446    ///  - `RFC822`: Functionally equivalent to `BODY[]`.
447    ///  - `RFC822.HEADER`: Functionally equivalent to `BODY.PEEK[HEADER]`.
448    ///  - `RFC822.SIZE`: The [RFC-2822](https://tools.ietf.org/html/rfc2822) size of the message.
449    ///  - `UID`: The unique identifier for the message.
450    pub async fn fetch<S1, S2>(
451        &mut self,
452        sequence_set: S1,
453        query: S2,
454    ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
455    where
456        S1: AsRef<str>,
457        S2: AsRef<str>,
458    {
459        let id = self
460            .run_command(&format!(
461                "FETCH {} {}",
462                sequence_set.as_ref(),
463                query.as_ref()
464            ))
465            .await?;
466        let res = parse_fetches(
467            &mut self.conn.stream,
468            self.unsolicited_responses_tx.clone(),
469            id,
470        );
471
472        Ok(res)
473    }
474
475    /// Equivalent to [`Session::fetch`], except that all identifiers in `uid_set` are
476    /// [`Uid`]s. See also the [`UID` command](https://tools.ietf.org/html/rfc3501#section-6.4.8).
477    pub async fn uid_fetch<S1, S2>(
478        &mut self,
479        uid_set: S1,
480        query: S2,
481    ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send + Unpin>
482    where
483        S1: AsRef<str>,
484        S2: AsRef<str>,
485    {
486        let id = self
487            .run_command(&format!(
488                "UID FETCH {} {}",
489                uid_set.as_ref(),
490                query.as_ref()
491            ))
492            .await?;
493        let res = parse_fetches(
494            &mut self.conn.stream,
495            self.unsolicited_responses_tx.clone(),
496            id,
497        );
498        Ok(res)
499    }
500
501    /// Noop always succeeds, and it does nothing.
502    pub async fn noop(&mut self) -> Result<()> {
503        let id = self.run_command("NOOP").await?;
504        parse_noop(
505            &mut self.conn.stream,
506            self.unsolicited_responses_tx.clone(),
507            id,
508        )
509        .await?;
510        Ok(())
511    }
512
513    /// Logout informs the server that the client is done with the connection.
514    pub async fn logout(&mut self) -> Result<()> {
515        self.run_command_and_check_ok("LOGOUT").await?;
516        Ok(())
517    }
518
519    /// The [`CREATE` command](https://tools.ietf.org/html/rfc3501#section-6.3.3) creates a mailbox
520    /// with the given name.  `Ok` is returned only if a new mailbox with that name has been
521    /// created.  It is an error to attempt to create `INBOX` or a mailbox with a name that
522    /// refers to an extant mailbox.  Any error in creation will return [`Error::No`].
523    ///
524    /// If the mailbox name is suffixed with the server's hierarchy separator character (as
525    /// returned from the server by [`Session::list`]), this is a declaration that the client
526    /// intends to create mailbox names under this name in the hierarchy.  Servers that do not
527    /// require this declaration will ignore the declaration.  In any case, the name created is
528    /// without the trailing hierarchy delimiter.
529    ///
530    /// If the server's hierarchy separator character appears elsewhere in the name, the server
531    /// will generally create any superior hierarchical names that are needed for the `CREATE`
532    /// command to be successfully completed.  In other words, an attempt to create `foo/bar/zap`
533    /// on a server in which `/` is the hierarchy separator character will usually create `foo/`
534    /// and `foo/bar/` if they do not already exist.
535    ///
536    /// If a new mailbox is created with the same name as a mailbox which was deleted, its unique
537    /// identifiers will be greater than any unique identifiers used in the previous incarnation of
538    /// the mailbox UNLESS the new incarnation has a different unique identifier validity value.
539    /// See the description of the [`UID`
540    /// command](https://tools.ietf.org/html/rfc3501#section-6.4.8) for more detail.
541    pub async fn create<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
542        self.run_command_and_check_ok(&format!("CREATE {}", validate_str(mailbox_name.as_ref())?))
543            .await?;
544
545        Ok(())
546    }
547
548    /// The [`DELETE` command](https://tools.ietf.org/html/rfc3501#section-6.3.4) permanently
549    /// removes the mailbox with the given name.  `Ok` is returned only if the mailbox has been
550    /// deleted.  It is an error to attempt to delete `INBOX` or a mailbox name that does not
551    /// exist.
552    ///
553    /// The `DELETE` command will not remove inferior hierarchical names. For example, if a mailbox
554    /// `foo` has an inferior `foo.bar` (assuming `.` is the hierarchy delimiter character),
555    /// removing `foo` will not remove `foo.bar`.  It is an error to attempt to delete a name that
556    /// has inferior hierarchical names and also has [`NameAttribute::NoSelect`].
557    ///
558    /// It is permitted to delete a name that has inferior hierarchical names and does not have
559    /// [`NameAttribute::NoSelect`].  In this case, all messages in that mailbox are removed, and
560    /// the name will acquire [`NameAttribute::NoSelect`].
561    ///
562    /// The value of the highest-used unique identifier of the deleted mailbox will be preserved so
563    /// that a new mailbox created with the same name will not reuse the identifiers of the former
564    /// incarnation, UNLESS the new incarnation has a different unique identifier validity value.
565    /// See the description of the [`UID`
566    /// command](https://tools.ietf.org/html/rfc3501#section-6.4.8) for more detail.
567    pub async fn delete<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
568        self.run_command_and_check_ok(&format!("DELETE {}", validate_str(mailbox_name.as_ref())?))
569            .await?;
570
571        Ok(())
572    }
573
574    /// The [`RENAME` command](https://tools.ietf.org/html/rfc3501#section-6.3.5) changes the name
575    /// of a mailbox.  `Ok` is returned only if the mailbox has been renamed.  It is an error to
576    /// attempt to rename from a mailbox name that does not exist or to a mailbox name that already
577    /// exists.  Any error in renaming will return [`Error::No`].
578    ///
579    /// If the name has inferior hierarchical names, then the inferior hierarchical names will also
580    /// be renamed.  For example, a rename of `foo` to `zap` will rename `foo/bar` (assuming `/` is
581    /// the hierarchy delimiter character) to `zap/bar`.
582    ///
583    /// If the server's hierarchy separator character appears in the name, the server will
584    /// generally create any superior hierarchical names that are needed for the `RENAME` command
585    /// to complete successfully.  In other words, an attempt to rename `foo/bar/zap` to
586    /// `baz/rag/zowie` on a server in which `/` is the hierarchy separator character will
587    /// generally create `baz/` and `baz/rag/` if they do not already exist.
588    ///
589    /// The value of the highest-used unique identifier of the old mailbox name will be preserved
590    /// so that a new mailbox created with the same name will not reuse the identifiers of the
591    /// former incarnation, UNLESS the new incarnation has a different unique identifier validity
592    /// value. See the description of the [`UID`
593    /// command](https://tools.ietf.org/html/rfc3501#section-6.4.8) for more detail.
594    ///
595    /// Renaming `INBOX` is permitted, and has special behavior.  It moves all messages in `INBOX`
596    /// to a new mailbox with the given name, leaving `INBOX` empty.  If the server implementation
597    /// supports inferior hierarchical names of `INBOX`, these are unaffected by a rename of
598    /// `INBOX`.
599    pub async fn rename<S1: AsRef<str>, S2: AsRef<str>>(&mut self, from: S1, to: S2) -> Result<()> {
600        self.run_command_and_check_ok(&format!(
601            "RENAME {} {}",
602            quote!(from.as_ref()),
603            quote!(to.as_ref())
604        ))
605        .await?;
606
607        Ok(())
608    }
609
610    /// The [`SUBSCRIBE` command](https://tools.ietf.org/html/rfc3501#section-6.3.6) adds the
611    /// specified mailbox name to the server's set of "active" or "subscribed" mailboxes as
612    /// returned by [`Session::lsub`].  This command returns `Ok` only if the subscription is
613    /// successful.
614    ///
615    /// The server may validate the mailbox argument to `SUBSCRIBE` to verify that it exists.
616    /// However, it will not unilaterally remove an existing mailbox name from the subscription
617    /// list even if a mailbox by that name no longer exists.
618    pub async fn subscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
619        self.run_command_and_check_ok(&format!("SUBSCRIBE {}", quote!(mailbox.as_ref())))
620            .await?;
621        Ok(())
622    }
623
624    /// The [`UNSUBSCRIBE` command](https://tools.ietf.org/html/rfc3501#section-6.3.7) removes the
625    /// specified mailbox name from the server's set of "active" or "subscribed" mailboxes as
626    /// returned by [`Session::lsub`].  This command returns `Ok` only if the unsubscription is
627    /// successful.
628    pub async fn unsubscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
629        self.run_command_and_check_ok(&format!("UNSUBSCRIBE {}", quote!(mailbox.as_ref())))
630            .await?;
631        Ok(())
632    }
633
634    /// The [`CAPABILITY` command](https://tools.ietf.org/html/rfc3501#section-6.1.1) requests a
635    /// listing of capabilities that the server supports.  The server will include "IMAP4rev1" as
636    /// one of the listed capabilities. See [`Capabilities`] for further details.
637    pub async fn capabilities(&mut self) -> Result<Capabilities> {
638        let id = self.run_command("CAPABILITY").await?;
639        let c = parse_capabilities(
640            &mut self.conn.stream,
641            self.unsolicited_responses_tx.clone(),
642            id,
643        )
644        .await?;
645        Ok(c)
646    }
647
648    /// The [`EXPUNGE` command](https://tools.ietf.org/html/rfc3501#section-6.4.3) permanently
649    /// removes all messages that have [`Flag::Deleted`] set from the currently selected mailbox.
650    /// The message sequence number of each message that is removed is returned.
651    pub async fn expunge(&mut self) -> Result<impl Stream<Item = Result<Seq>> + '_ + Send> {
652        let id = self.run_command("EXPUNGE").await?;
653        let res = parse_expunge(
654            &mut self.conn.stream,
655            self.unsolicited_responses_tx.clone(),
656            id,
657        );
658        Ok(res)
659    }
660
661    /// The [`UID EXPUNGE` command](https://tools.ietf.org/html/rfc4315#section-2.1) permanently
662    /// removes all messages that both have [`Flag::Deleted`] set and have a [`Uid`] that is
663    /// included in the specified sequence set from the currently selected mailbox.  If a message
664    /// either does not have [`Flag::Deleted`] set or has a [`Uid`] that is not included in the
665    /// specified sequence set, it is not affected.
666    ///
667    /// This command is particularly useful for disconnected use clients. By using `uid_expunge`
668    /// instead of [`Self::expunge`] when resynchronizing with the server, the client can ensure that it
669    /// does not inadvertantly remove any messages that have been marked as [`Flag::Deleted`] by
670    /// other clients between the time that the client was last connected and the time the client
671    /// resynchronizes.
672    ///
673    /// This command requires that the server supports [RFC
674    /// 4315](https://tools.ietf.org/html/rfc4315) as indicated by the `UIDPLUS` capability (see
675    /// [`Session::capabilities`]). If the server does not support the `UIDPLUS` capability, the
676    /// client should fall back to using [`Session::store`] to temporarily remove [`Flag::Deleted`]
677    /// from messages it does not want to remove, then invoking [`Session::expunge`].  Finally, the
678    /// client should use [`Session::store`] to restore [`Flag::Deleted`] on the messages in which
679    /// it was temporarily removed.
680    ///
681    /// Alternatively, the client may fall back to using just [`Session::expunge`], risking the
682    /// unintended removal of some messages.
683    pub async fn uid_expunge<S: AsRef<str>>(
684        &mut self,
685        uid_set: S,
686    ) -> Result<impl Stream<Item = Result<Uid>> + '_ + Send> {
687        let id = self
688            .run_command(&format!("UID EXPUNGE {}", uid_set.as_ref()))
689            .await?;
690        let res = parse_expunge(
691            &mut self.conn.stream,
692            self.unsolicited_responses_tx.clone(),
693            id,
694        );
695        Ok(res)
696    }
697
698    /// The [`CHECK` command](https://tools.ietf.org/html/rfc3501#section-6.4.1) requests a
699    /// checkpoint of the currently selected mailbox.  A checkpoint refers to any
700    /// implementation-dependent housekeeping associated with the mailbox (e.g., resolving the
701    /// server's in-memory state of the mailbox with the state on its disk) that is not normally
702    /// executed as part of each command.  A checkpoint MAY take a non-instantaneous amount of real
703    /// time to complete.  If a server implementation has no such housekeeping considerations,
704    /// [`Session::check`] is equivalent to [`Session::noop`].
705    ///
706    /// There is no guarantee that an `EXISTS` untagged response will happen as a result of
707    /// `CHECK`.  [`Session::noop`] SHOULD be used for new message polling.
708    pub async fn check(&mut self) -> Result<()> {
709        self.run_command_and_check_ok("CHECK").await?;
710        Ok(())
711    }
712
713    /// The [`CLOSE` command](https://tools.ietf.org/html/rfc3501#section-6.4.2) permanently
714    /// removes all messages that have [`Flag::Deleted`] set from the currently selected mailbox,
715    /// and returns to the authenticated state from the selected state.  No `EXPUNGE` responses are
716    /// sent.
717    ///
718    /// No messages are removed, and no error is given, if the mailbox is selected by
719    /// [`Session::examine`] or is otherwise selected read-only.
720    ///
721    /// Even if a mailbox is selected, [`Session::select`], [`Session::examine`], or
722    /// [`Session::logout`] command MAY be issued without previously invoking [`Session::close`].
723    /// [`Session::select`], [`Session::examine`], and [`Session::logout`] implicitly close the
724    /// currently selected mailbox without doing an expunge.  However, when many messages are
725    /// deleted, a `CLOSE-LOGOUT` or `CLOSE-SELECT` sequence is considerably faster than an
726    /// `EXPUNGE-LOGOUT` or `EXPUNGE-SELECT` because no `EXPUNGE` responses (which the client would
727    /// probably ignore) are sent.
728    pub async fn close(&mut self) -> Result<()> {
729        self.run_command_and_check_ok("CLOSE").await?;
730        Ok(())
731    }
732
733    /// The [`STORE` command](https://tools.ietf.org/html/rfc3501#section-6.4.6) alters data
734    /// associated with a message in the mailbox.  Normally, `STORE` will return the updated value
735    /// of the data with an untagged FETCH response.  A suffix of `.SILENT` in `query` prevents the
736    /// untagged `FETCH`, and the server assumes that the client has determined the updated value
737    /// itself or does not care about the updated value.
738    ///
739    /// The currently defined data items that can be stored are:
740    ///
741    ///  - `FLAGS <flag list>`:
742    ///
743    ///    Replace the flags for the message (other than [`Flag::Recent`]) with the argument.  The
744    ///    new value of the flags is returned as if a `FETCH` of those flags was done.
745    ///
746    ///  - `FLAGS.SILENT <flag list>`: Equivalent to `FLAGS`, but without returning a new value.
747    ///
748    ///  - `+FLAGS <flag list>`
749    ///
750    ///    Add the argument to the flags for the message.  The new value of the flags is returned
751    ///    as if a `FETCH` of those flags was done.
752    ///  - `+FLAGS.SILENT <flag list>`: Equivalent to `+FLAGS`, but without returning a new value.
753    ///
754    ///  - `-FLAGS <flag list>`
755    ///
756    ///    Remove the argument from the flags for the message.  The new value of the flags is
757    ///    returned as if a `FETCH` of those flags was done.
758    ///
759    ///  - `-FLAGS.SILENT <flag list>`: Equivalent to `-FLAGS`, but without returning a new value.
760    ///
761    /// In all cases, `<flag list>` is a space-separated list enclosed in parentheses.
762    ///
763    /// # Examples
764    ///
765    /// Delete a message:
766    ///
767    /// ```no_run
768    /// use async_imap::{types::Seq, Session, error::Result};
769    /// #[cfg(feature = "runtime-async-std")]
770    /// use async_std::net::TcpStream;
771    /// #[cfg(feature = "runtime-tokio")]
772    /// use tokio::net::TcpStream;
773    /// use futures::TryStreamExt;
774    ///
775    /// async fn delete(seq: Seq, s: &mut Session<TcpStream>) -> Result<()> {
776    ///     let updates_stream = s.store(format!("{}", seq), "+FLAGS (\\Deleted)").await?;
777    ///     let _updates: Vec<_> = updates_stream.try_collect().await?;
778    ///     s.expunge().await?;
779    ///     Ok(())
780    /// }
781    /// ```
782    pub async fn store<S1, S2>(
783        &mut self,
784        sequence_set: S1,
785        query: S2,
786    ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
787    where
788        S1: AsRef<str>,
789        S2: AsRef<str>,
790    {
791        let id = self
792            .run_command(&format!(
793                "STORE {} {}",
794                sequence_set.as_ref(),
795                query.as_ref()
796            ))
797            .await?;
798        let res = parse_fetches(
799            &mut self.conn.stream,
800            self.unsolicited_responses_tx.clone(),
801            id,
802        );
803        Ok(res)
804    }
805
806    /// Equivalent to [`Session::store`], except that all identifiers in `sequence_set` are
807    /// [`Uid`]s. See also the [`UID` command](https://tools.ietf.org/html/rfc3501#section-6.4.8).
808    pub async fn uid_store<S1, S2>(
809        &mut self,
810        uid_set: S1,
811        query: S2,
812    ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
813    where
814        S1: AsRef<str>,
815        S2: AsRef<str>,
816    {
817        let id = self
818            .run_command(&format!(
819                "UID STORE {} {}",
820                uid_set.as_ref(),
821                query.as_ref()
822            ))
823            .await?;
824        let res = parse_fetches(
825            &mut self.conn.stream,
826            self.unsolicited_responses_tx.clone(),
827            id,
828        );
829        Ok(res)
830    }
831
832    /// The [`COPY` command](https://tools.ietf.org/html/rfc3501#section-6.4.7) copies the
833    /// specified message(s) to the end of the specified destination mailbox.  The flags and
834    /// internal date of the message(s) will generally be preserved, and [`Flag::Recent`] will
835    /// generally be set, in the copy.
836    ///
837    /// If the `COPY` command is unsuccessful for any reason, the server restores the destination
838    /// mailbox to its state before the `COPY` attempt.
839    pub async fn copy<S1: AsRef<str>, S2: AsRef<str>>(
840        &mut self,
841        sequence_set: S1,
842        mailbox_name: S2,
843    ) -> Result<()> {
844        self.run_command_and_check_ok(&format!(
845            "COPY {} {}",
846            sequence_set.as_ref(),
847            mailbox_name.as_ref()
848        ))
849        .await?;
850
851        Ok(())
852    }
853
854    /// Equivalent to [`Session::copy`], except that all identifiers in `sequence_set` are
855    /// [`Uid`]s. See also the [`UID` command](https://tools.ietf.org/html/rfc3501#section-6.4.8).
856    pub async fn uid_copy<S1: AsRef<str>, S2: AsRef<str>>(
857        &mut self,
858        uid_set: S1,
859        mailbox_name: S2,
860    ) -> Result<()> {
861        self.run_command_and_check_ok(&format!(
862            "UID COPY {} {}",
863            uid_set.as_ref(),
864            mailbox_name.as_ref()
865        ))
866        .await?;
867
868        Ok(())
869    }
870
871    /// The [`MOVE` command](https://tools.ietf.org/html/rfc6851#section-3.1) takes two
872    /// arguments: a sequence set and a named mailbox. Each message included in the set is moved,
873    /// rather than copied, from the selected (source) mailbox to the named (target) mailbox.
874    ///
875    /// This means that a new message is created in the target mailbox with a
876    /// new [`Uid`], the original message is removed from the source mailbox, and
877    /// it appears to the client as a single action.  This has the same
878    /// effect for each message as this sequence:
879    ///
880    ///   1. COPY
881    ///   2. STORE +FLAGS.SILENT \DELETED
882    ///   3. EXPUNGE
883    ///
884    /// This command requires that the server supports [RFC
885    /// 6851](https://tools.ietf.org/html/rfc6851) as indicated by the `MOVE` capability (see
886    /// [`Session::capabilities`]).
887    ///
888    /// Although the effect of the `MOVE` is the same as the preceding steps, the semantics are not
889    /// identical: The intermediate states produced by those steps do not occur, and the response
890    /// codes are different.  In particular, though the `COPY` and `EXPUNGE` response codes will be
891    /// returned, response codes for a `store` will not be generated and [`Flag::Deleted`] will not
892    /// be set for any message.
893    ///
894    /// Because a `MOVE` applies to a set of messages, it might fail partway through the set.
895    /// Regardless of whether the command is successful in moving the entire set, each individual
896    /// message will either be moved or unaffected.  The server will leave each message in a state
897    /// where it is in at least one of the source or target mailboxes (no message can be lost or
898    /// orphaned).  The server will generally not leave any message in both mailboxes (it would be
899    /// bad for a partial failure to result in a bunch of duplicate messages).  This is true even
900    /// if the server returns with [`Error::No`].
901    pub async fn mv<S1: AsRef<str>, S2: AsRef<str>>(
902        &mut self,
903        sequence_set: S1,
904        mailbox_name: S2,
905    ) -> Result<()> {
906        self.run_command_and_check_ok(&format!(
907            "MOVE {} {}",
908            sequence_set.as_ref(),
909            validate_str(mailbox_name.as_ref())?
910        ))
911        .await?;
912
913        Ok(())
914    }
915
916    /// Equivalent to [`Session::mv`], except that all identifiers in `sequence_set` are
917    /// [`Uid`]s. See also the [`UID` command](https://tools.ietf.org/html/rfc3501#section-6.4.8)
918    /// and the [semantics of `MOVE` and `UID
919    /// MOVE`](https://tools.ietf.org/html/rfc6851#section-3.3).
920    pub async fn uid_mv<S1: AsRef<str>, S2: AsRef<str>>(
921        &mut self,
922        uid_set: S1,
923        mailbox_name: S2,
924    ) -> Result<()> {
925        self.run_command_and_check_ok(&format!(
926            "UID MOVE {} {}",
927            uid_set.as_ref(),
928            validate_str(mailbox_name.as_ref())?
929        ))
930        .await?;
931
932        Ok(())
933    }
934
935    /// The [`LIST` command](https://tools.ietf.org/html/rfc3501#section-6.3.8) returns a subset of
936    /// names from the complete set of all names available to the client.  It returns the name
937    /// attributes, hierarchy delimiter, and name of each such name; see [`Name`] for more detail.
938    ///
939    /// If `reference_name` is `None` (or `""`), the currently selected mailbox is used.
940    /// The returned mailbox names must match the supplied `mailbox_pattern`.  A non-empty
941    /// reference name argument is the name of a mailbox or a level of mailbox hierarchy, and
942    /// indicates the context in which the mailbox name is interpreted.
943    ///
944    /// If `mailbox_pattern` is `None` (or `""`), it is a special request to return the hierarchy
945    /// delimiter and the root name of the name given in the reference.  The value returned as the
946    /// root MAY be the empty string if the reference is non-rooted or is an empty string.  In all
947    /// cases, a hierarchy delimiter (or `NIL` if there is no hierarchy) is returned.  This permits
948    /// a client to get the hierarchy delimiter (or find out that the mailbox names are flat) even
949    /// when no mailboxes by that name currently exist.
950    ///
951    /// The reference and mailbox name arguments are interpreted into a canonical form that
952    /// represents an unambiguous left-to-right hierarchy.  The returned mailbox names will be in
953    /// the interpreted form.
954    ///
955    /// The character `*` is a wildcard, and matches zero or more characters at this position.  The
956    /// character `%` is similar to `*`, but it does not match a hierarchy delimiter.  If the `%`
957    /// wildcard is the last character of a mailbox name argument, matching levels of hierarchy are
958    /// also returned.  If these levels of hierarchy are not also selectable mailboxes, they are
959    /// returned with [`NameAttribute::NoSelect`].
960    ///
961    /// The special name `INBOX` is included if `INBOX` is supported by this server for this user
962    /// and if the uppercase string `INBOX` matches the interpreted reference and mailbox name
963    /// arguments with wildcards.  The criteria for omitting `INBOX` is whether `SELECT INBOX` will
964    /// return failure; it is not relevant whether the user's real `INBOX` resides on this or some
965    /// other server.
966    pub async fn list(
967        &mut self,
968        reference_name: Option<&str>,
969        mailbox_pattern: Option<&str>,
970    ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
971        let id = self
972            .run_command(&format!(
973                "LIST {} {}",
974                quote!(reference_name.unwrap_or("")),
975                mailbox_pattern.unwrap_or("\"\"")
976            ))
977            .await?;
978
979        Ok(parse_names(
980            &mut self.conn.stream,
981            self.unsolicited_responses_tx.clone(),
982            id,
983        ))
984    }
985
986    /// The [`LSUB` command](https://tools.ietf.org/html/rfc3501#section-6.3.9) returns a subset of
987    /// names from the set of names that the user has declared as being "active" or "subscribed".
988    /// The arguments to this method are the same as for [`Session::list`].
989    ///
990    /// The returned [`Name`]s MAY contain different mailbox flags from response to
991    /// [`Session::list`].  If this should happen, the flags returned by [`Session::list`] are
992    /// considered more authoritative.
993    ///
994    /// A special situation occurs when invoking `lsub` with the `%` wildcard. Consider what
995    /// happens if `foo/bar` (with a hierarchy delimiter of `/`) is subscribed but `foo` is not.  A
996    /// `%` wildcard to `lsub` must return `foo`, not `foo/bar`, and it will be flagged with
997    /// [`NameAttribute::NoSelect`].
998    ///
999    /// The server will not unilaterally remove an existing mailbox name from the subscription list
1000    /// even if a mailbox by that name no longer exists.
1001    pub async fn lsub(
1002        &mut self,
1003        reference_name: Option<&str>,
1004        mailbox_pattern: Option<&str>,
1005    ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
1006        let id = self
1007            .run_command(&format!(
1008                "LSUB {} {}",
1009                quote!(reference_name.unwrap_or("")),
1010                mailbox_pattern.unwrap_or("")
1011            ))
1012            .await?;
1013        let names = parse_names(
1014            &mut self.conn.stream,
1015            self.unsolicited_responses_tx.clone(),
1016            id,
1017        );
1018
1019        Ok(names)
1020    }
1021
1022    /// The [`STATUS` command](https://tools.ietf.org/html/rfc3501#section-6.3.10) requests the
1023    /// status of the indicated mailbox. It does not change the currently selected mailbox, nor
1024    /// does it affect the state of any messages in the queried mailbox (in particular, `status`
1025    /// will not cause messages to lose [`Flag::Recent`]).
1026    ///
1027    /// `status` provides an alternative to opening a second [`Session`] and using
1028    /// [`Session::examine`] on a mailbox to query that mailbox's status without deselecting the
1029    /// current mailbox in the first `Session`.
1030    ///
1031    /// Unlike [`Session::list`], `status` is not guaranteed to be fast in its response.  Under
1032    /// certain circumstances, it can be quite slow.  In some implementations, the server is
1033    /// obliged to open the mailbox read-only internally to obtain certain status information.
1034    /// Also unlike [`Session::list`], `status` does not accept wildcards.
1035    ///
1036    /// > Note: `status` is intended to access the status of mailboxes other than the currently
1037    /// > selected mailbox.  Because `status` can cause the mailbox to be opened internally, and
1038    /// > because this information is available by other means on the selected mailbox, `status`
1039    /// > SHOULD NOT be used on the currently selected mailbox.
1040    ///
1041    /// The STATUS command MUST NOT be used as a "check for new messages in the selected mailbox"
1042    /// operation (refer to sections [7](https://tools.ietf.org/html/rfc3501#section-7),
1043    /// [7.3.1](https://tools.ietf.org/html/rfc3501#section-7.3.1), and
1044    /// [7.3.2](https://tools.ietf.org/html/rfc3501#section-7.3.2) for more information about the
1045    /// proper method for new message checking).
1046    ///
1047    /// The currently defined status data items that can be requested are:
1048    ///
1049    ///  - `MESSAGES`: The number of messages in the mailbox.
1050    ///  - `RECENT`: The number of messages with [`Flag::Recent`] set.
1051    ///  - `UIDNEXT`: The next [`Uid`] of the mailbox.
1052    ///  - `UIDVALIDITY`: The unique identifier validity value of the mailbox (see [`Uid`]).
1053    ///  - `UNSEEN`: The number of messages which do not have [`Flag::Seen`] set.
1054    ///
1055    /// `data_items` is a space-separated list enclosed in parentheses.
1056    pub async fn status<S1: AsRef<str>, S2: AsRef<str>>(
1057        &mut self,
1058        mailbox_name: S1,
1059        data_items: S2,
1060    ) -> Result<Mailbox> {
1061        let id = self
1062            .run_command(&format!(
1063                "STATUS {} {}",
1064                validate_str(mailbox_name.as_ref())?,
1065                data_items.as_ref()
1066            ))
1067            .await?;
1068        let mbox = parse_status(
1069            &mut self.conn.stream,
1070            mailbox_name.as_ref(),
1071            self.unsolicited_responses_tx.clone(),
1072            id,
1073        )
1074        .await?;
1075        Ok(mbox)
1076    }
1077
1078    /// This method returns a handle that lets you use the [`IDLE`
1079    /// command](https://tools.ietf.org/html/rfc2177#section-3) to listen for changes to the
1080    /// currently selected mailbox.
1081    ///
1082    /// It's often more desirable to have the server transmit updates to the client in real time.
1083    /// This allows a user to see new mail immediately.  It also helps some real-time applications
1084    /// based on IMAP, which might otherwise need to poll extremely often (such as every few
1085    /// seconds).  While the spec actually does allow a server to push `EXISTS` responses
1086    /// asynchronously, a client can't expect this behaviour and must poll.  This method provides
1087    /// you with such a mechanism.
1088    ///
1089    /// `idle` may be used with any server that returns `IDLE` as one of the supported capabilities
1090    /// (see [`Session::capabilities`]). If the server does not advertise the `IDLE` capability,
1091    /// the client MUST NOT use `idle` and must instead poll for mailbox updates.  In particular,
1092    /// the client MUST continue to be able to accept unsolicited untagged responses to ANY
1093    /// command, as specified in the base IMAP specification.
1094    ///
1095    /// See [`extensions::idle::Handle`] for details.
1096    pub fn idle(self) -> extensions::idle::Handle<T> {
1097        extensions::idle::Handle::new(self)
1098    }
1099
1100    /// The [`APPEND` command](https://tools.ietf.org/html/rfc3501#section-6.3.11) appends
1101    /// `content` as a new message to the end of the specified destination `mailbox`.  This
1102    /// argument SHOULD be in the format of an [RFC-2822](https://tools.ietf.org/html/rfc2822)
1103    /// message.
1104    ///
1105    /// > Note: There MAY be exceptions, e.g., draft messages, in which required RFC-2822 header
1106    /// > lines are omitted in the message literal argument to `append`.  The full implications of
1107    /// > doing so MUST be understood and carefully weighed.
1108    ///
1109    /// If the append is unsuccessful for any reason, the mailbox is restored to its state before
1110    /// the append attempt; no partial appending will happen.
1111    ///
1112    /// If the destination `mailbox` does not exist, the server returns an error, and does not
1113    /// automatically create the mailbox.
1114    ///
1115    /// If the mailbox is currently selected, the normal new message actions will generally occur.
1116    /// Specifically, the server will generally notify the client immediately via an untagged
1117    /// `EXISTS` response.  If the server does not do so, the client MAY issue a `NOOP` command (or
1118    /// failing that, a `CHECK` command) after one or more `APPEND` commands.
1119    pub async fn append(
1120        &mut self,
1121        mailbox: impl AsRef<str>,
1122        flags: Option<&str>,
1123        internaldate: Option<&str>,
1124        content: impl AsRef<[u8]>,
1125    ) -> Result<()> {
1126        let content = content.as_ref();
1127        let id = self
1128            .run_command(&format!(
1129                "APPEND \"{}\"{}{}{}{} {{{}}}",
1130                mailbox.as_ref(),
1131                if flags.is_some() { " " } else { "" },
1132                flags.unwrap_or(""),
1133                if internaldate.is_some() { " " } else { "" },
1134                internaldate.unwrap_or(""),
1135                content.len()
1136            ))
1137            .await?;
1138
1139        match self.read_response().await {
1140            Some(Ok(res)) => {
1141                if let Response::Continue { .. } = res.parsed() {
1142                    self.stream.as_mut().write_all(content).await?;
1143                    self.stream.as_mut().write_all(b"\r\n").await?;
1144                    self.stream.flush().await?;
1145                    self.conn
1146                        .check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
1147                        .await?;
1148                    Ok(())
1149                } else {
1150                    Err(Error::Append)
1151                }
1152            }
1153            Some(Err(err)) => Err(err.into()),
1154            _ => Err(Error::Append),
1155        }
1156    }
1157
1158    /// The [`SEARCH` command](https://tools.ietf.org/html/rfc3501#section-6.4.4) searches the
1159    /// mailbox for messages that match the given `query`.  `query` consist of one or more search
1160    /// keys separated by spaces.  The response from the server contains a listing of [`Seq`]s
1161    /// corresponding to those messages that match the searching criteria.
1162    ///
1163    /// When multiple search keys are specified, the result is the intersection of all the messages
1164    /// that match those keys.  Or, in other words, only messages that match *all* the keys. For
1165    /// example, the criteria
1166    ///
1167    /// ```text
1168    /// DELETED FROM "SMITH" SINCE 1-Feb-1994
1169    /// ```
1170    ///
1171    /// refers to all deleted messages from Smith that were placed in the mailbox since February 1,
1172    /// 1994.  A search key can also be a parenthesized list of one or more search keys (e.g., for
1173    /// use with the `OR` and `NOT` keys).
1174    ///
1175    /// In all search keys that use strings, a message matches the key if the string is a substring
1176    /// of the field.  The matching is case-insensitive.
1177    ///
1178    /// Below is a selection of common search keys.  The full list can be found in the
1179    /// specification of the [`SEARCH command`](https://tools.ietf.org/html/rfc3501#section-6.4.4).
1180    ///
1181    ///  - `NEW`: Messages that have [`Flag::Recent`] set but not [`Flag::Seen`]. This is functionally equivalent to `(RECENT UNSEEN)`.
1182    ///  - `OLD`: Messages that do not have [`Flag::Recent`] set.  This is functionally equivalent to `NOT RECENT` (as opposed to `NOT NEW`).
1183    ///  - `RECENT`: Messages that have [`Flag::Recent`] set.
1184    ///  - `ANSWERED`: Messages with [`Flag::Answered`] set.
1185    ///  - `DELETED`: Messages with [`Flag::Deleted`] set.
1186    ///  - `DRAFT`: Messages with [`Flag::Draft`] set.
1187    ///  - `FLAGGED`: Messages with [`Flag::Flagged`] set.
1188    ///  - `SEEN`: Messages that have [`Flag::Seen`] set.
1189    ///  - `<sequence set>`: Messages with message sequence numbers corresponding to the specified message sequence number set.
1190    ///  - `UID <sequence set>`: Messages with [`Uid`] corresponding to the specified unique identifier set.  Sequence set ranges are permitted.
1191    ///
1192    ///  - `SUBJECT <string>`: Messages that contain the specified string in the envelope structure's `SUBJECT` field.
1193    ///  - `BODY <string>`: Messages that contain the specified string in the body of the message.
1194    ///  - `FROM <string>`: Messages that contain the specified string in the envelope structure's `FROM` field.
1195    ///  - `TO <string>`: Messages that contain the specified string in the envelope structure's `TO` field.
1196    ///
1197    ///  - `NOT <search-key>`: Messages that do not match the specified search key.
1198    ///  - `OR <search-key1> <search-key2>`: Messages that match either search key.
1199    ///
1200    ///  - `BEFORE <date>`: Messages whose internal date (disregarding time and timezone) is earlier than the specified date.
1201    ///  - `SINCE <date>`: Messages whose internal date (disregarding time and timezone) is within or later than the specified date.
1202    pub async fn search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Seq>> {
1203        let id = self
1204            .run_command(&format!("SEARCH {}", query.as_ref()))
1205            .await?;
1206        let seqs = parse_ids(
1207            &mut self.conn.stream,
1208            self.unsolicited_responses_tx.clone(),
1209            id,
1210        )
1211        .await?;
1212
1213        Ok(seqs)
1214    }
1215
1216    /// Equivalent to [`Session::search`], except that the returned identifiers
1217    /// are [`Uid`] instead of [`Seq`]. See also the [`UID`
1218    /// command](https://tools.ietf.org/html/rfc3501#section-6.4.8).
1219    pub async fn uid_search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Uid>> {
1220        let id = self
1221            .run_command(&format!("UID SEARCH {}", query.as_ref()))
1222            .await?;
1223        let uids = parse_ids(
1224            &mut self.conn.stream,
1225            self.unsolicited_responses_tx.clone(),
1226            id,
1227        )
1228        .await?;
1229
1230        Ok(uids)
1231    }
1232
1233    /// The [`GETQUOTA` command](https://tools.ietf.org/html/rfc2087#section-4.2)
1234    pub async fn get_quota(&mut self, quota_root: &str) -> Result<Quota> {
1235        let id = self
1236            .run_command(format!("GETQUOTA {}", quote!(quota_root)))
1237            .await?;
1238        let c = parse_get_quota(
1239            &mut self.conn.stream,
1240            self.unsolicited_responses_tx.clone(),
1241            id,
1242        )
1243        .await?;
1244        Ok(c)
1245    }
1246
1247    /// The [`GETQUOTAROOT` command](https://tools.ietf.org/html/rfc2087#section-4.3)
1248    pub async fn get_quota_root(
1249        &mut self,
1250        mailbox_name: &str,
1251    ) -> Result<(Vec<QuotaRoot>, Vec<Quota>)> {
1252        let id = self
1253            .run_command(format!("GETQUOTAROOT {}", quote!(mailbox_name)))
1254            .await?;
1255        let c = parse_get_quota_root(
1256            &mut self.conn.stream,
1257            self.unsolicited_responses_tx.clone(),
1258            id,
1259        )
1260        .await?;
1261        Ok(c)
1262    }
1263
1264    /// The [`GETMETADATA` command](https://datatracker.ietf.org/doc/html/rfc5464.html#section-4.2)
1265    pub async fn get_metadata(
1266        &mut self,
1267        mailbox_name: &str,
1268        options: &str,
1269        entry_specifier: &str,
1270    ) -> Result<Vec<Metadata>> {
1271        let options = if options.is_empty() {
1272            String::new()
1273        } else {
1274            format!(" {options}")
1275        };
1276        let id = self
1277            .run_command(format!(
1278                "GETMETADATA {} {}{}",
1279                quote!(mailbox_name),
1280                options,
1281                entry_specifier
1282            ))
1283            .await?;
1284        let metadata = parse_metadata(
1285            &mut self.conn.stream,
1286            mailbox_name,
1287            self.unsolicited_responses_tx.clone(),
1288            id,
1289        )
1290        .await?;
1291        Ok(metadata)
1292    }
1293
1294    /// The [`ID` command](https://datatracker.ietf.org/doc/html/rfc2971)
1295    ///
1296    /// `identification` is an iterable sequence of pairs such as `("name", Some("MyMailClient"))`.
1297    pub async fn id(
1298        &mut self,
1299        identification: impl IntoIterator<Item = (&str, Option<&str>)>,
1300    ) -> Result<Option<HashMap<String, String>>> {
1301        let id = self
1302            .run_command(format!("ID ({})", format_identification(identification)))
1303            .await?;
1304        let server_identification = parse_id(
1305            &mut self.conn.stream,
1306            self.unsolicited_responses_tx.clone(),
1307            id,
1308        )
1309        .await?;
1310        Ok(server_identification)
1311    }
1312
1313    /// Similar to `id`, but don't identify ourselves.
1314    ///
1315    /// Sends `ID NIL` command and returns server response.
1316    pub async fn id_nil(&mut self) -> Result<Option<HashMap<String, String>>> {
1317        let id = self.run_command("ID NIL").await?;
1318        let server_identification = parse_id(
1319            &mut self.conn.stream,
1320            self.unsolicited_responses_tx.clone(),
1321            id,
1322        )
1323        .await?;
1324        Ok(server_identification)
1325    }
1326
1327    // these are only here because they are public interface, the rest is in `Connection`
1328    /// Runs a command and checks if it returns OK.
1329    pub async fn run_command_and_check_ok<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1330        self.conn
1331            .run_command_and_check_ok(
1332                command.as_ref(),
1333                Some(self.unsolicited_responses_tx.clone()),
1334            )
1335            .await?;
1336
1337        Ok(())
1338    }
1339
1340    /// Runs any command passed to it.
1341    pub async fn run_command<S: AsRef<str>>(&mut self, command: S) -> Result<RequestId> {
1342        let id = self.conn.run_command(command.as_ref()).await?;
1343
1344        Ok(id)
1345    }
1346
1347    /// Runs an arbitrary command, without adding a tag to it.
1348    pub async fn run_command_untagged<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1349        self.conn.run_command_untagged(command.as_ref()).await?;
1350
1351        Ok(())
1352    }
1353
1354    /// Read the next response on the connection.
1355    pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
1356        self.conn.read_response().await
1357    }
1358
1359    /// Enable server capabilities for this authenticated session.
1360    pub async fn enable<S: AsRef<str>>(&mut self, caps: &[S]) -> Result<Capabilities> {
1361        let command = format!(
1362            "ENABLE {}",
1363            caps.iter()
1364                .map(|cap| cap.as_ref())
1365                .collect::<Vec<_>>()
1366                .join(" ")
1367        );
1368        let id = self.run_command(command).await?;
1369        parse_capabilities(
1370            &mut self.conn.stream,
1371            self.unsolicited_responses_tx.clone(),
1372            id,
1373        )
1374        .await
1375    }
1376
1377    /// Return namespace information as defined by RFC 2342.
1378    pub async fn namespace(&mut self) -> Result<Namespace> {
1379        let responses = self.collect_responses("NAMESPACE").await?;
1380        let mut namespace = Namespace::default();
1381        for response in &responses {
1382            if let Some(raw) = response.raw_str() {
1383                if raw.starts_with("* NAMESPACE ") {
1384                    namespace = parse_namespace_response(raw)?;
1385                }
1386            }
1387        }
1388        Ok(namespace)
1389    }
1390
1391    /// LIST with STATUS return data in one roundtrip.
1392    pub async fn list_status<S1: AsRef<str>, S2: AsRef<str>>(
1393        &mut self,
1394        reference_name: S1,
1395        mailbox_pattern: S2,
1396    ) -> Result<Vec<ListStatus>> {
1397        let command = format!(
1398            "LIST {} {} RETURN (STATUS (MESSAGES UNSEEN UIDNEXT UIDVALIDITY HIGHESTMODSEQ))",
1399            quote!(reference_name.as_ref()),
1400            quote!(mailbox_pattern.as_ref())
1401        );
1402        let id = self.run_command(command).await?;
1403        parse_list_status(
1404            &mut self.conn.stream,
1405            self.unsolicited_responses_tx.clone(),
1406            id,
1407        )
1408        .await
1409    }
1410
1411    /// SELECT a mailbox using QRESYNC parameters.
1412    pub async fn select_qresync<S1: AsRef<str>, S2: AsRef<str>>(
1413        &mut self,
1414        mailbox_name: S1,
1415        qresync_args: S2,
1416    ) -> Result<QresyncResponse> {
1417        let command = format!(
1418            "SELECT {} (QRESYNC ({}))",
1419            validate_str(mailbox_name.as_ref())?,
1420            qresync_args.as_ref()
1421        );
1422        let id = self.run_command(command).await?;
1423        parse_qresync(
1424            &mut self.conn.stream,
1425            self.unsolicited_responses_tx.clone(),
1426            id,
1427        )
1428        .await
1429    }
1430
1431    async fn collect_responses<S: AsRef<str>>(&mut self, command: S) -> Result<Vec<ResponseData>> {
1432        let id = self.run_command(command).await?;
1433        let mut responses = Vec::new();
1434
1435        while let Some(response) = self.read_response().await {
1436            let response = response?;
1437            match response.parsed() {
1438                Response::Done {
1439                    tag,
1440                    status,
1441                    code,
1442                    information,
1443                    ..
1444                } if tag == &id => {
1445                    self.conn
1446                        .check_status_ok(status, code.as_ref(), information.as_deref())?;
1447                    break;
1448                }
1449                _ => responses.push(response),
1450            }
1451        }
1452
1453        Ok(responses)
1454    }
1455}
1456
1457fn parse_namespace_response(raw: &str) -> Result<Namespace> {
1458    let payload = raw
1459        .trim()
1460        .strip_prefix("* NAMESPACE ")
1461        .ok_or_else(|| Error::Parse(ParseError::Invalid(raw.as_bytes().to_vec())))?;
1462    let sections = split_namespace_sections(payload)?;
1463    Ok(Namespace {
1464        personal: parse_namespace_group(sections.first().copied().unwrap_or("NIL")),
1465        other_users: parse_namespace_group(sections.get(1).copied().unwrap_or("NIL")),
1466        shared: parse_namespace_group(sections.get(2).copied().unwrap_or("NIL")),
1467    })
1468}
1469
1470fn split_namespace_sections(payload: &str) -> Result<Vec<&str>> {
1471    let mut sections = Vec::new();
1472    let mut depth = 0i32;
1473    let mut in_quotes = false;
1474    let bytes = payload.as_bytes();
1475    let mut start = 0usize;
1476
1477    for (idx, ch) in payload.char_indices() {
1478        match ch {
1479            '"' if idx == 0 || bytes[idx.saturating_sub(1)] != b'\\' => in_quotes = !in_quotes,
1480            '(' if !in_quotes => depth += 1,
1481            ')' if !in_quotes => depth -= 1,
1482            ' ' if !in_quotes && depth == 0 => {
1483                sections.push(payload[start..idx].trim());
1484                start = idx + 1;
1485            }
1486            _ => {}
1487        }
1488    }
1489
1490    sections.push(payload[start..].trim());
1491    if sections.len() == 3 {
1492        Ok(sections)
1493    } else {
1494        Err(Error::Parse(ParseError::Invalid(
1495            payload.as_bytes().to_vec(),
1496        )))
1497    }
1498}
1499
1500fn parse_namespace_group(section: &str) -> Vec<NamespaceEntry> {
1501    if section.eq_ignore_ascii_case("NIL") {
1502        return Vec::new();
1503    }
1504
1505    let mut entries = Vec::new();
1506    let mut rest = section;
1507    while let Some(start) = rest.find("(\"") {
1508        let slice = &rest[start + 2..];
1509        let Some(prefix_end) = slice.find('"') else {
1510            break;
1511        };
1512        let prefix = &slice[..prefix_end];
1513        let slice = &slice[prefix_end + 1..].trim_start();
1514
1515        let (delimiter, next_rest) = if let Some(stripped) = slice.strip_prefix("NIL") {
1516            (None, stripped)
1517        } else if let Some(stripped) = slice.strip_prefix('"') {
1518            let Some(delim_end) = stripped.find('"') else {
1519                break;
1520            };
1521            (
1522                Some(stripped[..delim_end].to_string()),
1523                &stripped[delim_end + 1..],
1524            )
1525        } else {
1526            break;
1527        };
1528
1529        entries.push(NamespaceEntry {
1530            prefix: prefix.to_string(),
1531            delimiter,
1532        });
1533        rest = next_rest;
1534    }
1535
1536    entries
1537}
1538
1539impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
1540    unsafe_pinned!(stream: ImapStream<T>);
1541
1542    /// Convert this connection into the raw underlying stream.
1543    pub fn into_inner(self) -> T {
1544        let Self { stream, .. } = self;
1545        stream.into_inner()
1546    }
1547
1548    /// Read the next response on the connection.
1549    pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
1550        self.stream.next().await
1551    }
1552
1553    pub(crate) async fn run_command_untagged(&mut self, command: &str) -> Result<()> {
1554        self.stream
1555            .encode(Request(None, command.as_bytes().into()))
1556            .await?;
1557        self.stream.flush().await?;
1558        Ok(())
1559    }
1560
1561    pub(crate) async fn run_command(&mut self, command: &str) -> Result<RequestId> {
1562        let request_id = self.request_ids.next().unwrap(); // safe: never returns Err
1563        self.stream
1564            .encode(Request(Some(request_id.clone()), command.as_bytes().into()))
1565            .await?;
1566        self.stream.flush().await?;
1567        Ok(request_id)
1568    }
1569
1570    /// Execute a command and check that the next response is a matching done.
1571    pub async fn run_command_and_check_ok(
1572        &mut self,
1573        command: &str,
1574        unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1575    ) -> Result<()> {
1576        let id = self.run_command(command).await?;
1577        self.check_done_ok(&id, unsolicited).await?;
1578
1579        Ok(())
1580    }
1581
1582    pub(crate) async fn check_done_ok(
1583        &mut self,
1584        id: &RequestId,
1585        unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1586    ) -> Result<()> {
1587        if let Some(first_res) = self.stream.next().await {
1588            self.check_done_ok_from(id, unsolicited, first_res?).await
1589        } else {
1590            Err(Error::ConnectionLost)
1591        }
1592    }
1593
1594    pub(crate) async fn check_done_ok_from(
1595        &mut self,
1596        id: &RequestId,
1597        unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1598        mut response: ResponseData,
1599    ) -> Result<()> {
1600        loop {
1601            if let Response::Done {
1602                status,
1603                code,
1604                information,
1605                tag,
1606            } = response.parsed()
1607            {
1608                self.check_status_ok(status, code.as_ref(), information.as_deref())?;
1609
1610                if tag == id {
1611                    return Ok(());
1612                }
1613            }
1614
1615            if let Some(unsolicited) = unsolicited.clone() {
1616                handle_unilateral(response, unsolicited);
1617            }
1618
1619            if let Some(res) = self.stream.next().await {
1620                response = res?;
1621            } else {
1622                return Err(Error::ConnectionLost);
1623            }
1624        }
1625    }
1626
1627    pub(crate) fn check_status_ok(
1628        &self,
1629        status: &imap_proto::Status,
1630        code: Option<&imap_proto::ResponseCode<'_>>,
1631        information: Option<&str>,
1632    ) -> Result<()> {
1633        use imap_proto::Status;
1634        match status {
1635            Status::Ok => Ok(()),
1636            Status::Bad => Err(Error::Bad(format!(
1637                "code: {:?}, info: {:?}",
1638                code, information
1639            ))),
1640            Status::No => Err(Error::No(format!(
1641                "code: {:?}, info: {:?}",
1642                code, information
1643            ))),
1644            _ => Err(Error::Io(io::Error::other(format!(
1645                "status: {:?}, code: {:?}, information: {:?}",
1646                status, code, information
1647            )))),
1648        }
1649    }
1650}
1651
1652fn validate_str(value: &str) -> Result<String> {
1653    let quoted = quote!(value);
1654    if quoted.find('\n').is_some() {
1655        return Err(Error::Validate(ValidateError('\n')));
1656    }
1657    if quoted.find('\r').is_some() {
1658        return Err(Error::Validate(ValidateError('\r')));
1659    }
1660    Ok(quoted)
1661}
1662
1663#[cfg(test)]
1664mod tests {
1665    use pretty_assertions::assert_eq;
1666
1667    use super::super::error::Result;
1668    use super::super::mock_stream::MockStream;
1669    use super::*;
1670    use std::borrow::Cow;
1671    use std::future::Future;
1672
1673    use async_std::sync::{Arc, Mutex};
1674    use imap_proto::Status;
1675
1676    macro_rules! mock_client {
1677        ($s:expr) => {
1678            Client::new($s)
1679        };
1680    }
1681
1682    macro_rules! mock_session {
1683        ($s:expr) => {
1684            Session::new(mock_client!($s).conn)
1685        };
1686    }
1687
1688    macro_rules! assert_eq_bytes {
1689        ($a:expr, $b:expr, $c:expr) => {
1690            assert_eq!(
1691                std::str::from_utf8($a).unwrap(),
1692                std::str::from_utf8($b).unwrap(),
1693                $c
1694            )
1695        };
1696    }
1697
1698    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1699    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1700    async fn fetch_body() {
1701        let response = "a0 OK Logged in.\r\n\
1702                        * 2 FETCH (BODY[TEXT] {3}\r\nfoo)\r\n\
1703                        a0 OK FETCH completed\r\n";
1704        let mut session = mock_session!(MockStream::new(response.as_bytes().to_vec()));
1705        session.read_response().await.unwrap().unwrap();
1706        session.read_response().await.unwrap().unwrap();
1707    }
1708
1709    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1710    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1711    async fn readline_delay_read() {
1712        let greeting = "* OK Dovecot ready.\r\n";
1713        let mock_stream = MockStream::default()
1714            .with_buf(greeting.as_bytes().to_vec())
1715            .with_delay();
1716
1717        let mut client = mock_client!(mock_stream);
1718        let actual_response = client.read_response().await.unwrap().unwrap();
1719        assert_eq!(
1720            actual_response.parsed(),
1721            &Response::Data {
1722                status: Status::Ok,
1723                code: None,
1724                information: Some(Cow::Borrowed("Dovecot ready.")),
1725            }
1726        );
1727    }
1728
1729    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1730    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1731    async fn readline_eof() {
1732        let mock_stream = MockStream::default().with_eof();
1733        let mut client = mock_client!(mock_stream);
1734        let res = client.read_response().await;
1735        assert!(res.is_none());
1736    }
1737
1738    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1739    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1740    #[should_panic]
1741    async fn readline_err() {
1742        // TODO Check the error test
1743        let mock_stream = MockStream::default().with_err();
1744        let mut client = mock_client!(mock_stream);
1745        client.read_response().await.unwrap().unwrap();
1746    }
1747
1748    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1749    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1750    async fn authenticate() {
1751        let response = b"+ YmFy\r\n\
1752                         A0001 OK Logged in\r\n"
1753            .to_vec();
1754        let command = "A0001 AUTHENTICATE PLAIN\r\n\
1755                       Zm9v\r\n";
1756        let mock_stream = MockStream::new(response);
1757        let client = mock_client!(mock_stream);
1758        enum Authenticate {
1759            Auth,
1760        }
1761        impl Authenticator for &Authenticate {
1762            type Response = Vec<u8>;
1763            fn process(&mut self, challenge: &[u8]) -> Self::Response {
1764                assert!(challenge == b"bar", "Invalid authenticate challenge");
1765                b"foo".to_vec()
1766            }
1767        }
1768        let session = client
1769            .authenticate("PLAIN", &Authenticate::Auth)
1770            .await
1771            .ok()
1772            .unwrap();
1773        assert_eq_bytes!(
1774            &session.stream.inner.written_buf,
1775            command.as_bytes(),
1776            "Invalid authenticate command"
1777        );
1778    }
1779
1780    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1781    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1782    async fn login() {
1783        let response = b"A0001 OK Logged in\r\n".to_vec();
1784        let username = "username";
1785        let password = "password";
1786        let command = format!("A0001 LOGIN {} {}\r\n", quote!(username), quote!(password));
1787        let mock_stream = MockStream::new(response);
1788        let client = mock_client!(mock_stream);
1789        if let Ok(session) = client.login(username, password).await {
1790            assert_eq!(
1791                session.stream.inner.written_buf,
1792                command.as_bytes().to_vec(),
1793                "Invalid login command"
1794            );
1795        } else {
1796            unreachable!("invalid login");
1797        }
1798    }
1799
1800    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1801    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1802    async fn logout() {
1803        let response = b"A0001 OK Logout completed.\r\n".to_vec();
1804        let command = "A0001 LOGOUT\r\n";
1805        let mock_stream = MockStream::new(response);
1806        let mut session = mock_session!(mock_stream);
1807        session.logout().await.unwrap();
1808        assert!(
1809            session.stream.inner.written_buf == command.as_bytes().to_vec(),
1810            "Invalid logout command"
1811        );
1812    }
1813
1814    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1815    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1816    async fn rename() {
1817        let response = b"A0001 OK RENAME completed\r\n".to_vec();
1818        let current_mailbox_name = "INBOX";
1819        let new_mailbox_name = "NEWINBOX";
1820        let command = format!(
1821            "A0001 RENAME {} {}\r\n",
1822            quote!(current_mailbox_name),
1823            quote!(new_mailbox_name)
1824        );
1825        let mock_stream = MockStream::new(response);
1826        let mut session = mock_session!(mock_stream);
1827        session
1828            .rename(current_mailbox_name, new_mailbox_name)
1829            .await
1830            .unwrap();
1831        assert!(
1832            session.stream.inner.written_buf == command.as_bytes().to_vec(),
1833            "Invalid rename command"
1834        );
1835    }
1836
1837    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1838    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1839    async fn subscribe() {
1840        let response = b"A0001 OK SUBSCRIBE completed\r\n".to_vec();
1841        let mailbox = "INBOX";
1842        let command = format!("A0001 SUBSCRIBE {}\r\n", quote!(mailbox));
1843        let mock_stream = MockStream::new(response);
1844        let mut session = mock_session!(mock_stream);
1845        session.subscribe(mailbox).await.unwrap();
1846        assert!(
1847            session.stream.inner.written_buf == command.as_bytes().to_vec(),
1848            "Invalid subscribe command"
1849        );
1850    }
1851
1852    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1853    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1854    async fn unsubscribe() {
1855        let response = b"A0001 OK UNSUBSCRIBE completed\r\n".to_vec();
1856        let mailbox = "INBOX";
1857        let command = format!("A0001 UNSUBSCRIBE {}\r\n", quote!(mailbox));
1858        let mock_stream = MockStream::new(response);
1859        let mut session = mock_session!(mock_stream);
1860        session.unsubscribe(mailbox).await.unwrap();
1861        assert!(
1862            session.stream.inner.written_buf == command.as_bytes().to_vec(),
1863            "Invalid unsubscribe command"
1864        );
1865    }
1866
1867    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1868    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1869    async fn expunge() {
1870        let response = b"A0001 OK EXPUNGE completed\r\n".to_vec();
1871        let mock_stream = MockStream::new(response);
1872        let mut session = mock_session!(mock_stream);
1873        session.expunge().await.unwrap().collect::<Vec<_>>().await;
1874        assert!(
1875            session.stream.inner.written_buf == b"A0001 EXPUNGE\r\n".to_vec(),
1876            "Invalid expunge command"
1877        );
1878    }
1879
1880    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1881    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1882    async fn uid_expunge() {
1883        let response = b"* 2 EXPUNGE\r\n\
1884            * 3 EXPUNGE\r\n\
1885            * 4 EXPUNGE\r\n\
1886            A0001 OK UID EXPUNGE completed\r\n"
1887            .to_vec();
1888        let mock_stream = MockStream::new(response);
1889        let mut session = mock_session!(mock_stream);
1890        session
1891            .uid_expunge("2:4")
1892            .await
1893            .unwrap()
1894            .collect::<Vec<_>>()
1895            .await;
1896        assert!(
1897            session.stream.inner.written_buf == b"A0001 UID EXPUNGE 2:4\r\n".to_vec(),
1898            "Invalid expunge command"
1899        );
1900    }
1901
1902    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1903    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1904    async fn check() {
1905        let response = b"A0001 OK CHECK completed\r\n".to_vec();
1906        let mock_stream = MockStream::new(response);
1907        let mut session = mock_session!(mock_stream);
1908        session.check().await.unwrap();
1909        assert!(
1910            session.stream.inner.written_buf == b"A0001 CHECK\r\n".to_vec(),
1911            "Invalid check command"
1912        );
1913    }
1914
1915    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1916    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1917    async fn examine() {
1918        let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1919            * OK [PERMANENTFLAGS ()] Read-only mailbox.\r\n\
1920            * 1 EXISTS\r\n\
1921            * 1 RECENT\r\n\
1922            * OK [UNSEEN 1] First unseen.\r\n\
1923            * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1924            * OK [UIDNEXT 2] Predicted next UID\r\n\
1925            A0001 OK [READ-ONLY] Select completed.\r\n"
1926            .to_vec();
1927        let expected_mailbox = Mailbox {
1928            flags: vec![
1929                Flag::Answered,
1930                Flag::Flagged,
1931                Flag::Deleted,
1932                Flag::Seen,
1933                Flag::Draft,
1934            ],
1935            exists: 1,
1936            recent: 1,
1937            unseen: Some(1),
1938            permanent_flags: vec![],
1939            uid_next: Some(2),
1940            uid_validity: Some(1257842737),
1941            highest_modseq: None,
1942        };
1943        let mailbox_name = "INBOX";
1944        let command = format!("A0001 EXAMINE {}\r\n", quote!(mailbox_name));
1945        let mock_stream = MockStream::new(response);
1946        let mut session = mock_session!(mock_stream);
1947        let mailbox = session.examine(mailbox_name).await.unwrap();
1948        assert!(
1949            session.stream.inner.written_buf == command.as_bytes().to_vec(),
1950            "Invalid examine command"
1951        );
1952        assert_eq!(mailbox, expected_mailbox);
1953    }
1954
1955    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1956    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1957    async fn select() {
1958        let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1959            * OK [PERMANENTFLAGS (\\* \\Answered \\Flagged \\Deleted \\Draft \\Seen)] \
1960              Read-only mailbox.\r\n\
1961            * 1 EXISTS\r\n\
1962            * 1 RECENT\r\n\
1963            * OK [UNSEEN 1] First unseen.\r\n\
1964            * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1965            * OK [UIDNEXT 2] Predicted next UID\r\n\
1966            * OK [HIGHESTMODSEQ 90060115205545359] Highest mailbox modsequence\r\n\
1967            A0001 OK [READ-ONLY] Select completed.\r\n"
1968            .to_vec();
1969        let expected_mailbox = Mailbox {
1970            flags: vec![
1971                Flag::Answered,
1972                Flag::Flagged,
1973                Flag::Deleted,
1974                Flag::Seen,
1975                Flag::Draft,
1976            ],
1977            exists: 1,
1978            recent: 1,
1979            unseen: Some(1),
1980            permanent_flags: vec![
1981                Flag::MayCreate,
1982                Flag::Answered,
1983                Flag::Flagged,
1984                Flag::Deleted,
1985                Flag::Draft,
1986                Flag::Seen,
1987            ],
1988            uid_next: Some(2),
1989            uid_validity: Some(1257842737),
1990            highest_modseq: Some(90060115205545359),
1991        };
1992        let mailbox_name = "INBOX";
1993        let command = format!("A0001 SELECT {}\r\n", quote!(mailbox_name));
1994        let mock_stream = MockStream::new(response);
1995        let mut session = mock_session!(mock_stream);
1996        let mailbox = session.select(mailbox_name).await.unwrap();
1997        assert!(
1998            session.stream.inner.written_buf == command.as_bytes().to_vec(),
1999            "Invalid select command"
2000        );
2001        assert_eq!(mailbox, expected_mailbox);
2002    }
2003
2004    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2005    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2006    async fn search() {
2007        let response = b"* SEARCH 1 2 3 4 5\r\n\
2008            A0001 OK Search completed\r\n"
2009            .to_vec();
2010        let mock_stream = MockStream::new(response);
2011        let mut session = mock_session!(mock_stream);
2012        let ids = session.search("Unseen").await.unwrap();
2013        let ids: HashSet<u32> = ids.iter().cloned().collect();
2014        assert!(
2015            session.stream.inner.written_buf == b"A0001 SEARCH Unseen\r\n".to_vec(),
2016            "Invalid search command"
2017        );
2018        assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
2019    }
2020
2021    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2022    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2023    async fn uid_search() {
2024        let response = b"* SEARCH 1 2 3 4 5\r\n\
2025            A0001 OK Search completed\r\n"
2026            .to_vec();
2027        let mock_stream = MockStream::new(response);
2028        let mut session = mock_session!(mock_stream);
2029        let ids = session.uid_search("Unseen").await.unwrap();
2030        let ids: HashSet<Uid> = ids.iter().cloned().collect();
2031        assert!(
2032            session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
2033            "Invalid search command"
2034        );
2035        assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
2036    }
2037
2038    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2039    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2040    async fn uid_search_unordered() {
2041        let response = b"* SEARCH 1 2 3 4 5\r\n\
2042            A0002 OK CAPABILITY completed\r\n\
2043            A0001 OK Search completed\r\n"
2044            .to_vec();
2045        let mock_stream = MockStream::new(response);
2046        let mut session = mock_session!(mock_stream);
2047        let ids = session.uid_search("Unseen").await.unwrap();
2048        let ids: HashSet<Uid> = ids.iter().cloned().collect();
2049        assert!(
2050            session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
2051            "Invalid search command"
2052        );
2053        assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
2054    }
2055
2056    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2057    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2058    async fn capability() {
2059        let response = b"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n\
2060            A0001 OK CAPABILITY completed\r\n"
2061            .to_vec();
2062        let expected_capabilities = vec!["IMAP4rev1", "STARTTLS", "AUTH=GSSAPI", "LOGINDISABLED"];
2063        let mock_stream = MockStream::new(response);
2064        let mut session = mock_session!(mock_stream);
2065        let capabilities = session.capabilities().await.unwrap();
2066        assert!(
2067            session.stream.inner.written_buf == b"A0001 CAPABILITY\r\n".to_vec(),
2068            "Invalid capability command"
2069        );
2070        assert_eq!(capabilities.len(), 4);
2071        for e in expected_capabilities {
2072            assert!(capabilities.has_str(e));
2073        }
2074    }
2075
2076    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2077    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2078    async fn create() {
2079        let response = b"A0001 OK CREATE completed\r\n".to_vec();
2080        let mailbox_name = "INBOX";
2081        let command = format!("A0001 CREATE {}\r\n", quote!(mailbox_name));
2082        let mock_stream = MockStream::new(response);
2083        let mut session = mock_session!(mock_stream);
2084        session.create(mailbox_name).await.unwrap();
2085        assert!(
2086            session.stream.inner.written_buf == command.as_bytes().to_vec(),
2087            "Invalid create command"
2088        );
2089    }
2090
2091    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2092    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2093    async fn delete() {
2094        let response = b"A0001 OK DELETE completed\r\n".to_vec();
2095        let mailbox_name = "INBOX";
2096        let command = format!("A0001 DELETE {}\r\n", quote!(mailbox_name));
2097        let mock_stream = MockStream::new(response);
2098        let mut session = mock_session!(mock_stream);
2099        session.delete(mailbox_name).await.unwrap();
2100        assert!(
2101            session.stream.inner.written_buf == command.as_bytes().to_vec(),
2102            "Invalid delete command"
2103        );
2104    }
2105
2106    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2107    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2108    async fn noop() {
2109        let response = b"A0001 OK NOOP completed\r\n".to_vec();
2110        let mock_stream = MockStream::new(response);
2111        let mut session = mock_session!(mock_stream);
2112        session.noop().await.unwrap();
2113        assert!(
2114            session.stream.inner.written_buf == b"A0001 NOOP\r\n".to_vec(),
2115            "Invalid noop command"
2116        );
2117    }
2118
2119    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2120    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2121    async fn close() {
2122        let response = b"A0001 OK CLOSE completed\r\n".to_vec();
2123        let mock_stream = MockStream::new(response);
2124        let mut session = mock_session!(mock_stream);
2125        session.close().await.unwrap();
2126        assert!(
2127            session.stream.inner.written_buf == b"A0001 CLOSE\r\n".to_vec(),
2128            "Invalid close command"
2129        );
2130    }
2131
2132    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2133    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2134    async fn store() {
2135        generic_store(" ", |c, set, query| async move {
2136            c.lock()
2137                .await
2138                .store(set, query)
2139                .await?
2140                .collect::<Vec<_>>()
2141                .await;
2142            Ok(())
2143        })
2144        .await;
2145    }
2146
2147    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2148    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2149    async fn uid_store() {
2150        generic_store(" UID ", |c, set, query| async move {
2151            c.lock()
2152                .await
2153                .uid_store(set, query)
2154                .await?
2155                .collect::<Vec<_>>()
2156                .await;
2157            Ok(())
2158        })
2159        .await;
2160    }
2161
2162    async fn generic_store<'a, F, T, K>(prefix: &'a str, op: F)
2163    where
2164        F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2165        K: 'a + Future<Output = Result<T>>,
2166    {
2167        let res = "* 2 FETCH (FLAGS (\\Deleted \\Seen))\r\n\
2168                   * 3 FETCH (FLAGS (\\Deleted))\r\n\
2169                   * 4 FETCH (FLAGS (\\Deleted \\Flagged \\Seen))\r\n\
2170                   A0001 OK STORE completed\r\n";
2171
2172        generic_with_uid(res, "STORE", "2.4", "+FLAGS (\\Deleted)", prefix, op).await;
2173    }
2174
2175    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2176    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2177    async fn copy() {
2178        generic_copy(" ", |c, set, query| async move {
2179            c.lock().await.copy(set, query).await?;
2180            Ok(())
2181        })
2182        .await;
2183    }
2184
2185    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2186    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2187    async fn uid_copy() {
2188        generic_copy(" UID ", |c, set, query| async move {
2189            c.lock().await.uid_copy(set, query).await?;
2190            Ok(())
2191        })
2192        .await;
2193    }
2194
2195    async fn generic_copy<'a, F, T, K>(prefix: &'a str, op: F)
2196    where
2197        F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2198        K: 'a + Future<Output = Result<T>>,
2199    {
2200        generic_with_uid(
2201            "A0001 OK COPY completed\r\n",
2202            "COPY",
2203            "2:4",
2204            "MEETING",
2205            prefix,
2206            op,
2207        )
2208        .await;
2209    }
2210
2211    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2212    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2213    async fn mv() {
2214        let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2215            * 2 EXPUNGE\r\n\
2216            * 1 EXPUNGE\r\n\
2217            A0001 OK Move completed\r\n"
2218            .to_vec();
2219        let mailbox_name = "MEETING";
2220        let command = format!("A0001 MOVE 1:2 {}\r\n", quote!(mailbox_name));
2221        let mock_stream = MockStream::new(response);
2222        let mut session = mock_session!(mock_stream);
2223        session.mv("1:2", mailbox_name).await.unwrap();
2224        assert!(
2225            session.stream.inner.written_buf == command.as_bytes().to_vec(),
2226            "Invalid move command"
2227        );
2228    }
2229
2230    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2231    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2232    async fn uid_mv() {
2233        let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2234            * 2 EXPUNGE\r\n\
2235            * 1 EXPUNGE\r\n\
2236            A0001 OK Move completed\r\n"
2237            .to_vec();
2238        let mailbox_name = "MEETING";
2239        let command = format!("A0001 UID MOVE 41:42 {}\r\n", quote!(mailbox_name));
2240        let mock_stream = MockStream::new(response);
2241        let mut session = mock_session!(mock_stream);
2242        session.uid_mv("41:42", mailbox_name).await.unwrap();
2243        assert!(
2244            session.stream.inner.written_buf == command.as_bytes().to_vec(),
2245            "Invalid uid move command"
2246        );
2247    }
2248
2249    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2250    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2251    async fn fetch() {
2252        generic_fetch(" ", |c, seq, query| async move {
2253            c.lock()
2254                .await
2255                .fetch(seq, query)
2256                .await?
2257                .collect::<Vec<_>>()
2258                .await;
2259
2260            Ok(())
2261        })
2262        .await;
2263    }
2264
2265    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2266    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2267    async fn uid_fetch() {
2268        generic_fetch(" UID ", |c, seq, query| async move {
2269            c.lock()
2270                .await
2271                .uid_fetch(seq, query)
2272                .await?
2273                .collect::<Vec<_>>()
2274                .await;
2275            Ok(())
2276        })
2277        .await;
2278    }
2279
2280    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2281    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2282    async fn fetch_unexpected_eof() {
2283        // Connection is lost, there will never be any response.
2284        let response = b"".to_vec();
2285
2286        let mock_stream = MockStream::new(response);
2287        let mut session = mock_session!(mock_stream);
2288
2289        {
2290            let mut fetch_result = session
2291                .uid_fetch("1:*", "(FLAGS BODY.PEEK[])")
2292                .await
2293                .unwrap();
2294
2295            // Unexpected EOF.
2296            let err = fetch_result.next().await.unwrap().unwrap_err();
2297            let Error::Io(io_err) = err else {
2298                panic!("Unexpected error type: {err}")
2299            };
2300            assert_eq!(io_err.kind(), io::ErrorKind::UnexpectedEof);
2301        }
2302
2303        assert_eq!(
2304            session.stream.inner.written_buf,
2305            b"A0001 UID FETCH 1:* (FLAGS BODY.PEEK[])\r\n".to_vec()
2306        );
2307    }
2308
2309    async fn generic_fetch<'a, F, T, K>(prefix: &'a str, op: F)
2310    where
2311        F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2312        K: 'a + Future<Output = Result<T>>,
2313    {
2314        generic_with_uid(
2315            "A0001 OK FETCH completed\r\n",
2316            "FETCH",
2317            "1",
2318            "BODY[]",
2319            prefix,
2320            op,
2321        )
2322        .await;
2323    }
2324
2325    async fn generic_with_uid<'a, F, T, K>(
2326        res: &'a str,
2327        cmd: &'a str,
2328        seq: &'a str,
2329        query: &'a str,
2330        prefix: &'a str,
2331        op: F,
2332    ) where
2333        F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2334        K: 'a + Future<Output = Result<T>>,
2335    {
2336        let resp = res.as_bytes().to_vec();
2337        let line = format!("A0001{}{} {} {}\r\n", prefix, cmd, seq, query);
2338        let session = Arc::new(Mutex::new(mock_session!(MockStream::new(resp))));
2339
2340        {
2341            let _ = op(session.clone(), seq, query).await.unwrap();
2342        }
2343        assert!(
2344            session.lock().await.stream.inner.written_buf == line.as_bytes().to_vec(),
2345            "Invalid command"
2346        );
2347    }
2348
2349    #[test]
2350    fn quote_backslash() {
2351        assert_eq!("\"test\\\\text\"", quote!(r"test\text"));
2352    }
2353
2354    #[test]
2355    fn quote_dquote() {
2356        assert_eq!("\"test\\\"text\"", quote!("test\"text"));
2357    }
2358
2359    #[test]
2360    fn validate_random() {
2361        assert_eq!(
2362            "\"~iCQ_k;>[&\\\"sVCvUW`e<<P!wJ\"",
2363            &validate_str("~iCQ_k;>[&\"sVCvUW`e<<P!wJ").unwrap()
2364        );
2365    }
2366
2367    #[test]
2368    fn validate_newline() {
2369        if let Err(ref e) = validate_str("test\nstring") {
2370            if let Error::Validate(ref ve) = e {
2371                if ve.0 == '\n' {
2372                    return;
2373                }
2374            }
2375            panic!("Wrong error: {:?}", e);
2376        }
2377        panic!("No error");
2378    }
2379
2380    #[test]
2381    #[allow(unreachable_patterns)]
2382    fn validate_carriage_return() {
2383        if let Err(ref e) = validate_str("test\rstring") {
2384            if let Error::Validate(ref ve) = e {
2385                if ve.0 == '\r' {
2386                    return;
2387                }
2388            }
2389            panic!("Wrong error: {:?}", e);
2390        }
2391        panic!("No error");
2392    }
2393
2394    /// Emulates a server responding to `FETCH` requests
2395    /// with a body of 76 bytes of headers and N 74-byte lines,
2396    /// where N is the requested message sequence number.
2397    #[cfg(feature = "runtime-tokio")]
2398    async fn handle_client(stream: tokio::io::DuplexStream) -> Result<()> {
2399        use tokio::io::AsyncBufReadExt;
2400
2401        let (reader, mut writer) = tokio::io::split(stream);
2402        let reader = tokio::io::BufReader::new(reader);
2403
2404        let mut lines = reader.lines();
2405        while let Some(line) = lines.next_line().await? {
2406            let (request_id, request) = line.split_once(' ').unwrap();
2407            eprintln!("Received request {request_id}.");
2408
2409            let (id, _) = request
2410                .strip_prefix("FETCH ")
2411                .unwrap()
2412                .split_once(' ')
2413                .unwrap();
2414            let id = id.parse().unwrap();
2415
2416            let mut body = concat!(
2417                "From: Bob <bob@example.com>\r\n",
2418                "To: Alice <alice@example.org>\r\n",
2419                "Subject: Test\r\n",
2420                "Message-Id: <foobar@example.com>\r\n",
2421                "Date: Sun, 22 Mar 2020 00:00:00 +0100\r\n",
2422                "\r\n",
2423            )
2424            .to_string();
2425            for _ in 1..id {
2426                body +=
2427                    "012345678901234567890123456789012345678901234567890123456789012345678901\r\n";
2428            }
2429            let body_len = body.len();
2430
2431            let response = format!("* {id} FETCH (RFC822.SIZE {body_len} BODY[] {{{body_len}}}\r\n{body} FLAGS (\\Seen))\r\n");
2432            writer.write_all(response.as_bytes()).await?;
2433            writer
2434                .write_all(format!("{request_id} OK FETCH completed\r\n").as_bytes())
2435                .await?;
2436            writer.flush().await?;
2437        }
2438
2439        Ok(())
2440    }
2441
2442    /// Test requestng 1000 messages each larger than a previous one.
2443    ///
2444    /// This is a regression test for v0.6.0 async-imap,
2445    /// which sometimes failed to allocate free buffer space,
2446    /// read into a buffer of zero size and erroneously detected it
2447    /// as the end of stream.
2448    #[cfg(feature = "runtime-tokio")]
2449    #[cfg_attr(
2450        feature = "runtime-tokio",
2451        tokio::test(flavor = "multi_thread", worker_threads = 2)
2452    )]
2453    async fn large_fetch() -> Result<()> {
2454        use futures::TryStreamExt;
2455
2456        let (client, server) = tokio::io::duplex(4096);
2457        tokio::spawn(handle_client(server));
2458
2459        let client = crate::Client::new(client);
2460        let mut imap_session = Session::new(client.conn);
2461
2462        for i in 200..300 {
2463            eprintln!("Fetching {i}.");
2464            let mut messages_stream = imap_session
2465                .fetch(format!("{i}"), "(RFC822.SIZE BODY.PEEK[] FLAGS)")
2466                .await?;
2467            let fetch = messages_stream
2468                .try_next()
2469                .await?
2470                .expect("no FETCH returned");
2471            let body = fetch.body().expect("message did not have a body!");
2472            assert_eq!(body.len(), 76 + 74 * i);
2473
2474            let no_fetch = messages_stream.try_next().await?;
2475            assert!(no_fetch.is_none());
2476            drop(messages_stream);
2477        }
2478
2479        Ok(())
2480    }
2481
2482    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2483    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2484    async fn status() {
2485        {
2486            let response = b"* STATUS INBOX (UIDNEXT 25)\r\n\
2487                A0001 OK [CLIENTBUG] Status on selected mailbox completed (0.001 + 0.000 secs).\r\n"
2488                .to_vec();
2489
2490            let mock_stream = MockStream::new(response);
2491            let mut session = mock_session!(mock_stream);
2492            let status = session.status("INBOX", "(UIDNEXT)").await.unwrap();
2493            assert_eq!(
2494                session.stream.inner.written_buf,
2495                b"A0001 STATUS \"INBOX\" (UIDNEXT)\r\n".to_vec()
2496            );
2497            assert_eq!(status.uid_next, Some(25));
2498        }
2499
2500        {
2501            let response = b"* STATUS INBOX (RECENT 15)\r\n\
2502                A0001 OK STATUS completed\r\n"
2503                .to_vec();
2504
2505            let mock_stream = MockStream::new(response);
2506            let mut session = mock_session!(mock_stream);
2507            let status = session.status("INBOX", "(RECENT)").await.unwrap();
2508            assert_eq!(
2509                session.stream.inner.written_buf,
2510                b"A0001 STATUS \"INBOX\" (RECENT)\r\n".to_vec()
2511            );
2512            assert_eq!(status.recent, 15);
2513        }
2514
2515        {
2516            // Example from RFC 3501.
2517            let response = b"* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)\r\n\
2518                A0001 OK STATUS completed\r\n"
2519                .to_vec();
2520
2521            let mock_stream = MockStream::new(response);
2522            let mut session = mock_session!(mock_stream);
2523            let status = session
2524                .status("blurdybloop", "(UIDNEXT MESSAGES)")
2525                .await
2526                .unwrap();
2527            assert_eq!(
2528                session.stream.inner.written_buf,
2529                b"A0001 STATUS \"blurdybloop\" (UIDNEXT MESSAGES)\r\n".to_vec()
2530            );
2531            assert_eq!(status.uid_next, Some(44292));
2532            assert_eq!(status.exists, 231);
2533        }
2534    }
2535
2536    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2537    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2538    async fn append() {
2539        {
2540            // APPEND command when INBOX is *not* selected.
2541            //
2542            // Only APPENDUID response is returned.
2543            let response = b"+ OK\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2544
2545            let mock_stream = MockStream::new(response);
2546            let mut session = mock_session!(mock_stream);
2547            session
2548                .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2549                .await
2550                .unwrap();
2551            assert_eq!(
2552                session.stream.inner.written_buf,
2553                b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2554            );
2555        }
2556
2557        {
2558            // APPEND command when INBOX is selected.
2559            //
2560            // EXISTS response is returned before APPENDUID response is returned.
2561            let response = b"+ OK\r\n* 3 EXISTS\r\n* 2 RECENT\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2562
2563            let mock_stream = MockStream::new(response);
2564            let mut session = mock_session!(mock_stream);
2565            session
2566                .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2567                .await
2568                .unwrap();
2569            assert_eq!(
2570                session.stream.inner.written_buf,
2571                b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2572            );
2573            let exists_response = session.unsolicited_responses.recv().await.unwrap();
2574            assert_eq!(exists_response, UnsolicitedResponse::Exists(3));
2575            let recent_response = session.unsolicited_responses.recv().await.unwrap();
2576            assert_eq!(recent_response, UnsolicitedResponse::Recent(2));
2577        }
2578
2579        {
2580            // APPEND to nonexisting folder fails.
2581            let response =
2582                b"A0001 NO [TRYCREATE] Mailbox doesn't exist: foobar (0.001 + 0.000 secs)."
2583                    .to_vec();
2584            let mock_stream = MockStream::new(response);
2585            let mut session = mock_session!(mock_stream);
2586            session
2587                .append("foobar", None, None, "foobarbaz")
2588                .await
2589                .unwrap_err();
2590            assert_eq!(
2591                session.stream.inner.written_buf,
2592                b"A0001 APPEND \"foobar\" {9}\r\n".to_vec()
2593            );
2594        }
2595    }
2596
2597    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2598    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2599    async fn get_metadata() {
2600        {
2601            let response = b"* METADATA \"INBOX\" (/private/comment \"My own comment\")\r\n\
2602                A0001 OK GETMETADATA complete\r\n"
2603                .to_vec();
2604
2605            let mock_stream = MockStream::new(response);
2606            let mut session = mock_session!(mock_stream);
2607            let metadata = session
2608                .get_metadata("INBOX", "", "/private/comment")
2609                .await
2610                .unwrap();
2611            assert_eq!(
2612                session.stream.inner.written_buf,
2613                b"A0001 GETMETADATA \"INBOX\" /private/comment\r\n".to_vec()
2614            );
2615            assert_eq!(metadata.len(), 1);
2616            assert_eq!(metadata[0].entry, "/private/comment");
2617            assert_eq!(metadata[0].value.as_ref().unwrap(), "My own comment");
2618        }
2619
2620        {
2621            let response = b"* METADATA \"INBOX\" (/shared/comment \"Shared comment\" /private/comment \"My own comment\")\r\n\
2622                A0001 OK GETMETADATA complete\r\n"
2623                .to_vec();
2624
2625            let mock_stream = MockStream::new(response);
2626            let mut session = mock_session!(mock_stream);
2627            let metadata = session
2628                .get_metadata("INBOX", "", "(/shared/comment /private/comment)")
2629                .await
2630                .unwrap();
2631            assert_eq!(
2632                session.stream.inner.written_buf,
2633                b"A0001 GETMETADATA \"INBOX\" (/shared/comment /private/comment)\r\n".to_vec()
2634            );
2635            assert_eq!(metadata.len(), 2);
2636            assert_eq!(metadata[0].entry, "/shared/comment");
2637            assert_eq!(metadata[0].value.as_ref().unwrap(), "Shared comment");
2638            assert_eq!(metadata[1].entry, "/private/comment");
2639            assert_eq!(metadata[1].value.as_ref().unwrap(), "My own comment");
2640        }
2641
2642        {
2643            let response = b"* METADATA \"\" (/shared/comment {15}\r\nChatmail server /shared/admin {28}\r\nmailto:root@nine.testrun.org)\r\n\
2644                A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2645                .to_vec();
2646
2647            let mock_stream = MockStream::new(response);
2648            let mut session = mock_session!(mock_stream);
2649            let metadata = session
2650                .get_metadata("", "", "(/shared/comment /shared/admin)")
2651                .await
2652                .unwrap();
2653            assert_eq!(
2654                session.stream.inner.written_buf,
2655                b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2656            );
2657            assert_eq!(metadata.len(), 2);
2658            assert_eq!(metadata[0].entry, "/shared/comment");
2659            assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2660            assert_eq!(metadata[1].entry, "/shared/admin");
2661            assert_eq!(
2662                metadata[1].value.as_ref().unwrap(),
2663                "mailto:root@nine.testrun.org"
2664            );
2665        }
2666
2667        {
2668            let response = b"* METADATA \"\" (/shared/comment \"Chatmail server\")\r\n\
2669                * METADATA \"\" (/shared/admin \"mailto:root@nine.testrun.org\")\r\n\
2670                A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2671                .to_vec();
2672
2673            let mock_stream = MockStream::new(response);
2674            let mut session = mock_session!(mock_stream);
2675            let metadata = session
2676                .get_metadata("", "", "(/shared/comment /shared/admin)")
2677                .await
2678                .unwrap();
2679            assert_eq!(
2680                session.stream.inner.written_buf,
2681                b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2682            );
2683            assert_eq!(metadata.len(), 2);
2684            assert_eq!(metadata[0].entry, "/shared/comment");
2685            assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2686            assert_eq!(metadata[1].entry, "/shared/admin");
2687            assert_eq!(
2688                metadata[1].value.as_ref().unwrap(),
2689                "mailto:root@nine.testrun.org"
2690            );
2691        }
2692
2693        {
2694            let response = b"* METADATA \"\" (/shared/comment NIL /shared/admin NIL)\r\n\
2695                A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2696                .to_vec();
2697
2698            let mock_stream = MockStream::new(response);
2699            let mut session = mock_session!(mock_stream);
2700            let metadata = session
2701                .get_metadata("", "", "(/shared/comment /shared/admin)")
2702                .await
2703                .unwrap();
2704            assert_eq!(
2705                session.stream.inner.written_buf,
2706                b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2707            );
2708            assert_eq!(metadata.len(), 2);
2709            assert_eq!(metadata[0].entry, "/shared/comment");
2710            assert_eq!(metadata[0].value, None);
2711            assert_eq!(metadata[1].entry, "/shared/admin");
2712            assert_eq!(metadata[1].value, None);
2713        }
2714    }
2715
2716    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2717    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2718    async fn test_get_quota_root() {
2719        {
2720            let response = b"* QUOTAROOT Sent Userquota\r\n\
2721                    * QUOTA Userquota (STORAGE 4855 48576)\r\n\
2722                    A0001 OK Getquotaroot completed (0.004 + 0.000 + 0.004 secs).\r\n"
2723                .to_vec();
2724
2725            let mock_stream = MockStream::new(response);
2726            let mut session = mock_session!(mock_stream);
2727            let (quotaroots, quota) = dbg!(session.get_quota_root("Sent").await.unwrap());
2728            assert_eq!(
2729                str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2730                "A0001 GETQUOTAROOT \"Sent\"\r\n"
2731            );
2732            assert_eq!(
2733                quotaroots,
2734                vec![QuotaRoot {
2735                    mailbox_name: "Sent".to_string(),
2736                    quota_root_names: vec!["Userquota".to_string(),],
2737                },],
2738            );
2739            assert_eq!(
2740                quota,
2741                vec![Quota {
2742                    root_name: "Userquota".to_string(),
2743                    resources: vec![QuotaResource {
2744                        name: QuotaResourceName::Storage,
2745                        usage: 4855,
2746                        limit: 48576,
2747                    }],
2748                }]
2749            );
2750            assert_eq!(quota[0].resources[0].get_usage_percentage(), 9);
2751        }
2752
2753        {
2754            let response = b"* QUOTAROOT \"INBOX\" \"#19\"\r\n\
2755                    * QUOTA \"#19\" (STORAGE 0 0)\r\n\
2756                    A0001 OK GETQUOTAROOT successful.\r\n"
2757                .to_vec();
2758
2759            let mock_stream = MockStream::new(response);
2760            let mut session = mock_session!(mock_stream);
2761            let (quotaroots, quota) = session.get_quota_root("INBOX").await.unwrap();
2762            assert_eq!(
2763                str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2764                "A0001 GETQUOTAROOT \"INBOX\"\r\n"
2765            );
2766            assert_eq!(
2767                quotaroots,
2768                vec![QuotaRoot {
2769                    mailbox_name: "INBOX".to_string(),
2770                    quota_root_names: vec!["#19".to_string(),],
2771                },],
2772            );
2773            assert_eq!(
2774                quota,
2775                vec![Quota {
2776                    root_name: "#19".to_string(),
2777                    resources: vec![QuotaResource {
2778                        name: QuotaResourceName::Storage,
2779                        usage: 0,
2780                        limit: 0,
2781                    }],
2782                }]
2783            );
2784            assert_eq!(quota[0].resources[0].get_usage_percentage(), 0);
2785        }
2786    }
2787
2788    #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2789    #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2790    async fn test_parsing_error() {
2791        // Simulate someone connecting to SMTP server with IMAP client.
2792        let response = b"220 mail.example.org ESMTP Postcow\r\n".to_vec();
2793        let command = "A0001 NOOP\r\n";
2794        let mock_stream = MockStream::new(response);
2795        let mut session = mock_session!(mock_stream);
2796        assert!(session
2797            .noop()
2798            .await
2799            .unwrap_err()
2800            .to_string()
2801            .contains("220 mail.example.org ESMTP Postcow"));
2802        assert!(
2803            session.stream.inner.written_buf == command.as_bytes().to_vec(),
2804            "Invalid NOOP command"
2805        );
2806    }
2807}