Skip to main content

imap_client/
session.rs

1//! Type-state IMAP session. Compile-time enforcement of valid state
2//! transitions plus credential-vs-cleartext separation.
3//!
4//! ## States
5//!
6//! | State          | Allowed commands |
7//! |----------------|------------------|
8//! | Unauthenticated | `STARTTLS` (PlainText only), `LOGIN` / `AUTHENTICATE` (Tls only), `CAPABILITY`, `LOGOUT`, `NOOP` |
9//! | Authenticated  | `SELECT` / `EXAMINE`, `LIST`, `LSUB`, `STATUS`, `LOGOUT`, `NOOP`, `CAPABILITY`, `ENABLE` |
10//! | Selected       | All Authenticated commands plus `FETCH`, `STORE`, `SEARCH`, `EXPUNGE`, `IDLE`, `CLOSE`, `UNSELECT`, `MOVE` |
11//!
12//! ## Capability re-fetch
13//!
14//! Per RFC 3501 §6.1.1 / RFC 9051 §6.2 capabilities MUST be re-evaluated
15//! after STARTTLS and after a successful authentication. The session does
16//! this automatically: if the server's tagged OK response carries a
17//! `[CAPABILITY …]` response code we use it, otherwise we issue an
18//! explicit `CAPABILITY` round-trip.
19
20use std::marker::PhantomData;
21use std::time::Duration;
22
23use base64::Engine as _;
24use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
25use tokio::sync::broadcast;
26
27use crate::capabilities::Capabilities;
28use crate::client::RawClient;
29use crate::credentials::{Password, imap_quoted};
30use crate::error::ClientError;
31use crate::flags::{Flag, StoreAction};
32use crate::idle::IdleHandle;
33use crate::search::SearchQuery;
34
35use imap_core::ast::{DataResponse, FetchAttribute, Response};
36use imap_core::parser::parse_response;
37
38// --- State markers -----------------------------------------------------
39
40/// Type-state marker: the session has not yet authenticated.
41pub struct Unauthenticated;
42/// Type-state marker: the session is authenticated but no mailbox is selected.
43pub struct Authenticated;
44/// Type-state marker: a mailbox is selected and message commands are available.
45pub struct Selected;
46
47// --- Transport markers -------------------------------------------------
48
49/// Transport marker: the connection is cleartext (pre-TLS).
50pub struct PlainText;
51/// Transport marker: the connection is encrypted with TLS.
52pub struct Tls;
53
54/// Generic session enforcing compile-time state transitions.
55///
56/// `State` is one of [`Unauthenticated`], [`Authenticated`], or [`Selected`];
57/// `Transport` is [`PlainText`] or [`Tls`]. The type parameters gate which
58/// commands are callable — e.g. `LOGIN` only exists on
59/// `Session<Unauthenticated, Tls>`.
60pub struct Session<State, Transport> {
61    raw: RawClient,
62    /// Server capabilities known for this session, refreshed after STARTTLS
63    /// and after authentication.
64    pub capabilities: Capabilities,
65    _state: PhantomData<State>,
66    _transport: PhantomData<Transport>,
67}
68
69/// The decoded result of a `FETCH` for a single message.
70#[derive(Debug, Clone)]
71pub struct FetchResult {
72    /// The message sequence number.
73    pub seq: u32,
74    /// The message UID, if it was included in the fetch.
75    pub uid: Option<u32>,
76    /// The fetched body bytes (e.g. from `BODY[]`), if requested and present.
77    pub body: Option<Vec<u8>>,
78}
79
80// --- State-agnostic helpers --------------------------------------------
81
82impl<S, T> Session<S, T> {
83    fn transition_state<NewState>(self) -> Session<NewState, T> {
84        Session {
85            raw: self.raw,
86            capabilities: self.capabilities,
87            _state: PhantomData,
88            _transport: PhantomData,
89        }
90    }
91
92    /// Re-tag the session's transport type parameter (e.g. `PlainText` → `Tls`
93    /// after a STARTTLS upgrade). Performs no I/O.
94    pub fn transition_transport<NewTransport>(self) -> Session<S, NewTransport> {
95        Session {
96            raw: self.raw,
97            capabilities: self.capabilities,
98            _state: PhantomData,
99            _transport: PhantomData,
100        }
101    }
102
103    /// Subscribe to untagged server frames for this session
104    /// (see [`RawClient::events`](crate::client::RawClient::events)).
105    pub fn events(&self) -> broadcast::Receiver<Vec<u8>> {
106        self.raw.events()
107    }
108
109    /// Issue an explicit `CAPABILITY` command and update the cached set.
110    /// Used after STARTTLS and after authentication when the OK response
111    /// did not carry a `[CAPABILITY …]` response code.
112    pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> {
113        let mut events = self.raw.events();
114        let _resp = self.raw.execute_command("CAPABILITY").await?;
115        // The CAPABILITY response is broadcast as an untagged data frame.
116        while let Ok(event) = events.try_recv() {
117            if let Ok((_, response)) = parse_response(&event)
118                && self.capabilities.try_update_from(&response)
119            {
120                return Ok(());
121            }
122        }
123        Ok(())
124    }
125
126    /// If `frame` carries a `[CAPABILITY …]` response code, use it; else
127    /// issue an explicit `CAPABILITY` round-trip.
128    async fn refresh_capabilities_from_frame(&mut self, frame: &[u8]) -> Result<(), ClientError> {
129        if let Ok((_, response)) = parse_response(frame)
130            && self.capabilities.try_update_from(&response)
131        {
132            return Ok(());
133        }
134        self.refresh_capabilities().await
135    }
136}
137
138impl<T> Session<Unauthenticated, T> {
139    /// Wrap a connected [`RawClient`] as a fresh, unauthenticated session with
140    /// the given initial capabilities.
141    pub fn new(raw: RawClient, capabilities: Capabilities) -> Self {
142        Self::new_in_state(raw, capabilities)
143    }
144}
145
146impl<S, T> Session<S, T> {
147    pub(crate) fn new_in_state(raw: RawClient, capabilities: Capabilities) -> Self {
148        Self {
149            raw,
150            capabilities,
151            _state: PhantomData,
152            _transport: PhantomData,
153        }
154    }
155
156    /// `LOGOUT` is valid in any state; the session is consumed because the
157    /// connection is closed afterwards.
158    pub async fn logout(mut self) -> Result<(), ClientError> {
159        // Server replies `* BYE` (broadcast) then a tagged OK. Both BYE and
160        // ConnectionClosed are acceptable outcomes.
161        match self.raw.execute_command("LOGOUT").await {
162            Ok(_) | Err(ClientError::ConnectionClosed) => Ok(()),
163            Err(e) => Err(e),
164        }
165    }
166
167    /// `NOOP` keeps the connection alive and triggers status updates.
168    pub async fn noop(&mut self) -> Result<(), ClientError> {
169        self.raw.execute_command("NOOP").await.map(|_| ())
170    }
171}
172
173// --- Unauthenticated/Tls — login & SASL ---------------------------------
174
175impl Session<Unauthenticated, Tls> {
176    /// `LOGIN` over TLS. The username and password are sent as IMAP
177    /// quoted strings with proper escaping (`"`, `\`).
178    ///
179    /// Returns [`ClientError::CommandFailed`] if the credentials cannot be
180    /// represented as quoted strings (8-bit, control bytes, CR, LF). Use
181    /// [`Self::authenticate_plain`] in that case.
182    pub async fn login(
183        mut self,
184        user: &str,
185        pass: Password,
186    ) -> Result<Session<Authenticated, Tls>, ClientError> {
187        let user_q = imap_quoted(user)?;
188        let pass_q = pass.as_imap_quoted()?;
189        let cmd = format!("LOGIN {} {}", user_q, pass_q);
190
191        let frame = self.raw.execute_command(&cmd).await?;
192        self.refresh_capabilities_from_frame(&frame).await?;
193        Ok(self.transition_state())
194    }
195
196    /// `AUTHENTICATE PLAIN` (RFC 4616). Credentials travel as base64 over
197    /// the SASL exchange — no quoting limitations and 8-bit safe.
198    pub async fn authenticate_plain(
199        mut self,
200        user: &str,
201        pass: &Password,
202    ) -> Result<Session<Authenticated, Tls>, ClientError> {
203        if user.as_bytes().contains(&0) {
204            return Err(ClientError::CommandFailed(
205                "username must not contain NUL".into(),
206            ));
207        }
208        if pass.as_str().as_bytes().contains(&0) {
209            return Err(ClientError::CommandFailed(
210                "password must not contain NUL".into(),
211            ));
212        }
213
214        let mut events = self.raw.events();
215        let (_tag, reply_rx) = self.raw.send_command_async("AUTHENTICATE PLAIN").await?;
216
217        // Wait for `+ ...` continuation request.
218        wait_for_continuation(&mut events, Duration::from_secs(30)).await?;
219
220        // SASL PLAIN: \0<authzid>\0<authcid>\0<password>; we use empty authzid.
221        let mut sasl = Vec::with_capacity(2 + user.len() + pass.as_str().len());
222        sasl.push(0);
223        sasl.extend_from_slice(user.as_bytes());
224        sasl.push(0);
225        sasl.extend_from_slice(pass.as_str().as_bytes());
226        let mut payload = BASE64_STANDARD.encode(&sasl).into_bytes();
227        payload.extend_from_slice(b"\r\n");
228        self.raw.send_raw(payload).await?;
229
230        let frame = match reply_rx.await {
231            Ok(r) => r?,
232            Err(_) => return Err(ClientError::ConnectionClosed),
233        };
234        self.refresh_capabilities_from_frame(&frame).await?;
235        Ok(self.transition_state())
236    }
237}
238
239// --- Authenticated -----------------------------------------------------
240
241impl<T> Session<Authenticated, T> {
242    /// `SELECT` — open a mailbox read-write and transition to `Selected`.
243    pub async fn select(mut self, mailbox: &str) -> Result<Session<Selected, T>, ClientError> {
244        let mb = imap_quoted(mailbox)?;
245        let cmd = format!("SELECT {}", mb);
246        self.raw.execute_command(&cmd).await?;
247        Ok(self.transition_state())
248    }
249
250    /// `EXAMINE` — open a mailbox read-only and transition to `Selected`.
251    pub async fn examine(mut self, mailbox: &str) -> Result<Session<Selected, T>, ClientError> {
252        let mb = imap_quoted(mailbox)?;
253        let cmd = format!("EXAMINE {}", mb);
254        self.raw.execute_command(&cmd).await?;
255        Ok(self.transition_state())
256    }
257
258    /// `LIST` — list mailboxes matching `mailbox_mask` under `reference`.
259    pub async fn list(
260        &mut self,
261        reference: &str,
262        mailbox_mask: &str,
263    ) -> Result<Vec<u8>, ClientError> {
264        let cmd = format!(
265            "LIST {} {}",
266            imap_quoted(reference)?,
267            imap_quoted(mailbox_mask)?
268        );
269        self.raw.execute_command(&cmd).await
270    }
271}
272
273// --- Selected ----------------------------------------------------------
274
275impl<T> Session<Selected, T> {
276    /// `FETCH` returning the raw tagged-response bytes.
277    pub async fn fetch_raw(
278        &mut self,
279        sequence_set: &str,
280        items: &str,
281    ) -> Result<Vec<u8>, ClientError> {
282        let cmd = format!("FETCH {} {}", sequence_set, items);
283        self.raw.execute_command(&cmd).await
284    }
285
286    /// `FETCH` returning structured [`FetchResult`] entries derived from
287    /// the broadcast untagged FETCH frames.
288    pub async fn fetch(
289        &mut self,
290        sequence_set: &str,
291        items: &str,
292    ) -> Result<Vec<FetchResult>, ClientError> {
293        let mut events = self.raw.events();
294        let cmd = format!("FETCH {} {}", sequence_set, items);
295        let _resp = self.raw.execute_command(&cmd).await?;
296
297        let mut results = Vec::new();
298        while let Ok(event) = events.try_recv() {
299            if let Ok((_, Response::Data(DataResponse::Fetch { seq, attributes }))) =
300                parse_response(&event)
301            {
302                let mut uid = None;
303                let mut body = None;
304                for attr in attributes {
305                    match attr {
306                        FetchAttribute::Uid(u) => uid = Some(u),
307                        FetchAttribute::BodySection { data: Some(d), .. } => {
308                            body = Some(d.to_vec())
309                        }
310                        FetchAttribute::Body(b) => body = Some(b.to_vec()),
311                        FetchAttribute::Rfc822(b) => body = Some(b.to_vec()),
312                        _ => {}
313                    }
314                }
315                results.push(FetchResult { seq, uid, body });
316            }
317        }
318        Ok(results)
319    }
320
321    /// Convenience: fetch the body of the first message in `sequence_set`.
322    pub async fn fetch_body(&mut self, sequence_set: &str) -> Result<Option<String>, ClientError> {
323        let results = self.fetch(sequence_set, "BODY[]").await?;
324        if let Some(res) = results.first()
325            && let Some(body) = &res.body
326        {
327            return Ok(Some(String::from_utf8_lossy(body).into_owned()));
328        }
329        Ok(None)
330    }
331
332    /// `SEARCH` — return the message **sequence numbers** matching `query`.
333    pub async fn search(&mut self, query: SearchQuery) -> Result<Vec<u32>, ClientError> {
334        self.run_search(&format!("SEARCH {}", query.build())).await
335    }
336
337    /// `UID SEARCH` — return the message **UIDs** matching `query`.
338    pub async fn uid_search(&mut self, query: SearchQuery) -> Result<Vec<u32>, ClientError> {
339        self.run_search(&format!("UID SEARCH {}", query.build()))
340            .await
341    }
342
343    async fn run_search(&mut self, cmd: &str) -> Result<Vec<u32>, ClientError> {
344        let mut events = self.raw.events();
345        let _resp = self.raw.execute_command(cmd).await?;
346
347        let mut all_ids = Vec::new();
348        while let Ok(event) = events.try_recv() {
349            if let Ok((_, Response::Data(DataResponse::Search(ids)))) = parse_response(&event) {
350                all_ids.extend(ids);
351            }
352        }
353        Ok(all_ids)
354    }
355
356    /// `STORE` — apply a flag `action` (add / remove / set) to the messages in
357    /// `sequence_set` (a sequence-number set such as `"1:5"` or `"1,3,5"`).
358    pub async fn store(
359        &mut self,
360        sequence_set: &str,
361        action: StoreAction,
362        flags: &[Flag],
363    ) -> Result<Vec<u8>, ClientError> {
364        let flags_str = flags
365            .iter()
366            .map(|f| f.to_string())
367            .collect::<Vec<_>>()
368            .join(" ");
369        let cmd = format!(
370            "STORE {} {} ({})",
371            sequence_set,
372            action.to_imap_prefix(false),
373            flags_str
374        );
375        self.raw.execute_command(&cmd).await
376    }
377
378    /// `UID STORE` — like [`store`](Self::store) but addressed by UID set.
379    pub async fn uid_store(
380        &mut self,
381        uid_set: &str,
382        action: StoreAction,
383        flags: &[Flag],
384    ) -> Result<Vec<u8>, ClientError> {
385        let flags_str = flags
386            .iter()
387            .map(|f| f.to_string())
388            .collect::<Vec<_>>()
389            .join(" ");
390        let cmd = format!(
391            "UID STORE {} {} ({})",
392            uid_set,
393            action.to_imap_prefix(false),
394            flags_str
395        );
396        self.raw.execute_command(&cmd).await
397    }
398
399    /// `EXPUNGE` — permanently remove messages flagged `\Deleted` from the
400    /// selected mailbox.
401    pub async fn expunge(&mut self) -> Result<Vec<u8>, ClientError> {
402        self.raw.execute_command("EXPUNGE").await
403    }
404
405    /// `CLOSE` — implicitly expunges deleted messages and transitions back
406    /// to `Authenticated`.
407    pub async fn close_mailbox(mut self) -> Result<Session<Authenticated, T>, ClientError> {
408        self.raw.execute_command("CLOSE").await?;
409        Ok(self.transition_state())
410    }
411
412    /// `UNSELECT` (RFC 3691) — like `CLOSE` but without expunging. Errors
413    /// if the server has not advertised the `UNSELECT` capability.
414    pub async fn unselect(mut self) -> Result<Session<Authenticated, T>, ClientError> {
415        if !self.capabilities.unselect {
416            return Err(ClientError::UnsupportedCapability("UNSELECT"));
417        }
418        self.raw.execute_command("UNSELECT").await?;
419        Ok(self.transition_state())
420    }
421
422    /// `CHECK` — implementation-defined housekeeping checkpoint.
423    pub async fn check(&mut self) -> Result<(), ClientError> {
424        self.raw.execute_command("CHECK").await.map(|_| ())
425    }
426
427    /// `IDLE` (RFC 2177). Returns an [`IdleHandle`]; call `stop()` on it
428    /// to gracefully terminate. Callers must re-issue IDLE at least every
429    /// ~28 minutes — see [`crate::idle`] for details.
430    pub async fn idle(&mut self) -> Result<IdleHandle, ClientError> {
431        if !self.capabilities.idle {
432            return Err(ClientError::UnsupportedCapability("IDLE"));
433        }
434        let mut events = self.raw.events();
435        let writer = self.raw.writer();
436        let (_tag, reply_rx) = self.raw.send_command_async("IDLE").await?;
437        wait_for_continuation(&mut events, Duration::from_secs(30)).await?;
438        Ok(IdleHandle::new(writer, reply_rx))
439    }
440
441    /// `MOVE` (RFC 6851). Errors if the server has not advertised `MOVE`.
442    pub async fn move_messages(
443        &mut self,
444        sequence_set: &str,
445        mailbox: &str,
446    ) -> Result<Vec<u8>, ClientError> {
447        if !self.capabilities.move_ext {
448            return Err(ClientError::UnsupportedCapability("MOVE"));
449        }
450        let cmd = format!("MOVE {} {}", sequence_set, imap_quoted(mailbox)?);
451        self.raw.execute_command(&cmd).await
452    }
453}
454
455// --- Helpers -----------------------------------------------------------
456
457/// Wait for the next `+ ...` continuation request on the broadcast
458/// channel, dropping any other untagged frames in the interim.
459async fn wait_for_continuation(
460    events: &mut broadcast::Receiver<Vec<u8>>,
461    timeout: Duration,
462) -> Result<(), ClientError> {
463    let deadline = tokio::time::Instant::now() + timeout;
464    loop {
465        let now = tokio::time::Instant::now();
466        if now >= deadline {
467            return Err(ClientError::Timeout);
468        }
469        match tokio::time::timeout(deadline - now, events.recv()).await {
470            Ok(Ok(frame)) => {
471                if frame.starts_with(b"+") {
472                    return Ok(());
473                }
474                // Otherwise it's an unrelated untagged frame — keep waiting.
475            }
476            Ok(Err(broadcast::error::RecvError::Closed)) => {
477                return Err(ClientError::ConnectionClosed);
478            }
479            Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
480                // We may have missed the continuation — the dispatcher's
481                // bound is generous, but treat lag as a protocol error
482                // rather than guessing.
483                return Err(ClientError::CommandFailed(
484                    "broadcast lagged; continuation may have been missed".into(),
485                ));
486            }
487            Err(_) => return Err(ClientError::Timeout),
488        }
489    }
490}
491
492// --- Tests -------------------------------------------------------------
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use crate::{Flag, SearchQuery, StoreAction, Tls};
498    use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};
499
500    async fn read_cmd_tag(server: &mut tokio::io::DuplexStream) -> String {
501        let mut buf = [0u8; 1024];
502        let n = server.read(&mut buf).await.unwrap();
503        String::from_utf8_lossy(&buf[..n])
504            .split_whitespace()
505            .next()
506            .unwrap()
507            .to_owned()
508    }
509
510    fn unauth_session(client_io: tokio::io::DuplexStream) -> Session<Unauthenticated, Tls> {
511        let raw = RawClient::new(client_io);
512        Session::<Unauthenticated, Tls>::new_in_state(raw, Capabilities::default())
513    }
514
515    fn auth_session(client_io: tokio::io::DuplexStream) -> Session<Authenticated, Tls> {
516        let raw = RawClient::new(client_io);
517        Session::<Authenticated, Tls>::new_in_state(raw, Capabilities::default())
518    }
519
520    fn selected_session(client_io: tokio::io::DuplexStream) -> Session<Selected, Tls> {
521        let raw = RawClient::new(client_io);
522        Session::<Selected, Tls>::new_in_state(raw, Capabilities::default())
523    }
524
525    fn selected_session_with_caps(
526        client_io: tokio::io::DuplexStream,
527        caps: Capabilities,
528    ) -> Session<Selected, Tls> {
529        let raw = RawClient::new(client_io);
530        Session::<Selected, Tls>::new_in_state(raw, caps)
531    }
532
533    #[tokio::test]
534    async fn test_session_login() {
535        let (client_io, mut server_io) = duplex(1024);
536        let session = unauth_session(client_io);
537
538        let login_task =
539            tokio::spawn(async move { session.login("user", Password::new("pass")).await });
540
541        // Server reads `A0001 LOGIN "user" "pass"\r\n`
542        let mut buf = [0u8; 1024];
543        let n = server_io.read(&mut buf).await.unwrap();
544        let cmd = String::from_utf8_lossy(&buf[..n]).into_owned();
545        assert!(cmd.contains("LOGIN \"user\" \"pass\""));
546        let tag = cmd.split_whitespace().next().unwrap().to_owned();
547
548        server_io
549            .write_all(
550                format!("{} OK [CAPABILITY IMAP4rev2 IDLE] LOGIN completed\r\n", tag).as_bytes(),
551            )
552            .await
553            .unwrap();
554
555        let auth = login_task.await.unwrap().unwrap();
556        // No second CAPABILITY round-trip — caps came from the response code.
557        assert!(auth.capabilities.imap4rev2);
558        assert!(auth.capabilities.idle);
559    }
560
561    #[tokio::test]
562    async fn test_session_login_falls_back_to_capability_round_trip() {
563        let (client_io, mut server_io) = duplex(1024);
564        let session = unauth_session(client_io);
565
566        let login_task =
567            tokio::spawn(async move { session.login("user", Password::new("pass")).await });
568
569        let tag = read_cmd_tag(&mut server_io).await;
570        server_io
571            .write_all(format!("{} OK LOGIN completed\r\n", tag).as_bytes())
572            .await
573            .unwrap();
574
575        // Server now sees CAPABILITY round-trip.
576        let tag2 = read_cmd_tag(&mut server_io).await;
577        server_io
578            .write_all(b"* CAPABILITY IMAP4rev2 STARTTLS UNSELECT\r\n")
579            .await
580            .unwrap();
581        server_io
582            .write_all(format!("{} OK CAPABILITY done\r\n", tag2).as_bytes())
583            .await
584            .unwrap();
585
586        let auth = login_task.await.unwrap().unwrap();
587        assert!(auth.capabilities.imap4rev2);
588        assert!(auth.capabilities.unselect);
589    }
590
591    #[tokio::test]
592    async fn test_login_escapes_quote_and_backslash() {
593        let (client_io, mut server_io) = duplex(1024);
594        let session = unauth_session(client_io);
595
596        let login_task = tokio::spawn(async move {
597            session
598                .login("user\"with\\specials", Password::new("p@ss\"\\"))
599                .await
600        });
601
602        let mut buf = [0u8; 1024];
603        let n = server_io.read(&mut buf).await.unwrap();
604        let cmd = String::from_utf8_lossy(&buf[..n]).into_owned();
605        // Both quote and backslash must be escaped on the wire.
606        assert!(cmd.contains(r#"LOGIN "user\"with\\specials" "p@ss\"\\""#));
607        let tag = cmd.split_whitespace().next().unwrap().to_owned();
608        // Include [CAPABILITY] response code to skip the explicit refresh.
609        server_io
610            .write_all(format!("{} OK [CAPABILITY IMAP4rev2] done\r\n", tag).as_bytes())
611            .await
612            .unwrap();
613        let _ = login_task.await.unwrap().unwrap();
614    }
615
616    #[tokio::test]
617    async fn test_login_rejects_8bit_password() {
618        let (client_io, _server_io) = duplex(1024);
619        let session = unauth_session(client_io);
620        let r = session.login("user", Password::new("café")).await;
621        assert!(matches!(r, Err(ClientError::CommandFailed(_))));
622    }
623
624    #[tokio::test]
625    async fn test_login_failure() {
626        let (client_io, mut server_io) = duplex(1024);
627        let session = unauth_session(client_io);
628        let task = tokio::spawn(async move { session.login("user", Password::new("pass")).await });
629        let tag = read_cmd_tag(&mut server_io).await;
630        server_io
631            .write_all(format!("{} NO authentication failed\r\n", tag).as_bytes())
632            .await
633            .unwrap();
634        match task.await.unwrap() {
635            Err(ClientError::CommandFailed(t)) => assert!(t.contains("authentication")),
636            _ => panic!("unexpected variant"),
637        }
638    }
639
640    #[tokio::test]
641    async fn test_authenticate_plain() {
642        let (client_io, mut server_io) = duplex(1024);
643        let session = unauth_session(client_io);
644        let task = tokio::spawn(async move {
645            session
646                .authenticate_plain("user", &Password::new("pass"))
647                .await
648        });
649
650        // Server sees `A0001 AUTHENTICATE PLAIN\r\n`
651        let mut buf = [0u8; 1024];
652        let n = server_io.read(&mut buf).await.unwrap();
653        let cmd = String::from_utf8_lossy(&buf[..n]).into_owned();
654        assert!(cmd.contains("AUTHENTICATE PLAIN"));
655        let tag = cmd.split_whitespace().next().unwrap().to_owned();
656
657        // Send continuation
658        server_io.write_all(b"+ \r\n").await.unwrap();
659
660        // Read base64 SASL payload
661        let n = server_io.read(&mut buf).await.unwrap();
662        let payload_b64 = String::from_utf8_lossy(&buf[..n]).trim_end().to_string();
663        let payload = base64::engine::general_purpose::STANDARD
664            .decode(payload_b64)
665            .unwrap();
666        assert_eq!(payload, b"\0user\0pass");
667
668        server_io
669            .write_all(format!("{} OK [CAPABILITY IMAP4rev2] auth done\r\n", tag).as_bytes())
670            .await
671            .unwrap();
672
673        let _auth = task.await.unwrap().unwrap();
674    }
675
676    #[tokio::test]
677    async fn test_authenticate_plain_failure() {
678        let (client_io, mut server_io) = duplex(1024);
679        let session = unauth_session(client_io);
680        let task = tokio::spawn(async move {
681            session
682                .authenticate_plain("user", &Password::new("pass"))
683                .await
684        });
685
686        let tag = read_cmd_tag(&mut server_io).await;
687        server_io.write_all(b"+ \r\n").await.unwrap();
688        let mut buf = [0u8; 1024];
689        let _ = server_io.read(&mut buf).await.unwrap();
690        server_io
691            .write_all(format!("{} NO bad creds\r\n", tag).as_bytes())
692            .await
693            .unwrap();
694
695        assert!(matches!(
696            task.await.unwrap(),
697            Err(ClientError::CommandFailed(_))
698        ));
699    }
700
701    #[tokio::test]
702    async fn test_session_select() {
703        let (client_io, mut server_io) = duplex(1024);
704        let session = auth_session(client_io);
705        let task = tokio::spawn(async move { session.select("INBOX").await });
706        let tag = read_cmd_tag(&mut server_io).await;
707        server_io
708            .write_all(format!("{} OK SELECT completed\r\n", tag).as_bytes())
709            .await
710            .unwrap();
711        let _selected = task.await.unwrap().unwrap();
712    }
713
714    #[tokio::test]
715    async fn test_session_examine() {
716        let (client_io, mut server_io) = duplex(1024);
717        let session = auth_session(client_io);
718        let task = tokio::spawn(async move { session.examine("INBOX").await });
719        let mut buf = [0u8; 1024];
720        let n = server_io.read(&mut buf).await.unwrap();
721        let cmd = String::from_utf8_lossy(&buf[..n]).into_owned();
722        assert!(cmd.contains("EXAMINE \"INBOX\""));
723        let tag = cmd.split_whitespace().next().unwrap();
724        server_io
725            .write_all(format!("{} OK EXAMINE completed\r\n", tag).as_bytes())
726            .await
727            .unwrap();
728        let _ = task.await.unwrap().unwrap();
729    }
730
731    #[tokio::test]
732    async fn test_session_select_failure() {
733        let (client_io, mut server_io) = duplex(1024);
734        let session = auth_session(client_io);
735        let task = tokio::spawn(async move { session.select("BadBox").await });
736        let tag = read_cmd_tag(&mut server_io).await;
737        server_io
738            .write_all(format!("{} NO no such mailbox\r\n", tag).as_bytes())
739            .await
740            .unwrap();
741        assert!(matches!(
742            task.await.unwrap(),
743            Err(ClientError::CommandFailed(_))
744        ));
745    }
746
747    #[tokio::test]
748    async fn test_session_logout() {
749        let (client_io, mut server_io) = duplex(1024);
750        let session = auth_session(client_io);
751        let task = tokio::spawn(async move { session.logout().await });
752        let tag = read_cmd_tag(&mut server_io).await;
753        server_io.write_all(b"* BYE goodbye\r\n").await.unwrap();
754        server_io
755            .write_all(format!("{} OK LOGOUT completed\r\n", tag).as_bytes())
756            .await
757            .unwrap();
758        task.await.unwrap().unwrap();
759    }
760
761    #[tokio::test]
762    async fn test_session_noop() {
763        let (client_io, mut server_io) = duplex(1024);
764        let mut session = auth_session(client_io);
765        let task = tokio::spawn(async move {
766            let r = session.noop().await;
767            (session, r)
768        });
769        let tag = read_cmd_tag(&mut server_io).await;
770        server_io
771            .write_all(format!("{} OK NOOP done\r\n", tag).as_bytes())
772            .await
773            .unwrap();
774        let (_session, r) = task.await.unwrap();
775        r.unwrap();
776    }
777
778    #[tokio::test]
779    async fn test_session_close() {
780        let (client_io, mut server_io) = duplex(1024);
781        let session = selected_session(client_io);
782        let task = tokio::spawn(async move { session.close_mailbox().await });
783        let tag = read_cmd_tag(&mut server_io).await;
784        server_io
785            .write_all(format!("{} OK CLOSE done\r\n", tag).as_bytes())
786            .await
787            .unwrap();
788        let _ = task.await.unwrap().unwrap();
789    }
790
791    #[tokio::test]
792    async fn test_session_unselect_unsupported() {
793        let (client_io, _server_io) = duplex(1024);
794        let session = selected_session(client_io);
795        // Default Capabilities has unselect=false.
796        match session.unselect().await {
797            Err(ClientError::UnsupportedCapability("UNSELECT")) => {}
798            _ => panic!("unexpected variant"),
799        }
800    }
801
802    #[tokio::test]
803    async fn test_session_unselect_supported() {
804        let (client_io, mut server_io) = duplex(1024);
805        let caps = Capabilities {
806            unselect: true,
807            ..Default::default()
808        };
809        let session = selected_session_with_caps(client_io, caps);
810        let task = tokio::spawn(async move { session.unselect().await });
811        let tag = read_cmd_tag(&mut server_io).await;
812        server_io
813            .write_all(format!("{} OK UNSELECT done\r\n", tag).as_bytes())
814            .await
815            .unwrap();
816        let _ = task.await.unwrap().unwrap();
817    }
818
819    #[tokio::test]
820    async fn test_session_check() {
821        let (client_io, mut server_io) = duplex(1024);
822        let mut session = selected_session(client_io);
823        let task = tokio::spawn(async move {
824            let r = session.check().await;
825            (session, r)
826        });
827        let tag = read_cmd_tag(&mut server_io).await;
828        server_io
829            .write_all(format!("{} OK CHECK done\r\n", tag).as_bytes())
830            .await
831            .unwrap();
832        let (_, r) = task.await.unwrap();
833        r.unwrap();
834    }
835
836    #[tokio::test]
837    async fn test_session_idle_unsupported() {
838        let (client_io, _server_io) = duplex(1024);
839        let mut session = selected_session(client_io);
840        match session.idle().await {
841            Err(ClientError::UnsupportedCapability("IDLE")) => {}
842            _ => panic!("unexpected variant"),
843        }
844    }
845
846    #[tokio::test]
847    async fn test_session_idle_flow() {
848        let (client_io, mut server_io) = duplex(1024);
849        let caps = Capabilities {
850            idle: true,
851            ..Default::default()
852        };
853        let mut session = selected_session_with_caps(client_io, caps);
854        let task = tokio::spawn(async move {
855            let h = session.idle().await.unwrap();
856            (session, h)
857        });
858
859        let tag = read_cmd_tag(&mut server_io).await;
860        server_io.write_all(b"+ idling\r\n").await.unwrap();
861
862        let (_session, handle) = task.await.unwrap();
863
864        let stop_task = tokio::spawn(async move { handle.stop().await });
865        let mut buf = [0u8; 1024];
866        let n = server_io.read(&mut buf).await.unwrap();
867        assert_eq!(&buf[..n], b"DONE\r\n");
868        server_io
869            .write_all(format!("{} OK IDLE done\r\n", tag).as_bytes())
870            .await
871            .unwrap();
872        stop_task.await.unwrap().unwrap();
873    }
874
875    #[tokio::test]
876    async fn test_session_search() {
877        let (client_io, mut server_io) = duplex(1024);
878        let mut session = selected_session(client_io);
879        let task = tokio::spawn(async move { session.search(SearchQuery::subject("test")).await });
880        let tag = read_cmd_tag(&mut server_io).await;
881        server_io.write_all(b"* SEARCH 1 2 3\r\n").await.unwrap();
882        server_io
883            .write_all(format!("{} OK SEARCH completed\r\n", tag).as_bytes())
884            .await
885            .unwrap();
886        assert_eq!(task.await.unwrap().unwrap(), vec![1, 2, 3]);
887    }
888
889    #[tokio::test]
890    async fn test_session_uid_search() {
891        let (client_io, mut server_io) = duplex(1024);
892        let mut session = selected_session(client_io);
893        let task = tokio::spawn(async move { session.uid_search(SearchQuery::subject("t")).await });
894        let mut buf = [0u8; 1024];
895        let n = server_io.read(&mut buf).await.unwrap();
896        let cmd = String::from_utf8_lossy(&buf[..n]).into_owned();
897        assert!(cmd.contains("UID SEARCH"));
898        let tag = cmd.split_whitespace().next().unwrap();
899        server_io.write_all(b"* SEARCH 4 5 6\r\n").await.unwrap();
900        server_io
901            .write_all(format!("{} OK done\r\n", tag).as_bytes())
902            .await
903            .unwrap();
904        assert_eq!(task.await.unwrap().unwrap(), vec![4, 5, 6]);
905    }
906
907    #[tokio::test]
908    async fn test_session_search_failure() {
909        let (client_io, mut server_io) = duplex(1024);
910        let mut session = selected_session(client_io);
911        let task = tokio::spawn(async move { session.search(SearchQuery::subject("t")).await });
912        let tag = read_cmd_tag(&mut server_io).await;
913        server_io
914            .write_all(format!("{} NO failed\r\n", tag).as_bytes())
915            .await
916            .unwrap();
917        assert!(task.await.unwrap().is_err());
918    }
919
920    #[tokio::test]
921    async fn test_session_run_search_multiple_events() {
922        let (client_io, mut server_io) = duplex(1024);
923        let mut session = selected_session(client_io);
924        let task = tokio::spawn(async move { session.search(SearchQuery::subject("t")).await });
925        let tag = read_cmd_tag(&mut server_io).await;
926        server_io.write_all(b"* SEARCH 1 2\r\n").await.unwrap();
927        server_io.write_all(b"* SEARCH 3 4\r\n").await.unwrap();
928        server_io
929            .write_all(format!("{} OK done\r\n", tag).as_bytes())
930            .await
931            .unwrap();
932        assert_eq!(task.await.unwrap().unwrap(), vec![1, 2, 3, 4]);
933    }
934
935    #[tokio::test]
936    async fn test_session_store_and_uid_store() {
937        for (kind, expected_prefix) in [("STORE", "STORE"), ("UID STORE", "UID STORE")] {
938            let (client_io, mut server_io) = duplex(1024);
939            let mut session = selected_session(client_io);
940            let task = tokio::spawn(async move {
941                if kind == "STORE" {
942                    session.store("1", StoreAction::Add, &[Flag::Seen]).await
943                } else {
944                    session
945                        .uid_store("1", StoreAction::Add, &[Flag::Seen])
946                        .await
947                }
948            });
949            let mut buf = [0u8; 1024];
950            let n = server_io.read(&mut buf).await.unwrap();
951            assert!(String::from_utf8_lossy(&buf[..n]).contains(expected_prefix));
952            let tag = String::from_utf8_lossy(&buf[..n])
953                .split_whitespace()
954                .next()
955                .unwrap()
956                .to_owned();
957            server_io
958                .write_all(format!("{} OK done\r\n", tag).as_bytes())
959                .await
960                .unwrap();
961            let _ = task.await.unwrap().unwrap();
962        }
963    }
964
965    #[tokio::test]
966    async fn test_session_expunge() {
967        let (client_io, mut server_io) = duplex(1024);
968        let mut session = selected_session(client_io);
969        let task = tokio::spawn(async move { session.expunge().await });
970        let tag = read_cmd_tag(&mut server_io).await;
971        server_io
972            .write_all(format!("{} OK done\r\n", tag).as_bytes())
973            .await
974            .unwrap();
975        let _ = task.await.unwrap().unwrap();
976    }
977
978    #[tokio::test]
979    async fn test_session_move_unsupported() {
980        let (client_io, _server_io) = duplex(1024);
981        let mut session = selected_session(client_io);
982        match session.move_messages("1", "Archive").await {
983            Err(ClientError::UnsupportedCapability("MOVE")) => {}
984            _ => panic!("unexpected variant"),
985        }
986    }
987
988    #[tokio::test]
989    async fn test_session_move_supported() {
990        let (client_io, mut server_io) = duplex(1024);
991        let caps = Capabilities {
992            move_ext: true,
993            ..Default::default()
994        };
995        let mut session = selected_session_with_caps(client_io, caps);
996        let task = tokio::spawn(async move { session.move_messages("1", "Archive").await });
997        let mut buf = [0u8; 1024];
998        let n = server_io.read(&mut buf).await.unwrap();
999        let cmd = String::from_utf8_lossy(&buf[..n]).into_owned();
1000        assert!(cmd.contains(r#"MOVE 1 "Archive""#));
1001        let tag = cmd.split_whitespace().next().unwrap();
1002        server_io
1003            .write_all(format!("{} OK done\r\n", tag).as_bytes())
1004            .await
1005            .unwrap();
1006        let _ = task.await.unwrap().unwrap();
1007    }
1008
1009    #[tokio::test]
1010    async fn test_session_list() {
1011        let (client_io, mut server_io) = duplex(1024);
1012        let mut session = auth_session(client_io);
1013        let task = tokio::spawn(async move { session.list("", "*").await });
1014        let tag = read_cmd_tag(&mut server_io).await;
1015        server_io
1016            .write_all(format!("{} OK done\r\n", tag).as_bytes())
1017            .await
1018            .unwrap();
1019        let _ = task.await.unwrap().unwrap();
1020    }
1021
1022    #[tokio::test]
1023    async fn test_session_list_failure() {
1024        let (client_io, mut server_io) = duplex(1024);
1025        let mut session = auth_session(client_io);
1026        let task = tokio::spawn(async move { session.list("", "*").await });
1027        let tag = read_cmd_tag(&mut server_io).await;
1028        server_io
1029            .write_all(format!("{} NO failed\r\n", tag).as_bytes())
1030            .await
1031            .unwrap();
1032        assert!(task.await.unwrap().is_err());
1033    }
1034
1035    #[tokio::test]
1036    async fn test_session_fetch_raw() {
1037        let (client_io, mut server_io) = duplex(1024);
1038        let mut session = selected_session(client_io);
1039        let task = tokio::spawn(async move { session.fetch_raw("1", "ALL").await });
1040        let tag = read_cmd_tag(&mut server_io).await;
1041        server_io
1042            .write_all(format!("{} OK done\r\n", tag).as_bytes())
1043            .await
1044            .unwrap();
1045        let _ = task.await.unwrap().unwrap();
1046    }
1047
1048    #[tokio::test]
1049    async fn test_session_fetch_structured() {
1050        let (client_io, mut server_io) = duplex(1024);
1051        let mut session = selected_session(client_io);
1052        let task = tokio::spawn(async move { session.fetch("1", "BODY[]").await });
1053        let tag = read_cmd_tag(&mut server_io).await;
1054        server_io
1055            .write_all(b"* 1 FETCH (BODY[] {10}\r\n0123456789 UID 123)\r\n")
1056            .await
1057            .unwrap();
1058        server_io
1059            .write_all(format!("{} OK done\r\n", tag).as_bytes())
1060            .await
1061            .unwrap();
1062        let results = task.await.unwrap().unwrap();
1063        assert_eq!(results.len(), 1);
1064        assert_eq!(results[0].seq, 1);
1065        assert_eq!(results[0].uid, Some(123));
1066        assert_eq!(results[0].body.as_deref(), Some(&b"0123456789"[..]));
1067    }
1068
1069    #[tokio::test]
1070    async fn test_session_fetch_body_helper() {
1071        let (client_io, mut server_io) = duplex(1024);
1072        let mut session = selected_session(client_io);
1073        let task = tokio::spawn(async move { session.fetch_body("1").await });
1074        let tag = read_cmd_tag(&mut server_io).await;
1075        server_io
1076            .write_all(b"* 1 FETCH (BODY[] {10}\r\n0123456789)\r\n")
1077            .await
1078            .unwrap();
1079        server_io
1080            .write_all(format!("{} OK done\r\n", tag).as_bytes())
1081            .await
1082            .unwrap();
1083        assert_eq!(task.await.unwrap().unwrap(), Some("0123456789".to_string()));
1084    }
1085
1086    #[tokio::test]
1087    async fn test_session_transition_transport() {
1088        let (client_io, _server_io) = duplex(1024);
1089        let session = unauth_session(client_io);
1090        let _ = session.transition_transport::<crate::PlainText>();
1091    }
1092
1093    #[tokio::test]
1094    async fn test_session_events() {
1095        let (client_io, _server_io) = duplex(1024);
1096        let session = unauth_session(client_io);
1097        let _ = session.events();
1098    }
1099
1100    #[tokio::test]
1101    async fn test_refresh_capabilities_explicit() {
1102        let (client_io, mut server_io) = duplex(1024);
1103        let mut session = unauth_session(client_io);
1104        let task = tokio::spawn(async move {
1105            let r = session.refresh_capabilities().await;
1106            (session, r)
1107        });
1108        let tag = read_cmd_tag(&mut server_io).await;
1109        server_io
1110            .write_all(b"* CAPABILITY IMAP4rev2 IDLE\r\n")
1111            .await
1112            .unwrap();
1113        server_io
1114            .write_all(format!("{} OK done\r\n", tag).as_bytes())
1115            .await
1116            .unwrap();
1117        let (session, r) = task.await.unwrap();
1118        r.unwrap();
1119        assert!(session.capabilities.imap4rev2);
1120        assert!(session.capabilities.idle);
1121    }
1122}