1use crate::adaptive::{
24 AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentHash, ContentStore, MonitoringSystem,
25 NodeId, ReplicationManager, RetrievalManager, StorageConfig,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::Stream;
30use std::collections::HashMap;
31use std::pin::Pin;
32use std::{sync::Arc, time::Duration};
33use tokio::sync::{RwLock, mpsc};
34
35#[derive(Debug, Clone)]
37pub struct ClientConfig {
38 pub node_address: String,
40
41 pub connect_timeout: Duration,
43
44 pub request_timeout: Duration,
46
47 pub debug_logging: bool,
49
50 pub profile: ClientProfile,
52}
53
54impl Default for ClientConfig {
55 fn default() -> Self {
56 let global_config = crate::config::Config::default();
58
59 let node_address = global_config.network.listen_address.clone();
61
62 Self {
63 node_address,
64 connect_timeout: Duration::from_secs(global_config.network.connection_timeout),
65 request_timeout: Duration::from_secs(30),
66 debug_logging: false,
67 profile: ClientProfile::Full,
68 }
69 }
70}
71
72impl ClientConfig {
73 pub fn from_global_config(config: &crate::config::Config) -> Self {
75 Self {
76 node_address: config.network.listen_address.clone(),
77 connect_timeout: Duration::from_secs(config.network.connection_timeout),
78 request_timeout: Duration::from_secs(30),
79 debug_logging: false,
80 profile: ClientProfile::Full,
81 }
82 }
83}
84
85#[derive(Debug, Clone, Copy, PartialEq)]
87pub enum ClientProfile {
88 Full,
90
91 Light,
93
94 Compute,
96
97 Mobile,
99}
100
101pub struct Client {
103 config: ClientConfig,
105
106 components: Arc<NetworkComponents>,
108
109 state: Arc<RwLock<ClientState>>,
111
112 subscription_rx: Arc<RwLock<mpsc::Receiver<SubscriptionMessage>>>,
114
115 subscription_tx: mpsc::Sender<SubscriptionMessage>,
117}
118
119struct NetworkComponents {
121 node_id: NodeId,
123
124 router: Arc<AdaptiveRouter>,
126
127 gossip: Arc<AdaptiveGossipSub>,
129
130 storage: Arc<ContentStore>,
132
133 retrieval: Arc<RetrievalManager>,
135
136 replication: Arc<ReplicationManager>,
138
139 churn: Arc<ChurnHandler>,
141
142 monitoring: Arc<MonitoringSystem>,
144}
145
146struct ClientState {
148 connected: bool,
150
151 node_info: Option<NodeInfo>,
153
154 subscriptions: HashMap<String, mpsc::Sender<Vec<u8>>>,
156}
157
158struct SubscriptionMessage {
160 topic: String,
162
163 data: Vec<u8>,
165}
166
167#[derive(Debug, Clone)]
169pub struct NodeInfo {
170 pub id: String,
172
173 pub addresses: Vec<String>,
175
176 pub capabilities: NodeCapabilities,
178
179 pub trust_score: f64,
181}
182
183#[derive(Debug, Clone)]
185pub struct NodeCapabilities {
186 pub storage_gb: u64,
188
189 pub compute_score: u64,
191
192 pub bandwidth_mbps: u64,
194}
195
196#[derive(Debug, Clone)]
198pub struct NetworkStats {
199 pub connected_peers: usize,
201
202 pub routing_success_rate: f64,
204
205 pub average_trust_score: f64,
207
208 pub cache_hit_rate: f64,
210
211 pub churn_rate: f64,
213
214 pub total_storage: u64,
216
217 pub total_bandwidth: u64,
219}
220
221#[derive(Debug, Clone)]
223pub struct ComputeJob {
224 pub id: String,
226
227 pub job_type: String,
229
230 pub input: Vec<u8>,
232
233 pub requirements: ResourceRequirements,
235}
236
237#[derive(Debug, Clone)]
239pub struct ResourceRequirements {
240 pub cpu_cores: u32,
242
243 pub memory_mb: u32,
245
246 pub max_duration: Duration,
248}
249
250pub type JobId = String;
252
253#[derive(Debug, Clone)]
255pub struct ComputeResult {
256 pub job_id: JobId,
258
259 pub output: Vec<u8>,
261
262 pub execution_time: Duration,
264
265 pub executor_node: String,
267}
268
269pub type MessageStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
271
272#[derive(Debug, thiserror::Error)]
274pub enum ClientError {
275 #[error("Connection error: {0}")]
276 Connection(String),
277
278 #[error("Storage error: {0}")]
279 Storage(String),
280
281 #[error("Retrieval error: {0}")]
282 Retrieval(String),
283
284 #[error("Messaging error: {0}")]
285 Messaging(String),
286
287 #[error("Network error: {0}")]
288 Network(String),
289
290 #[error("Timeout error")]
291 Timeout,
292
293 #[error("Not connected")]
294 NotConnected,
295
296 #[error("Other error: {0}")]
297 Other(String),
298}
299
300#[async_trait]
302pub trait AdaptiveP2PClient: Send + Sync {
303 async fn connect(config: ClientConfig) -> Result<Self>
305 where
306 Self: Sized;
307
308 async fn store(&self, data: Vec<u8>) -> Result<ContentHash>;
310 async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>>;
311 async fn delete(&self, hash: &ContentHash) -> Result<()>;
312
313 async fn submit_compute_job(&self, job: ComputeJob) -> Result<JobId>;
315 async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult>;
316
317 async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()>;
319 async fn subscribe(&self, topic: &str) -> Result<MessageStream>;
320
321 async fn get_node_info(&self) -> Result<NodeInfo>;
323 async fn get_network_stats(&self) -> Result<NetworkStats>;
324
325 async fn disconnect(&self) -> Result<()>;
327}
328
329impl Client {
330 pub async fn new(config: ClientConfig) -> Result<Self> {
332 Self::new_with_monitoring(config, true).await
333 }
334
335 pub async fn new_with_monitoring(
337 config: ClientConfig,
338 enable_monitoring: bool,
339 ) -> Result<Self> {
340 let (subscription_tx, subscription_rx) = mpsc::channel(1000);
341
342 let components =
344 Self::initialize_components_with_monitoring(&config, enable_monitoring).await?;
345
346 let client = Self {
347 config,
348 components: Arc::new(components),
349 state: Arc::new(RwLock::new(ClientState {
350 connected: false,
351 node_info: None,
352 subscriptions: HashMap::new(),
353 })),
354 subscription_rx: Arc::new(RwLock::new(subscription_rx)),
355 subscription_tx,
356 };
357
358 Ok(client)
359 }
360
361 #[allow(dead_code)]
363 async fn initialize_components(config: &ClientConfig) -> Result<NetworkComponents> {
364 Self::initialize_components_with_monitoring(config, true).await
365 }
366
367 async fn initialize_components_with_monitoring(
369 config: &ClientConfig,
370 enable_monitoring: bool,
371 ) -> Result<NetworkComponents> {
372 let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
374
375 let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
377 let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
378 crate::adaptive::som::SomConfig {
379 initial_learning_rate: 0.3,
380 initial_radius: 5.0,
381 iterations: 1000,
382 grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
383 },
384 ));
385 let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
386 let _hyperbolic = hyperbolic;
388 let _som = som;
389
390 let node_id = NodeId { hash: [0u8; 32] }; let gossip = Arc::new(AdaptiveGossipSub::new(
393 node_id.clone(),
394 trust_provider.clone(),
395 ));
396
397 let storage_config = match config.profile {
399 ClientProfile::Full => StorageConfig::default(),
400 ClientProfile::Light => StorageConfig {
401 cache_size: 10 * 1024 * 1024, ..Default::default()
403 },
404 ClientProfile::Compute => StorageConfig {
405 cache_size: 100 * 1024 * 1024, ..Default::default()
407 },
408 ClientProfile::Mobile => StorageConfig {
409 cache_size: 5 * 1024 * 1024, chunk_size: 256 * 1024, ..Default::default()
412 },
413 };
414 let storage = Arc::new(ContentStore::new(storage_config).await?);
415
416 let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
418 let replication = Arc::new(ReplicationManager::new(
419 Default::default(),
420 trust_provider.clone(),
421 churn_predictor.clone(),
422 router.clone(),
423 ));
424
425 let cache_manager = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
426 storage.get_config().cache_size,
427 ));
428 let retrieval = Arc::new(RetrievalManager::new(
429 router.clone(),
430 storage.clone(),
431 cache_manager.clone(),
432 ));
433
434 let churn = Arc::new(ChurnHandler::new(
435 node_id.clone(),
436 churn_predictor,
437 trust_provider.clone(),
438 replication.clone(),
439 router.clone(),
440 gossip.clone(),
441 Default::default(),
442 ));
443
444 #[cfg(feature = "metrics")]
447 let registry = Some(prometheus::Registry::new());
448 #[cfg(not(feature = "metrics"))]
449 let registry = None;
450
451 let monitoring = if enable_monitoring {
452 Arc::new(MonitoringSystem::new_with_registry(
453 crate::adaptive::monitoring::MonitoredComponents {
454 router: router.clone(),
455 churn_handler: churn.clone(),
456 gossip: gossip.clone(),
457 storage: storage.clone(),
458 replication: replication.clone(),
459 thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
460 cache: cache_manager.clone(),
461 },
462 Default::default(),
463 registry,
464 )?)
465 } else {
466 #[cfg(feature = "metrics")]
468 let test_registry = Some(prometheus::Registry::new());
469 #[cfg(not(feature = "metrics"))]
470 let test_registry = None;
471
472 Arc::new(MonitoringSystem::new_with_registry(
473 crate::adaptive::monitoring::MonitoredComponents {
474 router: router.clone(),
475 churn_handler: churn.clone(),
476 gossip: gossip.clone(),
477 storage: storage.clone(),
478 replication: replication.clone(),
479 thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
480 cache: cache_manager.clone(),
481 },
482 Default::default(),
483 test_registry,
484 )?)
485 };
486
487 Ok(NetworkComponents {
488 node_id,
489 router,
490 gossip,
491 storage,
492 retrieval,
493 replication,
494 churn,
495 monitoring,
496 })
497 }
498
499 async fn connect_to_node(&self, address: &str) -> Result<()> {
501 tokio::time::sleep(Duration::from_millis(100)).await;
504
505 let mut state = self.state.write().await;
506 state.connected = true;
507 state.node_info = Some(NodeInfo {
508 id: "node_123".to_string(),
509 addresses: vec![address.to_string()],
510 capabilities: NodeCapabilities {
511 storage_gb: 100,
512 compute_score: 1000,
513 bandwidth_mbps: 100,
514 },
515 trust_score: 1.0,
516 });
517
518 Ok(())
519 }
520
521 async fn handle_subscriptions(&self) {
523 let mut rx = self.subscription_rx.write().await;
524
525 while let Some(msg) = rx.recv().await {
526 let state = self.state.read().await;
527 if let Some(sender) = state.subscriptions.get(&msg.topic) {
528 let _ = sender.send(msg.data).await;
529 }
530 }
531 }
532}
533
534#[async_trait]
535impl AdaptiveP2PClient for Client {
536 async fn connect(config: ClientConfig) -> Result<Self> {
537 let client = Self::new(config.clone()).await?;
538
539 tokio::time::timeout(
541 config.connect_timeout,
542 client.connect_to_node(&config.node_address),
543 )
544 .await
545 .map_err(|_| ClientError::Timeout)?
546 .map_err(|e| ClientError::Connection(e.to_string()))?;
547
548 let client_clone = client.clone();
550 tokio::spawn(async move {
551 client_clone.handle_subscriptions().await;
552 });
553
554 client.components.monitoring.start().await;
556
557 client.components.churn.start_monitoring().await;
559
560 Ok(client)
561 }
562
563 async fn store(&self, data: Vec<u8>) -> Result<ContentHash> {
564 let state = self.state.read().await;
565 if !state.connected {
566 return Err(ClientError::NotConnected.into());
567 }
568
569 let metadata = crate::adaptive::storage::ContentMetadata {
571 size: data.len(),
572 content_type: crate::adaptive::ContentType::DataRetrieval,
573 created_at: std::time::SystemTime::now()
574 .duration_since(std::time::UNIX_EPOCH)
575 .map(|d| d.as_secs())
576 .unwrap_or(0),
577 chunk_count: None,
578 replication_factor: 8,
579 };
580
581 let hash = self
582 .components
583 .storage
584 .store(data.clone(), metadata.clone())
585 .await
586 .map_err(|e| ClientError::Storage(e.to_string()))?;
587
588 self.components
590 .replication
591 .replicate_content(&hash, &data, metadata)
592 .await
593 .map_err(|e| ClientError::Storage(format!("Replication failed: {e}")))?;
594
595 Ok(hash)
596 }
597
598 async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
599 let state = self.state.read().await;
600 if !state.connected {
601 return Err(ClientError::NotConnected.into());
602 }
603
604 tokio::time::timeout(
606 self.config.request_timeout,
607 self.components.retrieval.retrieve(
608 hash,
609 crate::adaptive::retrieval::RetrievalStrategy::Parallel,
610 ),
611 )
612 .await
613 .map_err(|_| ClientError::Timeout)?
614 .map_err(|e| ClientError::Retrieval(e.to_string()).into())
615 }
616
617 async fn delete(&self, hash: &ContentHash) -> Result<()> {
618 let state = self.state.read().await;
619 if !state.connected {
620 return Err(ClientError::NotConnected.into());
621 }
622
623 self.components
625 .storage
626 .delete(hash)
627 .await
628 .map_err(|e| ClientError::Storage(e.to_string()).into())
629 }
630
631 async fn submit_compute_job(&self, _job: ComputeJob) -> Result<JobId> {
632 let state = self.state.read().await;
633 if !state.connected {
634 return Err(ClientError::NotConnected.into());
635 }
636
637 Ok(format!("job_{}", uuid::Uuid::new_v4()))
640 }
641
642 async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult> {
643 let state = self.state.read().await;
644 if !state.connected {
645 return Err(ClientError::NotConnected.into());
646 }
647
648 Ok(ComputeResult {
651 job_id: job_id.clone(),
652 output: b"Mock compute result".to_vec(),
653 execution_time: Duration::from_secs(5),
654 executor_node: "node_456".to_string(),
655 })
656 }
657
658 async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()> {
659 let state = self.state.read().await;
660 if !state.connected {
661 return Err(ClientError::NotConnected.into());
662 }
663
664 use crate::adaptive::gossip::GossipMessage;
665
666 let gossip_msg = GossipMessage {
667 topic: topic.to_string(),
668 data: message,
669 from: self.components.node_id.clone(),
670 seqno: 0, timestamp: std::time::SystemTime::now()
672 .duration_since(std::time::UNIX_EPOCH)
673 .map(|d| d.as_secs())
674 .unwrap_or(0),
675 };
676
677 self.components
678 .gossip
679 .publish(topic, gossip_msg)
680 .await
681 .map_err(|e| ClientError::Messaging(e.to_string()).into())
682 }
683
684 async fn subscribe(&self, topic: &str) -> Result<MessageStream> {
685 let state = self.state.read().await;
686 if !state.connected {
687 return Err(ClientError::NotConnected.into());
688 }
689
690 self.components
692 .gossip
693 .subscribe(topic)
694 .await
695 .map_err(|e| ClientError::Messaging(e.to_string()))?;
696
697 let (tx, rx) = mpsc::channel(100);
699
700 let mut state = self.state.write().await;
701 state.subscriptions.insert(topic.to_string(), tx);
702
703 Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
704 }
705
706 async fn get_node_info(&self) -> Result<NodeInfo> {
707 let state = self.state.read().await;
708 if !state.connected {
709 return Err(ClientError::NotConnected.into());
710 }
711
712 state
713 .node_info
714 .clone()
715 .ok_or_else(|| ClientError::Other("Node info not available".to_string()).into())
716 }
717
718 async fn get_network_stats(&self) -> Result<NetworkStats> {
719 let state = self.state.read().await;
720 if !state.connected {
721 return Err(ClientError::NotConnected.into());
722 }
723
724 let health = self.components.monitoring.get_health().await;
726 let routing_stats = self.components.router.get_stats().await;
727 let storage_stats = self.components.storage.get_stats().await;
728 let gossip_stats = self.components.gossip.get_stats().await;
729
730 Ok(NetworkStats {
731 connected_peers: gossip_stats.peer_count,
732 routing_success_rate: routing_stats.success_rate(),
733 average_trust_score: health.score,
734 cache_hit_rate: 0.0, churn_rate: health.churn_rate,
736 total_storage: storage_stats.total_bytes,
737 total_bandwidth: 0, })
739 }
740
741 async fn disconnect(&self) -> Result<()> {
742 let mut state = self.state.write().await;
743 state.connected = false;
744
745 state.subscriptions.clear();
747
748 Ok(())
749 }
750}
751
752impl Clone for Client {
754 fn clone(&self) -> Self {
755 Self {
756 config: self.config.clone(),
757 components: self.components.clone(),
758 state: self.state.clone(),
759 subscription_rx: self.subscription_rx.clone(),
760 subscription_tx: self.subscription_tx.clone(),
761 }
762 }
763}
764
765pub async fn connect(address: &str) -> Result<Client> {
767 let config = ClientConfig {
768 node_address: address.to_string(),
769 ..Default::default()
770 };
771
772 Client::connect(config).await
773}
774
775pub async fn connect_with_profile(address: &str, profile: ClientProfile) -> Result<Client> {
777 let config = ClientConfig {
778 node_address: address.to_string(),
779 profile,
780 ..Default::default()
781 };
782
783 Client::connect(config).await
784}
785
786#[cfg(test)]
787mod tests {
788 use super::*;
789 use crate::adaptive::monitoring::MonitoringSystem;
790 use std::time::Duration;
791
792 #[tokio::test]
793 async fn test_client_creation() {
794 let client = new_test_client(ClientConfig::default()).await.unwrap();
795
796 let state = client.state.read().await;
798 assert!(!state.connected);
799 }
800
801 pub async fn new_test_client(config: ClientConfig) -> Result<Client> {
803 let (subscription_tx, subscription_rx) = mpsc::channel(1000);
804
805 let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
807 let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
808 let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
809 crate::adaptive::som::SomConfig {
810 initial_learning_rate: 0.3,
811 initial_radius: 5.0,
812 iterations: 1000,
813 grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
814 },
815 ));
816 let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
817 let _hyperbolic = hyperbolic;
819 let _som = som;
820
821 let node_id = NodeId { hash: [0u8; 32] };
822 let gossip = Arc::new(AdaptiveGossipSub::new(
823 node_id.clone(),
824 trust_provider.clone(),
825 ));
826
827 let storage_config = StorageConfig {
829 cache_size: 1024 * 1024, ..Default::default()
831 };
832 let storage = Arc::new(ContentStore::new(storage_config).await?);
833
834 let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
835 let replication = Arc::new(ReplicationManager::new(
836 Default::default(),
837 trust_provider.clone(),
838 churn_predictor.clone(),
839 router.clone(),
840 ));
841
842 let cache = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
843 storage.get_config().cache_size,
844 ));
845 let retrieval = Arc::new(RetrievalManager::new(
846 router.clone(),
847 storage.clone(),
848 cache.clone(),
849 ));
850
851 let churn = Arc::new(ChurnHandler::new(
852 node_id.clone(),
853 churn_predictor,
854 trust_provider.clone(),
855 replication.clone(),
856 router.clone(),
857 gossip.clone(),
858 Default::default(),
859 ));
860
861 let thompson = Arc::new(crate::adaptive::learning::ThompsonSampling::new());
863
864 #[cfg(feature = "metrics")]
866 let test_registry = Some(prometheus::Registry::new());
867 #[cfg(not(feature = "metrics"))]
868 let test_registry = None;
869
870 let monitoring = Arc::new(
871 MonitoringSystem::new_with_registry(
872 crate::adaptive::monitoring::MonitoredComponents {
873 router: router.clone(),
874 churn_handler: churn.clone(),
875 gossip: gossip.clone(),
876 storage: storage.clone(),
877 replication: replication.clone(),
878 thompson: thompson.clone(),
879 cache: cache.clone(),
880 },
881 crate::adaptive::monitoring::MonitoringConfig::default(),
882 test_registry,
883 )
884 .expect("Failed to create monitoring system for tests"),
885 );
886
887 let components = NetworkComponents {
888 node_id,
889 router,
890 gossip,
891 storage,
892 retrieval,
893 replication,
894 churn,
895 monitoring,
896 };
897
898 let client = Client {
899 config,
900 components: Arc::new(components),
901 state: Arc::new(RwLock::new(ClientState {
902 connected: false,
903 node_info: None,
904 subscriptions: HashMap::new(),
905 })),
906 subscription_rx: Arc::new(RwLock::new(subscription_rx)),
907 subscription_tx,
908 };
909
910 Ok(client)
911 }
912
913 #[tokio::test]
914 async fn test_client_connect() {
915 let client = new_test_client(ClientConfig::default()).await.unwrap();
916
917 client.connect_to_node("127.0.0.1:8000").await.unwrap();
919
920 let state = client.state.read().await;
922 assert!(state.connected);
923 assert!(state.node_info.is_some());
924 }
925
926 #[tokio::test]
927 async fn test_storage_operations() {
928 let client = new_test_client(ClientConfig::default()).await.unwrap();
929
930 client.connect_to_node("127.0.0.1:8000").await.unwrap();
932
933 let data = b"Hello, P2P world!".to_vec();
935 let hash = client.store(data.clone()).await.unwrap();
936
937 let retrieved = client.retrieve(&hash).await.unwrap();
939 assert_eq!(retrieved, data);
940 }
941
942 #[tokio::test]
943 async fn test_not_connected_error() {
944 let client = new_test_client(ClientConfig::default()).await.unwrap();
945
946 let result = client.store(vec![1, 2, 3]).await;
948 assert!(result.is_err());
949 assert!(result.unwrap_err().to_string().contains("Not connected"));
950 }
951
952 #[tokio::test]
953 async fn test_network_stats() {
954 let client = new_test_client(ClientConfig::default()).await.unwrap();
955
956 client.connect_to_node("127.0.0.1:8000").await.unwrap();
958
959 let stats = client.get_network_stats().await.unwrap();
960 assert!(stats.routing_success_rate >= 0.0);
961 assert!(stats.routing_success_rate <= 1.0);
962 }
963
964 #[tokio::test]
965 async fn test_client_profiles() {
966 for profile in [
968 ClientProfile::Full,
969 ClientProfile::Light,
970 ClientProfile::Compute,
971 ClientProfile::Mobile,
972 ] {
973 let config = ClientConfig {
974 profile,
975 ..Default::default()
976 };
977 match Client::connect(config).await {
978 Ok(client) => {
979 let info = client.get_node_info().await.unwrap();
980 assert!(!info.id.is_empty());
981 }
982 Err(e) => {
983 let es = format!("{}", e);
985 if es.contains("Duplicate") && es.contains("registration") {
986 continue;
987 }
988 panic!("Client::connect failed: {}", es);
989 }
990 }
991 }
992 }
993
994 #[tokio::test]
995 #[ignore = "requires full adaptive gossip stack"]
996 async fn test_pubsub_messaging() {
997 let client = new_test_client(ClientConfig::default()).await.unwrap();
998
999 client.connect_to_node("127.0.0.1:8000").await.unwrap();
1001
1002 let _stream = tokio::time::timeout(Duration::from_secs(2), client.subscribe("test_topic"))
1004 .await
1005 .expect("subscribe should not hang")
1006 .unwrap();
1007
1008 let message = b"Test message".to_vec();
1010 tokio::time::timeout(
1011 Duration::from_secs(2),
1012 client.publish("test_topic", message.clone()),
1013 )
1014 .await
1015 .expect("publish should not hang")
1016 .unwrap();
1017
1018 }
1021
1022 #[tokio::test]
1023 async fn test_compute_job() {
1024 let client = new_test_client(ClientConfig::default()).await.unwrap();
1025
1026 client.connect_to_node("127.0.0.1:8000").await.unwrap();
1028
1029 let job = ComputeJob {
1030 id: "test_job".to_string(),
1031 job_type: "map_reduce".to_string(),
1032 input: b"Input data".to_vec(),
1033 requirements: ResourceRequirements {
1034 cpu_cores: 2,
1035 memory_mb: 1024,
1036 max_duration: Duration::from_secs(60),
1037 },
1038 };
1039
1040 let job_id = client.submit_compute_job(job).await.unwrap();
1041 assert!(!job_id.is_empty());
1042
1043 let result = client.get_job_result(&job_id).await.unwrap();
1044 assert_eq!(result.job_id, job_id);
1045 }
1046}