1use core::time::Duration;
19
20use crate::{
21 acl::Accessor,
22 error::*,
23 tlv::{get_root_node_struct, FromTLV, TLVElement, TLVWriter, TagType, ToTLV},
24 transport::{exchange::Exchange, packet::Packet},
25 utils::epoch::Epoch,
26};
27use log::error;
28use num::{self, FromPrimitive};
29use num_derive::FromPrimitive;
30
31use super::messages::msg::{
32 self, InvReq, ReadReq, StatusResp, SubscribeReq, SubscribeResp, TimedReq, WriteReq,
33};
34
35#[macro_export]
36macro_rules! cmd_enter {
37 ($e:expr) => {{
38 use owo_colors::OwoColorize;
39 info! {"{} {}", "Handling command".cyan(), $e.cyan()}
40 }};
41}
42
43#[derive(FromPrimitive, Debug, Clone, Copy, PartialEq)]
44pub enum IMStatusCode {
45 Success = 0,
46 Failure = 1,
47 InvalidSubscription = 0x7D,
48 UnsupportedAccess = 0x7E,
49 UnsupportedEndpoint = 0x7F,
50 InvalidAction = 0x80,
51 UnsupportedCommand = 0x81,
52 InvalidCommand = 0x85,
53 UnsupportedAttribute = 0x86,
54 ConstraintError = 0x87,
55 UnsupportedWrite = 0x88,
56 ResourceExhausted = 0x89,
57 NotFound = 0x8b,
58 UnreportableAttribute = 0x8c,
59 InvalidDataType = 0x8d,
60 UnsupportedRead = 0x8f,
61 DataVersionMismatch = 0x92,
62 Timeout = 0x94,
63 Busy = 0x9c,
64 UnsupportedCluster = 0xc3,
65 NoUpstreamSubscription = 0xc5,
66 NeedsTimedInteraction = 0xc6,
67 UnsupportedEvent = 0xc7,
68 PathsExhausted = 0xc8,
69 TimedRequestMisMatch = 0xc9,
70 FailSafeRequired = 0xca,
71}
72
73impl From<ErrorCode> for IMStatusCode {
74 fn from(e: ErrorCode) -> Self {
75 match e {
76 ErrorCode::EndpointNotFound => IMStatusCode::UnsupportedEndpoint,
77 ErrorCode::ClusterNotFound => IMStatusCode::UnsupportedCluster,
78 ErrorCode::AttributeNotFound => IMStatusCode::UnsupportedAttribute,
79 ErrorCode::CommandNotFound => IMStatusCode::UnsupportedCommand,
80 ErrorCode::InvalidAction => IMStatusCode::InvalidAction,
81 ErrorCode::InvalidCommand => IMStatusCode::InvalidCommand,
82 ErrorCode::UnsupportedAccess => IMStatusCode::UnsupportedAccess,
83 ErrorCode::Busy => IMStatusCode::Busy,
84 ErrorCode::DataVersionMismatch => IMStatusCode::DataVersionMismatch,
85 ErrorCode::ResourceExhausted => IMStatusCode::ResourceExhausted,
86 _ => IMStatusCode::Failure,
87 }
88 }
89}
90
91impl From<Error> for IMStatusCode {
92 fn from(value: Error) -> Self {
93 Self::from(value.code())
94 }
95}
96
97impl FromTLV<'_> for IMStatusCode {
98 fn from_tlv(t: &TLVElement) -> Result<Self, Error> {
99 FromPrimitive::from_u16(t.u16()?).ok_or_else(|| ErrorCode::Invalid.into())
100 }
101}
102
103impl ToTLV for IMStatusCode {
104 fn to_tlv(&self, tw: &mut TLVWriter, tag_type: TagType) -> Result<(), Error> {
105 tw.u16(tag_type, *self as u16)
106 }
107}
108
109#[derive(FromPrimitive, Debug, Copy, Clone, Eq, PartialEq)]
110pub enum OpCode {
111 Reserved = 0,
112 StatusResponse = 1,
113 ReadRequest = 2,
114 SubscribeRequest = 3,
115 SubscribeResponse = 4,
116 ReportData = 5,
117 WriteRequest = 6,
118 WriteResponse = 7,
119 InvokeRequest = 8,
120 InvokeResponse = 9,
121 TimedRequest = 10,
122}
123
124pub const PROTO_ID_INTERACTION_MODEL: u16 = 0x01;
126
127const LONG_READS_TLV_RESERVE_SIZE: usize = 24;
130
131impl<'a> ReadReq<'a> {
132 pub fn tx_start<'r, 'p>(&self, tx: &'r mut Packet<'p>) -> Result<TLVWriter<'r, 'p>, Error> {
133 tx.reset();
134 tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
135 tx.set_proto_opcode(OpCode::ReportData as u8);
136
137 let mut tw = Self::reserve_long_read_space(tx)?;
138
139 tw.start_struct(TagType::Anonymous)?;
140
141 if self.attr_requests.is_some() {
142 tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
143 }
144
145 Ok(tw)
146 }
147
148 pub fn tx_finish_chunk(&self, tx: &mut Packet) -> Result<(), Error> {
149 self.complete(tx, true)
150 }
151
152 pub fn tx_finish(&self, tx: &mut Packet) -> Result<(), Error> {
153 self.complete(tx, false)
154 }
155
156 fn complete(&self, tx: &mut Packet<'_>, more_chunks: bool) -> Result<(), Error> {
157 let mut tw = Self::restore_long_read_space(tx)?;
158
159 if self.attr_requests.is_some() {
160 tw.end_container()?;
161 }
162
163 if more_chunks {
164 tw.bool(
165 TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
166 true,
167 )?;
168 }
169
170 tw.bool(
171 TagType::Context(msg::ReportDataTag::SupressResponse as u8),
172 !more_chunks,
173 )?;
174
175 tw.end_container()
176 }
177
178 fn reserve_long_read_space<'p, 'b>(tx: &'p mut Packet<'b>) -> Result<TLVWriter<'p, 'b>, Error> {
179 let wb = tx.get_writebuf()?;
180 wb.shrink(LONG_READS_TLV_RESERVE_SIZE)?;
181
182 Ok(TLVWriter::new(wb))
183 }
184
185 fn restore_long_read_space<'p, 'b>(tx: &'p mut Packet<'b>) -> Result<TLVWriter<'p, 'b>, Error> {
186 let wb = tx.get_writebuf()?;
187 wb.expand(LONG_READS_TLV_RESERVE_SIZE)?;
188
189 Ok(TLVWriter::new(wb))
190 }
191}
192
193impl<'a> WriteReq<'a> {
194 pub fn tx_start<'r, 'p>(
195 &self,
196 tx: &'r mut Packet<'p>,
197 epoch: Epoch,
198 timeout: Option<Duration>,
199 ) -> Result<Option<TLVWriter<'r, 'p>>, Error> {
200 if has_timed_out(epoch, timeout) {
201 Interaction::status_response(tx, IMStatusCode::Timeout)?;
202
203 Ok(None)
204 } else {
205 tx.reset();
206 tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
207 tx.set_proto_opcode(OpCode::WriteResponse as u8);
208
209 let mut tw = TLVWriter::new(tx.get_writebuf()?);
210
211 tw.start_struct(TagType::Anonymous)?;
212 tw.start_array(TagType::Context(msg::WriteRespTag::WriteResponses as u8))?;
213
214 Ok(Some(tw))
215 }
216 }
217
218 pub fn tx_finish(&self, tx: &mut Packet<'_>) -> Result<(), Error> {
219 let mut tw = TLVWriter::new(tx.get_writebuf()?);
220
221 tw.end_container()?;
222 tw.end_container()
223 }
224}
225
226impl<'a> InvReq<'a> {
227 pub fn tx_start<'r, 'p>(
228 &self,
229 tx: &'r mut Packet<'p>,
230 epoch: Epoch,
231 timeout: Option<Duration>,
232 ) -> Result<Option<TLVWriter<'r, 'p>>, Error> {
233 if has_timed_out(epoch, timeout) {
234 Interaction::status_response(tx, IMStatusCode::Timeout)?;
235
236 Ok(None)
237 } else {
238 let timed_tx = timeout.map(|_| true);
239 let timed_request = self.timed_request.filter(|a| *a);
240
241 if timed_tx != timed_request {
243 Interaction::status_response(tx, IMStatusCode::TimedRequestMisMatch)?;
244
245 Ok(None)
246 } else {
247 tx.reset();
248 tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
249 tx.set_proto_opcode(OpCode::InvokeResponse as u8);
250
251 let mut tw = TLVWriter::new(tx.get_writebuf()?);
252
253 tw.start_struct(TagType::Anonymous)?;
254
255 tw.bool(
257 TagType::Context(msg::InvRespTag::SupressResponse as u8),
258 false,
259 )?;
260
261 if self.inv_requests.is_some() {
262 tw.start_array(TagType::Context(msg::InvRespTag::InvokeResponses as u8))?;
263 }
264
265 Ok(Some(tw))
266 }
267 }
268 }
269
270 pub fn tx_finish(&self, tx: &mut Packet<'_>) -> Result<(), Error> {
271 let mut tw = TLVWriter::new(tx.get_writebuf()?);
272
273 if self.inv_requests.is_some() {
274 tw.end_container()?;
275 }
276
277 tw.end_container()
278 }
279}
280
281impl TimedReq {
282 pub fn timeout(&self, epoch: Epoch) -> Duration {
283 epoch()
284 .checked_add(Duration::from_millis(self.timeout as _))
285 .unwrap()
286 }
287
288 pub fn tx_process(self, tx: &mut Packet<'_>, epoch: Epoch) -> Result<Duration, Error> {
289 Interaction::status_response(tx, IMStatusCode::Success)?;
290
291 Ok(epoch()
292 .checked_add(Duration::from_millis(self.timeout as _))
293 .unwrap())
294 }
295}
296
297impl<'a> SubscribeReq<'a> {
298 pub fn tx_start<'r, 'p>(
299 &self,
300 tx: &'r mut Packet<'p>,
301 subscription_id: u32,
302 ) -> Result<TLVWriter<'r, 'p>, Error> {
303 tx.reset();
304 tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
305 tx.set_proto_opcode(OpCode::ReportData as u8);
306
307 let mut tw = ReadReq::reserve_long_read_space(tx)?;
308
309 tw.start_struct(TagType::Anonymous)?;
310
311 tw.u32(
312 TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
313 subscription_id,
314 )?;
315
316 if self.attr_requests.is_some() {
317 tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
318 }
319
320 Ok(tw)
321 }
322
323 pub fn tx_finish_chunk(&self, tx: &mut Packet<'_>, more_chunks: bool) -> Result<(), Error> {
324 let mut tw = ReadReq::restore_long_read_space(tx)?;
325
326 if self.attr_requests.is_some() {
327 tw.end_container()?;
328 }
329
330 if more_chunks {
331 tw.bool(
332 TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
333 true,
334 )?;
335 }
336
337 tw.bool(
338 TagType::Context(msg::ReportDataTag::SupressResponse as u8),
339 false,
340 )?;
341
342 tw.end_container()
343 }
344
345 pub fn tx_process_final(&self, tx: &mut Packet, subscription_id: u32) -> Result<(), Error> {
346 tx.reset();
347 tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
348 tx.set_proto_opcode(OpCode::SubscribeResponse as u8);
349
350 let mut tw = TLVWriter::new(tx.get_writebuf()?);
351
352 let resp = SubscribeResp::new(subscription_id, 40);
353 resp.to_tlv(&mut tw, TagType::Anonymous)
354 }
355}
356
357pub struct ReadDriver<'a, 'r, 'p> {
358 exchange: &'r mut Exchange<'a>,
359 tx: &'r mut Packet<'p>,
360 rx: &'r mut Packet<'p>,
361 completed: bool,
362}
363
364impl<'a, 'r, 'p> ReadDriver<'a, 'r, 'p> {
365 fn new(exchange: &'r mut Exchange<'a>, tx: &'r mut Packet<'p>, rx: &'r mut Packet<'p>) -> Self {
366 Self {
367 exchange,
368 tx,
369 rx,
370 completed: false,
371 }
372 }
373
374 fn start(&mut self, req: &ReadReq) -> Result<(), Error> {
375 req.tx_start(self.tx)?;
376
377 Ok(())
378 }
379
380 pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
381 self.exchange.accessor()
382 }
383
384 pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
385 if self.completed {
386 Err(ErrorCode::Invalid.into()) } else {
388 Ok(TLVWriter::new(self.tx.get_writebuf()?))
389 }
390 }
391
392 pub async fn send_chunk(&mut self, req: &ReadReq<'_>) -> Result<bool, Error> {
393 req.tx_finish_chunk(self.tx)?;
394
395 if exchange_confirm(self.exchange, self.tx, self.rx).await? != IMStatusCode::Success {
396 self.completed = true;
397 Ok(false)
398 } else {
399 req.tx_start(self.tx)?;
400
401 Ok(true)
402 }
403 }
404
405 pub async fn complete(&mut self, req: &ReadReq<'_>) -> Result<(), Error> {
406 req.tx_finish(self.tx)?;
407
408 self.exchange.send_complete(self.tx).await
409 }
410}
411
412pub struct WriteDriver<'a, 'r, 'p> {
413 exchange: &'r mut Exchange<'a>,
414 tx: &'r mut Packet<'p>,
415 epoch: Epoch,
416 timeout: Option<Duration>,
417}
418
419impl<'a, 'r, 'p> WriteDriver<'a, 'r, 'p> {
420 fn new(
421 exchange: &'r mut Exchange<'a>,
422 epoch: Epoch,
423 timeout: Option<Duration>,
424 tx: &'r mut Packet<'p>,
425 ) -> Self {
426 Self {
427 exchange,
428 tx,
429 epoch,
430 timeout,
431 }
432 }
433
434 async fn start(&mut self, req: &WriteReq<'_>) -> Result<bool, Error> {
435 if req.tx_start(self.tx, self.epoch, self.timeout)?.is_some() {
436 Ok(true)
437 } else {
438 self.exchange.send_complete(self.tx).await?;
439
440 Ok(false)
441 }
442 }
443
444 pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
445 self.exchange.accessor()
446 }
447
448 pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
449 Ok(TLVWriter::new(self.tx.get_writebuf()?))
450 }
451
452 pub async fn complete(&mut self, req: &WriteReq<'_>) -> Result<(), Error> {
453 if !req.supress_response.unwrap_or_default() {
454 req.tx_finish(self.tx)?;
455 self.exchange.send_complete(self.tx).await?;
456 }
457
458 Ok(())
459 }
460}
461
462pub struct InvokeDriver<'a, 'r, 'p> {
463 exchange: &'r mut Exchange<'a>,
464 tx: &'r mut Packet<'p>,
465 epoch: Epoch,
466 timeout: Option<Duration>,
467}
468
469impl<'a, 'r, 'p> InvokeDriver<'a, 'r, 'p> {
470 fn new(
471 exchange: &'r mut Exchange<'a>,
472 epoch: Epoch,
473 timeout: Option<Duration>,
474 tx: &'r mut Packet<'p>,
475 ) -> Self {
476 Self {
477 exchange,
478 tx,
479 epoch,
480 timeout,
481 }
482 }
483
484 async fn start(&mut self, req: &InvReq<'_>) -> Result<bool, Error> {
485 if req.tx_start(self.tx, self.epoch, self.timeout)?.is_some() {
486 Ok(true)
487 } else {
488 self.exchange.send_complete(self.tx).await?;
489
490 Ok(false)
491 }
492 }
493
494 pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
495 self.exchange.accessor()
496 }
497
498 pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
499 Ok(TLVWriter::new(self.tx.get_writebuf()?))
500 }
501
502 pub fn writer_exchange(&mut self) -> Result<(TLVWriter<'_, 'p>, &Exchange<'a>), Error> {
503 Ok((TLVWriter::new(self.tx.get_writebuf()?), (self.exchange)))
504 }
505
506 pub async fn complete(&mut self, req: &InvReq<'_>) -> Result<(), Error> {
507 if !req.suppress_response.unwrap_or_default() {
508 req.tx_finish(self.tx)?;
509 self.exchange.send_complete(self.tx).await?;
510 }
511
512 Ok(())
513 }
514}
515
516pub struct SubscribeDriver<'a, 'r, 'p> {
517 exchange: &'r mut Exchange<'a>,
518 tx: &'r mut Packet<'p>,
519 rx: &'r mut Packet<'p>,
520 subscription_id: u32,
521 completed: bool,
522}
523
524impl<'a, 'r, 'p> SubscribeDriver<'a, 'r, 'p> {
525 fn new(
526 exchange: &'r mut Exchange<'a>,
527 subscription_id: u32,
528 tx: &'r mut Packet<'p>,
529 rx: &'r mut Packet<'p>,
530 ) -> Self {
531 Self {
532 exchange,
533 tx,
534 rx,
535 subscription_id,
536 completed: false,
537 }
538 }
539
540 fn start(&mut self, req: &SubscribeReq) -> Result<(), Error> {
541 req.tx_start(self.tx, self.subscription_id)?;
542
543 Ok(())
544 }
545
546 pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
547 self.exchange.accessor()
548 }
549
550 pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
551 if self.completed {
552 Err(ErrorCode::Invalid.into()) } else {
554 Ok(TLVWriter::new(self.tx.get_writebuf()?))
555 }
556 }
557
558 pub async fn send_chunk(&mut self, req: &SubscribeReq<'_>) -> Result<bool, Error> {
559 req.tx_finish_chunk(self.tx, true)?;
560
561 if exchange_confirm(self.exchange, self.tx, self.rx).await? != IMStatusCode::Success {
562 self.completed = true;
563 Ok(false)
564 } else {
565 req.tx_start(self.tx, self.subscription_id)?;
566
567 Ok(true)
568 }
569 }
570
571 pub async fn complete(&mut self, req: &SubscribeReq<'_>) -> Result<(), Error> {
572 if !self.completed {
573 req.tx_finish_chunk(self.tx, false)?;
574
575 if exchange_confirm(self.exchange, self.tx, self.rx).await? != IMStatusCode::Success {
576 self.completed = true;
577 } else {
578 req.tx_process_final(self.tx, self.subscription_id)?;
579 self.exchange.send_complete(self.tx).await?;
580 }
581 }
582
583 Ok(())
584 }
585}
586
587pub enum Interaction<'a, 'r, 'p> {
588 Read {
589 req: ReadReq<'r>,
590 driver: ReadDriver<'a, 'r, 'p>,
591 },
592 Write {
593 req: WriteReq<'r>,
594 driver: WriteDriver<'a, 'r, 'p>,
595 },
596 Invoke {
597 req: InvReq<'r>,
598 driver: InvokeDriver<'a, 'r, 'p>,
599 },
600 Subscribe {
601 req: SubscribeReq<'r>,
602 driver: SubscribeDriver<'a, 'r, 'p>,
603 },
604}
605
606impl<'a, 'r, 'p> Interaction<'a, 'r, 'p> {
607 pub async fn timeout(
608 exchange: &mut Exchange<'_>,
609 rx: &mut Packet<'_>,
610 tx: &mut Packet<'_>,
611 ) -> Result<Option<Duration>, Error> {
612 let epoch = exchange.matter.epoch;
613
614 let mut opcode: OpCode = rx.get_proto_opcode()?;
615
616 let mut timeout = None;
617
618 while opcode == OpCode::TimedRequest {
619 let rx_data = rx.as_slice();
620 let req = TimedReq::from_tlv(&get_root_node_struct(rx_data)?)?;
621
622 timeout = Some(req.tx_process(tx, epoch)?);
623
624 exchange.exchange(tx, rx).await?;
625
626 opcode = rx.get_proto_opcode()?;
627 }
628
629 Ok(timeout)
630 }
631
632 #[inline(always)]
633 pub fn new<S>(
634 exchange: &'r mut Exchange<'a>,
635 rx: &'r Packet<'p>,
636 tx: &'r mut Packet<'p>,
637 rx_status: &'r mut Packet<'p>,
638 subscription_id: S,
639 timeout: Option<Duration>,
640 ) -> Result<Interaction<'a, 'r, 'p>, Error>
641 where
642 S: FnOnce() -> u32,
643 {
644 let epoch = exchange.matter.epoch;
645
646 let opcode = rx.get_proto_opcode()?;
647 let rx_data = rx.as_slice();
648
649 match opcode {
650 OpCode::ReadRequest => {
651 let req = ReadReq::from_tlv(&get_root_node_struct(rx_data)?)?;
652 let driver = ReadDriver::new(exchange, tx, rx_status);
653
654 Ok(Self::Read { req, driver })
655 }
656 OpCode::WriteRequest => {
657 let req = WriteReq::from_tlv(&get_root_node_struct(rx_data)?)?;
658 let driver = WriteDriver::new(exchange, epoch, timeout, tx);
659
660 Ok(Self::Write { req, driver })
661 }
662 OpCode::InvokeRequest => {
663 let req = InvReq::from_tlv(&get_root_node_struct(rx_data)?)?;
664 let driver = InvokeDriver::new(exchange, epoch, timeout, tx);
665
666 Ok(Self::Invoke { req, driver })
667 }
668 OpCode::SubscribeRequest => {
669 let req = SubscribeReq::from_tlv(&get_root_node_struct(rx_data)?)?;
670 let driver = SubscribeDriver::new(exchange, subscription_id(), tx, rx_status);
671
672 Ok(Self::Subscribe { req, driver })
673 }
674 _ => {
675 error!("Opcode not handled: {:?}", opcode);
676 Err(ErrorCode::InvalidOpcode.into())
677 }
678 }
679 }
680
681 pub async fn start(&mut self) -> Result<bool, Error> {
682 let started = match self {
683 Self::Read { req, driver } => {
684 driver.start(req)?;
685 true
686 }
687 Self::Write { req, driver } => driver.start(req).await?,
688 Self::Invoke { req, driver } => driver.start(req).await?,
689 Self::Subscribe { req, driver } => {
690 driver.start(req)?;
691 true
692 }
693 };
694
695 Ok(started)
696 }
697
698 fn status_response(tx: &mut Packet, status: IMStatusCode) -> Result<(), Error> {
699 tx.reset();
700 tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
701 tx.set_proto_opcode(OpCode::StatusResponse as u8);
702
703 let mut tw = TLVWriter::new(tx.get_writebuf()?);
704
705 let status = StatusResp { status };
706 status.to_tlv(&mut tw, TagType::Anonymous)
707 }
708}
709
710async fn exchange_confirm(
711 exchange: &mut Exchange<'_>,
712 tx: &mut Packet<'_>,
713 rx: &mut Packet<'_>,
714) -> Result<IMStatusCode, Error> {
715 exchange.exchange(tx, rx).await?;
716
717 let opcode: OpCode = rx.get_proto_opcode()?;
718
719 if opcode == OpCode::StatusResponse {
720 let resp = StatusResp::from_tlv(&get_root_node_struct(rx.as_slice())?)?;
721 Ok(resp.status)
722 } else {
723 Interaction::status_response(tx, IMStatusCode::Busy)?; exchange.send_complete(tx).await?;
726
727 Err(ErrorCode::Invalid.into()) }
729}
730
731fn has_timed_out(epoch: Epoch, timeout: Option<Duration>) -> bool {
732 timeout.map(|timeout| epoch() > timeout).unwrap_or(false)
733}