1use super::*;
2use ser::BincodeDatagram;
3
4use std::{io, process};
5use std::marker::PhantomData;
6use std::time::Duration;
7use std::ffi::OsStr;
8
9use serde::{Serialize, Deserialize};
10use futures::{Stream, Sink, Poll, StartSend};
11use tokio::reactor::{Handle as TokioHandle};
12use tokio::io::{AsyncRead, AsyncWrite};
13
14pub struct MessageChannel<T, R> where
26 T: Serialize,
27 R: for<'de> Deserialize<'de>
28{
29 inner: BincodeDatagram<platform::MessageChannel, T, R, platform::ChannelSerializeWrapper>,
30 #[allow(unused)]
31 max_message_size: usize,
32}
33
34#[derive(Serialize, Deserialize, Debug)]
39pub struct ChildMessageChannel {
40 pub(crate) inner: platform::ChildMessageChannel,
41 max_message_size: usize,
42}
43
44impl<T, R> MessageChannel<T, R> where
45 T: Serialize,
46 R: for<'de> Deserialize<'de>
47{
48 pub fn pair(tokio_handle: &TokioHandle, max_message_size: usize) -> io::Result<(Self, Self)> {
49 let (a, b) = platform::MessageChannel::pair(tokio_handle)?;
50
51 Ok((
52 MessageChannel { inner: BincodeDatagram::wrap(a, max_message_size), max_message_size },
53 MessageChannel { inner: BincodeDatagram::wrap(b, max_message_size), max_message_size },
54 ))
55 }
56
57 pub fn from_raw(channel: RawMessageChannel, max_message_size: usize) -> io::Result<Self> {
58 Ok(MessageChannel { inner: BincodeDatagram::wrap(channel.inner, max_message_size), max_message_size })
59 }
60
61 pub fn establish_with_child<F>(command: &mut process::Command, max_message_size: usize, tokio_loop: &TokioHandle, transmit_and_launch: F) -> io::Result<(Self, process::Child)> where
62 F: FnOnce(&mut process::Command, &ChildMessageChannel) -> io::Result<process::Child>
63 {
64 let (raw_channel, child) = platform::MessageChannel::establish_with_child(command, tokio_loop, |command, channel| {
65 let channel = ChildMessageChannel { inner: channel, max_message_size };
66 transmit_and_launch(command, &channel)
67 })?;
68
69 Ok((MessageChannel { inner: BincodeDatagram::wrap(raw_channel, max_message_size), max_message_size }, child))
70 }
71}
72
73impl ChildMessageChannel {
74 pub fn into_channel<T, R>(self, tokio_loop: &TokioHandle) -> io::Result<MessageChannel<T, R>> where
75 T: Serialize,
76 R: for<'de> Deserialize<'de>
77 {
78 let inner = self.inner.into_channel(tokio_loop)?;
79
80 Ok(MessageChannel {
81 inner: BincodeDatagram::wrap(inner, self.max_message_size),
82 max_message_size: self.max_message_size,
83 })
84 }
85}
86
87impl<T, R> Stream for MessageChannel<T, R> where
88 T: Serialize,
89 R: for<'de> Deserialize<'de>,
90{
91 type Item = R;
92 type Error = io::Error;
93
94 fn poll(&mut self) -> Poll<Option<R>, io::Error> {
95 self.inner.poll()
96 }
97}
98
99impl<T, R> Sink for MessageChannel<T, R> where
100 T: Serialize,
101 R: for<'de> Deserialize<'de>,
102{
103 type SinkItem = T;
104 type SinkError = io::Error;
105
106 fn start_send(&mut self, item: T) -> StartSend<T, io::Error> {
107 self.inner.start_send(item)
108 }
109
110 fn poll_complete(&mut self) -> Poll<(), io::Error> {
111 self.inner.poll_complete()
112 }
113}
114
115pub struct NamedMessageChannel<T, R> where
127 T: Serialize,
128 R: for<'de> Deserialize<'de>
129{
130 inner: platform::NamedMessageChannel,
131 max_message_size: usize,
132 _phantom: PhantomData<(T, R)>,
133}
134
135impl<T, R> NamedMessageChannel<T, R> where
136 T: Serialize,
137 R: for<'de> Deserialize<'de>
138{
139 pub fn new(tokio_loop: &TokioHandle, max_message_size: usize) -> io::Result<Self> {
140 let inner = platform::NamedMessageChannel::new(tokio_loop)?;
141 Ok(NamedMessageChannel { inner, max_message_size, _phantom: PhantomData })
142 }
143
144 pub fn name(&self) -> &OsStr { self.inner.name() }
145
146 pub fn accept(self, timeout: Option<Duration>) -> io::Result<MessageChannel<T, R>> {
147 let inner = self.inner.accept(timeout)?;
148 Ok(MessageChannel {
149 inner: BincodeDatagram::wrap(inner, self.max_message_size),
150 max_message_size: self.max_message_size,
151 })
152 }
153
154 pub fn connect<N>(name: N, timeout: Option<Duration>, tokio_loop: &TokioHandle, max_message_size: usize) -> io::Result<MessageChannel<T, R>> where
155 N: AsRef<OsStr>,
156 {
157 let inner = platform::NamedMessageChannel::connect(name.as_ref(), timeout, tokio_loop)?;
158 Ok(MessageChannel {
159 inner: BincodeDatagram::wrap(inner, max_message_size),
160 max_message_size,
161 })
162 }
163}
164
165#[derive(Serialize, Deserialize, Debug)]
171pub struct PreMessageChannel<T, R> where
172 T: Serialize,
173 R: for<'des> Deserialize<'des>
174{
175 inner: platform::PreMessageChannel,
176 max_message_size: usize,
177 _phantom: PhantomData<(T, R)>,
178}
179
180impl<T, R> PreMessageChannel<T, R> where
181 T: Serialize,
182 R: for<'de> Deserialize<'de>
183{
184 pub fn pair(max_message_size: usize) -> io::Result<(Self, Self)> {
185 let (a, b) = platform::PreMessageChannel::pair()?;
186
187 Ok((
188 PreMessageChannel { inner: a, max_message_size, _phantom: PhantomData },
189 PreMessageChannel { inner: b, max_message_size, _phantom: PhantomData },
190 ))
191 }
192
193 pub fn into_channel(self, remote_process: ProcessHandle, tokio_loop: &TokioHandle) -> io::Result<MessageChannel<T, R>> {
203 let channel = self.inner.into_channel(remote_process.0, tokio_loop)?;
204 Ok(MessageChannel { inner: BincodeDatagram::wrap(channel, self.max_message_size), max_message_size: self.max_message_size })
205 }
206
207 pub fn into_sealed_channel(self, tokio_loop: &TokioHandle) -> io::Result<MessageChannel<T, R>> {
212 let channel = self.inner.into_sealed_channel(tokio_loop)?;
213 Ok(MessageChannel { inner: BincodeDatagram::wrap(channel, self.max_message_size), max_message_size: self.max_message_size })
214 }
215}
216
217pub struct RawMessageChannel {
221 inner: platform::MessageChannel,
222}
223
224impl RawMessageChannel {
225 pub fn pair(tokio_loop: &TokioHandle) -> io::Result<(Self, Self)> {
226 let (a, b) = platform::MessageChannel::pair(tokio_loop)?;
227
228 Ok((
229 RawMessageChannel { inner: a },
230 RawMessageChannel { inner: b },
231 ))
232 }
233
234 pub fn establish_with_child<F>(command: &mut process::Command, tokio_loop: &TokioHandle, transmit_and_launch: F) -> io::Result<(Self, process::Child)> where
235 F: FnOnce(&mut process::Command, ChildRawMessageChannel) -> io::Result<process::Child>
236 {
237 let (channel, child) = platform::MessageChannel::establish_with_child(command, tokio_loop, |command, child_channel| {
238 transmit_and_launch(command, ChildRawMessageChannel(child_channel))
239 })?;
240
241 Ok((RawMessageChannel { inner: channel }, child))
242 }
243
244 pub fn establish_with_child_custom<F, T>(tokio_loop: &TokioHandle, transmit_and_launch: F) -> io::Result<(Self, T)> where
245 F: FnOnce(ChildRawMessageChannel) -> io::Result<(ProcessHandle, T)>
246 {
247 let (channel, t) = platform::MessageChannel::establish_with_child_custom(tokio_loop, |child_channel| {
248 let (token, t) = transmit_and_launch(ChildRawMessageChannel(child_channel))?;
249 Ok((token.0, t))
250 })?;
251
252 Ok((RawMessageChannel { inner: channel }, t))
253 }
254}
255
256impl io::Read for RawMessageChannel {
257 fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
258 self.inner.read(buffer)
259 }
260}
261
262impl AsyncRead for RawMessageChannel {
263}
264
265impl io::Write for RawMessageChannel {
266 fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
267 self.inner.write(bytes)
268 }
269
270 fn flush(&mut self) -> io::Result<()> {
271 self.inner.flush()
272 }
273}
274
275impl AsyncWrite for RawMessageChannel {
276 fn shutdown(&mut self) -> Poll<(), io::Error> {
277 self.inner.shutdown()
278 }
279}
280
281#[derive(Serialize, Deserialize, Debug)]
286pub struct ChildRawMessageChannel(pub(crate) platform::ChildMessageChannel);
287
288impl ChildRawMessageChannel {
289 pub fn into_channel(self, tokio_loop: &TokioHandle) -> io::Result<RawMessageChannel> {
290 Ok(RawMessageChannel {
291 inner: self.0.into_channel(tokio_loop)?,
292 })
293 }
294}
295
296#[derive(Serialize, Deserialize, Debug)]
302pub struct PreRawMessageChannel(platform::PreMessageChannel);
303
304impl PreRawMessageChannel {
305 pub fn pair() -> io::Result<(Self, Self)> {
306 let (a, b) = platform::PreMessageChannel::pair()?;
307
308 Ok((
309 PreRawMessageChannel(a),
310 PreRawMessageChannel(b),
311 ))
312 }
313
314 pub fn into_raw_channel(self, remote_process: ProcessHandle, tokio_loop: &TokioHandle) -> io::Result<RawMessageChannel> {
324 let channel = self.0.into_channel(remote_process.0, tokio_loop)?;
325 Ok(RawMessageChannel { inner: channel })
326 }
327
328 pub fn into_sealed_raw_channel(self, tokio_loop: &TokioHandle) -> io::Result<RawMessageChannel> {
333 let channel = self.0.into_sealed_channel(tokio_loop)?;
334 Ok(RawMessageChannel { inner: channel })
335 }
336}
337
338#[derive(Serialize, Deserialize, Debug)]
345pub struct ProcessHandle(pub(crate) platform::ProcessHandle);
346
347impl ProcessHandle {
348 pub fn current() -> io::Result<Self> {
350 Ok(ProcessHandle(
351 platform::ProcessHandle::current()?
352 ))
353 }
354
355 pub fn from_child(child: &process::Child) -> io::Result<Self> {
356 Ok(ProcessHandle(
357 platform::ProcessHandle::from_child(child)?
358 ))
359 }
360
361 pub fn clone(&self) -> io::Result<Self> {
363 Ok(ProcessHandle(
364 self.0.clone()?
365 ))
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use ::check_send;
373
374 #[test]
375 fn pre_raw_message_channel_is_send() {
376 let (a, _b) = PreRawMessageChannel::pair().unwrap();
377 check_send(&a);
378 }
379
380 #[test]
381 fn process_handle_is_send() {
382 let token = ProcessHandle::current().unwrap();
383 check_send(&token);
384 }
385}