1use crate::common::{
8 config::smtp::queue::{QueueExpiry, QueueName},
9 expr::{self, functions::ResolveVariable, *},
10};
11use crate::store::write::now;
12use crate::types::blob_hash::BlobHash;
13use crate::utils::DomainPart;
14use compact_str::ToCompactString;
15use smtp_proto::Response;
16use std::{
17 fmt::Display,
18 net::{IpAddr, Ipv4Addr},
19 time::{Duration, Instant, SystemTime},
20};
21
22pub mod dsn;
23pub mod manager;
24pub mod quota;
25pub mod spool;
26pub mod throttle;
27
/// Identifier type used for the `queue_id` fields in this module.
pub type QueueId = u64;
29
30#[derive(Debug, Clone, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, serde::Deserialize)]
31pub struct Schedule<T> {
32 pub due: u64,
33 pub inner: T,
34}
35
36#[derive(Debug, Clone, Copy)]
37pub struct QueuedMessage {
38 pub due: u64,
39 pub queue_id: QueueId,
40 pub queue_name: QueueName,
41}
42
/// Source classification of a message.
///
/// The variants parallel the `FROM_*` flag constants declared in this module.
#[derive(Clone, Copy, Debug)]
pub enum MessageSource {
    Authenticated,
    Unauthenticated {
        // NOTE(review): presumably the DMARC verification outcome — confirm against the caller.
        dmarc_pass: bool,
        // NOTE(review): presumably an optional spam-training hint — confirm against the caller.
        train_spam: Option<bool>,
    },
    Dsn,
    Report,
    Autogenerated,
}
54
55#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug, Clone, PartialEq, Eq)]
56pub struct Message {
57 pub created: u64,
58 pub blob_hash: BlobHash,
59
60 pub return_path: Box<str>,
61 pub recipients: Vec<Recipient>,
62
63 pub received_from_ip: IpAddr,
64 pub received_via_port: u16,
65
66 pub flags: u64,
67 pub env_id: Option<Box<str>>,
68 pub priority: i16,
69
70 pub size: u64,
71 pub quota_keys: Box<[QuotaKey]>,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct MessageWrapper {
76 pub queue_id: QueueId,
77 pub queue_name: QueueName,
78 pub is_multi_queue: bool,
79 pub span_id: u64,
80 pub message: Message,
81}
82
83#[derive(
84 rkyv::Serialize,
85 rkyv::Deserialize,
86 rkyv::Archive,
87 Debug,
88 Clone,
89 PartialEq,
90 Eq,
91 serde::Deserialize,
92)]
93pub enum QuotaKey {
94 Size { key: Box<[u8]>, id: u64 },
95 Count { key: Box<[u8]>, id: u64 },
96}
97
98#[derive(
99 rkyv::Serialize,
100 rkyv::Deserialize,
101 rkyv::Archive,
102 Debug,
103 Clone,
104 PartialEq,
105 Eq,
106 serde::Deserialize,
107)]
108pub struct Recipient {
109 pub address: Box<str>,
110
111 pub retry: Schedule<u32>,
112 pub notify: Schedule<u32>,
113 pub expires: QueueExpiry,
114
115 pub queue: QueueName,
116 pub status: Status<HostResponse<Box<str>>, ErrorDetails>,
117 pub flags: u64,
118 pub orcpt: Option<Box<str>>,
119}
120
// Message source flags; these are tested against `Message::flags` when the
// source variable is resolved.
pub const FROM_AUTHENTICATED: u64 = 1u64 << 32;
pub const FROM_UNAUTHENTICATED: u64 = 1u64 << 33;
pub const FROM_UNAUTHENTICATED_DMARC: u64 = 1u64 << 34;
pub const FROM_DSN: u64 = 1u64 << 35;
pub const FROM_REPORT: u64 = 1u64 << 36;
pub const FROM_AUTOGENERATED: u64 = 1u64 << 37;

// Recipient flags (presumably stored in `Recipient::flags` — TODO confirm).
pub const RCPT_DSN_SENT: u64 = 1u64 << 32;
pub const RCPT_SPAM_PAYLOAD: u64 = 1u64 << 34;
131
132#[derive(
133 Debug,
134 Clone,
135 PartialEq,
136 Eq,
137 rkyv::Serialize,
138 rkyv::Deserialize,
139 rkyv::Archive,
140 serde::Serialize,
141 serde::Deserialize,
142)]
143pub enum Status<T, E> {
144 #[serde(rename = "scheduled")]
145 Scheduled,
146 #[serde(rename = "completed")]
147 Completed(T),
148 #[serde(rename = "temp_fail")]
149 TemporaryFailure(E),
150 #[serde(rename = "perm_fail")]
151 PermanentFailure(E),
152}
153
154#[derive(
155 Debug,
156 Clone,
157 PartialEq,
158 Eq,
159 rkyv::Serialize,
160 rkyv::Deserialize,
161 rkyv::Archive,
162 serde::Deserialize,
163)]
164pub struct HostResponse<T> {
165 pub hostname: T,
166 pub response: Response<Box<str>>,
167}
168
169#[derive(
170 Debug,
171 Clone,
172 PartialEq,
173 Eq,
174 rkyv::Serialize,
175 rkyv::Deserialize,
176 rkyv::Archive,
177 serde::Deserialize,
178 Default,
179)]
180pub enum Error {
181 DnsError(Box<str>),
182 UnexpectedResponse(UnexpectedResponse),
183 ConnectionError(Box<str>),
184 TlsError(Box<str>),
185 DaneError(Box<str>),
186 MtaStsError(Box<str>),
187 RateLimited,
188 #[default]
189 ConcurrencyLimited,
190 Io(Box<str>),
191}
192
193#[derive(
194 Debug,
195 Clone,
196 PartialEq,
197 Eq,
198 rkyv::Serialize,
199 rkyv::Deserialize,
200 rkyv::Archive,
201 serde::Deserialize,
202)]
203pub struct UnexpectedResponse {
204 pub command: Box<str>,
205 pub response: Response<Box<str>>,
206}
207
208#[derive(
209 Debug,
210 Clone,
211 PartialEq,
212 Eq,
213 rkyv::Serialize,
214 rkyv::Deserialize,
215 rkyv::Archive,
216 Default,
217 serde::Deserialize,
218)]
219pub struct ErrorDetails {
220 pub entity: Box<str>,
221 pub details: Error,
222}
223
224impl<T> Ord for Schedule<T> {
225 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
226 other.due.cmp(&self.due)
227 }
228}
229
230impl<T> PartialOrd for Schedule<T> {
231 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
232 Some(self.cmp(other))
233 }
234}
235
236impl<T> PartialEq for Schedule<T> {
237 fn eq(&self, other: &Self) -> bool {
238 self.due == other.due
239 }
240}
241
242impl<T> Eq for Schedule<T> {}
243
244impl<T: Default> Schedule<T> {
245 pub fn now() -> Self {
246 Schedule {
247 due: now(),
248 inner: T::default(),
249 }
250 }
251
252 pub fn later(duration: u64) -> Self {
253 Schedule {
254 due: now() + duration,
255 inner: T::default(),
256 }
257 }
258}
259
260pub struct QueueEnvelope<'x> {
261 pub message: &'x Message,
262 pub domain: &'x str,
263 pub mx: &'x str,
264 pub rcpt: &'x Recipient,
265 pub remote_ip: IpAddr,
266 pub local_ip: IpAddr,
267}
268
269impl<'x> QueueEnvelope<'x> {
270 pub fn new(message: &'x Message, rcpt: &'x Recipient) -> Self {
271 Self {
272 message,
273 domain: rcpt.address.domain_part(),
274 rcpt,
275 mx: "",
276 remote_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
277 local_ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
278 }
279 }
280}
281
282impl<'x> ResolveVariable for QueueEnvelope<'x> {
283 fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> {
284 match variable {
285 V_SENDER => self.message.return_path.as_ref().into(),
286 V_SENDER_DOMAIN => self.message.return_path.domain_part().into(),
287 V_RECIPIENT_DOMAIN => self.domain.into(),
288 V_RECIPIENT => self.rcpt.address.as_ref().into(),
289 V_RECIPIENTS => self
290 .message
291 .recipients
292 .iter()
293 .map(|r| Variable::from(r.address.as_ref()))
294 .collect::<Vec<_>>()
295 .into(),
296 V_QUEUE_RETRY_NUM => self.rcpt.retry.inner.into(),
297 V_QUEUE_NOTIFY_NUM => self.rcpt.notify.inner.into(),
298 V_QUEUE_EXPIRES_IN => match &self.rcpt.expires {
299 QueueExpiry::Ttl(time) => (*time + self.message.created).saturating_sub(now()),
300 QueueExpiry::Attempts(count) => {
301 (count.saturating_sub(self.rcpt.retry.inner)) as u64
302 }
303 }
304 .into(),
305 V_QUEUE_LAST_STATUS => self.rcpt.status.to_compact_string().into(),
306 V_QUEUE_LAST_ERROR => match &self.rcpt.status {
307 Status::Scheduled | Status::Completed(_) => "none",
308 Status::TemporaryFailure(err) | Status::PermanentFailure(err) => {
309 match &err.details {
310 Error::DnsError(_) => "dns",
311 Error::UnexpectedResponse(_) => "unexpected-reply",
312 Error::ConnectionError(_) => "connection",
313 Error::TlsError(_) => "tls",
314 Error::DaneError(_) => "dane",
315 Error::MtaStsError(_) => "mta-sts",
316 Error::RateLimited => "rate",
317 Error::ConcurrencyLimited => "concurrency",
318 Error::Io(_) => "io",
319 }
320 }
321 }
322 .into(),
323 V_QUEUE_NAME => self.rcpt.queue.as_str().into(),
324 V_QUEUE_AGE => now().saturating_sub(self.message.created).into(),
325 V_SOURCE => if (self.message.flags & FROM_AUTHENTICATED) != 0 {
326 "authenticated"
327 } else if (self.message.flags & FROM_UNAUTHENTICATED_DMARC) != 0 {
328 "dmarc_pass"
329 } else if (self.message.flags & FROM_UNAUTHENTICATED) != 0 {
330 "unauthenticated"
331 } else if (self.message.flags & FROM_DSN) != 0 {
332 "dsn"
333 } else if (self.message.flags & FROM_REPORT) != 0 {
334 "report"
335 } else if (self.message.flags & FROM_AUTOGENERATED) != 0 {
336 "autogenerated"
337 } else {
338 "unknown"
339 }
340 .into(),
341 V_MX => self.mx.into(),
342 V_PRIORITY => self.message.priority.into(),
343 V_REMOTE_IP => self.remote_ip.to_compact_string().into(),
344 V_LOCAL_IP => self.local_ip.to_compact_string().into(),
345 V_RECEIVED_FROM_IP => self.message.received_from_ip.to_compact_string().into(),
346 V_RECEIVED_VIA_PORT => self.message.received_via_port.into(),
347 V_SIZE => self.message.size.into(),
348 _ => "".into(),
349 }
350 }
351
352 fn resolve_global(&self, _: &str) -> Variable<'_> {
353 Variable::Integer(0)
354 }
355}
356
357impl ResolveVariable for Message {
358 fn resolve_variable(&self, variable: u32) -> expr::Variable<'_> {
359 match variable {
360 V_SENDER => self.return_path.as_ref().into(),
361 V_SENDER_DOMAIN => self.return_path.domain_part().into(),
362 V_RECIPIENTS => self
363 .recipients
364 .iter()
365 .map(|r| Variable::from(r.address.as_ref()))
366 .collect::<Vec<_>>()
367 .into(),
368 V_PRIORITY => self.priority.into(),
369 _ => "".into(),
370 }
371 }
372
373 fn resolve_global(&self, _: &str) -> Variable<'_> {
374 Variable::Integer(0)
375 }
376}
377
/// Wrapper around a borrowed recipient domain, used as a variable resolver
/// that only knows the recipient domain.
pub struct RecipientDomain<'x>(&'x str);

impl<'x> RecipientDomain<'x> {
    /// Wraps `domain` without copying it.
    pub fn new(domain: &'x str) -> Self {
        RecipientDomain(domain)
    }
}
385
386impl<'x> ResolveVariable for RecipientDomain<'x> {
387 fn resolve_variable(&self, variable: u32) -> expr::Variable<'x> {
388 match variable {
389 V_RECIPIENT_DOMAIN => self.0.into(),
390 _ => "".into(),
391 }
392 }
393
394 fn resolve_global(&self, _: &str) -> Variable<'_> {
395 Variable::Integer(0)
396 }
397}
398
/// Converts the monotonic instant `time` into a Unix timestamp in seconds.
///
/// The result is the current wall-clock time (whole seconds since the Unix
/// epoch, or 0 if the system clock is before the epoch) plus the whole
/// seconds by which `time` is later than `now`. If `time` is not later than
/// `now`, nothing is added.
#[inline(always)]
pub fn instant_to_timestamp(now: Instant, time: Instant) -> u64 {
    let wall_clock_secs = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    let offset_secs = match time.checked_duration_since(now) {
        Some(ahead) => ahead.as_secs(),
        None => 0,
    };

    wall_clock_secs + offset_secs
}
406
407impl Recipient {
408 pub fn new(address: impl AsRef<str>) -> Self {
409 Recipient {
410 address: address.to_lowercase_domain().into_boxed_str(),
411 status: Status::Scheduled,
412 flags: 0,
413 orcpt: None,
414 retry: Schedule::now(),
415 notify: Schedule::now(),
416 expires: QueueExpiry::Attempts(0),
417 queue: QueueName::default(),
418 }
419 }
420
421 pub fn with_flags(mut self, flags: u64) -> Self {
422 self.flags = flags;
423 self
424 }
425
426 pub fn with_orcpt(mut self, orcpt: Option<Box<str>>) -> Self {
427 self.orcpt = orcpt;
428 self
429 }
430
431 pub fn address(&self) -> &str {
432 &self.address
433 }
434
435 pub fn domain_part(&self) -> &str {
436 self.address.domain_part()
437 }
438}
439
440impl ArchivedRecipient {
441 pub fn address(&self) -> &str {
442 self.address.as_ref()
443 }
444
445 pub fn domain_part(&self) -> &str {
446 self.address.domain_part()
447 }
448}
449
/// Conversion from a Unix timestamp (seconds) to a monotonic [`Instant`].
pub trait InstantFromTimestamp {
    /// Returns the `Instant` that corresponds to this timestamp.
    fn to_instant(&self) -> Instant;
}

impl InstantFromTimestamp for u64 {
    /// Maps a Unix timestamp in seconds onto the monotonic clock.
    ///
    /// * A timestamp at or before the current wall-clock time maps to
    ///   `Instant::now()`.
    /// * A future timestamp maps to `Instant::now()` plus the remaining
    ///   seconds.
    /// * A timestamp too far ahead to be represented as an `Instant` (for
    ///   example `u64::MAX`) is clamped to a far-future instant instead of
    ///   panicking on overflow.
    fn to_instant(&self) -> Instant {
        // Clamp used when `now + delta` is not representable (~30 years).
        const FAR_FUTURE_SECS: u64 = 86_400 * 365 * 30;

        let current_timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_or(0, |d| d.as_secs());
        let now = Instant::now();

        match self.checked_sub(current_timestamp) {
            Some(remaining) if remaining > 0 => now
                .checked_add(Duration::from_secs(remaining))
                .or_else(|| now.checked_add(Duration::from_secs(FAR_FUTURE_SECS)))
                .unwrap_or(now),
            // Timestamp is now or in the past.
            _ => now,
        }
    }
}
467
468impl Display for Error {
469 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470 match self {
471 Error::UnexpectedResponse(response) => {
472 write!(
473 f,
474 "Unexpected response for {}: {}",
475 response.command, response.response
476 )
477 }
478 Error::DnsError(err) => {
479 write!(f, "DNS lookup failed: {err}")
480 }
481 Error::ConnectionError(details) => {
482 write!(f, "Connection failed: {details}",)
483 }
484 Error::TlsError(details) => {
485 write!(f, "TLS error: {details}",)
486 }
487 Error::DaneError(details) => {
488 write!(f, "DANE authentication failure: {details}",)
489 }
490 Error::MtaStsError(details) => {
491 write!(f, "MTA-STS auth failed: {details}")
492 }
493 Error::RateLimited => {
494 write!(f, "Rate limited")
495 }
496 Error::ConcurrencyLimited => {
497 write!(f, "Too many concurrent connections to remote server")
498 }
499 Error::Io(err) => {
500 write!(f, "Queue error: {err}")
501 }
502 }
503 }
504}
505
506impl Display for ArchivedError {
507 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
508 match self {
509 ArchivedError::UnexpectedResponse(response) => {
510 write!(
511 f,
512 "Unexpected response for {}: {}",
513 response.command, response.response
514 )
515 }
516 ArchivedError::DnsError(err) => {
517 write!(f, "DNS lookup failed: {err}")
518 }
519 ArchivedError::ConnectionError(details) => {
520 write!(f, "Connection failed: {details}",)
521 }
522 ArchivedError::TlsError(details) => {
523 write!(f, "TLS error: {details}",)
524 }
525 ArchivedError::DaneError(details) => {
526 write!(f, "DANE authentication failure: {details}",)
527 }
528 ArchivedError::MtaStsError(details) => {
529 write!(f, "MTA-STS auth failed: {details}")
530 }
531 ArchivedError::RateLimited => {
532 write!(f, "Rate limited")
533 }
534 ArchivedError::ConcurrencyLimited => {
535 write!(f, "Too many concurrent connections to remote server")
536 }
537 ArchivedError::Io(err) => {
538 write!(f, "Queue error: {err}")
539 }
540 }
541 }
542}
543
544impl Display for Status<HostResponse<Box<str>>, ErrorDetails> {
545 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
546 match self {
547 Status::Scheduled => write!(f, "Scheduled"),
548 Status::Completed(response) => write!(f, "Delivered: {}", response.response),
549 Status::TemporaryFailure(err) => {
550 write!(f, "Temporary Failure for {}: {}", err.entity, err.details)
551 }
552 Status::PermanentFailure(err) => {
553 write!(f, "Permanent Failure for {}: {}", err.entity, err.details)
554 }
555 }
556 }
557}
558
559impl Display for ArchivedErrorDetails {
560 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561 write!(f, "Error for {}: {}", self.entity, self.details)
562 }
563}
564
565