Skip to main content

marlin_binary_transfer/adapters/
tokio.rs

//! Tokio async adapter — drives the sans-I/O core over an
//! [`AsyncRead`] + [`AsyncWrite`] transport.
//!
//! Mirrors [`adapters::blocking`](crate::adapters::blocking) one-for-one,
//! returning the same [`UploadStats`] / [`UploadError`] types.
//!
//! # Read timeout
//!
//! Every inbound read is wrapped in
//! [`tokio::time::timeout`](https://docs.rs/tokio/latest/tokio/time/fn.timeout.html)
//! keyed to [`Session::response_timeout`](crate::session::Session::response_timeout).
//! On `Elapsed` the adapter falls through to `tick()` so retransmits and the
//! total-budget timeout still fire on a quiet transport. Without this wrapping
//! a stalled peer would leave the future `Pending` forever.
//!
//! # Lifecycle handled for you
//!
//! 1. Send `M28 B1` (written to the wire as `M28B1\n`) to enter binary mode.
//! 2. SYNC handshake.
//! 3. QUERY → compression negotiation.
//! 4. OPEN, WRITE × N, CLOSE.
//! 5. Control CLOSE (proto=0, type=2) so the device exits binary mode.
//!
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
26
27use std::time::{Duration, Instant};
28
29use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
30
31use crate::adapters::common::resolve_chunk_size;
32use crate::file_transfer::{Compression, FileEvent, FileTransfer};
33use crate::session::Session;
34
35pub use crate::adapters::common::{UploadError, UploadOptions, UploadStats};
36
37/// Async equivalent of [`adapters::blocking::upload`](crate::adapters::blocking::upload).
38pub async fn upload<T, S>(
39    transport: &mut T,
40    src: &mut S,
41    options: UploadOptions,
42) -> Result<UploadStats, UploadError>
43where
44    T: AsyncRead + AsyncWrite + Unpin,
45    S: AsyncRead + Unpin,
46{
47    transport.write_all(b"M28B1\n").await?;
48
49    let mut session = Session::new();
50    session.connect(Instant::now());
51    drive_until_synced(transport, &mut session).await?;
52
53    // Capture the device-advertised block size before FileTransfer takes
54    // the session mutably.
55    let device_max = session.max_block_size().unwrap_or(0);
56
57    let mut ft = FileTransfer::new(&mut session);
58    ft.query(options.compression.clone(), Instant::now());
59    let negotiated = drive_until_negotiated(transport, &mut ft).await?;
60
61    ft.open(&options.dest_filename, options.dummy, Instant::now());
62    drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Opened)).await?;
63
64    let mut stats = UploadStats {
65        compression: negotiated.clone(),
66        ..UploadStats::default()
67    };
68
69    let mut source_bytes = Vec::new();
70    src.read_to_end(&mut source_bytes).await?;
71    stats.source_bytes = source_bytes.len() as u64;
72
73    let payload: Vec<u8> = match &negotiated {
74        Compression::None => source_bytes,
75        Compression::Heatshrink { window, lookahead } => {
76            #[cfg(feature = "heatshrink")]
77            {
78                crate::compression::compress(&source_bytes, *window, *lookahead)?
79            }
80            #[cfg(not(feature = "heatshrink"))]
81            {
82                let _ = (window, lookahead);
83                return Err(UploadError::CompressionFeatureDisabled);
84            }
85        }
86        Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
87    };
88
89    let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
90
91    for chunk in payload.chunks(chunk_size) {
92        ft.write(chunk, Instant::now());
93        drive_until(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked)).await?;
94        stats.bytes_sent += chunk.len() as u64;
95        stats.chunks_sent += 1;
96    }
97
98    ft.close(Instant::now());
99    drive_until(transport, &mut ft, |e| matches!(e, FileEvent::Closed)).await?;
100
101    // Drop FT so we can drive the session directly through the control
102    // CLOSE (proto=0, type=2) that exits binary mode. Without this, the
103    // printer stays in binary mode and ignores subsequent ASCII g-code.
104    drop(ft);
105    session.send(0, 2, &[], Instant::now());
106    drive_session_until_idle(transport, &mut session).await?;
107
108    Ok(stats)
109}
110
111async fn drive_session_until_idle<T>(
112    transport: &mut T,
113    session: &mut Session,
114) -> Result<(), UploadError>
115where
116    T: AsyncRead + AsyncWrite + Unpin,
117{
118    use crate::file_transfer::FileError;
119    use crate::session::Event;
120    let mut buf = [0u8; 1024];
121    for _ in 0..200 {
122        while let Some(out) = session.poll_outbound() {
123            transport.write_all(&out).await?;
124        }
125        let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
126        if n > 0 {
127            session.feed(&buf[..n], Instant::now());
128        }
129        while let Some(evt) = session.poll_event() {
130            match evt {
131                Event::Ack(_) => return Ok(()),
132                Event::FatalError => {
133                    return Err(UploadError::Transfer(FileError::SessionFatalError));
134                }
135                Event::Timeout { .. } => {
136                    return Err(UploadError::Transfer(FileError::SessionTimeout));
137                }
138                Event::OutOfSync { expected, got } => {
139                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
140                        expected,
141                        got,
142                    }));
143                }
144                _ => {}
145            }
146        }
147        session.tick(Instant::now());
148    }
149    Err(UploadError::Stalled("control close not acked"))
150}
151
152async fn drive_until_synced<T>(transport: &mut T, session: &mut Session) -> Result<(), UploadError>
153where
154    T: AsyncRead + AsyncWrite + Unpin,
155{
156    use crate::file_transfer::FileError;
157    use crate::session::Event;
158    let mut buf = [0u8; 1024];
159    for _ in 0..200 {
160        while let Some(out) = session.poll_outbound() {
161            transport.write_all(&out).await?;
162        }
163        let n = read_with_timeout(transport, &mut buf, session.response_timeout()).await?;
164        if n > 0 {
165            session.feed(&buf[..n], Instant::now());
166        }
167        while let Some(evt) = session.poll_event() {
168            match evt {
169                Event::Synced { .. } => return Ok(()),
170                Event::FatalError => {
171                    return Err(UploadError::Transfer(FileError::SessionFatalError));
172                }
173                Event::Timeout { .. } => {
174                    return Err(UploadError::Transfer(FileError::SessionTimeout));
175                }
176                Event::OutOfSync { expected, got } => {
177                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
178                        expected,
179                        got,
180                    }));
181                }
182                _ => {}
183            }
184        }
185        session.tick(Instant::now());
186    }
187    Err(UploadError::HandshakeFailed)
188}
189
190async fn drive_until_negotiated<T>(
191    transport: &mut T,
192    ft: &mut FileTransfer<'_>,
193) -> Result<Compression, UploadError>
194where
195    T: AsyncRead + AsyncWrite + Unpin,
196{
197    let mut buf = [0u8; 1024];
198    for _ in 0..200 {
199        while let Some(out) = ft.poll_outbound() {
200            transport.write_all(&out).await?;
201        }
202        let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
203        if n > 0 {
204            ft.feed(&buf[..n], Instant::now());
205        }
206        while let Some(evt) = ft.poll() {
207            match evt {
208                FileEvent::Negotiated { compression, .. } => return Ok(compression),
209                FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
210                _ => {}
211            }
212        }
213        ft.tick(Instant::now());
214    }
215    Err(UploadError::Stalled("negotiation did not complete"))
216}
217
218async fn drive_until<T, F>(
219    transport: &mut T,
220    ft: &mut FileTransfer<'_>,
221    pred: F,
222) -> Result<(), UploadError>
223where
224    T: AsyncRead + AsyncWrite + Unpin,
225    F: Fn(&FileEvent) -> bool,
226{
227    let mut buf = [0u8; 1024];
228    for _ in 0..200 {
229        while let Some(out) = ft.poll_outbound() {
230            transport.write_all(&out).await?;
231        }
232        let n = read_with_timeout(transport, &mut buf, ft.response_timeout()).await?;
233        if n > 0 {
234            ft.feed(&buf[..n], Instant::now());
235        }
236        while let Some(evt) = ft.poll() {
237            if let FileEvent::Failed(err) = &evt {
238                return Err(UploadError::Transfer(err.clone()));
239            }
240            if pred(&evt) {
241                return Ok(());
242            }
243        }
244        ft.tick(Instant::now());
245    }
246    Err(UploadError::Stalled("event did not arrive in time"))
247}
248
249/// Read with a per-call timeout so the surrounding drive loop can fall
250/// through to `tick()` and fire retransmits even when the transport
251/// stays quiet. Returns 0 on timeout (treated as "no bytes this turn").
252async fn read_with_timeout<T>(
253    transport: &mut T,
254    buf: &mut [u8],
255    timeout: Duration,
256) -> Result<usize, UploadError>
257where
258    T: AsyncRead + Unpin,
259{
260    match tokio::time::timeout(timeout, transport.read(buf)).await {
261        Ok(r) => r.map_err(UploadError::Io),
262        Err(_) => Ok(0),
263    }
264}