1use std::{net::{SocketAddr, UdpSocket, Ipv4Addr, Ipv6Addr, IpAddr}, sync::atomic::{AtomicI32, Ordering}, time::{SystemTime, Instant, Duration}, error::Error, num::Wrapping};
2use rosc::{OscPacket, OscMessage, OscBundle, OscTime, OscError};
3use rosc::encoder;
4use rosc::OscType;
5use local_ip_address::local_ip;
6use indexmap::{IndexMap};
7
8use crate::{cursor::{Position}, osc_encode_decode::{EncodeOsc, OscEncoder}, Object, Cursor, Blob};
9
10pub trait SendOsc<P, E> where E: Error {
12 fn send_osc_packet(&self, packet: &P) -> Result<(), E>;
18
19 fn is_connected(&self) -> bool;
21
22 fn is_local(&self) -> bool;
24}
25
26pub struct UdpSender {
27 socket: UdpSocket,
28 address: SocketAddr
29}
30
31impl UdpSender {
32 pub fn new(target: SocketAddr) -> Result<Self, std::io::Error> {
37 let ip_address: IpAddr = if target.is_ipv4() {IpAddr::V4(Ipv4Addr::LOCALHOST)} else {IpAddr::V6(Ipv6Addr::LOCALHOST)};
38 Ok(Self {socket: UdpSocket::bind(SocketAddr::new(ip_address, 0))?, address: target})
39 }
40}
41
42impl SendOsc<OscPacket, OscError> for UdpSender {
43 fn send_osc_packet(&self, packet: &OscPacket) -> Result<(), OscError> {
49 let buffer = encoder::encode(packet)?;
50 self.socket.send_to(&buffer, self.address).unwrap();
51 Ok(())
52 }
53
54 fn is_connected(&self) -> bool {
56 true
57 }
58
59 fn is_local(&self) -> bool {
61 self.address.ip().is_loopback()
62 }
63}
64
65pub struct Server {
67 sender_list: Vec<Box<dyn SendOsc<OscPacket, OscError>>>,
68 source_name: String,
69 session_id: i32,
70 object_map: IndexMap<i32, Object>,
71 object_updated: bool,
72 frame_cursor_ids: Vec<i32>,
73 frame_object_ids: Vec<i32>,
74 frame_blob_ids: Vec<i32>,
75 cursor_map: IndexMap<i32, Cursor>,
76 cursor_updated: bool,
77 blob_map: IndexMap<i32, Blob>,
78 blob_updated: bool,
79 instant: Instant,
80 last_frame_instant: Instant,
81 frame_duration: Duration,
82 last_frame_id: AtomicI32,
83 pub full_update: bool,
85 periodic_messaging: bool,
86 update_interval: Duration,
87 pub object_profiling: bool,
88 object_update_time: Instant,
89 pub cursor_profiling: bool,
90 cursor_update_time: Instant,
91 pub blob_profiling: bool,
92 blob_update_time: Instant,
93}
94
95impl Server {
96 pub fn new(source_name: &str) -> Result<Self, std::io::Error> {
101 let mut server = Self::from_socket_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3333))?;
102 server.set_source_name(source_name);
103 Ok(server)
104 }
105
106 pub fn from_socket_addr(socket_addr: SocketAddr) -> Result<Self, std::io::Error> {
111 Ok(Self::from_osc_sender(UdpSender::new(socket_addr)?))
112 }
113
114 pub fn from_osc_sender(osc_sender: impl SendOsc<OscPacket, OscError> + 'static) -> Self {
119 Self {
120 sender_list: vec![Box::new(osc_sender)],
121 source_name: String::new(),
122 session_id: -1,
123 object_map: IndexMap::new(),
124 object_updated: false,
125 cursor_map: IndexMap::new(),
126 cursor_updated: false,
127 blob_map: IndexMap::new(),
128 blob_updated: false,
129 instant: Instant::now(),
130 last_frame_instant: Instant::now(),
131 frame_duration: Duration::default(),
132 last_frame_id: AtomicI32::new(0),
133 full_update: false,
134 periodic_messaging: false,
135 update_interval: Duration::from_secs(1),
136 object_profiling: true,
137 object_update_time: Instant::now(),
138 cursor_profiling: true,
139 cursor_update_time: Instant::now(),
140 blob_profiling: true,
141 blob_update_time: Instant::now(),
142 frame_cursor_ids: Vec::new(),
143 frame_object_ids: Vec::new(),
144 frame_blob_ids: Vec::new(),
145 }
146 }
147
148 pub fn add_osc_sender(&mut self, osc_sender: impl SendOsc<OscPacket, OscError> + 'static) {
153 self.sender_list.push(Box::new(osc_sender));
154 }
155
156 pub fn set_source_name(&mut self, name: &str) {
161 let source = if self.sender_list[0].is_local() {String::from("local")} else {
162 match local_ip() {
163 Ok(ip) => ip.to_string(),
164 Err(_) => String::new()
165 }
166 };
167
168 self.source_name = format!("{}@{}", name, source);
169 }
170
171 pub fn enable_periodic_message(&mut self, interval: Option<Duration>) {
176 self.periodic_messaging = true;
177
178 if let Some(new_interval) = interval {
179 self.update_interval = new_interval.max(Duration::from_millis(10));
180 }
181 }
182
183 pub fn disable_periodic_message(&mut self) {
185 self.periodic_messaging = false;
186 }
187
188 fn get_session_id(&mut self) -> i32 {
189 self.session_id = (Wrapping(self.session_id) + Wrapping(1)).0;
190 self.session_id
191 }
192
193 pub fn create_object(&mut self, class_id: i32, x: f32, y: f32, angle: f32) -> i32 {
201 let session_id = self.get_session_id();
202
203 let object = Object::new(session_id, class_id, Position{x, y}, angle);
204 self.object_map.insert(session_id, object);
205 self.frame_object_ids.push(session_id);
206 self.object_updated = true;
207 session_id
208 }
209
210 pub fn update_object(&mut self, session_id: i32, x: f32, y: f32, angle: f32) {
218 if let Some(object) = self.object_map.get_mut(&session_id) {
219 object.update(self.frame_duration, Position{x, y}, angle);
220 self.frame_object_ids.push(session_id);
221 self.frame_object_ids.push(session_id);
222 self.object_updated = true;
223 }
224 }
225
226 pub fn remove_object(&mut self, session_id: i32) {
231 if self.object_map.remove(&session_id).is_some() {
232 self.object_updated = true;
233 }
234 }
235
236 pub fn create_cursor(&mut self, x: f32, y: f32) -> i32 {
242 let session_id = self.get_session_id();
243
244 let cursor = Cursor::new(session_id, Position{x, y});
245 self.cursor_map.insert(session_id, cursor);
246 self.frame_cursor_ids.push(session_id);
247 self.cursor_updated = true;
248 session_id
249 }
250
251 pub fn update_cursor(&mut self, session_id: i32, x: f32, y: f32) {
258 if let Some(cursor) = self.cursor_map.get_mut(&session_id) {
259 cursor.update(self.frame_duration, Position{x, y});
260 self.frame_cursor_ids.push(session_id);
261 self.cursor_updated = true;
262 }
263 }
264
265 pub fn remove_cursor(&mut self, session_id: i32) {
270 if self.cursor_map.remove(&session_id).is_some() {
271 self.cursor_updated = true;
272 }
273 }
274
275 pub fn create_blob(&mut self, x: f32, y: f32, angle: f32, width: f32, height: f32, area: f32) -> i32 {
285 let session_id = self.get_session_id();
286
287 let blob = Blob::new(session_id, Position{x, y}, angle, width, height, area);
288 self.blob_map.insert(session_id, blob);
289 self.frame_blob_ids.push(session_id);
290 self.blob_updated = true;
291 session_id
292 }
293
294 #[allow(clippy::too_many_arguments)]
295 pub fn update_blob(&mut self, session_id: i32, x: f32, y: f32, angle: f32, width: f32, height: f32, area: f32) {
306 if let Some(blob) = self.blob_map.get_mut(&session_id) {
307 blob.update(self.frame_duration, Position{x, y}, angle, width, height, area);
308 self.frame_blob_ids.push(session_id);
309 self.frame_blob_ids.push(session_id);
310 self.blob_updated = true;
311 }
312 }
313
314 pub fn remove_blob(&mut self, session_id: i32) {
319 if self.blob_map.remove(&session_id).is_some() {
320 self.blob_updated = true;
321 }
322 }
323
324 pub fn init_frame(&mut self) {
326 self.frame_duration = self.instant.duration_since(self.last_frame_instant);
327 self.last_frame_instant = Instant::now();
328 self.last_frame_id.fetch_add(1, Ordering::SeqCst);
329 }
330
331 pub fn commit_frame(&mut self) {
335 if self.object_updated || (self.periodic_messaging && self.object_profiling && self.object_update_time.duration_since(self.last_frame_instant) >= self.update_interval) {
336 if self.full_update {
337 let object_collection = self.frame_object_ids.iter().map(|id| self.object_map.get(id).unwrap());
338 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_object_bundle(object_collection, self.source_name.clone(), self.last_frame_id.load(Ordering::SeqCst))));
339 }
340 else {
341 let object_collection = self.object_map.values();
342 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_object_bundle(object_collection, self.source_name.clone(), self.last_frame_id.load(Ordering::SeqCst))));
343 }
344
345 self.frame_object_ids.clear();
346 self.object_update_time = self.last_frame_instant;
347 self.object_updated = false;
348 }
349
350 if self.cursor_updated || (self.periodic_messaging && self.cursor_profiling && self.cursor_update_time.duration_since(self.last_frame_instant) >= self.update_interval) {
351 if !self.full_update {
352 let cursor_collection = self.frame_cursor_ids.iter().map(|id| self.cursor_map.get(id).unwrap());
353 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_cursor_bundle(cursor_collection, self.source_name.clone(), self.last_frame_id.load(Ordering::SeqCst))));
354 } else {
355 let cursor_collection = self.cursor_map.iter().map(|(_, cursor)| cursor);
356 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_cursor_bundle(cursor_collection, self.source_name.clone(), self.last_frame_id.load(Ordering::SeqCst))));
357 };
358
359 self.frame_cursor_ids.clear();
360 self.cursor_update_time = self.last_frame_instant;
361 self.cursor_updated = false;
362 }
363
364 if self.blob_updated || (self.periodic_messaging && self.blob_profiling && self.blob_update_time.duration_since(self.last_frame_instant) >= self.update_interval) {
365 if !self.full_update {
366 let blob_collection = self.frame_blob_ids.iter().map(|id| self.blob_map.get(id).unwrap());
367 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_blob_bundle(blob_collection, self.source_name.clone(), self.last_frame_id.load(Ordering::SeqCst))));
368 } else {
369 let blob_collection = self.blob_map.values();
370 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_blob_bundle(blob_collection, self.source_name.clone(), self.last_frame_id.load(Ordering::SeqCst))));
371 };
372
373 self.frame_blob_ids.clear();
374 self.blob_update_time = self.last_frame_instant;
375 self.blob_updated = false;
376 }
377 }
378
379 pub fn send_full_messages(&self) {
380 let frame_id = self.last_frame_id.load(Ordering::SeqCst);
381 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_object_bundle(self.object_map.values(), self.source_name.clone(), frame_id)));
382 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_cursor_bundle(self.cursor_map.values(), self.source_name.clone(), frame_id)));
383 self.deliver_osc_packet(OscPacket::Bundle(OscEncoder::encode_blob_bundle(self.blob_map.values(), self.source_name.clone(), frame_id)));
384 }
385
386 fn deliver_osc_packet(&self, packet: OscPacket) {
387 for sender in &self.sender_list {
388 sender.send_osc_packet(&packet).expect("invalid packet")
389 }
390 }
391}
392
393impl Drop for Server {
394 fn drop(&mut self) {
395 let source_message = OscPacket::Message(OscMessage {
396 addr: "/tuio/2Dobj".into(),
397 args: vec![
398 OscType::String("source".into()),
399 OscType::String(self.source_name.clone())
400 ]
401 });
402
403 let alive_message = OscPacket::Message(OscMessage {
404 addr: "/tuio/2Dobj".into(),
405 args: vec![OscType::String("alive".into())]
406 });
407
408 let frame_message = OscPacket::Message(OscMessage {
409 addr: "/tuio/2Dobj".into(),
410 args: vec![OscType::String("fseq".into()), OscType::Int(-1)]
411 });
412
413 let packet = OscPacket::Bundle(OscBundle {
414 timetag: OscTime::try_from(SystemTime::now()).expect("failed with system time conversion"),
415 content: vec![
416 source_message,
417 alive_message,
418 frame_message
419 ]
420 });
421
422 self.deliver_osc_packet(packet);
423
424 let source_message = OscPacket::Message(OscMessage {
425 addr: "/tuio/2Dcur".into(),
426 args: vec![
427 OscType::String("source".into()),
428 OscType::String(self.source_name.clone())
429 ]
430 });
431
432 let alive_message = OscPacket::Message(OscMessage {
433 addr: "/tuio/2Dcur".into(),
434 args: vec![OscType::String("alive".into())]
435 });
436
437 let frame_message = OscPacket::Message(OscMessage {
438 addr: "/tuio/2Dcur".into(),
439 args: vec![OscType::String("fseq".into()), OscType::Int(-1)]
440 });
441
442 let packet = OscPacket::Bundle(OscBundle {
443 timetag: OscTime::try_from(SystemTime::now()).expect("failed with system time conversion"),
444 content: vec![
445 source_message,
446 alive_message,
447 frame_message
448 ]
449 });
450
451 self.deliver_osc_packet(packet);
452
453 let source_message = OscPacket::Message(OscMessage {
454 addr: "/tuio/2Dblb".into(),
455 args: vec![
456 OscType::String("source".into()),
457 OscType::String(self.source_name.clone())
458 ]
459 });
460
461 let alive_message = OscPacket::Message(OscMessage {
462 addr: "/tuio/2Dblb".into(),
463 args: vec![OscType::String("alive".into())]
464 });
465
466 let frame_message = OscPacket::Message(OscMessage {
467 addr: "/tuio/2Dblb".into(),
468 args: vec![OscType::String("fseq".into()), OscType::Int(-1)]
469 });
470
471 let packet = OscPacket::Bundle(OscBundle {
472 timetag: OscTime::try_from(SystemTime::now()).expect("failed with system time conversion"),
473 content: vec![
474 source_message,
475 alive_message,
476 frame_message
477 ]
478 });
479
480 self.deliver_osc_packet(packet);
481 }
482}
483
484#[cfg(test)]
485mod tests {
486 use super::*;
487
488 #[test]
489 fn id_wrapping() {
490 let mut server = Server::new("source_name").unwrap();
491 server.session_id = i32::MAX;
492 assert_eq!(server.get_session_id(), i32::MIN);
493 }
494}