pk_command/lib.rs
1//! # PK Command
2//!
3//! A lightweight, reliable data transfer protocol designed for constrained channels
4//! (e.g., HID, Serial) between a **Host** and an **Embedded Device**.
5//!
6//! ## Key Features
7//! - **Reliability**: Built-in ACK/Retransmission mechanism.
8//! - **Efficiency**: Fixed-length headers and data slicing for small MTUs.
9//! - **Flexibility**: Supports variable access (GET/SET) and remote method invocation (INVOK).
10//! - **no_std Support**: Core logic is compatible with embedded systems without an OS. **Note that `alloc` is still used in no_std environments.**
11//! - **Wait Mechanism**: Keep-alive `AWAIT` packets for long-running operations.
12//!
13//! ## Architecture
14//! The protocol operates using **Transaction Chains**. A chain starts with a `START`
15//! command and ends with `ENDTR`. Large payloads are automatically sliced into
16//! `SDATA` packets.
17//!
18//! ## Command Format
19//! Every command follows the fixed layout:
20//!
21//! `[MSG ID][OPERATION NAME] [OBJECT] [DATA]`.
22//!
23//! - `MSG ID`: A base-94 encoded unsigned integer (using ASCII characters from '!' to '~') that uniquely identifies the command within a transaction.
24//! - `OPERATION NAME`: A 5-character ASCII string representing the command type. All the operations defined in the specification are defined in this library. (See [`types::Operation`] for details.)
25//! - `OBJECT`: An optional 5-character ASCII string that provides additional context for the operation (e.g., variable name, method name).
26//! - `DATA`: An optional binary payload that carries parameters or return values. It can be of arbitrary length and may contain any byte values.
27//!
28//! See [`Command`] for a structured representation of commands and utilities for parsing and serialization.
29//!
30//! Note that:
31//! - `OBJECT` is either omitted, or **exactly 5 ASCII characters**.
32//! - `DATA` (payload) is **arbitrary binary data**.
//! - The total length of a command is limited by the transport layer's MTU (e.g., 64 bytes for HID). (See [`PkCommandConfig`].)
//!   However, the protocol slices and reassembles large payloads automatically, so you can work with large data without worrying about the underlying transport constraints.
35//!
36//! ## Example
37//! ```no_run
38//! use pk_command::{PkCommand, PkCommandConfig, PkHashmapMethod, PkHashmapVariable};
39//!
40//! // 1. Setup configuration and accessors
41//! let config = PkCommandConfig::default(64);
42//! let vars = PkHashmapVariable::new(vec![]);
43//! let methods = PkHashmapMethod::new(vec![]);
44//!
45//! // 2. Initialize the state machine
46//! let pk = PkCommand::<_, _, std::time::Instant>::new(config, vars, methods);
47//! # let transport=pk_command::doc_util::Transport::new();
48//!
49//! // 3. Basic loop driving the protocol
50//! loop {
51//! // Handle received bytes from your transport (HID/Serial/etc.)
52//! if let Some(bytes) = transport.recv() {
53//! pk.incoming_command(bytes);
54//! }
55//!
56//! // Process and get commands to send back
57//! if let Some(cmd) = pk.poll() {
58//! transport.send(cmd.to_bytes());
59//! }
60//!
61//! if pk.is_complete() {
62//! break;
63//! }
64//! }
65//! ```
66//!
67//! # Feature flags
68//! - `std`: Enables features that require the Rust standard library. (Mainly the convenient wrappers like [`PkPromise`], [`PkHashmapVariable`], [`PkHashmapMethod`]) **Enabled by default.**
69//! - `embassy`: Enables integration with the [Embassy](https://embassy.dev/) async framework. Flags below are also enabled when this is active:
70//! - `embassy-time`: Enables the support for [embassy-time](https://crates.io/crates/embassy-time) crate, which provides timekeeping utilities for embedded environments.
71//! - `embassy-runtime`: Enables the support for [embassy-executor](https://crates.io/crates/embassy-executor) crate, which provides integration between the main state machine and Embassy async tasks.
72//! - `tokio-runtime`: Enables integration with the [Tokio](https://tokio.rs/) async runtime. Provides [`tokio_adapter`] for running async operations within method implementations. Requires `std` feature.
73//! - `smol-runtime`: Enables integration with the [Smol](https://github.com/smol-rs/smol) async executor. Provides [`smol_adapter`] for running async operations within method implementations. Requires `std` feature.
74
75#![warn(missing_docs)]
76#![cfg_attr(not(feature = "std"), no_std)]
77#![cfg_attr(docsrs, feature(doc_cfg))]
78
// Version string of this crate, captured at compile time from the
// `CARGO_PKG_VERSION` environment variable that Cargo sets during the build.
const PK_VERSION: &str = env!("CARGO_PKG_VERSION");
80
81// Compile-time guard: async runtime adapters require `std` feature.
82#[cfg(all(
83 any(feature = "tokio-runtime", feature = "smol-runtime"),
84 not(feature = "std")
85))]
86compile_error!("Enabling 'tokio-runtime' or 'smol-runtime' requires the 'std' feature.");
87
88#[cfg(not(feature = "std"))]
89extern crate alloc;
90#[cfg(not(feature = "std"))]
91use alloc::{
92 boxed::Box,
93 string::{String, ToString},
94 vec,
95 vec::Vec,
96};
97
98#[cfg(not(feature = "std"))]
99extern crate core as std;
100
101// The items below (not gated behind the "std" feature)
102// are re-exported by `std` crate from `core`,
103// so just simply renaming `core` as `std` should work
104use std::cell::{Cell, RefCell};
105use std::ops::Add;
106use std::pin::Pin;
107use std::task::Poll;
108use std::time::Duration;
109
110/// Core data structures and types for PK Command.
111pub mod types;
112use types::{Command, Operation, Role, Stage, Status};
113
114/// Utilities used in examples.
115#[doc(hidden)]
116#[cfg(feature = "doc")]
117pub mod doc_util;
118
119mod util;
120#[cfg_attr(docsrs, doc(cfg(feature = "embassy-runtime")))]
121#[cfg(feature = "embassy-runtime")]
122pub use util::async_adapters::embassy as embassy_adapter;
123#[cfg_attr(docsrs, doc(cfg(all(feature = "std", feature = "smol-runtime"))))]
124#[cfg(all(feature = "std", feature = "smol-runtime"))]
125pub use util::async_adapters::smol as smol_adapter;
126#[cfg(all(feature = "std", feature = "tokio-runtime"))]
127#[cfg_attr(docsrs, doc(cfg(all(feature = "std", feature = "tokio-runtime"))))]
128pub use util::async_adapters::tokio as tokio_adapter;
129#[cfg(feature = "std")]
130#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
131pub use util::{PkHashmapMethod, PkHashmapVariable, PkPromise, msg_id};
132
/// Trait defining how to access (get/set) variables by their string key.
///
/// This allows the [`PkCommand`] state machine to be generic over the actual variable storage.
/// In `std` environments, a convenient implementation using [`std::collections::HashMap`] is provided as [`PkHashmapVariable`].
/// In `no_std` environments, you must provide your own implementation.
///
/// Both methods take `&self`, so an implementation that has to modify its
/// storage inside [`set()`](crate::PkVariableAccessor::set) needs some form of
/// interior mutability.
///
/// # Example
/// ```
/// use pk_command::PkVariableAccessor;
///
/// struct MyVariableStore;
/// impl PkVariableAccessor for MyVariableStore {
///     fn get(&self, key: String) -> Option<Vec<u8>> {
///         if key == "VERSION" {
///             Some(b"1.0.0".to_vec())
///         } else {
///             None
///         }
///     }
///     fn set(&self, key: String, value: Vec<u8>) -> Result<(), String> {
///         // Logic to store the value
///         Ok(())
///     }
/// }
/// ```
pub trait PkVariableAccessor {
    /// Retrieves the value of a variable.
    ///
    /// # Arguments
    /// * `key`: The name of the variable to retrieve (passed by value).
    ///
    /// # Returns
    /// `Some(Vec<u8>)` containing the variable's data if found, or `None` otherwise.
    fn get(&self, key: String) -> Option<Vec<u8>>;

    /// Sets the value of a variable.
    ///
    /// # Arguments
    /// * `key`: The name of the variable to set.
    /// * `value`: The new data for the variable; ownership is handed to the implementation.
    ///
    /// # Returns
    /// `Ok(())` if successful, or an `Err(String)` describing the error.
    fn set(&self, key: String, value: Vec<u8>) -> Result<(), String>;
}
178
/// Handle to an operation that may take a while and is driven by polling.
///
/// The `INVOK` operation is the main user of this trait. PK Command targets
/// poll-based environments (embedded main loops, async runtimes), so a method
/// that cannot finish immediately should hand back a [`Pollable`] instead of
/// blocking.
///
/// While [`poll()`](crate::Pollable::poll) keeps returning [`Poll::Pending`],
/// the state machine polls it periodically and emits `AWAIT` packets so the
/// host keeps the transaction alive.
pub trait Pollable {
    /// Checks whether the operation has finished.
    ///
    /// Takes `&self`, so implementations that track progress need interior
    /// mutability.
    ///
    /// # Returns
    /// - `Poll::Pending`: the operation is still running.
    /// - `Poll::Ready(Ok(Some(data)))`: the operation finished and produced `data`.
    /// - `Poll::Ready(Ok(None))`: the operation finished successfully without data.
    /// - `Poll::Ready(Err(e))`: the operation failed; `e` is the error message.
    fn poll(&self) -> Poll<Result<Option<Vec<u8>>, String>>;
}
197
/// Trait defining how to invoke methods by their string key.
///
/// This allows the [`PkCommand`] state machine to call arbitrary logic on the device.
/// In `std` environments, a convenient implementation using [`std::collections::HashMap`] is provided as [`PkHashmapMethod`].
/// In `no_std` environments, you must provide your own implementation.
///
/// The call itself does not return the method's result; it returns a
/// [`Pollable`] that is polled later until the result is available.
///
/// # Example
/// ```
/// use pk_command::{PkMethodAccessor, Pollable};
/// use std::pin::Pin;
/// use std::task::Poll;
///
/// struct MyMethod;
/// impl Pollable for MyMethod {
///     fn poll(&self) -> Poll<Result<Option<Vec<u8>>, String>> {
///         Poll::Ready(Ok(Some(b"Hello from PK!".to_vec())))
///     }
/// }
///
/// struct MyMethodStore;
/// impl PkMethodAccessor for MyMethodStore {
///     fn call(&self, key: String, param: Vec<u8>) -> Result<Pin<Box<dyn Pollable>>, String> {
///         if key == "GREET" {
///             Ok(Box::pin(MyMethod))
///         } else {
///             Err("Method not found".to_string())
///         }
///     }
/// }
/// ```
pub trait PkMethodAccessor {
    /// Calls a method with the given parameters.
    ///
    /// # Arguments
    /// * `key`: The name of the method to call.
    /// * `param`: The parameters for the method, as a byte vector.
    ///
    /// # Returns
    /// A `Result` containing a pinned, boxed `Pollable` that will resolve to the method's output,
    /// or an `Err(String)` if the method call cannot be initiated.
    fn call(&self, key: String, param: Vec<u8>) -> Result<Pin<Box<dyn Pollable>>, String>;
}
240
/// Abstraction over "a point in time" used for timeout tracking.
///
/// It mirrors the parts of [`std::time::Instant`] that PK Command needs, so that
/// `no_std` targets can plug in their own timer. In `std` environments, simply
/// use [`std::time::Instant`].
///
/// # Note
///
/// Implementing this trait alone is not enough to use a custom type with
/// [`PkCommand`]: the type must also implement `Add<Duration, Output = Instant>`,
/// `PartialOrd`, and `Copy`.
///
/// # Tips
/// When running on Embassy, the provided [`EmbassyInstant`] adapter wraps
/// [`embassy_time::Instant`] and already implements everything [`PkCommand`] requires.
pub trait PkInstant: Sized {
    /// Captures the current instant.
    fn now() -> Self;

    /// Reports how much time has passed since this instant was captured.
    fn elapsed(&self) -> Duration;
}
262
// `std::time::Instant` already offers `now` and `elapsed` as inherent items;
// this impl simply forwards to them.
#[cfg(feature = "std")]
impl PkInstant for std::time::Instant {
    fn now() -> Self {
        // Not recursive: for a `Type::item` path, inherent associated items
        // take precedence over trait items, so this is the inherent
        // `Instant::now`.
        Self::now()
    }

    fn elapsed(&self) -> Duration {
        // Same precedence rule: this resolves to the inherent `Instant::elapsed`.
        Self::elapsed(self)
    }
}
273
/// A [`PkInstant`] adapter for [`embassy_time::Instant`].
///
/// This type bridges the PK Command timing abstraction with the
/// [`embassy-time`](https://crates.io/crates/embassy-time) crate, enabling timeout handling in `no_std`/embedded
/// environments that use Embassy.
///
/// # Availability
/// Enabled when the `embassy` feature is active (it also enables `embassy-time`).
///
/// # Why this wrapper exists
/// [`embassy_time::Instant`] is tied to [`embassy_time::Duration`], while
/// PK Command uses [`core::time::Duration`]. These durations can be converted,
/// but their types are not directly compatible with the `PkCommand` signature,
/// so a newtype adapter keeps the API consistent without large generic changes.
///
/// # Notes
/// - Implements [`Add<Duration>`] with millisecond precision.
/// - [`elapsed()`](crate::EmbassyInstant::elapsed) guards against clock rollback by returning `0` if the
///   current instant is earlier than the stored instant.
///
/// # Example
///
/// ```
/// use pk_command::{
///     EmbassyInstant, PkCommand, PkCommandConfig, PkHashmapMethod, PkHashmapVariable,
/// };
///
/// // embassy_time::Instant must be constructed within an Embassy context
/// // otherwise the code would not compile.
/// #[embassy_executor::task]
/// async fn example() {
///     let pk = PkCommand::<_, _, EmbassyInstant>::new(
///         PkCommandConfig::default(64),
///         PkHashmapVariable::new(vec![]),
///         PkHashmapMethod::new(vec![]),
///     );
/// }
/// ```
///
/// # See also
///
/// To use PK Command within an Embassy context, you might also want to have a
/// look at [`embassy_adapter`].
///
#[cfg(feature = "embassy-time")]
#[cfg_attr(docsrs, doc(cfg(feature = "embassy-time")))]
#[derive(Clone, Copy, PartialOrd, PartialEq, Eq, Ord)]
pub struct EmbassyInstant(embassy_time::Instant);
324
#[cfg(feature = "embassy-time")]
impl EmbassyInstant {
    /// Unwraps the adapter, yielding the wrapped [`embassy_time::Instant`].
    fn into_inner(self) -> embassy_time::Instant {
        let Self(inner) = self;
        inner
    }
}
331
#[cfg(feature = "embassy-time")]
impl Add<Duration> for EmbassyInstant {
    type Output = EmbassyInstant;

    /// Moves the instant forward by `rhs`, using whole milliseconds only
    /// (any sub-millisecond part of `rhs` is dropped by `as_millis`).
    fn add(self, rhs: Duration) -> Self::Output {
        // NOTE(review): `as_millis` yields a `u128`; the `as u64` cast silently
        // truncates values that do not fit. Only matters for absurdly long
        // durations, but worth confirming that is acceptable.
        let whole_millis = rhs.as_millis() as u64;
        let offset = embassy_time::Duration::from_millis(whole_millis);
        EmbassyInstant(self.0 + offset)
    }
}
340
#[cfg(feature = "embassy-time")]
impl From<embassy_time::Instant> for EmbassyInstant {
    /// Wraps a raw [`embassy_time::Instant`] in the adapter type.
    fn from(inst: embassy_time::Instant) -> Self {
        Self(inst)
    }
}
347
#[cfg(feature = "embassy-time")]
impl PkInstant for EmbassyInstant {
    /// Captures the current Embassy time, wrapped in the adapter.
    fn now() -> Self {
        Self::from(embassy_time::Instant::now())
    }

    /// Time passed since this instant, or zero if the clock appears to have
    /// gone backwards.
    fn elapsed(&self) -> core::time::Duration {
        let current = embassy_time::Instant::now();
        let started = self.into_inner();

        // This case can happen if the system clock was adjusted backwards;
        // report "no time elapsed" rather than subtracting.
        if current < started {
            return core::time::Duration::from_secs(0);
        }

        (current - started).into()
    }
}
364
/// Configuration for the [`PkCommand`] state machine.
///
/// Use this struct to define timeout durations and packet size limits according to your
/// transport layer's constraints (e.g., HID, Serial, etc.).
///
/// All fields are private; build a value with
/// [`PkCommandConfig::default()`] or [`PkCommandConfig::new()`].
#[derive(Clone)]
pub struct PkCommandConfig {
    /// Timeout duration for waiting for an `ACKNO` command. Default is 100ms.
    ack_timeout: Duration,
    /// Timeout duration for waiting for the next command in a sequence. Default is 500ms.
    inter_command_timeout: Duration,
    /// Interval at which the Device sends `AWAIT` keep-alive commands. Default is 300ms.
    await_interval: Duration,
    /// The maximum length of a single command packet (in bytes), including headers.
    packet_limit: u64,
    /// The version string of the package (filled in from the crate version by the constructors).
    pk_version: &'static str,
}
382
383impl PkCommandConfig {
384 /// Creates a [`PkCommandConfig`] with default (as recommended in the specification file) timeout values.
385 ///
386 /// # Default timeouts
387 /// - ACK timeout: 100ms
388 /// - Inter command timeout: 500ms
389 /// - `AWAIT` interval: 300ms
390 ///
391 /// # Arguments
392 /// * `packet_limit`: The maximum packet size (MTU) of the underlying transport (e.g., 64 for HID).
393 ///
394 /// # Returns
395 /// A [`PkCommandConfig`] instance with default timeouts and the specified packet limit.
396 ///
397 /// # Note
398 /// This is **not** an implementation of [`Default`] trait because `packet_limit` must be specified.
399 pub fn default(packet_limit: u64) -> Self {
400 PkCommandConfig {
401 ack_timeout: Duration::from_millis(100),
402 inter_command_timeout: Duration::from_millis(500),
403 await_interval: Duration::from_millis(300),
404 packet_limit,
405 pk_version: PK_VERSION,
406 }
407 }
408
409 /// Creates a new [`PkCommandConfig`] with custom timing and packet limit.
410 ///
411 /// # Arguments
412 /// * `ack_timeout`: Timeout for ACKs in milliseconds.
413 /// * `inter_command_timeout`: Timeout between commands in milliseconds.
414 /// * `await_interval`: Interval for sending `AWAIT` keep-alives in milliseconds.
415 /// * `packet_limit`: Maximum length of a single packet in bytes.
416 ///
417 /// # Note
418 /// To avoid undesirable behavior, you should ensure that the timeout values on both sides (Host and Device) are exactly the same.
419 pub fn new(
420 ack_timeout: u64,
421 inter_command_timeout: u64,
422 await_interval: u64,
423 packet_limit: u64,
424 ) -> Self {
425 PkCommandConfig {
426 ack_timeout: Duration::from_millis(ack_timeout),
427 inter_command_timeout: Duration::from_millis(inter_command_timeout),
428 await_interval: Duration::from_millis(await_interval),
429 packet_limit,
430 pk_version: PK_VERSION,
431 }
432 }
433}
434
/// The main state machine for handling the PK Command protocol.
///
/// It manages the lifecycle of a transaction, including:
/// - Parsing incoming raw bytes into commands.
/// - Generating response commands (ACKs, data slices, etc.).
/// - Handling timeouts and retransmissions.
/// - Managing data slicing for large transfers.
///
/// This struct is generic over:
/// - `VA`: A [`PkVariableAccessor`] for variable storage.
/// - `MA`: A [`PkMethodAccessor`] for method invocation.
/// - `Instant`: A [`PkInstant`] for time tracking (allowing for `no_std` timer implementations). Typically [`std::time::Instant`] in `std` environments, or [`EmbassyInstant`] in Embassy environments.
///
/// All mutable state lives in `Cell`/`RefCell` fields, which is why the
/// methods driving the state machine take `&self`.
///
/// # Usage Pattern
/// 1. Feed received data into [`incoming_command()`](crate::PkCommand::incoming_command).
/// 2. Regularly call [`poll()`](crate::PkCommand::poll) to progress the state machine and check for commands to send.
/// 3. If [`poll()`](crate::PkCommand::poll) returns `Some(Command)`, serialize it with [`to_bytes()`](crate::types::Command::to_bytes) and send it over your transport.
///
/// # Host vs Device
///
/// They are not actual device types, but rather roles in a transaction.
///
/// - **Host** is the one who calls [`perform()`](crate::PkCommand::perform) to initiate a transaction (e.g., `SENDV`, `INVOK`).
/// - **Device** is the one who reacts to the transaction and automatically responds to incoming root commands using the provided accessors.
///
/// # Example
/// ```no_run
/// use pk_command::{PkCommand, PkCommandConfig, PkHashmapMethod, PkHashmapVariable};
///
/// let config = PkCommandConfig::default(64);
/// let vars = PkHashmapVariable::new(vec![]);
/// let methods = PkHashmapMethod::new(vec![]);
/// let pk = PkCommand::<_, _, std::time::Instant>::new(config, vars, methods);
/// # let transport = pk_command::doc_util::Transport::new();
/// loop {
///     // 1. Receive data from transport...
///     if let Some(received_bytes) = transport.recv() {
///         pk.incoming_command(received_bytes);
///     }
///
///     // 2. Drive the state machine
///     if let Some(cmd) = pk.poll() {
///         let bytes = cmd.to_bytes();
///         transport.send(bytes);
///     }
///     std::thread::sleep(std::time::Duration::from_millis(10));
/// }
/// ```
pub struct PkCommand<VA, MA, Instant>
where
    VA: PkVariableAccessor,
    MA: PkMethodAccessor,
    Instant: PkInstant + Add<Duration, Output = Instant> + PartialOrd + Copy,
{
    // Current stage of the transaction chain.
    stage: Cell<Stage>,
    // Acknowledgement status (e.g. awaiting an ACK for the last sent command).
    status: Cell<Status>,
    // Role played in the current transaction (Host, Device, or Idle).
    role: Cell<Role>,
    // Copy of the most recently sent command, re-sent when the ACK timeout expires.
    last_sent_command: RefCell<Command>,
    // Message id of the most recently sent command.
    last_sent_msg_id: Cell<u16>,
    // Message id of the most recently consumed incoming command; the next
    // outgoing id is derived from it.
    last_received_msg_id: Cell<u16>,
    // Parameter data (the buffer sliced when acting as Host).
    data_param: RefCell<Vec<u8>>,
    // Return data (the buffer sliced when acting as Device).
    data_return: RefCell<Vec<u8>>,
    // Byte offset at which the next data slice starts.
    sending_data_progress: Cell<u64>,
    // Root operation of the current transaction chain.
    root_operation: Cell<Operation>,
    // Object accompanying the root operation, if any.
    root_object: RefCell<Option<String>>,
    // Most recently received command, waiting to be consumed by `poll()`.
    command_buffer: RefCell<Command>,
    // `false` while `command_buffer` holds a command `poll()` has not handled yet.
    command_processed: Cell<bool>,
    // Time of the last received or sent command; basis for timeout checks.
    last_command_time: Cell<Instant>,
    // Whether a device-side operation (an `INVOK` pollable) is still in progress.
    device_op_pending: Cell<bool>,
    // Earliest time at which the next `AWAIT` keep-alive should be sent.
    device_await_deadline: Cell<Option<Instant>>,
    // Timeouts and packet size limit.
    config: PkCommandConfig,
    // User-provided variable storage.
    variable_accessor: VA,
    // User-provided method dispatcher.
    method_accessor: MA,
    // Handle of the in-flight method invocation, if any.
    pending_pollable: RefCell<Option<Pin<Box<dyn Pollable>>>>,
    // Whether the device has received QUERY but has not sent its return value yet.
    device_should_return: Cell<bool>,
}
511
512impl<
513 VA: PkVariableAccessor,
514 MA: PkMethodAccessor,
515 Instant: PkInstant + Add<Duration, Output = Instant> + PartialOrd + Copy,
516> PkCommand<VA, MA, Instant>
517{
518 /// Ingests a raw command received from the other party.
519 ///
520 /// This should be called whenever new bytes arrive on your transport layer. The
521 /// state machine will parse the bytes and update its internal buffers for the
522 /// next [`poll()`](crate::PkCommand::poll) cycle.
523 ///
524 /// # Arguments
525 /// * `command_bytes`: The raw bytes of the received command.
526 ///
527 /// # Returns
528 /// `Ok(())` if the command was successfully parsed and buffered.
529 /// `Err(&'static str)` if parsing failed (e.g., invalid format, unknown operation).
530 pub fn incoming_command(&self, command_bytes: Vec<u8>) -> Result<(), &'static str> {
531 match Command::parse(&command_bytes) {
532 // Pass as slice
533 Ok(parsed_command) => {
534 self.command_buffer.replace(parsed_command);
535 self.command_processed.set(false);
536 self.last_command_time.replace(Instant::now());
537 Ok(())
538 }
539 Err(e) => Err(e),
540 }
541 }
542
543 /// Slices a chunk of data from internal buffers for multipart transfer.
544 ///
545 /// This is an internal utility used during `SDATA` phases.
546 fn slice_data(&self, role: Role) -> Result<(Vec<u8>, bool), &'static str> {
547 // 如果 Role 是 Device 则默认在发送返回值,反之亦然
548 match role {
549 Role::Device => {
550 let data = self.data_return.borrow();
551 if data.is_empty() {
552 return Err("No return data to slice.");
553 }
554 let start = self.sending_data_progress.get() as usize;
555 let end =
556 std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
557 let is_last_packet = end == data.len();
558 self.sending_data_progress.set(end as u64);
559 Ok((data[start..end].to_vec(), is_last_packet))
560 }
561 Role::Host => {
562 let data = self.data_param.borrow();
563 if data.is_empty() {
564 return Err("No parameter data to slice.");
565 }
566 let start = self.sending_data_progress.get() as usize;
567 let end =
568 std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
569 let is_last_packet = end == data.len();
570 self.sending_data_progress.set(end as u64);
571 Ok((data[start..end].to_vec(), is_last_packet))
572 }
573 Role::Idle => Err("Cannot slice data in Idle role."),
574 }
575 }
576
577 /// Polls the state machine for progress and pending actions.
578 ///
579 /// See [`PkCommand`] for more details.
580 ///
581 /// This method must be called frequently in your main loop. It handles:
582 /// 1. **Processing**: Consuming commands received via `incoming_command`.
583 /// 2. **Execution**: Running device-side logic (variable access, method polling).
584 /// 3. **Protocol Flow**: Automatically generating ACKs, data slices, and ENDTRs.
585 /// 4. **Reliability**: Handling timeouts and retransmitting lost packets.
586 ///
587 /// # Returns
588 ///
589 /// - `Some(Command)`: A command that needs to be sent to the peer.
590 /// Serialize it with [`to_bytes()`](crate::types::Command::to_bytes) and transmit it.
591 /// - `None`: No action required at this time.
592 pub fn poll(&self) -> Option<Command> {
593 let next_msg_id_for_send = || util::msg_id::increment(self.last_received_msg_id.get());
594 let send = move |command: Command| -> Option<Command> {
595 self.last_command_time.set(Instant::now());
596 self.last_sent_msg_id.set(command.msg_id);
597 self.last_sent_command.replace(command.clone());
598 // 因为 ACK 的函数并没有嵌套调用这个,所以
599 self.status.set(Status::AwaitingAck);
600 Some(command)
601 };
602 let reset_transaction_state = || {
603 self.stage.set(Stage::Idle);
604 self.status.set(Status::Other);
605 self.role.set(Role::Idle);
606 // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
607 self.data_param.replace(vec![]);
608 self.data_return.replace(vec![]);
609 self.sending_data_progress.set(0);
610 self.device_op_pending.set(false);
611 self.device_await_deadline.set(None);
612 self.pending_pollable.replace(None); //确保清理
613 self.device_should_return.set(false);
614 };
615 let ack = move |msg_id: u16, operation: Operation| -> Option<Command> {
616 self.last_command_time.set(Instant::now());
617 Some(Command {
618 msg_id,
619 operation: Operation::Acknowledge,
620 object: Some(operation.to_name().to_string()),
621 data: None,
622 })
623 };
624 let err = |msg: &'static str| -> Option<Command> {
625 // 在收到 ERROR 或 ACKNO ERROR 后,状态数据清零
626 // 这个逻辑在下面处理 所以这里就不写了
627 self.status.set(Status::AwaitingErrAck);
628 let command = Command {
629 msg_id: 0,
630 operation: Operation::Error,
631 object: Some(String::from("ERROR")),
632 data: Some(msg.as_bytes().to_vec()),
633 };
634 self.last_command_time.set(Instant::now());
635 self.last_sent_msg_id.set(command.msg_id);
636 self.last_sent_command.replace(command.clone());
637 Some(command)
638 };
639 // 首先检查是否有新的指令进入 command buffer
640 match self.command_processed.get() {
641 true => {
642 // Idle 则忽略当前 poll
643 if self.stage.get() == Stage::Idle {
644 return None;
645 }
646 if self.stage.get() == Stage::Started
647 && self.role.get() == Role::Host
648 && self.status.get() != Status::AwaitingAck
649 {
650 return send(Command {
651 msg_id: next_msg_id_for_send(),
652 operation: Operation::Start,
653 object: None,
654 data: None,
655 });
656 }
657 // 当设备有挂起的 INVOK 操作并且处于响应阶段时,轮询 Pollable
658 if self.role.get() == Role::Device
659 && self.device_op_pending.get()
660 && self.stage.get() == Stage::SendingResponse
661 {
662 // 如果正在等待 AWAIT 的 ACK,则不轮询主 Pollable, ACK 超时机制处理 AWAIT 的重传
663 if self.status.get() == Status::AwaitingAck {
664 // Timeout for AWAIT's ACK will be handled by the generic timeout logic below.
665 } else if self.status.get() == Status::AwaitingErrAck {
666 // This state is unlikely if a device operation is pending normally.
667 // Consider if an error should be raised or state reset.
668 } else {
669 // Status::Other, ready to poll the main INVOK pollable
670 let mut pollable_store = self.pending_pollable.borrow_mut();
671
672 if let Some(pinned_pollable) = pollable_store.as_mut() {
673 match pinned_pollable.as_mut().poll() {
674 Poll::Ready(result) => {
675 pollable_store.take(); // Remove completed pollable
676 self.device_op_pending.set(false);
677 self.device_await_deadline.set(None);
678
679 match result {
680 Ok(data_opt) => {
681 self.data_return.replace(data_opt.unwrap_or_default());
682 // Stage is already SendingResponse.
683 self.sending_data_progress.set(0); // Reset for sending return data.
684
685 let rturn_object_name =
686 if self.data_return.borrow().is_empty() {
687 String::from("EMPTY")
688 } else {
689 Operation::Invoke.to_name().to_string()
690 };
691 return send(Command {
692 msg_id: next_msg_id_for_send(),
693 operation: Operation::Return,
694 object: Some(rturn_object_name),
695 data: None,
696 });
697 }
698 Err(_) => {
699 reset_transaction_state();
700 return err("INVOK operation failed");
701 }
702 }
703 }
704 Poll::Pending => {
705 if Instant::now()
706 >= self
707 .device_await_deadline
708 .get()
709 .unwrap_or(Instant::now())
710 {
711 self.device_await_deadline
712 .set(Some(Instant::now() + self.config.await_interval));
713 return send(Command {
714 msg_id: next_msg_id_for_send(),
715 operation: Operation::Await,
716 object: None,
717 data: None,
718 });
719 }
720 }
721 }
722 } else {
723 // device_op_pending is true, but no pollable.
724 reset_transaction_state();
725 return err("Internal: Device op pending but no pollable.");
726 }
727 }
728 } // 结束 device_op_pending && Stage::SendingResponse 的处理
729 if self.device_should_return.get() {
730 self.sending_data_progress.set(0); // 重置发送进度
731 self.device_should_return.set(false);
732 // 这时候的状态应该是收到了 QUERY,还没有发送返回值
733 match self.root_operation.get() {
734 Operation::GetVersion => {
735 return send(Command {
736 msg_id: next_msg_id_for_send(),
737 operation: Operation::Return,
738 object: Some(self.root_operation.get().to_name().to_string()),
739 data: None,
740 });
741 }
742 Operation::RequireVariable => {
743 if self.data_return.borrow().is_empty() {
744 return send(Command {
745 msg_id: next_msg_id_for_send(),
746 operation: Operation::Return,
747 object: Some(String::from("EMPTY")),
748 data: None,
749 });
750 }
751 return send(Command {
752 msg_id: next_msg_id_for_send(),
753 operation: Operation::Return,
754 object: Some(self.root_operation.get().to_name().to_string()),
755 data: None,
756 });
757 }
758 Operation::SendVariable => {
759 // SENDV doesn't return data in the RTURN command itself.
760 // The result of the set operation is implicitly acknowledged by the ENDTR ACK.
761 // If there was an error during set, it would be handled by the error path.
762 // We still send RTURN EMPTY to signal the end of the Device's processing phase.
763 self.data_return.replace(vec![]); // Ensure data_return is empty
764 return send(Command {
765 msg_id: next_msg_id_for_send(),
766 operation: Operation::Return,
767 object: Some(Operation::Empty.to_name().to_string()),
768 data: None,
769 });
770 }
771 Operation::Invoke => {
772 // 忽略,因为 Invoke 的返回在上面轮询 Pollable 时处理
773 }
774 _ => {
775 panic!("Not a root operation");
776 }
777 }
778 }
779
780 // 获取当前时间来比较超时
781 let elapsed_ms = self.last_command_time.get().elapsed();
782 match self.status.get() {
783 Status::AwaitingAck | Status::AwaitingErrAck => {
784 // 等待 ACK 时则检查 ACK 超时来确认是否重传
785 if elapsed_ms >= self.config.ack_timeout {
786 return Some(self.last_sent_command.borrow().clone());
787 }
788 }
789 _ => {
790 // 仅当不在 Idle 状态且没有挂起的设备操作时检查指令间超时
791 if self.stage.get() != Stage::Idle
792 && !self.device_op_pending.get()
793 && elapsed_ms >= self.config.inter_command_timeout
794 {
795 reset_transaction_state(); // 在发送错误前重置状态
796 return err("Operation timed out");
797 }
798 }
799 }
800 }
801 // 缓冲区内有新的指令
802 false => {
803 self.command_processed.set(true);
804 self.last_received_msg_id
805 .set(self.command_buffer.borrow().msg_id); // Store received msg_id
806 let recv = self.command_buffer.borrow();
807 // 首先处理 Error 这种不被 Stage 描述的特殊情况
808 if recv.operation == Operation::Error {
809 reset_transaction_state();
810 return ack(0, Operation::Error);
811 } else if self.status.get() == Status::AwaitingErrAck {
812 if recv.operation == Operation::Acknowledge
813 && Some(String::from("ERROR")) == recv.object
814 {
815 self.status.set(Status::Other);
816 self.root_operation.set(Operation::Empty);
817 self.stage.set(Stage::Idle);
818 self.role.set(Role::Idle);
819 return None;
820 } else {
821 return err("Should be ACKNO ERROR");
822 }
823 }
824 match self.stage.get() {
825 Stage::Idle => {
826 // 在 Idle 状态下只能收到 START,且自身为 Device
827 if recv.operation != Operation::Start {
828 return err("not in a chain");
829 }
830 self.role.set(Role::Device);
831 self.stage.set(Stage::Started);
832 self.status.set(Status::Other); // Awaiting root command from Host
833 return ack(recv.msg_id, recv.operation);
834 }
835 Stage::Started => {
836 // 在 Started 状态下,根据当前角色不同,预期的行为应该是
837 // - Host -> 接收到 ACK,指示当前的根操作
838 // - Device -> 接收到根操作,进行 ACK
839 match self.role.get() {
840 Role::Host => {
841 if recv.operation == Operation::Acknowledge {
842 self.status.set(Status::Other);
843 self.stage.set(Stage::RootOperationAssigned);
844 return send(Command {
845 msg_id: next_msg_id_for_send(),
846 operation: self.root_operation.get(),
847 object: self.root_object.borrow().clone(),
848 data: None,
849 });
850 }
851 }
852 Role::Device => {
853 if recv.operation.is_root() {
854 self.root_operation.set(recv.operation);
855 // Validate if object is present for ops that require it
856 if (recv.operation == Operation::RequireVariable
857 || recv.operation == Operation::SendVariable
858 || recv.operation == Operation::Invoke)
859 && recv.object.is_none()
860 {
861 reset_transaction_state();
862 return err(
863 "Operation requires an object but none was provided.",
864 );
865 }
866 self.root_object.replace(recv.object.clone());
867 self.stage.set(Stage::RootOperationAssigned);
868 return ack(recv.msg_id, recv.operation);
869 } else {
870 return err("not a root operation");
871 }
872 }
873 _ => {
874 // 考虑代码问题,因为 Stage 已经是 Started 了,Role 不可能是 Idle
875 panic!("Role cannot be Idle if Stage is Started")
876 }
877 }
878 }
879 Stage::RootOperationAssigned => {
880 /* Host -> 接收到 ACK,**开始**传输数据。也就是说参数的*第一段*或 EMPTY 指令
881 Device -> 接收到 EMPTY 或数据的第一段
882 */
883 match self.role.get() {
884 Role::Host => {
885 if recv.operation == Operation::Acknowledge {
886 self.status.set(Status::Other);
887 self.stage.set(Stage::SendingParameter);
888 if self.data_param.borrow().is_empty() {
889 return send(Command {
890 msg_id: next_msg_id_for_send(),
891 operation: Operation::Empty,
892 object: None,
893 data: None,
894 });
895 } else {
896 match self.slice_data(Role::Host) {
897 Ok((data_chunk, _is_last)) => {
898 return send(Command {
899 msg_id: next_msg_id_for_send(),
900 operation: Operation::Data,
901 object: Some(
902 self.root_operation
903 .get()
904 .to_name()
905 .to_string(),
906 ),
907 data: Some(data_chunk),
908 });
909 }
910 Err(e) => {
911 reset_transaction_state();
912 return err(e);
913 }
914 }
915 }
916 } else {
917 return err("Should be ACKNO");
918 }
919 }
920 Role::Device => {
921 if recv.operation == Operation::Empty {
922 self.stage.set(Stage::SendingParameter);
923 return ack(recv.msg_id, recv.operation);
924 } else if recv.operation == Operation::Data {
925 self.stage.set(Stage::SendingParameter);
926 {
927 // 缩小可变借用的作用域,确保归还
928 self.data_param
929 .borrow_mut()
930 .append(&mut recv.data.as_ref().unwrap().clone());
931 }
932 return ack(recv.msg_id, recv.operation);
933 } else {
934 return err("Should be EMPTY or DATA");
935 }
936 }
937 _ => {
938 // 同上
939 panic!("Role cannot be Idle if Stage is RootOperationAssigned")
940 }
941 }
942 }
943 Stage::SendingParameter => {
944 // 此阶段:
945 // - Host: 已发送第一个参数数据包(SDATA)或 EMPTY,并收到 ACKNO。
946 // 现在需要判断是继续发送 SDATA 还是发送 ENDTR。
947 // - Device: 已收到第一个参数数据包(SDATA)或 EMPTY,并发送了 ACKNO。
948 // 现在等待接收后续的 SDATA 或 ENDTR。
949 match self.role.get() {
950 Role::Host => {
951 // Host 必须是收到了 ACKNO
952 if recv.operation != Operation::Acknowledge {
953 return err("Host expected ACKNO in SendingParameter stage");
954 }
955 self.status.set(Status::Other);
956
957 // 将借用操作限制在最小作用域,以避免后续调用 send() 或 err() 时发生冲突
958 let last_sent_op;
959 {
960 last_sent_op = self.last_sent_command.borrow().operation;
961 } // 不可变借用在此结束
962
963 match last_sent_op {
964 Operation::Empty => {
965 // 收到对 EMPTY 的 ACKNO,参数传输结束,发送 ENDTR
966 self.stage.set(Stage::ParameterSent);
967 return send(Command {
968 msg_id: next_msg_id_for_send(),
969 operation: Operation::EndTransaction,
970 object: None,
971 data: None,
972 });
973 }
974 Operation::Data => {
975 // 收到对 SDATA 的 ACKNO
976 let param_data_len = self.data_param.borrow().len() as u64;
977 if self.sending_data_progress.get() < param_data_len {
978 // 还有参数数据需要发送
979 let (data_chunk, _is_last) =
980 match self.slice_data(Role::Host) {
981 Ok(d) => d,
982 Err(e) => {
983 reset_transaction_state();
984 return err(e);
985 }
986 };
987 return send(Command {
988 msg_id: next_msg_id_for_send(),
989 operation: Operation::Data,
990 object: Some(
991 self.root_operation.get().to_name().to_string(),
992 ),
993 data: Some(data_chunk),
994 });
995 } else {
996 // 参数数据已全部发送完毕,发送 ENDTR
997 self.stage.set(Stage::ParameterSent);
998 return send(Command {
999 msg_id: next_msg_id_for_send(),
1000 operation: Operation::EndTransaction,
1001 object: None,
1002 data: None,
1003 });
1004 }
1005 }
1006 _ => {
1007 return err(
1008 "Host received ACKNO for unexpected command in SendingParameter stage",
1009 );
1010 }
1011 }
1012 }
1013 Role::Device => {
1014 // Device 等待 SDATA 或 ENDTR
1015 if recv.operation == Operation::Data {
1016 if let Some(ref data_vec) = recv.data {
1017 self.data_param.borrow_mut().extend_from_slice(data_vec);
1018 }
1019 return ack(recv.msg_id, recv.operation);
1020 } else if recv.operation == Operation::EndTransaction {
1021 self.stage.set(Stage::ParameterSent);
1022 return ack(recv.msg_id, recv.operation);
1023 } else {
1024 return err(
1025 "Device expected DATA or ENDTR in SendingParameter stage",
1026 );
1027 }
1028 }
1029 Role::Idle => {
1030 panic!("Role cannot be Idle if Stage is SendingParameter")
1031 }
1032 }
1033 }
1034 Stage::ParameterSent => {
1035 /* Host -> 收到对 ENDTR 的 ACK,发送 QUERY。等待回传数据或 AWAIT 保活。
1036 Device -> 收到 QUERY,执行逻辑,处理保活和/或回传数据。 */
1037 match self.role.get() {
1038 Role::Host => match recv.operation {
1039 Operation::Acknowledge => {
1040 self.status.set(Status::Other); // ACK received
1041 if Some(String::from("ENDTR")) == recv.object {
1042 return send(Command {
1043 msg_id: util::msg_id::increment(recv.msg_id),
1044 operation: Operation::Query,
1045 object: None,
1046 data: None,
1047 });
1048 } else if Some(String::from("QUERY")) == recv.object {
1049 return None;
1050 } else {
1051 return err(
1052 "Host: Unexpected ACK object in ParameterSent stage",
1053 );
1054 }
1055 }
1056 Operation::Await => {
1057 return ack(recv.msg_id, recv.operation);
1058 }
1059 Operation::Return => {
1060 if Some(String::from("EMPTY")) == recv.object
1061 || Some(self.root_operation.get().to_name().to_string())
1062 == recv.object
1063 {
1064 self.stage.set(Stage::SendingResponse);
1065 return ack(recv.msg_id, recv.operation);
1066 }
1067 }
1068 _ => {
1069 return err("Should be ACKNO, AWAIT or RETURN");
1070 }
1071 },
1072 Role::Device => {
1073 if recv.operation == Operation::Query {
1074 // 开始执行逻辑,然后 ACK
1075 match self.root_operation.get() {
1076 Operation::GetVersion => {
1077 self.data_return.replace(
1078 self.config.pk_version.as_bytes().to_vec(),
1079 );
1080 self.stage.set(Stage::SendingResponse);
1081 }
1082 Operation::RequireVariable => {
1083 let key = match self
1084 .root_object
1085 .borrow()
1086 .as_ref()
1087 .cloned()
1088 {
1089 Some(k) => k,
1090 None => {
1091 // This check should ideally be when root_op was received
1092 reset_transaction_state();
1093 return err(
1094 "Internal: Missing object name for REQUV.",
1095 );
1096 }
1097 };
1098 self.data_return.replace(
1099 self.variable_accessor.get(key).unwrap_or(vec![]),
1100 );
1101 self.stage.set(Stage::SendingResponse);
1102 }
1103 Operation::SendVariable => {
1104 let key = match self
1105 .root_object
1106 .borrow()
1107 .as_ref()
1108 .cloned()
1109 {
1110 Some(k) => k,
1111 None => {
1112 // This check should ideally be when root_op was received
1113 reset_transaction_state();
1114 return err(
1115 "Internal: Missing object name for SENDV.",
1116 );
1117 }
1118 };
1119 self.data_return.replace(
1120 if let Err(e) = self
1121 .variable_accessor
1122 .set(key, self.data_param.borrow().clone())
1123 {
1124 e.as_bytes().to_vec()
1125 } else {
1126 vec![]
1127 },
1128 );
1129 self.stage.set(Stage::SendingResponse); // Note: SENDV error reporting via data_return
1130 }
1131 Operation::Invoke => {
1132 self.device_op_pending.set(true);
1133 self.device_await_deadline.set(Some(
1134 Instant::now() + self.config.await_interval,
1135 ));
1136 // The object for INVOK is self.root_object, not from QUERY (recv.object)
1137 let method_name = match self
1138 .root_object
1139 .borrow()
1140 .as_ref()
1141 .cloned()
1142 {
1143 Some(name) => name,
1144 None => {
1145 reset_transaction_state();
1146 return err(
1147 "Internal: Missing method name for INVOK",
1148 );
1149 }
1150 };
1151 match self
1152 .method_accessor
1153 .call(method_name, self.data_param.borrow().clone())
1154 {
1155 Ok(pollable) => {
1156 self.pending_pollable.replace(Some(pollable));
1157 }
1158 Err(_) => {
1159 reset_transaction_state();
1160 // log::error!("Failed to create INVOK pollable: {}", e_str);
1161 return err(
1162 "Failed to initiate INVOK operation",
1163 );
1164 }
1165 }
1166 }
1167 _ => {
1168 reset_transaction_state();
1169 return err("Not a root operation");
1170 }
1171 }
1172 self.stage.set(Stage::SendingResponse);
1173 self.device_should_return.set(true);
1174 return ack(recv.msg_id, recv.operation);
1175 }
1176 }
1177 Role::Idle => {
1178 panic!("Role cannot be Idle if Stage is ParameterSent")
1179 }
1180 }
1181 }
1182 Stage::SendingResponse => {
1183 /* Host -> 接收数据。
1184 Device -> 收到对 RTURN/SDATA 的 ACK,继续发送数据或终止 */
1185 match self.role.get() {
1186 Role::Host => {
1187 // Host 等待 SDATA 或 ENDTR
1188 if recv.operation == Operation::Data {
1189 // Host receives SDATA from Device
1190 if let Some(ref data_vec) = recv.data {
1191 self.data_return.borrow_mut().extend_from_slice(data_vec);
1192 }
1193 return ack(recv.msg_id, recv.operation);
1194 } else if recv.operation == Operation::EndTransaction {
1195 let endtr_ack = ack(recv.msg_id, recv.operation);
1196 self.stage.set(Stage::Idle);
1197 self.status.set(Status::Other); // After sending ACK, status is Other
1198 return endtr_ack;
1199 } else {
1200 return err(
1201 "Host expected SDATA or ENDTR in SendingResponse stage",
1202 );
1203 }
1204 }
1205 Role::Device => {
1206 // Device 必须是收到了 ACKNO
1207 if recv.operation != Operation::Acknowledge {
1208 return err("Device expected ACKNO in SendingResponse stage");
1209 }
1210 self.status.set(Status::Other);
1211
1212 // 将借用操作限制在最小作用域
1213 let last_sent_op;
1214 {
1215 last_sent_op = self.last_sent_command.borrow().operation;
1216 } // 不可变借用在此结束
1217
1218 match last_sent_op {
1219 Operation::Return => {
1220 // 收到对 RETURN 的 ACKNO
1221 let return_data_len =
1222 self.data_return.borrow().len() as u64;
1223 if return_data_len == 0 {
1224 // 没有返回值,直接发送 ENDTR
1225 // self.stage.set(Stage::Idle); // Transaction ends
1226 // REMOVE: Do not set to Idle yet, wait for ENDTR's ACKNO
1227 return send(Command {
1228 msg_id: next_msg_id_for_send(),
1229 operation: Operation::EndTransaction,
1230 object: None,
1231 data: None,
1232 });
1233 } else {
1234 // 有返回值
1235 let (data_chunk, _) =
1236 match self.slice_data(Role::Device) {
1237 Ok(d) => d,
1238 Err(e) => {
1239 reset_transaction_state();
1240 return err(e);
1241 }
1242 };
1243
1244 return send(Command {
1245 msg_id: next_msg_id_for_send(),
1246 operation: Operation::Data,
1247 object: Some(
1248 self.root_operation.get().to_name().to_string(),
1249 ),
1250 data: Some(data_chunk),
1251 });
1252 }
1253 }
1254 Operation::Data => {
1255 if self.sending_data_progress.get()
1256 < self.data_return.borrow().len() as u64
1257 {
1258 let (data_chunk, _) =
1259 match self.slice_data(Role::Device) {
1260 Ok(d) => d,
1261 Err(e) => {
1262 reset_transaction_state();
1263 return err(e);
1264 }
1265 };
1266 return send(Command {
1267 msg_id: next_msg_id_for_send(),
1268 operation: Operation::Data,
1269 object: Some(
1270 self.root_operation.get().to_name().to_string(),
1271 ),
1272 data: Some(data_chunk),
1273 });
1274 } else {
1275 return send(Command {
1276 msg_id: next_msg_id_for_send(),
1277 operation: Operation::EndTransaction,
1278 object: None,
1279 data: None,
1280 });
1281 }
1282 }
1283 Operation::EndTransaction => {
1284 self.role.set(Role::Idle);
1285 self.stage.set(Stage::Idle);
1286 // reset_transaction_state();
1287 // 结束后不清理数据,考虑到外部可能手动获取,这里就不操心了
1288 return None;
1289 }
1290 Operation::Await => {
1291 // Device received ACKNO AWAIT
1292 // self.status is Other. Device continues pending op.
1293 return None;
1294 }
1295 _ => {
1296 return err(
1297 "Device received ACKNO for unexpected command in SendingResponse stage",
1298 );
1299 }
1300 }
1301 }
1302 _ => {
1303 panic!("Role cannot be Idle if Stage is SendingResponse")
1304 }
1305 }
1306 }
1307 }
1308 }
1309 }
1310 None
1311 }
1312
1313 /// Initiates a new root operation from the Host side.
1314 ///
1315 /// This starts a new transaction chain. It can only be called when the state machine is `Idle`.
1316 /// The actual protocol exchange (beginning with a `START` packet) is driven by subsequent [`poll()`](crate::PkCommand::poll) calls.
1317 ///
1318 /// # Arguments
1319 /// * `operation`: The root operation to perform (`SENDV`, `REQUV`, `INVOK`, or `PKVER`).
1320 /// * `object`: The target name (e.g., variable name for `REQUV`, method name for `INVOK`).
1321 /// * `data`: Optional parameter data (e.g., the value to set for `SENDV`).
1322 ///
1323 /// # Returns
1324 /// - `Ok(())`: The transaction was successfully queued.
1325 /// - `Err(&'static str)`: The request was invalid (e.g., already in a transaction, not a root op).
1326 pub fn perform(
1327 &self,
1328 operation: Operation,
1329 object: Option<String>,
1330 data: Option<Vec<u8>>,
1331 ) -> Result<(), &'static str> {
1332 if operation.is_root()
1333 && self.stage.get() == Stage::Idle
1334 && self.status.get() == Status::Other
1335 && self.role.get() == Role::Idle
1336 {
1337 self.root_operation.set(operation);
1338 self.root_object.replace(object);
1339 self.data_param.replace(data.unwrap_or(vec![]));
1340 self.role.set(Role::Host);
1341 self.stage.set(Stage::Started);
1342 self.status.set(Status::Other);
1343 Ok(())
1344 } else if !operation.is_root() {
1345 Err("Cannot initiate a non-root operation")
1346 } else {
1347 Err("Cannot initiate an operation when the transaction is in progress")
1348 }
1349 }
1350
1351 fn reset_transaction_state(&self) {
1352 self.stage.set(Stage::Idle);
1353 self.status.set(Status::Other);
1354 self.role.set(Role::Idle);
1355 // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
1356 self.data_param.borrow_mut().clear();
1357 self.data_return.borrow_mut().clear();
1358 self.sending_data_progress.set(0);
1359 self.device_op_pending.set(false);
1360 self.device_await_deadline.set(None);
1361 self.pending_pollable.borrow_mut().take(); // Clear the pollable
1362 }
1363
1364 /// Returns `true` if the state machine is currently [`Idle`](crate::types::Stage::Idle) (no active transaction).
1365 pub fn is_complete(&self) -> bool {
1366 self.stage.get() == Stage::Idle
1367 }
1368
1369 /// Retrieves the return data from a finished transaction and resets the transaction state.
1370 ///
1371 /// This should be called by the Host after [`is_complete()`](crate::PkCommand::is_complete) returns `true` for a root
1372 /// operation that expects return data (e.g., `REQUV` or `INVOK`).
1373 ///
1374 /// # Returns
1375 /// - `Some(Vec<u8>)`: The returned payload.
1376 /// - `None`: If there was no data or the state machine is not in a completed host state.
1377 pub fn get_return_data(&self) -> Option<Vec<u8>> {
1378 if self.stage.get() == Stage::Idle && self.role.get() == Role::Host {
1379 let data = self.data_return.borrow().clone();
1380 self.reset_transaction_state();
1381 if data.is_empty() {
1382 None
1383 } else {
1384 Some(data.clone())
1385 }
1386 } else {
1387 None // Not in a state to provide return data
1388 }
1389 }
1390
1391 /// Checks for transaction completion and executes a callback with the resulting data.
1392 ///
1393 /// This is a convenience method for polling for completion on the Host side.
1394 /// If the transaction is complete, it calls the `callback` with the return data
1395 /// (if any) and returns `true`.
1396 ///
1397 /// # Note
1398 ///
1399 /// This function is also poll-based. You should also call it regularly (e.g., in your main loop), and when the transaction completes, it would call the provided function.
1400 ///
1401 /// # Returns
1402 /// `true` if the transaction was complete and the callback was executed.
1403 ///
1404 /// # Example
1405 /// ```no_run
1406 /// use pk_command::{PkCommand, PkCommandConfig, PkHashmapVariable, PkHashmapMethod};
1407 ///
1408 /// let config = PkCommandConfig::default(64);
1409 /// let vars = PkHashmapVariable::new(vec![]);
1410 /// let methods = PkHashmapMethod::new(vec![]);
1411 /// let pk = PkCommand::<_, _, std::time::Instant>::new(config, vars, methods);
1412 /// # let transport = pk_command::doc_util::Transport::new();
1413 ///
1414 /// loop {
1415 /// // 1. Receive data from transport...
1416 /// if let Some(received_data) = transport.recv() {
1417 /// pk.incoming_command(received_data);
1418 /// }
1419 ///
1420 /// // 3. perform some operation
1421 /// # let some_condition=true;
1422 /// # let operation= pk_command::types::Operation::RequireVariable;
1423 /// # let object=None;
1424 /// # let data=None;
1425 /// if some_condition && pk.is_complete() {
1426 /// pk.perform(operation, object, data).unwrap();
1427 /// }
1428 ///
1429 /// // 4. poll
1430 /// let cmd=pk.poll();
1431 ///
1432 /// // 5. check for completion and handle return data
1433 /// // We can see that this function is poll-based as well, and should be called regularly. (typically
1434 /// // right after calling `poll()`) When the transaction completes, it would call the provided function
1435 /// // with the return data (if any).
1436 /// let mut should_break=false;
1437 /// pk.wait_for_complete_and(|data_opt| {
1438 /// println!("Transaction complete! Return data: {:?}", data_opt);
1439 /// should_break=true;
1440 /// });
1441 ///
1442 /// // 6. Send cmd back via transport...
1443 /// if let Some(cmd_to_send) = cmd {
1444 /// transport.send(cmd_to_send.to_bytes());
1445 /// }
1446 ///
1447 /// // 7. break if needed
1448 /// if should_break {
1449 /// break;
1450 /// }
1451 /// }
1452 /// ```
1453 pub fn wait_for_complete_and<F>(&self, callback: F) -> bool
1454 where
1455 F: FnOnce(Option<Vec<u8>>),
1456 {
1457 // 这个函数也是轮询的,用来给 Host 方返回值(因为在上面的 perform 中并没有告诉 PK 该怎么处理返回值)
1458 if self.stage.get() == Stage::Idle {
1459 let data = self.data_return.borrow().clone();
1460 self.reset_transaction_state();
1461 callback(if data.is_empty() { None } else { Some(data) });
1462 true
1463 } else {
1464 false
1465 }
1466 }
1467
1468 /// Creates a new [`PkCommand`] state machine.
1469 ///
1470 /// # Arguments
1471 /// * `config`: Configuration defining timeouts and packet limits.
1472 /// * `variable_accessor`: Provider for reading/writing variables.
1473 /// * `method_accessor`: Provider for invoking methods.
1474 ///
1475 /// # Note
1476 ///
1477 /// ## The Instant Type Parameter
1478 ///
1479 /// The `Instant` type parameter must implement [`PkInstant`], [`Copy`], [`PartialOrd`], and [`Add<Duration>`].
1480 /// We provide a default implementation for [`std::time::Instant`] in `std` environments,
1481 /// so that could be used directly.
1482 ///
1483 /// For `no_std` environments, users can implement their own [`PkInstant`] and use it here.
1484 ///
1485 /// ## The VA, MA Type Parameters
1486 ///
1487 /// In `std` environments, the library provides two convenient implementations for variable
1488 /// and method accessors: [`PkHashmapVariable`] and [`PkHashmapMethod`], which use
1489 /// [`HashMap`](std::collections::HashMap)s internally. You can use them directly or implement
1490 /// your own if you have different storage needs.
1491 ///
1492 /// In `no_std` environments, you must provide your own implementations of `VA`, `MA`,
1493 /// and `Instant`.
1494 ///
1495 /// ## For Embassy Users
1496 ///
1497 /// The library provides `no_std` utilities, based on the Embassy ecosystem,
1498 /// to help you integrate PK Command into a Embassy-based application.
1499 /// They are:
1500 ///
1501 /// - [`EmbassyInstant`] for `Instant`.
1502 /// - [`EmbassyPollable`](crate::embassy_adapter::EmbassyPollable) for [`Pollable`], and [`embassy_method_accessor!`](crate::embassy_method_accessor) for [`PkMethodAccessor`].
1503 ///
1504 /// Unfortunately still, you may need to provide your own implementation of [`PkVariableAccessor`].
1505 ///
1506 /// # Example
1507 /// ```
1508 /// use pk_command::{PkCommand, PkCommandConfig, PkHashmapVariable, PkHashmapMethod};
1509 ///
1510 /// // The third type parameter here is the PkInstant type. This can't be usually inferred so you must
1511 /// // specify it explicitly. If you are using std, just use std::time::Instant. If you are using no_std,
1512 /// // implement your own PkInstant and specify it here.
1513 /// let pk = PkCommand::<_, _, std::time::Instant>::new(
1514 /// PkCommandConfig::default(64),
1515 /// PkHashmapVariable::new(vec![]),
1516 /// PkHashmapMethod::new(vec![]),
1517 /// );
1518 /// ```
1519 pub fn new(config: PkCommandConfig, variable_accessor: VA, method_accessor: MA) -> Self {
1520 PkCommand {
1521 stage: Cell::new(Stage::Idle),
1522 status: Cell::new(Status::Other),
1523 role: Cell::new(Role::Idle),
1524 last_sent_command: RefCell::new(Command {
1525 msg_id: 0,
1526 operation: Operation::Empty,
1527 object: None,
1528 data: None,
1529 }),
1530 last_sent_msg_id: Cell::new(0),
1531 last_received_msg_id: Cell::new(0),
1532 data_param: RefCell::new(vec![]),
1533 data_return: RefCell::new(vec![]),
1534 sending_data_progress: Cell::new(0),
1535 root_operation: Cell::new(Operation::Empty),
1536 root_object: RefCell::new(None),
1537 command_buffer: RefCell::new(Command {
1538 msg_id: 0,
1539 operation: Operation::Empty,
1540 object: None,
1541 data: None,
1542 }),
1543 command_processed: Cell::new(true),
1544 last_command_time: Cell::new(Instant::now()),
1545 device_op_pending: Cell::new(false),
1546 device_await_deadline: Cell::new(None),
1547 config,
1548 variable_accessor,
1549 method_accessor,
1550 pending_pollable: RefCell::new(None),
1551 device_should_return: Cell::new(false),
1552 }
1553 }
1554}