chie_core/node/
mod.rs

//! Node management for CHIE Protocol.
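//!
//! The main entry point is [`ContentNode`], which is configured through
//! [`NodeConfig`], serves chunks for [`PinnedContent`], and reports transfers
//! to the coordinator as bandwidth proofs; failures surface as [`NodeError`].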

use crate::storage::{ChunkStorage, StorageError};
use chie_crypto::KeyPair;
use chie_shared::{BandwidthProof, ChunkRequest, ChunkResponse, ContentCid, Points};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Configuration for a CHIE node.
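///
/// # Examples
///
/// A config is usually built by overriding a few fields and taking the rest
/// from [`Default`] (the path and sizes below are illustrative):
///
/// ```
/// use chie_core::NodeConfig;
/// use std::path::PathBuf;
///
/// let config = NodeConfig {
///     storage_path: PathBuf::from("/tmp/chie-node"),
///     max_storage_bytes: 10 * 1024 * 1024, // 10 MB
///     ..Default::default()
/// };
/// assert_eq!(config.coordinator_url, "https://coordinator.chie.network");
/// ```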
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// Maximum storage to allocate (bytes).
    pub max_storage_bytes: u64,

    /// Maximum bandwidth to provide (bytes/second).
    pub max_bandwidth_bps: u64,

    /// Coordinator API endpoint.
    pub coordinator_url: String,

    /// Storage path for chunk data.
    pub storage_path: PathBuf,
}

impl Default for NodeConfig {
    fn default() -> Self {
        Self {
            max_storage_bytes: 50 * 1024 * 1024 * 1024, // 50 GB
            max_bandwidth_bps: 100 * 1024 * 1024 / 8,   // 100 Mbps
            coordinator_url: "https://coordinator.chie.network".to_string(),
            storage_path: PathBuf::from("./chie-storage"),
        }
    }
}

/// Pinned content metadata.
#[derive(Debug, Clone)]
pub struct PinnedContent {
    /// Content CID.
    pub cid: ContentCid,

    /// Size in bytes.
    pub size_bytes: u64,

    /// Encryption key for this content.
    pub encryption_key: [u8; 32],

    /// Expected revenue per GB.
    pub predicted_revenue_per_gb: f64,
}

/// Error type for ContentNode operations.
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    #[error("Storage error: {0}")]
    Storage(#[from] StorageError),

    #[error("Content not found: {0}")]
    ContentNotFound(String),

    #[error("Network error: {0}")]
    Network(String),

    #[error("Proof submission failed: {0}")]
    ProofSubmission(String),
}

/// CHIE node instance.
///
/// # Examples
///
/// ```
/// use chie_core::{ContentNode, NodeConfig, PinnedContent};
/// use std::path::PathBuf;
///
/// let config = NodeConfig {
///     storage_path: PathBuf::from("/tmp/chie-test"),
///     max_storage_bytes: 10 * 1024 * 1024,
///     max_bandwidth_bps: 100 * 1024 * 1024 / 8,
///     coordinator_url: "https://coordinator.chie.network".to_string(),
/// };
///
/// let mut node = ContentNode::new(config);
///
/// // Pin some content
/// let content = PinnedContent {
///     cid: "QmTest123".to_string(),
///     size_bytes: 1024,
///     encryption_key: [0u8; 32],
///     predicted_revenue_per_gb: 10.0,
/// };
/// node.pin_content(content);
///
/// assert_eq!(node.pinned_count(), 1);
/// assert!(node.has_content(&"QmTest123".to_string()));
/// ```
pub struct ContentNode {
    /// Node configuration.
    config: NodeConfig,

    /// Cryptographic key pair (wrapped in Arc for concurrent access).
    keypair: Arc<KeyPair>,

    /// Pinned content (in-memory metadata).
    pinned_contents: HashMap<ContentCid, PinnedContent>,

    /// Total earnings accumulated.
    earnings: Points,

    /// Chunk storage backend.
    storage: Option<Arc<RwLock<ChunkStorage>>>,

    /// HTTP client with connection pooling for proof submission.
    http_client: reqwest::Client,
}

impl ContentNode {
    /// Create a new content node.
    pub fn new(config: NodeConfig) -> Self {
        // Create HTTP client with connection pooling
        let http_client = reqwest::Client::builder()
            .pool_max_idle_per_host(10)
            .pool_idle_timeout(std::time::Duration::from_secs(30))
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .expect("Failed to build HTTP client");

        Self {
            config,
            keypair: Arc::new(KeyPair::generate()),
            pinned_contents: HashMap::new(),
            earnings: 0,
            storage: None,
            http_client,
        }
    }

    /// Create a new content node with storage backend.
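    ///
    /// # Examples
    ///
    /// A minimal sketch, marked `no_run` because it touches the filesystem;
    /// the storage path below is illustrative:
    ///
    /// ```no_run
    /// use chie_core::{ContentNode, NodeConfig};
    /// use std::path::PathBuf;
    ///
    /// # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    /// let config = NodeConfig {
    ///     storage_path: PathBuf::from("/tmp/chie-storage"),
    ///     ..Default::default()
    /// };
    /// let node = ContentNode::with_storage(config).await?;
    /// assert!(node.storage().is_some());
    /// # Ok(())
    /// # }
    /// ```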
    pub async fn with_storage(config: NodeConfig) -> Result<Self, NodeError> {
        let storage =
            ChunkStorage::new(config.storage_path.clone(), config.max_storage_bytes).await?;

        // Create HTTP client with connection pooling
        let http_client = reqwest::Client::builder()
            .pool_max_idle_per_host(10)
            .pool_idle_timeout(std::time::Duration::from_secs(30))
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .map_err(|e| NodeError::Network(format!("Failed to build HTTP client: {}", e)))?;

        Ok(Self {
            config,
            keypair: Arc::new(KeyPair::generate()),
            pinned_contents: HashMap::new(),
            earnings: 0,
            storage: Some(Arc::new(RwLock::new(storage))),
            http_client,
        })
    }

    /// Set the storage backend.
    #[inline]
    pub fn set_storage(&mut self, storage: Arc<RwLock<ChunkStorage>>) {
        self.storage = Some(storage);
    }

    /// Get a reference to the storage backend.
    #[inline]
    pub fn storage(&self) -> Option<&Arc<RwLock<ChunkStorage>>> {
        self.storage.as_ref()
    }

    /// Get the node's public key.
    #[inline]
    pub fn public_key(&self) -> [u8; 32] {
        self.keypair.public_key()
    }

    /// Get total earnings.
    #[inline]
    pub fn earnings(&self) -> Points {
        self.earnings
    }

    /// Get the node configuration.
    #[inline]
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }

    /// Pin content for distribution.
    #[inline]
    pub fn pin_content(&mut self, content: PinnedContent) {
        self.pinned_contents.insert(content.cid.clone(), content);
    }

    /// Unpin content.
    #[inline]
    pub fn unpin_content(&mut self, cid: &ContentCid) -> Option<PinnedContent> {
        self.pinned_contents.remove(cid)
    }

    /// Check if content is pinned.
    #[inline]
    pub fn has_content(&self, cid: &ContentCid) -> bool {
        self.pinned_contents.contains_key(cid)
    }

    /// Get pinned content count.
    #[inline]
    pub fn pinned_count(&self) -> usize {
        self.pinned_contents.len()
    }

    /// Handle a chunk request and generate a response.
    ///
    /// This method reads the requested chunk from storage, signs it, and returns
    /// a `ChunkResponse`. If storage is not configured, returns a placeholder response.
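    ///
    /// # Examples
    ///
    /// A sketch of the request/verify round trip, mirroring the unit tests.
    /// It is fenced as `ignore` because it assumes the protocol helper
    /// `create_chunk_request` and `chie_crypto::verify` are reachable from the
    /// caller's crate, which may not hold in every build:
    ///
    /// ```ignore
    /// // Requester side: build a request carrying a fresh challenge nonce for chunk 0.
    /// let request = create_chunk_request(
    ///     cid.clone(),
    ///     0,
    ///     "peer-123".to_string(),
    ///     requester_keypair.public_key(),
    /// );
    ///
    /// // Provider side: read, sign, and return the chunk.
    /// let response = node.handle_chunk_request(request.clone()).await?;
    ///
    /// // Requester side: check the signature over nonce || chunk_hash || requester_pubkey.
    /// let message = [
    ///     &request.challenge_nonce[..],
    ///     &response.chunk_hash[..],
    ///     &request.requester_public_key[..],
    /// ]
    /// .concat();
    /// let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
    /// assert!(chie_crypto::verify(&response.provider_public_key, &message, &sig).is_ok());
    /// ```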
    pub async fn handle_chunk_request(
        &self,
        request: ChunkRequest,
    ) -> Result<ChunkResponse, NodeError> {
        // Verify content is pinned
        if !self.pinned_contents.contains_key(&request.content_cid) {
            return Err(NodeError::ContentNotFound(request.content_cid.clone()));
        }

        // Read chunk from storage
        let chunk_data = if let Some(storage) = &self.storage {
            let storage_guard = storage.read().await;
            storage_guard
                .get_chunk(&request.content_cid, request.chunk_index)
                .await?
        } else {
            // Fallback for nodes without storage (testing purposes)
            vec![0u8; 1024]
        };

        let chunk_hash = chie_crypto::hash(&chunk_data);

        // Sign the transfer (nonce || hash || requester_pubkey)
        let message = [
            &request.challenge_nonce[..],
            &chunk_hash[..],
            &request.requester_public_key[..],
        ]
        .concat();
        let signature = self.keypair.sign(&message);

        Ok(ChunkResponse {
            encrypted_chunk: chunk_data,
            chunk_hash,
            provider_signature: signature.to_vec(),
            provider_public_key: self.keypair.public_key(),
            challenge_echo: request.challenge_nonce,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        })
    }

    /// Handle a chunk request with verification.
    ///
    /// Same as `handle_chunk_request`, but also verifies that the chunk hash matches storage metadata.
    pub async fn handle_chunk_request_verified(
        &self,
        request: ChunkRequest,
    ) -> Result<ChunkResponse, NodeError> {
        // Verify content is pinned
        if !self.pinned_contents.contains_key(&request.content_cid) {
            return Err(NodeError::ContentNotFound(request.content_cid.clone()));
        }

        // Read and verify chunk from storage
        let (chunk_data, chunk_hash) = if let Some(storage) = &self.storage {
            let storage_guard = storage.read().await;
            storage_guard
                .get_chunk_verified(&request.content_cid, request.chunk_index)
                .await?
        } else {
            let data = vec![0u8; 1024];
            let hash = chie_crypto::hash(&data);
            (data, hash)
        };

        // Sign the transfer
        let message = [
            &request.challenge_nonce[..],
            &chunk_hash[..],
            &request.requester_public_key[..],
        ]
        .concat();
        let signature = self.keypair.sign(&message);

        Ok(ChunkResponse {
            encrypted_chunk: chunk_data,
            chunk_hash,
            provider_signature: signature.to_vec(),
            provider_public_key: self.keypair.public_key(),
            challenge_echo: request.challenge_nonce,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
        })
    }

    /// Submit a proof to the coordinator using a pooled connection.
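    ///
    /// A hedged usage sketch (fenced as `ignore`; `proof` is assumed to come
    /// from the protocol helpers, e.g. `create_bandwidth_proof`):
    ///
    /// ```ignore
    /// match node.submit_proof(proof).await {
    ///     Ok(()) => println!("proof accepted by coordinator"),
    ///     Err(NodeError::ProofSubmission(msg)) => eprintln!("proof rejected: {}", msg),
    ///     Err(e) => eprintln!("submission failed: {}", e),
    /// }
    /// ```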
    pub async fn submit_proof(&self, proof: BandwidthProof) -> Result<(), NodeError> {
        let response = self
            .http_client
            .post(format!("{}/api/proofs", self.config.coordinator_url))
            .json(&proof)
            .send()
            .await
            .map_err(|e| NodeError::Network(e.to_string()))?;

        if !response.status().is_success() {
            return Err(NodeError::ProofSubmission(format!(
                "Server returned status: {}",
                response.status()
            )));
        }

        Ok(())
    }

    /// Submit multiple proofs in batch for improved efficiency.
    pub async fn submit_proofs_batch(&self, proofs: Vec<BandwidthProof>) -> Result<(), NodeError> {
        let response = self
            .http_client
            .post(format!("{}/api/proofs/batch", self.config.coordinator_url))
            .json(&proofs)
            .send()
            .await
            .map_err(|e| NodeError::Network(e.to_string()))?;

        if !response.status().is_success() {
            return Err(NodeError::ProofSubmission(format!(
                "Batch submission failed with status: {}",
                response.status()
            )));
        }

        Ok(())
    }

    /// Add earnings (called when a proof is verified).
    pub fn add_earnings(&mut self, amount: Points) {
        self.earnings += amount;
    }

    /// Get storage statistics if storage is configured.
    pub async fn storage_stats(&self) -> Option<crate::storage::StorageStats> {
        if let Some(storage) = &self.storage {
            let storage_guard = storage.read().await;
            Some(storage_guard.stats())
        } else {
            None
        }
    }

    /// Handle multiple chunk requests as a single batch.
    ///
    /// Every request is validated against the pinned set up front, so an
    /// unknown CID fails the whole batch before any chunk is read; the
    /// requests are then served one at a time.
    pub async fn handle_chunk_requests_batch(
        &self,
        requests: Vec<ChunkRequest>,
    ) -> Result<Vec<ChunkResponse>, NodeError> {
        // Verify all requests upfront
        for request in &requests {
            if !self.pinned_contents.contains_key(&request.content_cid) {
                return Err(NodeError::ContentNotFound(request.content_cid.clone()));
            }
        }

        // Process requests sequentially (signing requires non-cloneable KeyPair)
        let mut responses = Vec::with_capacity(requests.len());
        for request in requests {
            let response = self.handle_chunk_request(request).await?;
            responses.push(response);
        }

        Ok(responses)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{create_bandwidth_proof, create_chunk_request};
    use chie_crypto::{KeyPair, generate_key, generate_nonce, hash, verify};
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_node_creation() {
        let config = NodeConfig::default();
        let node = ContentNode::new(config);

        assert_eq!(node.earnings(), 0);
        assert_eq!(node.pinned_count(), 0);
    }

    #[tokio::test]
    async fn test_node_with_storage() {
        let temp_dir = TempDir::new().unwrap();
        let config = NodeConfig {
            storage_path: temp_dir.path().to_path_buf(),
            max_storage_bytes: 10 * 1024 * 1024, // 10 MB
            ..Default::default()
        };

        let node = ContentNode::with_storage(config).await.unwrap();
        assert!(node.storage().is_some());

        let stats = node.storage_stats().await.unwrap();
        assert_eq!(stats.used_bytes, 0);
        assert_eq!(stats.max_bytes, 10 * 1024 * 1024);
    }

    #[tokio::test]
    async fn test_pin_unpin_content() {
        let config = NodeConfig::default();
        let mut node = ContentNode::new(config);

        let cid = "QmTest123".to_string();
        let content = PinnedContent {
            cid: cid.clone(),
            size_bytes: 1024,
            encryption_key: [0u8; 32],
            predicted_revenue_per_gb: 10.0,
        };

        node.pin_content(content);
        assert!(node.has_content(&cid));
        assert_eq!(node.pinned_count(), 1);

        let unpinned = node.unpin_content(&cid);
        assert!(unpinned.is_some());
        assert!(!node.has_content(&cid));
        assert_eq!(node.pinned_count(), 0);
    }

    #[tokio::test]
    async fn test_add_earnings() {
        let config = NodeConfig::default();
        let mut node = ContentNode::new(config);

        assert_eq!(node.earnings(), 0);
        node.add_earnings(100);
        assert_eq!(node.earnings(), 100);
        node.add_earnings(50);
        assert_eq!(node.earnings(), 150);
    }

    #[tokio::test]
    async fn test_handle_chunk_request_without_storage() {
        let config = NodeConfig::default();
        let mut node = ContentNode::new(config);

        let cid = "QmTest123".to_string();
        let content = PinnedContent {
            cid: cid.clone(),
            size_bytes: 1024,
            encryption_key: [0u8; 32],
            predicted_revenue_per_gb: 10.0,
        };
        node.pin_content(content);

        let requester_keypair = KeyPair::generate();
        let request = create_chunk_request(
            cid.clone(),
            0,
            "peer-123".to_string(),
            requester_keypair.public_key(),
        );

        let response = node.handle_chunk_request(request.clone()).await.unwrap();

        // Verify response structure
        assert_eq!(response.provider_public_key, node.public_key());
        assert_eq!(response.challenge_echo, request.challenge_nonce);
        assert_eq!(response.encrypted_chunk.len(), 1024); // Fallback data

        // Verify signature
        let message = [
            &request.challenge_nonce[..],
            &response.chunk_hash[..],
            &request.requester_public_key[..],
        ]
        .concat();
        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&node.public_key(), &message, &sig).is_ok());
    }

    #[tokio::test]
    async fn test_handle_chunk_request_content_not_found() {
        let config = NodeConfig::default();
        let node = ContentNode::new(config);

        let requester_keypair = KeyPair::generate();
        let request = create_chunk_request(
            "QmNonExistent".to_string(),
            0,
            "peer-123".to_string(),
            requester_keypair.public_key(),
        );

        let result = node.handle_chunk_request(request).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), NodeError::ContentNotFound(_)));
    }

    #[tokio::test]
    async fn test_handle_chunk_request_with_storage() {
        let temp_dir = TempDir::new().unwrap();
        let config = NodeConfig {
            storage_path: temp_dir.path().to_path_buf(),
            max_storage_bytes: 10 * 1024 * 1024,
            ..Default::default()
        };

        let mut node = ContentNode::with_storage(config).await.unwrap();

        // Prepare test data
        let cid = "QmTest123".to_string();
        let test_data = b"Hello, CHIE Protocol!".to_vec();
        let chunks = vec![test_data.clone()];

        // Pin content to storage
        if let Some(storage_arc) = node.storage() {
            let mut storage = storage_arc.write().await;
            let key = generate_key();
            let nonce = generate_nonce();

            storage
                .pin_content(&cid, &chunks, &key, &nonce)
                .await
                .unwrap();
        }

        // Pin content metadata in node
        let content = PinnedContent {
            cid: cid.clone(),
            size_bytes: test_data.len() as u64,
            encryption_key: [0u8; 32],
            predicted_revenue_per_gb: 10.0,
        };
        node.pin_content(content);

        // Create and handle chunk request
        let requester_keypair = KeyPair::generate();
        let request = create_chunk_request(
            cid.clone(),
            0,
            "peer-123".to_string(),
            requester_keypair.public_key(),
        );

        let response = node.handle_chunk_request(request.clone()).await.unwrap();

        // Verify response
        assert_eq!(response.provider_public_key, node.public_key());
        assert_eq!(response.challenge_echo, request.challenge_nonce);
        assert!(!response.encrypted_chunk.is_empty());

        // Verify signature
        let message = [
            &request.challenge_nonce[..],
            &response.chunk_hash[..],
            &request.requester_public_key[..],
        ]
        .concat();
        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&node.public_key(), &message, &sig).is_ok());
    }

    #[tokio::test]
    async fn test_handle_chunk_request_verified() {
        let temp_dir = TempDir::new().unwrap();
        let config = NodeConfig {
            storage_path: temp_dir.path().to_path_buf(),
            max_storage_bytes: 10 * 1024 * 1024,
            ..Default::default()
        };

        let mut node = ContentNode::with_storage(config).await.unwrap();

        // Prepare test data
        let cid = "QmTest456".to_string();
        let test_data = b"Verified chunk test".to_vec();
        let chunks = vec![test_data.clone()];
        let expected_hash = hash(&test_data);

        // Pin content to storage
        if let Some(storage_arc) = node.storage() {
            let mut storage = storage_arc.write().await;
            let key = generate_key();
            let nonce = generate_nonce();

            storage
                .pin_content(&cid, &chunks, &key, &nonce)
                .await
                .unwrap();
        }

        // Pin content metadata in node
        let content = PinnedContent {
            cid: cid.clone(),
            size_bytes: test_data.len() as u64,
            encryption_key: [0u8; 32],
            predicted_revenue_per_gb: 10.0,
        };
        node.pin_content(content);

        // Create and handle verified chunk request
        let requester_keypair = KeyPair::generate();
        let request = create_chunk_request(
            cid.clone(),
            0,
            "peer-456".to_string(),
            requester_keypair.public_key(),
        );

        let response = node
            .handle_chunk_request_verified(request.clone())
            .await
            .unwrap();

        // Verify hash matches expected
        assert_eq!(response.chunk_hash, expected_hash);

        // Verify signature
        let message = [
            &request.challenge_nonce[..],
            &response.chunk_hash[..],
            &request.requester_public_key[..],
        ]
        .concat();
        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&node.public_key(), &message, &sig).is_ok());
    }

    #[tokio::test]
    async fn test_full_bandwidth_proof_flow() {
        let temp_dir = TempDir::new().unwrap();
        let config = NodeConfig {
            storage_path: temp_dir.path().to_path_buf(),
            max_storage_bytes: 10 * 1024 * 1024,
            ..Default::default()
        };

        let mut provider_node = ContentNode::with_storage(config).await.unwrap();
        let requester_keypair = KeyPair::generate();

        // Setup: Pin content
        let cid = "QmFullFlow".to_string();
        let test_data = b"Full bandwidth proof flow test data".to_vec();
        let chunks = vec![test_data.clone()];

        if let Some(storage_arc) = provider_node.storage() {
            let mut storage = storage_arc.write().await;
            let key = generate_key();
            let nonce = generate_nonce();
            storage
                .pin_content(&cid, &chunks, &key, &nonce)
                .await
                .unwrap();
        }

        let content = PinnedContent {
            cid: cid.clone(),
            size_bytes: test_data.len() as u64,
            encryption_key: [0u8; 32],
            predicted_revenue_per_gb: 10.0,
        };
        provider_node.pin_content(content);

        // Step 1: Create chunk request
        let start_time = chrono::Utc::now().timestamp_millis();
        let request = create_chunk_request(
            cid.clone(),
            0,
            "requester-peer".to_string(),
            requester_keypair.public_key(),
        );

        // Step 2: Provider handles request
        let response = provider_node
            .handle_chunk_request_verified(request.clone())
            .await
            .unwrap();
        let end_time = chrono::Utc::now().timestamp_millis();

        // Step 3: Requester signs the response
        let requester_message = [
            &request.challenge_nonce[..],
            &response.chunk_hash[..],
            &response.provider_public_key[..],
        ]
        .concat();
        let requester_signature = requester_keypair.sign(&requester_message);

        // Step 4: Create bandwidth proof
        let proof = create_bandwidth_proof(
            &request,
            "provider-peer".to_string(),
            response.provider_public_key.to_vec(),
            response.encrypted_chunk.len() as u64,
            response.provider_signature.clone(),
            requester_signature.to_vec(),
            response.chunk_hash.to_vec(),
            start_time,
            end_time,
            (end_time - start_time) as u32,
        );

        // Verify proof structure
        assert_eq!(proof.content_cid, cid);
        assert_eq!(proof.chunk_index, 0);
        assert_eq!(
            proof.bytes_transferred,
            response.encrypted_chunk.len() as u64
        );
        assert_eq!(
            proof.provider_public_key,
            response.provider_public_key.to_vec()
        );
        assert_eq!(
            proof.requester_public_key,
            requester_keypair.public_key().to_vec()
        );

        // Verify both signatures
        let provider_msg = [
            &request.challenge_nonce[..],
            &response.chunk_hash[..],
            &request.requester_public_key[..],
        ]
        .concat();
        let prov_sig: [u8; 64] = proof.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&provider_node.public_key(), &provider_msg, &prov_sig).is_ok());
        let req_sig: [u8; 64] = proof.requester_signature.as_slice().try_into().unwrap();
        assert!(
            verify(
                &requester_keypair.public_key(),
                &requester_message,
                &req_sig
            )
            .is_ok()
        );
    }

    #[tokio::test]
    async fn test_node_config_default() {
        let config = NodeConfig::default();
        assert_eq!(config.max_storage_bytes, 50 * 1024 * 1024 * 1024);
        assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024 / 8);
        assert_eq!(config.coordinator_url, "https://coordinator.chie.network");
    }
}