1use std::collections::HashMap;
59use std::sync::atomic::{AtomicBool, AtomicI8, AtomicU16, Ordering};
60use std::sync::{Arc, Mutex, RwLock};
61use std::time::{Duration, Instant};
62
63use async_trait::async_trait;
64
65use crate::config::{BleConfig, BlePhy, DiscoveryConfig};
66use crate::error::{BleError, Result};
67use crate::platform::{
68 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
69 DiscoveryCallback,
70};
71use crate::transport::BleConnection;
72use crate::NodeId;
73
74#[derive(Clone, Default)]
79pub struct MockNetwork {
80 inner: Arc<MockNetworkInner>,
81}
82
83#[derive(Default)]
84struct MockNetworkInner {
85 advertising_nodes: RwLock<HashMap<NodeId, AdvertisingNode>>,
87 connections: RwLock<HashMap<(NodeId, NodeId), ConnectionState>>,
89 data_queue: Mutex<HashMap<NodeId, Vec<DataPacket>>>,
91}
92
93#[derive(Clone)]
95struct AdvertisingNode {
96 node_id: NodeId,
97 address: String,
98 name: Option<String>,
99 rssi: i8,
100 adv_data: Vec<u8>,
101}
102
103#[derive(Clone)]
105struct ConnectionState {
106 alive: Arc<AtomicBool>,
107}
108
109#[derive(Clone)]
111pub struct DataPacket {
112 pub from: NodeId,
114 pub to: NodeId,
116 pub data: Vec<u8>,
118 pub timestamp: Instant,
120}
121
122impl MockNetwork {
123 pub fn new() -> Self {
125 Self::default()
126 }
127
128 pub fn start_advertising(&self, node_id: NodeId, address: &str, name: Option<&str>) {
130 let mut nodes = self.inner.advertising_nodes.write().unwrap();
131 nodes.insert(
132 node_id,
133 AdvertisingNode {
134 node_id,
135 address: address.to_string(),
136 name: name.map(|s| s.to_string()),
137 rssi: -50, adv_data: vec![],
139 },
140 );
141 }
142
143 pub fn stop_advertising(&self, node_id: &NodeId) {
145 let mut nodes = self.inner.advertising_nodes.write().unwrap();
146 nodes.remove(node_id);
147 }
148
149 pub fn discover_nodes(&self, observer: &NodeId) -> Vec<DiscoveredDevice> {
151 let nodes = self.inner.advertising_nodes.read().unwrap();
152 nodes
153 .values()
154 .filter(|n| &n.node_id != observer)
155 .map(|n| DiscoveredDevice {
156 address: n.address.clone(),
157 name: n.name.clone(),
158 rssi: n.rssi,
159 is_hive_node: true,
160 node_id: Some(n.node_id),
161 adv_data: n.adv_data.clone(),
162 })
163 .collect()
164 }
165
166 pub fn connect(&self, from: &NodeId, to: &NodeId) -> Result<()> {
168 {
170 let nodes = self.inner.advertising_nodes.read().unwrap();
171 if !nodes.contains_key(to) {
172 return Err(BleError::ConnectionFailed(format!(
173 "Node {} is not advertising",
174 to
175 )));
176 }
177 }
178
179 let state = ConnectionState {
181 alive: Arc::new(AtomicBool::new(true)),
182 };
183
184 let mut connections = self.inner.connections.write().unwrap();
186 connections.insert((*from, *to), state.clone());
187 connections.insert((*to, *from), state);
188
189 Ok(())
190 }
191
192 pub fn disconnect(&self, from: &NodeId, to: &NodeId) {
194 let mut connections = self.inner.connections.write().unwrap();
195 if let Some(state) = connections.remove(&(*from, *to)) {
196 state.alive.store(false, Ordering::SeqCst);
197 }
198 if let Some(state) = connections.remove(&(*to, *from)) {
199 state.alive.store(false, Ordering::SeqCst);
200 }
201 }
202
203 pub fn is_connected(&self, from: &NodeId, to: &NodeId) -> bool {
205 let connections = self.inner.connections.read().unwrap();
206 connections
207 .get(&(*from, *to))
208 .is_some_and(|c| c.alive.load(Ordering::SeqCst))
209 }
210
211 pub fn send_data(&self, from: &NodeId, to: &NodeId, data: Vec<u8>) -> Result<()> {
213 {
215 let connections = self.inner.connections.read().unwrap();
216 if !connections.contains_key(&(*from, *to)) {
217 return Err(BleError::ConnectionFailed(format!(
218 "No connection from {} to {}",
219 from, to
220 )));
221 }
222 }
223
224 let mut queue = self.inner.data_queue.lock().unwrap();
226 let packets = queue.entry(*to).or_default();
227 packets.push(DataPacket {
228 from: *from,
229 to: *to,
230 data,
231 timestamp: Instant::now(),
232 });
233
234 Ok(())
235 }
236
237 pub fn receive_data(&self, node_id: &NodeId) -> Vec<DataPacket> {
239 let mut queue = self.inner.data_queue.lock().unwrap();
240 queue.remove(node_id).unwrap_or_default()
241 }
242
243 pub fn connected_peers(&self, node_id: &NodeId) -> Vec<NodeId> {
245 let connections = self.inner.connections.read().unwrap();
246 connections
247 .keys()
248 .filter(|(from, _)| from == node_id)
249 .map(|(_, to)| *to)
250 .collect()
251 }
252
253 pub fn reset(&self) {
255 self.inner.advertising_nodes.write().unwrap().clear();
256 self.inner.connections.write().unwrap().clear();
257 self.inner.data_queue.lock().unwrap().clear();
258 }
259}
260
261pub struct MockConnection {
263 peer_id: NodeId,
264 mtu: AtomicU16,
265 phy: BlePhy,
266 rssi: AtomicI8,
267 alive: Arc<AtomicBool>,
268 established_at: Instant,
269}
270
271impl Clone for MockConnection {
272 fn clone(&self) -> Self {
273 Self {
274 peer_id: self.peer_id,
275 mtu: AtomicU16::new(self.mtu.load(Ordering::SeqCst)),
276 phy: self.phy,
277 rssi: AtomicI8::new(self.rssi.load(Ordering::SeqCst)),
278 alive: self.alive.clone(),
279 established_at: self.established_at,
280 }
281 }
282}
283
284impl MockConnection {
285 pub fn new(peer_id: NodeId, mtu: u16, phy: BlePhy) -> Self {
287 Self {
288 peer_id,
289 mtu: AtomicU16::new(mtu),
290 phy,
291 rssi: AtomicI8::new(-50), alive: Arc::new(AtomicBool::new(true)),
293 established_at: Instant::now(),
294 }
295 }
296
297 pub fn kill(&self) {
299 self.alive.store(false, Ordering::SeqCst);
300 }
301
302 pub fn set_rssi(&self, rssi: i8) {
304 self.rssi.store(rssi, Ordering::SeqCst);
305 }
306
307 pub fn set_mtu(&self, mtu: u16) {
309 self.mtu.store(mtu, Ordering::SeqCst);
310 }
311}
312
313impl BleConnection for MockConnection {
314 fn peer_id(&self) -> &NodeId {
315 &self.peer_id
316 }
317
318 fn is_alive(&self) -> bool {
319 self.alive.load(Ordering::SeqCst)
320 }
321
322 fn mtu(&self) -> u16 {
323 self.mtu.load(Ordering::SeqCst)
324 }
325
326 fn phy(&self) -> BlePhy {
327 self.phy
328 }
329
330 fn rssi(&self) -> Option<i8> {
331 Some(self.rssi.load(Ordering::SeqCst))
332 }
333
334 fn connected_duration(&self) -> Duration {
335 self.established_at.elapsed()
336 }
337}
338
339#[derive(Clone, Debug)]
341pub struct MockAdapterConfig {
342 pub connection_failure_rate: f32,
344 pub connection_latency: Duration,
346 pub scan_latency: Duration,
348 pub supports_coded_phy: bool,
350 pub supports_extended_advertising: bool,
352 pub max_mtu: u16,
354 pub max_connections: u8,
356}
357
358impl Default for MockAdapterConfig {
359 fn default() -> Self {
360 Self {
361 connection_failure_rate: 0.0,
362 connection_latency: Duration::from_millis(50),
363 scan_latency: Duration::from_millis(10),
364 supports_coded_phy: true,
365 supports_extended_advertising: true,
366 max_mtu: 517,
367 max_connections: 8,
368 }
369 }
370}
371
372pub struct MockBleAdapter {
377 node_id: NodeId,
378 network: MockNetwork,
379 config: MockAdapterConfig,
380 powered: AtomicBool,
381 scanning: AtomicBool,
382 advertising: AtomicBool,
383 address: String,
384 discovery_callback: Mutex<Option<DiscoveryCallback>>,
385 connection_callback: Mutex<Option<ConnectionCallback>>,
386 connections: RwLock<HashMap<NodeId, Arc<MockConnection>>>,
387 events: Mutex<Vec<MockEvent>>,
389}
390
391#[derive(Clone, Debug)]
393pub enum MockEvent {
394 Initialized,
396 Started,
398 Stopped,
400 ScanStarted,
402 ScanStopped,
404 AdvertisingStarted,
406 AdvertisingStopped,
408 Connected(NodeId),
410 Disconnected(NodeId, DisconnectReason),
412 GattServiceRegistered,
414 GattServiceUnregistered,
416}
417
418impl MockBleAdapter {
419 pub fn new(node_id: NodeId, network: MockNetwork) -> Self {
421 Self::with_config(node_id, network, MockAdapterConfig::default())
422 }
423
424 pub fn with_config(node_id: NodeId, network: MockNetwork, config: MockAdapterConfig) -> Self {
426 let address = format!(
427 "00:11:22:{:02X}:{:02X}:{:02X}",
428 (node_id.as_u32() >> 16) & 0xFF,
429 (node_id.as_u32() >> 8) & 0xFF,
430 node_id.as_u32() & 0xFF
431 );
432 Self {
433 node_id,
434 network,
435 config,
436 powered: AtomicBool::new(false),
437 scanning: AtomicBool::new(false),
438 advertising: AtomicBool::new(false),
439 address,
440 discovery_callback: Mutex::new(None),
441 connection_callback: Mutex::new(None),
442 connections: RwLock::new(HashMap::new()),
443 events: Mutex::new(Vec::new()),
444 }
445 }
446
447 pub fn events(&self) -> Vec<MockEvent> {
449 self.events.lock().unwrap().clone()
450 }
451
452 pub fn clear_events(&self) {
454 self.events.lock().unwrap().clear();
455 }
456
457 fn record_event(&self, event: MockEvent) {
459 self.events.lock().unwrap().push(event);
460 }
461
462 pub fn trigger_discovery(&self) {
466 let devices = self.network.discover_nodes(&self.node_id);
467 if let Some(ref callback) = *self.discovery_callback.lock().unwrap() {
468 for device in devices {
469 callback(device);
470 }
471 }
472 }
473
474 pub fn inject_data(&self, from: &NodeId, data: Vec<u8>) {
478 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
479 callback(*from, ConnectionEvent::DataReceived { data });
480 }
481 }
482
483 pub fn simulate_disconnect(&self, peer_id: &NodeId, reason: DisconnectReason) {
485 {
487 let mut conns = self.connections.write().unwrap();
488 if let Some(conn) = conns.remove(peer_id) {
489 conn.kill();
490 }
491 }
492
493 self.network.disconnect(&self.node_id, peer_id);
495
496 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
498 callback(*peer_id, ConnectionEvent::Disconnected { reason });
499 }
500
501 self.record_event(MockEvent::Disconnected(*peer_id, reason));
502 }
503
504 pub fn node_id(&self) -> &NodeId {
506 &self.node_id
507 }
508
509 pub fn is_scanning(&self) -> bool {
511 self.scanning.load(Ordering::SeqCst)
512 }
513
514 pub fn is_advertising(&self) -> bool {
516 self.advertising.load(Ordering::SeqCst)
517 }
518}
519
520#[async_trait]
521impl BleAdapter for MockBleAdapter {
522 async fn init(&mut self, _config: &BleConfig) -> Result<()> {
523 self.powered.store(true, Ordering::SeqCst);
524 self.record_event(MockEvent::Initialized);
525 Ok(())
526 }
527
528 async fn start(&self) -> Result<()> {
529 self.record_event(MockEvent::Started);
530 Ok(())
531 }
532
533 async fn stop(&self) -> Result<()> {
534 self.scanning.store(false, Ordering::SeqCst);
535 self.advertising.store(false, Ordering::SeqCst);
536 self.network.stop_advertising(&self.node_id);
537 self.record_event(MockEvent::Stopped);
538 Ok(())
539 }
540
541 fn is_powered(&self) -> bool {
542 self.powered.load(Ordering::SeqCst)
543 }
544
545 fn address(&self) -> Option<String> {
546 Some(self.address.clone())
547 }
548
549 async fn start_scan(&self, _config: &DiscoveryConfig) -> Result<()> {
550 self.scanning.store(true, Ordering::SeqCst);
551 self.record_event(MockEvent::ScanStarted);
552
553 Ok(())
557 }
558
559 async fn stop_scan(&self) -> Result<()> {
560 self.scanning.store(false, Ordering::SeqCst);
561 self.record_event(MockEvent::ScanStopped);
562 Ok(())
563 }
564
565 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
566 self.advertising.store(true, Ordering::SeqCst);
567 self.network
568 .start_advertising(self.node_id, &self.address, Some("HIVE"));
569 self.record_event(MockEvent::AdvertisingStarted);
570 Ok(())
571 }
572
573 async fn stop_advertising(&self) -> Result<()> {
574 self.advertising.store(false, Ordering::SeqCst);
575 self.network.stop_advertising(&self.node_id);
576 self.record_event(MockEvent::AdvertisingStopped);
577 Ok(())
578 }
579
580 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
581 *self.discovery_callback.lock().unwrap() = callback;
582 }
583
584 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
585 if self.connections.read().unwrap().len() >= self.config.max_connections as usize {
587 return Err(BleError::ConnectionFailed(
588 "Maximum connections reached".to_string(),
589 ));
590 }
591
592 self.network.connect(&self.node_id, peer_id)?;
597
598 let conn = Arc::new(MockConnection::new(
600 *peer_id,
601 self.config.max_mtu,
602 BlePhy::Le1M,
603 ));
604
605 {
607 let mut conns = self.connections.write().unwrap();
608 conns.insert(*peer_id, conn.clone());
609 }
610
611 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
613 callback(
614 *peer_id,
615 ConnectionEvent::Connected {
616 mtu: conn.mtu(),
617 phy: conn.phy(),
618 },
619 );
620 }
621
622 self.record_event(MockEvent::Connected(*peer_id));
623 Ok(Box::new(conn.as_ref().clone()))
624 }
625
626 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
627 self.simulate_disconnect(peer_id, DisconnectReason::LocalRequest);
628 Ok(())
629 }
630
631 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
632 let conns = self.connections.read().unwrap();
633 conns
634 .get(peer_id)
635 .filter(|c| c.is_alive())
636 .map(|c| Box::new(c.as_ref().clone()) as Box<dyn BleConnection>)
637 }
638
639 fn peer_count(&self) -> usize {
640 self.connections
641 .read()
642 .unwrap()
643 .values()
644 .filter(|c| c.is_alive())
645 .count()
646 }
647
648 fn connected_peers(&self) -> Vec<NodeId> {
649 self.connections
650 .read()
651 .unwrap()
652 .iter()
653 .filter(|(_, c)| c.is_alive())
654 .map(|(id, _)| *id)
655 .collect()
656 }
657
658 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
659 *self.connection_callback.lock().unwrap() = callback;
660 }
661
662 async fn register_gatt_service(&self) -> Result<()> {
663 self.record_event(MockEvent::GattServiceRegistered);
664 Ok(())
665 }
666
667 async fn unregister_gatt_service(&self) -> Result<()> {
668 self.record_event(MockEvent::GattServiceUnregistered);
669 Ok(())
670 }
671
672 fn supports_coded_phy(&self) -> bool {
673 self.config.supports_coded_phy
674 }
675
676 fn supports_extended_advertising(&self) -> bool {
677 self.config.supports_extended_advertising
678 }
679
680 fn max_mtu(&self) -> u16 {
681 self.config.max_mtu
682 }
683
684 fn max_connections(&self) -> u8 {
685 self.config.max_connections
686 }
687}
688
689#[cfg(test)]
690mod tests {
691 use super::*;
692
693 #[tokio::test]
694 async fn test_mock_adapter_init() {
695 let network = MockNetwork::new();
696 let mut adapter = MockBleAdapter::new(NodeId::new(0x111), network);
697
698 assert!(!adapter.is_powered());
699 adapter.init(&BleConfig::default()).await.unwrap();
700 assert!(adapter.is_powered());
701
702 let events = adapter.events();
703 assert!(matches!(events[0], MockEvent::Initialized));
704 }
705
706 #[tokio::test]
707 async fn test_mock_network_discovery() {
708 let network = MockNetwork::new();
709
710 let mut adapter1 = MockBleAdapter::new(NodeId::new(0x111), network.clone());
711 let mut adapter2 = MockBleAdapter::new(NodeId::new(0x222), network.clone());
712
713 adapter1.init(&BleConfig::default()).await.unwrap();
714 adapter2.init(&BleConfig::default()).await.unwrap();
715
716 adapter2
718 .start_advertising(&DiscoveryConfig::default())
719 .await
720 .unwrap();
721
722 let devices = network.discover_nodes(&NodeId::new(0x111));
724 assert_eq!(devices.len(), 1);
725 assert_eq!(devices[0].node_id, Some(NodeId::new(0x222)));
726 }
727
728 #[tokio::test]
729 async fn test_mock_connection() {
730 let network = MockNetwork::new();
731
732 let mut adapter1 = MockBleAdapter::new(NodeId::new(0x111), network.clone());
733 let mut adapter2 = MockBleAdapter::new(NodeId::new(0x222), network.clone());
734
735 adapter1.init(&BleConfig::default()).await.unwrap();
736 adapter2.init(&BleConfig::default()).await.unwrap();
737
738 adapter2
740 .start_advertising(&DiscoveryConfig::default())
741 .await
742 .unwrap();
743
744 let conn = adapter1.connect(&NodeId::new(0x222)).await.unwrap();
746 assert!(conn.is_alive());
747 assert_eq!(conn.peer_id(), &NodeId::new(0x222));
748
749 assert_eq!(adapter1.peer_count(), 1);
751 assert!(adapter1.connected_peers().contains(&NodeId::new(0x222)));
752 }
753
754 #[tokio::test]
755 async fn test_mock_disconnect() {
756 let network = MockNetwork::new();
757
758 let mut adapter1 = MockBleAdapter::new(NodeId::new(0x111), network.clone());
759 let mut adapter2 = MockBleAdapter::new(NodeId::new(0x222), network.clone());
760
761 adapter1.init(&BleConfig::default()).await.unwrap();
762 adapter2.init(&BleConfig::default()).await.unwrap();
763 adapter2
764 .start_advertising(&DiscoveryConfig::default())
765 .await
766 .unwrap();
767
768 let conn = adapter1.connect(&NodeId::new(0x222)).await.unwrap();
769 assert!(conn.is_alive());
770
771 adapter1.disconnect(&NodeId::new(0x222)).await.unwrap();
773 assert_eq!(adapter1.peer_count(), 0);
774 }
775
776 #[tokio::test]
777 async fn test_connection_limit() {
778 let network = MockNetwork::new();
779
780 let config = MockAdapterConfig {
781 max_connections: 2,
782 ..Default::default()
783 };
784 let mut adapter1 = MockBleAdapter::with_config(NodeId::new(0x111), network.clone(), config);
785 adapter1.init(&BleConfig::default()).await.unwrap();
786
787 for i in 2..=4 {
789 let mut other = MockBleAdapter::new(NodeId::new(i * 0x111), network.clone());
790 other.init(&BleConfig::default()).await.unwrap();
791 other
792 .start_advertising(&DiscoveryConfig::default())
793 .await
794 .unwrap();
795 }
796
797 adapter1.connect(&NodeId::new(0x222)).await.unwrap();
799 adapter1.connect(&NodeId::new(0x333)).await.unwrap();
800
801 let result = adapter1.connect(&NodeId::new(0x444)).await;
803 assert!(result.is_err());
804 }
805
806 #[tokio::test]
807 async fn test_event_tracking() {
808 let network = MockNetwork::new();
809 let mut adapter = MockBleAdapter::new(NodeId::new(0x111), network.clone());
810
811 adapter.init(&BleConfig::default()).await.unwrap();
812 adapter.start().await.unwrap();
813 adapter
814 .start_scan(&DiscoveryConfig::default())
815 .await
816 .unwrap();
817 adapter.stop_scan().await.unwrap();
818 adapter
819 .start_advertising(&DiscoveryConfig::default())
820 .await
821 .unwrap();
822 adapter.stop_advertising().await.unwrap();
823 adapter.stop().await.unwrap();
824
825 let events = adapter.events();
826 assert!(matches!(events[0], MockEvent::Initialized));
827 assert!(matches!(events[1], MockEvent::Started));
828 assert!(matches!(events[2], MockEvent::ScanStarted));
829 assert!(matches!(events[3], MockEvent::ScanStopped));
830 assert!(matches!(events[4], MockEvent::AdvertisingStarted));
831 assert!(matches!(events[5], MockEvent::AdvertisingStopped));
832 assert!(matches!(events[6], MockEvent::Stopped));
833 }
834
835 #[tokio::test]
836 async fn test_data_injection() {
837 let network = MockNetwork::new();
838 let mut adapter = MockBleAdapter::new(NodeId::new(0x111), network.clone());
839 adapter.init(&BleConfig::default()).await.unwrap();
840
841 let received = Arc::new(Mutex::new(Vec::new()));
843 let received_clone = received.clone();
844
845 adapter.set_connection_callback(Some(Arc::new(move |node_id, event| {
846 if let ConnectionEvent::DataReceived { data } = event {
847 received_clone.lock().unwrap().push((node_id, data));
848 }
849 })));
850
851 adapter.inject_data(&NodeId::new(0x222), vec![1, 2, 3, 4]);
853
854 let data = received.lock().unwrap();
855 assert_eq!(data.len(), 1);
856 assert_eq!(data[0].0, NodeId::new(0x222));
857 assert_eq!(data[0].1, vec![1, 2, 3, 4]);
858 }
859}