daemon_engine/
tcp.rs

1/**
2 * rust-daemon
3 * TCP Server and Connection Implementations
4 *
5 * https://github.com/ryankurte/rust-daemon
6 * Copyright 2018 Ryan Kurte
7 */
8
9use std::fmt::{Debug};
10use std::clone::{Clone};
11use std::net::SocketAddr;
12
13use tokio::prelude::*;
14use tokio::spawn;
15use tokio_codec::{Encoder, Decoder};
16
17use tokio_tcp::{TcpListener, TcpStream};
18
19use crate::server::Server;
20use crate::connection::Connection;
21use crate::error::Error;
22
23
24/// TcpServer is a Server implementation over TcpStream and TcpInfo types with a generic codec
25/// 
26/// ```no_run
27/// use std::net::{SocketAddr, IpAddr, Ipv4Addr};
28/// 
29/// extern crate tokio;
30/// use tokio::prelude::*;
31/// use tokio::{spawn, run};
32/// 
33/// #[macro_use]
34/// extern crate serde_derive;
35/// 
36/// extern crate daemon_engine;
37/// use daemon_engine::{TcpServer, JsonCodec};
38/// 
39/// #[derive(Debug, Clone, Serialize, Deserialize)]
40/// struct Request {}
41/// 
42/// #[derive(Debug, Clone, Serialize, Deserialize)]
43/// struct Response {}
44/// 
45/// # fn main() {
46/// 
47/// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8111);
48/// let server = future::lazy(move || {
49///     let mut s = TcpServer::<JsonCodec<Response, Request>>::new(&addr, JsonCodec::new()).unwrap();
50///     let server_handle = s
51///         .incoming()
52///         .unwrap()
53///         .for_each(move |r| {
54///             println!("Request data {:?} info: {:?}", r.data(), r.info());
55///             r.send(Response{}).wait().unwrap();
56///             Ok(())
57///         }).map_err(|_e| ());
58///     spawn(server_handle);
59///     Ok(())
60/// });
61/// run(server);
62/// 
63/// # }
64/// ```
65pub type TcpServer<C> = Server<TcpStream, C, TcpInfo>;
66
67/// TcpConnection is a Connection implementation over TcpStream
68/// 
69/// ```no_run
70/// use std::net::{SocketAddr, IpAddr, Ipv4Addr};
71/// 
72/// extern crate tokio;
73/// use tokio::prelude::*;
74/// use tokio::{spawn, run};
75/// 
76/// #[macro_use]
77/// extern crate serde_derive;
78/// 
79/// extern crate daemon_engine;
80/// use daemon_engine::{TcpConnection, JsonCodec, DaemonError};
81/// 
82/// #[derive(Debug, Clone, Serialize, Deserialize)]
83/// struct Request {}
84/// 
85/// #[derive(Debug, Clone, Serialize, Deserialize)]
86/// struct Response {}
87/// 
88/// # fn main() {
89/// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8111);
90/// let client = TcpConnection::<JsonCodec<Request, Response>>::new(&addr, JsonCodec::new()).wait().unwrap();
91/// let (tx, rx) = client.split();
92/// // Send data
93/// tx.send(Request{}).wait().unwrap();
94/// 
95/// // Receive data
96/// rx.map(|resp| -> Result<(), DaemonError> {
97///    println!("Response: {:?}", resp);
98///    Ok(())
99/// }).wait().next();
100/// # }
101/// ```
102pub type TcpConnection<C> = Connection<TcpStream, C>;
103
104impl <C> TcpConnection<C> 
105where
106    C: Encoder + Decoder + Clone + Send + 'static,
107    <C as Decoder>::Item: Send,
108    <C as Decoder>::Error: Send + Debug,
109{
110    /// Create a new client connected to the provided TCP socket address
111    pub fn new(addr: &SocketAddr, codec: C) -> impl Future<Item=Connection<TcpStream, C>, Error=Error> {
112        debug!("[connector] creating connection (tcp address: {})", addr);
113        // Create the socket future
114        TcpStream::connect(&addr).map(move |s| {
115            Connection::from_socket(s, codec)
116        }).map_err(|e| e.into() )
117    }
118
119    pub fn close(self) {
120        self.shutdown();
121    }
122}
123
/// TcpInfo is an information object associated with a given TcpServer connection.
/// This is passed to the server request handler to allow ACLs and connection tracking.
///
/// Equality and hashing are derived so that values can be compared against
/// ACL entries and used as keys in maps or sets when tracking connections.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TcpInfo {
    /// Address of the remote end of the connection.
    pub address: SocketAddr,
}
130
131/// TCP server implementation.
132impl<C> TcpServer<C>
133where
134    C: Encoder + Decoder + Clone + Send + 'static,
135    <C as Decoder>::Item: Clone + Send + Debug,
136    <C as Decoder>::Error: Send + Debug,
137    <C as Encoder>::Item: Clone + Send + Debug,
138    <C as Encoder>::Error: Send + Debug,
139{
140    
141    pub fn new(address: &SocketAddr, codec: C) -> Result<TcpServer<C>, Error> {
142
143        // Create base server instance
144        let server = Server::base(codec);
145
146        // Create listener socket
147        let socket = TcpListener::bind(&address)?;
148
149        let exit_rx = server.exit_rx.lock().unwrap().take();
150        let mut server_int = server.clone();
151
152        // Create listening thread
153        let tokio_server = socket
154            .incoming()
155            .for_each(move |s| {
156                debug!("[server] accept connection: {:?}", s);
157                let info = TcpInfo{address: s.peer_addr().unwrap()};
158                server_int.bind(info, s); 
159                Ok(())
160             })
161            .map_err(|err| {
162                error!("[server] accept error: {:?}", err);
163            })
164            .select2(exit_rx)
165            .then(|_| {
166                debug!("[server] closing listener");
167                Ok(())
168            });
169        spawn(tokio_server);
170
171        // Return new connector instance
172        Ok(server)
173    }
174
175    // Connect to a TCP socket
176    pub fn connect(&mut self, address: SocketAddr) -> impl Future<Item=(), Error=Error> {
177        let mut s = self.clone();
178        TcpStream::connect(&address).map(move |socket| {
179            let info = TcpInfo{address: address.clone()};
180            s.bind(info, socket);
181            ()
182        }).map_err(|e| e.into() )
183    }
184}