Skip to main content

marlin_binary_transfer/adapters/
tokio.rs

1//! Tokio async adapter — drives the sans-I/O core over an
2//! [`AsyncRead`] + [`AsyncWrite`] transport.
3//!
4//! Mirrors [`adapters::blocking`](crate::adapters::blocking) one-for-one,
5//! returning the same [`UploadStats`] / [`UploadError`] types.
6//!
7//! # Read timeout
8//!
9//! Every inbound read is wrapped in
10//! [`tokio::time::timeout`](https://docs.rs/tokio/latest/tokio/time/fn.timeout.html)
11//! keyed to [`Session::response_timeout`](crate::session::Session::response_timeout).
12//! On `Elapsed` the adapter falls through to `tick()` so retransmits and the
13//! total-budget timeout still fire on a quiet transport. Without this wrapping
14//! a stalled peer would leave the future `Pending` forever.
15//!
16//! # Lifecycle handled for you
17//!
18//! 1. Send `M28 B1` to enter binary mode.
19//! 2. SYNC handshake.
20//! 3. QUERY → compression negotiation.
21//! 4. OPEN, WRITE × N, CLOSE.
22//! 5. Control CLOSE (proto=0, type=2) so the device exits binary mode.
23//!
24//! [`AsyncRead`]: tokio::io::AsyncRead
25//! [`AsyncWrite`]: tokio::io::AsyncWrite
26
27use std::time::{Duration, Instant};
28
29use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
30
31use crate::adapters::common::resolve_chunk_size;
32use crate::file_transfer::{Compression, FileEvent, FileTransfer};
33use crate::session::Session;
34
35pub use crate::adapters::common::{
36    Progress, ProgressCallback, UploadError, UploadOptions, UploadStats,
37};
38
39/// Async equivalent of [`adapters::blocking::upload`](crate::adapters::blocking::upload).
40pub async fn upload<T, S>(
41    transport: &mut T,
42    src: &mut S,
43    mut options: UploadOptions,
44) -> Result<UploadStats, UploadError>
45where
46    T: AsyncRead + AsyncWrite + Unpin,
47    S: AsyncRead + Unpin,
48{
49    transport.write_all(b"M28B1\n").await?;
50
51    let mut session = Session::new();
52    session.connect(Instant::now());
53    drive_until_synced(transport, &mut session).await?;
54
55    // Capture the device-advertised block size before FileTransfer takes
56    // the session mutably.
57    let device_max = session.max_block_size().unwrap_or(0);
58
59    let mut ft = FileTransfer::new(&mut session);
60    ft.query(options.compression.clone(), Instant::now());
61    let negotiated = drive_until_negotiated(transport, &mut ft).await?;
62
63    ft.open(&options.dest_filename, options.dummy, Instant::now());
64    drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Opened)).await?;
65
66    let mut stats = UploadStats {
67        compression: negotiated.clone(),
68        ..UploadStats::default()
69    };
70
71    let mut source_bytes = Vec::new();
72    src.read_to_end(&mut source_bytes).await?;
73    stats.source_bytes = source_bytes.len() as u64;
74
75    let payload: Vec<u8> = match &negotiated {
76        Compression::None => source_bytes,
77        Compression::Heatshrink { window, lookahead } => {
78            #[cfg(feature = "heatshrink")]
79            {
80                crate::compression::compress(&source_bytes, *window, *lookahead)?
81            }
82            #[cfg(not(feature = "heatshrink"))]
83            {
84                let _ = (window, lookahead);
85                return Err(UploadError::CompressionFeatureDisabled);
86            }
87        }
88        Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
89    };
90
91    let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
92
93    for chunk in payload.chunks(chunk_size) {
94        ft.write(chunk, Instant::now());
95        drive_until(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked)).await?;
96        stats.bytes_sent += chunk.len() as u64;
97        stats.chunks_sent += 1;
98        if let Some(cb) = options.progress.as_mut() {
99            cb(Progress {
100                bytes_sent: stats.bytes_sent,
101                chunks_sent: stats.chunks_sent,
102                source_bytes: stats.source_bytes,
103            });
104        }
105    }
106
107    ft.close(Instant::now());
108    drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Closed)).await?;
109
110    // Drop FT so we can drive the session directly through the control
111    // CLOSE (proto=0, type=2) that exits binary mode. Without this, the
112    // printer stays in binary mode and ignores subsequent ASCII g-code.
113    drop(ft);
114    session.send(0, 2, &[], Instant::now());
115    drive_session_until_idle(transport, &mut session).await?;
116
117    Ok(stats)
118}
119
120async fn drive_session_until_idle<T>(
121    transport: &mut T,
122    session: &mut Session,
123) -> Result<(), UploadError>
124where
125    T: AsyncRead + AsyncWrite + Unpin,
126{
127    use crate::file_transfer::FileError;
128    use crate::session::Event;
129    let mut buf = [0u8; 1024];
130    for _ in 0..200 {
131        while let Some(out) = session.poll_outbound() {
132            transport.write_all(&out).await?;
133        }
134        let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
135        if n > 0 {
136            session.feed(&buf[..n], Instant::now());
137        }
138        while let Some(evt) = session.poll_event() {
139            match evt {
140                Event::Ack(_) => return Ok(()),
141                Event::FatalError => {
142                    return Err(UploadError::Transfer(FileError::SessionFatalError));
143                }
144                Event::Timeout { .. } => {
145                    return Err(UploadError::Transfer(FileError::SessionTimeout));
146                }
147                Event::OutOfSync { expected, got } => {
148                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
149                        expected,
150                        got,
151                    }));
152                }
153                _ => {}
154            }
155        }
156        session.tick(Instant::now());
157    }
158    Err(UploadError::Stalled("control close not acked"))
159}
160
161async fn drive_until_synced<T>(transport: &mut T, session: &mut Session) -> Result<(), UploadError>
162where
163    T: AsyncRead + AsyncWrite + Unpin,
164{
165    use crate::file_transfer::FileError;
166    use crate::session::Event;
167    let mut buf = [0u8; 1024];
168    for _ in 0..200 {
169        while let Some(out) = session.poll_outbound() {
170            transport.write_all(&out).await?;
171        }
172        let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
173        if n > 0 {
174            session.feed(&buf[..n], Instant::now());
175        }
176        while let Some(evt) = session.poll_event() {
177            match evt {
178                Event::Synced { .. } => return Ok(()),
179                Event::FatalError => {
180                    return Err(UploadError::Transfer(FileError::SessionFatalError));
181                }
182                Event::Timeout { .. } => {
183                    return Err(UploadError::Transfer(FileError::SessionTimeout));
184                }
185                Event::OutOfSync { expected, got } => {
186                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
187                        expected,
188                        got,
189                    }));
190                }
191                _ => {}
192            }
193        }
194        session.tick(Instant::now());
195    }
196    Err(UploadError::HandshakeFailed)
197}
198
199async fn drive_until_negotiated<T>(
200    transport: &mut T,
201    ft: &mut FileTransfer<'_>,
202) -> Result<Compression, UploadError>
203where
204    T: AsyncRead + AsyncWrite + Unpin,
205{
206    let mut buf = [0u8; 1024];
207    for _ in 0..200 {
208        while let Some(out) = ft.poll_outbound() {
209            transport.write_all(&out).await?;
210        }
211        let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
212        if n > 0 {
213            ft.feed(&buf[..n], Instant::now());
214        }
215        while let Some(evt) = ft.poll() {
216            match evt {
217                FileEvent::Negotiated { compression, .. } => return Ok(compression),
218                FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
219                _ => {}
220            }
221        }
222        ft.tick(Instant::now());
223    }
224    Err(UploadError::Stalled("negotiation did not complete"))
225}
226
227async fn drive_until<T, F>(
228    transport: &mut T,
229    ft: &mut FileTransfer<'_>,
230    pred: F,
231) -> Result<(), UploadError>
232where
233    T: AsyncRead + AsyncWrite + Unpin,
234    F: Fn(&FileEvent) -> bool,
235{
236    let mut buf = [0u8; 1024];
237    for _ in 0..200 {
238        while let Some(out) = ft.poll_outbound() {
239            transport.write_all(&out).await?;
240        }
241        let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
242        if n > 0 {
243            ft.feed(&buf[..n], Instant::now());
244        }
245        while let Some(evt) = ft.poll() {
246            if let FileEvent::Failed(err) = &evt {
247                return Err(UploadError::Transfer(err.clone()));
248            }
249            if pred(&evt) {
250                return Ok(());
251            }
252        }
253        ft.tick(Instant::now());
254    }
255    Err(UploadError::Stalled("event did not arrive in time"))
256}
257
258/// Read with a per-call timeout so the surrounding drive loop can fall
259/// through to `tick()` and fire retransmits even when the transport
260/// stays quiet. Returns 0 on timeout (treated as "no bytes this turn").
261async fn read_with_timeout<T>(
262    transport: &mut T,
263    buf: &mut [u8],
264    timeout: Duration,
265) -> Result<usize, UploadError>
266where
267    T: AsyncRead + Unpin,
268{
269    match tokio::time::timeout(timeout, transport.read(buf)).await {
270        Ok(r) => r.map_err(UploadError::Io),
271        Err(_) => Ok(0),
272    }
273}