pk_command/
lib.rs

/// Version string of this PK implementation.
// `const` references are implicitly `'static`, so the lifetime is not spelled out.
const PK_VERSION: &str = "1.0.0";
2
3use std::cell::{Cell, RefCell};
4use std::pin::Pin;
5use std::task::Poll;
6use std::time::{Duration, Instant};
7
8pub mod types;
9mod util;
10use types::{Command, Operation, Role, Stage, Status};
11
12pub use util::{PkMHashmapWrapper, PkPollable, PkVHashmapWrapper};
13
/// Trait defining how to access (get/set) variables by their string key.
///
/// This allows the `PkCommand` state machine to be generic over the actual variable storage.
///
/// Both methods take `&self`, so an implementation whose `set` actually stores the value
/// needs some form of interior mutability.
pub trait PkVariableAccessor {
    /// Retrieves the value of a variable.
    ///
    /// # Arguments
    /// * `key`: The name of the variable to retrieve.
    ///
    /// # Returns
    /// `Some(Vec<u8>)` containing the variable's data if found, or `None` otherwise.
    fn get(&self, key: String) -> Option<Vec<u8>>;

    /// Sets the value of a variable.
    ///
    /// # Arguments
    /// * `key`: The name of the variable to set.
    /// * `value`: The new data for the variable.
    ///
    /// # Returns
    /// `Ok(())` if successful, or an `Err(String)` describing the error.
    fn set(&self, key: String, value: Vec<u8>) -> Result<(), String>;
}
37
/// Trait defining the `poll` method of the object produced when a PK method is invoked.
///
/// This is designed for time-consuming tasks, similar in spirit to `Future`, but it is not
/// meant for asynchronous programming. An ideal implementation is multithreaded (or uses some
/// other mechanism) so that the main thread is not blocked.
pub trait Pollable {
    /// Reports the current progress of the task.
    ///
    /// # Returns
    /// * `Poll::Pending` while the task has not finished yet.
    /// * `Poll::Ready(Ok(data))` once the task has finished; `data` is the optional output.
    /// * `Poll::Ready(Err(message))` if the task failed.
    fn poll(&self) -> std::task::Poll<Result<Option<Vec<u8>>, String>>;
}
46
/// Trait defining how to invoke methods by their string key.
///
/// This allows the `PkCommand` state machine to be generic over the actual method implementation.
pub trait PkMethodAccessor {
    /// Calls a method with the given parameters.
    ///
    /// # Arguments
    /// * `key`: The name of the method to call.
    /// * `param`: The parameters for the method, as a byte vector.
    ///
    /// # Returns
    /// A `Result` containing a pinned, boxed `Pollable` that will resolve to the method's output
    /// (`Result<Option<Vec<u8>>, String>`), or an `Err(String)` if the method call cannot be initiated.
    ///
    /// The call itself is expected to return promptly; completion is observed later through
    /// `Pollable::poll` on the returned object.
    fn call(&self, key: String, param: Vec<u8>) -> Result<Pin<Box<dyn Pollable>>, String>;
}
62
/// Configuration for the `PkCommand` state machine.
///
/// Derives `Debug` in addition to `Clone` so that a configuration can be logged or shown in
/// assertion failures; every field type already implements `Debug`.
#[derive(Debug, Clone)]
pub struct PkCommandConfig {
    /// Timeout duration for waiting for an `ACKNO` command.
    ack_timeout: Duration,
    /// Timeout duration for waiting for the next command in a sequence when not waiting for an `ACKNO`.
    inter_command_timeout: Duration,
    /// Interval at which the Device should send `AWAIT` commands during long-running operations.
    await_interval: Duration,
    /// The maximum length of a single command packet, including headers and data.
    packet_limit: u64,
    /// The version string of the PK protocol interpreter.
    pk_version: &'static str,
}
77
/// The main state machine for handling the PK Command protocol.
///
/// It manages transaction states, command parsing, command generation,
/// acknowledgments, timeouts, and data slicing.
///
/// All mutable state lives in `Cell`/`RefCell` fields, which is why the methods of this
/// type take `&self` rather than `&mut self`.
pub struct PkCommand<VA, MA>
where
    VA: PkVariableAccessor,
    MA: PkMethodAccessor,
{
    /// Current stage of the transaction (`Stage::Idle` when no transaction is active).
    stage: Cell<Stage>,
    /// Whether an `ACKNO` (or an `ACKNO ERROR`) is currently being awaited.
    status: Cell<Status>,
    /// Role of this instance in the current transaction (Host, Device or Idle).
    role: Cell<Role>,
    /// The last command that was sent; it is re-sent when the ACK timeout expires.
    last_sent_command: RefCell<Command>,
    /// Message id of the last command that was sent.
    last_sent_msg_id: Cell<u16>,
    /// Message id of the last command that was received; the id of the next outgoing
    /// command is derived from it.
    last_received_msg_id: Cell<u16>,
    /// Parameter data: sliced and sent by the Host, accumulated from `DATA` packets by the Device.
    data_param: RefCell<Vec<u8>>,
    /// Return data that the Device slices and sends back.
    data_return: RefCell<Vec<u8>>,
    /// Offset into the buffer being sent at which the next chunk starts.
    sending_data_progress: Cell<u64>,
    /// The root operation of the current transaction.
    root_operation: Cell<Operation>,
    /// The object that accompanies the root operation, if any.
    root_object: RefCell<Option<String>>,
    /// The most recently received command, stored by `incoming_command`.
    command_buffer: RefCell<Command>,
    /// `false` while `command_buffer` holds a command that `poll` has not handled yet.
    command_processed: Cell<bool>,
    /// Time of the last sent or received command; the basis for timeout checks.
    last_command_time: Cell<Instant>,
    /// Whether the Device has an `INVOK` operation whose pollable has not completed yet.
    device_op_pending: Cell<bool>,
    /// Earliest time at which the Device sends the next `AWAIT` while an operation is pending.
    device_await_deadline: Cell<Option<Instant>>,
    /// Timeouts, packet size limit and version used by this instance.
    config: PkCommandConfig,
    /// Backend used to read and write variables.
    variable_accessor: VA,
    /// Backend used to invoke methods.
    method_accessor: MA,
    /// The pollable of the in-progress `INVOK` operation, if any.
    pending_pollable: RefCell<Option<Pin<Box<dyn Pollable>>>>,
    device_should_return: Cell<bool>, // Whether the Device has "received a QUERY but not yet sent its return value"
}
109
110impl<VA: PkVariableAccessor, MA: PkMethodAccessor> PkCommand<VA, MA> {
111    /// Ingests a raw command received from the other party.
112    ///
113    /// The command bytes are parsed, and if successful, the parsed `Command`
114    /// is stored in an internal buffer to be processed by the next call to `poll()`.
115    ///
116    /// # Arguments
117    /// * `command_bytes`: A `Vec<u8>` containing the raw bytes of the received command.
118    ///
119    /// # Returns
120    /// `Ok(())` if the command was successfully parsed and buffered.
121    /// `Err(&'static str)` if parsing failed.
122    pub fn incoming_command(&self, command_bytes: Vec<u8>) -> Result<(), &'static str> {
123        match Command::parse(&command_bytes) {
124            // Pass as slice
125            Ok(parsed_command) => {
126                self.command_buffer.replace(parsed_command);
127                self.command_processed.set(false);
128                self.last_command_time.replace(Instant::now());
129                Ok(())
130            }
131            Err(e) => Err(e),
132        }
133    }
134
135    /// Slices a chunk of data from either `data_param` (for Host sending)
136    /// or `data_return` (for Device sending).
137    ///
138    /// The size of the chunk is determined by `config.packet_limit` minus protocol overhead.
139    /// Updates `sending_data_progress`.
140    ///
141    /// # Arguments
142    /// * `role`: The current role of this `PkCommand` instance, determining which buffer to use.
143    ///
144    /// # Returns
145    /// `Ok((Vec<u8>, bool))` where the `Vec<u8>` is the data chunk and the `bool` is `true`
146    /// if this is the last chunk of data.
147    /// `Err(&'static str)` if there's no data to send or if the role is `Idle`.
148    fn slice_data(&self, role: Role) -> Result<(Vec<u8>, bool), &'static str> {
149        // 如果 Role 是 Device 则默认在发送返回值,反之亦然
150        match role {
151            Role::Device => {
152                let data = self.data_return.borrow();
153                if data.len() == 0 {
154                    return Err("No return data to slice.");
155                }
156                let start = self.sending_data_progress.get() as usize;
157                let end =
158                    std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
159                let is_last_packet = end == data.len();
160                self.sending_data_progress.set(end as u64);
161                Ok((data[start..end].to_vec(), is_last_packet))
162            }
163            Role::Host => {
164                let data = self.data_param.borrow();
165                if data.len() == 0 {
166                    return Err("No parameter data to slice.");
167                }
168                let start = self.sending_data_progress.get() as usize;
169                let end =
170                    std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
171                let is_last_packet = end == data.len();
172                self.sending_data_progress.set(end as u64);
173                Ok((data[start..end].to_vec(), is_last_packet))
174            }
175            Role::Idle => Err("Cannot slice data in Idle role."),
176        }
177    }
178
179    /// Polls the state machine for actions.
180    ///
181    /// This method should be called periodically. It processes incoming commands
182    /// from the internal buffer (filled by `incoming_command`), handles timeouts,
183    /// manages retransmissions, and progresses through the transaction stages.
184    ///
185    /// If the state machine determines that a command needs to be sent to the other party,
186    /// this method will return `Some(Command)`.
187    ///
188    /// # Returns
189    /// `Some(Command)` if a command needs to be sent, or `None` otherwise.
190    pub fn poll(&self) -> Option<Command> {
191        let next_msg_id_for_send = || util::msg_id::increment(self.last_received_msg_id.get());
192        let send = move |command: Command| -> Option<Command> {
193            self.last_command_time.set(Instant::now());
194            self.last_sent_msg_id.set(command.msg_id);
195            self.last_sent_command.replace(command.clone());
196            // 因为 ACK 的函数并没有嵌套调用这个,所以
197            self.status.set(Status::AwaitingAck);
198            Some(command)
199        };
200        let reset_transaction_state = || {
201            self.stage.set(Stage::Idle);
202            self.status.set(Status::Other);
203            self.role.set(Role::Idle);
204            // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
205            self.data_param.replace(vec![]);
206            self.data_return.replace(vec![]);
207            self.sending_data_progress.set(0);
208            self.device_op_pending.set(false);
209            self.device_await_deadline.set(None);
210            self.pending_pollable.replace(None); //确保清理
211            self.device_should_return.set(false);
212        };
213        let ack = move |msg_id: u16, operation: Operation| -> Option<Command> {
214            self.last_command_time.set(Instant::now());
215            Some(Command {
216                msg_id: msg_id,
217                operation: Operation::Acknowledge,
218                object: Some(operation.to_name().to_string()),
219                data: None,
220            })
221        };
222        let err = |msg: &'static str| -> Option<Command> {
223            // 在收到 ERROR 或 ACKNO ERROR 后,状态数据清零
224            // 这个逻辑在下面处理 所以这里就不写了
225            self.status.set(Status::AwaitingErrAck);
226            let command = Command {
227                msg_id: 0,
228                operation: Operation::Error,
229                object: Some(String::from("ERROR")),
230                data: Some(msg.as_bytes().to_vec()),
231            };
232            self.last_command_time.set(Instant::now());
233            self.last_sent_msg_id.set(command.msg_id);
234            self.last_sent_command.replace(command.clone());
235            Some(command)
236        };
237        // 首先检查是否有新的指令进入 command buffer
238        match self.command_processed.get() {
239            true => {
240                // Idle 则忽略当前 poll
241                if self.stage.get() == Stage::Idle {
242                    return None;
243                }
244                if self.stage.get() == Stage::Started
245                    && self.role.get() == Role::Host
246                    && self.status.get() != Status::AwaitingAck
247                {
248                    return send(Command {
249                        msg_id: next_msg_id_for_send(),
250                        operation: Operation::Start,
251                        object: None,
252                        data: None,
253                    });
254                }
255                // 当设备有挂起的 INVOK 操作并且处于响应阶段时,轮询 Pollable
256                if self.role.get() == Role::Device
257                    && self.device_op_pending.get()
258                    && self.stage.get() == Stage::SendingResponse
259                {
260                    // 如果正在等待 AWAIT 的 ACK,则不轮询主 Pollable, ACK 超时机制处理 AWAIT 的重传
261                    if self.status.get() == Status::AwaitingAck {
262                        // Timeout for AWAIT's ACK will be handled by the generic timeout logic below.
263                    } else if self.status.get() == Status::AwaitingErrAck {
264                        // This state is unlikely if a device operation is pending normally.
265                        // Consider if an error should be raised or state reset.
266                    } else {
267                        // Status::Other, ready to poll the main INVOK pollable
268                        let mut pollable_store = self.pending_pollable.borrow_mut();
269
270                        if let Some(pinned_pollable) = pollable_store.as_mut() {
271                            match pinned_pollable.as_mut().poll() {
272                                Poll::Ready(result) => {
273                                    pollable_store.take(); // Remove completed pollable
274                                    self.device_op_pending.set(false);
275                                    self.device_await_deadline.set(None);
276
277                                    match result {
278                                        Ok(data_opt) => {
279                                            self.data_return.replace(data_opt.unwrap_or_default());
280                                            // Stage is already SendingResponse.
281                                            self.sending_data_progress.set(0); // Reset for sending return data.
282
283                                            let rturn_object_name =
284                                                if self.data_return.borrow().is_empty() {
285                                                    String::from("EMPTY")
286                                                } else {
287                                                    Operation::Invoke.to_name().to_string()
288                                                };
289                                            return send(Command {
290                                                msg_id: next_msg_id_for_send(),
291                                                operation: Operation::Return,
292                                                object: Some(rturn_object_name),
293                                                data: None,
294                                            });
295                                        }
296                                        Err(_) => {
297                                            reset_transaction_state();
298                                            return err("INVOK operation failed");
299                                        }
300                                    }
301                                }
302                                Poll::Pending => {
303                                    if Instant::now()
304                                        >= self
305                                            .device_await_deadline
306                                            .get()
307                                            .unwrap_or(Instant::now())
308                                    {
309                                        self.device_await_deadline
310                                            .set(Some(Instant::now() + self.config.await_interval));
311                                        return send(Command {
312                                            msg_id: next_msg_id_for_send(),
313                                            operation: Operation::Await,
314                                            object: None,
315                                            data: None,
316                                        });
317                                    }
318                                }
319                            }
320                        } else {
321                            // device_op_pending is true, but no pollable.
322                            reset_transaction_state();
323                            return err("Internal: Device op pending but no pollable.");
324                        }
325                    }
326                } // 结束 device_op_pending && Stage::SendingResponse 的处理
327                if self.device_should_return.get() {
328                    self.sending_data_progress.set(0); // 重置发送进度
329                    self.device_should_return.set(false);
330                    // 这时候的状态应该是收到了 QUERY,还没有发送返回值
331                    match self.root_operation.get() {
332                        Operation::GetVersion => {
333                            return send(Command {
334                                msg_id: next_msg_id_for_send(),
335                                operation: Operation::Return,
336                                object: Some(self.root_operation.get().to_name().to_string()),
337                                data: None,
338                            });
339                        }
340                        Operation::RequireVariable => {
341                            if self.data_return.borrow().len() == 0 {
342                                return send(Command {
343                                    msg_id: next_msg_id_for_send(),
344                                    operation: Operation::Return,
345                                    object: Some(String::from("EMPTY")),
346                                    data: None,
347                                });
348                            }
349                            return send(Command {
350                                msg_id: next_msg_id_for_send(),
351                                operation: Operation::Return,
352                                object: Some(self.root_operation.get().to_name().to_string()),
353                                data: None,
354                            });
355                        }
356                        Operation::SendVariable => {
357                            // SENDV doesn't return data in the RTURN command itself.
358                            // The result of the set operation is implicitly acknowledged by the ENDTR ACK.
359                            // If there was an error during set, it would be handled by the error path.
360                            // We still send RTURN EMPTY to signal the end of the Device's processing phase.
361                            self.data_return.replace(vec![]); // Ensure data_return is empty
362                            return send(Command {
363                                msg_id: next_msg_id_for_send(),
364                                operation: Operation::Return,
365                                object: Some(Operation::Empty.to_name().to_string()),
366                                data: None,
367                            });
368                        }
369                        Operation::Invoke => {
370                            // 忽略,因为 Invoke 的返回在上面轮询 Pollable 时处理
371                        }
372                        _ => {
373                            panic!("Not a root operation");
374                        }
375                    }
376                }
377
378                // 获取当前时间来比较超时
379                let elapsed_ms = self.last_command_time.get().elapsed();
380                match self.status.get() {
381                    Status::AwaitingAck | Status::AwaitingErrAck => {
382                        // 等待 ACK 时则检查 ACK 超时来确认是否重传
383                        if elapsed_ms >= self.config.ack_timeout {
384                            return Some(self.last_sent_command.borrow().clone());
385                        }
386                    }
387                    _ => {
388                        // 仅当不在 Idle 状态且没有挂起的设备操作时检查指令间超时
389                        if self.stage.get() != Stage::Idle
390                            && !self.device_op_pending.get()
391                            && elapsed_ms >= self.config.inter_command_timeout
392                        {
393                            reset_transaction_state(); // 在发送错误前重置状态
394                            return err("Operation timed out");
395                        }
396                    }
397                }
398            }
399            // 缓冲区内有新的指令
400            false => {
401                self.command_processed.set(true);
402                self.last_received_msg_id
403                    .set(self.command_buffer.borrow().msg_id); // Store received msg_id
404                let recv = self.command_buffer.borrow();
405                // 首先处理 Error 这种不被 Stage 描述的特殊情况
406                if recv.operation == Operation::Error {
407                    reset_transaction_state();
408                    return ack(0, Operation::Error);
409                } else {
410                    if self.status.get() == Status::AwaitingErrAck {
411                        if recv.operation == Operation::Acknowledge
412                            && recv.object == Some(String::from("ERROR"))
413                        {
414                            self.status.set(Status::Other);
415                            self.root_operation.set(Operation::Empty);
416                            self.stage.set(Stage::Idle);
417                            self.role.set(Role::Idle);
418                            return None;
419                        } else {
420                            return err("Should be ACKNO ERROR");
421                        }
422                    }
423                }
424                match self.stage.get() {
425                    Stage::Idle => {
426                        // 在 Idle 状态下只能收到 START,且自身为 Device
427                        if recv.operation != Operation::Start {
428                            return err("not in a chain");
429                        }
430                        self.role.set(Role::Device);
431                        self.stage.set(Stage::Started);
432                        self.status.set(Status::Other); // Awaiting root command from Host
433                        return ack(recv.msg_id, recv.operation);
434                    }
435                    Stage::Started => {
436                        // 在 Started 状态下,根据当前角色不同,预期的行为应该是
437                        // - Host -> 接收到 ACK,指示当前的根操作
438                        // - Device -> 接收到根操作,进行 ACK
439                        match self.role.get() {
440                            Role::Host => {
441                                if recv.operation == Operation::Acknowledge {
442                                    self.status.set(Status::Other);
443                                    self.stage.set(Stage::RootOperationAssigned);
444                                    return send(Command {
445                                        msg_id: next_msg_id_for_send(),
446                                        operation: self.root_operation.get(),
447                                        object: self.root_object.borrow().clone(),
448                                        data: None,
449                                    });
450                                }
451                            }
452                            Role::Device => {
453                                if recv.operation.is_root() {
454                                    self.root_operation.set(recv.operation);
455                                    // Validate if object is present for ops that require it
456                                    if (recv.operation == Operation::RequireVariable
457                                        || recv.operation == Operation::SendVariable
458                                        || recv.operation == Operation::Invoke)
459                                        && recv.object.is_none()
460                                    {
461                                        reset_transaction_state();
462                                        return err(
463                                            "Operation requires an object but none was provided.",
464                                        );
465                                    }
466                                    self.root_object.replace(recv.object.clone());
467                                    self.stage.set(Stage::RootOperationAssigned);
468                                    return ack(recv.msg_id, recv.operation);
469                                } else {
470                                    return err("not a root operation");
471                                }
472                            }
473                            _ => {
474                                // 考虑代码问题,因为 Stage 已经是 Started 了,Role 不可能是 Idle
475                                panic!("Role cannot be Idle if Stage is Started")
476                            }
477                        }
478                    }
479                    Stage::RootOperationAssigned => {
480                        /* Host -> 接收到 ACK,**开始**传输数据。也就是说参数的*第一段*或 EMPTY 指令
481                          Device -> 接收到 EMPTY 或数据的第一段
482                        */
483                        match self.role.get() {
484                            Role::Host => {
485                                if recv.operation == Operation::Acknowledge {
486                                    self.status.set(Status::Other);
487                                    self.stage.set(Stage::SendingParameter);
488                                    if self.data_param.borrow().len() == 0 {
489                                        return send(Command {
490                                            msg_id: next_msg_id_for_send(),
491                                            operation: Operation::Empty,
492                                            object: None,
493                                            data: None,
494                                        });
495                                    } else {
496                                        match self.slice_data(Role::Host) {
497                                            Ok((data_chunk, _is_last)) => {
498                                                return send(Command {
499                                                    msg_id: next_msg_id_for_send(),
500                                                    operation: Operation::Data,
501                                                    object: Some(
502                                                        self.root_operation
503                                                            .get()
504                                                            .to_name()
505                                                            .to_string(),
506                                                    ),
507                                                    data: Some(data_chunk),
508                                                });
509                                            }
510                                            Err(e) => {
511                                                reset_transaction_state();
512                                                return err(e);
513                                            }
514                                        }
515                                    }
516                                } else {
517                                    return err("Should be ACKNO");
518                                }
519                            }
520                            Role::Device => {
521                                if recv.operation == Operation::Empty {
522                                    self.stage.set(Stage::SendingParameter);
523                                    return ack(recv.msg_id, recv.operation);
524                                } else if recv.operation == Operation::Data {
525                                    self.stage.set(Stage::SendingParameter);
526                                    {
527                                        // 缩小可变借用的作用域,确保归还
528                                        self.data_param.borrow_mut().append(&mut Vec::from(
529                                            recv.data.as_ref().unwrap().clone(),
530                                        ));
531                                    }
532                                    return ack(recv.msg_id, recv.operation);
533                                } else {
534                                    return err("Should be EMPTY or DATA");
535                                }
536                            }
537                            _ => {
538                                // 同上
539                                panic!("Role cannot be Idle if Stage is RootOperationAssigned")
540                            }
541                        }
542                    }
543                    Stage::SendingParameter => {
544                        // 此阶段:
545                        // - Host: 已发送第一个参数数据包(SDATA)或 EMPTY,并收到 ACKNO。
546                        //         现在需要判断是继续发送 SDATA 还是发送 ENDTR。
547                        // - Device: 已收到第一个参数数据包(SDATA)或 EMPTY,并发送了 ACKNO。
548                        //           现在等待接收后续的 SDATA 或 ENDTR。
549                        match self.role.get() {
550                            Role::Host => {
551                                // Host 必须是收到了 ACKNO
552                                if recv.operation != Operation::Acknowledge {
553                                    return err("Host expected ACKNO in SendingParameter stage");
554                                }
555                                self.status.set(Status::Other);
556
557                                // 将借用操作限制在最小作用域,以避免后续调用 send() 或 err() 时发生冲突
558                                let last_sent_op;
559                                {
560                                    last_sent_op = self.last_sent_command.borrow().operation;
561                                } // 不可变借用在此结束
562
563                                match last_sent_op {
564                                    Operation::Empty => {
565                                        // 收到对 EMPTY 的 ACKNO,参数传输结束,发送 ENDTR
566                                        self.stage.set(Stage::ParameterSent);
567                                        return send(Command {
568                                            msg_id: next_msg_id_for_send(),
569                                            operation: Operation::EndTransaction,
570                                            object: None,
571                                            data: None,
572                                        });
573                                    }
574                                    Operation::Data => {
575                                        // 收到对 SDATA 的 ACKNO
576                                        let param_data_len = self.data_param.borrow().len() as u64;
577                                        if self.sending_data_progress.get() < param_data_len {
578                                            // 还有参数数据需要发送
579                                            let (data_chunk, _is_last) =
580                                                match self.slice_data(Role::Host) {
581                                                    Ok(d) => d,
582                                                    Err(e) => {
583                                                        reset_transaction_state();
584                                                        return err(e);
585                                                    }
586                                                };
587                                            return send(Command {
588                                                msg_id: next_msg_id_for_send(),
589                                                operation: Operation::Data,
590                                                object: Some(
591                                                    self.root_operation.get().to_name().to_string(),
592                                                ),
593                                                data: Some(data_chunk),
594                                            });
595                                        } else {
596                                            // 参数数据已全部发送完毕,发送 ENDTR
597                                            self.stage.set(Stage::ParameterSent);
598                                            return send(Command {
599                                                msg_id: next_msg_id_for_send(),
600                                                operation: Operation::EndTransaction,
601                                                object: None,
602                                                data: None,
603                                            });
604                                        }
605                                    }
606                                    _ => {
607                                        return err(
608                                            "Host received ACKNO for unexpected command in SendingParameter stage",
609                                        );
610                                    }
611                                }
612                            }
613                            Role::Device => {
614                                // Device 等待 SDATA 或 ENDTR
615                                if recv.operation == Operation::Data {
616                                    if let Some(ref data_vec) = recv.data {
617                                        self.data_param.borrow_mut().extend_from_slice(data_vec);
618                                    }
619                                    return ack(recv.msg_id, recv.operation);
620                                } else if recv.operation == Operation::EndTransaction {
621                                    self.stage.set(Stage::ParameterSent);
622                                    return ack(recv.msg_id, recv.operation);
623                                } else {
624                                    return err(
625                                        "Device expected DATA or ENDTR in SendingParameter stage",
626                                    );
627                                }
628                            }
629                            Role::Idle => {
630                                panic!("Role cannot be Idle if Stage is SendingParameter")
631                            }
632                        }
633                    }
634                    Stage::ParameterSent => {
635                        /* Host -> 收到对 ENDTR 的 ACK,发送 QUERY。等待回传数据或 AWAKE 保活。
636                        Device -> 收到 QUERY,执行逻辑,处理保活和/或回传数据。 */
637                        match self.role.get() {
638                            Role::Host => match recv.operation {
639                                Operation::Acknowledge => {
640                                    self.status.set(Status::Other); // ACK received
641                                    if recv.object == Some(String::from("ENDTR")) {
642                                        return send(Command {
643                                            msg_id: util::msg_id::increment(recv.msg_id),
644                                            operation: Operation::Query,
645                                            object: None,
646                                            data: None,
647                                        });
648                                    } else if recv.object == Some(String::from("QUERY")) {
649                                        return None;
650                                    } else {
651                                        return err(
652                                            "Host: Unexpected ACK object in ParameterSent stage",
653                                        );
654                                    }
655                                }
656                                Operation::Await => {
657                                    return ack(recv.msg_id, recv.operation);
658                                }
659                                Operation::Return => {
660                                    if recv.object == Some(String::from("EMPTY"))
661                                        || recv.object
662                                            == Some(self.root_operation.get().to_name().to_string())
663                                    {
664                                        self.stage.set(Stage::SendingResponse);
665                                        return ack(recv.msg_id, recv.operation);
666                                    }
667                                }
668                                _ => {
669                                    return err("Should be ACKNO, AWAIT or RETURN");
670                                }
671                            },
672                            Role::Device => {
673                                if recv.operation == Operation::Query {
674                                    // 开始执行逻辑,然后 ACK
675                                    match self.root_operation.get() {
676                                        Operation::GetVersion => {
677                                            self.data_return.replace(
678                                                self.config.pk_version.as_bytes().to_vec(),
679                                            );
680                                            self.stage.set(Stage::SendingResponse);
681                                        }
682                                        Operation::RequireVariable => {
683                                            let key = match self
684                                                .root_object
685                                                .borrow()
686                                                .as_ref()
687                                                .cloned()
688                                            {
689                                                Some(k) => k,
690                                                None => {
691                                                    // This check should ideally be when root_op was received
692                                                    reset_transaction_state();
693                                                    return err(
694                                                        "Internal: Missing object name for REQUV.",
695                                                    );
696                                                }
697                                            };
698                                            self.data_return.replace(
699                                                self.variable_accessor.get(key).unwrap_or(vec![]),
700                                            );
701                                            self.stage.set(Stage::SendingResponse);
702                                        }
703                                        Operation::SendVariable => {
704                                            let key = match self
705                                                .root_object
706                                                .borrow()
707                                                .as_ref()
708                                                .cloned()
709                                            {
710                                                Some(k) => k,
711                                                None => {
712                                                    // This check should ideally be when root_op was received
713                                                    reset_transaction_state();
714                                                    return err(
715                                                        "Internal: Missing object name for SENDV.",
716                                                    );
717                                                }
718                                            };
719                                            self.data_return.replace(
720                                                if let Err(e) = self
721                                                    .variable_accessor
722                                                    .set(key, self.data_param.borrow().clone())
723                                                {
724                                                    e.as_bytes().to_vec()
725                                                } else {
726                                                    vec![]
727                                                },
728                                            );
729                                            self.stage.set(Stage::SendingResponse); // Note: SENDV error reporting via data_return
730                                        }
731                                        Operation::Invoke => {
732                                            self.device_op_pending.set(true);
733                                            self.device_await_deadline.set(Some(
734                                                Instant::now() + self.config.await_interval,
735                                            ));
736                                            // The object for INVOK is self.root_object, not from QUERY (recv.object)
737                                            let method_name = match self
738                                                .root_object
739                                                .borrow()
740                                                .as_ref()
741                                                .cloned()
742                                            {
743                                                Some(name) => name,
744                                                None => {
745                                                    reset_transaction_state();
746                                                    return err(
747                                                        "Internal: Missing method name for INVOK",
748                                                    );
749                                                }
750                                            };
751                                            match self
752                                                .method_accessor
753                                                .call(method_name, self.data_param.borrow().clone())
754                                            {
755                                                Ok(pollable) => {
756                                                    self.pending_pollable.replace(Some(pollable));
757                                                }
758                                                Err(_) => {
759                                                    reset_transaction_state();
760                                                    // log::error!("Failed to create INVOK pollable: {}", e_str);
761                                                    return err(
762                                                        "Failed to initiate INVOK operation",
763                                                    );
764                                                }
765                                            }
766                                        }
767                                        _ => {
768                                            reset_transaction_state();
769                                            return err("Not a root operation");
770                                        }
771                                    }
772                                    self.stage.set(Stage::SendingResponse);
773                                    self.device_should_return.set(true);
774                                    return ack(recv.msg_id, recv.operation);
775                                }
776                            }
777                            Role::Idle => {
778                                panic!("Role cannot be Idle if Stage is ParameterSent")
779                            }
780                        }
781                    }
782                    Stage::SendingResponse => {
783                        /* Host -> 接收数据。
784                        Device -> 收到对 RTURN/SDATA 的 ACK,继续发送数据或终止 */
785                        match self.role.get() {
786                            Role::Host => {
787                                // Host 等待 SDATA 或 ENDTR
788                                if recv.operation == Operation::Data {
789                                    // Host receives SDATA from Device
790                                    if let Some(ref data_vec) = recv.data {
791                                        self.data_return.borrow_mut().extend_from_slice(data_vec);
792                                    }
793                                    return ack(recv.msg_id, recv.operation);
794                                } else if recv.operation == Operation::EndTransaction {
795                                    let endtr_ack = ack(recv.msg_id, recv.operation);
796                                    self.stage.set(Stage::Idle);
797                                    self.status.set(Status::Other); // After sending ACK, status is Other
798                                    return endtr_ack;
799                                } else {
800                                    return err(
801                                        "Host expected SDATA or ENDTR in SendingResponse stage",
802                                    );
803                                }
804                            }
805                            Role::Device => {
806                                // Device 必须是收到了 ACKNO
807                                if recv.operation != Operation::Acknowledge {
808                                    return err("Device expected ACKNO in SendingResponse stage");
809                                }
810                                self.status.set(Status::Other);
811
812                                // 将借用操作限制在最小作用域
813                                let last_sent_op;
814                                {
815                                    last_sent_op = self.last_sent_command.borrow().operation;
816                                } // 不可变借用在此结束
817
818                                match last_sent_op {
819                                    Operation::Return => {
820                                        // 收到对 RETURN 的 ACKNO
821                                        let return_data_len =
822                                            self.data_return.borrow().len() as u64;
823                                        if return_data_len == 0 {
824                                            // 没有返回值,直接发送 ENDTR
825                                            self.stage.set(Stage::Idle); // Transaction ends
826                                            return send(Command {
827                                                msg_id: next_msg_id_for_send(),
828                                                operation: Operation::EndTransaction,
829                                                object: None,
830                                                data: None,
831                                            });
832                                        } else {
833                                            // 有返回值
834                                            let (data_chunk, _) =
835                                                match self.slice_data(Role::Device) {
836                                                    Ok(d) => d,
837                                                    Err(e) => {
838                                                        reset_transaction_state();
839                                                        return err(e);
840                                                    }
841                                                };
842
843                                            return send(Command {
844                                                msg_id: next_msg_id_for_send(),
845                                                operation: Operation::Data,
846                                                object: Some(
847                                                    self.root_operation.get().to_name().to_string(),
848                                                ),
849                                                data: Some(data_chunk),
850                                            });
851                                        }
852                                    }
853                                    Operation::Data => {
854                                        if self.sending_data_progress.get()
855                                            < self.data_return.borrow().len() as u64
856                                        {
857                                            let (data_chunk, _) =
858                                                match self.slice_data(Role::Device) {
859                                                    Ok(d) => d,
860                                                    Err(e) => {
861                                                        reset_transaction_state();
862                                                        return err(e);
863                                                    }
864                                                };
865                                            return send(Command {
866                                                msg_id: next_msg_id_for_send(),
867                                                operation: Operation::Data,
868                                                object: Some(
869                                                    self.root_operation.get().to_name().to_string(),
870                                                ),
871                                                data: Some(data_chunk),
872                                            });
873                                        } else {
874                                            return send(Command {
875                                                msg_id: next_msg_id_for_send(),
876                                                operation: Operation::EndTransaction,
877                                                object: None,
878                                                data: None,
879                                            });
880                                        }
881                                    }
882                                    Operation::EndTransaction => {
883                                        self.role.set(Role::Idle);
884                                        self.stage.set(Stage::Idle);
885                                        // reset_transaction_state();
886                                        // 结束后不清理数据,考虑到外部可能手动获取,这里就不操心了
887                                        return None;
888                                    }
889                                    Operation::Await => {
890                                        // Device received ACKNO AWAIT
891                                        // self.status is Other. Device continues pending op.
892                                        return None;
893                                    }
894                                    _ => {
895                                        return err(
896                                            "Device received ACKNO for unexpected command in SendingResponse stage",
897                                        );
898                                    }
899                                }
900                            }
901                            _ => {
902                                panic!("Role cannot be Idle if Stage is SendingResponse")
903                            }
904                        }
905                    }
906                }
907            }
908        }
909        None
910    }
911
912    /// Initiates a new root operation from the Host side.
913    ///
914    /// This method should only be called when the `PkCommand` instance is in an `Idle` state.
915    /// It sets up the necessary internal state to begin a new transaction.
916    /// The actual `START` command and subsequent root operation command will be generated
917    /// by subsequent calls to `poll()`.
918    ///
919    /// # Arguments
920    /// * `operation`: The root `Operation` to perform (e.g., `SENDV`, `REQUV`, `INVOK`, `PKVER`).
921    /// * `object`: An optional `String` representing the object of the operation (e.g., variable name, method name).
922    /// * `data`: Optional `Vec<u8>` containing parameter data for the operation (e.g., for `SENDV` or `INVOK`).
923    ///
924    /// # Returns
925    /// `Ok(())` if the operation can be initiated, or `Err(&'static str)` if not (e.g., not idle, or not a root operation).
926    pub fn perform(
927        &self,
928        operation: Operation,
929        object: Option<String>,
930        data: Option<Vec<u8>>,
931    ) -> Result<(), &'static str> {
932        if operation.is_root()
933            && self.stage.get() == Stage::Idle
934            && self.status.get() == Status::Other
935            && self.role.get() == Role::Idle
936        {
937            self.root_operation.set(operation);
938            self.root_object.replace(object);
939            self.data_param.replace(data.unwrap_or(vec![]));
940            self.role.set(Role::Host);
941            self.stage.set(Stage::Started);
942            self.status.set(Status::Other);
943            Ok(())
944        } else if !operation.is_root() {
945            Err("Cannot initiate a non-root operation")
946        } else {
947            Err("Cannot initiate an operation when the transaction is in progress")
948        }
949    }
950
951    fn reset_transaction_state(&self) -> () {
952        self.stage.set(Stage::Idle);
953        self.status.set(Status::Other);
954        self.role.set(Role::Idle);
955        // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
956        self.data_param.borrow_mut().clear();
957        self.data_return.borrow_mut().clear();
958        self.sending_data_progress.set(0);
959        self.device_op_pending.set(false);
960        self.device_await_deadline.set(None);
961        self.pending_pollable.borrow_mut().take(); // Clear the pollable
962    }
963
964    /// Checks if the transaction is complete (i.e., the state machine is in the `Idle` stage).
965    ///
966    /// # Returns
967    /// `true` if the transaction is complete, `false` otherwise.
968    pub fn is_complete(&self) -> bool {
969        self.stage.get() == Stage::Idle
970    }
971
972    /// Retrieves the return data from the completed transaction.
973    ///
974    /// This method should only be called when `is_complete()` returns `true` and the
975    /// instance is acting as the Host.
976    ///
977    /// # Returns
978    /// `Some(Vec<u8>)` containing the return data if available, or `None` if there was no return data.
979    pub fn get_return_data(&self) -> Option<Vec<u8>> {
980        if self.stage.get() == Stage::Idle && self.role.get() == Role::Host {
981            let data = self.data_return.borrow().clone();
982            self.reset_transaction_state();
983            if data.is_empty() {
984                None
985            } else {
986                Some(data.clone())
987            }
988        } else {
989            None // Not in a state to provide return data
990        }
991    }
992
993    /// Waits for the transaction to complete and then executes a callback with the return data.
994    ///
995    /// This is a polling-based wait. The callback is only executed once the state machine enters the `Idle` stage.
996    ///
997    /// # Arguments
998    /// * `callback`: A closure that takes an `Option<Vec<u8>>` (the return data) and is executed upon completion.
999    ///
1000    /// # Returns
1001    /// `true` if the callback was executed, or `false` otherwise
1002    ///
1003    /// # Note
1004    /// This method assumes `poll()` is being called externally to drive the state machine.
1005    /// It does not block the current thread waiting for completion, but rather checks the state
1006    /// and executes the callback if complete. You must ensure `poll()` is called frequently
1007    /// for the transaction to progress.
1008    pub fn wait_for_complete_and<F>(&self, callback: F) -> bool
1009    where
1010        F: FnOnce(Option<Vec<u8>>) -> (),
1011    {
1012        // 这个函数也是轮询的,用来给 Host 方返回值(因为在上面的 perform 中并没有告诉 PK 该怎么处理返回值)
1013        if self.stage.get() == Stage::Idle {
1014            let data = self.data_return.borrow().clone();
1015            self.reset_transaction_state();
1016            callback(if data.len() == 0 { None } else { Some(data) });
1017            true
1018        } else {
1019            false
1020        }
1021    }
1022
1023    /// Creates a new `PkCommand` state machine instance.
1024    ///
1025    /// # Arguments
1026    /// * `config`: The `PkCommandConfig` to use.
1027    /// * `variable_accessor`: An implementation of `PkVariableAccessor` for variable operations.
1028    /// * `method_accessor`: An implementation of `PkMethodAccessor` for method invocation.
1029    ///
1030    pub fn new(config: PkCommandConfig, variable_accessor: VA, method_accessor: MA) -> Self {
1031        PkCommand {
1032            stage: Cell::new(Stage::Idle),
1033            status: Cell::new(Status::Other),
1034            role: Cell::new(Role::Idle),
1035            last_sent_command: RefCell::new(Command {
1036                msg_id: 0,
1037                operation: Operation::Empty,
1038                object: None,
1039                data: None,
1040            }),
1041            last_sent_msg_id: Cell::new(0),
1042            last_received_msg_id: Cell::new(0),
1043            data_param: RefCell::new(vec![]),
1044            data_return: RefCell::new(vec![]),
1045            sending_data_progress: Cell::new(0),
1046            root_operation: Cell::new(Operation::Empty),
1047            root_object: RefCell::new(None),
1048            command_buffer: RefCell::new(Command {
1049                msg_id: 0,
1050                operation: Operation::Empty,
1051                object: None,
1052                data: None,
1053            }),
1054            command_processed: Cell::new(true),
1055            last_command_time: Cell::new(Instant::now()),
1056            device_op_pending: Cell::new(false),
1057            device_await_deadline: Cell::new(None),
1058            config,
1059            variable_accessor,
1060            method_accessor,
1061            pending_pollable: RefCell::new(None),
1062            device_should_return: Cell::new(false),
1063        }
1064    }
1065}
1066impl PkCommandConfig {
1067    /// Creates a `PkCommandConfig` with default timeout values.
1068    ///
1069    /// # Arguments
1070    /// * `packet_limit`: The maximum packet size allowed by the transport layer.
1071    ///
1072    pub fn default(packet_limit: u64) -> Self {
1073        PkCommandConfig {
1074            ack_timeout: Duration::from_millis(100),
1075            inter_command_timeout: Duration::from_millis(500),
1076            await_interval: Duration::from_millis(300),
1077            packet_limit,
1078            pk_version: PK_VERSION,
1079        }
1080    }
1081
1082    /// Creates a new `PkCommandConfig` with specified values.
1083    ///
1084    /// # Arguments
1085    /// * `ack_timeout`: ACK timeout in milliseconds.
1086    /// * `inter_command_timeout`: Inter-command timeout in milliseconds.
1087    /// * `await_interval`: AWAIT interval in milliseconds.
1088    /// * `packet_limit`: The maximum packet size allowed by the transport layer.
1089    ///
1090    pub fn new(
1091        ack_timeout: u64,
1092        inter_command_timeout: u64,
1093        await_interval: u64,
1094        packet_limit: u64,
1095    ) -> Self {
1096        PkCommandConfig {
1097            ack_timeout: Duration::from_millis(ack_timeout),
1098            inter_command_timeout: Duration::from_millis(inter_command_timeout),
1099            await_interval: Duration::from_millis(await_interval),
1100            packet_limit, // Default packet limit if not specified
1101            pk_version: PK_VERSION,
1102        }
1103    }
1104}