use std::io as std_io;
use std::mem::replace;
use bytes::Bytes;
use futures::future::{self, Either, Future};
use futures::stream::Stream;
use futures::{Async, IntoFuture, Poll};
use vec1::Vec1;
use crate::{
chain::{chain, HandleErrorInChain, OnError},
command::{self, params_with_smtputf8},
common::SetupTls,
connect::ConnectionConfig,
data_types::{ForwardPath, ReversePath},
error::{GeneralError, LogicError, MissingCapabilities},
{Cmd, Connection},
};
/// Encoding-related requirement a mail places on the server it is sent to.
///
/// `send_mail` uses this (together with the envelop addresses) to decide which
/// server capability has to be present before anything is sent.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum EncodingRequirement {
/// No encoding-related capability is required.
None,
/// The mail requires the `SMTPUTF8` capability.
Smtputf8,
/// The mail requires the `8BITMIME` capability.
Mime8bit,
}
/// Raw mail content paired with the encoding requirement declared for it.
#[derive(Debug, Clone)]
pub struct Mail {
// Requirement as passed to `Mail::new`; it is not derived from the content.
encoding_requirement: EncodingRequirement,
// The mail bytes exactly as handed to `Mail::new`.
mail: Bytes,
}
impl Mail {
    /// Builds a mail from its declared encoding requirement and its raw content.
    ///
    /// `buffer` is converted into `Bytes` and stored as is; this constructor does
    /// not check that the content actually matches `encoding_requirement`.
    pub fn new(encoding_requirement: EncodingRequirement, buffer: impl Into<Bytes>) -> Self {
        let mail = buffer.into();
        Self {
            encoding_requirement,
            mail,
        }
    }

    /// Returns `true` only if the declared requirement is `EncodingRequirement::Smtputf8`.
    pub fn needs_smtputf8(&self) -> bool {
        match self.encoding_requirement {
            EncodingRequirement::Smtputf8 => true,
            EncodingRequirement::None | EncodingRequirement::Mime8bit => false,
        }
    }

    /// Returns the encoding requirement this mail was created with.
    pub fn encoding_requirement(&self) -> EncodingRequirement {
        self.encoding_requirement
    }

    /// Borrows the raw mail content.
    pub fn raw_data(&self) -> &[u8] {
        &self.mail
    }

    /// Consumes the mail and hands out the underlying buffer.
    pub fn into_raw_data(self) -> Bytes {
        self.mail
    }
}
/// Sender and recipients of a mail, kept separate from the mail content.
#[derive(Debug, Clone)]
pub struct EnvelopData {
/// Sender address; when `None`, `send_mail` uses an empty reverse path.
pub from: Option<MailAddress>,
/// Recipient addresses (`Vec1`, so presumably never empty — confirm against the `vec1` crate).
pub to: Vec1<MailAddress>,
}
impl EnvelopData {
    /// Returns `true` if the sender (when present) or any recipient is flagged
    /// as needing SMTPUTF8.
    pub fn needs_smtputf8(&self) -> bool {
        // `Option::iter` yields the sender zero or one times, so a single pass
        // covers the sender first and then every recipient.
        self.from
            .iter()
            .chain(self.to.iter())
            .any(|address| address.needs_smtputf8())
    }
}
/// A mail together with the envelop (sender and recipients) it is sent with.
#[derive(Debug, Clone)]
pub struct MailEnvelop {
// Sender and recipient addresses.
envelop_data: EnvelopData,
// The mail content and its encoding requirement.
mail: Mail,
}
impl MailEnvelop {
    /// Creates an envelop with a sender, the recipients and the mail itself.
    pub fn new(from: MailAddress, to: Vec1<MailAddress>, mail: Mail) -> Self {
        let envelop_data = EnvelopData {
            from: Some(from),
            to,
        };
        Self { envelop_data, mail }
    }

    /// Creates an envelop that has no sender address.
    pub fn without_reverse_path(to: Vec1<MailAddress>, mail: Mail) -> Self {
        let envelop_data = EnvelopData { from: None, to };
        Self { envelop_data, mail }
    }

    /// Borrows the sender address, if there is one.
    pub fn from_address(&self) -> Option<&MailAddress> {
        let data = &self.envelop_data;
        data.from.as_ref()
    }

    /// Borrows the recipient addresses.
    pub fn to_address(&self) -> &Vec1<MailAddress> {
        let data = &self.envelop_data;
        &data.to
    }

    /// Borrows the mail carried by this envelop.
    pub fn mail(&self) -> &Mail {
        &self.mail
    }

    /// Returns `true` if either the envelop addresses or the mail need SMTPUTF8.
    pub fn needs_smtputf8(&self) -> bool {
        let addresses_need_it = self.envelop_data.needs_smtputf8();
        addresses_need_it || self.mail.needs_smtputf8()
    }
}
impl From<(Mail, EnvelopData)> for MailEnvelop {
fn from((mail, envelop_data): (Mail, EnvelopData)) -> Self {
MailEnvelop { envelop_data, mail }
}
}
impl From<MailEnvelop> for (Mail, EnvelopData) {
fn from(me: MailEnvelop) -> Self {
let MailEnvelop { mail, envelop_data } = me;
(mail, envelop_data)
}
}
/// A mail address stored as a raw string plus a flag saying whether it needs SMTPUTF8.
///
/// Both constructors are `*_unchecked`: nothing visible here validates the address syntax.
#[derive(Debug, Clone)]
pub struct MailAddress {
// The address text as supplied by the caller.
raw: String,
// Either supplied by the caller or derived from the presence of non-ASCII bytes.
needs_smtputf8: bool,
}
impl MailAddress {
pub fn new_unchecked(raw_email: String, needs_smtputf8: bool) -> Self {
MailAddress {
raw: raw_email,
needs_smtputf8,
}
}
pub fn from_unchecked<I>(raw: I) -> Self
where
I: Into<String> + AsRef<str>,
{
let has_utf8 = raw.as_ref().bytes().any(|b| b >= 0x80);
MailAddress {
raw: raw.into(),
needs_smtputf8: has_utf8,
}
}
pub fn needs_smtputf8(&self) -> bool {
self.needs_smtputf8
}
pub fn as_str(&self) -> &str {
&self.raw
}
}
impl AsRef<str> for MailAddress {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl Into<String> for MailAddress {
fn into(self) -> String {
self.raw
}
}
impl From<MailAddress> for ReversePath {
fn from(addr: MailAddress) -> ReversePath {
ReversePath::from_unchecked(addr.raw)
}
}
impl From<MailAddress> for ForwardPath {
fn from(addr: MailAddress) -> ForwardPath {
ForwardPath::from_unchecked(addr.raw)
}
}
/// Outcome of sending a single mail.
///
/// On failure the `LogicError` is paired with a `usize`; presumably the index of
/// the failing command in the command chain (`send_mail` uses `0` when a capability
/// is missing before anything is sent) — confirm against `chain`.
pub type MailSendResult = Result<(), (usize, LogicError)>;
/// Boxed, `Send` future resolving to the connection and the [`MailSendResult`],
/// or failing with an I/O error.
pub type MailSendFuture =
Box<dyn Future<Item = (Connection, MailSendResult), Error = std_io::Error> + Send>;
/// Sends a single mail over `con`.
///
/// The mail is sent as one command chain: a `command::Mail`, one
/// `command::Recipient` per recipient address and a final `command::Data`
/// carrying the raw mail content. `on_error` is handed to `chain` unchanged.
///
/// Before anything is sent, the capabilities reported by the connection are checked:
///
/// - if the envelop or the mail needs SMTPUTF8, `SMTPUTF8` must be available;
/// - otherwise, if the mail requires `EncodingRequirement::Mime8bit`,
///   `8BITMIME` must be available.
///
/// If the required capability is missing, the returned future resolves
/// immediately to `(con, Err((0, error)))`, where `error` names the capability
/// that is actually missing, and no command is sent.
///
/// The future's own error type (`std::io::Error`) is reserved for I/O failures.
pub fn send_mail<H>(
    con: Connection,
    envelop: MailEnvelop,
    on_error: H,
) -> impl Future<Item = (Connection, MailSendResult), Error = std_io::Error> + Send
where
    H: HandleErrorInChain,
{
    let use_smtputf8 = envelop.needs_smtputf8();
    let (mail, EnvelopData { from, to: tos }) = envelop.into();

    // 8BITMIME is only checked when SMTPUTF8 is not in use (presumably because
    // RFC 6531 requires SMTPUTF8 servers to offer 8BITMIME as well).
    let check_mime_8bit_support =
        !use_smtputf8 && mail.encoding_requirement() == EncodingRequirement::Mime8bit;

    // Work out WHICH capability is missing so the error reports the right one.
    let missing_capability = if use_smtputf8 && !con.has_capability("SMTPUTF8") {
        Some("SMTPUTF8")
    } else if check_mime_8bit_support && !con.has_capability("8BITMIME") {
        Some("8BITMIME")
    } else {
        None
    };

    if let Some(capability) = missing_capability {
        return Either::B(future::ok((
            con,
            Err((
                0,
                MissingCapabilities::new_from_unchecked(capability).into(),
            )),
        )));
    }

    // A mail without a sender address is sent with an empty reverse path.
    let reverse_path = from
        .map(ReversePath::from)
        .unwrap_or_else(|| ReversePath::from_unchecked(""));

    let mut mail_params = Default::default();
    if use_smtputf8 {
        mail_params = params_with_smtputf8(mail_params);
    }

    let mut cmd_chain = vec![command::Mail {
        reverse_path,
        params: mail_params,
    }
    .boxed()];

    for to in tos.into_iter() {
        cmd_chain.push(command::Recipient::new(to.into()).boxed());
    }

    cmd_chain.push(command::Data::from_buf(mail.into_raw_data()).boxed());

    Either::A(chain(con, cmd_chain, on_error))
}
impl Connection {
    /// Sends a single mail over this connection.
    ///
    /// Delegates to the free function `send_mail` with `OnError::StopAndReset`
    /// as the error handling strategy.
    pub fn send_mail(
        self,
        envelop: MailEnvelop,
    ) -> impl Future<Item = (Connection, MailSendResult), Error = std_io::Error> + Send {
        let on_error = OnError::StopAndReset;
        send_mail(self, envelop, on_error)
    }

    /// Creates a stream that sends every mail produced by `mails` over `con`.
    ///
    /// See [`SendAllMails`] for how the stream behaves.
    pub fn send_all_mails<E, M>(
        con: Connection,
        mails: M,
    ) -> SendAllMails<M>
    where
        E: From<GeneralError>,
        M: Iterator<Item = Result<MailEnvelop, E>>,
    {
        SendAllMails::new(con, mails)
    }

    /// Connects using `config`, sends all `mails` and quits the connection afterwards.
    ///
    /// If connecting fails, the returned stream yields that error (converted
    /// through `GeneralError` into `E`). Otherwise it behaves like
    /// `SendAllMails::quit_on_completion` on the new connection.
    pub fn connect_send_quit<A, E, I, T>(
        config: ConnectionConfig<A, T>,
        mails: I,
    ) -> impl Stream<Item = (), Error = E>
    where
        A: Cmd,
        E: From<GeneralError>,
        I: IntoIterator<Item = Result<MailEnvelop, E>>,
        T: SetupTls,
    {
        Connection::connect(config)
            .map_err(|err| E::from(GeneralError::from(err)))
            .map(move |con| SendAllMails::new(con, mails).quit_on_completion())
            .flatten_stream()
    }
}
/// Stream that sends the mails of an iterator one after another over one connection.
///
/// Each successfully sent mail produces one `()` item.
pub struct SendAllMails<I> {
// Source of the mails still to be sent.
mails: I,
// The connection while no mail is in flight. It is `None` while a send is pending,
// after it was taken out via `take_connection`, and after a send failed with an I/O error.
con: Option<Connection>,
// The send currently in flight, if any.
pending:
Option<Box<dyn Future<Item = (Connection, MailSendResult), Error = std_io::Error> + Send>>,
}
impl<I, E> SendAllMails<I>
where
    I: Iterator<Item = Result<MailEnvelop, E>>,
    E: From<GeneralError>,
{
    /// Creates a stream sending all `mails` over `con`; nothing is sent until it is polled.
    pub fn new<V>(con: Connection, mails: V) -> Self
    where
        V: IntoIterator<IntoIter = I, Item = Result<MailEnvelop, E>>,
    {
        Self {
            mails: mails.into_iter(),
            con: Some(con),
            pending: None,
        }
    }

    /// Removes and returns the connection, if the stream currently holds one.
    pub fn take_connection(&mut self) -> Option<Connection> {
        self.con.take()
    }

    /// Installs `con` as the connection to use and returns the one it replaces, if any.
    pub fn set_connection(&mut self, con: Connection) -> Option<Connection> {
        self.con.replace(con)
    }

    /// Returns `true` while a mail is in flight.
    pub fn is_pending(&self) -> bool {
        self.pending.is_some()
    }

    /// Wraps the stream so that the connection is quit once all mails were handled.
    ///
    /// If no connection is left at that point nothing is done; the outcome of
    /// quitting is ignored.
    pub fn quit_on_completion(self) -> impl Stream<Item = (), Error = E> {
        OnCompletion::new(self, |stream| match stream.take_connection() {
            Some(con) => Either::A(con.quit().then(|_| Ok(()))),
            None => Either::B(future::ok(())),
        })
    }

    /// Wraps the stream so that `func` runs once all mails were handled.
    ///
    /// `func` receives the connection if the stream still holds one; the
    /// future it returns is driven before the wrapped stream ends.
    pub fn on_completion<F, ITF>(self, func: F) -> impl Stream<Item = (), Error = E>
    where
        F: FnOnce(Option<Connection>) -> ITF,
        ITF: IntoFuture<Item = (), Error = ()>,
    {
        OnCompletion::new(self, |stream| func(stream.take_connection()))
    }
}
impl<I, E> Stream for SendAllMails<I>
where
I: Iterator<Item = Result<MailEnvelop, E>>,
E: From<GeneralError>,
{
type Item = ();
type Error = E;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if let Some(mut pending) = self.pending.take() {
return match pending.poll() {
Ok(Async::NotReady) => {
self.pending = Some(pending);
Ok(Async::NotReady)
}
Ok(Async::Ready((con, result))) => {
self.con = Some(con);
match result {
Ok(()) => Ok(Async::Ready(Some(()))),
Err((_idx, err)) => Err(E::from(GeneralError::from(err))),
}
}
Err(io_error) => Err(E::from(GeneralError::from(io_error))),
};
}
return match self.mails.next() {
None => Ok(Async::Ready(None)),
Some(Ok(mail)) => {
if let Some(con) = self.con.take() {
self.pending = Some(Box::new(con.send_mail(mail)));
continue;
} else {
Err(E::from(GeneralError::Io(std_io::Error::new(
std_io::ErrorKind::NotConnected,
"previous error killed connection",
))))
}
}
Some(Err(err)) => Err(err),
};
}
}
}
/// Stream adapter that runs a callback once the wrapped stream has ended and
/// drives the future produced by that callback before ending itself.
pub struct OnCompletion<S, F, UF> {
// The wrapped stream.
stream: S,
// Where the completion callback currently is: not yet run, running, or done.
state: CompletionState<F, UF>,
}
// Progress of the completion callback of an `OnCompletion` stream.
enum CompletionState<F, U> {
// The callback was taken (and its future, if any, has finished).
Done,
// The callback has not been invoked yet.
Ready(F),
// The callback was invoked; this is the future it produced, still being driven.
Pending(U),
}
impl<F, U> CompletionState<F, U> {
fn take_func(&mut self) -> Option<F> {
use self::CompletionState::*;
let me = replace(self, CompletionState::Done);
match me {
Done => None,
Ready(func) => Some(func),
Pending(_) => panic!("[BUG] take func in pending state"),
}
}
}
impl<S, F, U> OnCompletion<S, F, U::Future>
where
    S: Stream,
    F: FnOnce(&mut S) -> U,
    U: IntoFuture<Item = (), Error = ()>,
{
    /// Wraps `stream` so that `func` is invoked (with mutable access to the
    /// stream) once the stream has ended.
    pub fn new(stream: S, func: F) -> Self {
        let state = CompletionState::Ready(func);
        Self { stream, state }
    }
}
/// Forwards the wrapped stream and runs the completion callback at its end.
impl<S, F, U> Stream for OnCompletion<S, F, U::Future>
where
    S: Stream,
    F: FnOnce(&mut S) -> U,
    U: IntoFuture,
{
    type Item = S::Item;
    type Error = S::Error;

    /// Forwards items and errors of the wrapped stream unchanged.
    ///
    /// When the wrapped stream ends, the callback is invoked once and the
    /// future it returns is driven to completion before `None` is yielded.
    /// Whether that future succeeds or fails makes no difference.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        loop {
            // If the completion future is running, only that future is polled.
            let completion_finished = match self.state {
                CompletionState::Pending(ref mut fut) => match fut.poll() {
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    Ok(Async::Ready(_)) | Err(_) => true,
                },
                CompletionState::Done | CompletionState::Ready(_) => false,
            };
            if completion_finished {
                self.state = CompletionState::Done;
                return Ok(Async::Ready(None));
            }

            match try_ready!(self.stream.poll()) {
                Some(item) => return Ok(Async::Ready(Some(item))),
                None => match self.state.take_func() {
                    Some(func) => {
                        // Start the completion future; the next loop iteration polls it.
                        let fut = func(&mut self.stream).into_future();
                        self.state = CompletionState::Pending(fut);
                    }
                    None => return Ok(Async::Ready(None)),
                },
            }
        }
    }
}
// Compile-time checks only: this module contains no `#[test]` functions.
#[cfg(test)]
mod test {
use crate::{
command, error::GeneralError, send_mail::MailEnvelop, Connection, ConnectionConfig,
};
// Compiles only if the referenced value's type is `Send`.
fn assert_send(_: &impl Send) {}
// Never called (hence `allow(unused)`): it only has to type-check, which proves
// that `connect_send_quit` returns a `Send` stream when given `Send` mails.
#[allow(unused)]
fn assert_send_in_send_out() {
// `unimplemented!()` stands in for real values; only the types matter here.
let config: ConnectionConfig<command::Noop> = unimplemented!();
let mails: Vec<Result<MailEnvelop, GeneralError>> = unimplemented!();
assert_send(&mails);
let fut = Connection::connect_send_quit(config, mails);
assert_send(&fut);
}
}