playdate_device/mount/
methods.rs

1use std::future::Future;
2use std::time::Duration;
3
4use futures::stream::FuturesUnordered as Unordered;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6
7use crate::device::query::Query;
8use crate::device::query::Value as QueryValue;
9use crate::device::serial::SerialNumber as Sn;
10use crate::device::{wait_mode_storage, wait_mode_data, Device};
11use crate::error::Error;
12use crate::interface::r#async::Out;
13use crate::mount::{MountAsync, MountHandle};
14use crate::mount::MountedDevice;
15use crate::mount::volume::volumes_for_map;
16use crate::retry::{DefaultIterTime, Retries, IterTime};
17use crate::usb::discover::devices_storage;
18use crate::usb;
19use crate::serial::{self, dev_with_port};
20use crate::interface;
21
22
/// Module-local result alias: the success type defaults to `()` and the error type to [`Error`].
type Result<T = (), E = Error> = std::result::Result<T, E>;
24
25
26/// Recommended total time for retries is 30 seconds or more.
27///
28/// ```ignore
29/// let retry = Retries::new(Duration::from_secs(1), Duration::from_secs(60));
30/// mount::wait_fs_available(drive, retry).await?;
31/// ```
32#[cfg_attr(feature = "tracing", tracing::instrument(fields(dev = mount.device.to_string(),
33                                                           mount = mount.handle.volume().path().as_ref().display().to_string(),
34																			)))]
35pub async fn wait_fs_available<T>(mount: &MountedDevice, retry: Retries<T>) -> Result
36	where T: Clone + std::fmt::Debug + IterTime {
37	let total = &retry.total;
38	let iter_ms = retry.iters.interval(total);
39	let retries_num = total.as_millis() / iter_ms.as_millis();
40	debug!("retries: {retries_num} * {iter_ms:?} ≈ {total:?}.");
41
42	let mut counter = retries_num;
43	#[cfg(all(feature = "tokio", not(feature = "async-std")))]
44	let mut interval = tokio::time::interval(iter_ms);
45
46	let check = || {
47		mount.handle
48		     .path()
49		     .try_exists()
50		     .inspect_err(|err| debug!("{err}"))
51		     .ok()
52		     .unwrap_or_default()
53		     .then(|| {
54			     let path = mount.handle.path();
55			     match std::fs::read_dir(path).inspect_err(|err| debug!("{err}")) {
56				     // then find first dir entry:
57			        Ok(entries) => entries.into_iter().flatten().next().is_some(),
58			        _ => false,
59			     }
60		     })
61		     .unwrap_or_default()
62	};
63
64	if check() {
65		trace!("filesystem available at {}", mount.handle.path().display());
66		return Ok(());
67	}
68
69	while {
70		counter -= 1;
71		counter
72	} != 0
73	{
74		#[cfg(all(not(feature = "async-std"), feature = "tokio"))]
75		interval.tick().await;
76		#[cfg(feature = "async-std")]
77		async_std::task::sleep(iter_ms).await;
78		#[cfg(all(not(feature = "tokio"), not(feature = "async-std")))]
79		std::thread::sleep(iter_ms);
80
81		if check() {
82			return Ok(());
83		} else {
84			trace!(
85			       "{dev}: waiting filesystem availability, try: {i}/{retries_num}",
86			       dev = mount.device,
87			       i = retries_num - counter,
88			);
89		}
90	}
91
92	Err(Error::timeout(format!(
93		"{dev}: filesystem not available at {path} after {retries_num} retries",
94		dev = mount.device,
95		path = mount.handle.path().display(),
96	)))
97}
98
99
100/// Double wait time and notify user in between of halfs.
101pub async fn wait_fs_available_with_user<T>(mount: &MountedDevice, retry: Retries<T>) -> Result
102	where T: Clone + std::fmt::Debug + IterTime {
103	match wait_fs_available(mount, retry).await {
104		Ok(_) => (),
105		Err(err) => {
106			error!("{err}");
107			warn!(
108			      "Still waiting mounted device at {}.",
109			      mount.handle.volume.path().display()
110			);
111
112			let last_chance = fs_available_wait_time();
113			wait_fs_available(mount, last_chance).await?
114		},
115	}
116
117	Ok(())
118}
119
120
121pub const FS_AVAILABLE_AWAIT_ENV: &str = "PD_MOUNT_TIMEOUT";
122const FS_AVAILABLE_AWAIT_TIMEOUT: Duration = Duration::from_secs(60);
123
124pub fn fs_available_wait_time() -> Retries<Duration> {
125	let t = match std::env::var_os(FS_AVAILABLE_AWAIT_ENV).map(|s| s.to_string_lossy().trim().parse::<u64>()) {
126		Some(Ok(v)) => Duration::from_millis(v as _),
127		Some(Err(err)) => {
128			error!("Invalid ms value of {FS_AVAILABLE_AWAIT_ENV}: {err}, using default timeout.");
129			FS_AVAILABLE_AWAIT_TIMEOUT
130		},
131		None => FS_AVAILABLE_AWAIT_TIMEOUT,
132	};
133	Retries::new(Duration::from_millis(200), t)
134}
135
136pub const UNMOUNT_AWAIT_ENV: &str = "PD_UNMOUNT_TIMEOUT";
137const UNMOUNT_AWAIT_TIMEOUT: Duration = Duration::from_secs(60);
138
139pub fn unmount_wait_time() -> Retries<Duration> {
140	let t = match std::env::var_os(UNMOUNT_AWAIT_ENV).map(|s| s.to_string_lossy().trim().parse::<u64>()) {
141		Some(Ok(v)) => Duration::from_millis(v as _),
142		Some(Err(err)) => {
143			error!("Invalid ms value of '{UNMOUNT_AWAIT_ENV}': {err}, using default timeout.");
144			UNMOUNT_AWAIT_TIMEOUT
145		},
146		None => UNMOUNT_AWAIT_TIMEOUT,
147	};
148	Retries::new(Duration::from_millis(100), t)
149}
150
151
152#[cfg_attr(feature = "tracing", tracing::instrument())]
153pub async fn mount(query: Query) -> Result<impl Stream<Item = Result<MountedDevice>>> {
154	match query.value {
155		Some(QueryValue::Path(port)) => {
156			let fut = mount_by_port_name(port.display().to_string()).await?
157			                                                        .left_stream();
158			Ok(fut)
159		},
160		Some(QueryValue::Com(port)) => {
161			let fut = mount_by_port_name(format!("COM{port}")).await?.left_stream();
162			Ok(fut)
163		},
164		Some(QueryValue::Serial(sn)) => Ok(mount_by_sn_mb(Some(sn)).await?.right_stream()),
165		_ => Ok(mount_by_sn_mb(None).await?.right_stream()),
166	}
167}
168
169
170/// Switch between stream methods `mount` and `mount then wait_fs_available`,
171/// depending on `wait` parameter.
172#[cfg_attr(feature = "tracing", tracing::instrument())]
173pub async fn mount_and(query: Query, wait: bool) -> Result<impl Stream<Item = Result<MountedDevice>>> {
174	let fut = mount(query).await?.flat_map(move |res| {
175		                             async move {
176			                             match res {
177				                             Ok(drive) => {
178				                                if wait {
179					                                let retry = fs_available_wait_time();
180					                                wait_fs_available_with_user(&drive, retry).await?
181				                                }
182				                                Ok(drive)
183			                                },
184			                                Err(err) => Err(err),
185			                             }
186		                             }.into_stream()
187	                             });
188	Ok(fut)
189}
190
191
192#[cfg_attr(feature = "tracing", tracing::instrument())]
193pub async fn mount_by_sn_mb(sn: Option<Sn>) -> Result<Unordered<impl Future<Output = Result<MountedDevice>>>> {
194	let devices = usb::discover::devices_with(sn)?;
195	let mounting = devices.map(mount_dev);
196
197	let futures = Unordered::new();
198	for dev in mounting {
199		futures.push(dev?);
200	}
201
202	if futures.is_empty() {
203		Err(Error::not_found())
204	} else {
205		Ok(futures)
206	}
207}
208
209
210#[cfg_attr(feature = "tracing", tracing::instrument(fields(port = port.as_ref())))]
211pub async fn mount_by_port_name<S: AsRef<str>>(
212	port: S)
213	-> Result<Unordered<impl Future<Output = Result<MountedDevice>>>> {
214	let port = port.as_ref();
215	let existing = serial::discover::ports().map(|ports| {
216		                                        ports.into_iter()
217		                                             .find(|p| p.port_name == port)
218		                                             .map(serial::Interface::new)
219	                                        });
220
221	let futures = Unordered::new();
222
223	let err_not_found = || futures_lite::future::ready(Err(Error::not_found()));
224
225	match existing {
226		Ok(Some(port)) => {
227			if let serialport::SerialPortType::UsbPort(serialport::UsbPortInfo { serial_number: Some(ref sn),
228			                                                                     .. }) = port.info().port_type
229			{
230				let dev = Sn::try_from(sn.as_str()).map_err(Error::from)
231				                                   .and_then(|sn| usb::discover::device(&sn));
232				match dev {
233					Ok(mut dev) => {
234						dev.set_interface(interface::Interface::Serial(port));
235						futures.push(mount_dev(dev)?.left_future());
236					},
237					Err(err) => {
238						let name = port.info().port_name.as_str();
239						error!("Unable to map specified port {name} to device: {err}");
240						port.mount().await?;
241						futures.push(err_not_found().right_future());
242					},
243				}
244			}
245		},
246		Ok(None) => {
247			match dev_with_port(port).await {
248				Ok(dev) => futures.push(mount_dev(dev)?.left_future()),
249				Err(err) => {
250					let name = port;
251					error!("Unable to map specified port {name} to device: {err}");
252					let port = serial::open(name)?;
253					let interface = serial::Interface::new_with(port, Some(name.to_string()));
254					interface.send_cmd(crate::device::command::Command::Datadisk)
255					         .await?;
256					futures.push(err_not_found().right_future());
257				},
258			}
259		},
260		Err(err) => {
261			error!("{err}");
262			match dev_with_port(port).await {
263				Ok(dev) => futures.push(mount_dev(dev)?.left_future()),
264				Err(err) => {
265					let name = port;
266					error!("Unable to map specified port {name} to device: {err}");
267					let port = serial::open(name)?;
268					let interface = serial::Interface::new_with(port, Some(name.to_string()));
269					interface.send_cmd(crate::device::command::Command::Datadisk)
270					         .await?;
271					futures.push(err_not_found().right_future());
272				},
273			}
274		},
275	}
276
277	if futures.is_empty() {
278		Err(Error::not_found())
279	} else {
280		Ok(futures)
281	}
282}
283
284
285#[cfg_attr(feature = "tracing", tracing::instrument(fields(dev = dev.info().serial_number())))]
286fn mount_dev(mut dev: Device) -> Result<impl Future<Output = Result<MountedDevice>>> {
287	let retry = Retries::<DefaultIterTime>::default();
288	let mut retry_wait_mount_point = retry.clone();
289	retry_wait_mount_point.total += Duration::from_secs(40);
290
291	trace!("mounting {dev}");
292	let fut = match dev.mode_cached() {
293		usb::mode::Mode::Data => {
294			trace!("create sending fut");
295			async move {
296				dev.open()?;
297				dev.interface()?
298				   .send_cmd(crate::device::command::Command::Datadisk)
299				   .await?;
300				dev.close();
301				Ok(dev)
302			}.and_then(|dev| wait_mode_storage(dev, retry))
303			.left_future()
304		},
305		usb::mode::Mode::Storage => futures_lite::future::ready(Ok(dev)).right_future(),
306		mode => return Err(Error::WrongState(mode)),
307	};
308	Ok(fut.and_then(|dev| wait_mount_point(dev, retry_wait_mount_point)))
309}
310
311
312#[cfg_attr(feature = "tracing", tracing::instrument(fields(dev = dev.info().serial_number())))]
313async fn wait_mount_point<T>(dev: Device, retry: Retries<T>) -> Result<MountedDevice>
314	where T: Clone + std::fmt::Debug + IterTime {
315	let total = &retry.total;
316	let iter_ms = retry.iters.interval(total);
317	let retries_num = total.as_millis() / iter_ms.as_millis();
318	debug!("retries: {retries_num} * {iter_ms:?} ≈ {total:?}.");
319
320	let mut counter = retries_num;
321	#[cfg(all(feature = "tokio", not(feature = "async-std")))]
322	let mut interval = tokio::time::interval(iter_ms);
323
324	let sn = dev.info()
325	            .serial_number()
326	            .ok_or_else(|| Error::DeviceSerial { source: "unknown".into() })?
327	            .to_owned();
328
329	while {
330		counter -= 1;
331		counter
332	} != 0
333	{
334		#[cfg(all(not(feature = "async-std"), feature = "tokio"))]
335		interval.tick().await;
336		#[cfg(feature = "async-std")]
337		async_std::task::sleep(iter_ms).await;
338		#[cfg(all(not(feature = "tokio"), not(feature = "async-std")))]
339		std::thread::sleep(iter_ms);
340
341		let mode = dev.mode_cached();
342		trace!(
343		       "waiting mount point availability: {sn}, current: {mode}, try: {}/{retries_num}",
344		       retries_num - counter
345		);
346
347		let vol = crate::mount::volume::volume_for(&dev).await
348		                                                .map_err(|err| debug!("ERROR: {err}"))
349		                                                .ok();
350		if let Some(vol) = vol {
351			debug!("{sn} mounted, volume found: '{vol}'");
352			let handle = MountHandle::new(vol, false);
353			let mounted = MountedDevice::new(dev, handle);
354			return Ok(mounted);
355		} else {
356			debug!("mount point still not found, waiting...")
357		}
358	}
359
360	Err(Error::usb_timeout(dev))
361}
362
363
364#[cfg_attr(feature = "tracing", tracing::instrument())]
365pub async fn unmount(query: Query) -> Result<impl Stream<Item = (Device, Result)>> {
366	match query.value {
367		Some(QueryValue::Path(path)) => {
368			// TODO: Check query is path and this is mounted volume.
369			// check is `path` is a a path of existing __volume__,
370			// try find device behind the volume,
371			// unmount the volume anyway
372			todo!("unmount dev by vol path: {}", path.display())
373		},
374		Some(QueryValue::Com(_)) => todo!("ERROR: not supported (impossible)"),
375		Some(QueryValue::Serial(sn)) => unmount_mb_sn(Some(sn)),
376		_ => unmount_mb_sn(None),
377	}.map_ok(|stream| {
378		stream.inspect(|(dev, res)| {
379			      if let Some(err) = res.as_ref().err() {
380				      error!("{dev}: {err}");
381				      warn!("Please press 'A' on the Playdate to exit Data Disk mode.");
382			      }
383		      })
384	})
385	.await
386}
387
388/// Unmount device(s), then wait for state change to [`Data`][usb::mode::Mode::Data].
389#[cfg_attr(feature = "tracing", tracing::instrument())]
390pub async fn unmount_and_wait<T>(query: Query, retry: Retries<T>) -> Result<impl Stream<Item = Result<Device>>>
391	where T: Clone + std::fmt::Debug + IterTime {
392	let stream = Unordered::new();
393	unmount(query).await?
394	              .for_each_concurrent(4, |(dev, res)| {
395		              if let Some(err) = res.err() {
396			              error!("{dev}: {err}")
397		              }
398		              stream.push(wait_mode_data(dev, retry.clone()));
399		              futures_lite::future::ready(())
400	              })
401	              .await;
402
403	trace!("Waiting state change for {} devices.", stream.len());
404	Ok(stream)
405}
406
407/// Switch between stream methods `unmount` and `unmount_and_wait`,
408/// depending on `wait` parameter.
409#[cfg_attr(feature = "tracing", tracing::instrument())]
410pub async fn unmount_and(query: Query, wait: bool) -> Result<impl Stream<Item = Result<Device>>> {
411	let results = if wait {
412		let retry = unmount_wait_time();
413		unmount_and_wait(query, retry).await?.left_stream()
414	} else {
415		unmount(query).await?
416		              .map(|(dev, res)| res.map(|_| dev))
417		              .right_stream()
418	};
419
420	Ok(results)
421}
422
423
424#[cfg_attr(feature = "tracing", tracing::instrument())]
425pub async fn unmount_mb_sn(sn: Option<Sn>) -> Result<Unordered<impl Future<Output = (Device, Result)>>> {
426	let devs = devices_storage()?.filter(move |dev| {
427		                             sn.as_ref()
428		                               .filter(|qsn| dev.info().serial_number().filter(|ref s| qsn.eq(s)).is_some())
429		                               .is_some() ||
430		                             sn.is_none()
431	                             })
432	                             .inspect(|dev| trace!("Unmounting {dev}"));
433
434	let unmounting = volumes_for_map(devs).await?
435	                                      .into_iter()
436	                                      .filter_map(|(dev, vol)| vol.map(|vol| (dev, vol)))
437	                                      .inspect(|(dev, vol)| trace!("Unmounting {dev}: {vol}"))
438	                                      .map(|(dev, vol)| {
439		                                      let h = MountHandle::new(vol, false);
440		                                      MountedDevice::new(dev, h)
441	                                      })
442	                                      .map(move |mut dev| {
443		                                      use crate::mount::UnmountAsync;
444		                                      async move {
445			                                      dev.device.close();
446			                                      let res = dev.unmount().await;
447			                                      (dev.device, res)
448		                                      }
449	                                      })
450	                                      .collect::<Unordered<_>>();
451
452	trace!("Unmounting {} devices.", unmounting.len());
453	Ok(unmounting)
454}