minidsp/
builder.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
//! Device discovery integrated as the builder pattern

use std::{collections::HashMap, net::ToSocketAddrs, path::PathBuf, pin::Pin, sync::Arc};

use futures::{Stream, StreamExt};
use minidsp_protocol::{
    device::{self, Device},
    DeviceInfo,
};
use tokio::sync::Mutex;
use url2::Url2;

#[cfg(feature = "hid")]
use crate::transport::hid;
use crate::{
    client::Client,
    device::DeviceKind,
    transport::{self, Hub, Multiplexer, Openable},
    utils::decoder::Decoder,
    MiniDSP, MiniDSPError,
};

/// Discovers, probes and instantiate device instances.
///
/// - Configure what devices to probe using the `with_` methods
/// - Consume builder into a vec of transports using [`Builder::probe`]
/// - Get an instance of [`MiniDSP`] using [`DeviceHandle::to_minidsp`]
#[derive(Default)]
pub struct Builder {
    /// The candidate device pool, devices get added when their helper methods
    /// detect matching ones. (url -> openable)
    candidate_devices: HashMap<String, Box<dyn Openable>>,
    options: DeviceOptions,
}

#[derive(Default, Clone)]
pub struct DeviceOptions {
    log_filename: Option<PathBuf>,
    log_console: Option<u8>,
    kind: Option<DeviceKind>,
}

/// TODO: Copied from minidspd

pub struct DeviceHandle {
    pub url: String,

    // Frame-level multiplexer
    pub transport: Hub,

    // Probed hardware id and dsp version
    pub device_info: DeviceInfo,

    // Device spec structure indicating the address of every component
    pub device_spec: &'static Device,
}

impl DeviceHandle {
    pub fn to_minidsp(&self) -> Option<MiniDSP<'static>> {
        let transport = self.transport.try_clone()?;
        let multiplexer = Multiplexer::from_transport(transport);
        let client = Client::new(Arc::new(Mutex::new(multiplexer.to_service())));
        let dsp = MiniDSP::from_client(client, self.device_spec, self.device_info);
        Some(dsp)
    }

    pub fn to_hub(&self) -> Option<Hub> {
        self.transport.try_clone()
    }
}

impl Builder {
    pub fn new() -> Self {
        Default::default()
    }

    /// Uses devices managed by a remote instance of minidspd
    pub async fn with_http(&mut self, s: &str) -> Result<&mut Self, transport::ws::Error> {
        let url = Url2::try_parse(s)?;
        self.candidate_devices.extend(
            transport::ws::discover(&url)
                .await?
                .into_iter()
                .map(|url| (url.to_url(), Box::new(url) as Box<dyn Openable>)),
        );
        Ok(self)
    }

    /// Uses devices managed by a local instance of minidspd
    #[cfg(target_family = "unix")]
    pub async fn with_unix_socket(
        &mut self,
        socket_path: &str,
    ) -> Result<&mut Self, transport::ws::Error> {
        self.candidate_devices.extend(
            transport::ws::discover_unix(socket_path)
                .await?
                .into_iter()
                .map(|device| (device.to_url(), Box::new(device) as Box<dyn Openable>)),
        );
        Ok(self)
    }

    #[cfg(feature = "hid")]
    fn extend_hid_device(&mut self, devices: impl IntoIterator<Item = hid::Device>) {
        let devices = devices
            .into_iter()
            .map(|dev| (Openable::to_url(&dev), Box::new(dev) as Box<dyn Openable>));

        self.candidate_devices.extend(devices);
    }

    /// Add all local devices matching known minidsp vendor and product ids
    #[cfg(feature = "hid")]
    pub fn with_default_usb(&mut self) -> Result<&mut Self, hid::HidError> {
        let api = hid::initialize_api()?;
        let mut api = api.lock().unwrap();
        self.extend_hid_device(hid::discover(&mut api)?);
        Ok(self)
    }

    /// Add all local devices matching `vid` and `pid`
    #[cfg(feature = "hid")]
    pub fn with_usb_product_id<T: Into<Option<u16>>>(
        &mut self,
        vid: u16,
        pid: T,
    ) -> Result<&mut Self, hid::HidError> {
        let api = hid::initialize_api()?;
        let mut api = api.lock().unwrap();
        let pid = pid.into();
        self.extend_hid_device(hid::discover_with(&mut api, |dev| {
            vid == dev.vendor_id() && (pid.is_none() || pid == Some(dev.product_id()))
        })?);
        Ok(self)
    }

    /// Adds a single usb device by path
    #[cfg(feature = "hid")]
    pub fn with_usb_path(&mut self, path: &str) -> &mut Self {
        self.extend_hid_device(Some(hid::Device {
            id: None,
            path: Some(path.into()),
        }));
        self
    }

    /// Adds a remote compat tcp server, or a wi-dg device
    pub fn with_tcp<T: ToSocketAddrs>(&mut self, sockaddr: T) -> std::io::Result<&mut Self> {
        self.candidate_devices
            .extend(sockaddr.to_socket_addrs()?.map(|sa| {
                let url = format!("tcp://{}", sa);
                let url2 = Url2::parse(&url);
                (url, Box::new(url2) as Box<dyn Openable>)
            }));
        Ok(self)
    }

    /// Adds a device by url
    pub fn with_url(&mut self, s: &str) -> Result<&mut Self, url2::Url2Error> {
        let url2 = Url2::try_parse(s)?;
        self.candidate_devices
            .insert(s.into(), Box::new(url2) as Box<dyn Openable>);
        Ok(self)
    }

    /// Activates console logging at the given level, optionally logging all sent and received frames to a file
    pub fn with_logging<T>(&mut self, level: u8, filename: T) -> &mut Self
    where
        T: Into<Option<PathBuf>>,
    {
        self.options.log_console.replace(level);
        self.options.log_filename = T::into(filename);
        self
    }

    /// Do not probe the device to identify what hardware it is, and use the specified DeviceKind instead.
    pub fn force_device_kind(&mut self, kind: DeviceKind) -> &mut Self {
        self.options.kind.replace(kind);
        self
    }
    /// Probe all candidate devices
    pub fn probe(self) -> Pin<Box<impl Stream<Item = Result<DeviceHandle, MiniDSPError>>>> {
        let Self {
            options,
            candidate_devices,
        } = self;

        let options = Arc::new(options);

        // Attempt to instantiate every candidate device
        Box::pin(
            futures::stream::iter(candidate_devices).then(move |(key, dev)| {
                let options = options.clone();

                async move {
                    let mut transport = dev.open().await?;
                    let mut decoder: Option<Arc<Mutex<Decoder>>> = None;
                    // Apply any logging options
                    if options.log_console.is_some() || options.log_filename.is_some() {
                        let wrapped = crate::logging::transport_logging(
                            transport,
                            options.log_console.unwrap_or_default(),
                            options.log_filename.clone(),
                        );
                        decoder = wrapped.0;
                        transport = wrapped.1;
                    }

                    let hub = Hub::new(transport);

                    // Convert to a service
                    // FIXME: This is convoluted, should be a part of Client
                    // Probe the device for its hw id and serial
                    let mplex = Multiplexer::from_transport(
                        hub.try_clone().ok_or(MiniDSPError::TransportClosed)?,
                    );
                    let svc = Arc::new(Mutex::new(mplex.to_service()));
                    let client = Client::new(svc);

                    let device_info = client.get_device_info().await?;
                    let device_spec = match options.kind {
                        None => device::probe(&device_info),
                        Some(k) => device::by_kind(k),
                    };

                    #[cfg(feature = "devices")]
                    if let Some(decoder) = decoder {
                        let mut decoder = decoder.lock().await;
                        decoder.set_name_map(device_spec.symbols.iter().copied());
                    }
                    Ok::<_, MiniDSPError>(DeviceHandle {
                        url: key,
                        transport: hub,
                        device_info,
                        device_spec,
                    })
                }
            }),
        )
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;

    #[tokio::test]
    async fn test_builder() {
        // Device selection. Options are *additive*
        let mut b = Builder::new();
        b
            // Discover any matching usb devices
            .with_default_usb()
            .unwrap()
            // Discover usb devices matching the given vid+pid
            .with_usb_product_id(0x2752, 0x0011)
            .unwrap()
            // Use a single usb device by path
            .with_usb_path("usb:")
            // Connect via tcp (yields one device)
            .with_tcp("127.0.0.1:5333")
            .unwrap()
            // Connect to a specific device by url (hid,tcp,websocket,etc.)
            .with_url("ws://127.0.0.1:5380/devices/0/ws")
            .unwrap()
            // Console transport logging
            .with_logging(0, PathBuf::from("file.log"))
            // Probing/device options
            .force_device_kind(DeviceKind::M2x4Hd);
        // Probe all matching devices
        let _ = b.probe().next().await.unwrap();
    }
}