1use shm::{SharedMem, SharedMemMap, Access as SharedMemAccess};
2
3use std::{io, fs, slice};
4use std::ops::Deref;
5use std::io::Read;
6
7use serde::{Serialize, Deserialize, Serializer, Deserializer};
8
9#[derive(Debug)]
11pub struct SendableFile<B = fs::File>(pub B) where
12 B: ::std::borrow::Borrow<fs::File>;
13
14#[derive(Debug)]
25pub struct SendableSocket<S>(pub S);
26
27#[derive(Serialize, Deserialize, Debug)]
30pub enum SendableDataSource {
31 File(#[serde(with = "sendable_data_source_file")] fs::File),
32 Memory(SharedMem),
33 Inline(Vec<u8>),
34}
35
36impl From<fs::File> for SendableFile {
37 fn from(file: fs::File) -> SendableFile {
38 SendableFile(file)
39 }
40}
41
42impl From<SendableFile> for fs::File {
43 fn from(file: SendableFile) -> fs::File {
44 file.0
45 }
46}
47
48mod sendable_data_source_file {
49 use super::*;
50
51 pub fn serialize<S>(f: &fs::File, serializer: S) -> Result<S::Ok, S::Error> where
52 S: Serializer,
53 {
54 Serialize::serialize(&SendableFile(f), serializer)
55 }
56
57 pub fn deserialize<'de, D>(deserializer: D) -> Result<fs::File, D::Error> where
58 D: Deserializer<'de>,
59 {
60 let file: SendableFile = Deserialize::deserialize(deserializer)?;
61 Ok(file.0)
62 }
63}
64
65pub struct SendableDataSourceReader(_SendableDataSourceReader);
66
67enum _SendableDataSourceReader {
68 File(fs::File),
69 Memory(SharedMemMap, io::Cursor<&'static [u8]>),
70 Inline(io::Cursor<Vec<u8>>),
71}
72
73pub struct SendableDataSourceBytes(_SendableDataSourceBytes);
74
75enum _SendableDataSourceBytes {
76 Vec(Vec<u8>),
77 Shared(SharedMemMap),
78}
79
80impl SendableDataSource {
81 pub fn to_read(self) -> io::Result<SendableDataSourceReader> {
83 Ok(SendableDataSourceReader(match self {
84 SendableDataSource::File(file) => {
85 _SendableDataSourceReader::File(file)
86 },
87 SendableDataSource::Memory(shared_mem) => {
88 unsafe {
89 let map = shared_mem.map(.., SharedMemAccess::Read)?;
90 let slice = ::std::slice::from_raw_parts(map.pointer() as _, map.len());
91 _SendableDataSourceReader::Memory(map, io::Cursor::new(slice))
92 }
93 },
94 SendableDataSource::Inline(bytes) => {
95 _SendableDataSourceReader::Inline(io::Cursor::new(bytes))
96 }
97 }))
98 }
99
100 pub fn to_bytes(self) -> io::Result<SendableDataSourceBytes> {
105 Ok(SendableDataSourceBytes(match self {
106 SendableDataSource::File(mut file) => {
107 let mut buffer = Vec::new();
108 file.read_to_end(&mut buffer)?;
109 _SendableDataSourceBytes::Vec(buffer)
110 },
111 SendableDataSource::Memory(shared_mem) => {
112 _SendableDataSourceBytes::Shared(shared_mem.map(.., SharedMemAccess::Read)?)
113 },
114 SendableDataSource::Inline(bytes) => {
115 _SendableDataSourceBytes::Vec(bytes)
116 }
117 }))
118 }
119}
120
121impl From<Vec<u8>> for SendableDataSource {
122 fn from(bytes: Vec<u8>) -> Self {
123 SendableDataSource::Inline(bytes)
124 }
125}
126
127impl From<fs::File> for SendableDataSource {
128 fn from(file: fs::File) -> Self {
129 SendableDataSource::File(file)
130 }
131}
132
133impl io::Read for SendableDataSourceReader {
134 fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
135 match self.0 {
136 _SendableDataSourceReader::File(ref mut file) => file.read(buffer),
137 _SendableDataSourceReader::Memory(_, ref mut cursor) => cursor.read(buffer),
138 _SendableDataSourceReader::Inline(ref mut cursor) => cursor.read(buffer),
139 }
140 }
141}
142
143impl Deref for SendableDataSourceBytes {
144 type Target = [u8];
145
146 fn deref(&self) -> &[u8] {
147 match self.0 {
148 _SendableDataSourceBytes::Vec(ref bytes) => bytes,
149 _SendableDataSourceBytes::Shared(ref map) => unsafe {
150 slice::from_raw_parts(map.pointer(), map.len())
151 },
152 }
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use ::MessageChannel;
160
161 use std::{fs, env};
162 use std::io::{Write, Read, Seek, SeekFrom};
163
164 use futures::{Sink, Stream};
165 use tokio::runtime::Runtime;
166
167 #[test]
168 fn send_file_same_process() {
169 let mut runtime = Runtime::new().unwrap();
170 let (a, b) = MessageChannel::<SendableFile, SendableFile>::pair(runtime.reactor(), 8192).unwrap();
171
172 let mut file = fs::OpenOptions::new().read(true).write(true).create(true).truncate(true)
173 .open(env::temp_dir().join("some_test_file.txt")).unwrap();
174 write!(file, "hello").unwrap();
175 file.flush().unwrap();
176
177 let _a = runtime.block_on(a.send(SendableFile(file))).unwrap();
178 let (message, _b) = runtime.block_on(b.into_future()).map_err(|(err, _)| err).unwrap();
179 let SendableFile(mut file) = message.unwrap();
180
181 file.seek(SeekFrom::Start(0)).unwrap();
182 let mut buffer = String::new();
183 file.read_to_string(&mut buffer).unwrap();
184
185 assert_eq!("hello", buffer);
186 }
187}