1use crate::adaptive::{
24 AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentHash, ContentStore, MonitoringSystem,
25 NodeId, ReplicationManager, RetrievalManager, StorageConfig,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::Stream;
30use std::collections::HashMap;
31use std::pin::Pin;
32use std::{sync::Arc, time::Duration};
33use tokio::sync::{RwLock, mpsc};
34
35#[derive(Debug, Clone)]
37pub struct ClientConfig {
38 pub node_address: String,
40
41 pub connect_timeout: Duration,
43
44 pub request_timeout: Duration,
46
47 pub debug_logging: bool,
49
50 pub profile: ClientProfile,
52}
53
54impl Default for ClientConfig {
55 fn default() -> Self {
56 let global_config = crate::config::Config::default();
58
59 let node_address = global_config.network.listen_address.clone();
61
62 Self {
63 node_address,
64 connect_timeout: Duration::from_secs(global_config.network.connection_timeout),
65 request_timeout: Duration::from_secs(30),
66 debug_logging: false,
67 profile: ClientProfile::Full,
68 }
69 }
70}
71
72impl ClientConfig {
73 pub fn from_global_config(config: &crate::config::Config) -> Self {
75 Self {
76 node_address: config.network.listen_address.clone(),
77 connect_timeout: Duration::from_secs(config.network.connection_timeout),
78 request_timeout: Duration::from_secs(30),
79 debug_logging: false,
80 profile: ClientProfile::Full,
81 }
82 }
83}
84
/// Deployment profile of a client.
///
/// The profile decides which storage configuration the client is started
/// with. `Eq` and `Hash` are derived in addition to `PartialEq` so the
/// profile can be used as a map/set key and in exhaustive equality checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ClientProfile {
    /// Uses the default storage configuration.
    Full,

    /// Uses a 10 MiB cache; all other storage settings are defaults.
    Light,

    /// Uses a 100 MiB cache; all other storage settings are defaults.
    Compute,

    /// Uses a 5 MiB cache and 256 KiB chunks; all other storage settings are
    /// defaults.
    Mobile,
}
100
101pub struct Client {
103 config: ClientConfig,
105
106 components: Arc<NetworkComponents>,
108
109 state: Arc<RwLock<ClientState>>,
111
112 subscription_rx: Arc<RwLock<mpsc::Receiver<SubscriptionMessage>>>,
114
115 subscription_tx: mpsc::Sender<SubscriptionMessage>,
117}
118
119struct NetworkComponents {
121 node_id: NodeId,
123
124 router: Arc<AdaptiveRouter>,
126
127 gossip: Arc<AdaptiveGossipSub>,
129
130 storage: Arc<ContentStore>,
132
133 retrieval: Arc<RetrievalManager>,
135
136 replication: Arc<ReplicationManager>,
138
139 churn: Arc<ChurnHandler>,
141
142 monitoring: Arc<MonitoringSystem>,
144}
145
146struct ClientState {
148 connected: bool,
150
151 node_info: Option<NodeInfo>,
153
154 subscriptions: HashMap<String, mpsc::Sender<Vec<u8>>>,
156}
157
/// A message travelling through the client's internal subscription channel.
struct SubscriptionMessage {
    /// Topic the message belongs to; used to look up the subscriber.
    topic: String,

    /// Raw message payload forwarded to the subscriber.
    data: Vec<u8>,
}
166
167#[derive(Debug, Clone)]
169pub struct NodeInfo {
170 pub id: String,
172
173 pub addresses: Vec<String>,
175
176 pub capabilities: NodeCapabilities,
178
179 pub trust_score: f64,
181}
182
/// Resources advertised by a node.
#[derive(Debug, Clone)]
pub struct NodeCapabilities {
    /// Storage capacity (presumably gigabytes, per the field name).
    pub storage_gb: u64,

    /// Compute score; the scale is not defined in this file.
    pub compute_score: u64,

    /// Bandwidth (presumably megabits per second, per the field name).
    pub bandwidth_mbps: u64,
}
195
/// Snapshot of network-level statistics returned by
/// `AdaptiveP2PClient::get_network_stats`.
#[derive(Debug, Clone)]
pub struct NetworkStats {
    /// Number of connected peers.
    pub connected_peers: usize,

    /// Routing success rate.
    pub routing_success_rate: f64,

    /// Average trust score.
    pub average_trust_score: f64,

    /// Cache hit rate.
    pub cache_hit_rate: f64,

    /// Churn rate.
    pub churn_rate: f64,

    /// Total storage (presumably bytes — confirm against the storage stats).
    pub total_storage: u64,

    /// Total bandwidth; the unit is not defined in this file.
    pub total_bandwidth: u64,
}
220
221#[derive(Debug, Clone)]
223pub struct ComputeJob {
224 pub id: String,
226
227 pub job_type: String,
229
230 pub input: Vec<u8>,
232
233 pub requirements: ResourceRequirements,
235}
236
/// Resources requested by a [`ComputeJob`].
#[derive(Debug, Clone)]
pub struct ResourceRequirements {
    /// Number of CPU cores.
    pub cpu_cores: u32,

    /// Memory (presumably megabytes, per the field name).
    pub memory_mb: u32,

    /// Longest duration the job is allowed to take.
    pub max_duration: Duration,
}
249
/// Identifier of a submitted compute job.
pub type JobId = String;
252
253#[derive(Debug, Clone)]
255pub struct ComputeResult {
256 pub job_id: JobId,
258
259 pub output: Vec<u8>,
261
262 pub execution_time: Duration,
264
265 pub executor_node: String,
267}
268
269pub type MessageStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
271
272#[derive(Debug, thiserror::Error)]
274pub enum ClientError {
275 #[error("Connection error: {0}")]
276 Connection(String),
277
278 #[error("Storage error: {0}")]
279 Storage(String),
280
281 #[error("Retrieval error: {0}")]
282 Retrieval(String),
283
284 #[error("Messaging error: {0}")]
285 Messaging(String),
286
287 #[error("Network error: {0}")]
288 Network(String),
289
290 #[error("Timeout error")]
291 Timeout,
292
293 #[error("Not connected")]
294 NotConnected,
295
296 #[error("Other error: {0}")]
297 Other(String),
298}
299
300#[async_trait]
302pub trait AdaptiveP2PClient: Send + Sync {
303 async fn connect(config: ClientConfig) -> Result<Self>
305 where
306 Self: Sized;
307
308 async fn store(&self, data: Vec<u8>) -> Result<ContentHash>;
310 async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>>;
311 async fn delete(&self, hash: &ContentHash) -> Result<()>;
312
313 async fn submit_compute_job(&self, job: ComputeJob) -> Result<JobId>;
315 async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult>;
316
317 async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()>;
319 async fn subscribe(&self, topic: &str) -> Result<MessageStream>;
320
321 async fn get_node_info(&self) -> Result<NodeInfo>;
323 async fn get_network_stats(&self) -> Result<NetworkStats>;
324
325 async fn disconnect(&self) -> Result<()>;
327}
328
329impl Client {
330 pub async fn new(config: ClientConfig) -> Result<Self> {
332 let (subscription_tx, subscription_rx) = mpsc::channel(1000);
333
334 let components = Self::initialize_components(&config).await?;
336
337 let client = Self {
338 config,
339 components: Arc::new(components),
340 state: Arc::new(RwLock::new(ClientState {
341 connected: false,
342 node_info: None,
343 subscriptions: HashMap::new(),
344 })),
345 subscription_rx: Arc::new(RwLock::new(subscription_rx)),
346 subscription_tx,
347 };
348
349 Ok(client)
350 }
351
352 async fn initialize_components(config: &ClientConfig) -> Result<NetworkComponents> {
354 let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
356
357 let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
359 let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
360 crate::adaptive::som::SomConfig {
361 initial_learning_rate: 0.3,
362 initial_radius: 5.0,
363 iterations: 1000,
364 grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
365 },
366 ));
367 let router = Arc::new(AdaptiveRouter::new(trust_provider.clone(), hyperbolic, som));
368
369 let node_id = NodeId { hash: [0u8; 32] }; let gossip = Arc::new(AdaptiveGossipSub::new(
372 node_id.clone(),
373 trust_provider.clone(),
374 ));
375
376 let storage_config = match config.profile {
378 ClientProfile::Full => StorageConfig::default(),
379 ClientProfile::Light => StorageConfig {
380 cache_size: 10 * 1024 * 1024, ..Default::default()
382 },
383 ClientProfile::Compute => StorageConfig {
384 cache_size: 100 * 1024 * 1024, ..Default::default()
386 },
387 ClientProfile::Mobile => StorageConfig {
388 cache_size: 5 * 1024 * 1024, chunk_size: 256 * 1024, ..Default::default()
391 },
392 };
393 let storage = Arc::new(ContentStore::new(storage_config).await?);
394
395 let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
397 let replication = Arc::new(ReplicationManager::new(
398 Default::default(),
399 trust_provider.clone(),
400 churn_predictor.clone(),
401 router.clone(),
402 ));
403
404 let cache_manager = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
405 storage.get_config().cache_size,
406 ));
407 let retrieval = Arc::new(RetrievalManager::new(
408 router.clone(),
409 storage.clone(),
410 cache_manager.clone(),
411 ));
412
413 let churn = Arc::new(ChurnHandler::new(
414 node_id.clone(),
415 churn_predictor,
416 trust_provider.clone(),
417 replication.clone(),
418 router.clone(),
419 gossip.clone(),
420 Default::default(),
421 ));
422
423 let monitoring = Arc::new(MonitoringSystem::new(
424 crate::adaptive::monitoring::MonitoredComponents {
425 router: router.clone(),
426 churn_handler: churn.clone(),
427 gossip: gossip.clone(),
428 storage: storage.clone(),
429 replication: replication.clone(),
430 thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
431 cache: cache_manager,
432 },
433 Default::default(),
434 )?);
435
436 Ok(NetworkComponents {
437 node_id,
438 router,
439 gossip,
440 storage,
441 retrieval,
442 replication,
443 churn,
444 monitoring,
445 })
446 }
447
448 async fn connect_to_node(&self, address: &str) -> Result<()> {
450 tokio::time::sleep(Duration::from_millis(100)).await;
453
454 let mut state = self.state.write().await;
455 state.connected = true;
456 state.node_info = Some(NodeInfo {
457 id: "node_123".to_string(),
458 addresses: vec![address.to_string()],
459 capabilities: NodeCapabilities {
460 storage_gb: 100,
461 compute_score: 1000,
462 bandwidth_mbps: 100,
463 },
464 trust_score: 1.0,
465 });
466
467 Ok(())
468 }
469
470 async fn handle_subscriptions(&self) {
472 let mut rx = self.subscription_rx.write().await;
473
474 while let Some(msg) = rx.recv().await {
475 let state = self.state.read().await;
476 if let Some(sender) = state.subscriptions.get(&msg.topic) {
477 let _ = sender.send(msg.data).await;
478 }
479 }
480 }
481}
482
483#[async_trait]
484impl AdaptiveP2PClient for Client {
485 async fn connect(config: ClientConfig) -> Result<Self> {
486 let client = Self::new(config.clone()).await?;
487
488 tokio::time::timeout(
490 config.connect_timeout,
491 client.connect_to_node(&config.node_address),
492 )
493 .await
494 .map_err(|_| ClientError::Timeout)?
495 .map_err(|e| ClientError::Connection(e.to_string()))?;
496
497 let client_clone = client.clone();
499 tokio::spawn(async move {
500 client_clone.handle_subscriptions().await;
501 });
502
503 client.components.monitoring.start().await;
505
506 client.components.churn.start_monitoring().await;
508
509 Ok(client)
510 }
511
512 async fn store(&self, data: Vec<u8>) -> Result<ContentHash> {
513 let state = self.state.read().await;
514 if !state.connected {
515 return Err(ClientError::NotConnected.into());
516 }
517
518 let metadata = crate::adaptive::storage::ContentMetadata {
520 size: data.len(),
521 content_type: crate::adaptive::ContentType::DataRetrieval,
522 created_at: std::time::SystemTime::now()
523 .duration_since(std::time::UNIX_EPOCH)
524 .map(|d| d.as_secs())
525 .unwrap_or(0),
526 chunk_count: None,
527 replication_factor: 8,
528 };
529
530 let hash = self
531 .components
532 .storage
533 .store(data.clone(), metadata.clone())
534 .await
535 .map_err(|e| ClientError::Storage(e.to_string()))?;
536
537 self.components
539 .replication
540 .replicate_content(&hash, &data, metadata)
541 .await
542 .map_err(|e| ClientError::Storage(format!("Replication failed: {e}")))?;
543
544 Ok(hash)
545 }
546
547 async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
548 let state = self.state.read().await;
549 if !state.connected {
550 return Err(ClientError::NotConnected.into());
551 }
552
553 tokio::time::timeout(
555 self.config.request_timeout,
556 self.components.retrieval.retrieve(
557 hash,
558 crate::adaptive::retrieval::RetrievalStrategy::Parallel,
559 ),
560 )
561 .await
562 .map_err(|_| ClientError::Timeout)?
563 .map_err(|e| ClientError::Retrieval(e.to_string()).into())
564 }
565
566 async fn delete(&self, hash: &ContentHash) -> Result<()> {
567 let state = self.state.read().await;
568 if !state.connected {
569 return Err(ClientError::NotConnected.into());
570 }
571
572 self.components
574 .storage
575 .delete(hash)
576 .await
577 .map_err(|e| ClientError::Storage(e.to_string()).into())
578 }
579
580 async fn submit_compute_job(&self, _job: ComputeJob) -> Result<JobId> {
581 let state = self.state.read().await;
582 if !state.connected {
583 return Err(ClientError::NotConnected.into());
584 }
585
586 Ok(format!("job_{}", uuid::Uuid::new_v4()))
589 }
590
591 async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult> {
592 let state = self.state.read().await;
593 if !state.connected {
594 return Err(ClientError::NotConnected.into());
595 }
596
597 Ok(ComputeResult {
600 job_id: job_id.clone(),
601 output: b"Mock compute result".to_vec(),
602 execution_time: Duration::from_secs(5),
603 executor_node: "node_456".to_string(),
604 })
605 }
606
607 async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()> {
608 let state = self.state.read().await;
609 if !state.connected {
610 return Err(ClientError::NotConnected.into());
611 }
612
613 use crate::adaptive::gossip::GossipMessage;
614
615 let gossip_msg = GossipMessage {
616 topic: topic.to_string(),
617 data: message,
618 from: self.components.node_id.clone(),
619 seqno: 0, timestamp: std::time::SystemTime::now()
621 .duration_since(std::time::UNIX_EPOCH)
622 .map(|d| d.as_secs())
623 .unwrap_or(0),
624 };
625
626 self.components
627 .gossip
628 .publish(topic, gossip_msg)
629 .await
630 .map_err(|e| ClientError::Messaging(e.to_string()).into())
631 }
632
633 async fn subscribe(&self, topic: &str) -> Result<MessageStream> {
634 let state = self.state.read().await;
635 if !state.connected {
636 return Err(ClientError::NotConnected.into());
637 }
638
639 self.components
641 .gossip
642 .subscribe(topic)
643 .await
644 .map_err(|e| ClientError::Messaging(e.to_string()))?;
645
646 let (tx, rx) = mpsc::channel(100);
648
649 let mut state = self.state.write().await;
650 state.subscriptions.insert(topic.to_string(), tx);
651
652 Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
653 }
654
655 async fn get_node_info(&self) -> Result<NodeInfo> {
656 let state = self.state.read().await;
657 if !state.connected {
658 return Err(ClientError::NotConnected.into());
659 }
660
661 state
662 .node_info
663 .clone()
664 .ok_or_else(|| ClientError::Other("Node info not available".to_string()).into())
665 }
666
667 async fn get_network_stats(&self) -> Result<NetworkStats> {
668 let state = self.state.read().await;
669 if !state.connected {
670 return Err(ClientError::NotConnected.into());
671 }
672
673 let health = self.components.monitoring.get_health().await;
675 let routing_stats = self.components.router.get_stats().await;
676 let storage_stats = self.components.storage.get_stats().await;
677 let gossip_stats = self.components.gossip.get_stats().await;
678
679 Ok(NetworkStats {
680 connected_peers: gossip_stats.peer_count,
681 routing_success_rate: routing_stats.success_rate(),
682 average_trust_score: health.score,
683 cache_hit_rate: 0.0, churn_rate: health.churn_rate,
685 total_storage: storage_stats.total_bytes,
686 total_bandwidth: 0, })
688 }
689
690 async fn disconnect(&self) -> Result<()> {
691 let mut state = self.state.write().await;
692 state.connected = false;
693
694 state.subscriptions.clear();
696
697 Ok(())
698 }
699}
700
701impl Clone for Client {
703 fn clone(&self) -> Self {
704 Self {
705 config: self.config.clone(),
706 components: self.components.clone(),
707 state: self.state.clone(),
708 subscription_rx: self.subscription_rx.clone(),
709 subscription_tx: self.subscription_tx.clone(),
710 }
711 }
712}
713
714pub async fn connect(address: &str) -> Result<Client> {
716 let config = ClientConfig {
717 node_address: address.to_string(),
718 ..Default::default()
719 };
720
721 Client::connect(config).await
722}
723
724pub async fn connect_with_profile(address: &str, profile: ClientProfile) -> Result<Client> {
726 let config = ClientConfig {
727 node_address: address.to_string(),
728 profile,
729 ..Default::default()
730 };
731
732 Client::connect(config).await
733}
734
#[cfg(test)]
mod tests {
    use super::*;

    /// Upper bound for operations that must never block indefinitely.
    const OP_TIMEOUT: Duration = Duration::from_secs(5);

    /// Connects a client with the default configuration.
    async fn test_client() -> Result<Client> {
        let config = ClientConfig::default();
        Client::connect(config).await
    }

    /// A freshly created client is not connected.
    #[tokio::test]
    async fn test_client_creation() {
        let config = ClientConfig::default();
        let client = Client::new(config).await.unwrap();

        let state = client.state.read().await;
        assert!(!state.connected);
    }

    /// Connecting sets the connected flag and records node information.
    #[tokio::test]
    async fn test_client_connect() {
        let client = test_client().await.unwrap();

        let state = client.state.read().await;
        assert!(state.connected);
        assert!(state.node_info.is_some());
    }

    /// Stored data can be retrieved unchanged by its hash.
    #[tokio::test]
    async fn test_storage_operations() {
        let client = test_client().await.unwrap();

        let data = b"Hello, P2P world!".to_vec();
        let hash = client.store(data.clone()).await.unwrap();

        let retrieved = client.retrieve(&hash).await.unwrap();
        assert_eq!(retrieved, data);
    }

    /// Operations on a client that was never connected fail with `NotConnected`.
    #[tokio::test]
    async fn test_not_connected_error() {
        let config = ClientConfig::default();
        let client = Client::new(config).await.unwrap();

        let result = client.store(vec![1, 2, 3]).await;
        assert!(matches!(
            result.unwrap_err().downcast_ref::<ClientError>(),
            Some(ClientError::NotConnected)
        ));
    }

    /// After `disconnect`, operations fail with `NotConnected` again.
    #[tokio::test]
    async fn test_disconnect_rejects_operations() {
        let client = test_client().await.unwrap();
        client.disconnect().await.unwrap();

        let result = client.get_node_info().await;
        assert!(matches!(
            result.unwrap_err().downcast_ref::<ClientError>(),
            Some(ClientError::NotConnected)
        ));
    }

    /// The reported routing success rate is a valid ratio.
    #[tokio::test]
    async fn test_network_stats() {
        let client = test_client().await.unwrap();

        let stats = client.get_network_stats().await.unwrap();
        assert!(stats.routing_success_rate >= 0.0);
        assert!(stats.routing_success_rate <= 1.0);
    }

    /// Every profile can connect and report node information.
    #[tokio::test]
    async fn test_client_profiles() {
        for profile in [
            ClientProfile::Full,
            ClientProfile::Light,
            ClientProfile::Compute,
            ClientProfile::Mobile,
        ] {
            let config = ClientConfig {
                profile,
                ..Default::default()
            };
            let client = Client::connect(config).await.unwrap();
            let info = client.get_node_info().await.unwrap();
            assert!(!info.id.is_empty());
        }
    }

    /// Subscribing and publishing both complete promptly.
    ///
    /// Each call is wrapped in a timeout so that a locking regression fails
    /// the test instead of hanging the whole test run.
    #[tokio::test]
    async fn test_pubsub_messaging() {
        let client = test_client().await.unwrap();

        let _stream = tokio::time::timeout(OP_TIMEOUT, client.subscribe("test_topic"))
            .await
            .expect("subscribe did not complete in time")
            .unwrap();

        let message = b"Test message".to_vec();
        tokio::time::timeout(OP_TIMEOUT, client.publish("test_topic", message))
            .await
            .expect("publish did not complete in time")
            .unwrap();
    }

    /// A submitted job yields a non-empty id, and its result echoes that id.
    #[tokio::test]
    async fn test_compute_job() {
        let client = test_client().await.unwrap();

        let job = ComputeJob {
            id: "test_job".to_string(),
            job_type: "map_reduce".to_string(),
            input: b"Input data".to_vec(),
            requirements: ResourceRequirements {
                cpu_cores: 2,
                memory_mb: 1024,
                max_duration: Duration::from_secs(60),
            },
        };

        let job_id = client.submit_compute_job(job).await.unwrap();
        assert!(!job_id.is_empty());

        let result = client.get_job_result(&job_id).await.unwrap();
        assert_eq!(result.job_id, job_id);
    }
}