1use crate::session::{SmtpConfig, SmtpSessionHandler};
16use rusmes_auth::AuthBackend;
17use rusmes_core::{MailProcessorRouter, RateLimiter};
18use rusmes_storage::StorageBackend;
19use std::sync::Arc;
20use tokio::net::TcpListener;
21
/// Settings for the SMTP submission server.
///
/// A value of this type is converted into an `SmtpConfig` (see the `From`
/// impl in this file) for every accepted connection.
///
/// `PartialEq`/`Eq` are derived so that configurations can be compared
/// directly, e.g. in tests or when detecting a configuration change.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmissionConfig {
    /// Host name of this server; copied to `SmtpConfig::hostname`.
    pub hostname: String,
    /// Upper bound on the size of an accepted message.
    ///
    /// Presumably expressed in bytes (the default is `25 * 1024 * 1024`) —
    /// confirm against the session handler.
    pub max_message_size: usize,
    /// Whether STARTTLS is required. Mapped onto `SmtpConfig::enable_starttls`.
    ///
    /// `SubmissionServer::new` panics when this is `false`.
    pub require_starttls: bool,
    /// Whether clients must authenticate.
    ///
    /// `SubmissionServer::new` panics when this is `false`.
    pub require_auth: bool,
    /// Copied unchanged to `SmtpConfig::check_recipient_exists`.
    pub check_recipient_exists: bool,
    /// Copied unchanged to `SmtpConfig::reject_unknown_recipients`.
    pub reject_unknown_recipients: bool,
    /// Domains treated as local; copied unchanged to `SmtpConfig::local_domains`.
    pub local_domains: Vec<String>,
    /// Copied unchanged to `SmtpConfig::connection_timeout`.
    pub connection_timeout: std::time::Duration,
    /// Copied unchanged to `SmtpConfig::idle_timeout`.
    pub idle_timeout: std::time::Duration,
    /// Maximum number of recipients per message.
    ///
    /// NOTE(review): this value is not carried over by the `SmtpConfig`
    /// conversion in this file; verify where (or whether) it is enforced.
    pub max_recipients_per_message: usize,
    /// Whether the sender must match the authenticated identity (presumably —
    /// confirm).
    ///
    /// NOTE(review): this value is not carried over by the `SmtpConfig`
    /// conversion in this file; verify where (or whether) it is enforced.
    pub enforce_sender_match: bool,
}
51
52impl Default for SubmissionConfig {
53 fn default() -> Self {
54 Self {
55 hostname: "localhost".to_string(),
56 max_message_size: 25 * 1024 * 1024, require_starttls: true, require_auth: true, check_recipient_exists: false, reject_unknown_recipients: false, local_domains: vec!["localhost".to_string()],
62 connection_timeout: std::time::Duration::from_secs(1800), idle_timeout: std::time::Duration::from_secs(180), max_recipients_per_message: 100, enforce_sender_match: true, }
67 }
68}
69
70impl From<SubmissionConfig> for SmtpConfig {
71 fn from(config: SubmissionConfig) -> Self {
72 SmtpConfig {
73 hostname: config.hostname,
74 max_message_size: config.max_message_size,
75 require_auth: config.require_auth,
76 enable_starttls: config.require_starttls,
77 check_recipient_exists: config.check_recipient_exists,
78 reject_unknown_recipients: config.reject_unknown_recipients,
79 relay_networks: vec![],
81 local_domains: config.local_domains,
82 connection_timeout: config.connection_timeout,
83 idle_timeout: config.idle_timeout,
84 }
85 }
86}
87
88pub struct SubmissionServer {
95 config: SubmissionConfig,
96 bind_addr: String,
97 listener: Option<TcpListener>,
98 tls_config: Option<Arc<rustls::ServerConfig>>,
99 processor_router: Arc<MailProcessorRouter>,
100 auth_backend: Arc<dyn AuthBackend>,
101 rate_limiter: Arc<RateLimiter>,
102 storage_backend: Arc<dyn StorageBackend>,
103}
104
105impl SubmissionServer {
106 #[allow(clippy::too_many_arguments)]
116 pub fn new(
117 config: SubmissionConfig,
118 bind_addr: impl Into<String>,
119 processor_router: Arc<MailProcessorRouter>,
120 auth_backend: Arc<dyn AuthBackend>,
121 rate_limiter: Arc<RateLimiter>,
122 storage_backend: Arc<dyn StorageBackend>,
123 ) -> Self {
124 assert!(
126 config.require_auth,
127 "Submission server must require authentication"
128 );
129 assert!(
130 config.require_starttls,
131 "Submission server must require STARTTLS"
132 );
133
134 Self {
135 config,
136 bind_addr: bind_addr.into(),
137 listener: None,
138 tls_config: None,
139 processor_router,
140 auth_backend,
141 rate_limiter,
142 storage_backend,
143 }
144 }
145
146 pub fn with_tls(mut self, tls_config: Arc<rustls::ServerConfig>) -> Self {
154 self.tls_config = Some(tls_config);
155 self
156 }
157
158 pub async fn bind(&mut self) -> anyhow::Result<()> {
163 let listener = TcpListener::bind(&self.bind_addr).await?;
164 tracing::info!("SMTP Submission server listening on {}", self.bind_addr);
165 self.listener = Some(listener);
166 Ok(())
167 }
168
169 pub async fn serve(&self) -> anyhow::Result<()> {
177 let listener = self
178 .listener
179 .as_ref()
180 .ok_or_else(|| anyhow::anyhow!("Server not bound - call bind() first"))?;
181
182 if self.tls_config.is_none() {
184 tracing::warn!(
185 "Submission server running WITHOUT TLS configuration. \
186 This is INSECURE and should only be used for testing. \
187 STARTTLS will fail without TLS configuration."
188 );
189 }
190
191 loop {
192 let (stream, remote_addr) = listener.accept().await?;
193 tracing::info!("New SMTP submission connection from {}", remote_addr);
194
195 let ip = remote_addr.ip();
197 if !self.rate_limiter.allow_connection(ip).await {
198 tracing::warn!(
199 "Connection rate limit exceeded for {} on submission port",
200 ip
201 );
202 drop(stream);
204 continue;
205 }
206
207 let smtp_config: SmtpConfig = self.config.clone().into();
209
210 let session = SmtpSessionHandler::new(
212 stream,
213 remote_addr,
214 smtp_config,
215 self.processor_router.clone(),
216 self.auth_backend.clone(),
217 self.rate_limiter.clone(),
218 self.storage_backend.clone(),
219 );
220
221 let rate_limiter = self.rate_limiter.clone();
222
223 tokio::spawn(async move {
225 if let Err(e) = session.handle().await {
226 tracing::error!("SMTP submission session error from {}: {}", remote_addr, e);
227 }
228 rate_limiter.release_connection(ip).await;
230 });
231 }
232 }
233
234 pub async fn run(mut self) -> anyhow::Result<()> {
241 self.bind().await?;
242 self.serve().await
243 }
244
245 pub fn bind_addr(&self) -> &str {
247 &self.bind_addr
248 }
249
250 pub fn config(&self) -> &SubmissionConfig {
252 &self.config
253 }
254}
255
#[cfg(test)]
mod tests {
    //! Unit tests for the submission configuration and server construction.
    //!
    //! The `Dummy*` types are no-op implementations of the collaborator
    //! traits; they exist only so that a `SubmissionServer` can be built.

    use super::*;
    use rusmes_metrics::MetricsCollector;
    use rusmes_proto::Username;
    use rusmes_storage::{MailboxStore, MessageStore, MetadataStore};

    /// Address used by every server built in these tests (never bound).
    const TEST_BIND_ADDR: &str = "127.0.0.1:587";

    /// Authentication backend that accepts everything and stores nothing.
    #[allow(dead_code)]
    struct DummyAuthBackend;

    #[async_trait::async_trait]
    impl AuthBackend for DummyAuthBackend {
        async fn authenticate(
            &self,
            _username: &Username,
            _password: &str,
        ) -> anyhow::Result<bool> {
            Ok(true)
        }

        async fn verify_identity(&self, _username: &Username) -> anyhow::Result<bool> {
            Ok(true)
        }

        async fn list_users(&self) -> anyhow::Result<Vec<Username>> {
            Ok(Vec::new())
        }

        async fn create_user(&self, _username: &Username, _password: &str) -> anyhow::Result<()> {
            Ok(())
        }

        async fn delete_user(&self, _username: &Username) -> anyhow::Result<()> {
            Ok(())
        }

        async fn change_password(
            &self,
            _username: &Username,
            _new_password: &str,
        ) -> anyhow::Result<()> {
            Ok(())
        }
    }

    /// Mailbox store whose operations all succeed without doing anything.
    #[allow(dead_code)]
    struct DummyMailboxStore;

    #[async_trait::async_trait]
    impl MailboxStore for DummyMailboxStore {
        async fn create_mailbox(
            &self,
            _path: &rusmes_storage::MailboxPath,
        ) -> anyhow::Result<rusmes_storage::MailboxId> {
            Ok(rusmes_storage::MailboxId::new())
        }

        async fn delete_mailbox(&self, _id: &rusmes_storage::MailboxId) -> anyhow::Result<()> {
            Ok(())
        }

        async fn rename_mailbox(
            &self,
            _id: &rusmes_storage::MailboxId,
            _new_path: &rusmes_storage::MailboxPath,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn get_mailbox(
            &self,
            _id: &rusmes_storage::MailboxId,
        ) -> anyhow::Result<Option<rusmes_storage::Mailbox>> {
            Ok(None)
        }

        async fn list_mailboxes(
            &self,
            _user: &Username,
        ) -> anyhow::Result<Vec<rusmes_storage::Mailbox>> {
            Ok(Vec::new())
        }

        async fn get_user_inbox(
            &self,
            _user: &Username,
        ) -> anyhow::Result<Option<rusmes_storage::MailboxId>> {
            Ok(None)
        }

        async fn subscribe_mailbox(
            &self,
            _user: &Username,
            _mailbox_name: String,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn unsubscribe_mailbox(
            &self,
            _user: &Username,
            _mailbox_name: &str,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn list_subscriptions(&self, _user: &Username) -> anyhow::Result<Vec<String>> {
            Ok(Vec::new())
        }
    }

    /// Message store that never holds any message.
    #[allow(dead_code)]
    struct DummyMessageStore;

    #[async_trait::async_trait]
    impl MessageStore for DummyMessageStore {
        async fn append_message(
            &self,
            _mailbox_id: &rusmes_storage::MailboxId,
            _message: rusmes_proto::Mail,
        ) -> anyhow::Result<rusmes_storage::MessageMetadata> {
            Ok(rusmes_storage::MessageMetadata::new(
                rusmes_proto::MessageId::new(),
                rusmes_storage::MailboxId::new(),
                1,
                rusmes_storage::MessageFlags::new(),
                0,
            ))
        }

        async fn get_message(
            &self,
            _message_id: &rusmes_proto::MessageId,
        ) -> anyhow::Result<Option<rusmes_proto::Mail>> {
            Ok(None)
        }

        async fn delete_messages(
            &self,
            _message_ids: &[rusmes_proto::MessageId],
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn set_flags(
            &self,
            _message_ids: &[rusmes_proto::MessageId],
            _flags: rusmes_storage::MessageFlags,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn search(
            &self,
            _mailbox_id: &rusmes_storage::MailboxId,
            _criteria: rusmes_storage::SearchCriteria,
        ) -> anyhow::Result<Vec<rusmes_proto::MessageId>> {
            Ok(Vec::new())
        }

        async fn copy_messages(
            &self,
            _message_ids: &[rusmes_proto::MessageId],
            _dest_mailbox_id: &rusmes_storage::MailboxId,
        ) -> anyhow::Result<Vec<rusmes_storage::MessageMetadata>> {
            Ok(Vec::new())
        }

        async fn get_mailbox_messages(
            &self,
            _mailbox_id: &rusmes_storage::MailboxId,
        ) -> anyhow::Result<Vec<rusmes_storage::MessageMetadata>> {
            Ok(Vec::new())
        }
    }

    /// Metadata store returning fixed quota and default counters.
    #[allow(dead_code)]
    struct DummyMetadataStore;

    #[async_trait::async_trait]
    impl MetadataStore for DummyMetadataStore {
        async fn get_user_quota(&self, _user: &Username) -> anyhow::Result<rusmes_storage::Quota> {
            Ok(rusmes_storage::Quota::new(0, 1024 * 1024 * 1024))
        }

        async fn set_user_quota(
            &self,
            _user: &Username,
            _quota: rusmes_storage::Quota,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        async fn get_mailbox_counters(
            &self,
            _mailbox_id: &rusmes_storage::MailboxId,
        ) -> anyhow::Result<rusmes_storage::MailboxCounters> {
            Ok(rusmes_storage::MailboxCounters::default())
        }
    }

    /// Storage backend that simply hands out the three dummy stores.
    #[allow(dead_code)]
    struct DummyStorageBackend {
        mailbox_store: Arc<dyn MailboxStore>,
        message_store: Arc<dyn MessageStore>,
        metadata_store: Arc<dyn MetadataStore>,
    }

    impl StorageBackend for DummyStorageBackend {
        fn mailbox_store(&self) -> Arc<dyn MailboxStore> {
            self.mailbox_store.clone()
        }

        fn message_store(&self) -> Arc<dyn MessageStore> {
            self.message_store.clone()
        }

        fn metadata_store(&self) -> Arc<dyn MetadataStore> {
            self.metadata_store.clone()
        }
    }

    /// Builds a server for `config` wired to the dummy collaborators.
    ///
    /// Panics exactly when `SubmissionServer::new` does, which is what the
    /// `should_panic` tests below rely on.
    fn build_server(config: SubmissionConfig) -> SubmissionServer {
        let metrics = Arc::new(MetricsCollector::new());
        let router = Arc::new(MailProcessorRouter::new(metrics));
        let auth = Arc::new(DummyAuthBackend);
        let rate_limiter = Arc::new(rusmes_core::RateLimiter::new(
            rusmes_core::RateLimitConfig::default(),
        ));
        let storage: Arc<dyn StorageBackend> = Arc::new(DummyStorageBackend {
            mailbox_store: Arc::new(DummyMailboxStore),
            message_store: Arc::new(DummyMessageStore),
            metadata_store: Arc::new(DummyMetadataStore),
        });

        SubmissionServer::new(config, TEST_BIND_ADDR, router, auth, rate_limiter, storage)
    }

    #[test]
    fn test_submission_config_default() {
        let config = SubmissionConfig::default();
        assert!(config.require_auth);
        assert!(config.require_starttls);
        assert_eq!(config.max_message_size, 25 * 1024 * 1024);
        assert_eq!(config.max_recipients_per_message, 100);
        assert!(config.enforce_sender_match);
    }

    #[test]
    fn test_submission_config_to_smtp_config() {
        let submission_config = SubmissionConfig {
            hostname: "mail.example.com".to_string(),
            max_message_size: 10 * 1024 * 1024,
            require_starttls: true,
            require_auth: true,
            check_recipient_exists: false,
            reject_unknown_recipients: false,
            local_domains: vec!["example.com".to_string()],
            connection_timeout: std::time::Duration::from_secs(600),
            idle_timeout: std::time::Duration::from_secs(120),
            max_recipients_per_message: 50,
            enforce_sender_match: true,
        };

        // The source value is not needed afterwards, so it is moved, not cloned.
        let smtp_config: SmtpConfig = submission_config.into();

        assert_eq!(smtp_config.hostname, "mail.example.com");
        assert_eq!(smtp_config.max_message_size, 10 * 1024 * 1024);
        assert!(smtp_config.require_auth);
        assert!(smtp_config.enable_starttls);
        assert!(!smtp_config.check_recipient_exists);
        assert!(!smtp_config.reject_unknown_recipients);
        assert_eq!(smtp_config.local_domains, vec!["example.com"]);
        assert_eq!(smtp_config.connection_timeout.as_secs(), 600);
        assert_eq!(smtp_config.idle_timeout.as_secs(), 120);
        assert!(smtp_config.relay_networks.is_empty());
    }

    #[test]
    fn test_submission_server_creation() {
        let config = SubmissionConfig::default();
        let server = build_server(config.clone());

        assert_eq!(server.bind_addr(), "127.0.0.1:587");
        assert_eq!(server.config().hostname, config.hostname);
        assert!(server.config().require_auth);
        assert!(server.config().require_starttls);
    }

    #[test]
    #[should_panic(expected = "Submission server must require authentication")]
    fn test_submission_server_requires_auth() {
        let config = SubmissionConfig {
            require_auth: false,
            ..Default::default()
        };

        let _server = build_server(config);
    }

    #[test]
    #[should_panic(expected = "Submission server must require STARTTLS")]
    fn test_submission_server_requires_starttls() {
        let config = SubmissionConfig {
            require_starttls: false,
            ..Default::default()
        };

        let _server = build_server(config);
    }
}