1use std::marker::PhantomData;
21use std::time::Duration;
22
23use base64::Engine as _;
24use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
25use tokio::sync::broadcast;
26
27use crate::capabilities::Capabilities;
28use crate::client::RawClient;
29use crate::credentials::{Password, imap_quoted};
30use crate::error::ClientError;
31use crate::flags::{Flag, StoreAction};
32use crate::idle::IdleHandle;
33use crate::search::SearchQuery;
34
35use imap_core::ast::{DataResponse, FetchAttribute, Response};
36use imap_core::parser::parse_response;
37
/// Type-state marker for a [`Session`] that has not authenticated yet.
///
/// In this file, `login` and `authenticate_plain` are only defined for sessions in this state.
pub struct Unauthenticated;
/// Type-state marker for a [`Session`] that has authenticated but has no mailbox open.
///
/// Sessions reach this state from `login` / `authenticate_plain`, and return to it from
/// `close_mailbox` / `unselect`.
pub struct Authenticated;
/// Type-state marker for a [`Session`] returned by a successful `select` or `examine`.
pub struct Selected;
46
/// Transport marker for a [`Session`].
///
/// NOTE(review): the name suggests a connection without TLS; confirm against the code that
/// constructs sessions, which is not visible in this file.
pub struct PlainText;
/// Transport marker for a [`Session`].
///
/// In this file the credential-sending methods (`login`, `authenticate_plain`) are only
/// defined for `Session<Unauthenticated, Tls>`.
pub struct Tls;
53
/// An IMAP session whose protocol state and transport kind are tracked in its type.
///
/// `State` and `Transport` are only used through `PhantomData`, so they carry no data;
/// they exist to decide which `impl` blocks (and therefore which commands) are available.
pub struct Session<State, Transport> {
    /// Underlying client used to send commands and to subscribe to incoming frames.
    raw: RawClient,
    /// Capability set currently recorded for this connection.
    pub capabilities: Capabilities,
    /// Marker for the protocol state.
    _state: PhantomData<State>,
    /// Marker for the transport kind.
    _transport: PhantomData<Transport>,
}
68
/// One parsed FETCH response, as collected by `Session::fetch`.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// Sequence number taken from the FETCH response.
    pub seq: u32,
    /// UID, when the response carried a UID attribute.
    pub uid: Option<u32>,
    /// Bytes of the last body-like attribute seen in the response (a body section with
    /// data, `Body`, or `Rfc822`), if any.
    pub body: Option<Vec<u8>>,
}
79
80impl<S, T> Session<S, T> {
83 fn transition_state<NewState>(self) -> Session<NewState, T> {
84 Session {
85 raw: self.raw,
86 capabilities: self.capabilities,
87 _state: PhantomData,
88 _transport: PhantomData,
89 }
90 }
91
92 pub fn transition_transport<NewTransport>(self) -> Session<S, NewTransport> {
95 Session {
96 raw: self.raw,
97 capabilities: self.capabilities,
98 _state: PhantomData,
99 _transport: PhantomData,
100 }
101 }
102
103 pub fn events(&self) -> broadcast::Receiver<Vec<u8>> {
106 self.raw.events()
107 }
108
109 pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> {
113 let mut events = self.raw.events();
114 let _resp = self.raw.execute_command("CAPABILITY").await?;
115 while let Ok(event) = events.try_recv() {
117 if let Ok((_, response)) = parse_response(&event)
118 && self.capabilities.try_update_from(&response)
119 {
120 return Ok(());
121 }
122 }
123 Ok(())
124 }
125
126 async fn refresh_capabilities_from_frame(&mut self, frame: &[u8]) -> Result<(), ClientError> {
129 if let Ok((_, response)) = parse_response(frame)
130 && self.capabilities.try_update_from(&response)
131 {
132 return Ok(());
133 }
134 self.refresh_capabilities().await
135 }
136}
137
138impl<T> Session<Unauthenticated, T> {
139 pub fn new(raw: RawClient, capabilities: Capabilities) -> Self {
142 Self::new_in_state(raw, capabilities)
143 }
144}
145
146impl<S, T> Session<S, T> {
147 pub(crate) fn new_in_state(raw: RawClient, capabilities: Capabilities) -> Self {
148 Self {
149 raw,
150 capabilities,
151 _state: PhantomData,
152 _transport: PhantomData,
153 }
154 }
155
156 pub async fn logout(mut self) -> Result<(), ClientError> {
159 match self.raw.execute_command("LOGOUT").await {
162 Ok(_) | Err(ClientError::ConnectionClosed) => Ok(()),
163 Err(e) => Err(e),
164 }
165 }
166
167 pub async fn noop(&mut self) -> Result<(), ClientError> {
169 self.raw.execute_command("NOOP").await.map(|_| ())
170 }
171}
172
173impl Session<Unauthenticated, Tls> {
176 pub async fn login(
183 mut self,
184 user: &str,
185 pass: Password,
186 ) -> Result<Session<Authenticated, Tls>, ClientError> {
187 let user_q = imap_quoted(user)?;
188 let pass_q = pass.as_imap_quoted()?;
189 let cmd = format!("LOGIN {} {}", user_q, pass_q);
190
191 let frame = self.raw.execute_command(&cmd).await?;
192 self.refresh_capabilities_from_frame(&frame).await?;
193 Ok(self.transition_state())
194 }
195
196 pub async fn authenticate_plain(
199 mut self,
200 user: &str,
201 pass: &Password,
202 ) -> Result<Session<Authenticated, Tls>, ClientError> {
203 if user.as_bytes().contains(&0) {
204 return Err(ClientError::CommandFailed(
205 "username must not contain NUL".into(),
206 ));
207 }
208 if pass.as_str().as_bytes().contains(&0) {
209 return Err(ClientError::CommandFailed(
210 "password must not contain NUL".into(),
211 ));
212 }
213
214 let mut events = self.raw.events();
215 let (_tag, reply_rx) = self.raw.send_command_async("AUTHENTICATE PLAIN").await?;
216
217 wait_for_continuation(&mut events, Duration::from_secs(30)).await?;
219
220 let mut sasl = Vec::with_capacity(2 + user.len() + pass.as_str().len());
222 sasl.push(0);
223 sasl.extend_from_slice(user.as_bytes());
224 sasl.push(0);
225 sasl.extend_from_slice(pass.as_str().as_bytes());
226 let mut payload = BASE64_STANDARD.encode(&sasl).into_bytes();
227 payload.extend_from_slice(b"\r\n");
228 self.raw.send_raw(payload).await?;
229
230 let frame = match reply_rx.await {
231 Ok(r) => r?,
232 Err(_) => return Err(ClientError::ConnectionClosed),
233 };
234 self.refresh_capabilities_from_frame(&frame).await?;
235 Ok(self.transition_state())
236 }
237}
238
239impl<T> Session<Authenticated, T> {
242 pub async fn select(mut self, mailbox: &str) -> Result<Session<Selected, T>, ClientError> {
244 let mb = imap_quoted(mailbox)?;
245 let cmd = format!("SELECT {}", mb);
246 self.raw.execute_command(&cmd).await?;
247 Ok(self.transition_state())
248 }
249
250 pub async fn examine(mut self, mailbox: &str) -> Result<Session<Selected, T>, ClientError> {
252 let mb = imap_quoted(mailbox)?;
253 let cmd = format!("EXAMINE {}", mb);
254 self.raw.execute_command(&cmd).await?;
255 Ok(self.transition_state())
256 }
257
258 pub async fn list(
260 &mut self,
261 reference: &str,
262 mailbox_mask: &str,
263 ) -> Result<Vec<u8>, ClientError> {
264 let cmd = format!(
265 "LIST {} {}",
266 imap_quoted(reference)?,
267 imap_quoted(mailbox_mask)?
268 );
269 self.raw.execute_command(&cmd).await
270 }
271}
272
273impl<T> Session<Selected, T> {
276 pub async fn fetch_raw(
278 &mut self,
279 sequence_set: &str,
280 items: &str,
281 ) -> Result<Vec<u8>, ClientError> {
282 let cmd = format!("FETCH {} {}", sequence_set, items);
283 self.raw.execute_command(&cmd).await
284 }
285
286 pub async fn fetch(
289 &mut self,
290 sequence_set: &str,
291 items: &str,
292 ) -> Result<Vec<FetchResult>, ClientError> {
293 let mut events = self.raw.events();
294 let cmd = format!("FETCH {} {}", sequence_set, items);
295 let _resp = self.raw.execute_command(&cmd).await?;
296
297 let mut results = Vec::new();
298 while let Ok(event) = events.try_recv() {
299 if let Ok((_, Response::Data(DataResponse::Fetch { seq, attributes }))) =
300 parse_response(&event)
301 {
302 let mut uid = None;
303 let mut body = None;
304 for attr in attributes {
305 match attr {
306 FetchAttribute::Uid(u) => uid = Some(u),
307 FetchAttribute::BodySection { data: Some(d), .. } => {
308 body = Some(d.to_vec())
309 }
310 FetchAttribute::Body(b) => body = Some(b.to_vec()),
311 FetchAttribute::Rfc822(b) => body = Some(b.to_vec()),
312 _ => {}
313 }
314 }
315 results.push(FetchResult { seq, uid, body });
316 }
317 }
318 Ok(results)
319 }
320
321 pub async fn fetch_body(&mut self, sequence_set: &str) -> Result<Option<String>, ClientError> {
323 let results = self.fetch(sequence_set, "BODY[]").await?;
324 if let Some(res) = results.first()
325 && let Some(body) = &res.body
326 {
327 return Ok(Some(String::from_utf8_lossy(body).into_owned()));
328 }
329 Ok(None)
330 }
331
332 pub async fn search(&mut self, query: SearchQuery) -> Result<Vec<u32>, ClientError> {
334 self.run_search(&format!("SEARCH {}", query.build())).await
335 }
336
337 pub async fn uid_search(&mut self, query: SearchQuery) -> Result<Vec<u32>, ClientError> {
339 self.run_search(&format!("UID SEARCH {}", query.build()))
340 .await
341 }
342
343 async fn run_search(&mut self, cmd: &str) -> Result<Vec<u32>, ClientError> {
344 let mut events = self.raw.events();
345 let _resp = self.raw.execute_command(cmd).await?;
346
347 let mut all_ids = Vec::new();
348 while let Ok(event) = events.try_recv() {
349 if let Ok((_, Response::Data(DataResponse::Search(ids)))) = parse_response(&event) {
350 all_ids.extend(ids);
351 }
352 }
353 Ok(all_ids)
354 }
355
356 pub async fn store(
359 &mut self,
360 sequence_set: &str,
361 action: StoreAction,
362 flags: &[Flag],
363 ) -> Result<Vec<u8>, ClientError> {
364 let flags_str = flags
365 .iter()
366 .map(|f| f.to_string())
367 .collect::<Vec<_>>()
368 .join(" ");
369 let cmd = format!(
370 "STORE {} {} ({})",
371 sequence_set,
372 action.to_imap_prefix(false),
373 flags_str
374 );
375 self.raw.execute_command(&cmd).await
376 }
377
378 pub async fn uid_store(
380 &mut self,
381 uid_set: &str,
382 action: StoreAction,
383 flags: &[Flag],
384 ) -> Result<Vec<u8>, ClientError> {
385 let flags_str = flags
386 .iter()
387 .map(|f| f.to_string())
388 .collect::<Vec<_>>()
389 .join(" ");
390 let cmd = format!(
391 "UID STORE {} {} ({})",
392 uid_set,
393 action.to_imap_prefix(false),
394 flags_str
395 );
396 self.raw.execute_command(&cmd).await
397 }
398
399 pub async fn expunge(&mut self) -> Result<Vec<u8>, ClientError> {
402 self.raw.execute_command("EXPUNGE").await
403 }
404
405 pub async fn close_mailbox(mut self) -> Result<Session<Authenticated, T>, ClientError> {
408 self.raw.execute_command("CLOSE").await?;
409 Ok(self.transition_state())
410 }
411
412 pub async fn unselect(mut self) -> Result<Session<Authenticated, T>, ClientError> {
415 if !self.capabilities.unselect {
416 return Err(ClientError::UnsupportedCapability("UNSELECT"));
417 }
418 self.raw.execute_command("UNSELECT").await?;
419 Ok(self.transition_state())
420 }
421
422 pub async fn check(&mut self) -> Result<(), ClientError> {
424 self.raw.execute_command("CHECK").await.map(|_| ())
425 }
426
427 pub async fn idle(&mut self) -> Result<IdleHandle, ClientError> {
431 if !self.capabilities.idle {
432 return Err(ClientError::UnsupportedCapability("IDLE"));
433 }
434 let mut events = self.raw.events();
435 let writer = self.raw.writer();
436 let (_tag, reply_rx) = self.raw.send_command_async("IDLE").await?;
437 wait_for_continuation(&mut events, Duration::from_secs(30)).await?;
438 Ok(IdleHandle::new(writer, reply_rx))
439 }
440
441 pub async fn move_messages(
443 &mut self,
444 sequence_set: &str,
445 mailbox: &str,
446 ) -> Result<Vec<u8>, ClientError> {
447 if !self.capabilities.move_ext {
448 return Err(ClientError::UnsupportedCapability("MOVE"));
449 }
450 let cmd = format!("MOVE {} {}", sequence_set, imap_quoted(mailbox)?);
451 self.raw.execute_command(&cmd).await
452 }
453}
454
455async fn wait_for_continuation(
460 events: &mut broadcast::Receiver<Vec<u8>>,
461 timeout: Duration,
462) -> Result<(), ClientError> {
463 let deadline = tokio::time::Instant::now() + timeout;
464 loop {
465 let now = tokio::time::Instant::now();
466 if now >= deadline {
467 return Err(ClientError::Timeout);
468 }
469 match tokio::time::timeout(deadline - now, events.recv()).await {
470 Ok(Ok(frame)) => {
471 if frame.starts_with(b"+") {
472 return Ok(());
473 }
474 }
476 Ok(Err(broadcast::error::RecvError::Closed)) => {
477 return Err(ClientError::ConnectionClosed);
478 }
479 Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
480 return Err(ClientError::CommandFailed(
484 "broadcast lagged; continuation may have been missed".into(),
485 ));
486 }
487 Err(_) => return Err(ClientError::Timeout),
488 }
489 }
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Flag, SearchQuery, StoreAction, Tls};
    use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};

    /// Builds a session in state `S` on one end of an in-memory pipe and returns it together
    /// with the other end, which the test drives as the server.
    fn connect<S>(caps: Capabilities) -> (Session<S, Tls>, tokio::io::DuplexStream) {
        let (client_io, server_io) = duplex(1024);
        let raw = RawClient::new(client_io);
        (Session::<S, Tls>::new_in_state(raw, caps), server_io)
    }

    /// Performs a single read of up to 1024 bytes from the server end.
    async fn read_bytes(server: &mut tokio::io::DuplexStream) -> Vec<u8> {
        let mut buf = [0u8; 1024];
        let n = server.read(&mut buf).await.unwrap();
        buf[..n].to_vec()
    }

    /// Reads one chunk from the server end and decodes it lossily as text.
    async fn read_command(server: &mut tokio::io::DuplexStream) -> String {
        String::from_utf8_lossy(&read_bytes(server).await).into_owned()
    }

    /// Returns the first whitespace-separated token of a command, i.e. its tag.
    fn tag_of(command: &str) -> String {
        command.split_whitespace().next().unwrap().to_owned()
    }

    /// Reads one command and returns only its tag.
    async fn read_cmd_tag(server: &mut tokio::io::DuplexStream) -> String {
        tag_of(&read_command(server).await)
    }

    /// Writes raw bytes from the server end.
    async fn send(server: &mut tokio::io::DuplexStream, bytes: &[u8]) {
        server.write_all(bytes).await.unwrap();
    }

    /// Writes `<tag> <rest>\r\n` from the server end.
    async fn send_tagged(server: &mut tokio::io::DuplexStream, tag: &str, rest: &str) {
        send(server, format!("{tag} {rest}\r\n").as_bytes()).await;
    }

    #[tokio::test]
    async fn test_session_login() {
        let (session, mut server) = connect::<Unauthenticated>(Capabilities::default());
        let login_task =
            tokio::spawn(async move { session.login("user", Password::new("pass")).await });

        let cmd = read_command(&mut server).await;
        assert!(cmd.contains("LOGIN \"user\" \"pass\""));
        send_tagged(
            &mut server,
            &tag_of(&cmd),
            "OK [CAPABILITY IMAP4rev2 IDLE] LOGIN completed",
        )
        .await;

        let auth = login_task.await.unwrap().unwrap();
        assert!(auth.capabilities.imap4rev2);
        assert!(auth.capabilities.idle);
    }

    #[tokio::test]
    async fn test_session_login_falls_back_to_capability_round_trip() {
        let (session, mut server) = connect::<Unauthenticated>(Capabilities::default());
        let login_task =
            tokio::spawn(async move { session.login("user", Password::new("pass")).await });

        // The LOGIN reply carries no capability data, so a second command is expected.
        let login_tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &login_tag, "OK LOGIN completed").await;

        let capability_tag = read_cmd_tag(&mut server).await;
        send(&mut server, b"* CAPABILITY IMAP4rev2 STARTTLS UNSELECT\r\n").await;
        send_tagged(&mut server, &capability_tag, "OK CAPABILITY done").await;

        let auth = login_task.await.unwrap().unwrap();
        assert!(auth.capabilities.imap4rev2);
        assert!(auth.capabilities.unselect);
    }

    #[tokio::test]
    async fn test_login_escapes_quote_and_backslash() {
        let (session, mut server) = connect::<Unauthenticated>(Capabilities::default());
        let login_task = tokio::spawn(async move {
            session
                .login("user\"with\\specials", Password::new("p@ss\"\\"))
                .await
        });

        let cmd = read_command(&mut server).await;
        assert!(cmd.contains(r#"LOGIN "user\"with\\specials" "p@ss\"\\""#));
        send_tagged(&mut server, &tag_of(&cmd), "OK [CAPABILITY IMAP4rev2] done").await;
        let _ = login_task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_login_rejects_8bit_password() {
        let (session, _server) = connect::<Unauthenticated>(Capabilities::default());
        let outcome = session.login("user", Password::new("café")).await;
        assert!(matches!(outcome, Err(ClientError::CommandFailed(_))));
    }

    #[tokio::test]
    async fn test_login_failure() {
        let (session, mut server) = connect::<Unauthenticated>(Capabilities::default());
        let task = tokio::spawn(async move { session.login("user", Password::new("pass")).await });

        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "NO authentication failed").await;

        let Err(ClientError::CommandFailed(text)) = task.await.unwrap() else {
            panic!("unexpected variant")
        };
        assert!(text.contains("authentication"));
    }

    #[tokio::test]
    async fn test_authenticate_plain() {
        let (session, mut server) = connect::<Unauthenticated>(Capabilities::default());
        let task = tokio::spawn(async move {
            session
                .authenticate_plain("user", &Password::new("pass"))
                .await
        });

        let cmd = read_command(&mut server).await;
        assert!(cmd.contains("AUTHENTICATE PLAIN"));
        let tag = tag_of(&cmd);

        send(&mut server, b"+ \r\n").await;

        let encoded = read_command(&mut server).await;
        let payload = base64::engine::general_purpose::STANDARD
            .decode(encoded.trim_end())
            .unwrap();
        assert_eq!(payload, b"\0user\0pass");

        send_tagged(&mut server, &tag, "OK [CAPABILITY IMAP4rev2] auth done").await;
        let _auth = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_authenticate_plain_failure() {
        let (session, mut server) = connect::<Unauthenticated>(Capabilities::default());
        let task = tokio::spawn(async move {
            session
                .authenticate_plain("user", &Password::new("pass"))
                .await
        });

        let tag = read_cmd_tag(&mut server).await;
        send(&mut server, b"+ \r\n").await;
        // Consume the credentials line before rejecting it.
        let _ = read_bytes(&mut server).await;
        send_tagged(&mut server, &tag, "NO bad creds").await;

        assert!(matches!(
            task.await.unwrap(),
            Err(ClientError::CommandFailed(_))
        ));
    }

    #[tokio::test]
    async fn test_session_select() {
        let (session, mut server) = connect::<Authenticated>(Capabilities::default());
        let task = tokio::spawn(async move { session.select("INBOX").await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK SELECT completed").await;
        let _selected = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_examine() {
        let (session, mut server) = connect::<Authenticated>(Capabilities::default());
        let task = tokio::spawn(async move { session.examine("INBOX").await });
        let cmd = read_command(&mut server).await;
        assert!(cmd.contains("EXAMINE \"INBOX\""));
        send_tagged(&mut server, &tag_of(&cmd), "OK EXAMINE completed").await;
        let _ = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_select_failure() {
        let (session, mut server) = connect::<Authenticated>(Capabilities::default());
        let task = tokio::spawn(async move { session.select("BadBox").await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "NO no such mailbox").await;
        assert!(matches!(
            task.await.unwrap(),
            Err(ClientError::CommandFailed(_))
        ));
    }

    #[tokio::test]
    async fn test_session_logout() {
        let (session, mut server) = connect::<Authenticated>(Capabilities::default());
        let task = tokio::spawn(async move { session.logout().await });
        let tag = read_cmd_tag(&mut server).await;
        send(&mut server, b"* BYE goodbye\r\n").await;
        send_tagged(&mut server, &tag, "OK LOGOUT completed").await;
        task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_noop() {
        let (mut session, mut server) = connect::<Authenticated>(Capabilities::default());
        // The session is returned from the task so the connection stays open until the end.
        let task = tokio::spawn(async move {
            let r = session.noop().await;
            (session, r)
        });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK NOOP done").await;
        let (_session, r) = task.await.unwrap();
        r.unwrap();
    }

    #[tokio::test]
    async fn test_session_close() {
        let (session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.close_mailbox().await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK CLOSE done").await;
        let _ = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_unselect_unsupported() {
        let (session, _server) = connect::<Selected>(Capabilities::default());
        let outcome = session.unselect().await;
        if !matches!(
            outcome,
            Err(ClientError::UnsupportedCapability("UNSELECT"))
        ) {
            panic!("unexpected variant");
        }
    }

    #[tokio::test]
    async fn test_session_unselect_supported() {
        let caps = Capabilities {
            unselect: true,
            ..Default::default()
        };
        let (session, mut server) = connect::<Selected>(caps);
        let task = tokio::spawn(async move { session.unselect().await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK UNSELECT done").await;
        let _ = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_check() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move {
            let r = session.check().await;
            (session, r)
        });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK CHECK done").await;
        let (_, r) = task.await.unwrap();
        r.unwrap();
    }

    #[tokio::test]
    async fn test_session_idle_unsupported() {
        let (mut session, _server) = connect::<Selected>(Capabilities::default());
        let outcome = session.idle().await;
        if !matches!(outcome, Err(ClientError::UnsupportedCapability("IDLE"))) {
            panic!("unexpected variant");
        }
    }

    #[tokio::test]
    async fn test_session_idle_flow() {
        let caps = Capabilities {
            idle: true,
            ..Default::default()
        };
        let (mut session, mut server) = connect::<Selected>(caps);
        let task = tokio::spawn(async move {
            let h = session.idle().await.unwrap();
            (session, h)
        });

        let tag = read_cmd_tag(&mut server).await;
        send(&mut server, b"+ idling\r\n").await;

        let (_session, handle) = task.await.unwrap();

        let stop_task = tokio::spawn(async move { handle.stop().await });
        assert_eq!(read_bytes(&mut server).await, b"DONE\r\n");
        send_tagged(&mut server, &tag, "OK IDLE done").await;
        stop_task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_search() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.search(SearchQuery::subject("test")).await });
        let tag = read_cmd_tag(&mut server).await;
        send(&mut server, b"* SEARCH 1 2 3\r\n").await;
        send_tagged(&mut server, &tag, "OK SEARCH completed").await;
        assert_eq!(task.await.unwrap().unwrap(), vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_session_uid_search() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.uid_search(SearchQuery::subject("t")).await });
        let cmd = read_command(&mut server).await;
        assert!(cmd.contains("UID SEARCH"));
        send(&mut server, b"* SEARCH 4 5 6\r\n").await;
        send_tagged(&mut server, &tag_of(&cmd), "OK done").await;
        assert_eq!(task.await.unwrap().unwrap(), vec![4, 5, 6]);
    }

    #[tokio::test]
    async fn test_session_search_failure() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.search(SearchQuery::subject("t")).await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "NO failed").await;
        assert!(task.await.unwrap().is_err());
    }

    #[tokio::test]
    async fn test_session_run_search_multiple_events() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.search(SearchQuery::subject("t")).await });
        let tag = read_cmd_tag(&mut server).await;
        // Two separate SEARCH responses must be concatenated in arrival order.
        send(&mut server, b"* SEARCH 1 2\r\n").await;
        send(&mut server, b"* SEARCH 3 4\r\n").await;
        send_tagged(&mut server, &tag, "OK done").await;
        assert_eq!(task.await.unwrap().unwrap(), vec![1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_session_store_and_uid_store() {
        for (kind, expected_prefix) in [("STORE", "STORE"), ("UID STORE", "UID STORE")] {
            let (mut session, mut server) = connect::<Selected>(Capabilities::default());
            let task = tokio::spawn(async move {
                if kind == "STORE" {
                    session.store("1", StoreAction::Add, &[Flag::Seen]).await
                } else {
                    session
                        .uid_store("1", StoreAction::Add, &[Flag::Seen])
                        .await
                }
            });
            let cmd = read_command(&mut server).await;
            assert!(cmd.contains(expected_prefix));
            send_tagged(&mut server, &tag_of(&cmd), "OK done").await;
            let _ = task.await.unwrap().unwrap();
        }
    }

    #[tokio::test]
    async fn test_session_expunge() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.expunge().await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK done").await;
        let _ = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_move_unsupported() {
        let (mut session, _server) = connect::<Selected>(Capabilities::default());
        let outcome = session.move_messages("1", "Archive").await;
        if !matches!(outcome, Err(ClientError::UnsupportedCapability("MOVE"))) {
            panic!("unexpected variant");
        }
    }

    #[tokio::test]
    async fn test_session_move_supported() {
        let caps = Capabilities {
            move_ext: true,
            ..Default::default()
        };
        let (mut session, mut server) = connect::<Selected>(caps);
        let task = tokio::spawn(async move { session.move_messages("1", "Archive").await });
        let cmd = read_command(&mut server).await;
        assert!(cmd.contains(r#"MOVE 1 "Archive""#));
        send_tagged(&mut server, &tag_of(&cmd), "OK done").await;
        let _ = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_list() {
        let (mut session, mut server) = connect::<Authenticated>(Capabilities::default());
        let task = tokio::spawn(async move { session.list("", "*").await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK done").await;
        let _ = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_list_failure() {
        let (mut session, mut server) = connect::<Authenticated>(Capabilities::default());
        let task = tokio::spawn(async move { session.list("", "*").await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "NO failed").await;
        assert!(task.await.unwrap().is_err());
    }

    #[tokio::test]
    async fn test_session_fetch_raw() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.fetch_raw("1", "ALL").await });
        let tag = read_cmd_tag(&mut server).await;
        send_tagged(&mut server, &tag, "OK done").await;
        let _ = task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_session_fetch_structured() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.fetch("1", "BODY[]").await });
        let tag = read_cmd_tag(&mut server).await;
        send(
            &mut server,
            b"* 1 FETCH (BODY[] {10}\r\n0123456789 UID 123)\r\n",
        )
        .await;
        send_tagged(&mut server, &tag, "OK done").await;

        let results = task.await.unwrap().unwrap();
        assert_eq!(results.len(), 1);
        let first = &results[0];
        assert_eq!(first.seq, 1);
        assert_eq!(first.uid, Some(123));
        assert_eq!(first.body.as_deref(), Some(&b"0123456789"[..]));
    }

    #[tokio::test]
    async fn test_session_fetch_body_helper() {
        let (mut session, mut server) = connect::<Selected>(Capabilities::default());
        let task = tokio::spawn(async move { session.fetch_body("1").await });
        let tag = read_cmd_tag(&mut server).await;
        send(&mut server, b"* 1 FETCH (BODY[] {10}\r\n0123456789)\r\n").await;
        send_tagged(&mut server, &tag, "OK done").await;
        assert_eq!(task.await.unwrap().unwrap(), Some("0123456789".to_string()));
    }

    #[tokio::test]
    async fn test_session_transition_transport() {
        let (session, _server) = connect::<Unauthenticated>(Capabilities::default());
        let _ = session.transition_transport::<crate::PlainText>();
    }

    #[tokio::test]
    async fn test_session_events() {
        let (session, _server) = connect::<Unauthenticated>(Capabilities::default());
        let _ = session.events();
    }

    #[tokio::test]
    async fn test_refresh_capabilities_explicit() {
        let (mut session, mut server) = connect::<Unauthenticated>(Capabilities::default());
        let task = tokio::spawn(async move {
            let r = session.refresh_capabilities().await;
            (session, r)
        });
        let tag = read_cmd_tag(&mut server).await;
        send(&mut server, b"* CAPABILITY IMAP4rev2 IDLE\r\n").await;
        send_tagged(&mut server, &tag, "OK done").await;

        let (session, r) = task.await.unwrap();
        r.unwrap();
        assert!(session.capabilities.imap4rev2);
        assert!(session.capabilities.idle);
    }
}