use std::{
io,
sync::{Arc, Mutex},
};
use tokio::{
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
net::{TcpListener, TcpStream},
sync::broadcast,
};
use crate::store::{Store, current_internal_date};
pub(crate) async fn run_http(
listener: TcpListener,
store: Arc<Mutex<Store>>,
mut shutdown_rx: broadcast::Receiver<()>,
) {
loop {
tokio::select! {
accept = listener.accept() => {
let Ok((stream, _)) = accept else { break };
let store = store.clone();
tokio::spawn(async move {
let _ = handle_http(stream, store).await;
});
}
_ = shutdown_rx.recv() => {
break;
}
}
}
}
async fn parse_request(stream: &mut TcpStream) -> io::Result<(String, String, Vec<u8>)> {
let mut reader = BufReader::new(stream);
let mut first_line = String::new();
reader.read_line(&mut first_line).await?;
let parts: Vec<&str> = first_line.split_whitespace().collect();
if parts.len() < 2 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid request line",
));
}
let method = parts[0].to_string();
let path = parts[1].to_string();
let mut content_length = 0usize;
loop {
let mut header = String::new();
reader.read_line(&mut header).await?;
let header = header.trim();
if header.is_empty() {
break;
}
if let Some((key, value)) = header.split_once(':') {
if key.trim().eq_ignore_ascii_case("Content-Length") {
content_length = value.trim().parse().unwrap_or(0);
}
}
}
let mut body = vec![0u8; content_length];
if content_length > 0 {
reader.read_exact(&mut body).await?;
}
Ok((method, path, body))
}
async fn send_response(stream: &mut TcpStream, status: u16, body: &str) -> io::Result<()> {
let status_text = match status {
200 => "OK",
201 => "Created",
400 => "Bad Request",
404 => "Not Found",
405 => "Method Not Allowed",
_ => "Unknown",
};
let response = format!(
"HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
status,
status_text,
body.len(),
body
);
stream.write_all(response.as_bytes()).await
}
async fn handle_http(mut stream: TcpStream, store: Arc<Mutex<Store>>) -> io::Result<()> {
let (method, path, body) = parse_request(&mut stream).await?;
let (path, query) = path.split_once('?').unwrap_or((&path, ""));
let query_params = parse_query_string(query);
match (method.as_str(), path) {
("POST", "/reset") => handle_reset(&mut stream, &store).await,
("POST", "/inject") => handle_inject(&mut stream, &store, &body).await,
("GET", "/users") => handle_list_users(&mut stream, &store).await,
("GET", "/messages") => {
let user = query_params.get("user").cloned().unwrap_or_default();
let mailbox = query_params
.get("mailbox")
.cloned()
.unwrap_or_else(|| "INBOX".to_string());
handle_list_messages(&mut stream, &store, &user, &mailbox).await
}
("DELETE", "/messages") => {
let user = query_params.get("user").cloned().unwrap_or_default();
let mailbox = query_params
.get("mailbox")
.cloned()
.unwrap_or_else(|| "INBOX".to_string());
let uid: u32 = query_params
.get("uid")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
handle_delete_message(&mut stream, &store, &user, &mailbox, uid).await
}
(_, "/reset" | "/inject" | "/users" | "/messages") => {
send_response(&mut stream, 405, r#"{"error": "Method not allowed"}"#).await
}
_ => send_response(&mut stream, 404, r#"{"error": "Not found"}"#).await,
}
}
fn parse_query_string(query: &str) -> std::collections::HashMap<String, String> {
let mut params = std::collections::HashMap::new();
for pair in query.split('&') {
if let Some((key, value)) = pair.split_once('=') {
let value = value.replace("%40", "@").replace("+", " ");
params.insert(key.to_string(), value);
}
}
params
}
async fn handle_reset(stream: &mut TcpStream, store: &Arc<Mutex<Store>>) -> io::Result<()> {
{
let mut guard = store.lock().expect("store lock poisoned");
guard.reset();
}
send_response(stream, 200, r#"{"status": "ok"}"#).await
}
async fn handle_inject(
stream: &mut TcpStream,
store: &Arc<Mutex<Store>>,
body: &[u8],
) -> io::Result<()> {
let body_str = String::from_utf8_lossy(body);
let user = extract_json_string(&body_str, "user").unwrap_or_default();
let mailbox = extract_json_string(&body_str, "mailbox").unwrap_or_else(|| "INBOX".to_string());
let raw = extract_json_string(&body_str, "raw").unwrap_or_default();
if user.is_empty() || raw.is_empty() {
return send_response(stream, 400, r#"{"error": "Missing user or raw message"}"#).await;
}
let uid = {
let mut guard = store.lock().expect("store lock poisoned");
guard.append(&user, &mailbox, raw.into_bytes(), current_internal_date());
let messages = guard.list(&user, &mailbox);
drop(guard);
messages.last().map(|m| m.uid).unwrap_or(0)
};
send_response(stream, 201, &format!(r#"{{"uid": {}}}"#, uid)).await
}
async fn handle_list_users(stream: &mut TcpStream, store: &Arc<Mutex<Store>>) -> io::Result<()> {
let users = {
let guard = store.lock().expect("store lock poisoned");
guard.list_users()
};
let users_json: Vec<String> = users.iter().map(|u| format!("\"{}\"", u)).collect();
let response = format!(r#"{{"users": [{}]}}"#, users_json.join(", "));
send_response(stream, 200, &response).await
}
async fn handle_list_messages(
stream: &mut TcpStream,
store: &Arc<Mutex<Store>>,
user: &str,
mailbox: &str,
) -> io::Result<()> {
if user.is_empty() {
return send_response(stream, 400, r#"{"error": "Missing user parameter"}"#).await;
}
let messages = {
let mut guard = store.lock().expect("store lock poisoned");
guard.list(user, mailbox)
};
let messages_json: Vec<String> = messages
.iter()
.map(|m| {
let subject = crate::imap::fetch::header_value(&m.data, "Subject")
.unwrap_or_default()
.replace('"', "\\\"");
let from = crate::imap::fetch::header_value(&m.data, "From")
.unwrap_or_default()
.replace('"', "\\\"");
format!(
r#"{{"uid": {}, "size": {}, "subject": "{}", "from": "{}"}}"#,
m.uid,
m.data.len(),
subject,
from
)
})
.collect();
let response = format!(r#"{{"messages": [{}]}}"#, messages_json.join(", "));
send_response(stream, 200, &response).await
}
async fn handle_delete_message(
stream: &mut TcpStream,
store: &Arc<Mutex<Store>>,
user: &str,
mailbox: &str,
uid: u32,
) -> io::Result<()> {
if user.is_empty() || uid == 0 {
return send_response(stream, 400, r#"{"error": "Missing user or uid parameter"}"#).await;
}
{
let mut guard = store.lock().expect("store lock poisoned");
guard.delete_by_uid(user, mailbox, uid);
}
send_response(stream, 200, r#"{"status": "ok"}"#).await
}
fn extract_json_string(json: &str, key: &str) -> Option<String> {
let pattern = format!("\"{}\"", key);
let start = json.find(&pattern)?;
let after_key = &json[start + pattern.len()..];
let after_colon = after_key.trim_start().strip_prefix(':')?;
let after_colon = after_colon.trim_start();
let after_quote = after_colon.strip_prefix('"')?;
let mut end = 0;
let mut escaped = false;
for (i, c) in after_quote.chars().enumerate() {
if escaped {
escaped = false;
continue;
}
if c == '\\' {
escaped = true;
continue;
}
if c == '"' {
end = i;
break;
}
}
Some(after_quote[..end].to_string())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_extract_json_string() {
let json = r#"{"user": "alice", "mailbox": "INBOX"}"#;
assert_eq!(extract_json_string(json, "user"), Some("alice".to_string()));
assert_eq!(
extract_json_string(json, "mailbox"),
Some("INBOX".to_string())
);
assert_eq!(extract_json_string(json, "missing"), None);
}
#[test]
fn test_parse_query_string() {
let params = parse_query_string("user=alice&mailbox=INBOX");
assert_eq!(params.get("user"), Some(&"alice".to_string()));
assert_eq!(params.get("mailbox"), Some(&"INBOX".to_string()));
}
}