playdate_device/usb/
io.rs1use std::io::Write;
2
3use futures_lite::future::block_on;
4
5
6use futures_lite::StreamExt;
7use nusb::transfer::RequestBuffer;
8use nusb::{DeviceInfo, Interface};
9
10use crate::device::Device;
11use crate::error::Error;
12use crate::serial::redirect_interface_to_stdout as redirect_serial_to_stdout;
13use crate::usb::mode::DeviceMode;
14use crate::usb::mode::Mode;
15use crate::usb::BULK_IN;
16
17
18#[cfg_attr(feature = "tracing", tracing::instrument(skip(interface)))]
19pub fn read_interface(interface: &Interface,
20 buf_size: usize,
21 bufs: usize)
22 -> Result<impl futures_lite::stream::Stream<Item = Result<String, Error>>, Error> {
23 let mut inp = interface.bulk_in_queue(BULK_IN);
24
25 while inp.pending() < bufs {
27 inp.submit(RequestBuffer::new(buf_size));
28 }
29
30 let stream = futures_lite::stream::poll_fn(move |ctx| {
31 inp.poll_next(ctx)
32 .map(|out| -> Result<_, Error> {
33 let data = out.into_result()?;
34 let s = std::str::from_utf8(&data)?.to_owned();
35 inp.submit(RequestBuffer::reuse(data, buf_size));
36 Ok(s)
37 })
38 .map(Some)
39 });
40
41 Ok(stream)
42}
43
44#[cfg_attr(feature = "tracing", tracing::instrument(skip(interface, map)))]
45pub fn read_while_map<F, T>(interface: &Interface,
46 buf_size: usize,
47 buffers: usize,
48 mut map: F)
49 -> Result<impl futures_lite::stream::Stream<Item = T>, Error>
50 where F: FnMut(&[u8]) -> Option<T>
51{
52 let mut inp = interface.bulk_in_queue(BULK_IN);
53
54 while inp.pending() < buffers {
56 inp.submit(RequestBuffer::new(buf_size));
57 }
58
59 let stream = futures_lite::stream::poll_fn(move |ctx| {
60 inp.poll_next(ctx).map(|out| -> Option<_> {
61 match out.into_result() {
62 Ok(data) => {
63 let res = map(data.as_slice());
64 if res.is_some() {
65 inp.submit(RequestBuffer::reuse(data, buf_size));
66 } else {
67 trace!("cancel all IN queue, by predicate.");
68 inp.cancel_all();
69 }
70 res
71 },
72 Err(err) => {
73 trace!("cancel all IN queue, by err: {err}.");
74 inp.cancel_all();
75 None
76 },
77 }
78 })
79 });
80
81 Ok(stream)
82}
83
84
85#[cfg_attr(feature = "tracing", tracing::instrument)]
86pub fn read_once(device: DeviceInfo) -> Result<(String, Interface), Error> {
87 let mode = device.mode();
88 if !matches!(mode, Mode::Data) {
89 return Err(Error::WrongState(mode));
90 }
91
92
93 let device = device.open()?;
94 let inter = device.claim_interface(1)?;
95
96 let stream = read_while_map(&inter, 256, 2, |data| {
97 match std::str::from_utf8(data) {
98 Ok(s) => {
99 if s.trim().is_empty() {
100 None
101 } else {
102 Some(s.to_owned())
103 }
104 },
105 Err(err) => {
106 error!("{err:?}");
107 None
108 },
109 }
110 })?.fold(String::new(), |acc, ref s| acc + s);
111 let s = block_on(stream);
112 Ok((s, inter))
113}
114
115
116#[cfg_attr(feature = "tracing", tracing::instrument)]
117pub async fn redirect_to_stdout(device: &mut Device) -> Result<(), Error> {
118 let mode = device.mode();
119 if !matches!(mode, Mode::Data) {
120 return Err(Error::WrongState(mode));
121 }
122
123 device.open()?;
124 redirect_interface_to_stdout(device.interface_mut()?).await?;
125
126 Ok(())
127}
128
129#[cfg_attr(feature = "tracing", tracing::instrument)]
130pub async fn redirect_interface_to_stdout(interface: &mut crate::interface::Interface) -> Result<(), Error> {
131 match interface {
132 crate::interface::Interface::Usb(interface) => {
133 let mut stdout = std::io::stdout();
134 let to_stdout = move |data: &[u8]| stdout.write_all(data).inspect_err(|err| error!("{err}")).ok();
135 let stream = read_while_map(&interface.inner, 256, 2, to_stdout)?;
136 if stream.last().await.is_some() {
137 trace!("Read stream complete.");
138 }
139 },
140 crate::interface::Interface::Serial(interface) => {
141 interface.open()?;
142 redirect_serial_to_stdout(interface).await?;
143 },
144 }
145 Ok(())
146}