Skip to main content

marlin_binary_transfer/adapters/
blocking.rs

1//! Synchronous adapter — drives the sans-I/O core over a [`Read`] +
2//! [`Write`] transport.
3//!
4//! ```no_run
5//! use std::io::{Read, Write};
6//! use marlin_binary_transfer::adapters::blocking::{upload, UploadOptions};
7//! use marlin_binary_transfer::file_transfer::Compression;
8//!
9//! # fn run<T: Read + Write>(mut port: T) -> Result<(), Box<dyn std::error::Error>> {
10//! let opts = UploadOptions {
11//!     dest_filename: "model.gco".into(),
12//!     compression: Compression::Auto,
13//!     ..UploadOptions::default()
14//! };
15//! let stats = upload(&mut port, std::fs::File::open("model.gco")?, opts)?;
16//! println!("Uploaded {} bytes in {} chunks", stats.bytes_sent, stats.chunks_sent);
17//! # Ok(()) }
18//! ```
19//!
20//! Any `Read + Write` works. With the `serial` feature enabled,
21//! [`serialport::SerialPort`](https://docs.rs/serialport/latest/serialport/trait.SerialPort.html)
22//! satisfies both traits directly; see
23//! [`adapters::serialport`](crate::adapters::serialport) for an `open` helper.
24//!
25//! # Transport requirements
26//!
27//! `transport.read(..)` must return [`std::io::ErrorKind::TimedOut`] within a
28//! bounded interval when no data is available, so [`tick()`](crate::session::Session::tick)
29//! can fire retransmits. A blocking read with no timeout would deadlock the
30//! loop under packet loss. The
31//! [`adapters::serialport::open`](crate::adapters::serialport::open) helper
32//! configures a 100 ms read timeout out of the box; if you're plugging in a
33//! different transport (TCP-to-serial bridge, USB CDC via `rusb`, in-memory
34//! pipe) make sure it has the equivalent behaviour.
35//!
36//! # Lifecycle handled for you
37//!
38//! 1. Send `M28 B1` to enter binary mode.
39//! 2. SYNC handshake.
40//! 3. QUERY → compression negotiation.
41//! 4. OPEN, WRITE × N, CLOSE.
42//! 5. Control CLOSE (proto=0, type=2) so the device exits binary mode and
43//!    the same serial session can accept ASCII g-code again afterwards.
44
45use std::io::{Read, Write};
46use std::time::Instant;
47
48use crate::adapters::common::resolve_chunk_size;
49use crate::file_transfer::{Compression, FileEvent, FileTransfer};
50use crate::session::Session;
51
52pub use crate::adapters::common::{UploadError, UploadOptions, UploadStats};
53
54/// Perform a complete upload: SYNC, QUERY, OPEN, WRITE×N, CLOSE.
55///
56/// On success, `transport` is left synced and idle.
57pub fn upload<T: Read + Write + ?Sized, S: Read>(
58    transport: &mut T,
59    mut src: S,
60    options: UploadOptions,
61) -> Result<UploadStats, UploadError> {
62    // Send the binary-mode trigger as plain ASCII first.
63    transport.write_all(b"M28B1\n")?;
64
65    let mut session = Session::new();
66    let now = Instant::now();
67    session.connect(now);
68
69    drive_session_until_synced(transport, &mut session)?;
70
71    // Capture the device-advertised block size before the FileTransfer
72    // borrow takes the session mutably.
73    let device_max = session.max_block_size().unwrap_or(0);
74
75    let mut ft = FileTransfer::new(&mut session);
76    ft.query(options.compression.clone(), Instant::now());
77    let negotiated = drive_until_negotiated(transport, &mut ft)?;
78
79    ft.open(&options.dest_filename, options.dummy, Instant::now());
80    drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Opened))?;
81
82    let mut stats = UploadStats {
83        compression: negotiated.clone(),
84        ..UploadStats::default()
85    };
86
87    let chunk_size = resolve_chunk_size(options.chunk_size, device_max);
88    // Read whole source into memory once, then either compress or chunk
89    // through it. Mirrors the Python ref's behaviour and keeps the chunk
90    // boundary deterministic.
91    let mut source_bytes = Vec::new();
92    src.read_to_end(&mut source_bytes)?;
93    stats.source_bytes = source_bytes.len() as u64;
94
95    let payload: Vec<u8> = match &negotiated {
96        Compression::None => source_bytes,
97        Compression::Heatshrink { window, lookahead } => {
98            #[cfg(feature = "heatshrink")]
99            {
100                crate::compression::compress(&source_bytes, *window, *lookahead)?
101            }
102            #[cfg(not(feature = "heatshrink"))]
103            {
104                let _ = (window, lookahead);
105                return Err(UploadError::CompressionFeatureDisabled);
106            }
107        }
108        Compression::Auto => unreachable!("FileTransfer resolves Auto during query"),
109    };
110
111    for chunk in payload.chunks(chunk_size) {
112        ft.write(chunk, Instant::now());
113        drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::WriteAcked))?;
114        stats.bytes_sent += chunk.len() as u64;
115        stats.chunks_sent += 1;
116    }
117
118    ft.close(Instant::now());
119    drive_until_event(transport, &mut ft, |e| matches!(e, FileEvent::Closed))?;
120
121    // Drop the FT borrow so we can talk to the session directly to send
122    // the control CLOSE (proto=0, type=2) that drops the device back to
123    // ASCII mode. Without this, the printer remains in binary mode and
124    // subsequent ASCII g-code on the same serial session is ignored.
125    drop(ft);
126    session.send(0, 2, &[], Instant::now());
127    drive_session_until_idle(transport, &mut session)?;
128
129    Ok(stats)
130}
131
132fn drive_session_until_idle<T: Read + Write + ?Sized>(
133    transport: &mut T,
134    session: &mut Session,
135) -> Result<(), UploadError> {
136    use crate::file_transfer::FileError;
137    use crate::session::Event;
138    let mut buf = [0u8; 1024];
139    for _ in 0..200 {
140        while let Some(out) = session.poll_outbound() {
141            transport.write_all(&out)?;
142        }
143        let n = match transport.read(&mut buf) {
144            Ok(n) => n,
145            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
146            Err(e) => return Err(UploadError::Io(e)),
147        };
148        if n > 0 {
149            session.feed(&buf[..n], Instant::now());
150        }
151        while let Some(evt) = session.poll_event() {
152            match evt {
153                Event::Ack(_) => return Ok(()),
154                Event::FatalError => {
155                    return Err(UploadError::Transfer(FileError::SessionFatalError));
156                }
157                Event::Timeout { .. } => {
158                    return Err(UploadError::Transfer(FileError::SessionTimeout));
159                }
160                Event::OutOfSync { expected, got } => {
161                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
162                        expected,
163                        got,
164                    }));
165                }
166                _ => {}
167            }
168        }
169        session.tick(Instant::now());
170    }
171    Err(UploadError::Stalled("control close not acked"))
172}
173
174fn drive_session_until_synced<T: Read + Write + ?Sized>(
175    transport: &mut T,
176    session: &mut Session,
177) -> Result<(), UploadError> {
178    use crate::file_transfer::FileError;
179    use crate::session::Event;
180    let mut buf = [0u8; 1024];
181    for _ in 0..200 {
182        while let Some(out) = session.poll_outbound() {
183            transport.write_all(&out)?;
184        }
185        let n = match transport.read(&mut buf) {
186            Ok(n) => n,
187            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
188            Err(e) => return Err(UploadError::Io(e)),
189        };
190        if n > 0 {
191            session.feed(&buf[..n], Instant::now());
192        }
193        while let Some(evt) = session.poll_event() {
194            match evt {
195                Event::Synced { .. } => return Ok(()),
196                Event::FatalError => {
197                    return Err(UploadError::Transfer(FileError::SessionFatalError));
198                }
199                Event::Timeout { .. } => {
200                    return Err(UploadError::Transfer(FileError::SessionTimeout));
201                }
202                Event::OutOfSync { expected, got } => {
203                    return Err(UploadError::Transfer(FileError::SessionOutOfSync {
204                        expected,
205                        got,
206                    }));
207                }
208                _ => {}
209            }
210        }
211        session.tick(Instant::now());
212    }
213    Err(UploadError::HandshakeFailed)
214}
215
216fn drive_until_negotiated<T: Read + Write + ?Sized>(
217    transport: &mut T,
218    ft: &mut FileTransfer<'_>,
219) -> Result<Compression, UploadError> {
220    let mut buf = [0u8; 1024];
221    for _ in 0..200 {
222        while let Some(out) = ft.poll_outbound() {
223            transport.write_all(&out)?;
224        }
225        let n = match transport.read(&mut buf) {
226            Ok(n) => n,
227            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
228            Err(e) => return Err(UploadError::Io(e)),
229        };
230        if n > 0 {
231            ft.feed(&buf[..n], Instant::now());
232        }
233        while let Some(evt) = ft.poll() {
234            match evt {
235                FileEvent::Negotiated { compression, .. } => return Ok(compression),
236                FileEvent::Failed(err) => return Err(UploadError::Transfer(err)),
237                _ => {}
238            }
239        }
240        ft.tick(Instant::now());
241    }
242    Err(UploadError::Stalled("negotiation did not complete"))
243}
244
245fn drive_until_event<T: Read + Write + ?Sized, F: Fn(&FileEvent) -> bool>(
246    transport: &mut T,
247    ft: &mut FileTransfer<'_>,
248    pred: F,
249) -> Result<(), UploadError> {
250    let mut buf = [0u8; 1024];
251    for _ in 0..200 {
252        while let Some(out) = ft.poll_outbound() {
253            transport.write_all(&out)?;
254        }
255        let n = match transport.read(&mut buf) {
256            Ok(n) => n,
257            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => 0,
258            Err(e) => return Err(UploadError::Io(e)),
259        };
260        if n > 0 {
261            ft.feed(&buf[..n], Instant::now());
262        }
263        while let Some(evt) = ft.poll() {
264            if let FileEvent::Failed(err) = &evt {
265                return Err(UploadError::Transfer(err.clone()));
266            }
267            if pred(&evt) {
268                return Ok(());
269            }
270        }
271        ft.tick(Instant::now());
272    }
273    Err(UploadError::Stalled("event did not arrive in time"))
274}