daemon_engine/
server.rs

1/**
2 * rust-daemon
3 * Generic server implementation
4 *
5 * https://github.com/ryankurte/rust-daemon
6 * Copyright 2018 Ryan Kurte
7 */
8
9use std::sync::{Arc, Mutex};
10use std::fmt::{Debug};
11use std::clone::{Clone};
12
13use futures::sync::mpsc;
14use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
15use futures::sync::oneshot;
16
17use tokio::prelude::*;
18use tokio::spawn;
19use tokio_codec::{Encoder, Decoder};
20
21use crate::connection::Connection;
22
23
24/// Server provides a generic base for building stream servers.
25/// 
26/// This is generic over T, a stream reader and writer, C, and encoder and decoder, and I, and information object.
27///
28/// You probably want to be looking at TcpServer and UnixServer implementations
29pub struct Server<T: AsyncRead + AsyncWrite, C: Encoder + Decoder, I> {
30    connections: Arc<Mutex<Vec<Connection<T, C>>>>,
31    incoming_tx: Arc<Mutex<UnboundedSender<Request<T, C, I>>>>,
32    incoming_rx: Arc<Mutex<Option<UnboundedReceiver<Request<T, C, I>>>>>,
33    pub(crate) exit_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
34    pub(crate) exit_rx: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
35    codec: C,
36    info: std::marker::PhantomData<I>,
37}
38
39impl<T, C, I> Server<T, C, I>
40where
41    T: AsyncWrite + AsyncRead + Send + Sync + 'static,
42    C: Encoder + Decoder + Clone + Send + 'static,
43    I: Clone + Send + Debug + 'static,
44    <C as Decoder>::Item: Clone + Send + Debug,
45    <C as Decoder>::Error: Send + Debug,
46    <C as Encoder>::Item: Clone + Send + Debug,
47    <C as Encoder>::Error: Send + Debug,
48{
49    /// Create a new base server with defined request and response message types.
50    /// 
51    /// This sets up internal resources however requires implementation to handle
52    /// creating listeners and binding connections
53    /// See TcpServer and UnixServer for examples
54    pub fn base(codec: C) -> Server<T, C, I> {
55        // Setup internal state and communication channels
56        let connections = Arc::new(Mutex::new(Vec::new()));
57        let (incoming_tx, incoming_rx) = mpsc::unbounded::<Request<T, C, I>>();
58        let (exit_tx, exit_rx) = oneshot::channel::<()>();
59
60        Server {
61            connections,
62            incoming_tx: Arc::new(Mutex::new(incoming_tx)),
63            incoming_rx: Arc::new(Mutex::new(Some(incoming_rx))),
64            exit_tx: Arc::new(Mutex::new(Some(exit_tx))),
65            exit_rx: Arc::new(Mutex::new(Some(exit_rx))),
66            codec,
67            info: std::marker::PhantomData,
68        }
69    }
70
71    /// Take the incoming data handle.
72    /// 
73    /// You can then use `for_each` to iterate over received requests as in
74    /// the examples
75    pub fn incoming(&mut self) -> Option<UnboundedReceiver<Request<T, C, I>>> {
76        self.incoming_rx.lock().unwrap().take()
77    }
78
79    /// Bind a socket to a server.
80    /// 
81    /// This attaches an rx handler to the server, and can be used both for
82    /// server listener implementations as well as to support server-initialised
83    /// connections if required
84    pub fn bind(&mut self, info: I, socket: T) {
85        // Create new connection object
86        let conn = Connection::from_socket(socket, self.codec.clone());
87
88        // Add connection to list
89        self.connections.lock().unwrap().push(conn.clone());
90
91        // Handle incoming requests
92        // This creates a handler task on the connection object
93        let inner_tx = self.incoming_tx.clone();
94        let exit_rx = conn.exit_rx.lock().unwrap().take();
95
96        let rx_handle = conn.clone().for_each(move |data| {
97            let tx = inner_tx.lock().unwrap();
98            let req = Request{inner: conn.clone(), info: info.clone(), data: data.clone()};
99            tx.clone().send(req).map(|_| () ).map_err(|e| panic!("[server] send error: {:?}", e) )
100        })
101        .map_err(|e| panic!("[server] error: {:?}", e) )
102        .select2(exit_rx)
103        .then(|_| {
104            debug!("[server] closing handler");
105            Ok(())
106        });
107
108        spawn(rx_handle);
109    }
110
111    /// Close the socket server
112    /// 
113    /// This sends exit messages to the main task and all connected hosts
114    pub fn close(self) {
115        debug!("[server] closing");
116
117        // Send listener exit signal
118        let tx = self.exit_tx.lock().unwrap().take().unwrap();
119        let _ = tx.send(());
120
121        // Send exit signals to client listeners
122        let mut connections = self.connections.lock().unwrap();
123        let _results: Vec<_> = connections.drain(..).map(|c| c.shutdown() ).collect();
124    }
125}
126
127/// Clone over generic connector
128/// 
129/// All instances of a given connector contain the same arc/mutex protected information
130impl<T, C, I> Clone for Server<T, C, I>
131where
132    T: AsyncWrite + AsyncRead + Send + Sync + 'static,
133    C: Encoder + Decoder + Clone + Send + 'static,
134    I: Clone + Send + Debug + 'static,
135    <C as Decoder>::Item: Clone + Send + Debug,
136    <C as Decoder>::Error: Send + Debug,
137    <C as Encoder>::Item: Clone + Send + Debug,
138    <C as Encoder>::Error: Send + Debug,
139{
140    fn clone(&self) -> Self {
141        Server {
142            connections: self.connections.clone(),
143            incoming_tx: self.incoming_tx.clone(),
144            incoming_rx: self.incoming_rx.clone(),
145            exit_tx: self.exit_tx.clone(),
146            exit_rx: self.exit_rx.clone(),
147            codec: self.codec.clone(),
148            info: std::marker::PhantomData,
149        }
150    }
151}
152
153/// Request is a request from a client
154/// 
155/// This contains the Request data and is a Sink for Response data
156/// Allowing message handlers to reply directly to the client
157pub struct Request<T: AsyncRead + AsyncWrite, C: Encoder + Decoder, I> {
158    inner: Connection<T, C>,
159    data: <C as Decoder>::Item,
160    info: I,
161}
162
163/// Allows requests to be cloned
164impl<T, C, I> Request<T, C, I>
165where
166    T: AsyncWrite + AsyncRead + Send + 'static,
167    C: Encoder + Decoder + Clone + Send + 'static,
168    I: Clone + Send + Debug + 'static,
169    <C as Decoder>::Item: Send + Clone,
170    <C as Decoder>::Error: Send + Debug,
171{
172    // Fetch the data for a given request
173    pub fn data(&self) -> <C as Decoder>::Item {
174        self.data.clone()
175    }
176
177    // Fetch connection information for a given request
178    pub fn info(&self) -> &I {
179        &self.info
180    }
181}
182
183/// Sink implementation allows responses to requests
184impl<T, C, I> Sink for Request<T, C, I>
185where
186    T: AsyncWrite + AsyncRead + Send + 'static,
187    C: Encoder + Decoder + Clone + Send + 'static,
188    I: Clone + Send + Debug + 'static,
189    <C as Decoder>::Item: Send,
190    <C as Decoder>::Error: Send + Debug,
191{
192    type SinkItem = <C as Encoder>::Item;
193    type SinkError = <C as Encoder>::Error;
194
195    fn start_send(
196        &mut self,
197        item: Self::SinkItem,
198    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
199        trace!("[request] start send");
200        self.inner.start_send(item)
201    }
202
203    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
204        trace!("[request] send complete");
205        self.inner.poll_complete()
206    }
207}