Skip to main content

stalwart_lib/smtp/src/queue/
mod.rs

1/*
2 * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
3 *
4 * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
5 */
6
7use crate::common::{
8    config::smtp::queue::{QueueExpiry, QueueName},
9    expr::{self, functions::ResolveVariable, *},
10};
11use crate::store::write::now;
12use crate::types::blob_hash::BlobHash;
13use crate::utils::DomainPart;
14use compact_str::ToCompactString;
15use smtp_proto::Response;
16use std::{
17    fmt::Display,
18    net::{IpAddr, Ipv4Addr},
19    time::{Duration, Instant, SystemTime},
20};
21
22pub mod dsn;
23pub mod manager;
24pub mod quota;
25pub mod spool;
26pub mod throttle;
27
/// Numeric identifier type used by the `queue_id` fields in this module.
pub type QueueId = u64;
29
30#[derive(Debug, Clone, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, serde::Deserialize)]
31pub struct Schedule<T> {
32    pub due: u64,
33    pub inner: T,
34}
35
36#[derive(Debug, Clone, Copy)]
37pub struct QueuedMessage {
38    pub due: u64,
39    pub queue_id: QueueId,
40    pub queue_name: QueueName,
41}
42
/// Classification of where a message originated.
// NOTE(review): the variants appear to mirror the `FROM_*` flag bits declared
// in this module; the mapping itself is not visible here — confirm.
#[derive(Debug, Clone, Copy)]
pub enum MessageSource {
    /// Submitted by an authenticated sender.
    Authenticated,
    /// Received without authentication.
    Unauthenticated {
        /// Whether the message passed DMARC.
        dmarc_pass: bool,
        /// Optional spam-training hint; semantics are defined by the caller.
        train_spam: Option<bool>,
    },
    /// Delivery status notification.
    Dsn,
    /// Report message.
    Report,
    /// Automatically generated message.
    Autogenerated,
}
54
55#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug, Clone, PartialEq, Eq)]
56pub struct Message {
57    pub created: u64,
58    pub blob_hash: BlobHash,
59
60    pub return_path: Box<str>,
61    pub recipients: Vec<Recipient>,
62
63    pub received_from_ip: IpAddr,
64    pub received_via_port: u16,
65
66    pub flags: u64,
67    pub env_id: Option<Box<str>>,
68    pub priority: i16,
69
70    pub size: u64,
71    pub quota_keys: Box<[QuotaKey]>,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct MessageWrapper {
76    pub queue_id: QueueId,
77    pub queue_name: QueueName,
78    pub is_multi_queue: bool,
79    pub span_id: u64,
80    pub message: Message,
81}
82
83#[derive(
84    rkyv::Serialize,
85    rkyv::Deserialize,
86    rkyv::Archive,
87    Debug,
88    Clone,
89    PartialEq,
90    Eq,
91    serde::Deserialize,
92)]
93pub enum QuotaKey {
94    Size { key: Box<[u8]>, id: u64 },
95    Count { key: Box<[u8]>, id: u64 },
96}
97
98#[derive(
99    rkyv::Serialize,
100    rkyv::Deserialize,
101    rkyv::Archive,
102    Debug,
103    Clone,
104    PartialEq,
105    Eq,
106    serde::Deserialize,
107)]
108pub struct Recipient {
109    pub address: Box<str>,
110
111    pub retry: Schedule<u32>,
112    pub notify: Schedule<u32>,
113    pub expires: QueueExpiry,
114
115    pub queue: QueueName,
116    pub status: Status<HostResponse<Box<str>>, ErrorDetails>,
117    pub flags: u64,
118    pub orcpt: Option<Box<str>>,
119}
120
// Message-level flag bits. They are tested against `Message::flags` when the
// `source` expression variable is resolved.

/// Message was submitted by an authenticated sender.
pub const FROM_AUTHENTICATED: u64 = 1 << 32;
/// Message was received from an unauthenticated sender.
pub const FROM_UNAUTHENTICATED: u64 = 1 << 33;
/// Message was received from an unauthenticated sender and passed DMARC.
pub const FROM_UNAUTHENTICATED_DMARC: u64 = 1 << 34;
/// Message is a delivery status notification.
pub const FROM_DSN: u64 = 1 << 35;
/// Message is a report.
pub const FROM_REPORT: u64 = 1 << 36;
/// Message was generated automatically.
pub const FROM_AUTOGENERATED: u64 = 1 << 37;

// Flag bits with the `RCPT_` prefix (presumably stored in `Recipient::flags` — confirm).

/// A DSN has been sent for this recipient (inferred from the name — confirm).
pub const RCPT_DSN_SENT: u64 = 1 << 32;
// Bit 33 is currently disabled:
//pub const RCPT_STATUS_CHANGED: u64 = 1 << 33;
/// Spam-payload marker (inferred from the name — confirm).
pub const RCPT_SPAM_PAYLOAD: u64 = 1 << 34;
131
132#[derive(
133    Debug,
134    Clone,
135    PartialEq,
136    Eq,
137    rkyv::Serialize,
138    rkyv::Deserialize,
139    rkyv::Archive,
140    serde::Serialize,
141    serde::Deserialize,
142)]
143pub enum Status<T, E> {
144    #[serde(rename = "scheduled")]
145    Scheduled,
146    #[serde(rename = "completed")]
147    Completed(T),
148    #[serde(rename = "temp_fail")]
149    TemporaryFailure(E),
150    #[serde(rename = "perm_fail")]
151    PermanentFailure(E),
152}
153
154#[derive(
155    Debug,
156    Clone,
157    PartialEq,
158    Eq,
159    rkyv::Serialize,
160    rkyv::Deserialize,
161    rkyv::Archive,
162    serde::Deserialize,
163)]
164pub struct HostResponse<T> {
165    pub hostname: T,
166    pub response: Response<Box<str>>,
167}
168
169#[derive(
170    Debug,
171    Clone,
172    PartialEq,
173    Eq,
174    rkyv::Serialize,
175    rkyv::Deserialize,
176    rkyv::Archive,
177    serde::Deserialize,
178    Default,
179)]
180pub enum Error {
181    DnsError(Box<str>),
182    UnexpectedResponse(UnexpectedResponse),
183    ConnectionError(Box<str>),
184    TlsError(Box<str>),
185    DaneError(Box<str>),
186    MtaStsError(Box<str>),
187    RateLimited,
188    #[default]
189    ConcurrencyLimited,
190    Io(Box<str>),
191}
192
193#[derive(
194    Debug,
195    Clone,
196    PartialEq,
197    Eq,
198    rkyv::Serialize,
199    rkyv::Deserialize,
200    rkyv::Archive,
201    serde::Deserialize,
202)]
203pub struct UnexpectedResponse {
204    pub command: Box<str>,
205    pub response: Response<Box<str>>,
206}
207
208#[derive(
209    Debug,
210    Clone,
211    PartialEq,
212    Eq,
213    rkyv::Serialize,
214    rkyv::Deserialize,
215    rkyv::Archive,
216    Default,
217    serde::Deserialize,
218)]
219pub struct ErrorDetails {
220    pub entity: Box<str>,
221    pub details: Error,
222}
223
224impl<T> Ord for Schedule<T> {
225    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
226        other.due.cmp(&self.due)
227    }
228}
229
230impl<T> PartialOrd for Schedule<T> {
231    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
232        Some(self.cmp(other))
233    }
234}
235
236impl<T> PartialEq for Schedule<T> {
237    fn eq(&self, other: &Self) -> bool {
238        self.due == other.due
239    }
240}
241
242impl<T> Eq for Schedule<T> {}
243
244impl<T: Default> Schedule<T> {
245    pub fn now() -> Self {
246        Schedule {
247            due: now(),
248            inner: T::default(),
249        }
250    }
251
252    pub fn later(duration: u64) -> Self {
253        Schedule {
254            due: now() + duration,
255            inner: T::default(),
256        }
257    }
258}
259
260pub struct QueueEnvelope<'x> {
261    pub message: &'x Message,
262    pub domain: &'x str,
263    pub mx: &'x str,
264    pub rcpt: &'x Recipient,
265    pub remote_ip: IpAddr,
266    pub local_ip: IpAddr,
267}
268
269impl<'x> QueueEnvelope<'x> {
270    pub fn new(message: &'x Message, rcpt: &'x Recipient) -> Self {
271        Self {
272            message,
273            domain: rcpt.address.domain_part(),
274            rcpt,
275            mx: "",
276            remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
277            local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
278        }
279    }
280}
281
282impl<'x> ResolveVariable for QueueEnvelope<'x> {
283    fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> {
284        match variable {
285            V_SENDER => self.message.return_path.as_ref().into(),
286            V_SENDER_DOMAIN => self.message.return_path.domain_part().into(),
287            V_RECIPIENT_DOMAIN => self.domain.into(),
288            V_RECIPIENT => self.rcpt.address.as_ref().into(),
289            V_RECIPIENTS => self
290                .message
291                .recipients
292                .iter()
293                .map(|r| Variable::from(r.address.as_ref()))
294                .collect::<Vec<_>>()
295                .into(),
296            V_QUEUE_RETRY_NUM => self.rcpt.retry.inner.into(),
297            V_QUEUE_NOTIFY_NUM => self.rcpt.notify.inner.into(),
298            V_QUEUE_EXPIRES_IN => match &self.rcpt.expires {
299                QueueExpiry::Ttl(time) => (*time + self.message.created).saturating_sub(now()),
300                QueueExpiry::Attempts(count) => {
301                    (count.saturating_sub(self.rcpt.retry.inner)) as u64
302                }
303            }
304            .into(),
305            V_QUEUE_LAST_STATUS => self.rcpt.status.to_compact_string().into(),
306            V_QUEUE_LAST_ERROR => match &self.rcpt.status {
307                Status::Scheduled | Status::Completed(_) => "none",
308                Status::TemporaryFailure(err) | Status::PermanentFailure(err) => {
309                    match &err.details {
310                        Error::DnsError(_) => "dns",
311                        Error::UnexpectedResponse(_) => "unexpected-reply",
312                        Error::ConnectionError(_) => "connection",
313                        Error::TlsError(_) => "tls",
314                        Error::DaneError(_) => "dane",
315                        Error::MtaStsError(_) => "mta-sts",
316                        Error::RateLimited => "rate",
317                        Error::ConcurrencyLimited => "concurrency",
318                        Error::Io(_) => "io",
319                    }
320                }
321            }
322            .into(),
323            V_QUEUE_NAME => self.rcpt.queue.as_str().into(),
324            V_QUEUE_AGE => now().saturating_sub(self.message.created).into(),
325            V_SOURCE => if (self.message.flags & FROM_AUTHENTICATED) != 0 {
326                "authenticated"
327            } else if (self.message.flags & FROM_UNAUTHENTICATED_DMARC) != 0 {
328                "dmarc_pass"
329            } else if (self.message.flags & FROM_UNAUTHENTICATED) != 0 {
330                "unauthenticated"
331            } else if (self.message.flags & FROM_DSN) != 0 {
332                "dsn"
333            } else if (self.message.flags & FROM_REPORT) != 0 {
334                "report"
335            } else if (self.message.flags & FROM_AUTOGENERATED) != 0 {
336                "autogenerated"
337            } else {
338                "unknown"
339            }
340            .into(),
341            V_MX => self.mx.into(),
342            V_PRIORITY => self.message.priority.into(),
343            V_REMOTE_IP => self.remote_ip.to_compact_string().into(),
344            V_LOCAL_IP => self.local_ip.to_compact_string().into(),
345            V_RECEIVED_FROM_IP => self.message.received_from_ip.to_compact_string().into(),
346            V_RECEIVED_VIA_PORT => self.message.received_via_port.into(),
347            V_SIZE => self.message.size.into(),
348            _ => "".into(),
349        }
350    }
351
352    fn resolve_global(&self, _: &str) -> Variable<'_> {
353        Variable::Integer(0)
354    }
355}
356
357impl ResolveVariable for Message {
358    fn resolve_variable(&self, variable: u32) -> expr::Variable<'_> {
359        match variable {
360            V_SENDER => self.return_path.as_ref().into(),
361            V_SENDER_DOMAIN => self.return_path.domain_part().into(),
362            V_RECIPIENTS => self
363                .recipients
364                .iter()
365                .map(|r| Variable::from(r.address.as_ref()))
366                .collect::<Vec<_>>()
367                .into(),
368            V_PRIORITY => self.priority.into(),
369            _ => "".into(),
370        }
371    }
372
373    fn resolve_global(&self, _: &str) -> Variable<'_> {
374        Variable::Integer(0)
375    }
376}
377
/// Variable resolver that knows only a recipient domain.
pub struct RecipientDomain<'x>(&'x str);

impl<'x> RecipientDomain<'x> {
    /// Wraps the borrowed `domain`.
    pub fn new(domain: &'x str) -> Self {
        RecipientDomain(domain)
    }
}
385
386impl<'x> ResolveVariable for RecipientDomain<'x> {
387    fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> {
388        match variable {
389            V_RECIPIENT_DOMAIN => self.0.into(),
390            _ => "".into(),
391        }
392    }
393
394    fn resolve_global(&self, _: &str) -> Variable<'_> {
395        Variable::Integer(0)
396    }
397}
398
/// Converts the monotonic instant `time` into a Unix timestamp in seconds.
///
/// `now` is the reference instant taken to correspond to the current wall
/// clock. The result is the current Unix time plus the whole seconds by which
/// `time` lies after `now`; if `time` is not after `now` the offset is 0. A
/// system clock set before the Unix epoch is treated as time 0.
#[inline(always)]
pub fn instant_to_timestamp(now: Instant, time: Instant) -> u64 {
    let unix_now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    let offset = match time.checked_duration_since(now) {
        Some(ahead) => ahead.as_secs(),
        None => 0,
    };

    unix_now + offset
}
406
407impl Recipient {
408    pub fn new(address: impl AsRef<str>) -> Self {
409        Recipient {
410            address: address.to_lowercase_domain().into_boxed_str(),
411            status: Status::Scheduled,
412            flags: 0,
413            orcpt: None,
414            retry: Schedule::now(),
415            notify: Schedule::now(),
416            expires: QueueExpiry::Attempts(0),
417            queue: QueueName::default(),
418        }
419    }
420
421    pub fn with_flags(mut self, flags: u64) -> Self {
422        self.flags = flags;
423        self
424    }
425
426    pub fn with_orcpt(mut self, orcpt: Option<Box<str>>) -> Self {
427        self.orcpt = orcpt;
428        self
429    }
430
431    pub fn address(&self) -> &str {
432        &self.address
433    }
434
435    pub fn domain_part(&self) -> &str {
436        self.address.domain_part()
437    }
438}
439
440impl ArchivedRecipient {
441    pub fn address(&self) -> &str {
442        self.address.as_ref()
443    }
444
445    pub fn domain_part(&self) -> &str {
446        self.address.domain_part()
447    }
448}
449
/// Conversion from a Unix timestamp (seconds) to a monotonic [`Instant`].
pub trait InstantFromTimestamp {
    /// Returns the `Instant` that corresponds to this timestamp.
    fn to_instant(&self) -> Instant;
}

impl InstantFromTimestamp for u64 {
    /// Maps a Unix timestamp in seconds onto the monotonic clock.
    ///
    /// A timestamp in the future becomes "now + the remaining seconds". A
    /// timestamp that is now or in the past becomes the current instant. A
    /// system clock set before the Unix epoch is treated as time 0.
    ///
    /// Timestamps too far ahead to be represented as an `Instant` (for example
    /// `u64::MAX`) are clamped to a far-future instant instead of panicking,
    /// which is what plain `Instant + Duration` does on overflow.
    fn to_instant(&self) -> Instant {
        // Roughly one hundred years; only used when the exact target overflows.
        const FAR_FUTURE: Duration = Duration::from_secs(100 * 365 * 24 * 60 * 60);

        let timestamp = *self;
        let current_timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_or(0, |d| d.as_secs());
        let now = Instant::now();

        if timestamp > current_timestamp {
            let remaining = Duration::from_secs(timestamp - current_timestamp);
            now.checked_add(remaining)
                .or_else(|| now.checked_add(FAR_FUTURE))
                .unwrap_or(now)
        } else {
            now
        }
    }
}
467
468impl Display for Error {
469    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470        match self {
471            Error::UnexpectedResponse(response) => {
472                write!(
473                    f,
474                    "Unexpected response for {}: {}",
475                    response.command, response.response
476                )
477            }
478            Error::DnsError(err) => {
479                write!(f, "DNS lookup failed: {err}")
480            }
481            Error::ConnectionError(details) => {
482                write!(f, "Connection failed: {details}",)
483            }
484            Error::TlsError(details) => {
485                write!(f, "TLS error: {details}",)
486            }
487            Error::DaneError(details) => {
488                write!(f, "DANE authentication failure: {details}",)
489            }
490            Error::MtaStsError(details) => {
491                write!(f, "MTA-STS auth failed: {details}")
492            }
493            Error::RateLimited => {
494                write!(f, "Rate limited")
495            }
496            Error::ConcurrencyLimited => {
497                write!(f, "Too many concurrent connections to remote server")
498            }
499            Error::Io(err) => {
500                write!(f, "Queue error: {err}")
501            }
502        }
503    }
504}
505
506impl Display for ArchivedError {
507    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
508        match self {
509            ArchivedError::UnexpectedResponse(response) => {
510                write!(
511                    f,
512                    "Unexpected response for {}: {}",
513                    response.command, response.response
514                )
515            }
516            ArchivedError::DnsError(err) => {
517                write!(f, "DNS lookup failed: {err}")
518            }
519            ArchivedError::ConnectionError(details) => {
520                write!(f, "Connection failed: {details}",)
521            }
522            ArchivedError::TlsError(details) => {
523                write!(f, "TLS error: {details}",)
524            }
525            ArchivedError::DaneError(details) => {
526                write!(f, "DANE authentication failure: {details}",)
527            }
528            ArchivedError::MtaStsError(details) => {
529                write!(f, "MTA-STS auth failed: {details}")
530            }
531            ArchivedError::RateLimited => {
532                write!(f, "Rate limited")
533            }
534            ArchivedError::ConcurrencyLimited => {
535                write!(f, "Too many concurrent connections to remote server")
536            }
537            ArchivedError::Io(err) => {
538                write!(f, "Queue error: {err}")
539            }
540        }
541    }
542}
543
544impl Display for Status<HostResponse<Box<str>>, ErrorDetails> {
545    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
546        match self {
547            Status::Scheduled => write!(f, "Scheduled"),
548            Status::Completed(response) => write!(f, "Delivered: {}", response.response),
549            Status::TemporaryFailure(err) => {
550                write!(f, "Temporary Failure for {}: {}", err.entity, err.details)
551            }
552            Status::PermanentFailure(err) => {
553                write!(f, "Permanent Failure for {}: {}", err.entity, err.details)
554            }
555        }
556    }
557}
558
559impl Display for ArchivedErrorDetails {
560    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561        write!(f, "Error for {}: {}", self.entity, self.details)
562    }
563}
564
565/*
566
567pub trait DisplayArchivedResponse {
568    fn to_string(&self) -> String;
569}
570
571impl DisplayArchivedResponse for ArchivedResponse<Box<str>> {
572    fn to_string(&self) -> String {
573        format!(
574            "Code: {}, Enhanced code: {}.{}.{}, Message: {}",
575            self.code, self.esc[0], self.esc[1], self.esc[2], self.message,
576        )
577    }
578}
579*/