daemon_engine/tcp.rs
1/**
2 * rust-daemon
3 * TCP Server and Connection Implementations
4 *
5 * https://github.com/ryankurte/rust-daemon
6 * Copyright 2018 Ryan Kurte
7 */
8
9use std::fmt::{Debug};
10use std::clone::{Clone};
11use std::net::SocketAddr;
12
13use tokio::prelude::*;
14use tokio::spawn;
15use tokio_codec::{Encoder, Decoder};
16
17use tokio_tcp::{TcpListener, TcpStream};
18
19use crate::server::Server;
20use crate::connection::Connection;
21use crate::error::Error;
22
23
/// TcpServer is a `Server` implementation over `TcpStream` connections, using
/// `TcpInfo` as the per-connection information type and a generic codec `C`.
///
/// # Examples
///
/// ```no_run
/// use std::net::{SocketAddr, IpAddr, Ipv4Addr};
///
/// extern crate tokio;
/// use tokio::prelude::*;
/// use tokio::{spawn, run};
///
/// #[macro_use]
/// extern crate serde_derive;
///
/// extern crate daemon_engine;
/// use daemon_engine::{TcpServer, JsonCodec};
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Request {}
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Response {}
///
/// # fn main() {
///
/// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8111);
/// let server = future::lazy(move || {
///     // The server is constructed inside the executor, as `new` spawns its accept task
///     let mut s = TcpServer::<JsonCodec<Response, Request>>::new(&addr, JsonCodec::new()).unwrap();
///     let server_handle = s
///         .incoming()
///         .unwrap()
///         .for_each(move |r| {
///             println!("Request data {:?} info: {:?}", r.data(), r.info());
///             r.send(Response{}).wait().unwrap();
///             Ok(())
///         }).map_err(|_e| ());
///     spawn(server_handle);
///     Ok(())
/// });
/// run(server);
///
/// # }
/// ```
pub type TcpServer<C> = Server<TcpStream, C, TcpInfo>;
66
/// TcpConnection is a `Connection` implementation over `TcpStream` with a
/// generic codec `C`.
///
/// # Examples
///
/// ```no_run
/// use std::net::{SocketAddr, IpAddr, Ipv4Addr};
///
/// extern crate tokio;
/// use tokio::prelude::*;
/// use tokio::{spawn, run};
///
/// #[macro_use]
/// extern crate serde_derive;
///
/// extern crate daemon_engine;
/// use daemon_engine::{TcpConnection, JsonCodec, DaemonError};
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Request {}
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Response {}
///
/// # fn main() {
/// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8111);
/// let client = TcpConnection::<JsonCodec<Request, Response>>::new(&addr, JsonCodec::new()).wait().unwrap();
/// let (tx, rx) = client.split();
/// // Send data
/// tx.send(Request{}).wait().unwrap();
///
/// // Receive data
/// rx.map(|resp| -> Result<(), DaemonError> {
///     println!("Response: {:?}", resp);
///     Ok(())
/// }).wait().next();
/// # }
/// ```
pub type TcpConnection<C> = Connection<TcpStream, C>;
103
104impl <C> TcpConnection<C>
105where
106 C: Encoder + Decoder + Clone + Send + 'static,
107 <C as Decoder>::Item: Send,
108 <C as Decoder>::Error: Send + Debug,
109{
110 /// Create a new client connected to the provided TCP socket address
111 pub fn new(addr: &SocketAddr, codec: C) -> impl Future<Item=Connection<TcpStream, C>, Error=Error> {
112 debug!("[connector] creating connection (tcp address: {})", addr);
113 // Create the socket future
114 TcpStream::connect(&addr).map(move |s| {
115 Connection::from_socket(s, codec)
116 }).map_err(|e| e.into() )
117 }
118
119 pub fn close(self) {
120 self.shutdown();
121 }
122}
123
/// TcpInfo is an information object associated with a given TcpServer connection.
///
/// This is passed to the server request handler to allow ACLs and connection tracking.
/// It implements `PartialEq`, `Eq` and `Hash` so that instances can be compared
/// against an allow-list or used as keys when tracking connections.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TcpInfo {
    /// Socket address of the remote peer for this connection
    pub address: SocketAddr,
}
130
131/// TCP server implementation.
132impl<C> TcpServer<C>
133where
134 C: Encoder + Decoder + Clone + Send + 'static,
135 <C as Decoder>::Item: Clone + Send + Debug,
136 <C as Decoder>::Error: Send + Debug,
137 <C as Encoder>::Item: Clone + Send + Debug,
138 <C as Encoder>::Error: Send + Debug,
139{
140
141 pub fn new(address: &SocketAddr, codec: C) -> Result<TcpServer<C>, Error> {
142
143 // Create base server instance
144 let server = Server::base(codec);
145
146 // Create listener socket
147 let socket = TcpListener::bind(&address)?;
148
149 let exit_rx = server.exit_rx.lock().unwrap().take();
150 let mut server_int = server.clone();
151
152 // Create listening thread
153 let tokio_server = socket
154 .incoming()
155 .for_each(move |s| {
156 debug!("[server] accept connection: {:?}", s);
157 let info = TcpInfo{address: s.peer_addr().unwrap()};
158 server_int.bind(info, s);
159 Ok(())
160 })
161 .map_err(|err| {
162 error!("[server] accept error: {:?}", err);
163 })
164 .select2(exit_rx)
165 .then(|_| {
166 debug!("[server] closing listener");
167 Ok(())
168 });
169 spawn(tokio_server);
170
171 // Return new connector instance
172 Ok(server)
173 }
174
175 // Connect to a TCP socket
176 pub fn connect(&mut self, address: SocketAddr) -> impl Future<Item=(), Error=Error> {
177 let mut s = self.clone();
178 TcpStream::connect(&address).map(move |socket| {
179 let info = TcpInfo{address: address.clone()};
180 s.bind(info, socket);
181 ()
182 }).map_err(|e| e.into() )
183 }
184}