daemon_engine/
connection.rs1use std::sync::{Arc, Mutex};
10use std::fmt::{Debug};
11
12use futures::sync::oneshot;
13
14use tokio::prelude::*;
15use tokio_codec::{Encoder, Decoder, Framed};
16
17
18pub struct Connection<T: AsyncRead + AsyncWrite, Codec: Encoder + Decoder>
21{
22 stream: Arc<Mutex<Framed<T, Codec>>>,
23 pub(crate) exit_rx: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
24 pub(crate) exit_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
25}
26
27impl <T, Codec> Connection<T, Codec>
28where
29 T: AsyncWrite + AsyncRead + Send + 'static,
30 Codec: Encoder + Decoder + Clone + Send + 'static,
31 <Codec as Decoder>::Item: Send,
32 <Codec as Decoder>::Error: Send + Debug,
33{
34 pub fn from_socket(stream: T, codec: Codec) -> Connection<T, Codec> {
36 let (exit_tx, exit_rx) = oneshot::channel::<()>();
38
39 Connection{
41 stream: Arc::new(Mutex::new(Framed::new(stream, codec))),
42 exit_rx: Arc::new(Mutex::new(Some(exit_rx))),
43 exit_tx: Arc::new(Mutex::new(Some(exit_tx))),
44 }
45 }
46}
47
48impl <T, Codec>Connection<T, Codec>
49where
50 T: AsyncWrite + AsyncRead + Send + 'static,
51 Codec: Encoder + Decoder + Clone + Send + 'static,
52 <Codec as Decoder>::Item: Send,
53 <Codec as Decoder>::Error: Send + Debug,
54{
55 pub fn shutdown(self) {
58 debug!("[connection] exit called");
59
60 if let Some(c) = self.exit_tx.lock().unwrap().take() {
62 c.send(()).unwrap();
63 }
64
65 self.stream.lock().unwrap().get_mut().shutdown().unwrap();
67 }
68}
69
70unsafe impl<T, Codec> Send for Connection<T, Codec>
72where
73 T: AsyncWrite + AsyncRead,
74 Codec: Encoder + Decoder,
75{}
76
77
78impl<T, Codec> Sink for Connection<T, Codec>
80where
81 T: AsyncWrite + AsyncRead,
82 Codec: Encoder + Decoder,
83{
84 type SinkItem = <Codec as tokio_codec::Encoder>::Item;
85 type SinkError = <Codec as tokio_codec::Encoder>::Error;
86
87 fn start_send(
88 &mut self,
89 item: Self::SinkItem,
90 ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
91 debug!("[connection] start send");
92 self.stream.lock().unwrap().start_send(item)
93 }
94
95 fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
96 debug!("[connection] send complete");
97 self.stream.lock().unwrap().poll_complete()
98 }
99}
100
101impl<T, Codec> Stream for Connection<T, Codec>
103where
104 T: AsyncWrite + AsyncRead,
105 Codec: Encoder + Decoder,
106{
107 type Item = <Codec as tokio_codec::Decoder>::Item;
108 type Error = <Codec as tokio_codec::Decoder>::Error;
109
110 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
111 debug!("[connection] poll receive");
112 self.stream.lock().unwrap().poll()
113 }
114}
115
116impl<T, Codec> Clone for Connection<T, Codec>
119where
120 T: AsyncWrite + AsyncRead,
121 Codec: Encoder + Decoder,
122{
123 fn clone(&self) -> Self {
124 Connection {
125 stream: self.stream.clone(),
126 exit_tx: self.exit_tx.clone(),
127 exit_rx: self.exit_rx.clone(),
128 }
129 }
130}
131
#[cfg(test)]
mod tests {
    use bytes::{BufMut, BytesMut};
    use tokio::prelude::*;
    use tokio::{run, spawn};
    use tokio_codec::{Decoder, Encoder};
    use tokio_uds::UnixStream;

    use super::Connection;
    use crate::error::Error;
    use crate::AsyncWait;

    /// Minimal codec for the tests: a frame is whatever UTF-8 text is
    /// currently sitting in the buffer.
    #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
    struct TestCodec {}

    impl Decoder for TestCodec {
        type Item = String;
        type Error = Error;

        /// Consumes the whole buffer as one string; yields `None` when empty.
        /// Panics if the buffered bytes are not valid UTF-8.
        fn decode(&mut self, buff: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            let text = String::from_utf8(buff.to_vec()).unwrap();
            buff.advance(text.len());

            if text.is_empty() {
                Ok(None)
            } else {
                Ok(Some(text))
            }
        }
    }

    impl Encoder for TestCodec {
        type Item = String;
        type Error = Error;

        /// Appends the raw bytes of `v` to the output buffer.
        fn encode(&mut self, v: Self::Item, buff: &mut BytesMut) -> Result<(), Self::Error> {
            let payload = v.as_bytes();
            buff.reserve(payload.len());
            buff.put_slice(payload);
            Ok(())
        }
    }

    /// Sends one string across a Unix socket pair and checks that the other
    /// end decodes the same string.
    #[test]
    fn client_ping_pong() {
        let scenario = future::lazy(move || {
            let (sock_a, sock_b) = UnixStream::pair().unwrap();
            let sender = Connection::<UnixStream, TestCodec>::from_socket(sock_a, TestCodec {});
            let receiver = Connection::<UnixStream, TestCodec>::from_socket(sock_b, TestCodec {});

            let message = "test string".to_owned();

            sender.send(message.clone()).async_wait().unwrap();

            println!("Send message: {:?}", message);

            let rx_task = receiver
                .for_each(move |received| {
                    println!("Received message: {:?}", received);
                    assert_eq!(message, received);
                    Ok(())
                })
                .map_err(|_e| ());
            spawn(rx_task);

            Ok(())
        })
        .map(|_e| ());

        run(scenario);
    }
}