#![cfg_attr(not(any(feature = "std", test)), no_std)]
#![forbid(unsafe_code)]
#![allow(clippy::int_plus_one)]
#![allow(clippy::too_many_arguments)]
pub use heapless::Vec;
use heapless::FnvIndexMap;
use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue};
pub mod control;
pub mod fragment;
pub mod i2c;
mod reassemble;
pub mod router;
pub mod serial;
pub mod usb;
#[macro_use]
mod util;
use fragment::{Fragmenter, SendOutput};
use reassemble::Reassembler;
pub use router::Router;
use crate::fmt::*;
pub(crate) use config::*;
/// Timeout handed to the reassembler and flow expiry checks in
/// `Stack::update()`. It is compared against differences of the
/// `now_millis` clock, so it is expressed in milliseconds.
const REASSEMBLY_EXPIRY_TIMEOUT: u32 = 6000;
/// Second timeout passed to `Reassembler::check_expired()`, in the same
/// units as `REASSEMBLY_EXPIRY_TIMEOUT`.
///
/// NOTE(review): presumably bounds how long a completed message may wait to
/// be retrieved; confirm against the `reassemble` module.
pub const DEFERRED_TIMEOUT: u32 = 6000;
/// Upper bound on the interval returned by `Stack::update()`: the computed
/// timeout starts at this value and is only ever reduced.
pub const TIMEOUT_INTERVAL: u32 = 100;
/// Length in bytes of the array backing `Header`. An MTU must be larger
/// than this.
pub(crate) const HEADER_LEN: usize = 4;
/// Sizing constants for the stack.
///
/// Each value comes from `get_build_var!` with a name and a fallback value.
/// NOTE(review): presumably the named variable overrides the fallback at
/// build time; confirm against the macro definition in `util`.
pub mod config {
/// Capacity in bytes of each reassembly buffer.
pub const MAX_PAYLOAD: usize =
get_build_var!("MCTP_ESTACK_MAX_MESSAGE", 1032);
/// Number of reassembly slots held by a `Stack`.
pub const NUM_RECEIVE: usize = get_build_var!("MCTP_ESTACK_NUM_RECEIVE", 4);
/// Capacity of the `Stack` flow table.
pub const FLOWS: usize = get_build_var!("MCTP_ESTACK_FLOWS", 64);
/// Maximum MTU value; checked below to exceed the header length.
pub const MAX_MTU: usize = get_build_var!("MCTP_ESTACK_MAX_MTU", 255);
// Compile-time check: a packet must have room for the header plus at least
// one more byte.
const _: () =
assert!(MAX_MTU >= crate::HEADER_LEN + 1, "MAX_MTU too small");
}
/// Per-flow state stored in the `Stack` flow table, keyed by
/// `(peer EID, tag value)`.
#[derive(Debug)]
struct Flow {
// `Some` holds the stamp taken when the flow was created with expiry
// enabled; `Stack::update()` removes the flow once that stamp times out.
// `None` flows are never removed by `update()`.
expiry_stamp: Option<EventStamp>,
// Cookie given when the flow was created; `Stack::receive()` copies it onto
// the reassembler for packets arriving on this flow.
cookie: Option<AppCookie>,
}
/// Caller-chosen identifier that can be attached to flows and to received
/// messages, and used to look up deferred messages with
/// `Stack::get_deferred_bycookie()`.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, PartialOrd, Ord)]
pub struct AppCookie(pub usize);
// Transport header type from `libmctp`, backed by a `HEADER_LEN` byte array.
type Header = libmctp::base_packet::MCTPTransportHeader<[u8; HEADER_LEN]>;
/// Handle to a message held in one of the stack's reassembly slots.
///
/// The wrapped value is the slot index. A handle is given back to the stack
/// with `Stack::finished_receive()` (which frees the slot) or
/// `Stack::return_handle()` (which keeps the message).
#[must_use]
#[derive(Debug)]
pub struct ReceiveHandle(usize);
/// MCTP stack state: own EID, flow table, reassembly slots and timekeeping.
#[derive(Debug)]
pub struct Stack {
// EID of this endpoint; changed by `set_eid()`.
own_eid: Eid,
// Known flows, keyed by (peer EID, tag value).
flows: FnvIndexMap<(Eid, TagValue), Flow, FLOWS>,
// Reassembly slots, each pairing a reassembler with its payload buffer.
// `None` marks a free slot.
reassemblers: [Option<(Reassembler, Vec<u8, MAX_PAYLOAD>)>; NUM_RECEIVE],
// Current time stamp; the clock part is advanced by `update()`.
now: EventStamp,
// Clock value (same units as `now.clock`) before which `update()` skips
// its expiry scan.
next_timeout: u64,
// Upper bound on the MTU used by `start_send()`.
mtu: usize,
// Starting point for the next tag search in `alloc_tag()`.
next_tag: u8,
// Sequence value handed to the next `Fragmenter`.
next_seq: u8,
}
impl Stack {
/// Creates a stack with the given own EID, MTU and starting clock value.
///
/// `now_millis` seeds the internal clock; later calls to `update()` must not
/// pass a smaller value.
///
/// # Panics
///
/// Panics if `mtu` is not larger than `HEADER_LEN`.
pub fn new(own_eid: Eid, mtu: usize, now_millis: u64) -> Self {
    assert!(mtu >= HEADER_LEN + 1);
    Self {
        own_eid,
        flows: Default::default(),
        reassemblers: Default::default(),
        now: EventStamp {
            clock: now_millis,
            counter: 0,
        },
        next_timeout: 0,
        mtu,
        next_tag: 0,
        next_seq: 0,
    }
}
fn update_clock(&mut self, now_millis: u64) -> Result<()> {
if now_millis < self.now.clock {
Err(Error::InvalidInput)
} else {
if now_millis > self.now.clock {
self.now.clock = now_millis;
self.now.counter = 0;
} else {
}
Ok(())
}
}
/// Advances the clock and expires stale reassemblers and flows.
///
/// Returns `(interval, any_expired)`: `interval` is how long (in clock
/// units) until `update()` next needs to run, at most `TIMEOUT_INTERVAL`,
/// and `any_expired` is true when this call dropped a reassembler or flow.
///
/// # Errors
///
/// Returns `Error::InvalidInput` if `now_millis` is earlier than the
/// previous clock value.
pub fn update(&mut self, now_millis: u64) -> Result<(u64, bool)> {
    self.update_clock(now_millis)?;

    // Still inside the previously computed interval: skip the scan.
    match self.next_timeout.checked_sub(now_millis) {
        Some(remain) if remain > 0 => return Ok((remain, false)),
        _ => {}
    }

    let mut wait = TIMEOUT_INTERVAL;
    let mut expired = false;

    // Reassembly slots: free expired ones, otherwise track the nearest expiry.
    for slot in self.reassemblers.iter_mut() {
        let Some((re, _buf)) = slot else {
            continue;
        };
        match re.check_expired(
            &self.now,
            REASSEMBLY_EXPIRY_TIMEOUT,
            DEFERRED_TIMEOUT,
        ) {
            Some(left) => wait = wait.min(left),
            None => {
                trace!("Expired");
                expired = true;
                *slot = None;
            }
        }
    }

    // Flows: only those carrying an expiry stamp can time out.
    self.flows.retain(|_key, flow| {
        let Some(stamp) = flow.expiry_stamp else {
            return true;
        };
        match stamp.check_timeout(&self.now, REASSEMBLY_EXPIRY_TIMEOUT) {
            Some(left) => {
                wait = wait.min(left);
                true
            }
            None => {
                expired = true;
                false
            }
        }
    });

    self.next_timeout = wait as u64 + now_millis;
    Ok((wait as u64, expired))
}
/// Prepares to send a message and returns the `Fragmenter` for it.
///
/// * `tag`: `None` allocates a new owned tag for `dest`; `Some(Tag::Owned)`
///   uses the given tag value, creating its flow if it does not exist;
///   `Some(Tag::Unowned)` is passed through without touching the flow table.
/// * `tag_expires`: whether a newly created flow gets an expiry stamp.
/// * `mtu`: optional per-send MTU; the smaller of this and the stack MTU is
///   used.
/// * `cookie`: stored on a newly created flow and passed to the fragmenter.
///
/// # Errors
///
/// Propagates errors from flow setup (for example `Error::TagUnavailable`
/// or `Error::BadArgument`) and from `Fragmenter::new`.
pub fn start_send(
    &mut self,
    dest: Eid,
    typ: MsgType,
    tag: Option<Tag>,
    tag_expires: bool,
    ic: MsgIC,
    mtu: Option<usize>,
    cookie: Option<AppCookie>,
) -> Result<Fragmenter> {
    let tag = match tag {
        Some(Tag::Unowned(tv)) => Tag::Unowned(tv),
        Some(Tag::Owned(tv)) => {
            let assigned = self.set_flow(dest, Some(tv), tag_expires, cookie)?;
            debug_assert!(assigned == tv);
            Tag::Owned(tv)
        }
        None => Tag::Owned(self.set_flow(dest, None, tag_expires, cookie)?),
    };

    let frag_mtu = mtu.map_or(self.mtu, |m| self.mtu.min(m));

    // Each send starts from a different sequence value.
    self.next_seq = (self.next_seq + 1) & mctp::MCTP_SEQ_MASK;

    Fragmenter::new(
        typ,
        self.own_eid,
        dest,
        tag,
        frag_mtu,
        cookie,
        ic,
        self.next_seq,
    )
}
/// Feeds one received packet into the stack.
///
/// Returns `Ok(Some((message, handle)))` when the packet completes a
/// message, and `Ok(None)` when the reassembler reports no complete message
/// yet.
///
/// # Errors
///
/// * `Error::NoSpace` when no slot matches the packet and none is free.
/// * `Error::Unreachable` when a new reassembler's tag is not owner-set and
///   no flow exists for its peer and tag value.
/// * Errors from `Reassembler::new` and `Reassembler::message` are
///   propagated as-is.
/// * Errors from `Reassembler::receive` are propagated after the slot has
///   been freed.
pub fn receive(
&mut self,
packet: &[u8],
) -> Result<Option<(MctpMessage<'_>, ReceiveHandle)>> {
// Slot whose reassembler matches this packet, otherwise the first free slot.
let idx = self.get_reassembler(packet)?;
let (re, buf) = if let Some(r) = &mut self.reassemblers[idx] {
r
} else {
// Free slot: build a new reassembler from this packet.
let mut re =
Reassembler::new(self.own_eid, packet, self.now.increment())?;
if !re.tag.is_owner() {
// The packet's tag is not owner-set, so a flow for it must already
// exist; its cookie is carried over to the reassembler.
if let Some(f) = self.lookup_flow(re.peer, re.tag.tag()) {
re.set_cookie(f.cookie);
} else {
return Err(Error::Unreachable);
}
}
self.reassemblers[idx].insert((re, Vec::new()))
};
match re.receive(packet, buf, self.now.increment()) {
Ok(Some(_msg)) => {
// Message complete. For a tag that is not owner-set the flow is
// finished, so drop it from the table.
if !re.tag.is_owner() {
let (peer, tv) = (re.peer, re.tag.tag());
self.remove_flow(peer, tv);
}
// Borrow the slot again rather than reusing `re`/`buf` from above, so the
// returned message borrows from this fresh borrow.
let (re, buf) = self.reassemblers[idx].as_mut().unwrap();
let msg = re.message(buf)?;
let handle = re.take_handle(idx);
Ok(Some((msg, handle)))
}
// No complete message yet; the slot stays allocated.
Ok(None) => Ok(None),
Err(e) => {
// Reassembly failed: free the slot before reporting the error.
self.reassemblers[idx] = None;
Err(e)
}
}
}
/// Calls `f` with the message behind `handle`, then releases the handle and
/// frees its slot via `finished_receive()`.
///
/// # Panics
///
/// Panics if `handle` does not refer to a held message.
pub fn fetch_message_with<F>(&mut self, handle: ReceiveHandle, f: F)
where
    F: FnOnce(MctpMessage),
{
    f(self.fetch_message(&handle));
    // The slot is released whatever the callback did with the message.
    self.finished_receive(handle);
}
/// Returns the message held in the slot that `handle` refers to.
///
/// # Panics
///
/// Panics with "Bad ReceiveHandle" if the slot index is out of range, the
/// slot is empty, or the reassembler cannot produce a message.
pub fn fetch_message(&mut self, handle: &ReceiveHandle) -> MctpMessage<'_> {
    match self.reassemblers.get_mut(handle.0) {
        Some(Some((re, buf))) => match re.message(buf) {
            Ok(msg) => msg,
            Err(_) => unreachable!("Bad ReceiveHandle"),
        },
        _ => unreachable!("Bad ReceiveHandle"),
    }
}
/// Gives `handle` back to its reassembler and frees the slot, discarding the
/// message.
///
/// # Panics
///
/// Panics with "Bad ReceiveHandle" if the slot index is out of range or the
/// slot is empty.
pub fn finished_receive(&mut self, handle: ReceiveHandle) {
    let Some(slot) = self.reassemblers.get_mut(handle.0) else {
        unreachable!("Bad ReceiveHandle");
    };
    let Some((re, _buf)) = slot else {
        unreachable!("Bad ReceiveHandle");
    };
    re.return_handle(handle);
    *slot = None;
}
/// Gives `handle` back to its reassembler without freeing the slot, so the
/// message stays held by the stack.
///
/// # Panics
///
/// Panics if the slot index is out of range or the slot is empty.
pub fn return_handle(&mut self, handle: ReceiveHandle) {
    let slot = &mut self.reassemblers[handle.0];
    let (re, _) = slot.as_mut().unwrap();
    re.return_handle(handle);
}
/// Takes a handle for the oldest completed message from `source` with the
/// given `tag`.
///
/// Only reassemblers reported by `done_reassemblers()` are considered, and
/// "oldest" means the smallest stamp. Returns `None` when nothing matches.
pub fn get_deferred(
    &mut self,
    source: Eid,
    tag: Tag,
) -> Option<ReceiveHandle> {
    let oldest = self
        .done_reassemblers()
        .filter(|(_, re)| re.tag == tag && re.peer == source)
        .min_by_key(|(_, re)| re.stamp);
    let (idx, re) = oldest?;
    Some(re.take_handle(idx))
}
/// Takes a handle for the oldest completed message whose cookie is one of
/// `cookies`.
///
/// Reassemblers without a cookie never match. "Oldest" means the smallest
/// stamp. Returns `None` when nothing matches.
pub fn get_deferred_bycookie(
    &mut self,
    cookies: &[AppCookie],
) -> Option<ReceiveHandle> {
    let oldest = self
        .done_reassemblers()
        .filter(|(_, re)| re.cookie.is_some_and(|c| cookies.contains(&c)))
        .min_by_key(|(_, re)| re.stamp);
    let (idx, re) = oldest?;
    Some(re.take_handle(idx))
}
/// Iterates over occupied slots whose reassembler reports `is_done()`,
/// yielding each slot index together with its reassembler.
fn done_reassemblers(
    &mut self,
) -> impl Iterator<Item = (usize, &mut Reassembler)> {
    self.reassemblers
        .iter_mut()
        .enumerate()
        .filter_map(|(idx, slot)| {
            let (re, _buf) = slot.as_mut()?;
            if re.is_done() {
                Some((idx, re))
            } else {
                None
            }
        })
}
/// Sets or clears the cookie on the reassembler that `handle` refers to.
///
/// # Panics
///
/// Panics if the slot index is out of range or the slot is empty.
pub fn set_cookie(
    &mut self,
    handle: &ReceiveHandle,
    cookie: Option<AppCookie>,
) {
    let slot = &mut self.reassemblers[handle.0];
    let (re, _) = slot.as_mut().unwrap();
    re.set_cookie(cookie);
}
/// Changes the stack's own EID.
///
/// # Errors
///
/// Returns the error from `Eid::new_normal` when `eid` is rejected; the
/// current EID is left unchanged in that case.
pub fn set_eid(&mut self, eid: u8) -> Result<()> {
    let checked = Eid::new_normal(eid);
    if checked.is_err() {
        warn!("Invalid Set EID {}", eid);
    }
    self.own_eid = checked?;
    info!("Set EID to {}", eid);
    Ok(())
}
/// Returns the stack's current own EID.
pub fn eid(&self) -> Eid {
self.own_eid
}
/// Reports whether `packet` is destined for this stack, as decided by
/// `Reassembler::is_local_dest` for the stack's own EID.
pub fn is_local_dest(&self, packet: &[u8]) -> bool {
Reassembler::is_local_dest(self.own_eid, packet)
}
/// Picks the reassembly slot for `packet`.
///
/// Prefers an occupied slot whose reassembler matches the packet, and
/// otherwise returns the first free slot.
///
/// # Errors
///
/// Returns `Error::NoSpace` when no slot matches and none is free.
fn get_reassembler(&mut self, packet: &[u8]) -> Result<usize> {
    let matching = self.reassemblers.iter().position(|slot| {
        matches!(slot, Some((re, _buf)) if re.matches_packet(packet))
    });
    let chosen = matching
        .or_else(|| self.reassemblers.iter().position(Option::is_none));
    match chosen {
        Some(idx) => Ok(idx),
        None => {
            trace!("out of reassemblers");
            Err(Error::NoSpace)
        }
    }
}
fn alloc_tag(&mut self, peer: Eid) -> Option<TagValue> {
let mut used = 0u8;
for (_fpeer, tag) in
self.flows.keys().filter(|(fpeer, _tag)| *fpeer == peer)
{
debug_assert!(tag.0 <= mctp::MCTP_TAG_MAX);
let bit = 1u8 << tag.0;
debug_assert!(used & bit == 0);
used |= bit;
}
let mut tag = None;
self.next_tag = (self.next_tag + 1) & mctp::MCTP_TAG_MAX;
let end = self.next_tag + mctp::MCTP_TAG_MAX;
for t in self.next_tag..=end {
let t = t & mctp::MCTP_TAG_MAX;
let tagmask = 1 << t;
if used & tagmask == 0 {
tag = Some(TagValue(t));
break;
}
}
tag
}
fn new_flow(
&mut self,
peer: Eid,
fixedtag: Option<TagValue>,
flow_expires: bool,
cookie: Option<AppCookie>,
) -> Result<TagValue> {
let tag = fixedtag.or_else(|| self.alloc_tag(peer));
trace!("new flow tag {}", peer);
let Some(tag) = tag else {
return Err(Error::TagUnavailable);
};
let expiry_stamp = flow_expires.then(|| self.now.increment());
let f = Flow {
expiry_stamp,
cookie,
};
let r = self
.flows
.insert((peer, tag), f)
.map_err(|_| Error::TagUnavailable)?;
debug_assert!(r.is_none(), "Duplicate flow insertion");
trace!("new flow {}", peer);
Ok(tag)
}
/// Returns the tag value of the flow to use for `peer`, creating the flow if
/// needed.
///
/// When `tag` names an existing flow, that flow is reused provided it has no
/// expiry stamp; a differing cookie is only logged. In every other case a
/// new flow is created through `new_flow()`.
///
/// # Errors
///
/// Returns `Error::BadArgument` when `tag` names an existing flow that has
/// an expiry stamp, and propagates errors from `new_flow()`.
fn set_flow(
    &mut self,
    peer: Eid,
    tag: Option<TagValue>,
    flow_expires: bool,
    cookie: Option<AppCookie>,
) -> Result<TagValue> {
    trace!("set flow {}", peer);

    if let Some(tv) = tag {
        match self.flows.get_mut(&(peer, tv)) {
            Some(flow) if flow.expiry_stamp.is_some() => {
                trace!("Can't specify an owned tag that didn't have tag_expires=false");
                return Err(Error::BadArgument);
            }
            Some(flow) => {
                if flow.cookie != cookie {
                    trace!("varying app for flow");
                }
                return Ok(tv);
            }
            None => {}
        }
    }

    self.new_flow(peer, tag, flow_expires, cookie)
}
/// Looks up the flow for `peer` and tag value `tv`, if one exists.
fn lookup_flow(&self, peer: Eid, tv: TagValue) -> Option<&Flow> {
self.flows.get(&(peer, tv))
}
/// Removes the flow for `peer` and tag value `tv`.
///
/// The flow is expected to exist; a missing flow trips a debug assertion and
/// is otherwise ignored.
fn remove_flow(&mut self, peer: Eid, tv: TagValue) {
    trace!("remove flow");
    let key = (peer, tv);
    let removed = self.flows.remove(&key);
    debug_assert!(removed.is_some(), "non-existent remove_flow");
}
/// Cancels the flow to `source` with tag value `tv`.
///
/// Frees every reassembly slot holding a message from `source` with the
/// unowned form of the tag, then removes the flow from the table.
///
/// # Errors
///
/// Returns `Error::BadArgument` as soon as a matching reassembler with an
/// outstanding handle is found. Slots freed earlier in the scan stay freed
/// and the flow is not removed in that case.
pub fn cancel_flow(&mut self, source: Eid, tv: TagValue) -> Result<()> {
    trace!("cancel flow {}", source);
    let tag = Tag::Unowned(tv);
    let mut removed = false;

    for slot in self.reassemblers.iter_mut() {
        let Some((re, _buf)) = slot.as_mut() else {
            continue;
        };
        if re.tag != tag || re.peer != source {
            continue;
        }
        if re.handle_taken() {
            trace!("Outstanding handle");
            return Err(Error::BadArgument);
        }
        *slot = None;
        removed = true;
    }

    trace!("removed flow");
    let flow = self.flows.remove(&(source, tv));
    if removed {
        // A reassembler for this flow implies the flow itself existed.
        debug_assert!(flow.is_some());
    }
    Ok(())
}
}
/// A reassembled MCTP message whose payload borrows from the stack.
pub struct MctpMessage<'a> {
/// Source EID of the message.
pub source: Eid,
/// Destination EID of the message.
pub dest: Eid,
/// Message tag.
pub tag: Tag,
/// MCTP message type.
pub typ: MsgType,
/// Integrity-check value of the message.
pub ic: MsgIC,
/// Message payload bytes.
pub payload: &'a [u8],
/// Cookie associated with the message, if any.
pub cookie: Option<AppCookie>,
}
// Hand-written `Debug`: prints every field except the payload bytes, for
// which only the length is shown. `finish_non_exhaustive()` marks the output
// as partial.
impl core::fmt::Debug for MctpMessage<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // The struct name matches the type name, as a derived `Debug` would
        // print it (previously misspelled "Mctpmessage").
        f.debug_struct("MctpMessage")
            .field("source", &self.source)
            .field("dest", &self.dest)
            .field("tag", &self.tag)
            .field("typ", &self.typ)
            .field("ic", &self.ic)
            .field("cookie", &self.cookie)
            .field("payload length", &self.payload.len())
            .finish_non_exhaustive()
    }
}
/// Time stamp used to order events: a clock value plus a counter that
/// distinguishes events stamped at the same clock value.
///
/// The derived ordering compares `clock` first, then `counter`.
#[derive(Default, Debug, Ord, PartialOrd, PartialEq, Eq, Copy, Clone)]
pub(crate) struct EventStamp {
    /// Clock value, in the units supplied by the caller.
    pub clock: u64,
    /// Number of stamps handed out at this clock value.
    pub counter: u32,
}

impl EventStamp {
    /// Bumps the counter and returns a copy of the updated stamp.
    fn increment(&mut self) -> Self {
        self.counter += 1;
        *self
    }

    /// Returns the time left before `timeout` elapses, measured from this
    /// stamp to `now`, or `None` once the timeout has passed.
    ///
    /// Exactly `timeout` elapsed yields `Some(0)`. `None` is also returned
    /// when the elapsed time does not fit in a `u32`, or when `now` is
    /// earlier than this stamp (which additionally trips a debug assertion).
    pub fn check_timeout(&self, now: &EventStamp, timeout: u32) -> Option<u32> {
        if now.clock < self.clock {
            debug_assert!(false, "Timestamp backwards");
            return None;
        }
        let elapsed = u32::try_from(now.clock - self.clock).ok()?;
        timeout.checked_sub(elapsed)
    }
}
// At least one logging backend must be selected at build time.
#[cfg(not(any(feature = "log", feature = "defmt")))]
compile_error!("Either log or defmt feature must be enabled");
/// Re-exports the logging macros of the enabled backend (`defmt` or `log`)
/// under one path for the rest of the crate.
pub(crate) mod fmt {
#[cfg(feature = "defmt")]
pub use defmt::{debug, error, info, trace, warn};
#[cfg(feature = "log")]
pub use log::{debug, error, info, trace, warn};
}
// Unit test module; compiled only for test builds.
#[cfg(test)]
mod tests {
}