Skip to main content

marlin_binary_transfer/
file_transfer.rs

1//! Layer 3: file-transfer state machine.
2//!
3//! Drives the protocol-1 sub-protocol (QUERY / OPEN / WRITE / CLOSE / ABORT)
4//! on top of a [`Session`]. Translates the session's raw [`Event`]s into
5//! file-level [`FileEvent`]s so the caller doesn't have to parse the
6//! `PFT:*` tokens themselves.
7//!
8//! # Lifecycle
9//!
10//! ```text
11//! Idle ── query()  ──► AwaitingQueryReply ──► Negotiated
12//!                                              │
13//! Negotiated ── open() ──► AwaitingOpenReply ──┴──► Opened ── close() ──► Closed
14//!                                              │           ── abort() ──► Aborted
15//!                                              └── (busy/fail) ──► Failed
16//! Opened     ── write() ──► AwaitingWriteAck ──► Opened
17//! ```
18//!
//! Compression is negotiated during the QUERY exchange started by
//! [`FileTransfer::query`]: the caller's preference is reconciled with what
//! the device advertises (for example, [`Compression::Auto`] resolves to the
//! device's `heatshrink` parameters when it offers them), and the outcome is
//! exposed via [`FileTransfer::negotiated_compression`]. The adapter modules
//! are responsible for actually compressing chunk bytes before calling
//! [`FileTransfer::write`].
25
26use std::time::Instant;
27
28use thiserror::Error;
29
30use crate::session::{Event, Session};
31
/// Compression mode a caller can ask for when starting a transfer.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum Compression {
    /// Payloads go out raw, with no compression applied.
    #[default]
    None,
    /// Heatshrink with explicit parameters. The window and lookahead
    /// have to agree with the values in the device's QUERY reply;
    /// otherwise the device will decode the transfer into garbage.
    Heatshrink {
        /// Sliding-window size, expressed as log2.
        window: u8,
        /// Lookahead size, expressed as log2.
        lookahead: u8,
    },
    /// Defer to whatever the device advertises during QUERY. Resolves
    /// to [`Compression::None`] when the device only offers `none`.
    Auto,
}
51
52/// Things [`FileTransfer::poll`] emits as the protocol progresses.
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum FileEvent {
55    /// QUERY completed. Carries the device's protocol version and the
56    /// effective compression spec the session will use.
57    Negotiated {
58        /// Device-reported file-transfer protocol version.
59        version: String,
60        /// Effective compression spec to use for this transfer.
61        compression: Compression,
62    },
63    /// OPEN succeeded; the device is ready to receive WRITE packets.
64    Opened,
65    /// A WRITE packet has been acknowledged. One per [`FileTransfer::write`]
66    /// call.
67    WriteAcked,
68    /// CLOSE succeeded; transfer is complete.
69    Closed,
70    /// ABORT acknowledged; transfer cancelled cleanly.
71    AbortAcked,
72    /// Something failed; the transfer cannot continue.
73    Failed(FileError),
74}
75
76/// Reasons a transfer failed.
77#[derive(Debug, Clone, PartialEq, Eq, Error)]
78pub enum FileError {
79    /// Device replied `PFT:busy` to OPEN — another transfer is in
80    /// progress on the device side.
81    #[error("device busy (PFT:busy)")]
82    OpenBusy,
83    /// Device replied `PFT:fail` to OPEN — likely SD card not present
84    /// or write-protected.
85    #[error("device refused open (PFT:fail)")]
86    OpenFail,
87    /// Device replied `PFT:ioerror` to CLOSE.
88    #[error("device storage I/O error (PFT:ioerror)")]
89    IoError,
90    /// Device replied `PFT:invalid` to CLOSE — no file was open.
91    #[error("no open file on device (PFT:invalid)")]
92    NoOpenFile,
93    /// The session emitted a [`Event::Timeout`] while we were waiting
94    /// for a reply.
95    #[error("session timed out waiting for reply")]
96    SessionTimeout,
97    /// The session emitted a [`Event::FatalError`].
98    #[error("session reported fatal error (fe)")]
99    SessionFatalError,
100    /// The session emitted [`Event::OutOfSync`].
101    #[error("session out of sync: expected {expected}, got {got}")]
102    SessionOutOfSync {
103        /// Sync number we expected.
104        expected: u8,
105        /// Sync number the device acked.
106        got: u8,
107    },
108    /// Caller requested `Compression::Heatshrink` but the device only
109    /// advertises `none`.
110    #[error("device does not support heatshrink compression")]
111    CompressionUnsupported,
112    /// Device sent the closing `ok<n>` for a CLOSE or ABORT without
113    /// first emitting the expected `PFT:*` preamble. Without that
114    /// preamble, the transfer can't be classified as success or
115    /// failure, so we surface the violation rather than silently
116    /// claiming completion.
117    #[error("device sent bare ok in {state} without expected {expected} preamble")]
118    ProtocolViolation {
119        /// Symbolic name of the FileTransfer state at the time of the ack.
120        state: &'static str,
121        /// The PFT preamble line(s) the protocol requires before the ack.
122        expected: &'static str,
123    },
124}
125
/// Position of a [`FileTransfer`] within its lifecycle. Private to
/// this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Freshly constructed; `query()` has not been called yet.
    Idle,
    /// QUERY sent; waiting for the version line and its ack.
    AwaitingQueryReply,
    /// QUERY acknowledged; `open()` may be called.
    Negotiated,
    /// OPEN sent; waiting for the device's verdict and its ack.
    AwaitingOpenReply,
    /// File is open and no packet is in flight.
    Opened,
    /// WRITE sent; waiting for its ack.
    AwaitingWriteAck,
    /// CLOSE sent; waiting for the device's verdict and its ack.
    AwaitingCloseReply,
    /// CLOSE acknowledged successfully.
    Closed,
    /// ABORT sent; waiting for the device's confirmation and its ack.
    AwaitingAbortReply,
    /// ABORT acknowledged successfully.
    Aborted,
    /// A failure was reported through `FileEvent::Failed`.
    Failed,
}
141
142/// File-transfer state machine driving a [`Session`].
143#[derive(Debug)]
144pub struct FileTransfer<'a> {
145    session: &'a mut Session,
146    state: State,
147    requested_compression: Compression,
148    negotiated: Option<Compression>,
149    advertised_version: Option<String>,
150    /// Set once the AsciiLine half of a two-part reply has arrived; reset
151    /// after the matching Ack drives the corresponding FileEvent.
152    pending_ascii: Option<PendingAscii>,
153    /// Queue of FileEvents ready to be drained by [`Self::poll`].
154    out_events: std::collections::VecDeque<FileEvent>,
155    /// File-transfer protocol id (constant 1; configurable for tests).
156    protocol_id: u8,
157}
158
159#[derive(Debug)]
160enum PendingAscii {
161    QueryVersion {
162        version: String,
163        compression: Compression,
164    },
165    /// QUERY reply parsed but compression negotiation failed (e.g. caller
166    /// asked for heatshrink and device advertises only `none`). Surfaces
167    /// as `FileEvent::Failed(_)` once the matching `ok<n>` arrives.
168    QueryFailed(FileError),
169    OpenSuccess,
170    OpenBusy,
171    OpenFail,
172    CloseSuccess,
173    CloseIoError,
174    CloseInvalid,
175    AbortSuccess,
176}
177
/// Protocol id used for every file-transfer packet.
const PROTOCOL_FILE_TRANSFER: u8 = 1;

// Packet-type codes within the file-transfer protocol.

/// QUERY packet type.
const PT_QUERY: u8 = 0;
/// OPEN packet type.
const PT_OPEN: u8 = 1;
/// CLOSE packet type.
const PT_CLOSE: u8 = 2;
/// WRITE packet type.
const PT_WRITE: u8 = 3;
/// ABORT packet type.
const PT_ABORT: u8 = 4;
184
185impl<'a> FileTransfer<'a> {
186    /// Build a new file-transfer driver borrowing the given session.
187    /// The session must already have completed the SYNC handshake
188    /// (`session.is_synced() == true`).
189    pub fn new(session: &'a mut Session) -> Self {
190        Self {
191            session,
192            state: State::Idle,
193            requested_compression: Compression::None,
194            negotiated: None,
195            advertised_version: None,
196            pending_ascii: None,
197            out_events: std::collections::VecDeque::new(),
198            protocol_id: PROTOCOL_FILE_TRANSFER,
199        }
200    }
201
202    /// Issue the QUERY control packet. Caller must specify what
203    /// compression mode they want — the actual negotiated mode is
204    /// reported back via [`FileEvent::Negotiated`].
205    ///
206    /// # Panics
207    ///
208    /// Panics unless the state is `Idle` (i.e. this is a fresh
209    /// transfer). Programmer error; callers must construct a new
210    /// `FileTransfer` for retries after a `Failed` / `Closed` /
211    /// `Aborted` outcome.
212    pub fn query(&mut self, compression: Compression, now: Instant) {
213        assert!(
214            matches!(self.state, State::Idle),
215            "query() requires Idle state, found {:?}",
216            self.state
217        );
218        self.requested_compression = compression;
219        self.state = State::AwaitingQueryReply;
220        self.session.send(self.protocol_id, PT_QUERY, &[], now);
221    }
222
223    /// Send the OPEN packet. `name` is the destination filename on the
224    /// SD card. `dummy` requests the device pretend to receive a file
225    /// without actually writing it (used for protocol smoke tests).
226    ///
227    /// # Panics
228    ///
229    /// Panics unless QUERY has completed (`Negotiated` state). Without
230    /// QUERY, the compression byte in the OPEN payload would be a
231    /// guess.
232    pub fn open(&mut self, name: &str, dummy: bool, now: Instant) {
233        assert!(
234            matches!(self.state, State::Negotiated),
235            "open() requires Negotiated state (call query() first), found {:?}",
236            self.state
237        );
238        let comp_byte = match self.negotiated {
239            Some(Compression::Heatshrink { .. }) => 1u8,
240            _ => 0u8,
241        };
242        let mut payload = Vec::with_capacity(2 + name.len() + 1);
243        payload.push(if dummy { 1 } else { 0 });
244        payload.push(comp_byte);
245        payload.extend_from_slice(name.as_bytes());
246        payload.push(0);
247        self.state = State::AwaitingOpenReply;
248        self.session.send(self.protocol_id, PT_OPEN, &payload, now);
249    }
250
251    /// Send a WRITE packet with the given chunk. The caller is
252    /// responsible for splitting the source data so each chunk fits
253    /// inside the device's `max_block_size` (and, if compression is in
254    /// use, for compressing each chunk before passing it here).
255    ///
256    /// # Panics
257    ///
258    /// Panics unless the file is open and no WRITE is in flight (state
259    /// `Opened`). Callers must pump until [`FileEvent::WriteAcked`]
260    /// before issuing the next [`write`](Self::write). This mirrors
261    /// the Python reference's one-packet-in-flight policy and keeps
262    /// the state machine unambiguous — pipelining is out of scope for
263    /// 0.1.
264    pub fn write(&mut self, chunk: &[u8], now: Instant) {
265        assert!(
266            matches!(self.state, State::Opened),
267            "write() requires Opened state (pump until WriteAcked between writes), found {:?}",
268            self.state
269        );
270        self.state = State::AwaitingWriteAck;
271        self.session.send(self.protocol_id, PT_WRITE, chunk, now);
272    }
273
274    /// Send the CLOSE packet, finalising the transfer.
275    ///
276    /// # Panics
277    ///
278    /// Panics unless the file is open and no WRITE is in flight
279    /// (state `Opened`).
280    pub fn close(&mut self, now: Instant) {
281        assert!(
282            matches!(self.state, State::Opened),
283            "close() requires Opened state, found {:?}",
284            self.state
285        );
286        self.state = State::AwaitingCloseReply;
287        self.session.send(self.protocol_id, PT_CLOSE, &[], now);
288    }
289
290    /// Send the ABORT packet, cancelling the transfer.
291    ///
292    /// # Panics
293    ///
294    /// Panics unless the file is open and no WRITE is in flight
295    /// (state `Opened`). Aborting before OPEN completes makes no
296    /// protocol sense — there's nothing for the device to abort.
297    pub fn abort(&mut self, now: Instant) {
298        assert!(
299            matches!(self.state, State::Opened),
300            "abort() requires Opened state, found {:?}",
301            self.state
302        );
303        self.state = State::AwaitingAbortReply;
304        self.session.send(self.protocol_id, PT_ABORT, &[], now);
305    }
306
307    /// Compression mode negotiated during QUERY, if any.
308    pub fn negotiated_compression(&self) -> Option<&Compression> {
309        self.negotiated.as_ref()
310    }
311
312    /// Drain bytes the caller should write to the wire (delegates to the
313    /// underlying session).
314    pub fn poll_outbound(&mut self) -> Option<Vec<u8>> {
315        self.session.poll_outbound()
316    }
317
318    /// Push received bytes into the underlying session.
319    ///
320    /// `now` is forwarded into [`Session::feed`](crate::session::Session::feed)
321    /// so any packet dispatched as a side effect of an inbound ack gets
322    /// a real timestamp.
323    pub fn feed(&mut self, bytes: &[u8], now: Instant) {
324        self.session.feed(bytes, now);
325    }
326
327    /// Drive retransmit/timeout logic in the underlying session.
328    pub fn tick(&mut self, now: Instant) {
329        self.session.tick(now);
330    }
331
332    /// Per-attempt response timeout of the underlying session — used by
333    /// adapters to bound inbound reads.
334    pub fn response_timeout(&self) -> std::time::Duration {
335        self.session.response_timeout()
336    }
337
338    /// Pull the next file-level event, processing whatever session events
339    /// have accumulated. Returns `None` when there is nothing pending.
340    pub fn poll(&mut self) -> Option<FileEvent> {
341        // Drain session events until we have at least one FileEvent ready.
342        while let Some(event) = self.session.poll_event() {
343            self.handle_session_event(event);
344        }
345        self.out_events.pop_front()
346    }
347
348    fn handle_session_event(&mut self, event: Event) {
349        match event {
350            Event::AsciiLine(line) => self.handle_ascii_line(line),
351            Event::Ack(_) => self.handle_ack(),
352            Event::Synced { .. } => {
353                // Synced may arrive on the session prior to FileTransfer
354                // being constructed. Treat post-construction as benign
355                // passthrough.
356            }
357            Event::ResendRequested(_) => {
358                // The session re-sends on its own; nothing for us to do.
359            }
360            Event::FatalError => self.fail(FileError::SessionFatalError),
361            Event::OutOfSync { expected, got } => {
362                self.fail(FileError::SessionOutOfSync { expected, got })
363            }
364            Event::Timeout { .. } => self.fail(FileError::SessionTimeout),
365        }
366    }
367
368    fn handle_ascii_line(&mut self, line: String) {
369        if let Some(rest) = line.strip_prefix("PFT:version:") {
370            // Format: <version>:<compression-spec>
371            //   compression-spec is "none" or "heatshrink,<window>,<lookahead>"
372            if !matches!(self.state, State::AwaitingQueryReply) {
373                return;
374            }
375            let (version, comp) = match rest.split_once(':') {
376                Some(parts) => parts,
377                None => return,
378            };
379            self.advertised_version = Some(version.to_string());
380            let device_compression = parse_compression_spec(comp);
381            self.pending_ascii = Some(match self.choose_compression(&device_compression) {
382                Ok(chosen) => PendingAscii::QueryVersion {
383                    version: version.to_string(),
384                    compression: chosen,
385                },
386                Err(err) => PendingAscii::QueryFailed(err),
387            });
388            return;
389        }
390        match line.as_str() {
391            "PFT:success" => {
392                self.pending_ascii = Some(match self.state {
393                    State::AwaitingOpenReply => PendingAscii::OpenSuccess,
394                    State::AwaitingCloseReply => PendingAscii::CloseSuccess,
395                    State::AwaitingAbortReply => PendingAscii::AbortSuccess,
396                    _ => return, // unsolicited; ignore
397                });
398            }
399            "PFT:busy" => {
400                if matches!(self.state, State::AwaitingOpenReply) {
401                    self.pending_ascii = Some(PendingAscii::OpenBusy);
402                }
403            }
404            "PFT:fail" => {
405                if matches!(self.state, State::AwaitingOpenReply) {
406                    self.pending_ascii = Some(PendingAscii::OpenFail);
407                }
408            }
409            "PFT:ioerror" => {
410                if matches!(self.state, State::AwaitingCloseReply) {
411                    self.pending_ascii = Some(PendingAscii::CloseIoError);
412                }
413            }
414            "PFT:invalid" => {
415                if matches!(self.state, State::AwaitingCloseReply) {
416                    self.pending_ascii = Some(PendingAscii::CloseInvalid);
417                }
418            }
419            _ => {
420                // Unknown line — ignore. Marlin emits chatter we don't
421                // care about during binary mode.
422            }
423        }
424    }
425
426    fn handle_ack(&mut self) {
427        let pending = self.pending_ascii.take();
428
429        match (self.state, pending) {
430            (
431                State::AwaitingQueryReply,
432                Some(PendingAscii::QueryVersion {
433                    version,
434                    compression,
435                }),
436            ) => {
437                self.negotiated = Some(compression.clone());
438                self.state = State::Negotiated;
439                self.out_events.push_back(FileEvent::Negotiated {
440                    version,
441                    compression,
442                });
443            }
444            (State::AwaitingQueryReply, Some(PendingAscii::QueryFailed(err))) => {
445                self.fail(err);
446            }
447            (State::AwaitingQueryReply, None) => {
448                self.fail(FileError::ProtocolViolation {
449                    state: "AwaitingQueryReply",
450                    expected: "PFT:version:<v>:<compression-spec>",
451                });
452            }
453            (State::AwaitingOpenReply, Some(PendingAscii::OpenSuccess)) => {
454                self.state = State::Opened;
455                self.out_events.push_back(FileEvent::Opened);
456            }
457            (State::AwaitingOpenReply, Some(PendingAscii::OpenBusy)) => {
458                self.state = State::Failed;
459                self.out_events
460                    .push_back(FileEvent::Failed(FileError::OpenBusy));
461            }
462            (State::AwaitingOpenReply, Some(PendingAscii::OpenFail)) => {
463                self.state = State::Failed;
464                self.out_events
465                    .push_back(FileEvent::Failed(FileError::OpenFail));
466            }
467            (State::AwaitingOpenReply, None) => {
468                self.fail(FileError::ProtocolViolation {
469                    state: "AwaitingOpenReply",
470                    expected: "PFT:success | PFT:busy | PFT:fail",
471                });
472            }
473            (State::AwaitingWriteAck, None) => {
474                self.state = State::Opened;
475                self.out_events.push_back(FileEvent::WriteAcked);
476            }
477            (State::AwaitingWriteAck, Some(_)) => {
478                // The PFT setters are state-gated, so reaching this arm
479                // means the device sent something we couldn't interpret
480                // (or we have a logic bug). Surface it loudly rather than
481                // silently leaving the FSM in AwaitingWriteAck.
482                self.fail(FileError::ProtocolViolation {
483                    state: "AwaitingWriteAck",
484                    expected: "bare ok<n> (no PFT preamble)",
485                });
486            }
487            (State::AwaitingCloseReply, Some(PendingAscii::CloseSuccess)) => {
488                self.state = State::Closed;
489                self.out_events.push_back(FileEvent::Closed);
490            }
491            (State::AwaitingCloseReply, Some(PendingAscii::CloseIoError)) => {
492                self.state = State::Failed;
493                self.out_events
494                    .push_back(FileEvent::Failed(FileError::IoError));
495            }
496            (State::AwaitingCloseReply, Some(PendingAscii::CloseInvalid)) => {
497                self.state = State::Failed;
498                self.out_events
499                    .push_back(FileEvent::Failed(FileError::NoOpenFile));
500            }
501            (State::AwaitingCloseReply, None) => {
502                self.fail(FileError::ProtocolViolation {
503                    state: "AwaitingCloseReply",
504                    expected: "PFT:success | PFT:ioerror | PFT:invalid",
505                });
506            }
507            (State::AwaitingAbortReply, Some(PendingAscii::AbortSuccess)) => {
508                self.state = State::Aborted;
509                self.out_events.push_back(FileEvent::AbortAcked);
510            }
511            (State::AwaitingAbortReply, None) => {
512                self.fail(FileError::ProtocolViolation {
513                    state: "AwaitingAbortReply",
514                    expected: "PFT:success",
515                });
516            }
517            _ => {
518                // Ack arrived in a state where we don't have a pending
519                // ASCII line to interpret. Most often this is a file
520                // open or close that we've already finalized; ignore.
521            }
522        }
523    }
524
525    /// Resolve the negotiated compression mode given what the caller
526    /// requested and what the device just advertised.
527    ///
528    /// Returns `Err(CompressionUnsupported)` when the caller explicitly
529    /// asked for heatshrink but the device only advertises `none` —
530    /// proceeding would corrupt the upload on the device side.
531    ///
532    /// Caller-supplied window/lookahead override the device-advertised
533    /// values when both sides speak heatshrink. Per the
534    /// [`Compression::Heatshrink`] doc-comment, matching parameters is
535    /// the caller's responsibility.
536    fn choose_compression(&self, device: &Compression) -> Result<Compression, FileError> {
537        match (&self.requested_compression, device) {
538            (Compression::None, _) => Ok(Compression::None),
539            (Compression::Heatshrink { window, lookahead }, Compression::Heatshrink { .. }) => {
540                Ok(Compression::Heatshrink {
541                    window: *window,
542                    lookahead: *lookahead,
543                })
544            }
545            (Compression::Heatshrink { .. }, _) => Err(FileError::CompressionUnsupported),
546            (Compression::Auto, Compression::Heatshrink { window, lookahead }) => {
547                Ok(Compression::Heatshrink {
548                    window: *window,
549                    lookahead: *lookahead,
550                })
551            }
552            (Compression::Auto, _) => Ok(Compression::None),
553        }
554    }
555
556    fn fail(&mut self, err: FileError) {
557        self.state = State::Failed;
558        self.out_events.push_back(FileEvent::Failed(err));
559    }
560}
561
562fn parse_compression_spec(spec: &str) -> Compression {
563    let spec = spec.trim();
564    if spec == "none" {
565        return Compression::None;
566    }
567    if let Some(rest) = spec.strip_prefix("heatshrink,") {
568        let mut parts = rest.split(',');
569        let window: Option<u8> = parts.next().and_then(|s| s.trim().parse().ok());
570        let lookahead: Option<u8> = parts.next().and_then(|s| s.trim().parse().ok());
571        if let (Some(window), Some(lookahead)) = (window, lookahead) {
572            return Compression::Heatshrink { window, lookahead };
573        }
574    }
575    // Unknown spec — be conservative and report None so we don't claim
576    // compression that the device may not actually understand.
577    Compression::None
578}