use core::future::Future;
use core::pin::Pin;
use std::borrow::Cow;
use std::time::Duration;
use email_message::{EmailAddress, Envelope, Header, Message, OutboundMessage};
use time::format_description::well_known::Rfc2822;
pub use crate::options::{
CorrelationId, IdempotencyKey, SendOptions, TransportOption, TransportOptions,
};
#[cfg(feature = "serde")]
pub use crate::options::{
SendOptionsSeed, TransportOptionRegistry, TransportOptionRegistryError, TransportOptionsSeed,
};
/// A transport that delivers structured [`OutboundMessage`] values.
///
/// Implementors must provide [`Transport::send`]; the other methods have
/// default implementations.
pub trait Transport: RuntimeBound {
    /// Reports what this transport supports.
    ///
    /// By default only `structured_send` is advertised (as
    /// [`StructuredSendCapability::Supported`]); every other field keeps its
    /// `Default` value.
    fn capabilities(&self) -> Capabilities {
        Capabilities::new().with_structured_send(StructuredSendCapability::Supported)
    }

    /// Sends a borrowed `message` using `options`.
    ///
    /// # Errors
    ///
    /// Resolves to a [`TransportError`] when the implementation reports a failure.
    fn send<'a>(
        &'a self,
        message: &'a OutboundMessage,
        options: &'a SendOptions,
    ) -> impl Future<Output = Result<SendReport, TransportError>> + MaybeSend + 'a;

    /// Sends a `message` that the caller hands over by value.
    ///
    /// The default implementation moves `message` into the returned future and
    /// forwards a borrow of it to [`Transport::send`].
    ///
    /// # Errors
    ///
    /// Propagates whatever [`Transport::send`] returns.
    fn send_owned<'a>(
        &'a self,
        message: OutboundMessage,
        options: &'a SendOptions,
    ) -> impl Future<Output = Result<SendReport, TransportError>> + MaybeSend + 'a {
        async move { Self::send(self, &message, options).await }
    }
}
/// A transport that delivers an already-rendered message (raw bytes plus an
/// explicit [`Envelope`]).
///
/// Implementors must provide [`RawTransport::send_raw`]; the other methods
/// have default implementations.
pub trait RawTransport: RuntimeBound {
    /// Reports what this transport supports.
    ///
    /// By default `raw_rfc822` and `custom_envelope` are advertised; every
    /// other field keeps its `Default` value.
    fn capabilities(&self) -> Capabilities {
        Capabilities::new()
            .with_raw_rfc822(true)
            .with_custom_envelope(true)
    }

    /// Sends the borrowed `rfc822` bytes to the recipients described by `envelope`.
    ///
    /// # Errors
    ///
    /// Resolves to a [`TransportError`] when the implementation reports a failure.
    fn send_raw<'a>(
        &'a self,
        envelope: &'a Envelope,
        rfc822: &'a [u8],
        options: &'a SendOptions,
    ) -> impl Future<Output = Result<SendReport, TransportError>> + MaybeSend + 'a;

    /// Sends an envelope and message bytes that the caller hands over by value.
    ///
    /// The default implementation moves both into the returned future and
    /// forwards borrows of them to [`RawTransport::send_raw`].
    ///
    /// # Errors
    ///
    /// Propagates whatever [`RawTransport::send_raw`] returns.
    fn send_raw_owned<'a>(
        &'a self,
        envelope: Envelope,
        rfc822: Vec<u8>,
        options: &'a SendOptions,
    ) -> impl Future<Output = Result<SendReport, TransportError>> + MaybeSend + 'a {
        async move { Self::send_raw(self, &envelope, rfc822.as_slice(), options).await }
    }
}
/// Feature flags a transport reports from its `capabilities()` method.
///
/// The derived `Default` leaves every flag `false` and `structured_send` at
/// `StructuredSendCapability::Unsupported`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
// NOTE(review): each bool appears to be an independent on/off capability, which
// is presumably why the pedantic lint is silenced — confirm.
#[allow(clippy::struct_excessive_bools)]
pub struct Capabilities {
/// Set by the default `RawTransport::capabilities`; presumably means the
/// transport accepts pre-rendered RFC 822 bytes — confirm.
pub raw_rfc822: bool,
/// Level of structured-send support; set to `Supported` by the default
/// `Transport::capabilities`.
pub structured_send: StructuredSendCapability,
/// When `true`, `structured_accepted_for` reports the envelope recipients
/// from `SendOptions` instead of the message recipients.
pub custom_envelope: bool,
/// Presumably: caller-supplied headers are honoured — confirm with implementors.
pub custom_headers: bool,
/// Presumably: attachments are supported — confirm with implementors.
pub attachments: bool,
/// Presumably: inline attachments are supported — confirm with implementors.
pub inline_attachments: bool,
/// Presumably: the idempotency key in `SendOptions` is honoured — confirm.
pub idempotency_key: bool,
/// Presumably: a per-send timeout in `SendOptions` is honoured — confirm.
pub timeout: bool,
}
impl Capabilities {
    /// Creates a capability set identical to `Capabilities::default()`.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns a copy with `raw_rfc822` replaced by `value`.
    #[must_use]
    pub const fn with_raw_rfc822(self, value: bool) -> Self {
        Self {
            raw_rfc822: value,
            ..self
        }
    }

    /// Returns a copy with `structured_send` replaced by `value`.
    #[must_use]
    pub const fn with_structured_send(self, value: StructuredSendCapability) -> Self {
        Self {
            structured_send: value,
            ..self
        }
    }

    /// Returns a copy with `custom_envelope` replaced by `value`.
    #[must_use]
    pub const fn with_custom_envelope(self, value: bool) -> Self {
        Self {
            custom_envelope: value,
            ..self
        }
    }

    /// Returns a copy with `custom_headers` replaced by `value`.
    #[must_use]
    pub const fn with_custom_headers(self, value: bool) -> Self {
        Self {
            custom_headers: value,
            ..self
        }
    }

    /// Returns a copy with `attachments` replaced by `value`.
    #[must_use]
    pub const fn with_attachments(self, value: bool) -> Self {
        Self {
            attachments: value,
            ..self
        }
    }

    /// Returns a copy with `inline_attachments` replaced by `value`.
    #[must_use]
    pub const fn with_inline_attachments(self, value: bool) -> Self {
        Self {
            inline_attachments: value,
            ..self
        }
    }

    /// Returns a copy with `idempotency_key` replaced by `value`.
    #[must_use]
    pub const fn with_idempotency_key(self, value: bool) -> Self {
        Self {
            idempotency_key: value,
            ..self
        }
    }

    /// Returns a copy with `timeout` replaced by `value`.
    #[must_use]
    pub const fn with_timeout(self, value: bool) -> Self {
        Self {
            timeout: value,
            ..self
        }
    }
}
/// How far a transport supports structured (non-raw) sends.
///
/// The derived `Default` is `Unsupported`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum StructuredSendCapability {
/// Structured sends are not available; this is the default.
#[default]
Unsupported,
/// Structured sends are available; reported by the default `Transport::capabilities`.
Supported,
/// Presumably: structured sends work only when transport-specific options
/// are supplied — confirm with the transports that report this value.
RequiresTransportOptions,
}
/// Value a transport returns from a successful send.
///
/// Serializable when the `serde` feature is enabled and schema-describable
/// when the `schemars` feature is enabled.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct SendReport {
/// Provider name; either a static string or an owned one.
pub provider: Cow<'static, str>,
/// Message identifier, when one is recorded (presumably assigned by the provider — confirm).
pub provider_message_id: Option<String>,
/// Recipient addresses recorded as accepted; empty until set.
pub accepted: Vec<EmailAddress>,
}
impl SendReport {
    /// Creates a report for `provider` with no message id and no accepted recipients.
    #[must_use]
    pub fn new(provider: impl Into<Cow<'static, str>>) -> Self {
        let provider = provider.into();
        Self {
            provider,
            provider_message_id: None,
            accepted: Vec::new(),
        }
    }

    /// Returns the report with `provider_message_id` set to `id`, replacing any earlier value.
    #[must_use]
    pub fn with_provider_message_id(self, id: impl Into<String>) -> Self {
        Self {
            provider_message_id: Some(id.into()),
            ..self
        }
    }

    /// Returns the report with `accepted` replaced by the items of `accepted`.
    ///
    /// Earlier entries are discarded, not appended to.
    #[must_use]
    pub fn with_accepted<I>(self, accepted: I) -> Self
    where
        I: IntoIterator<Item = EmailAddress>,
    {
        Self {
            accepted: accepted.into_iter().collect(),
            ..self
        }
    }
}
/// Classification of a transport failure.
///
/// `TransportError::is_retryable` treats `RateLimited`, `Timeout`,
/// `TransientNetwork` and `TransientProvider` as retryable; every other kind
/// is terminal.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorKind {
/// Terminal. `from_http_status` maps 400 and 422 here.
Validation,
/// Terminal. `from_http_status` maps 401 here.
Authentication,
/// Terminal. `from_http_status` maps 403 here.
Authorization,
/// Retryable. `from_http_status` maps 429 here.
RateLimited,
/// Retryable; the only kind for which `TransportError::is_timeout` is true.
/// `from_http_status` maps 408 here.
Timeout,
/// Retryable. `from_http_status` maps 425 here.
TransientNetwork,
/// Retryable. `from_http_status` maps 5xx here, except 501, 505, 510 and 511.
TransientProvider,
/// Terminal. `from_http_status` maps 501, 505, 510, 511 and every status
/// without a more specific mapping here.
PermanentProvider,
/// Terminal. Never produced by `from_http_status`.
UnsupportedFeature,
/// Terminal. Never produced by `from_http_status`.
Internal,
}
impl ErrorKind {
    /// Classifies an HTTP status code.
    ///
    /// | status                      | kind                |
    /// |-----------------------------|---------------------|
    /// | 400, 422                    | `Validation`        |
    /// | 401                         | `Authentication`    |
    /// | 403                         | `Authorization`     |
    /// | 408                         | `Timeout`           |
    /// | 425                         | `TransientNetwork`  |
    /// | 429                         | `RateLimited`       |
    /// | 501, 505, 510, 511          | `PermanentProvider` |
    /// | any other 5xx               | `TransientProvider` |
    /// | everything else             | `PermanentProvider` |
    #[must_use]
    pub const fn from_http_status(status: u16) -> Self {
        // 501 Not Implemented, 505 HTTP Version Not Supported, 510 Not Extended
        // and 511 Network Authentication Required are the 5xx codes that are
        // classified as permanent rather than transient.
        let terminal_server_error = matches!(status, 501 | 505 | 510 | 511);

        match status {
            401 => Self::Authentication,
            403 => Self::Authorization,
            408 => Self::Timeout,
            425 => Self::TransientNetwork,
            429 => Self::RateLimited,
            400 | 422 => Self::Validation,
            500..=599 if !terminal_server_error => Self::TransientProvider,
            _ => Self::PermanentProvider,
        }
    }
}
impl std::fmt::Display for ErrorKind {
    /// Writes the kind's lowercase, hyphen-separated label (for example
    /// `rate-limited`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Validation => "validation",
            Self::Authentication => "authentication",
            Self::Authorization => "authorization",
            Self::RateLimited => "rate-limited",
            Self::Timeout => "timeout",
            Self::TransientNetwork => "transient-network",
            Self::TransientProvider => "transient-provider",
            Self::PermanentProvider => "permanent-provider",
            Self::UnsupportedFeature => "unsupported-feature",
            Self::Internal => "internal",
        })
    }
}
/// Error returned by transports: a classification plus optional diagnostics.
///
/// Displays as `"{kind}: {message}"`.
#[derive(Debug, thiserror::Error)]
#[error("{kind}: {message}")]
#[non_exhaustive]
pub struct TransportError {
/// Classification that drives `is_retryable`, `is_terminal` and `is_timeout`.
pub kind: ErrorKind,
/// Human-readable description; the second half of the `Display` output.
pub message: String,
/// HTTP status associated with the failure, when one was recorded.
pub http_status: Option<u16>,
/// Provider-specific error code, when one was recorded.
pub provider_error_code: Option<String>,
/// Request identifier, when one was recorded (presumably assigned by the provider — confirm).
pub request_id: Option<String>,
/// Retry delay, when one was recorded (presumably from a `Retry-After`-style hint — confirm).
pub retry_after: Option<Duration>,
/// Underlying cause; marked `#[source]` so it is reported as the error's source.
#[source]
source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}
impl TransportError {
    /// Creates an error of the given `kind` with a human-readable `message`.
    ///
    /// All optional diagnostics (`http_status`, `provider_error_code`,
    /// `request_id`, `retry_after`, source) start out empty; use the `with_*`
    /// builders to fill them in.
    #[must_use]
    pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
        Self {
            kind,
            message: message.into(),
            http_status: None,
            provider_error_code: None,
            request_id: None,
            retry_after: None,
            source: None,
        }
    }

    /// Records the HTTP status associated with this failure.
    ///
    /// This does not change `kind`.
    #[must_use]
    pub const fn with_http_status(mut self, status: u16) -> Self {
        self.http_status = Some(status);
        self
    }

    /// Records the provider-specific error code.
    #[must_use]
    pub fn with_provider_error_code(mut self, code: impl Into<String>) -> Self {
        self.provider_error_code = Some(code.into());
        self
    }

    /// Records the request identifier associated with this failure.
    ///
    /// Completes the builder set so that every public optional diagnostic
    /// field can be populated fluently.
    // Not `const`: assigning to an `Option<String>` field drops the previous
    // value, which is not allowed in a const fn.
    #[must_use]
    pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
        self.request_id = Some(request_id.into());
        self
    }

    /// Records how long a caller should wait before retrying.
    #[must_use]
    pub const fn with_retry_after(mut self, retry_after: Duration) -> Self {
        self.retry_after = Some(retry_after);
        self
    }

    /// Attaches the underlying cause of this failure, replacing any earlier one.
    #[must_use]
    pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self {
        self.source = Some(Box::new(source));
        self
    }

    /// Returns `true` for the kinds classified as retryable: `RateLimited`,
    /// `Timeout`, `TransientNetwork` and `TransientProvider`.
    #[must_use]
    pub const fn is_retryable(&self) -> bool {
        matches!(
            self.kind,
            ErrorKind::RateLimited
                | ErrorKind::Timeout
                | ErrorKind::TransientNetwork
                | ErrorKind::TransientProvider
        )
    }

    /// Returns `true` exactly when [`TransportError::is_retryable`] returns `false`.
    #[must_use]
    pub const fn is_terminal(&self) -> bool {
        !self.is_retryable()
    }

    /// Returns `true` when `kind` is `ErrorKind::Timeout`.
    #[must_use]
    pub const fn is_timeout(&self) -> bool {
        matches!(self.kind, ErrorKind::Timeout)
    }
}
/// Marker supertrait of the transport traits.
///
/// On every target except `wasm32` it requires `Send + Sync` and is
/// implemented for all such types, sized or not.
#[cfg(not(target_arch = "wasm32"))]
pub trait RuntimeBound: Send + Sync {}

#[cfg(not(target_arch = "wasm32"))]
impl<T> RuntimeBound for T where T: ?Sized + Send + Sync {}

/// Marker supertrait of the transport traits.
///
/// On `wasm32` it imposes no requirement and is implemented for every type.
#[cfg(target_arch = "wasm32")]
pub trait RuntimeBound {}

#[cfg(target_arch = "wasm32")]
impl<T> RuntimeBound for T where T: ?Sized {}
/// Marker bound placed on the futures returned by the transport traits.
///
/// On every target except `wasm32` it requires `Send` and is implemented for
/// all such types, sized or not.
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSend: Send {}

#[cfg(not(target_arch = "wasm32"))]
impl<T> MaybeSend for T where T: ?Sized + Send {}

/// Marker bound placed on the futures returned by the transport traits.
///
/// On `wasm32` it imposes no requirement and is implemented for every type.
#[cfg(target_arch = "wasm32")]
pub trait MaybeSend {}

#[cfg(target_arch = "wasm32")]
impl<T> MaybeSend for T where T: ?Sized {}
/// Pinned, boxed future returned by the erased transport traits.
///
/// `wasm32` variant: the future carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
#[doc(hidden)]
pub type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
/// Pinned, boxed future returned by the erased transport traits.
///
/// Variant for every target except `wasm32`: the future must be `Send`.
#[cfg(not(target_arch = "wasm32"))]
#[doc(hidden)]
pub type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
// Private module holding the supertraits of the erased transport traits.
// The module is not `pub`, so these traits cannot be named from outside this
// file's module; the only impls in this file are the blanket impls for
// `Transport` / `RawTransport` types.
mod sealed {
/// Supertrait of `super::ErasedTransport`.
pub trait ErasedTransport {}
/// Supertrait of `super::ErasedRawTransport`.
pub trait ErasedRawTransport {}
}
/// Boxed-future mirror of `Transport`, used as a trait object via `DynTransport`.
///
/// Every method returns a `BoxFut` instead of `impl Future`. The trait is
/// implemented for all `Transport` types by a blanket impl and additionally
/// requires the private `sealed::ErasedTransport` supertrait.
pub trait ErasedTransport: RuntimeBound + sealed::ErasedTransport {
/// Mirrors `Transport::capabilities`.
fn capabilities(&self) -> Capabilities;
/// Mirrors `Transport::send`, returning a boxed future.
fn send<'a>(
&'a self,
message: &'a OutboundMessage,
options: &'a SendOptions,
) -> BoxFut<'a, Result<SendReport, TransportError>>;
/// Mirrors `Transport::send_owned`, returning a boxed future.
fn send_owned<'a>(
&'a self,
message: OutboundMessage,
options: &'a SendOptions,
) -> BoxFut<'a, Result<SendReport, TransportError>>;
}
// Every `Transport` satisfies the private supertrait of `ErasedTransport`.
impl<T> sealed::ErasedTransport for T where T: Transport + ?Sized {}

/// Blanket adapter: every [`Transport`] is an [`ErasedTransport`] whose
/// methods forward to the `Transport` method of the same name and box the
/// resulting future.
impl<T: Transport + ?Sized> ErasedTransport for T {
    fn capabilities(&self) -> Capabilities {
        <T as Transport>::capabilities(self)
    }

    fn send<'a>(
        &'a self,
        message: &'a OutboundMessage,
        options: &'a SendOptions,
    ) -> BoxFut<'a, Result<SendReport, TransportError>> {
        let future = <T as Transport>::send(self, message, options);
        Box::pin(future)
    }

    fn send_owned<'a>(
        &'a self,
        message: OutboundMessage,
        options: &'a SendOptions,
    ) -> BoxFut<'a, Result<SendReport, TransportError>> {
        let future = <T as Transport>::send_owned(self, message, options);
        Box::pin(future)
    }
}
/// Boxed-future mirror of `RawTransport`, used as a trait object via `DynRawTransport`.
///
/// Every method returns a `BoxFut` instead of `impl Future`. The trait is
/// implemented for all `RawTransport` types by a blanket impl and additionally
/// requires the private `sealed::ErasedRawTransport` supertrait.
pub trait ErasedRawTransport: RuntimeBound + sealed::ErasedRawTransport {
/// Mirrors `RawTransport::capabilities`.
fn capabilities(&self) -> Capabilities;
/// Mirrors `RawTransport::send_raw`, returning a boxed future.
fn send_raw<'a>(
&'a self,
envelope: &'a Envelope,
rfc822: &'a [u8],
options: &'a SendOptions,
) -> BoxFut<'a, Result<SendReport, TransportError>>;
/// Mirrors `RawTransport::send_raw_owned`, returning a boxed future.
fn send_raw_owned<'a>(
&'a self,
envelope: Envelope,
rfc822: Vec<u8>,
options: &'a SendOptions,
) -> BoxFut<'a, Result<SendReport, TransportError>>;
}
// Every `RawTransport` satisfies the private supertrait of `ErasedRawTransport`.
impl<T> sealed::ErasedRawTransport for T where T: RawTransport + ?Sized {}

/// Blanket adapter: every [`RawTransport`] is an [`ErasedRawTransport`] whose
/// methods forward to the `RawTransport` method of the same name and box the
/// resulting future.
impl<T: RawTransport + ?Sized> ErasedRawTransport for T {
    fn capabilities(&self) -> Capabilities {
        <T as RawTransport>::capabilities(self)
    }

    fn send_raw<'a>(
        &'a self,
        envelope: &'a Envelope,
        rfc822: &'a [u8],
        options: &'a SendOptions,
    ) -> BoxFut<'a, Result<SendReport, TransportError>> {
        let future = <T as RawTransport>::send_raw(self, envelope, rfc822, options);
        Box::pin(future)
    }

    fn send_raw_owned<'a>(
        &'a self,
        envelope: Envelope,
        rfc822: Vec<u8>,
        options: &'a SendOptions,
    ) -> BoxFut<'a, Result<SendReport, TransportError>> {
        let future = <T as RawTransport>::send_raw_owned(self, envelope, rfc822, options);
        Box::pin(future)
    }
}
/// Trait-object form of a structured transport.
pub type DynTransport = dyn ErasedTransport;
/// `Arc`-shared handle to a [`DynTransport`].
pub type SharedTransport = std::sync::Arc<DynTransport>;
/// Trait-object form of a raw transport.
pub type DynRawTransport = dyn ErasedRawTransport;
/// `Arc`-shared handle to a [`DynRawTransport`].
pub type SharedRawTransport = std::sync::Arc<DynRawTransport>;
/// Collects the e-mail address of every mailbox addressed by `message`.
///
/// Addresses are visited in `To`, then `Cc`, then `Bcc` order, and each
/// address contributes all of its mailboxes. Duplicates are kept.
#[must_use]
pub fn accepted_recipient_emails(message: &Message) -> Vec<EmailAddress> {
    let recipients = message
        .to()
        .iter()
        .chain(message.cc())
        .chain(message.bcc());

    let mut accepted: Vec<EmailAddress> = Vec::new();
    for address in recipients {
        for mailbox in email_message::Address::mailboxes(address) {
            accepted.push(mailbox.email().clone());
        }
    }
    accepted
}
/// Determines which recipients a structured send should report as accepted.
///
/// When `capabilities.custom_envelope` is set and `options` carries an
/// envelope, the envelope's `rcpt_to` list is returned. In every other case
/// the recipients come from the message itself via
/// [`accepted_recipient_emails`].
#[must_use]
pub fn structured_accepted_for(
    message: &Message,
    options: &SendOptions,
    capabilities: Capabilities,
) -> Vec<EmailAddress> {
    // The envelope only counts when the transport declares it honours one.
    let envelope = if capabilities.custom_envelope {
        options.envelope.as_ref()
    } else {
        None
    };

    envelope.map_or_else(
        || accepted_recipient_emails(message),
        |envelope| envelope.rcpt_to().to_vec(),
    )
}
pub fn standard_message_headers(message: &Message) -> Result<Vec<Header>, TransportError> {
let mut headers = Vec::new();
if let Some(sender) = message.sender() {
headers.push(
Header::new("Sender", sender.to_string())
.map_err(|error| TransportError::new(ErrorKind::Validation, error.to_string()))?,
);
}
if let Some(date) = message.date() {
headers.push(
Header::new(
"Date",
date.format(&Rfc2822).map_err(|error| {
TransportError::new(ErrorKind::Validation, error.to_string())
})?,
)
.map_err(|error| TransportError::new(ErrorKind::Validation, error.to_string()))?,
);
}
if let Some(message_id) = message.message_id() {
headers.push(
Header::new("Message-ID", message_id.to_string())
.map_err(|error| TransportError::new(ErrorKind::Validation, error.to_string()))?,
);
}
Ok(headers)
}
// Unit tests for the capability defaults, accepted-recipient selection,
// `SendReport` wire format and `ErrorKind` / `TransportError` classification.
#[cfg(test)]
mod tests {
use email_message::{Address, Body, EmailAddress, Envelope, Message};
// `SendReport` is only referenced by the feature-gated tests below.
#[cfg(any(feature = "serde", feature = "schemars"))]
use super::SendReport;
use super::{
Capabilities, ErrorKind, SendOptions, StructuredSendCapability, TransportError,
structured_accepted_for,
};
// Builds a plain-text message from `sender@example.com` to a single `To` recipient.
fn message_with_recipient(recipient: &str) -> Message {
Message::builder(Body::text("hello"))
.from_mailbox("sender@example.com".parse().expect("sender parses"))
.to(vec![Address::Mailbox(
recipient.parse().expect("recipient parses"),
)])
.build()
.expect("message validates")
}
// Builds send options whose envelope has `bounce@example.com` as sender and a
// single recipient.
fn options_with_envelope(recipient: &str) -> SendOptions {
SendOptions::new().with_envelope(Envelope::new(
Some(
"bounce@example.com"
.parse::<EmailAddress>()
.expect("from parses"),
),
vec![recipient.parse::<EmailAddress>().expect("rcpt parses")],
))
}
// Projects addresses to `&str` so assertions can compare against literals.
fn accepted_strings(accepted: &[EmailAddress]) -> Vec<&str> {
accepted.iter().map(EmailAddress::as_str).collect()
}
// `Default` must leave every flag off and structured send unsupported.
#[test]
fn capabilities_default_to_false() {
assert_eq!(
Capabilities::default(),
Capabilities {
raw_rfc822: false,
structured_send: StructuredSendCapability::Unsupported,
custom_envelope: false,
custom_headers: false,
attachments: false,
inline_attachments: false,
idempotency_key: false,
timeout: false,
}
);
}
// Without the `custom_envelope` capability the envelope in the options is
// ignored and the message recipients are reported.
#[test]
fn structured_accepted_ignores_envelope_when_capability_is_false() {
let message = message_with_recipient("message@example.com");
let options = options_with_envelope("envelope@example.com");
let accepted = structured_accepted_for(&message, &options, Capabilities::new());
assert_eq!(accepted_strings(&accepted), vec!["message@example.com"]);
}
// With the `custom_envelope` capability the envelope recipients win.
#[test]
fn structured_accepted_uses_envelope_when_capability_is_true() {
let message = message_with_recipient("message@example.com");
let options = options_with_envelope("envelope@example.com");
let capabilities = Capabilities::new().with_custom_envelope(true);
let accepted = structured_accepted_for(&message, &options, capabilities);
assert_eq!(accepted_strings(&accepted), vec!["envelope@example.com"]);
}
// Pins the JSON shape of `SendReport` and checks it survives a round trip.
#[cfg(feature = "serde")]
#[test]
fn send_report_round_trips_through_serde() {
let report = SendReport::new("postmark")
.with_provider_message_id("message-id")
.with_accepted(["recipient@example.com"
.parse::<EmailAddress>()
.expect("recipient parses")]);
let json = serde_json::to_value(&report).expect("send report serializes");
assert_eq!(
json,
serde_json::json!({
"provider": "postmark",
"provider_message_id": "message-id",
"accepted": ["recipient@example.com"],
})
);
let back: SendReport = serde_json::from_value(json).expect("send report deserializes");
assert_eq!(back, report);
}
// The generated JSON schema must expose all three public fields.
#[cfg(feature = "schemars")]
#[test]
fn send_report_schema_includes_public_wire_fields() {
let schema = schemars::schema_for!(SendReport);
let value = schema.as_value();
assert!(value.pointer("/properties/provider").is_some());
assert!(value.pointer("/properties/provider_message_id").is_some());
assert!(value.pointer("/properties/accepted").is_some());
}
// Checks `is_retryable`, `is_terminal` and `is_timeout` for every listed
// `ErrorKind` variant.
#[test]
fn transport_error_kind_predicates_classify_each_variant() {
let cases = [
(ErrorKind::Validation, "validation"),
(ErrorKind::Authentication, "auth"),
(ErrorKind::Authorization, "authz"),
(ErrorKind::RateLimited, "rate"),
(ErrorKind::Timeout, "timeout"),
(ErrorKind::TransientNetwork, "net"),
(ErrorKind::TransientProvider, "transient"),
(ErrorKind::PermanentProvider, "permanent"),
(ErrorKind::UnsupportedFeature, "unsupported"),
(ErrorKind::Internal, "internal"),
];
for (kind, label) in cases {
let err = TransportError::new(kind.clone(), label);
// Expected classification, restated independently of the implementation.
let retryable = matches!(
kind,
ErrorKind::RateLimited
| ErrorKind::Timeout
| ErrorKind::TransientNetwork
| ErrorKind::TransientProvider
);
assert_eq!(err.is_retryable(), retryable, "{label}: is_retryable");
assert_eq!(err.is_terminal(), !retryable, "{label}: is_terminal");
assert_eq!(
err.is_timeout(),
matches!(kind, ErrorKind::Timeout),
"{label}: is_timeout"
);
}
}
// Pins the status-code mapping, including both ends of the 5xx range, the
// terminal 5xx exceptions and an otherwise unmapped 4xx code.
#[test]
fn from_http_status_maps_documented_codes() {
assert_eq!(ErrorKind::from_http_status(400), ErrorKind::Validation);
assert_eq!(ErrorKind::from_http_status(422), ErrorKind::Validation);
assert_eq!(ErrorKind::from_http_status(401), ErrorKind::Authentication);
assert_eq!(ErrorKind::from_http_status(403), ErrorKind::Authorization);
assert_eq!(ErrorKind::from_http_status(408), ErrorKind::Timeout);
assert_eq!(
ErrorKind::from_http_status(425),
ErrorKind::TransientNetwork
);
assert_eq!(ErrorKind::from_http_status(429), ErrorKind::RateLimited);
assert_eq!(
ErrorKind::from_http_status(500),
ErrorKind::TransientProvider
);
assert_eq!(
ErrorKind::from_http_status(599),
ErrorKind::TransientProvider
);
for code in [501u16, 505, 510, 511] {
assert_eq!(
ErrorKind::from_http_status(code),
ErrorKind::PermanentProvider,
"code {code}"
);
}
assert_eq!(
ErrorKind::from_http_status(418),
ErrorKind::PermanentProvider
);
}
// A 408 response must be both a timeout and retryable.
#[test]
fn from_http_status_408_is_retryable_timeout() {
let kind = ErrorKind::from_http_status(408);
assert_eq!(kind, ErrorKind::Timeout);
let err = TransportError::new(kind, "request timeout");
assert!(err.is_retryable());
assert!(err.is_timeout());
}
// The 5xx codes mapped to `PermanentProvider` must not be retried.
#[test]
fn from_http_status_terminal_5xx_is_not_retryable() {
for code in [501u16, 505, 510, 511] {
let kind = ErrorKind::from_http_status(code);
let err = TransportError::new(kind, "terminal");
assert!(!err.is_retryable(), "{code} must not be retryable");
assert!(err.is_terminal(), "{code} must be terminal");
}
}
}