1use crate::storage::{ChunkStorage, StorageError};
4use chie_crypto::KeyPair;
5use chie_shared::{BandwidthProof, ChunkRequest, ChunkResponse, ContentCid, Points};
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
/// Settings a [`ContentNode`] is constructed with.
#[derive(Clone, Debug)]
pub struct NodeConfig {
    /// Storage quota in bytes; handed to `ChunkStorage::new` when the node is
    /// created with a storage backend.
    pub max_storage_bytes: u64,

    /// Bandwidth ceiling for the node.
    ///
    /// NOTE(review): the default is computed as `100 * 1024 * 1024 / 8`, which
    /// looks like a bytes-per-second figure despite the `_bps` suffix — confirm
    /// the intended unit.
    pub max_bandwidth_bps: u64,

    /// Base URL of the coordinator; the proof endpoints (`/api/proofs`,
    /// `/api/proofs/batch`) are appended to it.
    pub coordinator_url: String,

    /// Filesystem location handed to `ChunkStorage::new`.
    pub storage_path: PathBuf,
}
26
27impl Default for NodeConfig {
28 fn default() -> Self {
29 Self {
30 max_storage_bytes: 50 * 1024 * 1024 * 1024, max_bandwidth_bps: 100 * 1024 * 1024 / 8, coordinator_url: "https://coordinator.chie.network".to_string(),
33 storage_path: PathBuf::from("./chie-storage"),
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
40pub struct PinnedContent {
41 pub cid: ContentCid,
43
44 pub size_bytes: u64,
46
47 pub encryption_key: [u8; 32],
49
50 pub predicted_revenue_per_gb: f64,
52}
53
54#[derive(Debug, thiserror::Error)]
56pub enum NodeError {
57 #[error("Storage error: {0}")]
58 Storage(#[from] StorageError),
59
60 #[error("Content not found: {0}")]
61 ContentNotFound(String),
62
63 #[error("Network error: {0}")]
64 Network(String),
65
66 #[error("Proof submission failed: {0}")]
67 ProofSubmission(String),
68}
69
70pub struct ContentNode {
100 config: NodeConfig,
102
103 keypair: Arc<KeyPair>,
105
106 pinned_contents: HashMap<ContentCid, PinnedContent>,
108
109 earnings: Points,
111
112 storage: Option<Arc<RwLock<ChunkStorage>>>,
114
115 http_client: reqwest::Client,
117}
118
119impl ContentNode {
120 pub fn new(config: NodeConfig) -> Self {
122 let http_client = reqwest::Client::builder()
124 .pool_max_idle_per_host(10)
125 .pool_idle_timeout(std::time::Duration::from_secs(30))
126 .timeout(std::time::Duration::from_secs(30))
127 .build()
128 .expect("Failed to build HTTP client");
129
130 Self {
131 config,
132 keypair: Arc::new(KeyPair::generate()),
133 pinned_contents: HashMap::new(),
134 earnings: 0,
135 storage: None,
136 http_client,
137 }
138 }
139
140 pub async fn with_storage(config: NodeConfig) -> Result<Self, NodeError> {
142 let storage =
143 ChunkStorage::new(config.storage_path.clone(), config.max_storage_bytes).await?;
144
145 let http_client = reqwest::Client::builder()
147 .pool_max_idle_per_host(10)
148 .pool_idle_timeout(std::time::Duration::from_secs(30))
149 .timeout(std::time::Duration::from_secs(30))
150 .build()
151 .map_err(|e| NodeError::Network(format!("Failed to build HTTP client: {}", e)))?;
152
153 Ok(Self {
154 config,
155 keypair: Arc::new(KeyPair::generate()),
156 pinned_contents: HashMap::new(),
157 earnings: 0,
158 storage: Some(Arc::new(RwLock::new(storage))),
159 http_client,
160 })
161 }
162
163 #[inline]
165 pub fn set_storage(&mut self, storage: Arc<RwLock<ChunkStorage>>) {
166 self.storage = Some(storage);
167 }
168
169 #[inline]
171 pub fn storage(&self) -> Option<&Arc<RwLock<ChunkStorage>>> {
172 self.storage.as_ref()
173 }
174
175 #[inline]
177 pub fn public_key(&self) -> [u8; 32] {
178 self.keypair.public_key()
179 }
180
181 #[inline]
183 pub fn earnings(&self) -> Points {
184 self.earnings
185 }
186
187 #[inline]
189 pub fn config(&self) -> &NodeConfig {
190 &self.config
191 }
192
193 #[inline]
195 pub fn pin_content(&mut self, content: PinnedContent) {
196 self.pinned_contents.insert(content.cid.clone(), content);
197 }
198
199 #[inline]
201 pub fn unpin_content(&mut self, cid: &ContentCid) -> Option<PinnedContent> {
202 self.pinned_contents.remove(cid)
203 }
204
205 #[inline]
207 pub fn has_content(&self, cid: &ContentCid) -> bool {
208 self.pinned_contents.contains_key(cid)
209 }
210
211 #[inline]
213 pub fn pinned_count(&self) -> usize {
214 self.pinned_contents.len()
215 }
216
217 #[inline]
219 pub fn pinned_contents(&self) -> &HashMap<ContentCid, PinnedContent> {
220 &self.pinned_contents
221 }
222
223 pub async fn handle_chunk_request(
228 &self,
229 request: ChunkRequest,
230 ) -> Result<ChunkResponse, NodeError> {
231 if !self.pinned_contents.contains_key(&request.content_cid) {
233 return Err(NodeError::ContentNotFound(request.content_cid.clone()));
234 }
235
236 let chunk_data = if let Some(storage) = &self.storage {
238 let storage_guard = storage.read().await;
239 storage_guard
240 .get_chunk(&request.content_cid, request.chunk_index)
241 .await?
242 } else {
243 vec![0u8; 1024]
245 };
246
247 let chunk_hash = chie_crypto::hash(&chunk_data);
248
249 let message = [
251 &request.challenge_nonce[..],
252 &chunk_hash[..],
253 &request.requester_public_key[..],
254 ]
255 .concat();
256 let signature = self.keypair.sign(&message);
257
258 Ok(ChunkResponse {
259 encrypted_chunk: chunk_data,
260 chunk_hash,
261 provider_signature: signature.to_vec(),
262 provider_public_key: self.keypair.public_key(),
263 challenge_echo: request.challenge_nonce,
264 timestamp_ms: chrono::Utc::now().timestamp_millis(),
265 })
266 }
267
268 pub async fn handle_chunk_request_verified(
272 &self,
273 request: ChunkRequest,
274 ) -> Result<ChunkResponse, NodeError> {
275 if !self.pinned_contents.contains_key(&request.content_cid) {
277 return Err(NodeError::ContentNotFound(request.content_cid.clone()));
278 }
279
280 let (chunk_data, chunk_hash) = if let Some(storage) = &self.storage {
282 let storage_guard = storage.read().await;
283 storage_guard
284 .get_chunk_verified(&request.content_cid, request.chunk_index)
285 .await?
286 } else {
287 let data = vec![0u8; 1024];
288 let hash = chie_crypto::hash(&data);
289 (data, hash)
290 };
291
292 let message = [
294 &request.challenge_nonce[..],
295 &chunk_hash[..],
296 &request.requester_public_key[..],
297 ]
298 .concat();
299 let signature = self.keypair.sign(&message);
300
301 Ok(ChunkResponse {
302 encrypted_chunk: chunk_data,
303 chunk_hash,
304 provider_signature: signature.to_vec(),
305 provider_public_key: self.keypair.public_key(),
306 challenge_echo: request.challenge_nonce,
307 timestamp_ms: chrono::Utc::now().timestamp_millis(),
308 })
309 }
310
311 pub async fn submit_proof(&self, proof: BandwidthProof) -> Result<(), NodeError> {
313 let response = self
314 .http_client
315 .post(format!("{}/api/proofs", self.config.coordinator_url))
316 .json(&proof)
317 .send()
318 .await
319 .map_err(|e| NodeError::Network(e.to_string()))?;
320
321 if !response.status().is_success() {
322 return Err(NodeError::ProofSubmission(format!(
323 "Server returned status: {}",
324 response.status()
325 )));
326 }
327
328 Ok(())
329 }
330
331 pub async fn submit_proofs_batch(&self, proofs: Vec<BandwidthProof>) -> Result<(), NodeError> {
333 let response = self
334 .http_client
335 .post(format!("{}/api/proofs/batch", self.config.coordinator_url))
336 .json(&proofs)
337 .send()
338 .await
339 .map_err(|e| NodeError::Network(e.to_string()))?;
340
341 if !response.status().is_success() {
342 return Err(NodeError::ProofSubmission(format!(
343 "Batch submission failed with status: {}",
344 response.status()
345 )));
346 }
347
348 Ok(())
349 }
350
351 pub fn add_earnings(&mut self, amount: Points) {
353 self.earnings += amount;
354 }
355
356 pub async fn storage_stats(&self) -> Option<crate::storage::StorageStats> {
358 if let Some(storage) = &self.storage {
359 let storage_guard = storage.read().await;
360 Some(storage_guard.stats())
361 } else {
362 None
363 }
364 }
365
366 pub async fn handle_chunk_requests_batch(
368 &self,
369 requests: Vec<ChunkRequest>,
370 ) -> Result<Vec<ChunkResponse>, NodeError> {
371 for request in &requests {
373 if !self.pinned_contents.contains_key(&request.content_cid) {
374 return Err(NodeError::ContentNotFound(request.content_cid.clone()));
375 }
376 }
377
378 let mut responses = Vec::with_capacity(requests.len());
380 for request in requests {
381 let response = self.handle_chunk_request(request).await?;
382 responses.push(response);
383 }
384
385 Ok(responses)
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{create_bandwidth_proof, create_chunk_request};
    use chie_crypto::{KeyPair, generate_key, generate_nonce, hash, verify};
    use tempfile::TempDir;

    /// Configuration for a storage-backed node rooted in `dir` with a 10 MiB quota.
    fn storage_config(dir: &TempDir) -> NodeConfig {
        NodeConfig {
            storage_path: dir.path().to_path_buf(),
            max_storage_bytes: 10 * 1024 * 1024,
            ..Default::default()
        }
    }

    /// Pinned-content record with a zeroed key and a fixed revenue estimate.
    fn pinned(cid: &str, size_bytes: u64) -> PinnedContent {
        PinnedContent {
            cid: cid.to_string(),
            size_bytes,
            encryption_key: [0u8; 32],
            predicted_revenue_per_gb: 10.0,
        }
    }

    /// Writes `payload` as the single chunk of `cid` into the node's storage,
    /// using a freshly generated key and nonce. Does nothing if the node has
    /// no storage backend.
    async fn seed_storage(node: &ContentNode, cid: &str, payload: &[u8]) {
        let cid = cid.to_string();
        let chunks = vec![payload.to_vec()];

        if let Some(storage_arc) = node.storage() {
            let mut storage = storage_arc.write().await;
            let key = generate_key();
            let nonce = generate_nonce();

            storage
                .pin_content(&cid, &chunks, &key, &nonce)
                .await
                .unwrap();
        }
    }

    /// Request for chunk 0 of `cid` issued by `requester` under peer id `peer`.
    fn first_chunk_request(cid: &str, peer: &str, requester: &KeyPair) -> ChunkRequest {
        create_chunk_request(
            cid.to_string(),
            0,
            peer.to_string(),
            requester.public_key(),
        )
    }

    /// The byte string the provider is expected to have signed:
    /// nonce || chunk hash || requester public key.
    fn provider_message(request: &ChunkRequest, response: &ChunkResponse) -> Vec<u8> {
        [
            &request.challenge_nonce[..],
            &response.chunk_hash[..],
            &request.requester_public_key[..],
        ]
        .concat()
    }

    #[tokio::test]
    async fn test_node_creation() {
        let node = ContentNode::new(NodeConfig::default());

        assert_eq!(node.earnings(), 0);
        assert_eq!(node.pinned_count(), 0);
    }

    #[tokio::test]
    async fn test_node_with_storage() {
        let temp_dir = TempDir::new().unwrap();

        let node = ContentNode::with_storage(storage_config(&temp_dir))
            .await
            .unwrap();
        assert!(node.storage().is_some());

        let stats = node.storage_stats().await.unwrap();
        assert_eq!(stats.used_bytes, 0);
        assert_eq!(stats.max_bytes, 10 * 1024 * 1024);
    }

    #[tokio::test]
    async fn test_pin_unpin_content() {
        let mut node = ContentNode::new(NodeConfig::default());
        let cid = "QmTest123".to_string();

        node.pin_content(pinned(&cid, 1024));
        assert!(node.has_content(&cid));
        assert_eq!(node.pinned_count(), 1);

        let unpinned = node.unpin_content(&cid);
        assert!(unpinned.is_some());
        assert!(!node.has_content(&cid));
        assert_eq!(node.pinned_count(), 0);
    }

    #[tokio::test]
    async fn test_add_earnings() {
        let mut node = ContentNode::new(NodeConfig::default());
        assert_eq!(node.earnings(), 0);

        node.add_earnings(100);
        assert_eq!(node.earnings(), 100);

        node.add_earnings(50);
        assert_eq!(node.earnings(), 150);
    }

    #[tokio::test]
    async fn test_handle_chunk_request_without_storage() {
        let mut node = ContentNode::new(NodeConfig::default());
        let cid = "QmTest123".to_string();
        node.pin_content(pinned(&cid, 1024));

        let requester_keypair = KeyPair::generate();
        let request = first_chunk_request(&cid, "peer-123", &requester_keypair);

        let response = node.handle_chunk_request(request.clone()).await.unwrap();

        assert_eq!(response.provider_public_key, node.public_key());
        assert_eq!(response.challenge_echo, request.challenge_nonce);
        // Without storage the node serves its fixed-size placeholder chunk.
        assert_eq!(response.encrypted_chunk.len(), 1024);

        let message = provider_message(&request, &response);
        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&node.public_key(), &message, &sig).is_ok());
    }

    #[tokio::test]
    async fn test_handle_chunk_request_content_not_found() {
        let node = ContentNode::new(NodeConfig::default());

        let requester_keypair = KeyPair::generate();
        let request = first_chunk_request("QmNonExistent", "peer-123", &requester_keypair);

        let result = node.handle_chunk_request(request).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), NodeError::ContentNotFound(_)));
    }

    #[tokio::test]
    async fn test_handle_chunk_request_with_storage() {
        let temp_dir = TempDir::new().unwrap();
        let mut node = ContentNode::with_storage(storage_config(&temp_dir))
            .await
            .unwrap();

        let cid = "QmTest123".to_string();
        let test_data = b"Hello, CHIE Protocol!".to_vec();

        seed_storage(&node, &cid, &test_data).await;
        node.pin_content(pinned(&cid, test_data.len() as u64));

        let requester_keypair = KeyPair::generate();
        let request = first_chunk_request(&cid, "peer-123", &requester_keypair);

        let response = node.handle_chunk_request(request.clone()).await.unwrap();

        assert_eq!(response.provider_public_key, node.public_key());
        assert_eq!(response.challenge_echo, request.challenge_nonce);
        assert!(!response.encrypted_chunk.is_empty());

        let message = provider_message(&request, &response);
        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&node.public_key(), &message, &sig).is_ok());
    }

    #[tokio::test]
    async fn test_handle_chunk_request_verified() {
        let temp_dir = TempDir::new().unwrap();
        let mut node = ContentNode::with_storage(storage_config(&temp_dir))
            .await
            .unwrap();

        let cid = "QmTest456".to_string();
        let test_data = b"Verified chunk test".to_vec();
        let expected_hash = hash(&test_data);

        seed_storage(&node, &cid, &test_data).await;
        node.pin_content(pinned(&cid, test_data.len() as u64));

        let requester_keypair = KeyPair::generate();
        let request = first_chunk_request(&cid, "peer-456", &requester_keypair);

        let response = node
            .handle_chunk_request_verified(request.clone())
            .await
            .unwrap();

        assert_eq!(response.chunk_hash, expected_hash);

        let message = provider_message(&request, &response);
        let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&node.public_key(), &message, &sig).is_ok());
    }

    #[tokio::test]
    async fn test_full_bandwidth_proof_flow() {
        let temp_dir = TempDir::new().unwrap();
        let mut provider_node = ContentNode::with_storage(storage_config(&temp_dir))
            .await
            .unwrap();
        let requester_keypair = KeyPair::generate();

        let cid = "QmFullFlow".to_string();
        let test_data = b"Full bandwidth proof flow test data".to_vec();

        seed_storage(&provider_node, &cid, &test_data).await;
        provider_node.pin_content(pinned(&cid, test_data.len() as u64));

        // Requester asks for the chunk; the transfer is timed around the call.
        let start_time = chrono::Utc::now().timestamp_millis();
        let request = first_chunk_request(&cid, "requester-peer", &requester_keypair);

        let response = provider_node
            .handle_chunk_request_verified(request.clone())
            .await
            .unwrap();
        let end_time = chrono::Utc::now().timestamp_millis();

        // The requester countersigns over the provider's public key.
        let requester_message = [
            &request.challenge_nonce[..],
            &response.chunk_hash[..],
            &response.provider_public_key[..],
        ]
        .concat();
        let requester_signature = requester_keypair.sign(&requester_message);

        let proof = create_bandwidth_proof(
            &request,
            "provider-peer".to_string(),
            response.provider_public_key.to_vec(),
            response.encrypted_chunk.len() as u64,
            response.provider_signature.clone(),
            requester_signature.to_vec(),
            response.chunk_hash.to_vec(),
            start_time,
            end_time,
            (end_time - start_time) as u32,
        );

        assert_eq!(proof.content_cid, cid);
        assert_eq!(proof.chunk_index, 0);
        assert_eq!(
            proof.bytes_transferred,
            response.encrypted_chunk.len() as u64
        );
        assert_eq!(
            proof.provider_public_key,
            response.provider_public_key.to_vec()
        );
        assert_eq!(
            proof.requester_public_key,
            requester_keypair.public_key().to_vec()
        );

        // Both signatures carried by the proof must verify.
        let provider_msg = provider_message(&request, &response);
        let prov_sig: [u8; 64] = proof.provider_signature.as_slice().try_into().unwrap();
        assert!(verify(&provider_node.public_key(), &provider_msg, &prov_sig).is_ok());

        let req_sig: [u8; 64] = proof.requester_signature.as_slice().try_into().unwrap();
        assert!(
            verify(
                &requester_keypair.public_key(),
                &requester_message,
                &req_sig
            )
            .is_ok()
        );
    }

    #[tokio::test]
    async fn test_node_config_default() {
        let config = NodeConfig::default();

        assert_eq!(config.max_storage_bytes, 50 * 1024 * 1024 * 1024);
        assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024 / 8);
        assert_eq!(config.coordinator_url, "https://coordinator.chie.network");
    }
}