1use std::net::{IpAddr, SocketAddr, UdpSocket};
15pub mod config;
17use config::Config;
18use std::sync::mpsc::{self, Receiver, Sender};
19use std::sync::{Arc, Mutex};
20use tokio;
21
22#[cfg(feature = "timers")]
23pub mod timers;
24
25#[cfg(feature = "timers")]
26use crate::timers::Timers;
27
28#[cfg(feature = "timers")]
29use tokio::time::Duration;
30
31#[derive(Debug)]
37pub struct Polygon {
38 pub socket: UdpSocket,
39 pub buffer: [u8; 65535],
40 pub destination: Option<SocketAddr>,
41 pub pause_timer_send: Arc<Mutex<bool>>,
42}
43
44impl Polygon {
45 pub fn get_channel() -> (Sender<Vec<u8>>, Receiver<Vec<u8>>) {
46 let (tx, rx) = mpsc::channel();
47 (tx, rx)
48 }
49 pub fn configure(config: Config) -> Self {
50 let addrs = config
51 .bind_addresses
52 .into_iter()
53 .map(|addr| match addr.ip {
54 IpAddr::V4(ipv4) => SocketAddr::new(IpAddr::V4(ipv4), addr.port),
55 IpAddr::V6(ipv6) => SocketAddr::new(IpAddr::V6(ipv6), addr.port),
56 })
57 .collect::<Vec<_>>();
58
59 let socket = match UdpSocket::bind(&addrs[..]) {
60 Ok(socket) => socket,
61 Err(e) => panic!("couldn't bind socket: {:?}", e),
62 };
63
64 let buffer = [0_u8; 65535];
65 Self {
66 socket,
67 buffer,
68 destination: if let Some(addr) = config.destination_address {
69 match addr.ip {
70 IpAddr::V4(ipv4) => Some(SocketAddr::new(IpAddr::V4(ipv4), addr.port)),
71 IpAddr::V6(ipv6) => Some(SocketAddr::new(IpAddr::V6(ipv6), addr.port)),
72 }
73 } else {
74 None
75 },
76 pause_timer_send: Arc::new(Mutex::new(false)),
77 }
78 }
79 pub fn receive(&mut self) -> Receiver<Vec<u8>> {
80 let mut socket = self.socket.try_clone().unwrap();
81 let mut buffer = self.buffer;
82 let (tx, rx) = Self::get_channel();
83
84 tokio::spawn(async move {
85 loop {
86 let maybe: Option<Vec<u8>>;
87 {
88 let packets_queued = UdpRead::peek(&mut socket, &mut buffer);
89 if packets_queued > 0 {
90 maybe = match UdpRead::read_bytes(&mut socket, &mut buffer) {
91 Ok(buf) => Some(buf),
92 Err(_) => None,
93 };
94
95 if let Some(data) = maybe {
96 match tx.send(data) {
97 Ok(_) => {}
98 Err(e) => {
99 println!("receiver error: {:?}", e.to_string())
100 }
101 }
102 }
103 }
104 }
105 }
106 });
107
108 rx
109 }
110
111 #[cfg(feature = "timers")]
112 pub fn resume_timer_send(&mut self) {
113 *self.pause_timer_send.lock().unwrap() = false;
114 }
115 #[cfg(feature = "timers")]
116 pub fn cancel_timer_receive(&mut self) {
117 *self.pause_timer_send.lock().unwrap() = true;
118 }
119 #[cfg(feature = "timers")]
120 pub fn send_with_timer(&mut self, data: Vec<u8>, timers: Timers) {
121 let socket = self.socket.try_clone().unwrap();
122 let destination = self.destination.clone().unwrap();
123 let pause = Arc::clone(&self.pause_timer_send);
124 tokio::spawn(async move {
125 let mut current_timer = timers.delays.into_iter();
126 let mut counter = 0;
127 loop {
128 if *pause.lock().unwrap() && counter > 0 {
129 break;
130 }
131 let next_timer = match current_timer.next() {
132 Some(timer) => timer,
133 None => {
134 break;
135 }
136 };
137
138 socket
139 .send_to(
140 &data,
141 format!("{}:{}", &destination.ip(), &destination.port()),
142 )
143 .unwrap();
144 tokio::time::sleep(Duration::from_millis(next_timer)).await;
145 counter += 1;
146 }
147 });
148 }
149 pub fn send(&mut self, data: Vec<u8>) {
150 let destination = self.destination.unwrap();
151 self.socket
152 .send_to(
153 &data,
154 format!("{}:{}", &destination.ip(), &destination.port()),
155 )
156 .unwrap();
157 }
158 pub fn change_destination(&mut self, new_destination: SocketAddr) {
159 self.destination = Some(new_destination);
160 }
161}
162
163struct UdpRead;
164
165impl UdpRead {
166 fn peek(socket: &mut UdpSocket, buffer: &mut [u8; 65535]) -> usize {
167 match socket.peek(buffer) {
168 Ok(received) => received,
169 Err(_e) => 0,
170 }
171 }
172
173 fn read_bytes(socket: &mut UdpSocket, buffer: &mut [u8; 65535]) -> Result<Vec<u8>, String> {
174 let (amt, _src) = socket.recv_from(buffer).unwrap();
175 let slice = &mut buffer[..amt];
176 Ok(slice.to_vec())
177 }
178}