use std::fmt::Display;
use std::time::Duration;
use async_std::net::{SocketAddr, ToSocketAddrs};
use async_std::pin::Pin;
use async_trait::async_trait;
use log::{debug, info};
use pin_project::pin_project;
use crate::smtp::authentication::{
Credentials, Mechanism, DEFAULT_ENCRYPTED_MECHANISMS, DEFAULT_UNENCRYPTED_MECHANISMS,
};
use crate::smtp::client::net::ClientTlsParameters;
#[cfg(feature = "socks5")]
use crate::smtp::client::net::NetworkStream;
use crate::smtp::client::InnerClient;
use crate::smtp::commands::*;
use crate::smtp::error::{Error, SmtpResult};
use crate::smtp::extension::{ClientId, Extension, MailBodyParameter, MailParameter, ServerInfo};
use crate::{SendableEmail, Transport};
/// Port 25; used by `SmtpClient::new_unencrypted_localhost` for a plain,
/// unencrypted connection.
pub const SMTP_PORT: u16 = 25;
/// Port 587. Not referenced by the code visible in this file section;
/// conventionally the mail submission port (STARTTLS) — confirm against callers.
pub const SUBMISSION_PORT: u16 = 587;
/// Port 465; used by `SmtpClient::new` together with `ClientSecurity::Wrapper`.
pub const SUBMISSIONS_PORT: u16 = 465;
/// How (and whether) the connection to the server is encrypted.
#[derive(Debug)]
pub enum ClientSecurity {
/// No TLS parameters are passed on connect and `try_tls` does nothing.
None,
/// `try_tls` issues STARTTLS when the server advertises it; otherwise the
/// session continues without encryption.
Opportunistic(ClientTlsParameters),
/// `try_tls` issues STARTTLS, and returns an error when the server does not
/// advertise it.
Required(ClientTlsParameters),
/// The TLS parameters are handed to the client's `connect` call; `try_tls`
/// performs no STARTTLS for this variant.
Wrapper(ClientTlsParameters),
}
/// Policy deciding whether the connection is closed after a message was sent.
///
/// The policy is evaluated by `SmtpTransport::connection_was_used`.
#[derive(Clone, Debug, Copy)]
#[cfg_attr(
feature = "serde-impls",
derive(serde_derive::Serialize, serde_derive::Deserialize)
)]
pub enum ConnectionReuseParameters {
/// The connection is never closed by the reuse policy.
ReuseUnlimited,
/// The connection is closed once the number of successful sends on it has
/// reached the given limit.
ReuseLimited(u16),
/// The connection is closed after every send.
NoReuse,
}
/// Configuration of an SMTP client; turned into an `SmtpTransport` with
/// `into_transport`.
#[derive(Debug)]
#[allow(missing_debug_implementations)]
pub struct SmtpClient {
// Policy evaluated after each send to decide whether to close the connection.
connection_reuse: ConnectionReuseParameters,
// Identity sent in the EHLO command.
hello_name: ClientId,
// When set, `post_connect` attempts to authenticate after the TLS step.
credentials: Option<Credentials>,
// Address the transport connects to (first address resolved in `with_security`).
server_addr: SocketAddr,
// TLS policy, see `ClientSecurity`.
security: ClientSecurity,
// When true, SMTPUTF8 is added to MAIL commands if the server supports it.
smtp_utf8: bool,
// Explicit list of mechanisms; `None` selects the crate defaults.
authentication_mechanism: Option<Vec<Mechanism>>,
// When true, `try_login` skips the check against the server's advertised mechanisms.
force_set_auth: bool,
// Passed to the inner client's `connect` and `set_timeout`.
timeout: Option<Duration>,
}
impl SmtpClient {
pub async fn with_security<A: ToSocketAddrs>(
addr: A,
security: ClientSecurity,
) -> Result<SmtpClient, Error> {
let mut addresses = addr.to_socket_addrs().await?;
match addresses.next() {
Some(addr) => Ok(SmtpClient {
server_addr: addr,
security,
smtp_utf8: false,
credentials: None,
connection_reuse: ConnectionReuseParameters::NoReuse,
hello_name: ClientId::hostname(),
authentication_mechanism: None,
force_set_auth: false,
timeout: Some(Duration::new(60, 0)),
}),
None => Err(Error::Resolution),
}
}
pub async fn new(domain: &str) -> Result<SmtpClient, Error> {
let tls = async_native_tls::TlsConnector::new();
let tls_parameters = ClientTlsParameters::new(domain.to_string(), tls);
SmtpClient::with_security(
(domain, SUBMISSIONS_PORT),
ClientSecurity::Wrapper(tls_parameters),
)
.await
}
pub async fn new_unencrypted_localhost() -> Result<SmtpClient, Error> {
SmtpClient::with_security(("localhost", SMTP_PORT), ClientSecurity::None).await
}
pub fn smtp_utf8(mut self, enabled: bool) -> SmtpClient {
self.smtp_utf8 = enabled;
self
}
pub fn hello_name(mut self, name: ClientId) -> SmtpClient {
self.hello_name = name;
self
}
pub fn connection_reuse(mut self, parameters: ConnectionReuseParameters) -> SmtpClient {
self.connection_reuse = parameters;
self
}
pub fn credentials<S: Into<Credentials>>(mut self, credentials: S) -> SmtpClient {
self.credentials = Some(credentials.into());
self
}
pub fn authentication_mechanism(mut self, mechanism: Vec<Mechanism>) -> SmtpClient {
self.authentication_mechanism = Some(mechanism);
self
}
pub fn force_set_auth(mut self, force: bool) -> SmtpClient {
self.force_set_auth = force;
self
}
pub fn timeout(mut self, timeout: Option<Duration>) -> SmtpClient {
self.timeout = timeout;
self
}
pub fn into_transport(self) -> SmtpTransport {
SmtpTransport::new(self)
}
fn get_accepted_mechanism(&self, encrypted: bool) -> &[Mechanism] {
match self.authentication_mechanism {
Some(ref mechanism) => mechanism,
None => {
if encrypted {
DEFAULT_ENCRYPTED_MECHANISMS
} else {
DEFAULT_UNENCRYPTED_MECHANISMS
}
}
}
}
}
/// Mutable bookkeeping of an `SmtpTransport`.
#[derive(Debug)]
struct State {
// Set by `try_smtp!` right before it closes the connection after an error,
// so that a further error does not trigger a second close; cleared by `close`.
pub panic: bool,
// Number of messages successfully sent on the current connection;
// incremented in `send`, reset to 0 by `close`.
pub connection_reuse_count: u16,
}
/// SMTP transport: owns the client configuration, the inner client and the
/// state of the current session.
#[pin_project]
#[allow(missing_debug_implementations)]
pub struct SmtpTransport {
// Server information parsed from the last EHLO response; `None` after `close`.
server_info: Option<ServerInfo>,
// Error flag and per-connection send counter.
state: State,
// Configuration this transport was built from.
client_info: SmtpClient,
// Low-level client; marked `#[pin]` for `pin_project`, and pinned with
// `Pin::new` by the methods that use it.
#[pin]
client: InnerClient,
}
// Unwraps `$err` (a `Result`) inside an async method of the transport `$client`.
//
// On `Ok` the macro evaluates to the contained value. On `Err` it closes the
// connection (once, guarded by `state.panic`) and returns the converted error
// from the enclosing function, so it can only be used where `return` and
// `.await` are valid.
macro_rules! try_smtp (
($err: expr, $client: ident) => ({
match $err {
Ok(val) => val,
Err(err) => {
// Only close on the first error; `close` resets the flag on success.
if !$client.state.panic {
$client.state.panic = true;
// NOTE(review): if `close` itself fails, `?` returns that error
// and the original `err` is dropped — confirm this is intended.
$client.close().await?;
}
return Err(From::from(err))
},
}
})
);
impl<'a> SmtpTransport {
/// Builds a transport from `builder` with a fresh inner client, no server
/// information and a cleared state.
pub fn new(builder: SmtpClient) -> SmtpTransport {
    let client = InnerClient::new();
    let state = State {
        panic: false,
        connection_reuse_count: 0,
    };
    SmtpTransport {
        server_info: None,
        state,
        client_info: builder,
        client,
    }
}
/// Reports whether the inner client considers itself connected
/// (delegates to `InnerClient::is_connected`).
pub fn is_connected(&self) -> bool {
self.client.is_connected()
}
/// Runs the session setup that follows a fresh connection: EHLO, the TLS
/// step, and authentication when credentials are configured.
async fn post_connect(&mut self) -> Result<(), Error> {
    debug!("connection established to {}", self.client_info.server_addr);

    self.ehlo().await?;
    self.try_tls().await?;

    // Nothing to authenticate with: the session is ready as is.
    if self.client_info.credentials.is_none() {
        return Ok(());
    }
    self.try_login().await
}
/// Establishes the connection unless a reusable one already exists.
///
/// If messages were already sent on this connection and it is still
/// connected, nothing is done. If it has dropped, the transport is closed
/// first (which resets the counter) and a new connection is made. A new
/// connection reads the server greeting and then runs `post_connect`.
pub async fn connect(&mut self) -> Result<(), Error> {
    if self.state.connection_reuse_count > 0 {
        if self.client.is_connected() {
            debug!(
                "connection already established to {}",
                self.client_info.server_addr
            );
            return Ok(());
        }
        // Stale connection: `close` resets the reuse counter, so execution
        // continues with a fresh connect below.
        self.close().await?;
    }

    {
        // TLS parameters are only handed to `connect` in wrapper mode.
        let wrapper_tls = match self.client_info.security {
            ClientSecurity::Wrapper(ref tls_parameters) => Some(tls_parameters),
            _ => None,
        };
        let timeout = self.client_info.timeout;

        let mut client = Pin::new(&mut self.client);
        client
            .connect(&self.client_info.server_addr, timeout, wrapper_tls)
            .await?;
        client.set_timeout(timeout);
        // Consume the server greeting.
        let _response = client.read_response().await?;
    }

    self.post_connect().await
}
/// Like `connect`, but uses an already established `stream` instead of
/// opening a connection to the configured address.
///
/// An existing, still connected session that has already been used is kept
/// and `stream` is not used in that case.
#[cfg(feature = "socks5")]
pub async fn connect_with_stream(&mut self, stream: NetworkStream) -> Result<(), Error> {
    if self.state.connection_reuse_count > 0 {
        if self.client.is_connected() {
            debug!(
                "connection already established to {}",
                self.client_info.server_addr
            );
            return Ok(());
        }
        // Stale connection: `close` resets the reuse counter, so execution
        // continues with the provided stream below.
        self.close().await?;
    }

    {
        let timeout = self.client_info.timeout;
        let mut client = Pin::new(&mut self.client);
        client.connect_with_stream(stream).await?;
        client.set_timeout(timeout);
        // Consume the server greeting.
        let _response = client.read_response().await?;
    }

    self.post_connect().await
}
/// Authenticates with the configured credentials.
///
/// Without `force_set_auth`, the first mechanism from
/// `get_accepted_mechanism` that the server advertises is used;
/// `Error::NoServerInfo` is returned when no server information is recorded.
/// With `force_set_auth`, the server's advertised mechanisms are not
/// consulted and the mechanisms configured on the builder are used.
///
/// Finding no usable mechanism is only logged; the method still returns
/// `Ok(())`. A failed AUTH goes through `try_smtp!` and is returned as an error.
async fn try_login(&mut self) -> Result<(), Error> {
let client = Pin::new(&mut self.client);
let mut found = false;
if !self.client_info.force_set_auth {
// Candidate mechanisms depend on whether the connection is encrypted.
let accepted_mechanisms = self
.client_info
.get_accepted_mechanism(client.is_encrypted());
if let Some(server_info) = &self.server_info {
// Pick the first candidate the server also supports.
if let Some(mechanism) = accepted_mechanisms
.iter()
.find(|mechanism| server_info.supports_auth_mechanism(**mechanism))
{
found = true;
if let Some(credentials) = &self.client_info.credentials {
try_smtp!(client.auth(*mechanism, credentials).await, self);
}
}
} else {
return Err(Error::NoServerInfo);
}
} else {
// Forced mode: a fresh pinned borrow is taken because the loop below
// needs to reborrow it with `as_mut` on every iteration.
let mut client = Pin::new(&mut self.client);
if let Some(mechanisms) = self.client_info.authentication_mechanism.as_ref() {
// NOTE(review): AUTH is issued once per configured mechanism, with no
// early exit after the first success — confirm this is intended.
for mechanism in mechanisms {
if let Some(credentials) = &self.client_info.credentials {
try_smtp!(client.as_mut().auth(*mechanism, credentials).await, self);
}
}
found = true;
} else {
debug!("force_set_auth set to true, but no authentication mechanism set");
}
}
if !found {
info!("No supported authentication mechanisms available");
}
Ok(())
}
async fn try_tls(&mut self) -> Result<(), Error> {
let server_info = self
.server_info
.as_ref()
.ok_or_else(|| Error::NoServerInfo)?;
match (
&self.client_info.security,
server_info.supports_feature(Extension::StartTls),
) {
(&ClientSecurity::Required(_), false) => {
Err(From::from("Could not encrypt connection, aborting"))
}
(&ClientSecurity::Opportunistic(_), false) => Ok(()),
(&ClientSecurity::None, _) => Ok(()),
(&ClientSecurity::Wrapper(_), _) => Ok(()),
(&ClientSecurity::Opportunistic(ref tls_parameters), true)
| (&ClientSecurity::Required(ref tls_parameters), true) => {
{
let client = Pin::new(&mut self.client);
try_smtp!(client.command(StarttlsCommand).await, self);
}
let client = std::mem::replace(&mut self.client, InnerClient::default());
let ssl_client = client.upgrade_tls_stream(tls_parameters).await?;
std::mem::replace(&mut self.client, ssl_client);
debug!("connection encrypted");
self.ehlo().await.map(|_| ())
}
}
}
/// Sends `command` through the inner client and returns the server's response.
pub async fn command<C: Display>(&mut self, command: C) -> SmtpResult {
    Pin::new(&mut self.client).command(command).await
}
async fn ehlo(&mut self) -> SmtpResult {
let ehlo_response = try_smtp!(
self.command(EhloCommand::new(ClientId::new(
self.client_info.hello_name.to_string()
)))
.await,
self
);
let server_info = try_smtp!(ServerInfo::from_response(&ehlo_response), self);
debug!("server {}", server_info);
self.server_info = Some(server_info);
Ok(ehlo_response)
}
/// Closes the inner client and resets the session state (server information,
/// error flag and reuse counter).
///
/// When closing the inner client fails, the error is returned and the
/// session state is left untouched.
pub async fn close(&mut self) -> Result<(), Error> {
    Pin::new(&mut self.client).close().await?;

    self.state.connection_reuse_count = 0;
    self.state.panic = false;
    self.server_info = None;
    Ok(())
}
/// Returns whether the server advertised `keyword`; `false` when no server
/// information has been recorded.
fn supports_feature(&self, keyword: Extension) -> bool {
    match &self.server_info {
        Some(info) => info.supports_feature(keyword),
        None => false,
    }
}
/// Applies the connection reuse policy after a send: closes the connection
/// when the policy forbids further use of it.
async fn connection_was_used(&mut self) -> Result<(), Error> {
    let must_close = match self.client_info.connection_reuse {
        ConnectionReuseParameters::NoReuse => true,
        ConnectionReuseParameters::ReuseLimited(limit) => {
            self.state.connection_reuse_count >= limit
        }
        ConnectionReuseParameters::ReuseUnlimited => false,
    };

    if must_close {
        self.close().await?;
    }
    Ok(())
}
/// Convenience wrapper: calls `connect` and, when that succeeds, sends
/// `email` with `send`. A connect error is returned without attempting to send.
pub async fn connect_and_send(&mut self, email: SendableEmail) -> SmtpResult {
self.connect().await?;
self.send(email).await
}
}
#[async_trait]
impl<'a> Transport<'a> for SmtpTransport {
    type Result = SmtpResult;

    /// Sends `email` over the current connection.
    ///
    /// Issues MAIL (with 8BITMIME / SMTPUTF8 parameters when applicable), one
    /// RCPT per envelope recipient and DATA, then transmits the message.
    /// A failing command goes through `try_smtp!`, which closes the
    /// connection and returns the error.
    ///
    /// After the message was transmitted the reuse policy is applied through
    /// `connection_was_used`; an error from closing the connection there is
    /// returned in place of the send result.
    async fn send(&mut self, email: SendableEmail) -> SmtpResult {
        let message_id = email.message_id().to_string();

        // Build the MAIL parameters from what the server advertised.
        let mut mail_options = vec![];
        if self.supports_feature(Extension::EightBitMime) {
            mail_options.push(MailParameter::Body(MailBodyParameter::EightBitMime));
        }
        if self.supports_feature(Extension::SmtpUtfEight) && self.client_info.smtp_utf8 {
            mail_options.push(MailParameter::SmtpUtfEight);
        }

        let mut client = Pin::new(&mut self.client);

        try_smtp!(
            client
                .as_mut()
                .command(MailCommand::new(
                    email.envelope().from().cloned(),
                    mail_options,
                ))
                .await,
            self
        );

        for to_address in email.envelope().to() {
            try_smtp!(
                client
                    .as_mut()
                    .command(RcptCommand::new(to_address.clone(), vec![]))
                    .await,
                self
            );
            debug!("{}: to=<{}>", message_id, to_address);
        }

        try_smtp!(client.as_mut().command(DataCommand).await, self);

        let res = client.as_mut().message(email.message()).await;

        if let Ok(result) = &res {
            // Saturate instead of overflowing: with unlimited reuse the
            // counter is never reset, and wrapping back to 0 would make
            // `connect` treat the live connection as not yet established.
            self.state.connection_reuse_count =
                self.state.connection_reuse_count.saturating_add(1);
            debug!(
                "{}: conn_use={}, status=sent ({})",
                message_id,
                self.state.connection_reuse_count,
                result
                    .message
                    .iter()
                    .next()
                    .unwrap_or(&"no response".to_string())
            );
        }

        // Close the connection now if the reuse policy says so.
        self.connection_was_used().await?;
        res
    }
}