1use crate::storage::{ChunkStorage, StorageError};
4use chie_crypto::KeyPair;
5use chie_shared::{BandwidthProof, ChunkRequest, ChunkResponse, ContentCid, Points};
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11#[derive(Debug, Clone)]
13pub struct NodeConfig {
14 pub max_storage_bytes: u64,
16
17 pub max_bandwidth_bps: u64,
19
20 pub coordinator_url: String,
22
23 pub storage_path: PathBuf,
25}
26
27impl Default for NodeConfig {
28 fn default() -> Self {
29 Self {
30 max_storage_bytes: 50 * 1024 * 1024 * 1024, max_bandwidth_bps: 100 * 1024 * 1024 / 8, coordinator_url: "https://coordinator.chie.network".to_string(),
33 storage_path: PathBuf::from("./chie-storage"),
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
40pub struct PinnedContent {
41 pub cid: ContentCid,
43
44 pub size_bytes: u64,
46
47 pub encryption_key: [u8; 32],
49
50 pub predicted_revenue_per_gb: f64,
52}
53
54#[derive(Debug, thiserror::Error)]
56pub enum NodeError {
57 #[error("Storage error: {0}")]
58 Storage(#[from] StorageError),
59
60 #[error("Content not found: {0}")]
61 ContentNotFound(String),
62
63 #[error("Network error: {0}")]
64 Network(String),
65
66 #[error("Proof submission failed: {0}")]
67 ProofSubmission(String),
68}
69
70pub struct ContentNode {
100 config: NodeConfig,
102
103 keypair: Arc<KeyPair>,
105
106 pinned_contents: HashMap<ContentCid, PinnedContent>,
108
109 earnings: Points,
111
112 storage: Option<Arc<RwLock<ChunkStorage>>>,
114
115 http_client: reqwest::Client,
117}
118
119impl ContentNode {
120 pub fn new(config: NodeConfig) -> Self {
122 let http_client = reqwest::Client::builder()
124 .pool_max_idle_per_host(10)
125 .pool_idle_timeout(std::time::Duration::from_secs(30))
126 .timeout(std::time::Duration::from_secs(30))
127 .build()
128 .expect("Failed to build HTTP client");
129
130 Self {
131 config,
132 keypair: Arc::new(KeyPair::generate()),
133 pinned_contents: HashMap::new(),
134 earnings: 0,
135 storage: None,
136 http_client,
137 }
138 }
139
140 pub async fn with_storage(config: NodeConfig) -> Result<Self, NodeError> {
142 let storage =
143 ChunkStorage::new(config.storage_path.clone(), config.max_storage_bytes).await?;
144
145 let http_client = reqwest::Client::builder()
147 .pool_max_idle_per_host(10)
148 .pool_idle_timeout(std::time::Duration::from_secs(30))
149 .timeout(std::time::Duration::from_secs(30))
150 .build()
151 .map_err(|e| NodeError::Network(format!("Failed to build HTTP client: {}", e)))?;
152
153 Ok(Self {
154 config,
155 keypair: Arc::new(KeyPair::generate()),
156 pinned_contents: HashMap::new(),
157 earnings: 0,
158 storage: Some(Arc::new(RwLock::new(storage))),
159 http_client,
160 })
161 }
162
163 #[inline]
165 pub fn set_storage(&mut self, storage: Arc<RwLock<ChunkStorage>>) {
166 self.storage = Some(storage);
167 }
168
169 #[inline]
171 pub fn storage(&self) -> Option<&Arc<RwLock<ChunkStorage>>> {
172 self.storage.as_ref()
173 }
174
175 #[inline]
177 pub fn public_key(&self) -> [u8; 32] {
178 self.keypair.public_key()
179 }
180
181 #[inline]
183 pub fn earnings(&self) -> Points {
184 self.earnings
185 }
186
187 #[inline]
189 pub fn config(&self) -> &NodeConfig {
190 &self.config
191 }
192
193 #[inline]
195 pub fn pin_content(&mut self, content: PinnedContent) {
196 self.pinned_contents.insert(content.cid.clone(), content);
197 }
198
199 #[inline]
201 pub fn unpin_content(&mut self, cid: &ContentCid) -> Option<PinnedContent> {
202 self.pinned_contents.remove(cid)
203 }
204
205 #[inline]
207 pub fn has_content(&self, cid: &ContentCid) -> bool {
208 self.pinned_contents.contains_key(cid)
209 }
210
211 #[inline]
213 pub fn pinned_count(&self) -> usize {
214 self.pinned_contents.len()
215 }
216
217 pub async fn handle_chunk_request(
222 &self,
223 request: ChunkRequest,
224 ) -> Result<ChunkResponse, NodeError> {
225 if !self.pinned_contents.contains_key(&request.content_cid) {
227 return Err(NodeError::ContentNotFound(request.content_cid.clone()));
228 }
229
230 let chunk_data = if let Some(storage) = &self.storage {
232 let storage_guard = storage.read().await;
233 storage_guard
234 .get_chunk(&request.content_cid, request.chunk_index)
235 .await?
236 } else {
237 vec![0u8; 1024]
239 };
240
241 let chunk_hash = chie_crypto::hash(&chunk_data);
242
243 let message = [
245 &request.challenge_nonce[..],
246 &chunk_hash[..],
247 &request.requester_public_key[..],
248 ]
249 .concat();
250 let signature = self.keypair.sign(&message);
251
252 Ok(ChunkResponse {
253 encrypted_chunk: chunk_data,
254 chunk_hash,
255 provider_signature: signature.to_vec(),
256 provider_public_key: self.keypair.public_key(),
257 challenge_echo: request.challenge_nonce,
258 timestamp_ms: chrono::Utc::now().timestamp_millis(),
259 })
260 }
261
262 pub async fn handle_chunk_request_verified(
266 &self,
267 request: ChunkRequest,
268 ) -> Result<ChunkResponse, NodeError> {
269 if !self.pinned_contents.contains_key(&request.content_cid) {
271 return Err(NodeError::ContentNotFound(request.content_cid.clone()));
272 }
273
274 let (chunk_data, chunk_hash) = if let Some(storage) = &self.storage {
276 let storage_guard = storage.read().await;
277 storage_guard
278 .get_chunk_verified(&request.content_cid, request.chunk_index)
279 .await?
280 } else {
281 let data = vec![0u8; 1024];
282 let hash = chie_crypto::hash(&data);
283 (data, hash)
284 };
285
286 let message = [
288 &request.challenge_nonce[..],
289 &chunk_hash[..],
290 &request.requester_public_key[..],
291 ]
292 .concat();
293 let signature = self.keypair.sign(&message);
294
295 Ok(ChunkResponse {
296 encrypted_chunk: chunk_data,
297 chunk_hash,
298 provider_signature: signature.to_vec(),
299 provider_public_key: self.keypair.public_key(),
300 challenge_echo: request.challenge_nonce,
301 timestamp_ms: chrono::Utc::now().timestamp_millis(),
302 })
303 }
304
305 pub async fn submit_proof(&self, proof: BandwidthProof) -> Result<(), NodeError> {
307 let response = self
308 .http_client
309 .post(format!("{}/api/proofs", self.config.coordinator_url))
310 .json(&proof)
311 .send()
312 .await
313 .map_err(|e| NodeError::Network(e.to_string()))?;
314
315 if !response.status().is_success() {
316 return Err(NodeError::ProofSubmission(format!(
317 "Server returned status: {}",
318 response.status()
319 )));
320 }
321
322 Ok(())
323 }
324
325 pub async fn submit_proofs_batch(&self, proofs: Vec<BandwidthProof>) -> Result<(), NodeError> {
327 let response = self
328 .http_client
329 .post(format!("{}/api/proofs/batch", self.config.coordinator_url))
330 .json(&proofs)
331 .send()
332 .await
333 .map_err(|e| NodeError::Network(e.to_string()))?;
334
335 if !response.status().is_success() {
336 return Err(NodeError::ProofSubmission(format!(
337 "Batch submission failed with status: {}",
338 response.status()
339 )));
340 }
341
342 Ok(())
343 }
344
345 pub fn add_earnings(&mut self, amount: Points) {
347 self.earnings += amount;
348 }
349
350 pub async fn storage_stats(&self) -> Option<crate::storage::StorageStats> {
352 if let Some(storage) = &self.storage {
353 let storage_guard = storage.read().await;
354 Some(storage_guard.stats())
355 } else {
356 None
357 }
358 }
359
360 pub async fn handle_chunk_requests_batch(
362 &self,
363 requests: Vec<ChunkRequest>,
364 ) -> Result<Vec<ChunkResponse>, NodeError> {
365 for request in &requests {
367 if !self.pinned_contents.contains_key(&request.content_cid) {
368 return Err(NodeError::ContentNotFound(request.content_cid.clone()));
369 }
370 }
371
372 let mut responses = Vec::with_capacity(requests.len());
374 for request in requests {
375 let response = self.handle_chunk_request(request).await?;
376 responses.push(response);
377 }
378
379 Ok(responses)
380 }
381}
382
383#[cfg(test)]
384mod tests {
385 use super::*;
386 use crate::protocol::{create_bandwidth_proof, create_chunk_request};
387 use chie_crypto::{KeyPair, generate_key, generate_nonce, hash, verify};
388 use tempfile::TempDir;
389
390 #[tokio::test]
391 async fn test_node_creation() {
392 let config = NodeConfig::default();
393 let node = ContentNode::new(config);
394
395 assert_eq!(node.earnings(), 0);
396 assert_eq!(node.pinned_count(), 0);
397 }
398
399 #[tokio::test]
400 async fn test_node_with_storage() {
401 let temp_dir = TempDir::new().unwrap();
402 let config = NodeConfig {
403 storage_path: temp_dir.path().to_path_buf(),
404 max_storage_bytes: 10 * 1024 * 1024, ..Default::default()
406 };
407
408 let node = ContentNode::with_storage(config).await.unwrap();
409 assert!(node.storage().is_some());
410
411 let stats = node.storage_stats().await.unwrap();
412 assert_eq!(stats.used_bytes, 0);
413 assert_eq!(stats.max_bytes, 10 * 1024 * 1024);
414 }
415
416 #[tokio::test]
417 async fn test_pin_unpin_content() {
418 let config = NodeConfig::default();
419 let mut node = ContentNode::new(config);
420
421 let cid = "QmTest123".to_string();
422 let content = PinnedContent {
423 cid: cid.clone(),
424 size_bytes: 1024,
425 encryption_key: [0u8; 32],
426 predicted_revenue_per_gb: 10.0,
427 };
428
429 node.pin_content(content);
430 assert!(node.has_content(&cid));
431 assert_eq!(node.pinned_count(), 1);
432
433 let unpinned = node.unpin_content(&cid);
434 assert!(unpinned.is_some());
435 assert!(!node.has_content(&cid));
436 assert_eq!(node.pinned_count(), 0);
437 }
438
439 #[tokio::test]
440 async fn test_add_earnings() {
441 let config = NodeConfig::default();
442 let mut node = ContentNode::new(config);
443
444 assert_eq!(node.earnings(), 0);
445 node.add_earnings(100);
446 assert_eq!(node.earnings(), 100);
447 node.add_earnings(50);
448 assert_eq!(node.earnings(), 150);
449 }
450
451 #[tokio::test]
452 async fn test_handle_chunk_request_without_storage() {
453 let config = NodeConfig::default();
454 let mut node = ContentNode::new(config);
455
456 let cid = "QmTest123".to_string();
457 let content = PinnedContent {
458 cid: cid.clone(),
459 size_bytes: 1024,
460 encryption_key: [0u8; 32],
461 predicted_revenue_per_gb: 10.0,
462 };
463 node.pin_content(content);
464
465 let requester_keypair = KeyPair::generate();
466 let request = create_chunk_request(
467 cid.clone(),
468 0,
469 "peer-123".to_string(),
470 requester_keypair.public_key(),
471 );
472
473 let response = node.handle_chunk_request(request.clone()).await.unwrap();
474
475 assert_eq!(response.provider_public_key, node.public_key());
477 assert_eq!(response.challenge_echo, request.challenge_nonce);
478 assert_eq!(response.encrypted_chunk.len(), 1024); let message = [
482 &request.challenge_nonce[..],
483 &response.chunk_hash[..],
484 &request.requester_public_key[..],
485 ]
486 .concat();
487 let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
488 assert!(verify(&node.public_key(), &message, &sig).is_ok());
489 }
490
491 #[tokio::test]
492 async fn test_handle_chunk_request_content_not_found() {
493 let config = NodeConfig::default();
494 let node = ContentNode::new(config);
495
496 let requester_keypair = KeyPair::generate();
497 let request = create_chunk_request(
498 "QmNonExistent".to_string(),
499 0,
500 "peer-123".to_string(),
501 requester_keypair.public_key(),
502 );
503
504 let result = node.handle_chunk_request(request).await;
505 assert!(result.is_err());
506 assert!(matches!(result.unwrap_err(), NodeError::ContentNotFound(_)));
507 }
508
509 #[tokio::test]
510 async fn test_handle_chunk_request_with_storage() {
511 let temp_dir = TempDir::new().unwrap();
512 let config = NodeConfig {
513 storage_path: temp_dir.path().to_path_buf(),
514 max_storage_bytes: 10 * 1024 * 1024,
515 ..Default::default()
516 };
517
518 let mut node = ContentNode::with_storage(config).await.unwrap();
519
520 let cid = "QmTest123".to_string();
522 let test_data = b"Hello, CHIE Protocol!".to_vec();
523 let chunks = vec![test_data.clone()];
524
525 if let Some(storage_arc) = node.storage() {
527 let mut storage = storage_arc.write().await;
528 let key = generate_key();
529 let nonce = generate_nonce();
530
531 storage
532 .pin_content(&cid, &chunks, &key, &nonce)
533 .await
534 .unwrap();
535 }
536
537 let content = PinnedContent {
539 cid: cid.clone(),
540 size_bytes: test_data.len() as u64,
541 encryption_key: [0u8; 32],
542 predicted_revenue_per_gb: 10.0,
543 };
544 node.pin_content(content);
545
546 let requester_keypair = KeyPair::generate();
548 let request = create_chunk_request(
549 cid.clone(),
550 0,
551 "peer-123".to_string(),
552 requester_keypair.public_key(),
553 );
554
555 let response = node.handle_chunk_request(request.clone()).await.unwrap();
556
557 assert_eq!(response.provider_public_key, node.public_key());
559 assert_eq!(response.challenge_echo, request.challenge_nonce);
560 assert!(!response.encrypted_chunk.is_empty());
561
562 let message = [
564 &request.challenge_nonce[..],
565 &response.chunk_hash[..],
566 &request.requester_public_key[..],
567 ]
568 .concat();
569 let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
570 assert!(verify(&node.public_key(), &message, &sig).is_ok());
571 }
572
573 #[tokio::test]
574 async fn test_handle_chunk_request_verified() {
575 let temp_dir = TempDir::new().unwrap();
576 let config = NodeConfig {
577 storage_path: temp_dir.path().to_path_buf(),
578 max_storage_bytes: 10 * 1024 * 1024,
579 ..Default::default()
580 };
581
582 let mut node = ContentNode::with_storage(config).await.unwrap();
583
584 let cid = "QmTest456".to_string();
586 let test_data = b"Verified chunk test".to_vec();
587 let chunks = vec![test_data.clone()];
588 let expected_hash = hash(&test_data);
589
590 if let Some(storage_arc) = node.storage() {
592 let mut storage = storage_arc.write().await;
593 let key = generate_key();
594 let nonce = generate_nonce();
595
596 storage
597 .pin_content(&cid, &chunks, &key, &nonce)
598 .await
599 .unwrap();
600 }
601
602 let content = PinnedContent {
604 cid: cid.clone(),
605 size_bytes: test_data.len() as u64,
606 encryption_key: [0u8; 32],
607 predicted_revenue_per_gb: 10.0,
608 };
609 node.pin_content(content);
610
611 let requester_keypair = KeyPair::generate();
613 let request = create_chunk_request(
614 cid.clone(),
615 0,
616 "peer-456".to_string(),
617 requester_keypair.public_key(),
618 );
619
620 let response = node
621 .handle_chunk_request_verified(request.clone())
622 .await
623 .unwrap();
624
625 assert_eq!(response.chunk_hash, expected_hash);
627
628 let message = [
630 &request.challenge_nonce[..],
631 &response.chunk_hash[..],
632 &request.requester_public_key[..],
633 ]
634 .concat();
635 let sig: [u8; 64] = response.provider_signature.as_slice().try_into().unwrap();
636 assert!(verify(&node.public_key(), &message, &sig).is_ok());
637 }
638
639 #[tokio::test]
640 async fn test_full_bandwidth_proof_flow() {
641 let temp_dir = TempDir::new().unwrap();
642 let config = NodeConfig {
643 storage_path: temp_dir.path().to_path_buf(),
644 max_storage_bytes: 10 * 1024 * 1024,
645 ..Default::default()
646 };
647
648 let mut provider_node = ContentNode::with_storage(config).await.unwrap();
649 let requester_keypair = KeyPair::generate();
650
651 let cid = "QmFullFlow".to_string();
653 let test_data = b"Full bandwidth proof flow test data".to_vec();
654 let chunks = vec![test_data.clone()];
655
656 if let Some(storage_arc) = provider_node.storage() {
657 let mut storage = storage_arc.write().await;
658 let key = generate_key();
659 let nonce = generate_nonce();
660 storage
661 .pin_content(&cid, &chunks, &key, &nonce)
662 .await
663 .unwrap();
664 }
665
666 let content = PinnedContent {
667 cid: cid.clone(),
668 size_bytes: test_data.len() as u64,
669 encryption_key: [0u8; 32],
670 predicted_revenue_per_gb: 10.0,
671 };
672 provider_node.pin_content(content);
673
674 let start_time = chrono::Utc::now().timestamp_millis();
676 let request = create_chunk_request(
677 cid.clone(),
678 0,
679 "requester-peer".to_string(),
680 requester_keypair.public_key(),
681 );
682
683 let response = provider_node
685 .handle_chunk_request_verified(request.clone())
686 .await
687 .unwrap();
688 let end_time = chrono::Utc::now().timestamp_millis();
689
690 let requester_message = [
692 &request.challenge_nonce[..],
693 &response.chunk_hash[..],
694 &response.provider_public_key[..],
695 ]
696 .concat();
697 let requester_signature = requester_keypair.sign(&requester_message);
698
699 let proof = create_bandwidth_proof(
701 &request,
702 "provider-peer".to_string(),
703 response.provider_public_key.to_vec(),
704 response.encrypted_chunk.len() as u64,
705 response.provider_signature.clone(),
706 requester_signature.to_vec(),
707 response.chunk_hash.to_vec(),
708 start_time,
709 end_time,
710 (end_time - start_time) as u32,
711 );
712
713 assert_eq!(proof.content_cid, cid);
715 assert_eq!(proof.chunk_index, 0);
716 assert_eq!(
717 proof.bytes_transferred,
718 response.encrypted_chunk.len() as u64
719 );
720 assert_eq!(
721 proof.provider_public_key,
722 response.provider_public_key.to_vec()
723 );
724 assert_eq!(
725 proof.requester_public_key,
726 requester_keypair.public_key().to_vec()
727 );
728
729 let provider_msg = [
731 &request.challenge_nonce[..],
732 &response.chunk_hash[..],
733 &request.requester_public_key[..],
734 ]
735 .concat();
736 let prov_sig: [u8; 64] = proof.provider_signature.as_slice().try_into().unwrap();
737 assert!(verify(&provider_node.public_key(), &provider_msg, &prov_sig).is_ok());
738 let req_sig: [u8; 64] = proof.requester_signature.as_slice().try_into().unwrap();
739 assert!(
740 verify(
741 &requester_keypair.public_key(),
742 &requester_message,
743 &req_sig
744 )
745 .is_ok()
746 );
747 }
748
749 #[tokio::test]
750 async fn test_node_config_default() {
751 let config = NodeConfig::default();
752 assert_eq!(config.max_storage_bytes, 50 * 1024 * 1024 * 1024);
753 assert_eq!(config.max_bandwidth_bps, 100 * 1024 * 1024 / 8);
754 assert_eq!(config.coordinator_url, "https://coordinator.chie.network");
755 }
756}