1use std::collections::HashMap;
44use std::sync::atomic::{AtomicBool, AtomicI8, AtomicU16, Ordering};
45use std::sync::{Arc, Mutex, RwLock};
46use std::time::{Duration, Instant};
47
48use async_trait::async_trait;
49
50use crate::config::{BleConfig, BlePhy, DiscoveryConfig};
51use crate::error::{BleError, Result};
52use crate::platform::{
53 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
54 DiscoveryCallback,
55};
56use crate::transport::BleConnection;
57use crate::NodeId;
58
59#[derive(Clone, Default)]
64pub struct MockNetwork {
65 inner: Arc<MockNetworkInner>,
66}
67
68#[derive(Default)]
69struct MockNetworkInner {
70 advertising_nodes: RwLock<HashMap<NodeId, AdvertisingNode>>,
72 connections: RwLock<HashMap<(NodeId, NodeId), ConnectionState>>,
74 data_queue: Mutex<HashMap<NodeId, Vec<DataPacket>>>,
76}
77
78#[derive(Clone)]
80struct AdvertisingNode {
81 node_id: NodeId,
82 address: String,
83 name: Option<String>,
84 rssi: i8,
85 adv_data: Vec<u8>,
86}
87
/// Liveness marker for one link in the mock network.
///
/// Cloning copies the `Arc`, not the flag, so all clones read and write the
/// same boolean.
#[derive(Clone)]
struct ConnectionState {
    /// `true` while the link is considered up.
    alive: Arc<AtomicBool>,
}
93
94#[derive(Clone)]
96pub struct DataPacket {
97 pub from: NodeId,
99 pub to: NodeId,
101 pub data: Vec<u8>,
103 pub timestamp: Instant,
105}
106
107impl MockNetwork {
108 pub fn new() -> Self {
110 Self::default()
111 }
112
113 pub fn start_advertising(&self, node_id: NodeId, address: &str, name: Option<&str>) {
115 let mut nodes = self.inner.advertising_nodes.write().unwrap();
116 nodes.insert(
117 node_id,
118 AdvertisingNode {
119 node_id,
120 address: address.to_string(),
121 name: name.map(|s| s.to_string()),
122 rssi: -50, adv_data: vec![],
124 },
125 );
126 }
127
128 pub fn stop_advertising(&self, node_id: &NodeId) {
130 let mut nodes = self.inner.advertising_nodes.write().unwrap();
131 nodes.remove(node_id);
132 }
133
134 pub fn discover_nodes(&self, observer: &NodeId) -> Vec<DiscoveredDevice> {
136 let nodes = self.inner.advertising_nodes.read().unwrap();
137 nodes
138 .values()
139 .filter(|n| &n.node_id != observer)
140 .map(|n| DiscoveredDevice {
141 address: n.address.clone(),
142 name: n.name.clone(),
143 rssi: n.rssi,
144 is_hive_node: true,
145 node_id: Some(n.node_id),
146 adv_data: n.adv_data.clone(),
147 })
148 .collect()
149 }
150
151 pub fn connect(&self, from: &NodeId, to: &NodeId) -> Result<()> {
153 {
155 let nodes = self.inner.advertising_nodes.read().unwrap();
156 if !nodes.contains_key(to) {
157 return Err(BleError::ConnectionFailed(format!(
158 "Node {} is not advertising",
159 to
160 )));
161 }
162 }
163
164 let state = ConnectionState {
166 alive: Arc::new(AtomicBool::new(true)),
167 };
168
169 let mut connections = self.inner.connections.write().unwrap();
171 connections.insert((*from, *to), state.clone());
172 connections.insert((*to, *from), state);
173
174 Ok(())
175 }
176
177 pub fn disconnect(&self, from: &NodeId, to: &NodeId) {
179 let mut connections = self.inner.connections.write().unwrap();
180 if let Some(state) = connections.remove(&(*from, *to)) {
181 state.alive.store(false, Ordering::SeqCst);
182 }
183 if let Some(state) = connections.remove(&(*to, *from)) {
184 state.alive.store(false, Ordering::SeqCst);
185 }
186 }
187
188 pub fn is_connected(&self, from: &NodeId, to: &NodeId) -> bool {
190 let connections = self.inner.connections.read().unwrap();
191 connections
192 .get(&(*from, *to))
193 .is_some_and(|c| c.alive.load(Ordering::SeqCst))
194 }
195
196 pub fn send_data(&self, from: &NodeId, to: &NodeId, data: Vec<u8>) -> Result<()> {
198 {
200 let connections = self.inner.connections.read().unwrap();
201 if !connections.contains_key(&(*from, *to)) {
202 return Err(BleError::ConnectionFailed(format!(
203 "No connection from {} to {}",
204 from, to
205 )));
206 }
207 }
208
209 let mut queue = self.inner.data_queue.lock().unwrap();
211 let packets = queue.entry(*to).or_default();
212 packets.push(DataPacket {
213 from: *from,
214 to: *to,
215 data,
216 timestamp: Instant::now(),
217 });
218
219 Ok(())
220 }
221
222 pub fn receive_data(&self, node_id: &NodeId) -> Vec<DataPacket> {
224 let mut queue = self.inner.data_queue.lock().unwrap();
225 queue.remove(node_id).unwrap_or_default()
226 }
227
228 pub fn connected_peers(&self, node_id: &NodeId) -> Vec<NodeId> {
230 let connections = self.inner.connections.read().unwrap();
231 connections
232 .keys()
233 .filter(|(from, _)| from == node_id)
234 .map(|(_, to)| *to)
235 .collect()
236 }
237
238 pub fn reset(&self) {
240 self.inner.advertising_nodes.write().unwrap().clear();
241 self.inner.connections.write().unwrap().clear();
242 self.inner.data_queue.lock().unwrap().clear();
243 }
244}
245
246pub struct MockConnection {
248 peer_id: NodeId,
249 mtu: AtomicU16,
250 phy: BlePhy,
251 rssi: AtomicI8,
252 alive: Arc<AtomicBool>,
253 established_at: Instant,
254}
255
256impl Clone for MockConnection {
257 fn clone(&self) -> Self {
258 Self {
259 peer_id: self.peer_id,
260 mtu: AtomicU16::new(self.mtu.load(Ordering::SeqCst)),
261 phy: self.phy,
262 rssi: AtomicI8::new(self.rssi.load(Ordering::SeqCst)),
263 alive: self.alive.clone(),
264 established_at: self.established_at,
265 }
266 }
267}
268
269impl MockConnection {
270 pub fn new(peer_id: NodeId, mtu: u16, phy: BlePhy) -> Self {
272 Self {
273 peer_id,
274 mtu: AtomicU16::new(mtu),
275 phy,
276 rssi: AtomicI8::new(-50), alive: Arc::new(AtomicBool::new(true)),
278 established_at: Instant::now(),
279 }
280 }
281
282 pub fn kill(&self) {
284 self.alive.store(false, Ordering::SeqCst);
285 }
286
287 pub fn set_rssi(&self, rssi: i8) {
289 self.rssi.store(rssi, Ordering::SeqCst);
290 }
291
292 pub fn set_mtu(&self, mtu: u16) {
294 self.mtu.store(mtu, Ordering::SeqCst);
295 }
296}
297
298impl BleConnection for MockConnection {
299 fn peer_id(&self) -> &NodeId {
300 &self.peer_id
301 }
302
303 fn is_alive(&self) -> bool {
304 self.alive.load(Ordering::SeqCst)
305 }
306
307 fn mtu(&self) -> u16 {
308 self.mtu.load(Ordering::SeqCst)
309 }
310
311 fn phy(&self) -> BlePhy {
312 self.phy
313 }
314
315 fn rssi(&self) -> Option<i8> {
316 Some(self.rssi.load(Ordering::SeqCst))
317 }
318
319 fn connected_duration(&self) -> Duration {
320 self.established_at.elapsed()
321 }
322}
323
/// Tunable behaviour of a mock BLE adapter.
#[derive(Clone, Debug)]
pub struct MockAdapterConfig {
    /// Requested connection failure rate.
    ///
    /// NOTE(review): the adapter code in this file never reads this field;
    /// confirm whether it is meant to be honoured.
    pub connection_failure_rate: f32,
    /// Requested connection latency.
    ///
    /// NOTE(review): not read by the adapter code in this file.
    pub connection_latency: Duration,
    /// Requested scan latency.
    ///
    /// NOTE(review): not read by the adapter code in this file.
    pub scan_latency: Duration,
    /// Value the adapter reports from `supports_coded_phy`.
    pub supports_coded_phy: bool,
    /// Value the adapter reports from `supports_extended_advertising`.
    pub supports_extended_advertising: bool,
    /// Value the adapter reports from `max_mtu`; also used as the MTU of new connections.
    pub max_mtu: u16,
    /// Upper bound on simultaneously tracked connections.
    pub max_connections: u8,
}

impl Default for MockAdapterConfig {
    /// No failures, 50 ms connection latency, 10 ms scan latency, all
    /// optional features supported, MTU 517 and at most 8 connections.
    fn default() -> Self {
        let connection_latency = Duration::from_millis(50);
        let scan_latency = Duration::from_millis(10);
        MockAdapterConfig {
            // Timing and fault-injection knobs.
            connection_failure_rate: 0.0,
            connection_latency,
            scan_latency,
            // Capability flags.
            supports_coded_phy: true,
            supports_extended_advertising: true,
            // Limits.
            max_mtu: 517,
            max_connections: 8,
        }
    }
}
356
357pub struct MockBleAdapter {
362 node_id: NodeId,
363 network: MockNetwork,
364 config: MockAdapterConfig,
365 powered: AtomicBool,
366 scanning: AtomicBool,
367 advertising: AtomicBool,
368 address: String,
369 discovery_callback: Mutex<Option<DiscoveryCallback>>,
370 connection_callback: Mutex<Option<ConnectionCallback>>,
371 connections: RwLock<HashMap<NodeId, Arc<MockConnection>>>,
372 events: Mutex<Vec<MockEvent>>,
374}
375
376#[derive(Clone, Debug)]
378pub enum MockEvent {
379 Initialized,
381 Started,
383 Stopped,
385 ScanStarted,
387 ScanStopped,
389 AdvertisingStarted,
391 AdvertisingStopped,
393 Connected(NodeId),
395 Disconnected(NodeId, DisconnectReason),
397 GattServiceRegistered,
399 GattServiceUnregistered,
401}
402
403impl MockBleAdapter {
404 pub fn new(node_id: NodeId, network: MockNetwork) -> Self {
406 Self::with_config(node_id, network, MockAdapterConfig::default())
407 }
408
409 pub fn with_config(node_id: NodeId, network: MockNetwork, config: MockAdapterConfig) -> Self {
411 let address = format!(
412 "00:11:22:{:02X}:{:02X}:{:02X}",
413 (node_id.as_u32() >> 16) & 0xFF,
414 (node_id.as_u32() >> 8) & 0xFF,
415 node_id.as_u32() & 0xFF
416 );
417 Self {
418 node_id,
419 network,
420 config,
421 powered: AtomicBool::new(false),
422 scanning: AtomicBool::new(false),
423 advertising: AtomicBool::new(false),
424 address,
425 discovery_callback: Mutex::new(None),
426 connection_callback: Mutex::new(None),
427 connections: RwLock::new(HashMap::new()),
428 events: Mutex::new(Vec::new()),
429 }
430 }
431
432 pub fn events(&self) -> Vec<MockEvent> {
434 self.events.lock().unwrap().clone()
435 }
436
437 pub fn clear_events(&self) {
439 self.events.lock().unwrap().clear();
440 }
441
442 fn record_event(&self, event: MockEvent) {
444 self.events.lock().unwrap().push(event);
445 }
446
447 pub fn trigger_discovery(&self) {
451 let devices = self.network.discover_nodes(&self.node_id);
452 if let Some(ref callback) = *self.discovery_callback.lock().unwrap() {
453 for device in devices {
454 callback(device);
455 }
456 }
457 }
458
459 pub fn inject_data(&self, from: &NodeId, data: Vec<u8>) {
463 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
464 callback(*from, ConnectionEvent::DataReceived { data });
465 }
466 }
467
468 pub fn simulate_disconnect(&self, peer_id: &NodeId, reason: DisconnectReason) {
470 {
472 let mut conns = self.connections.write().unwrap();
473 if let Some(conn) = conns.remove(peer_id) {
474 conn.kill();
475 }
476 }
477
478 self.network.disconnect(&self.node_id, peer_id);
480
481 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
483 callback(*peer_id, ConnectionEvent::Disconnected { reason });
484 }
485
486 self.record_event(MockEvent::Disconnected(*peer_id, reason));
487 }
488
489 pub fn node_id(&self) -> &NodeId {
491 &self.node_id
492 }
493
494 pub fn is_scanning(&self) -> bool {
496 self.scanning.load(Ordering::SeqCst)
497 }
498
499 pub fn is_advertising(&self) -> bool {
501 self.advertising.load(Ordering::SeqCst)
502 }
503}
504
505#[async_trait]
506impl BleAdapter for MockBleAdapter {
507 async fn init(&mut self, _config: &BleConfig) -> Result<()> {
508 self.powered.store(true, Ordering::SeqCst);
509 self.record_event(MockEvent::Initialized);
510 Ok(())
511 }
512
513 async fn start(&self) -> Result<()> {
514 self.record_event(MockEvent::Started);
515 Ok(())
516 }
517
518 async fn stop(&self) -> Result<()> {
519 self.scanning.store(false, Ordering::SeqCst);
520 self.advertising.store(false, Ordering::SeqCst);
521 self.network.stop_advertising(&self.node_id);
522 self.record_event(MockEvent::Stopped);
523 Ok(())
524 }
525
526 fn is_powered(&self) -> bool {
527 self.powered.load(Ordering::SeqCst)
528 }
529
530 fn address(&self) -> Option<String> {
531 Some(self.address.clone())
532 }
533
534 async fn start_scan(&self, _config: &DiscoveryConfig) -> Result<()> {
535 self.scanning.store(true, Ordering::SeqCst);
536 self.record_event(MockEvent::ScanStarted);
537
538 Ok(())
542 }
543
544 async fn stop_scan(&self) -> Result<()> {
545 self.scanning.store(false, Ordering::SeqCst);
546 self.record_event(MockEvent::ScanStopped);
547 Ok(())
548 }
549
550 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
551 self.advertising.store(true, Ordering::SeqCst);
552 self.network
553 .start_advertising(self.node_id, &self.address, Some("HIVE"));
554 self.record_event(MockEvent::AdvertisingStarted);
555 Ok(())
556 }
557
558 async fn stop_advertising(&self) -> Result<()> {
559 self.advertising.store(false, Ordering::SeqCst);
560 self.network.stop_advertising(&self.node_id);
561 self.record_event(MockEvent::AdvertisingStopped);
562 Ok(())
563 }
564
565 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
566 *self.discovery_callback.lock().unwrap() = callback;
567 }
568
569 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
570 if self.connections.read().unwrap().len() >= self.config.max_connections as usize {
572 return Err(BleError::ConnectionFailed(
573 "Maximum connections reached".to_string(),
574 ));
575 }
576
577 self.network.connect(&self.node_id, peer_id)?;
582
583 let conn = Arc::new(MockConnection::new(
585 *peer_id,
586 self.config.max_mtu,
587 BlePhy::Le1M,
588 ));
589
590 {
592 let mut conns = self.connections.write().unwrap();
593 conns.insert(*peer_id, conn.clone());
594 }
595
596 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
598 callback(
599 *peer_id,
600 ConnectionEvent::Connected {
601 mtu: conn.mtu(),
602 phy: conn.phy(),
603 },
604 );
605 }
606
607 self.record_event(MockEvent::Connected(*peer_id));
608 Ok(Box::new(conn.as_ref().clone()))
609 }
610
611 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
612 self.simulate_disconnect(peer_id, DisconnectReason::LocalRequest);
613 Ok(())
614 }
615
616 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
617 let conns = self.connections.read().unwrap();
618 conns
619 .get(peer_id)
620 .filter(|c| c.is_alive())
621 .map(|c| Box::new(c.as_ref().clone()) as Box<dyn BleConnection>)
622 }
623
624 fn peer_count(&self) -> usize {
625 self.connections
626 .read()
627 .unwrap()
628 .values()
629 .filter(|c| c.is_alive())
630 .count()
631 }
632
633 fn connected_peers(&self) -> Vec<NodeId> {
634 self.connections
635 .read()
636 .unwrap()
637 .iter()
638 .filter(|(_, c)| c.is_alive())
639 .map(|(id, _)| *id)
640 .collect()
641 }
642
643 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
644 *self.connection_callback.lock().unwrap() = callback;
645 }
646
647 async fn register_gatt_service(&self) -> Result<()> {
648 self.record_event(MockEvent::GattServiceRegistered);
649 Ok(())
650 }
651
652 async fn unregister_gatt_service(&self) -> Result<()> {
653 self.record_event(MockEvent::GattServiceUnregistered);
654 Ok(())
655 }
656
657 fn supports_coded_phy(&self) -> bool {
658 self.config.supports_coded_phy
659 }
660
661 fn supports_extended_advertising(&self) -> bool {
662 self.config.supports_extended_advertising
663 }
664
665 fn max_mtu(&self) -> u16 {
666 self.config.max_mtu
667 }
668
669 fn max_connections(&self) -> u8 {
670 self.config.max_connections
671 }
672}
673
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an adapter for `node_id` on `network` and runs `init` on it
    /// with the default configuration.
    async fn powered_adapter(node_id: NodeId, network: &MockNetwork) -> MockBleAdapter {
        let mut adapter = MockBleAdapter::new(node_id, network.clone());
        adapter.init(&BleConfig::default()).await.unwrap();
        adapter
    }

    /// Starts advertising on `adapter` with the default discovery settings.
    async fn advertise(adapter: &MockBleAdapter) {
        adapter
            .start_advertising(&DiscoveryConfig::default())
            .await
            .unwrap();
    }

    /// `init` powers the adapter and is the first logged event.
    #[tokio::test]
    async fn test_mock_adapter_init() {
        let mut adapter = MockBleAdapter::new(NodeId::new(0x111), MockNetwork::new());
        assert!(!adapter.is_powered());

        adapter.init(&BleConfig::default()).await.unwrap();
        assert!(adapter.is_powered());

        let log = adapter.events();
        assert!(matches!(log[0], MockEvent::Initialized));
    }

    /// An advertising node is visible to a different observer on the network.
    #[tokio::test]
    async fn test_mock_network_discovery() {
        let network = MockNetwork::new();
        let _observer = powered_adapter(NodeId::new(0x111), &network).await;
        let advertiser = powered_adapter(NodeId::new(0x222), &network).await;
        advertise(&advertiser).await;

        let seen = network.discover_nodes(&NodeId::new(0x111));
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0].node_id, Some(NodeId::new(0x222)));
    }

    /// Connecting to an advertising peer yields a live, tracked connection.
    #[tokio::test]
    async fn test_mock_connection() {
        let network = MockNetwork::new();
        let central = powered_adapter(NodeId::new(0x111), &network).await;
        let peripheral = powered_adapter(NodeId::new(0x222), &network).await;
        advertise(&peripheral).await;

        let link = central.connect(&NodeId::new(0x222)).await.unwrap();
        assert!(link.is_alive());
        assert_eq!(link.peer_id(), &NodeId::new(0x222));

        assert_eq!(central.peer_count(), 1);
        assert!(central.connected_peers().contains(&NodeId::new(0x222)));
    }

    /// Disconnecting removes the peer from the adapter's live connections.
    #[tokio::test]
    async fn test_mock_disconnect() {
        let network = MockNetwork::new();
        let central = powered_adapter(NodeId::new(0x111), &network).await;
        let peripheral = powered_adapter(NodeId::new(0x222), &network).await;
        advertise(&peripheral).await;

        let link = central.connect(&NodeId::new(0x222)).await.unwrap();
        assert!(link.is_alive());

        central.disconnect(&NodeId::new(0x222)).await.unwrap();
        assert_eq!(central.peer_count(), 0);
    }

    /// With `max_connections: 2`, a third connection attempt is rejected.
    #[tokio::test]
    async fn test_connection_limit() {
        let network = MockNetwork::new();
        let limited = MockAdapterConfig {
            max_connections: 2,
            ..Default::default()
        };
        let mut central =
            MockBleAdapter::with_config(NodeId::new(0x111), network.clone(), limited);
        central.init(&BleConfig::default()).await.unwrap();

        // Nodes 0x222, 0x333 and 0x444 all advertise on the shared network.
        for multiplier in 2..=4 {
            let peripheral = powered_adapter(NodeId::new(multiplier * 0x111), &network).await;
            advertise(&peripheral).await;
        }

        central.connect(&NodeId::new(0x222)).await.unwrap();
        central.connect(&NodeId::new(0x333)).await.unwrap();

        let third = central.connect(&NodeId::new(0x444)).await;
        assert!(third.is_err());
    }

    /// Lifecycle calls are logged in the order they were made.
    #[tokio::test]
    async fn test_event_tracking() {
        let network = MockNetwork::new();
        let adapter = powered_adapter(NodeId::new(0x111), &network).await;

        adapter.start().await.unwrap();
        adapter
            .start_scan(&DiscoveryConfig::default())
            .await
            .unwrap();
        adapter.stop_scan().await.unwrap();
        advertise(&adapter).await;
        adapter.stop_advertising().await.unwrap();
        adapter.stop().await.unwrap();

        let log = adapter.events();
        let mut recorded = log.iter();
        assert!(matches!(recorded.next(), Some(MockEvent::Initialized)));
        assert!(matches!(recorded.next(), Some(MockEvent::Started)));
        assert!(matches!(recorded.next(), Some(MockEvent::ScanStarted)));
        assert!(matches!(recorded.next(), Some(MockEvent::ScanStopped)));
        assert!(matches!(recorded.next(), Some(MockEvent::AdvertisingStarted)));
        assert!(matches!(recorded.next(), Some(MockEvent::AdvertisingStopped)));
        assert!(matches!(recorded.next(), Some(MockEvent::Stopped)));
    }

    /// Injected data reaches the connection callback with sender and payload.
    #[tokio::test]
    async fn test_data_injection() {
        let network = MockNetwork::new();
        let mut adapter = powered_adapter(NodeId::new(0x111), &network).await;

        let inbox = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&inbox);
        adapter.set_connection_callback(Some(Arc::new(move |sender, event| {
            if let ConnectionEvent::DataReceived { data } = event {
                sink.lock().unwrap().push((sender, data));
            }
        })));

        adapter.inject_data(&NodeId::new(0x222), vec![1, 2, 3, 4]);

        let delivered = inbox.lock().unwrap();
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].0, NodeId::new(0x222));
        assert_eq!(delivered[0].1, vec![1, 2, 3, 4]);
    }
}