Skip to main content

rusmes_imap/
server.rs

1//! IMAP server implementation
2
3use crate::config::ImapConfig;
4use crate::handler::HandlerContext;
5use crate::mailbox_watcher::{MailboxChanges, MailboxWatcher};
6use crate::parser::{has_literal, parse_append_command, parse_command, LiteralType};
7use crate::response::ImapResponse;
8use crate::session::{ImapSession, ImapState};
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
11use tokio::net::{TcpListener, TcpStream};
12
13/// IMAP server
14pub struct ImapServer {
15    bind_addr: String,
16    context: HandlerContext,
17    idle_timeout: Duration,
18}
19
20impl ImapServer {
21    /// Create a new IMAP server with default idle timeout (30 minutes)
22    pub fn new(bind_addr: impl Into<String>, context: HandlerContext) -> Self {
23        Self::new_with_timeout(bind_addr, context, Duration::from_secs(1800))
24    }
25
26    /// Create a new IMAP server with custom idle timeout
27    pub fn new_with_timeout(
28        bind_addr: impl Into<String>,
29        context: HandlerContext,
30        idle_timeout: Duration,
31    ) -> Self {
32        Self {
33            bind_addr: bind_addr.into(),
34            context,
35            idle_timeout,
36        }
37    }
38
39    /// Create a new IMAP server from configuration
40    pub fn from_config(config: ImapConfig, context: HandlerContext) -> Self {
41        Self {
42            bind_addr: config.bind_addr(),
43            context,
44            idle_timeout: config.idle_timeout,
45        }
46    }
47
48    /// Run the IMAP server
49    pub async fn run(self) -> anyhow::Result<()> {
50        let listener = TcpListener::bind(&self.bind_addr).await?;
51        tracing::info!("IMAP server listening on {}", self.bind_addr);
52
53        let context = std::sync::Arc::new(self.context);
54        let idle_timeout = self.idle_timeout;
55
56        loop {
57            let (stream, remote_addr) = listener.accept().await?;
58            tracing::info!("New IMAP connection from {}", remote_addr);
59
60            let ctx = context.clone();
61            tokio::spawn(async move {
62                if let Err(e) = handle_connection(stream, ctx, idle_timeout).await {
63                    tracing::error!("IMAP session error: {}", e);
64                }
65            });
66        }
67    }
68}
69
70async fn handle_connection(
71    stream: TcpStream,
72    ctx: std::sync::Arc<HandlerContext>,
73    idle_timeout: Duration,
74) -> anyhow::Result<()> {
75    let (read_half, write_half) = tokio::io::split(stream);
76    let mut reader = BufReader::new(read_half);
77    let mut writer = BufWriter::new(write_half);
78
79    let mut session = ImapSession::new_with_timeout(idle_timeout);
80
81    // Send greeting
82    let greeting = ImapResponse::new(None, "OK", "RusMES IMAP Server ready");
83    writer.write_all(greeting.format().as_bytes()).await?;
84    writer.flush().await?;
85
86    let mut line = String::new();
87
88    loop {
89        line.clear();
90
91        // Read command with idle timeout
92        let n = match tokio::time::timeout(idle_timeout, reader.read_line(&mut line)).await {
93            Ok(Ok(n)) => n,
94            Ok(Err(e)) => {
95                tracing::error!("Read error: {}", e);
96                return Err(e.into());
97            }
98            Err(_) => {
99                // Idle timeout - auto-logout
100                tracing::info!(
101                    "Idle timeout for connection - auto-logout after {} seconds",
102                    idle_timeout.as_secs()
103                );
104                let bye_msg = "* BYE Autologout; idle too long\r\n";
105                let _ = writer.write_all(bye_msg.as_bytes()).await;
106                let _ = writer.flush().await;
107                break;
108            }
109        };
110
111        if n == 0 {
112            break; // EOF
113        }
114
115        let line_trimmed = line.trim();
116        tracing::debug!("IMAP command: {}", line_trimmed);
117
118        // Check for literal in command (APPEND, etc.)
119        let (tag, command) = if let Some((size, literal_type)) = has_literal(line_trimmed) {
120            // Handle literal data
121            match handle_literal_command(line_trimmed, size, literal_type, &mut reader, &mut writer)
122                .await
123            {
124                Ok(cmd) => cmd,
125                Err(e) => {
126                    let response = ImapResponse::bad("*", format!("Literal error: {}", e));
127                    writer.write_all(response.format().as_bytes()).await?;
128                    writer.flush().await?;
129                    continue;
130                }
131            }
132        } else {
133            // Parse regular command
134            match parse_command(line_trimmed) {
135                Ok(cmd) => cmd,
136                Err(e) => {
137                    let response = ImapResponse::bad("*", format!("Parse error: {}", e));
138                    writer.write_all(response.format().as_bytes()).await?;
139                    writer.flush().await?;
140                    continue;
141                }
142            }
143        };
144
145        // Handle command
146        let response = crate::handler::handle_command(&ctx, &mut session, &tag, command).await?;
147
148        writer.write_all(response.format().as_bytes()).await?;
149        writer.flush().await?;
150
151        // Check if we entered IDLE mode
152        if matches!(session.state(), ImapState::Idle { .. }) {
153            // Enter IDLE loop
154            if let Err(e) = handle_idle_mode(&ctx, &mut session, &mut reader, &mut writer).await {
155                tracing::error!("IDLE mode error: {}", e);
156                // Exit IDLE and continue
157                if let Some(mailbox_id) = session.mailbox_id() {
158                    session.state = ImapState::Selected {
159                        mailbox_id: *mailbox_id,
160                    };
161                }
162            }
163        }
164
165        // Check for logout
166        if matches!(session.state(), ImapState::Logout) {
167            break;
168        }
169    }
170
171    Ok(())
172}
173
174/// Handle IDLE mode - wait for DONE or mailbox changes
175async fn handle_idle_mode<R, W>(
176    ctx: &HandlerContext,
177    session: &mut ImapSession,
178    reader: &mut BufReader<R>,
179    writer: &mut BufWriter<W>,
180) -> anyhow::Result<()>
181where
182    R: tokio::io::AsyncRead + Unpin,
183    W: tokio::io::AsyncWrite + Unpin,
184{
185    let mailbox_id = match session.state() {
186        ImapState::Idle { mailbox_id } => *mailbox_id,
187        _ => return Ok(()), // Not in IDLE state
188    };
189
190    let tag = session.tag.clone().unwrap_or_else(|| "A001".to_string());
191    let watcher = MailboxWatcher::new(ctx.metadata_store.clone());
192
193    // Get initial snapshot
194    let mut last_state = if let Some(ref snapshot) = session.mailbox_snapshot {
195        MailboxChanges::new(snapshot.exists, snapshot.recent)
196    } else {
197        watcher.get_mailbox_state(&mailbox_id).await?
198    };
199
200    // IDLE loop with 29-minute timeout (RFC 2177)
201    let idle_timeout = Duration::from_secs(29 * 60);
202    let check_interval = Duration::from_secs(5); // Check for changes every 5 seconds
203
204    let mut interval = tokio::time::interval(check_interval);
205    let idle_deadline = tokio::time::Instant::now() + idle_timeout;
206
207    loop {
208        let mut line = String::new();
209
210        tokio::select! {
211            // Check for DONE command
212            result = reader.read_line(&mut line) => {
213                let n = result?;
214                if n == 0 {
215                    // Connection closed
216                    break;
217                }
218
219                let line_trimmed = line.trim();
220                if line_trimmed.eq_ignore_ascii_case("DONE") {
221                    // Exit IDLE mode
222                    break;
223                }
224                // Ignore other input during IDLE
225            }
226
227            // Check for mailbox changes periodically
228            _ = interval.tick() => {
229                let current_state = watcher.get_mailbox_state(&mailbox_id).await?;
230
231                if current_state.has_changes(&last_state) {
232                    // Send untagged responses for changes
233                    if current_state.exists != last_state.exists {
234                        let exists_response = format!("* {} EXISTS\r\n", current_state.exists);
235                        writer.write_all(exists_response.as_bytes()).await?;
236                    }
237
238                    if current_state.recent != last_state.recent {
239                        let recent_response = format!("* {} RECENT\r\n", current_state.recent);
240                        writer.write_all(recent_response.as_bytes()).await?;
241                    }
242
243                    writer.flush().await?;
244                    last_state = current_state;
245                }
246
247                // Check if we've exceeded the timeout
248                if tokio::time::Instant::now() >= idle_deadline {
249                    tracing::debug!("IDLE timeout reached after 29 minutes");
250                    break;
251                }
252            }
253        }
254    }
255
256    // Send completion response
257    let completion = ImapResponse::ok(&tag, "IDLE terminated");
258    writer.write_all(completion.format().as_bytes()).await?;
259    writer.flush().await?;
260
261    // Return to Selected state
262    session.state = ImapState::Selected { mailbox_id };
263    session.tag = None;
264
265    Ok(())
266}
267
268/// Handle a command with literal data
269///
270/// This function implements RFC 7888 LITERAL+ support:
271/// - Synchronizing literals {size}: Server sends "+ Ready for literal data" continuation
272/// - Non-synchronizing literals {size+}: Server accepts data immediately without continuation
273async fn handle_literal_command<R, W>(
274    command_line: &str,
275    literal_size: usize,
276    literal_type: LiteralType,
277    reader: &mut BufReader<R>,
278    writer: &mut BufWriter<W>,
279) -> anyhow::Result<(String, crate::command::ImapCommand)>
280where
281    R: tokio::io::AsyncRead + Unpin,
282    W: tokio::io::AsyncWrite + Unpin,
283{
284    // For synchronizing literals, send continuation response
285    if literal_type == LiteralType::Synchronizing {
286        let continuation = "+ Ready for literal data\r\n";
287        writer.write_all(continuation.as_bytes()).await?;
288        writer.flush().await?;
289        tracing::debug!(
290            "Sent continuation for synchronizing literal of {} bytes",
291            literal_size
292        );
293    } else {
294        tracing::debug!(
295            "Processing non-synchronizing literal (LITERAL+) of {} bytes",
296            literal_size
297        );
298    }
299
300    // Read literal data
301    let mut literal_data = vec![0u8; literal_size];
302    reader.read_exact(&mut literal_data).await?;
303
304    // Read the trailing CRLF after literal
305    let mut crlf = [0u8; 2];
306    reader.read_exact(&mut crlf).await?;
307
308    // Parse the APPEND command with literal data
309    parse_append_command(command_line, literal_data)
310        .map_err(|e| anyhow::anyhow!("Failed to parse APPEND command: {}", e))
311}