1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::str;
6
7use async_channel::{self as channel, bounded};
8#[cfg(feature = "runtime-async-std")]
9use async_std::io::{Read, Write, WriteExt};
10use base64::Engine as _;
11use extensions::id::{format_identification, parse_id};
12use extensions::quota::parse_get_quota_root;
13use futures::{io, Stream, TryStreamExt};
14use imap_proto::{Metadata, RequestId, Response};
15#[cfg(feature = "runtime-tokio")]
16use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt};
17
18use super::authenticator::Authenticator;
19use super::error::{Error, ParseError, Result, ValidateError};
20use super::parse::*;
21use super::types::*;
22use crate::extensions::{self, quota::parse_get_quota};
23use crate::imap_stream::ImapStream;
24
25macro_rules! quote {
26 ($x:expr) => {
27 format!("\"{}\"", $x.replace(r"\", r"\\").replace("\"", "\\\""))
28 };
29}
30
31#[derive(Debug)]
41pub struct Session<T: Read + Write + Unpin + fmt::Debug> {
42 pub(crate) conn: Connection<T>,
43 pub(crate) unsolicited_responses_tx: channel::Sender<UnsolicitedResponse>,
44
45 pub unsolicited_responses: channel::Receiver<UnsolicitedResponse>,
48}
49
50impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Session<T> {}
51impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Client<T> {}
52impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Connection<T> {}
53
54impl<T: Read + Write + Unpin + fmt::Debug> AsMut<T> for Session<T> {
57 fn as_mut(&mut self) -> &mut T {
58 self.conn.stream.as_mut()
59 }
60}
61
62#[derive(Debug)]
70pub struct Client<T: Read + Write + Unpin + fmt::Debug> {
71 conn: Connection<T>,
72}
73
74#[derive(Debug)]
77pub struct Connection<T: Read + Write + Unpin + fmt::Debug> {
78 pub(crate) stream: ImapStream<T>,
79
80 pub(crate) request_ids: IdGenerator,
82}
83
84impl<T: Read + Write + Unpin + fmt::Debug> Deref for Client<T> {
87 type Target = Connection<T>;
88
89 fn deref(&self) -> &Connection<T> {
90 &self.conn
91 }
92}
93
94impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Client<T> {
95 fn deref_mut(&mut self) -> &mut Connection<T> {
96 &mut self.conn
97 }
98}
99
100impl<T: Read + Write + Unpin + fmt::Debug> Deref for Session<T> {
101 type Target = Connection<T>;
102
103 fn deref(&self) -> &Connection<T> {
104 &self.conn
105 }
106}
107
108impl<T: Read + Write + Unpin + fmt::Debug> DerefMut for Session<T> {
109 fn deref_mut(&mut self) -> &mut Connection<T> {
110 &mut self.conn
111 }
112}
113
114macro_rules! ok_or_unauth_client_err {
122 ($r:expr, $self:expr) => {
123 match $r {
124 Ok(o) => o,
125 Err(e) => return Err((e.into(), $self)),
126 }
127 };
128}
129
130impl<T: Read + Write + Unpin + fmt::Debug + Send> Client<T> {
131 pub fn new(stream: T) -> Client<T> {
136 let stream = ImapStream::new(stream);
137
138 Client {
139 conn: Connection {
140 stream,
141 request_ids: IdGenerator::new(),
142 },
143 }
144 }
145
146 pub fn into_inner(self) -> T {
148 let Self { conn, .. } = self;
149 conn.into_inner()
150 }
151
152 pub async fn login<U: AsRef<str>, P: AsRef<str>>(
184 mut self,
185 username: U,
186 password: P,
187 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
188 let u = ok_or_unauth_client_err!(validate_str(username.as_ref()), self);
189 let p = ok_or_unauth_client_err!(validate_str(password.as_ref()), self);
190 ok_or_unauth_client_err!(
191 self.run_command_and_check_ok(&format!("LOGIN {u} {p}"), None)
192 .await,
193 self
194 );
195
196 Ok(Session::new(self.conn))
197 }
198
199 pub async fn authenticate<A: Authenticator, S: AsRef<str>>(
243 mut self,
244 auth_type: S,
245 authenticator: A,
246 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
247 let id = ok_or_unauth_client_err!(
248 self.run_command(&format!("AUTHENTICATE {}", auth_type.as_ref()))
249 .await,
250 self
251 );
252 let session = self.do_auth_handshake(id, authenticator).await?;
253 Ok(session)
254 }
255
256 async fn do_auth_handshake<A: Authenticator>(
258 mut self,
259 id: RequestId,
260 mut authenticator: A,
261 ) -> ::std::result::Result<Session<T>, (Error, Client<T>)> {
262 loop {
265 let Some(res) = ok_or_unauth_client_err!(self.read_response().await, self) else {
266 return Err((Error::ConnectionLost, self));
267 };
268 match res.parsed() {
269 Response::Continue { information, .. } => {
270 let challenge = if let Some(text) = information {
271 ok_or_unauth_client_err!(
272 base64::engine::general_purpose::STANDARD
273 .decode(text.as_ref())
274 .map_err(|e| Error::Parse(ParseError::Authentication(
275 (*text).to_string(),
276 Some(e)
277 ))),
278 self
279 )
280 } else {
281 Vec::new()
282 };
283 let raw_response = &mut authenticator.process(&challenge);
284 let auth_response =
285 base64::engine::general_purpose::STANDARD.encode(raw_response);
286
287 ok_or_unauth_client_err!(
288 self.conn.run_command_untagged(&auth_response).await,
289 self
290 );
291 }
292 _ => {
293 ok_or_unauth_client_err!(self.check_done_ok_from(&id, None, res).await, self);
294 return Ok(Session::new(self.conn));
295 }
296 }
297 }
298 }
299}
300
301impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
302 unsafe_pinned!(conn: Connection<T>);
303
304 pub(crate) fn get_stream(self: Pin<&mut Self>) -> Pin<&mut ImapStream<T>> {
305 self.conn().stream()
306 }
307
308 fn new(conn: Connection<T>) -> Self {
310 let (tx, rx) = bounded(100);
311 Session {
312 conn,
313 unsolicited_responses: rx,
314 unsolicited_responses_tx: tx,
315 }
316 }
317
318 pub async fn select<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
336 let id = self
338 .run_command(&format!("SELECT {}", validate_str(mailbox_name.as_ref())?))
339 .await?;
340 let mbox = parse_mailbox(
341 &mut self.conn.stream,
342 self.unsolicited_responses_tx.clone(),
343 id,
344 )
345 .await?;
346
347 Ok(mbox)
348 }
349
350 pub async fn select_condstore<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
353 let id = self
354 .run_command(&format!(
355 "SELECT {} (CONDSTORE)",
356 validate_str(mailbox_name.as_ref())?
357 ))
358 .await?;
359 let mbox = parse_mailbox(
360 &mut self.conn.stream,
361 self.unsolicited_responses_tx.clone(),
362 id,
363 )
364 .await?;
365
366 Ok(mbox)
367 }
368
369 pub async fn examine<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<Mailbox> {
374 let id = self
375 .run_command(&format!("EXAMINE {}", validate_str(mailbox_name.as_ref())?))
376 .await?;
377 let mbox = parse_mailbox(
378 &mut self.conn.stream,
379 self.unsolicited_responses_tx.clone(),
380 id,
381 )
382 .await?;
383
384 Ok(mbox)
385 }
386
387 pub async fn fetch<S1, S2>(
446 &mut self,
447 sequence_set: S1,
448 query: S2,
449 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
450 where
451 S1: AsRef<str>,
452 S2: AsRef<str>,
453 {
454 let id = self
455 .run_command(&format!(
456 "FETCH {} {}",
457 sequence_set.as_ref(),
458 query.as_ref()
459 ))
460 .await?;
461 let res = parse_fetches(
462 &mut self.conn.stream,
463 self.unsolicited_responses_tx.clone(),
464 id,
465 );
466
467 Ok(res)
468 }
469
470 pub async fn uid_fetch<S1, S2>(
473 &mut self,
474 uid_set: S1,
475 query: S2,
476 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send + Unpin>
477 where
478 S1: AsRef<str>,
479 S2: AsRef<str>,
480 {
481 let id = self
482 .run_command(&format!(
483 "UID FETCH {} {}",
484 uid_set.as_ref(),
485 query.as_ref()
486 ))
487 .await?;
488 let res = parse_fetches(
489 &mut self.conn.stream,
490 self.unsolicited_responses_tx.clone(),
491 id,
492 );
493 Ok(res)
494 }
495
496 pub async fn noop(&mut self) -> Result<()> {
498 let id = self.run_command("NOOP").await?;
499 parse_noop(
500 &mut self.conn.stream,
501 self.unsolicited_responses_tx.clone(),
502 id,
503 )
504 .await?;
505 Ok(())
506 }
507
508 pub async fn logout(&mut self) -> Result<()> {
510 self.run_command_and_check_ok("LOGOUT").await?;
511 Ok(())
512 }
513
514 pub async fn create<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
537 self.run_command_and_check_ok(&format!("CREATE {}", validate_str(mailbox_name.as_ref())?))
538 .await?;
539
540 Ok(())
541 }
542
543 pub async fn delete<S: AsRef<str>>(&mut self, mailbox_name: S) -> Result<()> {
563 self.run_command_and_check_ok(&format!("DELETE {}", validate_str(mailbox_name.as_ref())?))
564 .await?;
565
566 Ok(())
567 }
568
569 pub async fn rename<S1: AsRef<str>, S2: AsRef<str>>(&mut self, from: S1, to: S2) -> Result<()> {
595 self.run_command_and_check_ok(&format!(
596 "RENAME {} {}",
597 quote!(from.as_ref()),
598 quote!(to.as_ref())
599 ))
600 .await?;
601
602 Ok(())
603 }
604
605 pub async fn subscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
614 self.run_command_and_check_ok(&format!("SUBSCRIBE {}", quote!(mailbox.as_ref())))
615 .await?;
616 Ok(())
617 }
618
619 pub async fn unsubscribe<S: AsRef<str>>(&mut self, mailbox: S) -> Result<()> {
624 self.run_command_and_check_ok(&format!("UNSUBSCRIBE {}", quote!(mailbox.as_ref())))
625 .await?;
626 Ok(())
627 }
628
629 pub async fn capabilities(&mut self) -> Result<Capabilities> {
633 let id = self.run_command("CAPABILITY").await?;
634 let c = parse_capabilities(
635 &mut self.conn.stream,
636 self.unsolicited_responses_tx.clone(),
637 id,
638 )
639 .await?;
640 Ok(c)
641 }
642
643 pub async fn expunge(&mut self) -> Result<impl Stream<Item = Result<Seq>> + '_ + Send> {
647 let id = self.run_command("EXPUNGE").await?;
648 let res = parse_expunge(
649 &mut self.conn.stream,
650 self.unsolicited_responses_tx.clone(),
651 id,
652 );
653 Ok(res)
654 }
655
656 pub async fn uid_expunge<S: AsRef<str>>(
679 &mut self,
680 uid_set: S,
681 ) -> Result<impl Stream<Item = Result<Uid>> + '_ + Send> {
682 let id = self
683 .run_command(&format!("UID EXPUNGE {}", uid_set.as_ref()))
684 .await?;
685 let res = parse_expunge(
686 &mut self.conn.stream,
687 self.unsolicited_responses_tx.clone(),
688 id,
689 );
690 Ok(res)
691 }
692
693 pub async fn check(&mut self) -> Result<()> {
704 self.run_command_and_check_ok("CHECK").await?;
705 Ok(())
706 }
707
708 pub async fn close(&mut self) -> Result<()> {
724 self.run_command_and_check_ok("CLOSE").await?;
725 Ok(())
726 }
727
728 pub async fn store<S1, S2>(
778 &mut self,
779 sequence_set: S1,
780 query: S2,
781 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
782 where
783 S1: AsRef<str>,
784 S2: AsRef<str>,
785 {
786 let id = self
787 .run_command(&format!(
788 "STORE {} {}",
789 sequence_set.as_ref(),
790 query.as_ref()
791 ))
792 .await?;
793 let res = parse_fetches(
794 &mut self.conn.stream,
795 self.unsolicited_responses_tx.clone(),
796 id,
797 );
798 Ok(res)
799 }
800
801 pub async fn uid_store<S1, S2>(
804 &mut self,
805 uid_set: S1,
806 query: S2,
807 ) -> Result<impl Stream<Item = Result<Fetch>> + '_ + Send>
808 where
809 S1: AsRef<str>,
810 S2: AsRef<str>,
811 {
812 let id = self
813 .run_command(&format!(
814 "UID STORE {} {}",
815 uid_set.as_ref(),
816 query.as_ref()
817 ))
818 .await?;
819 let res = parse_fetches(
820 &mut self.conn.stream,
821 self.unsolicited_responses_tx.clone(),
822 id,
823 );
824 Ok(res)
825 }
826
827 pub async fn copy<S1: AsRef<str>, S2: AsRef<str>>(
835 &mut self,
836 sequence_set: S1,
837 mailbox_name: S2,
838 ) -> Result<()> {
839 self.run_command_and_check_ok(&format!(
840 "COPY {} {}",
841 sequence_set.as_ref(),
842 mailbox_name.as_ref()
843 ))
844 .await?;
845
846 Ok(())
847 }
848
849 pub async fn uid_copy<S1: AsRef<str>, S2: AsRef<str>>(
852 &mut self,
853 uid_set: S1,
854 mailbox_name: S2,
855 ) -> Result<()> {
856 self.run_command_and_check_ok(&format!(
857 "UID COPY {} {}",
858 uid_set.as_ref(),
859 mailbox_name.as_ref()
860 ))
861 .await?;
862
863 Ok(())
864 }
865
866 pub async fn mv<S1: AsRef<str>, S2: AsRef<str>>(
897 &mut self,
898 sequence_set: S1,
899 mailbox_name: S2,
900 ) -> Result<()> {
901 self.run_command_and_check_ok(&format!(
902 "MOVE {} {}",
903 sequence_set.as_ref(),
904 validate_str(mailbox_name.as_ref())?
905 ))
906 .await?;
907
908 Ok(())
909 }
910
911 pub async fn uid_mv<S1: AsRef<str>, S2: AsRef<str>>(
916 &mut self,
917 uid_set: S1,
918 mailbox_name: S2,
919 ) -> Result<()> {
920 self.run_command_and_check_ok(&format!(
921 "UID MOVE {} {}",
922 uid_set.as_ref(),
923 validate_str(mailbox_name.as_ref())?
924 ))
925 .await?;
926
927 Ok(())
928 }
929
930 pub async fn list(
962 &mut self,
963 reference_name: Option<&str>,
964 mailbox_pattern: Option<&str>,
965 ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
966 let id = self
967 .run_command(&format!(
968 "LIST {} {}",
969 quote!(reference_name.unwrap_or("")),
970 mailbox_pattern.unwrap_or("\"\"")
971 ))
972 .await?;
973 let names = parse_names(
974 &mut self.conn.stream,
975 self.unsolicited_responses_tx.clone(),
976 id,
977 );
978
979 Ok(names)
980 }
981
982 pub async fn lsub(
998 &mut self,
999 reference_name: Option<&str>,
1000 mailbox_pattern: Option<&str>,
1001 ) -> Result<impl Stream<Item = Result<Name>> + '_ + Send> {
1002 let id = self
1003 .run_command(&format!(
1004 "LSUB {} {}",
1005 quote!(reference_name.unwrap_or("")),
1006 mailbox_pattern.unwrap_or("")
1007 ))
1008 .await?;
1009 let names = parse_names(
1010 &mut self.conn.stream,
1011 self.unsolicited_responses_tx.clone(),
1012 id,
1013 );
1014
1015 Ok(names)
1016 }
1017
1018 pub async fn status<S1: AsRef<str>, S2: AsRef<str>>(
1053 &mut self,
1054 mailbox_name: S1,
1055 data_items: S2,
1056 ) -> Result<Mailbox> {
1057 let id = self
1058 .run_command(&format!(
1059 "STATUS {} {}",
1060 validate_str(mailbox_name.as_ref())?,
1061 data_items.as_ref()
1062 ))
1063 .await?;
1064 let mbox = parse_status(
1065 &mut self.conn.stream,
1066 mailbox_name.as_ref(),
1067 self.unsolicited_responses_tx.clone(),
1068 id,
1069 )
1070 .await?;
1071 Ok(mbox)
1072 }
1073
1074 pub fn idle(self) -> extensions::idle::Handle<T> {
1093 extensions::idle::Handle::new(self)
1094 }
1095
1096 pub async fn append(
1116 &mut self,
1117 mailbox: impl AsRef<str>,
1118 flags: Option<&str>,
1119 internaldate: Option<&str>,
1120 content: impl AsRef<[u8]>,
1121 ) -> Result<()> {
1122 let content = content.as_ref();
1123 let id = self
1124 .run_command(&format!(
1125 "APPEND \"{}\"{}{}{}{} {{{}}}",
1126 mailbox.as_ref(),
1127 if flags.is_some() { " " } else { "" },
1128 flags.unwrap_or(""),
1129 if internaldate.is_some() { " " } else { "" },
1130 internaldate.unwrap_or(""),
1131 content.len()
1132 ))
1133 .await?;
1134
1135 let Some(res) = self.read_response().await? else {
1136 return Err(Error::Append);
1137 };
1138 let Response::Continue { .. } = res.parsed() else {
1139 return Err(Error::Append);
1140 };
1141
1142 self.stream.as_mut().write_all(content).await?;
1143 self.stream.as_mut().write_all(b"\r\n").await?;
1144 self.stream.flush().await?;
1145 self.conn
1146 .check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
1147 .await?;
1148 Ok(())
1149 }
1150
1151 pub async fn search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Seq>> {
1196 let id = self
1197 .run_command(&format!("SEARCH {}", query.as_ref()))
1198 .await?;
1199 let seqs = parse_ids(
1200 &mut self.conn.stream,
1201 self.unsolicited_responses_tx.clone(),
1202 id,
1203 )
1204 .await?;
1205
1206 Ok(seqs)
1207 }
1208
1209 pub async fn uid_search<S: AsRef<str>>(&mut self, query: S) -> Result<HashSet<Uid>> {
1213 let id = self
1214 .run_command(&format!("UID SEARCH {}", query.as_ref()))
1215 .await?;
1216 let uids = parse_ids(
1217 &mut self.conn.stream,
1218 self.unsolicited_responses_tx.clone(),
1219 id,
1220 )
1221 .await?;
1222
1223 Ok(uids)
1224 }
1225
1226 pub async fn get_quota(&mut self, quota_root: &str) -> Result<Quota> {
1228 let id = self
1229 .run_command(format!("GETQUOTA {}", quote!(quota_root)))
1230 .await?;
1231 let c = parse_get_quota(
1232 &mut self.conn.stream,
1233 self.unsolicited_responses_tx.clone(),
1234 id,
1235 )
1236 .await?;
1237 Ok(c)
1238 }
1239
1240 pub async fn get_quota_root(
1242 &mut self,
1243 mailbox_name: &str,
1244 ) -> Result<(Vec<QuotaRoot>, Vec<Quota>)> {
1245 let id = self
1246 .run_command(format!("GETQUOTAROOT {}", quote!(mailbox_name)))
1247 .await?;
1248 let c = parse_get_quota_root(
1249 &mut self.conn.stream,
1250 self.unsolicited_responses_tx.clone(),
1251 id,
1252 )
1253 .await?;
1254 Ok(c)
1255 }
1256
1257 pub async fn get_metadata(
1259 &mut self,
1260 mailbox_name: &str,
1261 options: &str,
1262 entry_specifier: &str,
1263 ) -> Result<Vec<Metadata>> {
1264 let options = if options.is_empty() {
1265 String::new()
1266 } else {
1267 format!(" {options}")
1268 };
1269 let id = self
1270 .run_command(format!(
1271 "GETMETADATA {} {}{}",
1272 quote!(mailbox_name),
1273 options,
1274 entry_specifier
1275 ))
1276 .await?;
1277 let metadata = parse_metadata(
1278 &mut self.conn.stream,
1279 mailbox_name,
1280 self.unsolicited_responses_tx.clone(),
1281 id,
1282 )
1283 .await?;
1284 Ok(metadata)
1285 }
1286
1287 pub async fn id(
1291 &mut self,
1292 identification: impl IntoIterator<Item = (&str, Option<&str>)>,
1293 ) -> Result<Option<HashMap<String, String>>> {
1294 let id = self
1295 .run_command(format!("ID ({})", format_identification(identification)))
1296 .await?;
1297 let server_identification = parse_id(
1298 &mut self.conn.stream,
1299 self.unsolicited_responses_tx.clone(),
1300 id,
1301 )
1302 .await?;
1303 Ok(server_identification)
1304 }
1305
1306 pub async fn id_nil(&mut self) -> Result<Option<HashMap<String, String>>> {
1310 let id = self.run_command("ID NIL").await?;
1311 let server_identification = parse_id(
1312 &mut self.conn.stream,
1313 self.unsolicited_responses_tx.clone(),
1314 id,
1315 )
1316 .await?;
1317 Ok(server_identification)
1318 }
1319
1320 pub async fn run_command_and_check_ok<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1323 self.conn
1324 .run_command_and_check_ok(
1325 command.as_ref(),
1326 Some(self.unsolicited_responses_tx.clone()),
1327 )
1328 .await?;
1329
1330 Ok(())
1331 }
1332
1333 pub async fn run_command<S: AsRef<str>>(&mut self, command: S) -> Result<RequestId> {
1335 let id = self.conn.run_command(command.as_ref()).await?;
1336
1337 Ok(id)
1338 }
1339
1340 pub async fn run_command_untagged<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
1342 self.conn.run_command_untagged(command.as_ref()).await?;
1343
1344 Ok(())
1345 }
1346
1347 pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
1349 self.conn.read_response().await
1350 }
1351}
1352
1353impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
1354 unsafe_pinned!(stream: ImapStream<T>);
1355
1356 pub fn get_ref(&self) -> &T {
1358 self.stream.get_ref()
1359 }
1360
1361 pub fn get_mut(&mut self) -> &mut T {
1363 self.stream.get_mut()
1364 }
1365
1366 pub fn into_inner(self) -> T {
1368 let Self { stream, .. } = self;
1369 stream.into_inner()
1370 }
1371
1372 pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
1374 self.stream.try_next().await
1375 }
1376
1377 pub(crate) async fn run_command_untagged(&mut self, command: &str) -> Result<()> {
1378 self.stream
1379 .encode(Request(None, command.as_bytes().into()))
1380 .await?;
1381 self.stream.flush().await?;
1382 Ok(())
1383 }
1384
1385 pub(crate) async fn run_command(&mut self, command: &str) -> Result<RequestId> {
1386 let request_id = self.request_ids.next().unwrap(); self.stream
1388 .encode(Request(Some(request_id.clone()), command.as_bytes().into()))
1389 .await?;
1390 self.stream.flush().await?;
1391 Ok(request_id)
1392 }
1393
1394 pub async fn run_command_and_check_ok(
1396 &mut self,
1397 command: &str,
1398 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1399 ) -> Result<()> {
1400 let id = self.run_command(command).await?;
1401 self.check_done_ok(&id, unsolicited).await?;
1402
1403 Ok(())
1404 }
1405
1406 pub(crate) async fn check_done_ok(
1407 &mut self,
1408 id: &RequestId,
1409 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1410 ) -> Result<()> {
1411 if let Some(first_res) = self.stream.try_next().await? {
1412 self.check_done_ok_from(id, unsolicited, first_res).await
1413 } else {
1414 Err(Error::ConnectionLost)
1415 }
1416 }
1417
1418 pub(crate) async fn check_done_ok_from(
1419 &mut self,
1420 id: &RequestId,
1421 unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
1422 mut response: ResponseData,
1423 ) -> Result<()> {
1424 loop {
1425 if let Response::Done {
1426 status,
1427 code,
1428 information,
1429 tag,
1430 } = response.parsed()
1431 {
1432 self.check_status_ok(status, code.as_ref(), information.as_deref())?;
1433
1434 if tag == id {
1435 return Ok(());
1436 }
1437 }
1438
1439 if let Some(unsolicited) = unsolicited.clone() {
1440 handle_unilateral(response, unsolicited);
1441 }
1442
1443 let Some(res) = self.stream.try_next().await? else {
1444 return Err(Error::ConnectionLost);
1445 };
1446 response = res;
1447 }
1448 }
1449
1450 pub(crate) fn check_status_ok(
1451 &self,
1452 status: &imap_proto::Status,
1453 code: Option<&imap_proto::ResponseCode<'_>>,
1454 information: Option<&str>,
1455 ) -> Result<()> {
1456 use imap_proto::Status;
1457 match status {
1458 Status::Ok => Ok(()),
1459 Status::Bad => Err(Error::Bad(format!("code: {code:?}, info: {information:?}"))),
1460 Status::No => Err(Error::No(format!("code: {code:?}, info: {information:?}"))),
1461 _ => Err(Error::Io(io::Error::other(format!(
1462 "status: {status:?}, code: {code:?}, information: {information:?}"
1463 )))),
1464 }
1465 }
1466}
1467
1468fn validate_str(value: &str) -> Result<String> {
1469 let quoted = quote!(value);
1470 if quoted.find('\n').is_some() {
1471 return Err(Error::Validate(ValidateError('\n')));
1472 }
1473 if quoted.find('\r').is_some() {
1474 return Err(Error::Validate(ValidateError('\r')));
1475 }
1476 Ok(quoted)
1477}
1478
1479#[cfg(test)]
1480mod tests {
1481 use pretty_assertions::assert_eq;
1482
1483 use super::super::error::Result;
1484 use super::super::mock_stream::MockStream;
1485 use super::*;
1486 use std::borrow::Cow;
1487 use std::future::Future;
1488
1489 use async_std::sync::{Arc, Mutex};
1490 use futures::StreamExt;
1491 use imap_proto::Status;
1492
1493 macro_rules! mock_client {
1494 ($s:expr) => {
1495 Client::new($s)
1496 };
1497 }
1498
1499 macro_rules! mock_session {
1500 ($s:expr) => {
1501 Session::new(mock_client!($s).conn)
1502 };
1503 }
1504
1505 macro_rules! assert_eq_bytes {
1506 ($a:expr, $b:expr, $c:expr) => {
1507 assert_eq!(
1508 std::str::from_utf8($a).unwrap(),
1509 std::str::from_utf8($b).unwrap(),
1510 $c
1511 )
1512 };
1513 }
1514
1515 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1516 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1517 async fn fetch_body() {
1518 let response = "a0 OK Logged in.\r\n\
1519 * 2 FETCH (BODY[TEXT] {3}\r\nfoo)\r\n\
1520 a0 OK FETCH completed\r\n";
1521 let mut session = mock_session!(MockStream::new(response.as_bytes().to_vec()));
1522 session.read_response().await.unwrap().unwrap();
1523 session.read_response().await.unwrap().unwrap();
1524 }
1525
1526 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1527 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1528 async fn readline_delay_read() {
1529 let greeting = "* OK Dovecot ready.\r\n";
1530 let mock_stream = MockStream::default()
1531 .with_buf(greeting.as_bytes().to_vec())
1532 .with_delay();
1533
1534 let mut client = mock_client!(mock_stream);
1535 let actual_response = client.read_response().await.unwrap().unwrap();
1536 assert_eq!(
1537 actual_response.parsed(),
1538 &Response::Data {
1539 status: Status::Ok,
1540 code: None,
1541 information: Some(Cow::Borrowed("Dovecot ready.")),
1542 }
1543 );
1544 }
1545
1546 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1547 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1548 async fn readline_eof() {
1549 let mock_stream = MockStream::default().with_eof();
1550 let mut client = mock_client!(mock_stream);
1551 let res = client.read_response().await.unwrap();
1552 assert!(res.is_none());
1553 }
1554
1555 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1556 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1557 #[should_panic]
1558 async fn readline_err() {
1559 let mock_stream = MockStream::default().with_err();
1561 let mut client = mock_client!(mock_stream);
1562 client.read_response().await.unwrap().unwrap();
1563 }
1564
1565 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1566 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1567 async fn authenticate() {
1568 let response = b"+ YmFy\r\n\
1569 A0001 OK Logged in\r\n"
1570 .to_vec();
1571 let command = "A0001 AUTHENTICATE PLAIN\r\n\
1572 Zm9v\r\n";
1573 let mock_stream = MockStream::new(response);
1574 let client = mock_client!(mock_stream);
1575 enum Authenticate {
1576 Auth,
1577 }
1578 impl Authenticator for &Authenticate {
1579 type Response = Vec<u8>;
1580 fn process(&mut self, challenge: &[u8]) -> Self::Response {
1581 assert!(challenge == b"bar", "Invalid authenticate challenge");
1582 b"foo".to_vec()
1583 }
1584 }
1585 let session = client
1586 .authenticate("PLAIN", &Authenticate::Auth)
1587 .await
1588 .ok()
1589 .unwrap();
1590 assert_eq_bytes!(
1591 &session.stream.inner.written_buf,
1592 command.as_bytes(),
1593 "Invalid authenticate command"
1594 );
1595 }
1596
1597 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1598 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1599 async fn login() {
1600 let response = b"A0001 OK Logged in\r\n".to_vec();
1601 let username = "username";
1602 let password = "password";
1603 let command = format!("A0001 LOGIN {} {}\r\n", quote!(username), quote!(password));
1604 let mock_stream = MockStream::new(response);
1605 let client = mock_client!(mock_stream);
1606 if let Ok(session) = client.login(username, password).await {
1607 assert_eq!(
1608 session.stream.inner.written_buf,
1609 command.as_bytes().to_vec(),
1610 "Invalid login command"
1611 );
1612 } else {
1613 unreachable!("invalid login");
1614 }
1615 }
1616
1617 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1618 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1619 async fn logout() {
1620 let response = b"A0001 OK Logout completed.\r\n".to_vec();
1621 let command = "A0001 LOGOUT\r\n";
1622 let mock_stream = MockStream::new(response);
1623 let mut session = mock_session!(mock_stream);
1624 session.logout().await.unwrap();
1625 assert!(
1626 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1627 "Invalid logout command"
1628 );
1629 }
1630
1631 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1632 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1633 async fn rename() {
1634 let response = b"A0001 OK RENAME completed\r\n".to_vec();
1635 let current_mailbox_name = "INBOX";
1636 let new_mailbox_name = "NEWINBOX";
1637 let command = format!(
1638 "A0001 RENAME {} {}\r\n",
1639 quote!(current_mailbox_name),
1640 quote!(new_mailbox_name)
1641 );
1642 let mock_stream = MockStream::new(response);
1643 let mut session = mock_session!(mock_stream);
1644 session
1645 .rename(current_mailbox_name, new_mailbox_name)
1646 .await
1647 .unwrap();
1648 assert!(
1649 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1650 "Invalid rename command"
1651 );
1652 }
1653
1654 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1655 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1656 async fn subscribe() {
1657 let response = b"A0001 OK SUBSCRIBE completed\r\n".to_vec();
1658 let mailbox = "INBOX";
1659 let command = format!("A0001 SUBSCRIBE {}\r\n", quote!(mailbox));
1660 let mock_stream = MockStream::new(response);
1661 let mut session = mock_session!(mock_stream);
1662 session.subscribe(mailbox).await.unwrap();
1663 assert!(
1664 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1665 "Invalid subscribe command"
1666 );
1667 }
1668
1669 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1670 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1671 async fn unsubscribe() {
1672 let response = b"A0001 OK UNSUBSCRIBE completed\r\n".to_vec();
1673 let mailbox = "INBOX";
1674 let command = format!("A0001 UNSUBSCRIBE {}\r\n", quote!(mailbox));
1675 let mock_stream = MockStream::new(response);
1676 let mut session = mock_session!(mock_stream);
1677 session.unsubscribe(mailbox).await.unwrap();
1678 assert!(
1679 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1680 "Invalid unsubscribe command"
1681 );
1682 }
1683
1684 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1685 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1686 async fn expunge() {
1687 let response = b"A0001 OK EXPUNGE completed\r\n".to_vec();
1688 let mock_stream = MockStream::new(response);
1689 let mut session = mock_session!(mock_stream);
1690 session.expunge().await.unwrap().collect::<Vec<_>>().await;
1691 assert!(
1692 session.stream.inner.written_buf == b"A0001 EXPUNGE\r\n".to_vec(),
1693 "Invalid expunge command"
1694 );
1695 }
1696
1697 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1698 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1699 async fn uid_expunge() {
1700 let response = b"* 2 EXPUNGE\r\n\
1701 * 3 EXPUNGE\r\n\
1702 * 4 EXPUNGE\r\n\
1703 A0001 OK UID EXPUNGE completed\r\n"
1704 .to_vec();
1705 let mock_stream = MockStream::new(response);
1706 let mut session = mock_session!(mock_stream);
1707 session
1708 .uid_expunge("2:4")
1709 .await
1710 .unwrap()
1711 .collect::<Vec<_>>()
1712 .await;
1713 assert!(
1714 session.stream.inner.written_buf == b"A0001 UID EXPUNGE 2:4\r\n".to_vec(),
1715 "Invalid expunge command"
1716 );
1717 }
1718
1719 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1720 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1721 async fn check() {
1722 let response = b"A0001 OK CHECK completed\r\n".to_vec();
1723 let mock_stream = MockStream::new(response);
1724 let mut session = mock_session!(mock_stream);
1725 session.check().await.unwrap();
1726 assert!(
1727 session.stream.inner.written_buf == b"A0001 CHECK\r\n".to_vec(),
1728 "Invalid check command"
1729 );
1730 }
1731
1732 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1733 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1734 async fn examine() {
1735 let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1736 * OK [PERMANENTFLAGS ()] Read-only mailbox.\r\n\
1737 * 1 EXISTS\r\n\
1738 * 1 RECENT\r\n\
1739 * OK [UNSEEN 1] First unseen.\r\n\
1740 * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1741 * OK [UIDNEXT 2] Predicted next UID\r\n\
1742 A0001 OK [READ-ONLY] Select completed.\r\n"
1743 .to_vec();
1744 let expected_mailbox = Mailbox {
1745 flags: vec![
1746 Flag::Answered,
1747 Flag::Flagged,
1748 Flag::Deleted,
1749 Flag::Seen,
1750 Flag::Draft,
1751 ],
1752 exists: 1,
1753 recent: 1,
1754 unseen: Some(1),
1755 permanent_flags: vec![],
1756 uid_next: Some(2),
1757 uid_validity: Some(1257842737),
1758 highest_modseq: None,
1759 };
1760 let mailbox_name = "INBOX";
1761 let command = format!("A0001 EXAMINE {}\r\n", quote!(mailbox_name));
1762 let mock_stream = MockStream::new(response);
1763 let mut session = mock_session!(mock_stream);
1764 let mailbox = session.examine(mailbox_name).await.unwrap();
1765 assert!(
1766 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1767 "Invalid examine command"
1768 );
1769 assert_eq!(mailbox, expected_mailbox);
1770 }
1771
1772 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1773 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1774 async fn select() {
1775 let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
1776 * OK [PERMANENTFLAGS (\\* \\Answered \\Flagged \\Deleted \\Draft \\Seen)] \
1777 Read-only mailbox.\r\n\
1778 * 1 EXISTS\r\n\
1779 * 1 RECENT\r\n\
1780 * OK [UNSEEN 1] First unseen.\r\n\
1781 * OK [UIDVALIDITY 1257842737] UIDs valid\r\n\
1782 * OK [UIDNEXT 2] Predicted next UID\r\n\
1783 * OK [HIGHESTMODSEQ 90060115205545359] Highest mailbox modsequence\r\n\
1784 A0001 OK [READ-ONLY] Select completed.\r\n"
1785 .to_vec();
1786 let expected_mailbox = Mailbox {
1787 flags: vec![
1788 Flag::Answered,
1789 Flag::Flagged,
1790 Flag::Deleted,
1791 Flag::Seen,
1792 Flag::Draft,
1793 ],
1794 exists: 1,
1795 recent: 1,
1796 unseen: Some(1),
1797 permanent_flags: vec![
1798 Flag::MayCreate,
1799 Flag::Answered,
1800 Flag::Flagged,
1801 Flag::Deleted,
1802 Flag::Draft,
1803 Flag::Seen,
1804 ],
1805 uid_next: Some(2),
1806 uid_validity: Some(1257842737),
1807 highest_modseq: Some(90060115205545359),
1808 };
1809 let mailbox_name = "INBOX";
1810 let command = format!("A0001 SELECT {}\r\n", quote!(mailbox_name));
1811 let mock_stream = MockStream::new(response);
1812 let mut session = mock_session!(mock_stream);
1813 let mailbox = session.select(mailbox_name).await.unwrap();
1814 assert!(
1815 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1816 "Invalid select command"
1817 );
1818 assert_eq!(mailbox, expected_mailbox);
1819 }
1820
1821 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1822 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1823 async fn search() {
1824 let response = b"* SEARCH 1 2 3 4 5\r\n\
1825 A0001 OK Search completed\r\n"
1826 .to_vec();
1827 let mock_stream = MockStream::new(response);
1828 let mut session = mock_session!(mock_stream);
1829 let ids = session.search("Unseen").await.unwrap();
1830 let ids: HashSet<u32> = ids.iter().cloned().collect();
1831 assert!(
1832 session.stream.inner.written_buf == b"A0001 SEARCH Unseen\r\n".to_vec(),
1833 "Invalid search command"
1834 );
1835 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1836 }
1837
1838 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1839 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1840 async fn uid_search() {
1841 let response = b"* SEARCH 1 2 3 4 5\r\n\
1842 A0001 OK Search completed\r\n"
1843 .to_vec();
1844 let mock_stream = MockStream::new(response);
1845 let mut session = mock_session!(mock_stream);
1846 let ids = session.uid_search("Unseen").await.unwrap();
1847 let ids: HashSet<Uid> = ids.iter().cloned().collect();
1848 assert!(
1849 session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
1850 "Invalid search command"
1851 );
1852 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1853 }
1854
1855 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1856 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1857 async fn uid_search_unordered() {
1858 let response = b"* SEARCH 1 2 3 4 5\r\n\
1859 A0002 OK CAPABILITY completed\r\n\
1860 A0001 OK Search completed\r\n"
1861 .to_vec();
1862 let mock_stream = MockStream::new(response);
1863 let mut session = mock_session!(mock_stream);
1864 let ids = session.uid_search("Unseen").await.unwrap();
1865 let ids: HashSet<Uid> = ids.iter().cloned().collect();
1866 assert!(
1867 session.stream.inner.written_buf == b"A0001 UID SEARCH Unseen\r\n".to_vec(),
1868 "Invalid search command"
1869 );
1870 assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect());
1871 }
1872
1873 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1874 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1875 async fn capability() {
1876 let response = b"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n\
1877 A0001 OK CAPABILITY completed\r\n"
1878 .to_vec();
1879 let expected_capabilities = vec!["IMAP4rev1", "STARTTLS", "AUTH=GSSAPI", "LOGINDISABLED"];
1880 let mock_stream = MockStream::new(response);
1881 let mut session = mock_session!(mock_stream);
1882 let capabilities = session.capabilities().await.unwrap();
1883 assert!(
1884 session.stream.inner.written_buf == b"A0001 CAPABILITY\r\n".to_vec(),
1885 "Invalid capability command"
1886 );
1887 assert_eq!(capabilities.len(), 4);
1888 for e in expected_capabilities {
1889 assert!(capabilities.has_str(e));
1890 }
1891 }
1892
1893 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1894 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1895 async fn create() {
1896 let response = b"A0001 OK CREATE completed\r\n".to_vec();
1897 let mailbox_name = "INBOX";
1898 let command = format!("A0001 CREATE {}\r\n", quote!(mailbox_name));
1899 let mock_stream = MockStream::new(response);
1900 let mut session = mock_session!(mock_stream);
1901 session.create(mailbox_name).await.unwrap();
1902 assert!(
1903 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1904 "Invalid create command"
1905 );
1906 }
1907
1908 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1909 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1910 async fn delete() {
1911 let response = b"A0001 OK DELETE completed\r\n".to_vec();
1912 let mailbox_name = "INBOX";
1913 let command = format!("A0001 DELETE {}\r\n", quote!(mailbox_name));
1914 let mock_stream = MockStream::new(response);
1915 let mut session = mock_session!(mock_stream);
1916 session.delete(mailbox_name).await.unwrap();
1917 assert!(
1918 session.stream.inner.written_buf == command.as_bytes().to_vec(),
1919 "Invalid delete command"
1920 );
1921 }
1922
1923 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1924 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1925 async fn noop() {
1926 let response = b"A0001 OK NOOP completed\r\n".to_vec();
1927 let mock_stream = MockStream::new(response);
1928 let mut session = mock_session!(mock_stream);
1929 session.noop().await.unwrap();
1930 assert!(
1931 session.stream.inner.written_buf == b"A0001 NOOP\r\n".to_vec(),
1932 "Invalid noop command"
1933 );
1934 }
1935
1936 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1937 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1938 async fn close() {
1939 let response = b"A0001 OK CLOSE completed\r\n".to_vec();
1940 let mock_stream = MockStream::new(response);
1941 let mut session = mock_session!(mock_stream);
1942 session.close().await.unwrap();
1943 assert!(
1944 session.stream.inner.written_buf == b"A0001 CLOSE\r\n".to_vec(),
1945 "Invalid close command"
1946 );
1947 }
1948
1949 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1950 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1951 async fn store() {
1952 generic_store(" ", |c, set, query| async move {
1953 c.lock()
1954 .await
1955 .store(set, query)
1956 .await?
1957 .collect::<Vec<_>>()
1958 .await;
1959 Ok(())
1960 })
1961 .await;
1962 }
1963
1964 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1965 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1966 async fn uid_store() {
1967 generic_store(" UID ", |c, set, query| async move {
1968 c.lock()
1969 .await
1970 .uid_store(set, query)
1971 .await?
1972 .collect::<Vec<_>>()
1973 .await;
1974 Ok(())
1975 })
1976 .await;
1977 }
1978
1979 async fn generic_store<'a, F, T, K>(prefix: &'a str, op: F)
1980 where
1981 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
1982 K: 'a + Future<Output = Result<T>>,
1983 {
1984 let res = "* 2 FETCH (FLAGS (\\Deleted \\Seen))\r\n\
1985 * 3 FETCH (FLAGS (\\Deleted))\r\n\
1986 * 4 FETCH (FLAGS (\\Deleted \\Flagged \\Seen))\r\n\
1987 A0001 OK STORE completed\r\n";
1988
1989 generic_with_uid(res, "STORE", "2.4", "+FLAGS (\\Deleted)", prefix, op).await;
1990 }
1991
1992 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
1993 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
1994 async fn copy() {
1995 generic_copy(" ", |c, set, query| async move {
1996 c.lock().await.copy(set, query).await?;
1997 Ok(())
1998 })
1999 .await;
2000 }
2001
2002 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2003 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2004 async fn uid_copy() {
2005 generic_copy(" UID ", |c, set, query| async move {
2006 c.lock().await.uid_copy(set, query).await?;
2007 Ok(())
2008 })
2009 .await;
2010 }
2011
2012 async fn generic_copy<'a, F, T, K>(prefix: &'a str, op: F)
2013 where
2014 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2015 K: 'a + Future<Output = Result<T>>,
2016 {
2017 generic_with_uid(
2018 "A0001 OK COPY completed\r\n",
2019 "COPY",
2020 "2:4",
2021 "MEETING",
2022 prefix,
2023 op,
2024 )
2025 .await;
2026 }
2027
2028 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2029 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2030 async fn mv() {
2031 let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2032 * 2 EXPUNGE\r\n\
2033 * 1 EXPUNGE\r\n\
2034 A0001 OK Move completed\r\n"
2035 .to_vec();
2036 let mailbox_name = "MEETING";
2037 let command = format!("A0001 MOVE 1:2 {}\r\n", quote!(mailbox_name));
2038 let mock_stream = MockStream::new(response);
2039 let mut session = mock_session!(mock_stream);
2040 session.mv("1:2", mailbox_name).await.unwrap();
2041 assert!(
2042 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2043 "Invalid move command"
2044 );
2045 }
2046
2047 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2048 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2049 async fn uid_mv() {
2050 let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\
2051 * 2 EXPUNGE\r\n\
2052 * 1 EXPUNGE\r\n\
2053 A0001 OK Move completed\r\n"
2054 .to_vec();
2055 let mailbox_name = "MEETING";
2056 let command = format!("A0001 UID MOVE 41:42 {}\r\n", quote!(mailbox_name));
2057 let mock_stream = MockStream::new(response);
2058 let mut session = mock_session!(mock_stream);
2059 session.uid_mv("41:42", mailbox_name).await.unwrap();
2060 assert!(
2061 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2062 "Invalid uid move command"
2063 );
2064 }
2065
2066 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2067 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2068 async fn fetch() {
2069 generic_fetch(" ", |c, seq, query| async move {
2070 c.lock()
2071 .await
2072 .fetch(seq, query)
2073 .await?
2074 .collect::<Vec<_>>()
2075 .await;
2076
2077 Ok(())
2078 })
2079 .await;
2080 }
2081
2082 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2083 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2084 async fn uid_fetch() {
2085 generic_fetch(" UID ", |c, seq, query| async move {
2086 c.lock()
2087 .await
2088 .uid_fetch(seq, query)
2089 .await?
2090 .collect::<Vec<_>>()
2091 .await;
2092 Ok(())
2093 })
2094 .await;
2095 }
2096
2097 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2098 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2099 async fn fetch_unexpected_eof() {
2100 let response = b"".to_vec();
2102
2103 let mock_stream = MockStream::new(response);
2104 let mut session = mock_session!(mock_stream);
2105
2106 {
2107 let mut fetch_result = session
2108 .uid_fetch("1:*", "(FLAGS BODY.PEEK[])")
2109 .await
2110 .unwrap();
2111
2112 let err = fetch_result.try_next().await.unwrap_err();
2114 let Error::Io(io_err) = err else {
2115 panic!("Unexpected error type: {err}")
2116 };
2117 assert_eq!(io_err.kind(), io::ErrorKind::UnexpectedEof);
2118 }
2119
2120 assert_eq!(
2121 session.stream.inner.written_buf,
2122 b"A0001 UID FETCH 1:* (FLAGS BODY.PEEK[])\r\n".to_vec()
2123 );
2124 }
2125
2126 async fn generic_fetch<'a, F, T, K>(prefix: &'a str, op: F)
2127 where
2128 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2129 K: 'a + Future<Output = Result<T>>,
2130 {
2131 generic_with_uid(
2132 "A0001 OK FETCH completed\r\n",
2133 "FETCH",
2134 "1",
2135 "BODY[]",
2136 prefix,
2137 op,
2138 )
2139 .await;
2140 }
2141
2142 async fn generic_with_uid<'a, F, T, K>(
2143 res: &'a str,
2144 cmd: &'a str,
2145 seq: &'a str,
2146 query: &'a str,
2147 prefix: &'a str,
2148 op: F,
2149 ) where
2150 F: 'a + FnOnce(Arc<Mutex<Session<MockStream>>>, &'a str, &'a str) -> K,
2151 K: 'a + Future<Output = Result<T>>,
2152 {
2153 let resp = res.as_bytes().to_vec();
2154 let line = format!("A0001{prefix}{cmd} {seq} {query}\r\n");
2155 let session = Arc::new(Mutex::new(mock_session!(MockStream::new(resp))));
2156
2157 {
2158 let _ = op(session.clone(), seq, query).await.unwrap();
2159 }
2160 assert!(
2161 session.lock().await.stream.inner.written_buf == line.as_bytes().to_vec(),
2162 "Invalid command"
2163 );
2164 }
2165
2166 #[test]
2167 fn quote_backslash() {
2168 assert_eq!("\"test\\\\text\"", quote!(r"test\text"));
2169 }
2170
2171 #[test]
2172 fn quote_dquote() {
2173 assert_eq!("\"test\\\"text\"", quote!("test\"text"));
2174 }
2175
2176 #[test]
2177 fn validate_random() {
2178 assert_eq!(
2179 "\"~iCQ_k;>[&\\\"sVCvUW`e<<P!wJ\"",
2180 &validate_str("~iCQ_k;>[&\"sVCvUW`e<<P!wJ").unwrap()
2181 );
2182 }
2183
2184 #[test]
2185 fn validate_newline() {
2186 if let Err(ref e) = validate_str("test\nstring") {
2187 if let Error::Validate(ref ve) = e {
2188 if ve.0 == '\n' {
2189 return;
2190 }
2191 }
2192 panic!("Wrong error: {e:?}");
2193 }
2194 panic!("No error");
2195 }
2196
2197 #[test]
2198 #[allow(unreachable_patterns)]
2199 fn validate_carriage_return() {
2200 if let Err(ref e) = validate_str("test\rstring") {
2201 if let Error::Validate(ref ve) = e {
2202 if ve.0 == '\r' {
2203 return;
2204 }
2205 }
2206 panic!("Wrong error: {e:?}");
2207 }
2208 panic!("No error");
2209 }
2210
2211 #[cfg(feature = "runtime-tokio")]
2215 async fn handle_client(stream: tokio::io::DuplexStream) -> Result<()> {
2216 use tokio::io::AsyncBufReadExt;
2217
2218 let (reader, mut writer) = tokio::io::split(stream);
2219 let reader = tokio::io::BufReader::new(reader);
2220
2221 let mut lines = reader.lines();
2222 while let Some(line) = lines.next_line().await? {
2223 let (request_id, request) = line.split_once(' ').unwrap();
2224 eprintln!("Received request {request_id}.");
2225
2226 let (id, _) = request
2227 .strip_prefix("FETCH ")
2228 .unwrap()
2229 .split_once(' ')
2230 .unwrap();
2231 let id = id.parse().unwrap();
2232
2233 let mut body = concat!(
2234 "From: Bob <bob@example.com>\r\n",
2235 "To: Alice <alice@example.org>\r\n",
2236 "Subject: Test\r\n",
2237 "Message-Id: <foobar@example.com>\r\n",
2238 "Date: Sun, 22 Mar 2020 00:00:00 +0100\r\n",
2239 "\r\n",
2240 )
2241 .to_string();
2242 for _ in 1..id {
2243 body +=
2244 "012345678901234567890123456789012345678901234567890123456789012345678901\r\n";
2245 }
2246 let body_len = body.len();
2247
2248 let response = format!("* {id} FETCH (RFC822.SIZE {body_len} BODY[] {{{body_len}}}\r\n{body} FLAGS (\\Seen))\r\n");
2249 writer.write_all(response.as_bytes()).await?;
2250 writer
2251 .write_all(format!("{request_id} OK FETCH completed\r\n").as_bytes())
2252 .await?;
2253 writer.flush().await?;
2254 }
2255
2256 Ok(())
2257 }
2258
2259 #[cfg(feature = "runtime-tokio")]
2266 #[cfg_attr(
2267 feature = "runtime-tokio",
2268 tokio::test(flavor = "multi_thread", worker_threads = 2)
2269 )]
2270 async fn large_fetch() -> Result<()> {
2271 use futures::TryStreamExt;
2272
2273 let (client, server) = tokio::io::duplex(4096);
2274 tokio::spawn(handle_client(server));
2275
2276 let client = crate::Client::new(client);
2277 let mut imap_session = Session::new(client.conn);
2278
2279 for i in 200..300 {
2280 eprintln!("Fetching {i}.");
2281 let mut messages_stream = imap_session
2282 .fetch(format!("{i}"), "(RFC822.SIZE BODY.PEEK[] FLAGS)")
2283 .await?;
2284 let fetch = messages_stream
2285 .try_next()
2286 .await?
2287 .expect("no FETCH returned");
2288 let body = fetch.body().expect("message did not have a body!");
2289 assert_eq!(body.len(), 76 + 74 * i);
2290
2291 let no_fetch = messages_stream.try_next().await?;
2292 assert!(no_fetch.is_none());
2293 drop(messages_stream);
2294 }
2295
2296 Ok(())
2297 }
2298
2299 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2300 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2301 async fn status() {
2302 {
2303 let response = b"* STATUS INBOX (UIDNEXT 25)\r\n\
2304 A0001 OK [CLIENTBUG] Status on selected mailbox completed (0.001 + 0.000 secs).\r\n"
2305 .to_vec();
2306
2307 let mock_stream = MockStream::new(response);
2308 let mut session = mock_session!(mock_stream);
2309 let status = session.status("INBOX", "(UIDNEXT)").await.unwrap();
2310 assert_eq!(
2311 session.stream.inner.written_buf,
2312 b"A0001 STATUS \"INBOX\" (UIDNEXT)\r\n".to_vec()
2313 );
2314 assert_eq!(status.uid_next, Some(25));
2315 }
2316
2317 {
2318 let response = b"* STATUS INBOX (RECENT 15)\r\n\
2319 A0001 OK STATUS completed\r\n"
2320 .to_vec();
2321
2322 let mock_stream = MockStream::new(response);
2323 let mut session = mock_session!(mock_stream);
2324 let status = session.status("INBOX", "(RECENT)").await.unwrap();
2325 assert_eq!(
2326 session.stream.inner.written_buf,
2327 b"A0001 STATUS \"INBOX\" (RECENT)\r\n".to_vec()
2328 );
2329 assert_eq!(status.recent, 15);
2330 }
2331
2332 {
2333 let response = b"* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)\r\n\
2335 A0001 OK STATUS completed\r\n"
2336 .to_vec();
2337
2338 let mock_stream = MockStream::new(response);
2339 let mut session = mock_session!(mock_stream);
2340 let status = session
2341 .status("blurdybloop", "(UIDNEXT MESSAGES)")
2342 .await
2343 .unwrap();
2344 assert_eq!(
2345 session.stream.inner.written_buf,
2346 b"A0001 STATUS \"blurdybloop\" (UIDNEXT MESSAGES)\r\n".to_vec()
2347 );
2348 assert_eq!(status.uid_next, Some(44292));
2349 assert_eq!(status.exists, 231);
2350 }
2351 }
2352
2353 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2354 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2355 async fn append() {
2356 {
2357 let response = b"+ OK\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2361
2362 let mock_stream = MockStream::new(response);
2363 let mut session = mock_session!(mock_stream);
2364 session
2365 .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2366 .await
2367 .unwrap();
2368 assert_eq!(
2369 session.stream.inner.written_buf,
2370 b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2371 );
2372 }
2373
2374 {
2375 let response = b"+ OK\r\n* 3 EXISTS\r\n* 2 RECENT\r\nA0001 OK [APPENDUID 1725735035 2] Append completed (0.052 + 12.097 + 0.049 secs).\r\n".to_vec();
2379
2380 let mock_stream = MockStream::new(response);
2381 let mut session = mock_session!(mock_stream);
2382 session
2383 .append("INBOX", Some(r"(\Seen)"), None, "foobarbaz")
2384 .await
2385 .unwrap();
2386 assert_eq!(
2387 session.stream.inner.written_buf,
2388 b"A0001 APPEND \"INBOX\" (\\Seen) {9}\r\nfoobarbaz\r\n".to_vec()
2389 );
2390 let exists_response = session.unsolicited_responses.recv().await.unwrap();
2391 assert_eq!(exists_response, UnsolicitedResponse::Exists(3));
2392 let recent_response = session.unsolicited_responses.recv().await.unwrap();
2393 assert_eq!(recent_response, UnsolicitedResponse::Recent(2));
2394 }
2395
2396 {
2397 let response =
2399 b"A0001 NO [TRYCREATE] Mailbox doesn't exist: foobar (0.001 + 0.000 secs)."
2400 .to_vec();
2401 let mock_stream = MockStream::new(response);
2402 let mut session = mock_session!(mock_stream);
2403 session
2404 .append("foobar", None, None, "foobarbaz")
2405 .await
2406 .unwrap_err();
2407 assert_eq!(
2408 session.stream.inner.written_buf,
2409 b"A0001 APPEND \"foobar\" {9}\r\n".to_vec()
2410 );
2411 }
2412 }
2413
2414 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2415 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2416 async fn get_metadata() {
2417 {
2418 let response = b"* METADATA \"INBOX\" (/private/comment \"My own comment\")\r\n\
2419 A0001 OK GETMETADATA complete\r\n"
2420 .to_vec();
2421
2422 let mock_stream = MockStream::new(response);
2423 let mut session = mock_session!(mock_stream);
2424 let metadata = session
2425 .get_metadata("INBOX", "", "/private/comment")
2426 .await
2427 .unwrap();
2428 assert_eq!(
2429 session.stream.inner.written_buf,
2430 b"A0001 GETMETADATA \"INBOX\" /private/comment\r\n".to_vec()
2431 );
2432 assert_eq!(metadata.len(), 1);
2433 assert_eq!(metadata[0].entry, "/private/comment");
2434 assert_eq!(metadata[0].value.as_ref().unwrap(), "My own comment");
2435 }
2436
2437 {
2438 let response = b"* METADATA \"INBOX\" (/shared/comment \"Shared comment\" /private/comment \"My own comment\")\r\n\
2439 A0001 OK GETMETADATA complete\r\n"
2440 .to_vec();
2441
2442 let mock_stream = MockStream::new(response);
2443 let mut session = mock_session!(mock_stream);
2444 let metadata = session
2445 .get_metadata("INBOX", "", "(/shared/comment /private/comment)")
2446 .await
2447 .unwrap();
2448 assert_eq!(
2449 session.stream.inner.written_buf,
2450 b"A0001 GETMETADATA \"INBOX\" (/shared/comment /private/comment)\r\n".to_vec()
2451 );
2452 assert_eq!(metadata.len(), 2);
2453 assert_eq!(metadata[0].entry, "/shared/comment");
2454 assert_eq!(metadata[0].value.as_ref().unwrap(), "Shared comment");
2455 assert_eq!(metadata[1].entry, "/private/comment");
2456 assert_eq!(metadata[1].value.as_ref().unwrap(), "My own comment");
2457 }
2458
2459 {
2460 let response = b"* METADATA \"\" (/shared/comment {15}\r\nChatmail server /shared/admin {28}\r\nmailto:root@nine.testrun.org)\r\n\
2461 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2462 .to_vec();
2463
2464 let mock_stream = MockStream::new(response);
2465 let mut session = mock_session!(mock_stream);
2466 let metadata = session
2467 .get_metadata("", "", "(/shared/comment /shared/admin)")
2468 .await
2469 .unwrap();
2470 assert_eq!(
2471 session.stream.inner.written_buf,
2472 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2473 );
2474 assert_eq!(metadata.len(), 2);
2475 assert_eq!(metadata[0].entry, "/shared/comment");
2476 assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2477 assert_eq!(metadata[1].entry, "/shared/admin");
2478 assert_eq!(
2479 metadata[1].value.as_ref().unwrap(),
2480 "mailto:root@nine.testrun.org"
2481 );
2482 }
2483
2484 {
2485 let response = b"* METADATA \"\" (/shared/comment \"Chatmail server\")\r\n\
2486 * METADATA \"\" (/shared/admin \"mailto:root@nine.testrun.org\")\r\n\
2487 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2488 .to_vec();
2489
2490 let mock_stream = MockStream::new(response);
2491 let mut session = mock_session!(mock_stream);
2492 let metadata = session
2493 .get_metadata("", "", "(/shared/comment /shared/admin)")
2494 .await
2495 .unwrap();
2496 assert_eq!(
2497 session.stream.inner.written_buf,
2498 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2499 );
2500 assert_eq!(metadata.len(), 2);
2501 assert_eq!(metadata[0].entry, "/shared/comment");
2502 assert_eq!(metadata[0].value.as_ref().unwrap(), "Chatmail server");
2503 assert_eq!(metadata[1].entry, "/shared/admin");
2504 assert_eq!(
2505 metadata[1].value.as_ref().unwrap(),
2506 "mailto:root@nine.testrun.org"
2507 );
2508 }
2509
2510 {
2511 let response = b"* METADATA \"\" (/shared/comment NIL /shared/admin NIL)\r\n\
2512 A0001 OK OK Getmetadata completed (0.001 + 0.000 secs).\r\n"
2513 .to_vec();
2514
2515 let mock_stream = MockStream::new(response);
2516 let mut session = mock_session!(mock_stream);
2517 let metadata = session
2518 .get_metadata("", "", "(/shared/comment /shared/admin)")
2519 .await
2520 .unwrap();
2521 assert_eq!(
2522 session.stream.inner.written_buf,
2523 b"A0001 GETMETADATA \"\" (/shared/comment /shared/admin)\r\n".to_vec()
2524 );
2525 assert_eq!(metadata.len(), 2);
2526 assert_eq!(metadata[0].entry, "/shared/comment");
2527 assert_eq!(metadata[0].value, None);
2528 assert_eq!(metadata[1].entry, "/shared/admin");
2529 assert_eq!(metadata[1].value, None);
2530 }
2531 }
2532
2533 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2534 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2535 async fn test_get_quota_root() {
2536 {
2537 let response = b"* QUOTAROOT Sent Userquota\r\n\
2538 * QUOTA Userquota (STORAGE 4855 48576)\r\n\
2539 A0001 OK Getquotaroot completed (0.004 + 0.000 + 0.004 secs).\r\n"
2540 .to_vec();
2541
2542 let mock_stream = MockStream::new(response);
2543 let mut session = mock_session!(mock_stream);
2544 let (quotaroots, quota) = dbg!(session.get_quota_root("Sent").await.unwrap());
2545 assert_eq!(
2546 str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2547 "A0001 GETQUOTAROOT \"Sent\"\r\n"
2548 );
2549 assert_eq!(
2550 quotaroots,
2551 vec![QuotaRoot {
2552 mailbox_name: "Sent".to_string(),
2553 quota_root_names: vec!["Userquota".to_string(),],
2554 },],
2555 );
2556 assert_eq!(
2557 quota,
2558 vec![Quota {
2559 root_name: "Userquota".to_string(),
2560 resources: vec![QuotaResource {
2561 name: QuotaResourceName::Storage,
2562 usage: 4855,
2563 limit: 48576,
2564 }],
2565 }]
2566 );
2567 assert_eq!(quota[0].resources[0].get_usage_percentage(), 9);
2568 }
2569
2570 {
2571 let response = b"* QUOTAROOT \"INBOX\" \"#19\"\r\n\
2572 * QUOTA \"#19\" (STORAGE 0 0)\r\n\
2573 A0001 OK GETQUOTAROOT successful.\r\n"
2574 .to_vec();
2575
2576 let mock_stream = MockStream::new(response);
2577 let mut session = mock_session!(mock_stream);
2578 let (quotaroots, quota) = session.get_quota_root("INBOX").await.unwrap();
2579 assert_eq!(
2580 str::from_utf8(&session.stream.inner.written_buf).unwrap(),
2581 "A0001 GETQUOTAROOT \"INBOX\"\r\n"
2582 );
2583 assert_eq!(
2584 quotaroots,
2585 vec![QuotaRoot {
2586 mailbox_name: "INBOX".to_string(),
2587 quota_root_names: vec!["#19".to_string(),],
2588 },],
2589 );
2590 assert_eq!(
2591 quota,
2592 vec![Quota {
2593 root_name: "#19".to_string(),
2594 resources: vec![QuotaResource {
2595 name: QuotaResourceName::Storage,
2596 usage: 0,
2597 limit: 0,
2598 }],
2599 }]
2600 );
2601 assert_eq!(quota[0].resources[0].get_usage_percentage(), 0);
2602 }
2603 }
2604
2605 #[cfg_attr(feature = "runtime-tokio", tokio::test)]
2606 #[cfg_attr(feature = "runtime-async-std", async_std::test)]
2607 async fn test_parsing_error() {
2608 let response = b"220 mail.example.org ESMTP Postcow\r\n".to_vec();
2610 let command = "A0001 NOOP\r\n";
2611 let mock_stream = MockStream::new(response);
2612 let mut session = mock_session!(mock_stream);
2613 assert!(session
2614 .noop()
2615 .await
2616 .unwrap_err()
2617 .to_string()
2618 .contains("220 mail.example.org ESMTP Postcow"));
2619 assert!(
2620 session.stream.inner.written_buf == command.as_bytes().to_vec(),
2621 "Invalid NOOP command"
2622 );
2623 }
2624}