1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::str;
6
7use async_channel::{self as channel, bounded};
8#[cfg(feature = "runtime-async-std")]
9use async_std::io::{Read, Write, WriteExt};
10use base64::Engine as _;
11use extensions::id::{format_identification, parse_id};
12use extensions::quota::parse_get_quota_root;
13use futures::{io, Stream, StreamExt};
14use imap_proto::{Metadata, RequestId, Response};
15#[cfg(feature = "runtime-tokio")]
16use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt};
17
18use super::authenticator::Authenticator;
19use super::error::{Error, ParseError, Result, ValidateError};
20use super::parse::*;
21use super::types::*;
22use crate::extensions::{self, quota::parse_get_quota};
23use crate::imap_stream::ImapStream;
24
25macro_rules! quote {
26 ($x:expr) => {
27 format!("\"{}\"", $x.replace(r"\", r"\\").replace("\"", "\\\""))
28 };
29}
30
31#[derive(Debug)]
41pub struct Session<T: Read + Write + Unpin + fmt::Debug> {
42 pub(crate) conn: Connection<T>,
43 pub(crate) unsolicited_responses_tx: channel::Sender<UnsolicitedResponse>,
44
45 pub unsolicited_responses: channel::Receiver<UnsolicitedResponse>,
48}
49
50impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Session<T> {}
51impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Client<T> {}
52impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Connection<T> {}
53
54impl<T: Read + Write + Unpin + fmt::Debug> AsMut<T> for Session<T> {
57 fn as_mut(&mut self) -> &mut T {
58 self.conn.stream.as_mut()
59 }
60}
61
62#[derive(Debug)]
70pub struct Client<T: Read + Write + Unpin + fmt::Debug> {
71 conn: Connection<T>,
72}
73
74#[derive(Debug)]
77pub struct Connection<T: Read + Write + Unpin + fmt::Debug> {
78 pub(crate) stream: ImapStream<T>,
79
80 pub(crate) request_ids: IdGenerator,
82}
83
84impl<T: Read + Write + Unpin + fmt::Debug> Deref for Client<T> {
87 type Target = Connection<T>;
88
89 fn deref(&self) -> &Connection<T> {
90 &self.conn
91 }
92}
93
94impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Client<T> {
95 fn deref_mut(&mut self) -> &mut Connection<T> {
96 &mut self.conn
97 }
98}
99
100impl<T: Read + Write + Unpin + fmt::Debug> Deref for Session<T> {
101 type Target = Connection<T>;
102
103 fn deref(&self) -> &Connection<T> {
104 &self.conn
105 }
106}
107
108impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Session<T> {
109 fn deref_mut(&mut self) -> &mut Connection<T> {
110 &mut self.conn
111 }
112}
113
114macro_rules! ok_or_unauth_client_err {
122 ($r:expr, $self:expr) => {
123 match $r {
124 Ok(o) => o,
125 Err(e) => return Err((e, $self)),
126 }
127 };
128}
129
130impl<T: Read + Write + Unpin + fmt::Debug + Send> Client<T> {
131 pub fn new(stream: T) -> Client<T> {
136 let stream = ImapStream::new(stream);
137
138 Client {
139 conn: Connection {
140 stream,
141 request_ids: IdGenerator::new(),
142 },
143 }
144 }
145
146 pub fn into_inner(self) -> T {
148 let Self { conn, .. } = self;
149 conn.into_inner()
150 }
151
152 pub async fn login<U: AsRef<str>, P: AsRef<str>>(
184 mut self,
185 username: U,
186 password: P,
187 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
188 let u = ok_or_unauth_client_err!(validate_str(username.as_ref()), self);
189 let p = ok_or_unauth_client_err!(validate_str(password.as_ref()), self);
190 ok_or_unauth_client_err!(
191 self.run_command_and_check_ok(&format!("LOGIN {} {}", u, p), None)
192 .await,
193 self
194 );
195
196 Ok(Session::new(self.conn))
197 }
198
199 pub async fn authenticate<A: Authenticator, S: AsRef<str>>(
243 mut self,
244 auth_type: S,
245 authenticator: A,
246 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
247 let id = ok_or_unauth_client_err!(
248 self.run_command(&format!("AUTHENTICATE {}", auth_type.as_ref()))
249 .await,
250 self
251 );
252 let session = self.do_auth_handshake(id, authenticator).await?;
253 Ok(session)
254 }
255
256 async fn do_auth_handshake<A: Authenticator>(
258 mut self,
259 id: RequestId,
260 mut authenticator: A,
261 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
262 loop {
265 if let Some(res) = self.read_response().await {
266 let res = ok_or_unauth_client_err!(res.map_err(Into::into), self);
267 match res.parsed() {
268 Response::Continue { information, .. } => {
269 let challenge = if let Some(text) = information {
270 ok_or_unauth_client_err!(
271 base64::engine::general_purpose::STANDARD
272 .decode(text.as_ref())
273 .map_err(|e| Error::Parse(ParseError::Authentication(
274 (*text).to_string(),
275 Some(e)
276 ))),
277 self
278 )
279 } else {
280 Vec::new()
281 };
282 let raw_response = &mut authenticator.process(&challenge);
283 let auth_response =
284 base64::engine::general_purpose::STANDARD.encode(raw_response);
285
286 ok_or_unauth_client_err!(
287 self.conn.run_command_untagged(&auth_response).await,
288 self
289 );
290 }
291 _ => {
292 ok_or_unauth_client_err!(
293 self.check_done_ok_from(&id, None, res).await,
294 self
295 );
296 return Ok(Session::new(self.conn));
297 }
298 }
299 } else {
300 return Err((Error::ConnectionLost, self));
301 }
302 }
303 }
304}
305
306impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
307 unsafe_pinned!(conn: Connection<T>);
308
309 pub(crate) fn get_stream(self: Pin<&mut Self>) -> Pin<&mut ImapStream<T>> {
310 self.conn().stream()
311 }
312
313 fn new(conn: Connection<T>) -> Self {
315 let (tx, rx) = bounded(100);
316 Session {
317 conn,
318 unsolicited_responses: rx,
319 unsolicited_responses_tx: tx,
320 }
321 }
322
323 pub async fn select<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
341 let id = self
343 .run_command(&format!("SELECT {}", validate_str(mailbox_name.as_ref())?))
344 .await?;
345 let mbox = parse_mailbox(
346 &mut self.conn.stream,
347 self.unsolicited_responses_tx.clone(),
348 id,
349 )
350 .await?;
351
352 Ok(mbox)
353 }
354
355 pub async fn select_condstore<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
358 let id = self
359 .run_command(&format!(
360 "SELECT {} (CONDSTORE)",
361 validate_str(mailbox_name.as_ref())?
362 ))
363 .await?;
364 let mbox = parse_mailbox(
365 &mut self.conn.stream,
366 self.unsolicited_responses_tx.clone(),
367 id,
368 )
369 .await?;
370
371 Ok(mbox)
372 }
373
374 pub async fn examine<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
379 let id = self
380 .run_command(&format!("EXAMINE {}", validate_str(mailbox_name.as_ref())?))
381 .await?;
382 let mbox = parse_mailbox(
383 &mut self.conn.stream,
384 self.unsolicited_responses_tx.clone(),
385 id,
386 )
387 .await?;
388
389 Ok(mbox)
390 }
391
392 pub async fn fetch<S1, S2>(
451 &mut self,
452 sequence_set: S1,
453 query: S2,
454 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
455 where
456 S1: AsRef<str>,
457 S2: AsRef<str>,
458 {
459 let id = self
460 .run_command(&format!(
461 "FETCH {} {}",
462 sequence_set.as_ref(),
463 query.as_ref()
464 ))
465 .await?;
466 let res = parse_fetches(
467 &mut self.conn.stream,
468 self.unsolicited_responses_tx.clone(),
469 id,
470 );
471
472 Ok(res)
473 }
474
475 pub async fn uid_fetch<S1, S2>(
478 &mut self,
479 uid_set: S1,
480 query: S2,
481 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send + Unpin>
482 where
483 S1: AsRef<str>,
484 S2: AsRef<str>,
485 {
486 let id = self
487 .run_command(&format!(
488 "UID FETCH {} {}",
489 uid_set.as_ref(),
490 query.as_ref()
491 ))
492 .await?;
493 let res = parse_fetches(
494 &mut self.conn.stream,
495 self.unsolicited_responses_tx.clone(),
496 id,
497 );
498 Ok(res)
499 }
500
501 pub async fn noop(&mut self) -> Result<()> {
503 let id = self.run_command("NOOP").await?;
504 parse_noop(
505 &mut self.conn.stream,
506 self.unsolicited_responses_tx.clone(),
507 id,
508 )
509 .await?;
510 Ok(())
511 }
512
513 pub async fn logout(&mut self) -> Result<()> {
515 self.run_command_and_check_ok("LOGOUT").await?;
516 Ok(())
517 }
518
519 pub async fn create<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
542 self.run_command_and_check_ok(&format!("CREATE {}", validate_str(mailbox_name.as_ref())?))
543 .await?;
544
545 Ok(())
546 }
547
548 pub async fn delete<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
568 self.run_command_and_check_ok(&format!("DELETE {}", validate_str(mailbox_name.as_ref())?))
569 .await?;
570
571 Ok(())
572 }
573
574 pub async fn rename<S1: AsRef<str>, S2: AsRef<str>>(&mut self, from: S1, to: S2) -> Result<()> {
600 self.run_command_and_check_ok(&format!(
601 "RENAME {} {}",
602 quote!(from.as_ref()),
603 quote!(to.as_ref())
604 ))
605 .await?;
606
607 Ok(())
608 }
609
610 pub async fn subscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
619 self.run_command_and_check_ok(&format!("SUBSCRIBE {}", quote!(mailbox.as_ref())))
620 .await?;
621 Ok(())
622 }
623
624 pub async fn unsubscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
629 self.run_command_and_check_ok(&format!("UNSUBSCRIBE {}", quote!(mailbox.as_ref())))
630 .await?;
631 Ok(())
632 }
633
634 pub async fn capabilities(&mut self) -> Result<Capabilities> {
638 let id = self.run_command("CAPABILITY").await?;
639 let c = parse_capabilities(
640 &mut self.conn.stream,
641 self.unsolicited_responses_tx.clone(),
642 id,
643 )
644 .await?;
645 Ok(c)
646 }
647
648 pub async fn expunge(&mut self) -> Result<impl Stream<Item = Result<Seq>> + '_ + Send> {
652 let id = self.run_command("EXPUNGE").await?;
653 let res = parse_expunge(
654 &mut self.conn.stream,
655 self.unsolicited_responses_tx.clone(),
656 id,
657 );
658 Ok(res)
659 }
660
661 pub async fn uid_expunge<S: AsRef<str>>(
684 &mut self,
685 uid_set: S,
686 ) -> Result<impl Stream<Item = Result<Uid>> + '_ + Send> {
687 let id = self
688 .run_command(&format!("UID EXPUNGE {}", uid_set.as_ref()))
689 .await?;
690 let res = parse_expunge(
691 &mut self.conn.stream,
692 self.unsolicited_responses_tx.clone(),
693 id,
694 );
695 Ok(res)
696 }
697
698 pub async fn check(&mut self) -> Result<()> {
709 self.run_command_and_check_ok("CHECK").await?;
710 Ok(())
711 }
712
713 pub async fn close(&mut self) -> Result<()> {
729 self.run_command_and_check_ok("CLOSE").await?;
730 Ok(())
731 }
732
733 pub async fn store<S1, S2>(
783 &mut self,
784 sequence_set: S1,
785 query: S2,
786 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
787 where
788 S1: AsRef<str>,
789 S2: AsRef<str>,
790 {
791 let id = self
792 .run_command(&format!(
793 "STORE {} {}",
794 sequence_set.as_ref(),
795 query.as_ref()
796 ))
797 .await?;
798 let res = parse_fetches(
799 &mut self.conn.stream,
800 self.unsolicited_responses_tx.clone(),
801 id,
802 );
803 Ok(res)
804 }
805
806 pub async fn uid_store<S1, S2>(
809 &mut self,
810 uid_set: S1,
811 query: S2,
812 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
813 where
814 S1: AsRef<str>,
815 S2: AsRef<str>,
816 {
817 let id = self
818 .run_command(&format!(
819 "UID STORE {} {}",
820 uid_set.as_ref(),
821 query.as_ref()
822 ))
823 .await?;
824 let res = parse_fetches(
825 &mut self.conn.stream,
826 self.unsolicited_responses_tx.clone(),
827 id,
828 );
829 Ok(res)
830 }
831
832 pub async fn copy<S1: AsRef<str>, S2: AsRef<str>>(
840 &mut self,
841 sequence_set: S1,
842 mailbox_name: S2,
843 ) -> Result<()> {
844 self.run_command_and_check_ok(&format!(
845 "COPY {} {}",
846 sequence_set.as_ref(),
847 mailbox_name.as_ref()
848 ))
849 .await?;
850
851 Ok(())
852 }
853
854 pub async fn uid_copy<S1: AsRef<str>, S2: AsRef<str>>(
857 &mut self,
858 uid_set: S1,
859 mailbox_name: S2,
860 ) -> Result<()> {
861 self.run_command_and_check_ok(&format!(
862 "UID COPY {} {}",
863 uid_set.as_ref(),
864 mailbox_name.as_ref()
865 ))
866 .await?;
867
868 Ok(())
869 }
870
871 pub async fn mv<S1: AsRef<str>, S2: AsRef<str>>(
902 &mut self,
903 sequence_set: S1,
904 mailbox_name: S2,
905 ) -> Result<()> {
906 self.run_command_and_check_ok(&format!(
907 "MOVE {} {}",
908 sequence_set.as_ref(),
909 validate_str(mailbox_name.as_ref())?
910 ))
911 .await?;
912
913 Ok(())
914 }
915
916 pub async fn uid_mv<S1: AsRef<str>, S2: AsRef<str>>(
921 &mut self,
922 uid_set: S1,
923 mailbox_name: S2,
924 ) -> Result<()> {
925 self.run_command_and_check_ok(&format!(
926 "UID MOVE {} {}",
927 uid_set.as_ref(),
928 validate_str(mailbox_name.as_ref())?
929 ))
930 .await?;
931
932 Ok(())
933 }
934
935 pub async fn list(
967 &mut self,
968 reference_name: Option<&str>,
969 mailbox_pattern: Option<&str>,
970 ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
971 let id = self
972 .run_command(&format!(
973 "LIST {} {}",
974 quote!(reference_name.unwrap_or("")),
975 mailbox_pattern.unwrap_or("\"\"")
976 ))
977 .await?;
978
979 Ok(parse_names(
980 &mut self.conn.stream,
981 self.unsolicited_responses_tx.clone(),
982 id,
983 ))
984 }
985
986 pub async fn lsub(
1002 &mut self,
1003 reference_name: Option<&str>,
1004 mailbox_pattern: Option<&str>,
1005 ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
1006 let id = self
1007 .run_command(&format!(
1008 "LSUB {} {}",
1009 quote!(reference_name.unwrap_or("")),
1010 mailbox_pattern.unwrap_or("")
1011 ))
1012 .await?;
1013 let names = parse_names(
1014 &mut self.conn.stream,
1015 self.unsolicited_responses_tx.clone(),
1016 id,
1017 );
1018
1019 Ok(names)
1020 }
1021
1022 pub async fn status<S1: AsRef<str>, S2: AsRef<str>>(
1057 &mut self,
1058 mailbox_name: S1,
1059 data_items: S2,
1060 ) -> Result<Mailbox> {
1061 let id = self
1062 .run_command(&format!(
1063 "STATUS {} {}",
1064 validate_str(mailbox_name.as_ref())?,
1065 data_items.as_ref()
1066 ))
1067 .await?;
1068 let mbox = parse_status(
1069 &mut self.conn.stream,
1070 mailbox_name.as_ref(),
1071 self.unsolicited_responses_tx.clone(),
1072 id,
1073 )
1074 .await?;
1075 Ok(mbox)
1076 }
1077
1078 pub fn idle(self) -> extensions::idle::Handle<T> {
1097 extensions::idle::Handle::new(self)
1098 }
1099
1100 pub async fn append(
1120 &mut self,
1121 mailbox: impl AsRef<str>,
1122 flags: Option<&str>,
1123 internaldate: Option<&str>,
1124 content: impl AsRef<[u8]>,
1125 ) -> Result<()> {
1126 let content = content.as_ref();
1127 let id = self
1128 .run_command(&format!(
1129 "APPEND \"{}\"{}{}{}{} {{{}}}",
1130 mailbox.as_ref(),
1131 if flags.is_some() { " " } else { "" },
1132 flags.unwrap_or(""),
1133 if internaldate.is_some() { " " } else { "" },
1134 internaldate.unwrap_or(""),
1135 content.len()
1136 ))
1137 .await?;
1138
1139 match self.read_response().await {
1140 Some(Ok(res)) => {
1141 if let Response::Continue { .. } = res.parsed() {
1142 self.stream.as_mut().write_all(content).await?;
1143 self.stream.as_mut().write_all(b"\r\n").await?;
1144 self.stream.flush().await?;
1145 self.conn
1146 .check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
1147 .await?;
1148 Ok(())
1149 } else {
1150 Err(Error::Append)
1151 }
1152 }
1153 Some(Err(err)) => Err(err.into()),
1154 _ => Err(Error::Append),
1155 }
1156 }
1157
1158 pub async fn search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Seq>> {
1203 let id = self
1204 .run_command(&format!("SEARCH {}", query.as_ref()))
1205 .await?;
1206 let seqs = parse_ids(
1207 &mut self.conn.stream,
1208 self.unsolicited_responses_tx.clone(),
1209 id,
1210 )
1211 .await?;
1212
1213 Ok(seqs)
1214 }
1215
1216 pub async fn uid_search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Uid>> {
1220 let id = self
1221 .run_command(&format!("UID SEARCH {}", query.as_ref()))
1222 .await?;
1223 let uids = parse_ids(
1224 &mut self.conn.stream,
1225 self.unsolicited_responses_tx.clone(),
1226 id,
1227 )
1228 .await?;
1229
1230 Ok(uids)
1231 }
1232
1233 pub async fn get_quota(&mut self, quota_root: &str) -> Result<Quota> {
1235 let id = self
1236 .run_command(format!("GETQUOTA {}", quote!(quota_root)))
1237 .await?;
1238 let c = parse_get_quota(
1239 &mut self.conn.stream,
1240 self.unsolicited_responses_tx.clone(),
1241 id,
1242 )
1243 .await?;
1244 Ok(c)
1245 }
1246
1247 pub async fn get_quota_root(
1249 &mut self,
1250 mailbox_name: &str,
1251 ) -> Result<(Vec<QuotaRoot>, Vec<Quota>)> {
1252 let id = self
1253 .run_command(format!("GETQUOTAROOT {}", quote!(mailbox_name)))
1254 .await?;
1255 let c = parse_get_quota_root(
1256 &mut self.conn.stream,
1257 self.unsolicited_responses_tx.clone(),
1258 id,
1259 )
1260 .await?;
1261 Ok(c)
1262 }
1263
1264 pub async fn get_metadata(
1266 &mut self,
1267 mailbox_name: &str,
1268 options: &str,
1269 entry_specifier: &str,
1270 ) -> Result<Vec<Metadata>> {
1271 let options = if options.is_empty() {
1272 String::new()
1273 } else {
1274 format!(" {options}")
1275 };
1276 let id = self
1277 .run_command(format!(
1278 "GETMETADATA {} {}{}",
1279 quote!(mailbox_name),
1280 options,
1281 entry_specifier
1282 ))
1283 .await?;
1284 let metadata = parse_metadata(
1285 &mut self.conn.stream,
1286 mailbox_name,
1287 self.unsolicited_responses_tx.clone(),
1288 id,
1289 )
1290 .await?;
1291 Ok(metadata)
1292 }
1293
1294 pub async fn id(
1298 &mut self,
1299 identification: impl IntoIterator<Item = (&str, Option<&str>)>,
1300 ) -> Result<Option<HashMap<String, String>>> {
1301 let id = self
1302 .run_command(format!("ID ({})", format_identification(identification)))
1303 .await?;
1304 let server_identification = parse_id(
1305 &mut self.conn.stream,
1306 self.unsolicited_responses_tx.clone(),
1307 id,
1308 )
1309 .await?;
1310 Ok(server_identification)
1311 }
1312
1313 pub async fn id_nil(&mut self) -> Result<Option<HashMap<String, String>>> {
1317 let id = self.run_command("ID NIL").await?;
1318 let server_identification = parse_id(
1319 &mut self.conn.stream,
1320 self.unsolicited_responses_tx.clone(),
1321 id,
1322 )
1323 .await?;
1324 Ok(server_identification)
1325 }
1326
1327 pub async fn run_command_and_check_ok<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1330 self.conn
1331 .run_command_and_check_ok(
1332 command.as_ref(),
1333 Some(self.unsolicited_responses_tx.clone()),
1334 )
1335 .await?;
1336
1337 Ok(())
1338 }
1339
1340 pub async fn run_command<S: AsRef<str>>(&mut self, command: S) -> Result<RequestId> {
1342 let id = self.conn.run_command(command.as_ref()).await?;
1343
1344 Ok(id)
1345 }
1346
1347 pub async fn run_command_untagged<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1349 self.conn.run_command_untagged(command.as_ref()).await?;
1350
1351 Ok(())
1352 }
1353
1354 pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
1356 self.conn.read_response().await
1357 }
1358}
1359
1360impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
1361 unsafe_pinned!(stream: ImapStream<T>);
1362
1363 pub fn into_inner(self) -> T {
1365 let Self { stream, .. } = self;
1366 stream.into_inner()
1367 }
1368
1369 pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
1371 self.stream.next().await
1372 }
1373
1374 pub(crate) async fn run_command_untagged(&mut self, command: &str) -> Result<()> {
1375 self.stream
1376 .encode(Request(None, command.as_bytes().into()))
1377 .await?;
1378 self.stream.flush().await?;
1379 Ok(())
1380 }
1381
1382 pub(crate) async fn run_command(&mut self, command: &str) -> Result<RequestId> {
1383 let request_id = self.request_ids.next().unwrap(); self.stream
1385 .encode(Request(Some(request_id.clone()), command.as_bytes().into()))
1386 .await?;
1387 self.stream.flush().await?;
1388 Ok(request_id)
1389 }
1390
1391 pub async fn run_command_and_check_ok(
1393 &mut self,
1394 command: &str,
1395 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1396 ) -> Result<()> {
1397 let id = self.run_command(command).await?;
1398 self.check_done_ok(&id, unsolicited).await?;
1399
1400 Ok(())
1401 }
1402
1403 pub(crate) async fn check_done_ok(
1404 &mut self,
1405 id: &RequestId,
1406 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1407 ) -> Result<()> {
1408 if let Some(first_res) = self.stream.next().await {
1409 self.check_done_ok_from(id, unsolicited, first_res?).await
1410 } else {
1411 Err(Error::ConnectionLost)
1412 }
1413 }
1414
1415 pub(crate) async fn check_done_ok_from(
1416 &mut self,
1417 id: &RequestId,
1418 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1419 mut response: ResponseData,
1420 ) -> Result<()> {
1421 loop {
1422 if let Response::Done {
1423 status,
1424 code,
1425 information,
1426 tag,
1427 } = response.parsed()
1428 {
1429 self.check_status_ok(status, code.as_ref(), information.as_deref())?;
1430
1431 if tag == id {
1432 return Ok(());
1433 }
1434 }
1435
1436 if let Some(unsolicited) = unsolicited.clone() {
1437 handle_unilateral(response, unsolicited);
1438 }
1439
1440 if let Some(res) = self.stream.next().await {
1441 response = res?;
1442 } else {
1443 return Err(Error::ConnectionLost);
1444 }
1445 }
1446 }
1447
1448 pub(crate) fn check_status_ok(
1449 &self,
1450 status: &imap_proto::Status,
1451 code: Option<&imap_proto::ResponseCode<'_>>,
1452 information: Option<&str>,
1453 ) -> Result<()> {
1454 use imap_proto::Status;
1455 match status {
1456 Status::Ok => Ok(()),
1457 Status::Bad => Err(Error::Bad(format!(
1458 "code: {:?}, info: {:?}",
1459 code, information
1460 ))),
1461 Status::No => Err(Error::No(format!(
1462 "code: {:?}, info: {:?}",
1463 code, information
1464 ))),
1465 _ => Err(Error::Io(io::Error::new(
1466 io::ErrorKind::Other,
1467 format!(
1468 "status: {:?}, code: {:?}, information: {:?}",
1469 status, code, information
1470 ),
1471 ))),
1472 }
1473 }
1474}
1475
1476fn validate_str(value: &str) -> Result<String> {
1477 let quoted = quote!(value);
1478 if quoted.find('\n').is_some() {
1479 return Err(Error::Validate(ValidateError('\n')));
1480 }
1481 if quoted.find('\r').is_some() {
1482 return Err(Error::Validate(ValidateError('\r')));
1483 }
1484 Ok(quoted)
1485}
1486
1487#[cfg(test)]
1488mod tests {
1489 use pretty_assertions::assert_eq;
1490
1491 use super::super::error::Result;
1492 use super::super::mock_stream::MockStream;
1493 use super::*;
1494 use std::borrow::Cow;
1495 use std::future::Future;
1496
1497 use async_std::sync::{Arc, Mutex};
1498 use imap_proto::Status;
1499
1500 macro_rules! mock_client {
1501 ($s:expr) => {
1502 Client::new($s)
1503 };
1504 }
1505
1506 macro_rules! mock_session {
1507 ($s:expr) => {
1508 Session::new(mock_client!($s).conn)
1509 };
1510 }
1511
1512 macro_rules! assert_eq_bytes {
1513 ($a:expr, $b:expr, $c:expr) => {
1514 assert_eq!(
1515 std::str::from_utf8($a).unwrap(),
1516 std::str::from_utf8($b).unwrap(),
1517 $c
1518 )
1519 };
1520 }
1521
1522 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1523 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1524 async fn fetch_body() {
1525 let response = "a0 OK Logged in.\r\n\
1526 * 2 FETCH (BODY[TEXT] {3}\r\nfoo)\r\n\
1527 a0 OK FETCH completed\r\n";
1528 let mut session = mock_session!(MockStream::new(response.as_bytes().to_vec()));
1529 session.read_response().await.unwrap().unwrap();
1530 session.read_response().await.unwrap().unwrap();
1531 }
1532
1533 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1534 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1535 async fn readline_delay_read() {
1536 let greeting = "* OK Dovecot ready.\r\n";
1537 let mock_stream = MockStream::default()
1538 .with_buf(greeting.as_bytes().to_vec())
1539 .with_delay();
1540
1541 let mut client = mock_client!(mock_stream);
1542 let actual_response = client.read_response().await.unwrap().unwrap();
1543 assert_eq!(
1544 actual_response.parsed(),
1545 &Response::Data {
1546 status: Status::Ok,
1547 code: None,
1548 information: Some(Cow::Borrowed("Dovecot ready.")),
1549 }
1550 );
1551 }
1552
1553 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1554 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1555 async fn readline_eof() {
1556 let mock_stream = MockStream::default().with_eof();
1557 let mut client = mock_client!(mock_stream);
1558 let res = client.read_response().await;
1559 assert!(res.is_none());
1560 }
1561
1562 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1563 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1564 #[should_panic]
1565 async fn readline_err() {
1566 let mock_stream = MockStream::default().with_err();
1568 let mut client = mock_client!(mock_stream);
1569 client.read_response().await.unwrap().unwrap();
1570 }
1571
1572 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1573 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1574 async fn authenticate() {
1575 let response = b"+ YmFy\r\n\
1576 A0001 OK Logged in\r\n"
1577 .to_vec();
1578 let command = "A0001 AUTHENTICATE PLAIN\r\n\
1579 Zm9v\r\n";
1580 let mock_stream = MockStream::new(response);
1581 let client = mock_client!(mock_stream);
1582 enum Authenticate {
1583 Auth,
1584 }
1585 impl Authenticator for &Authenticate {
1586 type Response = Vec<u8>;
1587 fn process(&mut self, challenge: &[u8]) -> Self::Response {
1588 assert!(challenge == b"bar", "Invalid authenticate challenge");
1589 b"foo".to_vec()
1590 }
1591 }
1592 let session = client
1593 .authenticate("PLAIN", &Authenticate::Auth)
1594 .await
1595 .ok()
1596 .unwrap();
1597 assert_eq_bytes!(
1598 &session.stream.inner.written_buf,
1599 command.as_bytes(),
1600 "Invalid authenticate command"
1601 );
1602 }
1603
1604 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1605 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1606 async fn login() {
1607 let response = b"A0001 OK Logged in\r\n".to_vec();
1608 let username = "username";
1609 let password = "password";
1610 let command = format!("A0001 LOGIN {} {}\r\n", quote!(username), quote!(password));
1611 let mock_stream = MockStream::new(response);
1612 let client = mock_client!(mock_stream);
1613 if let Ok(session) = client.login(username, password).await {
1614 assert_eq!(
1615 session.stream.inner.written_buf,
1616 command.as_bytes().to_vec(),
1617 "Invalid login command"
1618 );
1619 } else {
1620 unreachable!("invalid login");
1621 }
1622 }
1623
1624 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1625 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1626 async fn logout() {
1627 let response = b"A0001 OK Logout completed.\r\n".to_vec();
1628 let command = "A0001 LOGOUT\r\n";
1629 let mock_stream = MockStream::new(response);
1630 let mut session = mock_session!(mock_stream);
1631 session.logout().await.unwrap();
1632 assert!(
1633 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1634 "Invalid logout command"
1635 );
1636 }
1637
1638 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1639 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1640 async fn rename() {
1641 let response = b"A0001 OK RENAME completed\r\n".to_vec();
1642 let current_mailbox_name = "INBOX";
1643 let new_mailbox_name = "NEWINBOX";
1644 let command = format!(
1645 "A0001 RENAME {} {}\r\n",
1646 quote!(current_mailbox_name),
1647 quote!(new_mailbox_name)
1648 );
1649 let mock_stream = MockStream::new(response);
1650 let mut session = mock_session!(mock_stream);
1651 session
1652 .rename(current_mailbox_name, new_mailbox_name)
1653 .await
1654 .unwrap();
1655 assert!(
1656 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1657 "Invalid rename command"
1658 );
1659 }
1660
1661 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1662 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1663 async fn subscribe() {
1664 let response = b"A0001 OK SUBSCRIBE completed\r\n".to_vec();
1665 let mailbox = "INBOX";
1666 let command = format!("A0001 SUBSCRIBE {}\r\n", quote!(mailbox));
1667 let mock_stream = MockStream::new(response);
1668 let mut session = mock_session!(mock_stream);
1669 session.subscribe(mailbox).await.unwrap();
1670 assert!(
1671 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1672 "Invalid subscribe command"
1673 );
1674 }
1675
1676 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1677 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1678 async fn unsubscribe() {
1679 let response = b"A0001 OK UNSUBSCRIBE completed\r\n".to_vec();
1680 let mailbox = "INBOX";
1681 let command = format!("A0001 UNSUBSCRIBE {}\r\n", quote!(mailbox));
1682 let mock_stream = MockStream::new(response);
1683 let mut session = mock_session!(mock_stream);
1684 session.unsubscribe(mailbox).await.unwrap();
1685 assert!(
1686 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1687 "Invalid unsubscribe command"
1688 );
1689 }
1690
1691 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1692 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1693 async fn expunge() {
1694 let response = b"A0001 OK EXPUNGE completed\r\n".to_vec();
1695 let mock_stream = MockStream::new(response);
1696 let mut session = mock_session!(mock_stream);
1697 session.expunge().await.unwrap().collect::<Vec<_>>().await;
1698 assert!(
1699 session.stream.inner.written_buf == b"A0001 EXPUNGE\r\n".to_vec(),
1700 "Invalid expunge command"
1701 );
1702 }
1703
1704 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1705 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1706 async fn uid_expunge() {
1707 let response = b"* 2 EXPUNGE\r\n\
1708 * 3 EXPUNGE\r\n\
1709 * 4 EXPUNGE\r\n\
1710 A0001 OK UID EXPUNGE completed\r\n"
1711 .to_vec();
1712 let mock_stream = MockStream::new(response);
1713 let mut session = mock_session!(mock_stream);
1714 session
1715 .uid_expunge("2:4")
1716 .await
1717 .unwrap()
1718 .collect::<Vec<_>>()
1719 .await;
1720 assert!(
1721 session.stream.inner.written_buf == b"A0001 UID EXPUNGE 2:4\r\n".to_vec(),
1722 "Invalid expunge command"
1723 );
1724 }
1725
1726 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1727 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1728 async fn check() {
1729 let response = b"A0001 OK CHECK completed\r\n".to_vec();
1730 let mock_stream = MockStream::new(response);
1731 let mut session = mock_session!(mock_stream);
1732 session.check().await.unwrap();
1733 assert!(
1734 session.stream.inner.written_buf == b"A0001 CHECK\r\n".to_vec(),
1735 "Invalid check command"
1736 );
1737 }
1738
1739 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1740 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1741 async fn examine() {
1742 let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1743 * OK [PERMANENTFLAGS ()] Read-only mailbox.\r\n\
1744 * 1 EXISTS\r\n\
1745 * 1 RECENT\r\n\
1746 * OK [UNSEEN 1] First unseen.\r\n\
1747 * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1748 * OK [UIDNEXT 2] Predicted next UID\r\n\
1749 A0001 OK [READ-ONLY] Select completed.\r\n"
1750 .to_vec();
1751 let expected_mailbox = Mailbox {
1752 flags: vec![
1753 Flag::Answered,
1754 Flag::Flagged,
1755 Flag::Deleted,
1756 Flag::Seen,
1757 Flag::Draft,
1758 ],
1759 exists: 1,
1760 recent: 1,
1761 unseen: Some(1),
1762 permanent_flags: vec![],
1763 uid_next: Some(2),
1764 uid_validity: Some(1257842737),
1765 highest_modseq: None,
1766 };
1767 let mailbox_name = "INBOX";
1768 let command = format!("A0001 EXAMINE {}\r\n", quote!(mailbox_name));
1769 let mock_stream = MockStream::new(response);
1770 let mut session = mock_session!(mock_stream);
1771 let mailbox = session.examine(mailbox_name).await.unwrap();
1772 assert!(
1773 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1774 "Invalid examine command"
1775 );
1776 assert_eq!(mailbox, expected_mailbox);
1777 }
1778
1779 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1780 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1781 async fn select() {
1782 let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1783 * OK [PERMANENTFLAGS (\\* \\Answered \\Flagged \\Deleted \\Draft \\Seen)] \
1784 Read-only mailbox.\r\n\
1785 * 1 EXISTS\r\n\
1786 * 1 RECENT\r\n\
1787 * OK [UNSEEN 1] First unseen.\r\n\
1788 * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1789 * OK [UIDNEXT 2] Predicted next UID\r\n\
1790 * OK [HIGHESTMODSEQ 90060115205545359] Highest mailbox modsequence\r\n\
1791 A0001 OK [READ-ONLY] Select completed.\r\n"
1792 .to_vec();
1793 let expected_mailbox = Mailbox {
1794 flags: vec![
1795 Flag::Answered,
1796 Flag::Flagged,
1797 Flag::Deleted,
1798 Flag::Seen,
1799 Flag::Draft,
1800 ],
1801 exists: 1,
1802 recent: 1,
1803 unseen: Some(1),
1804 permanent_flags: vec![
1805 Flag::MayCreate,
1806 Flag::Answered,
1807 Flag::Flagged,
1808 Flag::Deleted,
1809 Flag::Draft,
1810 Flag::Seen,
1811 ],
1812 uid_next: Some(2),
1813 uid_validity: Some(1257842737),
1814 highest_modseq: Some(90060115205545359),
1815 };
1816 let mailbox_name = "INBOX";
1817 let command = format!("A0001 SELECT {}\r\n", quote!(mailbox_name));
1818 let mock_stream = MockStream::new(response);
1819 let mut session = mock_session!(mock_stream);
1820 let mailbox = session.select(mailbox_name).await.unwrap();
1821 assert!(
1822 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1823 "Invalid select command"
1824 );
1825 assert_eq!(mailbox, expected_mailbox);
1826 }
1827
1828 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1829 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1830 async fn search() {
1831 let response = b"* SEARCH 1 2 3 4 5\r\n\
1832 A0001 OK Search completed\r\n"
1833 .to_vec();
1834 let mock_stream = MockStream::new(response);
1835 let mut session = mock_session!(mock_stream);
1836 let ids = session.search("Unseen").await.unwrap();
1837 let ids: HashSet<u32> = ids.iter().cloned().collect();
1838 assert!(
1839 session.stream.inner.written_buf == b"A0001 SEARCH Unseen\r\n".to_vec(),
1840 "Invalid search command"
1841 );
1842 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1843 }
1844
1845 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1846 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1847 async fn uid_search() {
1848 let response = b"* SEARCH 1 2 3 4 5\r\n\
1849 A0001 OK Search completed\r\n"
1850 .to_vec();
1851 let mock_stream = MockStream::new(response);
1852 let mut session = mock_session!(mock_stream);
1853 let ids = session.uid_search("Unseen").await.unwrap();
1854 let ids: HashSet<Uid> = ids.iter().cloned().collect();
1855 assert!(
1856 session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
1857 "Invalid search command"
1858 );
1859 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1860 }
1861
1862 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1863 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1864 async fn uid_search_unordered() {
1865 let response = b"* SEARCH 1 2 3 4 5\r\n\
1866 A0002 OK CAPABILITY completed\r\n\
1867 A0001 OK Search completed\r\n"
1868 .to_vec();
1869 let mock_stream = MockStream::new(response);
1870 let mut session = mock_session!(mock_stream);
1871 let ids = session.uid_search("Unseen").await.unwrap();
1872 let ids: HashSet<Uid> = ids.iter().cloned().collect();
1873 assert!(
1874 session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
1875 "Invalid search command"
1876 );
1877 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1878 }
1879
1880 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1881 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1882 async fn capability() {
1883 let response = b"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n\
1884 A0001 OK CAPABILITY completed\r\n"
1885 .to_vec();
1886 let expected_capabilities = vec!["IMAP4rev1", "STARTTLS", "AUTH=GSSAPI", "LOGINDISABLED"];
1887 let mock_stream = MockStream::new(response);
1888 let mut session = mock_session!(mock_stream);
1889 let capabilities = session.capabilities().await.unwrap();
1890 assert!(
1891 session.stream.inner.written_buf == b"A0001 CAPABILITY\r\n".to_vec(),
1892 "Invalid capability command"
1893 );
1894 assert_eq!(capabilities.len(), 4);
1895 for e in expected_capabilities {
1896 assert!(capabilities.has_str(e));
1897 }
1898 }
1899
1900 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1901 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1902 async fn create() {
1903 let response = b"A0001 OK CREATE completed\r\n".to_vec();
1904 let mailbox_name = "INBOX";
1905 let command = format!("A0001 CREATE {}\r\n", quote!(mailbox_name));
1906 let mock_stream = MockStream::new(response);
1907 let mut session = mock_session!(mock_stream);
1908 session.create(mailbox_name).await.unwrap();
1909 assert!(
1910 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1911 "Invalid create command"
1912 );
1913 }
1914
1915 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1916 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1917 async fn delete() {
1918 let response = b"A0001 OK DELETE completed\r\n".to_vec();
1919 let mailbox_name = "INBOX";
1920 let command = format!("A0001 DELETE {}\r\n", quote!(mailbox_name));
1921 let mock_stream = MockStream::new(response);
1922 let mut session = mock_session!(mock_stream);
1923 session.delete(mailbox_name).await.unwrap();
1924 assert!(
1925 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1926 "Invalid delete command"
1927 );
1928 }
1929
1930 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1931 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1932 async fn noop() {
1933 let response = b"A0001 OK NOOP completed\r\n".to_vec();
1934 let mock_stream = MockStream::new(response);
1935 let mut session = mock_session!(mock_stream);
1936 session.noop().await.unwrap();
1937 assert!(
1938 session.stream.inner.written_buf == b"A0001 NOOP\r\n".to_vec(),
1939 "Invalid noop command"
1940 );
1941 }
1942
1943 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1944 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1945 async fn close() {
1946 let response = b"A0001 OK CLOSE completed\r\n".to_vec();
1947 let mock_stream = MockStream::new(response);
1948 let mut session = mock_session!(mock_stream);
1949 session.close().await.unwrap();
1950 assert!(
1951 session.stream.inner.written_buf == b"A0001 CLOSE\r\n".to_vec(),
1952 "Invalid close command"
1953 );
1954 }
1955
1956 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1957 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1958 async fn store() {
1959 generic_store(" ", |c, set, query| async move {
1960 c.lock()
1961 .await
1962 .store(set, query)
1963 .await?
1964 .collect::<Vec<_>>()
1965 .await;
1966 Ok(())
1967 })
1968 .await;
1969 }
1970
1971 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1972 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1973 async fn uid_store() {
1974 generic_store(" UID ", |c, set, query| async move {
1975 c.lock()
1976 .await
1977 .uid_store(set, query)
1978 .await?
1979 .collect::<Vec<_>>()
1980 .await;
1981 Ok(())
1982 })
1983 .await;
1984 }
1985
1986 async fn generic_store<'a, F, T, K>(prefix: &'a str, op: F)
1987 where
1988 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
1989 K: 'a + Future<Output = Result<T>>,
1990 {
1991 let res = "* 2 FETCH (FLAGS (\\Deleted \\Seen))\r\n\
1992 * 3 FETCH (FLAGS (\\Deleted))\r\n\
1993 * 4 FETCH (FLAGS (\\Deleted \\Flagged \\Seen))\r\n\
1994 A0001 OK STORE completed\r\n";
1995
1996 generic_with_uid(res, "STORE", "2.4", "+FLAGS (\\Deleted)", prefix, op).await;
1997 }
1998
1999 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2000 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2001 async fn copy() {
2002 generic_copy(" ", |c, set, query| async move {
2003 c.lock().await.copy(set, query).await?;
2004 Ok(())
2005 })
2006 .await;
2007 }
2008
2009 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2010 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2011 async fn uid_copy() {
2012 generic_copy(" UID ", |c, set, query| async move {
2013 c.lock().await.uid_copy(set, query).await?;
2014 Ok(())
2015 })
2016 .await;
2017 }
2018
2019 async fn generic_copy<'a, F, T, K>(prefix: &'a str, op: F)
2020 where
2021 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2022 K: 'a + Future<Output = Result<T>>,
2023 {
2024 generic_with_uid(
2025 "A0001 OK COPY completed\r\n",
2026 "COPY",
2027 "2:4",
2028 "MEETING",
2029 prefix,
2030 op,
2031 )
2032 .await;
2033 }
2034
2035 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2036 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2037 async fn mv() {
2038 let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2039 * 2 EXPUNGE\r\n\
2040 * 1 EXPUNGE\r\n\
2041 A0001 OK Move completed\r\n"
2042 .to_vec();
2043 let mailbox_name = "MEETING";
2044 let command = format!("A0001 MOVE 1:2 {}\r\n", quote!(mailbox_name));
2045 let mock_stream = MockStream::new(response);
2046 let mut session = mock_session!(mock_stream);
2047 session.mv("1:2", mailbox_name).await.unwrap();
2048 assert!(
2049 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2050 "Invalid move command"
2051 );
2052 }
2053
2054 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2055 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2056 async fn uid_mv() {
2057 let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2058 * 2 EXPUNGE\r\n\
2059 * 1 EXPUNGE\r\n\
2060 A0001 OK Move completed\r\n"
2061 .to_vec();
2062 let mailbox_name = "MEETING";
2063 let command = format!("A0001 UID MOVE 41:42 {}\r\n", quote!(mailbox_name));
2064 let mock_stream = MockStream::new(response);
2065 let mut session = mock_session!(mock_stream);
2066 session.uid_mv("41:42", mailbox_name).await.unwrap();
2067 assert!(
2068 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2069 "Invalid uid move command"
2070 );
2071 }
2072
2073 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2074 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2075 async fn fetch() {
2076 generic_fetch(" ", |c, seq, query| async move {
2077 c.lock()
2078 .await
2079 .fetch(seq, query)
2080 .await?
2081 .collect::<Vec<_>>()
2082 .await;
2083
2084 Ok(())
2085 })
2086 .await;
2087 }
2088
2089 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2090 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2091 async fn uid_fetch() {
2092 generic_fetch(" UID ", |c, seq, query| async move {
2093 c.lock()
2094 .await
2095 .uid_fetch(seq, query)
2096 .await?
2097 .collect::<Vec<_>>()
2098 .await;
2099 Ok(())
2100 })
2101 .await;
2102 }
2103
2104 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2105 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2106 async fn fetch_unexpected_eof() {
2107 let response = b"".to_vec();
2109
2110 let mock_stream = MockStream::new(response);
2111 let mut session = mock_session!(mock_stream);
2112
2113 {
2114 let mut fetch_result = session
2115 .uid_fetch("1:*", "(FLAGS BODY.PEEK[])")
2116 .await
2117 .unwrap();
2118
2119 let err = fetch_result.next().await.unwrap().unwrap_err();
2121 let Error::Io(io_err) = err else {
2122 panic!("Unexpected error type: {err}")
2123 };
2124 assert_eq!(io_err.kind(), io::ErrorKind::UnexpectedEof);
2125 }
2126
2127 assert_eq!(
2128 session.stream.inner.written_buf,
2129 b"A0001 UID FETCH 1:* (FLAGS BODY.PEEK[])\r\n".to_vec()
2130 );
2131 }
2132
2133 async fn generic_fetch<'a, F, T, K>(prefix: &'a str, op: F)
2134 where
2135 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2136 K: 'a + Future<Output = Result<T>>,
2137 {
2138 generic_with_uid(
2139 "A0001 OK FETCH completed\r\n",
2140 "FETCH",
2141 "1",
2142 "BODY[]",
2143 prefix,
2144 op,
2145 )
2146 .await;
2147 }
2148
2149 async fn generic_with_uid<'a, F, T, K>(
2150 res: &'a str,
2151 cmd: &'a str,
2152 seq: &'a str,
2153 query: &'a str,
2154 prefix: &'a str,
2155 op: F,
2156 ) where
2157 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2158 K: 'a + Future<Output = Result<T>>,
2159 {
2160 let resp = res.as_bytes().to_vec();
2161 let line = format!("A0001{}{} {} {}\r\n", prefix, cmd, seq, query);
2162 let session = Arc::new(Mutex::new(mock_session!(MockStream::new(resp))));
2163
2164 {
2165 let _ = op(session.clone(), seq, query).await.unwrap();
2166 }
2167 assert!(
2168 session.lock().await.stream.inner.written_buf == line.as_bytes().to_vec(),
2169 "Invalid command"
2170 );
2171 }
2172
2173 #[test]
2174 fn quote_backslash() {
2175 assert_eq!("\"test\\\\text\"", quote!(r"test\text"));
2176 }
2177
2178 #[test]
2179 fn quote_dquote() {
2180 assert_eq!("\"test\\\"text\"", quote!("test\"text"));
2181 }
2182
2183 #[test]
2184 fn validate_random() {
2185 assert_eq!(
2186 "\"~iCQ_k;>[&\\\"sVCvUW`e<<P!wJ\"",
2187 &validate_str("~iCQ_k;>[&\"sVCvUW`e<<P!wJ").unwrap()
2188 );
2189 }
2190
2191 #[test]
2192 fn validate_newline() {
2193 if let Err(ref e) = validate_str("test\nstring") {
2194 if let Error::Validate(ref ve) = e {
2195 if ve.0 == '\n' {
2196 return;
2197 }
2198 }
2199 panic!("Wrong error: {:?}", e);
2200 }
2201 panic!("No error");
2202 }
2203
2204 #[test]
2205 #[allow(unreachable_patterns)]
2206 fn validate_carriage_return() {
2207 if let Err(ref e) = validate_str("test\rstring") {
2208 if let Error::Validate(ref ve) = e {
2209 if ve.0 == '\r' {
2210 return;
2211 }
2212 }
2213 panic!("Wrong error: {:?}", e);
2214 }
2215 panic!("No error");
2216 }
2217
2218 #[cfg(feature = "runtime-tokio")]
2222 async fn handle_client(stream: tokio::io::DuplexStream) -> Result<()> {
2223 use tokio::io::AsyncBufReadExt;
2224
2225 let (reader, mut writer) = tokio::io::split(stream);
2226 let reader = tokio::io::BufReader::new(reader);
2227
2228 let mut lines = reader.lines();
2229 while let Some(line) = lines.next_line().await? {
2230 let (request_id, request) = line.split_once(' ').unwrap();
2231 eprintln!("Received request {request_id}.");
2232
2233 let (id, _) = request
2234 .strip_prefix("FETCH ")
2235 .unwrap()
2236 .split_once(' ')
2237 .unwrap();
2238 let id = id.parse().unwrap();
2239
2240 let mut body = concat!(
2241 "From: Bob <bob@example.com>\r\n",
2242 "To: Alice <alice@example.org>\r\n",
2243 "Subject: Test\r\n",
2244 "Message-Id: <foobar@example.com>\r\n",
2245 "Date: Sun, 22 Mar 2020 00:00:00 +0100\r\n",
2246 "\r\n",
2247 )
2248 .to_string();
2249 for _ in 1..id {
2250 body +=
2251 "012345678901234567890123456789012345678901234567890123456789012345678901\r\n";
2252 }
2253 let body_len = body.len();
2254
2255 let response = format!("* {id} FETCH (RFC822.SIZE {body_len} BODY[] {{{body_len}}}\r\n{body} FLAGS (\\Seen))\r\n");
2256 writer.write_all(response.as_bytes()).await?;
2257 writer
2258 .write_all(format!("{request_id} OK FETCH completed\r\n").as_bytes())
2259 .await?;
2260 writer.flush().await?;
2261 }
2262
2263 Ok(())
2264 }
2265
2266 #[cfg(feature = "runtime-tokio")]
2273 #[cfg_attr(
2274 feature = "runtime-tokio",
2275 tokio::test(flavor = "multi_thread", worker_threads = 2)
2276 )]
2277 async fn large_fetch() -> Result<()> {
2278 use futures::TryStreamExt;
2279
2280 let (client, server) = tokio::io::duplex(4096);
2281 tokio::spawn(handle_client(server));
2282
2283 let client = crate::Client::new(client);
2284 let mut imap_session = Session::new(client.conn);
2285
2286 for i in 200..300 {
2287 eprintln!("Fetching {i}.");
2288 let mut messages_stream = imap_session
2289 .fetch(format!("{i}"), "(RFC822.SIZE BODY.PEEK[] FLAGS)")
2290 .await?;
2291 let fetch = messages_stream
2292 .try_next()
2293 .await?
2294 .expect("no FETCH returned");
2295 let body = fetch.body().expect("message did not have a body!");
2296 assert_eq!(body.len(), 76 + 74 * i);
2297
2298 let no_fetch = messages_stream.try_next().await?;
2299 assert!(no_fetch.is_none());
2300 drop(messages_stream);
2301 }
2302
2303 Ok(())
2304 }
2305
2306 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2307 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2308 async fn status() {
2309 {
2310 let response = b"* STATUS INBOX (UIDNEXT 25)\r\n\
2311 A0001 OK [CLIENTBUG] Status on selected mailbox completed (0.001 + 0.000 secs).\r\n"
2312 .to_vec();
2313
2314 let mock_stream = MockStream::new(response);
2315 let mut session = mock_session!(mock_stream);
2316 let status = session.status("INBOX", "(UIDNEXT)").await.unwrap();
2317 assert_eq!(
2318 session.stream.inner.written_buf,
2319 b"A0001 STATUS \"INBOX\" (UIDNEXT)\r\n".to_vec()
2320 );
2321 assert_eq!(status.uid_next, Some(25));
2322 }
2323
2324 {
2325 let response = b"* STATUS INBOX (RECENT 15)\r\n\
2326 A0001 OK STATUS completed\r\n"
2327 .to_vec();
2328
2329 let mock_stream = MockStream::new(response);
2330 let mut session = mock_session!(mock_stream);
2331 let status = session.status("INBOX", "(RECENT)").await.unwrap();
2332 assert_eq!(
2333 session.stream.inner.written_buf,
2334 b"A0001 STATUS \"INBOX\" (RECENT)\r\n".to_vec()
2335 );
2336 assert_eq!(status.recent, 15);
2337 }
2338
2339 {
2340 let response = b"* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)\r\n\
2342 A0001 OK STATUS completed\r\n"
2343 .to_vec();
2344
2345 let mock_stream = MockStream::new(response);
2346 let mut session = mock_session!(mock_stream);
2347 let status = session
2348 .status("blurdybloop", "(UIDNEXT MESSAGES)")
2349 .await
2350 .unwrap();
2351 assert_eq!(
2352 session.stream.inner.written_buf,
2353 b"A0001 STATUS \"blurdybloop\" (UIDNEXT MESSAGES)\r\n".to_vec()
2354 );
2355 assert_eq!(status.uid_next, Some(44292));
2356 assert_eq!(status.exists, 231);
2357 }
2358 }
2359
2360 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2361 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2362 async fn append() {
2363 {
2364 let response = b"+ OK\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2368
2369 let mock_stream = MockStream::new(response);
2370 let mut session = mock_session!(mock_stream);
2371 session
2372 .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2373 .await
2374 .unwrap();
2375 assert_eq!(
2376 session.stream.inner.written_buf,
2377 b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2378 );
2379 }
2380
2381 {
2382 let response = b"+ OK\r\n* 3 EXISTS\r\n* 2 RECENT\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2386
2387 let mock_stream = MockStream::new(response);
2388 let mut session = mock_session!(mock_stream);
2389 session
2390 .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2391 .await
2392 .unwrap();
2393 assert_eq!(
2394 session.stream.inner.written_buf,
2395 b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2396 );
2397 let exists_response = session.unsolicited_responses.recv().await.unwrap();
2398 assert_eq!(exists_response, UnsolicitedResponse::Exists(3));
2399 let recent_response = session.unsolicited_responses.recv().await.unwrap();
2400 assert_eq!(recent_response, UnsolicitedResponse::Recent(2));
2401 }
2402
2403 {
2404 let response =
2406 b"A0001 NO [TRYCREATE] Mailbox doesn't exist: foobar (0.001 + 0.000 secs)."
2407 .to_vec();
2408 let mock_stream = MockStream::new(response);
2409 let mut session = mock_session!(mock_stream);
2410 session
2411 .append("foobar", None, None, "foobarbaz")
2412 .await
2413 .unwrap_err();
2414 assert_eq!(
2415 session.stream.inner.written_buf,
2416 b"A0001 APPEND \"foobar\" {9}\r\n".to_vec()
2417 );
2418 }
2419 }
2420
2421 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2422 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2423 async fn get_metadata() {
2424 {
2425 let response = b"* METADATA \"INBOX\" (/private/comment \"My own comment\")\r\n\
2426 A0001 OK GETMETADATA complete\r\n"
2427 .to_vec();
2428
2429 let mock_stream = MockStream::new(response);
2430 let mut session = mock_session!(mock_stream);
2431 let metadata = session
2432 .get_metadata("INBOX", "", "/private/comment")
2433 .await
2434 .unwrap();
2435 assert_eq!(
2436 session.stream.inner.written_buf,
2437 b"A0001 GETMETADATA \"INBOX\" /private/comment\r\n".to_vec()
2438 );
2439 assert_eq!(metadata.len(), 1);
2440 assert_eq!(metadata[0].entry, "/private/comment");
2441 assert_eq!(metadata[0].value.as_ref().unwrap(), "My own comment");
2442 }
2443
2444 {
2445 let response = b"* METADATA \"INBOX\" (/shared/comment \"Shared comment\" /private/comment \"My own comment\")\r\n\
2446 A0001 OK GETMETADATA complete\r\n"
2447 .to_vec();
2448
2449 let mock_stream = MockStream::new(response);
2450 let mut session = mock_session!(mock_stream);
2451 let metadata = session
2452 .get_metadata("INBOX", "", "(/shared/comment /private/comment)")
2453 .await
2454 .unwrap();
2455 assert_eq!(
2456 session.stream.inner.written_buf,
2457 b"A0001 GETMETADATA \"INBOX\" (/shared/comment /private/comment)\r\n".to_vec()
2458 );
2459 assert_eq!(metadata.len(), 2);
2460 assert_eq!(metadata[0].entry, "/shared/comment");
2461 assert_eq!(metadata[0].value.as_ref().unwrap(), "Shared comment");
2462 assert_eq!(metadata[1].entry, "/private/comment");
2463 assert_eq!(metadata[1].value.as_ref().unwrap(), "My own comment");
2464 }
2465
2466 {
2467 let response = b"* METADATA \"\" (/shared/comment {15}\r\nChatmail server /shared/admin {28}\r\nmailto:root@nine.testrun.org)\r\n\
2468 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2469 .to_vec();
2470
2471 let mock_stream = MockStream::new(response);
2472 let mut session = mock_session!(mock_stream);
2473 let metadata = session
2474 .get_metadata("", "", "(/shared/comment /shared/admin)")
2475 .await
2476 .unwrap();
2477 assert_eq!(
2478 session.stream.inner.written_buf,
2479 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2480 );
2481 assert_eq!(metadata.len(), 2);
2482 assert_eq!(metadata[0].entry, "/shared/comment");
2483 assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2484 assert_eq!(metadata[1].entry, "/shared/admin");
2485 assert_eq!(
2486 metadata[1].value.as_ref().unwrap(),
2487 "mailto:root@nine.testrun.org"
2488 );
2489 }
2490
2491 {
2492 let response = b"* METADATA \"\" (/shared/comment \"Chatmail server\")\r\n\
2493 * METADATA \"\" (/shared/admin \"mailto:root@nine.testrun.org\")\r\n\
2494 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2495 .to_vec();
2496
2497 let mock_stream = MockStream::new(response);
2498 let mut session = mock_session!(mock_stream);
2499 let metadata = session
2500 .get_metadata("", "", "(/shared/comment /shared/admin)")
2501 .await
2502 .unwrap();
2503 assert_eq!(
2504 session.stream.inner.written_buf,
2505 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2506 );
2507 assert_eq!(metadata.len(), 2);
2508 assert_eq!(metadata[0].entry, "/shared/comment");
2509 assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2510 assert_eq!(metadata[1].entry, "/shared/admin");
2511 assert_eq!(
2512 metadata[1].value.as_ref().unwrap(),
2513 "mailto:root@nine.testrun.org"
2514 );
2515 }
2516
2517 {
2518 let response = b"* METADATA \"\" (/shared/comment NIL /shared/admin NIL)\r\n\
2519 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2520 .to_vec();
2521
2522 let mock_stream = MockStream::new(response);
2523 let mut session = mock_session!(mock_stream);
2524 let metadata = session
2525 .get_metadata("", "", "(/shared/comment /shared/admin)")
2526 .await
2527 .unwrap();
2528 assert_eq!(
2529 session.stream.inner.written_buf,
2530 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2531 );
2532 assert_eq!(metadata.len(), 2);
2533 assert_eq!(metadata[0].entry, "/shared/comment");
2534 assert_eq!(metadata[0].value, None);
2535 assert_eq!(metadata[1].entry, "/shared/admin");
2536 assert_eq!(metadata[1].value, None);
2537 }
2538 }
2539
2540 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2541 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2542 async fn test_get_quota_root() {
2543 {
2544 let response = b"* QUOTAROOT Sent Userquota\r\n\
2545 * QUOTA Userquota (STORAGE 4855 48576)\r\n\
2546 A0001 OK Getquotaroot completed (0.004 + 0.000 + 0.004 secs).\r\n"
2547 .to_vec();
2548
2549 let mock_stream = MockStream::new(response);
2550 let mut session = mock_session!(mock_stream);
2551 let (quotaroots, quota) = dbg!(session.get_quota_root("Sent").await.unwrap());
2552 assert_eq!(
2553 str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2554 "A0001 GETQUOTAROOT \"Sent\"\r\n"
2555 );
2556 assert_eq!(
2557 quotaroots,
2558 vec![QuotaRoot {
2559 mailbox_name: "Sent".to_string(),
2560 quota_root_names: vec!["Userquota".to_string(),],
2561 },],
2562 );
2563 assert_eq!(
2564 quota,
2565 vec![Quota {
2566 root_name: "Userquota".to_string(),
2567 resources: vec![QuotaResource {
2568 name: QuotaResourceName::Storage,
2569 usage: 4855,
2570 limit: 48576,
2571 }],
2572 }]
2573 );
2574 assert_eq!(quota[0].resources[0].get_usage_percentage(), 9);
2575 }
2576
2577 {
2578 let response = b"* QUOTAROOT \"INBOX\" \"#19\"\r\n\
2579 * QUOTA \"#19\" (STORAGE 0 0)\r\n\
2580 A0001 OK GETQUOTAROOT successful.\r\n"
2581 .to_vec();
2582
2583 let mock_stream = MockStream::new(response);
2584 let mut session = mock_session!(mock_stream);
2585 let (quotaroots, quota) = session.get_quota_root("INBOX").await.unwrap();
2586 assert_eq!(
2587 str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2588 "A0001 GETQUOTAROOT \"INBOX\"\r\n"
2589 );
2590 assert_eq!(
2591 quotaroots,
2592 vec![QuotaRoot {
2593 mailbox_name: "INBOX".to_string(),
2594 quota_root_names: vec!["#19".to_string(),],
2595 },],
2596 );
2597 assert_eq!(
2598 quota,
2599 vec![Quota {
2600 root_name: "#19".to_string(),
2601 resources: vec![QuotaResource {
2602 name: QuotaResourceName::Storage,
2603 usage: 0,
2604 limit: 0,
2605 }],
2606 }]
2607 );
2608 assert_eq!(quota[0].resources[0].get_usage_percentage(), 0);
2609 }
2610 }
2611
2612 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2613 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2614 async fn test_parsing_error() {
2615 let response = b"220 mail.example.org ESMTP Postcow\r\n".to_vec();
2617 let command = "A0001 NOOP\r\n";
2618 let mock_stream = MockStream::new(response);
2619 let mut session = mock_session!(mock_stream);
2620 assert!(session
2621 .noop()
2622 .await
2623 .unwrap_err()
2624 .to_string()
2625 .contains("220 mail.example.org ESMTP Postcow"));
2626 assert!(
2627 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2628 "Invalid NOOP command"
2629 );
2630 }
2631}