async_imap_lite/
connection.rs1use std::io;
2use std::task::Poll;
3
4cfg_if::cfg_if! {
5 if #[cfg(unix)] {
6 pub(crate) use std::os::unix::io::AsRawFd as AsRawFdOrSocket;
7 } else if #[cfg(windows)] {
8 pub(crate) use std::os::windows::io::AsRawSocket as AsRawFdOrSocket;
9 } else {
10 compile_error!("async-imap-lite does not support this target OS");
11 }
12}
13
14pub(crate) use async_io::Async;
15use async_stream_packed::SyncableWithWakerAsyncStream;
16pub(crate) use async_stream_packed::{TlsClientUpgrader, UpgradableAsyncStream, UpgraderExtRefer};
17use futures_lite::future;
18pub(crate) use futures_lite::{AsyncRead, AsyncWrite};
19
20use crate::imap::{Connection, Error, Result};
21use crate::util::optimistic;
22
23pub struct AsyncConnection<AS, ASTU>
24where
25 AS: AsRawFdOrSocket,
26 Async<AS>: AsyncRead + AsyncWrite,
27 ASTU: TlsClientUpgrader<Async<AS>> + Unpin,
28 ASTU::Output: AsyncRead + AsyncWrite + Unpin,
29{
30 connection: Connection<SyncableWithWakerAsyncStream<UpgradableAsyncStream<Async<AS>, ASTU>>>,
31}
32
33impl<AS, ASTU> AsyncConnection<AS, ASTU>
34where
35 AS: AsRawFdOrSocket,
36 Async<AS>: AsyncRead + AsyncWrite + Unpin,
37 ASTU: TlsClientUpgrader<Async<AS>> + UpgraderExtRefer<Async<AS>> + Unpin,
38 ASTU::Output: AsyncRead + AsyncWrite + Unpin,
39{
40 pub async fn new(
41 stream: UpgradableAsyncStream<Async<AS>, ASTU>,
42 debug: bool,
43 greeting_read: bool,
44 ) -> Self {
45 let mut stream = Some(stream);
46 let connection = future::poll_fn(|cx| {
47 let stream =
48 SyncableWithWakerAsyncStream::new(stream.take().expect("never"), cx.waker());
49 let connection = Connection::new(stream, debug, greeting_read);
50 Poll::Ready(connection)
51 })
52 .await;
53
54 Self { connection }
55 }
56
57 pub async fn read_greeting(&mut self) -> Result<Vec<u8>> {
59 assert!(
60 !self.connection.greeting_read,
61 "Greeting can only be read once"
62 );
63
64 let mut v = Vec::new();
65 self.readline(&mut v).await?;
66 self.connection.greeting_read = true;
67
68 Ok(v)
69 }
70
71 pub async fn run_command_and_check_ok<S: AsRef<str>>(&mut self, command: S) -> Result<()> {
73 self.run_command_and_read_response(command)
74 .await
75 .map(|_| ())
76 }
77
78 pub async fn run_command_and_read_response<S: AsRef<str>>(
80 &mut self,
81 untagged_command: S,
82 ) -> Result<Vec<u8>> {
83 self.run_command(untagged_command.as_ref()).await?;
84
85 let mut v = Vec::new();
86 self.read_response_onto(&mut v).await?;
87
88 Ok(v)
89 }
90
91 async fn readline(&mut self, into: &mut Vec<u8>) -> Result<usize> {
93 loop {
94 match self.connection.readline(into) {
95 Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {}
96 ret => break ret,
97 }
98 optimistic(
99 self.connection
100 .get_mut()
101 .get_mut()
102 .get_mut()
103 .get_mut()
104 .readable(),
105 )
106 .await?;
107 }
108 }
109
110 async fn run_command(&mut self, untagged_command: &str) -> Result<()> {
112 loop {
113 match self.connection.run_command(untagged_command) {
114 Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {}
115 ret => break ret,
116 }
117 optimistic(
118 self.connection
119 .get_mut()
120 .get_mut()
121 .get_mut()
122 .get_mut()
123 .writable(),
124 )
125 .await?;
126 }
127 }
128
129 async fn read_response_onto(&mut self, data: &mut Vec<u8>) -> Result<()> {
131 loop {
132 match self.connection.read_response_onto(data) {
133 Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {}
134 ret => break ret,
135 }
136 optimistic(
137 self.connection
138 .get_mut()
139 .get_mut()
140 .get_mut()
141 .get_mut()
142 .readable(),
143 )
144 .await?;
145 }
146 }
147}