playdate_device/usb/
io.rs

1use std::io::Write;
2
3use futures_lite::future::block_on;
4
5
6use futures_lite::StreamExt;
7use nusb::transfer::RequestBuffer;
8use nusb::{DeviceInfo, Interface};
9
10use crate::device::Device;
11use crate::error::Error;
12use crate::serial::redirect_interface_to_stdout as redirect_serial_to_stdout;
13use crate::usb::mode::DeviceMode;
14use crate::usb::mode::Mode;
15use crate::usb::BULK_IN;
16
17
18#[cfg_attr(feature = "tracing", tracing::instrument(skip(interface)))]
19pub fn read_interface(interface: &Interface,
20                      buf_size: usize,
21                      bufs: usize)
22                      -> Result<impl futures_lite::stream::Stream<Item = Result<String, Error>>, Error> {
23	let mut inp = interface.bulk_in_queue(BULK_IN);
24
25	// preallocate buffers
26	while inp.pending() < bufs {
27		inp.submit(RequestBuffer::new(buf_size));
28	}
29
30	let stream = futures_lite::stream::poll_fn(move |ctx| {
31		inp.poll_next(ctx)
32		   .map(|out| -> Result<_, Error> {
33			   let data = out.into_result()?;
34			   let s = std::str::from_utf8(&data)?.to_owned();
35			   inp.submit(RequestBuffer::reuse(data, buf_size));
36			   Ok(s)
37		   })
38		   .map(Some)
39	});
40
41	Ok(stream)
42}
43
44#[cfg_attr(feature = "tracing", tracing::instrument(skip(interface, map)))]
45pub fn read_while_map<F, T>(interface: &Interface,
46                            buf_size: usize,
47                            buffers: usize,
48                            mut map: F)
49                            -> Result<impl futures_lite::stream::Stream<Item = T>, Error>
50	where F: FnMut(&[u8]) -> Option<T>
51{
52	let mut inp = interface.bulk_in_queue(BULK_IN);
53
54	// preallocate buffers
55	while inp.pending() < buffers {
56		inp.submit(RequestBuffer::new(buf_size));
57	}
58
59	let stream = futures_lite::stream::poll_fn(move |ctx| {
60		inp.poll_next(ctx).map(|out| -> Option<_> {
61			                  match out.into_result() {
62				                  Ok(data) => {
63				                     let res = map(data.as_slice());
64				                     if res.is_some() {
65					                     inp.submit(RequestBuffer::reuse(data, buf_size));
66				                     } else {
67					                     trace!("cancel all IN queue, by predicate.");
68					                     inp.cancel_all();
69				                     }
70				                     res
71			                     },
72			                     Err(err) => {
73				                     trace!("cancel all IN queue, by err: {err}.");
74				                     inp.cancel_all();
75				                     None
76			                     },
77			                  }
78		                  })
79	});
80
81	Ok(stream)
82}
83
84
85#[cfg_attr(feature = "tracing", tracing::instrument)]
86pub fn read_once(device: DeviceInfo) -> Result<(String, Interface), Error> {
87	let mode = device.mode();
88	if !matches!(mode, Mode::Data) {
89		return Err(Error::WrongState(mode));
90	}
91
92
93	let device = device.open()?;
94	let inter = device.claim_interface(1)?;
95
96	let stream = read_while_map(&inter, 256, 2, |data| {
97		             match std::str::from_utf8(data) {
98			             Ok(s) => {
99			                if s.trim().is_empty() {
100				                None
101			                } else {
102				                Some(s.to_owned())
103			                }
104		                },
105		                Err(err) => {
106			                error!("{err:?}");
107			                None
108		                },
109		             }
110	             })?.fold(String::new(), |acc, ref s| acc + s);
111	let s = block_on(stream);
112	Ok((s, inter))
113}
114
115
116#[cfg_attr(feature = "tracing", tracing::instrument)]
117pub async fn redirect_to_stdout(device: &mut Device) -> Result<(), Error> {
118	let mode = device.mode();
119	if !matches!(mode, Mode::Data) {
120		return Err(Error::WrongState(mode));
121	}
122
123	device.open()?;
124	redirect_interface_to_stdout(device.interface_mut()?).await?;
125
126	Ok(())
127}
128
129#[cfg_attr(feature = "tracing", tracing::instrument)]
130pub async fn redirect_interface_to_stdout(interface: &mut crate::interface::Interface) -> Result<(), Error> {
131	match interface {
132		crate::interface::Interface::Usb(interface) => {
133			let mut stdout = std::io::stdout();
134			let to_stdout = move |data: &[u8]| stdout.write_all(data).inspect_err(|err| error!("{err}")).ok();
135			let stream = read_while_map(&interface.inner, 256, 2, to_stdout)?;
136			if stream.last().await.is_some() {
137				trace!("Read stream complete.");
138			}
139		},
140		crate::interface::Interface::Serial(interface) => {
141			interface.open()?;
142			redirect_serial_to_stdout(interface).await?;
143		},
144	}
145	Ok(())
146}