1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::str;
6
7use async_channel::{self as channel, bounded};
8#[cfg(feature = "runtime-async-std")]
9use async_std::io::{Read, Write, WriteExt};
10use base64::Engine as _;
11use extensions::id::{format_identification, parse_id};
12use extensions::quota::parse_get_quota_root;
13use futures::{io, Stream, TryStreamExt};
14use imap_proto::{Metadata, RequestId, Response};
15#[cfg(feature = "runtime-tokio")]
16use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt};
17
18use super::authenticator::Authenticator;
19use super::error::{Error, ParseError, Result, ValidateError};
20use super::parse::*;
21use super::types::*;
22use crate::extensions::{self, quota::parse_get_quota};
23use crate::imap_stream::ImapStream;
24
25macro_rules! quote {
26 ($x:expr) => {
27 format!("\"{}\"", $x.replace(r"\", r"\\").replace("\"", "\\\""))
28 };
29}
30
31#[derive(Debug)]
41pub struct Session<T: Read + Write + Unpin + fmt::Debug> {
42 pub(crate) conn: Connection<T>,
43 pub(crate) unsolicited_responses_tx: channel::Sender<UnsolicitedResponse>,
44
45 pub unsolicited_responses: channel::Receiver<UnsolicitedResponse>,
48}
49
50impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Session<T> {}
51impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Client<T> {}
52impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Connection<T> {}
53
54impl<T: Read + Write + Unpin + fmt::Debug> AsMut<T> for Session<T> {
57 fn as_mut(&mut self) -> &mut T {
58 self.conn.stream.as_mut()
59 }
60}
61
62#[derive(Debug)]
70pub struct Client<T: Read + Write + Unpin + fmt::Debug> {
71 conn: Connection<T>,
72}
73
74#[derive(Debug)]
77pub struct Connection<T: Read + Write + Unpin + fmt::Debug> {
78 pub(crate) stream: ImapStream<T>,
79
80 pub(crate) request_ids: IdGenerator,
82}
83
84impl<T: Read + Write + Unpin + fmt::Debug> Deref for Client<T> {
87 type Target = Connection<T>;
88
89 fn deref(&self) -> &Connection<T> {
90 &self.conn
91 }
92}
93
94impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Client<T> {
95 fn deref_mut(&mut self) -> &mut Connection<T> {
96 &mut self.conn
97 }
98}
99
100impl<T: Read + Write + Unpin + fmt::Debug> Deref for Session<T> {
101 type Target = Connection<T>;
102
103 fn deref(&self) -> &Connection<T> {
104 &self.conn
105 }
106}
107
108impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Session<T> {
109 fn deref_mut(&mut self) -> &mut Connection<T> {
110 &mut self.conn
111 }
112}
113
114macro_rules! ok_or_unauth_client_err {
122 ($r:expr, $self:expr) => {
123 match $r {
124 Ok(o) => o,
125 Err(e) => return Err((e.into(), $self)),
126 }
127 };
128}
129
130impl<T: Read + Write + Unpin + fmt::Debug + Send> Client<T> {
131 pub fn new(stream: T) -> Client<T> {
136 let stream = ImapStream::new(stream);
137
138 Client {
139 conn: Connection {
140 stream,
141 request_ids: IdGenerator::new(),
142 },
143 }
144 }
145
146 pub fn into_inner(self) -> T {
148 let Self { conn, .. } = self;
149 conn.into_inner()
150 }
151
152 pub async fn login<U: AsRef<str>, P: AsRef<str>>(
184 mut self,
185 username: U,
186 password: P,
187 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
188 let u = ok_or_unauth_client_err!(validate_str(username.as_ref()), self);
189 let p = ok_or_unauth_client_err!(validate_str(password.as_ref()), self);
190 ok_or_unauth_client_err!(
191 self.run_command_and_check_ok(&format!("LOGIN {u} {p}"), None)
192 .await,
193 self
194 );
195
196 Ok(Session::new(self.conn))
197 }
198
199 pub async fn authenticate<A: Authenticator, S: AsRef<str>>(
243 mut self,
244 auth_type: S,
245 authenticator: A,
246 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
247 let id = ok_or_unauth_client_err!(
248 self.run_command(&format!("AUTHENTICATE {}", auth_type.as_ref()))
249 .await,
250 self
251 );
252 let session = self.do_auth_handshake(id, authenticator).await?;
253 Ok(session)
254 }
255
256 async fn do_auth_handshake<A: Authenticator>(
258 mut self,
259 id: RequestId,
260 mut authenticator: A,
261 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
262 loop {
265 let Some(res) = ok_or_unauth_client_err!(self.read_response().await, self) else {
266 return Err((Error::ConnectionLost, self));
267 };
268 match res.parsed() {
269 Response::Continue { information, .. } => {
270 let challenge = if let Some(text) = information {
271 ok_or_unauth_client_err!(
272 base64::engine::general_purpose::STANDARD
273 .decode(text.as_ref())
274 .map_err(|e| Error::Parse(ParseError::Authentication(
275 (*text).to_string(),
276 Some(e)
277 ))),
278 self
279 )
280 } else {
281 Vec::new()
282 };
283 let raw_response = &mut authenticator.process(&challenge);
284 let auth_response =
285 base64::engine::general_purpose::STANDARD.encode(raw_response);
286
287 ok_or_unauth_client_err!(
288 self.conn.run_command_untagged(&auth_response).await,
289 self
290 );
291 }
292 _ => {
293 ok_or_unauth_client_err!(self.check_done_ok_from(&id, None, res).await, self);
294 return Ok(Session::new(self.conn));
295 }
296 }
297 }
298 }
299}
300
301impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
302 unsafe_pinned!(conn: Connection<T>);
303
304 pub(crate) fn get_stream(self: Pin<&mut Self>) -> Pin<&mut ImapStream<T>> {
305 self.conn().stream()
306 }
307
308 fn new(conn: Connection<T>) -> Self {
310 let (tx, rx) = bounded(100);
311 Session {
312 conn,
313 unsolicited_responses: rx,
314 unsolicited_responses_tx: tx,
315 }
316 }
317
318 pub async fn select<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
336 let id = self
338 .run_command(&format!("SELECT {}", validate_str(mailbox_name.as_ref())?))
339 .await?;
340 let mbox = parse_mailbox(
341 &mut self.conn.stream,
342 self.unsolicited_responses_tx.clone(),
343 id,
344 )
345 .await?;
346
347 Ok(mbox)
348 }
349
350 pub async fn select_condstore<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
353 let id = self
354 .run_command(&format!(
355 "SELECT {} (CONDSTORE)",
356 validate_str(mailbox_name.as_ref())?
357 ))
358 .await?;
359 let mbox = parse_mailbox(
360 &mut self.conn.stream,
361 self.unsolicited_responses_tx.clone(),
362 id,
363 )
364 .await?;
365
366 Ok(mbox)
367 }
368
369 pub async fn examine<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
374 let id = self
375 .run_command(&format!("EXAMINE {}", validate_str(mailbox_name.as_ref())?))
376 .await?;
377 let mbox = parse_mailbox(
378 &mut self.conn.stream,
379 self.unsolicited_responses_tx.clone(),
380 id,
381 )
382 .await?;
383
384 Ok(mbox)
385 }
386
387 pub async fn fetch<S1, S2>(
446 &mut self,
447 sequence_set: S1,
448 query: S2,
449 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
450 where
451 S1: AsRef<str>,
452 S2: AsRef<str>,
453 {
454 let id = self
455 .run_command(&format!(
456 "FETCH {} {}",
457 sequence_set.as_ref(),
458 query.as_ref()
459 ))
460 .await?;
461 let res = parse_fetches(
462 &mut self.conn.stream,
463 self.unsolicited_responses_tx.clone(),
464 id,
465 );
466
467 Ok(res)
468 }
469
470 pub async fn uid_fetch<S1, S2>(
473 &mut self,
474 uid_set: S1,
475 query: S2,
476 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send + Unpin>
477 where
478 S1: AsRef<str>,
479 S2: AsRef<str>,
480 {
481 let id = self
482 .run_command(&format!(
483 "UID FETCH {} {}",
484 uid_set.as_ref(),
485 query.as_ref()
486 ))
487 .await?;
488 let res = parse_fetches(
489 &mut self.conn.stream,
490 self.unsolicited_responses_tx.clone(),
491 id,
492 );
493 Ok(res)
494 }
495
496 pub async fn noop(&mut self) -> Result<()> {
498 let id = self.run_command("NOOP").await?;
499 parse_noop(
500 &mut self.conn.stream,
501 self.unsolicited_responses_tx.clone(),
502 id,
503 )
504 .await?;
505 Ok(())
506 }
507
508 pub async fn logout(&mut self) -> Result<()> {
510 self.run_command_and_check_ok("LOGOUT").await?;
511 Ok(())
512 }
513
514 pub async fn create<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
537 self.run_command_and_check_ok(&format!("CREATE {}", validate_str(mailbox_name.as_ref())?))
538 .await?;
539
540 Ok(())
541 }
542
543 pub async fn delete<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
563 self.run_command_and_check_ok(&format!("DELETE {}", validate_str(mailbox_name.as_ref())?))
564 .await?;
565
566 Ok(())
567 }
568
569 pub async fn rename<S1: AsRef<str>, S2: AsRef<str>>(&mut self, from: S1, to: S2) -> Result<()> {
595 self.run_command_and_check_ok(&format!(
596 "RENAME {} {}",
597 validate_str(from.as_ref())?,
598 validate_str(to.as_ref())?
599 ))
600 .await?;
601
602 Ok(())
603 }
604
605 pub async fn subscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
614 self.run_command_and_check_ok(&format!("SUBSCRIBE {}", validate_str(mailbox.as_ref())?))
615 .await?;
616 Ok(())
617 }
618
619 pub async fn unsubscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
624 self.run_command_and_check_ok(&format!("UNSUBSCRIBE {}", validate_str(mailbox.as_ref())?))
625 .await?;
626 Ok(())
627 }
628
629 pub async fn capabilities(&mut self) -> Result<Capabilities> {
633 let id = self.run_command("CAPABILITY").await?;
634 let c = parse_capabilities(
635 &mut self.conn.stream,
636 self.unsolicited_responses_tx.clone(),
637 id,
638 )
639 .await?;
640 Ok(c)
641 }
642
643 pub async fn expunge(&mut self) -> Result<impl Stream<Item = Result<Seq>> + '_ + Send> {
647 let id = self.run_command("EXPUNGE").await?;
648 let res = parse_expunge(
649 &mut self.conn.stream,
650 self.unsolicited_responses_tx.clone(),
651 id,
652 );
653 Ok(res)
654 }
655
656 pub async fn uid_expunge<S: AsRef<str>>(
679 &mut self,
680 uid_set: S,
681 ) -> Result<impl Stream<Item = Result<Uid>> + '_ + Send> {
682 let id = self
683 .run_command(&format!("UID EXPUNGE {}", uid_set.as_ref()))
684 .await?;
685 let res = parse_expunge(
686 &mut self.conn.stream,
687 self.unsolicited_responses_tx.clone(),
688 id,
689 );
690 Ok(res)
691 }
692
693 pub async fn check(&mut self) -> Result<()> {
704 self.run_command_and_check_ok("CHECK").await?;
705 Ok(())
706 }
707
708 pub async fn close(&mut self) -> Result<()> {
724 self.run_command_and_check_ok("CLOSE").await?;
725 Ok(())
726 }
727
728 pub async fn store<S1, S2>(
778 &mut self,
779 sequence_set: S1,
780 query: S2,
781 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
782 where
783 S1: AsRef<str>,
784 S2: AsRef<str>,
785 {
786 let id = self
787 .run_command(&format!(
788 "STORE {} {}",
789 sequence_set.as_ref(),
790 query.as_ref()
791 ))
792 .await?;
793 let res = parse_fetches(
794 &mut self.conn.stream,
795 self.unsolicited_responses_tx.clone(),
796 id,
797 );
798 Ok(res)
799 }
800
801 pub async fn uid_store<S1, S2>(
804 &mut self,
805 uid_set: S1,
806 query: S2,
807 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
808 where
809 S1: AsRef<str>,
810 S2: AsRef<str>,
811 {
812 let id = self
813 .run_command(&format!(
814 "UID STORE {} {}",
815 uid_set.as_ref(),
816 query.as_ref()
817 ))
818 .await?;
819 let res = parse_fetches(
820 &mut self.conn.stream,
821 self.unsolicited_responses_tx.clone(),
822 id,
823 );
824 Ok(res)
825 }
826
827 pub async fn copy<S1: AsRef<str>, S2: AsRef<str>>(
835 &mut self,
836 sequence_set: S1,
837 mailbox_name: S2,
838 ) -> Result<()> {
839 self.run_command_and_check_ok(&format!(
840 "COPY {} {}",
841 sequence_set.as_ref(),
842 validate_str(mailbox_name.as_ref())?
843 ))
844 .await?;
845
846 Ok(())
847 }
848
849 pub async fn uid_copy<S1: AsRef<str>, S2: AsRef<str>>(
852 &mut self,
853 uid_set: S1,
854 mailbox_name: S2,
855 ) -> Result<()> {
856 self.run_command_and_check_ok(&format!(
857 "UID COPY {} {}",
858 uid_set.as_ref(),
859 validate_str(mailbox_name.as_ref())?
860 ))
861 .await?;
862
863 Ok(())
864 }
865
866 pub async fn mv<S1: AsRef<str>, S2: AsRef<str>>(
897 &mut self,
898 sequence_set: S1,
899 mailbox_name: S2,
900 ) -> Result<()> {
901 self.run_command_and_check_ok(&format!(
902 "MOVE {} {}",
903 sequence_set.as_ref(),
904 validate_str(mailbox_name.as_ref())?
905 ))
906 .await?;
907
908 Ok(())
909 }
910
911 pub async fn uid_mv<S1: AsRef<str>, S2: AsRef<str>>(
916 &mut self,
917 uid_set: S1,
918 mailbox_name: S2,
919 ) -> Result<()> {
920 self.run_command_and_check_ok(&format!(
921 "UID MOVE {} {}",
922 uid_set.as_ref(),
923 validate_str(mailbox_name.as_ref())?
924 ))
925 .await?;
926
927 Ok(())
928 }
929
930 pub async fn list(
962 &mut self,
963 reference_name: Option<&str>,
964 mailbox_pattern: Option<&str>,
965 ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
966 let id = self
967 .run_command(&format!(
968 "LIST {} {}",
969 validate_str(reference_name.unwrap_or(""))?,
970 mailbox_pattern.unwrap_or("\"\"")
971 ))
972 .await?;
973 let names = parse_names(
974 &mut self.conn.stream,
975 self.unsolicited_responses_tx.clone(),
976 id,
977 );
978
979 Ok(names)
980 }
981
982 pub async fn lsub(
998 &mut self,
999 reference_name: Option<&str>,
1000 mailbox_pattern: Option<&str>,
1001 ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
1002 let id = self
1003 .run_command(&format!(
1004 "LSUB {} {}",
1005 validate_str(reference_name.unwrap_or(""))?,
1006 validate_str(mailbox_pattern.unwrap_or(""))?
1007 ))
1008 .await?;
1009 let names = parse_names(
1010 &mut self.conn.stream,
1011 self.unsolicited_responses_tx.clone(),
1012 id,
1013 );
1014
1015 Ok(names)
1016 }
1017
1018 pub async fn status<S1: AsRef<str>, S2: AsRef<str>>(
1053 &mut self,
1054 mailbox_name: S1,
1055 data_items: S2,
1056 ) -> Result<Mailbox> {
1057 let id = self
1058 .run_command(&format!(
1059 "STATUS {} {}",
1060 validate_str(mailbox_name.as_ref())?,
1061 data_items.as_ref()
1062 ))
1063 .await?;
1064 let mbox = parse_status(
1065 &mut self.conn.stream,
1066 mailbox_name.as_ref(),
1067 self.unsolicited_responses_tx.clone(),
1068 id,
1069 )
1070 .await?;
1071 Ok(mbox)
1072 }
1073
1074 pub fn idle(self) -> extensions::idle::Handle<T> {
1093 extensions::idle::Handle::new(self)
1094 }
1095
1096 pub async fn append(
1116 &mut self,
1117 mailbox: impl AsRef<str>,
1118 flags: Option<&str>,
1119 internaldate: Option<&str>,
1120 content: impl AsRef<[u8]>,
1121 ) -> Result<()> {
1122 let content = content.as_ref();
1123 let id = self
1124 .run_command(&format!(
1125 "APPEND {}{}{}{}{} {{{}}}",
1126 validate_str(mailbox.as_ref())?,
1127 if flags.is_some() { " " } else { "" },
1128 flags.unwrap_or(""),
1129 if internaldate.is_some() { " " } else { "" },
1130 internaldate.unwrap_or(""),
1131 content.len()
1132 ))
1133 .await?;
1134
1135 let Some(res) = self.read_response().await? else {
1136 return Err(Error::Append);
1137 };
1138 let Response::Continue { .. } = res.parsed() else {
1139 return Err(Error::Append);
1140 };
1141
1142 self.stream.as_mut().write_all(content).await?;
1143 self.stream.as_mut().write_all(b"\r\n").await?;
1144 self.stream.flush().await?;
1145 self.conn
1146 .check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
1147 .await?;
1148 Ok(())
1149 }
1150
1151 pub async fn search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Seq>> {
1196 let id = self
1197 .run_command(&format!("SEARCH {}", query.as_ref()))
1198 .await?;
1199 let seqs = parse_ids(
1200 &mut self.conn.stream,
1201 self.unsolicited_responses_tx.clone(),
1202 id,
1203 )
1204 .await?;
1205
1206 Ok(seqs)
1207 }
1208
1209 pub async fn uid_search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Uid>> {
1213 let id = self
1214 .run_command(&format!("UID SEARCH {}", query.as_ref()))
1215 .await?;
1216 let uids = parse_ids(
1217 &mut self.conn.stream,
1218 self.unsolicited_responses_tx.clone(),
1219 id,
1220 )
1221 .await?;
1222
1223 Ok(uids)
1224 }
1225
1226 pub async fn get_quota(&mut self, quota_root: &str) -> Result<Quota> {
1228 let id = self
1229 .run_command(format!("GETQUOTA {}", validate_str(quota_root)?))
1230 .await?;
1231 let c = parse_get_quota(
1232 &mut self.conn.stream,
1233 self.unsolicited_responses_tx.clone(),
1234 id,
1235 )
1236 .await?;
1237 Ok(c)
1238 }
1239
1240 pub async fn get_quota_root(
1242 &mut self,
1243 mailbox_name: &str,
1244 ) -> Result<(Vec<QuotaRoot>, Vec<Quota>)> {
1245 let id = self
1246 .run_command(format!("GETQUOTAROOT {}", validate_str(mailbox_name)?))
1247 .await?;
1248 let c = parse_get_quota_root(
1249 &mut self.conn.stream,
1250 self.unsolicited_responses_tx.clone(),
1251 id,
1252 )
1253 .await?;
1254 Ok(c)
1255 }
1256
1257 pub async fn get_metadata(
1259 &mut self,
1260 mailbox_name: &str,
1261 options: &str,
1262 entry_specifier: &str,
1263 ) -> Result<Vec<Metadata>> {
1264 let options = if options.is_empty() {
1265 String::new()
1266 } else {
1267 format!(" {options}")
1268 };
1269 let id = self
1270 .run_command(format!(
1271 "GETMETADATA {} {}{}",
1272 validate_str(mailbox_name)?,
1273 options,
1274 entry_specifier
1275 ))
1276 .await?;
1277 let metadata = parse_metadata(
1278 &mut self.conn.stream,
1279 mailbox_name,
1280 self.unsolicited_responses_tx.clone(),
1281 id,
1282 )
1283 .await?;
1284 Ok(metadata)
1285 }
1286
1287 pub async fn id(
1291 &mut self,
1292 identification: impl IntoIterator<Item = (&str, Option<&str>)>,
1293 ) -> Result<Option<HashMap<String, String>>> {
1294 let id = self
1295 .run_command(format!("ID ({})", format_identification(identification)))
1296 .await?;
1297 let server_identification = parse_id(
1298 &mut self.conn.stream,
1299 self.unsolicited_responses_tx.clone(),
1300 id,
1301 )
1302 .await?;
1303 Ok(server_identification)
1304 }
1305
1306 pub async fn id_nil(&mut self) -> Result<Option<HashMap<String, String>>> {
1310 let id = self.run_command("ID NIL").await?;
1311 let server_identification = parse_id(
1312 &mut self.conn.stream,
1313 self.unsolicited_responses_tx.clone(),
1314 id,
1315 )
1316 .await?;
1317 Ok(server_identification)
1318 }
1319
1320 pub async fn run_command_and_check_ok<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1323 self.conn
1324 .run_command_and_check_ok(
1325 command.as_ref(),
1326 Some(self.unsolicited_responses_tx.clone()),
1327 )
1328 .await?;
1329
1330 Ok(())
1331 }
1332
1333 pub async fn run_command<S: AsRef<str>>(&mut self, command: S) -> Result<RequestId> {
1335 let id = self.conn.run_command(command.as_ref()).await?;
1336
1337 Ok(id)
1338 }
1339
1340 pub async fn run_command_untagged<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1342 self.conn.run_command_untagged(command.as_ref()).await?;
1343
1344 Ok(())
1345 }
1346
1347 pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
1349 self.conn.read_response().await
1350 }
1351}
1352
1353impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
1354 unsafe_pinned!(stream: ImapStream<T>);
1355
1356 pub fn get_ref(&self) -> &T {
1358 self.stream.get_ref()
1359 }
1360
1361 pub fn get_mut(&mut self) -> &mut T {
1363 self.stream.get_mut()
1364 }
1365
1366 pub fn into_inner(self) -> T {
1368 let Self { stream, .. } = self;
1369 stream.into_inner()
1370 }
1371
1372 pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
1374 self.stream.try_next().await
1375 }
1376
1377 pub(crate) async fn run_command_untagged(&mut self, command: &str) -> Result<()> {
1378 self.stream
1379 .encode(Request(None, command.as_bytes().into()))
1380 .await?;
1381 self.stream.flush().await?;
1382 Ok(())
1383 }
1384
1385 pub(crate) async fn run_command(&mut self, command: &str) -> Result<RequestId> {
1386 let request_id = self.request_ids.next().unwrap(); self.stream
1388 .encode(Request(Some(request_id.clone()), command.as_bytes().into()))
1389 .await?;
1390 self.stream.flush().await?;
1391 Ok(request_id)
1392 }
1393
1394 pub async fn run_command_and_check_ok(
1396 &mut self,
1397 command: &str,
1398 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1399 ) -> Result<()> {
1400 let id = self.run_command(command).await?;
1401 self.check_done_ok(&id, unsolicited).await?;
1402
1403 Ok(())
1404 }
1405
1406 pub(crate) async fn check_done_ok(
1407 &mut self,
1408 id: &RequestId,
1409 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1410 ) -> Result<()> {
1411 if let Some(first_res) = self.stream.try_next().await? {
1412 self.check_done_ok_from(id, unsolicited, first_res).await
1413 } else {
1414 Err(Error::ConnectionLost)
1415 }
1416 }
1417
1418 pub(crate) async fn check_done_ok_from(
1419 &mut self,
1420 id: &RequestId,
1421 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1422 mut response: ResponseData,
1423 ) -> Result<()> {
1424 loop {
1425 if let Response::Done {
1426 status,
1427 code,
1428 information,
1429 tag,
1430 } = response.parsed()
1431 {
1432 self.check_status_ok(status, code.as_ref(), information.as_deref())?;
1433
1434 if tag == id {
1435 return Ok(());
1436 }
1437 }
1438
1439 if let Some(unsolicited) = unsolicited.clone() {
1440 handle_unilateral(response, unsolicited);
1441 }
1442
1443 let Some(res) = self.stream.try_next().await? else {
1444 return Err(Error::ConnectionLost);
1445 };
1446 response = res;
1447 }
1448 }
1449
1450 pub(crate) fn check_status_ok(
1451 &self,
1452 status: &imap_proto::Status,
1453 code: Option<&imap_proto::ResponseCode<'_>>,
1454 information: Option<&str>,
1455 ) -> Result<()> {
1456 use imap_proto::Status;
1457 match status {
1458 Status::Ok => Ok(()),
1459 Status::Bad => Err(Error::Bad(format!("code: {code:?}, info: {information:?}"))),
1460 Status::No => Err(Error::No(format!("code: {code:?}, info: {information:?}"))),
1461 _ => Err(Error::Io(io::Error::other(format!(
1462 "status: {status:?}, code: {code:?}, information: {information:?}"
1463 )))),
1464 }
1465 }
1466}
1467
1468fn validate_str(value: &str) -> Result<String> {
1469 let quoted = quote!(value);
1470 if quoted.find('\n').is_some() {
1471 return Err(Error::Validate(ValidateError('\n')));
1472 }
1473 if quoted.find('\r').is_some() {
1474 return Err(Error::Validate(ValidateError('\r')));
1475 }
1476 Ok(quoted)
1477}
1478
1479#[cfg(test)]
1480mod tests {
1481 use pretty_assertions::assert_eq;
1482
1483 use super::super::error::Result;
1484 use super::super::mock_stream::MockStream;
1485 use super::*;
1486 use std::borrow::Cow;
1487 use std::future::Future;
1488
1489 use async_std::sync::{Arc, Mutex};
1490 use futures::StreamExt;
1491 use imap_proto::Status;
1492
1493 macro_rules! mock_client {
1494 ($s:expr) => {
1495 Client::new($s)
1496 };
1497 }
1498
1499 macro_rules! mock_session {
1500 ($s:expr) => {
1501 Session::new(mock_client!($s).conn)
1502 };
1503 }
1504
1505 macro_rules! assert_eq_bytes {
1506 ($a:expr, $b:expr, $c:expr) => {
1507 assert_eq!(
1508 std::str::from_utf8($a).unwrap(),
1509 std::str::from_utf8($b).unwrap(),
1510 $c
1511 )
1512 };
1513 }
1514
1515 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1516 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1517 async fn fetch_body() {
1518 let response = "a0 OK Logged in.\r\n\
1519 * 2 FETCH (BODY[TEXT] {3}\r\nfoo)\r\n\
1520 a0 OK FETCH completed\r\n";
1521 let mut session = mock_session!(MockStream::new(response.as_bytes().to_vec()));
1522 session.read_response().await.unwrap().unwrap();
1523 session.read_response().await.unwrap().unwrap();
1524 }
1525
1526 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1527 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1528 async fn readline_delay_read() {
1529 let greeting = "* OK Dovecot ready.\r\n";
1530 let mock_stream = MockStream::default()
1531 .with_buf(greeting.as_bytes().to_vec())
1532 .with_delay();
1533
1534 let mut client = mock_client!(mock_stream);
1535 let actual_response = client.read_response().await.unwrap().unwrap();
1536 assert_eq!(
1537 actual_response.parsed(),
1538 &Response::Data {
1539 status: Status::Ok,
1540 code: None,
1541 information: Some(Cow::Borrowed("Dovecot ready.")),
1542 }
1543 );
1544 }
1545
1546 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1547 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1548 async fn readline_eof() {
1549 let mock_stream = MockStream::default().with_eof();
1550 let mut client = mock_client!(mock_stream);
1551 let res = client.read_response().await.unwrap();
1552 assert!(res.is_none());
1553 }
1554
1555 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1556 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1557 #[should_panic]
1558 async fn readline_err() {
1559 let mock_stream = MockStream::default().with_err();
1561 let mut client = mock_client!(mock_stream);
1562 client.read_response().await.unwrap().unwrap();
1563 }
1564
1565 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1566 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1567 async fn authenticate() {
1568 let response = b"+ YmFy\r\n\
1569 A0001 OK Logged in\r\n"
1570 .to_vec();
1571 let command = "A0001 AUTHENTICATE PLAIN\r\n\
1572 Zm9v\r\n";
1573 let mock_stream = MockStream::new(response);
1574 let client = mock_client!(mock_stream);
1575 enum Authenticate {
1576 Auth,
1577 }
1578 impl Authenticator for &Authenticate {
1579 type Response = Vec<u8>;
1580 fn process(&mut self, challenge: &[u8]) -> Self::Response {
1581 assert!(challenge == b"bar", "Invalid authenticate challenge");
1582 b"foo".to_vec()
1583 }
1584 }
1585 let session = client
1586 .authenticate("PLAIN", &Authenticate::Auth)
1587 .await
1588 .ok()
1589 .unwrap();
1590 assert_eq_bytes!(
1591 &session.stream.inner.written_buf,
1592 command.as_bytes(),
1593 "Invalid authenticate command"
1594 );
1595 }
1596
1597 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1598 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1599 async fn login() {
1600 let response = b"A0001 OK Logged in\r\n".to_vec();
1601 let username = "username";
1602 let password = "password";
1603 let command = format!("A0001 LOGIN {} {}\r\n", quote!(username), quote!(password));
1604 let mock_stream = MockStream::new(response);
1605 let client = mock_client!(mock_stream);
1606 if let Ok(session) = client.login(username, password).await {
1607 assert_eq!(
1608 session.stream.inner.written_buf,
1609 command.as_bytes().to_vec(),
1610 "Invalid login command"
1611 );
1612 } else {
1613 unreachable!("invalid login");
1614 }
1615 }
1616
1617 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1618 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1619 async fn logout() {
1620 let response = b"A0001 OK Logout completed.\r\n".to_vec();
1621 let command = "A0001 LOGOUT\r\n";
1622 let mock_stream = MockStream::new(response);
1623 let mut session = mock_session!(mock_stream);
1624 session.logout().await.unwrap();
1625 assert!(
1626 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1627 "Invalid logout command"
1628 );
1629 }
1630
1631 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1632 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1633 async fn rename() {
1634 let response = b"A0001 OK RENAME completed\r\n".to_vec();
1635 let current_mailbox_name = "INBOX";
1636 let new_mailbox_name = "NEWINBOX";
1637 let command = format!(
1638 "A0001 RENAME {} {}\r\n",
1639 quote!(current_mailbox_name),
1640 quote!(new_mailbox_name)
1641 );
1642 let mock_stream = MockStream::new(response);
1643 let mut session = mock_session!(mock_stream);
1644 session
1645 .rename(current_mailbox_name, new_mailbox_name)
1646 .await
1647 .unwrap();
1648 assert!(
1649 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1650 "Invalid rename command"
1651 );
1652 }
1653
1654 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1655 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1656 async fn subscribe() {
1657 let response = b"A0001 OK SUBSCRIBE completed\r\n".to_vec();
1658 let mailbox = "INBOX";
1659 let command = format!("A0001 SUBSCRIBE {}\r\n", quote!(mailbox));
1660 let mock_stream = MockStream::new(response);
1661 let mut session = mock_session!(mock_stream);
1662 session.subscribe(mailbox).await.unwrap();
1663 assert!(
1664 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1665 "Invalid subscribe command"
1666 );
1667 }
1668
1669 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1670 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1671 async fn unsubscribe() {
1672 let response = b"A0001 OK UNSUBSCRIBE completed\r\n".to_vec();
1673 let mailbox = "INBOX";
1674 let command = format!("A0001 UNSUBSCRIBE {}\r\n", quote!(mailbox));
1675 let mock_stream = MockStream::new(response);
1676 let mut session = mock_session!(mock_stream);
1677 session.unsubscribe(mailbox).await.unwrap();
1678 assert!(
1679 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1680 "Invalid unsubscribe command"
1681 );
1682 }
1683
1684 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1685 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1686 async fn expunge() {
1687 let response = b"A0001 OK EXPUNGE completed\r\n".to_vec();
1688 let mock_stream = MockStream::new(response);
1689 let mut session = mock_session!(mock_stream);
1690 session.expunge().await.unwrap().collect::<Vec<_>>().await;
1691 assert!(
1692 session.stream.inner.written_buf == b"A0001 EXPUNGE\r\n".to_vec(),
1693 "Invalid expunge command"
1694 );
1695 }
1696
1697 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1698 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1699 async fn uid_expunge() {
1700 let response = b"* 2 EXPUNGE\r\n\
1701 * 3 EXPUNGE\r\n\
1702 * 4 EXPUNGE\r\n\
1703 A0001 OK UID EXPUNGE completed\r\n"
1704 .to_vec();
1705 let mock_stream = MockStream::new(response);
1706 let mut session = mock_session!(mock_stream);
1707 session
1708 .uid_expunge("2:4")
1709 .await
1710 .unwrap()
1711 .collect::<Vec<_>>()
1712 .await;
1713 assert!(
1714 session.stream.inner.written_buf == b"A0001 UID EXPUNGE 2:4\r\n".to_vec(),
1715 "Invalid expunge command"
1716 );
1717 }
1718
1719 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1720 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1721 async fn check() {
1722 let response = b"A0001 OK CHECK completed\r\n".to_vec();
1723 let mock_stream = MockStream::new(response);
1724 let mut session = mock_session!(mock_stream);
1725 session.check().await.unwrap();
1726 assert!(
1727 session.stream.inner.written_buf == b"A0001 CHECK\r\n".to_vec(),
1728 "Invalid check command"
1729 );
1730 }
1731
1732 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1733 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1734 async fn examine() {
1735 let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1736 * OK [PERMANENTFLAGS ()] Read-only mailbox.\r\n\
1737 * 1 EXISTS\r\n\
1738 * 1 RECENT\r\n\
1739 * OK [UNSEEN 1] First unseen.\r\n\
1740 * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1741 * OK [UIDNEXT 2] Predicted next UID\r\n\
1742 A0001 OK [READ-ONLY] Select completed.\r\n"
1743 .to_vec();
1744 let expected_mailbox = Mailbox {
1745 flags: vec![
1746 Flag::Answered,
1747 Flag::Flagged,
1748 Flag::Deleted,
1749 Flag::Seen,
1750 Flag::Draft,
1751 ],
1752 exists: 1,
1753 recent: 1,
1754 unseen: Some(1),
1755 permanent_flags: vec![],
1756 uid_next: Some(2),
1757 uid_validity: Some(1257842737),
1758 highest_modseq: None,
1759 };
1760 let mailbox_name = "INBOX";
1761 let command = format!("A0001 EXAMINE {}\r\n", quote!(mailbox_name));
1762 let mock_stream = MockStream::new(response);
1763 let mut session = mock_session!(mock_stream);
1764 let mailbox = session.examine(mailbox_name).await.unwrap();
1765 assert!(
1766 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1767 "Invalid examine command"
1768 );
1769 assert_eq!(mailbox, expected_mailbox);
1770 }
1771
1772 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1773 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1774 async fn select() {
1775 let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1776 * OK [PERMANENTFLAGS (\\* \\Answered \\Flagged \\Deleted \\Draft \\Seen)] \
1777 Read-only mailbox.\r\n\
1778 * 1 EXISTS\r\n\
1779 * 1 RECENT\r\n\
1780 * OK [UNSEEN 1] First unseen.\r\n\
1781 * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1782 * OK [UIDNEXT 2] Predicted next UID\r\n\
1783 * OK [HIGHESTMODSEQ 90060115205545359] Highest mailbox modsequence\r\n\
1784 A0001 OK [READ-ONLY] Select completed.\r\n"
1785 .to_vec();
1786 let expected_mailbox = Mailbox {
1787 flags: vec![
1788 Flag::Answered,
1789 Flag::Flagged,
1790 Flag::Deleted,
1791 Flag::Seen,
1792 Flag::Draft,
1793 ],
1794 exists: 1,
1795 recent: 1,
1796 unseen: Some(1),
1797 permanent_flags: vec![
1798 Flag::MayCreate,
1799 Flag::Answered,
1800 Flag::Flagged,
1801 Flag::Deleted,
1802 Flag::Draft,
1803 Flag::Seen,
1804 ],
1805 uid_next: Some(2),
1806 uid_validity: Some(1257842737),
1807 highest_modseq: Some(90060115205545359),
1808 };
1809 let mailbox_name = "INBOX";
1810 let command = format!("A0001 SELECT {}\r\n", quote!(mailbox_name));
1811 let mock_stream = MockStream::new(response);
1812 let mut session = mock_session!(mock_stream);
1813 let mailbox = session.select(mailbox_name).await.unwrap();
1814 assert!(
1815 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1816 "Invalid select command"
1817 );
1818 assert_eq!(mailbox, expected_mailbox);
1819 }
1820
1821 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1822 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1823 async fn search() {
1824 let response = b"* SEARCH 1 2 3 4 5\r\n\
1825 A0001 OK Search completed\r\n"
1826 .to_vec();
1827 let mock_stream = MockStream::new(response);
1828 let mut session = mock_session!(mock_stream);
1829 let ids = session.search("Unseen").await.unwrap();
1830 let ids: HashSet<u32> = ids.iter().cloned().collect();
1831 assert!(
1832 session.stream.inner.written_buf == b"A0001 SEARCH Unseen\r\n".to_vec(),
1833 "Invalid search command"
1834 );
1835 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1836 }
1837
1838 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1839 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1840 async fn uid_search() {
1841 let response = b"* SEARCH 1 2 3 4 5\r\n\
1842 A0001 OK Search completed\r\n"
1843 .to_vec();
1844 let mock_stream = MockStream::new(response);
1845 let mut session = mock_session!(mock_stream);
1846 let ids = session.uid_search("Unseen").await.unwrap();
1847 let ids: HashSet<Uid> = ids.iter().cloned().collect();
1848 assert!(
1849 session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
1850 "Invalid search command"
1851 );
1852 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1853 }
1854
1855 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1856 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1857 async fn uid_search_unordered() {
1858 let response = b"* SEARCH 1 2 3 4 5\r\n\
1859 A0002 OK CAPABILITY completed\r\n\
1860 A0001 OK Search completed\r\n"
1861 .to_vec();
1862 let mock_stream = MockStream::new(response);
1863 let mut session = mock_session!(mock_stream);
1864 let ids = session.uid_search("Unseen").await.unwrap();
1865 let ids: HashSet<Uid> = ids.iter().cloned().collect();
1866 assert!(
1867 session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
1868 "Invalid search command"
1869 );
1870 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1871 }
1872
1873 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1874 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1875 async fn capability() {
1876 let response = b"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n\
1877 A0001 OK CAPABILITY completed\r\n"
1878 .to_vec();
1879 let expected_capabilities = vec!["IMAP4rev1", "STARTTLS", "AUTH=GSSAPI", "LOGINDISABLED"];
1880 let mock_stream = MockStream::new(response);
1881 let mut session = mock_session!(mock_stream);
1882 let capabilities = session.capabilities().await.unwrap();
1883 assert!(
1884 session.stream.inner.written_buf == b"A0001 CAPABILITY\r\n".to_vec(),
1885 "Invalid capability command"
1886 );
1887 assert_eq!(capabilities.len(), 4);
1888 for e in expected_capabilities {
1889 assert!(capabilities.has_str(e));
1890 }
1891 }
1892
1893 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1894 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1895 async fn create() {
1896 let response = b"A0001 OK CREATE completed\r\n".to_vec();
1897 let mailbox_name = "INBOX";
1898 let command = format!("A0001 CREATE {}\r\n", quote!(mailbox_name));
1899 let mock_stream = MockStream::new(response);
1900 let mut session = mock_session!(mock_stream);
1901 session.create(mailbox_name).await.unwrap();
1902 assert!(
1903 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1904 "Invalid create command"
1905 );
1906 }
1907
1908 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1909 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1910 async fn delete() {
1911 let response = b"A0001 OK DELETE completed\r\n".to_vec();
1912 let mailbox_name = "INBOX";
1913 let command = format!("A0001 DELETE {}\r\n", quote!(mailbox_name));
1914 let mock_stream = MockStream::new(response);
1915 let mut session = mock_session!(mock_stream);
1916 session.delete(mailbox_name).await.unwrap();
1917 assert!(
1918 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1919 "Invalid delete command"
1920 );
1921 }
1922
1923 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1924 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1925 async fn noop() {
1926 let response = b"A0001 OK NOOP completed\r\n".to_vec();
1927 let mock_stream = MockStream::new(response);
1928 let mut session = mock_session!(mock_stream);
1929 session.noop().await.unwrap();
1930 assert!(
1931 session.stream.inner.written_buf == b"A0001 NOOP\r\n".to_vec(),
1932 "Invalid noop command"
1933 );
1934 }
1935
1936 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1937 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1938 async fn close() {
1939 let response = b"A0001 OK CLOSE completed\r\n".to_vec();
1940 let mock_stream = MockStream::new(response);
1941 let mut session = mock_session!(mock_stream);
1942 session.close().await.unwrap();
1943 assert!(
1944 session.stream.inner.written_buf == b"A0001 CLOSE\r\n".to_vec(),
1945 "Invalid close command"
1946 );
1947 }
1948
1949 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1950 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1951 async fn store() {
1952 generic_store(" ", |c, set, query| async move {
1953 c.lock()
1954 .await
1955 .store(set, query)
1956 .await?
1957 .collect::<Vec<_>>()
1958 .await;
1959 Ok(())
1960 })
1961 .await;
1962 }
1963
1964 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1965 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1966 async fn uid_store() {
1967 generic_store(" UID ", |c, set, query| async move {
1968 c.lock()
1969 .await
1970 .uid_store(set, query)
1971 .await?
1972 .collect::<Vec<_>>()
1973 .await;
1974 Ok(())
1975 })
1976 .await;
1977 }
1978
1979 async fn generic_store<'a, F, T, K>(prefix: &'a str, op: F)
1980 where
1981 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
1982 K: 'a + Future<Output = Result<T>>,
1983 {
1984 let res = "* 2 FETCH (FLAGS (\\Deleted \\Seen))\r\n\
1985 * 3 FETCH (FLAGS (\\Deleted))\r\n\
1986 * 4 FETCH (FLAGS (\\Deleted \\Flagged \\Seen))\r\n\
1987 A0001 OK STORE completed\r\n";
1988
1989 generic_with_uid(res, "STORE", "2.4", "+FLAGS (\\Deleted)", prefix, op).await;
1990 }
1991
1992 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1993 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1994 async fn copy() {
1995 generic_copy(" ", |c, set, query| async move {
1996 c.lock().await.copy(set, query).await?;
1997 Ok(())
1998 })
1999 .await;
2000 }
2001
2002 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2003 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2004 async fn uid_copy() {
2005 generic_copy(" UID ", |c, set, query| async move {
2006 c.lock().await.uid_copy(set, query).await?;
2007 Ok(())
2008 })
2009 .await;
2010 }
2011
2012 async fn generic_copy<'a, F, T, K>(prefix: &'a str, op: F)
2013 where
2014 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2015 K: 'a + Future<Output = Result<T>>,
2016 {
2017 let resp = "A0001 OK COPY completed\r\n".as_bytes().to_vec();
2018 let seq = "2:4";
2019 let query = "MEETING";
2020 let line = format!("A0001{prefix}COPY {seq} {}\r\n", quote!(query));
2021 let session = Arc::new(Mutex::new(mock_session!(MockStream::new(resp))));
2022
2023 {
2024 let _ = op(session.clone(), seq, query).await.unwrap();
2025 }
2026 assert!(
2027 session.lock().await.stream.inner.written_buf == line.as_bytes().to_vec(),
2028 "Invalid command"
2029 );
2030 }
2031
2032 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2033 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2034 async fn mv() {
2035 let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2036 * 2 EXPUNGE\r\n\
2037 * 1 EXPUNGE\r\n\
2038 A0001 OK Move completed\r\n"
2039 .to_vec();
2040 let mailbox_name = "MEETING";
2041 let command = format!("A0001 MOVE 1:2 {}\r\n", quote!(mailbox_name));
2042 let mock_stream = MockStream::new(response);
2043 let mut session = mock_session!(mock_stream);
2044 session.mv("1:2", mailbox_name).await.unwrap();
2045 assert!(
2046 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2047 "Invalid move command"
2048 );
2049 }
2050
2051 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2052 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2053 async fn uid_mv() {
2054 let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2055 * 2 EXPUNGE\r\n\
2056 * 1 EXPUNGE\r\n\
2057 A0001 OK Move completed\r\n"
2058 .to_vec();
2059 let mailbox_name = "MEETING";
2060 let command = format!("A0001 UID MOVE 41:42 {}\r\n", quote!(mailbox_name));
2061 let mock_stream = MockStream::new(response);
2062 let mut session = mock_session!(mock_stream);
2063 session.uid_mv("41:42", mailbox_name).await.unwrap();
2064 assert!(
2065 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2066 "Invalid uid move command"
2067 );
2068 }
2069
2070 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2071 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2072 async fn fetch() {
2073 generic_fetch(" ", |c, seq, query| async move {
2074 c.lock()
2075 .await
2076 .fetch(seq, query)
2077 .await?
2078 .collect::<Vec<_>>()
2079 .await;
2080
2081 Ok(())
2082 })
2083 .await;
2084 }
2085
2086 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2087 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2088 async fn uid_fetch() {
2089 generic_fetch(" UID ", |c, seq, query| async move {
2090 c.lock()
2091 .await
2092 .uid_fetch(seq, query)
2093 .await?
2094 .collect::<Vec<_>>()
2095 .await;
2096 Ok(())
2097 })
2098 .await;
2099 }
2100
2101 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2102 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2103 async fn fetch_unexpected_eof() {
2104 let response = b"".to_vec();
2106
2107 let mock_stream = MockStream::new(response);
2108 let mut session = mock_session!(mock_stream);
2109
2110 {
2111 let mut fetch_result = session
2112 .uid_fetch("1:*", "(FLAGS BODY.PEEK[])")
2113 .await
2114 .unwrap();
2115
2116 let err = fetch_result.try_next().await.unwrap_err();
2118 let Error::Io(io_err) = err else {
2119 panic!("Unexpected error type: {err}")
2120 };
2121 assert_eq!(io_err.kind(), io::ErrorKind::UnexpectedEof);
2122 }
2123
2124 assert_eq!(
2125 session.stream.inner.written_buf,
2126 b"A0001 UID FETCH 1:* (FLAGS BODY.PEEK[])\r\n".to_vec()
2127 );
2128 }
2129
2130 async fn generic_fetch<'a, F, T, K>(prefix: &'a str, op: F)
2131 where
2132 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2133 K: 'a + Future<Output = Result<T>>,
2134 {
2135 generic_with_uid(
2136 "A0001 OK FETCH completed\r\n",
2137 "FETCH",
2138 "1",
2139 "BODY[]",
2140 prefix,
2141 op,
2142 )
2143 .await;
2144 }
2145
2146 async fn generic_with_uid<'a, F, T, K>(
2147 res: &'a str,
2148 cmd: &'a str,
2149 seq: &'a str,
2150 query: &'a str,
2151 prefix: &'a str,
2152 op: F,
2153 ) where
2154 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2155 K: 'a + Future<Output = Result<T>>,
2156 {
2157 let resp = res.as_bytes().to_vec();
2158 let line = format!("A0001{prefix}{cmd} {seq} {query}\r\n");
2159 let session = Arc::new(Mutex::new(mock_session!(MockStream::new(resp))));
2160
2161 {
2162 let _ = op(session.clone(), seq, query).await.unwrap();
2163 }
2164 assert!(
2165 session.lock().await.stream.inner.written_buf == line.as_bytes().to_vec(),
2166 "Invalid command"
2167 );
2168 }
2169
2170 #[test]
2171 fn quote_backslash() {
2172 assert_eq!("\"test\\\\text\"", quote!(r"test\text"));
2173 }
2174
2175 #[test]
2176 fn quote_dquote() {
2177 assert_eq!("\"test\\\"text\"", quote!("test\"text"));
2178 }
2179
2180 #[test]
2181 fn validate_random() {
2182 assert_eq!(
2183 "\"~iCQ_k;>[&\\\"sVCvUW`e<<P!wJ\"",
2184 &validate_str("~iCQ_k;>[&\"sVCvUW`e<<P!wJ").unwrap()
2185 );
2186 }
2187
2188 #[test]
2189 fn validate_newline() {
2190 if let Err(ref e) = validate_str("test\nstring") {
2191 if let Error::Validate(ref ve) = e {
2192 if ve.0 == '\n' {
2193 return;
2194 }
2195 }
2196 panic!("Wrong error: {e:?}");
2197 }
2198 panic!("No error");
2199 }
2200
2201 #[test]
2202 #[allow(unreachable_patterns)]
2203 fn validate_carriage_return() {
2204 if let Err(ref e) = validate_str("test\rstring") {
2205 if let Error::Validate(ref ve) = e {
2206 if ve.0 == '\r' {
2207 return;
2208 }
2209 }
2210 panic!("Wrong error: {e:?}");
2211 }
2212 panic!("No error");
2213 }
2214
2215 #[cfg(feature = "runtime-tokio")]
2219 async fn handle_client(stream: tokio::io::DuplexStream) -> Result<()> {
2220 use tokio::io::AsyncBufReadExt;
2221
2222 let (reader, mut writer) = tokio::io::split(stream);
2223 let reader = tokio::io::BufReader::new(reader);
2224
2225 let mut lines = reader.lines();
2226 while let Some(line) = lines.next_line().await? {
2227 let (request_id, request) = line.split_once(' ').unwrap();
2228 eprintln!("Received request {request_id}.");
2229
2230 let (id, _) = request
2231 .strip_prefix("FETCH ")
2232 .unwrap()
2233 .split_once(' ')
2234 .unwrap();
2235 let id = id.parse().unwrap();
2236
2237 let mut body = concat!(
2238 "From: Bob <bob@example.com>\r\n",
2239 "To: Alice <alice@example.org>\r\n",
2240 "Subject: Test\r\n",
2241 "Message-Id: <foobar@example.com>\r\n",
2242 "Date: Sun, 22 Mar 2020 00:00:00 +0100\r\n",
2243 "\r\n",
2244 )
2245 .to_string();
2246 for _ in 1..id {
2247 body +=
2248 "012345678901234567890123456789012345678901234567890123456789012345678901\r\n";
2249 }
2250 let body_len = body.len();
2251
2252 let response = format!("* {id} FETCH (RFC822.SIZE {body_len} BODY[] {{{body_len}}}\r\n{body} FLAGS (\\Seen))\r\n");
2253 writer.write_all(response.as_bytes()).await?;
2254 writer
2255 .write_all(format!("{request_id} OK FETCH completed\r\n").as_bytes())
2256 .await?;
2257 writer.flush().await?;
2258 }
2259
2260 Ok(())
2261 }
2262
2263 #[cfg(feature = "runtime-tokio")]
2270 #[cfg_attr(
2271 feature = "runtime-tokio",
2272 tokio::test(flavor = "multi_thread", worker_threads = 2)
2273 )]
2274 async fn large_fetch() -> Result<()> {
2275 use futures::TryStreamExt;
2276
2277 let (client, server) = tokio::io::duplex(4096);
2278 tokio::spawn(handle_client(server));
2279
2280 let client = crate::Client::new(client);
2281 let mut imap_session = Session::new(client.conn);
2282
2283 for i in 200..300 {
2284 eprintln!("Fetching {i}.");
2285 let mut messages_stream = imap_session
2286 .fetch(format!("{i}"), "(RFC822.SIZE BODY.PEEK[] FLAGS)")
2287 .await?;
2288 let fetch = messages_stream
2289 .try_next()
2290 .await?
2291 .expect("no FETCH returned");
2292 let body = fetch.body().expect("message did not have a body!");
2293 assert_eq!(body.len(), 76 + 74 * i);
2294
2295 let no_fetch = messages_stream.try_next().await?;
2296 assert!(no_fetch.is_none());
2297 drop(messages_stream);
2298 }
2299
2300 Ok(())
2301 }
2302
2303 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2304 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2305 async fn status() {
2306 {
2307 let response = b"* STATUS INBOX (UIDNEXT 25)\r\n\
2308 A0001 OK [CLIENTBUG] Status on selected mailbox completed (0.001 + 0.000 secs).\r\n"
2309 .to_vec();
2310
2311 let mock_stream = MockStream::new(response);
2312 let mut session = mock_session!(mock_stream);
2313 let status = session.status("INBOX", "(UIDNEXT)").await.unwrap();
2314 assert_eq!(
2315 session.stream.inner.written_buf,
2316 b"A0001 STATUS \"INBOX\" (UIDNEXT)\r\n".to_vec()
2317 );
2318 assert_eq!(status.uid_next, Some(25));
2319 }
2320
2321 {
2322 let response = b"* STATUS INBOX (RECENT 15)\r\n\
2323 A0001 OK STATUS completed\r\n"
2324 .to_vec();
2325
2326 let mock_stream = MockStream::new(response);
2327 let mut session = mock_session!(mock_stream);
2328 let status = session.status("INBOX", "(RECENT)").await.unwrap();
2329 assert_eq!(
2330 session.stream.inner.written_buf,
2331 b"A0001 STATUS \"INBOX\" (RECENT)\r\n".to_vec()
2332 );
2333 assert_eq!(status.recent, 15);
2334 }
2335
2336 {
2337 let response = b"* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)\r\n\
2339 A0001 OK STATUS completed\r\n"
2340 .to_vec();
2341
2342 let mock_stream = MockStream::new(response);
2343 let mut session = mock_session!(mock_stream);
2344 let status = session
2345 .status("blurdybloop", "(UIDNEXT MESSAGES)")
2346 .await
2347 .unwrap();
2348 assert_eq!(
2349 session.stream.inner.written_buf,
2350 b"A0001 STATUS \"blurdybloop\" (UIDNEXT MESSAGES)\r\n".to_vec()
2351 );
2352 assert_eq!(status.uid_next, Some(44292));
2353 assert_eq!(status.exists, 231);
2354 }
2355 }
2356
2357 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2358 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2359 async fn append() {
2360 {
2361 let response = b"+ OK\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2365
2366 let mock_stream = MockStream::new(response);
2367 let mut session = mock_session!(mock_stream);
2368 session
2369 .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2370 .await
2371 .unwrap();
2372 assert_eq!(
2373 session.stream.inner.written_buf,
2374 b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2375 );
2376 }
2377
2378 {
2379 let response = b"+ OK\r\n* 3 EXISTS\r\n* 2 RECENT\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2383
2384 let mock_stream = MockStream::new(response);
2385 let mut session = mock_session!(mock_stream);
2386 session
2387 .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2388 .await
2389 .unwrap();
2390 assert_eq!(
2391 session.stream.inner.written_buf,
2392 b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2393 );
2394 let exists_response = session.unsolicited_responses.recv().await.unwrap();
2395 assert_eq!(exists_response, UnsolicitedResponse::Exists(3));
2396 let recent_response = session.unsolicited_responses.recv().await.unwrap();
2397 assert_eq!(recent_response, UnsolicitedResponse::Recent(2));
2398 }
2399
2400 {
2401 let response =
2403 b"A0001 NO [TRYCREATE] Mailbox doesn't exist: foobar (0.001 + 0.000 secs)."
2404 .to_vec();
2405 let mock_stream = MockStream::new(response);
2406 let mut session = mock_session!(mock_stream);
2407 session
2408 .append("foobar", None, None, "foobarbaz")
2409 .await
2410 .unwrap_err();
2411 assert_eq!(
2412 session.stream.inner.written_buf,
2413 b"A0001 APPEND \"foobar\" {9}\r\n".to_vec()
2414 );
2415 }
2416 }
2417
2418 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2419 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2420 async fn get_metadata() {
2421 {
2422 let response = b"* METADATA \"INBOX\" (/private/comment \"My own comment\")\r\n\
2423 A0001 OK GETMETADATA complete\r\n"
2424 .to_vec();
2425
2426 let mock_stream = MockStream::new(response);
2427 let mut session = mock_session!(mock_stream);
2428 let metadata = session
2429 .get_metadata("INBOX", "", "/private/comment")
2430 .await
2431 .unwrap();
2432 assert_eq!(
2433 session.stream.inner.written_buf,
2434 b"A0001 GETMETADATA \"INBOX\" /private/comment\r\n".to_vec()
2435 );
2436 assert_eq!(metadata.len(), 1);
2437 assert_eq!(metadata[0].entry, "/private/comment");
2438 assert_eq!(metadata[0].value.as_ref().unwrap(), "My own comment");
2439 }
2440
2441 {
2442 let response = b"* METADATA \"INBOX\" (/shared/comment \"Shared comment\" /private/comment \"My own comment\")\r\n\
2443 A0001 OK GETMETADATA complete\r\n"
2444 .to_vec();
2445
2446 let mock_stream = MockStream::new(response);
2447 let mut session = mock_session!(mock_stream);
2448 let metadata = session
2449 .get_metadata("INBOX", "", "(/shared/comment /private/comment)")
2450 .await
2451 .unwrap();
2452 assert_eq!(
2453 session.stream.inner.written_buf,
2454 b"A0001 GETMETADATA \"INBOX\" (/shared/comment /private/comment)\r\n".to_vec()
2455 );
2456 assert_eq!(metadata.len(), 2);
2457 assert_eq!(metadata[0].entry, "/shared/comment");
2458 assert_eq!(metadata[0].value.as_ref().unwrap(), "Shared comment");
2459 assert_eq!(metadata[1].entry, "/private/comment");
2460 assert_eq!(metadata[1].value.as_ref().unwrap(), "My own comment");
2461 }
2462
2463 {
2464 let response = b"* METADATA \"\" (/shared/comment {15}\r\nChatmail server /shared/admin {28}\r\nmailto:root@nine.testrun.org)\r\n\
2465 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2466 .to_vec();
2467
2468 let mock_stream = MockStream::new(response);
2469 let mut session = mock_session!(mock_stream);
2470 let metadata = session
2471 .get_metadata("", "", "(/shared/comment /shared/admin)")
2472 .await
2473 .unwrap();
2474 assert_eq!(
2475 session.stream.inner.written_buf,
2476 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2477 );
2478 assert_eq!(metadata.len(), 2);
2479 assert_eq!(metadata[0].entry, "/shared/comment");
2480 assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2481 assert_eq!(metadata[1].entry, "/shared/admin");
2482 assert_eq!(
2483 metadata[1].value.as_ref().unwrap(),
2484 "mailto:root@nine.testrun.org"
2485 );
2486 }
2487
2488 {
2489 let response = b"* METADATA \"\" (/shared/comment \"Chatmail server\")\r\n\
2490 * METADATA \"\" (/shared/admin \"mailto:root@nine.testrun.org\")\r\n\
2491 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2492 .to_vec();
2493
2494 let mock_stream = MockStream::new(response);
2495 let mut session = mock_session!(mock_stream);
2496 let metadata = session
2497 .get_metadata("", "", "(/shared/comment /shared/admin)")
2498 .await
2499 .unwrap();
2500 assert_eq!(
2501 session.stream.inner.written_buf,
2502 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2503 );
2504 assert_eq!(metadata.len(), 2);
2505 assert_eq!(metadata[0].entry, "/shared/comment");
2506 assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2507 assert_eq!(metadata[1].entry, "/shared/admin");
2508 assert_eq!(
2509 metadata[1].value.as_ref().unwrap(),
2510 "mailto:root@nine.testrun.org"
2511 );
2512 }
2513
2514 {
2515 let response = b"* METADATA \"\" (/shared/comment NIL /shared/admin NIL)\r\n\
2516 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2517 .to_vec();
2518
2519 let mock_stream = MockStream::new(response);
2520 let mut session = mock_session!(mock_stream);
2521 let metadata = session
2522 .get_metadata("", "", "(/shared/comment /shared/admin)")
2523 .await
2524 .unwrap();
2525 assert_eq!(
2526 session.stream.inner.written_buf,
2527 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2528 );
2529 assert_eq!(metadata.len(), 2);
2530 assert_eq!(metadata[0].entry, "/shared/comment");
2531 assert_eq!(metadata[0].value, None);
2532 assert_eq!(metadata[1].entry, "/shared/admin");
2533 assert_eq!(metadata[1].value, None);
2534 }
2535 }
2536
2537 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2538 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2539 async fn test_get_quota_root() {
2540 {
2541 let response = b"* QUOTAROOT Sent Userquota\r\n\
2542 * QUOTA Userquota (STORAGE 4855 48576)\r\n\
2543 A0001 OK Getquotaroot completed (0.004 + 0.000 + 0.004 secs).\r\n"
2544 .to_vec();
2545
2546 let mock_stream = MockStream::new(response);
2547 let mut session = mock_session!(mock_stream);
2548 let (quotaroots, quota) = dbg!(session.get_quota_root("Sent").await.unwrap());
2549 assert_eq!(
2550 str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2551 "A0001 GETQUOTAROOT \"Sent\"\r\n"
2552 );
2553 assert_eq!(
2554 quotaroots,
2555 vec![QuotaRoot {
2556 mailbox_name: "Sent".to_string(),
2557 quota_root_names: vec!["Userquota".to_string(),],
2558 },],
2559 );
2560 assert_eq!(
2561 quota,
2562 vec![Quota {
2563 root_name: "Userquota".to_string(),
2564 resources: vec![QuotaResource {
2565 name: QuotaResourceName::Storage,
2566 usage: 4855,
2567 limit: 48576,
2568 }],
2569 }]
2570 );
2571 assert_eq!(quota[0].resources[0].get_usage_percentage(), 9);
2572 }
2573
2574 {
2575 let response = b"* QUOTAROOT \"INBOX\" \"#19\"\r\n\
2576 * QUOTA \"#19\" (STORAGE 0 0)\r\n\
2577 A0001 OK GETQUOTAROOT successful.\r\n"
2578 .to_vec();
2579
2580 let mock_stream = MockStream::new(response);
2581 let mut session = mock_session!(mock_stream);
2582 let (quotaroots, quota) = session.get_quota_root("INBOX").await.unwrap();
2583 assert_eq!(
2584 str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2585 "A0001 GETQUOTAROOT \"INBOX\"\r\n"
2586 );
2587 assert_eq!(
2588 quotaroots,
2589 vec![QuotaRoot {
2590 mailbox_name: "INBOX".to_string(),
2591 quota_root_names: vec!["#19".to_string(),],
2592 },],
2593 );
2594 assert_eq!(
2595 quota,
2596 vec![Quota {
2597 root_name: "#19".to_string(),
2598 resources: vec![QuotaResource {
2599 name: QuotaResourceName::Storage,
2600 usage: 0,
2601 limit: 0,
2602 }],
2603 }]
2604 );
2605 assert_eq!(quota[0].resources[0].get_usage_percentage(), 0);
2606 }
2607 }
2608
2609 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2610 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2611 async fn test_parsing_error() {
2612 let response = b"220 mail.example.org ESMTP Postcow\r\n".to_vec();
2614 let command = "A0001 NOOP\r\n";
2615 let mock_stream = MockStream::new(response);
2616 let mut session = mock_session!(mock_stream);
2617 assert!(session
2618 .noop()
2619 .await
2620 .unwrap_err()
2621 .to_string()
2622 .contains("220 mail.example.org ESMTP Postcow"));
2623 assert!(
2624 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2625 "Invalid NOOP command"
2626 );
2627 }
2628}