1use std::sync::{Arc, Mutex};
10use std::fmt::{Debug};
11use std::clone::{Clone};
12
13use futures::sync::mpsc;
14use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
15use futures::sync::oneshot;
16
17use tokio::prelude::*;
18use tokio::spawn;
19use tokio_codec::{Encoder, Decoder};
20
21use crate::connection::Connection;
22
23
24pub struct Server<T: AsyncRead + AsyncWrite, C: Encoder + Decoder, I> {
30 connections: Arc<Mutex<Vec<Connection<T, C>>>>,
31 incoming_tx: Arc<Mutex<UnboundedSender<Request<T, C, I>>>>,
32 incoming_rx: Arc<Mutex<Option<UnboundedReceiver<Request<T, C, I>>>>>,
33 pub(crate) exit_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
34 pub(crate) exit_rx: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
35 codec: C,
36 info: std::marker::PhantomData<I>,
37}
38
39impl<T, C, I> Server<T, C, I>
40where
41 T: AsyncWrite + AsyncRead + Send + Sync + 'static,
42 C: Encoder + Decoder + Clone + Send + 'static,
43 I: Clone + Send + Debug + 'static,
44 <C as Decoder>::Item: Clone + Send + Debug,
45 <C as Decoder>::Error: Send + Debug,
46 <C as Encoder>::Item: Clone + Send + Debug,
47 <C as Encoder>::Error: Send + Debug,
48{
49 pub fn base(codec: C) -> Server<T, C, I> {
55 let connections = Arc::new(Mutex::new(Vec::new()));
57 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Request<T, C, I>>();
58 let (exit_tx, exit_rx) = oneshot::channel::<()>();
59
60 Server {
61 connections,
62 incoming_tx: Arc::new(Mutex::new(incoming_tx)),
63 incoming_rx: Arc::new(Mutex::new(Some(incoming_rx))),
64 exit_tx: Arc::new(Mutex::new(Some(exit_tx))),
65 exit_rx: Arc::new(Mutex::new(Some(exit_rx))),
66 codec,
67 info: std::marker::PhantomData,
68 }
69 }
70
71 pub fn incoming(&mut self) -> Option<UnboundedReceiver<Request<T, C, I>>> {
76 self.incoming_rx.lock().unwrap().take()
77 }
78
79 pub fn bind(&mut self, info: I, socket: T) {
85 let conn = Connection::from_socket(socket, self.codec.clone());
87
88 self.connections.lock().unwrap().push(conn.clone());
90
91 let inner_tx = self.incoming_tx.clone();
94 let exit_rx = conn.exit_rx.lock().unwrap().take();
95
96 let rx_handle = conn.clone().for_each(move |data| {
97 let tx = inner_tx.lock().unwrap();
98 let req = Request{inner: conn.clone(), info: info.clone(), data: data.clone()};
99 tx.clone().send(req).map(|_| () ).map_err(|e| panic!("[server] send error: {:?}", e) )
100 })
101 .map_err(|e| panic!("[server] error: {:?}", e) )
102 .select2(exit_rx)
103 .then(|_| {
104 debug!("[server] closing handler");
105 Ok(())
106 });
107
108 spawn(rx_handle);
109 }
110
111 pub fn close(self) {
115 debug!("[server] closing");
116
117 let tx = self.exit_tx.lock().unwrap().take().unwrap();
119 let _ = tx.send(());
120
121 let mut connections = self.connections.lock().unwrap();
123 let _results: Vec<_> = connections.drain(..).map(|c| c.shutdown() ).collect();
124 }
125}
126
127impl<T, C, I> Clone for Server<T, C, I>
131where
132 T: AsyncWrite + AsyncRead + Send + Sync + 'static,
133 C: Encoder + Decoder + Clone + Send + 'static,
134 I: Clone + Send + Debug + 'static,
135 <C as Decoder>::Item: Clone + Send + Debug,
136 <C as Decoder>::Error: Send + Debug,
137 <C as Encoder>::Item: Clone + Send + Debug,
138 <C as Encoder>::Error: Send + Debug,
139{
140 fn clone(&self) -> Self {
141 Server {
142 connections: self.connections.clone(),
143 incoming_tx: self.incoming_tx.clone(),
144 incoming_rx: self.incoming_rx.clone(),
145 exit_tx: self.exit_tx.clone(),
146 exit_rx: self.exit_rx.clone(),
147 codec: self.codec.clone(),
148 info: std::marker::PhantomData,
149 }
150 }
151}
152
153pub struct Request<T: AsyncRead + AsyncWrite, C: Encoder + Decoder, I> {
158 inner: Connection<T, C>,
159 data: <C as Decoder>::Item,
160 info: I,
161}
162
163impl<T, C, I> Request<T, C, I>
165where
166 T: AsyncWrite + AsyncRead + Send + 'static,
167 C: Encoder + Decoder + Clone + Send + 'static,
168 I: Clone + Send + Debug + 'static,
169 <C as Decoder>::Item: Send + Clone,
170 <C as Decoder>::Error: Send + Debug,
171{
172 pub fn data(&self) -> <C as Decoder>::Item {
174 self.data.clone()
175 }
176
177 pub fn info(&self) -> &I {
179 &self.info
180 }
181}
182
183impl<T, C, I> Sink for Request<T, C, I>
185where
186 T: AsyncWrite + AsyncRead + Send + 'static,
187 C: Encoder + Decoder + Clone + Send + 'static,
188 I: Clone + Send + Debug + 'static,
189 <C as Decoder>::Item: Send,
190 <C as Decoder>::Error: Send + Debug,
191{
192 type SinkItem = <C as Encoder>::Item;
193 type SinkError = <C as Encoder>::Error;
194
195 fn start_send(
196 &mut self,
197 item: Self::SinkItem,
198 ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
199 trace!("[request] start send");
200 self.inner.start_send(item)
201 }
202
203 fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
204 trace!("[request] send complete");
205 self.inner.poll_complete()
206 }
207}