udp_polygon/
lib.rs

1//! # udp-polygon
2//!
3//! `udp-polygon` is a library that allows to send and receive UDP packets.
4//!
5//! It can be configured in many ways, using toml, args, or env vars.
6//!
7//! It also supports retransmission of packets, using timers.
8//!
9//! ## Requirements
10//! * the consumer requires  [tokio](https://docs.rs/tokio/)
11//! * a producer does not require anything extra
12//! * a producer with the timer flag enabled requires [tokio](https://docs.rs/tokio/)
13
14use std::net::{IpAddr, SocketAddr, UdpSocket};
15/// This is the configuration module. It allows to configure the lib, using toml, args, or env vars.
16pub mod config;
17use config::Config;
18use std::sync::mpsc::{self, Receiver, Sender};
19use std::sync::{Arc, Mutex};
20use tokio;
21
22#[cfg(feature = "timers")]
23pub mod timers;
24
25#[cfg(feature = "timers")]
26use crate::timers::Timers;
27
28#[cfg(feature = "timers")]
29use tokio::time::Duration;
30
31/// Polygon is a UDP socket that can send and receive data.
32/// It can be configured by using the `configure` method.
33/// ``` rust
34/// let mut polygon = Polygon::configure(config);
35/// ```
36#[derive(Debug)]
37pub struct Polygon {
38    pub socket: UdpSocket,
39    pub buffer: [u8; 65535],
40    pub destination: Option<SocketAddr>,
41    pub pause_timer_send: Arc<Mutex<bool>>,
42}
43
44impl Polygon {
45    pub fn get_channel() -> (Sender<Vec<u8>>, Receiver<Vec<u8>>) {
46        let (tx, rx) = mpsc::channel();
47        (tx, rx)
48    }
49    pub fn configure(config: Config) -> Self {
50        let addrs = config
51            .bind_addresses
52            .into_iter()
53            .map(|addr| match addr.ip {
54                IpAddr::V4(ipv4) => SocketAddr::new(IpAddr::V4(ipv4), addr.port),
55                IpAddr::V6(ipv6) => SocketAddr::new(IpAddr::V6(ipv6), addr.port),
56            })
57            .collect::<Vec<_>>();
58
59        let socket = match UdpSocket::bind(&addrs[..]) {
60            Ok(socket) => socket,
61            Err(e) => panic!("couldn't bind socket: {:?}", e),
62        };
63
64        let buffer = [0_u8; 65535];
65        Self {
66            socket,
67            buffer,
68            destination: if let Some(addr) = config.destination_address {
69                match addr.ip {
70                    IpAddr::V4(ipv4) => Some(SocketAddr::new(IpAddr::V4(ipv4), addr.port)),
71                    IpAddr::V6(ipv6) => Some(SocketAddr::new(IpAddr::V6(ipv6), addr.port)),
72                }
73            } else {
74                None
75            },
76            pause_timer_send: Arc::new(Mutex::new(false)),
77        }
78    }
79    pub fn receive(&mut self) -> Receiver<Vec<u8>> {
80        let mut socket = self.socket.try_clone().unwrap();
81        let mut buffer = self.buffer;
82        let (tx, rx) = Self::get_channel();
83
84        tokio::spawn(async move {
85            loop {
86                let maybe: Option<Vec<u8>>;
87                {
88                    let packets_queued = UdpRead::peek(&mut socket, &mut buffer);
89                    if packets_queued > 0 {
90                        maybe = match UdpRead::read_bytes(&mut socket, &mut buffer) {
91                            Ok(buf) => Some(buf),
92                            Err(_) => None,
93                        };
94
95                        if let Some(data) = maybe {
96                            match tx.send(data) {
97                                Ok(_) => {}
98                                Err(e) => {
99                                    println!("receiver error: {:?}", e.to_string())
100                                }
101                            }
102                        }
103                    }
104                }
105            }
106        });
107
108        rx
109    }
110
111    #[cfg(feature = "timers")]
112    pub fn resume_timer_send(&mut self) {
113        *self.pause_timer_send.lock().unwrap() = false;
114    }
115    #[cfg(feature = "timers")]
116    pub fn cancel_timer_receive(&mut self) {
117        *self.pause_timer_send.lock().unwrap() = true;
118    }
119    #[cfg(feature = "timers")]
120    pub fn send_with_timer(&mut self, data: Vec<u8>, timers: Timers) {
121        let socket = self.socket.try_clone().unwrap();
122        let destination = self.destination.clone().unwrap();
123        let pause = Arc::clone(&self.pause_timer_send);
124        tokio::spawn(async move {
125            let mut current_timer = timers.delays.into_iter();
126            let mut counter = 0;
127            loop {
128                if *pause.lock().unwrap() && counter > 0 {
129                    break;
130                }
131                let next_timer = match current_timer.next() {
132                    Some(timer) => timer,
133                    None => {
134                        break;
135                    }
136                };
137
138                socket
139                    .send_to(
140                        &data,
141                        format!("{}:{}", &destination.ip(), &destination.port()),
142                    )
143                    .unwrap();
144                tokio::time::sleep(Duration::from_millis(next_timer)).await;
145                counter += 1;
146            }
147        });
148    }
149    pub fn send(&mut self, data: Vec<u8>) {
150        let destination = self.destination.unwrap();
151        self.socket
152            .send_to(
153                &data,
154                format!("{}:{}", &destination.ip(), &destination.port()),
155            )
156            .unwrap();
157    }
158    pub fn change_destination(&mut self, new_destination: SocketAddr) {
159        self.destination = Some(new_destination);
160    }
161}
162
163struct UdpRead;
164
165impl UdpRead {
166    fn peek(socket: &mut UdpSocket, buffer: &mut [u8; 65535]) -> usize {
167        match socket.peek(buffer) {
168            Ok(received) => received,
169            Err(_e) => 0,
170        }
171    }
172
173    fn read_bytes(socket: &mut UdpSocket, buffer: &mut [u8; 65535]) -> Result<Vec<u8>, String> {
174        let (amt, _src) = socket.recv_from(buffer).unwrap();
175        let slice = &mut buffer[..amt];
176        Ok(slice.to_vec())
177    }
178}