1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
mod error;
pub use self::error::*;
use crate::Envelope;
use crate::MailDataStream;
use crate::Transport;
use crate::{
file::error::{Error, FileResult},
SyncFuture,
};
use async_std::fs::File;
use async_std::io::Write;
use async_std::path::Path;
use futures::io::AsyncWriteExt;
use futures::ready;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Transport that writes each message to a file below a base path.
///
/// Only the base path is stored here; the per-message file name is appended
/// when a message is sent.
#[derive(Debug)]
#[cfg_attr(
    feature = "serde-impls",
    derive(serde_derive::Serialize, serde_derive::Deserialize)
)]
pub struct FileTransport {
    /// Base path onto which per-message file names are appended.
    path: PathBuf,
}

impl FileTransport {
    /// Creates a transport rooted at `path`.
    ///
    /// The path is copied into an owned `PathBuf`; this constructor does not
    /// touch the file system.
    pub fn new<P: AsRef<Path>>(path: P) -> FileTransport {
        let path = PathBuf::from(path.as_ref());
        Self { path }
    }
}
/// Wrapper that is JSON-encoded and written at the start of each message file.
///
/// It currently carries nothing but the envelope.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "serde-impls",
    derive(serde_derive::Serialize, serde_derive::Deserialize)
)]
struct SerializableEmail {
    /// Envelope of the message being stored.
    envelope: Envelope,
}
impl Transport for FileTransport {
    type DataStream = FileStream;

    /// Creates `<path>/<message id>.json`, writes the JSON-encoded envelope
    /// followed by a newline, and hands back a stream wrapping the same file.
    ///
    /// # Errors
    ///
    /// The returned future fails if the envelope cannot be serialized, or if
    /// creating or writing the file fails.
    fn send_stream<'s, 'a>(
        &'s self,
        envelope: Envelope,
    ) -> SyncFuture<'a, Result<FileStream, Error>>
    where
        's: 'a,
    {
        // The target path is computed eagerly, before `envelope` is moved into
        // the future. The message id is interpolated into the file name as-is.
        let target = self.path.join(format!("{}.json", envelope.message_id()));

        Box::pin(async move {
            let mut header = serde_json::to_string(&SerializableEmail { envelope })?;
            header.push('\n');

            let mut file = File::create(target).await?;
            file.write_all(header.as_bytes()).await?;

            // The stream starts out open; `closed` only flips in `poll_close`.
            Ok(FileStream {
                file,
                closed: false,
            })
        })
    }
}
/// Writable stream over a message file.
///
/// Besides the file handle it remembers whether closing the file has
/// completed, which is what `result` reports on.
#[derive(Debug)]
pub struct FileStream {
    /// File that receives everything written to the stream.
    file: File,
    /// `true` once `poll_close` has finished successfully.
    closed: bool,
}
impl MailDataStream for FileStream {
type Output = ();
type Error = Error;
fn result(&self) -> FileResult {
if self.closed {
Ok(())
} else {
Err(Error::Client("file was not closed properly"))
}
}
}
impl Write for FileStream {
    /// Forwards the write to the wrapped file.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.get_mut();
        Pin::new(&mut this.file).poll_write(cx, buf)
    }

    /// Forwards the flush to the wrapped file.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        let this = self.get_mut();
        Pin::new(&mut this.file).poll_flush(cx)
    }

    /// Closes the wrapped file and, on success, marks the stream as closed.
    ///
    /// The `closed` flag is left untouched while the close is still pending
    /// or when it fails.
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        let this = self.get_mut();
        // `ready!` returns `Poll::Pending` from this function if the file is
        // not done closing yet.
        let outcome = ready!(Pin::new(&mut this.file).poll_close(cx));
        if outcome.is_ok() {
            this.closed = true;
        }
        Poll::Ready(outcome)
    }
}