pk_command/lib.rs
/// Version string of the PK protocol interpreter, reported to the peer as the
/// return data of the `GetVersion` root operation.
const PK_VERSION: &str = "1.0.0";
2
3use std::cell::{Cell, RefCell};
4use std::pin::Pin;
5use std::task::Poll;
6use std::time::{Duration, Instant};
7
8pub mod types;
9mod util;
10use types::{Command, Operation, Role, Stage, Status};
11
12pub use util::{PkMHashmapWrapper, PkPollable, PkVHashmapWrapper};
13
/// Trait defining how to access (get/set) variables by their string key.
///
/// This allows the `PkCommand` state machine to be generic over the actual variable storage.
/// Both methods take `&self`, so an implementation that stores values must use
/// interior mutability.
pub trait PkVariableAccessor {
    /// Retrieves the value of a variable.
    ///
    /// # Arguments
    /// * `key`: The name of the variable to retrieve.
    ///
    /// # Returns
    /// `Some(Vec<u8>)` containing the variable's data if found, or `None` otherwise.
    fn get(&self, key: String) -> Option<Vec<u8>>;

    /// Sets the value of a variable.
    ///
    /// # Arguments
    /// * `key`: The name of the variable to set.
    /// * `value`: The new data for the variable.
    ///
    /// # Returns
    /// `Ok(())` if successful, or an `Err(String)` describing the error.
    fn set(&self, key: String, value: Vec<u8>) -> Result<(), String>;
}
37
/// Trait defining the `poll` method for an invoked PK method.
///
/// This is designed for time-consuming tasks. It resembles `Future`, but it is not meant
/// for asynchronous programming. An ideal implementation may be multithreaded, or use some
/// other mechanism that ensures the main thread is not blocked.
pub trait Pollable {
    /// Reports the current state of the invoked method.
    ///
    /// # Returns
    /// * `Poll::Pending` while the method is still running.
    /// * `Poll::Ready(Ok(data))` once it has finished, with its optional return data.
    /// * `Poll::Ready(Err(message))` if it failed.
    fn poll(&self) -> std::task::Poll<Result<Option<Vec<u8>>, String>>;
}
46
/// Trait defining how to invoke methods by their string key.
///
/// This allows the `PkCommand` state machine to be generic over the actual method implementation.
pub trait PkMethodAccessor {
    /// Calls a method with the given parameters.
    ///
    /// # Arguments
    /// * `key`: The name of the method to call.
    /// * `param`: The parameters for the method, as a byte vector.
    ///
    /// # Returns
    /// A `Result` containing a pinned, boxed `Pollable` that will resolve to the method's output
    /// (`Result<Option<Vec<u8>>, String>`), or an `Err(String)` if the method call cannot be initiated.
    fn call(&self, key: String, param: Vec<u8>) -> Result<Pin<Box<dyn Pollable>>, String>;
}
62
/// Configuration for the `PkCommand` state machine.
///
/// All fields are private; instances are built through the type's constructors.
#[derive(Clone, Debug)]
pub struct PkCommandConfig {
    /// Timeout duration for waiting for an `ACKNO` command.
    ack_timeout: Duration,
    /// Timeout duration for waiting for the next command in a sequence when not waiting for an `ACKNO`.
    inter_command_timeout: Duration,
    /// Interval at which the Device should send `AWAIT` commands during long-running operations.
    await_interval: Duration,
    /// The maximum length of a single command packet, including headers and data.
    packet_limit: u64,
    /// The version string of the PK protocol interpreter.
    pk_version: &'static str,
}
77
/// The main state machine for handling the PK Command protocol.
///
/// It manages transaction states, command parsing, command generation,
/// acknowledgments, timeouts, and data slicing.
///
/// All state lives in `Cell`/`RefCell` fields, so every method works through `&self`.
pub struct PkCommand<VA, MA>
where
    VA: PkVariableAccessor,
    MA: PkMethodAccessor,
{
    // Current stage of the transaction; `Stage::Idle` when none is in progress.
    stage: Cell<Stage>,
    // Whether an ACK (or an ACK for an ERROR) is currently awaited.
    status: Cell<Status>,
    // Role of this instance in the current transaction (Host, Device or Idle).
    role: Cell<Role>,
    // Copy of the most recent command sent by `poll()`; re-sent when the ACK times out.
    last_sent_command: RefCell<Command>,
    // Message id of `last_sent_command`.
    last_sent_msg_id: Cell<u16>,
    // Message id of the last processed incoming command; the next outgoing id is derived from it.
    last_received_msg_id: Cell<u16>,
    // Parameter bytes: supplied by `perform()` on the Host, accumulated from DATA packets on the Device.
    data_param: RefCell<Vec<u8>>,
    // Return bytes: produced on the Device, accumulated from DATA packets on the Host.
    data_return: RefCell<Vec<u8>>,
    // Offset of the next byte `slice_data()` will send.
    sending_data_progress: Cell<u64>,
    // Root operation of the current transaction.
    root_operation: Cell<Operation>,
    // Object (variable or method name) of the root operation, if any.
    root_object: RefCell<Option<String>>,
    // Most recently parsed incoming command, stored by `incoming_command()`.
    command_buffer: RefCell<Command>,
    // `false` while `command_buffer` holds a command that `poll()` has not handled yet.
    command_processed: Cell<bool>,
    // Time of the last command sent or received; base for the timeout checks.
    last_command_time: Cell<Instant>,
    // `true` while an invoked method's `Pollable` is still outstanding on the Device.
    device_op_pending: Cell<bool>,
    // Time at which the Device should send its next AWAIT while an operation is pending.
    device_await_deadline: Cell<Option<Instant>>,
    // Timeouts, packet limit and protocol version.
    config: PkCommandConfig,
    // Backing store for variable get/set operations.
    variable_accessor: VA,
    // Dispatcher for method invocations.
    method_accessor: MA,
    // The `Pollable` of the method currently being invoked, if any.
    pending_pollable: RefCell<Option<Pin<Box<dyn Pollable>>>>,
    device_should_return: Cell<bool>, // Whether the Device has received a QUERY but has not sent its return value yet
}
109
110impl<VA: PkVariableAccessor, MA: PkMethodAccessor> PkCommand<VA, MA> {
111 /// Ingests a raw command received from the other party.
112 ///
113 /// The command bytes are parsed, and if successful, the parsed `Command`
114 /// is stored in an internal buffer to be processed by the next call to `poll()`.
115 ///
116 /// # Arguments
117 /// * `command_bytes`: A `Vec<u8>` containing the raw bytes of the received command.
118 ///
119 /// # Returns
120 /// `Ok(())` if the command was successfully parsed and buffered.
121 /// `Err(&'static str)` if parsing failed.
122 pub fn incoming_command(&self, command_bytes: Vec<u8>) -> Result<(), &'static str> {
123 match Command::parse(&command_bytes) {
124 // Pass as slice
125 Ok(parsed_command) => {
126 self.command_buffer.replace(parsed_command);
127 self.command_processed.set(false);
128 self.last_command_time.replace(Instant::now());
129 Ok(())
130 }
131 Err(e) => Err(e),
132 }
133 }
134
135 /// Slices a chunk of data from either `data_param` (for Host sending)
136 /// or `data_return` (for Device sending).
137 ///
138 /// The size of the chunk is determined by `config.packet_limit` minus protocol overhead.
139 /// Updates `sending_data_progress`.
140 ///
141 /// # Arguments
142 /// * `role`: The current role of this `PkCommand` instance, determining which buffer to use.
143 ///
144 /// # Returns
145 /// `Ok((Vec<u8>, bool))` where the `Vec<u8>` is the data chunk and the `bool` is `true`
146 /// if this is the last chunk of data.
147 /// `Err(&'static str)` if there's no data to send or if the role is `Idle`.
148 fn slice_data(&self, role: Role) -> Result<(Vec<u8>, bool), &'static str> {
149 // 如果 Role 是 Device 则默认在发送返回值,反之亦然
150 match role {
151 Role::Device => {
152 let data = self.data_return.borrow();
153 if data.len() == 0 {
154 return Err("No return data to slice.");
155 }
156 let start = self.sending_data_progress.get() as usize;
157 let end =
158 std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
159 let is_last_packet = end == data.len();
160 self.sending_data_progress.set(end as u64);
161 Ok((data[start..end].to_vec(), is_last_packet))
162 }
163 Role::Host => {
164 let data = self.data_param.borrow();
165 if data.len() == 0 {
166 return Err("No parameter data to slice.");
167 }
168 let start = self.sending_data_progress.get() as usize;
169 let end =
170 std::cmp::min(start + (self.config.packet_limit - 14) as usize, data.len());
171 let is_last_packet = end == data.len();
172 self.sending_data_progress.set(end as u64);
173 Ok((data[start..end].to_vec(), is_last_packet))
174 }
175 Role::Idle => Err("Cannot slice data in Idle role."),
176 }
177 }
178
179 /// Polls the state machine for actions.
180 ///
181 /// This method should be called periodically. It processes incoming commands
182 /// from the internal buffer (filled by `incoming_command`), handles timeouts,
183 /// manages retransmissions, and progresses through the transaction stages.
184 ///
185 /// If the state machine determines that a command needs to be sent to the other party,
186 /// this method will return `Some(Command)`.
187 ///
188 /// # Returns
189 /// `Some(Command)` if a command needs to be sent, or `None` otherwise.
190 pub fn poll(&self) -> Option<Command> {
191 let next_msg_id_for_send = || util::msg_id::increment(self.last_received_msg_id.get());
192 let send = move |command: Command| -> Option<Command> {
193 self.last_command_time.set(Instant::now());
194 self.last_sent_msg_id.set(command.msg_id);
195 self.last_sent_command.replace(command.clone());
196 // 因为 ACK 的函数并没有嵌套调用这个,所以
197 self.status.set(Status::AwaitingAck);
198 Some(command)
199 };
200 let reset_transaction_state = || {
201 self.stage.set(Stage::Idle);
202 self.status.set(Status::Other);
203 self.role.set(Role::Idle);
204 // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
205 self.data_param.replace(vec![]);
206 self.data_return.replace(vec![]);
207 self.sending_data_progress.set(0);
208 self.device_op_pending.set(false);
209 self.device_await_deadline.set(None);
210 self.pending_pollable.replace(None); //确保清理
211 self.device_should_return.set(false);
212 };
213 let ack = move |msg_id: u16, operation: Operation| -> Option<Command> {
214 self.last_command_time.set(Instant::now());
215 Some(Command {
216 msg_id: msg_id,
217 operation: Operation::Acknowledge,
218 object: Some(operation.to_name().to_string()),
219 data: None,
220 })
221 };
222 let err = |msg: &'static str| -> Option<Command> {
223 // 在收到 ERROR 或 ACKNO ERROR 后,状态数据清零
224 // 这个逻辑在下面处理 所以这里就不写了
225 self.status.set(Status::AwaitingErrAck);
226 let command = Command {
227 msg_id: 0,
228 operation: Operation::Error,
229 object: Some(String::from("ERROR")),
230 data: Some(msg.as_bytes().to_vec()),
231 };
232 self.last_command_time.set(Instant::now());
233 self.last_sent_msg_id.set(command.msg_id);
234 self.last_sent_command.replace(command.clone());
235 Some(command)
236 };
237 // 首先检查是否有新的指令进入 command buffer
238 match self.command_processed.get() {
239 true => {
240 // Idle 则忽略当前 poll
241 if self.stage.get() == Stage::Idle {
242 return None;
243 }
244 if self.stage.get() == Stage::Started
245 && self.role.get() == Role::Host
246 && self.status.get() != Status::AwaitingAck
247 {
248 return send(Command {
249 msg_id: next_msg_id_for_send(),
250 operation: Operation::Start,
251 object: None,
252 data: None,
253 });
254 }
255 // 当设备有挂起的 INVOK 操作并且处于响应阶段时,轮询 Pollable
256 if self.role.get() == Role::Device
257 && self.device_op_pending.get()
258 && self.stage.get() == Stage::SendingResponse
259 {
260 // 如果正在等待 AWAIT 的 ACK,则不轮询主 Pollable, ACK 超时机制处理 AWAIT 的重传
261 if self.status.get() == Status::AwaitingAck {
262 // Timeout for AWAIT's ACK will be handled by the generic timeout logic below.
263 } else if self.status.get() == Status::AwaitingErrAck {
264 // This state is unlikely if a device operation is pending normally.
265 // Consider if an error should be raised or state reset.
266 } else {
267 // Status::Other, ready to poll the main INVOK pollable
268 let mut pollable_store = self.pending_pollable.borrow_mut();
269
270 if let Some(pinned_pollable) = pollable_store.as_mut() {
271 match pinned_pollable.as_mut().poll() {
272 Poll::Ready(result) => {
273 pollable_store.take(); // Remove completed pollable
274 self.device_op_pending.set(false);
275 self.device_await_deadline.set(None);
276
277 match result {
278 Ok(data_opt) => {
279 self.data_return.replace(data_opt.unwrap_or_default());
280 // Stage is already SendingResponse.
281 self.sending_data_progress.set(0); // Reset for sending return data.
282
283 let rturn_object_name =
284 if self.data_return.borrow().is_empty() {
285 String::from("EMPTY")
286 } else {
287 Operation::Invoke.to_name().to_string()
288 };
289 return send(Command {
290 msg_id: next_msg_id_for_send(),
291 operation: Operation::Return,
292 object: Some(rturn_object_name),
293 data: None,
294 });
295 }
296 Err(_) => {
297 reset_transaction_state();
298 return err("INVOK operation failed");
299 }
300 }
301 }
302 Poll::Pending => {
303 if Instant::now()
304 >= self
305 .device_await_deadline
306 .get()
307 .unwrap_or(Instant::now())
308 {
309 self.device_await_deadline
310 .set(Some(Instant::now() + self.config.await_interval));
311 return send(Command {
312 msg_id: next_msg_id_for_send(),
313 operation: Operation::Await,
314 object: None,
315 data: None,
316 });
317 }
318 }
319 }
320 } else {
321 // device_op_pending is true, but no pollable.
322 reset_transaction_state();
323 return err("Internal: Device op pending but no pollable.");
324 }
325 }
326 } // 结束 device_op_pending && Stage::SendingResponse 的处理
327 if self.device_should_return.get() {
328 self.sending_data_progress.set(0); // 重置发送进度
329 self.device_should_return.set(false);
330 // 这时候的状态应该是收到了 QUERY,还没有发送返回值
331 match self.root_operation.get() {
332 Operation::GetVersion => {
333 return send(Command {
334 msg_id: next_msg_id_for_send(),
335 operation: Operation::Return,
336 object: Some(self.root_operation.get().to_name().to_string()),
337 data: None,
338 });
339 }
340 Operation::RequireVariable => {
341 if self.data_return.borrow().len() == 0 {
342 return send(Command {
343 msg_id: next_msg_id_for_send(),
344 operation: Operation::Return,
345 object: Some(String::from("EMPTY")),
346 data: None,
347 });
348 }
349 return send(Command {
350 msg_id: next_msg_id_for_send(),
351 operation: Operation::Return,
352 object: Some(self.root_operation.get().to_name().to_string()),
353 data: None,
354 });
355 }
356 Operation::SendVariable => {
357 // SENDV doesn't return data in the RTURN command itself.
358 // The result of the set operation is implicitly acknowledged by the ENDTR ACK.
359 // If there was an error during set, it would be handled by the error path.
360 // We still send RTURN EMPTY to signal the end of the Device's processing phase.
361 self.data_return.replace(vec![]); // Ensure data_return is empty
362 return send(Command {
363 msg_id: next_msg_id_for_send(),
364 operation: Operation::Return,
365 object: Some(Operation::Empty.to_name().to_string()),
366 data: None,
367 });
368 }
369 Operation::Invoke => {
370 // 忽略,因为 Invoke 的返回在上面轮询 Pollable 时处理
371 }
372 _ => {
373 panic!("Not a root operation");
374 }
375 }
376 }
377
378 // 获取当前时间来比较超时
379 let elapsed_ms = self.last_command_time.get().elapsed();
380 match self.status.get() {
381 Status::AwaitingAck | Status::AwaitingErrAck => {
382 // 等待 ACK 时则检查 ACK 超时来确认是否重传
383 if elapsed_ms >= self.config.ack_timeout {
384 return Some(self.last_sent_command.borrow().clone());
385 }
386 }
387 _ => {
388 // 仅当不在 Idle 状态且没有挂起的设备操作时检查指令间超时
389 if self.stage.get() != Stage::Idle
390 && !self.device_op_pending.get()
391 && elapsed_ms >= self.config.inter_command_timeout
392 {
393 reset_transaction_state(); // 在发送错误前重置状态
394 return err("Operation timed out");
395 }
396 }
397 }
398 }
399 // 缓冲区内有新的指令
400 false => {
401 self.command_processed.set(true);
402 self.last_received_msg_id
403 .set(self.command_buffer.borrow().msg_id); // Store received msg_id
404 let recv = self.command_buffer.borrow();
405 // 首先处理 Error 这种不被 Stage 描述的特殊情况
406 if recv.operation == Operation::Error {
407 reset_transaction_state();
408 return ack(0, Operation::Error);
409 } else {
410 if self.status.get() == Status::AwaitingErrAck {
411 if recv.operation == Operation::Acknowledge
412 && recv.object == Some(String::from("ERROR"))
413 {
414 self.status.set(Status::Other);
415 self.root_operation.set(Operation::Empty);
416 self.stage.set(Stage::Idle);
417 self.role.set(Role::Idle);
418 return None;
419 } else {
420 return err("Should be ACKNO ERROR");
421 }
422 }
423 }
424 match self.stage.get() {
425 Stage::Idle => {
426 // 在 Idle 状态下只能收到 START,且自身为 Device
427 if recv.operation != Operation::Start {
428 return err("not in a chain");
429 }
430 self.role.set(Role::Device);
431 self.stage.set(Stage::Started);
432 self.status.set(Status::Other); // Awaiting root command from Host
433 return ack(recv.msg_id, recv.operation);
434 }
435 Stage::Started => {
436 // 在 Started 状态下,根据当前角色不同,预期的行为应该是
437 // - Host -> 接收到 ACK,指示当前的根操作
438 // - Device -> 接收到根操作,进行 ACK
439 match self.role.get() {
440 Role::Host => {
441 if recv.operation == Operation::Acknowledge {
442 self.status.set(Status::Other);
443 self.stage.set(Stage::RootOperationAssigned);
444 return send(Command {
445 msg_id: next_msg_id_for_send(),
446 operation: self.root_operation.get(),
447 object: self.root_object.borrow().clone(),
448 data: None,
449 });
450 }
451 }
452 Role::Device => {
453 if recv.operation.is_root() {
454 self.root_operation.set(recv.operation);
455 // Validate if object is present for ops that require it
456 if (recv.operation == Operation::RequireVariable
457 || recv.operation == Operation::SendVariable
458 || recv.operation == Operation::Invoke)
459 && recv.object.is_none()
460 {
461 reset_transaction_state();
462 return err(
463 "Operation requires an object but none was provided.",
464 );
465 }
466 self.root_object.replace(recv.object.clone());
467 self.stage.set(Stage::RootOperationAssigned);
468 return ack(recv.msg_id, recv.operation);
469 } else {
470 return err("not a root operation");
471 }
472 }
473 _ => {
474 // 考虑代码问题,因为 Stage 已经是 Started 了,Role 不可能是 Idle
475 panic!("Role cannot be Idle if Stage is Started")
476 }
477 }
478 }
479 Stage::RootOperationAssigned => {
480 /* Host -> 接收到 ACK,**开始**传输数据。也就是说参数的*第一段*或 EMPTY 指令
481 Device -> 接收到 EMPTY 或数据的第一段
482 */
483 match self.role.get() {
484 Role::Host => {
485 if recv.operation == Operation::Acknowledge {
486 self.status.set(Status::Other);
487 self.stage.set(Stage::SendingParameter);
488 if self.data_param.borrow().len() == 0 {
489 return send(Command {
490 msg_id: next_msg_id_for_send(),
491 operation: Operation::Empty,
492 object: None,
493 data: None,
494 });
495 } else {
496 match self.slice_data(Role::Host) {
497 Ok((data_chunk, _is_last)) => {
498 return send(Command {
499 msg_id: next_msg_id_for_send(),
500 operation: Operation::Data,
501 object: Some(
502 self.root_operation
503 .get()
504 .to_name()
505 .to_string(),
506 ),
507 data: Some(data_chunk),
508 });
509 }
510 Err(e) => {
511 reset_transaction_state();
512 return err(e);
513 }
514 }
515 }
516 } else {
517 return err("Should be ACKNO");
518 }
519 }
520 Role::Device => {
521 if recv.operation == Operation::Empty {
522 self.stage.set(Stage::SendingParameter);
523 return ack(recv.msg_id, recv.operation);
524 } else if recv.operation == Operation::Data {
525 self.stage.set(Stage::SendingParameter);
526 {
527 // 缩小可变借用的作用域,确保归还
528 self.data_param.borrow_mut().append(&mut Vec::from(
529 recv.data.as_ref().unwrap().clone(),
530 ));
531 }
532 return ack(recv.msg_id, recv.operation);
533 } else {
534 return err("Should be EMPTY or DATA");
535 }
536 }
537 _ => {
538 // 同上
539 panic!("Role cannot be Idle if Stage is RootOperationAssigned")
540 }
541 }
542 }
543 Stage::SendingParameter => {
544 // 此阶段:
545 // - Host: 已发送第一个参数数据包(SDATA)或 EMPTY,并收到 ACKNO。
546 // 现在需要判断是继续发送 SDATA 还是发送 ENDTR。
547 // - Device: 已收到第一个参数数据包(SDATA)或 EMPTY,并发送了 ACKNO。
548 // 现在等待接收后续的 SDATA 或 ENDTR。
549 match self.role.get() {
550 Role::Host => {
551 // Host 必须是收到了 ACKNO
552 if recv.operation != Operation::Acknowledge {
553 return err("Host expected ACKNO in SendingParameter stage");
554 }
555 self.status.set(Status::Other);
556
557 // 将借用操作限制在最小作用域,以避免后续调用 send() 或 err() 时发生冲突
558 let last_sent_op;
559 {
560 last_sent_op = self.last_sent_command.borrow().operation;
561 } // 不可变借用在此结束
562
563 match last_sent_op {
564 Operation::Empty => {
565 // 收到对 EMPTY 的 ACKNO,参数传输结束,发送 ENDTR
566 self.stage.set(Stage::ParameterSent);
567 return send(Command {
568 msg_id: next_msg_id_for_send(),
569 operation: Operation::EndTransaction,
570 object: None,
571 data: None,
572 });
573 }
574 Operation::Data => {
575 // 收到对 SDATA 的 ACKNO
576 let param_data_len = self.data_param.borrow().len() as u64;
577 if self.sending_data_progress.get() < param_data_len {
578 // 还有参数数据需要发送
579 let (data_chunk, _is_last) =
580 match self.slice_data(Role::Host) {
581 Ok(d) => d,
582 Err(e) => {
583 reset_transaction_state();
584 return err(e);
585 }
586 };
587 return send(Command {
588 msg_id: next_msg_id_for_send(),
589 operation: Operation::Data,
590 object: Some(
591 self.root_operation.get().to_name().to_string(),
592 ),
593 data: Some(data_chunk),
594 });
595 } else {
596 // 参数数据已全部发送完毕,发送 ENDTR
597 self.stage.set(Stage::ParameterSent);
598 return send(Command {
599 msg_id: next_msg_id_for_send(),
600 operation: Operation::EndTransaction,
601 object: None,
602 data: None,
603 });
604 }
605 }
606 _ => {
607 return err(
608 "Host received ACKNO for unexpected command in SendingParameter stage",
609 );
610 }
611 }
612 }
613 Role::Device => {
614 // Device 等待 SDATA 或 ENDTR
615 if recv.operation == Operation::Data {
616 if let Some(ref data_vec) = recv.data {
617 self.data_param.borrow_mut().extend_from_slice(data_vec);
618 }
619 return ack(recv.msg_id, recv.operation);
620 } else if recv.operation == Operation::EndTransaction {
621 self.stage.set(Stage::ParameterSent);
622 return ack(recv.msg_id, recv.operation);
623 } else {
624 return err(
625 "Device expected DATA or ENDTR in SendingParameter stage",
626 );
627 }
628 }
629 Role::Idle => {
630 panic!("Role cannot be Idle if Stage is SendingParameter")
631 }
632 }
633 }
634 Stage::ParameterSent => {
635 /* Host -> 收到对 ENDTR 的 ACK,发送 QUERY。等待回传数据或 AWAKE 保活。
636 Device -> 收到 QUERY,执行逻辑,处理保活和/或回传数据。 */
637 match self.role.get() {
638 Role::Host => match recv.operation {
639 Operation::Acknowledge => {
640 self.status.set(Status::Other); // ACK received
641 if recv.object == Some(String::from("ENDTR")) {
642 return send(Command {
643 msg_id: util::msg_id::increment(recv.msg_id),
644 operation: Operation::Query,
645 object: None,
646 data: None,
647 });
648 } else if recv.object == Some(String::from("QUERY")) {
649 return None;
650 } else {
651 return err(
652 "Host: Unexpected ACK object in ParameterSent stage",
653 );
654 }
655 }
656 Operation::Await => {
657 return ack(recv.msg_id, recv.operation);
658 }
659 Operation::Return => {
660 if recv.object == Some(String::from("EMPTY"))
661 || recv.object
662 == Some(self.root_operation.get().to_name().to_string())
663 {
664 self.stage.set(Stage::SendingResponse);
665 return ack(recv.msg_id, recv.operation);
666 }
667 }
668 _ => {
669 return err("Should be ACKNO, AWAIT or RETURN");
670 }
671 },
672 Role::Device => {
673 if recv.operation == Operation::Query {
674 // 开始执行逻辑,然后 ACK
675 match self.root_operation.get() {
676 Operation::GetVersion => {
677 self.data_return.replace(
678 self.config.pk_version.as_bytes().to_vec(),
679 );
680 self.stage.set(Stage::SendingResponse);
681 }
682 Operation::RequireVariable => {
683 let key = match self
684 .root_object
685 .borrow()
686 .as_ref()
687 .cloned()
688 {
689 Some(k) => k,
690 None => {
691 // This check should ideally be when root_op was received
692 reset_transaction_state();
693 return err(
694 "Internal: Missing object name for REQUV.",
695 );
696 }
697 };
698 self.data_return.replace(
699 self.variable_accessor.get(key).unwrap_or(vec![]),
700 );
701 self.stage.set(Stage::SendingResponse);
702 }
703 Operation::SendVariable => {
704 let key = match self
705 .root_object
706 .borrow()
707 .as_ref()
708 .cloned()
709 {
710 Some(k) => k,
711 None => {
712 // This check should ideally be when root_op was received
713 reset_transaction_state();
714 return err(
715 "Internal: Missing object name for SENDV.",
716 );
717 }
718 };
719 self.data_return.replace(
720 if let Err(e) = self
721 .variable_accessor
722 .set(key, self.data_param.borrow().clone())
723 {
724 e.as_bytes().to_vec()
725 } else {
726 vec![]
727 },
728 );
729 self.stage.set(Stage::SendingResponse); // Note: SENDV error reporting via data_return
730 }
731 Operation::Invoke => {
732 self.device_op_pending.set(true);
733 self.device_await_deadline.set(Some(
734 Instant::now() + self.config.await_interval,
735 ));
736 // The object for INVOK is self.root_object, not from QUERY (recv.object)
737 let method_name = match self
738 .root_object
739 .borrow()
740 .as_ref()
741 .cloned()
742 {
743 Some(name) => name,
744 None => {
745 reset_transaction_state();
746 return err(
747 "Internal: Missing method name for INVOK",
748 );
749 }
750 };
751 match self
752 .method_accessor
753 .call(method_name, self.data_param.borrow().clone())
754 {
755 Ok(pollable) => {
756 self.pending_pollable.replace(Some(pollable));
757 }
758 Err(_) => {
759 reset_transaction_state();
760 // log::error!("Failed to create INVOK pollable: {}", e_str);
761 return err(
762 "Failed to initiate INVOK operation",
763 );
764 }
765 }
766 }
767 _ => {
768 reset_transaction_state();
769 return err("Not a root operation");
770 }
771 }
772 self.stage.set(Stage::SendingResponse);
773 self.device_should_return.set(true);
774 return ack(recv.msg_id, recv.operation);
775 }
776 }
777 Role::Idle => {
778 panic!("Role cannot be Idle if Stage is ParameterSent")
779 }
780 }
781 }
782 Stage::SendingResponse => {
783 /* Host -> 接收数据。
784 Device -> 收到对 RTURN/SDATA 的 ACK,继续发送数据或终止 */
785 match self.role.get() {
786 Role::Host => {
787 // Host 等待 SDATA 或 ENDTR
788 if recv.operation == Operation::Data {
789 // Host receives SDATA from Device
790 if let Some(ref data_vec) = recv.data {
791 self.data_return.borrow_mut().extend_from_slice(data_vec);
792 }
793 return ack(recv.msg_id, recv.operation);
794 } else if recv.operation == Operation::EndTransaction {
795 let endtr_ack = ack(recv.msg_id, recv.operation);
796 self.stage.set(Stage::Idle);
797 self.status.set(Status::Other); // After sending ACK, status is Other
798 return endtr_ack;
799 } else {
800 return err(
801 "Host expected SDATA or ENDTR in SendingResponse stage",
802 );
803 }
804 }
805 Role::Device => {
806 // Device 必须是收到了 ACKNO
807 if recv.operation != Operation::Acknowledge {
808 return err("Device expected ACKNO in SendingResponse stage");
809 }
810 self.status.set(Status::Other);
811
812 // 将借用操作限制在最小作用域
813 let last_sent_op;
814 {
815 last_sent_op = self.last_sent_command.borrow().operation;
816 } // 不可变借用在此结束
817
818 match last_sent_op {
819 Operation::Return => {
820 // 收到对 RETURN 的 ACKNO
821 let return_data_len =
822 self.data_return.borrow().len() as u64;
823 if return_data_len == 0 {
824 // 没有返回值,直接发送 ENDTR
825 self.stage.set(Stage::Idle); // Transaction ends
826 return send(Command {
827 msg_id: next_msg_id_for_send(),
828 operation: Operation::EndTransaction,
829 object: None,
830 data: None,
831 });
832 } else {
833 // 有返回值
834 let (data_chunk, _) =
835 match self.slice_data(Role::Device) {
836 Ok(d) => d,
837 Err(e) => {
838 reset_transaction_state();
839 return err(e);
840 }
841 };
842
843 return send(Command {
844 msg_id: next_msg_id_for_send(),
845 operation: Operation::Data,
846 object: Some(
847 self.root_operation.get().to_name().to_string(),
848 ),
849 data: Some(data_chunk),
850 });
851 }
852 }
853 Operation::Data => {
854 if self.sending_data_progress.get()
855 < self.data_return.borrow().len() as u64
856 {
857 let (data_chunk, _) =
858 match self.slice_data(Role::Device) {
859 Ok(d) => d,
860 Err(e) => {
861 reset_transaction_state();
862 return err(e);
863 }
864 };
865 return send(Command {
866 msg_id: next_msg_id_for_send(),
867 operation: Operation::Data,
868 object: Some(
869 self.root_operation.get().to_name().to_string(),
870 ),
871 data: Some(data_chunk),
872 });
873 } else {
874 return send(Command {
875 msg_id: next_msg_id_for_send(),
876 operation: Operation::EndTransaction,
877 object: None,
878 data: None,
879 });
880 }
881 }
882 Operation::EndTransaction => {
883 self.role.set(Role::Idle);
884 self.stage.set(Stage::Idle);
885 // reset_transaction_state();
886 // 结束后不清理数据,考虑到外部可能手动获取,这里就不操心了
887 return None;
888 }
889 Operation::Await => {
890 // Device received ACKNO AWAIT
891 // self.status is Other. Device continues pending op.
892 return None;
893 }
894 _ => {
895 return err(
896 "Device received ACKNO for unexpected command in SendingResponse stage",
897 );
898 }
899 }
900 }
901 _ => {
902 panic!("Role cannot be Idle if Stage is SendingResponse")
903 }
904 }
905 }
906 }
907 }
908 }
909 None
910 }
911
912 /// Initiates a new root operation from the Host side.
913 ///
914 /// This method should only be called when the `PkCommand` instance is in an `Idle` state.
915 /// It sets up the necessary internal state to begin a new transaction.
916 /// The actual `START` command and subsequent root operation command will be generated
917 /// by subsequent calls to `poll()`.
918 ///
919 /// # Arguments
920 /// * `operation`: The root `Operation` to perform (e.g., `SENDV`, `REQUV`, `INVOK`, `PKVER`).
921 /// * `object`: An optional `String` representing the object of the operation (e.g., variable name, method name).
922 /// * `data`: Optional `Vec<u8>` containing parameter data for the operation (e.g., for `SENDV` or `INVOK`).
923 ///
924 /// # Returns
925 /// `Ok(())` if the operation can be initiated, or `Err(&'static str)` if not (e.g., not idle, or not a root operation).
926 pub fn perform(
927 &self,
928 operation: Operation,
929 object: Option<String>,
930 data: Option<Vec<u8>>,
931 ) -> Result<(), &'static str> {
932 if operation.is_root()
933 && self.stage.get() == Stage::Idle
934 && self.status.get() == Status::Other
935 && self.role.get() == Role::Idle
936 {
937 self.root_operation.set(operation);
938 self.root_object.replace(object);
939 self.data_param.replace(data.unwrap_or(vec![]));
940 self.role.set(Role::Host);
941 self.stage.set(Stage::Started);
942 self.status.set(Status::Other);
943 Ok(())
944 } else if !operation.is_root() {
945 Err("Cannot initiate a non-root operation")
946 } else {
947 Err("Cannot initiate an operation when the transaction is in progress")
948 }
949 }
950
951 fn reset_transaction_state(&self) -> () {
952 self.stage.set(Stage::Idle);
953 self.status.set(Status::Other);
954 self.role.set(Role::Idle);
955 // Clear other relevant fields like root_operation, data_param, data_return, device_op_pending etc.
956 self.data_param.borrow_mut().clear();
957 self.data_return.borrow_mut().clear();
958 self.sending_data_progress.set(0);
959 self.device_op_pending.set(false);
960 self.device_await_deadline.set(None);
961 self.pending_pollable.borrow_mut().take(); // Clear the pollable
962 }
963
964 /// Checks if the transaction is complete (i.e., the state machine is in the `Idle` stage).
965 ///
966 /// # Returns
967 /// `true` if the transaction is complete, `false` otherwise.
968 pub fn is_complete(&self) -> bool {
969 self.stage.get() == Stage::Idle
970 }
971
972 /// Retrieves the return data from the completed transaction.
973 ///
974 /// This method should only be called when `is_complete()` returns `true` and the
975 /// instance is acting as the Host.
976 ///
977 /// # Returns
978 /// `Some(Vec<u8>)` containing the return data if available, or `None` if there was no return data.
979 pub fn get_return_data(&self) -> Option<Vec<u8>> {
980 if self.stage.get() == Stage::Idle && self.role.get() == Role::Host {
981 let data = self.data_return.borrow().clone();
982 self.reset_transaction_state();
983 if data.is_empty() {
984 None
985 } else {
986 Some(data.clone())
987 }
988 } else {
989 None // Not in a state to provide return data
990 }
991 }
992
993 /// Waits for the transaction to complete and then executes a callback with the return data.
994 ///
995 /// This is a polling-based wait. The callback is only executed once the state machine enters the `Idle` stage.
996 ///
997 /// # Arguments
998 /// * `callback`: A closure that takes an `Option<Vec<u8>>` (the return data) and is executed upon completion.
999 ///
1000 /// # Returns
1001 /// `true` if the callback was executed, or `false` otherwise
1002 ///
1003 /// # Note
1004 /// This method assumes `poll()` is being called externally to drive the state machine.
1005 /// It does not block the current thread waiting for completion, but rather checks the state
1006 /// and executes the callback if complete. You must ensure `poll()` is called frequently
1007 /// for the transaction to progress.
1008 pub fn wait_for_complete_and<F>(&self, callback: F) -> bool
1009 where
1010 F: FnOnce(Option<Vec<u8>>) -> (),
1011 {
1012 // 这个函数也是轮询的,用来给 Host 方返回值(因为在上面的 perform 中并没有告诉 PK 该怎么处理返回值)
1013 if self.stage.get() == Stage::Idle {
1014 let data = self.data_return.borrow().clone();
1015 self.reset_transaction_state();
1016 callback(if data.len() == 0 { None } else { Some(data) });
1017 true
1018 } else {
1019 false
1020 }
1021 }
1022
1023 /// Creates a new `PkCommand` state machine instance.
1024 ///
1025 /// # Arguments
1026 /// * `config`: The `PkCommandConfig` to use.
1027 /// * `variable_accessor`: An implementation of `PkVariableAccessor` for variable operations.
1028 /// * `method_accessor`: An implementation of `PkMethodAccessor` for method invocation.
1029 ///
1030 pub fn new(config: PkCommandConfig, variable_accessor: VA, method_accessor: MA) -> Self {
1031 PkCommand {
1032 stage: Cell::new(Stage::Idle),
1033 status: Cell::new(Status::Other),
1034 role: Cell::new(Role::Idle),
1035 last_sent_command: RefCell::new(Command {
1036 msg_id: 0,
1037 operation: Operation::Empty,
1038 object: None,
1039 data: None,
1040 }),
1041 last_sent_msg_id: Cell::new(0),
1042 last_received_msg_id: Cell::new(0),
1043 data_param: RefCell::new(vec![]),
1044 data_return: RefCell::new(vec![]),
1045 sending_data_progress: Cell::new(0),
1046 root_operation: Cell::new(Operation::Empty),
1047 root_object: RefCell::new(None),
1048 command_buffer: RefCell::new(Command {
1049 msg_id: 0,
1050 operation: Operation::Empty,
1051 object: None,
1052 data: None,
1053 }),
1054 command_processed: Cell::new(true),
1055 last_command_time: Cell::new(Instant::now()),
1056 device_op_pending: Cell::new(false),
1057 device_await_deadline: Cell::new(None),
1058 config,
1059 variable_accessor,
1060 method_accessor,
1061 pending_pollable: RefCell::new(None),
1062 device_should_return: Cell::new(false),
1063 }
1064 }
1065}
1066impl PkCommandConfig {
1067 /// Creates a `PkCommandConfig` with default timeout values.
1068 ///
1069 /// # Arguments
1070 /// * `packet_limit`: The maximum packet size allowed by the transport layer.
1071 ///
1072 pub fn default(packet_limit: u64) -> Self {
1073 PkCommandConfig {
1074 ack_timeout: Duration::from_millis(100),
1075 inter_command_timeout: Duration::from_millis(500),
1076 await_interval: Duration::from_millis(300),
1077 packet_limit,
1078 pk_version: PK_VERSION,
1079 }
1080 }
1081
1082 /// Creates a new `PkCommandConfig` with specified values.
1083 ///
1084 /// # Arguments
1085 /// * `ack_timeout`: ACK timeout in milliseconds.
1086 /// * `inter_command_timeout`: Inter-command timeout in milliseconds.
1087 /// * `await_interval`: AWAIT interval in milliseconds.
1088 /// * `packet_limit`: The maximum packet size allowed by the transport layer.
1089 ///
1090 pub fn new(
1091 ack_timeout: u64,
1092 inter_command_timeout: u64,
1093 await_interval: u64,
1094 packet_limit: u64,
1095 ) -> Self {
1096 PkCommandConfig {
1097 ack_timeout: Duration::from_millis(ack_timeout),
1098 inter_command_timeout: Duration::from_millis(inter_command_timeout),
1099 await_interval: Duration::from_millis(await_interval),
1100 packet_limit, // Default packet limit if not specified
1101 pk_version: PK_VERSION,
1102 }
1103 }
1104}