Skip to main content

pk_command/
lib.rs

1//! # PK Command
2//!
3//! A lightweight, reliable data transfer protocol designed for constrained channels
4//! (e.g., HID, Serial) between a **Host** and an **Embedded Device**.
5//!
6//! ## Key Features
7//! - **Reliability**: Built-in ACK/Retransmission mechanism.
8//! - **Efficiency**: Fixed-length headers and data slicing for small MTUs.
9//! - **Flexibility**: Supports variable access (GET/SET) and remote method invocation (INVOK).
10//! - **no_std Support**: Core logic is compatible with embedded systems without an OS. **Note that `alloc` is still used in no_std environments.**
11//! - **Wait Mechanism**: Keep-alive `AWAIT` packets for long-running operations.
12//!
13//! ## Architecture
14//! The protocol operates using **Transaction Chains**. A chain starts with a `START`
15//! command and ends with `ENDTR`. Large payloads are automatically sliced into
16//! `SDATA` packets.
17//!
18//! ## Command Format
19//! Every command follows the fixed layout:
20//!
21//! `[MSG ID][OPERATION NAME] [OBJECT] [DATA]`.
22//!
23//! - `MSG ID`: A base-94 encoded unsigned integer (using ASCII characters from '!' to '~') that uniquely identifies the command within a transaction.
24//! - `OPERATION NAME`: A 5-character ASCII string representing the command type. All the operations defined in the specification are defined in this library. (See [`types::Operation`] for details.)
25//! - `OBJECT`: An optional 5-character ASCII string that provides additional context for the operation (e.g., variable name, method name).
26//! - `DATA`: An optional binary payload that carries parameters or return values. It can be of arbitrary length and may contain any byte values.
27//!
28//! See [`Command`] for a structured representation of commands and utilities for parsing and serialization.
29//!
30//! Note that:
31//! - `OBJECT` is either omitted, or **exactly 5 ASCII characters**.
32//! - `DATA` (payload) is **arbitrary binary data**.
33//! - The total length of a command is limited by the transportation layer's MTU (e.g., 64 bytes for HID). (See [`PkCommandConfig`].)
34//!   But the protocol handles slicing and reassembly of large payloads automatically, so you can work with large data without worrying about the underlying transport constraints.
35//!
36//! ## Example
37//! ```no_run
38//! use pk_command::{PkCommand, PkCommandConfig, PkHashmapMethod, PkHashmapVariable};
39//!
40//! // 1. Setup configuration and accessors
41//! let config = PkCommandConfig::default(64);
42//! let vars = PkHashmapVariable::new(vec![]);
43//! let methods = PkHashmapMethod::new(vec![]);
44//!
45//! // 2. Initialize the state machine
46//! let pk = PkCommand::<_, _, std::time::Instant>::new(config, vars, methods);
47//! # let transport=pk_command::doc_util::Transport::new();
48//!
49//! // 3. Basic loop driving the protocol
50//! loop {
51//!     // Handle received bytes from your transport (HID/Serial/etc.)
52//!     if let Some(bytes) = transport.recv() {
53//!         pk.incoming_command(bytes);
54//!     }
55//!
56//!     // Process and get commands to send back
57//!     if let Some(cmd) = pk.poll() {
58//!         transport.send(cmd.to_bytes());
59//!     }
60//!
61//!     if pk.is_complete() {
62//!         break;
63//!     }
64//! }
65//! ```
66//!
67//! # Feature flags
68//! - `std`: Enables features that require the Rust standard library. (Mainly the convenient wrappers like [`PkPromise`], [`PkHashmapVariable`], [`PkHashmapMethod`]) **Enabled by default.**
69//! - `embassy`: Enables integration with the [Embassy](https://embassy.dev/) async framework. Flags below are also enabled when this is active:
70//!   - `embassy-time`: Enables the support for [embassy-time](https://crates.io/crates/embassy-time) crate, which provides timekeeping utilities for embedded environments.
71//!   - `embassy-runtime`: Enables the support for [embassy-executor](https://crates.io/crates/embassy-executor) crate, which provides integration between the main state machine and Embassy async tasks.
72//! - `tokio-runtime`: Enables integration with the [Tokio](https://tokio.rs/) async runtime. Provides [`tokio_adapter`] for running async operations within method implementations. Requires `std` feature.
73//! - `smol-runtime`: Enables integration with the [Smol](https://github.com/smol-rs/smol) async executor. Provides [`smol_adapter`] for running async operations within method implementations. Requires `std` feature.
74
75#![warn(missing_docs)]
76#![cfg_attr(not(feature = "std"), no_std)]
77#![cfg_attr(docsrs, feature(doc_cfg))]
78
79const PK_VERSION: &str = env!("CARGO_PKG_VERSION");
80
81// Compile-time guard: async runtime adapters require `std` feature.
82#[cfg(all(
83    any(feature = "tokio-runtime", feature = "smol-runtime"),
84    not(feature = "std")
85))]
86compile_error!("Enabling 'tokio-runtime' or 'smol-runtime' requires the 'std' feature.");
87
88#[cfg(not(feature = "std"))]
89extern crate alloc;
90#[cfg(not(feature = "std"))]
91use alloc::{
92    boxed::Box,
93    string::{String, ToString},
94    vec,
95    vec::Vec,
96};
97
98#[cfg(not(feature = "std"))]
99extern crate core as std;
100
101// The items below (not gated behind the "std" feature)
102// are re-exported by `std` crate from `core`,
103// so just simply renaming `core` as `std` should work
104use std::cell::{Cell, RefCell};
105use std::ops::Add;
106use std::pin::Pin;
107use std::task::Poll;
108use std::time::Duration;
109
110/// Core data structures and types for PK Command.
111pub mod types;
112use types::{Command, Operation, Role, Stage, Status};
113
114/// Utilities used in examples.
115#[doc(hidden)]
116#[cfg(feature = "doc")]
117pub mod doc_util;
118
119mod util;
120#[cfg_attr(docsrs, doc(cfg(feature = "embassy-runtime")))]
121#[cfg(feature = "embassy-runtime")]
122pub use util::async_adapters::embassy as embassy_adapter;
123#[cfg_attr(docsrs, doc(cfg(all(feature = "std", feature = "smol-runtime"))))]
124#[cfg(all(feature = "std", feature = "smol-runtime"))]
125pub use util::async_adapters::smol as smol_adapter;
126#[cfg(all(feature = "std", feature = "tokio-runtime"))]
127#[cfg_attr(docsrs, doc(cfg(all(feature = "std", feature = "tokio-runtime"))))]
128pub use util::async_adapters::tokio as tokio_adapter;
129#[cfg(feature = "std")]
130#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
131pub use util::{PkHashmapMethod, PkHashmapVariable, PkPromise, msg_id};
132
133/// Trait defining how to access (get/set) variables by their string key.
134///
135/// This allows the [`PkCommand`] state machine to be generic over the actual variable storage.
136/// In `std` environments, a convenient implementation using [`std::collections::HashMap`] is provided as [`PkHashmapVariable`].
137/// In `no_std` environments, you must provide your own implementation.
138///
139/// # Example
140/// ```
141/// use pk_command::PkVariableAccessor;
142///
143/// struct MyVariableStore;
144/// impl PkVariableAccessor for MyVariableStore {
145///     fn get(&self, key: String) -> Option<Vec<u8>> {
146///         if key == "VERSION" {
147///             Some(b"1.0.0".to_vec())
148///         } else {
149///             None
150///         }
151///     }
152///     fn set(&self, key: String, value: Vec<u8>) -> Result<(), String> {
153///         // Logic to store the value
154///         Ok(())
155///     }
156/// }
157/// ```
158pub trait PkVariableAccessor {
159    /// Retrieves the value of a variable.
160    ///
161    /// # Arguments
162    /// * `key`: The name of the variable to retrieve.
163    ///
164    /// # Returns
165    /// `Some(Vec<u8>)` containing the variable's data if found, or `None` otherwise.
166    fn get(&self, key: String) -> Option<Vec<u8>>;
167
168    /// Sets the value of a variable.
169    ///
170    /// # Arguments
171    /// * `key`: The name of the variable to set.
172    /// * `value`: The new data for the variable.
173    ///
174    /// # Returns
175    /// `Ok(())` if successful, or an `Err(String)` describing the error.
176    fn set(&self, key: String, value: Vec<u8>) -> Result<(), String>;
177}
178
179/// A handle for a long-running operation that can be polled for completion.
180///
181/// This is used primarily by the `INVOK` operation. Since PK Command is designed
182/// for poll-based environments (like in embedded systems, or within an async runtime),
183/// methods that take time to execute should return a [`Pollable`].
184///
185/// The state machine will call [`poll()`](crate::Pollable::poll) periodically and send `AWAIT` packets
186/// to the host to keep the transaction alive as long as [`Poll::Pending`] is returned.
187pub trait Pollable {
188    /// Polls the operation for completion.
189    ///
190    /// # Returns
191    /// - `Poll::Ready(Ok(Some(data)))`: Operation finished with result data.
192    /// - `Poll::Ready(Ok(None))`: Operation finished successfully with no data.
193    /// - `Poll::Ready(Err(e))`: Operation failed with an error message.
194    /// - `Poll::Pending`: Operation is still in progress.
195    fn poll(&self) -> std::task::Poll<Result<Option<Vec<u8>>, String>>;
196}
197
198/// Trait defining how to invoke methods by their string key.
199///
200/// This allows the [`PkCommand`] state machine to call arbitrary logic on the device.
201/// In `std` environments, a convenient implementation using [`std::collections::HashMap`] is provided as [`PkHashmapMethod`].
202/// In `no_std` environments, you must provide your own implementation.
203///
204/// # Example
205/// ```
206/// use pk_command::{PkMethodAccessor, Pollable};
207/// use std::pin::Pin;
208/// use std::task::Poll;
209///
210/// struct MyMethod;
211/// impl Pollable for MyMethod {
212///     fn poll(&self) -> Poll<Result<Option<Vec<u8>>, String>> {
213///         Poll::Ready(Ok(Some(b"Hello from PK!".to_vec())))
214///     }
215/// }
216///
217/// struct MyMethodStore;
218/// impl PkMethodAccessor for MyMethodStore {
219///     fn call(&self, key: String, param: Vec<u8>) -> Result<Pin<Box<dyn Pollable>>, String> {
220///         if key == "GREET" {
221///             Ok(Box::pin(MyMethod))
222///         } else {
223///             Err("Method not found".to_string())
224///         }
225///     }
226/// }
227/// ```
228pub trait PkMethodAccessor {
229    /// Calls a method with the given parameters.
230    ///
231    /// # Arguments
232    /// * `key`: The name of the method to call.
233    /// * `param`: The parameters for the method, as a byte vector.
234    ///
235    /// # Returns
236    /// A `Result` containing a pinned, boxed `Pollable` that will resolve to the method's output,
237    /// or an `Err(String)` if the method call cannot be initiated.
238    fn call(&self, key: String, param: Vec<u8>) -> Result<Pin<Box<dyn Pollable>>, String>;
239}
240
241/// Trait representing an instant in time.
242///
243/// This trait abstracts over the [`std::time::Instant`] to support `no_std` environments
244/// where a custom timer implementation might be needed. In `std` environments, using [`std::time::Instant`] is recommended.
245///
246/// # Note
247///
248/// If you want to provide a custom implementation to be used in [`PkCommand`], besides implementing this trait,
249/// you also need to ensure that `Add<Duration, Output = Instant>`, `PartialOrd`, and `Copy` are implemented for your type.
250///
251/// # Tips
252/// If you are using Embassy on your embedded device, you can use the provided [`EmbassyInstant`] adapter, which wraps [`embassy_time::Instant`] and implements the necessary traits for compatibility with [`PkCommand`].
253pub trait PkInstant
254where
255    Self: Sized,
256{
257    /// Returns the current instant.
258    fn now() -> Self;
259    /// Returns the duration elapsed since this instant.
260    fn elapsed(&self) -> Duration;
261}
262
263#[cfg(feature = "std")]
264impl PkInstant for std::time::Instant {
265    fn now() -> Self {
266        std::time::Instant::now()
267    }
268
269    fn elapsed(&self) -> Duration {
270        std::time::Instant::elapsed(self)
271    }
272}
273
274/// A [`PkInstant`] adapter for [`embassy_time::Instant`].
275///
276///
277///
278/// This type bridges the PK Command timing abstraction with the
279/// [`embassy-time`](https://crates.io/crates/embassy-time) crate, enabling timeout handling in `no_std`/embedded
280/// environments that use Embassy.
281///
282/// # Availability
283/// Enabled when the `embassy` feature is active (it also enables `embassy-time`).
284///
285/// # Why this wrapper exists
286/// [`embassy_time::Instant`] is tied to [`embassy_time::Duration`], while
287/// PK Command uses [`core::time::Duration`]. These durations can be converted,
288/// but their types are not directly compatible with the `PkCommand` signature,
289/// so a newtype adapter keeps the API consistent without large generic changes.
290///
291/// # Notes
292/// - Implements [`Add<Duration>`] with millisecond precision.
293/// - [`elapsed()`](crate::EmbassyInstant::elapsed) guards against clock rollback by returning `0` if the
294///   current instant is earlier than the stored instant.
295///
296/// # Example
297///
298/// ```
299/// use pk_command::{
300///     EmbassyInstant, PkCommand, PkCommandConfig, PkHashmapMethod, PkHashmapVariable,
301/// };
302///
303/// // embassy_timer::Instant must be constructed within an Embassy context
304/// // otherwise the code would not compile.
305/// #[embassy_executor::task]
306/// async fn example() {
307///     let pk = PkCommand::<_, _, EmbassyInstant>::new(
308///         PkCommandConfig::default(64),
309///         PkHashmapVariable::new(vec![]),
310///         PkHashmapMethod::new(vec![]),
311///     );
312/// }
313/// ```
314///
315/// # See also
316///
317/// To use PK Command within an Embassy context, you might also want to have a
318/// look at [`embassy_adapter`].
319///
320#[cfg(feature = "embassy-time")]
321#[cfg_attr(docsrs, doc(cfg(feature = "embassy-time")))]
322#[derive(Clone, Copy, PartialOrd, PartialEq, Eq, Ord)]
323pub struct EmbassyInstant(embassy_time::Instant);
324
325#[cfg(feature = "embassy-time")]
326impl EmbassyInstant {
327    fn into_inner(self) -> embassy_time::Instant {
328        self.0
329    }
330}
331
332#[cfg(feature = "embassy-time")]
333impl Add<Duration> for EmbassyInstant {
334    type Output = EmbassyInstant;
335
336    fn add(self, rhs: Duration) -> Self::Output {
337        EmbassyInstant(self.0 + embassy_time::Duration::from_millis(rhs.as_millis() as u64))
338    }
339}
340
341#[cfg(feature = "embassy-time")]
342impl From<embassy_time::Instant> for EmbassyInstant {
343    fn from(inst: embassy_time::Instant) -> Self {
344        EmbassyInstant(inst)
345    }
346}
347
348#[cfg(feature = "embassy-time")]
349impl PkInstant for EmbassyInstant {
350    fn now() -> Self {
351        embassy_time::Instant::now().into()
352    }
353
354    fn elapsed(&self) -> core::time::Duration {
355        let now = embassy_time::Instant::now();
356        if now >= self.into_inner() {
357            (now - self.into_inner()).into()
358        } else {
359            // This case can happen if the system clock was adjusted backwards.
360            core::time::Duration::from_secs(0)
361        }
362    }
363}
364
365/// Configuration for the [`PkCommand`] state machine.
366///
367/// Use this struct to define timeout durations and packet size limits according to your
368/// transport layer's constraints (e.g., HID, Serial, etc.).
369#[derive(Clone)]
370pub struct PkCommandConfig {
371    /// Timeout duration for waiting for an `ACKNO` command. Default is 100ms.
372    ack_timeout: Duration,
373    /// Timeout duration for waiting for the next command in a sequence. Default is 500ms.
374    inter_command_timeout: Duration,
375    /// Interval at which the Device sends `AWAIT` keep-alive commands. Default is 300ms.
376    await_interval: Duration,
377    /// The maximum length of a single command packet (in bytes), including headers.
378    packet_limit: u64,
379    /// The version string of the package.
380    pk_version: &'static str,
381}
382
383impl PkCommandConfig {
384    /// Creates a [`PkCommandConfig`] with default (as recommended in the specification file) timeout values.
385    ///
386    /// # Default timeouts
387    /// - ACK timeout: 100ms
388    /// - Inter command timeout: 500ms
389    /// - `AWAIT` interval: 300ms
390    ///
391    /// # Arguments
392    /// * `packet_limit`: The maximum packet size (MTU) of the underlying transport (e.g., 64 for HID).
393    ///
394    /// # Returns
395    /// A [`PkCommandConfig`] instance with default timeouts and the specified packet limit.
396    ///
397    /// # Note
398    /// This is **not** an implementation of [`Default`] trait because `packet_limit` must be specified.
399    pub fn default(packet_limit: u64) -> Self {
400        PkCommandConfig {
401            ack_timeout: Duration::from_millis(100),
402            inter_command_timeout: Duration::from_millis(500),
403            await_interval: Duration::from_millis(300),
404            packet_limit,
405            pk_version: PK_VERSION,
406        }
407    }
408
409    /// Creates a new [`PkCommandConfig`] with custom timing and packet limit.
410    ///
411    /// # Arguments
412    /// * `ack_timeout`: Timeout for ACKs in milliseconds.
413    /// * `inter_command_timeout`: Timeout between commands in milliseconds.
414    /// * `await_interval`: Interval for sending `AWAIT` keep-alives in milliseconds.
415    /// * `packet_limit`: Maximum length of a single packet in bytes.
416    ///
417    /// # Note
418    /// To avoid undesirable behavior, you should ensure that the timeout values on both sides (Host and Device) are exactly the same.
419    pub fn new(
420        ack_timeout: u64,
421        inter_command_timeout: u64,
422        await_interval: u64,
423        packet_limit: u64,
424    ) -> Self {
425        PkCommandConfig {
426            ack_timeout: Duration::from_millis(ack_timeout),
427            inter_command_timeout: Duration::from_millis(inter_command_timeout),
428            await_interval: Duration::from_millis(await_interval),
429            packet_limit,
430            pk_version: PK_VERSION,
431        }
432    }
433}
434
435/// The main state machine for handling the PK Command protocol.
436///
437/// It manages the lifecycle of a transaction, including:
438/// - Parsing incoming raw bytes into commands.
439/// - Generating response commands (ACKs, data slices, etc.).
440/// - Handling timeouts and retransmissions.
441/// - Managing data slicing for large transfers.
442///
443/// This struct is generic over:
444/// - `VA`: A [`PkVariableAccessor`] for variable storage.
445/// - `MA`: A [`PkMethodAccessor`] for method invocation.
446/// - `Instant`: A [`PkInstant`] for time tracking (allowing for `no_std` timer implementations). Typically [`std::time::Instant`] in `std` environments, or [`EmbassyInstant`] in Embassy environments.
447///
448/// # Usage Pattern
449/// 1. Feed received data into [`incoming_command()`](crate::PkCommand::incoming_command).
450/// 2. Regularly call [`poll()`](crate::PkCommand::poll) to progress the state machine and check for commands to send.
451/// 3. If [`poll()`](crate::PkCommand::poll) returns `Some(Command)`, serialize it with [`to_bytes()`](crate::types::Command::to_bytes) and send it over your transport.
452///
453/// # Host vs Device
454///
455/// They are not actual device types, but rather roles in a transaction.
456///
457/// - **Host** is the one who calls [`perform()`](crate::PkCommand::perform) to initiate a transaction (e.g., `SENDV`, `INVOK`).
458/// - **Device** is the one who reacts against the transaction and automatically responds to incoming root commands using the provided accessors.
459///
460/// # Example
461/// ```no_run
462/// use pk_command::{PkCommand, PkCommandConfig, PkHashmapMethod, PkHashmapVariable};
463///
464/// let config = PkCommandConfig::default(64);
465/// let vars = PkHashmapVariable::new(vec![]);
466/// let methods = PkHashmapMethod::new(vec![]);
467/// let pk = PkCommand::<_, _, std::time::Instant>::new(config, vars, methods);
468/// # let transport = pk_command::doc_util::Transport::new();
469/// loop {
470///     // 1. Receive data from transport...
471///     if let Some(received_bytes) = transport.recv() {
472///         pk.incoming_command(received_bytes);
473///     }
474///
475///     // 2. Drive the state machine
476///     if let Some(cmd) = pk.poll() {
477///         let bytes = cmd.to_bytes();
478///         transport.send(bytes);
479///     }
480///     std::thread::sleep(std::time::Duration::from_millis(10));
481/// }
482/// ```
483pub struct PkCommand<VA, MA, Instant>
484where
485    VA: PkVariableAccessor,
486    MA: PkMethodAccessor,
487    Instant: PkInstant + Add<Duration, Output = Instant> + PartialOrd + Copy,
488{
489    stage: Cell<Stage>,
490    status: Cell<Status>,
491    role: Cell<Role>,
492    last_sent_command: RefCell<Command>,
493    last_sent_msg_id: Cell<u16>,
494    last_received_msg_id: Cell<u16>,
495    data_param: RefCell<Vec<u8>>,
496    data_return: RefCell<Vec<u8>>,
497    sending_data_progress: Cell<u64>,
498    root_operation: Cell<Operation>,
499    root_object: RefCell<Option<String>>,
500    command_buffer: RefCell<Command>,
501    command_processed: Cell<bool>,
502    last_command_time: Cell<Instant>,
503    device_op_pending: Cell<bool>,
504    device_await_deadline: Cell<Option<Instant>>,
505    config: PkCommandConfig,
506    variable_accessor: VA,
507    method_accessor: MA,
508    pending_pollable: RefCell<Option<Pin<Box<dyn Pollable>>>>,
509    device_should_return: Cell<bool>, // 设备是否“收到了 QUERY 但还没有返回值”
510}
511
512impl<
513    VA: PkVariableAccessor,
514    MA: PkMethodAccessor,
515    Instant: PkInstant + Add<Duration, Output = Instant> + PartialOrd + Copy,
516> PkCommand<VA, MA, Instant>
517{
518    /// Ingests a raw command received from the other party.
519    ///
520    /// This should be called whenever new bytes arrive on your transport layer. The
521    /// state machine will parse the bytes and update its internal buffers for the
522    /// next [`poll()`](crate::PkCommand::poll) cycle.
523    ///
524    /// # Arguments
525    /// * `command_bytes`: The raw bytes of the received command.
526    ///
527    /// # Returns
528    /// `Ok(())` if the command was successfully parsed and buffered.
529    /// `Err(&'static str)` if parsing failed (e.g., invalid format, unknown operation).
530    pub fn incoming_command(&self, command_bytes: Vec<u8>) -> Result<(), &'static str> {
531        match Command::parse(&command_bytes) {
532            // Pass as slice
533            Ok(parsed_command) => {
534                self.command_buffer.replace(parsed_command);
535                self.command_processed.set(false);
536                self.last_command_time.replace(Instant::now());
537                Ok(())
538            }
539            Err(e) => Err(e),
540        }
541    }
542
543    /// Slices a chunk of data from internal buffers for multipart transfer.
544    ///
545    /// This is an internal utility used during `SDATA` phases.
546    fn slice_data(&self, role: Role) -> Result<(Vec<u8>, bool), &'static str> {
547        // 如果 Role 是 Device 则默认在发送返回值,反之亦然
548        match role {
549            Role::Device => {
550                let data = self.data_return.borrow();
551                if data.is_empty() {
552                    return Err("No return data to slice.");
553                }
554                let start = self.sending_data_progress.get() as usize;
555                let end =
556                    std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
557                let is_last_packet = end == data.len();
558                self.sending_data_progress.set(end as u64);
559                Ok((data[start..end].to_vec(), is_last_packet))
560            }
561            Role::Host => {
562                let data = self.data_param.borrow();
563                if data.is_empty() {
564                    return Err("No parameter data to slice.");
565                }
566                let start = self.sending_data_progress.get() as usize;
567                let end =
568                    std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
569                let is_last_packet = end == data.len();
570                self.sending_data_progress.set(end as u64);
571                Ok((data[start..end].to_vec(), is_last_packet))
572            }
573            Role::Idle => Err("Cannot slice data in Idle role."),
574        }
575    }
576
577    /// Polls the state machine for progress and pending actions.
578    ///
579    /// See [`PkCommand`] for more details.
580    ///
581    /// This method must be called frequently in your main loop. It handles:
582    /// 1. **Processing**: Consuming commands received via `incoming_command`.
583    /// 2. **Execution**: Running device-side logic (variable access, method polling).
584    /// 3. **Protocol Flow**: Automatically generating ACKs, data slices, and ENDTRs.
585    /// 4. **Reliability**: Handling timeouts and retransmitting lost packets.
586    ///
587    /// # Returns
588    ///
589    /// - `Some(Command)`: A command that needs to be sent to the peer.
590    ///   Serialize it with [`to_bytes()`](crate::types::Command::to_bytes) and transmit it.
591    /// - `None`: No action required at this time.
592    pub fn poll(&self) -> Option<Command> {
593        let next_msg_id_for_send = || util::msg_id::increment(self.last_received_msg_id.get());
594        let send = move |command: Command| -> Option<Command> {
595            self.last_command_time.set(Instant::now());
596            self.last_sent_msg_id.set(command.msg_id);
597            self.last_sent_command.replace(command.clone());
598            // 因为 ACK 的函数并没有嵌套调用这个,所以
599            self.status.set(Status::AwaitingAck);
600            Some(command)
601        };
602        let reset_transaction_state = || {
603            self.stage.set(Stage::Idle);
604            self.status.set(Status::Other);
605            self.role.set(Role::Idle);
606            // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
607            self.data_param.replace(vec![]);
608            self.data_return.replace(vec![]);
609            self.sending_data_progress.set(0);
610            self.device_op_pending.set(false);
611            self.device_await_deadline.set(None);
612            self.pending_pollable.replace(None); //确保清理
613            self.device_should_return.set(false);
614        };
615        let ack = move |msg_id: u16, operation: Operation| -> Option<Command> {
616            self.last_command_time.set(Instant::now());
617            Some(Command {
618                msg_id,
619                operation: Operation::Acknowledge,
620                object: Some(operation.to_name().to_string()),
621                data: None,
622            })
623        };
624        let err = |msg: &'static str| -> Option<Command> {
625            // 在收到 ERROR 或 ACKNO ERROR 后,状态数据清零
626            // 这个逻辑在下面处理 所以这里就不写了
627            self.status.set(Status::AwaitingErrAck);
628            let command = Command {
629                msg_id: 0,
630                operation: Operation::Error,
631                object: Some(String::from("ERROR")),
632                data: Some(msg.as_bytes().to_vec()),
633            };
634            self.last_command_time.set(Instant::now());
635            self.last_sent_msg_id.set(command.msg_id);
636            self.last_sent_command.replace(command.clone());
637            Some(command)
638        };
639        // 首先检查是否有新的指令进入 command buffer
640        match self.command_processed.get() {
641            true => {
642                // Idle 则忽略当前 poll
643                if self.stage.get() == Stage::Idle {
644                    return None;
645                }
646                if self.stage.get() == Stage::Started
647                    && self.role.get() == Role::Host
648                    && self.status.get() != Status::AwaitingAck
649                {
650                    return send(Command {
651                        msg_id: next_msg_id_for_send(),
652                        operation: Operation::Start,
653                        object: None,
654                        data: None,
655                    });
656                }
657                // 当设备有挂起的 INVOK 操作并且处于响应阶段时,轮询 Pollable
658                if self.role.get() == Role::Device
659                    && self.device_op_pending.get()
660                    && self.stage.get() == Stage::SendingResponse
661                {
662                    // 如果正在等待 AWAIT 的 ACK,则不轮询主 Pollable, ACK 超时机制处理 AWAIT 的重传
663                    if self.status.get() == Status::AwaitingAck {
664                        // Timeout for AWAIT's ACK will be handled by the generic timeout logic below.
665                    } else if self.status.get() == Status::AwaitingErrAck {
666                        // This state is unlikely if a device operation is pending normally.
667                        // Consider if an error should be raised or state reset.
668                    } else {
669                        // Status::Other, ready to poll the main INVOK pollable
670                        let mut pollable_store = self.pending_pollable.borrow_mut();
671
672                        if let Some(pinned_pollable) = pollable_store.as_mut() {
673                            match pinned_pollable.as_mut().poll() {
674                                Poll::Ready(result) => {
675                                    pollable_store.take(); // Remove completed pollable
676                                    self.device_op_pending.set(false);
677                                    self.device_await_deadline.set(None);
678
679                                    match result {
680                                        Ok(data_opt) => {
681                                            self.data_return.replace(data_opt.unwrap_or_default());
682                                            // Stage is already SendingResponse.
683                                            self.sending_data_progress.set(0); // Reset for sending return data.
684
685                                            let rturn_object_name =
686                                                if self.data_return.borrow().is_empty() {
687                                                    String::from("EMPTY")
688                                                } else {
689                                                    Operation::Invoke.to_name().to_string()
690                                                };
691                                            return send(Command {
692                                                msg_id: next_msg_id_for_send(),
693                                                operation: Operation::Return,
694                                                object: Some(rturn_object_name),
695                                                data: None,
696                                            });
697                                        }
698                                        Err(_) => {
699                                            reset_transaction_state();
700                                            return err("INVOK operation failed");
701                                        }
702                                    }
703                                }
704                                Poll::Pending => {
705                                    if Instant::now()
706                                        >= self
707                                            .device_await_deadline
708                                            .get()
709                                            .unwrap_or(Instant::now())
710                                    {
711                                        self.device_await_deadline
712                                            .set(Some(Instant::now() + self.config.await_interval));
713                                        return send(Command {
714                                            msg_id: next_msg_id_for_send(),
715                                            operation: Operation::Await,
716                                            object: None,
717                                            data: None,
718                                        });
719                                    }
720                                }
721                            }
722                        } else {
723                            // device_op_pending is true, but no pollable.
724                            reset_transaction_state();
725                            return err("Internal: Device op pending but no pollable.");
726                        }
727                    }
728                } // 结束 device_op_pending && Stage::SendingResponse 的处理
729                if self.device_should_return.get() {
730                    self.sending_data_progress.set(0); // 重置发送进度
731                    self.device_should_return.set(false);
732                    // 这时候的状态应该是收到了 QUERY,还没有发送返回值
733                    match self.root_operation.get() {
734                        Operation::GetVersion => {
735                            return send(Command {
736                                msg_id: next_msg_id_for_send(),
737                                operation: Operation::Return,
738                                object: Some(self.root_operation.get().to_name().to_string()),
739                                data: None,
740                            });
741                        }
742                        Operation::RequireVariable => {
743                            if self.data_return.borrow().is_empty() {
744                                return send(Command {
745                                    msg_id: next_msg_id_for_send(),
746                                    operation: Operation::Return,
747                                    object: Some(String::from("EMPTY")),
748                                    data: None,
749                                });
750                            }
751                            return send(Command {
752                                msg_id: next_msg_id_for_send(),
753                                operation: Operation::Return,
754                                object: Some(self.root_operation.get().to_name().to_string()),
755                                data: None,
756                            });
757                        }
758                        Operation::SendVariable => {
759                            // SENDV doesn't return data in the RTURN command itself.
760                            // The result of the set operation is implicitly acknowledged by the ENDTR ACK.
761                            // If there was an error during set, it would be handled by the error path.
762                            // We still send RTURN EMPTY to signal the end of the Device's processing phase.
763                            self.data_return.replace(vec![]); // Ensure data_return is empty
764                            return send(Command {
765                                msg_id: next_msg_id_for_send(),
766                                operation: Operation::Return,
767                                object: Some(Operation::Empty.to_name().to_string()),
768                                data: None,
769                            });
770                        }
771                        Operation::Invoke => {
772                            // 忽略,因为 Invoke 的返回在上面轮询 Pollable 时处理
773                        }
774                        _ => {
775                            panic!("Not a root operation");
776                        }
777                    }
778                }
779
780                // 获取当前时间来比较超时
781                let elapsed_ms = self.last_command_time.get().elapsed();
782                match self.status.get() {
783                    Status::AwaitingAck | Status::AwaitingErrAck => {
784                        // 等待 ACK 时则检查 ACK 超时来确认是否重传
785                        if elapsed_ms >= self.config.ack_timeout {
786                            return Some(self.last_sent_command.borrow().clone());
787                        }
788                    }
789                    _ => {
790                        // 仅当不在 Idle 状态且没有挂起的设备操作时检查指令间超时
791                        if self.stage.get() != Stage::Idle
792                            && !self.device_op_pending.get()
793                            && elapsed_ms >= self.config.inter_command_timeout
794                        {
795                            reset_transaction_state(); // 在发送错误前重置状态
796                            return err("Operation timed out");
797                        }
798                    }
799                }
800            }
801            // 缓冲区内有新的指令
802            false => {
803                self.command_processed.set(true);
804                self.last_received_msg_id
805                    .set(self.command_buffer.borrow().msg_id); // Store received msg_id
806                let recv = self.command_buffer.borrow();
807                // 首先处理 Error 这种不被 Stage 描述的特殊情况
808                if recv.operation == Operation::Error {
809                    reset_transaction_state();
810                    return ack(0, Operation::Error);
811                } else if self.status.get() == Status::AwaitingErrAck {
812                    if recv.operation == Operation::Acknowledge
813                        && Some(String::from("ERROR")) == recv.object
814                    {
815                        self.status.set(Status::Other);
816                        self.root_operation.set(Operation::Empty);
817                        self.stage.set(Stage::Idle);
818                        self.role.set(Role::Idle);
819                        return None;
820                    } else {
821                        return err("Should be ACKNO ERROR");
822                    }
823                }
824                match self.stage.get() {
825                    Stage::Idle => {
826                        // 在 Idle 状态下只能收到 START,且自身为 Device
827                        if recv.operation != Operation::Start {
828                            return err("not in a chain");
829                        }
830                        self.role.set(Role::Device);
831                        self.stage.set(Stage::Started);
832                        self.status.set(Status::Other); // Awaiting root command from Host
833                        return ack(recv.msg_id, recv.operation);
834                    }
835                    Stage::Started => {
836                        // 在 Started 状态下,根据当前角色不同,预期的行为应该是
837                        // - Host -> 接收到 ACK,指示当前的根操作
838                        // - Device -> 接收到根操作,进行 ACK
839                        match self.role.get() {
840                            Role::Host => {
841                                if recv.operation == Operation::Acknowledge {
842                                    self.status.set(Status::Other);
843                                    self.stage.set(Stage::RootOperationAssigned);
844                                    return send(Command {
845                                        msg_id: next_msg_id_for_send(),
846                                        operation: self.root_operation.get(),
847                                        object: self.root_object.borrow().clone(),
848                                        data: None,
849                                    });
850                                }
851                            }
852                            Role::Device => {
853                                if recv.operation.is_root() {
854                                    self.root_operation.set(recv.operation);
855                                    // Validate if object is present for ops that require it
856                                    if (recv.operation == Operation::RequireVariable
857                                        || recv.operation == Operation::SendVariable
858                                        || recv.operation == Operation::Invoke)
859                                        && recv.object.is_none()
860                                    {
861                                        reset_transaction_state();
862                                        return err(
863                                            "Operation requires an object but none was provided.",
864                                        );
865                                    }
866                                    self.root_object.replace(recv.object.clone());
867                                    self.stage.set(Stage::RootOperationAssigned);
868                                    return ack(recv.msg_id, recv.operation);
869                                } else {
870                                    return err("not a root operation");
871                                }
872                            }
873                            _ => {
874                                // 考虑代码问题,因为 Stage 已经是 Started 了,Role 不可能是 Idle
875                                panic!("Role cannot be Idle if Stage is Started")
876                            }
877                        }
878                    }
879                    Stage::RootOperationAssigned => {
880                        /* Host -> 接收到 ACK,**开始**传输数据。也就是说参数的*第一段*或 EMPTY 指令
881                          Device -> 接收到 EMPTY 或数据的第一段
882                        */
883                        match self.role.get() {
884                            Role::Host => {
885                                if recv.operation == Operation::Acknowledge {
886                                    self.status.set(Status::Other);
887                                    self.stage.set(Stage::SendingParameter);
888                                    if self.data_param.borrow().is_empty() {
889                                        return send(Command {
890                                            msg_id: next_msg_id_for_send(),
891                                            operation: Operation::Empty,
892                                            object: None,
893                                            data: None,
894                                        });
895                                    } else {
896                                        match self.slice_data(Role::Host) {
897                                            Ok((data_chunk, _is_last)) => {
898                                                return send(Command {
899                                                    msg_id: next_msg_id_for_send(),
900                                                    operation: Operation::Data,
901                                                    object: Some(
902                                                        self.root_operation
903                                                            .get()
904                                                            .to_name()
905                                                            .to_string(),
906                                                    ),
907                                                    data: Some(data_chunk),
908                                                });
909                                            }
910                                            Err(e) => {
911                                                reset_transaction_state();
912                                                return err(e);
913                                            }
914                                        }
915                                    }
916                                } else {
917                                    return err("Should be ACKNO");
918                                }
919                            }
920                            Role::Device => {
921                                if recv.operation == Operation::Empty {
922                                    self.stage.set(Stage::SendingParameter);
923                                    return ack(recv.msg_id, recv.operation);
924                                } else if recv.operation == Operation::Data {
925                                    self.stage.set(Stage::SendingParameter);
926                                    {
927                                        // 缩小可变借用的作用域,确保归还
928                                        self.data_param
929                                            .borrow_mut()
930                                            .append(&mut recv.data.as_ref().unwrap().clone());
931                                    }
932                                    return ack(recv.msg_id, recv.operation);
933                                } else {
934                                    return err("Should be EMPTY or DATA");
935                                }
936                            }
937                            _ => {
938                                // 同上
939                                panic!("Role cannot be Idle if Stage is RootOperationAssigned")
940                            }
941                        }
942                    }
943                    Stage::SendingParameter => {
944                        // 此阶段:
945                        // - Host: 已发送第一个参数数据包(SDATA)或 EMPTY,并收到 ACKNO。
946                        //         现在需要判断是继续发送 SDATA 还是发送 ENDTR。
947                        // - Device: 已收到第一个参数数据包(SDATA)或 EMPTY,并发送了 ACKNO。
948                        //           现在等待接收后续的 SDATA 或 ENDTR。
949                        match self.role.get() {
950                            Role::Host => {
951                                // Host 必须是收到了 ACKNO
952                                if recv.operation != Operation::Acknowledge {
953                                    return err("Host expected ACKNO in SendingParameter stage");
954                                }
955                                self.status.set(Status::Other);
956
957                                // 将借用操作限制在最小作用域,以避免后续调用 send() 或 err() 时发生冲突
958                                let last_sent_op;
959                                {
960                                    last_sent_op = self.last_sent_command.borrow().operation;
961                                } // 不可变借用在此结束
962
963                                match last_sent_op {
964                                    Operation::Empty => {
965                                        // 收到对 EMPTY 的 ACKNO,参数传输结束,发送 ENDTR
966                                        self.stage.set(Stage::ParameterSent);
967                                        return send(Command {
968                                            msg_id: next_msg_id_for_send(),
969                                            operation: Operation::EndTransaction,
970                                            object: None,
971                                            data: None,
972                                        });
973                                    }
974                                    Operation::Data => {
975                                        // 收到对 SDATA 的 ACKNO
976                                        let param_data_len = self.data_param.borrow().len() as u64;
977                                        if self.sending_data_progress.get() < param_data_len {
978                                            // 还有参数数据需要发送
979                                            let (data_chunk, _is_last) =
980                                                match self.slice_data(Role::Host) {
981                                                    Ok(d) => d,
982                                                    Err(e) => {
983                                                        reset_transaction_state();
984                                                        return err(e);
985                                                    }
986                                                };
987                                            return send(Command {
988                                                msg_id: next_msg_id_for_send(),
989                                                operation: Operation::Data,
990                                                object: Some(
991                                                    self.root_operation.get().to_name().to_string(),
992                                                ),
993                                                data: Some(data_chunk),
994                                            });
995                                        } else {
996                                            // 参数数据已全部发送完毕,发送 ENDTR
997                                            self.stage.set(Stage::ParameterSent);
998                                            return send(Command {
999                                                msg_id: next_msg_id_for_send(),
1000                                                operation: Operation::EndTransaction,
1001                                                object: None,
1002                                                data: None,
1003                                            });
1004                                        }
1005                                    }
1006                                    _ => {
1007                                        return err(
1008                                            "Host received ACKNO for unexpected command in SendingParameter stage",
1009                                        );
1010                                    }
1011                                }
1012                            }
1013                            Role::Device => {
1014                                // Device 等待 SDATA 或 ENDTR
1015                                if recv.operation == Operation::Data {
1016                                    if let Some(ref data_vec) = recv.data {
1017                                        self.data_param.borrow_mut().extend_from_slice(data_vec);
1018                                    }
1019                                    return ack(recv.msg_id, recv.operation);
1020                                } else if recv.operation == Operation::EndTransaction {
1021                                    self.stage.set(Stage::ParameterSent);
1022                                    return ack(recv.msg_id, recv.operation);
1023                                } else {
1024                                    return err(
1025                                        "Device expected DATA or ENDTR in SendingParameter stage",
1026                                    );
1027                                }
1028                            }
1029                            Role::Idle => {
1030                                panic!("Role cannot be Idle if Stage is SendingParameter")
1031                            }
1032                        }
1033                    }
1034                    Stage::ParameterSent => {
1035                        /* Host -> 收到对 ENDTR 的 ACK,发送 QUERY。等待回传数据或 AWAIT 保活。
1036                        Device -> 收到 QUERY,执行逻辑,处理保活和/或回传数据。 */
1037                        match self.role.get() {
1038                            Role::Host => match recv.operation {
1039                                Operation::Acknowledge => {
1040                                    self.status.set(Status::Other); // ACK received
1041                                    if Some(String::from("ENDTR")) == recv.object {
1042                                        return send(Command {
1043                                            msg_id: util::msg_id::increment(recv.msg_id),
1044                                            operation: Operation::Query,
1045                                            object: None,
1046                                            data: None,
1047                                        });
1048                                    } else if Some(String::from("QUERY")) == recv.object {
1049                                        return None;
1050                                    } else {
1051                                        return err(
1052                                            "Host: Unexpected ACK object in ParameterSent stage",
1053                                        );
1054                                    }
1055                                }
1056                                Operation::Await => {
1057                                    return ack(recv.msg_id, recv.operation);
1058                                }
1059                                Operation::Return => {
1060                                    if Some(String::from("EMPTY")) == recv.object
1061                                        || Some(self.root_operation.get().to_name().to_string())
1062                                            == recv.object
1063                                    {
1064                                        self.stage.set(Stage::SendingResponse);
1065                                        return ack(recv.msg_id, recv.operation);
1066                                    }
1067                                }
1068                                _ => {
1069                                    return err("Should be ACKNO, AWAIT or RETURN");
1070                                }
1071                            },
1072                            Role::Device => {
1073                                if recv.operation == Operation::Query {
1074                                    // 开始执行逻辑,然后 ACK
1075                                    match self.root_operation.get() {
1076                                        Operation::GetVersion => {
1077                                            self.data_return.replace(
1078                                                self.config.pk_version.as_bytes().to_vec(),
1079                                            );
1080                                            self.stage.set(Stage::SendingResponse);
1081                                        }
1082                                        Operation::RequireVariable => {
1083                                            let key = match self
1084                                                .root_object
1085                                                .borrow()
1086                                                .as_ref()
1087                                                .cloned()
1088                                            {
1089                                                Some(k) => k,
1090                                                None => {
1091                                                    // This check should ideally be when root_op was received
1092                                                    reset_transaction_state();
1093                                                    return err(
1094                                                        "Internal: Missing object name for REQUV.",
1095                                                    );
1096                                                }
1097                                            };
1098                                            self.data_return.replace(
1099                                                self.variable_accessor.get(key).unwrap_or(vec![]),
1100                                            );
1101                                            self.stage.set(Stage::SendingResponse);
1102                                        }
1103                                        Operation::SendVariable => {
1104                                            let key = match self
1105                                                .root_object
1106                                                .borrow()
1107                                                .as_ref()
1108                                                .cloned()
1109                                            {
1110                                                Some(k) => k,
1111                                                None => {
1112                                                    // This check should ideally be when root_op was received
1113                                                    reset_transaction_state();
1114                                                    return err(
1115                                                        "Internal: Missing object name for SENDV.",
1116                                                    );
1117                                                }
1118                                            };
1119                                            self.data_return.replace(
1120                                                if let Err(e) = self
1121                                                    .variable_accessor
1122                                                    .set(key, self.data_param.borrow().clone())
1123                                                {
1124                                                    e.as_bytes().to_vec()
1125                                                } else {
1126                                                    vec![]
1127                                                },
1128                                            );
1129                                            self.stage.set(Stage::SendingResponse); // Note: SENDV error reporting via data_return
1130                                        }
1131                                        Operation::Invoke => {
1132                                            self.device_op_pending.set(true);
1133                                            self.device_await_deadline.set(Some(
1134                                                Instant::now() + self.config.await_interval,
1135                                            ));
1136                                            // The object for INVOK is self.root_object, not from QUERY (recv.object)
1137                                            let method_name = match self
1138                                                .root_object
1139                                                .borrow()
1140                                                .as_ref()
1141                                                .cloned()
1142                                            {
1143                                                Some(name) => name,
1144                                                None => {
1145                                                    reset_transaction_state();
1146                                                    return err(
1147                                                        "Internal: Missing method name for INVOK",
1148                                                    );
1149                                                }
1150                                            };
1151                                            match self
1152                                                .method_accessor
1153                                                .call(method_name, self.data_param.borrow().clone())
1154                                            {
1155                                                Ok(pollable) => {
1156                                                    self.pending_pollable.replace(Some(pollable));
1157                                                }
1158                                                Err(_) => {
1159                                                    reset_transaction_state();
1160                                                    // log::error!("Failed to create INVOK pollable: {}", e_str);
1161                                                    return err(
1162                                                        "Failed to initiate INVOK operation",
1163                                                    );
1164                                                }
1165                                            }
1166                                        }
1167                                        _ => {
1168                                            reset_transaction_state();
1169                                            return err("Not a root operation");
1170                                        }
1171                                    }
1172                                    self.stage.set(Stage::SendingResponse);
1173                                    self.device_should_return.set(true);
1174                                    return ack(recv.msg_id, recv.operation);
1175                                }
1176                            }
1177                            Role::Idle => {
1178                                panic!("Role cannot be Idle if Stage is ParameterSent")
1179                            }
1180                        }
1181                    }
1182                    Stage::SendingResponse => {
1183                        /* Host -> 接收数据。
1184                        Device -> 收到对 RTURN/SDATA 的 ACK,继续发送数据或终止 */
1185                        match self.role.get() {
1186                            Role::Host => {
1187                                // Host 等待 SDATA 或 ENDTR
1188                                if recv.operation == Operation::Data {
1189                                    // Host receives SDATA from Device
1190                                    if let Some(ref data_vec) = recv.data {
1191                                        self.data_return.borrow_mut().extend_from_slice(data_vec);
1192                                    }
1193                                    return ack(recv.msg_id, recv.operation);
1194                                } else if recv.operation == Operation::EndTransaction {
1195                                    let endtr_ack = ack(recv.msg_id, recv.operation);
1196                                    self.stage.set(Stage::Idle);
1197                                    self.status.set(Status::Other); // After sending ACK, status is Other
1198                                    return endtr_ack;
1199                                } else {
1200                                    return err(
1201                                        "Host expected SDATA or ENDTR in SendingResponse stage",
1202                                    );
1203                                }
1204                            }
1205                            Role::Device => {
1206                                // Device 必须是收到了 ACKNO
1207                                if recv.operation != Operation::Acknowledge {
1208                                    return err("Device expected ACKNO in SendingResponse stage");
1209                                }
1210                                self.status.set(Status::Other);
1211
1212                                // 将借用操作限制在最小作用域
1213                                let last_sent_op;
1214                                {
1215                                    last_sent_op = self.last_sent_command.borrow().operation;
1216                                } // 不可变借用在此结束
1217
1218                                match last_sent_op {
1219                                    Operation::Return => {
1220                                        // 收到对 RETURN 的 ACKNO
1221                                        let return_data_len =
1222                                            self.data_return.borrow().len() as u64;
1223                                        if return_data_len == 0 {
1224                                            // 没有返回值,直接发送 ENDTR
1225                                            // self.stage.set(Stage::Idle); // Transaction ends
1226                                            // REMOVE: Do not set to Idle yet, wait for ENDTR's ACKNO
1227                                            return send(Command {
1228                                                msg_id: next_msg_id_for_send(),
1229                                                operation: Operation::EndTransaction,
1230                                                object: None,
1231                                                data: None,
1232                                            });
1233                                        } else {
1234                                            // 有返回值
1235                                            let (data_chunk, _) =
1236                                                match self.slice_data(Role::Device) {
1237                                                    Ok(d) => d,
1238                                                    Err(e) => {
1239                                                        reset_transaction_state();
1240                                                        return err(e);
1241                                                    }
1242                                                };
1243
1244                                            return send(Command {
1245                                                msg_id: next_msg_id_for_send(),
1246                                                operation: Operation::Data,
1247                                                object: Some(
1248                                                    self.root_operation.get().to_name().to_string(),
1249                                                ),
1250                                                data: Some(data_chunk),
1251                                            });
1252                                        }
1253                                    }
1254                                    Operation::Data => {
1255                                        if self.sending_data_progress.get()
1256                                            < self.data_return.borrow().len() as u64
1257                                        {
1258                                            let (data_chunk, _) =
1259                                                match self.slice_data(Role::Device) {
1260                                                    Ok(d) => d,
1261                                                    Err(e) => {
1262                                                        reset_transaction_state();
1263                                                        return err(e);
1264                                                    }
1265                                                };
1266                                            return send(Command {
1267                                                msg_id: next_msg_id_for_send(),
1268                                                operation: Operation::Data,
1269                                                object: Some(
1270                                                    self.root_operation.get().to_name().to_string(),
1271                                                ),
1272                                                data: Some(data_chunk),
1273                                            });
1274                                        } else {
1275                                            return send(Command {
1276                                                msg_id: next_msg_id_for_send(),
1277                                                operation: Operation::EndTransaction,
1278                                                object: None,
1279                                                data: None,
1280                                            });
1281                                        }
1282                                    }
1283                                    Operation::EndTransaction => {
1284                                        self.role.set(Role::Idle);
1285                                        self.stage.set(Stage::Idle);
1286                                        // reset_transaction_state();
1287                                        // 结束后不清理数据,考虑到外部可能手动获取,这里就不操心了
1288                                        return None;
1289                                    }
1290                                    Operation::Await => {
1291                                        // Device received ACKNO AWAIT
1292                                        // self.status is Other. Device continues pending op.
1293                                        return None;
1294                                    }
1295                                    _ => {
1296                                        return err(
1297                                            "Device received ACKNO for unexpected command in SendingResponse stage",
1298                                        );
1299                                    }
1300                                }
1301                            }
1302                            _ => {
1303                                panic!("Role cannot be Idle if Stage is SendingResponse")
1304                            }
1305                        }
1306                    }
1307                }
1308            }
1309        }
1310        None
1311    }
1312
1313    /// Initiates a new root operation from the Host side.
1314    ///
1315    /// This starts a new transaction chain. It can only be called when the state machine is `Idle`.
1316    /// The actual protocol exchange (beginning with a `START` packet) is driven by subsequent [`poll()`](crate::PkCommand::poll) calls.
1317    ///
1318    /// # Arguments
1319    /// * `operation`: The root operation to perform (`SENDV`, `REQUV`, `INVOK`, or `PKVER`).
1320    /// * `object`: The target name (e.g., variable name for `REQUV`, method name for `INVOK`).
1321    /// * `data`: Optional parameter data (e.g., the value to set for `SENDV`).
1322    ///
1323    /// # Returns
1324    /// - `Ok(())`: The transaction was successfully queued.
1325    /// - `Err(&'static str)`: The request was invalid (e.g., already in a transaction, not a root op).
1326    pub fn perform(
1327        &self,
1328        operation: Operation,
1329        object: Option<String>,
1330        data: Option<Vec<u8>>,
1331    ) -> Result<(), &'static str> {
1332        if operation.is_root()
1333            && self.stage.get() == Stage::Idle
1334            && self.status.get() == Status::Other
1335            && self.role.get() == Role::Idle
1336        {
1337            self.root_operation.set(operation);
1338            self.root_object.replace(object);
1339            self.data_param.replace(data.unwrap_or(vec![]));
1340            self.role.set(Role::Host);
1341            self.stage.set(Stage::Started);
1342            self.status.set(Status::Other);
1343            Ok(())
1344        } else if !operation.is_root() {
1345            Err("Cannot initiate a non-root operation")
1346        } else {
1347            Err("Cannot initiate an operation when the transaction is in progress")
1348        }
1349    }
1350
1351    fn reset_transaction_state(&self) {
1352        self.stage.set(Stage::Idle);
1353        self.status.set(Status::Other);
1354        self.role.set(Role::Idle);
1355        // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
1356        self.data_param.borrow_mut().clear();
1357        self.data_return.borrow_mut().clear();
1358        self.sending_data_progress.set(0);
1359        self.device_op_pending.set(false);
1360        self.device_await_deadline.set(None);
1361        self.pending_pollable.borrow_mut().take(); // Clear the pollable
1362    }
1363
1364    /// Returns `true` if the state machine is currently [`Idle`](crate::types::Stage::Idle) (no active transaction).
1365    pub fn is_complete(&self) -> bool {
1366        self.stage.get() == Stage::Idle
1367    }
1368
1369    /// Retrieves the return data from a finished transaction and resets the transaction state.
1370    ///
1371    /// This should be called by the Host after [`is_complete()`](crate::PkCommand::is_complete) returns `true` for a root
1372    /// operation that expects return data (e.g., `REQUV` or `INVOK`).
1373    ///
1374    /// # Returns
1375    /// - `Some(Vec<u8>)`: The returned payload.
1376    /// - `None`: If there was no data or the state machine is not in a completed host state.
1377    pub fn get_return_data(&self) -> Option<Vec<u8>> {
1378        if self.stage.get() == Stage::Idle && self.role.get() == Role::Host {
1379            let data = self.data_return.borrow().clone();
1380            self.reset_transaction_state();
1381            if data.is_empty() {
1382                None
1383            } else {
1384                Some(data.clone())
1385            }
1386        } else {
1387            None // Not in a state to provide return data
1388        }
1389    }
1390
1391    /// Checks for transaction completion and executes a callback with the resulting data.
1392    ///
1393    /// This is a convenience method for polling for completion on the Host side.
1394    /// If the transaction is complete, it calls the `callback` with the return data
1395    /// (if any) and returns `true`.
1396    ///
1397    /// # Note
1398    ///
1399    /// This function is also poll-based. You should also call it regularly (e.g., in your main loop), and when the transaction completes, it would call the provided function.
1400    ///
1401    /// # Returns
1402    /// `true` if the transaction was complete and the callback was executed.
1403    ///
1404    /// # Example
1405    /// ```no_run
1406    /// use pk_command::{PkCommand, PkCommandConfig, PkHashmapVariable, PkHashmapMethod};
1407    ///
1408    /// let config = PkCommandConfig::default(64);
1409    /// let vars = PkHashmapVariable::new(vec![]);
1410    /// let methods = PkHashmapMethod::new(vec![]);
1411    /// let pk = PkCommand::<_, _, std::time::Instant>::new(config, vars, methods);
1412    /// # let transport = pk_command::doc_util::Transport::new();
1413    ///
1414    /// loop {
1415    ///     // 1. Receive data from transport...
1416    ///     if let Some(received_data) = transport.recv() {
1417    ///         pk.incoming_command(received_data);
1418    ///     }
1419    ///
1420    ///     // 3. perform some operation
1421    ///     # let some_condition=true;
1422    ///     # let operation= pk_command::types::Operation::RequireVariable;
1423    ///     # let object=None;
1424    ///     # let data=None;
1425    ///     if some_condition && pk.is_complete() {
1426    ///         pk.perform(operation, object, data).unwrap();
1427    ///     }
1428    ///
1429    ///     // 4. poll
1430    ///     let cmd=pk.poll();
1431    ///
1432    ///     // 5. check for completion and handle return data
1433    ///     //    We can see that this function is poll-based as well, and should be called regularly. (typically
1434    ///     //    right after calling `poll()`) When the transaction completes, it would call the provided function
1435    ///     //    with the return data (if any).
1436    ///     let mut should_break=false;
1437    ///     pk.wait_for_complete_and(|data_opt| {
1438    ///         println!("Transaction complete! Return data: {:?}", data_opt);
1439    ///         should_break=true;
1440    ///     });
1441    ///
1442    ///     // 6. Send cmd back via transport...
1443    ///     if let Some(cmd_to_send) = cmd {
1444    ///         transport.send(cmd_to_send.to_bytes());
1445    ///     }
1446    ///
1447    ///     // 7. break if needed
1448    ///     if should_break {
1449    ///         break;
1450    ///     }
1451    /// }
1452    /// ```
1453    pub fn wait_for_complete_and<F>(&self, callback: F) -> bool
1454    where
1455        F: FnOnce(Option<Vec<u8>>),
1456    {
1457        // 这个函数也是轮询的,用来给 Host 方返回值(因为在上面的 perform 中并没有告诉 PK 该怎么处理返回值)
1458        if self.stage.get() == Stage::Idle {
1459            let data = self.data_return.borrow().clone();
1460            self.reset_transaction_state();
1461            callback(if data.is_empty() { None } else { Some(data) });
1462            true
1463        } else {
1464            false
1465        }
1466    }
1467
1468    /// Creates a new [`PkCommand`] state machine.
1469    ///
1470    /// # Arguments
1471    /// * `config`: Configuration defining timeouts and packet limits.
1472    /// * `variable_accessor`: Provider for reading/writing variables.
1473    /// * `method_accessor`: Provider for invoking methods.
1474    ///
1475    /// # Note
1476    ///
1477    /// ## The Instant Type Parameter
1478    ///
1479    /// The `Instant` type parameter must implement [`PkInstant`], [`Copy`], [`PartialOrd`], and [`Add<Duration>`].
1480    /// We provide a default implementation for [`std::time::Instant`] in `std` environments,
1481    /// so that could be used directly.
1482    ///
1483    /// For `no_std` environments, users can implement their own [`PkInstant`] and use it here.
1484    ///
1485    /// ## The VA, MA Type Parameters
1486    ///
1487    /// In `std` environments, the library provides two convenient implementations for variable
1488    /// and method accessors: [`PkHashmapVariable`] and [`PkHashmapMethod`], which use
1489    /// [`HashMap`](std::collections::HashMap)s internally. You can use them directly or implement
1490    /// your own if you have different storage needs.
1491    ///
1492    /// In `no_std` environments, you must provide your own implementations of `VA`, `MA`,
1493    /// and `Instant`.
1494    ///
1495    /// ## For Embassy Users
1496    ///
1497    /// The library provides `no_std` utilities, based on the Embassy ecosystem,
1498    /// to help you integrate PK Command into a Embassy-based application.
1499    /// They are:
1500    ///
1501    /// - [`EmbassyInstant`] for `Instant`.
1502    /// - [`EmbassyPollable`](crate::embassy_adapter::EmbassyPollable) for [`Pollable`], and [`embassy_method_accessor!`](crate::embassy_method_accessor) for [`PkMethodAccessor`].
1503    ///
1504    /// Unfortunately still, you may need to provide your own implementation of [`PkVariableAccessor`].
1505    ///
1506    /// # Example
1507    /// ```
1508    /// use pk_command::{PkCommand, PkCommandConfig, PkHashmapVariable, PkHashmapMethod};
1509    ///
1510    /// // The third type parameter here is the PkInstant type. This can't be usually inferred so you must
1511    /// // specify it explicitly. If you are using std, just use std::time::Instant. If you are using no_std,
1512    /// //  implement your own PkInstant and specify it here.
1513    /// let pk = PkCommand::<_, _, std::time::Instant>::new(
1514    ///     PkCommandConfig::default(64),
1515    ///     PkHashmapVariable::new(vec![]),
1516    ///     PkHashmapMethod::new(vec![]),
1517    /// );
1518    /// ```
1519    pub fn new(config: PkCommandConfig, variable_accessor: VA, method_accessor: MA) -> Self {
1520        PkCommand {
1521            stage: Cell::new(Stage::Idle),
1522            status: Cell::new(Status::Other),
1523            role: Cell::new(Role::Idle),
1524            last_sent_command: RefCell::new(Command {
1525                msg_id: 0,
1526                operation: Operation::Empty,
1527                object: None,
1528                data: None,
1529            }),
1530            last_sent_msg_id: Cell::new(0),
1531            last_received_msg_id: Cell::new(0),
1532            data_param: RefCell::new(vec![]),
1533            data_return: RefCell::new(vec![]),
1534            sending_data_progress: Cell::new(0),
1535            root_operation: Cell::new(Operation::Empty),
1536            root_object: RefCell::new(None),
1537            command_buffer: RefCell::new(Command {
1538                msg_id: 0,
1539                operation: Operation::Empty,
1540                object: None,
1541                data: None,
1542            }),
1543            command_processed: Cell::new(true),
1544            last_command_time: Cell::new(Instant::now()),
1545            device_op_pending: Cell::new(false),
1546            device_await_deadline: Cell::new(None),
1547            config,
1548            variable_accessor,
1549            method_accessor,
1550            pending_pollable: RefCell::new(None),
1551            device_should_return: Cell::new(false),
1552        }
1553    }
1554}