use structbuf::Unpacker;
use tracing::{error, trace, warn};
use super::*;
#[derive(Debug)]
pub(super) struct State {
xfer: tokio::sync::mpsc::Receiver<host::Result<AclTransfer>>,
recv: SyncMutex<Receiver>, }
impl State {
#[must_use]
pub fn new(t: &Arc<dyn host::Transport>, acl_data_len: u16) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::task::spawn(Self::recv_task(Arc::clone(t), acl_data_len, tx));
Self {
xfer: rx,
recv: SyncMutex::new(Receiver::new()),
}
}
#[inline]
pub async fn recv(&mut self) -> Result<()> {
loop {
let xfer = self.xfer.recv().await.unwrap()?;
self.recv.lock().recombine(xfer);
}
}
#[inline]
pub fn register_chan(&self, ch: &Arc<RawChan>) {
self.recv.lock().register_chan(Arc::clone(ch));
}
#[inline]
pub fn remove_chan(&self, cid: LeCid) {
self.recv.lock().remove_chan(cid);
}
async fn recv_task(
t: Arc<dyn host::Transport>,
acl_data_len: u16,
ch: tokio::sync::mpsc::Sender<host::Result<AclTransfer>>,
) {
let alloc = Alloc::new(t, host::Direction::In, acl_data_len);
loop {
let r = tokio::select! {
r = alloc.xfer().submit() => r,
_ = ch.closed() => break,
};
if ch.send(r).await.is_err() {
break;
}
}
}
}
#[derive(Debug)]
pub(super) struct Receiver {
cont: HashMap<LeU, Option<Cid>>,
chans: HashMap<LeCid, Chan>,
}
impl Receiver {
#[inline]
#[must_use]
pub fn new() -> Self {
Self {
cont: HashMap::new(),
chans: HashMap::new(),
}
}
fn register_chan(&mut self, ch: Arc<RawChan>) {
trace!("Adding channel: {}", ch.cid);
self.cont.entry(ch.cid.link).or_default();
assert!(self.chans.insert(ch.cid, Chan::new(ch)).is_none());
}
fn remove_chan(&mut self, cid: LeCid) {
let Some(_) = self.chans.remove(&cid) else { return };
trace!("Removing channel: {}", cid);
if !self.chans.keys().any(|other| other.link == cid.link) {
self.cont.remove(&cid.link); return;
}
let cont = self.cont.get_mut(&cid.link).unwrap();
if *cont == Some(cid.chan) {
*cont = None;
}
}
fn recombine(&mut self, xfer: AclTransfer) {
let pkt = xfer.as_ref();
let Some((link, l2cap_hdr, data)) = parse_hdr(pkt) else { return };
let Some(cont_cid) = self.cont.get_mut(&link) else {
warn!("PDU fragment for an unknown {link}: {pkt:02X?}");
return;
};
if let Some((pdu_len, cid)) = l2cap_hdr {
if let Some(cid) = *cont_cid {
(self.chans.get_mut(&link.chan(cid)).unwrap()).ensure_complete();
*cont_cid = None;
}
if !cid.is_le() {
warn!("PDU fragment for an invalid {cid}: {pkt:02X?}");
return;
}
let cid = link.chan(cid);
let Some(ch) = self.chans.get_mut(&cid) else {
warn!("PDU fragment for an unknown {cid}: {pkt:02X?}");
return;
};
trace!("{cid}: {:02X?}", &pkt[4 + 4..]); ch.first(pdu_len, xfer);
if !ch.buf.is_none() {
*cont_cid = Some(cid.chan);
}
return;
}
let Some(cid) = *cont_cid else {
warn!("Unexpected continuation PDU fragment for {link}: {pkt:02X?}");
return;
};
trace!("Cont. PDU fragment for {cid}: {pkt:02X?}");
let ch = self.chans.get_mut(&link.chan(cid)).unwrap();
ch.cont(data);
if ch.buf.is_none() {
*cont_cid = None;
}
}
}
#[derive(Debug)]
struct Chan {
raw: Arc<RawChan>,
buf: StructBuf,
}
impl Chan {
#[inline]
#[must_use]
pub const fn new(ch: Arc<RawChan>) -> Self {
Self {
raw: ch,
buf: StructBuf::none(),
}
}
#[inline]
fn ensure_complete(&mut self) {
if !self.buf.is_none() {
self.buf = StructBuf::none();
error!("Incomplete PDU for {}", self.raw.cid);
self.raw.set_error();
}
}
pub fn first(&mut self, pdu_len: u16, xfer: AclTransfer) {
self.ensure_complete();
let mut cs = self.raw.state.lock();
let frame_len = L2CAP_HDR + usize::from(pdu_len);
if cs.can_recv(self.raw.cid, frame_len) {
if xfer.as_ref().len() == ACL_HDR + frame_len {
cs.push(self.raw.cid, Frame::complete(xfer));
} else {
self.buf = Frame::first(&xfer, frame_len);
}
}
}
pub fn cont(&mut self, acl_data: &[u8]) {
let mut p = self.buf.append();
if !p.can_put(acl_data.len()) {
error!(
"PDU fragment for {} exceeds expected length ({} > {})",
self.raw.cid,
acl_data.len(),
self.buf.remaining()
);
self.buf = StructBuf::none();
self.raw.set_error();
return;
}
p.put(acl_data);
if self.buf.is_full() {
let buf = Frame::Buf(self.buf.take());
self.raw.state.lock().push(self.raw.cid, buf);
}
}
}
#[allow(clippy::type_complexity)]
#[must_use]
fn parse_hdr(pkt: &[u8]) -> Option<(LeU, Option<(u16, Cid)>, &[u8])> {
let mut p = Unpacker::new(pkt);
let Some(mut hdr) = p.skip(ACL_HDR) else {
error!("ACL data packet with missing header: {pkt:02X?}");
return None;
};
let cn_flag = hdr.u16();
let Some(cn) = hci::ConnHandle::new(cn_flag) else {
error!("ACL data packet for an invalid connection handle: {pkt:02X?}");
return None;
};
if p.len() != usize::from(hdr.u16()) {
error!("ACL data packet length mismatch: {pkt:02X?}");
return None;
}
let is_first = (cn_flag >> hci::ConnHandle::BITS) & 0b11 != 0b01;
let l2cap_hdr = if is_first {
let Some(mut hdr) = p.skip(L2CAP_HDR) else {
error!("ACL data packet with missing L2CAP header: {pkt:02X?}");
return None;
};
let pdu_len = hdr.u16();
if usize::from(pdu_len) < p.len() {
error!("ACL data packet with an invalid PDU length: {pkt:02X?}");
return None;
}
let Some(cid) = Cid::new(hdr.u16()) else {
error!("ACL data packet for an invalid CID: {pkt:02X?}");
return None;
};
Some((pdu_len, cid))
} else if p.is_empty() {
warn!("ACL data packet without payload: {pkt:02X?}");
return None;
} else {
None
};
Some((LeU::new(cn), l2cap_hdr, p.into_inner()))
}