1use std::collections::HashMap;
59use std::sync::atomic::{AtomicBool, AtomicI8, AtomicU16, Ordering};
60use std::sync::{Arc, Mutex, RwLock};
61use std::time::{Duration, Instant};
62
63use async_trait::async_trait;
64
65use crate::config::{BleConfig, BlePhy, DiscoveryConfig};
66use crate::error::{BleError, Result};
67use crate::platform::{
68 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
69 DiscoveryCallback,
70};
71use crate::transport::BleConnection;
72use crate::NodeId;
73
74#[derive(Clone, Default)]
79pub struct MockNetwork {
80 inner: Arc<MockNetworkInner>,
81}
82
83#[derive(Default)]
84struct MockNetworkInner {
85 advertising_nodes: RwLock<HashMap<NodeId, AdvertisingNode>>,
87 connections: RwLock<HashMap<(NodeId, NodeId), ConnectionState>>,
89 data_queue: Mutex<HashMap<NodeId, Vec<DataPacket>>>,
91}
92
93#[derive(Clone)]
95struct AdvertisingNode {
96 node_id: NodeId,
97 address: String,
98 name: Option<String>,
99 rssi: i8,
100 adv_data: Vec<u8>,
101}
102
/// Per-link state stored in the mock network's connection table.
#[derive(Clone)]
struct ConnectionState {
    /// Liveness flag; clones of this state share the same flag through the `Arc`.
    alive: Arc<AtomicBool>,
}
108
109#[derive(Clone)]
111pub struct DataPacket {
112 pub from: NodeId,
114 pub to: NodeId,
116 pub data: Vec<u8>,
118 pub timestamp: Instant,
120}
121
122impl MockNetwork {
123 pub fn new() -> Self {
125 Self::default()
126 }
127
128 pub fn start_advertising(&self, node_id: NodeId, address: &str, name: Option<&str>) {
130 let mut nodes = self.inner.advertising_nodes.write().unwrap();
131 nodes.insert(
132 node_id,
133 AdvertisingNode {
134 node_id,
135 address: address.to_string(),
136 name: name.map(|s| s.to_string()),
137 rssi: -50, adv_data: vec![],
139 },
140 );
141 }
142
143 pub fn stop_advertising(&self, node_id: &NodeId) {
145 let mut nodes = self.inner.advertising_nodes.write().unwrap();
146 nodes.remove(node_id);
147 }
148
149 pub fn discover_nodes(&self, observer: &NodeId) -> Vec<DiscoveredDevice> {
151 let nodes = self.inner.advertising_nodes.read().unwrap();
152 nodes
153 .values()
154 .filter(|n| &n.node_id != observer)
155 .map(|n| DiscoveredDevice {
156 address: n.address.clone(),
157 name: n.name.clone(),
158 rssi: n.rssi,
159 is_hive_node: true,
160 node_id: Some(n.node_id),
161 adv_data: n.adv_data.clone(),
162 })
163 .collect()
164 }
165
166 pub fn connect(&self, from: &NodeId, to: &NodeId) -> Result<()> {
168 {
170 let nodes = self.inner.advertising_nodes.read().unwrap();
171 if !nodes.contains_key(to) {
172 return Err(BleError::ConnectionFailed(format!(
173 "Node {} is not advertising",
174 to
175 )));
176 }
177 }
178
179 let state = ConnectionState {
181 alive: Arc::new(AtomicBool::new(true)),
182 };
183
184 let mut connections = self.inner.connections.write().unwrap();
186 connections.insert((*from, *to), state.clone());
187 connections.insert((*to, *from), state);
188
189 Ok(())
190 }
191
192 pub fn disconnect(&self, from: &NodeId, to: &NodeId) {
194 let mut connections = self.inner.connections.write().unwrap();
195 if let Some(state) = connections.remove(&(*from, *to)) {
196 state.alive.store(false, Ordering::SeqCst);
197 }
198 if let Some(state) = connections.remove(&(*to, *from)) {
199 state.alive.store(false, Ordering::SeqCst);
200 }
201 }
202
203 pub fn is_connected(&self, from: &NodeId, to: &NodeId) -> bool {
205 let connections = self.inner.connections.read().unwrap();
206 connections
207 .get(&(*from, *to))
208 .is_some_and(|c| c.alive.load(Ordering::SeqCst))
209 }
210
211 pub fn send_data(&self, from: &NodeId, to: &NodeId, data: Vec<u8>) -> Result<()> {
213 {
215 let connections = self.inner.connections.read().unwrap();
216 if !connections.contains_key(&(*from, *to)) {
217 return Err(BleError::ConnectionFailed(format!(
218 "No connection from {} to {}",
219 from, to
220 )));
221 }
222 }
223
224 let mut queue = self.inner.data_queue.lock().unwrap();
226 let packets = queue.entry(*to).or_default();
227 packets.push(DataPacket {
228 from: *from,
229 to: *to,
230 data,
231 timestamp: Instant::now(),
232 });
233
234 Ok(())
235 }
236
237 pub fn receive_data(&self, node_id: &NodeId) -> Vec<DataPacket> {
239 let mut queue = self.inner.data_queue.lock().unwrap();
240 queue.remove(node_id).unwrap_or_default()
241 }
242
243 pub fn connected_peers(&self, node_id: &NodeId) -> Vec<NodeId> {
245 let connections = self.inner.connections.read().unwrap();
246 connections
247 .keys()
248 .filter(|(from, _)| from == node_id)
249 .map(|(_, to)| *to)
250 .collect()
251 }
252
253 pub fn reset(&self) {
255 self.inner.advertising_nodes.write().unwrap().clear();
256 self.inner.connections.write().unwrap().clear();
257 self.inner.data_queue.lock().unwrap().clear();
258 }
259}
260
261pub struct MockConnection {
263 peer_id: NodeId,
264 mtu: AtomicU16,
265 phy: BlePhy,
266 rssi: AtomicI8,
267 alive: Arc<AtomicBool>,
268 established_at: Instant,
269}
270
271impl Clone for MockConnection {
272 fn clone(&self) -> Self {
273 Self {
274 peer_id: self.peer_id,
275 mtu: AtomicU16::new(self.mtu.load(Ordering::SeqCst)),
276 phy: self.phy,
277 rssi: AtomicI8::new(self.rssi.load(Ordering::SeqCst)),
278 alive: self.alive.clone(),
279 established_at: self.established_at,
280 }
281 }
282}
283
284impl MockConnection {
285 pub fn new(peer_id: NodeId, mtu: u16, phy: BlePhy) -> Self {
287 Self {
288 peer_id,
289 mtu: AtomicU16::new(mtu),
290 phy,
291 rssi: AtomicI8::new(-50), alive: Arc::new(AtomicBool::new(true)),
293 established_at: Instant::now(),
294 }
295 }
296
297 pub fn kill(&self) {
299 self.alive.store(false, Ordering::SeqCst);
300 }
301
302 pub fn set_rssi(&self, rssi: i8) {
304 self.rssi.store(rssi, Ordering::SeqCst);
305 }
306
307 pub fn set_mtu(&self, mtu: u16) {
309 self.mtu.store(mtu, Ordering::SeqCst);
310 }
311}
312
313impl BleConnection for MockConnection {
314 fn peer_id(&self) -> &NodeId {
315 &self.peer_id
316 }
317
318 fn is_alive(&self) -> bool {
319 self.alive.load(Ordering::SeqCst)
320 }
321
322 fn mtu(&self) -> u16 {
323 self.mtu.load(Ordering::SeqCst)
324 }
325
326 fn phy(&self) -> BlePhy {
327 self.phy
328 }
329
330 fn rssi(&self) -> Option<i8> {
331 Some(self.rssi.load(Ordering::SeqCst))
332 }
333
334 fn connected_duration(&self) -> Duration {
335 self.established_at.elapsed()
336 }
337}
338
/// Tunable behaviour of a mock BLE adapter.
#[derive(Clone, Debug)]
pub struct MockAdapterConfig {
    /// Connection failure rate.
    /// NOTE(review): not read by any code visible in this file - confirm usage.
    pub connection_failure_rate: f32,
    /// Connection latency.
    /// NOTE(review): not read by any code visible in this file - confirm usage.
    pub connection_latency: Duration,
    /// Scan latency.
    /// NOTE(review): not read by any code visible in this file - confirm usage.
    pub scan_latency: Duration,
    /// Whether the adapter reports support for the coded PHY.
    pub supports_coded_phy: bool,
    /// Whether the adapter reports support for extended advertising.
    pub supports_extended_advertising: bool,
    /// Maximum MTU the adapter reports.
    pub max_mtu: u16,
    /// Maximum number of connections the adapter reports.
    pub max_connections: u8,
}
357
358impl Default for MockAdapterConfig {
359 fn default() -> Self {
360 Self {
361 connection_failure_rate: 0.0,
362 connection_latency: Duration::from_millis(50),
363 scan_latency: Duration::from_millis(10),
364 supports_coded_phy: true,
365 supports_extended_advertising: true,
366 max_mtu: 517,
367 max_connections: 8,
368 }
369 }
370}
371
372pub struct MockBleAdapter {
377 node_id: NodeId,
378 network: MockNetwork,
379 config: MockAdapterConfig,
380 powered: AtomicBool,
381 scanning: AtomicBool,
382 advertising: AtomicBool,
383 address: String,
384 discovery_callback: Mutex<Option<DiscoveryCallback>>,
385 connection_callback: Mutex<Option<ConnectionCallback>>,
386 connections: RwLock<HashMap<NodeId, Arc<MockConnection>>>,
387 events: Mutex<Vec<MockEvent>>,
389}
390
391#[derive(Clone, Debug)]
393pub enum MockEvent {
394 Initialized,
396 Started,
398 Stopped,
400 ScanStarted,
402 ScanStopped,
404 AdvertisingStarted,
406 AdvertisingStopped,
408 Connected(NodeId),
410 Disconnected(NodeId, DisconnectReason),
412 GattServiceRegistered,
414 GattServiceUnregistered,
416}
417
418impl MockBleAdapter {
419 pub fn new(node_id: NodeId, network: MockNetwork) -> Self {
421 Self::with_config(node_id, network, MockAdapterConfig::default())
422 }
423
424 pub fn with_config(node_id: NodeId, network: MockNetwork, config: MockAdapterConfig) -> Self {
426 let address = format!(
427 "00:11:22:{:02X}:{:02X}:{:02X}",
428 (node_id.as_u32() >> 16) & 0xFF,
429 (node_id.as_u32() >> 8) & 0xFF,
430 node_id.as_u32() & 0xFF
431 );
432 Self {
433 node_id,
434 network,
435 config,
436 powered: AtomicBool::new(false),
437 scanning: AtomicBool::new(false),
438 advertising: AtomicBool::new(false),
439 address,
440 discovery_callback: Mutex::new(None),
441 connection_callback: Mutex::new(None),
442 connections: RwLock::new(HashMap::new()),
443 events: Mutex::new(Vec::new()),
444 }
445 }
446
447 pub fn events(&self) -> Vec<MockEvent> {
449 self.events.lock().unwrap().clone()
450 }
451
452 pub fn clear_events(&self) {
454 self.events.lock().unwrap().clear();
455 }
456
457 fn record_event(&self, event: MockEvent) {
459 self.events.lock().unwrap().push(event);
460 }
461
462 pub fn trigger_discovery(&self) {
466 let devices = self.network.discover_nodes(&self.node_id);
467 if let Some(ref callback) = *self.discovery_callback.lock().unwrap() {
468 for device in devices {
469 callback(device);
470 }
471 }
472 }
473
474 pub fn inject_data(&self, from: &NodeId, data: Vec<u8>) {
478 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
479 callback(*from, ConnectionEvent::DataReceived { data });
480 }
481 }
482
483 pub fn simulate_disconnect(&self, peer_id: &NodeId, reason: DisconnectReason) {
485 {
487 let mut conns = self.connections.write().unwrap();
488 if let Some(conn) = conns.remove(peer_id) {
489 conn.kill();
490 }
491 }
492
493 self.network.disconnect(&self.node_id, peer_id);
495
496 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
498 callback(*peer_id, ConnectionEvent::Disconnected { reason });
499 }
500
501 self.record_event(MockEvent::Disconnected(*peer_id, reason));
502 }
503
504 pub fn node_id(&self) -> &NodeId {
506 &self.node_id
507 }
508
509 pub fn is_scanning(&self) -> bool {
511 self.scanning.load(Ordering::SeqCst)
512 }
513
514 pub fn is_advertising(&self) -> bool {
516 self.advertising.load(Ordering::SeqCst)
517 }
518}
519
520#[async_trait]
521impl BleAdapter for MockBleAdapter {
522 async fn init(&mut self, _config: &BleConfig) -> Result<()> {
523 self.powered.store(true, Ordering::SeqCst);
524 self.record_event(MockEvent::Initialized);
525 Ok(())
526 }
527
528 async fn start(&self) -> Result<()> {
529 self.record_event(MockEvent::Started);
530 Ok(())
531 }
532
533 async fn stop(&self) -> Result<()> {
534 self.scanning.store(false, Ordering::SeqCst);
535 self.advertising.store(false, Ordering::SeqCst);
536 self.network.stop_advertising(&self.node_id);
537 self.record_event(MockEvent::Stopped);
538 Ok(())
539 }
540
541 fn is_powered(&self) -> bool {
542 self.powered.load(Ordering::SeqCst)
543 }
544
545 fn address(&self) -> Option<String> {
546 Some(self.address.clone())
547 }
548
549 async fn start_scan(&self, _config: &DiscoveryConfig) -> Result<()> {
550 self.scanning.store(true, Ordering::SeqCst);
551 self.record_event(MockEvent::ScanStarted);
552
553 Ok(())
557 }
558
559 async fn stop_scan(&self) -> Result<()> {
560 self.scanning.store(false, Ordering::SeqCst);
561 self.record_event(MockEvent::ScanStopped);
562 Ok(())
563 }
564
565 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
566 self.advertising.store(true, Ordering::SeqCst);
567 self.network
568 .start_advertising(self.node_id, &self.address, Some("HIVE"));
569 self.record_event(MockEvent::AdvertisingStarted);
570 Ok(())
571 }
572
573 async fn stop_advertising(&self) -> Result<()> {
574 self.advertising.store(false, Ordering::SeqCst);
575 self.network.stop_advertising(&self.node_id);
576 self.record_event(MockEvent::AdvertisingStopped);
577 Ok(())
578 }
579
580 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
581 *self.discovery_callback.lock().unwrap() = callback;
582 }
583
584 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
585 if self.connections.read().unwrap().len() >= self.config.max_connections as usize {
587 return Err(BleError::ConnectionFailed(
588 "Maximum connections reached".to_string(),
589 ));
590 }
591
592 self.network.connect(&self.node_id, peer_id)?;
597
598 let conn = Arc::new(MockConnection::new(
600 *peer_id,
601 self.config.max_mtu,
602 BlePhy::Le1M,
603 ));
604
605 {
607 let mut conns = self.connections.write().unwrap();
608 conns.insert(*peer_id, conn.clone());
609 }
610
611 if let Some(ref callback) = *self.connection_callback.lock().unwrap() {
613 callback(
614 *peer_id,
615 ConnectionEvent::Connected {
616 mtu: conn.mtu(),
617 phy: conn.phy(),
618 },
619 );
620 }
621
622 self.record_event(MockEvent::Connected(*peer_id));
623 Ok(Box::new(conn.as_ref().clone()))
624 }
625
626 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
627 self.simulate_disconnect(peer_id, DisconnectReason::LocalRequest);
628 Ok(())
629 }
630
631 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
632 let conns = self.connections.read().unwrap();
633 conns
634 .get(peer_id)
635 .filter(|c| c.is_alive())
636 .map(|c| Box::new(c.as_ref().clone()) as Box<dyn BleConnection>)
637 }
638
639 fn peer_count(&self) -> usize {
640 self.connections
641 .read()
642 .unwrap()
643 .values()
644 .filter(|c| c.is_alive())
645 .count()
646 }
647
648 fn connected_peers(&self) -> Vec<NodeId> {
649 self.connections
650 .read()
651 .unwrap()
652 .iter()
653 .filter(|(_, c)| c.is_alive())
654 .map(|(id, _)| *id)
655 .collect()
656 }
657
658 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
659 *self.connection_callback.lock().unwrap() = callback;
660 }
661
662 async fn register_gatt_service(&self) -> Result<()> {
663 self.record_event(MockEvent::GattServiceRegistered);
664 Ok(())
665 }
666
667 async fn unregister_gatt_service(&self) -> Result<()> {
668 self.record_event(MockEvent::GattServiceUnregistered);
669 Ok(())
670 }
671
672 async fn write_to_peer(
673 &self,
674 peer_id: &NodeId,
675 _char_uuid: uuid::Uuid,
676 data: &[u8],
677 ) -> Result<()> {
678 self.network
679 .send_data(&self.node_id, peer_id, data.to_vec())
680 }
681
682 fn supports_coded_phy(&self) -> bool {
683 self.config.supports_coded_phy
684 }
685
686 fn supports_extended_advertising(&self) -> bool {
687 self.config.supports_extended_advertising
688 }
689
690 fn max_mtu(&self) -> u16 {
691 self.config.max_mtu
692 }
693
694 fn max_connections(&self) -> u8 {
695 self.config.max_connections
696 }
697}
698
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an adapter for `id` on `network` and powers it on via `init`.
    async fn powered_adapter(id: NodeId, network: &MockNetwork) -> MockBleAdapter {
        let mut adapter = MockBleAdapter::new(id, network.clone());
        adapter.init(&BleConfig::default()).await.unwrap();
        adapter
    }

    /// Builds a powered adapter for `id` that is already advertising on `network`.
    async fn advertising_adapter(id: NodeId, network: &MockNetwork) -> MockBleAdapter {
        let adapter = powered_adapter(id, network).await;
        adapter
            .start_advertising(&DiscoveryConfig::default())
            .await
            .unwrap();
        adapter
    }

    /// Characteristic UUID handed to `write_to_peer` in the write tests.
    fn sample_char_uuid() -> uuid::Uuid {
        uuid::Uuid::from_fields(
            0x0003,
            0x0000,
            0x1000,
            &[0x80, 0x00, 0x00, 0x80, 0x5F, 0x9B, 0x34, 0xFB],
        )
    }

    // `init` flips the power flag and is the first recorded event.
    #[tokio::test]
    async fn test_mock_adapter_init() {
        let mut adapter = MockBleAdapter::new(NodeId::new(0x111), MockNetwork::new());
        assert!(!adapter.is_powered());

        adapter.init(&BleConfig::default()).await.unwrap();
        assert!(adapter.is_powered());

        let log = adapter.events();
        assert!(matches!(log[0], MockEvent::Initialized));
    }

    // An advertising node is visible to a different observer on the same network.
    #[tokio::test]
    async fn test_mock_network_discovery() {
        let network = MockNetwork::new();
        let observer_id = NodeId::new(0x111);
        let advertiser_id = NodeId::new(0x222);

        let _observer = powered_adapter(observer_id, &network).await;
        let _advertiser = advertising_adapter(advertiser_id, &network).await;

        let devices = network.discover_nodes(&observer_id);
        assert_eq!(devices.len(), 1);
        assert_eq!(devices[0].node_id, Some(advertiser_id));
    }

    // Connecting to an advertising peer yields a live, tracked connection.
    #[tokio::test]
    async fn test_mock_connection() {
        let network = MockNetwork::new();
        let peer_id = NodeId::new(0x222);

        let central = powered_adapter(NodeId::new(0x111), &network).await;
        let _peripheral = advertising_adapter(peer_id, &network).await;

        let conn = central.connect(&peer_id).await.unwrap();
        assert!(conn.is_alive());
        assert_eq!(conn.peer_id(), &peer_id);

        assert_eq!(central.peer_count(), 1);
        assert!(central.connected_peers().contains(&peer_id));
    }

    // Disconnecting removes the peer from the adapter's connection set.
    #[tokio::test]
    async fn test_mock_disconnect() {
        let network = MockNetwork::new();
        let peer_id = NodeId::new(0x222);

        let central = powered_adapter(NodeId::new(0x111), &network).await;
        let _peripheral = advertising_adapter(peer_id, &network).await;

        let conn = central.connect(&peer_id).await.unwrap();
        assert!(conn.is_alive());

        central.disconnect(&peer_id).await.unwrap();
        assert_eq!(central.peer_count(), 0);
    }

    // With `max_connections` set to 2, the third connection attempt is rejected.
    #[tokio::test]
    async fn test_connection_limit() {
        let network = MockNetwork::new();

        let limited = MockAdapterConfig {
            max_connections: 2,
            ..Default::default()
        };
        let mut central =
            MockBleAdapter::with_config(NodeId::new(0x111), network.clone(), limited);
        central.init(&BleConfig::default()).await.unwrap();

        // Advertise three peers: 0x222, 0x333, and 0x444.
        for i in 2..=4 {
            let _peer = advertising_adapter(NodeId::new(i * 0x111), &network).await;
        }

        central.connect(&NodeId::new(0x222)).await.unwrap();
        central.connect(&NodeId::new(0x333)).await.unwrap();

        let third = central.connect(&NodeId::new(0x444)).await;
        assert!(third.is_err());
    }

    // Lifecycle calls are logged in the order they were made.
    #[tokio::test]
    async fn test_event_tracking() {
        let network = MockNetwork::new();
        let adapter = powered_adapter(NodeId::new(0x111), &network).await;

        adapter.start().await.unwrap();
        adapter
            .start_scan(&DiscoveryConfig::default())
            .await
            .unwrap();
        adapter.stop_scan().await.unwrap();
        adapter
            .start_advertising(&DiscoveryConfig::default())
            .await
            .unwrap();
        adapter.stop_advertising().await.unwrap();
        adapter.stop().await.unwrap();

        let log = adapter.events();
        assert!(matches!(
            log.as_slice(),
            [
                MockEvent::Initialized,
                MockEvent::Started,
                MockEvent::ScanStarted,
                MockEvent::ScanStopped,
                MockEvent::AdvertisingStarted,
                MockEvent::AdvertisingStopped,
                MockEvent::Stopped,
                ..
            ]
        ));
    }

    // Data written to a connected peer shows up in that peer's network queue.
    #[tokio::test]
    async fn test_write_to_peer() {
        let network = MockNetwork::new();
        let peer_id = NodeId::new(0x222);

        let central = powered_adapter(NodeId::new(0x111), &network).await;
        let _peripheral = advertising_adapter(peer_id, &network).await;

        central.connect(&peer_id).await.unwrap();

        let outcome = central
            .write_to_peer(&peer_id, sample_char_uuid(), &[1, 2, 3, 4])
            .await;
        assert!(outcome.is_ok());

        let packets = network.receive_data(&peer_id);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].data, vec![1, 2, 3, 4]);
    }

    // Writing to a peer without a connection fails.
    #[tokio::test]
    async fn test_write_to_peer_no_connection() {
        let network = MockNetwork::new();
        let adapter = powered_adapter(NodeId::new(0x111), &network).await;

        let outcome = adapter
            .write_to_peer(&NodeId::new(0x999), sample_char_uuid(), &[1, 2, 3])
            .await;
        assert!(outcome.is_err());
    }

    // Injected data reaches the connection callback with the sender's ID.
    #[tokio::test]
    async fn test_data_injection() {
        let network = MockNetwork::new();
        let mut adapter = powered_adapter(NodeId::new(0x111), &network).await;

        let received = Arc::new(Mutex::new(Vec::new()));
        let sink = received.clone();

        adapter.set_connection_callback(Some(Arc::new(move |node_id, event| {
            if let ConnectionEvent::DataReceived { data } = event {
                sink.lock().unwrap().push((node_id, data));
            }
        })));

        let sender = NodeId::new(0x222);
        adapter.inject_data(&sender, vec![1, 2, 3, 4]);

        let seen = received.lock().unwrap();
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0].0, sender);
        assert_eq!(seen[0].1, vec![1, 2, 3, 4]);
    }
}