Skip to main content

marlin_binary_transfer/adapters/
blocking.rs

1//! Synchronous adapter — drives the sans-I/O core over a [`Read`] +
2//! [`Write`] transport.
3//!
4//! ```no_run
5//! use std::io::{Read, Write};
6//! use marlin_binary_transfer::adapters::blocking::{upload, UploadOptions};
7//! use marlin_binary_transfer::file_transfer::Compression;
8//!
9//! # fn run<T: Read + Write>(mut port: T) -> Result<(), Box<dyn std::error::Error>> {
10//! let opts = UploadOptions {
11//!     dest_filename: "model.gco".into(),
12//!     compression: Compression::Auto,
13//!     ..UploadOptions::default()
14//! };
15//! let stats = upload(&mut port, std::fs::File::open("model.gco")?, opts)?;
16//! println!("Uploaded {} bytes in {} chunks", stats.bytes_sent, stats.chunks_sent);
17//! # Ok(()) }
18//! ```
19//!
20//! Any `Read + Write` works. With the `serial` feature enabled,
21//! [`serialport::SerialPort`](https://docs.rs/serialport/latest/serialport/trait.SerialPort.html)
22//! satisfies both traits directly; see
23//! [`adapters::serialport`](crate::adapters::serialport) for an `open` helper.
24//!
25//! # Transport requirements
26//!
27//! `transport.read(..)` must return [`std::io::ErrorKind::TimedOut`] within a
28//! bounded interval when no data is available, so [`tick()`](crate::session::Session::tick)
29//! can fire retransmits. A blocking read with no timeout would deadlock the
30//! loop under packet loss. The
31//! [`adapters::serialport::open`](crate::adapters::serialport::open) helper
32//! configures a 100 ms read timeout out of the box; if you're plugging in a
33//! different transport (TCP-to-serial bridge, USB CDC via `rusb`, in-memory
34//! pipe) make sure it has the equivalent behaviour.
35//!
36//! # Lifecycle handled for you
37//!
38//! 1. Send `M28 B1` to enter binary mode.
39//! 2. SYNC handshake.
40//! 3. QUERY → compression negotiation.
41//! 4. OPEN, WRITE × N, CLOSE.
42//! 5. Control CLOSE (proto=0, type=2) so the device exits binary mode and
43//!    the same serial session can accept ASCII g-code again afterwards.
44
45use std::io::{Read, Write};
46use std::time::Instant;
47
48use crate::adapters::common::resolve_chunk_size;
49use crate::file_transfer::{Compression, FileEvent, FileTransfer};
50use crate::session::Session;
51
52pub use crate::adapters::common::{
53    Progress, ProgressCallback, UploadError, UploadOptions, UploadStats,
54};
55
56/// Perform a complete upload: SYNC, QUERY, OPEN, WRITE×N, CLOSE.
57///
58/// On success, `transport` is left synced and idle.
59pub fn upload<T: Read + Write + ?Sized, S: Read>(
60    transport: &mut T,
61    mut src: S,
62    mut options: UploadOptions,
63) -> Result<UploadStats, UploadError> {
64    // Send the binary-mode trigger as plain ASCII first.
65    transport.write_all(b"M28B1\n")?;
66
67    let mut session = Session::new();
68    let now = Instant::now();
69    session.connect(now);
70
71    drive_session_until_synced(transport, &mut session)?;
72
73    // Capture the device-advertised block size before the FileTransfer
74    // borrow takes the session mutably.
75    let device_max = session.max_block_size().unwrap_or(0);
76
77    let mut ft = FileTransfer::new(&mut session);
78    ft.query(options.compression.clone(), Instant::now());
79    let negotiated = drive_until_negotiated(transport, &mut ft)?;
80
81    ft.open(&options.dest_filename, options.dummy, Instant::now());
82    drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Opened))?;
83
84    let mut stats = UploadStats {
85        compression: negotiated.clone(),
86        ..UploadStats::default()
87    };
88
89    let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
90    // Read whole source into memory once, then either compress or chunk
91    // through it. Mirrors the Python ref's behaviour and keeps the chunk
92    // boundary deterministic.
93    let mut source_bytes = Vec::new();
94    src.read_to_end(&mut source_bytes)?;
95    stats.source_bytes = source_bytes.len() as u64;
96
97    let payload: Vec<u8> = match &negotiated {
98        Compression::None => source_bytes,
99        Compression::Heatshrink { window, lookahead } => {
100            #[cfg(feature = "heatshrink")]
101            {
102                crate::compression::compress(&source_bytes, *window, *lookahead)?
103            }
104            #[cfg(not(feature = "heatshrink"))]
105            {
106                let _ = (window, lookahead);
107                return Err(UploadError::CompressionFeatureDisabled);
108            }
109        }
110        Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
111    };
112
113    for chunk in payload.chunks(chunk_size) {
114        ft.write(chunk, Instant::now());
115        drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked))?;
116        stats.bytes_sent += chunk.len() as u64;
117        stats.chunks_sent += 1;
118        if let Some(cb) = options.progress.as_mut() {
119            cb(Progress {
120                bytes_sent: stats.bytes_sent,
121                chunks_sent: stats.chunks_sent,
122                source_bytes: stats.source_bytes,
123            });
124        }
125    }
126
127    ft.close(Instant::now());
128    drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Closed))?;
129
130    // Drop the FT borrow so we can talk to the session directly to send
131    // the control CLOSE (proto=0, type=2) that drops the device back to
132    // ASCII mode. Without this, the printer remains in binary mode and
133    // subsequent ASCII g-code on the same serial session is ignored.
134    drop(ft);
135    session.send(0, 2, &[], Instant::now());
136    drive_session_until_idle(transport, &mut session)?;
137
138    Ok(stats)
139}
140
141fn drive_session_until_idle<T: Read + Write + ?Sized>(
142    transport: &mut T,
143    session: &mut Session,
144) -> Result<(), UploadError> {
145    use crate::file_transfer::FileError;
146    use crate::session::Event;
147    let mut buf = [0u8; 1024];
148    for _ in 0..200 {
149        while let Some(out) = session.poll_outbound() {
150            transport.write_all(&out)?;
151        }
152        let n = match transport.read(&mut buf) {
153            Ok(n) => n,
154            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
155            Err(e) => return Err(UploadError::Io(e)),
156        };
157        if n > 0 {
158            session.feed(&buf[..n], Instant::now());
159        }
160        while let Some(evt) = session.poll_event() {
161            match evt {
162                Event::Ack(_) => return Ok(()),
163                Event::FatalError => {
164                    return Err(UploadError::Transfer(FileError::SessionFatalError));
165                }
166                Event::Timeout { .. } => {
167                    return Err(UploadError::Transfer(FileError::SessionTimeout));
168                }
169                Event::OutOfSync { expected, got } => {
170                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
171                        expected,
172                        got,
173                    }));
174                }
175                _ => {}
176            }
177        }
178        session.tick(Instant::now());
179    }
180    Err(UploadError::Stalled("control close not acked"))
181}
182
183fn drive_session_until_synced<T: Read + Write + ?Sized>(
184    transport: &mut T,
185    session: &mut Session,
186) -> Result<(), UploadError> {
187    use crate::file_transfer::FileError;
188    use crate::session::Event;
189    let mut buf = [0u8; 1024];
190    for _ in 0..200 {
191        while let Some(out) = session.poll_outbound() {
192            transport.write_all(&out)?;
193        }
194        let n = match transport.read(&mut buf) {
195            Ok(n) => n,
196            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
197            Err(e) => return Err(UploadError::Io(e)),
198        };
199        if n > 0 {
200            session.feed(&buf[..n], Instant::now());
201        }
202        while let Some(evt) = session.poll_event() {
203            match evt {
204                Event::Synced { .. } => return Ok(()),
205                Event::FatalError => {
206                    return Err(UploadError::Transfer(FileError::SessionFatalError));
207                }
208                Event::Timeout { .. } => {
209                    return Err(UploadError::Transfer(FileError::SessionTimeout));
210                }
211                Event::OutOfSync { expected, got } => {
212                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
213                        expected,
214                        got,
215                    }));
216                }
217                _ => {}
218            }
219        }
220        session.tick(Instant::now());
221    }
222    Err(UploadError::HandshakeFailed)
223}
224
225fn drive_until_negotiated<T: Read + Write + ?Sized>(
226    transport: &mut T,
227    ft: &mut FileTransfer<'_>,
228) -> Result<Compression, UploadError> {
229    let mut buf = [0u8; 1024];
230    for _ in 0..200 {
231        while let Some(out) = ft.poll_outbound() {
232            transport.write_all(&out)?;
233        }
234        let n = match transport.read(&mut buf) {
235            Ok(n) => n,
236            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
237            Err(e) => return Err(UploadError::Io(e)),
238        };
239        if n > 0 {
240            ft.feed(&buf[..n], Instant::now());
241        }
242        while let Some(evt) = ft.poll() {
243            match evt {
244                FileEvent::Negotiated { compression, .. } => return Ok(compression),
245                FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
246                _ => {}
247            }
248        }
249        ft.tick(Instant::now());
250    }
251    Err(UploadError::Stalled("negotiation did not complete"))
252}
253
254fn drive_until_event<T: Read + Write + ?Sized, F: Fn(&FileEvent) -> bool>(
255    transport: &mut T,
256    ft: &mut FileTransfer<'_>,
257    pred: F,
258) -> Result<(), UploadError> {
259    let mut buf = [0u8; 1024];
260    for _ in 0..200 {
261        while let Some(out) = ft.poll_outbound() {
262            transport.write_all(&out)?;
263        }
264        let n = match transport.read(&mut buf) {
265            Ok(n) => n,
266            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
267            Err(e) => return Err(UploadError::Io(e)),
268        };
269        if n > 0 {
270            ft.feed(&buf[..n], Instant::now());
271        }
272        while let Some(evt) = ft.poll() {
273            if let FileEvent::Failed(err) = &evt {
274                return Err(UploadError::Transfer(err.clone()));
275            }
276            if pred(&evt) {
277                return Ok(());
278            }
279        }
280        ft.tick(Instant::now());
281    }
282    Err(UploadError::Stalled("event did not arrive in time"))
283}