use core::cell::RefCell;
use core::future::Future;
use core::marker::PhantomData;
use bt_hci::controller::Controller;
use bt_hci::param::{ConnHandle, PhyKind, Status};
use bt_hci::uuid::declarations::{CHARACTERISTIC, PRIMARY_SERVICE};
use bt_hci::uuid::descriptors::CLIENT_CHARACTERISTIC_CONFIGURATION;
use embassy_futures::select::{select, Either};
use embassy_sync::blocking_mutex::raw::{NoopRawMutex, RawMutex};
use embassy_sync::channel::Channel;
use embassy_sync::pubsub::{self, PubSubChannel, WaitResult};
use embassy_time::Duration;
use heapless::Vec;
use crate::att::{
self, Att, AttCfm, AttClient, AttCmd, AttErrorCode, AttReq, AttRsp, AttServer, AttUns, ATT_HANDLE_VALUE_IND,
ATT_HANDLE_VALUE_NTF,
};
use crate::attribute::{AttributeData, Characteristic, Uuid};
use crate::attribute_server::{AttributeServer, DynamicAttributeServer};
use crate::connection::Connection;
#[cfg(feature = "security")]
use crate::connection::SecurityLevel;
use crate::cursor::{ReadCursor, WriteCursor};
use crate::pdu::Pdu;
use crate::prelude::{ConnectionEvent, ConnectionParamsRequest};
#[cfg(feature = "security")]
use crate::security_manager::PassKey;
use crate::types::gatt_traits::{AsGatt, FromGatt, FromGattError};
use crate::types::l2cap::L2capHeader;
#[cfg(feature = "security")]
use crate::BondInformation;
use crate::{config, BleHostError, Error, PacketPool, Stack};
/// Events yielded by [`GattConnection::next`].
///
/// Every variant except `Gatt` is a one-to-one translation of the same-named
/// [`ConnectionEvent`] variant; `Gatt` carries an incoming ATT PDU wrapped as a [`GattEvent`].
pub enum GattConnectionEvent<'stack, 'server, P: PacketPool> {
/// Translated from `ConnectionEvent::Disconnected`.
Disconnected {
/// Status delivered with the disconnect event.
reason: Status,
},
/// Translated from `ConnectionEvent::PhyUpdated`.
PhyUpdated {
/// PHY reported for the transmit direction.
tx_phy: PhyKind,
/// PHY reported for the receive direction.
rx_phy: PhyKind,
},
/// Translated from `ConnectionEvent::ConnectionParamsUpdated`.
ConnectionParamsUpdated {
/// Connection interval carried by the event.
conn_interval: Duration,
/// Peripheral latency carried by the event.
peripheral_latency: u16,
/// Supervision timeout carried by the event.
supervision_timeout: Duration,
},
/// Translated from `ConnectionEvent::RequestConnectionParams`; the request is forwarded unchanged.
RequestConnectionParams(ConnectionParamsRequest),
/// Translated from `ConnectionEvent::DataLengthUpdated`.
DataLengthUpdated {
/// Maximum TX octets carried by the event.
max_tx_octets: u16,
/// Maximum TX time carried by the event.
max_tx_time: u16,
/// Maximum RX octets carried by the event.
max_rx_octets: u16,
/// Maximum RX time carried by the event.
max_rx_time: u16,
},
/// An incoming GATT PDU was received on the connection.
Gatt {
/// The classified request, ready to be accepted or rejected.
event: GattEvent<'stack, 'server, P>,
},
/// Translated from `ConnectionEvent::PassKeyDisplay`.
#[cfg(feature = "security")]
PassKeyDisplay(PassKey),
/// Translated from `ConnectionEvent::PassKeyConfirm`.
#[cfg(feature = "security")]
PassKeyConfirm(PassKey),
/// Translated from `ConnectionEvent::PassKeyInput`.
#[cfg(feature = "security")]
PassKeyInput,
/// Translated from `ConnectionEvent::PairingComplete`.
#[cfg(feature = "security")]
PairingComplete {
/// Security level carried by the event.
security_level: SecurityLevel,
/// Bond information carried by the event, if any.
bond: Option<BondInformation>,
},
/// Translated from `ConnectionEvent::PairingFailed`.
#[cfg(feature = "security")]
PairingFailed(Error),
}
/// A [`Connection`] paired with the attribute server that handles its GATT traffic.
///
/// Creating it registers the connection with the server (`try_new`); dropping it calls
/// `disconnect` on the server.
pub struct GattConnection<'stack, 'server, P: PacketPool> {
// Underlying connection; also exposed through `raw()`.
connection: Connection<'stack, P>,
// Type-erased attribute server used for permission checks and request processing.
pub(crate) server: &'server dyn DynamicAttributeServer<P>,
}
impl<P: PacketPool> Drop for GattConnection<'_, '_, P> {
    /// Notifies the attribute server that this connection is going away.
    fn drop(&mut self) {
        let conn = &self.connection;
        trace!("[gatt {}] disconnecting from server", conn.handle().raw());
        self.server.disconnect(conn);
    }
}
impl<'stack, 'server, P: PacketPool> GattConnection<'stack, 'server, P> {
    /// Registers `connection` with `server` and bundles both into a `GattConnection`.
    ///
    /// # Errors
    /// Propagates the error returned by `server.connect`.
    pub(crate) fn try_new<'values, M: RawMutex, const AT: usize, const CT: usize, const CN: usize>(
        connection: Connection<'stack, P>,
        server: &'server AttributeServer<'values, M, P, AT, CT, CN>,
    ) -> Result<Self, Error> {
        let raw_handle = connection.handle().raw();
        trace!("[gatt {}] connecting to server", raw_handle);
        server.connect(&connection)?;
        Ok(Self { connection, server })
    }

    /// Forwards to [`Connection::pass_key_confirm`].
    pub fn pass_key_confirm(&self) -> Result<(), Error> {
        let conn = &self.connection;
        conn.pass_key_confirm()
    }

    /// Forwards to [`Connection::pass_key_cancel`].
    pub fn pass_key_cancel(&self) -> Result<(), Error> {
        let conn = &self.connection;
        conn.pass_key_cancel()
    }

    /// Forwards `pass_key` to [`Connection::pass_key_input`].
    pub fn pass_key_input(&self, pass_key: u32) -> Result<(), Error> {
        let conn = &self.connection;
        conn.pass_key_input(pass_key)
    }

    /// Waits for the next connection-level event or incoming GATT PDU, whichever comes first.
    ///
    /// Connection events are translated variant-by-variant into [`GattConnectionEvent`]; a GATT
    /// PDU is wrapped into a [`GattEvent`] bound to this connection's attribute server.
    pub async fn next(&self) -> GattConnectionEvent<'stack, 'server, P> {
        let raced = select(self.connection.next(), self.connection.next_gatt()).await;

        // Deal with the GATT branch first so the remainder is a flat event translation.
        let event = match raced {
            Either::First(event) => event,
            Either::Second(pdu) => {
                let data = GattData::new(pdu, self.connection.clone());
                return GattConnectionEvent::Gatt {
                    event: GattEvent::new(data, self.server),
                };
            }
        };

        match event {
            ConnectionEvent::Disconnected { reason } => GattConnectionEvent::Disconnected { reason },
            ConnectionEvent::PhyUpdated { tx_phy, rx_phy } => GattConnectionEvent::PhyUpdated { tx_phy, rx_phy },
            ConnectionEvent::RequestConnectionParams(req) => GattConnectionEvent::RequestConnectionParams(req),
            ConnectionEvent::ConnectionParamsUpdated {
                conn_interval,
                peripheral_latency,
                supervision_timeout,
            } => GattConnectionEvent::ConnectionParamsUpdated {
                conn_interval,
                peripheral_latency,
                supervision_timeout,
            },
            ConnectionEvent::DataLengthUpdated {
                max_tx_octets,
                max_tx_time,
                max_rx_octets,
                max_rx_time,
            } => GattConnectionEvent::DataLengthUpdated {
                max_tx_octets,
                max_tx_time,
                max_rx_octets,
                max_rx_time,
            },
            #[cfg(feature = "security")]
            ConnectionEvent::PassKeyDisplay(key) => GattConnectionEvent::PassKeyDisplay(key),
            #[cfg(feature = "security")]
            ConnectionEvent::PassKeyConfirm(key) => GattConnectionEvent::PassKeyConfirm(key),
            #[cfg(feature = "security")]
            ConnectionEvent::PassKeyInput => GattConnectionEvent::PassKeyInput,
            #[cfg(feature = "security")]
            ConnectionEvent::PairingComplete { security_level, bond } => {
                GattConnectionEvent::PairingComplete { security_level, bond }
            }
            #[cfg(feature = "security")]
            ConnectionEvent::PairingFailed(err) => GattConnectionEvent::PairingFailed(err),
        }
    }

    /// Borrows the underlying [`Connection`].
    pub fn raw(&self) -> &Connection<'stack, P> {
        &self.connection
    }
}
/// An incoming ATT PDU together with the connection it arrived on.
pub struct GattData<'stack, P: PacketPool> {
// `Some` until the PDU has been consumed (taken by `process` or `into_payload`).
pdu: Option<Pdu<P::Packet>>,
// Connection used to send any response.
connection: Connection<'stack, P>,
}
impl<'stack, P: PacketPool> GattData<'stack, P> {
    /// Wraps a received PDU and the connection it belongs to.
    pub(crate) const fn new(pdu: Pdu<P::Packet>, connection: Connection<'stack, P>) -> Self {
        let pdu = Some(pdu);
        Self { pdu, connection }
    }

    /// Attribute handle addressed by the request, for the request kinds that carry exactly one.
    ///
    /// Returns `Some` for Write request/command, Read and ReadBlob; `None` for everything else.
    pub fn handle(&self) -> Option<u16> {
        match self.incoming() {
            AttClient::Request(
                AttReq::Write { handle, .. } | AttReq::Read { handle } | AttReq::ReadBlob { handle, .. },
            )
            | AttClient::Command(AttCmd::Write { handle, .. }) => Some(handle),
            _ => None,
        }
    }

    /// Decodes the stored PDU as a client-originated ATT message.
    ///
    /// # Panics
    /// Panics if the PDU has already been taken, fails to decode, or decodes to a
    /// non-client message.
    pub fn incoming(&self) -> AttClient<'_> {
        let bytes: &[u8] = self.pdu.as_ref().unwrap().as_ref();
        match unwrap!(Att::decode(bytes)) {
            Att::Client(client) => client,
            att => unreachable!("Expected Att::Client, got {:?}", att),
        }
    }

    /// Assembles `rsp` into a PDU and sends it on this data's connection.
    ///
    /// # Errors
    /// Returns the error from [`assemble`] if the response cannot be built.
    pub async fn reply(self, rsp: AttRsp<'_>) -> Result<(), Error> {
        let response = AttServer::Response(rsp);
        let outgoing = assemble(&self.connection, response)?;
        self.connection.send(outgoing).await;
        Ok(())
    }

    /// Assembles an unsolicited server message and sends it on `connection`.
    ///
    /// # Errors
    /// Returns the error from [`assemble`] if the message cannot be built.
    pub async fn send_unsolicited(connection: &Connection<'_, P>, uns: AttUns<'_>) -> Result<(), Error> {
        let message = AttServer::Unsolicited(uns);
        let outgoing = assemble(connection, message)?;
        connection.send(outgoing).await;
        Ok(())
    }
}
/// An incoming GATT request, classified by `GattEvent::new`.
pub enum GattEvent<'stack, 'server, P: PacketPool> {
/// A Read or ReadBlob request that passed the server's read permission check.
Read(ReadEvent<'stack, 'server, P>),
/// A Write request or Write command that passed the server's write permission check.
Write(WriteEvent<'stack, 'server, P>),
/// Any other request that was not refused by a permission check.
Other(OtherEvent<'stack, 'server, P>),
/// A request the server's `can_read` / `can_write` check refused.
NotAllowed(NotAllowedEvent<'stack, 'server, P>),
}
impl<'stack, 'server, P: PacketPool> GattEvent<'stack, 'server, P> {
    /// Classifies an incoming request.
    ///
    /// The server's `can_write` / `can_read` is consulted for Write (request and command),
    /// PrepareWrite, Read, ReadBlob and every handle of ReadMultiple. A refusal produces
    /// [`GattEvent::NotAllowed`] carrying the error code; otherwise the request becomes
    /// `Write`, `Read` or `Other` depending on its kind.
    pub fn new(data: GattData<'stack, P>, server: &'server dyn DynamicAttributeServer<P>) -> Self {
        let att = data.incoming();

        // Classification only inspects the decoded request; it is independent of permissions.
        let is_write = matches!(
            att,
            AttClient::Request(AttReq::Write { .. }) | AttClient::Command(AttCmd::Write { .. })
        );
        let is_read = matches!(
            att,
            AttClient::Request(AttReq::Read { .. } | AttReq::ReadBlob { .. })
        );

        let conn = &data.connection;
        let permission = match &att {
            AttClient::Command(AttCmd::Write { handle, .. })
            | AttClient::Request(AttReq::Write { handle, .. } | AttReq::PrepareWrite { handle, .. }) => {
                server.can_write(conn, *handle)
            }
            AttClient::Request(AttReq::Read { handle } | AttReq::ReadBlob { handle, .. }) => {
                server.can_read(conn, *handle)
            }
            // `handles` is a packed list of little-endian u16 handles; stop at the first refusal.
            AttClient::Request(AttReq::ReadMultiple { handles }) => handles
                .chunks_exact(2)
                .try_for_each(|raw| server.can_read(conn, u16::from_le_bytes(raw.try_into().unwrap()))),
            _ => Ok(()),
        };

        if let Err(err) = permission {
            return GattEvent::NotAllowed(NotAllowedEvent { data, err, server });
        }

        if is_write {
            GattEvent::Write(WriteEvent { data, server })
        } else if is_read {
            GattEvent::Read(ReadEvent { data, server })
        } else {
            GattEvent::Other(OtherEvent { data, server })
        }
    }

    /// Accepts the event, delegating to the wrapped event's `accept`.
    pub fn accept(self) -> Result<Reply<'stack, P>, Error> {
        match self {
            Self::Read(ev) => ev.accept(),
            Self::Write(ev) => ev.accept(),
            Self::Other(ev) => ev.accept(),
            Self::NotAllowed(ev) => ev.accept(),
        }
    }

    /// Rejects the event with `err`, delegating to the wrapped event's `reject`.
    pub fn reject(self, err: AttErrorCode) -> Result<Reply<'stack, P>, Error> {
        match self {
            Self::Read(ev) => ev.reject(err),
            Self::Write(ev) => ev.reject(err),
            Self::Other(ev) => ev.reject(err),
            Self::NotAllowed(ev) => ev.reject(err),
        }
    }

    /// Borrows the underlying request data.
    pub fn payload(&self) -> &GattData<'stack, P> {
        match self {
            Self::Read(ev) => ev.payload(),
            Self::Write(ev) => ev.payload(),
            Self::Other(ev) => ev.payload(),
            Self::NotAllowed(ev) => ev.payload(),
        }
    }

    /// Extracts the underlying request data, leaving nothing for the event's drop handler to process.
    pub fn into_payload(self) -> GattData<'stack, P> {
        match self {
            Self::Read(ev) => ev.into_payload(),
            Self::Write(ev) => ev.into_payload(),
            Self::Other(ev) => ev.into_payload(),
            Self::NotAllowed(ev) => ev.into_payload(),
        }
    }
}
/// A Read / ReadBlob request awaiting a decision.
///
/// If dropped without calling `accept`, `reject` or `into_payload`, the `Drop` impl processes
/// the request as accepted.
pub struct ReadEvent<'stack, 'server, P: PacketPool> {
// Request PDU and its connection.
data: GattData<'stack, P>,
// Server that produces the response on accept.
server: &'server dyn DynamicAttributeServer<P>,
}
impl<'stack, P: PacketPool> ReadEvent<'stack, '_, P> {
    /// Attribute handle being read.
    ///
    /// # Panics
    /// Panics if the request carries no single handle.
    pub fn handle(&self) -> u16 {
        let handle = self.data.handle();
        unwrap!(handle)
    }

    /// Lets the attribute server process the request and returns the resulting [`Reply`].
    pub fn accept(mut self) -> Result<Reply<'stack, P>, Error> {
        let outcome = Ok(());
        process(&mut self.data, self.server, outcome)
    }

    /// Builds an error response carrying `err` instead of processing the request.
    pub fn reject(mut self, err: AttErrorCode) -> Result<Reply<'stack, P>, Error> {
        let outcome = Err(err);
        process(&mut self.data, self.server, outcome)
    }

    /// Borrows the underlying request data.
    pub fn payload(&self) -> &GattData<'stack, P> {
        &self.data
    }

    /// Takes the request data out of the event.
    ///
    /// The PDU is moved out, so the drop handler that runs afterwards has nothing to process.
    pub fn into_payload(mut self) -> GattData<'stack, P> {
        let pdu = self.data.pdu.take();
        let connection = self.data.connection.clone();
        GattData { pdu, connection }
    }
}
impl<P: PacketPool> Drop for ReadEvent<'_, '_, P> {
    /// Processes the request as accepted if it is still pending; any error is discarded.
    fn drop(&mut self) {
        // Dropping the returned `Reply` (if any) triggers its own best-effort send.
        drop(process(&mut self.data, self.server, Ok(())));
    }
}
/// A Write request or Write command awaiting a decision.
///
/// If dropped without calling `accept`, `reject` or `into_payload`, the `Drop` impl processes
/// the request as accepted.
pub struct WriteEvent<'stack, 'server, P: PacketPool> {
// Request PDU and its connection.
data: GattData<'stack, P>,
// Server that applies the write on accept.
server: &'server dyn DynamicAttributeServer<P>,
}
impl<'stack, P: PacketPool> WriteEvent<'stack, '_, P> {
    /// Attribute handle being written.
    ///
    /// # Panics
    /// Panics if the request carries no single handle.
    pub fn handle(&self) -> u16 {
        let handle = self.data.handle();
        unwrap!(handle)
    }

    /// Bytes of the PDU after its first three bytes.
    ///
    /// # Panics
    /// Panics if the PDU has already been taken or is shorter than three bytes.
    pub fn data(&self) -> &[u8] {
        let pdu = self.data.pdu.as_ref().unwrap();
        let bytes: &[u8] = pdu.as_ref();
        &bytes[3..]
    }

    /// Decodes the written bytes as `T`; the characteristic argument only selects the type.
    pub fn value<T: FromGatt>(&self, _c: &Characteristic<T>) -> Result<T, FromGattError> {
        let raw = self.data();
        T::from_gatt(raw)
    }

    /// Lets the attribute server process the write and returns the resulting [`Reply`].
    pub fn accept(mut self) -> Result<Reply<'stack, P>, Error> {
        let outcome = Ok(());
        process(&mut self.data, self.server, outcome)
    }

    /// Builds an error response carrying `err` instead of processing the write.
    pub fn reject(mut self, err: AttErrorCode) -> Result<Reply<'stack, P>, Error> {
        let outcome = Err(err);
        process(&mut self.data, self.server, outcome)
    }

    /// Borrows the underlying request data.
    pub fn payload(&self) -> &GattData<'stack, P> {
        &self.data
    }

    /// Takes the request data out of the event.
    ///
    /// The PDU is moved out, so the drop handler that runs afterwards has nothing to process.
    pub fn into_payload(mut self) -> GattData<'stack, P> {
        let pdu = self.data.pdu.take();
        let connection = self.data.connection.clone();
        GattData { pdu, connection }
    }
}
impl<P: PacketPool> Drop for WriteEvent<'_, '_, P> {
    /// Processes the write as accepted if it is still pending; any error is discarded.
    fn drop(&mut self) {
        // Dropping the returned `Reply` (if any) triggers its own best-effort send.
        drop(process(&mut self.data, self.server, Ok(())));
    }
}
/// A request that is neither a plain read nor a plain write, awaiting a decision.
///
/// If dropped without calling `accept`, `reject` or `into_payload`, the `Drop` impl processes
/// the request as accepted.
pub struct OtherEvent<'stack, 'server, P: PacketPool> {
// Request PDU and its connection.
data: GattData<'stack, P>,
// Server that produces the response on accept.
server: &'server dyn DynamicAttributeServer<P>,
}
impl<'stack, P: PacketPool> OtherEvent<'stack, '_, P> {
    /// Lets the attribute server process the request and returns the resulting [`Reply`].
    pub fn accept(mut self) -> Result<Reply<'stack, P>, Error> {
        let outcome = Ok(());
        process(&mut self.data, self.server, outcome)
    }

    /// Builds an error response carrying `err` instead of processing the request.
    pub fn reject(mut self, err: AttErrorCode) -> Result<Reply<'stack, P>, Error> {
        let outcome = Err(err);
        process(&mut self.data, self.server, outcome)
    }

    /// Borrows the underlying request data.
    pub fn payload(&self) -> &GattData<'stack, P> {
        &self.data
    }

    /// Takes the request data out of the event.
    ///
    /// The PDU is moved out, so the drop handler that runs afterwards has nothing to process.
    pub fn into_payload(mut self) -> GattData<'stack, P> {
        let pdu = self.data.pdu.take();
        let connection = self.data.connection.clone();
        GattData { pdu, connection }
    }
}
impl<P: PacketPool> Drop for OtherEvent<'_, '_, P> {
    /// Processes the request as accepted if it is still pending; any error is discarded.
    fn drop(&mut self) {
        // Dropping the returned `Reply` (if any) triggers its own best-effort send.
        drop(process(&mut self.data, self.server, Ok(())));
    }
}
/// A request that the server's permission check refused.
///
/// Both `accept` and the `Drop` impl answer with the stored error code; `reject` lets the
/// caller substitute a different code.
pub struct NotAllowedEvent<'stack, 'server, P: PacketPool> {
// Request PDU and its connection.
data: GattData<'stack, P>,
// Error code returned by the failed `can_read` / `can_write` check.
err: AttErrorCode,
// Server reference passed through to `process`.
server: &'server dyn DynamicAttributeServer<P>,
}
impl<'stack, P: PacketPool> NotAllowedEvent<'stack, '_, P> {
    /// Attribute handle the refused request addressed.
    ///
    /// # Panics
    /// Panics if the request carries no single handle.
    pub fn handle(&self) -> u16 {
        let handle = self.data.handle();
        unwrap!(handle)
    }

    /// Answers the request with the error code produced by the permission check.
    pub fn accept(mut self) -> Result<Reply<'stack, P>, Error> {
        let code = self.err;
        process(&mut self.data, self.server, Err(code))
    }

    /// Answers the request with the caller-supplied error code `err`.
    pub fn reject(mut self, err: AttErrorCode) -> Result<Reply<'stack, P>, Error> {
        let outcome = Err(err);
        process(&mut self.data, self.server, outcome)
    }

    /// Borrows the underlying request data.
    pub fn payload(&self) -> &GattData<'stack, P> {
        &self.data
    }

    /// Takes the request data out of the event.
    ///
    /// The PDU is moved out, so the drop handler that runs afterwards has nothing to process.
    pub fn into_payload(mut self) -> GattData<'stack, P> {
        let pdu = self.data.pdu.take();
        let connection = self.data.connection.clone();
        GattData { pdu, connection }
    }
}
impl<P: PacketPool> Drop for NotAllowedEvent<'_, '_, P> {
    /// Answers a still-pending request with the stored error code; any error is discarded.
    fn drop(&mut self) {
        let code = self.err;
        // Dropping the returned `Reply` (if any) triggers its own best-effort send.
        drop(process(&mut self.data, self.server, Err(code)));
    }
}
/// Consumes the pending PDU in `data` and turns it into a [`Reply`].
///
/// `result` selects the path: `Ok(())` hands the request to the attribute server,
/// `Err(code)` builds an error response with that code. If the PDU was already taken,
/// an empty reply (nothing to send) is returned.
fn process<'stack, P>(
    data: &mut GattData<'stack, P>,
    server: &dyn DynamicAttributeServer<P>,
    result: Result<(), AttErrorCode>,
) -> Result<Reply<'stack, P>, Error>
where
    P: PacketPool,
{
    let Some(pdu) = data.pdu.take() else {
        return Ok(Reply::new(data.connection.clone(), None));
    };

    match result {
        Ok(()) => process_accept(&pdu, &data.connection, server),
        Err(code) => process_reject(&pdu, &data.connection, code),
    }
}
fn process_accept<'stack, P>(
pdu: &Pdu<P::Packet>,
connection: &Connection<'stack, P>,
server: &dyn DynamicAttributeServer<P>,
) -> Result<Reply<'stack, P>, Error>
where
P: PacketPool,
{
let att = unwrap!(Att::decode(pdu.as_ref()));
let Att::Client(att) = att else {
unreachable!("Expected Att::Client, got {:?}", att)
};
let mut tx = P::allocate().ok_or(Error::OutOfMemory)?;
let mut w = WriteCursor::new(tx.as_mut());
let (mut header, mut data) = w.split(4)?;
let mtu = connection.get_att_mtu() as usize;
let written = {
let buf = data.write_buf();
let limit = buf.len().min(mtu);
server.process(connection, &att, &mut buf[..limit])?
};
if let Some(written) = written {
data.commit(written)?;
header.write(data.len() as u16)?;
header.write(4_u16)?;
let len = header.len() + data.len();
let pdu = Pdu::new(tx, len);
Ok(Reply::new(connection.clone(), Some(pdu)))
} else {
Ok(Reply::new(connection.clone(), None))
}
}
fn process_reject<'stack, P: PacketPool>(
pdu: &Pdu<P::Packet>,
connection: &Connection<'stack, P>,
code: AttErrorCode,
) -> Result<Reply<'stack, P>, Error> {
let att = unwrap!(Att::decode(pdu.as_ref()));
let Att::Client(att) = att else {
unreachable!("Expected Att::Client, got {:?}", att)
};
let handle = match att {
AttClient::Request(AttReq::Write { handle, .. }) => handle,
AttClient::Command(AttCmd::Write { handle, .. }) => handle,
AttClient::Request(AttReq::Read { handle }) => handle,
AttClient::Request(AttReq::ReadBlob { handle, .. }) => handle,
_ => 0, };
let request = pdu.as_ref()[0];
let rsp = AttRsp::Error { request, handle, code };
let pdu = assemble(connection, AttServer::Response(rsp))?;
Ok(Reply::new(connection.clone(), Some(pdu)))
}
pub(crate) fn assemble<'stack, P: PacketPool>(
conn: &Connection<'stack, P>,
att: AttServer<'_>,
) -> Result<Pdu<P::Packet>, Error> {
let mut tx = P::allocate().ok_or(Error::OutOfMemory)?;
let mut w = WriteCursor::new(tx.as_mut());
let (mut header, mut data) = w.split(4)?;
data.write(Att::Server(att))?;
let mtu = conn.get_att_mtu();
data.truncate(mtu as usize);
header.write(data.len() as u16)?;
header.write(4_u16)?;
let len = header.len() + data.len();
Ok(Pdu::new(tx, len))
}
/// A response that is ready to be sent to the peer.
///
/// If it is dropped while still holding a PDU, the `Drop` impl attempts a non-blocking send.
pub struct Reply<'stack, P: PacketPool> {
// Connection the response goes out on.
connection: Connection<'stack, P>,
// `None` when there is nothing to send (or the PDU has already been sent).
pdu: Option<Pdu<P::Packet>>,
}
impl<'stack, P: PacketPool> Reply<'stack, P> {
    /// Pairs a connection with an optional response PDU.
    fn new(connection: Connection<'stack, P>, pdu: Option<Pdu<P::Packet>>) -> Self {
        Self { connection, pdu }
    }

    /// Sends the response without waiting.
    ///
    /// Returns `Ok(())` when there is nothing to send, otherwise the result of
    /// `Connection::try_send`.
    pub fn try_send(mut self) -> Result<(), Error> {
        match self.pdu.take() {
            Some(pdu) => self.connection.try_send(pdu),
            None => Ok(()),
        }
    }

    /// Sends the response, awaiting `Connection::send`; does nothing if there is no PDU.
    pub async fn send(mut self) {
        let Some(pdu) = self.pdu.take() else {
            return;
        };
        self.connection.send(pdu).await
    }
}
#[cfg(test)]
impl<'stack, P: PacketPool> Reply<'stack, P> {
    /// Test helper: the response bytes after the four-byte header, if a PDU is present.
    fn att_payload(&self) -> Option<&[u8]> {
        let pdu = self.pdu.as_ref()?;
        Some(&pdu.as_ref()[4..])
    }
}
impl<P: PacketPool> Drop for Reply<'_, P> {
    /// Best-effort send of an unsent response; a failure is only logged.
    fn drop(&mut self) {
        let Some(pdu) = self.pdu.take() else {
            return;
        };
        match self.connection.try_send(pdu) {
            Ok(_) => {}
            Err(_) => {
                warn!("[gatt] error sending reply (outbound buffer full)");
            }
        }
    }
}
/// Receives notifications for one attribute handle.
///
/// Created by `GattClient::subscribe`.
pub struct NotificationListener<'lst, const MTU: usize> {
// Only notifications whose handle equals this value are returned by `next`.
handle: u16,
// Subscriber on the client's notification pub/sub channel.
listener: pubsub::DynSubscriber<'lst, Notification<MTU>>,
}
impl<'lst, const MTU: usize> NotificationListener<'lst, MTU> {
    /// Waits for the next notification addressed to this listener's handle.
    ///
    /// Messages for other handles, and any non-message wait results, are skipped.
    #[allow(clippy::should_implement_trait)]
    pub async fn next(&mut self) -> Notification<MTU> {
        loop {
            match self.listener.next_message().await {
                WaitResult::Message(msg) if msg.handle == self.handle => return msg,
                _ => {}
            }
        }
    }
}
// Subscriber capacity of the client's notification pub/sub channel (from crate config).
const MAX_NOTIF: usize = config::GATT_CLIENT_NOTIFICATION_MAX_SUBSCRIBERS;
// Queue depth of the client's notification pub/sub channel (from crate config).
const NOTIF_QSIZE: usize = config::GATT_CLIENT_NOTIFICATION_QUEUE_SIZE;
/// GATT client bound to a single [`Connection`].
///
/// `MAX_SERVICES` bounds both the number of services a discovery call can return and the
/// number remembered in `known_services`.
pub struct GattClient<'reference, T: Controller, P: PacketPool, const MAX_SERVICES: usize> {
// Services recorded by `services` / `services_by_uuid` (deduplicated).
known_services: RefCell<Vec<ServiceHandle, MAX_SERVICES>>,
// Host stack reference. NOTE(review): not read by any method visible in this file — confirm whether it is still needed.
stack: &'reference Stack<'reference, T, P>,
// Connection all requests and commands are sent on.
connection: Connection<'reference, P>,
// Single-slot channel through which `task` hands response PDUs to `request`.
response_channel: Channel<NoopRawMutex, (ConnHandle, Pdu<P::Packet>), 1>,
// Pub/sub channel on which `task` publishes notifications and indications.
notifications: PubSubChannel<NoopRawMutex, Notification<512>, NOTIF_QSIZE, MAX_NOTIF, 1>,
}
/// A received notification or indication value, stored inline.
///
/// `as_ref()` yields only the first `len` bytes of `data`.
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Notification<const MTU: usize> {
// Attribute handle the value belongs to.
handle: u16,
// Backing storage; only `data[..len]` is meaningful.
data: [u8; MTU],
// Number of valid bytes in `data`.
len: usize,
}
impl<const MTU: usize> AsRef<[u8]> for Notification<MTU> {
    /// Returns the valid portion of the stored value.
    fn as_ref(&self) -> &[u8] {
        let Self { data, len, .. } = self;
        &data[..*len]
    }
}
/// A discovered service: its handle range and UUID.
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ServiceHandle {
// First attribute handle of the service.
start: u16,
// Last attribute handle of the service (inclusive, see `handle_range`).
end: u16,
// UUID of the service.
uuid: Uuid,
}
impl ServiceHandle {
    /// Inclusive range of attribute handles covered by the service.
    pub fn handle_range(&self) -> core::ops::RangeInclusive<u16> {
        core::ops::RangeInclusive::new(self.start, self.end)
    }

    /// A copy of the service UUID.
    pub fn uuid(&self) -> Uuid {
        Clone::clone(&self.uuid)
    }
}
/// A response PDU together with the handle of the connection it was received on.
pub(crate) struct Response<P> {
// Raw response PDU.
pdu: Pdu<P>,
// Connection handle the PDU was received for.
handle: ConnHandle,
}
/// Minimal request/command interface of an ATT client.
pub(crate) trait Client<'d, E, P: PacketPool> {
/// Sends `req` and resolves to the response PDU.
fn request(&self, req: AttReq<'_>) -> impl Future<Output = Result<Response<P::Packet>, BleHostError<E>>>;
/// Sends `cmd`; no response is awaited.
fn command(&self, cmd: AttCmd<'_>) -> impl Future<Output = Result<(), BleHostError<E>>>;
}
impl<'reference, T: Controller, P: PacketPool, const MAX_SERVICES: usize> Client<'reference, T::Error, P>
    for GattClient<'reference, T, P, MAX_SERVICES>
{
    /// Sends `req` and waits for the next PDU delivered on the response channel.
    ///
    /// # Panics
    /// Panics if the received PDU is tagged with a different connection handle.
    async fn request(&self, req: AttReq<'_>) -> Result<Response<P::Packet>, BleHostError<T::Error>> {
        self.send_att_data(Att::Client(AttClient::Request(req))).await?;
        let (handle, pdu) = self.response_channel.receive().await;
        assert_eq!(handle, self.connection.handle());
        Ok(Response { handle, pdu })
    }

    /// Sends `cmd` without waiting for any response.
    async fn command(&self, cmd: AttCmd<'_>) -> Result<(), BleHostError<T::Error>> {
        let message = Att::Client(AttClient::Command(cmd));
        self.send_att_data(message).await
    }
}
impl<'reference, T: Controller, P: PacketPool, const MAX_SERVICES: usize> GattClient<'reference, T, P, MAX_SERVICES> {
async fn send_att_data(&self, data: Att<'_>) -> Result<(), BleHostError<T::Error>> {
let header = L2capHeader {
channel: crate::types::l2cap::L2CAP_CID_ATT,
length: data.size() as u16,
};
let mut buf = P::allocate().ok_or(Error::OutOfMemory)?;
let mut w = WriteCursor::new(buf.as_mut());
w.write_hci(&header)?;
w.write(data)?;
let len = w.len();
self.connection.send(Pdu::new(buf, len)).await;
Ok(())
}
}
impl<'reference, C: Controller, P: PacketPool, const MAX_SERVICES: usize> GattClient<'reference, C, P, MAX_SERVICES> {
pub async fn new(
stack: &'reference Stack<'reference, C, P>,
connection: &Connection<'reference, P>,
) -> Result<GattClient<'reference, C, P, MAX_SERVICES>, BleHostError<C::Error>> {
let l2cap = L2capHeader { channel: 4, length: 3 };
let mut buf = P::allocate().ok_or(Error::OutOfMemory)?;
let mut w = WriteCursor::new(buf.as_mut());
w.write_hci(&l2cap)?;
w.write(att::Att::Client(att::AttClient::Request(att::AttReq::ExchangeMtu {
mtu: P::MTU as u16 - 4,
})))?;
let len = w.len();
connection.send(Pdu::new(buf, len)).await;
loop {
let pdu = connection.next_gatt_client().await;
match pdu.as_ref()[0] {
att::ATT_EXCHANGE_MTU_RSP | att::ATT_ERROR_RSP => break,
_ => {
warn!("[gatt] unexpected PDU during MTU exchange, discarding");
}
}
}
Ok(Self {
known_services: RefCell::new(heapless::Vec::new()),
stack,
connection: connection.clone(),
response_channel: Channel::new(),
notifications: PubSubChannel::new(),
})
}
pub async fn services(&self) -> Result<Vec<ServiceHandle, MAX_SERVICES>, BleHostError<C::Error>> {
let mut start: u16 = 0x0001;
let mut result = Vec::new();
loop {
let data = att::AttReq::ReadByGroupType {
start,
end: u16::MAX,
group_type: PRIMARY_SERVICE.into(),
};
let response = self.request(data).await?;
let res = Self::response(response.pdu.as_ref())?;
match res {
AttRsp::Error { request, handle, code } => {
if code == att::AttErrorCode::ATTRIBUTE_NOT_FOUND {
break;
}
return Err(Error::Att(code).into());
}
AttRsp::ReadByGroupType { mut it } => {
let mut end: u16 = 0;
while let Some(res) = it.next() {
let (handle, data) = res?;
let mut r = ReadCursor::new(data);
end = r.read()?;
let uuid = Uuid::try_from(r.remaining())?;
let svc = ServiceHandle {
start: handle,
end,
uuid,
};
result.push(svc.clone()).map_err(|_| Error::InsufficientSpace)?;
let mut known = self.known_services.borrow_mut();
if !known.contains(&svc) {
known.push(svc).map_err(|_| Error::InsufficientSpace)?;
}
}
if end == 0xFFFF {
break;
}
start = end + 1;
}
res => {
trace!("[gatt client] response: {:?}", res);
return Err(Error::UnexpectedGattResponse.into());
}
}
}
Ok(result)
}
pub async fn services_by_uuid(
&self,
uuid: &Uuid,
) -> Result<Vec<ServiceHandle, MAX_SERVICES>, BleHostError<C::Error>> {
let mut start: u16 = 0x0001;
let mut result = Vec::new();
loop {
let data = att::AttReq::FindByTypeValue {
start_handle: start,
end_handle: 0xffff,
att_type: PRIMARY_SERVICE.into(),
att_value: uuid.as_raw(),
};
let response = self.request(data).await?;
let res = Self::response(response.pdu.as_ref())?;
match res {
AttRsp::Error { request, handle, code } => {
if code == att::AttErrorCode::ATTRIBUTE_NOT_FOUND {
break;
}
return Err(Error::Att(code).into());
}
AttRsp::FindByTypeValue { mut it } => {
let mut end: u16 = 0;
while let Some(res) = it.next() {
let (handle, e) = res?;
end = e;
let svc = ServiceHandle {
start: handle,
end,
uuid: uuid.clone(),
};
result.push(svc.clone()).map_err(|_| Error::InsufficientSpace)?;
let mut known = self.known_services.borrow_mut();
if !known.contains(&svc) {
known.push(svc).map_err(|_| Error::InsufficientSpace)?;
}
}
if end == 0xFFFF {
break;
}
start = end + 1;
}
res => {
trace!("[gatt client] response: {:?}", res);
return Err(Error::UnexpectedGattResponse.into());
}
}
}
Ok(result)
}
pub async fn characteristics<const N: usize>(
&self,
service: &ServiceHandle,
) -> Result<Vec<Characteristic<[u8]>, N>, BleHostError<C::Error>> {
let mut start: u16 = service.start;
let mut characteristics = Vec::new();
loop {
let data = att::AttReq::ReadByType {
start,
end: service.end,
attribute_type: CHARACTERISTIC.into(),
};
let response = self.request(data).await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::ReadByType { mut it } => {
while let Some(res) = it.next() {
let (declaration_handle, item) = res?;
if declaration_handle == 0xffff {
return Err(Error::Att(AttErrorCode::INVALID_HANDLE).into());
}
let expected_items_len = 5;
let item_len = item.len();
if item_len < expected_items_len {
return Err(Error::MalformedCharacteristicDeclaration {
expected: expected_items_len,
actual: item_len,
}
.into());
}
let AttributeData::Declaration {
props,
handle,
uuid: decl_uuid,
} = AttributeData::decode_declaration(item)?
else {
unreachable!()
};
characteristics
.push(Characteristic {
handle,
props,
cccd_handle: None,
phantom: PhantomData,
})
.map_err(|_| Error::InsufficientSpace)?;
start = declaration_handle + 1;
}
}
AttRsp::Error { request, handle, code } => match code {
att::AttErrorCode::ATTRIBUTE_NOT_FOUND => break,
_ => return Err(Error::Att(code).into()),
},
_ => return Err(Error::UnexpectedGattResponse.into()),
}
}
let mut iter = characteristics.iter_mut().peekable();
while let Some(characteristic) = iter.next() {
if characteristic.props.has_cccd() {
let end = iter.peek().map(|x| x.handle - 2).unwrap_or(service.end);
characteristic.cccd_handle = match self.get_characteristic_cccd(characteristic.handle + 1, end).await {
Ok(handle) => Some(handle),
Err(BleHostError::BleHost(Error::NotFound)) => None,
Err(err) => return Err(err),
};
}
}
Ok(characteristics)
}
pub async fn characteristic_by_uuid<T: AsGatt + ?Sized>(
&self,
service: &ServiceHandle,
uuid: &Uuid,
) -> Result<Characteristic<T>, BleHostError<C::Error>> {
let mut start: u16 = service.start;
let mut found_indicate_or_notify_uuid = Option::None;
loop {
let data = att::AttReq::ReadByType {
start,
end: service.end,
attribute_type: CHARACTERISTIC.into(),
};
let response = self.request(data).await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::ReadByType { mut it } => {
while let Some(res) = it.next() {
let (handle, item) = res?;
let expected_items_len = 5;
let item_len = item.len();
if item_len < expected_items_len {
return Err(Error::MalformedCharacteristicDeclaration {
expected: expected_items_len,
actual: item_len,
}
.into());
}
if let AttributeData::Declaration {
props,
handle,
uuid: decl_uuid,
} = AttributeData::decode_declaration(item)?
{
if let Some((start_handle, _)) = found_indicate_or_notify_uuid {
return Ok(Characteristic {
handle: start_handle,
cccd_handle: Some(self.get_characteristic_cccd(start_handle, handle).await?),
props,
phantom: PhantomData,
});
}
if *uuid == decl_uuid {
if !props.has_cccd() {
return Ok(Characteristic {
handle,
cccd_handle: None,
props,
phantom: PhantomData,
});
}
found_indicate_or_notify_uuid = Some((handle, props));
}
if handle == 0xFFFF {
return Err(Error::NotFound.into());
}
start = handle + 1;
} else {
return Err(Error::InvalidCharacteristicDeclarationData.into());
}
}
}
AttRsp::Error { request, handle, code } => match code {
att::AttErrorCode::ATTRIBUTE_NOT_FOUND => match found_indicate_or_notify_uuid {
Some((handle, props)) => {
return Ok(Characteristic {
handle,
cccd_handle: Some(self.get_characteristic_cccd(handle, service.end).await?),
props,
phantom: PhantomData,
});
}
None => return Err(Error::NotFound.into()),
},
_ => return Err(Error::Att(code).into()),
},
_ => return Err(Error::UnexpectedGattResponse.into()),
}
}
}
async fn get_characteristic_cccd(
&self,
char_start_handle: u16,
char_end_handle: u16,
) -> Result<u16, BleHostError<C::Error>> {
let mut start_handle = char_start_handle;
while start_handle <= char_end_handle {
let data = att::AttReq::FindInformation {
start_handle,
end_handle: char_end_handle,
};
let response = self.request(data).await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::FindInformation { mut it } => {
while let Some(Ok((handle, uuid))) = it.next() {
if uuid == CLIENT_CHARACTERISTIC_CONFIGURATION.into() {
return Ok(handle);
}
start_handle = handle + 1;
}
}
AttRsp::Error { request, handle, code } => return Err(Error::Att(code).into()),
_ => return Err(Error::UnexpectedGattResponse.into()),
}
}
Err(Error::NotFound.into())
}
pub async fn read_characteristic<T: AsGatt + ?Sized>(
&self,
characteristic: &Characteristic<T>,
dest: &mut [u8],
) -> Result<usize, BleHostError<C::Error>> {
let response = self
.request(att::AttReq::Read {
handle: characteristic.handle,
})
.await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::Read { data } => {
let to_copy = data.len().min(dest.len());
dest[..to_copy].copy_from_slice(&data[..to_copy]);
Ok(to_copy)
}
AttRsp::Error { request, handle, code } => Err(Error::Att(code).into()),
_ => Err(Error::UnexpectedGattResponse.into()),
}
}
/// Reads a characteristic value that may be longer than one ATT response.
///
/// Starts with a plain read. If that returned exactly `att_mtu - 1` bytes, ReadBlob requests
/// follow at increasing offsets until a short or empty blob arrives, `dest` is full, or the
/// peer answers INVALID_OFFSET / ATTRIBUTE_NOT_LONG. Returns the total number of bytes stored
/// in `dest`.
///
/// # Errors
/// `Error::Att` for any other ATT error response, `Error::UnexpectedGattResponse` for an
/// unrelated response, plus any send/decode error.
pub async fn read_characteristic_long<T: AsGatt + ?Sized>(
    &self,
    characteristic: &Characteristic<T>,
    dest: &mut [u8],
) -> Result<usize, BleHostError<C::Error>> {
    let first_read_len = self.read_characteristic(characteristic, dest).await?;

    // A response that is shorter than a full payload means the value is complete.
    let full_payload = self.connection.att_mtu() as usize - 1;
    if first_read_len != full_payload {
        return Ok(first_read_len);
    }

    let mut offset = first_read_len;
    loop {
        let query = att::AttReq::ReadBlob {
            handle: characteristic.handle,
            offset: offset as u16,
        };
        let response = self.request(query).await?;

        let data = match Self::response(response.pdu.as_ref())? {
            AttRsp::ReadBlob { data } => data,
            AttRsp::Error { code, .. } if code == att::AttErrorCode::INVALID_OFFSET => {
                trace!("[read_characteristic_long] Got INVALID_OFFSET, no more data");
                break;
            }
            AttRsp::Error { code, .. } if code == att::AttErrorCode::ATTRIBUTE_NOT_LONG => {
                trace!("[read_characteristic_long] read_handle_long] Attribute not long, no blob reads needed");
                break;
            }
            AttRsp::Error { code, .. } => {
                trace!("[read_characteristic] Got error: {:?}", code);
                return Err(Error::Att(code).into());
            }
            _ => return Err(Error::UnexpectedGattResponse.into()),
        };

        debug!("[read_characteristic_long] Blob read returned {} bytes", data.len());
        if data.is_empty() {
            break;
        }

        let blob_len = data.len();
        let copied = blob_len.min(dest.len() - offset);
        dest[offset..offset + copied].copy_from_slice(&data[..copied]);
        offset += copied;

        // Stop on a short blob (end of value) or when `dest` could not take the whole blob.
        if blob_len < full_payload || copied < blob_len {
            break;
        }
    }

    Ok(offset)
}
pub async fn read_characteristic_by_uuid(
&self,
service: &ServiceHandle,
uuid: &Uuid,
dest: &mut [u8],
) -> Result<usize, BleHostError<C::Error>> {
let data = att::AttReq::ReadByType {
start: service.start,
end: service.end,
attribute_type: uuid.clone(),
};
let response = self.request(data).await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::ReadByType { mut it } => {
let mut to_copy = 0;
if let Some(item) = it.next() {
let (_handle, data) = item?;
to_copy = data.len().min(dest.len());
dest[..to_copy].copy_from_slice(&data[..to_copy]);
}
Ok(to_copy)
}
AttRsp::Error { request, handle, code } => Err(Error::Att(code).into()),
_ => Err(Error::UnexpectedGattResponse.into()),
}
}
pub async fn write_characteristic<T: AsGatt + ?Sized>(
&self,
handle: &Characteristic<T>,
buf: &[u8],
) -> Result<(), BleHostError<C::Error>> {
let data = att::AttReq::Write {
handle: handle.handle,
data: buf,
};
let response = self.request(data).await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::Write => Ok(()),
AttRsp::Error { request, handle, code } => Err(Error::Att(code).into()),
_ => Err(Error::UnexpectedGattResponse.into()),
}
}
/// Writes `buf` to the characteristic with a Write command; no response is awaited.
///
/// # Errors
/// Returns any error raised while sending the command.
pub async fn write_characteristic_without_response<T: AsGatt + ?Sized>(
    &self,
    handle: &Characteristic<T>,
    buf: &[u8],
) -> Result<(), BleHostError<C::Error>> {
    let write = att::AttCmd::Write {
        handle: handle.handle,
        data: buf,
    };
    self.command(write).await
}
pub async fn subscribe<T: AsGatt + ?Sized>(
&self,
characteristic: &Characteristic<T>,
indication: bool,
) -> Result<NotificationListener<'_, 512>, BleHostError<C::Error>> {
let properties = u16::to_le_bytes(if indication { 0x02 } else { 0x01 });
let data = att::AttReq::Write {
handle: characteristic.cccd_handle.ok_or(Error::NotSupported)?,
data: &properties,
};
let response = self.request(data).await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::Write => match self.notifications.dyn_subscriber() {
Ok(listener) => Ok(NotificationListener {
listener,
handle: characteristic.handle,
}),
Err(embassy_sync::pubsub::Error::MaximumSubscribersReached) => {
Err(Error::GattSubscriberLimitReached.into())
}
Err(_) => Err(Error::Other.into()),
},
AttRsp::Error { request, handle, code } => Err(Error::Att(code).into()),
_ => Err(Error::UnexpectedGattResponse.into()),
}
}
pub async fn unsubscribe<T: AsGatt + ?Sized>(
&self,
characteristic: &Characteristic<T>,
) -> Result<(), BleHostError<C::Error>> {
let properties = u16::to_le_bytes(0);
let data = att::AttReq::Write {
handle: characteristic.cccd_handle.ok_or(Error::NotSupported)?,
data: &[0, 0],
};
let response = self.request(data).await?;
match Self::response(response.pdu.as_ref())? {
AttRsp::Write => Ok(()),
AttRsp::Error { request, handle, code } => Err(Error::Att(code).into()),
_ => Err(Error::UnexpectedGattResponse.into()),
}
}
/// Sends an indication confirmation to the peer.
///
/// # Errors
/// Returns any error raised while sending.
pub async fn confirm_indication(&self) -> Result<(), BleHostError<C::Error>> {
    let confirmation = AttClient::Confirmation(AttCfm::ConfirmIndication);
    self.send_att_data(Att::Client(confirmation)).await
}
/// Parses a notification/indication body (handle followed by value) and publishes it.
///
/// The value is copied into a 512-byte buffer, truncated if longer, and published
/// immediately on the notification channel.
///
/// # Errors
/// Returns the cursor error if the handle cannot be read from `data`.
async fn handle_notification_packet(&self, data: &[u8]) -> Result<(), BleHostError<C::Error>> {
    let mut reader = ReadCursor::new(data);
    let handle: u16 = reader.read()?;
    let value = reader.remaining();

    let mut storage = [0u8; 512];
    let len = value.len().min(storage.len());
    storage[..len].copy_from_slice(&value[..len]);

    let notification = Notification {
        handle,
        data: storage,
        len,
    };
    self.notifications
        .immediate_publisher()
        .publish_immediate(notification);
    Ok(())
}
pub async fn task(&self) -> Result<(), BleHostError<C::Error>> {
loop {
let handle = self.connection.handle();
let pdu = self.connection.next_gatt_client().await;
let data = pdu.as_ref();
if matches!(pdu.as_ref()[0], ATT_HANDLE_VALUE_IND | ATT_HANDLE_VALUE_NTF) {
self.handle_notification_packet(&pdu.as_ref()[1..]).await?;
} else {
self.response_channel.send((handle, pdu)).await;
}
}
}
fn response<'a>(data: &'a [u8]) -> Result<AttRsp<'a>, BleHostError<C::Error>> {
let att = Att::decode(data)?;
match att {
Att::Server(AttServer::Response(rsp)) => Ok(rsp),
_ => Err(Error::UnexpectedGattResponse.into()),
}
}
}
#[cfg(test)]
mod tests {
extern crate std;
use core::task::Poll;
use bt_hci::param::{AddrKind, BdAddr, ConnHandle, LeConnRole};
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use super::*;
use crate::att::{self, Att, AttClient, AttReq};
use crate::attribute::Service;
use crate::attribute_server::AttributeServer;
use crate::connection::ConnParams;
use crate::connection_manager::tests::{setup, ADDR_1};
use crate::cursor::WriteCursor;
use crate::pdu::Pdu;
use crate::prelude::*;
// Encodes a ReadByType request for `uuid` over `start..=end` into a fresh packet and
// returns the packet together with the encoded length (no L2CAP header is written).
fn build_read_by_type_pdu(start: u16, end: u16, uuid: &Uuid) -> (<DefaultPacketPool as PacketPool>::Packet, usize) {
let att = Att::Client(AttClient::Request(AttReq::ReadByType {
start,
end,
attribute_type: uuid.clone(),
}));
let mut packet = DefaultPacketPool::allocate().unwrap();
let mut w = WriteCursor::new(packet.as_mut());
w.write(att).unwrap();
let len = w.len();
(packet, len)
}
// Checks that ReadByType responses produced by `process_accept` only ever contain whole
// entries and that paging through them discovers every characteristic exactly once.
#[test]
fn test_process_accept_read_by_type_no_partial_entries() {
let _ = env_logger::try_init();
const MAX_ATTRIBUTES: usize = 64;
const CONNECTIONS_MAX: usize = 3;
const CCCD_MAX: usize = 64;
const NUM_CHARACTERISTICS: u8 = 9;
const ATT_MTU: u16 = 185;
// Entry size the test expects for each characteristic declaration (asserted below).
const ENTRY_SIZE: usize = 21;
// The first two response bytes (opcode and entry length) precede the entries.
const RESPONSE_HEADER_SIZE: usize = 2;
let char_decl_uuid = Uuid::new_short(0x2803);
let mut table: AttributeTable<'_, NoopRawMutex, MAX_ATTRIBUTES> = AttributeTable::new();
{
// One service holding NUM_CHARACTERISTICS read-only characteristics with distinct UUIDs.
let mut svc = table.add_service(Service {
uuid: Uuid::new_long([0x32; 16]),
});
for i in 0..NUM_CHARACTERISTICS {
let mut uuid_bytes = [0x32u8; 16];
uuid_bytes[0] = i;
let _char = svc
.add_characteristic_ro::<[u8; 2], _>(Uuid::new_long(uuid_bytes), &[0, 0])
.build();
}
}
let server = AttributeServer::<_, DefaultPacketPool, MAX_ATTRIBUTES, CCCD_MAX, CONNECTIONS_MAX>::new(table);
// Establish a peripheral connection through the connection-manager test fixture.
let mgr = setup();
assert!(mgr.poll_accept(LeConnRole::Peripheral, &[], None).is_pending());
unwrap!(mgr.connect(
ConnHandle::new(0),
AddrKind::RANDOM,
BdAddr::new(ADDR_1),
LeConnRole::Peripheral,
ConnParams::new(),
));
let Poll::Ready(conn) = mgr.poll_accept(LeConnRole::Peripheral, &[], None) else {
panic!("expected connection to be accepted");
};
conn.set_att_mtu(ATT_MTU);
let mut start: u16 = 1;
let mut total_chars_found: usize = 0;
// Page through the characteristic declarations until the server answers with an error.
loop {
let (packet, len) = build_read_by_type_pdu(start, u16::MAX, &char_decl_uuid);
let pdu = Pdu::new(packet, len);
let reply = process_accept::<DefaultPacketPool>(&pdu, &conn, &server).unwrap();
let att_bytes = reply
.att_payload()
.expect("process_accept should produce a response PDU");
if att_bytes[0] == att::ATT_ERROR_RSP {
break;
}
assert_eq!(att_bytes[0], att::ATT_READ_BY_TYPE_RSP);
let entry_len = att_bytes[1] as usize;
assert_eq!(entry_len, ENTRY_SIZE);
let payload = &att_bytes[RESPONSE_HEADER_SIZE..];
assert_eq!(
payload.len() % entry_len,
0,
"ReadByType payload length {} is not a multiple of entry size {} — \
partial entry detected (ATT MTU truncation bug)",
payload.len(),
entry_len,
);
let num_entries = payload.len() / entry_len;
assert!(num_entries > 0);
total_chars_found += num_entries;
// Continue after the handle of the last entry in this response.
let last_entry = &payload[(num_entries - 1) * entry_len..];
let last_handle = u16::from_le_bytes([last_entry[0], last_entry[1]]);
start = last_handle + 1;
// Skip `Reply`'s drop handler, which would otherwise try to send the PDU.
core::mem::forget(reply);
}
assert_eq!(
total_chars_found, NUM_CHARACTERISTICS as usize,
"should discover all {} characteristics",
NUM_CHARACTERISTICS,
);
}
}