Skip to main content

chie_core/node/
mod.rs

1//! Node management for CHIE Protocol.
2
3use crate::storage::{ChunkStorage, StorageError};
4use chie_crypto::KeyPair;
5use chie_shared::{BandwidthProof, ChunkRequest, ChunkResponse, ContentCid, Points};
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11/// Configuration for a CHIE node.
12#[derive(Debug, Clone)]
13pub struct NodeConfig {
14    /// Maximum storage to allocate (bytes).
15    pub max_storage_bytes: u64,
16
17    /// Maximum bandwidth to provide (bytes/second).
18    pub max_bandwidth_bps: u64,
19
20    /// Coordinator API endpoint.
21    pub coordinator_url: String,
22
23    /// Storage path for chunk data.
24    pub storage_path: PathBuf,
25}
26
27impl Default for NodeConfig {
28    fn default() -> Self {
29        Self {
30            max_storage_bytes: 50 * 1024 * 1024 * 1024, // 50 GB
31            max_bandwidth_bps: 100 * 1024 * 1024 / 8,   // 100 Mbps
32            coordinator_url: "https://coordinator.chie.network".to_string(),
33            storage_path: PathBuf::from("./chie-storage"),
34        }
35    }
36}
37
38/// Pinned content metadata.
39#[derive(Debug, Clone)]
40pub struct PinnedContent {
41    /// Content CID.
42    pub cid: ContentCid,
43
44    /// Size in bytes.
45    pub size_bytes: u64,
46
47    /// Encryption key for this content.
48    pub encryption_key: [u8; 32],
49
50    /// Expected revenue per GB.
51    pub predicted_revenue_per_gb: f64,
52}
53
54/// Error type for ContentNode operations.
55#[derive(Debug, thiserror::Error)]
56pub enum NodeError {
57    #[error("Storage error: {0}")]
58    Storage(#[from] StorageError),
59
60    #[error("Content not found: {0}")]
61    ContentNotFound(String),
62
63    #[error("Network error: {0}")]
64    Network(String),
65
66    #[error("Proof submission failed: {0}")]
67    ProofSubmission(String),
68}
69
70/// CHIE node instance.
71///
72/// # Examples
73///
74/// ```
75/// use chie_core::{ContentNode, NodeConfig, PinnedContent};
76/// use std::path::PathBuf;
77///
78/// let config = NodeConfig {
79///     storage_path: PathBuf::from("/tmp/chie-test"),
80///     max_storage_bytes: 10 * 1024 * 1024,
81///     max_bandwidth_bps: 100 * 1024 * 1024 / 8,
82///     coordinator_url: "https://coordinator.chie.network".to_string(),
83/// };
84///
85/// let mut node = ContentNode::new(config);
86///
87/// // Pin some content
88/// let content = PinnedContent {
89///     cid: "QmTest123".to_string(),
90///     size_bytes: 1024,
91///     encryption_key: [0u8; 32],
92///     predicted_revenue_per_gb: 10.0,
93/// };
94/// node.pin_content(content);
95///
96/// assert_eq!(node.pinned_count(), 1);
97/// assert!(node.has_content(&"QmTest123".to_string()));
98/// ```
99pub struct ContentNode {
100    /// Node configuration.
101    config: NodeConfig,
102
103    /// Cryptographic key pair (wrapped in Arc for concurrent access).
104    keypair: Arc<KeyPair>,
105
106    /// Pinned content (in-memory metadata).
107    pinned_contents: HashMap<ContentCid, PinnedContent>,
108
109    /// Total earnings accumulated.
110    earnings: Points,
111
112    /// Chunk storage backend.
113    storage: Option<Arc<RwLock<ChunkStorage>>>,
114
115    /// HTTP client with connection pooling for proof submission.
116    http_client: reqwest::Client,
117}
118
119impl ContentNode {
120    /// Create a new content node.
121    pub fn new(config: NodeConfig) -> Self {
122        // Create HTTP client with connection pooling
123        let http_client = reqwest::Client::builder()
124            .pool_max_idle_per_host(10)
125            .pool_idle_timeout(std::time::Duration::from_secs(30))
126            .timeout(std::time::Duration::from_secs(30))
127            .build()
128            .expect("Failed to build HTTP client");
129
130        Self {
131            config,
132            keypair: Arc::new(KeyPair::generate()),
133            pinned_contents: HashMap::new(),
134            earnings: 0,
135            storage: None,
136            http_client,
137        }
138    }
139
140    /// Create a new content node with storage backend.
141    pub async fn with_storage(config: NodeConfig) -> Result<Self, NodeError> {
142        let storage =
143            ChunkStorage::new(config.storage_path.clone(), config.max_storage_bytes).await?;
144
145        // Create HTTP client with connection pooling
146        let http_client = reqwest::Client::builder()
147            .pool_max_idle_per_host(10)
148            .pool_idle_timeout(std::time::Duration::from_secs(30))
149            .timeout(std::time::Duration::from_secs(30))
150            .build()
151            .map_err(|e| NodeError::Network(format!("Failed to build HTTP client: {}", e)))?;
152
153        Ok(Self {
154            config,
155            keypair: Arc::new(KeyPair::generate()),
156            pinned_contents: HashMap::new(),
157            earnings: 0,
158            storage: Some(Arc::new(RwLock::new(storage))),
159            http_client,
160        })
161    }
162
163    /// Set the storage backend.
164    #[inline]
165    pub fn set_storage(&mut self, storage: Arc<RwLock<ChunkStorage>>) {
166        self.storage = Some(storage);
167    }
168
169    /// Get a reference to the storage backend.
170    #[inline]
171    pub fn storage(&self) -> Option<&Arc<RwLock<ChunkStorage>>> {
172        self.storage.as_ref()
173    }
174
175    /// Get the node's public key.
176    #[inline]
177    pub fn public_key(&self) -> [u8; 32] {
178        self.keypair.public_key()
179    }
180
181    /// Get total earnings.
182    #[inline]
183    pub fn earnings(&self) -> Points {
184        self.earnings
185    }
186
187    /// Get the node configuration.
188    #[inline]
189    pub fn config(&self) -> &NodeConfig {
190        &self.config
191    }
192
193    /// Pin content for distribution.
194    #[inline]
195    pub fn pin_content(&mut self, content: PinnedContent) {
196        self.pinned_contents.insert(content.cid.clone(), content);
197    }
198
199    /// Unpin content.
200    #[inline]
201    pub fn unpin_content(&mut self, cid: &ContentCid) -> Option<PinnedContent> {
202        self.pinned_contents.remove(cid)
203    }
204
205    /// Check if content is pinned.
206    #[inline]
207    pub fn has_content(&self, cid: &ContentCid) -> bool {
208        self.pinned_contents.contains_key(cid)
209    }
210
211    /// Get pinned content count.
212    #[inline]
213    pub fn pinned_count(&self) -> usize {
214        self.pinned_contents.len()
215    }
216
217    /// Get all pinned content metadata.
218    #[inline]
219    pub fn pinned_contents(&self) -> &HashMap<ContentCid, PinnedContent> {
220        &self.pinned_contents
221    }
222
223    /// Handle a chunk request and generate a response.
224    ///
225    /// This method reads the requested chunk from storage, signs it, and returns
226    /// a ChunkResponse. If storage is not configured, returns a placeholder response.
227    pub async fn handle_chunk_request(
228        &self,
229        request: ChunkRequest,
230    ) -> Result<ChunkResponse, NodeError> {
231        // Verify content is pinned
232        if !self.pinned_contents.contains_key(&request.content_cid) {
233            return Err(NodeError::ContentNotFound(request.content_cid.clone()));
234        }
235
236        // Read chunk from storage
237        let chunk_data = if let Some(storage) = &self.storage {
238            let storage_guard = storage.read().await;
239            storage_guard
240                .get_chunk(&request.content_cid, request.chunk_index)
241                .await?
242        } else {
243            // Fallback for nodes without storage (testing purposes)
244            vec![0u8; 1024]
245        };
246
247        let chunk_hash = chie_crypto::hash(&chunk_data);
248
249        // Sign the transfer (nonce || hash || requester_pubkey)
250        let message = [
251            &request.challenge_nonce[..],
252            &chunk_hash[..],
253            &request.requester_public_key[..],
254        ]
255        .concat();
256        let signature = self.keypair.sign(&message);
257
258        Ok(ChunkResponse {
259            encrypted_chunk: chunk_data,
260            chunk_hash,
261            provider_signature: signature.to_vec(),
262            provider_public_key: self.keypair.public_key(),
263            challenge_echo: request.challenge_nonce,
264            timestamp_ms: chrono::Utc::now().timestamp_millis(),
265        })
266    }
267
268    /// Handle a chunk request with verification.
269    ///
270    /// Same as handle_chunk_request but also verifies the chunk hash matches storage metadata.
271    pub async fn handle_chunk_request_verified(
272        &self,
273        request: ChunkRequest,
274    ) -> Result<ChunkResponse, NodeError> {
275        // Verify content is pinned
276        if !self.pinned_contents.contains_key(&request.content_cid) {
277            return Err(NodeError::ContentNotFound(request.content_cid.clone()));
278        }
279
280        // Read and verify chunk from storage
281        let (chunk_data, chunk_hash) = if let Some(storage) = &self.storage {
282            let storage_guard = storage.read().await;
283            storage_guard
284                .get_chunk_verified(&request.content_cid, request.chunk_index)
285                .await?
286        } else {
287            let data = vec![0u8; 1024];
288            let hash = chie_crypto::hash(&data);
289            (data, hash)
290        };
291
292        // Sign the transfer
293        let message = [
294            &request.challenge_nonce[..],
295            &chunk_hash[..],
296            &request.requester_public_key[..],
297        ]
298        .concat();
299        let signature = self.keypair.sign(&message);
300
301        Ok(ChunkResponse {
302            encrypted_chunk: chunk_data,
303            chunk_hash,
304            provider_signature: signature.to_vec(),
305            provider_public_key: self.keypair.public_key(),
306            challenge_echo: request.challenge_nonce,
307            timestamp_ms: chrono::Utc::now().timestamp_millis(),
308        })
309    }
310
311    /// Submit a proof to the coordinator using pooled connection.
312    pub async fn submit_proof(&self, proof: BandwidthProof) -> Result<(), NodeError> {
313        let response = self
314            .http_client
315            .post(format!("{}/api/proofs", self.config.coordinator_url))
316            .json(&proof)
317            .send()
318            .await
319            .map_err(|e| NodeError::Network(e.to_string()))?;
320
321        if !response.status().is_success() {
322            return Err(NodeError::ProofSubmission(format!(
323                "Server returned status: {}",
324                response.status()
325            )));
326        }
327
328        Ok(())
329    }
330
331    /// Submit multiple proofs in batch for improved efficiency.
332    pub async fn submit_proofs_batch(&self, proofs: Vec<BandwidthProof>) -> Result<(), NodeError> {
333        let response = self
334            .http_client
335            .post(format!("{}/api/proofs/batch", self.config.coordinator_url))
336            .json(&proofs)
337            .send()
338            .await
339            .map_err(|e| NodeError::Network(e.to_string()))?;
340
341        if !response.status().is_success() {
342            return Err(NodeError::ProofSubmission(format!(
343                "Batch submission failed with status: {}",
344                response.status()
345            )));
346        }
347
348        Ok(())
349    }
350
351    /// Add earnings (called when proof is verified).
352    pub fn add_earnings(&mut self, amount: Points) {
353        self.earnings += amount;
354    }
355
356    /// Get storage statistics if storage is configured.
357    pub async fn storage_stats(&self) -> Option<crate::storage::StorageStats> {
358        if let Some(storage) = &self.storage {
359            let storage_guard = storage.read().await;
360            Some(storage_guard.stats())
361        } else {
362            None
363        }
364    }
365
366    /// Handle multiple chunk requests concurrently for improved throughput.
367    pub async fn handle_chunk_requests_batch(
368        &self,
369        requests: Vec<ChunkRequest>,
370    ) -> Result<Vec<ChunkResponse>, NodeError> {
371        // Verify all requests upfront
372        for request in &requests {
373            if !self.pinned_contents.contains_key(&request.content_cid) {
374                return Err(NodeError::ContentNotFound(request.content_cid.clone()));
375            }
376        }
377
378        // Process requests sequentially (signing requires non-cloneable KeyPair)
379        let mut responses = Vec::with_capacity(requests.len());
380        for request in requests {
381            let response = self.handle_chunk_request(request).await?;
382            responses.push(response);
383        }
384
385        Ok(responses)
386    }
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392    use crate::protocol::{create_bandwidth_proof, create_chunk_request};
393    use chie_crypto::{KeyPair, generate_key, generate_nonce, hash, verify};
394    use tempfile::TempDir;
395
396    #[tokio::test]
397    async fn test_node_creation() {
398        let config = NodeConfig::default();
399        let node = ContentNode::new(config);
400
401        assert_eq!(node.earnings(), 0);
402        assert_eq!(node.pinned_count(), 0);
403    }
404
405    #[tokio::test]
406    async fn test_node_with_storage() {
407        let temp_dir = TempDir::new().unwrap();
408        let config = NodeConfig {
409            storage_path: temp_dir.path().to_path_buf(),
410            max_storage_bytes: 10 * 1024 * 1024, // 10 MB
411            ..Default::default()
412        };
413
414        let node = ContentNode::with_storage(config).await.unwrap();
415        assert!(node.storage().is_some());
416
417        let stats = node.storage_stats().await.unwrap();
418        assert_eq!(stats.used_bytes, 0);
419        assert_eq!(stats.max_bytes, 10 * 1024 * 1024);
420    }
421
422    #[tokio::test]
423    async fn test_pin_unpin_content() {
424        let config = NodeConfig::default();
425        let mut node = ContentNode::new(config);
426
427        let cid = "QmTest123".to_string();
428        let content = PinnedContent {
429            cid: cid.clone(),
430            size_bytes: 1024,
431            encryption_key: [0u8; 32],
432            predicted_revenue_per_gb: 10.0,
433        };
434
435        node.pin_content(content);
436        assert!(node.has_content(&cid));
437        assert_eq!(node.pinned_count(), 1);
438
439        let unpinned = node.unpin_content(&cid);
440        assert!(unpinned.is_some());
441        assert!(!node.has_content(&cid));
442        assert_eq!(node.pinned_count(), 0);
443    }
444
445    #[tokio::test]
446    async fn test_add_earnings() {
447        let config = NodeConfig::default();
448        let mut node = ContentNode::new(config);
449
450        assert_eq!(node.earnings(), 0);
451        node.add_earnings(100);
452        assert_eq!(node.earnings(), 100);
453        node.add_earnings(50);
454        assert_eq!(node.earnings(), 150);
455    }
456
457    #[tokio::test]
458    async fn test_handle_chunk_request_without_storage() {
459        let config = NodeConfig::default();
460        let mut node = ContentNode::new(config);
461
462        let cid = "QmTest123".to_string();
463        let content = PinnedContent {
464            cid: cid.clone(),
465            size_bytes: 1024,
466            encryption_key: [0u8; 32],
467            predicted_revenue_per_gb: 10.0,
468        };
469        node.pin_content(content);
470
471        let requester_keypair = KeyPair::generate();
472        let request = create_chunk_request(
473            cid.clone(),
474            0,
475            "peer-123".to_string(),
476            requester_keypair.public_key(),
477        );
478
479        let response = node.handle_chunk_request(request.clone()).await.unwrap();
480
481        // Verify response structure
482        assert_eq!(response.provider_public_key, node.public_key());
483        assert_eq!(response.challenge_echo, request.challenge_nonce);
484        assert_eq!(response.encrypted_chunk.len(), 1024); // Fallback data
485
486        // Verify signature
487        let message = [
488            &request.challenge_nonce[..],
489            &response.chunk_hash[..],
490            &request.requester_public_key[..],
491        ]
492        .concat();
493        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
494        assert!(verify(&node.public_key(), &message, &sig).is_ok());
495    }
496
497    #[tokio::test]
498    async fn test_handle_chunk_request_content_not_found() {
499        let config = NodeConfig::default();
500        let node = ContentNode::new(config);
501
502        let requester_keypair = KeyPair::generate();
503        let request = create_chunk_request(
504            "QmNonExistent".to_string(),
505            0,
506            "peer-123".to_string(),
507            requester_keypair.public_key(),
508        );
509
510        let result = node.handle_chunk_request(request).await;
511        assert!(result.is_err());
512        assert!(matches!(result.unwrap_err(), NodeError::ContentNotFound(_)));
513    }
514
515    #[tokio::test]
516    async fn test_handle_chunk_request_with_storage() {
517        let temp_dir = TempDir::new().unwrap();
518        let config = NodeConfig {
519            storage_path: temp_dir.path().to_path_buf(),
520            max_storage_bytes: 10 * 1024 * 1024,
521            ..Default::default()
522        };
523
524        let mut node = ContentNode::with_storage(config).await.unwrap();
525
526        // Prepare test data
527        let cid = "QmTest123".to_string();
528        let test_data = b"Hello, CHIE Protocol!".to_vec();
529        let chunks = vec![test_data.clone()];
530
531        // Pin content to storage
532        if let Some(storage_arc) = node.storage() {
533            let mut storage = storage_arc.write().await;
534            let key = generate_key();
535            let nonce = generate_nonce();
536
537            storage
538                .pin_content(&cid, &chunks, &key, &nonce)
539                .await
540                .unwrap();
541        }
542
543        // Pin content metadata in node
544        let content = PinnedContent {
545            cid: cid.clone(),
546            size_bytes: test_data.len() as u64,
547            encryption_key: [0u8; 32],
548            predicted_revenue_per_gb: 10.0,
549        };
550        node.pin_content(content);
551
552        // Create and handle chunk request
553        let requester_keypair = KeyPair::generate();
554        let request = create_chunk_request(
555            cid.clone(),
556            0,
557            "peer-123".to_string(),
558            requester_keypair.public_key(),
559        );
560
561        let response = node.handle_chunk_request(request.clone()).await.unwrap();
562
563        // Verify response
564        assert_eq!(response.provider_public_key, node.public_key());
565        assert_eq!(response.challenge_echo, request.challenge_nonce);
566        assert!(!response.encrypted_chunk.is_empty());
567
568        // Verify signature
569        let message = [
570            &request.challenge_nonce[..],
571            &response.chunk_hash[..],
572            &request.requester_public_key[..],
573        ]
574        .concat();
575        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
576        assert!(verify(&node.public_key(), &message, &sig).is_ok());
577    }
578
579    #[tokio::test]
580    async fn test_handle_chunk_request_verified() {
581        let temp_dir = TempDir::new().unwrap();
582        let config = NodeConfig {
583            storage_path: temp_dir.path().to_path_buf(),
584            max_storage_bytes: 10 * 1024 * 1024,
585            ..Default::default()
586        };
587
588        let mut node = ContentNode::with_storage(config).await.unwrap();
589
590        // Prepare test data
591        let cid = "QmTest456".to_string();
592        let test_data = b"Verified chunk test".to_vec();
593        let chunks = vec![test_data.clone()];
594        let expected_hash = hash(&test_data);
595
596        // Pin content to storage
597        if let Some(storage_arc) = node.storage() {
598            let mut storage = storage_arc.write().await;
599            let key = generate_key();
600            let nonce = generate_nonce();
601
602            storage
603                .pin_content(&cid, &chunks, &key, &nonce)
604                .await
605                .unwrap();
606        }
607
608        // Pin content metadata in node
609        let content = PinnedContent {
610            cid: cid.clone(),
611            size_bytes: test_data.len() as u64,
612            encryption_key: [0u8; 32],
613            predicted_revenue_per_gb: 10.0,
614        };
615        node.pin_content(content);
616
617        // Create and handle verified chunk request
618        let requester_keypair = KeyPair::generate();
619        let request = create_chunk_request(
620            cid.clone(),
621            0,
622            "peer-456".to_string(),
623            requester_keypair.public_key(),
624        );
625
626        let response = node
627            .handle_chunk_request_verified(request.clone())
628            .await
629            .unwrap();
630
631        // Verify hash matches expected
632        assert_eq!(response.chunk_hash, expected_hash);
633
634        // Verify signature
635        let message = [
636            &request.challenge_nonce[..],
637            &response.chunk_hash[..],
638            &request.requester_public_key[..],
639        ]
640        .concat();
641        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
642        assert!(verify(&node.public_key(), &message, &sig).is_ok());
643    }
644
645    #[tokio::test]
646    async fn test_full_bandwidth_proof_flow() {
647        let temp_dir = TempDir::new().unwrap();
648        let config = NodeConfig {
649            storage_path: temp_dir.path().to_path_buf(),
650            max_storage_bytes: 10 * 1024 * 1024,
651            ..Default::default()
652        };
653
654        let mut provider_node = ContentNode::with_storage(config).await.unwrap();
655        let requester_keypair = KeyPair::generate();
656
657        // Setup: Pin content
658        let cid = "QmFullFlow".to_string();
659        let test_data = b"Full bandwidth proof flow test data".to_vec();
660        let chunks = vec![test_data.clone()];
661
662        if let Some(storage_arc) = provider_node.storage() {
663            let mut storage = storage_arc.write().await;
664            let key = generate_key();
665            let nonce = generate_nonce();
666            storage
667                .pin_content(&cid, &chunks, &key, &nonce)
668                .await
669                .unwrap();
670        }
671
672        let content = PinnedContent {
673            cid: cid.clone(),
674            size_bytes: test_data.len() as u64,
675            encryption_key: [0u8; 32],
676            predicted_revenue_per_gb: 10.0,
677        };
678        provider_node.pin_content(content);
679
680        // Step 1: Create chunk request
681        let start_time = chrono::Utc::now().timestamp_millis();
682        let request = create_chunk_request(
683            cid.clone(),
684            0,
685            "requester-peer".to_string(),
686            requester_keypair.public_key(),
687        );
688
689        // Step 2: Provider handles request
690        let response = provider_node
691            .handle_chunk_request_verified(request.clone())
692            .await
693            .unwrap();
694        let end_time = chrono::Utc::now().timestamp_millis();
695
696        // Step 3: Requester signs the response
697        let requester_message = [
698            &request.challenge_nonce[..],
699            &response.chunk_hash[..],
700            &response.provider_public_key[..],
701        ]
702        .concat();
703        let requester_signature = requester_keypair.sign(&requester_message);
704
705        // Step 4: Create bandwidth proof
706        let proof = create_bandwidth_proof(
707            &request,
708            "provider-peer".to_string(),
709            response.provider_public_key.to_vec(),
710            response.encrypted_chunk.len() as u64,
711            response.provider_signature.clone(),
712            requester_signature.to_vec(),
713            response.chunk_hash.to_vec(),
714            start_time,
715            end_time,
716            (end_time - start_time) as u32,
717        );
718
719        // Verify proof structure
720        assert_eq!(proof.content_cid, cid);
721        assert_eq!(proof.chunk_index, 0);
722        assert_eq!(
723            proof.bytes_transferred,
724            response.encrypted_chunk.len() as u64
725        );
726        assert_eq!(
727            proof.provider_public_key,
728            response.provider_public_key.to_vec()
729        );
730        assert_eq!(
731            proof.requester_public_key,
732            requester_keypair.public_key().to_vec()
733        );
734
735        // Verify both signatures
736        let provider_msg = [
737            &request.challenge_nonce[..],
738            &response.chunk_hash[..],
739            &request.requester_public_key[..],
740        ]
741        .concat();
742        let prov_sig: [u8; 64] = proof.provider_signature.as_slice().try_into().unwrap();
743        assert!(verify(&provider_node.public_key(), &provider_msg, &prov_sig).is_ok());
744        let req_sig: [u8; 64] = proof.requester_signature.as_slice().try_into().unwrap();
745        assert!(
746            verify(
747                &requester_keypair.public_key(),
748                &requester_message,
749                &req_sig
750            )
751            .is_ok()
752        );
753    }
754
755    #[tokio::test]
756    async fn test_node_config_default() {
757        let config = NodeConfig::default();
758        assert_eq!(config.max_storage_bytes, 50 * 1024 * 1024 * 1024);
759        assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024 / 8);
760        assert_eq!(config.coordinator_url, "https://coordinator.chie.network");
761    }
762}