use either::Either;
pub use super::{AttrId, ClusterId, EndptId};
use crate::error::{Error, ErrorCode};
use crate::tlv::{FromTLV, TLVBuilderParent, TLVElement, TLVTag, TLVWrite, TagType, ToTLV};
use crate::transport::exchange::{Exchange, OwnedSender, OwnedSenderTx};
use super::{
IMStatusCode, InvReqBuilder, InvokeResp, OpCode, ReadReqBuilder, ReportDataResp, StatusResp,
SubscribeReqBuilder, SubscribeResp, TimedReq, WriteReqBuilder, WriteResp, IM_REVISION,
};
/// Client-side helpers for running Interaction Model requests on an `Exchange`.
///
/// Every method consumes `self` and turns it into an `Exchange<'a>` through the `Into`
/// bound. The `*_sender` methods hand out the low-level sender of an interaction; the
/// `*_with` methods drive that sender in a loop, invoking the supplied closure each time
/// a request builder is yielded, and return as soon as a response is available.
pub trait ImClient<'a>: Sized + Into<Exchange<'a>> {
    /// Runs a read interaction and returns the first response chunk.
    ///
    /// `build` is called every time `ReadSender::tx` yields `TxOutcome::BuildRequest`
    /// (possibly more than once) and must return the sender wrapped by the builder.
    /// NOTE(review): repeated calls presumably correspond to retransmissions — confirm
    /// against `OwnedSender::tx`.
    ///
    /// # Errors
    /// Propagates errors from creating the sender, from `tx`, and from `build`.
    async fn read_with<B>(self, mut build: B) -> Result<ReadRespChunk<'a>, Error>
    where
        B: FnMut(ReadReqBuilder<ReadSender<'a>>) -> Result<ReadSender<'a>, Error>,
    {
        let mut sender = self.read_sender().await?;

        loop {
            // `tx` consumes the sender; the build closure gives it back for the next round.
            sender = match sender.tx().await? {
                TxOutcome::GotResponse(chunk) => break Ok(chunk),
                TxOutcome::BuildRequest(builder) => build(builder)?,
            };
        }
    }

    /// Converts `self` into an exchange and wraps its sender in a `ReadSender`
    /// that starts in the `Ready` state.
    async fn read_sender(self) -> Result<ReadSender<'a>, Error> {
        let exchange: Exchange<'a> = self.into();

        Ok(ReadSender {
            state: ReadSenderState::Ready(exchange.into_sender()?),
        })
    }

    /// Runs a write interaction and returns a handle on the received response.
    ///
    /// When `timed_timeout_ms` is `Some`, a timed request with that timeout is exchanged
    /// first (see `write_sender`). `build` is called every time `WriteSender::tx` yields
    /// `TxOutcome::BuildRequest` and must return the sender wrapped by the builder.
    ///
    /// # Errors
    /// Propagates errors from creating the sender, from `tx`, and from `build`.
    async fn write_with<B>(
        self,
        timed_timeout_ms: Option<u16>,
        mut build: B,
    ) -> Result<WriteRespHandle<'a>, Error>
    where
        B: FnMut(WriteReqBuilder<WriteSender<'a>>) -> Result<WriteSender<'a>, Error>,
    {
        let mut sender = self.write_sender(timed_timeout_ms).await?;

        loop {
            sender = match sender.tx().await? {
                TxOutcome::GotResponse(handle) => break Ok(handle),
                TxOutcome::BuildRequest(builder) => build(builder)?,
            };
        }
    }

    /// Converts `self` into an exchange, optionally performs the timed-request handshake
    /// on it, and wraps its sender in a `WriteSender` that starts in the `Ready` state.
    async fn write_sender(self, timed_timeout_ms: Option<u16>) -> Result<WriteSender<'a>, Error> {
        let mut exchange: Exchange<'a> = self.into();

        // The timed request has to complete on the same exchange before the write goes out.
        if let Some(millis) = timed_timeout_ms {
            send_timed_request(&mut exchange, millis).await?;
        }

        Ok(WriteSender {
            state: WriteSenderState::Ready(exchange.into_sender()?),
        })
    }

    /// Runs an invoke interaction and returns the first response chunk.
    ///
    /// When `timed_timeout_ms` is `Some`, a timed request with that timeout is exchanged
    /// first (see `invoke_sender`). `build` is called every time `InvokeSender::tx` yields
    /// `TxOutcome::BuildRequest` and must return the sender wrapped by the builder.
    ///
    /// # Errors
    /// Propagates errors from creating the sender, from `tx`, and from `build`.
    async fn invoke_with<B>(
        self,
        timed_timeout_ms: Option<u16>,
        mut build: B,
    ) -> Result<InvokeRespChunk<'a>, Error>
    where
        B: FnMut(InvReqBuilder<InvokeSender<'a>>) -> Result<InvokeSender<'a>, Error>,
    {
        let mut sender = self.invoke_sender(timed_timeout_ms).await?;

        loop {
            sender = match sender.tx().await? {
                TxOutcome::GotResponse(chunk) => break Ok(chunk),
                TxOutcome::BuildRequest(builder) => build(builder)?,
            };
        }
    }

    /// Converts `self` into an exchange, optionally performs the timed-request handshake
    /// on it, and wraps its sender in an `InvokeSender` that starts in the `Ready` state.
    async fn invoke_sender(self, timed_timeout_ms: Option<u16>) -> Result<InvokeSender<'a>, Error> {
        let mut exchange: Exchange<'a> = self.into();

        if let Some(millis) = timed_timeout_ms {
            send_timed_request(&mut exchange, millis).await?;
        }

        Ok(InvokeSender {
            state: InvokeSenderState::Ready(exchange.into_sender()?),
        })
    }

    /// Runs a subscribe interaction and returns the first priming report chunk.
    ///
    /// `build` is called every time `SubscribeSender::tx` yields
    /// `TxOutcome::BuildRequest` and must return the sender wrapped by the builder.
    ///
    /// # Errors
    /// Propagates errors from creating the sender, from `tx`, and from `build`.
    async fn subscribe_with<B>(self, mut build: B) -> Result<SubscribePrimingChunk<'a>, Error>
    where
        B: FnMut(SubscribeReqBuilder<SubscribeSender<'a>>) -> Result<SubscribeSender<'a>, Error>,
    {
        let mut sender = self.subscribe_sender().await?;

        loop {
            sender = match sender.tx().await? {
                TxOutcome::GotResponse(chunk) => break Ok(chunk),
                TxOutcome::BuildRequest(builder) => build(builder)?,
            };
        }
    }

    /// Converts `self` into an exchange and wraps its sender in a `SubscribeSender`
    /// that starts in the `Ready` state.
    async fn subscribe_sender(self) -> Result<SubscribeSender<'a>, Error> {
        let exchange: Exchange<'a> = self.into();

        Ok(SubscribeSender {
            state: SubscribeSenderState::Ready(exchange.into_sender()?),
        })
    }
}
// `Exchange<'a>` converts into itself, so it satisfies the trait bounds and picks up
// every default method of `ImClient` without overriding anything.
impl<'a> ImClient<'a> for Exchange<'a> {}
/// Result of one `tx()` step on a sender: either a request builder to fill in,
/// or the response that was received.
///
/// `F` is the builder type handed to the caller, `S` the response type.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TxOutcome<F, S> {
/// A request builder was handed out; the caller has to populate it and call `tx()`
/// again on the sender it gets back.
BuildRequest(F),
/// A response has been received.
GotResponse(S),
}
/// Sending side of a read interaction.
///
/// Alternates between holding an idle `OwnedSender` and holding an open TX slot that a
/// `ReadReqBuilder` writes the request into.
pub struct ReadSender<'a> {
    state: ReadSenderState<'a>,
}

/// Phase a [`ReadSender`] is in.
enum ReadSenderState<'a> {
    /// No request payload is under construction.
    Ready(OwnedSender<'a>),
    /// A TX slot is open and (possibly) partially written.
    Slot(ReadSenderSlot<'a>),
}

impl<'a> ReadSender<'a> {
    /// Advances the interaction by one step.
    ///
    /// If a slot is open it is committed first. Then the underlying sender is polled:
    /// when it hands out a TX slot, a `ReadReqBuilder` writing into that slot (from offset
    /// zero) is returned; when it hands back the exchange, the first `ReportData` chunk is
    /// received and returned.
    ///
    /// # Errors
    /// Propagates errors from committing the slot, from the underlying sender, from
    /// creating the builder, and from receiving the response.
    pub async fn tx(
        self,
    ) -> Result<TxOutcome<ReadReqBuilder<ReadSender<'a>>, ReadRespChunk<'a>>, Error> {
        let sender = match self.state {
            ReadSenderState::Ready(idle) => idle,
            ReadSenderState::Slot(pending) => pending.commit()?,
        };

        let tx = match sender.tx().await? {
            Either::Right(exchange) => {
                let chunk = ReadRespChunk::receive(exchange).await?;
                return Ok(TxOutcome::GotResponse(chunk));
            }
            Either::Left(tx) => tx,
        };

        // Fresh slot: the builder starts writing at the beginning of the payload.
        let parent = ReadSender {
            state: ReadSenderState::Slot(ReadSenderSlot { tx, cursor: 0 }),
        };

        Ok(TxOutcome::BuildRequest(ReadReqBuilder::new(
            parent,
            &TLVTag::Anonymous,
        )?))
    }
}
impl<'a> TLVBuilderParent for ReadSender<'a> {
    type Write = ReadSenderSlot<'a>;

    /// Returns the open TX slot the builder writes into.
    ///
    /// # Panics
    /// Panics when the sender is in the `Ready` state, i.e. no slot is open.
    fn writer(&mut self) -> &mut Self::Write {
        if let ReadSenderState::Slot(slot) = &mut self.state {
            slot
        } else {
            panic!(
                "ReadSender::writer() called outside the build phase — \
                 only reachable via a ReadReqBuilder yielded by ReadSender::tx."
            )
        }
    }
}
// Opaque formatting: only the type name is printed, never the internal state.
impl core::fmt::Debug for ReadSender<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("ReadSender")
    }
}

// `defmt` counterpart of the `Debug` impl; prints the type name only.
#[cfg(feature = "defmt")]
impl defmt::Format for ReadSender<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "ReadSender");
    }
}
/// Open TX slot of a read request together with the write position inside its payload.
pub struct ReadSenderSlot<'a> {
    /// The in-progress transmission whose payload buffer is being filled.
    tx: OwnedSenderTx<'a>,
    /// Index of the next payload byte to be written.
    cursor: usize,
}

impl<'a> ReadSenderSlot<'a> {
    /// Completes the transmission with the `ReadRequest` opcode, passing `0` and the
    /// current cursor to `OwnedSenderTx::complete`, and returns the sender.
    /// NOTE(review): `0..cursor` is presumably the payload range — confirm against
    /// `OwnedSenderTx::complete`.
    fn commit(self) -> Result<OwnedSender<'a>, Error> {
        let Self { tx, cursor } = self;

        tx.complete(0, cursor, OpCode::ReadRequest.into())
    }
}
impl<'a> TLVWrite for ReadSenderSlot<'a> {
    type Position = usize;

    /// Stores `byte` at the cursor and advances the cursor by one.
    ///
    /// # Errors
    /// Returns `ErrorCode::NoSpace` when the cursor is at or past the end of the payload.
    fn write(&mut self, byte: u8) -> Result<(), Error> {
        let at = self.cursor;
        let buf = self.tx.payload();

        if at < buf.len() {
            buf[at] = byte;
            self.cursor = at + 1;
            Ok(())
        } else {
            Err(ErrorCode::NoSpace.into())
        }
    }

    /// Current write position.
    fn get_tail(&self) -> Self::Position {
        self.cursor
    }

    /// Moves the write position to `pos`; the value is stored as-is, without validation.
    fn rewind_to(&mut self, pos: Self::Position) {
        self.cursor = pos;
    }
}
// Prints the type name and the current cursor; the payload itself is not shown.
impl core::fmt::Debug for ReadSenderSlot<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("ReadSenderSlot({})", self.cursor))
    }
}

// `defmt` counterpart of the `Debug` impl.
#[cfg(feature = "defmt")]
impl defmt::Format for ReadSenderSlot<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "ReadSenderSlot({})", self.cursor);
    }
}
pub struct ReadRespChunk<'a> {
exchange: Exchange<'a>,
}
impl<'a> ReadRespChunk<'a> {
async fn receive(mut exchange: Exchange<'a>) -> Result<Self, Error> {
exchange.recv_fetch().await?;
{
let rx = exchange.rx()?;
check_opcode(rx.meta().proto_opcode, OpCode::ReportData)?;
}
Ok(Self { exchange })
}
pub fn response(&self) -> Result<ReportDataResp<'_>, Error> {
let rx = self.exchange.rx()?;
let element = TLVElement::new(rx.payload());
ReportDataResp::from_tlv(&element)
}
pub async fn complete(mut self) -> Result<Option<Self>, Error> {
let (more_chunks, suppress_response) = {
let resp = self.response()?;
(
resp.more_chunks.unwrap_or(false),
resp.suppress_response.unwrap_or(false),
)
};
if more_chunks {
self.exchange
.send_with(|_, wb| {
StatusResp::write(wb, IMStatusCode::Success)?;
Ok(Some(OpCode::StatusResponse.into()))
})
.await?;
self.exchange.recv_fetch().await?;
{
let rx = self.exchange.rx()?;
check_opcode(rx.meta().proto_opcode, OpCode::ReportData)?;
}
Ok(Some(self))
} else {
if !suppress_response {
self.exchange
.send_with(|_, wb| {
StatusResp::write(wb, IMStatusCode::Success)?;
Ok(Some(OpCode::StatusResponse.into()))
})
.await?;
} else {
self.exchange.acknowledge().await?;
}
Ok(None)
}
}
}
/// Sending side of a write interaction.
///
/// Alternates between holding an idle `OwnedSender` and holding an open TX slot that a
/// `WriteReqBuilder` writes the request into.
pub struct WriteSender<'a> {
    state: WriteSenderState<'a>,
}

/// Phase a [`WriteSender`] is in.
enum WriteSenderState<'a> {
    /// No request payload is under construction.
    Ready(OwnedSender<'a>),
    /// A TX slot is open and (possibly) partially written.
    Slot(WriteSenderSlot<'a>),
}

impl<'a> WriteSender<'a> {
    /// Advances the interaction by one step.
    ///
    /// If a slot is open it is committed first. Then the underlying sender is polled:
    /// when it hands out a TX slot, a `WriteReqBuilder` writing into that slot (from
    /// offset zero) is returned; when it hands back the exchange, the write response is
    /// received and returned.
    ///
    /// # Errors
    /// Propagates errors from committing the slot, from the underlying sender, from
    /// creating the builder, and from receiving the response.
    pub async fn tx(
        self,
    ) -> Result<TxOutcome<WriteReqBuilder<WriteSender<'a>>, WriteRespHandle<'a>>, Error> {
        let sender = match self.state {
            WriteSenderState::Ready(idle) => idle,
            WriteSenderState::Slot(pending) => pending.commit()?,
        };

        let tx = match sender.tx().await? {
            Either::Right(exchange) => {
                let handle = WriteRespHandle::receive(exchange).await?;
                return Ok(TxOutcome::GotResponse(handle));
            }
            Either::Left(tx) => tx,
        };

        // Fresh slot: the builder starts writing at the beginning of the payload.
        let parent = WriteSender {
            state: WriteSenderState::Slot(WriteSenderSlot { tx, cursor: 0 }),
        };

        Ok(TxOutcome::BuildRequest(WriteReqBuilder::new(
            parent,
            &TLVTag::Anonymous,
        )?))
    }
}
impl<'a> TLVBuilderParent for WriteSender<'a> {
    type Write = WriteSenderSlot<'a>;

    /// Returns the open TX slot the builder writes into.
    ///
    /// # Panics
    /// Panics when the sender is in the `Ready` state, i.e. no slot is open.
    fn writer(&mut self) -> &mut Self::Write {
        if let WriteSenderState::Slot(slot) = &mut self.state {
            slot
        } else {
            panic!(
                "WriteSender::writer() called outside the build phase — \
                 only reachable via a WriteReqBuilder yielded by WriteSender::tx."
            )
        }
    }
}
// Opaque formatting: only the type name is printed, never the internal state.
impl core::fmt::Debug for WriteSender<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("WriteSender")
    }
}

// `defmt` counterpart of the `Debug` impl; prints the type name only.
#[cfg(feature = "defmt")]
impl defmt::Format for WriteSender<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "WriteSender");
    }
}
/// Open TX slot of a write request together with the write position inside its payload.
pub struct WriteSenderSlot<'a> {
    /// The in-progress transmission whose payload buffer is being filled.
    tx: OwnedSenderTx<'a>,
    /// Index of the next payload byte to be written.
    cursor: usize,
}

impl<'a> WriteSenderSlot<'a> {
    /// Completes the transmission with the `WriteRequest` opcode, passing `0` and the
    /// current cursor to `OwnedSenderTx::complete`, and returns the sender.
    /// NOTE(review): `0..cursor` is presumably the payload range — confirm against
    /// `OwnedSenderTx::complete`.
    fn commit(self) -> Result<OwnedSender<'a>, Error> {
        let Self { tx, cursor } = self;

        tx.complete(0, cursor, OpCode::WriteRequest.into())
    }
}
impl<'a> TLVWrite for WriteSenderSlot<'a> {
    type Position = usize;

    /// Stores `byte` at the cursor and advances the cursor by one.
    ///
    /// # Errors
    /// Returns `ErrorCode::NoSpace` when the cursor is at or past the end of the payload.
    fn write(&mut self, byte: u8) -> Result<(), Error> {
        let at = self.cursor;
        let buf = self.tx.payload();

        if at < buf.len() {
            buf[at] = byte;
            self.cursor = at + 1;
            Ok(())
        } else {
            Err(ErrorCode::NoSpace.into())
        }
    }

    /// Current write position.
    fn get_tail(&self) -> Self::Position {
        self.cursor
    }

    /// Moves the write position to `pos`; the value is stored as-is, without validation.
    fn rewind_to(&mut self, pos: Self::Position) {
        self.cursor = pos;
    }
}
// Prints the type name and the current cursor; the payload itself is not shown.
impl core::fmt::Debug for WriteSenderSlot<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("WriteSenderSlot({})", self.cursor))
    }
}

// `defmt` counterpart of the `Debug` impl.
#[cfg(feature = "defmt")]
impl defmt::Format for WriteSenderSlot<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "WriteSenderSlot({})", self.cursor);
    }
}
/// A received (and already acknowledged) `WriteResponse`, together with the exchange
/// that holds its payload.
pub struct WriteRespHandle<'a> {
    exchange: Exchange<'a>,
}

impl<'a> WriteRespHandle<'a> {
    /// Fetches the next incoming message on `exchange`, checks that it carries the
    /// `WriteResponse` opcode and acknowledges it.
    ///
    /// # Errors
    /// Propagates receive/acknowledge errors; returns `ErrorCode::InvalidOpcode`
    /// (via `check_opcode`) for any other opcode.
    async fn receive(mut exchange: Exchange<'a>) -> Result<Self, Error> {
        exchange.recv_fetch().await?;

        let opcode = exchange.rx()?.meta().proto_opcode;
        check_opcode(opcode, OpCode::WriteResponse)?;

        exchange.acknowledge().await?;

        Ok(Self { exchange })
    }

    /// Parses the payload of the fetched message as a `WriteResp`.
    pub fn response(&self) -> Result<WriteResp<'_>, Error> {
        let rx = self.exchange.rx()?;
        let element = TLVElement::new(rx.payload());

        WriteResp::from_tlv(&element)
    }
}
/// Sending side of an invoke interaction.
///
/// Alternates between holding an idle `OwnedSender` and holding an open TX slot that an
/// `InvReqBuilder` writes the request into.
pub struct InvokeSender<'a> {
    state: InvokeSenderState<'a>,
}

/// Phase an [`InvokeSender`] is in.
enum InvokeSenderState<'a> {
    /// No request payload is under construction.
    Ready(OwnedSender<'a>),
    /// A TX slot is open and (possibly) partially written.
    Slot(InvokeSenderSlot<'a>),
}

impl<'a> InvokeSender<'a> {
    /// Advances the interaction by one step.
    ///
    /// If a slot is open it is committed first. Then the underlying sender is polled:
    /// when it hands out a TX slot, an `InvReqBuilder` writing into that slot (from
    /// offset zero) is returned; when it hands back the exchange, the first response
    /// chunk is received and returned.
    ///
    /// # Errors
    /// Propagates errors from committing the slot, from the underlying sender, from
    /// creating the builder, and from receiving the response.
    pub async fn tx(
        self,
    ) -> Result<TxOutcome<InvReqBuilder<InvokeSender<'a>>, InvokeRespChunk<'a>>, Error> {
        let sender = match self.state {
            InvokeSenderState::Ready(idle) => idle,
            InvokeSenderState::Slot(pending) => pending.commit()?,
        };

        let tx = match sender.tx().await? {
            Either::Right(exchange) => {
                let chunk = InvokeRespChunk::receive(exchange).await?;
                return Ok(TxOutcome::GotResponse(chunk));
            }
            Either::Left(tx) => tx,
        };

        // Fresh slot: the builder starts writing at the beginning of the payload.
        let parent = InvokeSender {
            state: InvokeSenderState::Slot(InvokeSenderSlot { tx, cursor: 0 }),
        };

        Ok(TxOutcome::BuildRequest(InvReqBuilder::new(
            parent,
            &TLVTag::Anonymous,
        )?))
    }
}
impl<'a> TLVBuilderParent for InvokeSender<'a> {
    type Write = InvokeSenderSlot<'a>;

    /// Returns the open TX slot the builder writes into.
    ///
    /// # Panics
    /// Panics when the sender is in the `Ready` state, i.e. no slot is open.
    fn writer(&mut self) -> &mut Self::Write {
        if let InvokeSenderState::Slot(slot) = &mut self.state {
            slot
        } else {
            panic!(
                "InvokeSender::writer() called outside the build phase \
                 (state = Ready); only reachable via an InvReqBuilder \
                 yielded by InvokeSender::tx — see module docs."
            )
        }
    }
}
// Opaque formatting: only the type name is printed, never the internal state.
impl core::fmt::Debug for InvokeSender<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("InvokeSender")
    }
}

// `defmt` counterpart of the `Debug` impl; prints the type name only.
#[cfg(feature = "defmt")]
impl defmt::Format for InvokeSender<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "InvokeSender");
    }
}
/// Open TX slot of an invoke request together with the write position inside its payload.
pub struct InvokeSenderSlot<'a> {
    /// The in-progress transmission whose payload buffer is being filled.
    tx: OwnedSenderTx<'a>,
    /// Index of the next payload byte to be written.
    cursor: usize,
}

impl<'a> InvokeSenderSlot<'a> {
    /// Completes the transmission with the `InvokeRequest` opcode, passing `0` and the
    /// current cursor to `OwnedSenderTx::complete`, and returns the sender.
    /// NOTE(review): `0..cursor` is presumably the payload range — confirm against
    /// `OwnedSenderTx::complete`.
    fn commit(self) -> Result<OwnedSender<'a>, Error> {
        let Self { tx, cursor } = self;

        tx.complete(0, cursor, OpCode::InvokeRequest.into())
    }
}
impl<'a> TLVWrite for InvokeSenderSlot<'a> {
    type Position = usize;

    /// Stores `byte` at the cursor and advances the cursor by one.
    ///
    /// # Errors
    /// Returns `ErrorCode::NoSpace` when the cursor is at or past the end of the payload.
    fn write(&mut self, byte: u8) -> Result<(), Error> {
        let at = self.cursor;
        let buf = self.tx.payload();

        if at < buf.len() {
            buf[at] = byte;
            self.cursor = at + 1;
            Ok(())
        } else {
            Err(ErrorCode::NoSpace.into())
        }
    }

    /// Current write position.
    fn get_tail(&self) -> Self::Position {
        self.cursor
    }

    /// Moves the write position to `pos`; the value is stored as-is, without validation.
    fn rewind_to(&mut self, pos: Self::Position) {
        self.cursor = pos;
    }
}
// Prints the type name and the current cursor; the payload itself is not shown.
impl core::fmt::Debug for InvokeSenderSlot<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("InvokeSenderSlot({})", self.cursor))
    }
}

// `defmt` counterpart of the `Debug` impl.
#[cfg(feature = "defmt")]
impl defmt::Format for InvokeSenderSlot<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "InvokeSenderSlot({})", self.cursor);
    }
}
/// One received reply to an invoke request, together with the exchange it arrived on.
pub struct InvokeRespChunk<'a> {
    exchange: Exchange<'a>,
    /// `true` when the reply was a Success `StatusResponse` rather than an `InvokeResponse`.
    status_only: bool,
}

impl<'a> InvokeRespChunk<'a> {
    /// Fetches the next incoming message on `exchange` and classifies it.
    ///
    /// An `InvokeResponse` yields a regular chunk; a `StatusResponse` with status Success
    /// yields a status-only chunk.
    ///
    /// # Errors
    /// A `StatusResponse` with any other status is logged and converted to an error
    /// (`ErrorCode::Failure` when the status has no error-code mapping); any other opcode
    /// yields `ErrorCode::InvalidOpcode`. Receive and parse errors are propagated.
    async fn receive(mut exchange: Exchange<'a>) -> Result<Self, Error> {
        exchange.recv_fetch().await?;
        let opcode = exchange.rx()?.meta().proto_opcode;

        if opcode == OpCode::InvokeResponse as u8 {
            return Ok(Self {
                exchange,
                status_only: false,
            });
        }

        if opcode != OpCode::StatusResponse as u8 {
            return Err(ErrorCode::InvalidOpcode.into());
        }

        let status = {
            let rx = exchange.rx()?;
            StatusResp::from_tlv(&TLVElement::new(rx.payload()))?.status
        };

        if status != IMStatusCode::Success {
            error!("Invoke reply: StatusResponse({:?})", status);
            return Err(status.to_error_code().unwrap_or(ErrorCode::Failure).into());
        }

        Ok(Self {
            exchange,
            status_only: true,
        })
    }

    /// Whether the reply was a bare Success `StatusResponse` with no invoke payload.
    pub fn is_status_only(&self) -> bool {
        self.status_only
    }

    /// Parses the payload as an `InvokeResp`; returns `Ok(None)` for a status-only chunk.
    pub fn response(&self) -> Result<Option<InvokeResp<'_>>, Error> {
        if self.status_only {
            return Ok(None);
        }

        let rx = self.exchange.rx()?;
        let parsed = InvokeResp::from_tlv(&TLVElement::new(rx.payload()))?;

        Ok(Some(parsed))
    }

    /// Finishes processing of this chunk and, if the response continues, fetches the next.
    ///
    /// * Status-only chunk, or `more_chunks` unset: the message is acknowledged and
    ///   `None` is returned.
    /// * `more_chunks` set: a Success `StatusResponse` is sent, the next message is
    ///   fetched and verified to be `InvokeResponse`, and `Some(next_chunk)` is returned.
    ///
    /// # Errors
    /// A chunk with both `more_chunks` and `suppress_response` set is rejected: a Failure
    /// `StatusResponse` is sent and `ErrorCode::InvalidData` is returned. Other errors are
    /// propagated from parsing, sending and receiving.
    pub async fn complete(mut self) -> Result<Option<Self>, Error> {
        if self.status_only {
            self.exchange.acknowledge().await?;
            return Ok(None);
        }

        let (more_chunks, suppress_response) = {
            let resp = self
                .response()?
                .expect("status_only checked above; response() must be Some");
            let more = resp.more_chunks.unwrap_or(false);
            let suppress = resp.suppress_response.unwrap_or(false);
            (more, suppress)
        };

        if !more_chunks {
            self.exchange.acknowledge().await?;
            return Ok(None);
        }

        if suppress_response {
            // Contradictory flags: the next chunk can only be requested by responding.
            send_abort(&mut self.exchange).await?;
            return Err(ErrorCode::InvalidData.into());
        }

        self.exchange
            .send_with(|_, wb| {
                StatusResp::write(wb, IMStatusCode::Success)?;
                Ok(Some(OpCode::StatusResponse.into()))
            })
            .await?;

        self.exchange.recv_fetch().await?;
        let next_opcode = self.exchange.rx()?.meta().proto_opcode;
        check_opcode(next_opcode, OpCode::InvokeResponse)?;

        Ok(Some(self))
    }
}
/// Sending side of a subscribe interaction.
///
/// Alternates between holding an idle `OwnedSender` and holding an open TX slot that a
/// `SubscribeReqBuilder` writes the request into.
pub struct SubscribeSender<'a> {
    state: SubscribeSenderState<'a>,
}

/// Phase a [`SubscribeSender`] is in.
enum SubscribeSenderState<'a> {
    /// No request payload is under construction.
    Ready(OwnedSender<'a>),
    /// A TX slot is open and (possibly) partially written.
    Slot(SubscribeSenderSlot<'a>),
}

impl<'a> SubscribeSender<'a> {
    /// Advances the interaction by one step.
    ///
    /// If a slot is open it is committed first. Then the underlying sender is polled:
    /// when it hands out a TX slot, a `SubscribeReqBuilder` writing into that slot (from
    /// offset zero) is returned; when it hands back the exchange, the first priming
    /// `ReportData` chunk is received and returned.
    ///
    /// # Errors
    /// Propagates errors from committing the slot, from the underlying sender, from
    /// creating the builder, and from receiving the response.
    pub async fn tx(
        self,
    ) -> Result<TxOutcome<SubscribeReqBuilder<SubscribeSender<'a>>, SubscribePrimingChunk<'a>>, Error>
    {
        let sender = match self.state {
            SubscribeSenderState::Ready(idle) => idle,
            SubscribeSenderState::Slot(pending) => pending.commit()?,
        };

        let tx = match sender.tx().await? {
            Either::Right(exchange) => {
                let chunk = SubscribePrimingChunk::receive(exchange).await?;
                return Ok(TxOutcome::GotResponse(chunk));
            }
            Either::Left(tx) => tx,
        };

        // Fresh slot: the builder starts writing at the beginning of the payload.
        let parent = SubscribeSender {
            state: SubscribeSenderState::Slot(SubscribeSenderSlot { tx, cursor: 0 }),
        };

        Ok(TxOutcome::BuildRequest(SubscribeReqBuilder::new(
            parent,
            &TLVTag::Anonymous,
        )?))
    }
}
impl<'a> TLVBuilderParent for SubscribeSender<'a> {
    type Write = SubscribeSenderSlot<'a>;

    /// Returns the open TX slot the builder writes into.
    ///
    /// # Panics
    /// Panics when the sender is in the `Ready` state, i.e. no slot is open.
    fn writer(&mut self) -> &mut Self::Write {
        if let SubscribeSenderState::Slot(slot) = &mut self.state {
            slot
        } else {
            panic!(
                "SubscribeSender::writer() called outside the build phase — \
                 only reachable via a SubscribeReqBuilder yielded by SubscribeSender::tx."
            )
        }
    }
}
// Opaque formatting: only the type name is printed, never the internal state.
impl core::fmt::Debug for SubscribeSender<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("SubscribeSender")
    }
}

// `defmt` counterpart of the `Debug` impl; prints the type name only.
#[cfg(feature = "defmt")]
impl defmt::Format for SubscribeSender<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "SubscribeSender");
    }
}
/// Open TX slot of a subscribe request together with the write position inside its payload.
pub struct SubscribeSenderSlot<'a> {
    /// The in-progress transmission whose payload buffer is being filled.
    tx: OwnedSenderTx<'a>,
    /// Index of the next payload byte to be written.
    cursor: usize,
}

impl<'a> SubscribeSenderSlot<'a> {
    /// Completes the transmission with the `SubscribeRequest` opcode, passing `0` and the
    /// current cursor to `OwnedSenderTx::complete`, and returns the sender.
    /// NOTE(review): `0..cursor` is presumably the payload range — confirm against
    /// `OwnedSenderTx::complete`.
    fn commit(self) -> Result<OwnedSender<'a>, Error> {
        let Self { tx, cursor } = self;

        tx.complete(0, cursor, OpCode::SubscribeRequest.into())
    }
}
impl<'a> TLVWrite for SubscribeSenderSlot<'a> {
    type Position = usize;

    /// Stores `byte` at the cursor and advances the cursor by one.
    ///
    /// # Errors
    /// Returns `ErrorCode::NoSpace` when the cursor is at or past the end of the payload.
    fn write(&mut self, byte: u8) -> Result<(), Error> {
        let at = self.cursor;
        let buf = self.tx.payload();

        if at < buf.len() {
            buf[at] = byte;
            self.cursor = at + 1;
            Ok(())
        } else {
            Err(ErrorCode::NoSpace.into())
        }
    }

    /// Current write position.
    fn get_tail(&self) -> Self::Position {
        self.cursor
    }

    /// Moves the write position to `pos`; the value is stored as-is, without validation.
    fn rewind_to(&mut self, pos: Self::Position) {
        self.cursor = pos;
    }
}
// Prints the type name and the current cursor; the payload itself is not shown.
impl core::fmt::Debug for SubscribeSenderSlot<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("SubscribeSenderSlot({})", self.cursor))
    }
}

// `defmt` counterpart of the `Debug` impl.
#[cfg(feature = "defmt")]
impl defmt::Format for SubscribeSenderSlot<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "SubscribeSenderSlot({})", self.cursor);
    }
}
/// One `ReportData` chunk received while a subscription is being established, together
/// with the exchange it arrived on.
pub struct SubscribePrimingChunk<'a> {
    exchange: Exchange<'a>,
}

// Opaque formatting: only the type name is printed. Having `Debug` lets
// `SubscribeOutcome` derive it.
impl core::fmt::Debug for SubscribePrimingChunk<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("SubscribePrimingChunk")
    }
}

// `defmt` counterpart of the `Debug` impl; prints the type name only.
#[cfg(feature = "defmt")]
impl defmt::Format for SubscribePrimingChunk<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        defmt::write!(f, "SubscribePrimingChunk");
    }
}
impl<'a> SubscribePrimingChunk<'a> {
    /// Fetches the next incoming message on `exchange` and checks that it carries the
    /// `ReportData` opcode.
    ///
    /// # Errors
    /// Propagates receive errors; returns `ErrorCode::InvalidOpcode` (via `check_opcode`)
    /// for any other opcode.
    async fn receive(mut exchange: Exchange<'a>) -> Result<Self, Error> {
        exchange.recv_fetch().await?;

        let opcode = exchange.rx()?.meta().proto_opcode;
        check_opcode(opcode, OpCode::ReportData)?;

        Ok(Self { exchange })
    }

    /// Parses the payload of the currently fetched message as a `ReportDataResp`.
    pub fn response(&self) -> Result<ReportDataResp<'_>, Error> {
        let rx = self.exchange.rx()?;

        ReportDataResp::from_tlv(&TLVElement::new(rx.payload()))
    }

    /// Finishes processing of this chunk and advances the subscription handshake.
    ///
    /// * `more_chunks` set: a Success `StatusResponse` is sent, the next message is
    ///   fetched and verified to be `ReportData`, and `SubscribeOutcome::NextChunk` is
    ///   returned.
    /// * `more_chunks` unset: a Success `StatusResponse` is sent unless suppressed, then
    ///   the next message is fetched. A `SubscribeResponse` is acknowledged and reported
    ///   as `SubscribeOutcome::Established`.
    ///
    /// # Errors
    /// * `more_chunks` together with `suppress_response`: a Failure `StatusResponse` is
    ///   sent and `ErrorCode::InvalidData` is returned.
    /// * A `StatusResponse` in place of the `SubscribeResponse`: it is acknowledged,
    ///   logged, and its status converted to an error (`ErrorCode::Failure` when the
    ///   status has no error-code mapping).
    /// * Any other opcode: `ErrorCode::InvalidOpcode`.
    ///
    /// Parse, send and receive errors are propagated.
    pub async fn complete(mut self) -> Result<SubscribeOutcome<'a>, Error> {
        let (more_chunks, suppress_response) = {
            let report = self.response()?;
            let more = report.more_chunks.unwrap_or(false);
            let suppress = report.suppress_response.unwrap_or(false);
            (more, suppress)
        };

        if more_chunks {
            if suppress_response {
                // Contradictory flags: the next chunk can only be requested by responding.
                send_abort(&mut self.exchange).await?;
                return Err(ErrorCode::InvalidData.into());
            }

            self.exchange
                .send_with(|_, wb| {
                    StatusResp::write(wb, IMStatusCode::Success)?;
                    Ok(Some(OpCode::StatusResponse.into()))
                })
                .await?;

            self.exchange.recv_fetch().await?;
            let next_opcode = self.exchange.rx()?.meta().proto_opcode;
            check_opcode(next_opcode, OpCode::ReportData)?;

            return Ok(SubscribeOutcome::NextChunk(self));
        }

        // Last priming chunk: confirm it (unless suppressed) and wait for the verdict.
        if !suppress_response {
            self.exchange
                .send_with(|_, wb| {
                    StatusResp::write(wb, IMStatusCode::Success)?;
                    Ok(Some(OpCode::StatusResponse.into()))
                })
                .await?;
        }

        self.exchange.recv_fetch().await?;
        let opcode = self.exchange.rx()?.meta().proto_opcode;

        if opcode == OpCode::SubscribeResponse as u8 {
            // Copy the fields out so the RX borrow ends before acknowledging.
            let established = {
                let rx = self.exchange.rx()?;
                let resp = SubscribeResp::from_tlv(&TLVElement::new(rx.payload()))?;
                SubscribeEstablished {
                    subscription_id: resp.subs_id,
                    max_int: resp.max_int,
                }
            };

            self.exchange.acknowledge().await?;

            return Ok(SubscribeOutcome::Established(established));
        }

        if opcode != OpCode::StatusResponse as u8 {
            return Err(ErrorCode::InvalidOpcode.into());
        }

        let status = {
            let rx = self.exchange.rx()?;
            StatusResp::from_tlv(&TLVElement::new(rx.payload()))?.status
        };

        self.exchange.acknowledge().await?;

        error!(
            "Subscribe establishment aborted: StatusResponse({:?})",
            status
        );

        Err(status.to_error_code().unwrap_or(ErrorCode::Failure).into())
    }
}
/// Result of `SubscribePrimingChunk::complete`.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum SubscribeOutcome<'a> {
/// Another priming `ReportData` chunk has been fetched and is ready to be processed.
NextChunk(SubscribePrimingChunk<'a>),
/// A `SubscribeResponse` was received and acknowledged.
Established(SubscribeEstablished),
}
/// Parameters of an established subscription, copied out of the received `SubscribeResp`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct SubscribeEstablished {
/// Value of `SubscribeResp::subs_id`.
pub subscription_id: u32,
/// Value of `SubscribeResp::max_int`.
/// NOTE(review): presumably the maximum reporting interval in seconds — confirm the unit.
pub max_int: u16,
}
async fn send_timed_request(exchange: &mut Exchange<'_>, timeout_ms: u16) -> Result<(), Error> {
let req = TimedReq {
timeout: timeout_ms,
interaction_model_revision: Some(IM_REVISION),
};
exchange
.send_with(|_, wb| {
req.to_tlv(&TagType::Anonymous, wb)?;
Ok(Some(OpCode::TimedRequest.into()))
})
.await?;
exchange.recv_fetch().await?;
let rx = exchange.rx()?;
check_opcode(rx.meta().proto_opcode, OpCode::StatusResponse)?;
let status_resp = StatusResp::from_tlv(&TLVElement::new(rx.payload()))?;
if status_resp.status != IMStatusCode::Success {
error!("TimedRequest failed with status: {:?}", status_resp.status);
return Err(status_resp
.status
.to_error_code()
.unwrap_or(ErrorCode::Failure)
.into());
}
Ok(())
}
/// Sends a `StatusResponse` carrying `IMStatusCode::Failure` on `exchange`.
///
/// Used by the response handlers to reject a message whose flags are contradictory.
async fn send_abort(exchange: &mut Exchange<'_>) -> Result<(), Error> {
    let sending = exchange.send_with(|_, wb| {
        StatusResp::write(wb, IMStatusCode::Failure)?;
        Ok(Some(OpCode::StatusResponse.into()))
    });

    sending.await
}
/// Verifies that the raw opcode of a received message matches `expected`.
///
/// # Errors
/// Logs both opcodes and returns `ErrorCode::InvalidOpcode` on a mismatch.
fn check_opcode(received: u8, expected: OpCode) -> Result<(), Error> {
    if received == expected as u8 {
        return Ok(());
    }

    error!(
        "Unexpected IM opcode: received {}, expected {:?}",
        received, expected
    );

    Err(ErrorCode::InvalidOpcode.into())
}