1use crate::command::SmtpCommand;
4use crate::parser::parse_command;
5use crate::response::SmtpResponse;
6use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
7use rusmes_auth::AuthBackend;
8use rusmes_core::{MailProcessorRouter, RateLimiter};
9use rusmes_proto::{MailAddress, Username};
10use rusmes_storage::StorageBackend;
11use std::collections::HashMap;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::RwLock;
18
/// Protocol phase of one SMTP session.
///
/// Every variant is a plain unit value, so the type is `Copy` and `Eq` in
/// addition to `Clone`/`PartialEq`; states can be compared and passed around
/// without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SmtpState {
    /// Declared but not assigned anywhere in the code visible in this file.
    Initial,
    /// Greeting sent; the client has not yet issued HELO/EHLO.
    Connected,
    /// HELO/EHLO accepted; a mail transaction may be started.
    ///
    /// Despite the name, this state is entered by the HELO/EHLO handlers and
    /// after each finished or reset transaction, not by a successful AUTH.
    Authenticated,
    /// MAIL FROM accepted; recipients are being collected.
    MailTransaction,
    /// DATA accepted; the message body is being read.
    Data,
    /// QUIT received; the connection is about to close.
    Quit,
}
35
36#[derive(Debug, Clone)]
38pub struct SmtpTransaction {
39 sender: Option<MailAddress>,
40 recipients: Vec<MailAddress>,
41 helo_name: Option<String>,
42 message_size: usize,
43 declared_size: Option<usize>,
45 body_type: Option<String>,
47 smtputf8: bool,
49 bdat_state: Option<crate::BdatState>,
51 message_data: Vec<u8>,
53}
54
55impl SmtpTransaction {
56 fn new() -> Self {
57 Self {
58 sender: None,
59 recipients: Vec::new(),
60 helo_name: None,
61 message_size: 0,
62 declared_size: None,
63 body_type: None,
64 smtputf8: false,
65 bdat_state: None,
66 message_data: Vec::new(),
67 }
68 }
69
70 fn reset(&mut self) {
71 self.sender = None;
72 self.recipients.clear();
73 self.message_size = 0;
74 self.declared_size = None;
75 self.body_type = None;
76 self.smtputf8 = false;
77 self.bdat_state = None;
78 self.message_data.clear();
79 }
80
81 fn is_valid(&self) -> bool {
82 self.sender.is_some() && !self.recipients.is_empty()
83 }
84}
85
/// Static configuration shared by the SMTP sessions of one listener.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SmtpConfig {
    /// Host name used in the greeting and in HELO/EHLO replies.
    pub hostname: String,
    /// Largest accepted message in bytes; advertised through `SIZE` in EHLO.
    pub max_message_size: usize,
    /// When set, MAIL FROM is refused until the client has authenticated and
    /// the `AUTH` extension is advertised.
    pub require_auth: bool,
    /// When set, `STARTTLS` is advertised in EHLO.
    pub enable_starttls: bool,
    /// When set, RCPT TO may look the recipient up in storage.
    pub check_recipient_exists: bool,
    /// When set, recipients that fail the lookup are rejected with 550;
    /// otherwise they are accepted and only logged.
    pub reject_unknown_recipients: bool,
    /// Networks whose clients may relay without authenticating.
    /// The default entry (`127.0.0.0/8`) suggests CIDR notation; the exact
    /// syntax is defined by `crate::is_ip_in_networks`.
    pub relay_networks: Vec<String>,
    /// Domains treated as local; matched case-insensitively (ASCII).
    pub local_domains: Vec<String>,
    /// Upper bound on the total lifetime of one connection.
    pub connection_timeout: Duration,
    /// Longest time the server waits for the next command line.
    pub idle_timeout: Duration,
}

impl SmtpConfig {
    /// Default message size limit: 10 MiB.
    pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
    /// Default total connection lifetime: one hour.
    pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3600);
    /// Default idle timeout between commands: five minutes.
    pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
}

impl Default for SmtpConfig {
    /// Loopback-oriented defaults: host `localhost`, no auth, no STARTTLS,
    /// recipient checks on, relaying allowed from `127.0.0.0/8` only.
    fn default() -> Self {
        Self {
            hostname: "localhost".to_string(),
            max_message_size: Self::DEFAULT_MAX_MESSAGE_SIZE,
            require_auth: false,
            enable_starttls: false,
            check_recipient_exists: true,
            reject_unknown_recipients: true,
            relay_networks: vec!["127.0.0.0/8".to_string()],
            local_domains: vec!["localhost".to_string()],
            connection_timeout: Self::DEFAULT_CONNECTION_TIMEOUT,
            idle_timeout: Self::DEFAULT_IDLE_TIMEOUT,
        }
    }
}
121
/// One cached answer of the recipient-existence lookup.
///
/// Both fields are `Copy`, so the entry is `Copy` as well.
#[derive(Debug, Clone, Copy)]
struct RecipientCacheEntry {
    /// Result of the lookup at the time it was cached.
    exists: bool,
    /// When the result was stored; used to expire the entry.
    cached_at: Instant,
}
128
/// State kept between the two client messages of a SCRAM-SHA-256 exchange.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ScramState {
    /// The `client-first-message-bare` part as parsed from the client's first message.
    client_first_bare: String,
    /// The `server-first-message` that was sent back (before base64 encoding).
    server_first: String,
    /// Client nonce followed by the server nonce; the client must echo it.
    nonce: String,
    /// User name taken from the client's first message.
    username: String,
}
137
138pub struct SmtpSession {
140 remote_addr: SocketAddr,
141 state: SmtpState,
142 transaction: SmtpTransaction,
143 config: SmtpConfig,
144 authenticated: bool,
145 #[allow(dead_code)]
146 username: Option<String>,
147 #[allow(dead_code)]
148 relaying_allowed: bool,
149 #[allow(dead_code)]
150 processor_router: Arc<MailProcessorRouter>,
151 auth_backend: Arc<dyn AuthBackend>,
152 rate_limiter: Arc<RateLimiter>,
153 storage_backend: Arc<dyn StorageBackend>,
154 recipient_cache: Arc<RwLock<HashMap<String, RecipientCacheEntry>>>,
155 cram_md5_challenge: Option<String>,
157 scram_state: Option<ScramState>,
159}
160
161pub struct SmtpSessionHandler {
163 session: SmtpSession,
164 stream: TcpStream,
165 #[allow(dead_code)]
167 pipelined_commands: Vec<String>,
168 #[allow(dead_code)]
170 connection_started: Instant,
171 #[allow(dead_code)]
173 last_command: Instant,
174}
175
176impl SmtpSessionHandler {
177 #[allow(clippy::too_many_arguments)]
179 pub fn new(
180 stream: TcpStream,
181 remote_addr: SocketAddr,
182 config: SmtpConfig,
183 processor_router: Arc<MailProcessorRouter>,
184 auth_backend: Arc<dyn AuthBackend>,
185 rate_limiter: Arc<RateLimiter>,
186 storage_backend: Arc<dyn StorageBackend>,
187 ) -> Self {
188 let now = Instant::now();
189 Self {
190 session: SmtpSession {
191 remote_addr,
192 state: SmtpState::Connected,
193 transaction: SmtpTransaction::new(),
194 config,
195 authenticated: false,
196 username: None,
197 relaying_allowed: false,
198 processor_router,
199 auth_backend,
200 rate_limiter,
201 storage_backend,
202 recipient_cache: Arc::new(RwLock::new(HashMap::new())),
203 cram_md5_challenge: None,
204 scram_state: None,
205 },
206 stream,
207 pipelined_commands: Vec::new(),
208 connection_started: now,
209 last_command: now,
210 }
211 }
212
213 pub async fn handle(mut self) -> anyhow::Result<()> {
215 let (read_half, write_half) = tokio::io::split(self.stream);
216 let mut reader = BufReader::new(read_half);
217 let mut writer = BufWriter::new(write_half);
218
219 Self::write_response_to(
221 &mut writer,
222 SmtpResponse::service_ready(&self.session.config.hostname),
223 &self.session.remote_addr,
224 )
225 .await?;
226
227 let mut line = String::new();
228
229 loop {
230 if self.connection_started.elapsed() > self.session.config.connection_timeout {
232 tracing::info!(
233 "Connection timeout exceeded for {}",
234 self.session.remote_addr
235 );
236 Self::write_response_to(
237 &mut writer,
238 SmtpResponse::new(421, "4.4.2 Connection timeout - closing connection"),
239 &self.session.remote_addr,
240 )
241 .await?;
242 break;
243 }
244
245 line.clear();
246
247 let n = match tokio::time::timeout(
249 self.session.config.idle_timeout,
250 reader.read_line(&mut line),
251 )
252 .await
253 {
254 Ok(Ok(n)) => n,
255 Ok(Err(e)) => {
256 tracing::error!("Read error from {}: {}", self.session.remote_addr, e);
257 break;
258 }
259 Err(_) => {
260 tracing::info!("Idle timeout for {}", self.session.remote_addr);
262 Self::write_response_to(
263 &mut writer,
264 SmtpResponse::new(421, "4.4.2 Idle timeout - closing connection"),
265 &self.session.remote_addr,
266 )
267 .await?;
268 break;
269 }
270 };
271
272 if n == 0 {
273 break; }
275
276 self.last_command = Instant::now();
278
279 let line_trimmed = line.trim();
280 tracing::debug!(
281 "SMTP command from {}: {}",
282 self.session.remote_addr,
283 line_trimmed
284 );
285
286 if self.session.cram_md5_challenge.is_some() {
288 let response = match self.session.handle_cram_md5_response(line_trimmed).await {
289 Ok(resp) => resp,
290 Err(e) => {
291 tracing::error!("Error handling CRAM-MD5 response: {}", e);
292 SmtpResponse::new(535, "5.7.8 Authentication credentials invalid")
293 }
294 };
295 Self::write_response_to(&mut writer, response, &self.session.remote_addr).await?;
296 continue;
297 }
298
299 if self.session.scram_state.is_some() {
301 let response = match self.session.handle_scram_client_final(line_trimmed).await {
302 Ok(resp) => resp,
303 Err(e) => {
304 tracing::error!("Error handling SCRAM-SHA-256 client-final: {}", e);
305 self.session.scram_state = None;
306 SmtpResponse::new(535, "5.7.8 Authentication credentials invalid")
307 }
308 };
309 Self::write_response_to(&mut writer, response, &self.session.remote_addr).await?;
310 continue;
311 }
312
313 let command = match parse_command(line_trimmed) {
315 Ok(cmd) => cmd,
316 Err(e) => {
317 tracing::warn!("Failed to parse command: {}", e);
318 Self::write_response_to(
319 &mut writer,
320 SmtpResponse::syntax_error("Command not recognized"),
321 &self.session.remote_addr,
322 )
323 .await?;
324 continue;
325 }
326 };
327
328 let response = match self.session.handle_command(command.clone()).await {
330 Ok(resp) => resp,
331 Err(e) => {
332 tracing::error!("Error handling command: {}", e);
333 SmtpResponse::local_error("Internal server error")
334 }
335 };
336
337 Self::write_response_to(&mut writer, response, &self.session.remote_addr).await?;
338
339 if self.session.state == SmtpState::Quit {
341 break;
342 }
343
344 if matches!(command, SmtpCommand::Data) && self.session.state == SmtpState::Data {
348 let remote_addr = self.session.remote_addr;
350 if let Err(e) = Self::handle_data_input(
351 &mut self.session,
352 &mut reader,
353 &mut writer,
354 &remote_addr,
355 )
356 .await
357 {
358 tracing::error!("Error reading message data: {}", e);
359 Self::write_response_to(
360 &mut writer,
361 SmtpResponse::local_error("Error reading message data"),
362 &remote_addr,
363 )
364 .await?;
365 }
366 }
367 }
368
369 Ok(())
370 }
371
372 async fn handle_data_input<R, W>(
374 session: &mut SmtpSession,
375 reader: &mut R,
376 writer: &mut W,
377 remote_addr: &SocketAddr,
378 ) -> anyhow::Result<()>
379 where
380 R: AsyncBufReadExt + Unpin,
381 W: AsyncWriteExt + Unpin,
382 {
383 let mut message_data = Vec::new();
384 let mut line = String::new();
385
386 loop {
387 line.clear();
388 let n = reader.read_line(&mut line).await?;
389 if n == 0 {
390 return Err(anyhow::anyhow!("Unexpected EOF during DATA"));
391 }
392
393 if line.trim() == "." {
395 break;
396 }
397
398 let line_to_add = if line.starts_with("..") {
400 &line[1..]
401 } else {
402 &line
403 };
404
405 message_data.extend_from_slice(line_to_add.as_bytes());
406 }
407
408 let message_size = message_data.len();
410 if message_size > session.config.max_message_size {
411 Self::write_response_to(
412 writer,
413 SmtpResponse::storage_exceeded(format!(
414 "Message size {} exceeds maximum {}",
415 message_size, session.config.max_message_size
416 )),
417 remote_addr,
418 )
419 .await?;
420 session.transaction.reset();
421 session.state = SmtpState::Authenticated;
422 return Ok(());
423 }
424
425 if let Some(declared_size) = session.transaction.declared_size {
427 let max_allowed = declared_size + (declared_size / 10);
429 if message_size > max_allowed {
430 Self::write_response_to(
431 writer,
432 SmtpResponse::storage_exceeded(format!(
433 "Message size {} exceeds declared size {}",
434 message_size, declared_size
435 )),
436 remote_addr,
437 )
438 .await?;
439 session.transaction.reset();
440 session.state = SmtpState::Authenticated;
441 return Ok(());
442 }
443 }
444
445 session.transaction.message_size = message_size;
447 session.transaction.message_data = message_data;
448
449 let sender_display = session
450 .transaction
451 .sender
452 .as_ref()
453 .map(|a| a.to_string())
454 .unwrap_or_else(|| "<unknown>".to_string());
455 tracing::info!(
456 "Accepted message from {} ({} bytes) with {} recipient(s)",
457 sender_display,
458 message_size,
459 session.transaction.recipients.len()
460 );
461
462 Self::write_response_to(
464 writer,
465 SmtpResponse::ok("Message accepted for delivery"),
466 remote_addr,
467 )
468 .await?;
469
470 tracing::info!("About to spawn async message processing task");
472 let sender = session.transaction.sender.clone();
473 let recipients = session.transaction.recipients.clone();
474 let message_data = session.transaction.message_data.clone();
475 let router = session.processor_router.clone();
476
477 tracing::info!(
478 "Spawning task for {} recipients, {} bytes",
479 recipients.len(),
480 message_data.len()
481 );
482
483 tokio::spawn(async move {
484 tracing::info!("Inside spawned task - starting processing");
485 if let Err(e) = SmtpSessionHandler::process_accepted_message(
486 sender,
487 recipients,
488 message_data,
489 router,
490 )
491 .await
492 {
493 tracing::error!("Failed to process message: {}", e);
494 }
495 });
496
497 session.transaction.reset();
499 session.state = SmtpState::Authenticated;
500
501 Ok(())
502 }
503
504 async fn write_response_to<W: AsyncWriteExt + Unpin>(
506 writer: &mut W,
507 response: SmtpResponse,
508 remote_addr: &SocketAddr,
509 ) -> anyhow::Result<()> {
510 let formatted = response.format();
511 tracing::debug!("SMTP response to {}: {}", remote_addr, formatted.trim());
512 writer.write_all(formatted.as_bytes()).await?;
513 writer.flush().await?;
514 Ok(())
515 }
516
517 async fn process_accepted_message(
519 sender: Option<rusmes_proto::MailAddress>,
520 recipients: Vec<rusmes_proto::MailAddress>,
521 message_data: Vec<u8>,
522 router: Arc<rusmes_core::MailProcessorRouter>,
523 ) -> anyhow::Result<()> {
524 use bytes::Bytes;
525 use rusmes_proto::{HeaderMap, Mail, MessageBody, MimeMessage};
526
527 tracing::info!(
528 "Starting message processing for {} recipients",
529 recipients.len()
530 );
531
532 let (headers, body_offset) = rusmes_proto::mime::parse_headers(&message_data)?;
534
535 let mut header_map = HeaderMap::new();
536 for (name, value) in headers {
537 header_map.insert(name, value);
538 }
539
540 let body = if body_offset < message_data.len() {
541 Bytes::from(message_data[body_offset..].to_vec())
542 } else {
543 Bytes::new()
544 };
545
546 let message = MimeMessage::new(header_map, MessageBody::Small(body));
547 let mail = Mail::new(sender, recipients, message, None, None);
548
549 tracing::info!("Processing mail {} through pipeline", mail.id());
550
551 router.route(mail).await?;
553
554 tracing::info!("Mail processing completed");
555 Ok(())
556 }
557}
558
559impl SmtpSession {
560 async fn handle_command(&mut self, command: SmtpCommand) -> anyhow::Result<SmtpResponse> {
562 match command {
563 SmtpCommand::Helo(domain) => self.handle_helo(domain).await,
564 SmtpCommand::Ehlo(domain) => self.handle_ehlo(domain).await,
565 SmtpCommand::Mail { from, params } => self.handle_mail(from, params).await,
566 SmtpCommand::Rcpt { to, params } => self.handle_rcpt(to, params).await,
567 SmtpCommand::Data => self.handle_data().await,
568 SmtpCommand::Bdat { chunk_size, last } => self.handle_bdat(chunk_size, last).await,
569 SmtpCommand::Rset => self.handle_rset().await,
570 SmtpCommand::Noop => Ok(SmtpResponse::ok_simple()),
571 SmtpCommand::Quit => self.handle_quit().await,
572 SmtpCommand::StartTls => self.handle_starttls().await,
573 SmtpCommand::Auth {
574 mechanism,
575 initial_response,
576 } => self.handle_auth(mechanism, initial_response).await,
577 _ => Ok(SmtpResponse::not_implemented("Command not implemented")),
578 }
579 }
580
581 async fn handle_helo(&mut self, domain: String) -> anyhow::Result<SmtpResponse> {
583 if self.state != SmtpState::Connected && self.state != SmtpState::Authenticated {
584 return Ok(SmtpResponse::bad_sequence("Out of sequence"));
585 }
586
587 self.transaction.helo_name = Some(domain);
588 self.state = SmtpState::Authenticated;
589
590 Ok(SmtpResponse::ok(format!(
591 "{} Hello {}",
592 self.config.hostname,
593 self.remote_addr.ip()
594 )))
595 }
596
597 async fn handle_ehlo(&mut self, domain: String) -> anyhow::Result<SmtpResponse> {
599 if self.state != SmtpState::Connected && self.state != SmtpState::Authenticated {
600 return Ok(SmtpResponse::bad_sequence("Out of sequence"));
601 }
602
603 self.transaction.helo_name = Some(domain);
604 self.state = SmtpState::Authenticated;
605
606 let mut extensions = vec![
607 format!("SIZE {}", self.config.max_message_size),
608 "8BITMIME".to_string(),
609 "SMTPUTF8".to_string(),
610 "PIPELINING".to_string(),
611 "CHUNKING".to_string(), ];
613
614 if self.config.enable_starttls {
615 extensions.push("STARTTLS".to_string());
616 }
617
618 if self.config.require_auth {
619 extensions.push("AUTH PLAIN LOGIN CRAM-MD5 SCRAM-SHA-256".to_string());
620 }
621
622 Ok(SmtpResponse::ehlo(&self.config.hostname, extensions))
623 }
624
625 async fn handle_mail(
627 &mut self,
628 from: MailAddress,
629 params: Vec<crate::command::MailParam>,
630 ) -> anyhow::Result<SmtpResponse> {
631 if self.state != SmtpState::Authenticated {
632 return Ok(SmtpResponse::bad_sequence("Must send HELO/EHLO first"));
633 }
634
635 if self.config.require_auth && !self.authenticated {
636 return Ok(SmtpResponse::bad_sequence("Authentication required"));
637 }
638
639 let ip = self.remote_addr.ip();
641 if !self.rate_limiter.allow_message(ip).await {
642 tracing::warn!("Message rate limit exceeded for {} from {}", from, ip);
643 return Ok(SmtpResponse::mailbox_unavailable(
644 "Rate limit exceeded, please try again later",
645 ));
646 }
647
648 for param in params {
650 match param.keyword.to_uppercase().as_str() {
651 "SIZE" => {
652 if let Some(size_str) = param.value {
654 match size_str.parse::<usize>() {
655 Ok(size) => {
656 if size > self.config.max_message_size {
657 return Ok(SmtpResponse::storage_exceeded(format!(
658 "Message size {} exceeds maximum {}",
659 size, self.config.max_message_size
660 )));
661 }
662 self.transaction.declared_size = Some(size);
663 }
664 Err(_) => {
665 return Ok(SmtpResponse::parameter_error("Invalid SIZE parameter"));
666 }
667 }
668 } else {
669 return Ok(SmtpResponse::parameter_error(
670 "SIZE parameter requires a value",
671 ));
672 }
673 }
674 "BODY" => {
675 if let Some(body_value) = param.value {
677 let body_upper = body_value.to_uppercase();
678 match body_upper.as_str() {
679 "7BIT" | "8BITMIME" => {
680 self.transaction.body_type = Some(body_upper);
681 }
682 _ => {
683 return Ok(SmtpResponse::parameter_not_implemented(format!(
684 "Unsupported BODY type: {}",
685 body_value
686 )));
687 }
688 }
689 } else {
690 return Ok(SmtpResponse::parameter_error(
691 "BODY parameter requires a value",
692 ));
693 }
694 }
695 "SMTPUTF8" => {
696 if param.value.is_none() {
699 self.transaction.smtputf8 = true;
700 } else {
701 return Ok(SmtpResponse::parameter_error(
702 "SMTPUTF8 parameter must not have a value",
703 ));
704 }
705 }
706 _ => {
707 tracing::debug!("Unknown MAIL parameter: {}", param.keyword);
709 }
710 }
711 }
712
713 self.transaction.sender = Some(from.clone());
714 self.state = SmtpState::MailTransaction;
715
716 Ok(SmtpResponse::ok(format!("Sender {} OK", from)))
717 }
718
719 async fn handle_rcpt(
721 &mut self,
722 to: MailAddress,
723 params: Vec<crate::command::MailParam>,
724 ) -> anyhow::Result<SmtpResponse> {
725 if self.state != SmtpState::MailTransaction {
726 return Ok(SmtpResponse::bad_sequence("Must send MAIL FROM first"));
727 }
728
729 for param in params {
731 tracing::debug!("Unknown RCPT parameter: {}", param.keyword);
733 }
734
735 if !self.is_relay_allowed(&to) {
737 return Ok(SmtpResponse::new(550, "5.7.1 Relaying denied"));
738 }
739
740 if self.config.check_recipient_exists {
742 if !self.authenticated && !self.relaying_allowed {
744 match self.validate_recipient(&to).await {
745 Ok(true) => {
746 }
748 Ok(false) => {
749 if self.config.reject_unknown_recipients {
750 tracing::warn!("Rejecting unknown recipient: {}", to);
751 return Ok(SmtpResponse::new(
752 550,
753 format!("5.1.1 User unknown: {}", to),
754 ));
755 } else {
756 tracing::info!(
758 "Accepting unknown recipient (rejection disabled): {}",
759 to
760 );
761 }
762 }
763 Err(e) => {
764 tracing::error!("Error validating recipient {}: {}", to, e);
765 tracing::warn!("Accepting recipient {} due to validation error", to);
767 }
768 }
769 }
770 }
771
772 self.transaction.recipients.push(to.clone());
774
775 Ok(SmtpResponse::ok(format!("Recipient {} OK", to)))
776 }
777
778 async fn handle_data(&mut self) -> anyhow::Result<SmtpResponse> {
780 if self.state != SmtpState::MailTransaction {
781 return Ok(SmtpResponse::bad_sequence("Must send RCPT TO first"));
782 }
783
784 if !self.transaction.is_valid() {
785 return Ok(SmtpResponse::bad_sequence("Need at least one recipient"));
786 }
787
788 self.state = SmtpState::Data;
789 Ok(SmtpResponse::start_data())
790 }
791
792 async fn handle_bdat(&mut self, chunk_size: usize, last: bool) -> anyhow::Result<SmtpResponse> {
797 if self.state != SmtpState::MailTransaction {
799 return Ok(SmtpResponse::bad_sequence(
800 "Must send MAIL FROM and RCPT TO first",
801 ));
802 }
803
804 if !self.transaction.is_valid() {
805 return Ok(SmtpResponse::bad_sequence(
806 "Need sender and at least one recipient",
807 ));
808 }
809
810 if self.transaction.bdat_state.is_none() {
812 self.transaction.bdat_state = Some(crate::BdatState::new(self.config.max_message_size));
813 }
814
815 let bdat_state = match self.transaction.bdat_state.as_ref() {
821 Some(state) => state,
822 None => {
823 return Err(anyhow::anyhow!(
824 "Internal error: bdat_state not initialized"
825 ))
826 }
827 };
828 if bdat_state.total_size() + chunk_size > self.config.max_message_size {
829 return Ok(SmtpResponse::storage_exceeded(format!(
830 "Message size {} exceeds maximum {}",
831 bdat_state.total_size() + chunk_size,
832 self.config.max_message_size
833 )));
834 }
835
836 if last {
838 let sender_display = self
839 .transaction
840 .sender
841 .as_ref()
842 .map(|a| a.to_string())
843 .unwrap_or_else(|| "<unknown>".to_string());
844 tracing::info!(
845 "BDAT LAST chunk ({} bytes) from {} with {} recipient(s)",
846 chunk_size,
847 sender_display,
848 self.transaction.recipients.len()
849 );
850 }
851
852 Ok(SmtpResponse::ok(format!("{} octets received", chunk_size)))
854 }
855
856 async fn handle_rset(&mut self) -> anyhow::Result<SmtpResponse> {
858 self.transaction.reset();
859 self.state = SmtpState::Authenticated;
860 Ok(SmtpResponse::ok_simple())
861 }
862
863 async fn handle_quit(&mut self) -> anyhow::Result<SmtpResponse> {
865 self.state = SmtpState::Quit;
866 Ok(SmtpResponse::closing())
867 }
868
869 async fn handle_starttls(&mut self) -> anyhow::Result<SmtpResponse> {
871 if !self.config.enable_starttls {
872 return Ok(SmtpResponse::not_implemented("STARTTLS not available"));
873 }
874
875 Ok(SmtpResponse::new(220, "Ready to start TLS"))
877 }
878
879 async fn handle_auth(
881 &mut self,
882 mechanism: String,
883 initial_response: Option<String>,
884 ) -> anyhow::Result<SmtpResponse> {
885 if !self.config.require_auth {
886 return Ok(SmtpResponse::not_implemented("AUTH not available"));
887 }
888
889 match mechanism.to_uppercase().as_str() {
890 "CRAM-MD5" => self.handle_auth_cram_md5().await,
891 "SCRAM-SHA-256" => self.handle_auth_scram_sha256(initial_response).await,
892 "PLAIN" => {
893 if let Some(response) = initial_response {
894 self.handle_auth_plain(response).await
895 } else {
896 Ok(SmtpResponse::new(334, ""))
898 }
899 }
900 "LOGIN" => {
901 Ok(SmtpResponse::new(334, "VXNlcm5hbWU6"))
904 }
905 _ => Ok(SmtpResponse::parameter_not_implemented(
906 "Authentication mechanism not supported",
907 )),
908 }
909 }
910
911 async fn handle_auth_plain(&mut self, response: String) -> anyhow::Result<SmtpResponse> {
913 let (username, password) = match crate::auth::parse_plain_auth(&response) {
915 Ok(creds) => creds,
916 Err(e) => {
917 tracing::warn!("Failed to parse PLAIN auth: {}", e);
918 return Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"));
919 }
920 };
921
922 let username_obj = match rusmes_proto::Username::new(username.clone()) {
924 Ok(u) => u,
925 Err(e) => {
926 tracing::warn!("Invalid username '{}': {}", username, e);
927 return Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"));
928 }
929 };
930
931 match self
933 .auth_backend
934 .authenticate(&username_obj, &password)
935 .await
936 {
937 Ok(true) => {
938 self.authenticated = true;
939 self.username = Some(username.clone());
940 tracing::info!("User '{}' authenticated successfully (PLAIN)", username);
941 Ok(SmtpResponse::new(235, "2.7.0 Authentication successful"))
942 }
943 Ok(false) => {
944 tracing::warn!("Authentication failed for user '{}'", username);
945 Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"))
946 }
947 Err(e) => {
948 tracing::error!("Authentication error for user '{}': {}", username, e);
949 Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"))
950 }
951 }
952 }
953
954 async fn handle_auth_cram_md5(&mut self) -> anyhow::Result<SmtpResponse> {
956 let challenge = crate::auth::generate_cram_md5_challenge(&self.config.hostname)?;
958
959 self.cram_md5_challenge = Some(challenge.clone());
961
962 let encoded = crate::auth::encode_challenge(&challenge);
964 Ok(SmtpResponse::new(334, encoded))
965 }
966
967 async fn handle_cram_md5_response(
969 &mut self,
970 response_line: &str,
971 ) -> anyhow::Result<SmtpResponse> {
972 let challenge = self
974 .cram_md5_challenge
975 .take()
976 .ok_or_else(|| anyhow::anyhow!("No CRAM-MD5 challenge pending"))?;
977
978 if response_line == "*" {
980 tracing::info!("CRAM-MD5 authentication aborted by client");
981 return Ok(SmtpResponse::new(501, "5.7.0 Authentication aborted"));
982 }
983
984 let decoded = crate::auth::decode_response(response_line)?;
986
987 let (username, client_hmac) = crate::auth::parse_cram_md5_response(&decoded)?;
989
990 tracing::warn!(
1001 "CRAM-MD5 authentication attempted for user '{}', but cannot verify HMAC with bcrypt-hashed passwords",
1002 username
1003 );
1004
1005 tracing::info!(
1011 "CRAM-MD5 authentication for user '{}' from {} - challenge: {}, client_hmac: {}",
1012 username,
1013 self.remote_addr,
1014 challenge,
1015 client_hmac
1016 );
1017
1018 let username_obj = rusmes_proto::Username::new(username.to_string())
1020 .map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
1021
1022 let user_exists = self.auth_backend.verify_identity(&username_obj).await?;
1023
1024 if !user_exists {
1025 tracing::warn!(
1026 "CRAM-MD5 authentication failed: user '{}' does not exist",
1027 username
1028 );
1029 return Ok(SmtpResponse::new(
1030 535,
1031 "5.7.8 Authentication credentials invalid",
1032 ));
1033 }
1034
1035 tracing::warn!(
1043 "CRAM-MD5 authentication rejected: mechanism requires plaintext password storage"
1044 );
1045
1046 Ok(SmtpResponse::new(
1047 535,
1048 "5.7.8 Authentication credentials invalid",
1049 ))
1050 }
1051
1052 fn is_relay_allowed(&self, recipient: &MailAddress) -> bool {
1059 if self.authenticated {
1061 tracing::debug!(
1062 "Relay allowed for {} from {}: authenticated user",
1063 recipient,
1064 self.remote_addr.ip()
1065 );
1066 return true;
1067 }
1068
1069 if crate::is_ip_in_networks(self.remote_addr.ip(), &self.config.relay_networks) {
1071 tracing::debug!(
1072 "Relay allowed for {} from {}: client IP in relay_networks",
1073 recipient,
1074 self.remote_addr.ip()
1075 );
1076 return true;
1077 }
1078
1079 let recipient_domain = recipient.domain().as_str();
1081 for local_domain in &self.config.local_domains {
1082 if recipient_domain.eq_ignore_ascii_case(local_domain) {
1083 tracing::debug!(
1084 "Relay allowed for {} from {}: local domain",
1085 recipient,
1086 self.remote_addr.ip()
1087 );
1088 return true;
1089 }
1090 }
1091
1092 tracing::warn!(
1094 "Relay denied for {} from {}: not authenticated, not in relay_networks, not local domain",
1095 recipient,
1096 self.remote_addr.ip()
1097 );
1098 false
1099 }
1100
1101 async fn validate_recipient(&self, recipient: &MailAddress) -> anyhow::Result<bool> {
1103 const CACHE_TTL: Duration = Duration::from_secs(300);
1105
1106 let cache_key = recipient.as_string();
1107
1108 {
1110 let cache = self.recipient_cache.read().await;
1111 if let Some(entry) = cache.get(&cache_key) {
1112 if entry.cached_at.elapsed() < CACHE_TTL {
1113 tracing::debug!("Recipient validation cache hit for {}", recipient);
1114 return Ok(entry.exists);
1115 }
1116 }
1117 }
1118
1119 tracing::debug!(
1121 "Recipient validation cache miss for {}, querying storage",
1122 recipient
1123 );
1124
1125 let username = Username::new(recipient.local_part())?;
1127
1128 let mailbox_store = self.storage_backend.mailbox_store();
1130 let mailboxes = mailbox_store.list_mailboxes(&username).await?;
1131
1132 let exists = !mailboxes.is_empty();
1133
1134 {
1136 let mut cache = self.recipient_cache.write().await;
1137 cache.insert(
1138 cache_key,
1139 RecipientCacheEntry {
1140 exists,
1141 cached_at: Instant::now(),
1142 },
1143 );
1144 }
1145
1146 Ok(exists)
1147 }
1148
1149 async fn handle_auth_scram_sha256(
1151 &mut self,
1152 initial_response: Option<String>,
1153 ) -> anyhow::Result<SmtpResponse> {
1154 let client_first_encoded = match initial_response {
1157 Some(resp) => resp,
1158 None => {
1159 return Ok(SmtpResponse::new(334, ""));
1161 }
1162 };
1163
1164 let client_first_decoded = BASE64
1166 .decode(client_first_encoded.trim())
1167 .map_err(|e| anyhow::anyhow!("Failed to decode client-first: {}", e))?;
1168 let client_first_str = String::from_utf8(client_first_decoded)
1169 .map_err(|e| anyhow::anyhow!("Failed to decode UTF-8: {}", e))?;
1170
1171 let (username, client_nonce, client_first_bare) =
1173 crate::auth::parse_scram_client_first(&client_first_str)?;
1174
1175 let server_nonce = crate::auth::generate_scram_server_nonce()?;
1177 let nonce = format!("{}{}", client_nonce, server_nonce);
1178
1179 tracing::warn!(
1187 "SCRAM-SHA-256 authentication attempted for user '{}', but AuthBackend does not support SCRAM credentials",
1188 username
1189 );
1190
1191 let mut salt = [0u8; 16];
1194 getrandom::fill(&mut salt)
1195 .map_err(|e| anyhow::anyhow!("RNG failure generating SCRAM salt: {}", e))?;
1196 let iterations = 4096u32;
1197
1198 let server_first = format!("r={},s={},i={}", nonce, BASE64.encode(salt), iterations);
1200
1201 self.scram_state = Some(ScramState {
1203 client_first_bare: client_first_bare.clone(),
1204 server_first: server_first.clone(),
1205 nonce: nonce.clone(),
1206 username: username.clone(),
1207 });
1208
1209 let server_first_encoded = BASE64.encode(server_first.as_bytes());
1211 Ok(SmtpResponse::new(334, server_first_encoded))
1212 }
1213
1214 async fn handle_scram_client_final(
1216 &mut self,
1217 client_final_line: &str,
1218 ) -> anyhow::Result<SmtpResponse> {
1219 let state = self
1221 .scram_state
1222 .take()
1223 .ok_or_else(|| anyhow::anyhow!("No SCRAM state"))?;
1224
1225 if client_final_line == "*" {
1227 tracing::info!("SCRAM-SHA-256 authentication aborted by client");
1228 return Ok(SmtpResponse::new(501, "5.7.0 Authentication aborted"));
1229 }
1230
1231 let client_final_decoded = BASE64
1233 .decode(client_final_line.trim())
1234 .map_err(|e| anyhow::anyhow!("Failed to decode client-final: {}", e))?;
1235 let client_final_str = String::from_utf8(client_final_decoded)
1236 .map_err(|e| anyhow::anyhow!("Failed to decode UTF-8: {}", e))?;
1237
1238 let (_channel_binding, nonce, proof, client_final_without_proof) =
1240 crate::auth::parse_scram_client_final(&client_final_str)?;
1241
1242 if nonce != state.nonce {
1244 tracing::warn!(
1245 "SCRAM-SHA-256 nonce mismatch for user '{}': expected '{}', got '{}'",
1246 state.username,
1247 state.nonce,
1248 nonce
1249 );
1250 return Ok(SmtpResponse::new(
1251 535,
1252 "5.7.8 Authentication credentials invalid",
1253 ));
1254 }
1255
1256 let username_obj = rusmes_proto::Username::new(state.username.clone())
1258 .map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
1259
1260 let user_exists = self.auth_backend.verify_identity(&username_obj).await?;
1261
1262 if !user_exists {
1263 tracing::warn!(
1264 "SCRAM-SHA-256 authentication failed: user '{}' does not exist",
1265 state.username
1266 );
1267 return Ok(SmtpResponse::new(
1268 535,
1269 "5.7.8 Authentication credentials invalid",
1270 ));
1271 }
1272
1273 tracing::warn!(
1285 "SCRAM-SHA-256 authentication rejected for user '{}': AuthBackend does not support SCRAM credential storage",
1286 state.username
1287 );
1288
1289 tracing::debug!(
1291 "SCRAM-SHA-256 auth attempt - client_first_bare: {}, server_first: {}, client_final_without_proof: {}, proof: {}",
1292 state.client_first_bare,
1293 state.server_first,
1294 client_final_without_proof,
1295 proof
1296 );
1297
1298 Ok(SmtpResponse::new(
1299 535,
1300 "5.7.8 Authentication credentials invalid - SCRAM-SHA-256 requires separate credential storage",
1301 ))
1302 }
1303}
1304
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a literal that is known to be a well-formed address.
    fn address(literal: &str) -> MailAddress {
        literal.parse().expect("valid email address literal")
    }

    /// A transaction becomes valid only once it has both a sender and a
    /// recipient, and `reset` makes it invalid again.
    #[test]
    fn test_transaction_validity() {
        let mut transaction = SmtpTransaction::new();
        assert!(!transaction.is_valid());

        // Sender alone is not enough.
        transaction.sender = Some(address("sender@example.com"));
        assert!(!transaction.is_valid());

        transaction.recipients.push(address("rcpt@example.com"));
        assert!(transaction.is_valid());

        transaction.reset();
        assert!(!transaction.is_valid());
    }

    /// Spot-checks the documented defaults of `SmtpConfig`.
    #[test]
    fn test_smtp_config_default() {
        let defaults = SmtpConfig::default();

        assert_eq!(defaults.hostname, "localhost");
        assert_eq!(defaults.max_message_size, 10 * 1024 * 1024);
        assert!(!defaults.require_auth);
        assert!(!defaults.enable_starttls);
    }
}