async_imap/extensions/
idle.rs1use std::fmt;
4use std::pin::Pin;
5use std::time::Duration;
6
7#[cfg(feature = "runtime-async-std")]
8use async_std::{
9 future::timeout,
10 io::{Read, Write},
11};
12use futures::prelude::*;
13use futures::task::{Context, Poll};
14use imap_proto::{RequestId, Response, Status};
15use stop_token::prelude::*;
16#[cfg(feature = "runtime-tokio")]
17use tokio::{
18 io::{AsyncRead as Read, AsyncWrite as Write},
19 time::timeout,
20};
21
22use crate::client::Session;
23use crate::error::Result;
24use crate::parse::handle_unilateral;
25use crate::types::ResponseData;
26
27#[derive(Debug)]
43pub struct Handle<T: Read + Write + Unpin + fmt::Debug> {
44 session: Session<T>,
45 id: Option<RequestId>,
46}
47
48impl<T: Read + Write + Unpin + fmt::Debug> Unpin for Handle<T> {}
49
50impl<T: Read + Write + Unpin + fmt::Debug + Send> Stream for Handle<T> {
51 type Item = std::io::Result<ResponseData>;
52
53 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54 self.as_mut().session().get_stream().poll_next(cx)
55 }
56}
57
58#[derive(Debug)]
60#[must_use = "futures do nothing unless polled"]
61pub struct IdleStream<'a, St> {
62 stream: &'a mut St,
63}
64
65impl<St: Unpin> Unpin for IdleStream<'_, St> {}
66
67impl<'a, St: Stream + Unpin> IdleStream<'a, St> {
68 unsafe_pinned!(stream: &'a mut St);
69
70 pub(crate) fn new(stream: &'a mut St) -> Self {
71 IdleStream { stream }
72 }
73}
74
75impl<St: futures::stream::FusedStream + Unpin> futures::stream::FusedStream for IdleStream<'_, St> {
76 fn is_terminated(&self) -> bool {
77 self.stream.is_terminated()
78 }
79}
80
81impl<St: Stream + Unpin> Stream for IdleStream<'_, St> {
82 type Item = St::Item;
83
84 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85 self.stream().poll_next(cx)
86 }
87}
88
89#[derive(Debug, PartialEq, Eq)]
91pub enum IdleResponse {
92 ManualInterrupt,
94 Timeout,
96 NewData(ResponseData),
98}
99
100impl<T: Read + Write + Unpin + fmt::Debug> AsMut<T> for Handle<T> {
103 fn as_mut(&mut self) -> &mut T {
104 self.session.conn.stream.as_mut()
105 }
106}
107
108impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
109 unsafe_pinned!(session: Session<T>);
110
111 pub(crate) fn new(session: Session<T>) -> Handle<T> {
112 Handle { session, id: None }
113 }
114
115 pub fn wait(
118 &mut self,
119 ) -> (
120 impl Future<Output = Result<IdleResponse>> + '_,
121 stop_token::StopSource,
122 ) {
123 self.wait_with_timeout(Duration::from_secs(24 * 60 * 60))
124 }
125
126 pub fn wait_with_timeout(
133 &mut self,
134 dur: Duration,
135 ) -> (
136 impl Future<Output = Result<IdleResponse>> + '_,
137 stop_token::StopSource,
138 ) {
139 assert!(
140 self.id.is_some(),
141 "Cannot listen to response without starting IDLE"
142 );
143
144 let sender = self.session.unsolicited_responses_tx.clone();
145
146 let interrupt = stop_token::StopSource::new();
147 let raw_stream = IdleStream::new(self);
148 let mut interruptible_stream = raw_stream.timeout_at(interrupt.token());
149
150 let fut = async move {
151 loop {
152 let Ok(res) = timeout(dur, interruptible_stream.next()).await else {
153 return Ok(IdleResponse::Timeout);
154 };
155
156 let Some(Ok(resp)) = res else {
157 return Ok(IdleResponse::ManualInterrupt);
158 };
159
160 let resp = resp?;
161 match resp.parsed() {
162 Response::Data {
163 status: Status::Ok, ..
164 } => {
165 }
167 Response::Continue { .. } => {
168 }
170 Response::Done { .. } => {
171 handle_unilateral(resp, sender.clone());
172 }
173 _ => return Ok(IdleResponse::NewData(resp)),
174 }
175 }
176 };
177
178 (fut, interrupt)
179 }
180
181 pub async fn init(&mut self) -> Result<()> {
183 let id = self.session.run_command("IDLE").await?;
184 self.id = Some(id);
185 while let Some(res) = self.session.stream.next().await {
186 let res = res?;
187 match res.parsed() {
188 Response::Continue { .. } => {
189 return Ok(());
190 }
191 Response::Done {
192 tag,
193 status,
194 information,
195 ..
196 } => {
197 if tag == self.id.as_ref().unwrap() {
198 if let Status::Bad = status {
199 return Err(std::io::Error::new(
200 std::io::ErrorKind::ConnectionRefused,
201 information.as_ref().unwrap().to_string(),
202 )
203 .into());
204 }
205 }
206 handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
207 }
208 _ => {
209 handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
210 }
211 }
212 }
213
214 Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "").into())
215 }
216
217 pub async fn done(mut self) -> Result<Session<T>> {
220 assert!(
221 self.id.is_some(),
222 "Cannot call DONE on a non initialized idle connection"
223 );
224 self.session.run_command_untagged("DONE").await?;
225 let sender = self.session.unsolicited_responses_tx.clone();
226 self.session
227 .check_done_ok(&self.id.expect("invalid setup"), Some(sender))
228 .await?;
229
230 Ok(self.session)
231 }
232}