1use std::fmt::{Debug};
10use std::clone::{Clone};
11use std::net::SocketAddr;
12use std::sync::{Arc, Mutex};
13
14use futures::sync::mpsc;
15use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender, SendError};
16use futures::sync::oneshot;
17
18use tokio::prelude::*;
19use tokio_codec::{Encoder, Decoder};
20use tokio::spawn;
21use tokio::net::udp::{UdpSocket, UdpFramed};
22
23use crate::error::Error;
24
25pub type IncomingRx<Req> = UnboundedReceiver<(Req, SocketAddr)>;
60pub type OutgoingTx<Resp> = UnboundedSender<(Resp, SocketAddr)>;
61
62#[derive(Clone)]
65pub struct UdpConnection<Codec: Encoder + Decoder>
66{
67 incoming_rx: Arc<Mutex<IncomingRx<<Codec as Decoder>::Item>>>,
68 outgoing_tx: Arc<Mutex<OutgoingTx<<Codec as Encoder>::Item>>>,
69 exit_tx: Arc<Mutex<Option<(oneshot::Sender<()>, oneshot::Sender<()>)>>>,
70}
71
72impl <Codec> UdpConnection<Codec>
73where
74 Codec: Encoder + Decoder + Clone + Send + 'static,
75 <Codec as Encoder>::Item: Send + Debug,
76 <Codec as Encoder>::Error: Send + Debug,
77 <Codec as Decoder>::Item: Send + Debug,
78 <Codec as Decoder>::Error: Send + Debug,
79{
80 pub fn new(addr: &SocketAddr, codec: Codec) -> impl Future<Item=UdpConnection<Codec>, Error=Error> {
82 debug!("[connector] creating connection (udp address: {})", addr);
83 let socket = match UdpSocket::bind(&addr) {
85 Ok(s) => s,
86 Err(e) => return futures::future::err(e.into()),
87 };
88 futures::future::ok(UdpConnection::from_socket(socket, codec))
90 }
91
92 fn from_socket(socket: UdpSocket, codec: Codec) -> UdpConnection<Codec> {
94 let framed = UdpFramed::new(socket, codec);
95 let (tx, rx) = framed.split();
96
97 let (incoming_tx, incoming_rx) = mpsc::unbounded::<_>();
98 let (incoming_exit_tx, incoming_exit_rx) = oneshot::channel::<()>();
99
100 let rx_handle = rx.for_each(move |(data, addr)| {
102 trace!("[udp connection] receive from: '{:?}' data: '{:?}'", addr, data);
103 incoming_tx.clone().send((data, addr)).map(|_| () ).map_err(|e| panic!("[udp connection] send error: {:?}", e))
104 })
105 .map_err(|e| panic!("[udp connection] error: {:?}", e))
106 .select2(incoming_exit_rx)
107 .then(|_| {
108 debug!("[udp connection] closing incoming handler");
109 Ok(())
110 });
111 spawn(rx_handle);
112
113 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<_>();
114 let (outgoing_exit_tx, outgoing_exit_rx) = oneshot::channel::<()>();
115
116 let tx_handle = tx.send_all(outgoing_rx.map_err(|_| panic!() ))
117 .select2(outgoing_exit_rx)
118 .then(|_| {
119 debug!("[udp connection] closing outgoing handler");
120 Ok(())
121 });
122 spawn(tx_handle);
123
124 UdpConnection{
126 incoming_rx: Arc::new(Mutex::new(incoming_rx)),
127 outgoing_tx: Arc::new(Mutex::new(outgoing_tx)),
128 exit_tx: Arc::new(Mutex::new(Some((incoming_exit_tx, outgoing_exit_tx)))),
129 }
130 }
131
132 pub fn send(&mut self, addr: SocketAddr, data: <Codec as Encoder>::Item) {
133 let unlocked = self.outgoing_tx.lock().unwrap();
134
135 let _err = unlocked.unbounded_send((data, addr));
136 }
137
138 pub fn shutdown(self) {
139 let tx = self.exit_tx.lock().unwrap().take().unwrap();
141 let _ = tx.0.send(());
142 let _ = tx.1.send(());
143
144 }
147}
148
149unsafe impl<Codec> Send for UdpConnection<Codec>
151where
152 Codec: Encoder + Decoder + Clone,
153{}
154
155impl<Codec> Sink for UdpConnection<Codec>
157where
158 Codec: Encoder + Decoder + Clone,
159{
160 type SinkItem = (<Codec as Encoder>::Item, SocketAddr);
161 type SinkError = SendError<(<Codec as Encoder>::Item, SocketAddr)>;
162
163 fn start_send(
164 &mut self,
165 item: Self::SinkItem,
166 ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
167 trace!("[connection] start send");
168 self.outgoing_tx.lock().unwrap().start_send(item)
169 }
170
171 fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
172 trace!("[connection] send complete");
173 self.outgoing_tx.lock().unwrap().poll_complete()
174 }
175}
176
177impl<Codec> Stream for UdpConnection<Codec>
179where
180 Codec: Encoder + Decoder + Clone,
181{
182 type Item = (<Codec as Decoder>::Item, SocketAddr);
183 type Error = ();
184
185 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
186 trace!("[connection] poll receive");
187 self.incoming_rx.lock().unwrap().poll()
188 }
189}
190
/// Plain value type holding a single socket address.
///
/// Its only field is a `SocketAddr`, so the type can be copied, compared and
/// used as a key in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct UdpInfo {
    /// The socket address this value describes.
    pub address: SocketAddr,
}