ipfrs_storage/pool.rs

//! Storage Pool Manager for multi-backend routing
//!
//! This module provides a storage pool that manages multiple backends with
//! intelligent routing strategies including:
//! - Load balancing across backends
//! - Size-based routing (small blocks to fast storage, large to cold)
//! - Cost-aware routing for cloud storage
//! - Automatic failover and redundancy
//! - Backend health monitoring
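//!
//! A minimal usage sketch (module paths and the in-memory backend are
//! assumptions based on this crate's layout; the doctest is `ignore`d):
//!
//! ```ignore
//! use std::sync::Arc;
//! use ipfrs_storage::memory::MemoryBlockStore;
//! use ipfrs_storage::pool::{BackendConfig, PoolConfig, StoragePool};
//! use ipfrs_storage::traits::BlockStore;
//!
//! # async fn example() -> ipfrs_core::Result<()> {
//! let pool = StoragePool::new(PoolConfig::default());
//! pool.add_backend(
//!     BackendConfig { id: "memory".to_string(), ..Default::default() },
//!     Arc::new(MemoryBlockStore::new()),
//! );
//!
//! let block = ipfrs_core::Block::new(bytes::Bytes::from_static(b"hello"))?;
//! pool.put(&block).await?;
//! assert!(pool.has(block.cid()).await?);
//! # Ok(())
//! # }
//! ```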

use crate::traits::BlockStore;
use async_trait::async_trait;
use dashmap::DashMap;
use ipfrs_core::{Block, Cid, Error, Result};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, warn};

/// Backend identifier
pub type BackendId = String;

/// Routing strategy for selecting backends
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RoutingStrategy {
    /// Round-robin load balancing
    RoundRobin,
    /// Route based on block size (small to fast, large to slow)
    SizeBased,
    /// Route to least loaded backend
    LeastLoaded,
    /// Route to lowest cost backend
    CostAware,
    /// Route to the lowest-latency backend
    LatencyAware,
    /// Replicate to all backends
    Replicated,
    /// Hash-based consistent hashing
    ConsistentHash,
}

/// Backend configuration
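///
/// Typical pools pair a fast "hot" backend with a cheaper "cold" one; the
/// values below are purely illustrative (a sketch, not a tuned setup):
///
/// ```ignore
/// let hot = BackendConfig {
///     id: "nvme".to_string(),
///     priority: 200,
///     avg_latency_ms: 1.0,
///     ..Default::default()
/// };
/// let cold = BackendConfig {
///     id: "s3".to_string(),
///     priority: 50,
///     cost_per_gb: 0.023,
///     avg_latency_ms: 120.0,
///     // With priority 50, only blocks of at least 1 MiB match this
///     // backend under `RoutingStrategy::SizeBased`.
///     size_threshold: Some(1024 * 1024),
///     ..Default::default()
/// };
/// ```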
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendConfig {
    /// Unique backend identifier
    pub id: BackendId,
    /// Backend priority (higher = preferred)
    pub priority: u8,
    /// Maximum capacity in bytes (0 = unlimited)
    pub capacity: u64,
    /// Current used bytes
    pub used: u64,
    /// Cost per GB per month (for cost-aware routing)
    pub cost_per_gb: f64,
    /// Average read latency in milliseconds
    pub avg_latency_ms: f64,
    /// Block size threshold (route blocks larger than this)
    pub size_threshold: Option<u64>,
    /// Whether this backend is healthy
    pub healthy: bool,
    /// Whether to use for reads
    pub read_enabled: bool,
    /// Whether to use for writes
    pub write_enabled: bool,
}

impl Default for BackendConfig {
    fn default() -> Self {
        Self {
            id: "default".to_string(),
            priority: 100,
            capacity: 0,
            used: 0,
            cost_per_gb: 0.0,
            avg_latency_ms: 10.0,
            size_threshold: None,
            healthy: true,
            read_enabled: true,
            write_enabled: true,
        }
    }
}

/// Backend statistics
#[derive(Debug, Default)]
pub struct BackendStats {
    /// Total read operations
    pub reads: AtomicU64,
    /// Total write operations
    pub writes: AtomicU64,
    /// Total bytes read
    pub bytes_read: AtomicU64,
    /// Total bytes written
    pub bytes_written: AtomicU64,
    /// Total errors
    pub errors: AtomicU64,
    /// Last health check time
    pub last_health_check: parking_lot::Mutex<Option<Instant>>,
}

impl BackendStats {
    fn record_read(&self, bytes: u64) {
        self.reads.fetch_add(1, Ordering::Relaxed);
        self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
    }

    fn record_write(&self, bytes: u64) {
        self.writes.fetch_add(1, Ordering::Relaxed);
        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
    }

    fn record_error(&self) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }

    fn update_health_check(&self) {
        *self.last_health_check.lock() = Some(Instant::now());
    }
}

/// Backend wrapper with metadata
struct Backend<S: BlockStore> {
    store: Arc<S>,
    config: parking_lot::RwLock<BackendConfig>,
    stats: BackendStats,
}

/// Storage pool configuration
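///
/// For example, full replication across every healthy backend can be
/// requested as follows (a sketch mirroring `test_pool_replicated` below):
///
/// ```ignore
/// let pool = StoragePool::new(PoolConfig {
///     strategy: RoutingStrategy::Replicated,
///     ..Default::default()
/// });
/// // Every `put` is now written to all healthy, write-enabled backends.
/// ```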
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Routing strategy
    pub strategy: RoutingStrategy,
    /// Number of backends each block is written to (ignored by the
    /// `Replicated` strategy, which writes to all healthy backends)
    pub replication_factor: usize,
    /// Health check interval
    pub health_check_interval: Duration,
    /// Enable automatic failover
    pub auto_failover: bool,
    /// Minimum healthy backends required
    pub min_healthy_backends: usize,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            strategy: RoutingStrategy::RoundRobin,
            replication_factor: 1,
            health_check_interval: Duration::from_secs(30),
            auto_failover: true,
            min_healthy_backends: 1,
        }
    }
}

/// Storage pool statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStats {
    /// Total backends
    pub total_backends: usize,
    /// Healthy backends
    pub healthy_backends: usize,
    /// Total capacity
    pub total_capacity: u64,
    /// Total used
    pub total_used: u64,
    /// Total reads
    pub total_reads: u64,
    /// Total writes
    pub total_writes: u64,
    /// Total errors
    pub total_errors: u64,
    /// Average cost per GB
    pub avg_cost_per_gb: f64,
    /// Average latency
    pub avg_latency_ms: f64,
}

/// Storage Pool Manager
///
/// Manages multiple storage backends with intelligent routing
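///
/// Backends can be taken out of rotation at runtime and aggregate state
/// inspected; a brief sketch (the backend ID is illustrative):
///
/// ```ignore
/// // Stop routing reads and writes to a degraded backend.
/// pool.set_backend_health("s3", false)?;
///
/// let stats = pool.stats();
/// println!(
///     "{}/{} backends healthy, {} bytes used",
///     stats.healthy_backends, stats.total_backends, stats.total_used
/// );
/// ```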
pub struct StoragePool<S: BlockStore> {
    backends: DashMap<BackendId, Backend<S>>,
    config: parking_lot::RwLock<PoolConfig>,
    round_robin_counter: AtomicUsize,
    /// CID to backend mapping (for tracking where blocks are stored)
    cid_map: DashMap<Cid, Vec<BackendId>>,
}

impl<S: BlockStore> StoragePool<S> {
    /// Create a new storage pool
    pub fn new(config: PoolConfig) -> Self {
        Self {
            backends: DashMap::new(),
            config: parking_lot::RwLock::new(config),
            round_robin_counter: AtomicUsize::new(0),
            cid_map: DashMap::new(),
        }
    }

    /// Add a backend to the pool
    pub fn add_backend(&self, config: BackendConfig, store: Arc<S>) {
        let id = config.id.clone();
        let backend = Backend {
            store,
            config: parking_lot::RwLock::new(config),
            stats: BackendStats::default(),
        };
        self.backends.insert(id.clone(), backend);
        debug!("Added backend to pool: {}", id);
    }

    /// Remove a backend from the pool
    pub fn remove_backend(&self, id: &str) -> Option<Arc<S>> {
        self.backends.remove(id).map(|(_, backend)| backend.store)
    }

    /// Get backend configuration
    pub fn get_backend_config(&self, id: &str) -> Option<BackendConfig> {
        self.backends
            .get(id)
            .map(|backend| backend.config.read().clone())
    }

    /// Update backend configuration
    pub fn update_backend_config(&self, id: &str, config: BackendConfig) -> Result<()> {
        let backend = self
            .backends
            .get(id)
            .ok_or_else(|| Error::Storage(format!("Backend not found: {}", id)))?;
        *backend.config.write() = config;
        Ok(())
    }

    /// Mark backend as healthy or unhealthy
    pub fn set_backend_health(&self, id: &str, healthy: bool) -> Result<()> {
        let backend = self
            .backends
            .get(id)
            .ok_or_else(|| Error::Storage(format!("Backend not found: {}", id)))?;
        backend.config.write().healthy = healthy;
        backend.stats.update_health_check();
        debug!("Backend {} health set to: {}", id, healthy);
        Ok(())
    }

    /// Get pool statistics
    pub fn stats(&self) -> PoolStats {
        let mut total_capacity = 0u64;
        let mut total_used = 0u64;
        let mut total_reads = 0u64;
        let mut total_writes = 0u64;
        let mut total_errors = 0u64;
        let mut total_cost = 0.0;
        let mut total_latency = 0.0;
        let mut healthy_count = 0;
        let total_count = self.backends.len();

        for backend in self.backends.iter() {
            let config = backend.config.read();
            let stats = &backend.stats;

            if config.healthy {
                healthy_count += 1;
            }

            total_capacity += config.capacity;
            total_used += config.used;
            total_reads += stats.reads.load(Ordering::Relaxed);
            total_writes += stats.writes.load(Ordering::Relaxed);
            total_errors += stats.errors.load(Ordering::Relaxed);
            total_cost += config.cost_per_gb;
            total_latency += config.avg_latency_ms;
        }

        let avg_cost_per_gb = if total_count > 0 {
            total_cost / total_count as f64
        } else {
            0.0
        };

        let avg_latency_ms = if total_count > 0 {
            total_latency / total_count as f64
        } else {
            0.0
        };

        PoolStats {
            total_backends: total_count,
            healthy_backends: healthy_count,
            total_capacity,
            total_used,
            total_reads,
            total_writes,
            total_errors,
            avg_cost_per_gb,
            avg_latency_ms,
        }
    }

    /// Select backends for a write operation according to the configured strategy
    fn select_backends_for_write(&self, cid: &Cid, data_size: usize) -> Vec<BackendId> {
        let config = self.config.read();
        let strategy = config.strategy;
        let replication_factor = config.replication_factor;

        match strategy {
            RoutingStrategy::RoundRobin => self.select_round_robin(replication_factor),
            RoutingStrategy::SizeBased => self.select_size_based(data_size, replication_factor),
            RoutingStrategy::LeastLoaded => self.select_least_loaded(replication_factor),
            RoutingStrategy::CostAware => self.select_cost_aware(replication_factor),
            RoutingStrategy::LatencyAware => self.select_latency_aware(replication_factor),
            RoutingStrategy::Replicated => self.select_all_healthy(),
            RoutingStrategy::ConsistentHash => self.select_consistent_hash(cid, replication_factor),
        }
    }

    /// Select backends using round-robin
    fn select_round_robin(&self, count: usize) -> Vec<BackendId> {
        let healthy: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if config.healthy && config.write_enabled {
                    Some(config.id.clone())
                } else {
                    None
                }
            })
            .collect();

        if healthy.is_empty() {
            return Vec::new();
        }

        let mut selected = Vec::new();
        for _ in 0..count.min(healthy.len()) {
            let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % healthy.len();
            selected.push(healthy[idx].clone());
        }
        selected
    }

    /// Select backends based on size
    fn select_size_based(&self, data_size: usize, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if !config.healthy || !config.write_enabled {
                    return None;
                }

                // Backends with a size threshold split traffic around the
                // baseline priority of 50: large blocks accept priority >= 50,
                // small blocks require a strictly higher priority.
                let matches_size = if let Some(threshold) = config.size_threshold {
                    if data_size >= threshold as usize {
                        config.priority >= 50 // large block: baseline priority qualifies
                    } else {
                        config.priority > 50 // small block: only above-baseline backends
                    }
                } else {
                    true
                };

                if matches_size {
                    Some((config.id.clone(), config.priority))
                } else {
                    None
                }
            })
            .collect();

        candidates.sort_by(|a, b| b.1.cmp(&a.1));
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Select least loaded backends
    fn select_least_loaded(&self, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if !config.healthy || !config.write_enabled {
                    return None;
                }

                let load = if config.capacity > 0 {
                    (config.used as f64 / config.capacity as f64 * 100.0) as u64
                } else {
                    0
                };

                Some((config.id.clone(), load))
            })
            .collect();

        candidates.sort_by_key(|(_, load)| *load);
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Select lowest cost backends
    fn select_cost_aware(&self, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if !config.healthy || !config.write_enabled {
                    return None;
                }
                Some((config.id.clone(), (config.cost_per_gb * 1000.0) as u64))
            })
            .collect();

        candidates.sort_by_key(|(_, cost)| *cost);
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Select lowest latency backends
    fn select_latency_aware(&self, count: usize) -> Vec<BackendId> {
        let mut candidates: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                // Ranked by read latency, but still restricted to
                // write-enabled backends since this selector feeds the
                // write path.
                if !config.healthy || !config.write_enabled {
                    return None;
                }
                Some((config.id.clone(), (config.avg_latency_ms * 1000.0) as u64))
            })
            .collect();

        candidates.sort_by_key(|(_, latency)| *latency);
        candidates
            .into_iter()
            .take(count)
            .map(|(id, _)| id)
            .collect()
    }

    /// Select all healthy backends
    fn select_all_healthy(&self) -> Vec<BackendId> {
        self.backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if config.healthy && config.write_enabled {
                    Some(config.id.clone())
                } else {
                    None
                }
            })
            .collect()
    }

    /// Select backends using consistent hashing
    fn select_consistent_hash(&self, cid: &Cid, count: usize) -> Vec<BackendId> {
        let healthy: Vec<_> = self
            .backends
            .iter()
            .filter_map(|b| {
                let config = b.config.read();
                if config.healthy && config.write_enabled {
                    Some(config.id.clone())
                } else {
                    None
                }
            })
            .collect();

        if healthy.is_empty() {
            return Vec::new();
        }

        // Simple polynomial hash of the CID bytes, mapped onto the current
        // healthy backend list by modular indexing. Note this is not a true
        // hash ring: selections shift when backend membership changes.
        let cid_bytes = cid.to_bytes();
        let hash = cid_bytes
            .iter()
            .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u64));

        let mut selected = Vec::new();
        for i in 0..count.min(healthy.len()) {
            let idx = ((hash + i as u64) % healthy.len() as u64) as usize;
            selected.push(healthy[idx].clone());
        }
        selected
    }

    /// Get backends where a CID is stored
    fn get_backends_for_cid(&self, cid: &Cid) -> Vec<BackendId> {
        self.cid_map
            .get(cid)
            .map(|backends| backends.clone())
            .unwrap_or_default()
    }

    /// Record that a CID is stored in a backend
    fn record_cid_location(&self, cid: Cid, backend_id: BackendId) {
        let mut locations = self.cid_map.entry(cid).or_insert_with(Vec::new);
        // Avoid recording the same backend twice for a CID.
        if !locations.contains(&backend_id) {
            locations.push(backend_id);
        }
    }
}

#[async_trait]
impl<S: BlockStore + Send + Sync + 'static> BlockStore for StoragePool<S> {
    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
        // Try to get from known backends first
        let known_backends = self.get_backends_for_cid(cid);

        for backend_id in known_backends {
            if let Some(backend) = self.backends.get(&backend_id) {
                let (healthy, read_enabled) = {
                    let config = backend.config.read();
                    (config.healthy, config.read_enabled)
                };

                if !healthy || !read_enabled {
                    continue;
                }

                match backend.store.get(cid).await {
                    Ok(Some(block)) => {
                        backend.stats.record_read(block.data().len() as u64);
                        return Ok(Some(block));
                    }
                    Ok(None) => continue,
                    Err(e) => {
                        warn!("Backend {} failed to get CID: {}", backend_id, e);
                        backend.stats.record_error();
                    }
                }
            }
        }

        // Fallback: try all healthy backends
        for backend in self.backends.iter() {
            let (healthy, read_enabled, backend_id) = {
                let config = backend.config.read();
                (config.healthy, config.read_enabled, config.id.clone())
            };

            if !healthy || !read_enabled {
                continue;
            }

            match backend.store.get(cid).await {
                Ok(Some(block)) => {
                    backend.stats.record_read(block.data().len() as u64);
                    // Update mapping
                    self.record_cid_location(*cid, backend_id);
                    return Ok(Some(block));
                }
                Ok(None) => continue,
                Err(e) => {
                    warn!("Backend {} failed to get CID: {}", backend_id, e);
                    backend.stats.record_error();
                }
            }
        }

        Ok(None)
    }

    async fn put(&self, block: &Block) -> Result<()> {
        let cid = block.cid();
        let data_size = block.data().len();
        let backends = self.select_backends_for_write(cid, data_size);

        if backends.is_empty() {
            return Err(Error::Storage(
                "No healthy backends available for write".to_string(),
            ));
        }

        let mut errors = Vec::new();
        let mut success_count = 0;

        for backend_id in &backends {
            if let Some(backend) = self.backends.get(backend_id) {
                match backend.store.put(block).await {
                    Ok(()) => {
                        backend.stats.record_write(data_size as u64);
                        self.record_cid_location(*cid, backend_id.clone());
                        success_count += 1;
                    }
                    Err(e) => {
                        warn!("Backend {} failed to put block: {}", backend_id, e);
                        backend.stats.record_error();
                        errors.push((backend_id.clone(), e));
                    }
                }
            }
        }

        if success_count == 0 {
            return Err(Error::Storage(format!(
                "Failed to write to any backend: {} errors",
                errors.len()
            )));
        }

        Ok(())
    }

    async fn has(&self, cid: &Cid) -> Result<bool> {
        // Check known backends first
        let known_backends = self.get_backends_for_cid(cid);

        for backend_id in known_backends {
            if let Some(backend) = self.backends.get(&backend_id) {
                if !backend.config.read().healthy {
                    continue;
                }

                if let Ok(true) = backend.store.has(cid).await {
                    return Ok(true);
                }
            }
        }

        // Fallback: check all healthy backends
        for backend in self.backends.iter() {
            if !backend.config.read().healthy {
                continue;
            }

            if let Ok(true) = backend.store.has(cid).await {
                // Update mapping
                self.record_cid_location(*cid, backend.config.read().id.clone());
                return Ok(true);
            }
        }

        Ok(false)
    }

    async fn delete(&self, cid: &Cid) -> Result<()> {
        let backends = self.get_backends_for_cid(cid);

        if backends.is_empty() {
            // Try all backends if we don't know where it's stored
            for backend in self.backends.iter() {
                let _ = backend.store.delete(cid).await;
            }
        } else {
            for backend_id in &backends {
                if let Some(backend) = self.backends.get(backend_id) {
                    let _ = backend.store.delete(cid).await;
                }
            }
        }

        // Remove from mapping
        self.cid_map.remove(cid);
        Ok(())
    }

    fn list_cids(&self) -> Result<Vec<Cid>> {
        // Only CIDs tracked by this pool's location map are listed, i.e.
        // blocks written or observed through the pool, not a full scan of
        // every backend.
        let cids: Vec<Cid> = self.cid_map.iter().map(|entry| *entry.key()).collect();
        Ok(cids)
    }

    fn len(&self) -> usize {
        self.cid_map.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::MemoryBlockStore;
    use bytes::Bytes;

    #[tokio::test]
    async fn test_pool_basic() {
        let pool = StoragePool::new(PoolConfig::default());

        let backend1 = Arc::new(MemoryBlockStore::new());
        let config1 = BackendConfig {
            id: "backend1".to_string(),
            ..Default::default()
        };

        pool.add_backend(config1, backend1);

        let data = Bytes::from_static(b"test data");
        let block = Block::new(data).unwrap();
        let cid = block.cid();

        pool.put(&block).await.unwrap();
        assert!(pool.has(cid).await.unwrap());

        let retrieved = pool.get(cid).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), block.data());
    }

    #[tokio::test]
    async fn test_pool_replicated() {
        let config = PoolConfig {
            strategy: RoutingStrategy::Replicated,
            ..Default::default()
        };
        let pool = StoragePool::new(config);

        let backend1 = Arc::new(MemoryBlockStore::new());
        let backend2 = Arc::new(MemoryBlockStore::new());

        pool.add_backend(
            BackendConfig {
                id: "backend1".to_string(),
                ..Default::default()
            },
            backend1.clone(),
        );

        pool.add_backend(
            BackendConfig {
                id: "backend2".to_string(),
                ..Default::default()
            },
            backend2.clone(),
        );

        let data = Bytes::from_static(b"test data");
        let block = Block::new(data).unwrap();
        let cid = block.cid();

        pool.put(&block).await.unwrap();

        // Should be in both backends
        assert!(backend1.has(cid).await.unwrap());
        assert!(backend2.has(cid).await.unwrap());
    }

    #[tokio::test]
    async fn test_pool_stats() {
        let pool = StoragePool::new(PoolConfig::default());

        let backend1 = Arc::new(MemoryBlockStore::new());
        pool.add_backend(
            BackendConfig {
                id: "backend1".to_string(),
                capacity: 1000,
                cost_per_gb: 0.023,
                ..Default::default()
            },
            backend1,
        );

        let stats = pool.stats();
        assert_eq!(stats.total_backends, 1);
        assert_eq!(stats.healthy_backends, 1);
        assert_eq!(stats.total_capacity, 1000);
    }
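
    // Added sketch: exercises write failover by marking one backend unhealthy
    // and checking that writes only land on the remaining healthy backend.
    // Backend IDs are arbitrary test names.
    #[tokio::test]
    async fn test_pool_failover_write() {
        let pool = StoragePool::new(PoolConfig::default());

        let healthy_backend = Arc::new(MemoryBlockStore::new());
        let failed_backend = Arc::new(MemoryBlockStore::new());

        pool.add_backend(
            BackendConfig {
                id: "healthy".to_string(),
                ..Default::default()
            },
            healthy_backend.clone(),
        );
        pool.add_backend(
            BackendConfig {
                id: "failed".to_string(),
                ..Default::default()
            },
            failed_backend.clone(),
        );

        // Take the second backend out of rotation.
        pool.set_backend_health("failed", false).unwrap();

        let block = Block::new(Bytes::from_static(b"failover data")).unwrap();
        let cid = block.cid();

        pool.put(&block).await.unwrap();

        // Only the healthy backend should have received the block.
        assert!(healthy_backend.has(cid).await.unwrap());
        assert!(!failed_backend.has(cid).await.unwrap());
        assert!(pool.get(cid).await.unwrap().is_some());
    }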
}