saorsa_core/adaptive/
client.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Client library and API for the adaptive P2P network
15//!
16//! This module provides a simple, ergonomic async API for applications to interact
17//! with the adaptive P2P network. It abstracts away the complexity of the underlying
18//! distributed systems and provides straightforward methods for:
19//! - Content storage and retrieval
20//! - Pub/sub messaging
21//! - Network statistics and monitoring
22
23use crate::adaptive::{
24    AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentHash, ContentStore, MonitoringSystem,
25    NodeId, ReplicationManager, RetrievalManager, StorageConfig,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::Stream;
30use std::collections::HashMap;
31use std::pin::Pin;
32use std::{sync::Arc, time::Duration};
33use tokio::sync::{RwLock, mpsc};
34
35/// Client configuration
36#[derive(Debug, Clone)]
37pub struct ClientConfig {
38    /// Node address to connect to
39    pub node_address: String,
40
41    /// Connection timeout
42    pub connect_timeout: Duration,
43
44    /// Request timeout
45    pub request_timeout: Duration,
46
47    /// Enable debug logging
48    pub debug_logging: bool,
49
50    /// Client profile (full, light, compute, mobile)
51    pub profile: ClientProfile,
52}
53
54impl Default for ClientConfig {
55    fn default() -> Self {
56        // Load global config for defaults
57        let global_config = crate::config::Config::default();
58
59        // Use the global listen address as default node address
60        let node_address = global_config.network.listen_address.clone();
61
62        Self {
63            node_address,
64            connect_timeout: Duration::from_secs(global_config.network.connection_timeout),
65            request_timeout: Duration::from_secs(30),
66            debug_logging: false,
67            profile: ClientProfile::Full,
68        }
69    }
70}
71
72impl ClientConfig {
73    /// Create ClientConfig from global Config
74    pub fn from_global_config(config: &crate::config::Config) -> Self {
75        Self {
76            node_address: config.network.listen_address.clone(),
77            connect_timeout: Duration::from_secs(config.network.connection_timeout),
78            request_timeout: Duration::from_secs(30),
79            debug_logging: false,
80            profile: ClientProfile::Full,
81        }
82    }
83}
84
85/// Client profile for different deployment scenarios
86#[derive(Debug, Clone, Copy, PartialEq)]
87pub enum ClientProfile {
88    /// Full node with all capabilities
89    Full,
90
91    /// Light node with routing only
92    Light,
93
94    /// Compute-optimized node
95    Compute,
96
97    /// Mobile node with reduced parameters
98    Mobile,
99}
100
101/// Simple async client for the adaptive P2P network
102pub struct Client {
103    /// Client configuration
104    config: ClientConfig,
105
106    /// Network components
107    components: Arc<NetworkComponents>,
108
109    /// Client state
110    state: Arc<RwLock<ClientState>>,
111
112    /// Message receiver for subscriptions
113    subscription_rx: Arc<RwLock<mpsc::Receiver<SubscriptionMessage>>>,
114
115    /// Subscription sender
116    subscription_tx: mpsc::Sender<SubscriptionMessage>,
117}
118
119/// Internal network components
120struct NetworkComponents {
121    /// Node ID
122    node_id: NodeId,
123
124    /// Adaptive router
125    router: Arc<AdaptiveRouter>,
126
127    /// Gossip protocol
128    gossip: Arc<AdaptiveGossipSub>,
129
130    /// Content store
131    storage: Arc<ContentStore>,
132
133    /// Retrieval manager
134    retrieval: Arc<RetrievalManager>,
135
136    /// Replication manager
137    replication: Arc<ReplicationManager>,
138
139    /// Churn handler
140    churn: Arc<ChurnHandler>,
141
142    /// Monitoring system
143    monitoring: Arc<MonitoringSystem>,
144}
145
146/// Client state
147struct ClientState {
148    /// Connected status
149    connected: bool,
150
151    /// Local node information
152    node_info: Option<NodeInfo>,
153
154    /// Active subscriptions
155    subscriptions: HashMap<String, mpsc::Sender<Vec<u8>>>,
156}
157
158/// Subscription message
159struct SubscriptionMessage {
160    /// Topic
161    topic: String,
162
163    /// Message data
164    data: Vec<u8>,
165}
166
167/// Node information
168#[derive(Debug, Clone)]
169pub struct NodeInfo {
170    /// Node ID
171    pub id: String,
172
173    /// Network addresses
174    pub addresses: Vec<String>,
175
176    /// Node capabilities
177    pub capabilities: NodeCapabilities,
178
179    /// Trust score
180    pub trust_score: f64,
181}
182
183/// Node capabilities
184#[derive(Debug, Clone)]
185pub struct NodeCapabilities {
186    /// Storage capacity in GB
187    pub storage_gb: u64,
188
189    /// Compute benchmark score
190    pub compute_score: u64,
191
192    /// Available bandwidth in Mbps
193    pub bandwidth_mbps: u64,
194}
195
196/// Network statistics
197#[derive(Debug, Clone)]
198pub struct NetworkStats {
199    /// Number of connected peers
200    pub connected_peers: usize,
201
202    /// Routing success rate
203    pub routing_success_rate: f64,
204
205    /// Average trust score
206    pub average_trust_score: f64,
207
208    /// Cache hit rate
209    pub cache_hit_rate: f64,
210
211    /// Current churn rate
212    pub churn_rate: f64,
213
214    /// Total storage in bytes
215    pub total_storage: u64,
216
217    /// Total bandwidth in bytes/sec
218    pub total_bandwidth: u64,
219}
220
221/// Compute job for distributed processing
222#[derive(Debug, Clone)]
223pub struct ComputeJob {
224    /// Job ID
225    pub id: String,
226
227    /// Job type
228    pub job_type: String,
229
230    /// Input data
231    pub input: Vec<u8>,
232
233    /// Resource requirements
234    pub requirements: ResourceRequirements,
235}
236
237/// Resource requirements for compute jobs
238#[derive(Debug, Clone)]
239pub struct ResourceRequirements {
240    /// Minimum CPU cores
241    pub cpu_cores: u32,
242
243    /// Minimum memory in MB
244    pub memory_mb: u32,
245
246    /// Maximum execution time
247    pub max_duration: Duration,
248}
249
250/// Job ID type
251pub type JobId = String;
252
253/// Compute result
254#[derive(Debug, Clone)]
255pub struct ComputeResult {
256    /// Job ID
257    pub job_id: JobId,
258
259    /// Result data
260    pub output: Vec<u8>,
261
262    /// Execution time
263    pub execution_time: Duration,
264
265    /// Node that executed the job
266    pub executor_node: String,
267}
268
269/// Message stream for subscriptions
270pub type MessageStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
271
272/// Client error type
273#[derive(Debug, thiserror::Error)]
274pub enum ClientError {
275    #[error("Connection error: {0}")]
276    Connection(String),
277
278    #[error("Storage error: {0}")]
279    Storage(String),
280
281    #[error("Retrieval error: {0}")]
282    Retrieval(String),
283
284    #[error("Messaging error: {0}")]
285    Messaging(String),
286
287    #[error("Network error: {0}")]
288    Network(String),
289
290    #[error("Timeout error")]
291    Timeout,
292
293    #[error("Not connected")]
294    NotConnected,
295
296    #[error("Other error: {0}")]
297    Other(String),
298}
299
300/// Main client trait
301#[async_trait]
302pub trait AdaptiveP2PClient: Send + Sync {
303    /// Connect to the network
304    async fn connect(config: ClientConfig) -> Result<Self>
305    where
306        Self: Sized;
307
308    /// Storage operations
309    async fn store(&self, data: Vec<u8>) -> Result<ContentHash>;
310    async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>>;
311    async fn delete(&self, hash: &ContentHash) -> Result<()>;
312
313    /// Computation operations
314    async fn submit_compute_job(&self, job: ComputeJob) -> Result<JobId>;
315    async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult>;
316
317    /// Messaging operations
318    async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()>;
319    async fn subscribe(&self, topic: &str) -> Result<MessageStream>;
320
321    /// Network information
322    async fn get_node_info(&self) -> Result<NodeInfo>;
323    async fn get_network_stats(&self) -> Result<NetworkStats>;
324
325    /// Disconnect from the network
326    async fn disconnect(&self) -> Result<()>;
327}
328
329impl Client {
330    /// Create a new client
331    pub async fn new(config: ClientConfig) -> Result<Self> {
332        Self::new_with_monitoring(config, true).await
333    }
334
335    /// Create a new client with optional monitoring (for testing)
336    pub async fn new_with_monitoring(
337        config: ClientConfig,
338        enable_monitoring: bool,
339    ) -> Result<Self> {
340        let (subscription_tx, subscription_rx) = mpsc::channel(1000);
341
342        // Initialize network components based on profile
343        let components =
344            Self::initialize_components_with_monitoring(&config, enable_monitoring).await?;
345
346        let client = Self {
347            config,
348            components: Arc::new(components),
349            state: Arc::new(RwLock::new(ClientState {
350                connected: false,
351                node_info: None,
352                subscriptions: HashMap::new(),
353            })),
354            subscription_rx: Arc::new(RwLock::new(subscription_rx)),
355            subscription_tx,
356        };
357
358        Ok(client)
359    }
360
361    /// Initialize network components based on profile
362    #[allow(dead_code)]
363    async fn initialize_components(config: &ClientConfig) -> Result<NetworkComponents> {
364        Self::initialize_components_with_monitoring(config, true).await
365    }
366
367    /// Initialize network components with optional monitoring
368    async fn initialize_components_with_monitoring(
369        config: &ClientConfig,
370        enable_monitoring: bool,
371    ) -> Result<NetworkComponents> {
372        // Create trust provider
373        let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
374
375        // Create routing components
376        let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
377        let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
378            crate::adaptive::som::SomConfig {
379                initial_learning_rate: 0.3,
380                initial_radius: 5.0,
381                iterations: 1000,
382                grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
383            },
384        ));
385        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
386        // Store hyperbolic and som for potential future use
387        let _hyperbolic = hyperbolic;
388        let _som = som;
389
390        // Create gossip protocol
391        let node_id = NodeId { hash: [0u8; 32] }; // Temporary node ID
392        let gossip = Arc::new(AdaptiveGossipSub::new(
393            node_id.clone(),
394            trust_provider.clone(),
395        ));
396
397        // Create storage
398        let storage_config = match config.profile {
399            ClientProfile::Full => StorageConfig::default(),
400            ClientProfile::Light => StorageConfig {
401                cache_size: 10 * 1024 * 1024, // 10MB only
402                ..Default::default()
403            },
404            ClientProfile::Compute => StorageConfig {
405                cache_size: 100 * 1024 * 1024, // 100MB
406                ..Default::default()
407            },
408            ClientProfile::Mobile => StorageConfig {
409                cache_size: 5 * 1024 * 1024, // 5MB
410                chunk_size: 256 * 1024,      // 256KB chunks
411                ..Default::default()
412            },
413        };
414        let storage = Arc::new(ContentStore::new(storage_config).await?);
415
416        // Create other components
417        let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
418        let replication = Arc::new(ReplicationManager::new(
419            Default::default(),
420            trust_provider.clone(),
421            churn_predictor.clone(),
422            router.clone(),
423        ));
424
425        let cache_manager = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
426            storage.get_config().cache_size,
427        ));
428        let retrieval = Arc::new(RetrievalManager::new(
429            router.clone(),
430            storage.clone(),
431            cache_manager.clone(),
432        ));
433
434        let churn = Arc::new(ChurnHandler::new(
435            node_id.clone(),
436            churn_predictor,
437            trust_provider.clone(),
438            replication.clone(),
439            router.clone(),
440            gossip.clone(),
441            Default::default(),
442        ));
443
444        // Always create a unique registry for client instances to avoid metric conflicts
445        // when multiple clients are created in tests or concurrent scenarios
446        #[cfg(feature = "metrics")]
447        let registry = Some(prometheus::Registry::new());
448        #[cfg(not(feature = "metrics"))]
449        let registry = None;
450
451        let monitoring = if enable_monitoring {
452            Arc::new(MonitoringSystem::new_with_registry(
453                crate::adaptive::monitoring::MonitoredComponents {
454                    router: router.clone(),
455                    churn_handler: churn.clone(),
456                    gossip: gossip.clone(),
457                    storage: storage.clone(),
458                    replication: replication.clone(),
459                    thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
460                    cache: cache_manager.clone(),
461                },
462                Default::default(),
463                registry,
464            )?)
465        } else {
466            // Create a minimal monitoring system for testing
467            #[cfg(feature = "metrics")]
468            let test_registry = Some(prometheus::Registry::new());
469            #[cfg(not(feature = "metrics"))]
470            let test_registry = None;
471
472            Arc::new(MonitoringSystem::new_with_registry(
473                crate::adaptive::monitoring::MonitoredComponents {
474                    router: router.clone(),
475                    churn_handler: churn.clone(),
476                    gossip: gossip.clone(),
477                    storage: storage.clone(),
478                    replication: replication.clone(),
479                    thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
480                    cache: cache_manager.clone(),
481                },
482                Default::default(),
483                test_registry,
484            )?)
485        };
486
487        Ok(NetworkComponents {
488            node_id,
489            router,
490            gossip,
491            storage,
492            retrieval,
493            replication,
494            churn,
495            monitoring,
496        })
497    }
498
499    /// Connect to a specific node
500    async fn connect_to_node(&self, address: &str) -> Result<()> {
501        // In a real implementation, this would establish a network connection
502        // For now, we'll simulate connection
503        tokio::time::sleep(Duration::from_millis(100)).await;
504
505        let mut state = self.state.write().await;
506        state.connected = true;
507        state.node_info = Some(NodeInfo {
508            id: "node_123".to_string(),
509            addresses: vec![address.to_string()],
510            capabilities: NodeCapabilities {
511                storage_gb: 100,
512                compute_score: 1000,
513                bandwidth_mbps: 100,
514            },
515            trust_score: 1.0,
516        });
517
518        Ok(())
519    }
520
521    /// Handle subscription messages
522    async fn handle_subscriptions(&self) {
523        let mut rx = self.subscription_rx.write().await;
524
525        while let Some(msg) = rx.recv().await {
526            let state = self.state.read().await;
527            if let Some(sender) = state.subscriptions.get(&msg.topic) {
528                let _ = sender.send(msg.data).await;
529            }
530        }
531    }
532}
533
534#[async_trait]
535impl AdaptiveP2PClient for Client {
536    async fn connect(config: ClientConfig) -> Result<Self> {
537        let client = Self::new(config.clone()).await?;
538
539        // Connect to the specified node
540        tokio::time::timeout(
541            config.connect_timeout,
542            client.connect_to_node(&config.node_address),
543        )
544        .await
545        .map_err(|_| ClientError::Timeout)?
546        .map_err(|e| ClientError::Connection(e.to_string()))?;
547
548        // Start background tasks
549        let client_clone = client.clone();
550        tokio::spawn(async move {
551            client_clone.handle_subscriptions().await;
552        });
553
554        // Start monitoring
555        client.components.monitoring.start().await;
556
557        // Start churn monitoring
558        client.components.churn.start_monitoring().await;
559
560        Ok(client)
561    }
562
563    async fn store(&self, data: Vec<u8>) -> Result<ContentHash> {
564        let state = self.state.read().await;
565        if !state.connected {
566            return Err(ClientError::NotConnected.into());
567        }
568
569        // Store data with automatic chunking and replication
570        let metadata = crate::adaptive::storage::ContentMetadata {
571            size: data.len(),
572            content_type: crate::adaptive::ContentType::DataRetrieval,
573            created_at: std::time::SystemTime::now()
574                .duration_since(std::time::UNIX_EPOCH)
575                .map(|d| d.as_secs())
576                .unwrap_or(0),
577            chunk_count: None,
578            replication_factor: 8,
579        };
580
581        let hash = self
582            .components
583            .storage
584            .store(data.clone(), metadata.clone())
585            .await
586            .map_err(|e| ClientError::Storage(e.to_string()))?;
587
588        // Trigger replication
589        self.components
590            .replication
591            .replicate_content(&hash, &data, metadata)
592            .await
593            .map_err(|e| ClientError::Storage(format!("Replication failed: {e}")))?;
594
595        Ok(hash)
596    }
597
598    async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
599        let state = self.state.read().await;
600        if !state.connected {
601            return Err(ClientError::NotConnected.into());
602        }
603
604        // Use parallel retrieval strategies
605        tokio::time::timeout(
606            self.config.request_timeout,
607            self.components.retrieval.retrieve(
608                hash,
609                crate::adaptive::retrieval::RetrievalStrategy::Parallel,
610            ),
611        )
612        .await
613        .map_err(|_| ClientError::Timeout)?
614        .map_err(|e| ClientError::Retrieval(e.to_string()).into())
615    }
616
617    async fn delete(&self, hash: &ContentHash) -> Result<()> {
618        let state = self.state.read().await;
619        if !state.connected {
620            return Err(ClientError::NotConnected.into());
621        }
622
623        // Delete from local storage
624        self.components
625            .storage
626            .delete(hash)
627            .await
628            .map_err(|e| ClientError::Storage(e.to_string()).into())
629    }
630
631    async fn submit_compute_job(&self, _job: ComputeJob) -> Result<JobId> {
632        let state = self.state.read().await;
633        if !state.connected {
634            return Err(ClientError::NotConnected.into());
635        }
636
637        // In a real implementation, this would distribute the job
638        // For now, return a mock job ID
639        Ok(format!("job_{}", uuid::Uuid::new_v4()))
640    }
641
642    async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult> {
643        let state = self.state.read().await;
644        if !state.connected {
645            return Err(ClientError::NotConnected.into());
646        }
647
648        // In a real implementation, this would retrieve the result
649        // For now, return a mock result
650        Ok(ComputeResult {
651            job_id: job_id.clone(),
652            output: b"Mock compute result".to_vec(),
653            execution_time: Duration::from_secs(5),
654            executor_node: "node_456".to_string(),
655        })
656    }
657
658    async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()> {
659        let state = self.state.read().await;
660        if !state.connected {
661            return Err(ClientError::NotConnected.into());
662        }
663
664        use crate::adaptive::gossip::GossipMessage;
665
666        let gossip_msg = GossipMessage {
667            topic: topic.to_string(),
668            data: message,
669            from: self.components.node_id.clone(),
670            seqno: 0, // TODO: Track sequence numbers
671            timestamp: std::time::SystemTime::now()
672                .duration_since(std::time::UNIX_EPOCH)
673                .map(|d| d.as_secs())
674                .unwrap_or(0),
675        };
676
677        self.components
678            .gossip
679            .publish(topic, gossip_msg)
680            .await
681            .map_err(|e| ClientError::Messaging(e.to_string()).into())
682    }
683
684    async fn subscribe(&self, topic: &str) -> Result<MessageStream> {
685        let state = self.state.read().await;
686        if !state.connected {
687            return Err(ClientError::NotConnected.into());
688        }
689
690        // Subscribe to topic
691        self.components
692            .gossip
693            .subscribe(topic)
694            .await
695            .map_err(|e| ClientError::Messaging(e.to_string()))?;
696
697        // Create message stream
698        let (tx, rx) = mpsc::channel(100);
699
700        let mut state = self.state.write().await;
701        state.subscriptions.insert(topic.to_string(), tx);
702
703        Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
704    }
705
706    async fn get_node_info(&self) -> Result<NodeInfo> {
707        let state = self.state.read().await;
708        if !state.connected {
709            return Err(ClientError::NotConnected.into());
710        }
711
712        state
713            .node_info
714            .clone()
715            .ok_or_else(|| ClientError::Other("Node info not available".to_string()).into())
716    }
717
718    async fn get_network_stats(&self) -> Result<NetworkStats> {
719        let state = self.state.read().await;
720        if !state.connected {
721            return Err(ClientError::NotConnected.into());
722        }
723
724        // Get stats from various components
725        let health = self.components.monitoring.get_health().await;
726        let routing_stats = self.components.router.get_stats().await;
727        let storage_stats = self.components.storage.get_stats().await;
728        let gossip_stats = self.components.gossip.get_stats().await;
729
730        Ok(NetworkStats {
731            connected_peers: gossip_stats.peer_count,
732            routing_success_rate: routing_stats.success_rate(),
733            average_trust_score: health.score,
734            cache_hit_rate: 0.0, // TODO: Get from cache manager
735            churn_rate: health.churn_rate,
736            total_storage: storage_stats.total_bytes,
737            total_bandwidth: 0, // TODO: Track bandwidth
738        })
739    }
740
741    async fn disconnect(&self) -> Result<()> {
742        let mut state = self.state.write().await;
743        state.connected = false;
744
745        // Clean up subscriptions
746        state.subscriptions.clear();
747
748        Ok(())
749    }
750}
751
752/// Clone implementation for task spawning
753impl Clone for Client {
754    fn clone(&self) -> Self {
755        Self {
756            config: self.config.clone(),
757            components: self.components.clone(),
758            state: self.state.clone(),
759            subscription_rx: self.subscription_rx.clone(),
760            subscription_tx: self.subscription_tx.clone(),
761        }
762    }
763}
764
765/// Convenience function to create and connect a client
766pub async fn connect(address: &str) -> Result<Client> {
767    let config = ClientConfig {
768        node_address: address.to_string(),
769        ..Default::default()
770    };
771
772    Client::connect(config).await
773}
774
775/// Create a client with a specific profile
776pub async fn connect_with_profile(address: &str, profile: ClientProfile) -> Result<Client> {
777    let config = ClientConfig {
778        node_address: address.to_string(),
779        profile,
780        ..Default::default()
781    };
782
783    Client::connect(config).await
784}
785
786#[cfg(test)]
787mod tests {
788    use super::*;
789    use crate::adaptive::monitoring::MonitoringSystem;
790
791    #[tokio::test]
792    async fn test_client_creation() {
793        let client = new_test_client(ClientConfig::default()).await.unwrap();
794
795        // Should start disconnected
796        let state = client.state.read().await;
797        assert!(!state.connected);
798    }
799
800    /// Create a minimal test client without any monitoring components
801    pub async fn new_test_client(config: ClientConfig) -> Result<Client> {
802        let (subscription_tx, subscription_rx) = mpsc::channel(1000);
803
804        // Create minimal components for testing
805        let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
806        let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
807        let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
808            crate::adaptive::som::SomConfig {
809                initial_learning_rate: 0.3,
810                initial_radius: 5.0,
811                iterations: 1000,
812                grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
813            },
814        ));
815        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone()));
816        // Store hyperbolic and som for potential future use
817        let _hyperbolic = hyperbolic;
818        let _som = som;
819
820        let node_id = NodeId { hash: [0u8; 32] };
821        let gossip = Arc::new(AdaptiveGossipSub::new(
822            node_id.clone(),
823            trust_provider.clone(),
824        ));
825
826        // Create storage with minimal config
827        let storage_config = StorageConfig {
828            cache_size: 1024 * 1024, // 1MB for tests
829            ..Default::default()
830        };
831        let storage = Arc::new(ContentStore::new(storage_config).await?);
832
833        let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
834        let replication = Arc::new(ReplicationManager::new(
835            Default::default(),
836            trust_provider.clone(),
837            churn_predictor.clone(),
838            router.clone(),
839        ));
840
841        let cache = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
842            storage.get_config().cache_size,
843        ));
844        let retrieval = Arc::new(RetrievalManager::new(
845            router.clone(),
846            storage.clone(),
847            cache.clone(),
848        ));
849
850        let churn = Arc::new(ChurnHandler::new(
851            node_id.clone(),
852            churn_predictor,
853            trust_provider.clone(),
854            replication.clone(),
855            router.clone(),
856            gossip.clone(),
857            Default::default(),
858        ));
859
860        // Create Thompson sampling for tests
861        let thompson = Arc::new(crate::adaptive::learning::ThompsonSampling::new());
862
863        // Create monitoring system with a test-specific registry to avoid conflicts
864        #[cfg(feature = "metrics")]
865        let test_registry = Some(prometheus::Registry::new());
866        #[cfg(not(feature = "metrics"))]
867        let test_registry = None;
868
869        let monitoring = Arc::new(
870            MonitoringSystem::new_with_registry(
871                crate::adaptive::monitoring::MonitoredComponents {
872                    router: router.clone(),
873                    churn_handler: churn.clone(),
874                    gossip: gossip.clone(),
875                    storage: storage.clone(),
876                    replication: replication.clone(),
877                    thompson: thompson.clone(),
878                    cache: cache.clone(),
879                },
880                crate::adaptive::monitoring::MonitoringConfig::default(),
881                test_registry,
882            )
883            .expect("Failed to create monitoring system for tests"),
884        );
885
886        let components = NetworkComponents {
887            node_id,
888            router,
889            gossip,
890            storage,
891            retrieval,
892            replication,
893            churn,
894            monitoring,
895        };
896
897        let client = Client {
898            config,
899            components: Arc::new(components),
900            state: Arc::new(RwLock::new(ClientState {
901                connected: false,
902                node_info: None,
903                subscriptions: HashMap::new(),
904            })),
905            subscription_rx: Arc::new(RwLock::new(subscription_rx)),
906            subscription_tx,
907        };
908
909        Ok(client)
910    }
911
912    #[tokio::test]
913    async fn test_client_connect() {
914        let client = new_test_client(ClientConfig::default()).await.unwrap();
915
916        // Manually trigger connection for test
917        client.connect_to_node("127.0.0.1:8000").await.unwrap();
918
919        // Should be connected
920        let state = client.state.read().await;
921        assert!(state.connected);
922        assert!(state.node_info.is_some());
923    }
924
925    #[tokio::test]
926    async fn test_storage_operations() {
927        let client = new_test_client(ClientConfig::default()).await.unwrap();
928
929        // Connect first
930        client.connect_to_node("127.0.0.1:8000").await.unwrap();
931
932        // Store data
933        let data = b"Hello, P2P world!".to_vec();
934        let hash = client.store(data.clone()).await.unwrap();
935
936        // Retrieve data
937        let retrieved = client.retrieve(&hash).await.unwrap();
938        assert_eq!(retrieved, data);
939    }
940
941    #[tokio::test]
942    async fn test_not_connected_error() {
943        let client = new_test_client(ClientConfig::default()).await.unwrap();
944
945        // Operations should fail when not connected
946        let result = client.store(vec![1, 2, 3]).await;
947        assert!(result.is_err());
948        assert!(result.unwrap_err().to_string().contains("Not connected"));
949    }
950
951    #[tokio::test]
952    async fn test_network_stats() {
953        let client = new_test_client(ClientConfig::default()).await.unwrap();
954
955        // Connect first
956        client.connect_to_node("127.0.0.1:8000").await.unwrap();
957
958        let stats = client.get_network_stats().await.unwrap();
959        assert!(stats.routing_success_rate >= 0.0);
960        assert!(stats.routing_success_rate <= 1.0);
961    }
962
963    #[tokio::test]
964    async fn test_client_profiles() {
965        // Test different profiles
966        for profile in [
967            ClientProfile::Full,
968            ClientProfile::Light,
969            ClientProfile::Compute,
970            ClientProfile::Mobile,
971        ] {
972            let config = ClientConfig {
973                profile,
974                ..Default::default()
975            };
976            match Client::connect(config).await {
977                Ok(client) => {
978                    let info = client.get_node_info().await.unwrap();
979                    assert!(!info.id.is_empty());
980                }
981                Err(e) => {
982                    // Some environments may disallow repeated metrics registration; skip gracefully
983                    let es = format!("{}", e);
984                    if es.contains("Duplicate") && es.contains("registration") {
985                        continue;
986                    }
987                    panic!("Client::connect failed: {}", es);
988                }
989            }
990        }
991    }
992
993    #[tokio::test]
994    async fn test_pubsub_messaging() {
995        let client = new_test_client(ClientConfig::default()).await.unwrap();
996
997        // Connect first
998        client.connect_to_node("127.0.0.1:8000").await.unwrap();
999
1000        // Subscribe to topic
1001        let _stream = client.subscribe("test_topic").await.unwrap();
1002
1003        // Publish message
1004        let message = b"Test message".to_vec();
1005        client.publish("test_topic", message.clone()).await.unwrap();
1006
1007        // In a real implementation, we would receive the message
1008        // For now, just check that operations don't fail
1009    }
1010
1011    #[tokio::test]
1012    async fn test_compute_job() {
1013        let client = new_test_client(ClientConfig::default()).await.unwrap();
1014
1015        // Connect first
1016        client.connect_to_node("127.0.0.1:8000").await.unwrap();
1017
1018        let job = ComputeJob {
1019            id: "test_job".to_string(),
1020            job_type: "map_reduce".to_string(),
1021            input: b"Input data".to_vec(),
1022            requirements: ResourceRequirements {
1023                cpu_cores: 2,
1024                memory_mb: 1024,
1025                max_duration: Duration::from_secs(60),
1026            },
1027        };
1028
1029        let job_id = client.submit_compute_job(job).await.unwrap();
1030        assert!(!job_id.is_empty());
1031
1032        let result = client.get_job_result(&job_id).await.unwrap();
1033        assert_eq!(result.job_id, job_id);
1034    }
1035}