1use crate::config::ImapConfig;
4use crate::handler::HandlerContext;
5use crate::mailbox_watcher::{MailboxChanges, MailboxWatcher};
6use crate::parser::{has_literal, parse_append_command, parse_command, LiteralType};
7use crate::response::ImapResponse;
8use crate::session::{ImapSession, ImapState};
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
11use tokio::net::{TcpListener, TcpStream};
12
13pub struct ImapServer {
15 bind_addr: String,
16 context: HandlerContext,
17 idle_timeout: Duration,
18}
19
20impl ImapServer {
21 pub fn new(bind_addr: impl Into<String>, context: HandlerContext) -> Self {
23 Self::new_with_timeout(bind_addr, context, Duration::from_secs(1800))
24 }
25
26 pub fn new_with_timeout(
28 bind_addr: impl Into<String>,
29 context: HandlerContext,
30 idle_timeout: Duration,
31 ) -> Self {
32 Self {
33 bind_addr: bind_addr.into(),
34 context,
35 idle_timeout,
36 }
37 }
38
39 pub fn from_config(config: ImapConfig, context: HandlerContext) -> Self {
41 Self {
42 bind_addr: config.bind_addr(),
43 context,
44 idle_timeout: config.idle_timeout,
45 }
46 }
47
48 pub async fn run(self) -> anyhow::Result<()> {
50 let listener = TcpListener::bind(&self.bind_addr).await?;
51 tracing::info!("IMAP server listening on {}", self.bind_addr);
52
53 let context = std::sync::Arc::new(self.context);
54 let idle_timeout = self.idle_timeout;
55
56 loop {
57 let (stream, remote_addr) = listener.accept().await?;
58 tracing::info!("New IMAP connection from {}", remote_addr);
59
60 let ctx = context.clone();
61 tokio::spawn(async move {
62 if let Err(e) = handle_connection(stream, ctx, idle_timeout).await {
63 tracing::error!("IMAP session error: {}", e);
64 }
65 });
66 }
67 }
68}
69
70async fn handle_connection(
71 stream: TcpStream,
72 ctx: std::sync::Arc<HandlerContext>,
73 idle_timeout: Duration,
74) -> anyhow::Result<()> {
75 let (read_half, write_half) = tokio::io::split(stream);
76 let mut reader = BufReader::new(read_half);
77 let mut writer = BufWriter::new(write_half);
78
79 let mut session = ImapSession::new_with_timeout(idle_timeout);
80
81 let greeting = ImapResponse::new(None, "OK", "RusMES IMAP Server ready");
83 writer.write_all(greeting.format().as_bytes()).await?;
84 writer.flush().await?;
85
86 let mut line = String::new();
87
88 loop {
89 line.clear();
90
91 let n = match tokio::time::timeout(idle_timeout, reader.read_line(&mut line)).await {
93 Ok(Ok(n)) => n,
94 Ok(Err(e)) => {
95 tracing::error!("Read error: {}", e);
96 return Err(e.into());
97 }
98 Err(_) => {
99 tracing::info!(
101 "Idle timeout for connection - auto-logout after {} seconds",
102 idle_timeout.as_secs()
103 );
104 let bye_msg = "* BYE Autologout; idle too long\r\n";
105 let _ = writer.write_all(bye_msg.as_bytes()).await;
106 let _ = writer.flush().await;
107 break;
108 }
109 };
110
111 if n == 0 {
112 break; }
114
115 let line_trimmed = line.trim();
116 tracing::debug!("IMAP command: {}", line_trimmed);
117
118 let (tag, command) = if let Some((size, literal_type)) = has_literal(line_trimmed) {
120 match handle_literal_command(line_trimmed, size, literal_type, &mut reader, &mut writer)
122 .await
123 {
124 Ok(cmd) => cmd,
125 Err(e) => {
126 let response = ImapResponse::bad("*", format!("Literal error: {}", e));
127 writer.write_all(response.format().as_bytes()).await?;
128 writer.flush().await?;
129 continue;
130 }
131 }
132 } else {
133 match parse_command(line_trimmed) {
135 Ok(cmd) => cmd,
136 Err(e) => {
137 let response = ImapResponse::bad("*", format!("Parse error: {}", e));
138 writer.write_all(response.format().as_bytes()).await?;
139 writer.flush().await?;
140 continue;
141 }
142 }
143 };
144
145 let response = crate::handler::handle_command(&ctx, &mut session, &tag, command).await?;
147
148 writer.write_all(response.format().as_bytes()).await?;
149 writer.flush().await?;
150
151 if matches!(session.state(), ImapState::Idle { .. }) {
153 if let Err(e) = handle_idle_mode(&ctx, &mut session, &mut reader, &mut writer).await {
155 tracing::error!("IDLE mode error: {}", e);
156 if let Some(mailbox_id) = session.mailbox_id() {
158 session.state = ImapState::Selected {
159 mailbox_id: *mailbox_id,
160 };
161 }
162 }
163 }
164
165 if matches!(session.state(), ImapState::Logout) {
167 break;
168 }
169 }
170
171 Ok(())
172}
173
174async fn handle_idle_mode<R, W>(
176 ctx: &HandlerContext,
177 session: &mut ImapSession,
178 reader: &mut BufReader<R>,
179 writer: &mut BufWriter<W>,
180) -> anyhow::Result<()>
181where
182 R: tokio::io::AsyncRead + Unpin,
183 W: tokio::io::AsyncWrite + Unpin,
184{
185 let mailbox_id = match session.state() {
186 ImapState::Idle { mailbox_id } => *mailbox_id,
187 _ => return Ok(()), };
189
190 let tag = session.tag.clone().unwrap_or_else(|| "A001".to_string());
191 let watcher = MailboxWatcher::new(ctx.metadata_store.clone());
192
193 let mut last_state = if let Some(ref snapshot) = session.mailbox_snapshot {
195 MailboxChanges::new(snapshot.exists, snapshot.recent)
196 } else {
197 watcher.get_mailbox_state(&mailbox_id).await?
198 };
199
200 let idle_timeout = Duration::from_secs(29 * 60);
202 let check_interval = Duration::from_secs(5); let mut interval = tokio::time::interval(check_interval);
205 let idle_deadline = tokio::time::Instant::now() + idle_timeout;
206
207 loop {
208 let mut line = String::new();
209
210 tokio::select! {
211 result = reader.read_line(&mut line) => {
213 let n = result?;
214 if n == 0 {
215 break;
217 }
218
219 let line_trimmed = line.trim();
220 if line_trimmed.eq_ignore_ascii_case("DONE") {
221 break;
223 }
224 }
226
227 _ = interval.tick() => {
229 let current_state = watcher.get_mailbox_state(&mailbox_id).await?;
230
231 if current_state.has_changes(&last_state) {
232 if current_state.exists != last_state.exists {
234 let exists_response = format!("* {} EXISTS\r\n", current_state.exists);
235 writer.write_all(exists_response.as_bytes()).await?;
236 }
237
238 if current_state.recent != last_state.recent {
239 let recent_response = format!("* {} RECENT\r\n", current_state.recent);
240 writer.write_all(recent_response.as_bytes()).await?;
241 }
242
243 writer.flush().await?;
244 last_state = current_state;
245 }
246
247 if tokio::time::Instant::now() >= idle_deadline {
249 tracing::debug!("IDLE timeout reached after 29 minutes");
250 break;
251 }
252 }
253 }
254 }
255
256 let completion = ImapResponse::ok(&tag, "IDLE terminated");
258 writer.write_all(completion.format().as_bytes()).await?;
259 writer.flush().await?;
260
261 session.state = ImapState::Selected { mailbox_id };
263 session.tag = None;
264
265 Ok(())
266}
267
268async fn handle_literal_command<R, W>(
274 command_line: &str,
275 literal_size: usize,
276 literal_type: LiteralType,
277 reader: &mut BufReader<R>,
278 writer: &mut BufWriter<W>,
279) -> anyhow::Result<(String, crate::command::ImapCommand)>
280where
281 R: tokio::io::AsyncRead + Unpin,
282 W: tokio::io::AsyncWrite + Unpin,
283{
284 if literal_type == LiteralType::Synchronizing {
286 let continuation = "+ Ready for literal data\r\n";
287 writer.write_all(continuation.as_bytes()).await?;
288 writer.flush().await?;
289 tracing::debug!(
290 "Sent continuation for synchronizing literal of {} bytes",
291 literal_size
292 );
293 } else {
294 tracing::debug!(
295 "Processing non-synchronizing literal (LITERAL+) of {} bytes",
296 literal_size
297 );
298 }
299
300 let mut literal_data = vec![0u8; literal_size];
302 reader.read_exact(&mut literal_data).await?;
303
304 let mut crlf = [0u8; 2];
306 reader.read_exact(&mut crlf).await?;
307
308 parse_append_command(command_line, literal_data)
310 .map_err(|e| anyhow::anyhow!("Failed to parse APPEND command: {}", e))
311}