Skip to main content

rusmes_smtp/
session.rs

1//! SMTP session state machine and handler
2
3use crate::command::SmtpCommand;
4use crate::parser::parse_command;
5use crate::response::SmtpResponse;
6use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
7use rusmes_auth::AuthBackend;
8use rusmes_core::{MailProcessorRouter, RateLimiter};
9use rusmes_proto::{MailAddress, Username};
10use rusmes_storage::StorageBackend;
11use std::collections::HashMap;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::RwLock;
18
/// SMTP session state
///
/// Tracks where a connection currently is in the SMTP command sequence.
/// The command handlers compare against this value to reject commands that
/// arrive out of order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SmtpState {
    /// Initial state before connection
    Initial,
    /// Connected, waiting for HELO/EHLO
    Connected,
    /// Greeted and idle: HELO/EHLO has been accepted and no mail
    /// transaction is open.
    ///
    /// Despite the name, this state is entered by HELO/EHLO and does not by
    /// itself mean that AUTH succeeded; whether the client authenticated is
    /// tracked separately on the session.
    Authenticated,
    /// In mail transaction (after MAIL FROM)
    MailTransaction,
    /// Receiving message data (after DATA command)
    Data,
    /// Quit command received
    Quit,
}
35
36/// SMTP transaction state
37#[derive(Debug, Clone)]
38pub struct SmtpTransaction {
39    sender: Option<MailAddress>,
40    recipients: Vec<MailAddress>,
41    helo_name: Option<String>,
42    message_size: usize,
43    /// Declared message size from MAIL FROM SIZE parameter
44    declared_size: Option<usize>,
45    /// BODY parameter value (7BIT, 8BITMIME, BINARYMIME)
46    body_type: Option<String>,
47    /// SMTPUTF8 flag
48    smtputf8: bool,
49    /// BDAT state for CHUNKING extension (RFC 3030)
50    bdat_state: Option<crate::BdatState>,
51    /// Message data received via DATA command
52    message_data: Vec<u8>,
53}
54
55impl SmtpTransaction {
56    fn new() -> Self {
57        Self {
58            sender: None,
59            recipients: Vec::new(),
60            helo_name: None,
61            message_size: 0,
62            declared_size: None,
63            body_type: None,
64            smtputf8: false,
65            bdat_state: None,
66            message_data: Vec::new(),
67        }
68    }
69
70    fn reset(&mut self) {
71        self.sender = None;
72        self.recipients.clear();
73        self.message_size = 0;
74        self.declared_size = None;
75        self.body_type = None;
76        self.smtputf8 = false;
77        self.bdat_state = None;
78        self.message_data.clear();
79    }
80
81    fn is_valid(&self) -> bool {
82        self.sender.is_some() && !self.recipients.is_empty()
83    }
84}
85
/// Settings that control how an SMTP session behaves.
#[derive(Debug, Clone)]
pub struct SmtpConfig {
    /// Host name announced in the greeting and in HELO/EHLO replies.
    pub hostname: String,
    /// Largest accepted message in bytes; also advertised via `SIZE`.
    pub max_message_size: usize,
    /// When set, MAIL FROM is refused until the client has authenticated.
    pub require_auth: bool,
    /// When set, `STARTTLS` is advertised in the EHLO reply.
    pub enable_starttls: bool,
    /// When set, RCPT TO looks up the recipient before accepting it.
    pub check_recipient_exists: bool,
    /// When set, a recipient that fails the lookup is rejected with 550.
    pub reject_unknown_recipients: bool,
    /// CIDR networks allowed to relay mail (e.g., "192.168.0.0/16")
    pub relay_networks: Vec<String>,
    /// Local domains that this server accepts mail for
    pub local_domains: Vec<String>,
    /// Total connection timeout (max session duration)
    pub connection_timeout: Duration,
    /// Idle timeout (time between commands)
    pub idle_timeout: Duration,
}

impl Default for SmtpConfig {
    /// Defaults for a local setup: no auth, no STARTTLS, 10 MiB messages,
    /// loopback-only relaying, strict recipient checks.
    fn default() -> Self {
        let local_name = String::from("localhost");
        let loopback_cidr = String::from("127.0.0.0/8");

        Self {
            max_message_size: 10 * 1024 * 1024, // 10MB
            require_auth: false,
            enable_starttls: false,
            check_recipient_exists: true,
            reject_unknown_recipients: true,
            relay_networks: vec![loopback_cidr],
            local_domains: vec![local_name.clone()],
            hostname: local_name,
            connection_timeout: Duration::from_secs(60 * 60), // 1 hour
            idle_timeout: Duration::from_secs(5 * 60),        // 5 minutes
        }
    }
}
121
/// One memoised recipient-validation result.
#[derive(Debug, Clone)]
struct RecipientCacheEntry {
    /// Outcome of the lookup this entry remembers.
    exists: bool,
    /// Moment the entry was stored.
    // NOTE(review): presumably used to expire stale entries — confirm
    // against the code that reads the cache.
    cached_at: Instant,
}
128
/// Data kept between the steps of a SCRAM-SHA-256 exchange.
///
/// The session holds one of these while it is waiting for the client's
/// final message.
#[derive(Debug, Clone)]
struct ScramState {
    /// The client's first message without its header part.
    // NOTE(review): naming follows the SCRAM specification's
    // "client-first-message-bare" — confirm against the code that fills it.
    client_first_bare: String,
    /// The first message this server sent back.
    server_first: String,
    /// Nonce associated with this exchange.
    nonce: String,
    /// Account name the client is authenticating as.
    username: String,
}
137
138/// SMTP session handler
139pub struct SmtpSession {
140    remote_addr: SocketAddr,
141    state: SmtpState,
142    transaction: SmtpTransaction,
143    config: SmtpConfig,
144    authenticated: bool,
145    #[allow(dead_code)]
146    username: Option<String>,
147    #[allow(dead_code)]
148    relaying_allowed: bool,
149    #[allow(dead_code)]
150    processor_router: Arc<MailProcessorRouter>,
151    auth_backend: Arc<dyn AuthBackend>,
152    rate_limiter: Arc<RateLimiter>,
153    storage_backend: Arc<dyn StorageBackend>,
154    recipient_cache: Arc<RwLock<HashMap<String, RecipientCacheEntry>>>,
155    /// CRAM-MD5 challenge for ongoing authentication
156    cram_md5_challenge: Option<String>,
157    /// SCRAM-SHA-256 authentication state
158    scram_state: Option<ScramState>,
159}
160
161/// SMTP session with stream
162pub struct SmtpSessionHandler {
163    session: SmtpSession,
164    stream: TcpStream,
165    /// Support for PIPELINING - buffer of commands to process
166    #[allow(dead_code)]
167    pipelined_commands: Vec<String>,
168    /// Connection start time
169    #[allow(dead_code)]
170    connection_started: Instant,
171    /// Last command received time
172    #[allow(dead_code)]
173    last_command: Instant,
174}
175
176impl SmtpSessionHandler {
177    /// Create a new SMTP session handler
178    #[allow(clippy::too_many_arguments)]
179    pub fn new(
180        stream: TcpStream,
181        remote_addr: SocketAddr,
182        config: SmtpConfig,
183        processor_router: Arc<MailProcessorRouter>,
184        auth_backend: Arc<dyn AuthBackend>,
185        rate_limiter: Arc<RateLimiter>,
186        storage_backend: Arc<dyn StorageBackend>,
187    ) -> Self {
188        let now = Instant::now();
189        Self {
190            session: SmtpSession {
191                remote_addr,
192                state: SmtpState::Connected,
193                transaction: SmtpTransaction::new(),
194                config,
195                authenticated: false,
196                username: None,
197                relaying_allowed: false,
198                processor_router,
199                auth_backend,
200                rate_limiter,
201                storage_backend,
202                recipient_cache: Arc::new(RwLock::new(HashMap::new())),
203                cram_md5_challenge: None,
204                scram_state: None,
205            },
206            stream,
207            pipelined_commands: Vec::new(),
208            connection_started: now,
209            last_command: now,
210        }
211    }
212
213    /// Handle the SMTP session
214    pub async fn handle(mut self) -> anyhow::Result<()> {
215        let (read_half, write_half) = tokio::io::split(self.stream);
216        let mut reader = BufReader::new(read_half);
217        let mut writer = BufWriter::new(write_half);
218
219        // Send greeting
220        Self::write_response_to(
221            &mut writer,
222            SmtpResponse::service_ready(&self.session.config.hostname),
223            &self.session.remote_addr,
224        )
225        .await?;
226
227        let mut line = String::new();
228
229        loop {
230            // Check total connection timeout
231            if self.connection_started.elapsed() > self.session.config.connection_timeout {
232                tracing::info!(
233                    "Connection timeout exceeded for {}",
234                    self.session.remote_addr
235                );
236                Self::write_response_to(
237                    &mut writer,
238                    SmtpResponse::new(421, "4.4.2 Connection timeout - closing connection"),
239                    &self.session.remote_addr,
240                )
241                .await?;
242                break;
243            }
244
245            line.clear();
246
247            // Read command with idle timeout
248            let n = match tokio::time::timeout(
249                self.session.config.idle_timeout,
250                reader.read_line(&mut line),
251            )
252            .await
253            {
254                Ok(Ok(n)) => n,
255                Ok(Err(e)) => {
256                    tracing::error!("Read error from {}: {}", self.session.remote_addr, e);
257                    break;
258                }
259                Err(_) => {
260                    // Idle timeout
261                    tracing::info!("Idle timeout for {}", self.session.remote_addr);
262                    Self::write_response_to(
263                        &mut writer,
264                        SmtpResponse::new(421, "4.4.2 Idle timeout - closing connection"),
265                        &self.session.remote_addr,
266                    )
267                    .await?;
268                    break;
269                }
270            };
271
272            if n == 0 {
273                break; // EOF
274            }
275
276            // Update last command time
277            self.last_command = Instant::now();
278
279            let line_trimmed = line.trim();
280            tracing::debug!(
281                "SMTP command from {}: {}",
282                self.session.remote_addr,
283                line_trimmed
284            );
285
286            // Check if we're waiting for CRAM-MD5 response
287            if self.session.cram_md5_challenge.is_some() {
288                let response = match self.session.handle_cram_md5_response(line_trimmed).await {
289                    Ok(resp) => resp,
290                    Err(e) => {
291                        tracing::error!("Error handling CRAM-MD5 response: {}", e);
292                        SmtpResponse::new(535, "5.7.8 Authentication credentials invalid")
293                    }
294                };
295                Self::write_response_to(&mut writer, response, &self.session.remote_addr).await?;
296                continue;
297            }
298
299            // Check if we're waiting for SCRAM-SHA-256 response
300            if self.session.scram_state.is_some() {
301                let response = match self.session.handle_scram_client_final(line_trimmed).await {
302                    Ok(resp) => resp,
303                    Err(e) => {
304                        tracing::error!("Error handling SCRAM-SHA-256 client-final: {}", e);
305                        self.session.scram_state = None;
306                        SmtpResponse::new(535, "5.7.8 Authentication credentials invalid")
307                    }
308                };
309                Self::write_response_to(&mut writer, response, &self.session.remote_addr).await?;
310                continue;
311            }
312
313            // Parse command
314            let command = match parse_command(line_trimmed) {
315                Ok(cmd) => cmd,
316                Err(e) => {
317                    tracing::warn!("Failed to parse command: {}", e);
318                    Self::write_response_to(
319                        &mut writer,
320                        SmtpResponse::syntax_error("Command not recognized"),
321                        &self.session.remote_addr,
322                    )
323                    .await?;
324                    continue;
325                }
326            };
327
328            // Handle command
329            let response = match self.session.handle_command(command.clone()).await {
330                Ok(resp) => resp,
331                Err(e) => {
332                    tracing::error!("Error handling command: {}", e);
333                    SmtpResponse::local_error("Internal server error")
334                }
335            };
336
337            Self::write_response_to(&mut writer, response, &self.session.remote_addr).await?;
338
339            // Check if we should close the connection
340            if self.session.state == SmtpState::Quit {
341                break;
342            }
343
344            // For PIPELINING: DATA command requires special handling
345            // After DATA is accepted, we must stop processing pipelined commands
346            // and read the message data
347            if matches!(command, SmtpCommand::Data) && self.session.state == SmtpState::Data {
348                // Read message data (until .<CRLF>)
349                let remote_addr = self.session.remote_addr;
350                if let Err(e) = Self::handle_data_input(
351                    &mut self.session,
352                    &mut reader,
353                    &mut writer,
354                    &remote_addr,
355                )
356                .await
357                {
358                    tracing::error!("Error reading message data: {}", e);
359                    Self::write_response_to(
360                        &mut writer,
361                        SmtpResponse::local_error("Error reading message data"),
362                        &remote_addr,
363                    )
364                    .await?;
365                }
366            }
367        }
368
369        Ok(())
370    }
371
372    /// Handle DATA input (message body)
373    async fn handle_data_input<R, W>(
374        session: &mut SmtpSession,
375        reader: &mut R,
376        writer: &mut W,
377        remote_addr: &SocketAddr,
378    ) -> anyhow::Result<()>
379    where
380        R: AsyncBufReadExt + Unpin,
381        W: AsyncWriteExt + Unpin,
382    {
383        let mut message_data = Vec::new();
384        let mut line = String::new();
385
386        loop {
387            line.clear();
388            let n = reader.read_line(&mut line).await?;
389            if n == 0 {
390                return Err(anyhow::anyhow!("Unexpected EOF during DATA"));
391            }
392
393            // Check for end of message (.<CRLF>)
394            if line.trim() == "." {
395                break;
396            }
397
398            // Remove transparency (leading dot)
399            let line_to_add = if line.starts_with("..") {
400                &line[1..]
401            } else {
402                &line
403            };
404
405            message_data.extend_from_slice(line_to_add.as_bytes());
406        }
407
408        // Check size limit
409        let message_size = message_data.len();
410        if message_size > session.config.max_message_size {
411            Self::write_response_to(
412                writer,
413                SmtpResponse::storage_exceeded(format!(
414                    "Message size {} exceeds maximum {}",
415                    message_size, session.config.max_message_size
416                )),
417                remote_addr,
418            )
419            .await?;
420            session.transaction.reset();
421            session.state = SmtpState::Authenticated;
422            return Ok(());
423        }
424
425        // Check declared size if provided
426        if let Some(declared_size) = session.transaction.declared_size {
427            // Allow some tolerance (10%) for encoding differences
428            let max_allowed = declared_size + (declared_size / 10);
429            if message_size > max_allowed {
430                Self::write_response_to(
431                    writer,
432                    SmtpResponse::storage_exceeded(format!(
433                        "Message size {} exceeds declared size {}",
434                        message_size, declared_size
435                    )),
436                    remote_addr,
437                )
438                .await?;
439                session.transaction.reset();
440                session.state = SmtpState::Authenticated;
441                return Ok(());
442            }
443        }
444
445        // Message accepted
446        session.transaction.message_size = message_size;
447        session.transaction.message_data = message_data;
448
449        let sender_display = session
450            .transaction
451            .sender
452            .as_ref()
453            .map(|a| a.to_string())
454            .unwrap_or_else(|| "<unknown>".to_string());
455        tracing::info!(
456            "Accepted message from {} ({} bytes) with {} recipient(s)",
457            sender_display,
458            message_size,
459            session.transaction.recipients.len()
460        );
461
462        // Send immediate response to client
463        Self::write_response_to(
464            writer,
465            SmtpResponse::ok("Message accepted for delivery"),
466            remote_addr,
467        )
468        .await?;
469
470        // Process the message asynchronously
471        tracing::info!("About to spawn async message processing task");
472        let sender = session.transaction.sender.clone();
473        let recipients = session.transaction.recipients.clone();
474        let message_data = session.transaction.message_data.clone();
475        let router = session.processor_router.clone();
476
477        tracing::info!(
478            "Spawning task for {} recipients, {} bytes",
479            recipients.len(),
480            message_data.len()
481        );
482
483        tokio::spawn(async move {
484            tracing::info!("Inside spawned task - starting processing");
485            if let Err(e) = SmtpSessionHandler::process_accepted_message(
486                sender,
487                recipients,
488                message_data,
489                router,
490            )
491            .await
492            {
493                tracing::error!("Failed to process message: {}", e);
494            }
495        });
496
497        // Reset transaction state
498        session.transaction.reset();
499        session.state = SmtpState::Authenticated;
500
501        Ok(())
502    }
503
504    /// Write a response to a writer
505    async fn write_response_to<W: AsyncWriteExt + Unpin>(
506        writer: &mut W,
507        response: SmtpResponse,
508        remote_addr: &SocketAddr,
509    ) -> anyhow::Result<()> {
510        let formatted = response.format();
511        tracing::debug!("SMTP response to {}: {}", remote_addr, formatted.trim());
512        writer.write_all(formatted.as_bytes()).await?;
513        writer.flush().await?;
514        Ok(())
515    }
516
517    /// Process an accepted message through the mail processor pipeline
518    async fn process_accepted_message(
519        sender: Option<rusmes_proto::MailAddress>,
520        recipients: Vec<rusmes_proto::MailAddress>,
521        message_data: Vec<u8>,
522        router: Arc<rusmes_core::MailProcessorRouter>,
523    ) -> anyhow::Result<()> {
524        use bytes::Bytes;
525        use rusmes_proto::{HeaderMap, Mail, MessageBody, MimeMessage};
526
527        tracing::info!(
528            "Starting message processing for {} recipients",
529            recipients.len()
530        );
531
532        // Parse message headers and body
533        let (headers, body_offset) = rusmes_proto::mime::parse_headers(&message_data)?;
534
535        let mut header_map = HeaderMap::new();
536        for (name, value) in headers {
537            header_map.insert(name, value);
538        }
539
540        let body = if body_offset < message_data.len() {
541            Bytes::from(message_data[body_offset..].to_vec())
542        } else {
543            Bytes::new()
544        };
545
546        let message = MimeMessage::new(header_map, MessageBody::Small(body));
547        let mail = Mail::new(sender, recipients, message, None, None);
548
549        tracing::info!("Processing mail {} through pipeline", mail.id());
550
551        // Process through the router
552        router.route(mail).await?;
553
554        tracing::info!("Mail processing completed");
555        Ok(())
556    }
557}
558
559impl SmtpSession {
560    /// Handle a single SMTP command
561    async fn handle_command(&mut self, command: SmtpCommand) -> anyhow::Result<SmtpResponse> {
562        match command {
563            SmtpCommand::Helo(domain) => self.handle_helo(domain).await,
564            SmtpCommand::Ehlo(domain) => self.handle_ehlo(domain).await,
565            SmtpCommand::Mail { from, params } => self.handle_mail(from, params).await,
566            SmtpCommand::Rcpt { to, params } => self.handle_rcpt(to, params).await,
567            SmtpCommand::Data => self.handle_data().await,
568            SmtpCommand::Bdat { chunk_size, last } => self.handle_bdat(chunk_size, last).await,
569            SmtpCommand::Rset => self.handle_rset().await,
570            SmtpCommand::Noop => Ok(SmtpResponse::ok_simple()),
571            SmtpCommand::Quit => self.handle_quit().await,
572            SmtpCommand::StartTls => self.handle_starttls().await,
573            SmtpCommand::Auth {
574                mechanism,
575                initial_response,
576            } => self.handle_auth(mechanism, initial_response).await,
577            _ => Ok(SmtpResponse::not_implemented("Command not implemented")),
578        }
579    }
580
581    /// Handle HELO command
582    async fn handle_helo(&mut self, domain: String) -> anyhow::Result<SmtpResponse> {
583        if self.state != SmtpState::Connected && self.state != SmtpState::Authenticated {
584            return Ok(SmtpResponse::bad_sequence("Out of sequence"));
585        }
586
587        self.transaction.helo_name = Some(domain);
588        self.state = SmtpState::Authenticated;
589
590        Ok(SmtpResponse::ok(format!(
591            "{} Hello {}",
592            self.config.hostname,
593            self.remote_addr.ip()
594        )))
595    }
596
597    /// Handle EHLO command
598    async fn handle_ehlo(&mut self, domain: String) -> anyhow::Result<SmtpResponse> {
599        if self.state != SmtpState::Connected && self.state != SmtpState::Authenticated {
600            return Ok(SmtpResponse::bad_sequence("Out of sequence"));
601        }
602
603        self.transaction.helo_name = Some(domain);
604        self.state = SmtpState::Authenticated;
605
606        let mut extensions = vec![
607            format!("SIZE {}", self.config.max_message_size),
608            "8BITMIME".to_string(),
609            "SMTPUTF8".to_string(),
610            "PIPELINING".to_string(),
611            "CHUNKING".to_string(), // RFC 3030 - BDAT support
612        ];
613
614        if self.config.enable_starttls {
615            extensions.push("STARTTLS".to_string());
616        }
617
618        if self.config.require_auth {
619            extensions.push("AUTH PLAIN LOGIN CRAM-MD5 SCRAM-SHA-256".to_string());
620        }
621
622        Ok(SmtpResponse::ehlo(&self.config.hostname, extensions))
623    }
624
625    /// Handle MAIL FROM command
626    async fn handle_mail(
627        &mut self,
628        from: MailAddress,
629        params: Vec<crate::command::MailParam>,
630    ) -> anyhow::Result<SmtpResponse> {
631        if self.state != SmtpState::Authenticated {
632            return Ok(SmtpResponse::bad_sequence("Must send HELO/EHLO first"));
633        }
634
635        if self.config.require_auth && !self.authenticated {
636            return Ok(SmtpResponse::bad_sequence("Authentication required"));
637        }
638
639        // Check message rate limit
640        let ip = self.remote_addr.ip();
641        if !self.rate_limiter.allow_message(ip).await {
642            tracing::warn!("Message rate limit exceeded for {} from {}", from, ip);
643            return Ok(SmtpResponse::mailbox_unavailable(
644                "Rate limit exceeded, please try again later",
645            ));
646        }
647
648        // Process ESMTP parameters
649        for param in params {
650            match param.keyword.to_uppercase().as_str() {
651                "SIZE" => {
652                    // RFC 1870 - SIZE extension
653                    if let Some(size_str) = param.value {
654                        match size_str.parse::<usize>() {
655                            Ok(size) => {
656                                if size > self.config.max_message_size {
657                                    return Ok(SmtpResponse::storage_exceeded(format!(
658                                        "Message size {} exceeds maximum {}",
659                                        size, self.config.max_message_size
660                                    )));
661                                }
662                                self.transaction.declared_size = Some(size);
663                            }
664                            Err(_) => {
665                                return Ok(SmtpResponse::parameter_error("Invalid SIZE parameter"));
666                            }
667                        }
668                    } else {
669                        return Ok(SmtpResponse::parameter_error(
670                            "SIZE parameter requires a value",
671                        ));
672                    }
673                }
674                "BODY" => {
675                    // RFC 6152 - 8BITMIME extension
676                    if let Some(body_value) = param.value {
677                        let body_upper = body_value.to_uppercase();
678                        match body_upper.as_str() {
679                            "7BIT" | "8BITMIME" => {
680                                self.transaction.body_type = Some(body_upper);
681                            }
682                            _ => {
683                                return Ok(SmtpResponse::parameter_not_implemented(format!(
684                                    "Unsupported BODY type: {}",
685                                    body_value
686                                )));
687                            }
688                        }
689                    } else {
690                        return Ok(SmtpResponse::parameter_error(
691                            "BODY parameter requires a value",
692                        ));
693                    }
694                }
695                "SMTPUTF8" => {
696                    // RFC 6531 - SMTPUTF8 extension
697                    // This parameter has no value
698                    if param.value.is_none() {
699                        self.transaction.smtputf8 = true;
700                    } else {
701                        return Ok(SmtpResponse::parameter_error(
702                            "SMTPUTF8 parameter must not have a value",
703                        ));
704                    }
705                }
706                _ => {
707                    // Unknown parameter - ignore per RFC 5321
708                    tracing::debug!("Unknown MAIL parameter: {}", param.keyword);
709                }
710            }
711        }
712
713        self.transaction.sender = Some(from.clone());
714        self.state = SmtpState::MailTransaction;
715
716        Ok(SmtpResponse::ok(format!("Sender {} OK", from)))
717    }
718
719    /// Handle RCPT TO command
720    async fn handle_rcpt(
721        &mut self,
722        to: MailAddress,
723        params: Vec<crate::command::MailParam>,
724    ) -> anyhow::Result<SmtpResponse> {
725        if self.state != SmtpState::MailTransaction {
726            return Ok(SmtpResponse::bad_sequence("Must send MAIL FROM first"));
727        }
728
729        // Process ESMTP parameters (for future extensions like DSN)
730        for param in params {
731            // Unknown parameter - ignore per RFC 5321
732            tracing::debug!("Unknown RCPT parameter: {}", param.keyword);
733        }
734
735        // Check relay authorization
736        if !self.is_relay_allowed(&to) {
737            return Ok(SmtpResponse::new(550, "5.7.1 Relaying denied"));
738        }
739
740        // Validate recipient if configured
741        if self.config.check_recipient_exists {
742            // Skip validation for relay-authorized senders
743            if !self.authenticated && !self.relaying_allowed {
744                match self.validate_recipient(&to).await {
745                    Ok(true) => {
746                        // Recipient exists, continue
747                    }
748                    Ok(false) => {
749                        if self.config.reject_unknown_recipients {
750                            tracing::warn!("Rejecting unknown recipient: {}", to);
751                            return Ok(SmtpResponse::new(
752                                550,
753                                format!("5.1.1 User unknown: {}", to),
754                            ));
755                        } else {
756                            // Accept but log
757                            tracing::info!(
758                                "Accepting unknown recipient (rejection disabled): {}",
759                                to
760                            );
761                        }
762                    }
763                    Err(e) => {
764                        tracing::error!("Error validating recipient {}: {}", to, e);
765                        // On error, fail open to avoid blocking legitimate mail
766                        tracing::warn!("Accepting recipient {} due to validation error", to);
767                    }
768                }
769            }
770        }
771
772        // Add recipient
773        self.transaction.recipients.push(to.clone());
774
775        Ok(SmtpResponse::ok(format!("Recipient {} OK", to)))
776    }
777
778    /// Handle DATA command
779    async fn handle_data(&mut self) -> anyhow::Result<SmtpResponse> {
780        if self.state != SmtpState::MailTransaction {
781            return Ok(SmtpResponse::bad_sequence("Must send RCPT TO first"));
782        }
783
784        if !self.transaction.is_valid() {
785            return Ok(SmtpResponse::bad_sequence("Need at least one recipient"));
786        }
787
788        self.state = SmtpState::Data;
789        Ok(SmtpResponse::start_data())
790    }
791
792    /// Handle BDAT command (RFC 3030 CHUNKING)
793    ///
794    /// This method only validates the command and prepares for chunk reception.
795    /// Actual chunk data reading must be done by the caller after receiving this response.
796    async fn handle_bdat(&mut self, chunk_size: usize, last: bool) -> anyhow::Result<SmtpResponse> {
797        // BDAT can only be used after MAIL FROM and RCPT TO
798        if self.state != SmtpState::MailTransaction {
799            return Ok(SmtpResponse::bad_sequence(
800                "Must send MAIL FROM and RCPT TO first",
801            ));
802        }
803
804        if !self.transaction.is_valid() {
805            return Ok(SmtpResponse::bad_sequence(
806                "Need sender and at least one recipient",
807            ));
808        }
809
810        // Initialize BDAT state if not already present
811        if self.transaction.bdat_state.is_none() {
812            self.transaction.bdat_state = Some(crate::BdatState::new(self.config.max_message_size));
813        }
814
815        // Note: The actual chunk data reading happens outside this method
816        // The caller must read exactly chunk_size bytes and call add_chunk on bdat_state
817
818        // Check if this would exceed size limits (preliminary check)
819        // Safety: we just initialized bdat_state above if it was None
820        let bdat_state = match self.transaction.bdat_state.as_ref() {
821            Some(state) => state,
822            None => {
823                return Err(anyhow::anyhow!(
824                    "Internal error: bdat_state not initialized"
825                ))
826            }
827        };
828        if bdat_state.total_size() + chunk_size > self.config.max_message_size {
829            return Ok(SmtpResponse::storage_exceeded(format!(
830                "Message size {} exceeds maximum {}",
831                bdat_state.total_size() + chunk_size,
832                self.config.max_message_size
833            )));
834        }
835
836        // If this is the LAST chunk and message will be complete, log it
837        if last {
838            let sender_display = self
839                .transaction
840                .sender
841                .as_ref()
842                .map(|a| a.to_string())
843                .unwrap_or_else(|| "<unknown>".to_string());
844            tracing::info!(
845                "BDAT LAST chunk ({} bytes) from {} with {} recipient(s)",
846                chunk_size,
847                sender_display,
848                self.transaction.recipients.len()
849            );
850        }
851
852        // Return success - caller must now read chunk_size bytes
853        Ok(SmtpResponse::ok(format!("{} octets received", chunk_size)))
854    }
855
856    /// Handle RSET command
857    async fn handle_rset(&mut self) -> anyhow::Result<SmtpResponse> {
858        self.transaction.reset();
859        self.state = SmtpState::Authenticated;
860        Ok(SmtpResponse::ok_simple())
861    }
862
863    /// Handle QUIT command
864    async fn handle_quit(&mut self) -> anyhow::Result<SmtpResponse> {
865        self.state = SmtpState::Quit;
866        Ok(SmtpResponse::closing())
867    }
868
869    /// Handle STARTTLS command
870    async fn handle_starttls(&mut self) -> anyhow::Result<SmtpResponse> {
871        if !self.config.enable_starttls {
872            return Ok(SmtpResponse::not_implemented("STARTTLS not available"));
873        }
874
875        // In a real implementation, we would upgrade to TLS here
876        Ok(SmtpResponse::new(220, "Ready to start TLS"))
877    }
878
879    /// Handle AUTH command
880    async fn handle_auth(
881        &mut self,
882        mechanism: String,
883        initial_response: Option<String>,
884    ) -> anyhow::Result<SmtpResponse> {
885        if !self.config.require_auth {
886            return Ok(SmtpResponse::not_implemented("AUTH not available"));
887        }
888
889        match mechanism.to_uppercase().as_str() {
890            "CRAM-MD5" => self.handle_auth_cram_md5().await,
891            "SCRAM-SHA-256" => self.handle_auth_scram_sha256(initial_response).await,
892            "PLAIN" => {
893                if let Some(response) = initial_response {
894                    self.handle_auth_plain(response).await
895                } else {
896                    // Request credentials
897                    Ok(SmtpResponse::new(334, ""))
898                }
899            }
900            "LOGIN" => {
901                // LOGIN authentication requires multi-step exchange
902                // Send "334 VXNlcm5hbWU6" (Username: in base64)
903                Ok(SmtpResponse::new(334, "VXNlcm5hbWU6"))
904            }
905            _ => Ok(SmtpResponse::parameter_not_implemented(
906                "Authentication mechanism not supported",
907            )),
908        }
909    }
910
911    /// Handle PLAIN authentication
912    async fn handle_auth_plain(&mut self, response: String) -> anyhow::Result<SmtpResponse> {
913        // Parse credentials
914        let (username, password) = match crate::auth::parse_plain_auth(&response) {
915            Ok(creds) => creds,
916            Err(e) => {
917                tracing::warn!("Failed to parse PLAIN auth: {}", e);
918                return Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"));
919            }
920        };
921
922        // Create Username object
923        let username_obj = match rusmes_proto::Username::new(username.clone()) {
924            Ok(u) => u,
925            Err(e) => {
926                tracing::warn!("Invalid username '{}': {}", username, e);
927                return Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"));
928            }
929        };
930
931        // Authenticate with backend
932        match self
933            .auth_backend
934            .authenticate(&username_obj, &password)
935            .await
936        {
937            Ok(true) => {
938                self.authenticated = true;
939                self.username = Some(username.clone());
940                tracing::info!("User '{}' authenticated successfully (PLAIN)", username);
941                Ok(SmtpResponse::new(235, "2.7.0 Authentication successful"))
942            }
943            Ok(false) => {
944                tracing::warn!("Authentication failed for user '{}'", username);
945                Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"))
946            }
947            Err(e) => {
948                tracing::error!("Authentication error for user '{}': {}", username, e);
949                Ok(SmtpResponse::new(535, "5.7.8 Authentication failed"))
950            }
951        }
952    }
953
954    /// Handle CRAM-MD5 authentication - send challenge
955    async fn handle_auth_cram_md5(&mut self) -> anyhow::Result<SmtpResponse> {
956        // Generate challenge
957        let challenge = crate::auth::generate_cram_md5_challenge(&self.config.hostname)?;
958
959        // Store challenge for verification
960        self.cram_md5_challenge = Some(challenge.clone());
961
962        // Encode and send challenge
963        let encoded = crate::auth::encode_challenge(&challenge);
964        Ok(SmtpResponse::new(334, encoded))
965    }
966
967    /// Handle CRAM-MD5 response
968    async fn handle_cram_md5_response(
969        &mut self,
970        response_line: &str,
971    ) -> anyhow::Result<SmtpResponse> {
972        // Get the challenge (must be set)
973        let challenge = self
974            .cram_md5_challenge
975            .take()
976            .ok_or_else(|| anyhow::anyhow!("No CRAM-MD5 challenge pending"))?;
977
978        // Check for SASL abort
979        if response_line == "*" {
980            tracing::info!("CRAM-MD5 authentication aborted by client");
981            return Ok(SmtpResponse::new(501, "5.7.0 Authentication aborted"));
982        }
983
984        // Decode response
985        let decoded = crate::auth::decode_response(response_line)?;
986
987        // Parse username and HMAC
988        let (username, client_hmac) = crate::auth::parse_cram_md5_response(&decoded)?;
989
990        // IMPORTANT: CRAM-MD5 requires plaintext passwords or password-equivalent secrets
991        // The current AuthBackend uses bcrypt, which is one-way hashing
992        // For CRAM-MD5 to work, we would need:
993        // 1. A separate plaintext password store (security risk)
994        // 2. A password-equivalent secret store
995        // 3. A different authentication backend
996        //
997        // For now, we'll try to authenticate but it will fail with bcrypt
998        // This is documented limitation - CRAM-MD5 is not compatible with secure password storage
999
1000        tracing::warn!(
1001            "CRAM-MD5 authentication attempted for user '{}', but cannot verify HMAC with bcrypt-hashed passwords",
1002            username
1003        );
1004
1005        // We cannot compute the expected HMAC without the plaintext password
1006        // The proper solution would be to store CRAM-MD5 secrets separately
1007        // or use a more modern authentication mechanism like SCRAM
1008
1009        // For demonstration purposes, we log the authentication attempt
1010        tracing::info!(
1011            "CRAM-MD5 authentication for user '{}' from {} - challenge: {}, client_hmac: {}",
1012            username,
1013            self.remote_addr,
1014            challenge,
1015            client_hmac
1016        );
1017
1018        // Check if user exists
1019        let username_obj = rusmes_proto::Username::new(username.to_string())
1020            .map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
1021
1022        let user_exists = self.auth_backend.verify_identity(&username_obj).await?;
1023
1024        if !user_exists {
1025            tracing::warn!(
1026                "CRAM-MD5 authentication failed: user '{}' does not exist",
1027                username
1028            );
1029            return Ok(SmtpResponse::new(
1030                535,
1031                "5.7.8 Authentication credentials invalid",
1032            ));
1033        }
1034
1035        // Since we cannot verify HMAC with bcrypt, reject the authentication
1036        // In a real implementation with plaintext or reversible password storage,
1037        // we would:
1038        // 1. Get password from auth backend
1039        // 2. Compute expected HMAC: compute_cram_md5_hmac(password, challenge)
1040        // 3. Compare with client_hmac (constant-time comparison)
1041
1042        tracing::warn!(
1043            "CRAM-MD5 authentication rejected: mechanism requires plaintext password storage"
1044        );
1045
1046        Ok(SmtpResponse::new(
1047            535,
1048            "5.7.8 Authentication credentials invalid",
1049        ))
1050    }
1051
1052    /// Check if relay is allowed for the given recipient
1053    ///
1054    /// Returns `true` if:
1055    /// - User is authenticated, OR
1056    /// - Client IP is in relay_networks (CIDR notation), OR
1057    /// - Recipient domain is a local domain
1058    fn is_relay_allowed(&self, recipient: &MailAddress) -> bool {
1059        // Allow if authenticated
1060        if self.authenticated {
1061            tracing::debug!(
1062                "Relay allowed for {} from {}: authenticated user",
1063                recipient,
1064                self.remote_addr.ip()
1065            );
1066            return true;
1067        }
1068
1069        // Allow if client IP is in relay_networks
1070        if crate::is_ip_in_networks(self.remote_addr.ip(), &self.config.relay_networks) {
1071            tracing::debug!(
1072                "Relay allowed for {} from {}: client IP in relay_networks",
1073                recipient,
1074                self.remote_addr.ip()
1075            );
1076            return true;
1077        }
1078
1079        // Allow if recipient is local domain
1080        let recipient_domain = recipient.domain().as_str();
1081        for local_domain in &self.config.local_domains {
1082            if recipient_domain.eq_ignore_ascii_case(local_domain) {
1083                tracing::debug!(
1084                    "Relay allowed for {} from {}: local domain",
1085                    recipient,
1086                    self.remote_addr.ip()
1087                );
1088                return true;
1089            }
1090        }
1091
1092        // Deny relay
1093        tracing::warn!(
1094            "Relay denied for {} from {}: not authenticated, not in relay_networks, not local domain",
1095            recipient,
1096            self.remote_addr.ip()
1097        );
1098        false
1099    }
1100
1101    /// Validate recipient against storage backend with caching
1102    async fn validate_recipient(&self, recipient: &MailAddress) -> anyhow::Result<bool> {
1103        // Cache TTL: 5 minutes
1104        const CACHE_TTL: Duration = Duration::from_secs(300);
1105
1106        let cache_key = recipient.as_string();
1107
1108        // Check cache first
1109        {
1110            let cache = self.recipient_cache.read().await;
1111            if let Some(entry) = cache.get(&cache_key) {
1112                if entry.cached_at.elapsed() < CACHE_TTL {
1113                    tracing::debug!("Recipient validation cache hit for {}", recipient);
1114                    return Ok(entry.exists);
1115                }
1116            }
1117        }
1118
1119        // Cache miss or expired, query storage backend
1120        tracing::debug!(
1121            "Recipient validation cache miss for {}, querying storage",
1122            recipient
1123        );
1124
1125        // Extract username from mail address
1126        let username = Username::new(recipient.local_part())?;
1127
1128        // Query storage backend for mailboxes
1129        let mailbox_store = self.storage_backend.mailbox_store();
1130        let mailboxes = mailbox_store.list_mailboxes(&username).await?;
1131
1132        let exists = !mailboxes.is_empty();
1133
1134        // Update cache
1135        {
1136            let mut cache = self.recipient_cache.write().await;
1137            cache.insert(
1138                cache_key,
1139                RecipientCacheEntry {
1140                    exists,
1141                    cached_at: Instant::now(),
1142                },
1143            );
1144        }
1145
1146        Ok(exists)
1147    }
1148
1149    /// Handle SCRAM-SHA-256 authentication - initial client-first message
1150    async fn handle_auth_scram_sha256(
1151        &mut self,
1152        initial_response: Option<String>,
1153    ) -> anyhow::Result<SmtpResponse> {
1154        // SCRAM-SHA-256 requires client-first message
1155        // If not provided, send 334 continuation to request it
1156        let client_first_encoded = match initial_response {
1157            Some(resp) => resp,
1158            None => {
1159                // Send initial continuation to request client-first
1160                return Ok(SmtpResponse::new(334, ""));
1161            }
1162        };
1163
1164        // Decode client-first message
1165        let client_first_decoded = BASE64
1166            .decode(client_first_encoded.trim())
1167            .map_err(|e| anyhow::anyhow!("Failed to decode client-first: {}", e))?;
1168        let client_first_str = String::from_utf8(client_first_decoded)
1169            .map_err(|e| anyhow::anyhow!("Failed to decode UTF-8: {}", e))?;
1170
1171        // Parse client-first message
1172        let (username, client_nonce, client_first_bare) =
1173            crate::auth::parse_scram_client_first(&client_first_str)?;
1174
1175        // Generate server nonce and combine with client nonce
1176        let server_nonce = crate::auth::generate_scram_server_nonce()?;
1177        let nonce = format!("{}{}", client_nonce, server_nonce);
1178
1179        // IMPORTANT: SCRAM-SHA-256 requires different credential storage than bcrypt
1180        // The current AuthBackend uses bcrypt which is incompatible with SCRAM
1181        // For SCRAM to work, we need:
1182        // 1. Salt and iteration count for PBKDF2
1183        // 2. StoredKey and ServerKey derived from password
1184        //
1185        // For now, we'll use placeholder values and document this limitation
1186        tracing::warn!(
1187            "SCRAM-SHA-256 authentication attempted for user '{}', but AuthBackend does not support SCRAM credentials",
1188            username
1189        );
1190
1191        // Placeholder: Use fixed salt and iteration count
1192        // In production, these should be stored per-user in the auth backend
1193        let mut salt = [0u8; 16];
1194        getrandom::fill(&mut salt)
1195            .map_err(|e| anyhow::anyhow!("RNG failure generating SCRAM salt: {}", e))?;
1196        let iterations = 4096u32;
1197
1198        // Build server-first message
1199        let server_first = format!("r={},s={},i={}", nonce, BASE64.encode(salt), iterations);
1200
1201        // Store state for next round
1202        self.scram_state = Some(ScramState {
1203            client_first_bare: client_first_bare.clone(),
1204            server_first: server_first.clone(),
1205            nonce: nonce.clone(),
1206            username: username.clone(),
1207        });
1208
1209        // Send server-first as base64-encoded 334 response
1210        let server_first_encoded = BASE64.encode(server_first.as_bytes());
1211        Ok(SmtpResponse::new(334, server_first_encoded))
1212    }
1213
1214    /// Handle SCRAM-SHA-256 client-final message
1215    async fn handle_scram_client_final(
1216        &mut self,
1217        client_final_line: &str,
1218    ) -> anyhow::Result<SmtpResponse> {
1219        // Get stored state
1220        let state = self
1221            .scram_state
1222            .take()
1223            .ok_or_else(|| anyhow::anyhow!("No SCRAM state"))?;
1224
1225        // Check for SASL abort
1226        if client_final_line == "*" {
1227            tracing::info!("SCRAM-SHA-256 authentication aborted by client");
1228            return Ok(SmtpResponse::new(501, "5.7.0 Authentication aborted"));
1229        }
1230
1231        // Decode client-final message
1232        let client_final_decoded = BASE64
1233            .decode(client_final_line.trim())
1234            .map_err(|e| anyhow::anyhow!("Failed to decode client-final: {}", e))?;
1235        let client_final_str = String::from_utf8(client_final_decoded)
1236            .map_err(|e| anyhow::anyhow!("Failed to decode UTF-8: {}", e))?;
1237
1238        // Parse client-final message
1239        let (_channel_binding, nonce, proof, client_final_without_proof) =
1240            crate::auth::parse_scram_client_final(&client_final_str)?;
1241
1242        // Verify nonce matches
1243        if nonce != state.nonce {
1244            tracing::warn!(
1245                "SCRAM-SHA-256 nonce mismatch for user '{}': expected '{}', got '{}'",
1246                state.username,
1247                state.nonce,
1248                nonce
1249            );
1250            return Ok(SmtpResponse::new(
1251                535,
1252                "5.7.8 Authentication credentials invalid",
1253            ));
1254        }
1255
1256        // Check if user exists
1257        let username_obj = rusmes_proto::Username::new(state.username.clone())
1258            .map_err(|e| anyhow::anyhow!("Invalid username: {}", e))?;
1259
1260        let user_exists = self.auth_backend.verify_identity(&username_obj).await?;
1261
1262        if !user_exists {
1263            tracing::warn!(
1264                "SCRAM-SHA-256 authentication failed: user '{}' does not exist",
1265                state.username
1266            );
1267            return Ok(SmtpResponse::new(
1268                535,
1269                "5.7.8 Authentication credentials invalid",
1270            ));
1271        }
1272
1273        // IMPORTANT: Since AuthBackend doesn't support SCRAM credentials,
1274        // we cannot verify the client proof properly.
1275        //
1276        // In a real implementation with SCRAM support, we would:
1277        // 1. Get stored_key and server_key from auth backend
1278        // 2. Construct auth_message from client_first_bare, server_first, client_final_without_proof
1279        // 3. Verify client proof using verify_scram_client_proof()
1280        // 4. Compute server signature using compute_scram_server_signature()
1281        //
1282        // For now, we reject all SCRAM authentication attempts with a clear error message
1283
1284        tracing::warn!(
1285            "SCRAM-SHA-256 authentication rejected for user '{}': AuthBackend does not support SCRAM credential storage",
1286            state.username
1287        );
1288
1289        // Log the authentication attempt for debugging
1290        tracing::debug!(
1291            "SCRAM-SHA-256 auth attempt - client_first_bare: {}, server_first: {}, client_final_without_proof: {}, proof: {}",
1292            state.client_first_bare,
1293            state.server_first,
1294            client_final_without_proof,
1295            proof
1296        );
1297
1298        Ok(SmtpResponse::new(
1299            535,
1300            "5.7.8 Authentication credentials invalid - SCRAM-SHA-256 requires separate credential storage",
1301        ))
1302    }
1303}
1304
#[cfg(test)]
mod tests {
    use super::*;

    /// A transaction becomes valid only once it has both a sender and at least
    /// one recipient, and `reset` makes it invalid again.
    #[test]
    fn test_transaction_validity() {
        let sender: MailAddress = "sender@example.com"
            .parse()
            .expect("valid email address literal");
        let recipient: MailAddress = "rcpt@example.com"
            .parse()
            .expect("valid email address literal");

        // Empty transaction.
        let mut tx = SmtpTransaction::new();
        assert!(!tx.is_valid());

        // Sender alone is not enough.
        tx.sender = Some(sender);
        assert!(!tx.is_valid());

        // Sender plus one recipient is valid.
        tx.recipients.push(recipient);
        assert!(tx.is_valid());

        // Resetting invalidates the transaction again.
        tx.reset();
        assert!(!tx.is_valid());
    }

    /// The default configuration uses `localhost`, a 10 MiB message limit, and
    /// has AUTH and STARTTLS switched off.
    #[test]
    fn test_smtp_config_default() {
        let SmtpConfig {
            hostname,
            max_message_size,
            require_auth,
            enable_starttls,
            ..
        } = SmtpConfig::default();

        assert_eq!(hostname, "localhost");
        assert_eq!(max_message_size, 10 * 1024 * 1024);
        assert!(!require_auth);
        assert!(!enable_starttls);
    }
}