marlin_binary_transfer/file_transfer.rs
1//! Layer 3: file-transfer state machine.
2//!
3//! Drives the protocol-1 sub-protocol (QUERY / OPEN / WRITE / CLOSE / ABORT)
4//! on top of a [`Session`]. Translates the session's raw [`Event`]s into
5//! file-level [`FileEvent`]s so the caller doesn't have to parse the
6//! `PFT:*` tokens themselves.
7//!
8//! # Lifecycle
9//!
10//! ```text
//! Idle ── query() ──► AwaitingQueryReply ──► Negotiated
//!
//! Negotiated ── open() ──► AwaitingOpenReply ──┬──► Opened ──┬── close() ──► Closed
//!                                              │             └── abort() ──► Aborted
//!                                              └── (busy/fail) ──► Failed
//!
//! Opened ── write() ──► AwaitingWriteAck ──► Opened
17//! ```
18//!
//! Compression is negotiated as part of the QUERY exchange started by
//! [`FileTransfer::query`]: if the caller passed [`Compression::Auto`] and
//! the device advertises `heatshrink`, the device's parameters are adopted.
//! Once [`FileEvent::Negotiated`] has been emitted, the effective mode is
//! available via [`FileTransfer::negotiated_compression`]. The adapter
//! modules are responsible for actually compressing chunk bytes before
//! calling [`FileTransfer::write`].
25
26use std::time::Instant;
27
28use thiserror::Error;
29
30use crate::session::{Event, Session};
31
/// How chunk payloads are encoded on the wire — either as requested by
/// the caller or as settled during QUERY negotiation.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum Compression {
    /// No compression: payloads are sent as-is. This is the `Default`.
    #[default]
    None,
    /// Heatshrink compression with explicit parameters. The window and
    /// lookahead have to agree with the values the device reports in its
    /// QUERY reply; a mismatch corrupts the file on the device side.
    Heatshrink {
        /// Sliding-window size, given as its base-2 logarithm.
        window: u8,
        /// Lookahead size, given as its base-2 logarithm.
        lookahead: u8,
    },
    /// Adopt whatever the device advertises during QUERY, falling back
    /// to [`Compression::None`] when it only offers `none`.
    Auto,
}
51
52/// Things [`FileTransfer::poll`] emits as the protocol progresses.
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum FileEvent {
55 /// QUERY completed. Carries the device's protocol version and the
56 /// effective compression spec the session will use.
57 Negotiated {
58 /// Device-reported file-transfer protocol version.
59 version: String,
60 /// Effective compression spec to use for this transfer.
61 compression: Compression,
62 },
63 /// OPEN succeeded; the device is ready to receive WRITE packets.
64 Opened,
65 /// A WRITE packet has been acknowledged. One per [`FileTransfer::write`]
66 /// call.
67 WriteAcked,
68 /// CLOSE succeeded; transfer is complete.
69 Closed,
70 /// ABORT acknowledged; transfer cancelled cleanly.
71 AbortAcked,
72 /// Something failed; the transfer cannot continue.
73 Failed(FileError),
74}
75
76/// Reasons a transfer failed.
77#[derive(Debug, Clone, PartialEq, Eq, Error)]
78pub enum FileError {
79 /// Device replied `PFT:busy` to OPEN — another transfer is in
80 /// progress on the device side.
81 #[error("device busy (PFT:busy)")]
82 OpenBusy,
83 /// Device replied `PFT:fail` to OPEN — likely SD card not present
84 /// or write-protected.
85 #[error("device refused open (PFT:fail)")]
86 OpenFail,
87 /// Device replied `PFT:ioerror` to CLOSE.
88 #[error("device storage I/O error (PFT:ioerror)")]
89 IoError,
90 /// Device replied `PFT:invalid` to CLOSE — no file was open.
91 #[error("no open file on device (PFT:invalid)")]
92 NoOpenFile,
93 /// The session emitted a [`Event::Timeout`] while we were waiting
94 /// for a reply.
95 #[error("session timed out waiting for reply")]
96 SessionTimeout,
97 /// The session emitted a [`Event::FatalError`].
98 #[error("session reported fatal error (fe)")]
99 SessionFatalError,
100 /// The session emitted [`Event::OutOfSync`].
101 #[error("session out of sync: expected {expected}, got {got}")]
102 SessionOutOfSync {
103 /// Sync number we expected.
104 expected: u8,
105 /// Sync number the device acked.
106 got: u8,
107 },
108 /// Caller requested `Compression::Heatshrink` but the device only
109 /// advertises `none`.
110 #[error("device does not support heatshrink compression")]
111 CompressionUnsupported,
112 /// Device sent the closing `ok<n>` for a CLOSE or ABORT without
113 /// first emitting the expected `PFT:*` preamble. Without that
114 /// preamble, the transfer can't be classified as success or
115 /// failure, so we surface the violation rather than silently
116 /// claiming completion.
117 #[error("device sent bare ok in {state} without expected {expected} preamble")]
118 ProtocolViolation {
119 /// Symbolic name of the FileTransfer state at the time of the ack.
120 state: &'static str,
121 /// The PFT preamble line(s) the protocol requires before the ack.
122 expected: &'static str,
123 },
124}
125
/// Position of the driver inside the transfer lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Freshly constructed; only `query()` is accepted.
    Idle,
    /// QUERY sent; waiting for the version line and its ack.
    AwaitingQueryReply,
    /// QUERY acknowledged; `open()` is accepted.
    Negotiated,
    /// OPEN sent; waiting for the device's verdict and its ack.
    AwaitingOpenReply,
    /// File is open and nothing is in flight; `write()`, `close()` and
    /// `abort()` are accepted.
    Opened,
    /// WRITE sent; waiting for its ack.
    AwaitingWriteAck,
    /// CLOSE sent; waiting for the device's verdict and its ack.
    AwaitingCloseReply,
    /// CLOSE succeeded.
    Closed,
    /// ABORT sent; waiting for the device's confirmation and its ack.
    AwaitingAbortReply,
    /// ABORT was acknowledged.
    Aborted,
    /// An error was reported; none of the request methods accept this
    /// state.
    Failed,
}
141
142/// File-transfer state machine driving a [`Session`].
143#[derive(Debug)]
144pub struct FileTransfer<'a> {
145 session: &'a mut Session,
146 state: State,
147 requested_compression: Compression,
148 negotiated: Option<Compression>,
149 advertised_version: Option<String>,
150 /// Set once the AsciiLine half of a two-part reply has arrived; reset
151 /// after the matching Ack drives the corresponding FileEvent.
152 pending_ascii: Option<PendingAscii>,
153 /// Queue of FileEvents ready to be drained by [`Self::poll`].
154 out_events: std::collections::VecDeque<FileEvent>,
155 /// File-transfer protocol id (constant 1; configurable for tests).
156 protocol_id: u8,
157}
158
159#[derive(Debug)]
160enum PendingAscii {
161 QueryVersion {
162 version: String,
163 compression: Compression,
164 },
165 /// QUERY reply parsed but compression negotiation failed (e.g. caller
166 /// asked for heatshrink and device advertises only `none`). Surfaces
167 /// as `FileEvent::Failed(_)` once the matching `ok<n>` arrives.
168 QueryFailed(FileError),
169 OpenSuccess,
170 OpenBusy,
171 OpenFail,
172 CloseSuccess,
173 CloseIoError,
174 CloseInvalid,
175 AbortSuccess,
176}
177
/// Protocol id used for every file-transfer packet.
const PROTOCOL_FILE_TRANSFER: u8 = 1;

// Packet-type ids within the file-transfer protocol.

/// Packet type sent by `query()`.
const PT_QUERY: u8 = 0;
/// Packet type sent by `open()`.
const PT_OPEN: u8 = 1;
/// Packet type sent by `close()`.
const PT_CLOSE: u8 = 2;
/// Packet type sent by `write()`.
const PT_WRITE: u8 = 3;
/// Packet type sent by `abort()`.
const PT_ABORT: u8 = 4;
184
185impl<'a> FileTransfer<'a> {
186 /// Build a new file-transfer driver borrowing the given session.
187 /// The session must already have completed the SYNC handshake
188 /// (`session.is_synced() == true`).
189 pub fn new(session: &'a mut Session) -> Self {
190 Self {
191 session,
192 state: State::Idle,
193 requested_compression: Compression::None,
194 negotiated: None,
195 advertised_version: None,
196 pending_ascii: None,
197 out_events: std::collections::VecDeque::new(),
198 protocol_id: PROTOCOL_FILE_TRANSFER,
199 }
200 }
201
202 /// Issue the QUERY control packet. Caller must specify what
203 /// compression mode they want — the actual negotiated mode is
204 /// reported back via [`FileEvent::Negotiated`].
205 ///
206 /// # Panics
207 ///
208 /// Panics unless the state is `Idle` (i.e. this is a fresh
209 /// transfer). Programmer error; callers must construct a new
210 /// `FileTransfer` for retries after a `Failed` / `Closed` /
211 /// `Aborted` outcome.
212 pub fn query(&mut self, compression: Compression, now: Instant) {
213 assert!(
214 matches!(self.state, State::Idle),
215 "query() requires Idle state, found {:?}",
216 self.state
217 );
218 self.requested_compression = compression;
219 self.state = State::AwaitingQueryReply;
220 self.session.send(self.protocol_id, PT_QUERY, &[], now);
221 }
222
223 /// Send the OPEN packet. `name` is the destination filename on the
224 /// SD card. `dummy` requests the device pretend to receive a file
225 /// without actually writing it (used for protocol smoke tests).
226 ///
227 /// # Panics
228 ///
229 /// Panics unless QUERY has completed (`Negotiated` state). Without
230 /// QUERY, the compression byte in the OPEN payload would be a
231 /// guess.
232 pub fn open(&mut self, name: &str, dummy: bool, now: Instant) {
233 assert!(
234 matches!(self.state, State::Negotiated),
235 "open() requires Negotiated state (call query() first), found {:?}",
236 self.state
237 );
238 let comp_byte = match self.negotiated {
239 Some(Compression::Heatshrink { .. }) => 1u8,
240 _ => 0u8,
241 };
242 let mut payload = Vec::with_capacity(2 + name.len() + 1);
243 payload.push(if dummy { 1 } else { 0 });
244 payload.push(comp_byte);
245 payload.extend_from_slice(name.as_bytes());
246 payload.push(0);
247 self.state = State::AwaitingOpenReply;
248 self.session.send(self.protocol_id, PT_OPEN, &payload, now);
249 }
250
251 /// Send a WRITE packet with the given chunk. The caller is
252 /// responsible for splitting the source data so each chunk fits
253 /// inside the device's `max_block_size` (and, if compression is in
254 /// use, for compressing each chunk before passing it here).
255 ///
256 /// # Panics
257 ///
258 /// Panics unless the file is open and no WRITE is in flight (state
259 /// `Opened`). Callers must pump until [`FileEvent::WriteAcked`]
260 /// before issuing the next [`write`](Self::write). This mirrors
261 /// the Python reference's one-packet-in-flight policy and keeps
262 /// the state machine unambiguous — pipelining is out of scope for
263 /// 0.1.
264 pub fn write(&mut self, chunk: &[u8], now: Instant) {
265 assert!(
266 matches!(self.state, State::Opened),
267 "write() requires Opened state (pump until WriteAcked between writes), found {:?}",
268 self.state
269 );
270 self.state = State::AwaitingWriteAck;
271 self.session.send(self.protocol_id, PT_WRITE, chunk, now);
272 }
273
274 /// Send the CLOSE packet, finalising the transfer.
275 ///
276 /// # Panics
277 ///
278 /// Panics unless the file is open and no WRITE is in flight
279 /// (state `Opened`).
280 pub fn close(&mut self, now: Instant) {
281 assert!(
282 matches!(self.state, State::Opened),
283 "close() requires Opened state, found {:?}",
284 self.state
285 );
286 self.state = State::AwaitingCloseReply;
287 self.session.send(self.protocol_id, PT_CLOSE, &[], now);
288 }
289
290 /// Send the ABORT packet, cancelling the transfer.
291 ///
292 /// # Panics
293 ///
294 /// Panics unless the file is open and no WRITE is in flight
295 /// (state `Opened`). Aborting before OPEN completes makes no
296 /// protocol sense — there's nothing for the device to abort.
297 pub fn abort(&mut self, now: Instant) {
298 assert!(
299 matches!(self.state, State::Opened),
300 "abort() requires Opened state, found {:?}",
301 self.state
302 );
303 self.state = State::AwaitingAbortReply;
304 self.session.send(self.protocol_id, PT_ABORT, &[], now);
305 }
306
307 /// Compression mode negotiated during QUERY, if any.
308 pub fn negotiated_compression(&self) -> Option<&Compression> {
309 self.negotiated.as_ref()
310 }
311
312 /// Drain bytes the caller should write to the wire (delegates to the
313 /// underlying session).
314 pub fn poll_outbound(&mut self) -> Option<Vec<u8>> {
315 self.session.poll_outbound()
316 }
317
318 /// Push received bytes into the underlying session.
319 ///
320 /// `now` is forwarded into [`Session::feed`](crate::session::Session::feed)
321 /// so any packet dispatched as a side effect of an inbound ack gets
322 /// a real timestamp.
323 pub fn feed(&mut self, bytes: &[u8], now: Instant) {
324 self.session.feed(bytes, now);
325 }
326
327 /// Drive retransmit/timeout logic in the underlying session.
328 pub fn tick(&mut self, now: Instant) {
329 self.session.tick(now);
330 }
331
332 /// Per-attempt response timeout of the underlying session — used by
333 /// adapters to bound inbound reads.
334 pub fn response_timeout(&self) -> std::time::Duration {
335 self.session.response_timeout()
336 }
337
338 /// Pull the next file-level event, processing whatever session events
339 /// have accumulated. Returns `None` when there is nothing pending.
340 pub fn poll(&mut self) -> Option<FileEvent> {
341 // Drain session events until we have at least one FileEvent ready.
342 while let Some(event) = self.session.poll_event() {
343 self.handle_session_event(event);
344 }
345 self.out_events.pop_front()
346 }
347
348 fn handle_session_event(&mut self, event: Event) {
349 match event {
350 Event::AsciiLine(line) => self.handle_ascii_line(line),
351 Event::Ack(_) => self.handle_ack(),
352 Event::Synced { .. } => {
353 // Synced may arrive on the session prior to FileTransfer
354 // being constructed. Treat post-construction as benign
355 // passthrough.
356 }
357 Event::ResendRequested(_) => {
358 // The session re-sends on its own; nothing for us to do.
359 }
360 Event::FatalError => self.fail(FileError::SessionFatalError),
361 Event::OutOfSync { expected, got } => {
362 self.fail(FileError::SessionOutOfSync { expected, got })
363 }
364 Event::Timeout { .. } => self.fail(FileError::SessionTimeout),
365 }
366 }
367
368 fn handle_ascii_line(&mut self, line: String) {
369 if let Some(rest) = line.strip_prefix("PFT:version:") {
370 // Format: <version>:<compression-spec>
371 // compression-spec is "none" or "heatshrink,<window>,<lookahead>"
372 if !matches!(self.state, State::AwaitingQueryReply) {
373 return;
374 }
375 let (version, comp) = match rest.split_once(':') {
376 Some(parts) => parts,
377 None => return,
378 };
379 self.advertised_version = Some(version.to_string());
380 let device_compression = parse_compression_spec(comp);
381 self.pending_ascii = Some(match self.choose_compression(&device_compression) {
382 Ok(chosen) => PendingAscii::QueryVersion {
383 version: version.to_string(),
384 compression: chosen,
385 },
386 Err(err) => PendingAscii::QueryFailed(err),
387 });
388 return;
389 }
390 match line.as_str() {
391 "PFT:success" => {
392 self.pending_ascii = Some(match self.state {
393 State::AwaitingOpenReply => PendingAscii::OpenSuccess,
394 State::AwaitingCloseReply => PendingAscii::CloseSuccess,
395 State::AwaitingAbortReply => PendingAscii::AbortSuccess,
396 _ => return, // unsolicited; ignore
397 });
398 }
399 "PFT:busy" => {
400 if matches!(self.state, State::AwaitingOpenReply) {
401 self.pending_ascii = Some(PendingAscii::OpenBusy);
402 }
403 }
404 "PFT:fail" => {
405 if matches!(self.state, State::AwaitingOpenReply) {
406 self.pending_ascii = Some(PendingAscii::OpenFail);
407 }
408 }
409 "PFT:ioerror" => {
410 if matches!(self.state, State::AwaitingCloseReply) {
411 self.pending_ascii = Some(PendingAscii::CloseIoError);
412 }
413 }
414 "PFT:invalid" => {
415 if matches!(self.state, State::AwaitingCloseReply) {
416 self.pending_ascii = Some(PendingAscii::CloseInvalid);
417 }
418 }
419 _ => {
420 // Unknown line — ignore. Marlin emits chatter we don't
421 // care about during binary mode.
422 }
423 }
424 }
425
426 fn handle_ack(&mut self) {
427 let pending = self.pending_ascii.take();
428
429 match (self.state, pending) {
430 (
431 State::AwaitingQueryReply,
432 Some(PendingAscii::QueryVersion {
433 version,
434 compression,
435 }),
436 ) => {
437 self.negotiated = Some(compression.clone());
438 self.state = State::Negotiated;
439 self.out_events.push_back(FileEvent::Negotiated {
440 version,
441 compression,
442 });
443 }
444 (State::AwaitingQueryReply, Some(PendingAscii::QueryFailed(err))) => {
445 self.fail(err);
446 }
447 (State::AwaitingQueryReply, None) => {
448 self.fail(FileError::ProtocolViolation {
449 state: "AwaitingQueryReply",
450 expected: "PFT:version:<v>:<compression-spec>",
451 });
452 }
453 (State::AwaitingOpenReply, Some(PendingAscii::OpenSuccess)) => {
454 self.state = State::Opened;
455 self.out_events.push_back(FileEvent::Opened);
456 }
457 (State::AwaitingOpenReply, Some(PendingAscii::OpenBusy)) => {
458 self.state = State::Failed;
459 self.out_events
460 .push_back(FileEvent::Failed(FileError::OpenBusy));
461 }
462 (State::AwaitingOpenReply, Some(PendingAscii::OpenFail)) => {
463 self.state = State::Failed;
464 self.out_events
465 .push_back(FileEvent::Failed(FileError::OpenFail));
466 }
467 (State::AwaitingOpenReply, None) => {
468 self.fail(FileError::ProtocolViolation {
469 state: "AwaitingOpenReply",
470 expected: "PFT:success | PFT:busy | PFT:fail",
471 });
472 }
473 (State::AwaitingWriteAck, None) => {
474 self.state = State::Opened;
475 self.out_events.push_back(FileEvent::WriteAcked);
476 }
477 (State::AwaitingWriteAck, Some(_)) => {
478 // The PFT setters are state-gated, so reaching this arm
479 // means the device sent something we couldn't interpret
480 // (or we have a logic bug). Surface it loudly rather than
481 // silently leaving the FSM in AwaitingWriteAck.
482 self.fail(FileError::ProtocolViolation {
483 state: "AwaitingWriteAck",
484 expected: "bare ok<n> (no PFT preamble)",
485 });
486 }
487 (State::AwaitingCloseReply, Some(PendingAscii::CloseSuccess)) => {
488 self.state = State::Closed;
489 self.out_events.push_back(FileEvent::Closed);
490 }
491 (State::AwaitingCloseReply, Some(PendingAscii::CloseIoError)) => {
492 self.state = State::Failed;
493 self.out_events
494 .push_back(FileEvent::Failed(FileError::IoError));
495 }
496 (State::AwaitingCloseReply, Some(PendingAscii::CloseInvalid)) => {
497 self.state = State::Failed;
498 self.out_events
499 .push_back(FileEvent::Failed(FileError::NoOpenFile));
500 }
501 (State::AwaitingCloseReply, None) => {
502 self.fail(FileError::ProtocolViolation {
503 state: "AwaitingCloseReply",
504 expected: "PFT:success | PFT:ioerror | PFT:invalid",
505 });
506 }
507 (State::AwaitingAbortReply, Some(PendingAscii::AbortSuccess)) => {
508 self.state = State::Aborted;
509 self.out_events.push_back(FileEvent::AbortAcked);
510 }
511 (State::AwaitingAbortReply, None) => {
512 self.fail(FileError::ProtocolViolation {
513 state: "AwaitingAbortReply",
514 expected: "PFT:success",
515 });
516 }
517 _ => {
518 // Ack arrived in a state where we don't have a pending
519 // ASCII line to interpret. Most often this is a file
520 // open or close that we've already finalized; ignore.
521 }
522 }
523 }
524
525 /// Resolve the negotiated compression mode given what the caller
526 /// requested and what the device just advertised.
527 ///
528 /// Returns `Err(CompressionUnsupported)` when the caller explicitly
529 /// asked for heatshrink but the device only advertises `none` —
530 /// proceeding would corrupt the upload on the device side.
531 ///
532 /// Caller-supplied window/lookahead override the device-advertised
533 /// values when both sides speak heatshrink. Per the
534 /// [`Compression::Heatshrink`] doc-comment, matching parameters is
535 /// the caller's responsibility.
536 fn choose_compression(&self, device: &Compression) -> Result<Compression, FileError> {
537 match (&self.requested_compression, device) {
538 (Compression::None, _) => Ok(Compression::None),
539 (Compression::Heatshrink { window, lookahead }, Compression::Heatshrink { .. }) => {
540 Ok(Compression::Heatshrink {
541 window: *window,
542 lookahead: *lookahead,
543 })
544 }
545 (Compression::Heatshrink { .. }, _) => Err(FileError::CompressionUnsupported),
546 (Compression::Auto, Compression::Heatshrink { window, lookahead }) => {
547 Ok(Compression::Heatshrink {
548 window: *window,
549 lookahead: *lookahead,
550 })
551 }
552 (Compression::Auto, _) => Ok(Compression::None),
553 }
554 }
555
556 fn fail(&mut self, err: FileError) {
557 self.state = State::Failed;
558 self.out_events.push_back(FileEvent::Failed(err));
559 }
560}
561
562fn parse_compression_spec(spec: &str) -> Compression {
563 let spec = spec.trim();
564 if spec == "none" {
565 return Compression::None;
566 }
567 if let Some(rest) = spec.strip_prefix("heatshrink,") {
568 let mut parts = rest.split(',');
569 let window: Option<u8> = parts.next().and_then(|s| s.trim().parse().ok());
570 let lookahead: Option<u8> = parts.next().and_then(|s| s.trim().parse().ok());
571 if let (Some(window), Some(lookahead)) = (window, lookahead) {
572 return Compression::Heatshrink { window, lookahead };
573 }
574 }
575 // Unknown spec — be conservative and report None so we don't claim
576 // compression that the device may not actually understand.
577 Compression::None
578}