daemon_engine/
connection.rs

1/**
2 * rust-daemon
3 * Connection implementation
4 *
5 * https://github.com/ryankurte/rust-daemon
6 * Copyright 2018 Ryan Kurte
7 */
8
9use std::sync::{Arc, Mutex};
10use std::fmt::{Debug};
11
12use futures::sync::oneshot;
13
14use tokio::prelude::*;
15use tokio_codec::{Encoder, Decoder, Framed};
16
17
18/// Connection type implemented on top of AsyncRead + AsyncWrite and an Encoder/Decoder
19/// This provides a simple / generic base object for managing tokio connections
20pub struct Connection<T: AsyncRead + AsyncWrite, Codec: Encoder + Decoder> 
21{
22    stream: Arc<Mutex<Framed<T, Codec>>>,
23    pub(crate) exit_rx: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
24    pub(crate) exit_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
25}
26
27impl <T, Codec> Connection<T, Codec>
28where 
29    T: AsyncWrite + AsyncRead + Send + 'static,
30    Codec: Encoder + Decoder + Clone + Send + 'static,
31    <Codec as Decoder>::Item: Send,
32    <Codec as Decoder>::Error: Send + Debug,
33{
34    /// Create a new connection instance over an arbitrary stream
35    pub fn from_socket(stream: T, codec: Codec) -> Connection<T, Codec> {
36        // Setup stream and exit channels
37        let (exit_tx, exit_rx) = oneshot::channel::<()>();
38
39        // Build connection object
40        Connection{
41            stream: Arc::new(Mutex::new(Framed::new(stream, codec))),
42            exit_rx: Arc::new(Mutex::new(Some(exit_rx))),
43            exit_tx: Arc::new(Mutex::new(Some(exit_tx))),
44        }
45    }
46}
47
48impl <T, Codec>Connection<T, Codec>
49where 
50    T: AsyncWrite + AsyncRead + Send + 'static,
51    Codec: Encoder + Decoder + Clone + Send + 'static,
52    <Codec as Decoder>::Item: Send,
53    <Codec as Decoder>::Error: Send + Debug,
54{
55    /// Exit closes the handler task if bound
56    /// note this will panic if exit has already been called
57    pub fn shutdown(self) {
58        debug!("[connection] exit called");
59
60        // Send exit signal
61        if let Some(c) = self.exit_tx.lock().unwrap().take() {
62            c.send(()).unwrap();
63        }
64
65        // Close the stream
66        self.stream.lock().unwrap().get_mut().shutdown().unwrap();
67    }
68}
69
70/// Blank send
71unsafe impl<T, Codec> Send for Connection<T, Codec> 
72where
73    T: AsyncWrite + AsyncRead,
74    Codec: Encoder + Decoder, 
75{}
76
77
78/// Sink implementation allows sending messages over a connection
79impl<T, Codec> Sink for Connection<T, Codec>
80where
81    T: AsyncWrite + AsyncRead,
82    Codec: Encoder + Decoder, 
83{
84    type SinkItem = <Codec as tokio_codec::Encoder>::Item;
85    type SinkError = <Codec as tokio_codec::Encoder>::Error;
86
87    fn start_send(
88        &mut self,
89        item: Self::SinkItem,
90    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
91        debug!("[connection] start send");
92        self.stream.lock().unwrap().start_send(item)
93    }
94
95    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
96        debug!("[connection] send complete");
97        self.stream.lock().unwrap().poll_complete()
98    }
99}
100
101/// Stream implementation allows receiving messages from a connection
102impl<T, Codec> Stream for Connection<T, Codec>
103where
104    T: AsyncWrite + AsyncRead,
105    Codec: Encoder + Decoder, 
106{
107    type Item = <Codec as tokio_codec::Decoder>::Item;
108    type Error = <Codec as tokio_codec::Decoder>::Error;
109
110    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
111        debug!("[connection] poll receive");
112        self.stream.lock().unwrap().poll()
113    }
114}
115
116/// Clone over generic connector
117/// All instances of a given connector contain the same arc/mutex protected information
118impl<T, Codec> Clone for Connection<T, Codec>
119where
120    T: AsyncWrite + AsyncRead,
121    Codec: Encoder + Decoder, 
122{
123    fn clone(&self) -> Self {
124        Connection {
125            stream: self.stream.clone(),
126            exit_tx: self.exit_tx.clone(),
127            exit_rx: self.exit_rx.clone(),
128        }
129    }
130}
131
132#[cfg(test)]
133mod tests {
134
135    use tokio::prelude::*;
136    use tokio::{spawn, run};
137    use tokio_uds::{UnixStream};
138    use tokio_codec::{Decoder, Encoder};
139    use bytes::{BufMut, BytesMut};
140
141    use super::Connection;
142    use crate::error::Error;
143
144    #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
145    struct TestCodec {}
146
147    impl Decoder for TestCodec {
148        type Item = String;
149        type Error = Error;
150
151        fn decode(&mut self, buff: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
152            let vec: Vec<u8> = buff.clone().into_iter().collect();
153            let val = String::from_utf8(vec).unwrap();
154            buff.advance(val.len());
155            
156            if val.len() > 0 {
157                Ok(Some(val))
158            } else {
159                Ok(None)
160            }
161        }
162    }
163
164    impl Encoder for TestCodec {
165        type Item = String;
166        type Error = Error;
167
168        fn encode(&mut self, v: Self::Item, buff: &mut BytesMut) -> Result<(), Self::Error> {
169            buff.reserve(v.len());
170            buff.put_slice(&v.as_bytes());
171            Ok(())
172        }
173    }
174
175    use crate::AsyncWait;
176
177    #[test]
178    fn client_ping_pong() {
179        let test = future::lazy(move || {
180            // Build client pair
181            let (a, b) = UnixStream::pair().unwrap();
182            let client_a = Connection::<UnixStream, TestCodec>::from_socket(a, TestCodec{});
183            let client_b = Connection::<UnixStream, TestCodec>::from_socket(b, TestCodec{});
184
185            // Send a message
186            let t = "test string".to_owned();
187
188            client_a.send(t.clone()).async_wait().unwrap();
189
190            println!("Send message: {:?}", t);
191
192            // Receive a message
193            // TODO: this should be, receive ONE message
194            // Maybe a once + a timeout would work here?
195            let rx_handle = client_b
196                .for_each(move |m| {
197                    println!("Received message: {:?}", m);
198                    assert_eq!(t, m);
199                    Ok(())
200                }).map_err(|_e| ());
201            spawn(rx_handle);
202            
203            Ok(())
204        }).map(|_e| ()); 
205
206        run(test);
207    }
208}