rs_matter/interaction_model/
core.rs

1/*
2 *
3 *    Copyright (c) 2020-2022 Project CHIP Authors
4 *
5 *    Licensed under the Apache License, Version 2.0 (the "License");
6 *    you may not use this file except in compliance with the License.
7 *    You may obtain a copy of the License at
8 *
9 *        http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *    Unless required by applicable law or agreed to in writing, software
12 *    distributed under the License is distributed on an "AS IS" BASIS,
13 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *    See the License for the specific language governing permissions and
15 *    limitations under the License.
16 */
17
18use core::time::Duration;
19
20use crate::{
21    acl::Accessor,
22    error::*,
23    tlv::{get_root_node_struct, FromTLV, TLVElement, TLVWriter, TagType, ToTLV},
24    transport::{exchange::Exchange, packet::Packet},
25    utils::epoch::Epoch,
26};
27use log::error;
28use num::{self, FromPrimitive};
29use num_derive::FromPrimitive;
30
31use super::messages::msg::{
32    self, InvReq, ReadReq, StatusResp, SubscribeReq, SubscribeResp, TimedReq, WriteReq,
33};
34
/// Logs a colorized `Handling command <name>` line through the `info!` macro.
///
/// `$e` is the expression printed after the fixed prefix; both parts are rendered
/// in cyan via `owo_colors::OwoColorize`, which the expansion imports locally.
///
/// The expansion invokes `info!` unqualified, so that macro has to be in scope at
/// the call site.
#[macro_export]
macro_rules! cmd_enter {
    ($e:expr) => {{
        use owo_colors::OwoColorize;
        info! {"{} {}", "Handling command".cyan(), $e.cyan()}
    }};
}
42
/// Status codes exchanged in Interaction Model status responses.
///
/// The explicit discriminants are the numeric values this type is encoded to and
/// decoded from as a `u16` TLV element (see the `ToTLV` / `FromTLV` impls for this
/// type). Values that do not match any variant are rejected on decode.
#[derive(FromPrimitive, Debug, Clone, Copy, PartialEq)]
pub enum IMStatusCode {
    Success = 0,
    Failure = 1,
    InvalidSubscription = 0x7D,
    UnsupportedAccess = 0x7E,
    UnsupportedEndpoint = 0x7F,
    InvalidAction = 0x80,
    UnsupportedCommand = 0x81,
    InvalidCommand = 0x85,
    UnsupportedAttribute = 0x86,
    ConstraintError = 0x87,
    UnsupportedWrite = 0x88,
    ResourceExhausted = 0x89,
    NotFound = 0x8b,
    UnreportableAttribute = 0x8c,
    InvalidDataType = 0x8d,
    UnsupportedRead = 0x8f,
    DataVersionMismatch = 0x92,
    Timeout = 0x94,
    Busy = 0x9c,
    UnsupportedCluster = 0xc3,
    NoUpstreamSubscription = 0xc5,
    NeedsTimedInteraction = 0xc6,
    UnsupportedEvent = 0xc7,
    PathsExhausted = 0xc8,
    TimedRequestMisMatch = 0xc9,
    FailSafeRequired = 0xca,
}
72
73impl From<ErrorCode> for IMStatusCode {
74    fn from(e: ErrorCode) -> Self {
75        match e {
76            ErrorCode::EndpointNotFound => IMStatusCode::UnsupportedEndpoint,
77            ErrorCode::ClusterNotFound => IMStatusCode::UnsupportedCluster,
78            ErrorCode::AttributeNotFound => IMStatusCode::UnsupportedAttribute,
79            ErrorCode::CommandNotFound => IMStatusCode::UnsupportedCommand,
80            ErrorCode::InvalidAction => IMStatusCode::InvalidAction,
81            ErrorCode::InvalidCommand => IMStatusCode::InvalidCommand,
82            ErrorCode::UnsupportedAccess => IMStatusCode::UnsupportedAccess,
83            ErrorCode::Busy => IMStatusCode::Busy,
84            ErrorCode::DataVersionMismatch => IMStatusCode::DataVersionMismatch,
85            ErrorCode::ResourceExhausted => IMStatusCode::ResourceExhausted,
86            _ => IMStatusCode::Failure,
87        }
88    }
89}
90
91impl From<Error> for IMStatusCode {
92    fn from(value: Error) -> Self {
93        Self::from(value.code())
94    }
95}
96
97impl FromTLV<'_> for IMStatusCode {
98    fn from_tlv(t: &TLVElement) -> Result<Self, Error> {
99        FromPrimitive::from_u16(t.u16()?).ok_or_else(|| ErrorCode::Invalid.into())
100    }
101}
102
103impl ToTLV for IMStatusCode {
104    fn to_tlv(&self, tw: &mut TLVWriter, tag_type: TagType) -> Result<(), Error> {
105        tw.u16(tag_type, *self as u16)
106    }
107}
108
/// Protocol opcodes of the Interaction Model messages handled in this module.
///
/// The discriminant is the value written into / read from a packet's protocol
/// opcode field (it is cast to `u8` when a response is prepared).
#[derive(FromPrimitive, Debug, Copy, Clone, Eq, PartialEq)]
pub enum OpCode {
    Reserved = 0,
    StatusResponse = 1,
    ReadRequest = 2,
    SubscribeRequest = 3,
    SubscribeResponse = 4,
    ReportData = 5,
    WriteRequest = 6,
    WriteResponse = 7,
    InvokeRequest = 8,
    InvokeResponse = 9,
    TimedRequest = 10,
}
123
/// Interaction Model protocol ID, as per the Matter Spec.
pub const PROTO_ID_INTERACTION_MODEL: u16 = 0x01;

/// Amount of write-buffer space held back while a long read (or subscription
/// report) is being assembled.
///
/// The buffer is shrunk by this amount when a report is started and expanded by
/// the same amount when it is finished, so that the trailing elements (container
/// ends and flags) can still be attached at the end of a full message.
const LONG_READS_TLV_RESERVE_SIZE: usize = 24;
130
131impl<'a> ReadReq<'a> {
132    pub fn tx_start<'r, 'p>(&self, tx: &'r mut Packet<'p>) -> Result<TLVWriter<'r, 'p>, Error> {
133        tx.reset();
134        tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
135        tx.set_proto_opcode(OpCode::ReportData as u8);
136
137        let mut tw = Self::reserve_long_read_space(tx)?;
138
139        tw.start_struct(TagType::Anonymous)?;
140
141        if self.attr_requests.is_some() {
142            tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
143        }
144
145        Ok(tw)
146    }
147
148    pub fn tx_finish_chunk(&self, tx: &mut Packet) -> Result<(), Error> {
149        self.complete(tx, true)
150    }
151
152    pub fn tx_finish(&self, tx: &mut Packet) -> Result<(), Error> {
153        self.complete(tx, false)
154    }
155
156    fn complete(&self, tx: &mut Packet<'_>, more_chunks: bool) -> Result<(), Error> {
157        let mut tw = Self::restore_long_read_space(tx)?;
158
159        if self.attr_requests.is_some() {
160            tw.end_container()?;
161        }
162
163        if more_chunks {
164            tw.bool(
165                TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
166                true,
167            )?;
168        }
169
170        tw.bool(
171            TagType::Context(msg::ReportDataTag::SupressResponse as u8),
172            !more_chunks,
173        )?;
174
175        tw.end_container()
176    }
177
178    fn reserve_long_read_space<'p, 'b>(tx: &'p mut Packet<'b>) -> Result<TLVWriter<'p, 'b>, Error> {
179        let wb = tx.get_writebuf()?;
180        wb.shrink(LONG_READS_TLV_RESERVE_SIZE)?;
181
182        Ok(TLVWriter::new(wb))
183    }
184
185    fn restore_long_read_space<'p, 'b>(tx: &'p mut Packet<'b>) -> Result<TLVWriter<'p, 'b>, Error> {
186        let wb = tx.get_writebuf()?;
187        wb.expand(LONG_READS_TLV_RESERVE_SIZE)?;
188
189        Ok(TLVWriter::new(wb))
190    }
191}
192
193impl<'a> WriteReq<'a> {
194    pub fn tx_start<'r, 'p>(
195        &self,
196        tx: &'r mut Packet<'p>,
197        epoch: Epoch,
198        timeout: Option<Duration>,
199    ) -> Result<Option<TLVWriter<'r, 'p>>, Error> {
200        if has_timed_out(epoch, timeout) {
201            Interaction::status_response(tx, IMStatusCode::Timeout)?;
202
203            Ok(None)
204        } else {
205            tx.reset();
206            tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
207            tx.set_proto_opcode(OpCode::WriteResponse as u8);
208
209            let mut tw = TLVWriter::new(tx.get_writebuf()?);
210
211            tw.start_struct(TagType::Anonymous)?;
212            tw.start_array(TagType::Context(msg::WriteRespTag::WriteResponses as u8))?;
213
214            Ok(Some(tw))
215        }
216    }
217
218    pub fn tx_finish(&self, tx: &mut Packet<'_>) -> Result<(), Error> {
219        let mut tw = TLVWriter::new(tx.get_writebuf()?);
220
221        tw.end_container()?;
222        tw.end_container()
223    }
224}
225
226impl<'a> InvReq<'a> {
227    pub fn tx_start<'r, 'p>(
228        &self,
229        tx: &'r mut Packet<'p>,
230        epoch: Epoch,
231        timeout: Option<Duration>,
232    ) -> Result<Option<TLVWriter<'r, 'p>>, Error> {
233        if has_timed_out(epoch, timeout) {
234            Interaction::status_response(tx, IMStatusCode::Timeout)?;
235
236            Ok(None)
237        } else {
238            let timed_tx = timeout.map(|_| true);
239            let timed_request = self.timed_request.filter(|a| *a);
240
241            // Either both should be None, or both should be Some(true)
242            if timed_tx != timed_request {
243                Interaction::status_response(tx, IMStatusCode::TimedRequestMisMatch)?;
244
245                Ok(None)
246            } else {
247                tx.reset();
248                tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
249                tx.set_proto_opcode(OpCode::InvokeResponse as u8);
250
251                let mut tw = TLVWriter::new(tx.get_writebuf()?);
252
253                tw.start_struct(TagType::Anonymous)?;
254
255                // Suppress Response -> TODO: Need to revisit this for cases where we send a command back
256                tw.bool(
257                    TagType::Context(msg::InvRespTag::SupressResponse as u8),
258                    false,
259                )?;
260
261                if self.inv_requests.is_some() {
262                    tw.start_array(TagType::Context(msg::InvRespTag::InvokeResponses as u8))?;
263                }
264
265                Ok(Some(tw))
266            }
267        }
268    }
269
270    pub fn tx_finish(&self, tx: &mut Packet<'_>) -> Result<(), Error> {
271        let mut tw = TLVWriter::new(tx.get_writebuf()?);
272
273        if self.inv_requests.is_some() {
274            tw.end_container()?;
275        }
276
277        tw.end_container()
278    }
279}
280
281impl TimedReq {
282    pub fn timeout(&self, epoch: Epoch) -> Duration {
283        epoch()
284            .checked_add(Duration::from_millis(self.timeout as _))
285            .unwrap()
286    }
287
288    pub fn tx_process(self, tx: &mut Packet<'_>, epoch: Epoch) -> Result<Duration, Error> {
289        Interaction::status_response(tx, IMStatusCode::Success)?;
290
291        Ok(epoch()
292            .checked_add(Duration::from_millis(self.timeout as _))
293            .unwrap())
294    }
295}
296
297impl<'a> SubscribeReq<'a> {
298    pub fn tx_start<'r, 'p>(
299        &self,
300        tx: &'r mut Packet<'p>,
301        subscription_id: u32,
302    ) -> Result<TLVWriter<'r, 'p>, Error> {
303        tx.reset();
304        tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
305        tx.set_proto_opcode(OpCode::ReportData as u8);
306
307        let mut tw = ReadReq::reserve_long_read_space(tx)?;
308
309        tw.start_struct(TagType::Anonymous)?;
310
311        tw.u32(
312            TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
313            subscription_id,
314        )?;
315
316        if self.attr_requests.is_some() {
317            tw.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
318        }
319
320        Ok(tw)
321    }
322
323    pub fn tx_finish_chunk(&self, tx: &mut Packet<'_>, more_chunks: bool) -> Result<(), Error> {
324        let mut tw = ReadReq::restore_long_read_space(tx)?;
325
326        if self.attr_requests.is_some() {
327            tw.end_container()?;
328        }
329
330        if more_chunks {
331            tw.bool(
332                TagType::Context(msg::ReportDataTag::MoreChunkedMsgs as u8),
333                true,
334            )?;
335        }
336
337        tw.bool(
338            TagType::Context(msg::ReportDataTag::SupressResponse as u8),
339            false,
340        )?;
341
342        tw.end_container()
343    }
344
345    pub fn tx_process_final(&self, tx: &mut Packet, subscription_id: u32) -> Result<(), Error> {
346        tx.reset();
347        tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
348        tx.set_proto_opcode(OpCode::SubscribeResponse as u8);
349
350        let mut tw = TLVWriter::new(tx.get_writebuf()?);
351
352        let resp = SubscribeResp::new(subscription_id, 40);
353        resp.to_tlv(&mut tw, TagType::Anonymous)
354    }
355}
356
/// Drives the response side of a read interaction, including chunked reports.
pub struct ReadDriver<'a, 'r, 'p> {
    // Exchange the reports are sent on.
    exchange: &'r mut Exchange<'a>,
    // Packet the report data is assembled in.
    tx: &'r mut Packet<'p>,
    // Packet that receives the peer's reply to an intermediate chunk.
    rx: &'r mut Packet<'p>,
    // Set once the peer answered a chunk with a non-success status;
    // `writer()` refuses to hand out writers afterwards.
    completed: bool,
}
363
364impl<'a, 'r, 'p> ReadDriver<'a, 'r, 'p> {
365    fn new(exchange: &'r mut Exchange<'a>, tx: &'r mut Packet<'p>, rx: &'r mut Packet<'p>) -> Self {
366        Self {
367            exchange,
368            tx,
369            rx,
370            completed: false,
371        }
372    }
373
374    fn start(&mut self, req: &ReadReq) -> Result<(), Error> {
375        req.tx_start(self.tx)?;
376
377        Ok(())
378    }
379
380    pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
381        self.exchange.accessor()
382    }
383
384    pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
385        if self.completed {
386            Err(ErrorCode::Invalid.into()) // TODO
387        } else {
388            Ok(TLVWriter::new(self.tx.get_writebuf()?))
389        }
390    }
391
392    pub async fn send_chunk(&mut self, req: &ReadReq<'_>) -> Result<bool, Error> {
393        req.tx_finish_chunk(self.tx)?;
394
395        if exchange_confirm(self.exchange, self.tx, self.rx).await? != IMStatusCode::Success {
396            self.completed = true;
397            Ok(false)
398        } else {
399            req.tx_start(self.tx)?;
400
401            Ok(true)
402        }
403    }
404
405    pub async fn complete(&mut self, req: &ReadReq<'_>) -> Result<(), Error> {
406        req.tx_finish(self.tx)?;
407
408        self.exchange.send_complete(self.tx).await
409    }
410}
411
/// Drives the response side of a write interaction.
pub struct WriteDriver<'a, 'r, 'p> {
    // Exchange the response is sent on.
    exchange: &'r mut Exchange<'a>,
    // Packet the write response is assembled in.
    tx: &'r mut Packet<'p>,
    // Time source used to check whether the timed window has elapsed.
    epoch: Epoch,
    // Expiry time of a preceding timed request, if there was one.
    timeout: Option<Duration>,
}
418
419impl<'a, 'r, 'p> WriteDriver<'a, 'r, 'p> {
420    fn new(
421        exchange: &'r mut Exchange<'a>,
422        epoch: Epoch,
423        timeout: Option<Duration>,
424        tx: &'r mut Packet<'p>,
425    ) -> Self {
426        Self {
427            exchange,
428            tx,
429            epoch,
430            timeout,
431        }
432    }
433
434    async fn start(&mut self, req: &WriteReq<'_>) -> Result<bool, Error> {
435        if req.tx_start(self.tx, self.epoch, self.timeout)?.is_some() {
436            Ok(true)
437        } else {
438            self.exchange.send_complete(self.tx).await?;
439
440            Ok(false)
441        }
442    }
443
444    pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
445        self.exchange.accessor()
446    }
447
448    pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
449        Ok(TLVWriter::new(self.tx.get_writebuf()?))
450    }
451
452    pub async fn complete(&mut self, req: &WriteReq<'_>) -> Result<(), Error> {
453        if !req.supress_response.unwrap_or_default() {
454            req.tx_finish(self.tx)?;
455            self.exchange.send_complete(self.tx).await?;
456        }
457
458        Ok(())
459    }
460}
461
/// Drives the response side of an invoke interaction.
pub struct InvokeDriver<'a, 'r, 'p> {
    // Exchange the response is sent on.
    exchange: &'r mut Exchange<'a>,
    // Packet the invoke response is assembled in.
    tx: &'r mut Packet<'p>,
    // Time source used to check whether the timed window has elapsed.
    epoch: Epoch,
    // Expiry time of a preceding timed request, if there was one.
    timeout: Option<Duration>,
}
468
469impl<'a, 'r, 'p> InvokeDriver<'a, 'r, 'p> {
470    fn new(
471        exchange: &'r mut Exchange<'a>,
472        epoch: Epoch,
473        timeout: Option<Duration>,
474        tx: &'r mut Packet<'p>,
475    ) -> Self {
476        Self {
477            exchange,
478            tx,
479            epoch,
480            timeout,
481        }
482    }
483
484    async fn start(&mut self, req: &InvReq<'_>) -> Result<bool, Error> {
485        if req.tx_start(self.tx, self.epoch, self.timeout)?.is_some() {
486            Ok(true)
487        } else {
488            self.exchange.send_complete(self.tx).await?;
489
490            Ok(false)
491        }
492    }
493
494    pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
495        self.exchange.accessor()
496    }
497
498    pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
499        Ok(TLVWriter::new(self.tx.get_writebuf()?))
500    }
501
502    pub fn writer_exchange(&mut self) -> Result<(TLVWriter<'_, 'p>, &Exchange<'a>), Error> {
503        Ok((TLVWriter::new(self.tx.get_writebuf()?), (self.exchange)))
504    }
505
506    pub async fn complete(&mut self, req: &InvReq<'_>) -> Result<(), Error> {
507        if !req.suppress_response.unwrap_or_default() {
508            req.tx_finish(self.tx)?;
509            self.exchange.send_complete(self.tx).await?;
510        }
511
512        Ok(())
513    }
514}
515
/// Drives the response side of a subscribe interaction: the initial (possibly
/// chunked) report followed by the subscribe response.
pub struct SubscribeDriver<'a, 'r, 'p> {
    // Exchange the messages are sent on.
    exchange: &'r mut Exchange<'a>,
    // Packet the outgoing messages are assembled in.
    tx: &'r mut Packet<'p>,
    // Packet that receives the peer's status responses.
    rx: &'r mut Packet<'p>,
    // Identifier written into every report and into the subscribe response.
    subscription_id: u32,
    // Set once the peer answered a report with a non-success status;
    // `writer()` refuses to hand out writers afterwards.
    completed: bool,
}
523
524impl<'a, 'r, 'p> SubscribeDriver<'a, 'r, 'p> {
525    fn new(
526        exchange: &'r mut Exchange<'a>,
527        subscription_id: u32,
528        tx: &'r mut Packet<'p>,
529        rx: &'r mut Packet<'p>,
530    ) -> Self {
531        Self {
532            exchange,
533            tx,
534            rx,
535            subscription_id,
536            completed: false,
537        }
538    }
539
540    fn start(&mut self, req: &SubscribeReq) -> Result<(), Error> {
541        req.tx_start(self.tx, self.subscription_id)?;
542
543        Ok(())
544    }
545
546    pub fn accessor(&self) -> Result<Accessor<'a>, Error> {
547        self.exchange.accessor()
548    }
549
550    pub fn writer(&mut self) -> Result<TLVWriter<'_, 'p>, Error> {
551        if self.completed {
552            Err(ErrorCode::Invalid.into()) // TODO
553        } else {
554            Ok(TLVWriter::new(self.tx.get_writebuf()?))
555        }
556    }
557
558    pub async fn send_chunk(&mut self, req: &SubscribeReq<'_>) -> Result<bool, Error> {
559        req.tx_finish_chunk(self.tx, true)?;
560
561        if exchange_confirm(self.exchange, self.tx, self.rx).await? != IMStatusCode::Success {
562            self.completed = true;
563            Ok(false)
564        } else {
565            req.tx_start(self.tx, self.subscription_id)?;
566
567            Ok(true)
568        }
569    }
570
571    pub async fn complete(&mut self, req: &SubscribeReq<'_>) -> Result<(), Error> {
572        if !self.completed {
573            req.tx_finish_chunk(self.tx, false)?;
574
575            if exchange_confirm(self.exchange, self.tx, self.rx).await? != IMStatusCode::Success {
576                self.completed = true;
577            } else {
578                req.tx_process_final(self.tx, self.subscription_id)?;
579                self.exchange.send_complete(self.tx).await?;
580            }
581        }
582
583        Ok(())
584    }
585}
586
/// An incoming Interaction Model request paired with the driver used to answer it.
///
/// One variant per supported request opcode; each holds the request parsed from
/// the RX packet and the driver that builds and sends the matching response.
pub enum Interaction<'a, 'r, 'p> {
    /// A read request.
    Read {
        req: ReadReq<'r>,
        driver: ReadDriver<'a, 'r, 'p>,
    },
    /// A write request.
    Write {
        req: WriteReq<'r>,
        driver: WriteDriver<'a, 'r, 'p>,
    },
    /// An invoke (command) request.
    Invoke {
        req: InvReq<'r>,
        driver: InvokeDriver<'a, 'r, 'p>,
    },
    /// A subscribe request.
    Subscribe {
        req: SubscribeReq<'r>,
        driver: SubscribeDriver<'a, 'r, 'p>,
    },
}
605
606impl<'a, 'r, 'p> Interaction<'a, 'r, 'p> {
607    pub async fn timeout(
608        exchange: &mut Exchange<'_>,
609        rx: &mut Packet<'_>,
610        tx: &mut Packet<'_>,
611    ) -> Result<Option<Duration>, Error> {
612        let epoch = exchange.matter.epoch;
613
614        let mut opcode: OpCode = rx.get_proto_opcode()?;
615
616        let mut timeout = None;
617
618        while opcode == OpCode::TimedRequest {
619            let rx_data = rx.as_slice();
620            let req = TimedReq::from_tlv(&get_root_node_struct(rx_data)?)?;
621
622            timeout = Some(req.tx_process(tx, epoch)?);
623
624            exchange.exchange(tx, rx).await?;
625
626            opcode = rx.get_proto_opcode()?;
627        }
628
629        Ok(timeout)
630    }
631
632    #[inline(always)]
633    pub fn new<S>(
634        exchange: &'r mut Exchange<'a>,
635        rx: &'r Packet<'p>,
636        tx: &'r mut Packet<'p>,
637        rx_status: &'r mut Packet<'p>,
638        subscription_id: S,
639        timeout: Option<Duration>,
640    ) -> Result<Interaction<'a, 'r, 'p>, Error>
641    where
642        S: FnOnce() -> u32,
643    {
644        let epoch = exchange.matter.epoch;
645
646        let opcode = rx.get_proto_opcode()?;
647        let rx_data = rx.as_slice();
648
649        match opcode {
650            OpCode::ReadRequest => {
651                let req = ReadReq::from_tlv(&get_root_node_struct(rx_data)?)?;
652                let driver = ReadDriver::new(exchange, tx, rx_status);
653
654                Ok(Self::Read { req, driver })
655            }
656            OpCode::WriteRequest => {
657                let req = WriteReq::from_tlv(&get_root_node_struct(rx_data)?)?;
658                let driver = WriteDriver::new(exchange, epoch, timeout, tx);
659
660                Ok(Self::Write { req, driver })
661            }
662            OpCode::InvokeRequest => {
663                let req = InvReq::from_tlv(&get_root_node_struct(rx_data)?)?;
664                let driver = InvokeDriver::new(exchange, epoch, timeout, tx);
665
666                Ok(Self::Invoke { req, driver })
667            }
668            OpCode::SubscribeRequest => {
669                let req = SubscribeReq::from_tlv(&get_root_node_struct(rx_data)?)?;
670                let driver = SubscribeDriver::new(exchange, subscription_id(), tx, rx_status);
671
672                Ok(Self::Subscribe { req, driver })
673            }
674            _ => {
675                error!("Opcode not handled: {:?}", opcode);
676                Err(ErrorCode::InvalidOpcode.into())
677            }
678        }
679    }
680
681    pub async fn start(&mut self) -> Result<bool, Error> {
682        let started = match self {
683            Self::Read { req, driver } => {
684                driver.start(req)?;
685                true
686            }
687            Self::Write { req, driver } => driver.start(req).await?,
688            Self::Invoke { req, driver } => driver.start(req).await?,
689            Self::Subscribe { req, driver } => {
690                driver.start(req)?;
691                true
692            }
693        };
694
695        Ok(started)
696    }
697
698    fn status_response(tx: &mut Packet, status: IMStatusCode) -> Result<(), Error> {
699        tx.reset();
700        tx.set_proto_id(PROTO_ID_INTERACTION_MODEL);
701        tx.set_proto_opcode(OpCode::StatusResponse as u8);
702
703        let mut tw = TLVWriter::new(tx.get_writebuf()?);
704
705        let status = StatusResp { status };
706        status.to_tlv(&mut tw, TagType::Anonymous)
707    }
708}
709
710async fn exchange_confirm(
711    exchange: &mut Exchange<'_>,
712    tx: &mut Packet<'_>,
713    rx: &mut Packet<'_>,
714) -> Result<IMStatusCode, Error> {
715    exchange.exchange(tx, rx).await?;
716
717    let opcode: OpCode = rx.get_proto_opcode()?;
718
719    if opcode == OpCode::StatusResponse {
720        let resp = StatusResp::from_tlv(&get_root_node_struct(rx.as_slice())?)?;
721        Ok(resp.status)
722    } else {
723        Interaction::status_response(tx, IMStatusCode::Busy)?; // TODO
724
725        exchange.send_complete(tx).await?;
726
727        Err(ErrorCode::Invalid.into()) // TODO
728    }
729}
730
731fn has_timed_out(epoch: Epoch, timeout: Option<Duration>) -> bool {
732    timeout.map(|timeout| epoch() > timeout).unwrap_or(false)
733}