sandbox_ipc/
io.rs

1use shm::{SharedMem, SharedMemMap, Access as SharedMemAccess};
2
3use std::{io, fs, slice};
4use std::ops::Deref;
5use std::io::Read;
6
7use serde::{Serialize, Deserialize, Serializer, Deserializer};
8
9/// Wraps a normal `std::fs::File` so it may be transmitted to other processes.
10#[derive(Debug)]
11pub struct SendableFile<B = fs::File>(pub B) where
12    B: ::std::borrow::Borrow<fs::File>;
13
14/// Wraps normal sockets types (e.g. `std::net::TcpStream`) so they may be transmitted to
15/// other processes.
16/// 
17/// # Windows
18/// 
19/// Windows file/pipe handles or sockets that have been associated with an IOCP cannot be
20/// sent to other processes. Tokio is backed by an IOCP on Windows, so Tokio sockets may
21/// not be sent. Do note however that Tokio sockets allow you to `accept` standard blocking
22/// sockets, which *will* be elligible for transmission to other processes, where they can
23/// be added to a Tokio event loop.
24#[derive(Debug)]
25pub struct SendableSocket<S>(pub S);
26
27/// A source of data can be sent to other processes over a `MessageChannel` or similar
28/// mechanism. It may consist of a file handle, shared memory, or inline data.
29#[derive(Serialize, Deserialize, Debug)]
30pub enum SendableDataSource {
31    File(#[serde(with = "sendable_data_source_file")] fs::File),
32    Memory(SharedMem),
33    Inline(Vec<u8>),
34}
35
36impl From<fs::File> for SendableFile {
37    fn from(file: fs::File) -> SendableFile {
38        SendableFile(file)
39    }
40}
41
42impl From<SendableFile> for fs::File {
43    fn from(file: SendableFile) -> fs::File {
44        file.0
45    }
46}
47
48mod sendable_data_source_file {
49    use super::*;
50
51    pub fn serialize<S>(f: &fs::File, serializer: S) -> Result<S::Ok, S::Error> where
52        S: Serializer,
53    {
54        Serialize::serialize(&SendableFile(f), serializer)
55    }
56
57    pub fn deserialize<'de, D>(deserializer: D) -> Result<fs::File, D::Error> where
58        D: Deserializer<'de>,
59    {
60        let file: SendableFile = Deserialize::deserialize(deserializer)?;
61        Ok(file.0)
62    }
63}
64
65pub struct SendableDataSourceReader(_SendableDataSourceReader);
66
67enum _SendableDataSourceReader {
68    File(fs::File),
69    Memory(SharedMemMap, io::Cursor<&'static [u8]>),
70    Inline(io::Cursor<Vec<u8>>),
71}
72
73pub struct SendableDataSourceBytes(_SendableDataSourceBytes);
74
75enum _SendableDataSourceBytes {
76    Vec(Vec<u8>),
77    Shared(SharedMemMap),
78}
79
80impl SendableDataSource {
81    /// Converts the `SendableDataSource` into an appropriate `std::io::Read` implementation.
82    pub fn to_read(self) -> io::Result<SendableDataSourceReader> {
83        Ok(SendableDataSourceReader(match self {
84            SendableDataSource::File(file) => {
85                _SendableDataSourceReader::File(file)
86            },
87            SendableDataSource::Memory(shared_mem) => {
88                unsafe {
89                    let map = shared_mem.map(.., SharedMemAccess::Read)?;
90                    let slice = ::std::slice::from_raw_parts(map.pointer() as _, map.len());
91                    _SendableDataSourceReader::Memory(map, io::Cursor::new(slice))
92                }
93            },
94            SendableDataSource::Inline(bytes) => {
95                _SendableDataSourceReader::Inline(io::Cursor::new(bytes))
96            }
97        }))
98    }
99
100    /// Converts the `SendableDataSource` into a in-memory byte array.
101    /// 
102    /// If the data source is a file, this will read the entirety of it to memory at once. Use `to_read`
103    /// if you need only streaming access to the data.
104    pub fn to_bytes(self) -> io::Result<SendableDataSourceBytes> {
105        Ok(SendableDataSourceBytes(match self {
106            SendableDataSource::File(mut file) => {
107                let mut buffer = Vec::new();
108                file.read_to_end(&mut buffer)?;
109                _SendableDataSourceBytes::Vec(buffer)
110            },
111            SendableDataSource::Memory(shared_mem) => {
112                _SendableDataSourceBytes::Shared(shared_mem.map(.., SharedMemAccess::Read)?)
113            },
114            SendableDataSource::Inline(bytes) => {
115                _SendableDataSourceBytes::Vec(bytes)
116            }
117        }))
118    }
119}
120
121impl From<Vec<u8>> for SendableDataSource {
122    fn from(bytes: Vec<u8>) -> Self {
123        SendableDataSource::Inline(bytes)
124    }
125}
126
127impl From<fs::File> for SendableDataSource {
128    fn from(file: fs::File) -> Self {
129        SendableDataSource::File(file)
130    }
131}
132
133impl io::Read for SendableDataSourceReader {
134    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
135        match self.0 {
136            _SendableDataSourceReader::File(ref mut file) => file.read(buffer),
137            _SendableDataSourceReader::Memory(_, ref mut cursor) => cursor.read(buffer),
138            _SendableDataSourceReader::Inline(ref mut cursor) => cursor.read(buffer),
139        }
140    }
141}
142
143impl Deref for SendableDataSourceBytes {
144    type Target = [u8];
145
146    fn deref(&self) -> &[u8] {
147        match self.0 {
148            _SendableDataSourceBytes::Vec(ref bytes) => bytes,
149            _SendableDataSourceBytes::Shared(ref map) => unsafe {
150                slice::from_raw_parts(map.pointer(), map.len())
151            },
152        }
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use ::MessageChannel;
160
161    use std::{fs, env};
162    use std::io::{Write, Read, Seek, SeekFrom};
163
164    use futures::{Sink, Stream};
165    use tokio::runtime::Runtime;
166
167    #[test]
168    fn send_file_same_process() {
169        let mut runtime = Runtime::new().unwrap();
170        let (a, b) = MessageChannel::<SendableFile, SendableFile>::pair(runtime.reactor(), 8192).unwrap();
171
172        let mut file = fs::OpenOptions::new().read(true).write(true).create(true).truncate(true)
173            .open(env::temp_dir().join("some_test_file.txt")).unwrap();
174        write!(file, "hello").unwrap();
175        file.flush().unwrap();
176
177        let _a = runtime.block_on(a.send(SendableFile(file))).unwrap();
178        let (message, _b) = runtime.block_on(b.into_future()).map_err(|(err, _)| err).unwrap();
179        let SendableFile(mut file) = message.unwrap();
180
181        file.seek(SeekFrom::Start(0)).unwrap();
182        let mut buffer = String::new();
183        file.read_to_string(&mut buffer).unwrap();
184
185        assert_eq!("hello", buffer);
186    }
187}