use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tracing::{debug, info, span, Level};
use scirs2_core::profiling::Profiler;
use crate::annotations::TripleAnnotation;
use crate::StarResult;
/// Top-level configuration bundling the settings of all three storage tiers.
#[derive(Debug, Clone)]
pub struct TierConfig {
    /// Settings for the in-memory hot tier.
    pub hot_tier: HotTierConfig,
    /// Settings for the on-disk warm tier.
    pub warm_tier: WarmTierConfig,
    /// Settings for the on-disk cold tier.
    pub cold_tier: ColdTierConfig,
    /// Flag for automatic migration between tiers.
    ///
    /// NOTE(review): not read by the storage code visible in this file.
    pub enable_auto_migration: bool,
    /// Interval between automatic migrations, in seconds.
    ///
    /// NOTE(review): not read by the storage code visible in this file.
    pub migration_interval_secs: u64,
}

impl Default for TierConfig {
    /// Uses each tier's own defaults, with auto-migration switched on and a
    /// 300-second migration interval.
    fn default() -> Self {
        const DEFAULT_MIGRATION_INTERVAL_SECS: u64 = 300;

        let hot_tier = HotTierConfig::default();
        let warm_tier = WarmTierConfig::default();
        let cold_tier = ColdTierConfig::default();

        Self {
            hot_tier,
            warm_tier,
            cold_tier,
            enable_auto_migration: true,
            migration_interval_secs: DEFAULT_MIGRATION_INTERVAL_SECS,
        }
    }
}
/// Limits and eviction behaviour of the in-memory hot tier.
#[derive(Debug, Clone)]
pub struct HotTierConfig {
    /// Byte budget; the hot tier evicts when an insert would reach it.
    pub max_size_bytes: usize,
    /// Entry budget; the hot tier evicts when it already holds this many entries.
    pub max_entries: usize,
    /// How the hot tier chooses which entry to evict.
    pub eviction_policy: EvictionPolicy,
}

impl Default for HotTierConfig {
    /// 512 MiB, 100 000 entries, LRU eviction.
    fn default() -> Self {
        const DEFAULT_MAX_BYTES: usize = 512 * 1024 * 1024;
        const DEFAULT_MAX_ENTRIES: usize = 100_000;

        Self {
            max_size_bytes: DEFAULT_MAX_BYTES,
            max_entries: DEFAULT_MAX_ENTRIES,
            eviction_policy: EvictionPolicy::Lru,
        }
    }
}
/// Settings for the on-disk warm tier.
#[derive(Debug, Clone)]
pub struct WarmTierConfig {
    /// Directory in which warm-tier entry files are stored.
    pub data_dir: PathBuf,
    /// Byte budget for the warm tier.
    ///
    /// NOTE(review): not enforced by the warm-tier code visible in this file.
    pub max_size_bytes: usize,
    /// Age in days after which an entry is a candidate for the cold tier.
    ///
    /// NOTE(review): not read by the storage code visible in this file.
    pub cold_tier_threshold_days: i64,
    /// Whether warm-tier files are zstd-compressed on write and decompressed on read.
    pub enable_compression: bool,
}

impl Default for WarmTierConfig {
    /// `<temp dir>/oxirs_warm`, 10 GiB, 30-day cold threshold, compression on.
    fn default() -> Self {
        const GIB: usize = 1 << 30;

        Self {
            data_dir: std::env::temp_dir().join("oxirs_warm"),
            // 10 GiB does not fit in a 32-bit `usize`; a plain literal product
            // fails to compile there, so saturate instead. The value on 64-bit
            // targets is unchanged.
            max_size_bytes: 10usize.saturating_mul(GIB),
            cold_tier_threshold_days: 30,
            enable_compression: true,
        }
    }
}
/// Settings for the on-disk cold tier.
#[derive(Debug, Clone)]
pub struct ColdTierConfig {
    /// Directory in which cold-tier entry files are stored.
    pub data_location: PathBuf,
    /// Compression flag.
    ///
    /// NOTE(review): the cold tier in this file always compresses and never
    /// consults this flag.
    pub enable_compression: bool,
    /// zstd level handed to the encoder when the cold tier writes an entry.
    pub compression_level: i32,
}

impl Default for ColdTierConfig {
    /// `<temp dir>/oxirs_cold`, compression on, level 15.
    fn default() -> Self {
        const DEFAULT_COMPRESSION_LEVEL: i32 = 15;

        let data_location = std::env::temp_dir().join("oxirs_cold");
        Self {
            data_location,
            enable_compression: true,
            compression_level: DEFAULT_COMPRESSION_LEVEL,
        }
    }
}
/// Strategy the hot tier uses to choose which entry to evict.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionPolicy {
/// Evict the key at the front of the hot tier's access-ordered queue.
Lru,
/// Evict the key with the lowest recorded access count.
Lfu,
/// Evict the key at the front of the hot tier's queue.
Fifo,
}
/// Per-key bookkeeping kept by `TieredStorage` alongside the stored annotation.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AnnotationMetadata {
// Tier the key was last recorded in.
tier: StorageTier,
// Starts at 1 on insert and is incremented on every successful read.
access_count: usize,
// Timestamp of the most recent insert or successful read.
last_access: DateTime<Utc>,
// Timestamp of the insert that created this record.
created_at: DateTime<Utc>,
// Size estimate computed at insert time: size_of::<TripleAnnotation>() plus
// the length of the annotation's `source`, if any.
size_bytes: usize,
}
/// Identifies which of the three storage tiers holds an annotation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StorageTier {
/// In-memory tier; consulted first on reads.
Hot,
/// On-disk tier; consulted after the hot tier.
Warm,
/// On-disk tier; consulted last.
Cold,
}
/// In-memory tier: a bounded map of annotations plus the bookkeeping needed to
/// pick an eviction victim.
struct HotTier {
    /// Resident annotations keyed by id.
    data: HashMap<u64, TripleAnnotation>,
    /// Resident keys, eviction candidates first. Each resident key appears
    /// exactly once.
    lru_queue: VecDeque<u64>,
    /// Number of inserts and reads recorded per resident key (drives LFU).
    access_counts: HashMap<u64, usize>,
    /// Sum of `entry_size` over all resident annotations.
    size_bytes: usize,
    config: HotTierConfig,
}

impl HotTier {
    /// Creates an empty hot tier governed by `config`.
    fn new(config: HotTierConfig) -> Self {
        Self {
            data: HashMap::new(),
            lru_queue: VecDeque::new(),
            access_counts: HashMap::new(),
            size_bytes: 0,
            config,
        }
    }

    /// Size estimate charged for one annotation: the struct itself plus the
    /// length of its `source`, if present.
    fn entry_size(annotation: &TripleAnnotation) -> usize {
        std::mem::size_of::<TripleAnnotation>()
            + annotation.source.as_ref().map_or(0, |s| s.len())
    }

    /// Drops every occurrence of `key` from the eviction queue.
    fn unqueue(&mut self, key: u64) {
        self.lru_queue.retain(|&queued| queued != key);
    }

    /// Stores `annotation` under `key`, returning the entry that had to be
    /// evicted to make room, if any.
    ///
    /// Re-inserting a resident key replaces its annotation in place: the old
    /// size is released and the key is re-queued once, so neither the byte
    /// counter nor the queue drifts. The key's access count carries over.
    ///
    /// At most one entry is evicted per call (the return type carries a single
    /// pair), so the byte budget can be exceeded until later inserts evict more.
    fn insert(
        &mut self,
        key: u64,
        annotation: TripleAnnotation,
    ) -> Option<(u64, TripleAnnotation)> {
        let size = Self::entry_size(&annotation);

        // Release the accounting of any annotation being replaced *before* the
        // capacity check, so a pure overwrite never forces an eviction.
        if let Some(previous) = self.data.remove(&key) {
            self.size_bytes = self.size_bytes.saturating_sub(Self::entry_size(&previous));
            self.unqueue(key);
        }

        let at_capacity = self.data.len() >= self.config.max_entries
            || self.size_bytes.saturating_add(size) >= self.config.max_size_bytes;
        let evicted = if at_capacity { self.evict() } else { None };

        self.data.insert(key, annotation);
        self.lru_queue.push_back(key);
        *self.access_counts.entry(key).or_insert(0) += 1;
        self.size_bytes = self.size_bytes.saturating_add(size);
        evicted
    }

    /// Looks up `key`, recording the access.
    ///
    /// The access count is always incremented. The key is moved to the back of
    /// the queue unless the policy is FIFO, where reads must not affect
    /// eviction order.
    fn get(&mut self, key: u64) -> Option<&TripleAnnotation> {
        let annotation = self.data.get(&key)?;
        if !matches!(self.config.eviction_policy, EvictionPolicy::Fifo) {
            if let Some(pos) = self.lru_queue.iter().position(|&queued| queued == key) {
                self.lru_queue.remove(pos);
                self.lru_queue.push_back(key);
            }
        }
        *self.access_counts.entry(key).or_insert(0) += 1;
        Some(annotation)
    }

    /// Removes and returns one entry chosen by the configured policy, or
    /// `None` when nothing is resident.
    fn evict(&mut self) -> Option<(u64, TripleAnnotation)> {
        loop {
            let victim = match self.config.eviction_policy {
                EvictionPolicy::Lru | EvictionPolicy::Fifo => self.lru_queue.pop_front()?,
                EvictionPolicy::Lfu => {
                    // Scan in queue order: `min_by_key` keeps the first minimum,
                    // so ties go to the key nearest the front instead of to
                    // whatever order the hash map happens to iterate in.
                    let counts = &self.access_counts;
                    let pos = self
                        .lru_queue
                        .iter()
                        .enumerate()
                        .min_by_key(|&(_, key)| counts.get(key).copied().unwrap_or(0))
                        .map(|(pos, _)| pos)?;
                    self.lru_queue.remove(pos)?
                }
            };
            self.access_counts.remove(&victim);
            if let Some(annotation) = self.data.remove(&victim) {
                self.size_bytes = self
                    .size_bytes
                    .saturating_sub(Self::entry_size(&annotation));
                return Some((victim, annotation));
            }
            // The queue named a key that is no longer resident; skip it and
            // keep looking rather than reporting "nothing to evict".
        }
    }

    /// Removes `key` and all of its bookkeeping, returning its annotation if
    /// it was resident.
    #[allow(dead_code)]
    fn remove(&mut self, key: u64) -> Option<TripleAnnotation> {
        let annotation = self.data.remove(&key)?;
        self.unqueue(key);
        self.access_counts.remove(&key);
        self.size_bytes = self
            .size_bytes
            .saturating_sub(Self::entry_size(&annotation));
        Some(annotation)
    }

    /// Current estimated size of all resident annotations, in bytes.
    fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// Number of resident annotations.
    #[allow(dead_code)]
    fn len(&self) -> usize {
        self.data.len()
    }
}
struct WarmTier {
config: WarmTierConfig,
size_bytes: usize,
}
impl WarmTier {
fn new(config: WarmTierConfig) -> StarResult<Self> {
fs::create_dir_all(&config.data_dir)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
Ok(Self {
config,
size_bytes: 0,
})
}
fn insert(&mut self, key: u64, annotation: &TripleAnnotation) -> StarResult<()> {
let path = self.config.data_dir.join(format!("{}.ann", key));
let file = File::create(&path)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
let writer = BufWriter::new(file);
let bytes = oxicode::serde::encode_to_vec(annotation, oxicode::config::standard())
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
let final_bytes = if self.config.enable_compression {
oxiarc_zstd::encode_all(&bytes, 3)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?
} else {
bytes
};
std::io::Write::write_all(
&mut std::io::BufWriter::new(
writer
.into_inner()
.expect("BufWriter flush should have succeeded"),
),
&final_bytes,
)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
self.size_bytes += final_bytes.len();
Ok(())
}
fn get(&self, key: u64) -> StarResult<Option<TripleAnnotation>> {
let path = self.config.data_dir.join(format!("{}.ann", key));
if !path.exists() {
return Ok(None);
}
let file = File::open(&path).map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let mut reader = BufReader::new(file);
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut reader, &mut bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let decompressed = if self.config.enable_compression {
oxiarc_zstd::decode_all(&bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?
} else {
bytes
};
let annotation: TripleAnnotation =
oxicode::serde::decode_from_slice(&decompressed, oxicode::config::standard())
.map_err(|e| crate::StarError::parse_error(e.to_string()))?
.0;
Ok(Some(annotation))
}
fn remove(&mut self, key: u64) -> StarResult<bool> {
let path = self.config.data_dir.join(format!("{}.ann", key));
if path.exists() {
let size = std::fs::metadata(&path)
.map(|m| m.len() as usize)
.unwrap_or(0);
fs::remove_file(&path)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
self.size_bytes = self.size_bytes.saturating_sub(size);
Ok(true)
} else {
Ok(false)
}
}
}
struct ColdTier {
config: ColdTierConfig,
size_bytes: usize,
}
impl ColdTier {
fn new(config: ColdTierConfig) -> StarResult<Self> {
fs::create_dir_all(&config.data_location)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
Ok(Self {
config,
size_bytes: 0,
})
}
#[allow(dead_code)]
fn insert(&mut self, key: u64, annotation: &TripleAnnotation) -> StarResult<()> {
let path = self.config.data_location.join(format!("{}.cold", key));
let file = File::create(&path)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
let writer = BufWriter::new(file);
let bytes = oxicode::serde::encode_to_vec(annotation, oxicode::config::standard())
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
let compressed = oxiarc_zstd::encode_all(&bytes, self.config.compression_level)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
std::io::Write::write_all(
&mut std::io::BufWriter::new(
writer
.into_inner()
.expect("BufWriter flush should have succeeded"),
),
&compressed,
)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
self.size_bytes += compressed.len();
Ok(())
}
fn get(&self, key: u64) -> StarResult<Option<TripleAnnotation>> {
let path = self.config.data_location.join(format!("{}.cold", key));
if !path.exists() {
return Ok(None);
}
let file = File::open(&path).map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let mut reader = BufReader::new(file);
let mut compressed = Vec::new();
std::io::Read::read_to_end(&mut reader, &mut compressed)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let bytes = oxiarc_zstd::decode_all(&compressed)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let annotation: TripleAnnotation =
oxicode::serde::decode_from_slice(&bytes, oxicode::config::standard())
.map_err(|e| crate::StarError::parse_error(e.to_string()))?
.0;
Ok(Some(annotation))
}
}
/// Annotation store that spreads entries over a hot (in-memory), a warm
/// (on-disk) and a cold (on-disk) tier, tracking per-key metadata and
/// aggregate statistics.
pub struct TieredStorage {
// Configuration the storage was built from; kept but not read after construction.
#[allow(dead_code)]
config: TierConfig,
// In-memory tier; checked first on reads and the target of every insert.
hot_tier: Arc<RwLock<HotTier>>,
// On-disk tier that receives entries evicted from the hot tier.
warm_tier: Arc<RwLock<WarmTier>>,
// On-disk tier checked last on reads.
cold_tier: Arc<RwLock<ColdTier>>,
// Per-key bookkeeping (tier, access count, timestamps, size estimate).
metadata: Arc<RwLock<HashMap<u64, AnnotationMetadata>>>,
// Profiler instance created at construction; not used by the code in this file.
#[allow(dead_code)]
profiler: Profiler,
// Running hit, read/write and migration counters.
stats: Arc<RwLock<TieredStorageStatistics>>,
}
/// Counters and size snapshots reported by `TieredStorage::statistics`.
#[derive(Debug, Clone, Default)]
pub struct TieredStorageStatistics {
/// Reads answered by the hot tier.
pub hot_hits: usize,
/// Reads answered by the warm tier.
pub warm_hits: usize,
/// Reads answered by the cold tier.
pub cold_hits: usize,
/// Total calls to `get`, including misses.
pub total_reads: usize,
/// Total inserts counted by `insert`.
pub total_writes: usize,
/// Promotions towards a faster tier (warm to hot, cold to warm).
pub migrations_up: usize,
/// Hot-tier evictions written to the warm tier.
pub migrations_down: usize,
/// Hot tier's estimated size in bytes, sampled when statistics are requested.
pub hot_tier_bytes: usize,
/// Warm tier's tracked size in bytes, sampled when statistics are requested.
pub warm_tier_bytes: usize,
/// Cold tier's tracked size in bytes, sampled when statistics are requested.
pub cold_tier_bytes: usize,
}
impl TieredStorage {
    /// A warm-tier entry is promoted to the hot tier once its recorded access
    /// count exceeds this value.
    const HOT_PROMOTION_ACCESS_THRESHOLD: usize = 5;

    /// Builds the three tiers from `config`.
    ///
    /// # Errors
    /// Propagates the error from creating the warm or cold tier (their
    /// constructors create the backing directories).
    pub fn new(config: TierConfig) -> StarResult<Self> {
        let span = span!(Level::INFO, "tiered_storage_new");
        let _enter = span.enter();

        let hot_tier = Arc::new(RwLock::new(HotTier::new(config.hot_tier.clone())));
        let warm_tier = Arc::new(RwLock::new(WarmTier::new(config.warm_tier.clone())?));
        let cold_tier = Arc::new(RwLock::new(ColdTier::new(config.cold_tier.clone())?));

        info!("Created tiered storage system");
        Ok(Self {
            config,
            hot_tier,
            warm_tier,
            cold_tier,
            metadata: Arc::new(RwLock::new(HashMap::new())),
            profiler: Profiler::new(),
            stats: Arc::new(RwLock::new(TieredStorageStatistics::default())),
        })
    }

    /// Applies `update` to the statistics under the write lock.
    ///
    /// A poisoned lock is recovered rather than propagated, as everywhere else
    /// in this type.
    fn with_stats(&self, update: impl FnOnce(&mut TieredStorageStatistics)) {
        let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
        update(&mut *stats);
    }

    /// Records `tier` as the location of `key`, if the key has metadata.
    fn set_tier(&self, key: u64, tier: StorageTier) {
        let mut metadata = self.metadata.write().unwrap_or_else(|e| e.into_inner());
        if let Some(meta) = metadata.get_mut(&key) {
            meta.tier = tier;
        }
    }

    /// Writes an entry evicted from the hot tier to the warm tier and updates
    /// its metadata and the `migrations_down` counter.
    fn demote_to_warm(&self, key: u64, annotation: &TripleAnnotation) -> StarResult<()> {
        debug!("Evicting key {} to warm tier", key);
        self.warm_tier
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(key, annotation)?;
        self.set_tier(key, StorageTier::Warm);
        self.with_stats(|stats| stats.migrations_down += 1);
        Ok(())
    }

    /// Stores `annotation` under `key` in the hot tier, demoting whatever the
    /// hot tier evicts to the warm tier.
    ///
    /// Metadata for `key` is (re)created with an access count of 1.
    ///
    /// # Errors
    /// Returns the warm tier's error if the evicted entry cannot be written.
    /// The new annotation is already in the hot tier at that point, so its
    /// metadata and the write counter are still recorded before the error is
    /// returned.
    pub fn insert(&mut self, key: u64, annotation: TripleAnnotation) -> StarResult<()> {
        let span = span!(Level::DEBUG, "tiered_insert");
        let _enter = span.enter();

        let size = std::mem::size_of::<TripleAnnotation>()
            + annotation.source.as_ref().map_or(0, |s| s.len());

        let displaced = self
            .hot_tier
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(key, annotation);

        // Demote first, record metadata second: if the displaced key is `key`
        // itself, the fresh `Hot` record below must win.
        let demotion = match displaced {
            Some((evicted_key, evicted)) => self.demote_to_warm(evicted_key, &evicted),
            None => Ok(()),
        };

        let now = Utc::now();
        self.metadata
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                key,
                AnnotationMetadata {
                    tier: StorageTier::Hot,
                    access_count: 1,
                    last_access: now,
                    created_at: now,
                    size_bytes: size,
                },
            );
        self.with_stats(|stats| stats.total_writes += 1);

        demotion
    }

    /// Looks `key` up in the hot, then warm, then cold tier.
    ///
    /// A hit updates the key's access metadata and the matching hit counter.
    /// Warm hits may be promoted to the hot tier; cold hits are copied into
    /// the warm tier. Returns `Ok(None)` when no tier holds the key.
    ///
    /// # Errors
    /// Propagates read/decode errors from the warm or cold tier and write
    /// errors raised while promoting.
    pub fn get(&mut self, key: u64) -> StarResult<Option<TripleAnnotation>> {
        let span = span!(Level::DEBUG, "tiered_get");
        let _enter = span.enter();

        self.with_stats(|stats| stats.total_reads += 1);

        // The hot tier needs the write lock even for reads because a lookup
        // updates its recency and frequency bookkeeping.
        let hot_hit = self
            .hot_tier
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .get(key)
            .cloned();
        if let Some(annotation) = hot_hit {
            self.with_stats(|stats| stats.hot_hits += 1);
            self.update_metadata_access(key);
            return Ok(Some(annotation));
        }

        let warm_hit = self
            .warm_tier
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(key)?;
        if let Some(annotation) = warm_hit {
            self.with_stats(|stats| stats.warm_hits += 1);
            self.update_metadata_access(key);
            self.maybe_promote_to_hot(key, annotation.clone())?;
            return Ok(Some(annotation));
        }

        let cold_hit = self
            .cold_tier
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(key)?;
        if let Some(annotation) = cold_hit {
            self.with_stats(|stats| stats.cold_hits += 1);
            self.update_metadata_access(key);
            self.maybe_promote_to_warm(key, annotation.clone())?;
            return Ok(Some(annotation));
        }

        Ok(None)
    }

    /// Bumps the access count and last-access time of `key`, if it has metadata.
    fn update_metadata_access(&self, key: u64) {
        let mut metadata = self.metadata.write().unwrap_or_else(|e| e.into_inner());
        if let Some(meta) = metadata.get_mut(&key) {
            meta.access_count += 1;
            meta.last_access = Utc::now();
        }
    }

    /// Moves a warm-tier entry into the hot tier once it has been accessed
    /// more than `HOT_PROMOTION_ACCESS_THRESHOLD` times.
    ///
    /// Any entry the hot tier evicts to make room is demoted to the warm tier
    /// instead of being dropped.
    ///
    /// # Errors
    /// Propagates errors from removing the warm file or from demoting the
    /// displaced entry.
    fn maybe_promote_to_hot(&mut self, key: u64, annotation: TripleAnnotation) -> StarResult<()> {
        let should_promote = self
            .metadata
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(&key)
            .is_some_and(|meta| meta.access_count > Self::HOT_PROMOTION_ACCESS_THRESHOLD);
        if !should_promote {
            return Ok(());
        }

        debug!("Promoting key {} to hot tier", key);
        self.warm_tier
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .remove(key)?;

        let displaced = self
            .hot_tier
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(key, annotation);
        self.set_tier(key, StorageTier::Hot);
        self.with_stats(|stats| stats.migrations_up += 1);

        // Promotion can push another entry out of the hot tier; that entry
        // must land in the warm tier or it is lost.
        if let Some((evicted_key, evicted)) = displaced {
            self.demote_to_warm(evicted_key, &evicted)?;
        }
        Ok(())
    }

    /// Copies a cold-tier entry into the warm tier and records the move.
    ///
    /// Despite the name the promotion is unconditional. The cold-tier file is
    /// left in place (the cold tier exposes no removal operation); later reads
    /// find the warm copy first.
    ///
    /// # Errors
    /// Propagates the warm tier's write error.
    fn maybe_promote_to_warm(&mut self, key: u64, annotation: TripleAnnotation) -> StarResult<()> {
        debug!("Promoting key {} to warm tier", key);
        self.warm_tier
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(key, &annotation)?;
        self.set_tier(key, StorageTier::Warm);
        self.with_stats(|stats| stats.migrations_up += 1);
        Ok(())
    }

    /// Returns a snapshot of the counters with the current per-tier byte
    /// sizes filled in.
    pub fn statistics(&self) -> TieredStorageStatistics {
        let mut snapshot = self.stats.read().unwrap_or_else(|e| e.into_inner()).clone();
        snapshot.hot_tier_bytes = self
            .hot_tier
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .size_bytes();
        snapshot.warm_tier_bytes = self
            .warm_tier
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .size_bytes;
        snapshot.cold_tier_bytes = self
            .cold_tier
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .size_bytes;
        snapshot
    }

    /// Counts how many keys the metadata currently attributes to each tier.
    /// Tiers with no keys are absent from the map.
    pub fn tier_distribution(&self) -> HashMap<StorageTier, usize> {
        let metadata = self.metadata.read().unwrap_or_else(|e| e.into_inner());
        let mut distribution = HashMap::new();
        for meta in metadata.values() {
            *distribution.entry(meta.tier).or_insert(0) += 1;
        }
        distribution
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test scratch directory.
    ///
    /// The default configuration points every storage at the same two
    /// directories under the system temp dir, so tests running in parallel (or
    /// files left by an earlier run) can see each other's warm-tier entries.
    /// Giving each test its own root keeps them independent; the directory is
    /// removed again when the guard is dropped.
    struct Scratch {
        root: std::path::PathBuf,
    }

    impl Scratch {
        /// Creates a guard for a directory unique to this process and test.
        fn new(test_name: &str) -> Self {
            let root = std::env::temp_dir().join(format!(
                "oxirs_tiered_test_{}_{}",
                std::process::id(),
                test_name
            ));
            // Best effort: clear anything a crashed earlier run left behind.
            let _ = std::fs::remove_dir_all(&root);
            Self { root }
        }

        /// Default configuration redirected into the scratch directory.
        fn config(&self) -> TierConfig {
            let mut config = TierConfig::default();
            config.warm_tier.data_dir = self.root.join("warm");
            config.cold_tier.data_location = self.root.join("cold");
            config
        }
    }

    impl Drop for Scratch {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not fail the test.
            let _ = std::fs::remove_dir_all(&self.root);
        }
    }

    #[test]
    fn test_tiered_storage_creation() {
        let scratch = Scratch::new("creation");
        let storage = TieredStorage::new(scratch.config());
        assert!(storage.is_ok());
    }

    #[test]
    fn test_insert_and_get() {
        let scratch = Scratch::new("insert_and_get");
        let mut storage = TieredStorage::new(scratch.config()).unwrap();
        let annotation = TripleAnnotation::new().with_confidence(0.9);
        storage.insert(1, annotation.clone()).unwrap();
        let retrieved = storage.get(1).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().confidence, Some(0.9));
    }

    #[test]
    fn test_hot_tier_eviction() {
        let scratch = Scratch::new("hot_tier_eviction");
        let mut config = scratch.config();
        config.hot_tier.max_entries = 2;
        let mut storage = TieredStorage::new(config).unwrap();
        for i in 0..5 {
            let annotation = TripleAnnotation::new().with_confidence(0.8);
            storage.insert(i, annotation).unwrap();
        }
        let stats = storage.statistics();
        assert!(stats.migrations_down > 0);
    }

    #[test]
    fn test_tier_distribution() {
        let scratch = Scratch::new("tier_distribution");
        let mut storage = TieredStorage::new(scratch.config()).unwrap();
        for i in 0..10 {
            let annotation = TripleAnnotation::new().with_confidence(0.8);
            storage.insert(i, annotation).unwrap();
        }
        let distribution = storage.tier_distribution();
        assert!(distribution.contains_key(&StorageTier::Hot));
    }

    #[test]
    fn test_statistics() {
        let scratch = Scratch::new("statistics");
        let mut storage = TieredStorage::new(scratch.config()).unwrap();
        let annotation = TripleAnnotation::new().with_confidence(0.9);
        storage.insert(1, annotation).unwrap();
        storage.get(1).unwrap();
        // With an isolated directory, key 2 cannot exist in any tier.
        assert!(storage.get(2).unwrap().is_none());
        let stats = storage.statistics();
        assert_eq!(stats.total_writes, 1);
        assert_eq!(stats.total_reads, 2);
        assert_eq!(stats.hot_hits, 1);
    }
}