1use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
2use futures_lite::stream;
3
4use crate::device::command::Command;
5use crate::{device, usb, interface};
6use crate::error::Error;
7use device::query::Query;
8use interface::r#async::Out;
9
10
/// Module-local shorthand for `std::result::Result`.
///
/// Both parameters are defaulted: `Result` alone means `Result<(), Error>`, and
/// `Result<T>` means `Result<T, Error>` with the crate's `Error` type.
type Result<T = (), E = Error> = std::result::Result<T, E>;
12
13
14#[cfg_attr(feature = "tracing", tracing::instrument())]
17pub async fn send_to_devs(query: Query,
18 cmd: Command,
19 read: bool)
20 -> Result<impl Stream<Item = Result<device::Device>>> {
21 let devices = usb::discover::devices_data_for(query).await?;
22
23 if devices.is_empty() {
24 return Err(Error::not_found());
25 }
26
27 let devices = devices.into_iter().flat_map(|mut dev| {
28 dev.open().inspect_err(|err| error!("{err}")).ok()?;
29 Some(dev)
30 });
31 let stream = stream::iter(devices).flat_map_unordered(None, move |mut dev| {
32 let cmd = cmd.clone();
33 async move {
34 match dev.interface_mut().inspect_err(|err| error!("{err}")) {
35 Ok(interface) => {
36 if read {
37 interface.send_cmd(cmd).await?;
38 usb::io::redirect_interface_to_stdout(interface).await?;
39 } else {
40 interface.send_cmd(cmd).await?;
41 }
42 Ok(())
43 },
44 Err(err) => Err(err),
45 }?;
46 Ok::<_, Error>(dev)
47 }.into_stream()
48 .boxed_local()
49 });
50 Ok(stream)
51}
52
53
54#[cfg_attr(feature = "tracing", tracing::instrument())]
55pub async fn send_to_interfaces(query: Query,
56 cmd: Command)
57 -> Result<impl Stream<Item = Result<interface::Interface>>> {
58 usb::discover::for_each_data_interface(query, move |interface| {
59 let cmd = cmd.clone();
60 async move {
61 interface.send_cmd(cmd.clone())
62 .inspect_err(|err| error!("{err}"))
63 .await?;
64 Ok::<_, Error>(interface)
65 }
66 }).await
67}