1use crate::DanteDeviceEncoding::{PCM16, PCM24, PCM32};
2use ascii::AsciiStr;
3use bytes::BytesMut;
4use log::{debug, error, info, warn};
5use mdns_sd::{ServiceDaemon, ServiceEvent};
6use std::collections::{HashMap, HashSet};
7use std::error::Error;
8use std::fmt::{Debug, Display, Formatter, Write};
9use std::hash::{Hash, Hasher};
10use std::net::{Ipv4Addr, UdpSocket};
11use std::sync::{Arc, Mutex};
12use std::thread::sleep;
13use std::time::Duration;
14
/// mDNS service type browsed for the "cmc" discovery records.
const CMC_SERVICE: &str = "_netaudio-cmc._udp.local.";
/// mDNS service type browsed for the "dbc" discovery records.
const DBC_SERVICE: &str = "_netaudio-dbc._udp.local.";
/// mDNS service type browsed for the "arc" discovery records.
const ARC_SERVICE: &str = "_netaudio-arc._udp.local.";
/// mDNS service type browsed for the per-channel ("chan") discovery records.
const CHAN_SERVICE: &str = "_netaudio-chan._udp.local.";

// NOTE(review): none of the port constants below are referenced in the visible
// part of this file, and they are typed `u32` although the socket helpers here
// take `u16` ports. The types are left unchanged in case other code uses them.
const DEVICE_CONTROL_PORT: u32 = 8800;
const DEVICE_HEARTBEAT_PORT: u32 = 8708;
const DEVICE_INFO_PORT: u32 = 8702;
const DEVICE_INFO_SRC_PORT1: u32 = 1029;
const DEVICE_INFO_SRC_PORT2: u32 = 1030;

const DEVICE_SETTINGS_PORT: u32 = 8700;
27
/// Dante versions for which this file knows how to build commands.
///
/// The version selects both the command code and the payload layout used when
/// making or clearing a subscription.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DanteVersion {
    /// Version `4.4.1.3`.
    Dante4_4_1_3,
    /// Version `4.2.1.3`.
    Dante4_2_1_3,
}

impl Display for DanteVersion {
    /// Writes the dotted version string, e.g. `4.4.1.3`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            DanteVersion::Dante4_4_1_3 => "4.4.1.3",
            DanteVersion::Dante4_2_1_3 => "4.2.1.3",
        })
    }
}
45
46impl DanteVersion {
47 fn get_commands(&self) -> DanteVersionCommands {
48 match self {
49 DanteVersion::Dante4_4_1_3 => DANTECOMMANDS_4_4_1_3,
50 DanteVersion::Dante4_2_1_3 => DANTECOMMANDS_4_2_1_3,
51 }
52 }
53
54 pub fn from_string(string: &str) -> Option<Self> {
55 match string {
56 "4.4.1.3" => Some(Self::Dante4_4_1_3),
57 "4.2.1.3" => Some(Self::Dante4_2_1_3),
58 _ => None,
59 }
60 }
61}
62
/// Version-specific command codes.
struct DanteVersionCommands {
    /// Two-byte command code passed to `make_dante_command` by both
    /// `make_subscription` and `clear_subscription`.
    command_subscription: [u8; 2],
}
66
/// Command codes returned by `DanteVersion::get_commands` for `Dante4_4_1_3`.
const DANTECOMMANDS_4_4_1_3: DanteVersionCommands = DanteVersionCommands {
    command_subscription: [0x34, 0x10],
};
/// Command codes returned by `DanteVersion::get_commands` for `Dante4_2_1_3`.
const DANTECOMMANDS_4_2_1_3: DanteVersionCommands = DanteVersionCommands {
    command_subscription: [0x30, 0x10],
};
74
/// Channel encoding as advertised in the `en` property of a chan service.
///
/// Discovery maps the property values `"16"`, `"24"` and `"32"` to the
/// corresponding variant; presumably these are PCM bit depths (TODO confirm).
#[derive(Clone)]
enum DanteDeviceEncoding {
    PCM16,
    PCM24,
    PCM32,
}
93
/// Data cached from a resolved dbc service record.
#[derive(Clone)]
struct DBCInfo {
    /// Addresses reported by the resolved service.
    addresses: HashSet<Ipv4Addr>,
    /// Port reported by the resolved service.
    port: u16,
}
99
/// Data cached from a resolved cmc service record.
///
/// The string fields hold `"N/A"` when the corresponding property is missing.
#[derive(Clone)]
struct CMCInfo {
    /// Addresses reported by the resolved service.
    addresses: HashSet<Ipv4Addr>,
    /// Port reported by the resolved service.
    port: u16,
    /// Value of the `id` property.
    id: String,
    /// Value of the `mf` property.
    manufacturer: String,
    /// Value of the `model` property.
    model: String,
}
108
/// Data cached from a resolved arc service record.
///
/// The string fields hold `"N/A"` when the corresponding property is missing.
#[derive(Clone)]
struct ARCInfo {
    /// Addresses reported by the resolved service.
    addresses: HashSet<Ipv4Addr>,
    /// Port reported by the resolved service.
    port: u16,
    /// Value of the `router_vers` property.
    router_vers: String,
    /// Value of the `router_info` property.
    router_info: String,
}
116
/// Data cached from a resolved chan service record (one record per channel).
///
/// Equality and hashing consider only `id`, so a set of `CHANInfo` holds at
/// most one entry per channel id.
#[derive(Clone)]
struct CHANInfo {
    /// Channel name: the part of the service full name before the `@`.
    name: String,
    /// Value of the `id` property, if present.
    id: Option<u16>,
    /// Value of the `rate` property, if present and parsable.
    sample_rate: Option<u32>,
    /// Value of the `en` property, if present and recognised.
    encoding: Option<DanteDeviceEncoding>,
    /// Value of the `latency_ns` property interpreted as nanoseconds, if
    /// present and parsable.
    latency: Option<Duration>,
}
125
126impl PartialEq<Self> for CHANInfo {
127 fn eq(&self, other: &Self) -> bool {
128 self.id == other.id
129 }
130}
131
132impl Eq for CHANInfo {}
133
134impl Hash for CHANInfo {
135 fn hash<H: Hasher>(&self, state: &mut H) {
136 self.id.hash(state);
137 }
138}
139
/// Error returned by `DanteDeviceList::add_device` when the device is already
/// in the list.
#[derive(Debug)]
struct DeviceAlreadyPresent {}

impl Display for DeviceAlreadyPresent {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // The message previously read "Device not present.", copied from
        // `DeviceNotPresent`, which described the opposite condition.
        write!(f, "Device already present.")
    }
}

impl std::error::Error for DeviceAlreadyPresent {}
150
/// Error returned by `DanteDeviceList::check_remove` when the device is not in
/// the list.
#[derive(Debug)]
struct DeviceNotPresent {}

impl Display for DeviceNotPresent {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Device not present.")
    }
}

impl std::error::Error for DeviceNotPresent {}
161
/// Per-device flags recording which discovery services currently list the
/// device. The flags are set by `connect_*` and cleared by `disconnect_*`.
#[derive(Debug)]
struct DeviceStatus {
    connected_dbc: bool,
    connected_cmc: bool,
    connected_arc: bool,
    connected_chan: bool,
}

impl DeviceStatus {
    /// Creates a status with every service marked as not connected.
    fn new() -> Self {
        let disconnected = false;
        Self {
            connected_dbc: disconnected,
            connected_cmc: disconnected,
            connected_arc: disconnected,
            connected_chan: disconnected,
        }
    }
}
180
/// Last resolved discovery data for one device.
///
/// `check_remove` drops a device from `DanteDeviceList::devices` but leaves
/// its cache entry in place.
struct DeviceDiscoveryCache {
    /// Latest resolved dbc record, if any.
    dbc_info: Option<DBCInfo>,
    /// Latest resolved cmc record, if any.
    cmc_info: Option<CMCInfo>,
    /// Latest resolved arc record, if any.
    arc_info: Option<ARCInfo>,
    /// Resolved channel records, unique per channel id.
    chan_info: HashSet<CHANInfo>,
}
187
/// Devices seen through discovery, keyed by device name.
struct DanteDeviceList {
    /// Devices that at least one discovery service currently lists.
    devices: HashMap<String, DeviceStatus>,
    /// Resolved discovery data per device; entries outlive removal from
    /// `devices`.
    caches: HashMap<String, DeviceDiscoveryCache>,
}
192
193impl DanteDeviceList {
194 fn add_device(&mut self, new_device_name: &str) -> Result<(), DeviceAlreadyPresent> {
196 if self.devices.contains_key(new_device_name) {
197 return Err(DeviceAlreadyPresent {});
198 }
199
200 self.devices
201 .insert(new_device_name.to_owned(), DeviceStatus::new());
202
203 if !self.caches.contains_key(new_device_name) {
205 self.caches.insert(
206 new_device_name.to_owned(),
207 DeviceDiscoveryCache {
208 dbc_info: None,
209 cmc_info: None,
210 arc_info: None,
211 chan_info: HashSet::new(),
212 },
213 );
214 }
215
216 Ok(())
217 }
218
219 fn try_add_device(&mut self, new_device_name: &str) {
220 let _ = self.add_device(new_device_name);
222 }
223
    /// Returns `true` when `device_name` is currently in the device list.
    fn device_connected(&self, device_name: &str) -> bool {
        self.devices.contains_key(device_name)
    }
227
228 fn channel_id_exist(&self, device_name: &str, chan_id: u16) -> bool {
229 if !(self.device_connected(device_name)) {
230 return false;
231 }
232 match self.caches.get(device_name) {
233 Some(cache) => cache.chan_info.iter().any(|chan_info| match chan_info.id {
234 Some(chan_info_id) => chan_info_id == chan_id,
235 None => false,
236 }),
237 None => {
238 error!("Cache doesn't exist despite device being connected!");
239 false
240 }
241 }
242 }
243
244 fn get_channel_name_from_id(&self, device_name: &str, chan_id: u16) -> Option<&str> {
245 if !(self.device_connected(device_name)) {
246 return None;
247 }
248
249 match self.caches.get(device_name) {
250 Some(cache) => match cache.chan_info.iter().find(|chan_info| match chan_info.id {
251 Some(chan_info_id) => chan_info_id == chan_id,
252 None => false,
253 }) {
254 Some(chan) => Some(&chan.name),
255 None => None,
256 },
257 None => {
258 error!("Cache doesn't exist despite device being connected!");
259 None
260 }
261 }
262 }
263
264 fn get_device_ips(&self, device_name: &str) -> Option<HashSet<Ipv4Addr>> {
265 if !(self.device_connected(device_name)) {
266 return None;
267 }
268
269 let mut device_ips: HashSet<Ipv4Addr> = HashSet::new();
270
271 match self.caches.get(device_name) {
272 None => return None,
273 Some(caches) => {
274 if let Some(arc_info) = &caches.arc_info {
275 device_ips.extend(&arc_info.addresses);
276 }
277 if let Some(dbc_info) = &caches.dbc_info {
278 device_ips.extend(&dbc_info.addresses);
279 }
280 if let Some(cmc_info) = &caches.cmc_info {
281 device_ips.extend(&cmc_info.addresses);
282 }
283 }
284 }
285
286 Some(device_ips)
287 }
288
289 fn update_dbc(&mut self, device_name: &str, info: DBCInfo) {
291 self.caches
292 .get_mut(device_name)
293 .expect("Tried updating cache of device that doesn't exist")
294 .dbc_info = Some(info);
295 debug!("update_dbc for {}", device_name);
296 }
297
298 fn update_cmc(&mut self, device_name: &str, info: CMCInfo) {
300 self.caches
301 .get_mut(device_name)
302 .expect("Tried updating cache of device that doesn't exist")
303 .cmc_info = Some(info);
304 debug!("update_cmc for {}", device_name);
305 }
306
307 fn update_arc(&mut self, device_name: &str, info: ARCInfo) {
309 self.caches
310 .get_mut(device_name)
311 .expect("Tried updating cache of device that doesn't exist")
312 .arc_info = Some(info);
313 debug!("update_arc for {}", device_name);
314 }
315
316 fn update_chan(&mut self, device_name: &str, info: CHANInfo) {
318 self.caches
319 .get_mut(device_name)
320 .expect("Tried updating cache of device that doesn't exist")
321 .chan_info
322 .replace(info);
323 debug!("update_chan for {}", device_name);
324 }
325
326 fn connect_dbc(&mut self, device_name: &str) {
327 self.try_add_device(device_name);
328 self.devices
329 .get_mut(device_name)
330 .expect("Just tried to add device, should be able to get it")
331 .connected_dbc = true;
332 debug!("Connected to dbc discovery.");
333 }
334
335 fn connect_cmc(&mut self, device_name: &str) {
336 self.try_add_device(device_name);
337 self.devices
338 .get_mut(device_name)
339 .expect("Just tried to add device, should be able to get it")
340 .connected_cmc = true;
341 debug!("Connected to cmc discovery.");
342 }
343
344 fn connect_arc(&mut self, device_name: &str) {
345 self.try_add_device(device_name);
346 self.devices
347 .get_mut(device_name)
348 .expect("Just tried to add device, should be able to get it")
349 .connected_arc = true;
350 debug!("Connected to arc discovery.");
351 }
352
353 fn connect_chan(&mut self, device_name: &str) {
354 self.try_add_device(device_name);
355 self.devices
356 .get_mut(device_name)
357 .expect("Just tried to add device, should be able to get it")
358 .connected_chan = true;
359 debug!("Connected to chan discovery.");
360 }
361
362 fn disconnect_dbc(&mut self, device_name: &str) {
363 self.devices
364 .get_mut(device_name)
365 .expect("If we're calling disconnect, we should still have the device in the list.")
366 .connected_dbc = false;
367 self.check_remove(device_name)
368 .expect("If we're calling disconnect, we should still have the device in the list.");
369 debug!("Disconnected from dbc discovery");
370 }
371
372 fn disconnect_cmc(&mut self, device_name: &str) {
373 self.devices
374 .get_mut(device_name)
375 .expect("If we're calling disconnect, we should still have the device in the list.")
376 .connected_cmc = false;
377 self.check_remove(device_name)
378 .expect("If we're calling disconnect, we should still have the device in the list.");
379 debug!("Disconnected from cmc discovery");
380 }
381
382 fn disconnect_arc(&mut self, device_name: &str) {
383 self.devices
384 .get_mut(device_name)
385 .expect("If we're calling disconnect, we should still have the device in the list.")
386 .connected_arc = false;
387 self.check_remove(device_name)
388 .expect("If we're calling disconnect, we should still have the device in the list.");
389 debug!("Disconnected from arc discovery");
390 }
391
392 fn disconnect_chan(&mut self, device_name: &str) {
393 self.devices
394 .get_mut(device_name)
395 .expect("If we're calling disconnect, we should still have the device in the list.")
396 .connected_chan = false;
397 self.check_remove(device_name)
398 .expect("If we're calling disconnect, we should still have the device in the list.");
399 debug!("Disconnected from chan discovery");
400 }
401
402 fn check_remove(&mut self, device_name: &str) -> Result<(), DeviceNotPresent> {
405 match self.devices.get(device_name) {
406 Some(device_status) => {
407 if !(device_status.connected_dbc
408 || device_status.connected_cmc
409 || device_status.connected_arc
410 || device_status.connected_chan)
411 {
412 self.devices.remove(device_name);
413 }
414
415 Ok(())
416 }
417 None => Err(DeviceNotPresent {}),
418 }
419 }
420
421 fn new() -> Self {
422 DanteDeviceList {
423 devices: HashMap::new(),
424 caches: HashMap::new(),
425 }
426 }
427}
428
429fn cutoff_address<'a>(hostname: &'a str, address: Option<&'a str>) -> &'a str {
431 let cutoff_string = ".".to_string() + address.unwrap_or("local.");
432 match hostname.strip_suffix(&cutoff_string) {
433 None => {
434 warn!(
435 "Device \"{}\" doesn't end with \"{}\". This is abnormal.",
436 hostname, cutoff_string
437 );
438 hostname
439 }
440 Some(stripped) => stripped,
441 }
442}
443
/// Error returned by `DanteDeviceManager::make_subscription` (and, in the
/// visible code, also by `DanteDeviceManager::clear_subscription`).
#[derive(thiserror::Error, Debug)]
pub enum MakeSubscriptionError {
    /// Binding the UDP socket or sending the command packet failed.
    #[error("error sending udp packet")]
    ConnectionFailed,
}
/// Error type intended for clearing a subscription.
///
/// NOTE(review): `DanteDeviceManager::clear_subscription` currently returns
/// `MakeSubscriptionError`, so nothing in the visible code produces this type.
#[derive(thiserror::Error, Debug)]
pub enum ClearSubscriptionError {
    /// Sending the command packet failed.
    #[error("error sending udp packet")]
    ConnectionFailed,
}
454
/// Discovers Dante devices over mDNS and sends subscription commands to them.
pub struct DanteDeviceManager {
    /// Device list shared with the discovery threads.
    device_list: Arc<Mutex<DanteDeviceList>>,
    /// Flag polled by the discovery threads; they exit once it is `false`.
    running: Arc<Mutex<bool>>,
    /// Sequence id written into the next command packet.
    current_command_sequence_id: u16,
}
461
462impl DanteDeviceManager {
463 pub fn start_discovery(&self) -> Result<(), Box<dyn std::error::Error>> {
465 info!("Starting discovery");
466 *self.running.lock().unwrap() = true;
467
468 let mdns = ServiceDaemon::new().expect("Failed to create mdns service daemon!");
470
471 let dbc_receiver = mdns
473 .browse(DBC_SERVICE)
474 .unwrap_or_else(|_| panic!("Failed to browse for {}", DBC_SERVICE));
475
476 let device_list_dbc = self.device_list.clone();
478 let running_dbc = self.running.clone();
479
480 let dbc_thread = std::thread::spawn(move || {
481 debug!("Starting discovery thread");
482 while *running_dbc.lock().unwrap() {
483 while let Ok(event) = dbc_receiver.try_recv() {
484 match event {
485 ServiceEvent::SearchStarted(service_type) => {
486 debug!("DBC Search Started: {}", &service_type);
487 }
488 ServiceEvent::ServiceFound(service_type, fullname) => {
489 debug!("DBC Search Found: {}, {}", &service_type, &fullname);
490 let device_name = cutoff_address(&fullname, Some(DBC_SERVICE));
491
492 let mut device_list_lock = device_list_dbc
493 .lock()
494 .expect("Cannot get mutex lock of DanteDevices");
495
496 device_list_lock.connect_dbc(device_name);
497 }
498 ServiceEvent::ServiceResolved(service_info) => {
499 info!("DBC Service Resolved: {:?}", &service_info);
500 let device_name =
501 cutoff_address(service_info.get_fullname(), Some(DBC_SERVICE));
502 let mut device_list_lock = device_list_dbc
503 .lock()
504 .expect("Cannot get mutex lock of DanteDevices");
505 device_list_lock.update_dbc(
506 device_name,
507 DBCInfo {
508 addresses: service_info.get_addresses().to_owned(),
509 port: service_info.get_port().to_owned(),
510 },
511 );
512 }
513 ServiceEvent::ServiceRemoved(service_type, fullname) => {
514 info!("DBC Service Removed: a:{}, b:{}", &service_type, &fullname);
515 let mut device_list_lock = device_list_dbc.lock().unwrap();
516 device_list_lock
517 .disconnect_dbc(cutoff_address(&fullname, Some(DBC_SERVICE)));
518 }
519 ServiceEvent::SearchStopped(service_type) => {
520 error!("DBC Search Stopped: {}", &service_type);
521 }
522 }
523 }
524 sleep(Duration::from_millis(100));
525 }
526 });
527
528 let cmc_receiver = mdns
530 .browse(CMC_SERVICE)
531 .unwrap_or_else(|_| panic!("Failed to browse for {}", CMC_SERVICE));
532
533 let device_list_cmc = self.device_list.clone();
535 let running_cmc = self.running.clone();
536
537 let cmc_thread = std::thread::spawn(move || {
538 debug!("Starting discovery thread");
539 while *running_cmc.lock().unwrap() {
540 while let Ok(event) = cmc_receiver.try_recv() {
541 match event {
542 ServiceEvent::SearchStarted(service_type) => {
543 debug!("CMC Search Started: {}", &service_type);
544 }
545 ServiceEvent::ServiceFound(service_type, fullname) => {
546 debug!("CMC Search Found: {}, {}", &service_type, &fullname);
547 let device_name = cutoff_address(&fullname, Some(CMC_SERVICE));
548
549 let mut device_list_lock = device_list_cmc
550 .lock()
551 .expect("Cannot get mutex lock of DanteDevices");
552
553 device_list_lock.connect_cmc(device_name);
554 }
555 ServiceEvent::ServiceResolved(service_info) => {
556 info!("CMC Service Resolved: {:?}", &service_info);
557 let device_name =
558 cutoff_address(service_info.get_fullname(), Some(CMC_SERVICE));
559 let mut device_list_lock = device_list_cmc
560 .lock()
561 .expect("Cannot get mutex lock of DanteDevices");
562 device_list_lock.update_cmc(
563 device_name,
564 CMCInfo {
565 addresses: service_info.get_addresses().to_owned(),
566 port: service_info.get_port().to_owned(),
567 id: match service_info.get_property("id") {
568 Some(id_property) => id_property.val_str().to_owned(),
569 None => "N/A".to_string(),
570 },
571 manufacturer: match service_info.get_property("mf") {
572 Some(mf_property) => mf_property.val_str().to_owned(),
573 None => "N/A".to_string(),
574 },
575 model: match service_info.get_property("model") {
576 Some(model_property) => model_property.val_str().to_owned(),
577 None => "N/A".to_string(),
578 },
579 },
580 );
581 }
582 ServiceEvent::ServiceRemoved(service_type, fullname) => {
583 info!("CMC Service Removed: a:{}, b:{}", &service_type, &fullname);
584 let mut device_list_lock = device_list_cmc.lock().unwrap();
585 device_list_lock
586 .disconnect_cmc(cutoff_address(&fullname, Some(CMC_SERVICE)));
587 }
588 ServiceEvent::SearchStopped(service_type) => {
589 error!("CMC Search Stopped: {}", &service_type);
590 }
591 }
592 }
593 sleep(Duration::from_millis(100));
594 }
595 });
596
597 let arc_receiver = mdns
599 .browse(ARC_SERVICE)
600 .unwrap_or_else(|_| panic!("Failed to browse for {}", ARC_SERVICE));
601
602 let device_list_arc = self.device_list.clone();
604 let running_arc = self.running.clone();
605
606 let arc_thread = std::thread::spawn(move || {
607 debug!("Starting discovery thread");
608 while *running_arc.lock().unwrap() {
609 while let Ok(event) = arc_receiver.try_recv() {
610 match event {
611 ServiceEvent::SearchStarted(service_type) => {
612 debug!("ARC Search Started: {}", &service_type);
613 }
614 ServiceEvent::ServiceFound(service_type, fullname) => {
615 debug!("ARC Search Found: {}, {}", &service_type, &fullname);
616 let device_name = cutoff_address(&fullname, Some(ARC_SERVICE));
617
618 let mut device_list_lock = device_list_arc
619 .lock()
620 .expect("Cannot get mutex lock of DanteDevices");
621
622 device_list_lock.connect_arc(device_name);
623 }
624 ServiceEvent::ServiceResolved(service_info) => {
625 info!("ARC Service Resolved: {:?}", &service_info);
626 let device_name =
627 cutoff_address(service_info.get_fullname(), Some(ARC_SERVICE));
628 let mut device_list_lock = device_list_arc
629 .lock()
630 .expect("Cannot get mutex lock of DanteDevices");
631 device_list_lock.update_arc(
632 device_name,
633 ARCInfo {
634 addresses: service_info.get_addresses().to_owned(),
635 port: service_info.get_port().to_owned(),
636 router_vers: match service_info.get_property("router_vers") {
637 Some(router_vers_property) => {
638 router_vers_property.val_str().to_owned()
639 }
640 None => "N/A".to_string(),
641 },
642 router_info: match service_info.get_property("router_info") {
643 Some(router_info_property) => {
644 router_info_property.val_str().to_owned()
645 }
646 None => "N/A".to_string(),
647 },
648 },
649 );
650 }
651 ServiceEvent::ServiceRemoved(service_type, fullname) => {
652 info!("ARC Service Removed: a:{}, b:{}", &service_type, &fullname);
653 let mut device_list_lock = device_list_arc.lock().unwrap();
654 device_list_lock
655 .disconnect_arc(cutoff_address(&fullname, Some(ARC_SERVICE)));
656 }
657 ServiceEvent::SearchStopped(service_type) => {
658 error!("ARC Search Stopped: {}", &service_type);
659 }
660 }
661 }
662 sleep(Duration::from_millis(100));
663 }
664 });
665
666 let chan_receiver = mdns
668 .browse(CHAN_SERVICE)
669 .unwrap_or_else(|_| panic!("Failed to browse for {}", CHAN_SERVICE));
670
671 let device_list_chan = self.device_list.clone();
673 let running_chan = self.running.clone();
674
675 let chan_thread = std::thread::spawn(move || {
676 debug!("Starting discovery thread");
677 while *running_chan.lock().unwrap() {
678 while let Ok(event) = chan_receiver.try_recv() {
679 match event {
680 ServiceEvent::SearchStarted(service_type) => {
681 debug!("CHAN Search Started: {}", &service_type);
682 }
683 ServiceEvent::ServiceFound(service_type, fullname) => {
684 debug!("CHAN Search Found: {}, {}", &service_type, &fullname);
685 let (chan_name, full_name) = fullname
686 .split_once("@")
687 .expect("CHAN fullname without \"@\" unexpected.");
688 let device_name = cutoff_address(full_name, Some(CHAN_SERVICE));
689
690 let mut device_list_lock = device_list_chan
691 .lock()
692 .expect("Cannot get mutex lock of DanteDevices");
693
694 device_list_lock.connect_chan(device_name);
695 }
696 ServiceEvent::ServiceResolved(service_info) => {
697 info!("CHAN Service Resolved: {:?}", &service_info);
698 let (chan_name, full_name) = service_info
699 .get_fullname()
700 .split_once("@")
701 .expect("CHAN fullname without \"@\" unexpected.");
702 let device_name = cutoff_address(full_name, Some(CHAN_SERVICE));
703 let mut device_list_lock = device_list_chan
704 .lock()
705 .expect("Cannot get mutex lock of DanteDevices");
706 device_list_lock.update_chan(
707 device_name,
708 CHANInfo {
709 name: chan_name.to_owned(),
710 id: service_info.get_property("id").map(|id_property| {
711 id_property
712 .val_str()
713 .to_owned()
714 .parse()
715 .expect("Couldn't parse chan service id")
716 }),
717 sample_rate: match service_info.get_property("rate") {
718 Some(rate_property) => rate_property.val_str().parse().ok(),
719 None => None,
720 },
721 encoding: match service_info.get_property("en") {
722 Some(encoding_property) => {
723 match encoding_property.val_str() {
724 "16" => Some(PCM16),
725 "24" => Some(PCM24),
726 "32" => Some(PCM32),
727 &_ => None,
728 }
729 }
730 None => None,
731 },
732 latency: match service_info.get_property("latency_ns") {
733 Some(latency_property) => latency_property
734 .val_str()
735 .parse()
736 .ok()
737 .map(Duration::from_nanos),
738 None => None,
739 },
740 },
741 );
742 }
743 ServiceEvent::ServiceRemoved(service_type, fullname) => {
744 info!("CHAN Service Removed: a:{}, b:{}", &service_type, &fullname);
745 let (chan_name, full_name) = fullname
746 .split_once("@")
747 .expect("CHAN fullname without \"@\" unexpected.");
748 let device_name = cutoff_address(full_name, Some(CHAN_SERVICE));
749
750 let mut device_list_lock = device_list_chan.lock().unwrap();
751 device_list_lock.disconnect_chan(device_name);
752 }
753 ServiceEvent::SearchStopped(service_type) => {
754 error!("CHAN Search Stopped: {}", &service_type);
755 }
756 }
757 }
758 sleep(Duration::from_millis(100));
759 }
760 });
761
762 Ok(())
763 }
764
765 fn get_new_command_sequence_id(&mut self) -> u16 {
766 let return_id = self.current_command_sequence_id;
767 self.current_command_sequence_id += 1;
768 return_id
769 }
770
771 fn make_dante_command(&mut self, command: [u8; 2], command_args: &[u8]) -> BytesMut {
772 let mut buffer = bytes::BytesMut::new();
773 buffer.extend_from_slice(&[0x28, 0x30]);
774 assert_eq!(buffer.len(), 2);
775 buffer.extend_from_slice(&((command_args.len() + 10) as u16).to_be_bytes());
776 assert_eq!(buffer.len(), 4);
777 buffer.extend_from_slice(&self.get_new_command_sequence_id().to_be_bytes());
778 assert_eq!(buffer.len(), 6);
779 buffer.extend(command);
780 assert_eq!(buffer.len(), 8);
781 buffer.extend_from_slice(&[0x00, 0x00]);
782 assert_eq!(buffer.len(), 10);
783 buffer.extend_from_slice(&command_args);
784 buffer
785 }
786
787 fn send_bytes_to_addresses(
788 addresses: &HashSet<Ipv4Addr>,
789 port: u16,
790 bytes: &[u8],
791 ) -> Result<(), Box<dyn Error>> {
792 let socket = UdpSocket::bind("0.0.0.0:0")?;
793 for address in addresses {
794 debug!(
795 "Sent bytes {:?} to {}:{}",
796 hex::encode(bytes),
797 address,
798 port
799 );
800 socket.send_to(bytes, (*address, port))?;
801 }
802 Ok(())
803 }
804
805 fn send_bytes_to_address(
806 address: &Ipv4Addr,
807 port: u16,
808 bytes: &[u8],
809 ) -> Result<(), Box<dyn Error>> {
810 let socket = UdpSocket::bind("0.0.0.0:0")?;
811
812 debug!(
813 "Sent bytes {:?} to {}:{}",
814 hex::encode(bytes),
815 address,
816 port
817 );
818 socket.send_to(bytes, (*address, port))?;
819
820 Ok(())
821 }
822
823 pub fn make_subscription(
825 &mut self,
826 version: &DanteVersion,
827 rx_device_ip: &Ipv4Addr,
828 rx_channel_id: u16,
829 tx_device: &AsciiStr,
830 tx_channel: &AsciiStr,
831 ) -> Result<(), MakeSubscriptionError> {
832 let tx_device_name_buffer = tx_device.as_bytes();
833 let tx_channel_name_buffer = tx_channel.as_bytes();
834
835 let port: u16 = 4440;
836
837 let mut command_buffer = BytesMut::new();
838
839 match version {
840 DanteVersion::Dante4_4_1_3 => {
841 command_buffer.extend_from_slice(&[
842 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x20, 0x01,
843 ]);
844 assert_eq!(command_buffer.len(), 10);
845 command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
846 assert_eq!(command_buffer.len(), 12);
847 command_buffer.extend_from_slice(&[0x00, 0x03, 0x01, 0x14]);
848 assert_eq!(command_buffer.len(), 16);
849 let end_pos: u16 = (276 + tx_channel_name_buffer.len() + 1) as u16;
850 command_buffer.extend_from_slice(&end_pos.to_be_bytes());
851 assert_eq!(command_buffer.len(), 18);
852 command_buffer.extend_from_slice(&vec![0x00; 248]);
853 assert_eq!(command_buffer.len(), 266);
854 command_buffer.extend_from_slice(tx_channel_name_buffer);
855 command_buffer.extend_from_slice(&[0x00]);
856 command_buffer.extend_from_slice(tx_device_name_buffer);
857 command_buffer.extend_from_slice(&[0x00]);
858 }
859 DanteVersion::Dante4_2_1_3 => {
860 command_buffer.extend_from_slice(&[0x10, 0x01]);
861 assert_eq!(command_buffer.len(), 2);
862 command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
863 assert_eq!(command_buffer.len(), 4);
864 command_buffer.extend_from_slice(&[0x01, 0x4C]);
865 assert_eq!(command_buffer.len(), 6);
866 let end_pos: u16 = (332 + tx_channel_name_buffer.len() + 1) as u16;
867 command_buffer.extend_from_slice(&end_pos.to_be_bytes());
868 assert_eq!(command_buffer.len(), 8);
869 command_buffer.extend_from_slice(&vec![0x00; 314]);
870 assert_eq!(command_buffer.len(), 322);
871 command_buffer.extend_from_slice(tx_channel_name_buffer);
872 command_buffer.extend_from_slice(&[0x00]);
873 command_buffer.extend_from_slice(tx_device_name_buffer);
874 command_buffer.extend_from_slice(&[0x00]);
875 }
876 }
877
878 match Self::send_bytes_to_address(
879 rx_device_ip,
880 port,
881 &self.make_dante_command(version.get_commands().command_subscription, &command_buffer),
882 ) {
883 Ok(_) => Ok(()),
884 Err(_) => Err(MakeSubscriptionError::ConnectionFailed),
885 }
886 }
887
888 pub fn clear_subscription(
890 &mut self,
891 version: &DanteVersion,
892 rx_device_ip: &Ipv4Addr,
893 rx_channel_id: u16,
894 ) -> Result<(), MakeSubscriptionError> {
895 let mut command_buffer = BytesMut::new();
896
897 match version {
898 DanteVersion::Dante4_4_1_3 => {
899 command_buffer.extend_from_slice(&[
900 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x20, 0x01,
901 ]);
902 assert_eq!(command_buffer.len(), 10);
903 command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
904 assert_eq!(command_buffer.len(), 12);
905 command_buffer.extend_from_slice(&[0x00, 0x03, 0x00, 0x00, 0x00, 0x00]);
906 assert_eq!(command_buffer.len(), 18);
907 command_buffer.extend_from_slice(&vec![0x00; 248]);
908 assert_eq!(command_buffer.len(), 266);
909 }
910 DanteVersion::Dante4_2_1_3 => {
911 command_buffer.extend_from_slice(&[0x10, 0x01]);
912 assert_eq!(command_buffer.len(), 2);
913 command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
914 assert_eq!(command_buffer.len(), 4);
915 command_buffer.extend_from_slice(&vec![0x00; 318]);
916 assert_eq!(command_buffer.len(), 322);
917 }
918 }
919
920 let port: u16 = 4440;
921
922 match Self::send_bytes_to_address(
923 rx_device_ip,
924 port,
925 &self.make_dante_command(version.get_commands().command_subscription, &command_buffer),
926 ) {
927 Ok(_) => Ok(()),
928 Err(_) => Err(MakeSubscriptionError::ConnectionFailed),
929 }
930 }
931
932 pub fn is_running(&self) -> bool {
934 *self.running.lock().unwrap()
935 }
936
937 pub fn stop_discovery(&self) {
939 *self.running.lock().unwrap() = false;
940 }
941
942 pub fn get_device_names(&self) -> Vec<String> {
944 self.device_list
945 .lock()
946 .unwrap()
947 .devices
948 .keys()
949 .map(|device| device.to_owned())
950 .collect()
951 }
952
953 pub fn get_device_descriptions(&self) -> Vec<String> {
955 let device_list = self.device_list.lock().unwrap();
956 let device_info_map = device_list.devices.iter().map(|(device, status)| {
957 (
958 device,
959 status,
960 device_list
961 .caches
962 .get(device)
963 .expect("Should have a cache for any given connected device."),
964 )
965 });
966 device_info_map.into_iter()
967 .map(|(device, status, cache)| {
968 let mut info = format!(
969 "{}:\ndbc status: {}\ncmc status: {}\narc status: {}\nchan status: {}\nid: {}\nmanufacturer: {}\nmodel: {}\nrouter_vers: {}\nrouter_info: {}\nARC port: {}\nIP: {}",
970 device,
971 match status.connected_dbc {
972 true => "Connected",
973 false => "Disconnected",
974 },
975 match status.connected_cmc {
976 true => "Connected",
977 false => "Disconnected",
978 },
979 match status.connected_arc {
980 true => "Connected",
981 false => "Disconnected",
982 },
983 match status.connected_chan {
984 true => "Connected",
985 false => "Disconnected",
986 },
987 match &cache.cmc_info {
988 Some(cmc_info) => {cmc_info.id.to_owned()}
989 None => "N/A".to_string()
990 },
991 match &cache.cmc_info {
992 Some(cmc_info) => {cmc_info.manufacturer.to_owned()}
993 None => "N/A".to_string()
994 },
995 match &cache.cmc_info {
996 Some(cmc_info) => {cmc_info.model.to_owned()}
997 None => "N/A".to_string()
998 },
999 match &cache.arc_info {
1000 Some(arc_info) => {arc_info.router_vers.to_owned()}
1001 None => "N/A".to_string()
1002 },
1003 match &cache.arc_info {
1004 Some(arc_info) => {arc_info.router_info.to_owned()}
1005 None => "N/A".to_string()
1006 },
1007 match &cache.arc_info {
1008 Some(arc_info) => {arc_info.port.to_string()}
1009 None => "N/A".to_string()
1010 },
1011 match &cache.arc_info {
1012 Some(arc_info) => {format!("{:?}", &arc_info.addresses)}
1013 None => "N/A".to_string()
1014 }
1015 );
1016 info += "\nChannels:";
1017 let mut chan_info_sorted: Vec<&CHANInfo> = cache.chan_info.iter().collect();
1018 chan_info_sorted.sort_by(|x, y| x.id.partial_cmp(&y.id).unwrap());
1019 for chan_info in chan_info_sorted {
1020 info += &format!("\n\"{}\"", chan_info.name);
1021 }
1022 info
1023 })
1024 .collect()
1025 }
1026
1027 pub fn new() -> Self {
1028 DanteDeviceManager {
1029 device_list: Arc::new(Mutex::new(DanteDeviceList::new())),
1030 running: Arc::new(Mutex::new(false)),
1031 current_command_sequence_id: 0,
1032 }
1033 }
1034}
1035
1036impl Default for DanteDeviceManager {
1037 fn default() -> Self {
1038 DanteDeviceManager::new()
1039 }
1040}
1041
1042fn print_mdns_with_address(addr: &str, poll_time: Duration) {
1044 info!("Starting discovery");
1045
1046 let mdns = ServiceDaemon::new().expect("Failed to create mdns service daemon!");
1047 let receiver = mdns
1048 .browse(addr)
1049 .unwrap_or_else(|_| panic!("Failed to browse for {}", addr));
1050
1051 let keep_polling = Arc::new(Mutex::new(true));
1052 let keep_polling_thread = keep_polling.clone();
1053
1054 let thread = std::thread::spawn(move || {
1055 debug!("Starting discovery thread");
1056 while *keep_polling_thread.lock().unwrap() {
1057 while let Ok(event) = receiver.try_recv() {
1058 match event {
1059 ServiceEvent::SearchStarted(service_name) => {
1060 println!("Search Started: {}", &service_name)
1061 }
1062 ServiceEvent::ServiceFound(service_name, host_service_name) => {
1063 println!("Search Found: {}, {}", &service_name, &host_service_name)
1064 }
1065 ServiceEvent::ServiceResolved(service_info) => {
1066 println!("Service Resolved: {:?}", &service_info);
1067 }
1068 ServiceEvent::ServiceRemoved(a, b) => {
1069 println!("Service Removed: {}, {}", &a, &b)
1070 }
1071 ServiceEvent::SearchStopped(a) => {
1072 println!("Search Stopped: {}", &a)
1073 }
1074 }
1075 }
1076 sleep(Duration::from_millis(100));
1077 }
1078 });
1079
1080 sleep(poll_time);
1081
1082 *keep_polling.lock().unwrap() = false;
1083
1084 thread.join().unwrap();
1085}
1086
/// Prints cmc discovery events to stdout for `poll_time`.
pub fn print_cmc(poll_time: Duration) {
    print_mdns_with_address(CMC_SERVICE, poll_time);
}
1091
/// Prints dbc discovery events to stdout for `poll_time`.
pub fn print_dbc(poll_time: Duration) {
    print_mdns_with_address(DBC_SERVICE, poll_time);
}
1096
/// Prints arc discovery events to stdout for `poll_time`.
pub fn print_arc(poll_time: Duration) {
    print_mdns_with_address(ARC_SERVICE, poll_time);
}
1101
/// Prints chan discovery events to stdout for `poll_time`.
pub fn print_chan(poll_time: Duration) {
    print_mdns_with_address(CHAN_SERVICE, poll_time);
}