1use crate::adaptive::{
24 AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentHash, ContentStore, MonitoringSystem,
25 NodeId, ReplicationManager, RetrievalManager, StorageConfig,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::Stream;
30use std::collections::HashMap;
31use std::pin::Pin;
32use std::{sync::Arc, time::Duration};
33use tokio::sync::{RwLock, mpsc};
34
/// Configuration used to create and connect a [`Client`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Address of the node to connect to; `connect` hands it to the connection routine.
    pub node_address: String,

    /// Upper bound on how long `connect` waits for the connection attempt.
    pub connect_timeout: Duration,

    /// Upper bound applied to `retrieve` requests.
    pub request_timeout: Duration,

    /// Debug-logging toggle.
    /// NOTE(review): no code visible in this section reads this flag.
    pub debug_logging: bool,

    /// Client profile; component initialization picks the storage configuration from it.
    pub profile: ClientProfile,
}
53
54impl Default for ClientConfig {
55 fn default() -> Self {
56 let global_config = crate::config::Config::default();
58
59 let node_address = global_config.network.listen_address.clone();
61
62 Self {
63 node_address,
64 connect_timeout: Duration::from_secs(global_config.network.connection_timeout),
65 request_timeout: Duration::from_secs(30),
66 debug_logging: false,
67 profile: ClientProfile::Full,
68 }
69 }
70}
71
72impl ClientConfig {
73 pub fn from_global_config(config: &crate::config::Config) -> Self {
75 Self {
76 node_address: config.network.listen_address.clone(),
77 connect_timeout: Duration::from_secs(config.network.connection_timeout),
78 request_timeout: Duration::from_secs(30),
79 debug_logging: false,
80 profile: ClientProfile::Full,
81 }
82 }
83}
84
/// Client profile selecting how the client's storage is configured.
///
/// The enum is fieldless, so in addition to `PartialEq` it derives `Eq` and
/// `Hash`, which lets it be used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ClientProfile {
    /// Uses the default storage configuration.
    Full,

    /// Uses a reduced cache size (10 MiB in the component initialization of this module).
    Light,

    /// Uses a larger cache size (100 MiB in the component initialization of this module).
    Compute,

    /// Uses the smallest cache (5 MiB) together with a 256 KiB chunk size.
    Mobile,
}
100
/// Client handle for the adaptive P2P network.
///
/// Components, state and the subscription receiver are all held behind `Arc`,
/// so cloning a client yields another handle onto the same underlying data.
pub struct Client {
    /// Configuration the client was created with.
    config: ClientConfig,

    /// Network subsystems built during construction.
    components: Arc<NetworkComponents>,

    /// Connection flag, node info and per-topic subscription senders.
    state: Arc<RwLock<ClientState>>,

    /// Receiving half of the internal subscription channel; drained by `handle_subscriptions`.
    subscription_rx: Arc<RwLock<mpsc::Receiver<SubscriptionMessage>>>,

    /// Sending half of the internal subscription channel.
    /// NOTE(review): nothing visible in this section sends on it.
    subscription_tx: mpsc::Sender<SubscriptionMessage>,
}
118
/// Bundle of network subsystems owned by a [`Client`], built once during construction.
struct NetworkComponents {
    /// Identifier of this node; used as the `from` field of published gossip messages.
    node_id: NodeId,

    /// Router; also handed to the replication, retrieval and churn components when they are built.
    router: Arc<AdaptiveRouter>,

    /// Gossip layer used by `publish` and `subscribe`.
    gossip: Arc<AdaptiveGossipSub>,

    /// Content store used by `store` and `delete`.
    storage: Arc<ContentStore>,

    /// Retrieval manager used by `retrieve`.
    retrieval: Arc<RetrievalManager>,

    /// Replication manager invoked after a successful `store`.
    replication: Arc<ReplicationManager>,

    /// Churn handler; its monitoring is started by `connect`.
    churn: Arc<ChurnHandler>,

    /// Monitoring system; started by `connect` and queried for health by `get_network_stats`.
    monitoring: Arc<MonitoringSystem>,
}
145
/// Mutable client state, guarded by the `RwLock` held in [`Client`].
struct ClientState {
    /// Set by `connect_to_node` and cleared by `disconnect`; while false the
    /// client operations return `ClientError::NotConnected`.
    connected: bool,

    /// Node information recorded when a connection is established.
    node_info: Option<NodeInfo>,

    /// Per-topic senders feeding the streams returned by `subscribe`.
    subscriptions: HashMap<String, mpsc::Sender<Vec<u8>>>,
}
157
/// Message carried on the client's internal subscription channel.
struct SubscriptionMessage {
    /// Topic used to look up the subscriber's sender.
    topic: String,

    /// Payload forwarded to that subscriber.
    data: Vec<u8>,
}
166
/// Description of a node, as returned by `get_node_info`.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// Node identifier in string form.
    pub id: String,

    /// Addresses associated with the node.
    pub addresses: Vec<String>,

    /// Resource figures reported for the node.
    pub capabilities: NodeCapabilities,

    /// Trust score of the node.
    /// NOTE(review): the valid range is not established in this section.
    pub trust_score: f64,
}
182
/// Resource figures attached to a [`NodeInfo`].
#[derive(Debug, Clone)]
pub struct NodeCapabilities {
    /// Storage capacity; the field name implies gigabytes.
    pub storage_gb: u64,

    /// Compute score. NOTE(review): scale and units are not defined in this section.
    pub compute_score: u64,

    /// Bandwidth; the field name implies megabits per second.
    pub bandwidth_mbps: u64,
}
195
/// Snapshot of network statistics, as returned by `get_network_stats`.
#[derive(Debug, Clone)]
pub struct NetworkStats {
    /// Number of peers; `Client` fills this from the gossip layer's peer count.
    pub connected_peers: usize,

    /// Routing success rate; `Client` fills this from the router statistics.
    pub routing_success_rate: f64,

    /// Average trust score; `Client` fills this from the monitoring health score.
    pub average_trust_score: f64,

    /// Cache hit rate; `Client::get_network_stats` currently always reports `0.0`.
    pub cache_hit_rate: f64,

    /// Churn rate; `Client` fills this from the monitoring health data.
    pub churn_rate: f64,

    /// Total storage; `Client` fills this from the content store's `total_bytes`.
    pub total_storage: u64,

    /// Total bandwidth; `Client::get_network_stats` currently always reports `0`.
    pub total_bandwidth: u64,
}
220
/// Description of a compute job passed to `submit_compute_job`.
#[derive(Debug, Clone)]
pub struct ComputeJob {
    /// Caller-supplied job identifier.
    pub id: String,

    /// Job type as a free-form string (the tests in this module use `"map_reduce"`).
    pub job_type: String,

    /// Raw input bytes for the job.
    pub input: Vec<u8>,

    /// Resources requested for the job.
    pub requirements: ResourceRequirements,
}
236
/// Resources requested by a [`ComputeJob`].
#[derive(Debug, Clone)]
pub struct ResourceRequirements {
    /// Number of CPU cores requested.
    pub cpu_cores: u32,

    /// Memory requested; the field name implies megabytes.
    pub memory_mb: u32,

    /// Maximum duration for the job.
    pub max_duration: Duration,
}
249
/// Identifier of a submitted compute job; a plain `String`.
pub type JobId = String;
252
/// Result of a compute job, as returned by `get_job_result`.
#[derive(Debug, Clone)]
pub struct ComputeResult {
    /// Identifier of the job this result belongs to.
    pub job_id: JobId,

    /// Raw output bytes.
    pub output: Vec<u8>,

    /// Time the execution took.
    pub execution_time: Duration,

    /// Identifier of the node that executed the job, in string form.
    pub executor_node: String,
}
268
/// Boxed, pinned, `Send` stream of raw message payloads, as returned by `subscribe`.
pub type MessageStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
271
/// Errors produced by the client API; the display text of each variant is
/// given by its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// Connecting to the node failed; carries the underlying error text.
    #[error("Connection error: {0}")]
    Connection(String),

    /// A storage, delete or replication step failed.
    #[error("Storage error: {0}")]
    Storage(String),

    /// Retrieving content failed.
    #[error("Retrieval error: {0}")]
    Retrieval(String),

    /// Publishing or subscribing through the gossip layer failed.
    #[error("Messaging error: {0}")]
    Messaging(String),

    /// Network-level failure. NOTE(review): not constructed anywhere in this section.
    #[error("Network error: {0}")]
    Network(String),

    /// An operation exceeded its configured timeout.
    #[error("Timeout error")]
    Timeout,

    /// The operation requires a connected client.
    #[error("Not connected")]
    NotConnected,

    /// Any other failure, described by the contained text.
    #[error("Other error: {0}")]
    Other(String),
}
299
/// Async client interface to the adaptive P2P network.
///
/// Implementors must be `Send + Sync`. Every method returns an `anyhow`
/// `Result`.
#[async_trait]
pub trait AdaptiveP2PClient: Send + Sync {
    /// Creates a client from `config` and connects it.
    async fn connect(config: ClientConfig) -> Result<Self>
    where
        Self: Sized;

    /// Stores `data` and returns the hash identifying it.
    async fn store(&self, data: Vec<u8>) -> Result<ContentHash>;
    /// Retrieves the bytes identified by `hash`.
    async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>>;
    /// Deletes the content identified by `hash`.
    async fn delete(&self, hash: &ContentHash) -> Result<()>;

    /// Submits a compute job and returns its identifier.
    async fn submit_compute_job(&self, job: ComputeJob) -> Result<JobId>;
    /// Fetches the result of the job identified by `job_id`.
    async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult>;

    /// Publishes `message` on `topic`.
    async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()>;
    /// Subscribes to `topic` and returns a stream of message payloads.
    async fn subscribe(&self, topic: &str) -> Result<MessageStream>;

    /// Returns information about the connected node.
    async fn get_node_info(&self) -> Result<NodeInfo>;
    /// Returns a snapshot of network statistics.
    async fn get_network_stats(&self) -> Result<NetworkStats>;

    /// Disconnects the client.
    async fn disconnect(&self) -> Result<()>;
}
328
329impl Client {
    /// Creates a client from `config` with monitoring enabled.
    ///
    /// Delegates to [`Client::new_with_monitoring`]. The returned client starts
    /// out in the disconnected state.
    pub async fn new(config: ClientConfig) -> Result<Self> {
        Self::new_with_monitoring(config, true).await
    }
334
335 pub async fn new_with_monitoring(
337 config: ClientConfig,
338 enable_monitoring: bool,
339 ) -> Result<Self> {
340 let (subscription_tx, subscription_rx) = mpsc::channel(1000);
341
342 let components =
344 Self::initialize_components_with_monitoring(&config, enable_monitoring).await?;
345
346 let client = Self {
347 config,
348 components: Arc::new(components),
349 state: Arc::new(RwLock::new(ClientState {
350 connected: false,
351 node_info: None,
352 subscriptions: HashMap::new(),
353 })),
354 subscription_rx: Arc::new(RwLock::new(subscription_rx)),
355 subscription_tx,
356 };
357
358 Ok(client)
359 }
360
    /// Builds the network components with monitoring enabled.
    ///
    /// Thin wrapper around `initialize_components_with_monitoring`.
    // No caller is visible in this section, hence the dead-code allowance.
    #[allow(dead_code)]
    async fn initialize_components(config: &ClientConfig) -> Result<NetworkComponents> {
        Self::initialize_components_with_monitoring(config, true).await
    }
366
367 async fn initialize_components_with_monitoring(
369 config: &ClientConfig,
370 enable_monitoring: bool,
371 ) -> Result<NetworkComponents> {
372 let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
374
375 let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
377 let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
378 crate::adaptive::som::SomConfig {
379 initial_learning_rate: 0.3,
380 initial_radius: 5.0,
381 iterations: 1000,
382 grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
383 },
384 ));
385 let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
386 let _hyperbolic = hyperbolic;
388 let _som = som;
389
390 let node_id = NodeId { hash: [0u8; 32] }; let gossip = Arc::new(AdaptiveGossipSub::new(
393 node_id.clone(),
394 trust_provider.clone(),
395 ));
396
397 let storage_config = match config.profile {
399 ClientProfile::Full => StorageConfig::default(),
400 ClientProfile::Light => StorageConfig {
401 cache_size: 10 * 1024 * 1024, ..Default::default()
403 },
404 ClientProfile::Compute => StorageConfig {
405 cache_size: 100 * 1024 * 1024, ..Default::default()
407 },
408 ClientProfile::Mobile => StorageConfig {
409 cache_size: 5 * 1024 * 1024, chunk_size: 256 * 1024, ..Default::default()
412 },
413 };
414 let storage = Arc::new(ContentStore::new(storage_config).await?);
415
416 let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
418 let replication = Arc::new(ReplicationManager::new(
419 Default::default(),
420 trust_provider.clone(),
421 churn_predictor.clone(),
422 router.clone(),
423 ));
424
425 let cache_manager = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
426 storage.get_config().cache_size,
427 ));
428 let retrieval = Arc::new(RetrievalManager::new(
429 router.clone(),
430 storage.clone(),
431 cache_manager.clone(),
432 ));
433
434 let churn = Arc::new(ChurnHandler::new(
435 node_id.clone(),
436 churn_predictor,
437 trust_provider.clone(),
438 replication.clone(),
439 router.clone(),
440 gossip.clone(),
441 Default::default(),
442 ));
443
444 #[cfg(feature = "metrics")]
447 let registry = Some(prometheus::Registry::new());
448 #[cfg(not(feature = "metrics"))]
449 let registry = None;
450
451 let monitoring = if enable_monitoring {
452 Arc::new(MonitoringSystem::new_with_registry(
453 crate::adaptive::monitoring::MonitoredComponents {
454 router: router.clone(),
455 churn_handler: churn.clone(),
456 gossip: gossip.clone(),
457 storage: storage.clone(),
458 replication: replication.clone(),
459 thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
460 cache: cache_manager.clone(),
461 },
462 Default::default(),
463 registry,
464 )?)
465 } else {
466 #[cfg(feature = "metrics")]
468 let test_registry = Some(prometheus::Registry::new());
469 #[cfg(not(feature = "metrics"))]
470 let test_registry = None;
471
472 Arc::new(MonitoringSystem::new_with_registry(
473 crate::adaptive::monitoring::MonitoredComponents {
474 router: router.clone(),
475 churn_handler: churn.clone(),
476 gossip: gossip.clone(),
477 storage: storage.clone(),
478 replication: replication.clone(),
479 thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
480 cache: cache_manager.clone(),
481 },
482 Default::default(),
483 test_registry,
484 )?)
485 };
486
487 Ok(NetworkComponents {
488 node_id,
489 router,
490 gossip,
491 storage,
492 retrieval,
493 replication,
494 churn,
495 monitoring,
496 })
497 }
498
499 async fn connect_to_node(&self, address: &str) -> Result<()> {
501 tokio::time::sleep(Duration::from_millis(100)).await;
504
505 let mut state = self.state.write().await;
506 state.connected = true;
507 state.node_info = Some(NodeInfo {
508 id: "node_123".to_string(),
509 addresses: vec![address.to_string()],
510 capabilities: NodeCapabilities {
511 storage_gb: 100,
512 compute_score: 1000,
513 bandwidth_mbps: 100,
514 },
515 trust_score: 1.0,
516 });
517
518 Ok(())
519 }
520
521 async fn handle_subscriptions(&self) {
523 let mut rx = self.subscription_rx.write().await;
524
525 while let Some(msg) = rx.recv().await {
526 let state = self.state.read().await;
527 if let Some(sender) = state.subscriptions.get(&msg.topic) {
528 let _ = sender.send(msg.data).await;
529 }
530 }
531 }
532}
533
534#[async_trait]
535impl AdaptiveP2PClient for Client {
536 async fn connect(config: ClientConfig) -> Result<Self> {
537 let client = Self::new(config.clone()).await?;
538
539 tokio::time::timeout(
541 config.connect_timeout,
542 client.connect_to_node(&config.node_address),
543 )
544 .await
545 .map_err(|_| ClientError::Timeout)?
546 .map_err(|e| ClientError::Connection(e.to_string()))?;
547
548 let client_clone = client.clone();
550 tokio::spawn(async move {
551 client_clone.handle_subscriptions().await;
552 });
553
554 client.components.monitoring.start().await;
556
557 client.components.churn.start_monitoring().await;
559
560 Ok(client)
561 }
562
563 async fn store(&self, data: Vec<u8>) -> Result<ContentHash> {
564 let state = self.state.read().await;
565 if !state.connected {
566 return Err(ClientError::NotConnected.into());
567 }
568
569 let metadata = crate::adaptive::storage::ContentMetadata {
571 size: data.len(),
572 content_type: crate::adaptive::ContentType::DataRetrieval,
573 created_at: std::time::SystemTime::now()
574 .duration_since(std::time::UNIX_EPOCH)
575 .map(|d| d.as_secs())
576 .unwrap_or(0),
577 chunk_count: None,
578 replication_factor: 8,
579 };
580
581 let hash = self
582 .components
583 .storage
584 .store(data.clone(), metadata.clone())
585 .await
586 .map_err(|e| ClientError::Storage(e.to_string()))?;
587
588 self.components
590 .replication
591 .replicate_content(&hash, &data, metadata)
592 .await
593 .map_err(|e| ClientError::Storage(format!("Replication failed: {e}")))?;
594
595 Ok(hash)
596 }
597
598 async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
599 let state = self.state.read().await;
600 if !state.connected {
601 return Err(ClientError::NotConnected.into());
602 }
603
604 tokio::time::timeout(
606 self.config.request_timeout,
607 self.components.retrieval.retrieve(
608 hash,
609 crate::adaptive::retrieval::RetrievalStrategy::Parallel,
610 ),
611 )
612 .await
613 .map_err(|_| ClientError::Timeout)?
614 .map_err(|e| ClientError::Retrieval(e.to_string()).into())
615 }
616
617 async fn delete(&self, hash: &ContentHash) -> Result<()> {
618 let state = self.state.read().await;
619 if !state.connected {
620 return Err(ClientError::NotConnected.into());
621 }
622
623 self.components
625 .storage
626 .delete(hash)
627 .await
628 .map_err(|e| ClientError::Storage(e.to_string()).into())
629 }
630
631 async fn submit_compute_job(&self, _job: ComputeJob) -> Result<JobId> {
632 let state = self.state.read().await;
633 if !state.connected {
634 return Err(ClientError::NotConnected.into());
635 }
636
637 Ok(format!("job_{}", uuid::Uuid::new_v4()))
640 }
641
642 async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult> {
643 let state = self.state.read().await;
644 if !state.connected {
645 return Err(ClientError::NotConnected.into());
646 }
647
648 Ok(ComputeResult {
651 job_id: job_id.clone(),
652 output: b"Mock compute result".to_vec(),
653 execution_time: Duration::from_secs(5),
654 executor_node: "node_456".to_string(),
655 })
656 }
657
658 async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()> {
659 let state = self.state.read().await;
660 if !state.connected {
661 return Err(ClientError::NotConnected.into());
662 }
663
664 use crate::adaptive::gossip::GossipMessage;
665
666 let gossip_msg = GossipMessage {
667 topic: topic.to_string(),
668 data: message,
669 from: self.components.node_id.clone(),
670 seqno: 0, timestamp: std::time::SystemTime::now()
672 .duration_since(std::time::UNIX_EPOCH)
673 .map(|d| d.as_secs())
674 .unwrap_or(0),
675 };
676
677 self.components
678 .gossip
679 .publish(topic, gossip_msg)
680 .await
681 .map_err(|e| ClientError::Messaging(e.to_string()).into())
682 }
683
684 async fn subscribe(&self, topic: &str) -> Result<MessageStream> {
685 let state = self.state.read().await;
686 if !state.connected {
687 return Err(ClientError::NotConnected.into());
688 }
689
690 self.components
692 .gossip
693 .subscribe(topic)
694 .await
695 .map_err(|e| ClientError::Messaging(e.to_string()))?;
696
697 let (tx, rx) = mpsc::channel(100);
699
700 let mut state = self.state.write().await;
701 state.subscriptions.insert(topic.to_string(), tx);
702
703 Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
704 }
705
706 async fn get_node_info(&self) -> Result<NodeInfo> {
707 let state = self.state.read().await;
708 if !state.connected {
709 return Err(ClientError::NotConnected.into());
710 }
711
712 state
713 .node_info
714 .clone()
715 .ok_or_else(|| ClientError::Other("Node info not available".to_string()).into())
716 }
717
718 async fn get_network_stats(&self) -> Result<NetworkStats> {
719 let state = self.state.read().await;
720 if !state.connected {
721 return Err(ClientError::NotConnected.into());
722 }
723
724 let health = self.components.monitoring.get_health().await;
726 let routing_stats = self.components.router.get_stats().await;
727 let storage_stats = self.components.storage.get_stats().await;
728 let gossip_stats = self.components.gossip.get_stats().await;
729
730 Ok(NetworkStats {
731 connected_peers: gossip_stats.peer_count,
732 routing_success_rate: routing_stats.success_rate(),
733 average_trust_score: health.score,
734 cache_hit_rate: 0.0, churn_rate: health.churn_rate,
736 total_storage: storage_stats.total_bytes,
737 total_bandwidth: 0, })
739 }
740
741 async fn disconnect(&self) -> Result<()> {
742 let mut state = self.state.write().await;
743 state.connected = false;
744
745 state.subscriptions.clear();
747
748 Ok(())
749 }
750}
751
752impl Clone for Client {
754 fn clone(&self) -> Self {
755 Self {
756 config: self.config.clone(),
757 components: self.components.clone(),
758 state: self.state.clone(),
759 subscription_rx: self.subscription_rx.clone(),
760 subscription_tx: self.subscription_tx.clone(),
761 }
762 }
763}
764
765pub async fn connect(address: &str) -> Result<Client> {
767 let config = ClientConfig {
768 node_address: address.to_string(),
769 ..Default::default()
770 };
771
772 Client::connect(config).await
773}
774
775pub async fn connect_with_profile(address: &str, profile: ClientProfile) -> Result<Client> {
777 let config = ClientConfig {
778 node_address: address.to_string(),
779 profile,
780 ..Default::default()
781 };
782
783 Client::connect(config).await
784}
785
786#[cfg(test)]
787mod tests {
788 use super::*;
789 use crate::adaptive::monitoring::MonitoringSystem;
790
791 #[tokio::test]
792 async fn test_client_creation() {
793 let client = new_test_client(ClientConfig::default()).await.unwrap();
794
795 let state = client.state.read().await;
797 assert!(!state.connected);
798 }
799
800 pub async fn new_test_client(config: ClientConfig) -> Result<Client> {
802 let (subscription_tx, subscription_rx) = mpsc::channel(1000);
803
804 let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
806 let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
807 let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
808 crate::adaptive::som::SomConfig {
809 initial_learning_rate: 0.3,
810 initial_radius: 5.0,
811 iterations: 1000,
812 grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
813 },
814 ));
815 let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
816 let _hyperbolic = hyperbolic;
818 let _som = som;
819
820 let node_id = NodeId { hash: [0u8; 32] };
821 let gossip = Arc::new(AdaptiveGossipSub::new(
822 node_id.clone(),
823 trust_provider.clone(),
824 ));
825
826 let storage_config = StorageConfig {
828 cache_size: 1024 * 1024, ..Default::default()
830 };
831 let storage = Arc::new(ContentStore::new(storage_config).await?);
832
833 let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
834 let replication = Arc::new(ReplicationManager::new(
835 Default::default(),
836 trust_provider.clone(),
837 churn_predictor.clone(),
838 router.clone(),
839 ));
840
841 let cache = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
842 storage.get_config().cache_size,
843 ));
844 let retrieval = Arc::new(RetrievalManager::new(
845 router.clone(),
846 storage.clone(),
847 cache.clone(),
848 ));
849
850 let churn = Arc::new(ChurnHandler::new(
851 node_id.clone(),
852 churn_predictor,
853 trust_provider.clone(),
854 replication.clone(),
855 router.clone(),
856 gossip.clone(),
857 Default::default(),
858 ));
859
860 let thompson = Arc::new(crate::adaptive::learning::ThompsonSampling::new());
862
863 #[cfg(feature = "metrics")]
865 let test_registry = Some(prometheus::Registry::new());
866 #[cfg(not(feature = "metrics"))]
867 let test_registry = None;
868
869 let monitoring = Arc::new(
870 MonitoringSystem::new_with_registry(
871 crate::adaptive::monitoring::MonitoredComponents {
872 router: router.clone(),
873 churn_handler: churn.clone(),
874 gossip: gossip.clone(),
875 storage: storage.clone(),
876 replication: replication.clone(),
877 thompson: thompson.clone(),
878 cache: cache.clone(),
879 },
880 crate::adaptive::monitoring::MonitoringConfig::default(),
881 test_registry,
882 )
883 .expect("Failed to create monitoring system for tests"),
884 );
885
886 let components = NetworkComponents {
887 node_id,
888 router,
889 gossip,
890 storage,
891 retrieval,
892 replication,
893 churn,
894 monitoring,
895 };
896
897 let client = Client {
898 config,
899 components: Arc::new(components),
900 state: Arc::new(RwLock::new(ClientState {
901 connected: false,
902 node_info: None,
903 subscriptions: HashMap::new(),
904 })),
905 subscription_rx: Arc::new(RwLock::new(subscription_rx)),
906 subscription_tx,
907 };
908
909 Ok(client)
910 }
911
912 #[tokio::test]
913 async fn test_client_connect() {
914 let client = new_test_client(ClientConfig::default()).await.unwrap();
915
916 client.connect_to_node("127.0.0.1:8000").await.unwrap();
918
919 let state = client.state.read().await;
921 assert!(state.connected);
922 assert!(state.node_info.is_some());
923 }
924
925 #[tokio::test]
926 async fn test_storage_operations() {
927 let client = new_test_client(ClientConfig::default()).await.unwrap();
928
929 client.connect_to_node("127.0.0.1:8000").await.unwrap();
931
932 let data = b"Hello, P2P world!".to_vec();
934 let hash = client.store(data.clone()).await.unwrap();
935
936 let retrieved = client.retrieve(&hash).await.unwrap();
938 assert_eq!(retrieved, data);
939 }
940
941 #[tokio::test]
942 async fn test_not_connected_error() {
943 let client = new_test_client(ClientConfig::default()).await.unwrap();
944
945 let result = client.store(vec![1, 2, 3]).await;
947 assert!(result.is_err());
948 assert!(result.unwrap_err().to_string().contains("Not connected"));
949 }
950
951 #[tokio::test]
952 async fn test_network_stats() {
953 let client = new_test_client(ClientConfig::default()).await.unwrap();
954
955 client.connect_to_node("127.0.0.1:8000").await.unwrap();
957
958 let stats = client.get_network_stats().await.unwrap();
959 assert!(stats.routing_success_rate >= 0.0);
960 assert!(stats.routing_success_rate <= 1.0);
961 }
962
963 #[tokio::test]
964 async fn test_client_profiles() {
965 for profile in [
967 ClientProfile::Full,
968 ClientProfile::Light,
969 ClientProfile::Compute,
970 ClientProfile::Mobile,
971 ] {
972 let config = ClientConfig {
973 profile,
974 ..Default::default()
975 };
976 match Client::connect(config).await {
977 Ok(client) => {
978 let info = client.get_node_info().await.unwrap();
979 assert!(!info.id.is_empty());
980 }
981 Err(e) => {
982 let es = format!("{}", e);
984 if es.contains("Duplicate") && es.contains("registration") {
985 continue;
986 }
987 panic!("Client::connect failed: {}", es);
988 }
989 }
990 }
991 }
992
993 #[tokio::test]
994 async fn test_pubsub_messaging() {
995 let client = new_test_client(ClientConfig::default()).await.unwrap();
996
997 client.connect_to_node("127.0.0.1:8000").await.unwrap();
999
1000 let _stream = client.subscribe("test_topic").await.unwrap();
1002
1003 let message = b"Test message".to_vec();
1005 client.publish("test_topic", message.clone()).await.unwrap();
1006
1007 }
1010
1011 #[tokio::test]
1012 async fn test_compute_job() {
1013 let client = new_test_client(ClientConfig::default()).await.unwrap();
1014
1015 client.connect_to_node("127.0.0.1:8000").await.unwrap();
1017
1018 let job = ComputeJob {
1019 id: "test_job".to_string(),
1020 job_type: "map_reduce".to_string(),
1021 input: b"Input data".to_vec(),
1022 requirements: ResourceRequirements {
1023 cpu_cores: 2,
1024 memory_mb: 1024,
1025 max_duration: Duration::from_secs(60),
1026 },
1027 };
1028
1029 let job_id = client.submit_compute_job(job).await.unwrap();
1030 assert!(!job_id.is_empty());
1031
1032 let result = client.get_job_result(&job_id).await.unwrap();
1033 assert_eq!(result.job_id, job_id);
1034 }
1035}