1use core::cell::RefCell;
5use core::future::poll_fn;
6use core::mem::MaybeUninit;
7use core::task::Poll;
8
9use bt_hci::cmd::controller_baseband::{
10 HostBufferSize, HostNumberOfCompletedPackets, Reset, SetControllerToHostFlowControl, SetEventMask,
11 SetEventMaskPage2,
12};
13use bt_hci::cmd::info::ReadBdAddr;
14use bt_hci::cmd::le::{
15 LeConnUpdate, LeCreateConnCancel, LeEnableEncryption, LeLongTermKeyRequestReply, LeReadBufferSize,
16 LeReadFilterAcceptListSize, LeSetAdvEnable, LeSetEventMask, LeSetExtAdvEnable, LeSetExtScanEnable, LeSetRandomAddr,
17 LeSetScanEnable,
18};
19use bt_hci::cmd::link_control::Disconnect;
20use bt_hci::cmd::{AsyncCmd, SyncCmd};
21use bt_hci::controller::{blocking, Controller, ControllerCmdAsync, ControllerCmdSync};
22use bt_hci::data::{AclBroadcastFlag, AclPacket, AclPacketBoundary};
23#[cfg(feature = "scan")]
24use bt_hci::event::le::LeAdvertisingReport;
25#[cfg(feature = "scan")]
26use bt_hci::event::le::LeExtendedAdvertisingReport;
27use bt_hci::event::le::{
28 LeAdvertisingSetTerminated, LeConnectionComplete, LeConnectionUpdateComplete, LeDataLengthChange,
29 LeEnhancedConnectionComplete, LeEventKind, LeEventPacket, LePhyUpdateComplete, LeRemoteConnectionParameterRequest,
30};
31use bt_hci::event::{DisconnectionComplete, EventKind, NumberOfCompletedPackets, Vendor};
32use bt_hci::param::{
33 AddrKind, AdvHandle, AdvSet, BdAddr, ConnHandle, DisconnectReason, EventMask, EventMaskPage2, FilterDuplicates,
34 LeConnRole, LeEventMask, Status,
35};
36use bt_hci::{ControllerToHostPacket, FromHciBytes, WriteHci};
37use embassy_futures::select::{select3, select4, Either3, Either4};
38use embassy_sync::once_lock::OnceLock;
39use embassy_sync::waitqueue::WakerRegistration;
40use embassy_time::Duration;
41use futures::pin_mut;
42
43use crate::att::{AttClient, AttServer};
44use crate::channel_manager::{ChannelManager, ChannelStorage};
45use crate::command::CommandState;
46use crate::connection::ConnectionEvent;
47use crate::connection_manager::{ConnectionManager, ConnectionStorage, PacketGrant};
48use crate::cursor::WriteCursor;
49use crate::pdu::Pdu;
50#[cfg(feature = "security")]
51use crate::security_manager::SecurityEventData;
52use crate::types::l2cap::{
53 ConnParamUpdateReq, ConnParamUpdateRes, L2capHeader, L2capSignal, L2capSignalHeader, L2CAP_CID_ATT,
54 L2CAP_CID_DYN_START, L2CAP_CID_LE_U_SECURITY_MANAGER, L2CAP_CID_LE_U_SIGNAL,
55};
56use crate::{att, Address, BleHostError, Error, PacketPool, Stack};
57
58pub(crate) struct BleHost<'d, T, P: PacketPool> {
66 initialized: OnceLock<InitialState>,
67 metrics: RefCell<HostMetrics>,
68 pub(crate) address: Option<Address>,
69 pub(crate) controller: T,
70 pub(crate) connections: ConnectionManager<'d, P>,
71 pub(crate) channels: ChannelManager<'d, P>,
72 pub(crate) advertise_state: AdvState<'d>,
73 pub(crate) advertise_command_state: CommandState<bool>,
74 pub(crate) connect_command_state: CommandState<bool>,
75 pub(crate) scan_command_state: CommandState<bool>,
76}
77
78#[derive(Clone, Copy)]
79pub(crate) struct InitialState {
80 acl_max: usize,
81}
82
83#[cfg_attr(feature = "defmt", derive(defmt::Format))]
84#[derive(Clone, Copy, Debug)]
85pub(crate) enum AdvHandleState {
86 None,
87 Advertising(AdvHandle),
88 Terminated(AdvHandle),
89}
90
91pub(crate) struct AdvInnerState<'d> {
92 handles: &'d mut [AdvHandleState],
93 waker: WakerRegistration,
94}
95
96pub(crate) struct AdvState<'d> {
97 state: RefCell<AdvInnerState<'d>>,
98}
99
100impl<'d> AdvState<'d> {
101 pub(crate) fn new(handles: &'d mut [AdvHandleState]) -> Self {
102 Self {
103 state: RefCell::new(AdvInnerState {
104 handles,
105 waker: WakerRegistration::new(),
106 }),
107 }
108 }
109
110 pub(crate) fn reset(&self) {
111 let mut state = self.state.borrow_mut();
112 for entry in state.handles.iter_mut() {
113 *entry = AdvHandleState::None;
114 }
115 state.waker.wake();
116 }
117
118 pub(crate) fn terminate(&self, handle: AdvHandle) {
120 let mut state = self.state.borrow_mut();
121 for entry in state.handles.iter_mut() {
122 match entry {
123 AdvHandleState::Advertising(h) if *h == handle => {
124 *entry = AdvHandleState::Terminated(handle);
125 }
126 _ => {}
127 }
128 }
129 state.waker.wake();
130 }
131
132 pub(crate) fn len(&self) -> usize {
133 let state = self.state.borrow();
134 state.handles.len()
135 }
136
137 pub(crate) fn start(&self, sets: &[AdvSet]) {
138 let mut state = self.state.borrow_mut();
139 assert!(sets.len() <= state.handles.len());
140 for handle in state.handles.iter_mut() {
141 *handle = AdvHandleState::None;
142 }
143
144 for (idx, entry) in sets.iter().enumerate() {
145 state.handles[idx] = AdvHandleState::Advertising(entry.adv_handle);
146 }
147 }
148
149 pub async fn wait(&self) {
150 poll_fn(|cx| {
151 let mut state = self.state.borrow_mut();
152 state.waker.register(cx.waker());
153
154 let mut terminated = 0;
155 for entry in state.handles.iter() {
156 match entry {
157 AdvHandleState::Terminated(_) => {
158 terminated += 1;
159 }
160 AdvHandleState::None => {
161 terminated += 1;
162 }
163 _ => {}
164 }
165 }
166 if terminated == state.handles.len() {
167 Poll::Ready(())
168 } else {
169 Poll::Pending
170 }
171 })
172 .await;
173 }
174}
175
176#[derive(Default, Clone)]
178pub struct HostMetrics {
179 pub connect_events: u32,
181 pub disconnect_events: u32,
183 pub rx_errors: u32,
185}
186
187impl<'d, T, P> BleHost<'d, T, P>
188where
189 T: Controller,
190 P: PacketPool,
191{
192 #[allow(clippy::too_many_arguments)]
197 pub(crate) fn new(
198 controller: T,
199 connections: &'d mut [ConnectionStorage<P::Packet>],
200 channels: &'d mut [ChannelStorage<P::Packet>],
201 advertise_handles: &'d mut [AdvHandleState],
202 ) -> Self {
203 Self {
204 address: None,
205 initialized: OnceLock::new(),
206 metrics: RefCell::new(HostMetrics::default()),
207 controller,
208 connections: ConnectionManager::new(connections, P::MTU as u16 - 4),
209 channels: ChannelManager::new(channels),
210 advertise_state: AdvState::new(advertise_handles),
211 advertise_command_state: CommandState::new(),
212 scan_command_state: CommandState::new(),
213 connect_command_state: CommandState::new(),
214 }
215 }
216
217 pub(crate) async fn command<C>(&self, cmd: C) -> Result<C::Return, BleHostError<T::Error>>
219 where
220 C: SyncCmd,
221 T: ControllerCmdSync<C>,
222 {
223 let _ = self.initialized.get().await;
224 let ret = cmd.exec(&self.controller).await?;
225 Ok(ret)
226 }
227
228 pub(crate) async fn async_command<C>(&self, cmd: C) -> Result<(), BleHostError<T::Error>>
230 where
231 C: AsyncCmd,
232 T: ControllerCmdAsync<C>,
233 {
234 let _ = self.initialized.get().await;
235 cmd.exec(&self.controller).await?;
236 Ok(())
237 }
238
239 fn handle_connection(
240 &self,
241 status: Status,
242 handle: ConnHandle,
243 peer_addr_kind: AddrKind,
244 peer_addr: BdAddr,
245 role: LeConnRole,
246 ) -> bool {
247 match status.to_result() {
248 Ok(_) => {
249 if let Err(err) = self.connections.connect(handle, peer_addr_kind, peer_addr, role) {
250 warn!("Error establishing connection: {:?}", err);
251 return false;
252 } else {
253 #[cfg(feature = "defmt")]
254 debug!(
255 "[host] connection with handle {:?} established to {:02x}",
256 handle, peer_addr
257 );
258
259 #[cfg(feature = "log")]
260 debug!(
261 "[host] connection with handle {:?} established to {:02x?}",
262 handle, peer_addr
263 );
264 let mut m = self.metrics.borrow_mut();
265 m.connect_events = m.connect_events.wrapping_add(1);
266 }
267 }
268 Err(bt_hci::param::Error::ADV_TIMEOUT) => {
269 self.advertise_state.reset();
270 }
271 Err(bt_hci::param::Error::UNKNOWN_CONN_IDENTIFIER) => {
272 warn!("[host] connect cancelled");
273 self.connect_command_state.canceled();
274 }
275 Err(e) => {
276 warn!("Error connection complete event: {:?}", e);
277 self.connect_command_state.canceled();
278 }
279 }
280 true
281 }
282
283 fn handle_acl(&self, acl: AclPacket<'_>, event_handler: &dyn EventHandler) -> Result<(), Error> {
284 self.connections.received(acl.handle())?;
285 let handle = acl.handle();
286 let (header, pdu) = match acl.boundary_flag() {
287 AclPacketBoundary::FirstFlushable => {
288 let (header, data) = L2capHeader::from_hci_bytes(acl.data())?;
289
290 if header.channel < L2CAP_CID_DYN_START
292 && !(&[L2CAP_CID_LE_U_SIGNAL, L2CAP_CID_ATT, L2CAP_CID_LE_U_SECURITY_MANAGER]
293 .contains(&header.channel))
294 {
295 warn!("[host] unsupported l2cap channel id {}", header.channel);
296 return Err(Error::NotSupported);
297 }
298
299 if header.channel == L2CAP_CID_LE_U_SIGNAL {
301 assert!(data.len() == header.length as usize);
302 self.channels.signal(acl.handle(), data, &self.connections)?;
303 return Ok(());
304 }
305
306 trace!(
307 "[host] inbound l2cap header channel = {}, fragment len = {}, total = {}",
308 header.channel,
309 data.len(),
310 header.length
311 );
312
313 if header.length as usize != data.len() {
315 #[cfg(feature = "l2cap-sdu-reassembly-optimization")]
317 if header.channel >= L2CAP_CID_DYN_START {
318 self.channels.received(header.channel, 1)?;
320
321 self.connections.reassembly(acl.handle(), |p| {
322 let r = if !p.in_progress() {
323 let (first, payload) = data.split_at(2);
325 let len: u16 = u16::from_le_bytes([first[0], first[1]]);
326 let Some(packet) = P::allocate() else {
327 warn!("[host] no memory for packets on channel {}", header.channel);
328 return Err(Error::OutOfMemory);
329 };
330 p.init(header.channel, len, packet)?;
331 p.update(payload)?
332 } else {
333 p.update(data)?
334 };
335 if r.is_some() {
337 Err(Error::InvalidState)
338 } else {
339 Ok(())
340 }
341 })?;
342 return Ok(());
343 }
344
345 let Some(packet) = P::allocate() else {
346 warn!("[host] no memory for packets on channel {}", header.channel);
347 return Err(Error::OutOfMemory);
348 };
349 self.connections.reassembly(acl.handle(), |p| {
350 p.init(header.channel, header.length, packet)?;
351 let r = p.update(data)?;
352 if r.is_some() {
353 Err(Error::InvalidState)
354 } else {
355 Ok(())
356 }
357 })?;
358 return Ok(());
359 } else {
360 #[allow(unused_mut)]
361 let mut result = None;
362
363 #[cfg(feature = "l2cap-sdu-reassembly-optimization")]
364 if header.channel >= L2CAP_CID_DYN_START {
365 self.channels.received(header.channel, 1)?;
367
368 if let Some((state, pdu)) = self.connections.reassembly(acl.handle(), |p| {
369 if !p.in_progress() {
370 let (first, payload) = data.split_at(2);
371 let len: u16 = u16::from_le_bytes([first[0], first[1]]);
372
373 let Some(packet) = P::allocate() else {
374 warn!("[host] no memory for packets on channel {}", header.channel);
375 return Err(Error::OutOfMemory);
376 };
377 p.init(header.channel, len, packet)?;
378 p.update(payload)
379 } else {
380 p.update(data)
381 }
382 })? {
383 result.replace((state, pdu));
384 } else {
385 return Ok(());
386 }
387 }
388
389 if let Some((state, pdu)) = result {
390 (state, pdu)
391 } else {
392 let Some(packet) = P::allocate() else {
393 warn!("[host] no memory for packets on channel {}", header.channel);
394 return Err(Error::OutOfMemory);
395 };
396 let result = self.connections.reassembly(acl.handle(), |p| {
397 p.init(header.channel, header.length, packet)?;
398 p.update(data)
399 })?;
400 let Some((state, pdu)) = result else {
401 return Err(Error::InvalidState);
402 };
403 (state, pdu)
404 }
405 }
406 }
407 AclPacketBoundary::Continuing => {
409 trace!("[host] inbound l2cap len = {}", acl.data().len(),);
410 if let Some((header, p)) = self.connections.reassembly(acl.handle(), |p| {
412 if !p.in_progress() {
413 warn!(
414 "[host] unexpected continuation fragment of length {} for handle {}: {:?}",
415 acl.data().len(),
416 acl.handle().raw(),
417 p
418 );
419 return Err(Error::InvalidState);
420 }
421 p.update(acl.data())
422 })? {
423 (header, p)
424 } else {
425 return Ok(());
427 }
428 }
429 other => {
430 warn!("Unexpected boundary flag: {:?}!", other);
431 return Err(Error::NotSupported);
432 }
433 };
434
435 match header.channel {
436 L2CAP_CID_ATT => {
437 let a = att::Att::decode(pdu.as_ref());
440 if let Ok(att::Att::Client(AttClient::Request(att::AttReq::ExchangeMtu { mtu }))) = a {
441 let mtu = self.connections.exchange_att_mtu(acl.handle(), mtu);
442
443 let rsp = att::Att::Server(AttServer::Response(att::AttRsp::ExchangeMtu { mtu }));
444 let l2cap = L2capHeader {
445 channel: L2CAP_CID_ATT,
446 length: 3,
447 };
448
449 let mut packet = pdu.into_inner();
450 let mut w = WriteCursor::new(packet.as_mut());
451 w.write_hci(&l2cap)?;
452 w.write(rsp)?;
453
454 info!("[host] agreed att MTU of {}", mtu);
455 let len = w.len();
456 self.connections.try_outbound(acl.handle(), Pdu::new(packet, len))?;
457 } else if let Ok(att::Att::Server(AttServer::Response(att::AttRsp::ExchangeMtu { mtu }))) = a {
458 info!("[host] remote agreed att MTU of {}", mtu);
459 self.connections.exchange_att_mtu(acl.handle(), mtu);
460 } else {
461 #[cfg(feature = "gatt")]
462 match a {
463 Ok(att::Att::Client(_)) => {
464 self.connections.post_gatt(acl.handle(), pdu)?;
465 }
466 Ok(att::Att::Server(_)) => {
467 if let Err(e) = self.connections.post_gatt_client(acl.handle(), pdu) {
468 return Err(Error::OutOfMemory);
469 }
470 }
471 Err(e) => {
472 warn!("Error decoding attribute payload: {:?}", e);
473 }
474 }
475 #[cfg(not(feature = "gatt"))]
476 {
477 if let Ok(att::Att::Client(_)) = a {
478 drop(a);
479
480 let opcode = pdu.as_ref()[0];
481 let rsp = att::Att::Server(AttServer::Response(att::AttRsp::Error {
482 request: opcode,
483 handle: acl.handle().raw(),
484 code: att::AttErrorCode::ATTRIBUTE_NOT_FOUND,
485 }));
486
487 let mut packet = pdu.into_inner();
488 let mut w = WriteCursor::new(packet.as_mut());
489
490 let l2cap = L2capHeader {
491 channel: L2CAP_CID_ATT,
492 length: rsp.size() as u16,
493 };
494
495 w.write_hci(&l2cap)?;
496 w.write(rsp)?;
497
498 let len = w.len();
499 self.connections.try_outbound(acl.handle(), Pdu::new(packet, len))?;
500 warn!("[host] got attribute request but 'gatt' feature is not enabled.");
501 return Ok(());
502 } else {
503 warn!("Got unsupported ATT: {:?}", a);
504 return Err(Error::NotSupported);
505 }
506 }
507 }
508 }
509 L2CAP_CID_LE_U_SIGNAL => {
510 panic!("le signalling channel was fragmented, impossible!");
511 }
512 L2CAP_CID_LE_U_SECURITY_MANAGER => {
513 self.connections
514 .handle_security_channel(acl.handle(), pdu, event_handler)?;
515 }
516 other if other >= L2CAP_CID_DYN_START => match self.channels.dispatch(header.channel, pdu) {
517 Ok(_) => {}
518 Err(e) => {
519 warn!("Error dispatching l2cap packet to channel: {:?}", e);
520 return Err(e);
521 }
522 },
523 chan => {
524 debug!(
525 "[host] conn {:?} attempted to use unsupported l2cap channel {}, ignoring",
526 acl.handle(),
527 chan
528 );
529 return Ok(());
530 }
531 }
532 Ok(())
533 }
534
535 pub(crate) async fn l2cap_signal<D: L2capSignal>(
537 &self,
538 conn: ConnHandle,
539 identifier: u8,
540 signal: &D,
541 p_buf: &mut [u8],
542 ) -> Result<(), BleHostError<T::Error>> {
543 let header = L2capSignalHeader {
549 identifier,
550 code: D::code(),
551 length: signal.size() as u16,
552 };
553 let l2cap = L2capHeader {
554 channel: D::channel(),
555 length: header.size() as u16 + header.length,
556 };
557
558 let mut w = WriteCursor::new(p_buf);
559 w.write_hci(&l2cap)?;
560 w.write_hci(&header)?;
561 w.write_hci(signal)?;
562
563 let mut sender = self.l2cap(conn, w.len() as u16, 1).await?;
564 sender.send(w.finish()).await?;
565
566 Ok(())
567 }
568
569 pub(crate) async fn l2cap(
574 &self,
575 handle: ConnHandle,
576 len: u16,
577 n_packets: u16,
578 ) -> Result<L2capSender<'_, 'd, T, P::Packet>, BleHostError<T::Error>> {
579 let acl_max = self.initialized.get().await.acl_max as u16;
581 let len = len + (4 * n_packets);
582 let n_acl = len.div_ceil(acl_max);
583 let grant = poll_fn(|cx| self.connections.poll_request_to_send(handle, n_acl as usize, Some(cx))).await?;
584 trace!("[host] granted send packets = {}, len = {}", n_packets, len);
585 Ok(L2capSender {
586 controller: &self.controller,
587 handle,
588 grant,
589 fragment_size: acl_max,
590 })
591 }
592
593 pub(crate) fn try_l2cap(
598 &self,
599 handle: ConnHandle,
600 len: u16,
601 n_packets: u16,
602 ) -> Result<L2capSender<'_, 'd, T, P::Packet>, BleHostError<T::Error>> {
603 let acl_max = self.initialized.try_get().map(|i| i.acl_max).unwrap_or(27) as u16;
604 let len = len + (4 * n_packets);
605 let n_acl = len.div_ceil(acl_max);
606 let grant = match self.connections.poll_request_to_send(handle, n_acl as usize, None) {
607 Poll::Ready(res) => res?,
608 Poll::Pending => {
609 return Err(Error::Busy.into());
610 }
611 };
612 Ok(L2capSender {
613 controller: &self.controller,
614 handle,
615 grant,
616 fragment_size: acl_max,
617 })
618 }
619
620 pub(crate) async fn send_conn_param_update_req(
621 &self,
622 handle: ConnHandle,
623 param: &ConnParamUpdateReq,
624 ) -> Result<(), BleHostError<T::Error>> {
625 self.channels.send_conn_param_update_req(handle, self, param).await
626 }
627
628 pub(crate) async fn send_conn_param_update_res(
629 &self,
630 handle: ConnHandle,
631 param: &ConnParamUpdateRes,
632 ) -> Result<(), BleHostError<T::Error>> {
633 self.channels.send_conn_param_update_res(handle, self, param).await
634 }
635
636 pub(crate) fn metrics<F: FnOnce(&HostMetrics) -> R, R>(&self, f: F) -> R {
638 let m = self.metrics.borrow();
639 f(&m)
640 }
641
642 pub(crate) fn log_status(&self, verbose: bool) {
644 let m = self.metrics.borrow();
645 debug!("[host] connect events: {}", m.connect_events);
646 debug!("[host] disconnect events: {}", m.disconnect_events);
647 debug!("[host] rx errors: {}", m.rx_errors);
648 self.connections.log_status(verbose);
649 self.channels.log_status(verbose);
650 }
651}
652
653pub struct Runner<'d, C, P: PacketPool> {
655 rx: RxRunner<'d, C, P>,
656 control: ControlRunner<'d, C, P>,
657 tx: TxRunner<'d, C, P>,
658}
659
660pub struct RxRunner<'d, C, P: PacketPool> {
662 stack: &'d Stack<'d, C, P>,
663}
664
665pub struct ControlRunner<'d, C, P: PacketPool> {
667 stack: &'d Stack<'d, C, P>,
668}
669
670pub struct TxRunner<'d, C, P: PacketPool> {
672 stack: &'d Stack<'d, C, P>,
673}
674
675pub trait EventHandler {
677 fn on_vendor(&self, vendor: &Vendor) {}
679 #[cfg(feature = "scan")]
681 fn on_adv_reports(&self, reports: bt_hci::param::LeAdvReportsIter) {}
682 #[cfg(feature = "scan")]
684 fn on_ext_adv_reports(&self, reports: bt_hci::param::LeExtAdvReportsIter) {}
685}
686
687struct DummyHandler;
688impl EventHandler for DummyHandler {}
689
690impl<'d, C: Controller, P: PacketPool> Runner<'d, C, P> {
691 pub(crate) fn new(stack: &'d Stack<'d, C, P>) -> Self {
692 Self {
693 rx: RxRunner { stack },
694 control: ControlRunner { stack },
695 tx: TxRunner { stack },
696 }
697 }
698
699 pub fn split(self) -> (RxRunner<'d, C, P>, ControlRunner<'d, C, P>, TxRunner<'d, C, P>) {
701 (self.rx, self.control, self.tx)
702 }
703
704 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>>
706 where
707 C: ControllerCmdSync<Disconnect>
708 + ControllerCmdSync<SetEventMask>
709 + ControllerCmdSync<SetEventMaskPage2>
710 + ControllerCmdSync<LeSetEventMask>
711 + ControllerCmdSync<LeSetRandomAddr>
712 + ControllerCmdSync<HostBufferSize>
713 + ControllerCmdAsync<LeConnUpdate>
714 + ControllerCmdSync<LeReadFilterAcceptListSize>
715 + ControllerCmdSync<SetControllerToHostFlowControl>
716 + ControllerCmdSync<Reset>
717 + ControllerCmdSync<LeCreateConnCancel>
718 + ControllerCmdSync<LeSetScanEnable>
719 + ControllerCmdSync<LeSetExtScanEnable>
720 + for<'t> ControllerCmdSync<LeSetAdvEnable>
721 + for<'t> ControllerCmdSync<LeSetExtAdvEnable<'t>>
722 + for<'t> ControllerCmdSync<HostNumberOfCompletedPackets<'t>>
723 + ControllerCmdSync<LeReadBufferSize>
724 + ControllerCmdSync<LeLongTermKeyRequestReply>
725 + ControllerCmdAsync<LeEnableEncryption>
726 + ControllerCmdSync<ReadBdAddr>,
727 {
728 let dummy = DummyHandler;
729 self.run_with_handler(&dummy).await
730 }
731
732 pub async fn run_with_handler<E: EventHandler>(&mut self, event_handler: &E) -> Result<(), BleHostError<C::Error>>
734 where
735 C: ControllerCmdSync<Disconnect>
736 + ControllerCmdSync<SetEventMask>
737 + ControllerCmdSync<SetEventMaskPage2>
738 + ControllerCmdSync<LeSetEventMask>
739 + ControllerCmdSync<LeSetRandomAddr>
740 + ControllerCmdSync<LeReadFilterAcceptListSize>
741 + ControllerCmdSync<HostBufferSize>
742 + ControllerCmdAsync<LeConnUpdate>
743 + ControllerCmdSync<SetControllerToHostFlowControl>
744 + for<'t> ControllerCmdSync<LeSetAdvEnable>
745 + for<'t> ControllerCmdSync<LeSetExtAdvEnable<'t>>
746 + for<'t> ControllerCmdSync<HostNumberOfCompletedPackets<'t>>
747 + ControllerCmdSync<LeSetScanEnable>
748 + ControllerCmdSync<LeSetExtScanEnable>
749 + ControllerCmdSync<Reset>
750 + ControllerCmdSync<LeCreateConnCancel>
751 + ControllerCmdSync<LeReadBufferSize>
752 + ControllerCmdSync<LeLongTermKeyRequestReply>
753 + ControllerCmdAsync<LeEnableEncryption>
754 + ControllerCmdSync<ReadBdAddr>,
755 {
756 let control_fut = self.control.run();
757 let rx_fut = self.rx.run_with_handler(event_handler);
758 let tx_fut = self.tx.run();
759 pin_mut!(control_fut, rx_fut, tx_fut);
760 match select3(&mut tx_fut, &mut rx_fut, &mut control_fut).await {
761 Either3::First(result) => {
762 trace!("[host] tx_fut exit");
763 result
764 }
765 Either3::Second(result) => {
766 trace!("[host] rx_fut exit");
767 result
768 }
769 Either3::Third(result) => {
770 trace!("[host] control_fut exit");
771 result
772 }
773 }
774 }
775}
776
777impl<'d, C: Controller, P: PacketPool> RxRunner<'d, C, P> {
778 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>>
780 where
781 C: ControllerCmdSync<Disconnect>,
782 {
783 let dummy = DummyHandler;
784 self.run_with_handler(&dummy).await
785 }
786
787 pub async fn run_with_handler<E: EventHandler>(&mut self, event_handler: &E) -> Result<(), BleHostError<C::Error>>
790 where
791 C: ControllerCmdSync<Disconnect>,
792 {
793 const MAX_HCI_PACKET_LEN: usize = 259;
794 let host = &self.stack.host;
795 loop {
798 let mut rx = [0u8; MAX_HCI_PACKET_LEN];
800 let result = host.controller.read(&mut rx).await;
806 match result {
809 Ok(ControllerToHostPacket::Acl(acl)) => match host.handle_acl(acl, event_handler) {
810 Ok(_) => {}
811 Err(e) => {
812 warn!(
813 "[host] encountered error processing ACL data for {:?}: {:?}",
814 acl.handle(),
815 e
816 );
817
818 match e {
819 Error::InvalidState | Error::Disconnected => {
820 warn!("[host] requesting {:?} to be disconnected", acl.handle());
821 host.connections.log_status(true);
822 host.connections.request_handle_disconnect(
823 acl.handle(),
824 DisconnectReason::RemoteUserTerminatedConn,
825 );
826 }
827 _ => {}
828 }
829
830 let mut m = host.metrics.borrow_mut();
831 m.rx_errors = m.rx_errors.wrapping_add(1);
832 }
833 },
834 Ok(ControllerToHostPacket::Event(event)) => {
835 match event.kind {
836 EventKind::Le => {
837 let event = unwrap!(LeEventPacket::from_hci_bytes_complete(event.data));
838 match event.kind {
839 LeEventKind::LeConnectionComplete => {
840 let e = unwrap!(LeConnectionComplete::from_hci_bytes_complete(event.data));
841 if !host.handle_connection(
842 e.status,
843 e.handle,
844 e.peer_addr_kind,
845 e.peer_addr,
846 e.role,
847 ) {
848 let _ = host
849 .command(Disconnect::new(
850 e.handle,
851 DisconnectReason::RemoteDeviceTerminatedConnLowResources,
852 ))
853 .await;
854 host.connect_command_state.canceled();
855 }
856 }
857 LeEventKind::LeEnhancedConnectionComplete => {
858 let e = unwrap!(LeEnhancedConnectionComplete::from_hci_bytes_complete(event.data));
859 if !host.handle_connection(
860 e.status,
861 e.handle,
862 e.peer_addr_kind,
863 e.peer_addr,
864 e.role,
865 ) {
866 let _ = host
867 .command(Disconnect::new(
868 e.handle,
869 DisconnectReason::RemoteDeviceTerminatedConnLowResources,
870 ))
871 .await;
872 host.connect_command_state.canceled();
873 }
874 }
875 LeEventKind::LeScanTimeout => {}
876 LeEventKind::LeAdvertisingSetTerminated => {
877 let set = unwrap!(LeAdvertisingSetTerminated::from_hci_bytes_complete(event.data));
878 host.advertise_state.terminate(set.adv_handle);
879 }
880 LeEventKind::LeExtendedAdvertisingReport => {
881 #[cfg(feature = "scan")]
882 {
883 let data =
884 unwrap!(LeExtendedAdvertisingReport::from_hci_bytes_complete(event.data));
885 event_handler.on_ext_adv_reports(data.reports.iter());
886 }
887 }
888 LeEventKind::LeAdvertisingReport => {
889 #[cfg(feature = "scan")]
890 {
891 let data = unwrap!(LeAdvertisingReport::from_hci_bytes_complete(event.data));
892 event_handler.on_adv_reports(data.reports.iter());
893 }
894 }
895 LeEventKind::LeLongTermKeyRequest => {
896 host.connections.handle_security_hci_le_event(event)?;
897 }
898 LeEventKind::LePhyUpdateComplete => {
899 let event = unwrap!(LePhyUpdateComplete::from_hci_bytes_complete(event.data));
900 if let Err(e) = event.status.to_result() {
901 warn!("[host] error updating phy for {:?}: {:?}", event.handle, e);
902 } else {
903 let _ = host.connections.post_handle_event(
904 event.handle,
905 ConnectionEvent::PhyUpdated {
906 tx_phy: event.tx_phy,
907 rx_phy: event.rx_phy,
908 },
909 );
910 }
911 }
912 LeEventKind::LeConnectionUpdateComplete => {
913 let event =
914 unwrap!(LeConnectionUpdateComplete::from_hci_bytes_complete(event.data));
915 if let Err(e) = event.status.to_result() {
916 warn!(
917 "[host] error updating connection parameters for {:?}: {:?}",
918 event.handle, e
919 );
920 } else {
921 let _ = host.connections.post_handle_event(
922 event.handle,
923 ConnectionEvent::ConnectionParamsUpdated {
924 conn_interval: Duration::from_micros(event.conn_interval.as_micros()),
925 peripheral_latency: event.peripheral_latency,
926 supervision_timeout: Duration::from_micros(
927 event.supervision_timeout.as_micros(),
928 ),
929 },
930 );
931 }
932 }
933 LeEventKind::LeDataLengthChange => {
934 let event = unwrap!(LeDataLengthChange::from_hci_bytes_complete(event.data));
935 let _ = host.connections.post_handle_event(
936 event.handle,
937 ConnectionEvent::DataLengthUpdated {
938 max_tx_octets: event.max_tx_octets,
939 max_tx_time: event.max_tx_time,
940 max_rx_octets: event.max_rx_octets,
941 max_rx_time: event.max_rx_time,
942 },
943 );
944 }
945 LeEventKind::LeRemoteConnectionParameterRequest => {
946 let event = unwrap!(LeRemoteConnectionParameterRequest::from_hci_bytes_complete(
947 event.data
948 ));
949 let _ = host.connections.post_handle_event(
950 event.handle,
951 ConnectionEvent::RequestConnectionParams {
952 min_connection_interval: Duration::from_micros(
953 event.interval_min.as_micros(),
954 ),
955 max_connection_interval: Duration::from_micros(
956 event.interval_min.as_micros(),
957 ),
958 max_latency: event.max_latency,
959 supervision_timeout: Duration::from_micros(event.timeout.as_micros()),
960 },
961 );
962 }
963 _ => {
964 warn!("Unknown LE event!");
965 }
966 }
967 }
968 EventKind::DisconnectionComplete => {
969 let e = unwrap!(DisconnectionComplete::from_hci_bytes_complete(event.data));
970 let handle = e.handle;
971 let reason = if let Err(e) = e.status.to_result() {
972 info!("[host] disconnection event on handle {}, status: {:?}", handle.raw(), e);
973 None
974 } else if let Err(err) = e.reason.to_result() {
975 info!(
976 "[host] disconnection event on handle {}, reason: {:?}",
977 handle.raw(),
978 err
979 );
980 Some(e.reason)
981 } else {
982 info!("[host] disconnection event on handle {}", handle.raw());
983 None
984 }
985 .unwrap_or(Status::UNSPECIFIED);
986 let _ = host.connections.disconnected(handle, reason);
987 let _ = host.channels.disconnected(handle);
988 let mut m = host.metrics.borrow_mut();
989 m.disconnect_events = m.disconnect_events.wrapping_add(1);
990 }
991 EventKind::NumberOfCompletedPackets => {
992 let c = unwrap!(NumberOfCompletedPackets::from_hci_bytes_complete(event.data));
993 for entry in c.completed_packets.iter() {
995 match (entry.handle(), entry.num_completed_packets()) {
996 (Ok(handle), Ok(completed)) => {
997 let _ = host.connections.confirm_sent(handle, completed as usize);
998 }
999 (Ok(handle), Err(e)) => {
1000 warn!("[host] error processing completed packets for {:?}: {:?}", handle, e);
1001 }
1002 _ => {}
1003 }
1004 }
1005 }
1006 EventKind::Vendor => {
1007 let vendor = unwrap!(Vendor::from_hci_bytes_complete(event.data));
1008 event_handler.on_vendor(&vendor);
1009 }
1010 EventKind::EncryptionChangeV1 => {
1011 host.connections.handle_security_hci_event(event)?;
1012 }
1013 _ => {}
1015 }
1016 }
1017 Ok(_) => {}
1019 Err(e) => {
1020 return Err(BleHostError::Controller(e));
1021 }
1022 }
1023 }
1024 }
1025}
1026
1027impl<'d, C: Controller, P: PacketPool> ControlRunner<'d, C, P> {
1028 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>>
1030 where
1031 C: ControllerCmdSync<Disconnect>
1032 + ControllerCmdSync<SetEventMask>
1033 + ControllerCmdSync<SetEventMaskPage2>
1034 + ControllerCmdSync<LeSetEventMask>
1035 + ControllerCmdSync<LeSetRandomAddr>
1036 + ControllerCmdSync<HostBufferSize>
1037 + ControllerCmdAsync<LeConnUpdate>
1038 + ControllerCmdSync<LeReadFilterAcceptListSize>
1039 + ControllerCmdSync<SetControllerToHostFlowControl>
1040 + ControllerCmdSync<Reset>
1041 + ControllerCmdSync<LeCreateConnCancel>
1042 + for<'t> ControllerCmdSync<LeSetAdvEnable>
1043 + for<'t> ControllerCmdSync<LeSetExtAdvEnable<'t>>
1044 + ControllerCmdSync<LeSetScanEnable>
1045 + ControllerCmdSync<LeSetExtScanEnable>
1046 + for<'t> ControllerCmdSync<HostNumberOfCompletedPackets<'t>>
1047 + ControllerCmdSync<LeReadBufferSize>
1048 + ControllerCmdSync<LeLongTermKeyRequestReply>
1049 + ControllerCmdAsync<LeEnableEncryption>
1050 + ControllerCmdSync<ReadBdAddr>,
1051 {
1052 let host = &self.stack.host;
1053 Reset::new().exec(&host.controller).await?;
1054
1055 if let Some(addr) = host.address {
1056 LeSetRandomAddr::new(addr.addr).exec(&host.controller).await?;
1057 }
1058
1059 SetEventMask::new(
1060 EventMask::new()
1061 .enable_le_meta(true)
1062 .enable_conn_request(true)
1063 .enable_conn_complete(true)
1064 .enable_hardware_error(true)
1065 .enable_disconnection_complete(true)
1066 .enable_encryption_change_v1(true),
1067 )
1068 .exec(&host.controller)
1069 .await?;
1070
1071 SetEventMaskPage2::new(EventMaskPage2::new().enable_encryption_change_v2(true))
1072 .exec(&host.controller)
1073 .await?;
1074
1075 let mask = LeEventMask::new()
1076 .enable_le_conn_complete(true)
1077 .enable_le_enhanced_conn_complete(true)
1078 .enable_le_conn_update_complete(true)
1079 .enable_le_adv_set_terminated(true)
1080 .enable_le_adv_report(true)
1081 .enable_le_scan_timeout(true)
1082 .enable_le_ext_adv_report(true)
1083 .enable_le_long_term_key_request(true)
1084 .enable_le_phy_update_complete(true)
1085 .enable_le_data_length_change(true);
1086
1087 #[cfg(feature = "connection-params-update")]
1088 let mask = mask.enable_le_remote_conn_parameter_request(true);
1089
1090 LeSetEventMask::new(mask).exec(&host.controller).await?;
1091
1092 info!(
1093 "[host] using packet pool with MTU {} capacity {}",
1094 P::MTU,
1095 P::capacity(),
1096 );
1097
1098 let ret = LeReadFilterAcceptListSize::new().exec(&host.controller).await?;
1099 info!("[host] filter accept list size: {}", ret);
1100
1101 let ret = LeReadBufferSize::new().exec(&host.controller).await?;
1102 info!(
1103 "[host] setting txq to {}, fragmenting at {}",
1104 ret.total_num_le_acl_data_packets as usize, ret.le_acl_data_packet_length as usize
1105 );
1106 host.connections
1107 .set_link_credits(ret.total_num_le_acl_data_packets as usize);
1108
1109 const ACL_LEN: u16 = 255;
1110 const ACL_N: u16 = 1;
1111 info!(
1112 "[host] configuring host buffers ({} packets of size {})",
1113 ACL_N, ACL_LEN,
1114 );
1115 HostBufferSize::new(ACL_LEN, 0, ACL_N, 0).exec(&host.controller).await?;
1116
1117 let _ = host.initialized.init(InitialState {
1128 acl_max: ret.le_acl_data_packet_length as usize,
1129 });
1130 info!("[host] initialized");
1131
1132 let device_address = host.command(ReadBdAddr::new()).await?;
1133 if *device_address.raw() != [0, 0, 0, 0, 0, 0] {
1134 let device_address = Address {
1135 kind: AddrKind::PUBLIC,
1136 addr: device_address,
1137 };
1138 info!("[host] Device Address {}", device_address);
1139 if host.address.is_none() {
1140 #[cfg(feature = "security")]
1141 host.connections.security_manager.set_local_address(device_address);
1142 }
1143 }
1144
1145 loop {
1146 match select3(
1147 poll_fn(|cx| host.connections.poll_disconnecting(Some(cx))),
1148 poll_fn(|cx| host.channels.poll_disconnecting(Some(cx))),
1149 select4(
1150 poll_fn(|cx| host.connect_command_state.poll_cancelled(cx)),
1151 poll_fn(|cx| host.advertise_command_state.poll_cancelled(cx)),
1152 poll_fn(|cx| host.scan_command_state.poll_cancelled(cx)),
1153 #[cfg(feature = "security")]
1154 {
1155 host.connections.poll_security_events()
1156 },
1157 #[cfg(not(feature = "security"))]
1158 {
1159 poll_fn(|cx| Poll::<()>::Pending)
1160 },
1161 ),
1162 )
1163 .await
1164 {
1165 Either3::First(request) => {
1166 trace!("[host] poll disconnecting links");
1167 match host.command(Disconnect::new(request.handle(), request.reason())).await {
1168 Ok(_) => {}
1169 Err(BleHostError::BleHost(Error::Hci(bt_hci::param::Error::UNKNOWN_CONN_IDENTIFIER))) => {}
1170 Err(e) => {
1171 return Err(e);
1172 }
1173 }
1174 request.confirm();
1175 }
1176 Either3::Second(request) => {
1177 trace!("[host] poll disconnecting channels");
1178 match request.send(host).await {
1179 Ok(_) => {}
1180 Err(BleHostError::BleHost(Error::Hci(bt_hci::param::Error::UNKNOWN_CONN_IDENTIFIER))) => {}
1181 Err(BleHostError::BleHost(Error::NotFound)) => {}
1182 Err(e) => {
1183 return Err(e);
1184 }
1185 }
1186 request.confirm();
1187 }
1188 Either3::Third(states) => match states {
1189 Either4::First(_) => {
1190 trace!("[host] cancel connection create");
1191 if host.command(LeCreateConnCancel::new()).await.is_err() {
1193 warn!("[host] error cancelling connection");
1194 }
1195 host.connect_command_state.canceled();
1197 }
1198 Either4::Second(ext) => {
1199 trace!("[host] disabling advertising");
1200 if ext {
1201 host.command(LeSetExtAdvEnable::new(false, &[])).await?
1202 } else {
1203 host.command(LeSetAdvEnable::new(false)).await?
1204 }
1205 host.advertise_command_state.canceled();
1206 }
1207 Either4::Third(ext) => {
1208 trace!("[host] disabling scanning");
1209 if ext {
1210 host.command(LeSetExtScanEnable::new(
1212 false,
1213 FilterDuplicates::Disabled,
1214 bt_hci::param::Duration::from_secs(0),
1215 bt_hci::param::Duration::from_secs(0),
1216 ))
1217 .await?;
1218 } else {
1219 host.command(LeSetScanEnable::new(false, false)).await?;
1220 }
1221 host.scan_command_state.canceled();
1222 }
1223 Either4::Fourth(request) => {
1224 #[cfg(feature = "security")]
1225 {
1226 let event_data = request.unwrap_or(SecurityEventData::Timeout);
1227 host.connections.handle_security_event(host, event_data).await?;
1228 }
1229 }
1230 },
1231 }
1232 }
1233 }
1234}
1235
1236impl<'d, C: Controller, P: PacketPool> TxRunner<'d, C, P> {
1237 pub async fn run(&mut self) -> Result<(), BleHostError<C::Error>> {
1239 let host = &self.stack.host;
1240 let params = host.initialized.get().await;
1241 loop {
1242 let (conn, pdu) = host.connections.outbound().await;
1243 match host.l2cap(conn, pdu.len() as u16, 1).await {
1244 Ok(mut sender) => {
1245 if let Err(e) = sender.send(pdu.as_ref()).await {
1246 warn!("[host] error sending outbound pdu");
1247 return Err(e);
1248 }
1249 }
1250 Err(BleHostError::BleHost(Error::NotFound)) => {
1251 warn!("[host] unable to send data to disconnected host (ignored)");
1252 }
1253 Err(BleHostError::BleHost(Error::Disconnected)) => {
1254 warn!("[host] unable to send data to disconnected host (ignored)");
1255 }
1256 Err(e) => {
1257 warn!("[host] error requesting sending outbound pdu");
1258 return Err(e);
1259 }
1260 }
1261 }
1262 }
1263}
1264
1265pub struct L2capSender<'a, 'd, T: Controller, P> {
1266 pub(crate) controller: &'a T,
1267 pub(crate) handle: ConnHandle,
1268 pub(crate) grant: PacketGrant<'a, 'd, P>,
1269 pub(crate) fragment_size: u16,
1270}
1271
1272impl<'a, 'd, T: Controller, P> L2capSender<'a, 'd, T, P> {
1273 pub(crate) fn try_send(&mut self, pdu: &[u8]) -> Result<(), BleHostError<T::Error>>
1274 where
1275 T: blocking::Controller,
1276 {
1277 let mut pbf = AclPacketBoundary::FirstNonFlushable;
1278 for chunk in pdu.chunks(self.fragment_size as usize) {
1284 let acl = AclPacket::new(self.handle, pbf, AclBroadcastFlag::PointToPoint, chunk);
1285 match self.controller.try_write_acl_data(&acl) {
1286 Ok(result) => {
1287 self.grant.confirm(1);
1288 trace!("[host] sent acl packet len = {}", chunk.len());
1289 }
1290 Err(blocking::TryError::Busy) => {
1291 warn!("hci: acl data send busy");
1292 return Err(Error::Busy.into());
1293 }
1294 Err(blocking::TryError::Error(e)) => return Err(BleHostError::Controller(e)),
1295 }
1296 pbf = AclPacketBoundary::Continuing;
1297 }
1298 Ok(())
1299 }
1300
1301 pub(crate) async fn send(&mut self, pdu: &[u8]) -> Result<(), BleHostError<T::Error>> {
1302 let mut pbf = AclPacketBoundary::FirstNonFlushable;
1308 for chunk in pdu.chunks(self.fragment_size as usize) {
1309 let acl = AclPacket::new(self.handle, pbf, AclBroadcastFlag::PointToPoint, chunk);
1310 self.controller
1311 .write_acl_data(&acl)
1312 .await
1313 .map_err(BleHostError::Controller)?;
1314 self.grant.confirm(1);
1315 pbf = AclPacketBoundary::Continuing;
1316 trace!("[host] sent acl packet len = {}", chunk.len());
1317 }
1318 Ok(())
1319 }
1320}
1321
1322#[must_use = "to delay the drop handler invocation to the end of the scope"]
1324pub struct OnDrop<F: FnOnce()> {
1325 f: MaybeUninit<F>,
1326}
1327
1328impl<F: FnOnce()> OnDrop<F> {
1329 pub fn new(f: F) -> Self {
1331 Self { f: MaybeUninit::new(f) }
1332 }
1333
1334 pub fn defuse(self) {
1336 core::mem::forget(self)
1337 }
1338}
1339
1340impl<F: FnOnce()> Drop for OnDrop<F> {
1341 fn drop(&mut self) {
1342 unsafe { self.f.as_ptr().read()() }
1343 }
1344}