1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use std::borrow::Cow;
use std::path::PathBuf;

use futures::stream::FuturesUnordered;
use futures::{FutureExt, TryStreamExt};
use futures_lite::StreamExt;

use crate::device::query::Query;
use crate::device::wait_mode_data;
use crate::error::Error;
use crate::mount::UnmountAsync;
use crate::{install, device, usb, interface};


/// Module-local result alias: the success type defaults to `()` and the
/// error type defaults to the crate's [`Error`].
type Result<T = (), E = Error> = std::result::Result<T, E>;


#[cfg_attr(feature = "tracing", tracing::instrument)]
pub async fn run(query: Query,
                 pdx: PathBuf,
                 no_install: bool,
                 no_read: bool,
                 force: bool)
                 -> Result<Vec<device::Device>> {
	use crate::retry::{DefaultIterTime, Retries};
	let wait_data = Retries::<DefaultIterTime>::default();


	let to_run = if !no_install {
		install::mount_and_install(query, &pdx, force).await?
		                                              .filter_map(|r| r.map_err(|e| error!("{e}")).ok())
		                                              .flat_map(|path| {
			                                              async {
				                                              let (mount, path) = path.into_parts();
				                                              mount.unmount().await?;
				                                              wait_mode_data(mount.device, wait_data.clone()).await
				                                                                                         .map(|dev| {
					                                                                                         (dev, path.into())
				                                                                                         })
			                                              }.into_stream()
			                                              .filter_map(move |r| r.inspect_err(|e| error!("{e}")).ok())
		                                              })
		                                              .collect::<Vec<(device::Device, Cow<_>)>>()
		                                              .await
	} else {
		usb::discover::devices_data()?.map(|dev| (dev, pdx.to_string_lossy()))
		                              .collect()
	};


	let mut to_read = Vec::with_capacity(to_run.len());
	let readers = FuturesUnordered::new();

	for (mut device, path) in to_run {
		use interface::r#async::Out;

		device.open()?;
		{
			let interface = device.interface()?;
			interface.send_cmd(device::command::Command::Run { path: path.into_owned() })
			         .await?;
		}

		if !no_read {
			to_read.push(device);
		}
	}

	if !no_read {
		for device in to_read.iter_mut() {
			readers.push(usb::io::redirect_to_stdout(device));
		}
	}

	readers.inspect_err(|err| error!("{err}"))
	       .try_for_each_concurrent(8, |_| async { Ok(()) })
	       .await?;

	Ok(to_read)
}