samotop_delivery/sendmail/
mod.rs1mod error;
5pub use self::error::*;
6use crate::{sendmail::error::Error, SyncFuture};
7use crate::{Envelope, MailDataStream, Transport};
8use async_std::task;
9use samotop_core::common::*;
10use std::ops::DerefMut;
11use std::process::{Child, Command, Stdio};
12use std::{convert::AsRef, fmt};
13
/// Transport that delivers mail by piping the message into a local
/// `sendmail`-compatible command.
#[derive(Debug)]
#[cfg_attr(
    feature = "serde-impls",
    derive(serde_derive::Serialize, serde_derive::Deserialize)
)]
pub struct SendmailTransport {
    /// Program that is spawned once per message.
    command: String,
}

impl SendmailTransport {
    /// Creates a transport that runs `/usr/sbin/sendmail`.
    pub fn new() -> SendmailTransport {
        SendmailTransport {
            command: "/usr/sbin/sendmail".to_string(),
        }
    }

    /// Creates a transport that runs the given command instead of the
    /// default sendmail binary.
    pub fn new_with_command<S: Into<String>>(command: S) -> SendmailTransport {
        SendmailTransport {
            command: command.into(),
        }
    }
}

impl Default for SendmailTransport {
    /// Same as [`SendmailTransport::new`].
    ///
    /// A derived `Default` would leave `command` empty, producing a transport
    /// that can never spawn a process; delegate to `new()` so both
    /// constructors agree.
    fn default() -> Self {
        Self::new()
    }
}
39
40impl Transport for SendmailTransport {
41 type DataStream = ProcStream;
42 type Error = Error;
43 fn send_stream<'s, 'a>(
44 &'s self,
45 envelope: Envelope,
46 ) -> SyncFuture<std::result::Result<ProcStream, Error>>
47 where
48 's: 'a,
49 {
50 let command = self.command.clone();
51 let message_id = envelope.message_id().to_string();
52
53 let from = envelope
54 .from()
55 .map(AsRef::as_ref)
56 .unwrap_or("\"\"")
57 .to_owned();
58 let to = envelope.to().to_owned();
59
60 Box::pin(async move {
61 Ok(ProcStream::Ready(ProcStreamInner {
62 child: Command::new(command)
63 .arg("-i")
64 .arg("-f")
65 .arg(from)
66 .args(to)
67 .stdin(Stdio::piped())
68 .stdout(Stdio::piped())
69 .spawn()
70 .map_err(Error::Io)?,
71 message_id,
72 }))
73 })
74 }
75}
76
/// State of a message that is being piped into a spawned mail command.
// NOTE(review): `Debug` is implemented by hand further down in this file, so
// this `allow` looks redundant - confirm before removing it.
#[allow(missing_debug_implementations)]
pub enum ProcStream {
    /// Transient placeholder swapped in while `poll_close` takes ownership of
    /// the current state; `poll_flush` reports `NotConnected` if it sees it.
    Busy,
    /// The child has been spawned; writes and flushes go to its stdin.
    Ready(ProcStreamInner),
    /// Future created by `poll_close` that waits for the child to exit and
    /// turns an unsuccessful exit status into an I/O error.
    Closing(Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync>>),
    /// The closing future completed successfully; `is_done` returns `true`.
    Done,
}
84
/// Payload of `ProcStream::Ready`: the spawned process plus the id of the
/// message that is being written to it.
#[derive(Debug)]
pub struct ProcStreamInner {
    /// Handle of the spawned command; message bytes are written to its stdin.
    child: Child,
    /// Message id taken from the envelope; used in the log line emitted once
    /// the child has been waited for.
    message_id: String,
}
90
91impl MailDataStream for ProcStream {
92 fn is_done(&self) -> bool {
93 matches!(self, ProcStream::Done)
94 }
95}
96
97impl io::Write for ProcStream {
99 fn poll_write(
100 mut self: Pin<&mut Self>,
101 cx: &mut Context<'_>,
102 buf: &[u8],
103 ) -> Poll<std::io::Result<usize>> {
104 loop {
105 break match self.deref_mut() {
106 ProcStream::Ready(ref mut inner) => {
107 use std::io::Write;
108 let len = inner.child.stdin.as_mut().ok_or_else(broken)?.write(buf)?;
109 Poll::Ready(Ok(len))
110 }
111 mut otherwise => {
112 ready!(Pin::new(&mut otherwise).poll_flush(cx))?;
113 continue;
114 }
115 };
116 }
117 }
118 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
119 loop {
120 break match self.deref_mut() {
121 ProcStream::Ready(ref mut inner) => {
122 use std::io::Write;
123 inner.child.stdin.as_mut().ok_or_else(broken)?.flush()?;
124 Poll::Ready(Ok(()))
125 }
126 ProcStream::Closing(ref mut fut) => {
127 ready!(fut.as_mut().poll(cx))?;
128 *self = ProcStream::Done;
129 continue;
130 }
131 ProcStream::Done => Poll::Ready(Ok(())),
132 ProcStream::Busy => Poll::Ready(Err(broken())),
133 };
134 }
135 }
136 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
137 loop {
138 break match std::mem::replace(self.deref_mut(), ProcStream::Busy) {
139 ProcStream::Ready(ProcStreamInner { child, message_id }) => {
140 let fut = async move {
141 let output = task::spawn_blocking(move || child.wait_with_output()).await?;
142
143 info!("Wrote {} message to stdin", message_id);
144
145 if output.status.success() {
146 Ok(())
147 } else {
148 Err(std::io::Error::new(
149 std::io::ErrorKind::Other,
150 String::from_utf8_lossy(output.stderr.as_slice()),
151 ))
152 }
153 };
154 *self = ProcStream::Closing(Box::pin(fut));
155 continue;
156 }
157 otherwise @ ProcStream::Closing(_) => {
158 *self = otherwise;
159 ready!(Pin::new(&mut self).poll_flush(cx))?;
160 continue;
161 }
162 otherwise => {
163 *self = otherwise;
164 ready!(Pin::new(&mut self).poll_flush(cx))?;
165 Poll::Ready(Ok(()))
166 }
167 };
168 }
169 }
170}
171
/// Builds the `NotConnected` error used whenever the stream has no usable
/// connection to the child process.
fn broken() -> std::io::Error {
    use std::io::ErrorKind;

    ErrorKind::NotConnected.into()
}
175
176impl fmt::Debug for ProcStream {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 match self {
179 ProcStream::Busy => f.debug_tuple("Busy").finish(),
180 ProcStream::Done => f.debug_tuple("Done").finish(),
181 ProcStream::Closing(_) => f.debug_tuple("Closing").field(&"*").finish(),
182 ProcStream::Ready(ref r) => f.debug_tuple("Ready").field(&r).finish(),
183 }
184 }
185}