1use std::borrow::Cow;
2use std::path::PathBuf;
3
4use futures::stream::FuturesUnordered;
5use futures::{FutureExt, TryStreamExt};
6use futures_lite::StreamExt;
7
8use crate::device::query::Query;
9use crate::device::wait_mode_data;
10use crate::error::Error;
11use crate::mount::UnmountAsync;
12use crate::{install, device, usb, interface};
13
14
15type Result<T = (), E = Error> = std::result::Result<T, E>;
16
17
18#[cfg_attr(feature = "tracing", tracing::instrument)]
19pub async fn run(query: Query,
20 pdx: PathBuf,
21 no_install: bool,
22 no_read: bool,
23 force: bool)
24 -> Result<Vec<device::Device>> {
25 use crate::retry::{DefaultIterTime, Retries};
26 let wait_data = Retries::<DefaultIterTime>::default();
27
28
29 let to_run = if !no_install {
30 install::mount_and_install(query, &pdx, force).await?
31 .filter_map(|r| r.map_err(|e| error!("{e}")).ok())
32 .flat_map(|path| {
33 async {
34 let (mount, path) = path.into_parts();
35 mount.unmount().await?;
36 wait_mode_data(mount.device, wait_data.clone()).await
37 .map(|dev| {
38 (dev, path.into())
39 })
40 }.into_stream()
41 .filter_map(move |r| r.inspect_err(|e| error!("{e}")).ok())
42 })
43 .collect::<Vec<(device::Device, Cow<_>)>>()
44 .await
45 } else {
46 usb::discover::devices_data()?.map(|dev| (dev, pdx.to_string_lossy()))
47 .collect()
48 };
49
50
51 let mut to_read = Vec::with_capacity(to_run.len());
52 let readers = FuturesUnordered::new();
53
54 for (mut device, path) in to_run {
55 use interface::r#async::Out;
56
57 device.open()?;
58 {
59 let interface = device.interface()?;
60 interface.send_cmd(device::command::Command::Run { path: path.into_owned() })
61 .await?;
62 }
63
64 if !no_read {
65 to_read.push(device);
66 }
67 }
68
69 if !no_read {
70 for device in to_read.iter_mut() {
71 readers.push(usb::io::redirect_to_stdout(device));
72 }
73 }
74
75 readers.inspect_err(|err| error!("{err}"))
76 .try_for_each_concurrent(8, |_| async { Ok(()) })
77 .await?;
78
79 Ok(to_read)
80}