1use core::cell::RefCell;
5use core::future::poll_fn;
6use core::mem::MaybeUninit;
7use core::task::Poll;
8
9use bt_hci::cmd::controller_baseband::{
10 HostBufferSize, HostNumberOfCompletedPackets, Reset, SetControllerToHostFlowControl, SetEventMask,
11 SetEventMaskPage2,
12};
13use bt_hci::cmd::info::ReadBdAddr;
14use bt_hci::cmd::le::{
15 LeConnUpdate, LeCreateConnCancel, LeEnableEncryption, LeLongTermKeyRequestReply, LeReadBufferSize,
16 LeReadFilterAcceptListSize, LeSetAdvEnable, LeSetEventMask, LeSetExtAdvEnable, LeSetExtScanEnable, LeSetRandomAddr,
17 LeSetScanEnable,
18};
19use bt_hci::cmd::link_control::Disconnect;
20use bt_hci::cmd::{AsyncCmd, SyncCmd};
21use bt_hci::controller::{blocking, Controller, ControllerCmdAsync, ControllerCmdSync};
22use bt_hci::data::{AclBroadcastFlag, AclPacket, AclPacketBoundary};
23#[cfg(feature = "scan")]
24use bt_hci::event::le::LeAdvertisingReport;
25#[cfg(feature = "scan")]
26use bt_hci::event::le::LeExtendedAdvertisingReport;
27use bt_hci::event::le::{
28 LeAdvertisingSetTerminated, LeConnectionComplete, LeConnectionUpdateComplete, LeDataLengthChange,
29 LeEnhancedConnectionComplete, LeEventKind, LeEventPacket, LePhyUpdateComplete, LeRemoteConnectionParameterRequest,
30};
31use bt_hci::event::{DisconnectionComplete, EventKind, NumberOfCompletedPackets, Vendor};
32use bt_hci::param::{
33 AddrKind, AdvHandle, AdvSet, BdAddr, ConnHandle, DisconnectReason, EventMask, EventMaskPage2, FilterDuplicates,
34 LeConnRole, LeEventMask, Status,
35};
36use bt_hci::{ControllerToHostPacket, FromHciBytes, WriteHci};
37use embassy_futures::select::{select3, select4, Either3, Either4};
38use embassy_sync::once_lock::OnceLock;
39use embassy_sync::waitqueue::WakerRegistration;
40#[cfg(feature = "gatt")]
41use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel};
42use embassy_time::Duration;
43use futures::pin_mut;
44
45use crate::att::{AttClient, AttServer};
46use crate::channel_manager::{ChannelManager, ChannelStorage};
47use crate::command::CommandState;
48use crate::connection::ConnectionEvent;
49use crate::connection_manager::{ConnectionManager, ConnectionStorage, PacketGrant};
50use crate::cursor::WriteCursor;
51use crate::pdu::Pdu;
52#[cfg(feature = "security")]
53use crate::security_manager::SecurityEventData;
54use crate::types::l2cap::{
55 ConnParamUpdateReq, ConnParamUpdateRes, L2capHeader, L2capSignal, L2capSignalHeader, L2CAP_CID_ATT,
56 L2CAP_CID_DYN_START, L2CAP_CID_LE_U_SECURITY_MANAGER, L2CAP_CID_LE_U_SIGNAL,
57};
58use crate::{att, Address, BleHostError, Error, PacketPool, Stack};
59
60pub(crate) struct BleHost<'d, T, P: PacketPool> {
68 initialized: OnceLock<InitialState>,
69 metrics: RefCell<HostMetrics>,
70 pub(crate) address: Option<Address>,
71 pub(crate) controller: T,
72 pub(crate) connections: ConnectionManager<'d, P>,
73 pub(crate) channels: ChannelManager<'d, P>,
74 #[cfg(feature = "gatt")]
75 pub(crate) att_client: Channel<NoopRawMutex, (ConnHandle, Pdu<P::Packet>), { crate::config::L2CAP_RX_QUEUE_SIZE }>,
76 pub(crate) advertise_state: AdvState<'d>,
77 pub(crate) advertise_command_state: CommandState<bool>,
78 pub(crate) connect_command_state: CommandState<bool>,
79 pub(crate) scan_command_state: CommandState<bool>,
80}
81
82#[derive(Clone, Copy)]
83pub(crate) struct InitialState {
84 acl_max: usize,
85}
86
87#[cfg_attr(feature = "defmt", derive(defmt::Format))]
88#[derive(Clone, Copy, Debug)]
89pub(crate) enum AdvHandleState {
90 None,
91 Advertising(AdvHandle),
92 Terminated(AdvHandle),
93}
94
95pub(crate) struct AdvInnerState<'d> {
96 handles: &'d mut [AdvHandleState],
97 waker: WakerRegistration,
98}
99
100pub(crate) struct AdvState<'d> {
101 state: RefCell<AdvInnerState<'d>>,
102}
103
104impl<'d> AdvState<'d> {
105 pub(crate) fn new(handles: &'d mut [AdvHandleState]) -> Self {
106 Self {
107 state: RefCell::new(AdvInnerState {
108 handles,
109 waker: WakerRegistration::new(),
110 }),
111 }
112 }
113
114 pub(crate) fn reset(&self) {
115 let mut state = self.state.borrow_mut();
116 for entry in state.handles.iter_mut() {
117 *entry = AdvHandleState::None;
118 }
119 state.waker.wake();
120 }
121
122 pub(crate) fn terminate(&self, handle: AdvHandle) {
124 let mut state = self.state.borrow_mut();
125 for entry in state.handles.iter_mut() {
126 match entry {
127 AdvHandleState::Advertising(h) if *h == handle => {
128 *entry = AdvHandleState::Terminated(handle);
129 }
130 _ => {}
131 }
132 }
133 state.waker.wake();
134 }
135
136 pub(crate) fn len(&self) -> usize {
137 let state = self.state.borrow();
138 state.handles.len()
139 }
140
141 pub(crate) fn start(&self, sets: &[AdvSet]) {
142 let mut state = self.state.borrow_mut();
143 assert!(sets.len() <= state.handles.len());
144 for handle in state.handles.iter_mut() {
145 *handle = AdvHandleState::None;
146 }
147
148 for (idx, entry) in sets.iter().enumerate() {
149 state.handles[idx] = AdvHandleState::Advertising(entry.adv_handle);
150 }
151 }
152
153 pub async fn wait(&self) {
154 poll_fn(|cx| {
155 let mut state = self.state.borrow_mut();
156 state.waker.register(cx.waker());
157
158 let mut terminated = 0;
159 for entry in state.handles.iter() {
160 match entry {
161 AdvHandleState::Terminated(_) => {
162 terminated += 1;
163 }
164 AdvHandleState::None => {
165 terminated += 1;
166 }
167 _ => {}
168 }
169 }
170 if terminated == state.handles.len() {
171 Poll::Ready(())
172 } else {
173 Poll::Pending
174 }
175 })
176 .await;
177 }
178}
179
180#[derive(Default, Clone)]
182pub struct HostMetrics {
183 pub connect_events: u32,
185 pub disconnect_events: u32,
187 pub rx_errors: u32,
189}
190
191impl<'d, T, P> BleHost<'d, T, P>
192where
193 T: Controller,
194 P: PacketPool,
195{
196 #[allow(clippy::too_many_arguments)]
201 pub(crate) fn new(
202 controller: T,
203 connections: &'d mut [ConnectionStorage<P::Packet>],
204 channels: &'d mut [ChannelStorage<P::Packet>],
205 advertise_handles: &'d mut [AdvHandleState],
206 ) -> Self {
207 Self {
208 address: None,
209 initialized: OnceLock::new(),
210 metrics: RefCell::new(HostMetrics::default()),
211 controller,
212 connections: ConnectionManager::new(connections, P::MTU as u16 - 4),
213 channels: ChannelManager::new(channels),
214 #[cfg(feature = "gatt")]
215 att_client: Channel::new(),
216 advertise_state: AdvState::new(advertise_handles),
217 advertise_command_state: CommandState::new(),
218 scan_command_state: CommandState::new(),
219 connect_command_state: CommandState::new(),
220 }
221 }
222
223 pub(crate) async fn command<C>(&self, cmd: C) -> Result<C::Return, BleHostError<T::Error>>
225 where
226 C: SyncCmd,
227 T: ControllerCmdSync<C>,
228 {
229 let _ = self.initialized.get().await;
230 let ret = cmd.exec(&self.controller).await?;
231 Ok(ret)
232 }
233
234 pub(crate) async fn async_command<C>(&self, cmd: C) -> Result<(), BleHostError<T::Error>>
236 where
237 C: AsyncCmd,
238 T: ControllerCmdAsync<C>,
239 {
240 let _ = self.initialized.get().await;
241 cmd.exec(&self.controller).await?;
242 Ok(())
243 }
244
245 fn handle_connection(
246 &self,
247 status: Status,
248 handle: ConnHandle,
249 peer_addr_kind: AddrKind,
250 peer_addr: BdAddr,
251 role: LeConnRole,
252 ) -> bool {
253 match status.to_result() {
254 Ok(_) => {
255 if let Err(err) = self.connections.connect(handle, peer_addr_kind, peer_addr, role) {
256 warn!("Error establishing connection: {:?}", err);
257 return false;
258 } else {
259 #[cfg(feature = "defmt")]
260 debug!(
261 "[host] connection with handle {:?} established to {:02x}",
262 handle, peer_addr
263 );
264
265 #[cfg(feature = "log")]
266 debug!(
267 "[host] connection with handle {:?} established to {:02x?}",
268 handle, peer_addr
269 );
270 let mut m = self.metrics.borrow_mut();
271 m.connect_events = m.connect_events.wrapping_add(1);
272 }
273 }
274 Err(bt_hci::param::Error::ADV_TIMEOUT) => {
275 self.advertise_state.reset();
276 }
277 Err(bt_hci::param::Error::UNKNOWN_CONN_IDENTIFIER) => {
278 warn!("[host] connect cancelled");
279 self.connect_command_state.canceled();
280 }
281 Err(e) => {
282 warn!("Error connection complete event: {:?}", e);
283 self.connect_command_state.canceled();
284 }
285 }
286 true
287 }
288
289 fn handle_acl(&self, acl: AclPacket<'_>, event_handler: &dyn EventHandler) -> Result<(), Error> {
290 self.connections.received(acl.handle())?;
291 let handle = acl.handle();
292 let (header, pdu) = match acl.boundary_flag() {
293 AclPacketBoundary::FirstFlushable => {
294 let (header, data) = L2capHeader::from_hci_bytes(acl.data())?;
295
296 if header.channel < L2CAP_CID_DYN_START
298 && !(&[L2CAP_CID_LE_U_SIGNAL, L2CAP_CID_ATT, L2CAP_CID_LE_U_SECURITY_MANAGER]
299 .contains(&header.channel))
300 {
301 warn!("[host] unsupported l2cap channel id {}", header.channel);
302 return Err(Error::NotSupported);
303 }
304
305 if header.channel == L2CAP_CID_LE_U_SIGNAL {
307 assert!(data.len() == header.length as usize);
308 self.channels.signal(acl.handle(), data, &self.connections)?;
309 return Ok(());
310 }
311
312 trace!(
313 "[host] inbound l2cap header channel = {}, fragment len = {}, total = {}",
314 header.channel,
315 data.len(),
316 header.length
317 );
318
319 if header.length as usize != data.len() {
321 #[cfg(feature = "l2cap-sdu-reassembly-optimization")]
323 if header.channel >= L2CAP_CID_DYN_START {
324 self.channels.received(header.channel, 1)?;
326
327 self.connections.reassembly(acl.handle(), |p| {
328 let r = if !p.in_progress() {
329 let (first, payload) = data.split_at(2);
331 let len: u16 = u16::from_le_bytes([first[0], first[1]]);
332 let Some(packet) = P::allocate() else {
333 warn!("[host] no memory for packets on channel {}", header.channel);
334 return Err(Error::OutOfMemory);
335 };
336 p.init(header.channel, len, packet)?;
337 p.update(payload)?
338 } else {
339 p.update(data)?
340 };
341 if r.is_some() {
343 Err(Error::InvalidState)
344 } else {
345 Ok(())
346 }
347 })?;
348 return Ok(());
349 }
350
351 let Some(packet) = P::allocate() else {
352 warn!("[host] no memory for packets on channel {}", header.channel);
353 return Err(Error::OutOfMemory);
354 };
355 self.connections.reassembly(acl.handle(), |p| {
356 p.init(header.channel, header.length, packet)?;
357 let r = p.update(data)?;
358 if r.is_some() {
359 Err(Error::InvalidState)
360 } else {
361 Ok(())
362 }
363 })?;
364 return Ok(());
365 } else {
366 #[allow(unused_mut)]
367 let mut result = None;
368
369 #[cfg(feature = "l2cap-sdu-reassembly-optimization")]
370 if header.channel >= L2CAP_CID_DYN_START {
371 self.channels.received(header.channel, 1)?;
373
374 if let Some((state, pdu)) = self.connections.reassembly(acl.handle(), |p| {
375 if !p.in_progress() {
376 let (first, payload) = data.split_at(2);
377 let len: u16 = u16::from_le_bytes([first[0], first[1]]);
378
379 let Some(packet) = P::allocate() else {
380 warn!("[host] no memory for packets on channel {}", header.channel);
381 return Err(Error::OutOfMemory);
382 };
383 p.init(header.channel, len, packet)?;
384 p.update(payload)
385 } else {
386 p.update(data)
387 }
388 })? {
389 result.replace((state, pdu));
390 } else {
391 return Ok(());
392 }
393 }
394
395 if let Some((state, pdu)) = result {
396 (state, pdu)
397 } else {
398 let Some(packet) = P::allocate() else {
399 warn!("[host] no memory for packets on channel {}", header.channel);
400 return Err(Error::OutOfMemory);
401 };
402 let result = self.connections.reassembly(acl.handle(), |p| {
403 p.init(header.channel, header.length, packet)?;
404 p.update(data)
405 })?;
406 let Some((state, pdu)) = result else {
407 return Err(Error::InvalidState);
408 };
409 (state, pdu)
410 }
411 }
412 }
413 AclPacketBoundary::Continuing => {
415 trace!("[host] inbound l2cap len = {}", acl.data().len(),);
416 if let Some((header, p)) = self.connections.reassembly(acl.handle(), |p| {
418 if !p.in_progress() {
419 warn!(
420 "[host] unexpected continuation fragment of length {} for handle {}: {:?}",
421 acl.data().len(),
422 acl.handle().raw(),
423 p
424 );
425 return Err(Error::InvalidState);
426 }
427 p.update(acl.data())
428 })? {
429 (header, p)
430 } else {
431 return Ok(());
433 }
434 }
435 other => {
436 warn!("Unexpected boundary flag: {:?}!", other);
437 return Err(Error::NotSupported);
438 }
439 };
440
441 match header.channel {
442 L2CAP_CID_ATT => {
443 let a = att::Att::decode(pdu.as_ref());
446 if let Ok(att::Att::Client(AttClient::Request(att::AttReq::ExchangeMtu { mtu }))) = a {
447 let mtu = self.connections.exchange_att_mtu(acl.handle(), mtu);
448
449 let rsp = att::Att::Server(AttServer::Response(att::AttRsp::ExchangeMtu { mtu }));
450 let l2cap = L2capHeader {
451 channel: L2CAP_CID_ATT,
452 length: 3,
453 };
454
455 let mut packet = pdu.into_inner();
456 let mut w = WriteCursor::new(packet.as_mut());
457 w.write_hci(&l2cap)?;
458 w.write(rsp)?;
459
460 info!("[host] agreed att MTU of {}", mtu);
461 let len = w.len();
462 self.connections.try_outbound(acl.handle(), Pdu::new(packet, len))?;
463 } else if let Ok(att::Att::Server(AttServer::Response(att::AttRsp::ExchangeMtu { mtu }))) = a {
464 info!("[host] remote agreed att MTU of {}", mtu);
465 self.connections.exchange_att_mtu(acl.handle(), mtu);
466 } else {
467 #[cfg(feature = "gatt")]
468 match a {
469 Ok(att::Att::Client(_)) => {
470 self.connections.post_gatt(acl.handle(), pdu)?;
471 }
472 Ok(att::Att::Server(_)) => {
473 if let Err(e) = self.att_client.try_send((acl.handle(), pdu)) {
474 return Err(Error::OutOfMemory);
475 }
476 }
477 Err(e) => {
478 warn!("Error decoding attribute payload: {:?}", e);
479 }
480 }
481 #[cfg(not(feature = "gatt"))]
482 {
483 if let Ok(att::Att::Client(_)) = a {
484 drop(a);
485
486 let opcode = pdu.as_ref()[0];
487 let rsp = att::Att::Server(AttServer::Response(att::AttRsp::Error {
488 request: opcode,
489 handle: acl.handle().raw(),
490 code: att::AttErrorCode::ATTRIBUTE_NOT_FOUND,
491 }));
492
493 let mut packet = pdu.into_inner();
494 let mut w = WriteCursor::new(packet.as_mut());
495
496 let l2cap = L2capHeader {
497 channel: L2CAP_CID_ATT,
498 length: rsp.size() as u16,
499 };
500
501 w.write_hci(&l2cap)?;
502 w.write(rsp)?;
503
504 let len = w.len();
505 self.connections.try_outbound(acl.handle(), Pdu::new(packet, len))?;
506 warn!("[host] got attribute request but 'gatt' feature is not enabled.");
507 return Ok(());
508 } else {
509 warn!("Got unsupported ATT: {:?}", a);
510 return Err(Error::NotSupported);
511 }
512 }
513 }
514 }
515 L2CAP_CID_LE_U_SIGNAL => {
516 panic!("le signalling channel was fragmented, impossible!");
517 }
518 L2CAP_CID_LE_U_SECURITY_MANAGER => {
519 self.connections
520 .handle_security_channel(acl.handle(), pdu, event_handler)?;
521 }
522 other if other >= L2CAP_CID_DYN_START => match self.channels.dispatch(header.channel, pdu) {
523 Ok(_) => {}
524 Err(e) => {
525 warn!("Error dispatching l2cap packet to channel: {:?}", e);
526 return Err(e);
527 }
528 },
529 chan => {
530 debug!(
531 "[host] conn {:?} attempted to use unsupported l2cap channel {}, ignoring",
532 acl.handle(),
533 chan
534 );
535 return Ok(());
536 }
537 }
538 Ok(())
539 }
540
541 pub(crate) async fn l2cap_signal<D: L2capSignal>(
543 &self,
544 conn: ConnHandle,
545 identifier: u8,
546 signal: &D,
547 p_buf: &mut [u8],
548 ) -> Result<(), BleHostError<T::Error>> {
549 let header = L2capSignalHeader {
555 identifier,
556 code: D::code(),
557 length: signal.size() as u16,
558 };
559 let l2cap = L2capHeader {
560 channel: D::channel(),
561 length: header.size() as u16 + header.length,
562 };
563
564 let mut w = WriteCursor::new(p_buf);
565 w.write_hci(&l2cap)?;
566 w.write_hci(&header)?;
567 w.write_hci(signal)?;
568
569 let mut sender = self.l2cap(conn, w.len() as u16, 1).await?;
570 sender.send(w.finish()).await?;
571
572 Ok(())
573 }
574
575 pub(crate) async fn l2cap(
580 &self,
581 handle: ConnHandle,
582 len: u16,
583 n_packets: u16,
584 ) -> Result<L2capSender<'_, 'd, T, P::Packet>, BleHostError<T::Error>> {
585 let acl_max = self.initialized.get().await.acl_max as u16;
587 let len = len + (4 * n_packets);
588 let n_acl = len.div_ceil(acl_max);
589 let grant = poll_fn(|cx| self.connections.poll_request_to_send(handle, n_acl as usize, Some(cx))).await?;
590 trace!("[host] granted send packets = {}, len = {}", n_packets, len);
591 Ok(L2capSender {
592 controller: &self.controller,
593 handle,
594 grant,
595 fragment_size: acl_max,
596 })
597 }
598
599 pub(crate) fn try_l2cap(
604 &self,
605 handle: ConnHandle,
606 len: u16,
607 n_packets: u16,
608 ) -> Result<L2capSender<'_, 'd, T, P::Packet>, BleHostError<T::Error>> {
609 let acl_max = self.initialized.try_get().map(|i| i.acl_max).unwrap_or(27) as u16;
610 let len = len + (4 * n_packets);
611 let n_acl = len.div_ceil(acl_max);
612 let grant = match self.connections.poll_request_to_send(handle, n_acl as usize, None) {
613 Poll::Ready(res) => res?,
614 Poll::Pending => {
615 return Err(Error::Busy.into());
616 }
617 };
618 Ok(L2capSender {
619 controller: &self.controller,
620 handle,
621 grant,
622 fragment_size: acl_max,
623 })
624 }
625
626 pub(crate) async fn send_conn_param_update_req(
627 &self,
628 handle: ConnHandle,
629 param: &ConnParamUpdateReq,
630 ) -> Result<(), BleHostError<T::Error>> {
631 self.channels.send_conn_param_update_req(handle, self, param).await
632 }
633
634 pub(crate) async fn send_conn_param_update_res(
635 &self,
636 handle: ConnHandle,
637 param: &ConnParamUpdateRes,
638 ) -> Result<(), BleHostError<T::Error>> {
639 self.channels.send_conn_param_update_res(handle, self, param).await
640 }
641
642 pub(crate) fn metrics<F: FnOnce(&HostMetrics) -> R, R>(&self, f: F) -> R {
644 let m = self.metrics.borrow();
645 f(&m)
646 }
647
648 pub(crate) fn log_status(&self, verbose: bool) {
650 let m = self.metrics.borrow();
651 debug!("[host] connect events: {}", m.connect_events);
652 debug!("[host] disconnect events: {}", m.disconnect_events);
653 debug!("[host] rx errors: {}", m.rx_errors);
654 self.connections.log_status(verbose);
655 self.channels.log_status(verbose);
656 }
657}
658
659pub struct Runner<'d, C, P: PacketPool> {
661 rx: RxRunner<'d, C, P>,
662 control: ControlRunner<'d, C, P>,
663 tx: TxRunner<'d, C, P>,
664}
665
666pub struct RxRunner<'d, C, P: PacketPool> {
668 stack: &'d Stack<'d, C, P>,
669}
670
671pub struct ControlRunner<'d, C, P: PacketPool> {
673 stack: &'d Stack<'d, C, P>,
674}
675
676pub struct TxRunner<'d, C, P: PacketPool> {
678 stack: &'d Stack<'d, C, P>,
679}
680
681pub trait EventHandler {
683 fn on_vendor(&self, vendor: &Vendor) {}
685 #[cfg(feature = "scan")]
687 fn on_adv_reports(&self, reports: bt_hci::param::LeAdvReportsIter) {}
688 #[cfg(feature = "scan")]
690 fn on_ext_adv_reports(&self, reports: bt_hci::param::LeExtAdvReportsIter) {}
691}
692
693struct DummyHandler;
694impl EventHandler for DummyHandler {}
695
696impl<'d, C: Controller, P: PacketPool> Runner<'d, C, P> {
697 pub(crate) fn new(stack: &'d Stack<'d, C, P>) -> Self {
698 Self {
699 rx: RxRunner { stack },
700 control: ControlRunner { stack },
701 tx: TxRunner { stack },
702 }
703 }
704
705 pub fn split(self) -> (RxRunner<'d, C, P>, ControlRunner<'d, C, P>, TxRunner<'d, C, P>) {
707 (self.rx, self.control, self.tx)
708 }
709
710 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>>
712 where
713 C: ControllerCmdSync<Disconnect>
714 + ControllerCmdSync<SetEventMask>
715 + ControllerCmdSync<SetEventMaskPage2>
716 + ControllerCmdSync<LeSetEventMask>
717 + ControllerCmdSync<LeSetRandomAddr>
718 + ControllerCmdSync<HostBufferSize>
719 + ControllerCmdAsync<LeConnUpdate>
720 + ControllerCmdSync<LeReadFilterAcceptListSize>
721 + ControllerCmdSync<SetControllerToHostFlowControl>
722 + ControllerCmdSync<Reset>
723 + ControllerCmdSync<LeCreateConnCancel>
724 + ControllerCmdSync<LeSetScanEnable>
725 + ControllerCmdSync<LeSetExtScanEnable>
726 + for<'t> ControllerCmdSync<LeSetAdvEnable>
727 + for<'t> ControllerCmdSync<LeSetExtAdvEnable<'t>>
728 + for<'t> ControllerCmdSync<HostNumberOfCompletedPackets<'t>>
729 + ControllerCmdSync<LeReadBufferSize>
730 + ControllerCmdSync<LeLongTermKeyRequestReply>
731 + ControllerCmdAsync<LeEnableEncryption>
732 + ControllerCmdSync<ReadBdAddr>,
733 {
734 let dummy = DummyHandler;
735 self.run_with_handler(&dummy).await
736 }
737
738 pub async fn run_with_handler<E: EventHandler>(&mut self, event_handler: &E) -> Result<(), BleHostError<C::Error>>
740 where
741 C: ControllerCmdSync<Disconnect>
742 + ControllerCmdSync<SetEventMask>
743 + ControllerCmdSync<SetEventMaskPage2>
744 + ControllerCmdSync<LeSetEventMask>
745 + ControllerCmdSync<LeSetRandomAddr>
746 + ControllerCmdSync<LeReadFilterAcceptListSize>
747 + ControllerCmdSync<HostBufferSize>
748 + ControllerCmdAsync<LeConnUpdate>
749 + ControllerCmdSync<SetControllerToHostFlowControl>
750 + for<'t> ControllerCmdSync<LeSetAdvEnable>
751 + for<'t> ControllerCmdSync<LeSetExtAdvEnable<'t>>
752 + for<'t> ControllerCmdSync<HostNumberOfCompletedPackets<'t>>
753 + ControllerCmdSync<LeSetScanEnable>
754 + ControllerCmdSync<LeSetExtScanEnable>
755 + ControllerCmdSync<Reset>
756 + ControllerCmdSync<LeCreateConnCancel>
757 + ControllerCmdSync<LeReadBufferSize>
758 + ControllerCmdSync<LeLongTermKeyRequestReply>
759 + ControllerCmdAsync<LeEnableEncryption>
760 + ControllerCmdSync<ReadBdAddr>,
761 {
762 let control_fut = self.control.run();
763 let rx_fut = self.rx.run_with_handler(event_handler);
764 let tx_fut = self.tx.run();
765 pin_mut!(control_fut, rx_fut, tx_fut);
766 match select3(&mut tx_fut, &mut rx_fut, &mut control_fut).await {
767 Either3::First(result) => {
768 trace!("[host] tx_fut exit");
769 result
770 }
771 Either3::Second(result) => {
772 trace!("[host] rx_fut exit");
773 result
774 }
775 Either3::Third(result) => {
776 trace!("[host] control_fut exit");
777 result
778 }
779 }
780 }
781}
782
783impl<'d, C: Controller, P: PacketPool> RxRunner<'d, C, P> {
784 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>>
786 where
787 C: ControllerCmdSync<Disconnect>,
788 {
789 let dummy = DummyHandler;
790 self.run_with_handler(&dummy).await
791 }
792
793 pub async fn run_with_handler<E: EventHandler>(&mut self, event_handler: &E) -> Result<(), BleHostError<C::Error>>
796 where
797 C: ControllerCmdSync<Disconnect>,
798 {
799 const MAX_HCI_PACKET_LEN: usize = 259;
800 let host = &self.stack.host;
801 loop {
804 let mut rx = [0u8; MAX_HCI_PACKET_LEN];
806 let result = host.controller.read(&mut rx).await;
812 match result {
815 Ok(ControllerToHostPacket::Acl(acl)) => match host.handle_acl(acl, event_handler) {
816 Ok(_) => {}
817 Err(e) => {
818 warn!(
819 "[host] encountered error processing ACL data for {:?}: {:?}",
820 acl.handle(),
821 e
822 );
823
824 match e {
825 Error::InvalidState | Error::Disconnected => {
826 warn!("[host] requesting {:?} to be disconnected", acl.handle());
827 host.connections.log_status(true);
828 host.connections.request_handle_disconnect(
829 acl.handle(),
830 DisconnectReason::RemoteUserTerminatedConn,
831 );
832 }
833 _ => {}
834 }
835
836 let mut m = host.metrics.borrow_mut();
837 m.rx_errors = m.rx_errors.wrapping_add(1);
838 }
839 },
840 Ok(ControllerToHostPacket::Event(event)) => {
841 match event.kind {
842 EventKind::Le => {
843 let event = unwrap!(LeEventPacket::from_hci_bytes_complete(event.data));
844 match event.kind {
845 LeEventKind::LeConnectionComplete => {
846 let e = unwrap!(LeConnectionComplete::from_hci_bytes_complete(event.data));
847 if !host.handle_connection(
848 e.status,
849 e.handle,
850 e.peer_addr_kind,
851 e.peer_addr,
852 e.role,
853 ) {
854 let _ = host
855 .command(Disconnect::new(
856 e.handle,
857 DisconnectReason::RemoteDeviceTerminatedConnLowResources,
858 ))
859 .await;
860 host.connect_command_state.canceled();
861 }
862 }
863 LeEventKind::LeEnhancedConnectionComplete => {
864 let e = unwrap!(LeEnhancedConnectionComplete::from_hci_bytes_complete(event.data));
865 if !host.handle_connection(
866 e.status,
867 e.handle,
868 e.peer_addr_kind,
869 e.peer_addr,
870 e.role,
871 ) {
872 let _ = host
873 .command(Disconnect::new(
874 e.handle,
875 DisconnectReason::RemoteDeviceTerminatedConnLowResources,
876 ))
877 .await;
878 host.connect_command_state.canceled();
879 }
880 }
881 LeEventKind::LeScanTimeout => {}
882 LeEventKind::LeAdvertisingSetTerminated => {
883 let set = unwrap!(LeAdvertisingSetTerminated::from_hci_bytes_complete(event.data));
884 host.advertise_state.terminate(set.adv_handle);
885 }
886 LeEventKind::LeExtendedAdvertisingReport => {
887 #[cfg(feature = "scan")]
888 {
889 let data =
890 unwrap!(LeExtendedAdvertisingReport::from_hci_bytes_complete(event.data));
891 event_handler.on_ext_adv_reports(data.reports.iter());
892 }
893 }
894 LeEventKind::LeAdvertisingReport => {
895 #[cfg(feature = "scan")]
896 {
897 let data = unwrap!(LeAdvertisingReport::from_hci_bytes_complete(event.data));
898 event_handler.on_adv_reports(data.reports.iter());
899 }
900 }
901 LeEventKind::LeLongTermKeyRequest => {
902 host.connections.handle_security_hci_le_event(event)?;
903 }
904 LeEventKind::LePhyUpdateComplete => {
905 let event = unwrap!(LePhyUpdateComplete::from_hci_bytes_complete(event.data));
906 if let Err(e) = event.status.to_result() {
907 warn!("[host] error updating phy for {:?}: {:?}", event.handle, e);
908 } else {
909 let _ = host.connections.post_handle_event(
910 event.handle,
911 ConnectionEvent::PhyUpdated {
912 tx_phy: event.tx_phy,
913 rx_phy: event.rx_phy,
914 },
915 );
916 }
917 }
918 LeEventKind::LeConnectionUpdateComplete => {
919 let event =
920 unwrap!(LeConnectionUpdateComplete::from_hci_bytes_complete(event.data));
921 if let Err(e) = event.status.to_result() {
922 warn!(
923 "[host] error updating connection parameters for {:?}: {:?}",
924 event.handle, e
925 );
926 } else {
927 let _ = host.connections.post_handle_event(
928 event.handle,
929 ConnectionEvent::ConnectionParamsUpdated {
930 conn_interval: Duration::from_micros(event.conn_interval.as_micros()),
931 peripheral_latency: event.peripheral_latency,
932 supervision_timeout: Duration::from_micros(
933 event.supervision_timeout.as_micros(),
934 ),
935 },
936 );
937 }
938 }
939 LeEventKind::LeDataLengthChange => {
940 let event = unwrap!(LeDataLengthChange::from_hci_bytes_complete(event.data));
941 let _ = host.connections.post_handle_event(
942 event.handle,
943 ConnectionEvent::DataLengthUpdated {
944 max_tx_octets: event.max_tx_octets,
945 max_tx_time: event.max_tx_time,
946 max_rx_octets: event.max_rx_octets,
947 max_rx_time: event.max_rx_time,
948 },
949 );
950 }
951 LeEventKind::LeRemoteConnectionParameterRequest => {
952 let event = unwrap!(LeRemoteConnectionParameterRequest::from_hci_bytes_complete(
953 event.data
954 ));
955 let _ = host.connections.post_handle_event(
956 event.handle,
957 ConnectionEvent::RequestConnectionParams {
958 min_connection_interval: Duration::from_micros(
959 event.interval_min.as_micros(),
960 ),
961 max_connection_interval: Duration::from_micros(
962 event.interval_min.as_micros(),
963 ),
964 max_latency: event.max_latency,
965 supervision_timeout: Duration::from_micros(event.timeout.as_micros()),
966 },
967 );
968 }
969 _ => {
970 warn!("Unknown LE event!");
971 }
972 }
973 }
974 EventKind::DisconnectionComplete => {
975 let e = unwrap!(DisconnectionComplete::from_hci_bytes_complete(event.data));
976 let handle = e.handle;
977 let reason = if let Err(e) = e.status.to_result() {
978 info!("[host] disconnection event on handle {}, status: {:?}", handle.raw(), e);
979 None
980 } else if let Err(err) = e.reason.to_result() {
981 info!(
982 "[host] disconnection event on handle {}, reason: {:?}",
983 handle.raw(),
984 err
985 );
986 Some(e.reason)
987 } else {
988 info!("[host] disconnection event on handle {}", handle.raw());
989 None
990 }
991 .unwrap_or(Status::UNSPECIFIED);
992 let _ = host.connections.disconnected(handle, reason);
993 let _ = host.channels.disconnected(handle);
994 let mut m = host.metrics.borrow_mut();
995 m.disconnect_events = m.disconnect_events.wrapping_add(1);
996 }
997 EventKind::NumberOfCompletedPackets => {
998 let c = unwrap!(NumberOfCompletedPackets::from_hci_bytes_complete(event.data));
999 for entry in c.completed_packets.iter() {
1001 match (entry.handle(), entry.num_completed_packets()) {
1002 (Ok(handle), Ok(completed)) => {
1003 let _ = host.connections.confirm_sent(handle, completed as usize);
1004 }
1005 (Ok(handle), Err(e)) => {
1006 warn!("[host] error processing completed packets for {:?}: {:?}", handle, e);
1007 }
1008 _ => {}
1009 }
1010 }
1011 }
1012 EventKind::Vendor => {
1013 let vendor = unwrap!(Vendor::from_hci_bytes_complete(event.data));
1014 event_handler.on_vendor(&vendor);
1015 }
1016 EventKind::EncryptionChangeV1 => {
1017 host.connections.handle_security_hci_event(event)?;
1018 }
1019 _ => {}
1021 }
1022 }
1023 Ok(_) => {}
1025 Err(e) => {
1026 return Err(BleHostError::Controller(e));
1027 }
1028 }
1029 }
1030 }
1031}
1032
1033impl<'d, C: Controller, P: PacketPool> ControlRunner<'d, C, P> {
1034 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>>
1036 where
1037 C: ControllerCmdSync<Disconnect>
1038 + ControllerCmdSync<SetEventMask>
1039 + ControllerCmdSync<SetEventMaskPage2>
1040 + ControllerCmdSync<LeSetEventMask>
1041 + ControllerCmdSync<LeSetRandomAddr>
1042 + ControllerCmdSync<HostBufferSize>
1043 + ControllerCmdAsync<LeConnUpdate>
1044 + ControllerCmdSync<LeReadFilterAcceptListSize>
1045 + ControllerCmdSync<SetControllerToHostFlowControl>
1046 + ControllerCmdSync<Reset>
1047 + ControllerCmdSync<LeCreateConnCancel>
1048 + for<'t> ControllerCmdSync<LeSetAdvEnable>
1049 + for<'t> ControllerCmdSync<LeSetExtAdvEnable<'t>>
1050 + ControllerCmdSync<LeSetScanEnable>
1051 + ControllerCmdSync<LeSetExtScanEnable>
1052 + for<'t> ControllerCmdSync<HostNumberOfCompletedPackets<'t>>
1053 + ControllerCmdSync<LeReadBufferSize>
1054 + ControllerCmdSync<LeLongTermKeyRequestReply>
1055 + ControllerCmdAsync<LeEnableEncryption>
1056 + ControllerCmdSync<ReadBdAddr>,
1057 {
1058 let host = &self.stack.host;
1059 Reset::new().exec(&host.controller).await?;
1060
1061 if let Some(addr) = host.address {
1062 LeSetRandomAddr::new(addr.addr).exec(&host.controller).await?;
1063 }
1064
1065 SetEventMask::new(
1066 EventMask::new()
1067 .enable_le_meta(true)
1068 .enable_conn_request(true)
1069 .enable_conn_complete(true)
1070 .enable_hardware_error(true)
1071 .enable_disconnection_complete(true)
1072 .enable_encryption_change_v1(true),
1073 )
1074 .exec(&host.controller)
1075 .await?;
1076
1077 SetEventMaskPage2::new(EventMaskPage2::new().enable_encryption_change_v2(true))
1078 .exec(&host.controller)
1079 .await?;
1080
1081 LeSetEventMask::new(
1082 LeEventMask::new()
1083 .enable_le_conn_complete(true)
1084 .enable_le_enhanced_conn_complete(true)
1085 .enable_le_conn_update_complete(true)
1086 .enable_le_adv_set_terminated(true)
1087 .enable_le_adv_report(true)
1088 .enable_le_scan_timeout(true)
1089 .enable_le_ext_adv_report(true)
1090 .enable_le_long_term_key_request(true)
1091 .enable_le_phy_update_complete(true)
1092 .enable_le_remote_conn_parameter_request(true)
1093 .enable_le_data_length_change(true),
1094 )
1095 .exec(&host.controller)
1096 .await?;
1097
1098 info!(
1099 "[host] using packet pool with MTU {} capacity {}",
1100 P::MTU,
1101 P::capacity(),
1102 );
1103
1104 let ret = LeReadFilterAcceptListSize::new().exec(&host.controller).await?;
1105 info!("[host] filter accept list size: {}", ret);
1106
1107 let ret = LeReadBufferSize::new().exec(&host.controller).await?;
1108 info!(
1109 "[host] setting txq to {}, fragmenting at {}",
1110 ret.total_num_le_acl_data_packets as usize, ret.le_acl_data_packet_length as usize
1111 );
1112 host.connections
1113 .set_link_credits(ret.total_num_le_acl_data_packets as usize);
1114
1115 const ACL_LEN: u16 = 255;
1116 const ACL_N: u16 = 1;
1117 info!(
1118 "[host] configuring host buffers ({} packets of size {})",
1119 ACL_N, ACL_LEN,
1120 );
1121 HostBufferSize::new(ACL_LEN, 0, ACL_N, 0).exec(&host.controller).await?;
1122
1123 let _ = host.initialized.init(InitialState {
1134 acl_max: ret.le_acl_data_packet_length as usize,
1135 });
1136 info!("[host] initialized");
1137
1138 let device_address = host.command(ReadBdAddr::new()).await?;
1139 if *device_address.raw() != [0, 0, 0, 0, 0, 0] {
1140 let device_address = Address {
1141 kind: AddrKind::PUBLIC,
1142 addr: device_address,
1143 };
1144 info!("[host] Device Address {}", device_address);
1145 if host.address.is_none() {
1146 #[cfg(feature = "security")]
1147 host.connections.security_manager.set_local_address(device_address);
1148 }
1149 }
1150
1151 loop {
1152 match select3(
1153 poll_fn(|cx| host.connections.poll_disconnecting(Some(cx))),
1154 poll_fn(|cx| host.channels.poll_disconnecting(Some(cx))),
1155 select4(
1156 poll_fn(|cx| host.connect_command_state.poll_cancelled(cx)),
1157 poll_fn(|cx| host.advertise_command_state.poll_cancelled(cx)),
1158 poll_fn(|cx| host.scan_command_state.poll_cancelled(cx)),
1159 #[cfg(feature = "security")]
1160 {
1161 host.connections.poll_security_events()
1162 },
1163 #[cfg(not(feature = "security"))]
1164 {
1165 poll_fn(|cx| Poll::<()>::Pending)
1166 },
1167 ),
1168 )
1169 .await
1170 {
1171 Either3::First(request) => {
1172 trace!("[host] poll disconnecting links");
1173 match host.command(Disconnect::new(request.handle(), request.reason())).await {
1174 Ok(_) => {}
1175 Err(BleHostError::BleHost(Error::Hci(bt_hci::param::Error::UNKNOWN_CONN_IDENTIFIER))) => {}
1176 Err(e) => {
1177 return Err(e);
1178 }
1179 }
1180 request.confirm();
1181 }
1182 Either3::Second(request) => {
1183 trace!("[host] poll disconnecting channels");
1184 match request.send(host).await {
1185 Ok(_) => {}
1186 Err(BleHostError::BleHost(Error::Hci(bt_hci::param::Error::UNKNOWN_CONN_IDENTIFIER))) => {}
1187 Err(BleHostError::BleHost(Error::NotFound)) => {}
1188 Err(e) => {
1189 return Err(e);
1190 }
1191 }
1192 request.confirm();
1193 }
1194 Either3::Third(states) => match states {
1195 Either4::First(_) => {
1196 trace!("[host] cancel connection create");
1197 if host.command(LeCreateConnCancel::new()).await.is_err() {
1199 warn!("[host] error cancelling connection");
1200 }
1201 host.connect_command_state.canceled();
1203 }
1204 Either4::Second(ext) => {
1205 trace!("[host] disabling advertising");
1206 if ext {
1207 host.command(LeSetExtAdvEnable::new(false, &[])).await?
1208 } else {
1209 host.command(LeSetAdvEnable::new(false)).await?
1210 }
1211 host.advertise_command_state.canceled();
1212 }
1213 Either4::Third(ext) => {
1214 trace!("[host] disabling scanning");
1215 if ext {
1216 host.command(LeSetExtScanEnable::new(
1218 false,
1219 FilterDuplicates::Disabled,
1220 bt_hci::param::Duration::from_secs(0),
1221 bt_hci::param::Duration::from_secs(0),
1222 ))
1223 .await?;
1224 } else {
1225 host.command(LeSetScanEnable::new(false, false)).await?;
1226 }
1227 host.scan_command_state.canceled();
1228 }
1229 Either4::Fourth(request) => {
1230 #[cfg(feature = "security")]
1231 {
1232 let event_data = request.unwrap_or(SecurityEventData::Timeout);
1233 host.connections.handle_security_event(host, event_data).await?;
1234 }
1235 }
1236 },
1237 }
1238 }
1239 }
1240}
1241
1242impl<'d, C: Controller, P: PacketPool> TxRunner<'d, C, P> {
1243 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>> {
1245 let host = &self.stack.host;
1246 let params = host.initialized.get().await;
1247 loop {
1248 let (conn, pdu) = host.connections.outbound().await;
1249 match host.l2cap(conn, pdu.len() as u16, 1).await {
1250 Ok(mut sender) => {
1251 if let Err(e) = sender.send(pdu.as_ref()).await {
1252 warn!("[host] error sending outbound pdu");
1253 return Err(e);
1254 }
1255 }
1256 Err(BleHostError::BleHost(Error::NotFound)) => {
1257 warn!("[host] unable to send data to disconnected host (ignored)");
1258 }
1259 Err(BleHostError::BleHost(Error::Disconnected)) => {
1260 warn!("[host] unable to send data to disconnected host (ignored)");
1261 }
1262 Err(e) => {
1263 warn!("[host] error requesting sending outbound pdu");
1264 return Err(e);
1265 }
1266 }
1267 }
1268 }
1269}
1270
1271pub struct L2capSender<'a, 'd, T: Controller, P> {
1272 pub(crate) controller: &'a T,
1273 pub(crate) handle: ConnHandle,
1274 pub(crate) grant: PacketGrant<'a, 'd, P>,
1275 pub(crate) fragment_size: u16,
1276}
1277
1278impl<'a, 'd, T: Controller, P> L2capSender<'a, 'd, T, P> {
1279 pub(crate) fn try_send(&mut self, pdu: &[u8]) -> Result<(), BleHostError<T::Error>>
1280 where
1281 T: blocking::Controller,
1282 {
1283 let mut pbf = AclPacketBoundary::FirstNonFlushable;
1284 for chunk in pdu.chunks(self.fragment_size as usize) {
1290 let acl = AclPacket::new(self.handle, pbf, AclBroadcastFlag::PointToPoint, chunk);
1291 match self.controller.try_write_acl_data(&acl) {
1292 Ok(result) => {
1293 self.grant.confirm(1);
1294 trace!("[host] sent acl packet len = {}", chunk.len());
1295 }
1296 Err(blocking::TryError::Busy) => {
1297 warn!("hci: acl data send busy");
1298 return Err(Error::Busy.into());
1299 }
1300 Err(blocking::TryError::Error(e)) => return Err(BleHostError::Controller(e)),
1301 }
1302 pbf = AclPacketBoundary::Continuing;
1303 }
1304 Ok(())
1305 }
1306
1307 pub(crate) async fn send(&mut self, pdu: &[u8]) -> Result<(), BleHostError<T::Error>> {
1308 let mut pbf = AclPacketBoundary::FirstNonFlushable;
1314 for chunk in pdu.chunks(self.fragment_size as usize) {
1315 let acl = AclPacket::new(self.handle, pbf, AclBroadcastFlag::PointToPoint, chunk);
1316 self.controller
1317 .write_acl_data(&acl)
1318 .await
1319 .map_err(BleHostError::Controller)?;
1320 self.grant.confirm(1);
1321 pbf = AclPacketBoundary::Continuing;
1322 trace!("[host] sent acl packet len = {}", chunk.len());
1323 }
1324 Ok(())
1325 }
1326}
1327
1328#[must_use = "to delay the drop handler invocation to the end of the scope"]
1330pub struct OnDrop<F: FnOnce()> {
1331 f: MaybeUninit<F>,
1332}
1333
1334impl<F: FnOnce()> OnDrop<F> {
1335 pub fn new(f: F) -> Self {
1337 Self { f: MaybeUninit::new(f) }
1338 }
1339
1340 pub fn defuse(self) {
1342 core::mem::forget(self)
1343 }
1344}
1345
1346impl<F: FnOnce()> Drop for OnDrop<F> {
1347 fn drop(&mut self) {
1348 unsafe { self.f.as_ptr().read()() }
1349 }
1350}