use crate::traits::BlockStore;
use async_trait::async_trait;
use dashmap::DashMap;
use ipfrs_core::{Block, Cid, Error, Result};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, warn};

/// Identifier used to address a backend within a pool.
pub type BackendId = String;

/// Strategy used to choose which backend(s) receive a write.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RoutingStrategy {
    /// Rotate writes across healthy backends in sequence.
    RoundRobin,
    /// Route by block size against each backend's `size_threshold`.
    SizeBased,
    /// Prefer the backend with the lowest used/capacity ratio.
    LeastLoaded,
    /// Prefer the backend with the lowest `cost_per_gb`.
    CostAware,
    /// Prefer the backend with the lowest `avg_latency_ms`.
    LatencyAware,
    /// Write every block to all healthy backends.
    Replicated,
    /// Hash the CID so the same block always maps to the same backend(s).
    ConsistentHash,
}

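/// Per-backend configuration and routing metadata.
///
/// A sketch of a "cold tier" backend (field values are illustrative only;
/// the doctest is marked `ignore` because crate paths are not assumed here):
///
/// ```ignore
/// let cold = BackendConfig {
///     id: "s3-cold".to_string(),
///     cost_per_gb: 0.004,
///     avg_latency_ms: 120.0,
///     size_threshold: Some(1024 * 1024), // steer blocks >= 1 MiB here
///     ..Default::default()
/// };
/// ```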
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendConfig {
    /// Unique identifier for this backend.
    pub id: BackendId,
    /// Routing priority (higher is preferred).
    pub priority: u8,
    /// Total capacity in bytes.
    pub capacity: u64,
    /// Bytes currently used.
    pub used: u64,
    /// Storage cost per gigabyte.
    pub cost_per_gb: f64,
    /// Average request latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Size threshold in bytes for size-based routing.
    pub size_threshold: Option<u64>,
    /// Whether the backend is currently healthy.
    pub healthy: bool,
    /// Whether reads may be routed to this backend.
    pub read_enabled: bool,
    /// Whether writes may be routed to this backend.
    pub write_enabled: bool,
}

impl Default for BackendConfig {
    fn default() -> Self {
        Self {
            id: "default".to_string(),
            priority: 100,
            capacity: 0,
            used: 0,
            cost_per_gb: 0.0,
            avg_latency_ms: 10.0,
            size_threshold: None,
            healthy: true,
            read_enabled: true,
            write_enabled: true,
        }
    }
}

/// Per-backend runtime counters, updated atomically on each operation.
#[derive(Debug, Default)]
pub struct BackendStats {
    /// Number of successful reads.
    pub reads: AtomicU64,
    /// Number of successful writes.
    pub writes: AtomicU64,
    /// Total bytes read.
    pub bytes_read: AtomicU64,
    /// Total bytes written.
    pub bytes_written: AtomicU64,
    /// Number of failed operations.
    pub errors: AtomicU64,
    /// Time of the most recent health check, if any.
    pub last_health_check: parking_lot::Mutex<Option<Instant>>,
}

impl BackendStats {
    fn record_read(&self, bytes: u64) {
        self.reads.fetch_add(1, Ordering::Relaxed);
        self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
    }

    fn record_write(&self, bytes: u64) {
        self.writes.fetch_add(1, Ordering::Relaxed);
        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
    }

    fn record_error(&self) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }

    fn update_health_check(&self) {
        *self.last_health_check.lock() = Some(Instant::now());
    }
}

/// A registered backend: the store itself plus its config and stats.
struct Backend<S: BlockStore> {
    store: Arc<S>,
    config: parking_lot::RwLock<BackendConfig>,
    stats: BackendStats,
}

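/// Pool-wide configuration.
///
/// A sketch of a two-way replicated, consistent-hash setup (values are
/// illustrative; marked `ignore` because crate paths are not assumed here):
///
/// ```ignore
/// let config = PoolConfig {
///     strategy: RoutingStrategy::ConsistentHash,
///     replication_factor: 2,
///     ..Default::default()
/// };
/// let pool = StoragePool::new(config);
/// ```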
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// How writes are routed across backends.
    pub strategy: RoutingStrategy,
    /// Number of backends each block is written to, where the strategy
    /// supports it (`Replicated` always writes to all healthy backends).
    pub replication_factor: usize,
    /// Interval between health checks.
    pub health_check_interval: Duration,
    /// Enable automatic failover when a backend fails.
    pub auto_failover: bool,
    /// Minimum number of healthy backends the pool should maintain.
    pub min_healthy_backends: usize,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            strategy: RoutingStrategy::RoundRobin,
            replication_factor: 1,
            health_check_interval: Duration::from_secs(30),
            auto_failover: true,
            min_healthy_backends: 1,
        }
    }
}

/// Aggregated statistics across all backends in the pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStats {
    /// Total number of registered backends.
    pub total_backends: usize,
    /// Number of backends currently marked healthy.
    pub healthy_backends: usize,
    /// Sum of backend capacities in bytes.
    pub total_capacity: u64,
    /// Sum of bytes used across backends.
    pub total_used: u64,
    /// Total reads across all backends.
    pub total_reads: u64,
    /// Total writes across all backends.
    pub total_writes: u64,
    /// Total errors across all backends.
    pub total_errors: u64,
    /// Mean cost per gigabyte across backends.
    pub avg_cost_per_gb: f64,
    /// Mean latency in milliseconds across backends.
    pub avg_latency_ms: f64,
}

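/// A pool of [`BlockStore`] backends with pluggable routing.
///
/// Writes are routed by the configured [`RoutingStrategy`]; reads first try
/// backends known (via the internal CID map) to hold the block, then fall
/// back to scanning all healthy backends.
///
/// A minimal sketch (marked `ignore` because crate paths and the concrete
/// backend type are assumptions here):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let pool = StoragePool::new(PoolConfig::default());
/// pool.add_backend(
///     BackendConfig {
///         id: "memory".to_string(),
///         ..Default::default()
///     },
///     Arc::new(MemoryBlockStore::new()),
/// );
/// pool.put(&block).await?;
/// assert!(pool.has(block.cid()).await?);
/// ```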
pub struct StoragePool<S: BlockStore> {
    /// Registered backends keyed by their identifier.
    backends: DashMap<BackendId, Backend<S>>,
    config: parking_lot::RwLock<PoolConfig>,
    round_robin_counter: AtomicUsize,
    /// Maps each CID to the backends known to hold it.
    cid_map: DashMap<Cid, Vec<BackendId>>,
}

impl<S: BlockStore> StoragePool<S> {
    /// Create an empty pool with the given configuration.
    pub fn new(config: PoolConfig) -> Self {
        Self {
            backends: DashMap::new(),
            config: parking_lot::RwLock::new(config),
            round_robin_counter: AtomicUsize::new(0),
            cid_map: DashMap::new(),
        }
    }

    /// Register a backend with the pool.
    pub fn add_backend(&self, config: BackendConfig, store: Arc<S>) {
        let id = config.id.clone();
        let backend = Backend {
            store,
            config: parking_lot::RwLock::new(config),
            stats: BackendStats::default(),
        };
        self.backends.insert(id.clone(), backend);
        debug!("Added backend to pool: {}", id);
    }

    /// Remove a backend, returning its store if it was registered.
    pub fn remove_backend(&self, id: &str) -> Option<Arc<S>> {
        self.backends.remove(id).map(|(_, backend)| backend.store)
    }

    /// Get a snapshot of a backend's configuration.
    pub fn get_backend_config(&self, id: &str) -> Option<BackendConfig> {
        self.backends
            .get(id)
            .map(|backend| backend.config.read().clone())
    }

    /// Replace a backend's configuration.
    pub fn update_backend_config(&self, id: &str, config: BackendConfig) -> Result<()> {
        let backend = self
            .backends
            .get(id)
            .ok_or_else(|| Error::Storage(format!("Backend not found: {}", id)))?;
        *backend.config.write() = config;
        Ok(())
    }

    /// Mark a backend healthy or unhealthy, recording the health-check time.
    pub fn set_backend_health(&self, id: &str, healthy: bool) -> Result<()> {
        let backend = self
            .backends
            .get(id)
            .ok_or_else(|| Error::Storage(format!("Backend not found: {}", id)))?;
        backend.config.write().healthy = healthy;
        backend.stats.update_health_check();
        debug!("Backend {} health set to: {}", id, healthy);
        Ok(())
    }

    /// Aggregate statistics across all backends.
    pub fn stats(&self) -> PoolStats {
        let mut total_capacity = 0u64;
        let mut total_used = 0u64;
        let mut total_reads = 0u64;
        let mut total_writes = 0u64;
        let mut total_errors = 0u64;
        let mut total_cost = 0.0;
        let mut total_latency = 0.0;
        let mut healthy_count = 0;
        let total_count = self.backends.len();

        for backend in self.backends.iter() {
            let config = backend.config.read();
            let stats = &backend.stats;

            if config.healthy {
                healthy_count += 1;
            }

            total_capacity += config.capacity;
            total_used += config.used;
            total_reads += stats.reads.load(Ordering::Relaxed);
            total_writes += stats.writes.load(Ordering::Relaxed);
            total_errors += stats.errors.load(Ordering::Relaxed);
            total_cost += config.cost_per_gb;
            total_latency += config.avg_latency_ms;
        }

        let avg_cost_per_gb = if total_count > 0 {
            total_cost / total_count as f64
        } else {
            0.0
        };

        let avg_latency_ms = if total_count > 0 {
            total_latency / total_count as f64
        } else {
            0.0
        };

        PoolStats {
            total_backends: total_count,
            healthy_backends: healthy_count,
            total_capacity,
            total_used,
            total_reads,
            total_writes,
            total_errors,
            avg_cost_per_gb,
            avg_latency_ms,
        }
    }

    /// Choose the write targets for a block according to the active strategy.
    fn select_backends_for_write(&self, cid: &Cid, data_size: usize) -> Vec<BackendId> {
        let config = self.config.read();
        let strategy = config.strategy;
        let replication_factor = config.replication_factor;

        match strategy {
            RoutingStrategy::RoundRobin => self.select_round_robin(replication_factor),
            RoutingStrategy::SizeBased => self.select_size_based(data_size, replication_factor),
            RoutingStrategy::LeastLoaded => self.select_least_loaded(replication_factor),
            RoutingStrategy::CostAware => self.select_cost_aware(replication_factor),
            RoutingStrategy::LatencyAware => self.select_latency_aware(replication_factor),
            RoutingStrategy::Replicated => self.select_all_healthy(),
            RoutingStrategy::ConsistentHash => self.select_consistent_hash(cid, replication_factor),
        }
    }

    /// Pick `count` healthy, write-enabled backends in round-robin order.
    fn select_round_robin(&self, count: usize) -> Vec<BackendId> {
        let healthy: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if config.healthy && config.write_enabled {
                    Some(config.id.clone())
                } else {
                    None
                }
            })
            .collect();

        if healthy.is_empty() {
            return Vec::new();
        }

        let mut selected = Vec::new();
        for _ in 0..count.min(healthy.len()) {
            let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % healthy.len();
            selected.push(healthy[idx].clone());
        }
        selected
    }

    /// Pick backends whose `size_threshold` and priority suit `data_size`.
    fn select_size_based(&self, data_size: usize, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if !config.healthy || !config.write_enabled {
                    return None;
                }

                // Backends with a threshold take large blocks at priority >= 50
                // and small blocks only at priority > 50; backends without a
                // threshold accept any size.
                let matches_size = if let Some(threshold) = config.size_threshold {
                    if data_size >= threshold as usize {
                        config.priority >= 50
                    } else {
                        config.priority > 50
                    }
                } else {
                    true
                };

                if matches_size {
                    Some((config.id.clone(), config.priority))
                } else {
                    None
                }
            })
            .collect();

        // Highest priority first.
        candidates.sort_by(|a, b| b.1.cmp(&a.1));
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Pick the backends with the lowest used-capacity percentage.
    fn select_least_loaded(&self, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if !config.healthy || !config.write_enabled {
                    return None;
                }

                // Load as an integer percentage; unknown capacity counts as empty.
                let load = if config.capacity > 0 {
                    (config.used as f64 / config.capacity as f64 * 100.0) as u64
                } else {
                    0
                };

                Some((config.id.clone(), load))
            })
            .collect();

        candidates.sort_by_key(|(_, load)| *load);
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Pick the cheapest backends by `cost_per_gb`.
    fn select_cost_aware(&self, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if !config.healthy || !config.write_enabled {
                    return None;
                }
                // Scale costs so they can be compared as integer sort keys.
                Some((config.id.clone(), (config.cost_per_gb * 1000.0) as u64))
            })
            .collect();

        candidates.sort_by_key(|(_, cost)| *cost);
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Pick the lowest-latency backends by `avg_latency_ms`.
    fn select_latency_aware(&self, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                // This selector feeds the write path, so it must only return
                // write-enabled backends.
                if !config.healthy || !config.write_enabled {
                    return None;
                }
                // Scale to microseconds so latencies sort as integers.
                Some((config.id.clone(), (config.avg_latency_ms * 1000.0) as u64))
            })
            .collect();

        candidates.sort_by_key(|(_, latency)| *latency);
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Pick every healthy, write-enabled backend (full replication).
    fn select_all_healthy(&self) -> Vec<BackendId> {
        self.backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if config.healthy && config.write_enabled {
                    Some(config.id.clone())
                } else {
                    None
                }
            })
            .collect()
    }

    /// Pick `count` backends deterministically from a hash of the CID.
    fn select_consistent_hash(&self, cid: &Cid, count: usize) -> Vec<BackendId> {
        let mut healthy: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if config.healthy && config.write_enabled {
                    Some(config.id.clone())
                } else {
                    None
                }
            })
            .collect();

        if healthy.is_empty() {
            return Vec::new();
        }

        // Sort so selection does not depend on DashMap iteration order.
        healthy.sort();

        // Simple polynomial rolling hash over the CID bytes. Note this maps
        // hashes onto the current backend list, so membership changes remap
        // more keys than a true consistent-hash ring would.
        let cid_bytes = cid.to_bytes();
        let hash = cid_bytes
            .iter()
            .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u64));

        let mut selected = Vec::new();
        for i in 0..count.min(healthy.len()) {
            let idx = (hash.wrapping_add(i as u64) % healthy.len() as u64) as usize;
            selected.push(healthy[idx].clone());
        }
        selected
    }

    /// Look up which backends are known to hold a CID.
    fn get_backends_for_cid(&self, cid: &Cid) -> Vec<BackendId> {
        self.cid_map
            .get(cid)
            .map(|backends| backends.clone())
            .unwrap_or_default()
    }

    /// Remember that a backend holds a CID, avoiding duplicate entries.
    fn record_cid_location(&self, cid: Cid, backend_id: BackendId) {
        let mut backends = self.cid_map.entry(cid).or_default();
        if !backends.contains(&backend_id) {
            backends.push(backend_id);
        }
    }
}

#[async_trait]
impl<S: BlockStore + Send + Sync + 'static> BlockStore for StoragePool<S> {
    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
        // Fast path: try the backends already known to hold this CID.
        let known_backends = self.get_backends_for_cid(cid);

        for backend_id in known_backends {
            if let Some(backend) = self.backends.get(&backend_id) {
                let (healthy, read_enabled) = {
                    let config = backend.config.read();
                    (config.healthy, config.read_enabled)
                };

                if !healthy || !read_enabled {
                    continue;
                }

                match backend.store.get(cid).await {
                    Ok(Some(block)) => {
                        backend.stats.record_read(block.data().len() as u64);
                        return Ok(Some(block));
                    }
                    Ok(None) => continue,
                    Err(e) => {
                        warn!("Backend {} failed to get CID: {}", backend_id, e);
                        backend.stats.record_error();
                    }
                }
            }
        }

        // Slow path: scan every healthy backend and cache any hit's location.
        for backend in self.backends.iter() {
            let (healthy, read_enabled, backend_id) = {
                let config = backend.config.read();
                (config.healthy, config.read_enabled, config.id.clone())
            };

            if !healthy || !read_enabled {
                continue;
            }

            match backend.store.get(cid).await {
                Ok(Some(block)) => {
                    backend.stats.record_read(block.data().len() as u64);
                    self.record_cid_location(*cid, backend_id);
                    return Ok(Some(block));
                }
                Ok(None) => continue,
                Err(_) => {
                    backend.stats.record_error();
                }
            }
        }

        Ok(None)
    }

    async fn put(&self, block: &Block) -> Result<()> {
        let cid = block.cid();
        let data_size = block.data().len();
        let backends = self.select_backends_for_write(cid, data_size);

        if backends.is_empty() {
            return Err(Error::Storage(
                "No healthy backends available for write".to_string(),
            ));
        }

        let mut errors = Vec::new();
        let mut success_count = 0;

        for backend_id in &backends {
            if let Some(backend) = self.backends.get(backend_id) {
                match backend.store.put(block).await {
                    Ok(()) => {
                        backend.stats.record_write(data_size as u64);
                        self.record_cid_location(*cid, backend_id.clone());
                        success_count += 1;
                    }
                    Err(e) => {
                        backend.stats.record_error();
                        errors.push((backend_id.clone(), e));
                    }
                }
            }
        }

        // A write succeeds if at least one backend accepted the block, even
        // if the requested replication factor was not fully met.
        if success_count == 0 {
            return Err(Error::Storage(format!(
                "Failed to write to any backend: {} errors",
                errors.len()
            )));
        }

        Ok(())
    }

    async fn has(&self, cid: &Cid) -> Result<bool> {
        // Fast path: check backends already known to hold this CID.
        let known_backends = self.get_backends_for_cid(cid);

        for backend_id in known_backends {
            if let Some(backend) = self.backends.get(&backend_id) {
                if !backend.config.read().healthy {
                    continue;
                }

                if let Ok(true) = backend.store.has(cid).await {
                    return Ok(true);
                }
            }
        }

        // Slow path: scan all healthy backends and cache any hit's location.
        for backend in self.backends.iter() {
            if !backend.config.read().healthy {
                continue;
            }

            if let Ok(true) = backend.store.has(cid).await {
                self.record_cid_location(*cid, backend.config.read().id.clone());
                return Ok(true);
            }
        }

        Ok(false)
    }

    async fn delete(&self, cid: &Cid) -> Result<()> {
        let backends = self.get_backends_for_cid(cid);

        if backends.is_empty() {
            // Unknown location: best-effort delete on every backend.
            for backend in self.backends.iter() {
                let _ = backend.store.delete(cid).await;
            }
        } else {
            for backend_id in &backends {
                if let Some(backend) = self.backends.get(backend_id) {
                    let _ = backend.store.delete(cid).await;
                }
            }
        }

        self.cid_map.remove(cid);
        Ok(())
    }

    fn list_cids(&self) -> Result<Vec<Cid>> {
        // Reflects the pool's location map, i.e. blocks written through the
        // pool or discovered by earlier reads, not a scan of every backend.
        let cids: Vec<Cid> = self.cid_map.iter().map(|entry| *entry.key()).collect();
        Ok(cids)
    }

    fn len(&self) -> usize {
        self.cid_map.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;
    use bytes::Bytes;

    #[tokio::test]
    async fn test_pool_basic() {
        let pool = StoragePool::new(PoolConfig::default());

        let backend1 = Arc::new(MemoryBlockStore::new());
        let config1 = BackendConfig {
            id: "backend1".to_string(),
            ..Default::default()
        };

        pool.add_backend(config1, backend1);

        let data = Bytes::from_static(b"test data");
        let block = Block::new(data).unwrap();
        let cid = block.cid();

        pool.put(&block).await.unwrap();
        assert!(pool.has(cid).await.unwrap());

        let retrieved = pool.get(cid).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), block.data());
    }
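
    // A hedged sketch of cost-aware routing: with the default
    // replication_factor of 1, the write should land only on the cheapest
    // healthy backend (cost values here are illustrative).
    #[tokio::test]
    async fn test_pool_cost_aware() {
        let config = PoolConfig {
            strategy: RoutingStrategy::CostAware,
            ..Default::default()
        };
        let pool = StoragePool::new(config);

        let cheap = Arc::new(MemoryBlockStore::new());
        let pricey = Arc::new(MemoryBlockStore::new());
        pool.add_backend(
            BackendConfig {
                id: "cheap".to_string(),
                cost_per_gb: 0.004,
                ..Default::default()
            },
            cheap.clone(),
        );
        pool.add_backend(
            BackendConfig {
                id: "pricey".to_string(),
                cost_per_gb: 0.023,
                ..Default::default()
            },
            pricey.clone(),
        );

        let block = Block::new(Bytes::from_static(b"cost data")).unwrap();
        pool.put(&block).await.unwrap();

        assert!(cheap.has(block.cid()).await.unwrap());
        assert!(!pricey.has(block.cid()).await.unwrap());
    }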

    #[tokio::test]
    async fn test_pool_replicated() {
        let config = PoolConfig {
            strategy: RoutingStrategy::Replicated,
            ..Default::default()
        };
        let pool = StoragePool::new(config);

        let backend1 = Arc::new(MemoryBlockStore::new());
        let backend2 = Arc::new(MemoryBlockStore::new());

        pool.add_backend(
            BackendConfig {
                id: "backend1".to_string(),
                ..Default::default()
            },
            backend1.clone(),
        );

        pool.add_backend(
            BackendConfig {
                id: "backend2".to_string(),
                ..Default::default()
            },
            backend2.clone(),
        );

        let data = Bytes::from_static(b"test data");
        let block = Block::new(data).unwrap();
        let cid = block.cid();

        pool.put(&block).await.unwrap();

        assert!(backend1.has(cid).await.unwrap());
        assert!(backend2.has(cid).await.unwrap());
    }
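
    // A hedged sketch of manual failover: once backend1 is marked unhealthy,
    // writes must route around it and stats should reflect one healthy backend.
    #[tokio::test]
    async fn test_pool_failover() {
        let pool = StoragePool::new(PoolConfig::default());

        let backend1 = Arc::new(MemoryBlockStore::new());
        let backend2 = Arc::new(MemoryBlockStore::new());
        pool.add_backend(
            BackendConfig {
                id: "backend1".to_string(),
                ..Default::default()
            },
            backend1.clone(),
        );
        pool.add_backend(
            BackendConfig {
                id: "backend2".to_string(),
                ..Default::default()
            },
            backend2,
        );

        pool.set_backend_health("backend1", false).unwrap();

        let block = Block::new(Bytes::from_static(b"failover data")).unwrap();
        pool.put(&block).await.unwrap();

        // The unhealthy backend must not have received the block.
        assert!(!backend1.has(block.cid()).await.unwrap());
        assert!(pool.has(block.cid()).await.unwrap());
        assert_eq!(pool.stats().healthy_backends, 1);
    }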

    #[tokio::test]
    async fn test_pool_stats() {
        let pool = StoragePool::new(PoolConfig::default());

        let backend1 = Arc::new(MemoryBlockStore::new());
        pool.add_backend(
            BackendConfig {
                id: "backend1".to_string(),
                capacity: 1000,
                cost_per_gb: 0.023,
                ..Default::default()
            },
            backend1,
        );

        let stats = pool.stats();
        assert_eq!(stats.total_backends, 1);
        assert_eq!(stats.healthy_backends, 1);
        assert_eq!(stats.total_capacity, 1000);
    }
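
    // A hedged sketch of consistent-hash routing: the same CID should select
    // the same backend on every call.
    #[tokio::test]
    async fn test_pool_consistent_hash() {
        let config = PoolConfig {
            strategy: RoutingStrategy::ConsistentHash,
            ..Default::default()
        };
        let pool = StoragePool::new(config);

        pool.add_backend(
            BackendConfig {
                id: "backend1".to_string(),
                ..Default::default()
            },
            Arc::new(MemoryBlockStore::new()),
        );
        pool.add_backend(
            BackendConfig {
                id: "backend2".to_string(),
                ..Default::default()
            },
            Arc::new(MemoryBlockStore::new()),
        );

        let block = Block::new(Bytes::from_static(b"hash me")).unwrap();
        let first = pool.select_backends_for_write(block.cid(), block.data().len());
        let second = pool.select_backends_for_write(block.cid(), block.data().len());

        assert_eq!(first.len(), 1); // default replication_factor is 1
        assert_eq!(first, second);
    }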
}