saorsa_core/adaptive/
client.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Client library and API for the adaptive P2P network
15//!
16//! This module provides a simple, ergonomic async API for applications to interact
17//! with the adaptive P2P network. It abstracts away the complexity of the underlying
18//! distributed systems and provides straightforward methods for:
19//! - Content storage and retrieval
20//! - Pub/sub messaging
21//! - Network statistics and monitoring
22
23use crate::adaptive::{
24    AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentHash, ContentStore, MonitoringSystem,
25    NodeId, ReplicationManager, RetrievalManager, StorageConfig,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::Stream;
30use std::collections::HashMap;
31use std::pin::Pin;
32use std::{sync::Arc, time::Duration};
33use tokio::sync::{RwLock, mpsc};
34
35/// Client configuration
36#[derive(Debug, Clone)]
37pub struct ClientConfig {
38    /// Node address to connect to
39    pub node_address: String,
40
41    /// Connection timeout
42    pub connect_timeout: Duration,
43
44    /// Request timeout
45    pub request_timeout: Duration,
46
47    /// Enable debug logging
48    pub debug_logging: bool,
49
50    /// Client profile (full, light, compute, mobile)
51    pub profile: ClientProfile,
52}
53
54impl Default for ClientConfig {
55    fn default() -> Self {
56        // Load global config for defaults
57        let global_config = crate::config::Config::default();
58
59        // Use the global listen address as default node address
60        let node_address = global_config.network.listen_address.clone();
61
62        Self {
63            node_address,
64            connect_timeout: Duration::from_secs(global_config.network.connection_timeout),
65            request_timeout: Duration::from_secs(30),
66            debug_logging: false,
67            profile: ClientProfile::Full,
68        }
69    }
70}
71
72impl ClientConfig {
73    /// Create ClientConfig from global Config
74    pub fn from_global_config(config: &crate::config::Config) -> Self {
75        Self {
76            node_address: config.network.listen_address.clone(),
77            connect_timeout: Duration::from_secs(config.network.connection_timeout),
78            request_timeout: Duration::from_secs(30),
79            debug_logging: false,
80            profile: ClientProfile::Full,
81        }
82    }
83}
84
/// Client profile for different deployment scenarios.
///
/// The profile is a plain, field-less tag, so it is cheap to copy and
/// implements full equality (`Eq`) and `Hash`; it can therefore be used as a
/// key in hash maps and sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ClientProfile {
    /// Full node with all capabilities.
    Full,

    /// Light node with routing only.
    Light,

    /// Compute-optimized node.
    Compute,

    /// Mobile node with reduced parameters.
    Mobile,
}
100
101/// Simple async client for the adaptive P2P network
102pub struct Client {
103    /// Client configuration
104    config: ClientConfig,
105
106    /// Network components
107    components: Arc<NetworkComponents>,
108
109    /// Client state
110    state: Arc<RwLock<ClientState>>,
111
112    /// Message receiver for subscriptions
113    subscription_rx: Arc<RwLock<mpsc::Receiver<SubscriptionMessage>>>,
114
115    /// Subscription sender
116    subscription_tx: mpsc::Sender<SubscriptionMessage>,
117}
118
119/// Internal network components
120struct NetworkComponents {
121    /// Node ID
122    node_id: NodeId,
123
124    /// Adaptive router
125    router: Arc<AdaptiveRouter>,
126
127    /// Gossip protocol
128    gossip: Arc<AdaptiveGossipSub>,
129
130    /// Content store
131    storage: Arc<ContentStore>,
132
133    /// Retrieval manager
134    retrieval: Arc<RetrievalManager>,
135
136    /// Replication manager
137    replication: Arc<ReplicationManager>,
138
139    /// Churn handler
140    churn: Arc<ChurnHandler>,
141
142    /// Monitoring system
143    monitoring: Arc<MonitoringSystem>,
144}
145
146/// Client state
147struct ClientState {
148    /// Connected status
149    connected: bool,
150
151    /// Local node information
152    node_info: Option<NodeInfo>,
153
154    /// Active subscriptions
155    subscriptions: HashMap<String, mpsc::Sender<Vec<u8>>>,
156}
157
/// A message travelling through the client's internal subscription channel.
struct SubscriptionMessage {
    /// Topic the message belongs to.
    topic: String,

    /// Raw message payload.
    data: Vec<u8>,
}
166
167/// Node information
168#[derive(Debug, Clone)]
169pub struct NodeInfo {
170    /// Node ID
171    pub id: String,
172
173    /// Network addresses
174    pub addresses: Vec<String>,
175
176    /// Node capabilities
177    pub capabilities: NodeCapabilities,
178
179    /// Trust score
180    pub trust_score: f64,
181}
182
/// Resources a node offers.
#[derive(Clone, Debug)]
pub struct NodeCapabilities {
    /// Storage capacity, in GB.
    pub storage_gb: u64,

    /// Score from a compute benchmark.
    pub compute_score: u64,

    /// Available bandwidth, in Mbps.
    pub bandwidth_mbps: u64,
}
195
/// Snapshot of network-level statistics.
#[derive(Clone, Debug)]
pub struct NetworkStats {
    /// How many peers are currently connected.
    pub connected_peers: usize,

    /// Success rate of routing.
    pub routing_success_rate: f64,

    /// Average trust score.
    pub average_trust_score: f64,

    /// Hit rate of the cache.
    pub cache_hit_rate: f64,

    /// Current churn rate.
    pub churn_rate: f64,

    /// Total storage, in bytes.
    pub total_storage: u64,

    /// Total bandwidth, in bytes per second.
    pub total_bandwidth: u64,
}
220
221/// Compute job for distributed processing
222#[derive(Debug, Clone)]
223pub struct ComputeJob {
224    /// Job ID
225    pub id: String,
226
227    /// Job type
228    pub job_type: String,
229
230    /// Input data
231    pub input: Vec<u8>,
232
233    /// Resource requirements
234    pub requirements: ResourceRequirements,
235}
236
/// Resources a compute job asks for.
#[derive(Clone, Debug)]
pub struct ResourceRequirements {
    /// Minimum number of CPU cores.
    pub cpu_cores: u32,

    /// Minimum amount of memory, in MB.
    pub memory_mb: u32,

    /// Longest execution time allowed.
    pub max_duration: Duration,
}
249
/// Identifier of a compute job; a plain `String`.
pub type JobId = String;
252
253/// Compute result
254#[derive(Debug, Clone)]
255pub struct ComputeResult {
256    /// Job ID
257    pub job_id: JobId,
258
259    /// Result data
260    pub output: Vec<u8>,
261
262    /// Execution time
263    pub execution_time: Duration,
264
265    /// Node that executed the job
266    pub executor_node: String,
267}
268
269/// Message stream for subscriptions
270pub type MessageStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
271
272/// Client error type
273#[derive(Debug, thiserror::Error)]
274pub enum ClientError {
275    #[error("Connection error: {0}")]
276    Connection(String),
277
278    #[error("Storage error: {0}")]
279    Storage(String),
280
281    #[error("Retrieval error: {0}")]
282    Retrieval(String),
283
284    #[error("Messaging error: {0}")]
285    Messaging(String),
286
287    #[error("Network error: {0}")]
288    Network(String),
289
290    #[error("Timeout error")]
291    Timeout,
292
293    #[error("Not connected")]
294    NotConnected,
295
296    #[error("Other error: {0}")]
297    Other(String),
298}
299
300/// Main client trait
301#[async_trait]
302pub trait AdaptiveP2PClient: Send + Sync {
303    /// Connect to the network
304    async fn connect(config: ClientConfig) -> Result<Self>
305    where
306        Self: Sized;
307
308    /// Storage operations
309    async fn store(&self, data: Vec<u8>) -> Result<ContentHash>;
310    async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>>;
311    async fn delete(&self, hash: &ContentHash) -> Result<()>;
312
313    /// Computation operations
314    async fn submit_compute_job(&self, job: ComputeJob) -> Result<JobId>;
315    async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult>;
316
317    /// Messaging operations
318    async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()>;
319    async fn subscribe(&self, topic: &str) -> Result<MessageStream>;
320
321    /// Network information
322    async fn get_node_info(&self) -> Result<NodeInfo>;
323    async fn get_network_stats(&self) -> Result<NetworkStats>;
324
325    /// Disconnect from the network
326    async fn disconnect(&self) -> Result<()>;
327}
328
329impl Client {
330    /// Create a new client
331    pub async fn new(config: ClientConfig) -> Result<Self> {
332        let (subscription_tx, subscription_rx) = mpsc::channel(1000);
333
334        // Initialize network components based on profile
335        let components = Self::initialize_components(&config).await?;
336
337        let client = Self {
338            config,
339            components: Arc::new(components),
340            state: Arc::new(RwLock::new(ClientState {
341                connected: false,
342                node_info: None,
343                subscriptions: HashMap::new(),
344            })),
345            subscription_rx: Arc::new(RwLock::new(subscription_rx)),
346            subscription_tx,
347        };
348
349        Ok(client)
350    }
351
352    /// Initialize network components based on profile
353    async fn initialize_components(config: &ClientConfig) -> Result<NetworkComponents> {
354        // Create trust provider
355        let trust_provider = Arc::new(crate::adaptive::trust::MockTrustProvider::new());
356
357        // Create routing components
358        let hyperbolic = Arc::new(crate::adaptive::hyperbolic::HyperbolicSpace::new());
359        let som = Arc::new(crate::adaptive::som::SelfOrganizingMap::new(
360            crate::adaptive::som::SomConfig {
361                initial_learning_rate: 0.3,
362                initial_radius: 5.0,
363                iterations: 1000,
364                grid_size: crate::adaptive::som::GridSize::Fixed(10, 10),
365            },
366        ));
367        let router = Arc::new(AdaptiveRouter::new(trust_provider.clone(), hyperbolic, som));
368
369        // Create gossip protocol
370        let node_id = NodeId { hash: [0u8; 32] }; // Temporary node ID
371        let gossip = Arc::new(AdaptiveGossipSub::new(
372            node_id.clone(),
373            trust_provider.clone(),
374        ));
375
376        // Create storage
377        let storage_config = match config.profile {
378            ClientProfile::Full => StorageConfig::default(),
379            ClientProfile::Light => StorageConfig {
380                cache_size: 10 * 1024 * 1024, // 10MB only
381                ..Default::default()
382            },
383            ClientProfile::Compute => StorageConfig {
384                cache_size: 100 * 1024 * 1024, // 100MB
385                ..Default::default()
386            },
387            ClientProfile::Mobile => StorageConfig {
388                cache_size: 5 * 1024 * 1024, // 5MB
389                chunk_size: 256 * 1024,      // 256KB chunks
390                ..Default::default()
391            },
392        };
393        let storage = Arc::new(ContentStore::new(storage_config).await?);
394
395        // Create other components
396        let churn_predictor = Arc::new(crate::adaptive::learning::ChurnPredictor::new());
397        let replication = Arc::new(ReplicationManager::new(
398            Default::default(),
399            trust_provider.clone(),
400            churn_predictor.clone(),
401            router.clone(),
402        ));
403
404        let cache_manager = Arc::new(crate::adaptive::learning::QLearnCacheManager::new(
405            storage.get_config().cache_size,
406        ));
407        let retrieval = Arc::new(RetrievalManager::new(
408            router.clone(),
409            storage.clone(),
410            cache_manager.clone(),
411        ));
412
413        let churn = Arc::new(ChurnHandler::new(
414            node_id.clone(),
415            churn_predictor,
416            trust_provider.clone(),
417            replication.clone(),
418            router.clone(),
419            gossip.clone(),
420            Default::default(),
421        ));
422
423        let monitoring = Arc::new(MonitoringSystem::new(
424            crate::adaptive::monitoring::MonitoredComponents {
425                router: router.clone(),
426                churn_handler: churn.clone(),
427                gossip: gossip.clone(),
428                storage: storage.clone(),
429                replication: replication.clone(),
430                thompson: Arc::new(crate::adaptive::learning::ThompsonSampling::new()),
431                cache: cache_manager,
432            },
433            Default::default(),
434        )?);
435
436        Ok(NetworkComponents {
437            node_id,
438            router,
439            gossip,
440            storage,
441            retrieval,
442            replication,
443            churn,
444            monitoring,
445        })
446    }
447
448    /// Connect to a specific node
449    async fn connect_to_node(&self, address: &str) -> Result<()> {
450        // In a real implementation, this would establish a network connection
451        // For now, we'll simulate connection
452        tokio::time::sleep(Duration::from_millis(100)).await;
453
454        let mut state = self.state.write().await;
455        state.connected = true;
456        state.node_info = Some(NodeInfo {
457            id: "node_123".to_string(),
458            addresses: vec![address.to_string()],
459            capabilities: NodeCapabilities {
460                storage_gb: 100,
461                compute_score: 1000,
462                bandwidth_mbps: 100,
463            },
464            trust_score: 1.0,
465        });
466
467        Ok(())
468    }
469
470    /// Handle subscription messages
471    async fn handle_subscriptions(&self) {
472        let mut rx = self.subscription_rx.write().await;
473
474        while let Some(msg) = rx.recv().await {
475            let state = self.state.read().await;
476            if let Some(sender) = state.subscriptions.get(&msg.topic) {
477                let _ = sender.send(msg.data).await;
478            }
479        }
480    }
481}
482
483#[async_trait]
484impl AdaptiveP2PClient for Client {
485    async fn connect(config: ClientConfig) -> Result<Self> {
486        let client = Self::new(config.clone()).await?;
487
488        // Connect to the specified node
489        tokio::time::timeout(
490            config.connect_timeout,
491            client.connect_to_node(&config.node_address),
492        )
493        .await
494        .map_err(|_| ClientError::Timeout)?
495        .map_err(|e| ClientError::Connection(e.to_string()))?;
496
497        // Start background tasks
498        let client_clone = client.clone();
499        tokio::spawn(async move {
500            client_clone.handle_subscriptions().await;
501        });
502
503        // Start monitoring
504        client.components.monitoring.start().await;
505
506        // Start churn monitoring
507        client.components.churn.start_monitoring().await;
508
509        Ok(client)
510    }
511
512    async fn store(&self, data: Vec<u8>) -> Result<ContentHash> {
513        let state = self.state.read().await;
514        if !state.connected {
515            return Err(ClientError::NotConnected.into());
516        }
517
518        // Store data with automatic chunking and replication
519        let metadata = crate::adaptive::storage::ContentMetadata {
520            size: data.len(),
521            content_type: crate::adaptive::ContentType::DataRetrieval,
522            created_at: std::time::SystemTime::now()
523                .duration_since(std::time::UNIX_EPOCH)
524                .map(|d| d.as_secs())
525                .unwrap_or(0),
526            chunk_count: None,
527            replication_factor: 8,
528        };
529
530        let hash = self
531            .components
532            .storage
533            .store(data.clone(), metadata.clone())
534            .await
535            .map_err(|e| ClientError::Storage(e.to_string()))?;
536
537        // Trigger replication
538        self.components
539            .replication
540            .replicate_content(&hash, &data, metadata)
541            .await
542            .map_err(|e| ClientError::Storage(format!("Replication failed: {e}")))?;
543
544        Ok(hash)
545    }
546
547    async fn retrieve(&self, hash: &ContentHash) -> Result<Vec<u8>> {
548        let state = self.state.read().await;
549        if !state.connected {
550            return Err(ClientError::NotConnected.into());
551        }
552
553        // Use parallel retrieval strategies
554        tokio::time::timeout(
555            self.config.request_timeout,
556            self.components.retrieval.retrieve(
557                hash,
558                crate::adaptive::retrieval::RetrievalStrategy::Parallel,
559            ),
560        )
561        .await
562        .map_err(|_| ClientError::Timeout)?
563        .map_err(|e| ClientError::Retrieval(e.to_string()).into())
564    }
565
566    async fn delete(&self, hash: &ContentHash) -> Result<()> {
567        let state = self.state.read().await;
568        if !state.connected {
569            return Err(ClientError::NotConnected.into());
570        }
571
572        // Delete from local storage
573        self.components
574            .storage
575            .delete(hash)
576            .await
577            .map_err(|e| ClientError::Storage(e.to_string()).into())
578    }
579
580    async fn submit_compute_job(&self, _job: ComputeJob) -> Result<JobId> {
581        let state = self.state.read().await;
582        if !state.connected {
583            return Err(ClientError::NotConnected.into());
584        }
585
586        // In a real implementation, this would distribute the job
587        // For now, return a mock job ID
588        Ok(format!("job_{}", uuid::Uuid::new_v4()))
589    }
590
591    async fn get_job_result(&self, job_id: &JobId) -> Result<ComputeResult> {
592        let state = self.state.read().await;
593        if !state.connected {
594            return Err(ClientError::NotConnected.into());
595        }
596
597        // In a real implementation, this would retrieve the result
598        // For now, return a mock result
599        Ok(ComputeResult {
600            job_id: job_id.clone(),
601            output: b"Mock compute result".to_vec(),
602            execution_time: Duration::from_secs(5),
603            executor_node: "node_456".to_string(),
604        })
605    }
606
607    async fn publish(&self, topic: &str, message: Vec<u8>) -> Result<()> {
608        let state = self.state.read().await;
609        if !state.connected {
610            return Err(ClientError::NotConnected.into());
611        }
612
613        use crate::adaptive::gossip::GossipMessage;
614
615        let gossip_msg = GossipMessage {
616            topic: topic.to_string(),
617            data: message,
618            from: self.components.node_id.clone(),
619            seqno: 0, // TODO: Track sequence numbers
620            timestamp: std::time::SystemTime::now()
621                .duration_since(std::time::UNIX_EPOCH)
622                .map(|d| d.as_secs())
623                .unwrap_or(0),
624        };
625
626        self.components
627            .gossip
628            .publish(topic, gossip_msg)
629            .await
630            .map_err(|e| ClientError::Messaging(e.to_string()).into())
631    }
632
633    async fn subscribe(&self, topic: &str) -> Result<MessageStream> {
634        let state = self.state.read().await;
635        if !state.connected {
636            return Err(ClientError::NotConnected.into());
637        }
638
639        // Subscribe to topic
640        self.components
641            .gossip
642            .subscribe(topic)
643            .await
644            .map_err(|e| ClientError::Messaging(e.to_string()))?;
645
646        // Create message stream
647        let (tx, rx) = mpsc::channel(100);
648
649        let mut state = self.state.write().await;
650        state.subscriptions.insert(topic.to_string(), tx);
651
652        Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
653    }
654
655    async fn get_node_info(&self) -> Result<NodeInfo> {
656        let state = self.state.read().await;
657        if !state.connected {
658            return Err(ClientError::NotConnected.into());
659        }
660
661        state
662            .node_info
663            .clone()
664            .ok_or_else(|| ClientError::Other("Node info not available".to_string()).into())
665    }
666
667    async fn get_network_stats(&self) -> Result<NetworkStats> {
668        let state = self.state.read().await;
669        if !state.connected {
670            return Err(ClientError::NotConnected.into());
671        }
672
673        // Get stats from various components
674        let health = self.components.monitoring.get_health().await;
675        let routing_stats = self.components.router.get_stats().await;
676        let storage_stats = self.components.storage.get_stats().await;
677        let gossip_stats = self.components.gossip.get_stats().await;
678
679        Ok(NetworkStats {
680            connected_peers: gossip_stats.peer_count,
681            routing_success_rate: routing_stats.success_rate(),
682            average_trust_score: health.score,
683            cache_hit_rate: 0.0, // TODO: Get from cache manager
684            churn_rate: health.churn_rate,
685            total_storage: storage_stats.total_bytes,
686            total_bandwidth: 0, // TODO: Track bandwidth
687        })
688    }
689
690    async fn disconnect(&self) -> Result<()> {
691        let mut state = self.state.write().await;
692        state.connected = false;
693
694        // Clean up subscriptions
695        state.subscriptions.clear();
696
697        Ok(())
698    }
699}
700
701/// Clone implementation for task spawning
702impl Clone for Client {
703    fn clone(&self) -> Self {
704        Self {
705            config: self.config.clone(),
706            components: self.components.clone(),
707            state: self.state.clone(),
708            subscription_rx: self.subscription_rx.clone(),
709            subscription_tx: self.subscription_tx.clone(),
710        }
711    }
712}
713
714/// Convenience function to create and connect a client
715pub async fn connect(address: &str) -> Result<Client> {
716    let config = ClientConfig {
717        node_address: address.to_string(),
718        ..Default::default()
719    };
720
721    Client::connect(config).await
722}
723
724/// Create a client with a specific profile
725pub async fn connect_with_profile(address: &str, profile: ClientProfile) -> Result<Client> {
726    let config = ClientConfig {
727        node_address: address.to_string(),
728        profile,
729        ..Default::default()
730    };
731
732    Client::connect(config).await
733}
734
#[cfg(test)]
mod tests {
    use super::*;

    /// Connects a client with the default configuration; shared by the
    /// tests that need a connected client.
    async fn test_client() -> Result<Client> {
        Client::connect(ClientConfig::default()).await
    }

    /// A freshly constructed client must not report itself as connected.
    #[tokio::test]
    async fn test_client_creation() {
        let client = Client::new(ClientConfig::default()).await.unwrap();

        let guard = client.state.read().await;
        assert!(!guard.connected);
    }

    /// Connecting flips the connected flag and records node info.
    #[tokio::test]
    async fn test_client_connect() {
        let client = test_client().await.unwrap();

        let guard = client.state.read().await;
        assert!(guard.connected);
        assert!(guard.node_info.is_some());
    }

    /// Stored bytes can be read back unchanged.
    #[tokio::test]
    async fn test_storage_operations() {
        let client = test_client().await.unwrap();
        let payload = b"Hello, P2P world!".to_vec();

        let hash = client.store(payload.clone()).await.unwrap();
        let fetched = client.retrieve(&hash).await.unwrap();

        assert_eq!(fetched, payload);
    }

    /// Operations on a client that never connected fail with `NotConnected`.
    #[tokio::test]
    async fn test_not_connected_error() {
        let client = Client::new(ClientConfig::default()).await.unwrap();

        let outcome = client.store(vec![1, 2, 3]).await;
        assert!(matches!(
            outcome.unwrap_err().downcast_ref::<ClientError>(),
            Some(ClientError::NotConnected)
        ));
    }

    /// The routing success rate is a value within [0, 1].
    #[tokio::test]
    async fn test_network_stats() {
        let client = test_client().await.unwrap();

        let stats = client.get_network_stats().await.unwrap();
        assert!(stats.routing_success_rate >= 0.0);
        assert!(stats.routing_success_rate <= 1.0);
    }

    /// Every profile can connect and report node info.
    #[tokio::test]
    async fn test_client_profiles() {
        let profiles = [
            ClientProfile::Full,
            ClientProfile::Light,
            ClientProfile::Compute,
            ClientProfile::Mobile,
        ];

        for profile in profiles {
            let config = ClientConfig {
                profile,
                ..Default::default()
            };
            let client = Client::connect(config).await.unwrap();
            let info = client.get_node_info().await.unwrap();
            assert!(!info.id.is_empty());
        }
    }

    /// Subscribing and publishing on the same topic both succeed.
    #[tokio::test]
    async fn test_pubsub_messaging() {
        let client = test_client().await.unwrap();

        // Keep the stream alive for the duration of the test
        let _stream = client.subscribe("test_topic").await.unwrap();

        let payload = b"Test message".to_vec();
        client.publish("test_topic", payload.clone()).await.unwrap();

        // In a real implementation, we would receive the message
        // For now, just check that operations don't fail
    }

    /// Submitting a job yields a non-empty ID whose result echoes that ID.
    #[tokio::test]
    async fn test_compute_job() {
        let client = test_client().await.unwrap();

        let requirements = ResourceRequirements {
            cpu_cores: 2,
            memory_mb: 1024,
            max_duration: Duration::from_secs(60),
        };
        let job = ComputeJob {
            id: "test_job".to_string(),
            job_type: "map_reduce".to_string(),
            input: b"Input data".to_vec(),
            requirements,
        };

        let job_id = client.submit_compute_job(job).await.unwrap();
        assert!(!job_id.is_empty());

        let result = client.get_job_result(&job_id).await.unwrap();
        assert_eq!(result.job_id, job_id);
    }
}