dante_control_rs/
lib.rs

1use crate::DanteDeviceEncoding::{PCM16, PCM24, PCM32};
2use ascii::AsciiStr;
3use bytes::BytesMut;
4use log::{debug, error, info, warn};
5use mdns_sd::{ServiceDaemon, ServiceEvent};
6use std::collections::{HashMap, HashSet};
7use std::error::Error;
8use std::fmt::{Debug, Display, Formatter, Write};
9use std::hash::{Hash, Hasher};
10use std::net::{Ipv4Addr, UdpSocket};
11use std::sync::{Arc, Mutex};
12use std::thread::sleep;
13use std::time::Duration;
14
15const CMC_SERVICE: &'static str = "_netaudio-cmc._udp.local.";
16const DBC_SERVICE: &'static str = "_netaudio-dbc._udp.local.";
17const ARC_SERVICE: &'static str = "_netaudio-arc._udp.local.";
18const CHAN_SERVICE: &'static str = "_netaudio-chan._udp.local.";
19
20const DEVICE_CONTROL_PORT: u32 = 8800;
21const DEVICE_HEARTBEAT_PORT: u32 = 8708;
22const DEVICE_INFO_PORT: u32 = 8702;
23const DEVICE_INFO_SRC_PORT1: u32 = 1029;
24const DEVICE_INFO_SRC_PORT2: u32 = 1030;
25
26const DEVICE_SETTINGS_PORT: u32 = 8700;
27
28#[derive(Debug)]
29pub enum DanteVersion {
30    Dante4_4_1_3,
31    Dante4_2_1_3,
32}
33
34impl Display for DanteVersion {
35    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36        f.write_fmt(format_args!(
37            "{}",
38            match self {
39                DanteVersion::Dante4_4_1_3 => "4.4.1.3",
40                DanteVersion::Dante4_2_1_3 => "4.2.1.3",
41            }
42        ))
43    }
44}
45
46impl DanteVersion {
47    fn get_commands(&self) -> DanteVersionCommands {
48        match self {
49            DanteVersion::Dante4_4_1_3 => DANTECOMMANDS_4_4_1_3,
50            DanteVersion::Dante4_2_1_3 => DANTECOMMANDS_4_2_1_3,
51        }
52    }
53
54    pub fn from_string(string: &str) -> Option<Self> {
55        match string {
56            "4.4.1.3" => Some(Self::Dante4_4_1_3),
57            "4.2.1.3" => Some(Self::Dante4_2_1_3),
58            _ => None,
59        }
60    }
61}
62
63struct DanteVersionCommands {
64    command_subscription: [u8; 2],
65}
66
67// Command IDs for different Dante Versions.
68const DANTECOMMANDS_4_4_1_3: DanteVersionCommands = DanteVersionCommands {
69    command_subscription: [0x34, 0x10],
70};
71const DANTECOMMANDS_4_2_1_3: DanteVersionCommands = DanteVersionCommands {
72    command_subscription: [0x30, 0x10],
73};
74
75// Still need to figure these out.
76/*
77const COMMAND_CHANNELCOUNT: [u8; 2] = 1000u16.to_be_bytes();
78const COMMAND_DEVICEINFO: [u8; 2] = 1003u16.to_be_bytes();
79const COMMAND_DEVICENAME: [u8; 2] = 1002u16.to_be_bytes();
80const COMMAND_RXCHANNELNAMES: [u8; 2] = 3000u16.to_be_bytes();
81const COMMAND_TXCHANNELNAMES: [u8; 2] = 2010u16.to_be_bytes();
82const COMMAND_SETRXCHANNELNAME: [u8; 2] = 12289u16.to_be_bytes();
83const COMMAND_SETTXCHANNELNAME: [u8; 2] = 8211u16.to_be_bytes();
84const COMMAND_SETDEVICENAME: [u8; 2] = 4097u16.to_be_bytes();
85 */
86
87#[derive(Clone)]
88enum DanteDeviceEncoding {
89    PCM16,
90    PCM24,
91    PCM32,
92}
93
94#[derive(Clone)]
95struct DBCInfo {
96    addresses: HashSet<Ipv4Addr>,
97    port: u16,
98}
99
100#[derive(Clone)]
101struct CMCInfo {
102    addresses: HashSet<Ipv4Addr>,
103    port: u16,
104    id: String,
105    manufacturer: String,
106    model: String,
107}
108
109#[derive(Clone)]
110struct ARCInfo {
111    addresses: HashSet<Ipv4Addr>,
112    port: u16,
113    router_vers: String,
114    router_info: String,
115}
116
117#[derive(Clone)]
118struct CHANInfo {
119    name: String,
120    id: Option<u16>,
121    sample_rate: Option<u32>,
122    encoding: Option<DanteDeviceEncoding>,
123    latency: Option<Duration>,
124}
125
126impl PartialEq<Self> for CHANInfo {
127    fn eq(&self, other: &Self) -> bool {
128        self.id == other.id
129    }
130}
131
132impl Eq for CHANInfo {}
133
134impl Hash for CHANInfo {
135    fn hash<H: Hasher>(&self, state: &mut H) {
136        self.id.hash(state);
137    }
138}
139
140#[derive(Debug)]
141struct DeviceAlreadyPresent {}
142
143impl Display for DeviceAlreadyPresent {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        write!(f, "Device not present.")
146    }
147}
148
149impl std::error::Error for DeviceAlreadyPresent {}
150
151#[derive(Debug)]
152struct DeviceNotPresent {}
153
154impl Display for DeviceNotPresent {
155    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
156        write!(f, "Device not present.")
157    }
158}
159
160impl std::error::Error for DeviceNotPresent {}
161
162#[derive(Debug)]
163struct DeviceStatus {
164    connected_dbc: bool,
165    connected_cmc: bool,
166    connected_arc: bool,
167    connected_chan: bool,
168}
169
170impl DeviceStatus {
171    fn new() -> Self {
172        DeviceStatus {
173            connected_dbc: false,
174            connected_cmc: false,
175            connected_arc: false,
176            connected_chan: false,
177        }
178    }
179}
180
181struct DeviceDiscoveryCache {
182    dbc_info: Option<DBCInfo>,
183    cmc_info: Option<CMCInfo>,
184    arc_info: Option<ARCInfo>,
185    chan_info: HashSet<CHANInfo>,
186}
187
188struct DanteDeviceList {
189    devices: HashMap<String, DeviceStatus>,
190    caches: HashMap<String, DeviceDiscoveryCache>,
191}
192
193impl DanteDeviceList {
194    /// Adds a new device to the list. Will return error when the device is already in the list.
195    fn add_device(&mut self, new_device_name: &str) -> Result<(), DeviceAlreadyPresent> {
196        if self.devices.contains_key(new_device_name) {
197            return Err(DeviceAlreadyPresent {});
198        }
199
200        self.devices
201            .insert(new_device_name.to_owned(), DeviceStatus::new());
202
203        // Create a cache for the device as well if there isn't already one.
204        if !self.caches.contains_key(new_device_name) {
205            self.caches.insert(
206                new_device_name.to_owned(),
207                DeviceDiscoveryCache {
208                    dbc_info: None,
209                    cmc_info: None,
210                    arc_info: None,
211                    chan_info: HashSet::new(),
212                },
213            );
214        }
215
216        Ok(())
217    }
218
219    fn try_add_device(&mut self, new_device_name: &str) {
220        // Explicitly throw away error. If we already had one, Ok. If we make one, also Ok.
221        let _ = self.add_device(new_device_name);
222    }
223
224    fn device_connected(&self, device_name: &str) -> bool {
225        self.devices.contains_key(device_name)
226    }
227
228    fn channel_id_exist(&self, device_name: &str, chan_id: u16) -> bool {
229        if !(self.device_connected(device_name)) {
230            return false;
231        }
232        match self.caches.get(device_name) {
233            Some(cache) => cache.chan_info.iter().any(|chan_info| match chan_info.id {
234                Some(chan_info_id) => chan_info_id == chan_id,
235                None => false,
236            }),
237            None => {
238                error!("Cache doesn't exist despite device being connected!");
239                false
240            }
241        }
242    }
243
244    fn get_channel_name_from_id(&self, device_name: &str, chan_id: u16) -> Option<&str> {
245        if !(self.device_connected(device_name)) {
246            return None;
247        }
248
249        match self.caches.get(device_name) {
250            Some(cache) => match cache.chan_info.iter().find(|chan_info| match chan_info.id {
251                Some(chan_info_id) => chan_info_id == chan_id,
252                None => false,
253            }) {
254                Some(chan) => Some(&chan.name),
255                None => None,
256            },
257            None => {
258                error!("Cache doesn't exist despite device being connected!");
259                None
260            }
261        }
262    }
263
264    fn get_device_ips(&self, device_name: &str) -> Option<HashSet<Ipv4Addr>> {
265        if !(self.device_connected(device_name)) {
266            return None;
267        }
268
269        let mut device_ips: HashSet<Ipv4Addr> = HashSet::new();
270
271        match self.caches.get(device_name) {
272            None => return None,
273            Some(caches) => {
274                if let Some(arc_info) = &caches.arc_info {
275                    device_ips.extend(&arc_info.addresses);
276                }
277                if let Some(dbc_info) = &caches.dbc_info {
278                    device_ips.extend(&dbc_info.addresses);
279                }
280                if let Some(cmc_info) = &caches.cmc_info {
281                    device_ips.extend(&cmc_info.addresses);
282                }
283            }
284        }
285
286        Some(device_ips)
287    }
288
289    /// Updates the dbc info of device in the list with a specific name.
290    fn update_dbc(&mut self, device_name: &str, info: DBCInfo) {
291        self.caches
292            .get_mut(device_name)
293            .expect("Tried updating cache of device that doesn't exist")
294            .dbc_info = Some(info);
295        debug!("update_dbc for {}", device_name);
296    }
297
298    /// Updates the cmc info of device in the list with a specific name.
299    fn update_cmc(&mut self, device_name: &str, info: CMCInfo) {
300        self.caches
301            .get_mut(device_name)
302            .expect("Tried updating cache of device that doesn't exist")
303            .cmc_info = Some(info);
304        debug!("update_cmc for {}", device_name);
305    }
306
307    /// Updates the arc info of device in the list with a specific name.
308    fn update_arc(&mut self, device_name: &str, info: ARCInfo) {
309        self.caches
310            .get_mut(device_name)
311            .expect("Tried updating cache of device that doesn't exist")
312            .arc_info = Some(info);
313        debug!("update_arc for {}", device_name);
314    }
315
316    /// Updates the cmc info of device in the list with a specific name.
317    fn update_chan(&mut self, device_name: &str, info: CHANInfo) {
318        self.caches
319            .get_mut(device_name)
320            .expect("Tried updating cache of device that doesn't exist")
321            .chan_info
322            .replace(info);
323        debug!("update_chan for {}", device_name);
324    }
325
326    fn connect_dbc(&mut self, device_name: &str) {
327        self.try_add_device(device_name);
328        self.devices
329            .get_mut(device_name)
330            .expect("Just tried to add device, should be able to get it")
331            .connected_dbc = true;
332        debug!("Connected to dbc discovery.");
333    }
334
335    fn connect_cmc(&mut self, device_name: &str) {
336        self.try_add_device(device_name);
337        self.devices
338            .get_mut(device_name)
339            .expect("Just tried to add device, should be able to get it")
340            .connected_cmc = true;
341        debug!("Connected to cmc discovery.");
342    }
343
344    fn connect_arc(&mut self, device_name: &str) {
345        self.try_add_device(device_name);
346        self.devices
347            .get_mut(device_name)
348            .expect("Just tried to add device, should be able to get it")
349            .connected_arc = true;
350        debug!("Connected to arc discovery.");
351    }
352
353    fn connect_chan(&mut self, device_name: &str) {
354        self.try_add_device(device_name);
355        self.devices
356            .get_mut(device_name)
357            .expect("Just tried to add device, should be able to get it")
358            .connected_chan = true;
359        debug!("Connected to chan discovery.");
360    }
361
362    fn disconnect_dbc(&mut self, device_name: &str) {
363        self.devices
364            .get_mut(device_name)
365            .expect("If we're calling disconnect, we should still have the device in the list.")
366            .connected_dbc = false;
367        self.check_remove(device_name)
368            .expect("If we're calling disconnect, we should still have the device in the list.");
369        debug!("Disconnected from dbc discovery");
370    }
371
372    fn disconnect_cmc(&mut self, device_name: &str) {
373        self.devices
374            .get_mut(device_name)
375            .expect("If we're calling disconnect, we should still have the device in the list.")
376            .connected_cmc = false;
377        self.check_remove(device_name)
378            .expect("If we're calling disconnect, we should still have the device in the list.");
379        debug!("Disconnected from cmc discovery");
380    }
381
382    fn disconnect_arc(&mut self, device_name: &str) {
383        self.devices
384            .get_mut(device_name)
385            .expect("If we're calling disconnect, we should still have the device in the list.")
386            .connected_arc = false;
387        self.check_remove(device_name)
388            .expect("If we're calling disconnect, we should still have the device in the list.");
389        debug!("Disconnected from arc discovery");
390    }
391
392    fn disconnect_chan(&mut self, device_name: &str) {
393        self.devices
394            .get_mut(device_name)
395            .expect("If we're calling disconnect, we should still have the device in the list.")
396            .connected_chan = false;
397        self.check_remove(device_name)
398            .expect("If we're calling disconnect, we should still have the device in the list.");
399        debug!("Disconnected from chan discovery");
400    }
401
402    /// Checks if a device should be removed (all the discovery types have been removed), and deletes if if that's the case.
403    /// Errors when the device name isn't a device in the list.
404    fn check_remove(&mut self, device_name: &str) -> Result<(), DeviceNotPresent> {
405        match self.devices.get(device_name) {
406            Some(device_status) => {
407                if !(device_status.connected_dbc
408                    || device_status.connected_cmc
409                    || device_status.connected_arc
410                    || device_status.connected_chan)
411                {
412                    self.devices.remove(device_name);
413                }
414
415                Ok(())
416            }
417            None => Err(DeviceNotPresent {}),
418        }
419    }
420
421    fn new() -> Self {
422        DanteDeviceList {
423            devices: HashMap::new(),
424            caches: HashMap::new(),
425        }
426    }
427}
428
429/// Cutoff the address from a hostname. Address default is "local."
430fn cutoff_address<'a>(hostname: &'a str, address: Option<&'a str>) -> &'a str {
431    let cutoff_string = ".".to_string() + address.unwrap_or("local.");
432    match hostname.strip_suffix(&cutoff_string) {
433        None => {
434            warn!(
435                "Device \"{}\" doesn't end with \"{}\". This is abnormal.",
436                hostname, cutoff_string
437            );
438            hostname
439        }
440        Some(stripped) => stripped,
441    }
442}
443
444#[derive(thiserror::Error, Debug)]
445pub enum MakeSubscriptionError {
446    #[error("error sending udp packet")]
447    ConnectionFailed,
448}
449#[derive(thiserror::Error, Debug)]
450pub enum ClearSubscriptionError {
451    #[error("error sending udp packet")]
452    ConnectionFailed,
453}
454
455/// A Dante Device Manager stores information related to interacting with dante devices. Right now, it stores mdns information found from start_discovery() and a sequence ID. Currently, the control of dante devices is separate from the discovery of them. I found that for some devices on the network, mdns discovery can be slow or not happen at all, so I switched to using direct ip addresses and channel numbers/names (essentially exactly the information that is needed to send the udp packet to make the connection). In the case of make_subscription() and clear_subscription(), the only state changed by DanteDeviceManager is a sequence ID, which is an incrementing 16-bit integer, though whether this is really needed is suspect.
456pub struct DanteDeviceManager {
457    device_list: Arc<Mutex<DanteDeviceList>>,
458    running: Arc<Mutex<bool>>,
459    current_command_sequence_id: u16,
460}
461
462impl DanteDeviceManager {
463    /// Spawns the discovery service in a separate thread. Call stop_discovery() to end it.
464    pub fn start_discovery(&self) -> Result<(), Box<dyn std::error::Error>> {
465        info!("Starting discovery");
466        *self.running.lock().unwrap() = true;
467
468        // Spawn threads equal to the number of different addresses we are discovering on.
469        let mdns = ServiceDaemon::new().expect("Failed to create mdns service daemon!");
470
471        // Discovery for DBC
472        let dbc_receiver = mdns
473            .browse(DBC_SERVICE)
474            .unwrap_or_else(|_| panic!("Failed to browse for {}", DBC_SERVICE));
475
476        // Fresh Arcs to move into thread.
477        let device_list_dbc = self.device_list.clone();
478        let running_dbc = self.running.clone();
479
480        let dbc_thread = std::thread::spawn(move || {
481            debug!("Starting discovery thread");
482            while *running_dbc.lock().unwrap() {
483                while let Ok(event) = dbc_receiver.try_recv() {
484                    match event {
485                        ServiceEvent::SearchStarted(service_type) => {
486                            debug!("DBC Search Started: {}", &service_type);
487                        }
488                        ServiceEvent::ServiceFound(service_type, fullname) => {
489                            debug!("DBC Search Found: {}, {}", &service_type, &fullname);
490                            let device_name = cutoff_address(&fullname, Some(DBC_SERVICE));
491
492                            let mut device_list_lock = device_list_dbc
493                                .lock()
494                                .expect("Cannot get mutex lock of DanteDevices");
495
496                            device_list_lock.connect_dbc(device_name);
497                        }
498                        ServiceEvent::ServiceResolved(service_info) => {
499                            info!("DBC Service Resolved: {:?}", &service_info);
500                            let device_name =
501                                cutoff_address(service_info.get_fullname(), Some(DBC_SERVICE));
502                            let mut device_list_lock = device_list_dbc
503                                .lock()
504                                .expect("Cannot get mutex lock of DanteDevices");
505                            device_list_lock.update_dbc(
506                                device_name,
507                                DBCInfo {
508                                    addresses: service_info.get_addresses().to_owned(),
509                                    port: service_info.get_port().to_owned(),
510                                },
511                            );
512                        }
513                        ServiceEvent::ServiceRemoved(service_type, fullname) => {
514                            info!("DBC Service Removed: a:{}, b:{}", &service_type, &fullname);
515                            let mut device_list_lock = device_list_dbc.lock().unwrap();
516                            device_list_lock
517                                .disconnect_dbc(cutoff_address(&fullname, Some(DBC_SERVICE)));
518                        }
519                        ServiceEvent::SearchStopped(service_type) => {
520                            error!("DBC Search Stopped: {}", &service_type);
521                        }
522                    }
523                }
524                sleep(Duration::from_millis(100));
525            }
526        });
527
528        // Discovery for CMC
529        let cmc_receiver = mdns
530            .browse(CMC_SERVICE)
531            .unwrap_or_else(|_| panic!("Failed to browse for {}", CMC_SERVICE));
532
533        // Fresh Arcs to move into thread.
534        let device_list_cmc = self.device_list.clone();
535        let running_cmc = self.running.clone();
536
537        let cmc_thread = std::thread::spawn(move || {
538            debug!("Starting discovery thread");
539            while *running_cmc.lock().unwrap() {
540                while let Ok(event) = cmc_receiver.try_recv() {
541                    match event {
542                        ServiceEvent::SearchStarted(service_type) => {
543                            debug!("CMC Search Started: {}", &service_type);
544                        }
545                        ServiceEvent::ServiceFound(service_type, fullname) => {
546                            debug!("CMC Search Found: {}, {}", &service_type, &fullname);
547                            let device_name = cutoff_address(&fullname, Some(CMC_SERVICE));
548
549                            let mut device_list_lock = device_list_cmc
550                                .lock()
551                                .expect("Cannot get mutex lock of DanteDevices");
552
553                            device_list_lock.connect_cmc(device_name);
554                        }
555                        ServiceEvent::ServiceResolved(service_info) => {
556                            info!("CMC Service Resolved: {:?}", &service_info);
557                            let device_name =
558                                cutoff_address(service_info.get_fullname(), Some(CMC_SERVICE));
559                            let mut device_list_lock = device_list_cmc
560                                .lock()
561                                .expect("Cannot get mutex lock of DanteDevices");
562                            device_list_lock.update_cmc(
563                                device_name,
564                                CMCInfo {
565                                    addresses: service_info.get_addresses().to_owned(),
566                                    port: service_info.get_port().to_owned(),
567                                    id: match service_info.get_property("id") {
568                                        Some(id_property) => id_property.val_str().to_owned(),
569                                        None => "N/A".to_string(),
570                                    },
571                                    manufacturer: match service_info.get_property("mf") {
572                                        Some(mf_property) => mf_property.val_str().to_owned(),
573                                        None => "N/A".to_string(),
574                                    },
575                                    model: match service_info.get_property("model") {
576                                        Some(model_property) => model_property.val_str().to_owned(),
577                                        None => "N/A".to_string(),
578                                    },
579                                },
580                            );
581                        }
582                        ServiceEvent::ServiceRemoved(service_type, fullname) => {
583                            info!("CMC Service Removed: a:{}, b:{}", &service_type, &fullname);
584                            let mut device_list_lock = device_list_cmc.lock().unwrap();
585                            device_list_lock
586                                .disconnect_cmc(cutoff_address(&fullname, Some(CMC_SERVICE)));
587                        }
588                        ServiceEvent::SearchStopped(service_type) => {
589                            error!("CMC Search Stopped: {}", &service_type);
590                        }
591                    }
592                }
593                sleep(Duration::from_millis(100));
594            }
595        });
596
597        // Discovery for ARC
598        let arc_receiver = mdns
599            .browse(ARC_SERVICE)
600            .unwrap_or_else(|_| panic!("Failed to browse for {}", ARC_SERVICE));
601
602        // Fresh Arcs to move into thread.
603        let device_list_arc = self.device_list.clone();
604        let running_arc = self.running.clone();
605
606        let arc_thread = std::thread::spawn(move || {
607            debug!("Starting discovery thread");
608            while *running_arc.lock().unwrap() {
609                while let Ok(event) = arc_receiver.try_recv() {
610                    match event {
611                        ServiceEvent::SearchStarted(service_type) => {
612                            debug!("ARC Search Started: {}", &service_type);
613                        }
614                        ServiceEvent::ServiceFound(service_type, fullname) => {
615                            debug!("ARC Search Found: {}, {}", &service_type, &fullname);
616                            let device_name = cutoff_address(&fullname, Some(ARC_SERVICE));
617
618                            let mut device_list_lock = device_list_arc
619                                .lock()
620                                .expect("Cannot get mutex lock of DanteDevices");
621
622                            device_list_lock.connect_arc(device_name);
623                        }
624                        ServiceEvent::ServiceResolved(service_info) => {
625                            info!("ARC Service Resolved: {:?}", &service_info);
626                            let device_name =
627                                cutoff_address(service_info.get_fullname(), Some(ARC_SERVICE));
628                            let mut device_list_lock = device_list_arc
629                                .lock()
630                                .expect("Cannot get mutex lock of DanteDevices");
631                            device_list_lock.update_arc(
632                                device_name,
633                                ARCInfo {
634                                    addresses: service_info.get_addresses().to_owned(),
635                                    port: service_info.get_port().to_owned(),
636                                    router_vers: match service_info.get_property("router_vers") {
637                                        Some(router_vers_property) => {
638                                            router_vers_property.val_str().to_owned()
639                                        }
640                                        None => "N/A".to_string(),
641                                    },
642                                    router_info: match service_info.get_property("router_info") {
643                                        Some(router_info_property) => {
644                                            router_info_property.val_str().to_owned()
645                                        }
646                                        None => "N/A".to_string(),
647                                    },
648                                },
649                            );
650                        }
651                        ServiceEvent::ServiceRemoved(service_type, fullname) => {
652                            info!("ARC Service Removed: a:{}, b:{}", &service_type, &fullname);
653                            let mut device_list_lock = device_list_arc.lock().unwrap();
654                            device_list_lock
655                                .disconnect_arc(cutoff_address(&fullname, Some(ARC_SERVICE)));
656                        }
657                        ServiceEvent::SearchStopped(service_type) => {
658                            error!("ARC Search Stopped: {}", &service_type);
659                        }
660                    }
661                }
662                sleep(Duration::from_millis(100));
663            }
664        });
665
666        // Discovery for CHAN
667        let chan_receiver = mdns
668            .browse(CHAN_SERVICE)
669            .unwrap_or_else(|_| panic!("Failed to browse for {}", CHAN_SERVICE));
670
671        // Fresh Arcs to move into thread.
672        let device_list_chan = self.device_list.clone();
673        let running_chan = self.running.clone();
674
675        let chan_thread = std::thread::spawn(move || {
676            debug!("Starting discovery thread");
677            while *running_chan.lock().unwrap() {
678                while let Ok(event) = chan_receiver.try_recv() {
679                    match event {
680                        ServiceEvent::SearchStarted(service_type) => {
681                            debug!("CHAN Search Started: {}", &service_type);
682                        }
683                        ServiceEvent::ServiceFound(service_type, fullname) => {
684                            debug!("CHAN Search Found: {}, {}", &service_type, &fullname);
685                            let (chan_name, full_name) = fullname
686                                .split_once("@")
687                                .expect("CHAN fullname without \"@\" unexpected.");
688                            let device_name = cutoff_address(full_name, Some(CHAN_SERVICE));
689
690                            let mut device_list_lock = device_list_chan
691                                .lock()
692                                .expect("Cannot get mutex lock of DanteDevices");
693
694                            device_list_lock.connect_chan(device_name);
695                        }
696                        ServiceEvent::ServiceResolved(service_info) => {
697                            info!("CHAN Service Resolved: {:?}", &service_info);
698                            let (chan_name, full_name) = service_info
699                                .get_fullname()
700                                .split_once("@")
701                                .expect("CHAN fullname without \"@\" unexpected.");
702                            let device_name = cutoff_address(full_name, Some(CHAN_SERVICE));
703                            let mut device_list_lock = device_list_chan
704                                .lock()
705                                .expect("Cannot get mutex lock of DanteDevices");
706                            device_list_lock.update_chan(
707                                device_name,
708                                CHANInfo {
709                                    name: chan_name.to_owned(),
710                                    id: service_info.get_property("id").map(|id_property| {
711                                        id_property
712                                            .val_str()
713                                            .to_owned()
714                                            .parse()
715                                            .expect("Couldn't parse chan service id")
716                                    }),
717                                    sample_rate: match service_info.get_property("rate") {
718                                        Some(rate_property) => rate_property.val_str().parse().ok(),
719                                        None => None,
720                                    },
721                                    encoding: match service_info.get_property("en") {
722                                        Some(encoding_property) => {
723                                            match encoding_property.val_str() {
724                                                "16" => Some(PCM16),
725                                                "24" => Some(PCM24),
726                                                "32" => Some(PCM32),
727                                                &_ => None,
728                                            }
729                                        }
730                                        None => None,
731                                    },
732                                    latency: match service_info.get_property("latency_ns") {
733                                        Some(latency_property) => latency_property
734                                            .val_str()
735                                            .parse()
736                                            .ok()
737                                            .map(Duration::from_nanos),
738                                        None => None,
739                                    },
740                                },
741                            );
742                        }
743                        ServiceEvent::ServiceRemoved(service_type, fullname) => {
744                            info!("CHAN Service Removed: a:{}, b:{}", &service_type, &fullname);
745                            let (chan_name, full_name) = fullname
746                                .split_once("@")
747                                .expect("CHAN fullname without \"@\" unexpected.");
748                            let device_name = cutoff_address(full_name, Some(CHAN_SERVICE));
749
750                            let mut device_list_lock = device_list_chan.lock().unwrap();
751                            device_list_lock.disconnect_chan(device_name);
752                        }
753                        ServiceEvent::SearchStopped(service_type) => {
754                            error!("CHAN Search Stopped: {}", &service_type);
755                        }
756                    }
757                }
758                sleep(Duration::from_millis(100));
759            }
760        });
761
762        Ok(())
763    }
764
765    fn get_new_command_sequence_id(&mut self) -> u16 {
766        let return_id = self.current_command_sequence_id;
767        self.current_command_sequence_id += 1;
768        return_id
769    }
770
771    fn make_dante_command(&mut self, command: [u8; 2], command_args: &[u8]) -> BytesMut {
772        let mut buffer = bytes::BytesMut::new();
773        buffer.extend_from_slice(&[0x28, 0x30]);
774        assert_eq!(buffer.len(), 2);
775        buffer.extend_from_slice(&((command_args.len() + 10) as u16).to_be_bytes());
776        assert_eq!(buffer.len(), 4);
777        buffer.extend_from_slice(&self.get_new_command_sequence_id().to_be_bytes());
778        assert_eq!(buffer.len(), 6);
779        buffer.extend(command);
780        assert_eq!(buffer.len(), 8);
781        buffer.extend_from_slice(&[0x00, 0x00]);
782        assert_eq!(buffer.len(), 10);
783        buffer.extend_from_slice(&command_args);
784        buffer
785    }
786
787    fn send_bytes_to_addresses(
788        addresses: &HashSet<Ipv4Addr>,
789        port: u16,
790        bytes: &[u8],
791    ) -> Result<(), Box<dyn Error>> {
792        let socket = UdpSocket::bind("0.0.0.0:0")?;
793        for address in addresses {
794            debug!(
795                "Sent bytes {:?} to {}:{}",
796                hex::encode(bytes),
797                address,
798                port
799            );
800            socket.send_to(bytes, (*address, port))?;
801        }
802        Ok(())
803    }
804
805    fn send_bytes_to_address(
806        address: &Ipv4Addr,
807        port: u16,
808        bytes: &[u8],
809    ) -> Result<(), Box<dyn Error>> {
810        let socket = UdpSocket::bind("0.0.0.0:0")?;
811
812        debug!(
813            "Sent bytes {:?} to {}:{}",
814            hex::encode(bytes),
815            address,
816            port
817        );
818        socket.send_to(bytes, (*address, port))?;
819
820        Ok(())
821    }
822
823    /// Makes a dante subscription on a device. Dante subscriptions are "stored" on the receiver side, where a transmitter device name and transmitter device channel name are associated with a specific channel number on the receiver side. The arguments for this function are exactly the arguments needed to construct the udp packet. Also, there is no need to start_discovery() beforehand, the two functionalities are separate.
824    pub fn make_subscription(
825        &mut self,
826        version: &DanteVersion,
827        rx_device_ip: &Ipv4Addr,
828        rx_channel_id: u16,
829        tx_device: &AsciiStr,
830        tx_channel: &AsciiStr,
831    ) -> Result<(), MakeSubscriptionError> {
832        let tx_device_name_buffer = tx_device.as_bytes();
833        let tx_channel_name_buffer = tx_channel.as_bytes();
834
835        let port: u16 = 4440;
836
837        let mut command_buffer = BytesMut::new();
838
839        match version {
840            DanteVersion::Dante4_4_1_3 => {
841                command_buffer.extend_from_slice(&[
842                    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x20, 0x01,
843                ]);
844                assert_eq!(command_buffer.len(), 10);
845                command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
846                assert_eq!(command_buffer.len(), 12);
847                command_buffer.extend_from_slice(&[0x00, 0x03, 0x01, 0x14]);
848                assert_eq!(command_buffer.len(), 16);
849                let end_pos: u16 = (276 + tx_channel_name_buffer.len() + 1) as u16;
850                command_buffer.extend_from_slice(&end_pos.to_be_bytes());
851                assert_eq!(command_buffer.len(), 18);
852                command_buffer.extend_from_slice(&vec![0x00; 248]);
853                assert_eq!(command_buffer.len(), 266);
854                command_buffer.extend_from_slice(tx_channel_name_buffer);
855                command_buffer.extend_from_slice(&[0x00]);
856                command_buffer.extend_from_slice(tx_device_name_buffer);
857                command_buffer.extend_from_slice(&[0x00]);
858            }
859            DanteVersion::Dante4_2_1_3 => {
860                command_buffer.extend_from_slice(&[0x10, 0x01]);
861                assert_eq!(command_buffer.len(), 2);
862                command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
863                assert_eq!(command_buffer.len(), 4);
864                command_buffer.extend_from_slice(&[0x01, 0x4C]);
865                assert_eq!(command_buffer.len(), 6);
866                let end_pos: u16 = (332 + tx_channel_name_buffer.len() + 1) as u16;
867                command_buffer.extend_from_slice(&end_pos.to_be_bytes());
868                assert_eq!(command_buffer.len(), 8);
869                command_buffer.extend_from_slice(&vec![0x00; 314]);
870                assert_eq!(command_buffer.len(), 322);
871                command_buffer.extend_from_slice(tx_channel_name_buffer);
872                command_buffer.extend_from_slice(&[0x00]);
873                command_buffer.extend_from_slice(tx_device_name_buffer);
874                command_buffer.extend_from_slice(&[0x00]);
875            }
876        }
877
878        match Self::send_bytes_to_address(
879            rx_device_ip,
880            port,
881            &self.make_dante_command(version.get_commands().command_subscription, &command_buffer),
882        ) {
883            Ok(_) => Ok(()),
884            Err(_) => Err(MakeSubscriptionError::ConnectionFailed),
885        }
886    }
887
888    /// Clears a dante device subscription. Essentially the same as make_subscription except with an empty transmitter name and transmitter channel name.
889    pub fn clear_subscription(
890        &mut self,
891        version: &DanteVersion,
892        rx_device_ip: &Ipv4Addr,
893        rx_channel_id: u16,
894    ) -> Result<(), MakeSubscriptionError> {
895        let mut command_buffer = BytesMut::new();
896
897        match version {
898            DanteVersion::Dante4_4_1_3 => {
899                command_buffer.extend_from_slice(&[
900                    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x20, 0x01,
901                ]);
902                assert_eq!(command_buffer.len(), 10);
903                command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
904                assert_eq!(command_buffer.len(), 12);
905                command_buffer.extend_from_slice(&[0x00, 0x03, 0x00, 0x00, 0x00, 0x00]);
906                assert_eq!(command_buffer.len(), 18);
907                command_buffer.extend_from_slice(&vec![0x00; 248]);
908                assert_eq!(command_buffer.len(), 266);
909            }
910            DanteVersion::Dante4_2_1_3 => {
911                command_buffer.extend_from_slice(&[0x10, 0x01]);
912                assert_eq!(command_buffer.len(), 2);
913                command_buffer.extend_from_slice(&rx_channel_id.to_be_bytes());
914                assert_eq!(command_buffer.len(), 4);
915                command_buffer.extend_from_slice(&vec![0x00; 318]);
916                assert_eq!(command_buffer.len(), 322);
917            }
918        }
919
920        let port: u16 = 4440;
921
922        match Self::send_bytes_to_address(
923            rx_device_ip,
924            port,
925            &self.make_dante_command(version.get_commands().command_subscription, &command_buffer),
926        ) {
927            Ok(_) => Ok(()),
928            Err(_) => Err(MakeSubscriptionError::ConnectionFailed),
929        }
930    }
931
932    /// Returns whether dante mdns discovery is running
933    pub fn is_running(&self) -> bool {
934        *self.running.lock().unwrap()
935    }
936
937    /// Stops mdns discovery
938    pub fn stop_discovery(&self) {
939        *self.running.lock().unwrap() = false;
940    }
941
942    /// Returns a list of all the mdns dante device names that were found on the network.
943    pub fn get_device_names(&self) -> Vec<String> {
944        self.device_list
945            .lock()
946            .unwrap()
947            .devices
948            .keys()
949            .map(|device| device.to_owned())
950            .collect()
951    }
952
953    /// Returns a list descriptions of all the mdns dante device names that were found on the network.
954    pub fn get_device_descriptions(&self) -> Vec<String> {
955        let device_list = self.device_list.lock().unwrap();
956        let device_info_map = device_list.devices.iter().map(|(device, status)| {
957            (
958                device,
959                status,
960                device_list
961                    .caches
962                    .get(device)
963                    .expect("Should have a cache for any given connected device."),
964            )
965        });
966        device_info_map.into_iter()
967            .map(|(device, status, cache)| {
968                let mut info = format!(
969                    "{}:\ndbc status: {}\ncmc status: {}\narc status: {}\nchan status: {}\nid: {}\nmanufacturer: {}\nmodel: {}\nrouter_vers: {}\nrouter_info: {}\nARC port: {}\nIP: {}",
970                    device,
971                    match status.connected_dbc {
972                        true => "Connected",
973                        false => "Disconnected",
974                    },
975                    match status.connected_cmc {
976                        true => "Connected",
977                        false => "Disconnected",
978                    },
979                    match status.connected_arc {
980                        true => "Connected",
981                        false => "Disconnected",
982                    },
983                    match status.connected_chan {
984                        true => "Connected",
985                        false => "Disconnected",
986                    },
987                    match &cache.cmc_info {
988                        Some(cmc_info) => {cmc_info.id.to_owned()}
989                        None => "N/A".to_string()
990                    },
991                    match &cache.cmc_info {
992                        Some(cmc_info) => {cmc_info.manufacturer.to_owned()}
993                        None => "N/A".to_string()
994                    },
995                    match &cache.cmc_info {
996                        Some(cmc_info) => {cmc_info.model.to_owned()}
997                        None => "N/A".to_string()
998                    },
999                    match &cache.arc_info {
1000                        Some(arc_info) => {arc_info.router_vers.to_owned()}
1001                        None => "N/A".to_string()
1002                    },
1003                    match &cache.arc_info {
1004                        Some(arc_info) => {arc_info.router_info.to_owned()}
1005                        None => "N/A".to_string()
1006                    },
1007                    match &cache.arc_info {
1008                        Some(arc_info) => {arc_info.port.to_string()}
1009                        None => "N/A".to_string()
1010                    },
1011                    match &cache.arc_info {
1012                        Some(arc_info) => {format!("{:?}", &arc_info.addresses)}
1013                        None => "N/A".to_string()
1014                    }
1015                );
1016                info += "\nChannels:";
1017                let mut chan_info_sorted: Vec<&CHANInfo> = cache.chan_info.iter().collect();
1018                chan_info_sorted.sort_by(|x, y| x.id.partial_cmp(&y.id).unwrap());
1019                for chan_info in chan_info_sorted {
1020                    info += &format!("\n\"{}\"", chan_info.name);
1021                }
1022                info
1023            })
1024            .collect()
1025    }
1026
1027    pub fn new() -> Self {
1028        DanteDeviceManager {
1029            device_list: Arc::new(Mutex::new(DanteDeviceList::new())),
1030            running: Arc::new(Mutex::new(false)),
1031            current_command_sequence_id: 0,
1032        }
1033    }
1034}
1035
1036impl Default for DanteDeviceManager {
1037    fn default() -> Self {
1038        DanteDeviceManager::new()
1039    }
1040}
1041
1042/// Print raw data received from mDNS discovery requests at addr.
1043fn print_mdns_with_address(addr: &str, poll_time: Duration) {
1044    info!("Starting discovery");
1045
1046    let mdns = ServiceDaemon::new().expect("Failed to create mdns service daemon!");
1047    let receiver = mdns
1048        .browse(addr)
1049        .unwrap_or_else(|_| panic!("Failed to browse for {}", addr));
1050
1051    let keep_polling = Arc::new(Mutex::new(true));
1052    let keep_polling_thread = keep_polling.clone();
1053
1054    let thread = std::thread::spawn(move || {
1055        debug!("Starting discovery thread");
1056        while *keep_polling_thread.lock().unwrap() {
1057            while let Ok(event) = receiver.try_recv() {
1058                match event {
1059                    ServiceEvent::SearchStarted(service_name) => {
1060                        println!("Search Started: {}", &service_name)
1061                    }
1062                    ServiceEvent::ServiceFound(service_name, host_service_name) => {
1063                        println!("Search Found: {}, {}", &service_name, &host_service_name)
1064                    }
1065                    ServiceEvent::ServiceResolved(service_info) => {
1066                        println!("Service Resolved: {:?}", &service_info);
1067                    }
1068                    ServiceEvent::ServiceRemoved(a, b) => {
1069                        println!("Service Removed: {}, {}", &a, &b)
1070                    }
1071                    ServiceEvent::SearchStopped(a) => {
1072                        println!("Search Stopped: {}", &a)
1073                    }
1074                }
1075            }
1076            sleep(Duration::from_millis(100));
1077        }
1078    });
1079
1080    sleep(poll_time);
1081
1082    *keep_polling.lock().unwrap() = false;
1083
1084    thread.join().unwrap();
1085}
1086
1087/// Print raw data received from mDNS discovery requests to the "_netaudio-cmc._udp.local." address.
1088pub fn print_cmc(poll_time: Duration) {
1089    print_mdns_with_address(CMC_SERVICE, poll_time);
1090}
1091
1092/// Print raw data received from mDNS discovery requests to the "_netaudio-dbc._udp.local." address.
1093pub fn print_dbc(poll_time: Duration) {
1094    print_mdns_with_address(DBC_SERVICE, poll_time);
1095}
1096
1097/// Print raw data received from mDNS discovery requests to the "_netaudio-arc._udp.local." address.
1098pub fn print_arc(poll_time: Duration) {
1099    print_mdns_with_address(ARC_SERVICE, poll_time);
1100}
1101
1102/// Print raw data received from mDNS discovery requests to the "_netaudio-chan._udp.local." address.
1103pub fn print_chan(poll_time: Duration) {
1104    print_mdns_with_address(CHAN_SERVICE, poll_time);
1105}