1use crate::transport::TransportError;
8
9use super::addr::BleAddr;
10
11pub trait BleStream: Send + Sync {
17 fn send(
19 &self,
20 data: &[u8],
21 ) -> impl std::future::Future<Output = Result<(), TransportError>> + Send;
22
23 fn recv(
27 &self,
28 buf: &mut [u8],
29 ) -> impl std::future::Future<Output = Result<usize, TransportError>> + Send;
30
31 fn send_mtu(&self) -> u16;
33
34 fn recv_mtu(&self) -> u16;
36
37 fn remote_addr(&self) -> &BleAddr;
39}
40
41pub trait BleAcceptor: Send {
43 type Stream: BleStream + 'static;
45
46 fn accept(
48 &mut self,
49 ) -> impl std::future::Future<Output = Result<Self::Stream, TransportError>> + Send;
50}
51
52pub trait BleScanner: Send {
54 fn next(&mut self) -> impl std::future::Future<Output = Option<BleAddr>> + Send;
58}
59
60pub trait BleIo: Send + Sync + 'static {
66 type Stream: BleStream + 'static;
68 type Acceptor: BleAcceptor<Stream = Self::Stream> + 'static;
70 type Scanner: BleScanner + 'static;
72
73 fn listen(
75 &self,
76 psm: u16,
77 ) -> impl std::future::Future<Output = Result<Self::Acceptor, TransportError>> + Send;
78
79 fn connect(
81 &self,
82 addr: &BleAddr,
83 psm: u16,
84 ) -> impl std::future::Future<Output = Result<Self::Stream, TransportError>> + Send;
85
86 fn start_advertising(
88 &self,
89 ) -> impl std::future::Future<Output = Result<(), TransportError>> + Send;
90
91 fn stop_advertising(
93 &self,
94 ) -> impl std::future::Future<Output = Result<(), TransportError>> + Send;
95
96 fn start_scanning(
98 &self,
99 ) -> impl std::future::Future<Output = Result<Self::Scanner, TransportError>> + Send;
100
101 fn local_addr(&self) -> Result<BleAddr, TransportError>;
103
104 fn adapter_name(&self) -> &str;
106}
107
108#[cfg(bluer_available)]
113mod bluer_impl {
114 use super::*;
115 use crate::transport::TransportError;
116
117 use bluer::l2cap::{SeqPacket, SeqPacketListener, Socket, SocketAddr};
118 use bluer::{
119 AdapterEvent, AddressType, DiscoveryFilter, DiscoveryTransport, adv::Advertisement,
120 };
121 use futures::StreamExt;
122 use std::collections::{BTreeSet, HashSet};
123 use std::pin::Pin;
124 use tokio::sync::Mutex;
125 use tracing::{debug, trace};
126
127 pub const FIPS_SERVICE_UUID: bluer::Uuid =
132 bluer::Uuid::from_u128(0x9c90_b790_2cc5_42c0_9f87_c9cc_4064_8f4c);
133
134 fn map_err(context: &str, e: bluer::Error) -> TransportError {
136 TransportError::Io(std::io::Error::other(format!("{}: {}", context, e)))
137 }
138
139 fn map_io_err(context: &str, e: std::io::Error) -> TransportError {
141 TransportError::Io(std::io::Error::new(e.kind(), format!("{}: {}", context, e)))
142 }
143
144 pub struct BluerStream {
150 conn: SeqPacket,
151 remote: BleAddr,
152 send_mtu: u16,
153 recv_mtu: u16,
154 }
155
156 impl BluerStream {
157 pub fn new(conn: SeqPacket, remote: BleAddr) -> Result<Self, TransportError> {
159 let send_mtu = conn.send_mtu().map_err(|e| map_io_err("send_mtu", e))? as u16;
160 let recv_mtu = conn.recv_mtu().map_err(|e| map_io_err("recv_mtu", e))? as u16;
161
162 match conn.as_ref().phy() {
164 Ok(phy) => {
165 debug!(addr = %remote, phy, send_mtu, recv_mtu, "BLE connection established")
166 }
167 Err(_) => {
168 debug!(addr = %remote, send_mtu, recv_mtu, "BLE connection established (PHY query unsupported)")
169 }
170 }
171
172 Ok(Self {
173 conn,
174 remote,
175 send_mtu,
176 recv_mtu,
177 })
178 }
179 }
180
181 impl BleStream for BluerStream {
182 async fn send(&self, data: &[u8]) -> Result<(), TransportError> {
183 self.conn
184 .send(data)
185 .await
186 .map(|_| ())
187 .map_err(|e| TransportError::SendFailed(format!("{}", e)))
188 }
189
190 async fn recv(&self, buf: &mut [u8]) -> Result<usize, TransportError> {
191 self.conn
192 .recv(buf)
193 .await
194 .map_err(|e| TransportError::RecvFailed(format!("{}", e)))
195 }
196
197 fn send_mtu(&self) -> u16 {
198 self.send_mtu
199 }
200
201 fn recv_mtu(&self) -> u16 {
202 self.recv_mtu
203 }
204
205 fn remote_addr(&self) -> &BleAddr {
206 &self.remote
207 }
208 }
209
210 pub struct BluerAcceptor {
216 listener: SeqPacketListener,
217 adapter_name: String,
218 }
219
220 impl BleAcceptor for BluerAcceptor {
221 type Stream = BluerStream;
222
223 async fn accept(&mut self) -> Result<BluerStream, TransportError> {
224 let (conn, peer_sa) = self
225 .listener
226 .accept()
227 .await
228 .map_err(|e| map_io_err("accept", e))?;
229
230 let remote = BleAddr::from_bluer(peer_sa.addr, &self.adapter_name);
231 BluerStream::new(conn, remote)
232 }
233 }
234
235 pub struct BluerScanner {
241 events: Pin<Box<dyn futures::Stream<Item = AdapterEvent> + Send>>,
242 adapter: bluer::Adapter,
243 adapter_name: String,
244 }
245
246 impl BleScanner for BluerScanner {
247 async fn next(&mut self) -> Option<BleAddr> {
248 loop {
249 match self.events.next().await {
250 Some(AdapterEvent::DeviceAdded(addr)) => {
251 if let Ok(device) = self.adapter.device(addr) {
253 match device.uuids().await {
254 Ok(Some(uuids)) if uuids.contains(&FIPS_SERVICE_UUID) => {
255 let ble_addr = BleAddr::from_bluer(addr, &self.adapter_name);
256 debug!(addr = %ble_addr, "BLE scanner: FIPS peer found");
257 return Some(ble_addr);
258 }
259 Ok(_) => {
260 trace!(addr = %addr, "BLE scanner: device without FIPS UUID");
261 }
262 Err(e) => {
263 trace!(addr = %addr, error = %e, "BLE scanner: failed to read UUIDs");
264 }
265 }
266 }
267 }
268 Some(_) => continue,
269 None => return None,
270 }
271 }
272 }
273 }
274
275 pub struct BluerIo {
281 #[allow(dead_code)] session: bluer::Session,
283 adapter: bluer::Adapter,
284 adapter_name: String,
285 adv_handle: Mutex<Option<bluer::adv::AdvertisementHandle>>,
286 mtu: u16,
287 }
288
289 impl BluerIo {
290 pub async fn new(adapter_name: &str, mtu: u16) -> Result<Self, TransportError> {
294 let session = bluer::Session::new()
295 .await
296 .map_err(|e| map_err("Session::new", e))?;
297
298 let adapter = if adapter_name == "default" {
299 session
300 .default_adapter()
301 .await
302 .map_err(|e| map_err("default_adapter", e))?
303 } else {
304 session
305 .adapter(adapter_name)
306 .map_err(|e| map_err("adapter", e))?
307 };
308
309 adapter
310 .set_powered(true)
311 .await
312 .map_err(|e| map_err("set_powered", e))?;
313
314 let name = adapter.name().to_string();
315 debug!(adapter = %name, "BluerIo initialized");
316
317 Ok(Self {
318 session,
319 adapter,
320 adapter_name: name,
321 adv_handle: Mutex::new(None),
322 mtu,
323 })
324 }
325 }
326
327 impl BleIo for BluerIo {
328 type Stream = BluerStream;
329 type Acceptor = BluerAcceptor;
330 type Scanner = BluerScanner;
331
332 async fn listen(&self, psm: u16) -> Result<Self::Acceptor, TransportError> {
333 let local_addr = self
334 .adapter
335 .address()
336 .await
337 .map_err(|e| map_err("address", e))?;
338
339 let sa = SocketAddr::new(local_addr, AddressType::LePublic, psm);
340 let listener = SeqPacketListener::bind(sa)
341 .await
342 .map_err(|e| map_io_err("bind", e))?;
343
344 listener
346 .as_ref()
347 .set_recv_mtu(self.mtu)
348 .map_err(|e| map_io_err("set_recv_mtu", e))?;
349
350 if let Err(e) = listener.as_ref().set_power_forced_active(true) {
352 debug!(error = %e, "BLE listener: set_power_forced_active not supported");
353 }
354
355 debug!(psm, mtu = self.mtu, "BLE listener bound");
356
357 Ok(BluerAcceptor {
358 listener,
359 adapter_name: self.adapter_name.clone(),
360 })
361 }
362
363 async fn connect(&self, addr: &BleAddr, psm: u16) -> Result<Self::Stream, TransportError> {
364 let target_sa = addr.to_socket_addr(psm);
365
366 let socket = Socket::<SeqPacket>::new_seq_packet()
367 .map_err(|e| map_io_err("new_seq_packet", e))?;
368 socket
369 .bind(SocketAddr::any_le())
370 .map_err(|e| map_io_err("bind", e))?;
371 socket
372 .set_recv_mtu(self.mtu)
373 .map_err(|e| map_io_err("set_recv_mtu", e))?;
374
375 if let Err(e) = socket.set_power_forced_active(true) {
377 debug!(error = %e, "BLE connect: set_power_forced_active not supported");
378 }
379
380 let conn = socket
381 .connect(target_sa)
382 .await
383 .map_err(|e| map_io_err("connect", e))?;
384
385 let remote = addr.clone();
386 BluerStream::new(conn, remote)
387 }
388
389 async fn start_advertising(&self) -> Result<(), TransportError> {
390 let adv = Advertisement {
391 advertisement_type: bluer::adv::Type::Peripheral,
392 service_uuids: {
393 let mut s = BTreeSet::new();
394 s.insert(FIPS_SERVICE_UUID);
395 s
396 },
397 local_name: Some("fips".to_string()),
398 min_interval: Some(std::time::Duration::from_millis(400)),
399 max_interval: Some(std::time::Duration::from_millis(600)),
400 ..Default::default()
401 };
402
403 let handle = self
404 .adapter
405 .advertise(adv)
406 .await
407 .map_err(|e| map_err("advertise", e))?;
408
409 *self.adv_handle.lock().await = Some(handle);
410 debug!("BLE advertising started");
411 Ok(())
412 }
413
414 async fn stop_advertising(&self) -> Result<(), TransportError> {
415 let _ = self.adv_handle.lock().await.take();
416 debug!("BLE advertising stopped");
417 Ok(())
418 }
419
420 async fn start_scanning(&self) -> Result<Self::Scanner, TransportError> {
421 if let Ok(cached) = self.adapter.device_addresses().await {
427 let count = cached.len();
428 for addr in cached {
429 let _ = self.adapter.remove_device(addr).await;
430 }
431 if count > 0 {
432 debug!(count, "BLE scanner: cleared cached devices");
433 }
434 }
435
436 let filter = DiscoveryFilter {
438 transport: DiscoveryTransport::Le,
439 uuids: {
440 let mut s = HashSet::new();
441 s.insert(FIPS_SERVICE_UUID);
442 s
443 },
444 ..Default::default()
445 };
446
447 self.adapter
448 .set_discovery_filter(filter)
449 .await
450 .map_err(|e| map_err("set_discovery_filter", e))?;
451
452 let events = self
453 .adapter
454 .discover_devices()
455 .await
456 .map_err(|e| map_err("discover_devices", e))?;
457
458 debug!("BLE scanning started");
459
460 Ok(BluerScanner {
461 events: Box::pin(events),
462 adapter: self.adapter.clone(),
463 adapter_name: self.adapter_name.clone(),
464 })
465 }
466
467 fn local_addr(&self) -> Result<BleAddr, TransportError> {
468 let addr = futures::executor::block_on(self.adapter.address())
472 .map_err(|e| map_err("address", e))?;
473 Ok(BleAddr::from_bluer(addr, &self.adapter_name))
474 }
475
476 fn adapter_name(&self) -> &str {
477 &self.adapter_name
478 }
479 }
480
481 #[allow(dead_code)]
483 fn _assert_bluer_io_send_sync() {
484 fn require<T: Send + Sync>() {}
485 require::<BluerIo>();
486 }
487}
488
489#[cfg(bluer_available)]
490pub use bluer_impl::{BluerAcceptor, BluerIo, BluerScanner, BluerStream, FIPS_SERVICE_UUID};
491
492pub struct MockBleStream {
498 addr: BleAddr,
499 send_mtu: u16,
500 recv_mtu: u16,
501 tx: tokio::sync::mpsc::Sender<Vec<u8>>,
502 rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Vec<u8>>>,
503}
504
505impl MockBleStream {
506 pub fn pair(addr_a: BleAddr, addr_b: BleAddr, mtu: u16) -> (Self, Self) {
508 let (tx_a, rx_a) = tokio::sync::mpsc::channel(64);
509 let (tx_b, rx_b) = tokio::sync::mpsc::channel(64);
510 let stream_a = Self {
511 addr: addr_b.clone(),
512 send_mtu: mtu,
513 recv_mtu: mtu,
514 tx: tx_a,
515 rx: tokio::sync::Mutex::new(rx_b),
516 };
517 let stream_b = Self {
518 addr: addr_a,
519 send_mtu: mtu,
520 recv_mtu: mtu,
521 tx: tx_b,
522 rx: tokio::sync::Mutex::new(rx_a),
523 };
524 (stream_a, stream_b)
525 }
526}
527
528impl BleStream for MockBleStream {
529 async fn send(&self, data: &[u8]) -> Result<(), TransportError> {
530 self.tx
531 .send(data.to_vec())
532 .await
533 .map_err(|_| TransportError::SendFailed("channel closed".into()))
534 }
535
536 async fn recv(&self, buf: &mut [u8]) -> Result<usize, TransportError> {
537 let mut rx = self.rx.lock().await;
538 match rx.recv().await {
539 Some(data) => {
540 let len = data.len().min(buf.len());
541 buf[..len].copy_from_slice(&data[..len]);
542 Ok(len)
543 }
544 None => Ok(0), }
546 }
547
548 fn send_mtu(&self) -> u16 {
549 self.send_mtu
550 }
551
552 fn recv_mtu(&self) -> u16 {
553 self.recv_mtu
554 }
555
556 fn remote_addr(&self) -> &BleAddr {
557 &self.addr
558 }
559}
560
561pub struct MockBleAcceptor {
563 rx: tokio::sync::mpsc::Receiver<MockBleStream>,
564}
565
566impl BleAcceptor for MockBleAcceptor {
567 type Stream = MockBleStream;
568
569 async fn accept(&mut self) -> Result<MockBleStream, TransportError> {
570 self.rx
571 .recv()
572 .await
573 .ok_or(TransportError::RecvFailed("acceptor channel closed".into()))
574 }
575}
576
577pub struct MockBleScanner {
579 rx: tokio::sync::mpsc::Receiver<BleAddr>,
580}
581
582impl BleScanner for MockBleScanner {
583 async fn next(&mut self) -> Option<BleAddr> {
584 self.rx.recv().await
585 }
586}
587
588type ConnectHandler =
590 Box<dyn Fn(&BleAddr, u16) -> Result<MockBleStream, TransportError> + Send + Sync>;
591
592pub struct MockBleIo {
597 adapter: String,
598 local_addr: BleAddr,
599 accept_tx: tokio::sync::mpsc::Sender<MockBleStream>,
600 accept_rx: std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<MockBleStream>>>,
601 scan_tx: tokio::sync::mpsc::Sender<BleAddr>,
602 scan_rx: std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<BleAddr>>>,
603 connect_handler: std::sync::Mutex<Option<ConnectHandler>>,
604}
605
606impl MockBleIo {
607 pub fn new(adapter: &str, local_addr: BleAddr) -> Self {
609 let (accept_tx, accept_rx) = tokio::sync::mpsc::channel(16);
610 let (scan_tx, scan_rx) = tokio::sync::mpsc::channel(64);
611 Self {
612 adapter: adapter.to_string(),
613 local_addr,
614 accept_tx,
615 accept_rx: std::sync::Mutex::new(Some(accept_rx)),
616 scan_tx,
617 scan_rx: std::sync::Mutex::new(Some(scan_rx)),
618 connect_handler: std::sync::Mutex::new(None),
619 }
620 }
621
622 pub async fn inject_inbound(&self, stream: MockBleStream) {
624 let _ = self.accept_tx.send(stream).await;
625 }
626
627 pub async fn inject_scan_result(&self, addr: BleAddr) {
629 let _ = self.scan_tx.send(addr).await;
630 }
631
632 pub fn set_connect_handler<F>(&self, handler: F)
634 where
635 F: Fn(&BleAddr, u16) -> Result<MockBleStream, TransportError> + Send + Sync + 'static,
636 {
637 *self.connect_handler.lock().unwrap() = Some(Box::new(handler));
638 }
639}
640
641impl BleIo for MockBleIo {
642 type Stream = MockBleStream;
643 type Acceptor = MockBleAcceptor;
644 type Scanner = MockBleScanner;
645
646 async fn listen(&self, _psm: u16) -> Result<Self::Acceptor, TransportError> {
647 let rx = self
648 .accept_rx
649 .lock()
650 .unwrap()
651 .take()
652 .ok_or_else(|| TransportError::NotSupported("acceptor already taken".into()))?;
653 Ok(MockBleAcceptor { rx })
654 }
655
656 async fn connect(&self, addr: &BleAddr, psm: u16) -> Result<Self::Stream, TransportError> {
657 let handler = self.connect_handler.lock().unwrap();
658 match handler.as_ref() {
659 Some(f) => f(addr, psm),
660 None => Err(TransportError::ConnectionRefused),
661 }
662 }
663
664 async fn start_advertising(&self) -> Result<(), TransportError> {
665 Ok(())
666 }
667
668 async fn stop_advertising(&self) -> Result<(), TransportError> {
669 Ok(())
670 }
671
672 async fn start_scanning(&self) -> Result<Self::Scanner, TransportError> {
673 let rx = self
674 .scan_rx
675 .lock()
676 .unwrap()
677 .take()
678 .ok_or_else(|| TransportError::NotSupported("scanner already taken".into()))?;
679 Ok(MockBleScanner { rx })
680 }
681
682 fn local_addr(&self) -> Result<BleAddr, TransportError> {
683 Ok(self.local_addr.clone())
684 }
685
686 fn adapter_name(&self) -> &str {
687 &self.adapter
688 }
689}
690
#[cfg(test)]
mod tests {
    use super::*;

    /// PSM handed to the mock I/O layer throughout these tests.
    const TEST_PSM: u16 = 0x0085;

    /// Builds an address on adapter `hci0` whose final device byte is `n`.
    fn test_addr(n: u8) -> BleAddr {
        BleAddr {
            adapter: String::from("hci0"),
            device: [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, n],
        }
    }

    /// Data sent on one side of a pair arrives on the other, both ways.
    #[tokio::test]
    async fn test_mock_stream_pair_send_recv() {
        let (left, right) = MockBleStream::pair(test_addr(1), test_addr(2), 2048);
        let mut buf = [0u8; 64];

        left.send(b"hello").await.unwrap();
        let received = right.recv(&mut buf).await.unwrap();
        assert_eq!(&buf[..received], b"hello");

        right.send(b"world").await.unwrap();
        let received = left.recv(&mut buf).await.unwrap();
        assert_eq!(&buf[..received], b"world");
    }

    /// Both halves of a pair report the MTU passed to `pair`.
    #[tokio::test]
    async fn test_mock_stream_mtu() {
        let (left, right) = MockBleStream::pair(test_addr(1), test_addr(2), 512);
        for stream in [&left, &right] {
            assert_eq!(stream.send_mtu(), 512);
            assert_eq!(stream.recv_mtu(), 512);
        }
    }

    /// Each half of a pair reports the other half's address as remote.
    #[tokio::test]
    async fn test_mock_stream_remote_addr() {
        let (left, right) = MockBleStream::pair(test_addr(1), test_addr(2), 2048);
        assert_eq!(left.remote_addr(), &test_addr(2));
        assert_eq!(right.remote_addr(), &test_addr(1));
    }

    /// An injected inbound stream is returned by the acceptor.
    #[tokio::test]
    async fn test_mock_io_listen_accept() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let mut acceptor = io.listen(TEST_PSM).await.unwrap();

        let (inbound, _peer) = MockBleStream::pair(test_addr(1), test_addr(2), 2048);
        io.inject_inbound(inbound).await;

        let accepted = acceptor.accept().await.unwrap();
        assert_eq!(accepted.remote_addr(), &test_addr(2));
    }

    /// `connect` returns whatever the installed handler produces.
    #[tokio::test]
    async fn test_mock_io_connect() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let local = test_addr(1);
        io.set_connect_handler(move |target, _psm| {
            let (outbound, _peer) = MockBleStream::pair(local.clone(), target.clone(), 2048);
            Ok(outbound)
        });

        let connected = io.connect(&test_addr(2), TEST_PSM).await.unwrap();
        assert_eq!(connected.remote_addr(), &test_addr(2));
    }

    /// Without a handler, `connect` fails.
    #[tokio::test]
    async fn test_mock_io_connect_no_handler() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let outcome = io.connect(&test_addr(2), TEST_PSM).await;
        assert!(outcome.is_err());
    }

    /// Injected scan results come out of the scanner in injection order.
    #[tokio::test]
    async fn test_mock_io_scan() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let mut scanner = io.start_scanning().await.unwrap();

        io.inject_scan_result(test_addr(2)).await;
        io.inject_scan_result(test_addr(3)).await;

        assert_eq!(scanner.next().await, Some(test_addr(2)));
        assert_eq!(scanner.next().await, Some(test_addr(3)));
    }

    /// The mock reports the address and adapter name it was built with.
    #[tokio::test]
    async fn test_mock_io_local_addr() {
        let io = MockBleIo::new("hci0", test_addr(1));
        assert_eq!(io.local_addr().unwrap(), test_addr(1));
        assert_eq!(io.adapter_name(), "hci0");
    }

    /// Starting and stopping advertising both succeed on the mock.
    #[tokio::test]
    async fn test_mock_io_advertising_noop() {
        let io = MockBleIo::new("hci0", test_addr(1));
        io.start_advertising().await.unwrap();
        io.stop_advertising().await.unwrap();
    }

    /// Only the first `listen` succeeds; the acceptor can be taken once.
    #[tokio::test]
    async fn test_mock_io_listen_twice_fails() {
        let io = MockBleIo::new("hci0", test_addr(1));
        let _acceptor = io.listen(TEST_PSM).await.unwrap();
        assert!(io.listen(TEST_PSM).await.is_err());
    }
}