1use std::future::Future;
2use std::time::Duration;
3
4use futures::stream::FuturesUnordered as Unordered;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6
7use crate::device::query::Query;
8use crate::device::query::Value as QueryValue;
9use crate::device::serial::SerialNumber as Sn;
10use crate::device::{wait_mode_storage, wait_mode_data, Device};
11use crate::error::Error;
12use crate::interface::r#async::Out;
13use crate::mount::{MountAsync, MountHandle};
14use crate::mount::MountedDevice;
15use crate::mount::volume::volumes_for_map;
16use crate::retry::{DefaultIterTime, Retries, IterTime};
17use crate::usb::discover::devices_storage;
18use crate::usb;
19use crate::serial::{self, dev_with_port};
20use crate::interface;
21
22
23type Result<T = (), E = Error> = std::result::Result<T, E>;
24
25
26#[cfg_attr(feature = "tracing", tracing::instrument(fields(dev = mount.device.to_string(),
33 mount = mount.handle.volume().path().as_ref().display().to_string(),
34 )))]
35pub async fn wait_fs_available<T>(mount: &MountedDevice, retry: Retries<T>) -> Result
36 where T: Clone + std::fmt::Debug + IterTime {
37 let total = &retry.total;
38 let iter_ms = retry.iters.interval(total);
39 let retries_num = total.as_millis() / iter_ms.as_millis();
40 debug!("retries: {retries_num} * {iter_ms:?} ≈ {total:?}.");
41
42 let mut counter = retries_num;
43 #[cfg(all(feature = "tokio", not(feature = "async-std")))]
44 let mut interval = tokio::time::interval(iter_ms);
45
46 let check = || {
47 mount.handle
48 .path()
49 .try_exists()
50 .inspect_err(|err| debug!("{err}"))
51 .ok()
52 .unwrap_or_default()
53 .then(|| {
54 let path = mount.handle.path();
55 match std::fs::read_dir(path).inspect_err(|err| debug!("{err}")) {
56 Ok(entries) => entries.into_iter().flatten().next().is_some(),
58 _ => false,
59 }
60 })
61 .unwrap_or_default()
62 };
63
64 if check() {
65 trace!("filesystem available at {}", mount.handle.path().display());
66 return Ok(());
67 }
68
69 while {
70 counter -= 1;
71 counter
72 } != 0
73 {
74 #[cfg(all(not(feature = "async-std"), feature = "tokio"))]
75 interval.tick().await;
76 #[cfg(feature = "async-std")]
77 async_std::task::sleep(iter_ms).await;
78 #[cfg(all(not(feature = "tokio"), not(feature = "async-std")))]
79 std::thread::sleep(iter_ms);
80
81 if check() {
82 return Ok(());
83 } else {
84 trace!(
85 "{dev}: waiting filesystem availability, try: {i}/{retries_num}",
86 dev = mount.device,
87 i = retries_num - counter,
88 );
89 }
90 }
91
92 Err(Error::timeout(format!(
93 "{dev}: filesystem not available at {path} after {retries_num} retries",
94 dev = mount.device,
95 path = mount.handle.path().display(),
96 )))
97}
98
99
100pub async fn wait_fs_available_with_user<T>(mount: &MountedDevice, retry: Retries<T>) -> Result
102 where T: Clone + std::fmt::Debug + IterTime {
103 match wait_fs_available(mount, retry).await {
104 Ok(_) => (),
105 Err(err) => {
106 error!("{err}");
107 warn!(
108 "Still waiting mounted device at {}.",
109 mount.handle.volume.path().display()
110 );
111
112 let last_chance = fs_available_wait_time();
113 wait_fs_available(mount, last_chance).await?
114 },
115 }
116
117 Ok(())
118}
119
120
121pub const FS_AVAILABLE_AWAIT_ENV: &str = "PD_MOUNT_TIMEOUT";
122const FS_AVAILABLE_AWAIT_TIMEOUT: Duration = Duration::from_secs(60);
123
124pub fn fs_available_wait_time() -> Retries<Duration> {
125 let t = match std::env::var_os(FS_AVAILABLE_AWAIT_ENV).map(|s| s.to_string_lossy().trim().parse::<u64>()) {
126 Some(Ok(v)) => Duration::from_millis(v as _),
127 Some(Err(err)) => {
128 error!("Invalid ms value of {FS_AVAILABLE_AWAIT_ENV}: {err}, using default timeout.");
129 FS_AVAILABLE_AWAIT_TIMEOUT
130 },
131 None => FS_AVAILABLE_AWAIT_TIMEOUT,
132 };
133 Retries::new(Duration::from_millis(200), t)
134}
135
136pub const UNMOUNT_AWAIT_ENV: &str = "PD_UNMOUNT_TIMEOUT";
137const UNMOUNT_AWAIT_TIMEOUT: Duration = Duration::from_secs(60);
138
139pub fn unmount_wait_time() -> Retries<Duration> {
140 let t = match std::env::var_os(UNMOUNT_AWAIT_ENV).map(|s| s.to_string_lossy().trim().parse::<u64>()) {
141 Some(Ok(v)) => Duration::from_millis(v as _),
142 Some(Err(err)) => {
143 error!("Invalid ms value of '{UNMOUNT_AWAIT_ENV}': {err}, using default timeout.");
144 UNMOUNT_AWAIT_TIMEOUT
145 },
146 None => UNMOUNT_AWAIT_TIMEOUT,
147 };
148 Retries::new(Duration::from_millis(100), t)
149}
150
151
152#[cfg_attr(feature = "tracing", tracing::instrument())]
153pub async fn mount(query: Query) -> Result<impl Stream<Item = Result<MountedDevice>>> {
154 match query.value {
155 Some(QueryValue::Path(port)) => {
156 let fut = mount_by_port_name(port.display().to_string()).await?
157 .left_stream();
158 Ok(fut)
159 },
160 Some(QueryValue::Com(port)) => {
161 let fut = mount_by_port_name(format!("COM{port}")).await?.left_stream();
162 Ok(fut)
163 },
164 Some(QueryValue::Serial(sn)) => Ok(mount_by_sn_mb(Some(sn)).await?.right_stream()),
165 _ => Ok(mount_by_sn_mb(None).await?.right_stream()),
166 }
167}
168
169
170#[cfg_attr(feature = "tracing", tracing::instrument())]
173pub async fn mount_and(query: Query, wait: bool) -> Result<impl Stream<Item = Result<MountedDevice>>> {
174 let fut = mount(query).await?.flat_map(move |res| {
175 async move {
176 match res {
177 Ok(drive) => {
178 if wait {
179 let retry = fs_available_wait_time();
180 wait_fs_available_with_user(&drive, retry).await?
181 }
182 Ok(drive)
183 },
184 Err(err) => Err(err),
185 }
186 }.into_stream()
187 });
188 Ok(fut)
189}
190
191
192#[cfg_attr(feature = "tracing", tracing::instrument())]
193pub async fn mount_by_sn_mb(sn: Option<Sn>) -> Result<Unordered<impl Future<Output = Result<MountedDevice>>>> {
194 let devices = usb::discover::devices_with(sn)?;
195 let mounting = devices.map(mount_dev);
196
197 let futures = Unordered::new();
198 for dev in mounting {
199 futures.push(dev?);
200 }
201
202 if futures.is_empty() {
203 Err(Error::not_found())
204 } else {
205 Ok(futures)
206 }
207}
208
209
210#[cfg_attr(feature = "tracing", tracing::instrument(fields(port = port.as_ref())))]
211pub async fn mount_by_port_name<S: AsRef<str>>(
212 port: S)
213 -> Result<Unordered<impl Future<Output = Result<MountedDevice>>>> {
214 let port = port.as_ref();
215 let existing = serial::discover::ports().map(|ports| {
216 ports.into_iter()
217 .find(|p| p.port_name == port)
218 .map(serial::Interface::new)
219 });
220
221 let futures = Unordered::new();
222
223 let err_not_found = || futures_lite::future::ready(Err(Error::not_found()));
224
225 match existing {
226 Ok(Some(port)) => {
227 if let serialport::SerialPortType::UsbPort(serialport::UsbPortInfo { serial_number: Some(ref sn),
228 .. }) = port.info().port_type
229 {
230 let dev = Sn::try_from(sn.as_str()).map_err(Error::from)
231 .and_then(|sn| usb::discover::device(&sn));
232 match dev {
233 Ok(mut dev) => {
234 dev.set_interface(interface::Interface::Serial(port));
235 futures.push(mount_dev(dev)?.left_future());
236 },
237 Err(err) => {
238 let name = port.info().port_name.as_str();
239 error!("Unable to map specified port {name} to device: {err}");
240 port.mount().await?;
241 futures.push(err_not_found().right_future());
242 },
243 }
244 }
245 },
246 Ok(None) => {
247 match dev_with_port(port).await {
248 Ok(dev) => futures.push(mount_dev(dev)?.left_future()),
249 Err(err) => {
250 let name = port;
251 error!("Unable to map specified port {name} to device: {err}");
252 let port = serial::open(name)?;
253 let interface = serial::Interface::new_with(port, Some(name.to_string()));
254 interface.send_cmd(crate::device::command::Command::Datadisk)
255 .await?;
256 futures.push(err_not_found().right_future());
257 },
258 }
259 },
260 Err(err) => {
261 error!("{err}");
262 match dev_with_port(port).await {
263 Ok(dev) => futures.push(mount_dev(dev)?.left_future()),
264 Err(err) => {
265 let name = port;
266 error!("Unable to map specified port {name} to device: {err}");
267 let port = serial::open(name)?;
268 let interface = serial::Interface::new_with(port, Some(name.to_string()));
269 interface.send_cmd(crate::device::command::Command::Datadisk)
270 .await?;
271 futures.push(err_not_found().right_future());
272 },
273 }
274 },
275 }
276
277 if futures.is_empty() {
278 Err(Error::not_found())
279 } else {
280 Ok(futures)
281 }
282}
283
284
285#[cfg_attr(feature = "tracing", tracing::instrument(fields(dev = dev.info().serial_number())))]
286fn mount_dev(mut dev: Device) -> Result<impl Future<Output = Result<MountedDevice>>> {
287 let retry = Retries::<DefaultIterTime>::default();
288 let mut retry_wait_mount_point = retry.clone();
289 retry_wait_mount_point.total += Duration::from_secs(40);
290
291 trace!("mounting {dev}");
292 let fut = match dev.mode_cached() {
293 usb::mode::Mode::Data => {
294 trace!("create sending fut");
295 async move {
296 dev.open()?;
297 dev.interface()?
298 .send_cmd(crate::device::command::Command::Datadisk)
299 .await?;
300 dev.close();
301 Ok(dev)
302 }.and_then(|dev| wait_mode_storage(dev, retry))
303 .left_future()
304 },
305 usb::mode::Mode::Storage => futures_lite::future::ready(Ok(dev)).right_future(),
306 mode => return Err(Error::WrongState(mode)),
307 };
308 Ok(fut.and_then(|dev| wait_mount_point(dev, retry_wait_mount_point)))
309}
310
311
312#[cfg_attr(feature = "tracing", tracing::instrument(fields(dev = dev.info().serial_number())))]
313async fn wait_mount_point<T>(dev: Device, retry: Retries<T>) -> Result<MountedDevice>
314 where T: Clone + std::fmt::Debug + IterTime {
315 let total = &retry.total;
316 let iter_ms = retry.iters.interval(total);
317 let retries_num = total.as_millis() / iter_ms.as_millis();
318 debug!("retries: {retries_num} * {iter_ms:?} ≈ {total:?}.");
319
320 let mut counter = retries_num;
321 #[cfg(all(feature = "tokio", not(feature = "async-std")))]
322 let mut interval = tokio::time::interval(iter_ms);
323
324 let sn = dev.info()
325 .serial_number()
326 .ok_or_else(|| Error::DeviceSerial { source: "unknown".into() })?
327 .to_owned();
328
329 while {
330 counter -= 1;
331 counter
332 } != 0
333 {
334 #[cfg(all(not(feature = "async-std"), feature = "tokio"))]
335 interval.tick().await;
336 #[cfg(feature = "async-std")]
337 async_std::task::sleep(iter_ms).await;
338 #[cfg(all(not(feature = "tokio"), not(feature = "async-std")))]
339 std::thread::sleep(iter_ms);
340
341 let mode = dev.mode_cached();
342 trace!(
343 "waiting mount point availability: {sn}, current: {mode}, try: {}/{retries_num}",
344 retries_num - counter
345 );
346
347 let vol = crate::mount::volume::volume_for(&dev).await
348 .map_err(|err| debug!("ERROR: {err}"))
349 .ok();
350 if let Some(vol) = vol {
351 debug!("{sn} mounted, volume found: '{vol}'");
352 let handle = MountHandle::new(vol, false);
353 let mounted = MountedDevice::new(dev, handle);
354 return Ok(mounted);
355 } else {
356 debug!("mount point still not found, waiting...")
357 }
358 }
359
360 Err(Error::usb_timeout(dev))
361}
362
363
364#[cfg_attr(feature = "tracing", tracing::instrument())]
365pub async fn unmount(query: Query) -> Result<impl Stream<Item = (Device, Result)>> {
366 match query.value {
367 Some(QueryValue::Path(path)) => {
368 todo!("unmount dev by vol path: {}", path.display())
373 },
374 Some(QueryValue::Com(_)) => todo!("ERROR: not supported (impossible)"),
375 Some(QueryValue::Serial(sn)) => unmount_mb_sn(Some(sn)),
376 _ => unmount_mb_sn(None),
377 }.map_ok(|stream| {
378 stream.inspect(|(dev, res)| {
379 if let Some(err) = res.as_ref().err() {
380 error!("{dev}: {err}");
381 warn!("Please press 'A' on the Playdate to exit Data Disk mode.");
382 }
383 })
384 })
385 .await
386}
387
388#[cfg_attr(feature = "tracing", tracing::instrument())]
390pub async fn unmount_and_wait<T>(query: Query, retry: Retries<T>) -> Result<impl Stream<Item = Result<Device>>>
391 where T: Clone + std::fmt::Debug + IterTime {
392 let stream = Unordered::new();
393 unmount(query).await?
394 .for_each_concurrent(4, |(dev, res)| {
395 if let Some(err) = res.err() {
396 error!("{dev}: {err}")
397 }
398 stream.push(wait_mode_data(dev, retry.clone()));
399 futures_lite::future::ready(())
400 })
401 .await;
402
403 trace!("Waiting state change for {} devices.", stream.len());
404 Ok(stream)
405}
406
407#[cfg_attr(feature = "tracing", tracing::instrument())]
410pub async fn unmount_and(query: Query, wait: bool) -> Result<impl Stream<Item = Result<Device>>> {
411 let results = if wait {
412 let retry = unmount_wait_time();
413 unmount_and_wait(query, retry).await?.left_stream()
414 } else {
415 unmount(query).await?
416 .map(|(dev, res)| res.map(|_| dev))
417 .right_stream()
418 };
419
420 Ok(results)
421}
422
423
424#[cfg_attr(feature = "tracing", tracing::instrument())]
425pub async fn unmount_mb_sn(sn: Option<Sn>) -> Result<Unordered<impl Future<Output = (Device, Result)>>> {
426 let devs = devices_storage()?.filter(move |dev| {
427 sn.as_ref()
428 .filter(|qsn| dev.info().serial_number().filter(|ref s| qsn.eq(s)).is_some())
429 .is_some() ||
430 sn.is_none()
431 })
432 .inspect(|dev| trace!("Unmounting {dev}"));
433
434 let unmounting = volumes_for_map(devs).await?
435 .into_iter()
436 .filter_map(|(dev, vol)| vol.map(|vol| (dev, vol)))
437 .inspect(|(dev, vol)| trace!("Unmounting {dev}: {vol}"))
438 .map(|(dev, vol)| {
439 let h = MountHandle::new(vol, false);
440 MountedDevice::new(dev, h)
441 })
442 .map(move |mut dev| {
443 use crate::mount::UnmountAsync;
444 async move {
445 dev.device.close();
446 let res = dev.unmount().await;
447 (dev.device, res)
448 }
449 })
450 .collect::<Unordered<_>>();
451
452 trace!("Unmounting {} devices.", unmounting.len());
453 Ok(unmounting)
454}