samotop_delivery/sendmail/
mod.rs

1//! The sendmail transport sends the envelope using the local sendmail command.
2//!
3
4mod error;
5pub use self::error::*;
6use crate::{sendmail::error::Error, SyncFuture};
7use crate::{Envelope, MailDataStream, Transport};
8use async_std::task;
9use samotop_core::common::*;
10use std::ops::DerefMut;
11use std::process::{Child, Command, Stdio};
12use std::{convert::AsRef, fmt};
13
/// Sends an envelope using the `sendmail` command
///
/// The transport only stores which executable to run; a new process is
/// spawned for every envelope that is sent.
#[derive(Debug)]
#[cfg_attr(
    feature = "serde-impls",
    derive(serde_derive::Serialize, serde_derive::Deserialize)
)]
pub struct SendmailTransport {
    /// Program (path or name) that is executed to hand over a message.
    command: String,
}

impl SendmailTransport {
    /// Creates a new transport with the default `/usr/sbin/sendmail` command
    pub fn new() -> SendmailTransport {
        SendmailTransport::new_with_command("/usr/sbin/sendmail")
    }

    /// Creates a new transport to the given sendmail command
    ///
    /// `command` is stored as given and later passed to
    /// `std::process::Command::new` unchanged.
    pub fn new_with_command<S: Into<String>>(command: S) -> SendmailTransport {
        SendmailTransport {
            command: command.into(),
        }
    }
}

impl Default for SendmailTransport {
    /// Same as [`SendmailTransport::new`].
    ///
    /// A derived `Default` would leave the command empty, which can never be
    /// spawned, so the default is tied to the documented default command.
    fn default() -> Self {
        Self::new()
    }
}
39
40impl Transport for SendmailTransport {
41    type DataStream = ProcStream;
42    type Error = Error;
43    fn send_stream<'s, 'a>(
44        &'s self,
45        envelope: Envelope,
46    ) -> SyncFuture<std::result::Result<ProcStream, Error>>
47    where
48        's: 'a,
49    {
50        let command = self.command.clone();
51        let message_id = envelope.message_id().to_string();
52
53        let from = envelope
54            .from()
55            .map(AsRef::as_ref)
56            .unwrap_or("\"\"")
57            .to_owned();
58        let to = envelope.to().to_owned();
59
60        Box::pin(async move {
61            Ok(ProcStream::Ready(ProcStreamInner {
62                child: Command::new(command)
63                    .arg("-i")
64                    .arg("-f")
65                    .arg(from)
66                    .args(to)
67                    .stdin(Stdio::piped())
68                    .stdout(Stdio::piped())
69                    .spawn()
70                    .map_err(Error::Io)?,
71                message_id,
72            }))
73        })
74    }
75}
76
77#[allow(missing_debug_implementations)]
78pub enum ProcStream {
79    Busy,
80    Ready(ProcStreamInner),
81    Closing(Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync>>),
82    Done,
83}
84
/// Payload of [`ProcStream::Ready`]: the running process together with the
/// id of the message that is being written to it.
#[derive(Debug)]
pub struct ProcStreamInner {
    /// The spawned command; message data is written to its stdin.
    child: Child,
    /// Id of the message taken from the envelope, used in the log line emitted on close.
    message_id: String,
}
90
91impl MailDataStream for ProcStream {
92    fn is_done(&self) -> bool {
93        matches!(self, ProcStream::Done)
94    }
95}
96
97/// Todo: async when available
98impl io::Write for ProcStream {
99    fn poll_write(
100        mut self: Pin<&mut Self>,
101        cx: &mut Context<'_>,
102        buf: &[u8],
103    ) -> Poll<std::io::Result<usize>> {
104        loop {
105            break match self.deref_mut() {
106                ProcStream::Ready(ref mut inner) => {
107                    use std::io::Write;
108                    let len = inner.child.stdin.as_mut().ok_or_else(broken)?.write(buf)?;
109                    Poll::Ready(Ok(len))
110                }
111                mut otherwise => {
112                    ready!(Pin::new(&mut otherwise).poll_flush(cx))?;
113                    continue;
114                }
115            };
116        }
117    }
118    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
119        loop {
120            break match self.deref_mut() {
121                ProcStream::Ready(ref mut inner) => {
122                    use std::io::Write;
123                    inner.child.stdin.as_mut().ok_or_else(broken)?.flush()?;
124                    Poll::Ready(Ok(()))
125                }
126                ProcStream::Closing(ref mut fut) => {
127                    ready!(fut.as_mut().poll(cx))?;
128                    *self = ProcStream::Done;
129                    continue;
130                }
131                ProcStream::Done => Poll::Ready(Ok(())),
132                ProcStream::Busy => Poll::Ready(Err(broken())),
133            };
134        }
135    }
136    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
137        loop {
138            break match std::mem::replace(self.deref_mut(), ProcStream::Busy) {
139                ProcStream::Ready(ProcStreamInner { child, message_id }) => {
140                    let fut = async move {
141                        let output = task::spawn_blocking(move || child.wait_with_output()).await?;
142
143                        info!("Wrote {} message to stdin", message_id);
144
145                        if output.status.success() {
146                            Ok(())
147                        } else {
148                            Err(std::io::Error::new(
149                                std::io::ErrorKind::Other,
150                                String::from_utf8_lossy(output.stderr.as_slice()),
151                            ))
152                        }
153                    };
154                    *self = ProcStream::Closing(Box::pin(fut));
155                    continue;
156                }
157                otherwise @ ProcStream::Closing(_) => {
158                    *self = otherwise;
159                    ready!(Pin::new(&mut self).poll_flush(cx))?;
160                    continue;
161                }
162                otherwise => {
163                    *self = otherwise;
164                    ready!(Pin::new(&mut self).poll_flush(cx))?;
165                    Poll::Ready(Ok(()))
166                }
167            };
168        }
169    }
170}
171
/// Builds the error reported when the stream has no usable process stdin.
fn broken() -> std::io::Error {
    std::io::ErrorKind::NotConnected.into()
}
175
176impl fmt::Debug for ProcStream {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        match self {
179            ProcStream::Busy => f.debug_tuple("Busy").finish(),
180            ProcStream::Done => f.debug_tuple("Done").finish(),
181            ProcStream::Closing(_) => f.debug_tuple("Closing").field(&"*").finish(),
182            ProcStream::Ready(ref r) => f.debug_tuple("Ready").field(&r).finish(),
183        }
184    }
185}