use super::{SmtpSession, SmtpSessionHandler, SmtpState};
use crate::response::SmtpResponse;
use std::io::Write as IoWrite;
use std::net::SocketAddr;
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
/// Reads the body of an SMTP `DATA` command from `reader` and, when it is accepted,
/// hands it to a background processing task.
///
/// Lines are read as raw bytes until the end-of-data marker (a line holding only `.`).
/// Content is accumulated in memory until the next line would push the buffer past
/// `session.config.data_tempfile_threshold`; from then on it is written to a tempfile
/// created in `session.config.data_spill_dir`.
///
/// The message is rejected with a `storage_exceeded` response when it grows past
/// `session.config.max_message_size`, or when it is more than 10% larger than the
/// transaction's declared size (if one was declared). A failure while finalising the
/// body is answered with a `local_error` response. In every handled case the
/// transaction is reset and the session state is set to `SmtpState::Authenticated`.
///
/// # Errors
///
/// Returns an error when reading from `reader` fails, when the stream ends before the
/// end-of-data marker, when the spill tempfile cannot be created or written, or when a
/// response cannot be written to `writer`.
pub(super) async fn handle_data_input<R, W>(
    session: &mut SmtpSession,
    reader: &mut R,
    writer: &mut W,
    remote_addr: &SocketAddr,
) -> anyhow::Result<()>
where
    R: AsyncBufReadExt + Unpin,
    W: AsyncWriteExt + Unpin,
{
    let threshold = session.config.data_tempfile_threshold;
    let max_size = session.config.max_message_size;
    let spill_dir = session.config.data_spill_dir.clone();

    let mut mem_buf: Vec<u8> = Vec::new();
    let mut temp_file: Option<NamedTempFile> = None;
    let mut total_bytes: usize = 0;
    // Set once the size limit is exceeded; the remaining input is then discarded.
    let mut oversize = false;
    // Raw bytes rather than `String`: message content is not required to be valid UTF-8.
    // NOTE(review): a single line is not length-limited here; a peer that never sends
    // a newline can still grow this buffer. Consider capping the line length.
    let mut line: Vec<u8> = Vec::new();

    loop {
        line.clear();
        let n = reader.read_until(b'\n', &mut line).await?;
        if n == 0 {
            return Err(anyhow::anyhow!("Unexpected EOF during DATA"));
        }
        if is_end_of_data(&line) {
            break;
        }
        if oversize {
            // Keep consuming until the end-of-data marker so that the rest of the
            // message is never interpreted as SMTP commands.
            continue;
        }

        let chunk = dot_unstuff(&line);
        let new_total = total_bytes.saturating_add(chunk.len());
        if new_total > max_size {
            tracing::info!(
                peer = %remote_addr,
                current_size = new_total,
                max_size = max_size,
                "smtp.session DATA rejected incrementally: size limit exceeded"
            );
            rusmes_metrics::global_metrics().inc_smtp_messages_rejected();
            oversize = true;
            // Release what was buffered so far; dropping the tempfile handle lets it
            // clean itself up.
            mem_buf = Vec::new();
            temp_file = None;
            continue;
        }
        total_bytes = new_total;

        if temp_file.is_none() && mem_buf.len() + chunk.len() > threshold {
            // NOTE(review): tempfile writes use blocking std I/O inside an async fn.
            let mut tf = NamedTempFile::new_in(&spill_dir)
                .map_err(|e| anyhow::anyhow!("Failed to create tempfile for DATA spill: {}", e))?;
            tf.write_all(&mem_buf)
                .map_err(|e| anyhow::anyhow!("Failed to flush mem_buf to tempfile: {}", e))?;
            mem_buf = Vec::new();
            temp_file = Some(tf);
        }
        match &mut temp_file {
            None => mem_buf.extend_from_slice(chunk),
            Some(tf) => tf
                .write_all(chunk)
                .map_err(|e| anyhow::anyhow!("Failed to write DATA chunk to tempfile: {}", e))?,
        }
    }

    if oversize {
        // The reply is sent only now, after the whole DATA payload has been consumed.
        SmtpSessionHandler::write_response_to(
            writer,
            SmtpResponse::storage_exceeded(format!(
                "Message size exceeds maximum {}",
                max_size
            )),
            remote_addr,
        )
        .await?;
        session.transaction.reset();
        session.state = SmtpState::Authenticated;
        return Ok(());
    }

    let message_size = total_bytes;
    if let Some(declared_size) = session.transaction.declared_size {
        // Allow 10% slack over the declared size; saturate so that a huge declared
        // value cannot overflow the addition.
        let max_allowed = declared_size.saturating_add(declared_size / 10);
        if message_size > max_allowed {
            tracing::info!(
                peer = %remote_addr,
                message_size = message_size,
                declared_size = declared_size,
                "smtp.session DATA rejected: size exceeds declared value"
            );
            rusmes_metrics::global_metrics().inc_smtp_messages_rejected();
            SmtpSessionHandler::write_response_to(
                writer,
                SmtpResponse::storage_exceeded(format!(
                    "Message size {} exceeds declared size {}",
                    message_size, declared_size
                )),
                remote_addr,
            )
            .await?;
            session.transaction.reset();
            session.state = SmtpState::Authenticated;
            return Ok(());
        }
    }

    let (message_body, cleanup_path) =
        match build_message_body(mem_buf, temp_file, message_size).await {
            Ok(pair) => pair,
            Err(e) => {
                tracing::error!(peer = %remote_addr, "Failed to finalise DATA body: {}", e);
                SmtpSessionHandler::write_response_to(
                    writer,
                    SmtpResponse::local_error("Internal error finalising message body"),
                    remote_addr,
                )
                .await?;
                session.transaction.reset();
                session.state = SmtpState::Authenticated;
                return Ok(());
            }
        };

    rusmes_metrics::global_metrics().inc_smtp_messages_received();
    session.transaction.message_size = message_size;
    let sender_display = session
        .transaction
        .sender
        .as_ref()
        .map(|a| a.to_string())
        .unwrap_or_else(|| "<unknown>".to_string());
    tracing::info!(
        peer = %remote_addr,
        mail_from = %sender_display,
        message_size = message_size,
        recipient_count = session.transaction.recipients.len(),
        "smtp.session DATA accepted: message queued"
    );

    let sent = SmtpSessionHandler::write_response_to(
        writer,
        SmtpResponse::ok("Message accepted for delivery"),
        remote_addr,
    )
    .await;
    if let Err(e) = sent {
        // The processing task is never spawned on this path, so nothing else would
        // delete the kept spill file.
        if let Some(path) = cleanup_path.as_deref() {
            if let Err(rm_err) = tokio::fs::remove_file(path).await {
                tracing::warn!(
                    path = %path.display(),
                    error = %rm_err,
                    "smtp.data Failed to remove DATA spill tempfile after response write failure"
                );
            }
        }
        return Err(e.into());
    }

    let sender = session.transaction.sender.clone();
    let recipients = session.transaction.recipients.clone();
    let router = session.processor_router.clone();
    tracing::info!(
        peer = %remote_addr,
        recipient_count = recipients.len(),
        "smtp.session spawning async message processing task"
    );
    // Detached task: processing errors are logged, not reported to the client.
    tokio::spawn(async move {
        if let Err(e) =
            process_accepted_message(sender, recipients, message_body, router, cleanup_path).await
        {
            tracing::error!("Failed to process message: {}", e);
        }
    });

    session.transaction.reset();
    session.state = SmtpState::Authenticated;
    Ok(())
}

/// Returns `true` when `line` is the end-of-data marker: a lone `.` followed by CRLF,
/// by a bare LF, or by nothing at all (a final line without a newline).
fn is_end_of_data(line: &[u8]) -> bool {
    matches!(line, b".\r\n" | b".\n" | b".")
}

/// Undoes SMTP dot-stuffing by dropping one leading `.` from `line`, if present.
///
/// Must only be called on lines that are not the end-of-data marker.
fn dot_unstuff(line: &[u8]) -> &[u8] {
    match line.split_first() {
        Some((&b'.', rest)) => rest,
        _ => line,
    }
}
async fn build_message_body(
mem_buf: Vec<u8>,
temp_file: Option<NamedTempFile>,
_message_size: usize,
) -> anyhow::Result<(rusmes_proto::MessageBody, Option<std::path::PathBuf>)> {
use bytes::Bytes;
use rusmes_proto::message::{LargeBody, MessageBody};
match temp_file {
None => Ok((MessageBody::Small(Bytes::from(mem_buf)), None)),
Some(mut tf) => {
tf.flush()
.map_err(|e| anyhow::anyhow!("Failed to flush tempfile: {}", e))?;
let temp_path = tf.into_temp_path();
let kept_path = temp_path
.keep()
.map_err(|e| anyhow::anyhow!("Failed to keep tempfile: {}", e))?;
let large = LargeBody::from_path(&kept_path)
.await
.map_err(|e| anyhow::anyhow!("Failed to open kept tempfile as LargeBody: {}", e))?;
Ok((MessageBody::Large(large), Some(kept_path)))
}
}
}
/// Builds a `Mail` from an accepted message and routes it through `router`.
///
/// The body is loaded fully into memory (a `Large` body is read via `read_to_bytes`),
/// the spill file at `cleanup_path` — if any — is removed, the headers are parsed, and
/// the resulting mail is passed to `router.route`.
///
/// # Errors
///
/// Returns an error when a large body cannot be read, when header parsing fails, or
/// when routing fails. The spill file is removed (best effort) even when reading the
/// body fails.
async fn process_accepted_message(
    sender: Option<rusmes_proto::MailAddress>,
    recipients: Vec<rusmes_proto::MailAddress>,
    message_body: rusmes_proto::MessageBody,
    router: Arc<rusmes_core::MailProcessorRouter>,
    cleanup_path: Option<std::path::PathBuf>,
) -> anyhow::Result<()> {
    use bytes::Bytes;
    use rusmes_proto::{HeaderMap, Mail, MessageBody, MimeMessage};

    tracing::info!(
        "Starting message processing for {} recipients",
        recipients.len()
    );

    // Defer the `?` until after cleanup so a failed read does not leak the spill file.
    let read_result: anyhow::Result<Bytes> = match message_body {
        MessageBody::Small(b) => Ok(b),
        MessageBody::Large(ref large) => large
            .read_to_bytes()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to read large body for header parsing: {}", e)),
    };

    if let Some(ref path) = cleanup_path {
        if let Err(e) = tokio::fs::remove_file(path).await {
            tracing::warn!(
                path = %path.display(),
                error = %e,
                "smtp.data Failed to remove DATA spill tempfile after read"
            );
        } else {
            tracing::debug!(
                path = %path.display(),
                "smtp.data DATA spill tempfile removed after read"
            );
        }
    }

    let raw_bytes = read_result?;

    let (headers, body_offset) = rusmes_proto::mime::parse_headers(&raw_bytes)?;
    let mut header_map = HeaderMap::new();
    for (name, value) in headers {
        header_map.insert(name, value);
    }

    // An offset at or past the end means there is no body after the headers.
    let body_bytes = if body_offset < raw_bytes.len() {
        raw_bytes.slice(body_offset..)
    } else {
        Bytes::new()
    };

    let message = MimeMessage::new(header_map, MessageBody::Small(body_bytes));
    let mail = Mail::new(sender, recipients, message, None, None);
    tracing::info!("Processing mail {} through pipeline", mail.id());
    router.route(mail).await?;
    tracing::info!("Mail processing completed");
    Ok(())
}