daemon_engine/
udp.rs

1/**
2 * rust-daemon
 * UDP Connection Implementation
4 *
5 * https://github.com/ryankurte/rust-daemon
6 * Copyright 2018 Ryan Kurte
7 */
8
9use std::fmt::{Debug};
10use std::clone::{Clone};
11use std::net::SocketAddr;
12use std::sync::{Arc, Mutex};
13
14use futures::sync::mpsc;
15use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender, SendError};
16use futures::sync::oneshot;
17
18use tokio::prelude::*;
19use tokio_codec::{Encoder, Decoder};
20use tokio::spawn;
21use tokio::net::udp::{UdpSocket, UdpFramed};
22
23use crate::error::Error;
24
25/// ```no_run
26/// use std::net::{SocketAddr, IpAddr, Ipv4Addr};
27/// 
28/// extern crate tokio;
29/// use tokio::prelude::*;
30/// use tokio::{spawn, run};
31/// 
32/// #[macro_use]
33/// extern crate serde_derive;
34/// 
35/// extern crate daemon_engine;
36/// use daemon_engine::{UdpConnection, JsonCodec, DaemonError};
37/// 
38/// #[derive(Debug, Clone, Serialize, Deserialize)]
39/// struct Request {}
40/// 
41/// #[derive(Debug, Clone, Serialize, Deserialize)]
42/// struct Response {}
43/// 
44/// # fn main() {
45/// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8111);
46/// let client = UdpConnection::<JsonCodec<Request, Response>>::new(&addr, JsonCodec::new()).wait().unwrap();
47/// let (tx, rx) = client.split();
48/// // Send data
49/// tx.send((Request{}, addr.clone())).wait().unwrap();
50/// 
51/// // Receive data
52/// rx.map(|resp| -> Result<(), DaemonError> {
53///    println!("Response: {:?}", resp);
54///    Ok(())
55/// }).wait().next();
56/// # }
57/// ```
58
59pub type IncomingRx<Req> = UnboundedReceiver<(Req, SocketAddr)>;
60pub type OutgoingTx<Resp> = UnboundedSender<(Resp, SocketAddr)>;
61
62/// UdpConnection is a wrapper around UdpSocket to provide a consistent ish interface for UDP clients and servers.
63/// TODO: Ideally it should be possible to genericise this and merge back with the Connection and Server components
64#[derive(Clone)]
65pub struct UdpConnection<Codec: Encoder + Decoder> 
66{
67    incoming_rx: Arc<Mutex<IncomingRx<<Codec as Decoder>::Item>>>,
68    outgoing_tx: Arc<Mutex<OutgoingTx<<Codec as Encoder>::Item>>>,
69    exit_tx: Arc<Mutex<Option<(oneshot::Sender<()>, oneshot::Sender<()>)>>>,
70}
71
72impl <Codec> UdpConnection<Codec> 
73where
74    Codec: Encoder + Decoder + Clone + Send + 'static,
75    <Codec as Encoder>::Item: Send + Debug,
76    <Codec as Encoder>::Error: Send + Debug,
77    <Codec as Decoder>::Item: Send + Debug,
78    <Codec as Decoder>::Error: Send + Debug,
79{
80    /// Create a new connection by binding to the specified UDP socket
81    pub fn new(addr: &SocketAddr, codec: Codec) -> impl Future<Item=UdpConnection<Codec>, Error=Error> {
82        debug!("[connector] creating connection (udp address: {})", addr);
83        // Create the socket future
84        let socket = match UdpSocket::bind(&addr) {
85            Ok(s) => s,
86            Err(e) => return futures::future::err(e.into()),
87        };
88        // Create the socket instance
89        futures::future::ok(UdpConnection::from_socket(socket, codec))
90    }
91
92    /// Create a new connection from an existing udp socket
93    fn from_socket(socket: UdpSocket, codec: Codec) -> UdpConnection<Codec> {
94        let framed = UdpFramed::new(socket, codec);
95        let (tx, rx) = framed.split();
96
97        let (incoming_tx, incoming_rx) = mpsc::unbounded::<_>();
98        let (incoming_exit_tx, incoming_exit_rx) = oneshot::channel::<()>();
99
100        // Handle incoming messages
101        let rx_handle = rx.for_each(move |(data, addr)| {
102            trace!("[udp connection] receive from: '{:?}' data: '{:?}'", addr, data);
103            incoming_tx.clone().send((data, addr)).map(|_| () ).map_err(|e| panic!("[udp connection] send error: {:?}", e))
104        })
105        .map_err(|e| panic!("[udp connection] error: {:?}", e))
106        .select2(incoming_exit_rx)
107        .then(|_| {
108            debug!("[udp connection] closing incoming handler");
109            Ok(())
110        });
111        spawn(rx_handle);
112
113        let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<_>();
114        let (outgoing_exit_tx, outgoing_exit_rx) = oneshot::channel::<()>();
115
116        let tx_handle = tx.send_all(outgoing_rx.map_err(|_| panic!() ))
117        .select2(outgoing_exit_rx)
118        .then(|_| {
119            debug!("[udp connection] closing outgoing handler");
120            Ok(())
121        });
122        spawn(tx_handle);
123
124        // Build connection object
125        UdpConnection{
126            incoming_rx: Arc::new(Mutex::new(incoming_rx)),
127            outgoing_tx: Arc::new(Mutex::new(outgoing_tx)),
128            exit_tx: Arc::new(Mutex::new(Some((incoming_exit_tx, outgoing_exit_tx)))),
129        }
130    }
131
132    pub fn send(&mut self, addr: SocketAddr, data: <Codec as Encoder>::Item) {
133        let unlocked = self.outgoing_tx.lock().unwrap();
134
135        let _err = unlocked.unbounded_send((data, addr));
136    }
137
138    pub fn shutdown(self) {
139        // Send listener exit signal
140        let tx = self.exit_tx.lock().unwrap().take().unwrap();
141        let _ = tx.0.send(());
142        let _ = tx.1.send(());
143
144        // Close the stream
145        //self.socket.lock().unwrap().get_mut().shutdown().unwrap()
146    }
147}
148
149/// Blank send
150unsafe impl<Codec> Send for UdpConnection<Codec> 
151where
152    Codec: Encoder + Decoder + Clone, 
153{}
154
155/// Sink implementation allows sending messages over a connection
156impl<Codec> Sink for UdpConnection<Codec>
157where
158    Codec: Encoder + Decoder + Clone, 
159{
160    type SinkItem = (<Codec as Encoder>::Item, SocketAddr);
161    type SinkError = SendError<(<Codec as Encoder>::Item, SocketAddr)>;
162
163    fn start_send(
164        &mut self,
165        item: Self::SinkItem,
166    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
167        trace!("[connection] start send");
168        self.outgoing_tx.lock().unwrap().start_send(item)
169    }
170
171    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
172        trace!("[connection] send complete");
173        self.outgoing_tx.lock().unwrap().poll_complete()
174    }
175}
176
177/// Stream implementation allows receiving messages from a connection
178impl<Codec> Stream for UdpConnection<Codec>
179where
180    Codec: Encoder + Decoder + Clone, 
181{
182    type Item = (<Codec as Decoder>::Item, SocketAddr);
183    type Error = ();
184
185    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
186        trace!("[connection] poll receive");
187        self.incoming_rx.lock().unwrap().poll()
188    }
189}
190
/// UdpInfo is an information object associated with a given UdpServer connection.
/// This is passed to the server request handler to allow ACLs and connection tracking.
///
/// It is comparable and hashable so it can be checked against an allow-list
/// or used directly as a key when tracking peers in a map or set.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct UdpInfo {
    /// Socket address this information object refers to.
    pub address: SocketAddr,
}