// agentic_memory/v3/tiered.rs

//! Tiered storage: Hot -> Warm -> Cold -> Frozen.

use super::block::Block;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

/// Tiered storage configuration.
///
/// The age thresholds are only consulted once the corresponding tier has
/// grown past its byte cap: a block is demoted when its tier is over budget
/// *and* the block is older than the tier's threshold.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TierConfig {
    /// Hot tier: blocks older than this (in whole seconds) may be demoted to
    /// warm once the hot tier exceeds `hot_max_bytes`.
    pub hot_threshold: Duration,
    /// Warm tier: blocks older than this (in whole seconds) may be demoted to
    /// cold once the warm tier exceeds `warm_max_bytes`.
    pub warm_threshold: Duration,
    /// Cold tier: intended upper age for cold blocks.
    ///
    /// NOTE(review): no code visible in this file reads this field; moving
    /// blocks to the frozen tier is driven by an explicit cutoff instead.
    pub cold_threshold: Duration,
    /// Maximum hot tier size in bytes before age-based demotion kicks in.
    pub hot_max_bytes: usize,
    /// Maximum warm tier size in bytes before age-based demotion kicks in.
    pub warm_max_bytes: usize,
}

impl Default for TierConfig {
    /// Defaults: 24 hours hot, 30 days warm, 1 year cold, 10 MiB hot cap,
    /// 100 MiB warm cap.
    fn default() -> Self {
        const DAY_SECS: u64 = 24 * 60 * 60;
        const MIB: usize = 1024 * 1024;

        Self {
            hot_threshold: Duration::from_secs(DAY_SECS),
            warm_threshold: Duration::from_secs(30 * DAY_SECS),
            cold_threshold: Duration::from_secs(365 * DAY_SECS),
            hot_max_bytes: 10 * MIB,
            warm_max_bytes: 100 * MIB,
        }
    }
}

36/// Tiered storage: Hot -> Warm -> Cold -> Frozen
37pub struct TieredStorage {
38    hot: HashMap<u64, Block>,
39    hot_bytes: usize,
40    warm: HashMap<u64, Block>,
41    warm_bytes: usize,
42    cold: HashMap<u64, Vec<u8>>,
43    cold_bytes: usize,
44    frozen: HashMap<u64, Vec<u8>>,
45    config: TierConfig,
46}
47
48impl TieredStorage {
49    pub fn new(config: TierConfig) -> Self {
50        Self {
51            hot: HashMap::new(),
52            hot_bytes: 0,
53            warm: HashMap::new(),
54            warm_bytes: 0,
55            cold: HashMap::new(),
56            cold_bytes: 0,
57            frozen: HashMap::new(),
58            config,
59        }
60    }
61
62    /// Store a block (goes to hot tier)
63    pub fn store(&mut self, block: Block) {
64        self.hot_bytes += block.size_bytes as usize;
65        self.hot.insert(block.sequence, block);
66        self.maybe_demote();
67    }
68
69    /// Retrieve a block from any tier
70    pub fn get(&self, sequence: u64) -> Option<Block> {
71        self.hot
72            .get(&sequence)
73            .cloned()
74            .or_else(|| self.warm.get(&sequence).cloned())
75            .or_else(|| {
76                self.cold
77                    .get(&sequence)
78                    .and_then(|data| serde_json::from_slice(data).ok())
79            })
80            .or_else(|| {
81                self.frozen
82                    .get(&sequence)
83                    .and_then(|data| serde_json::from_slice(data).ok())
84            })
85    }
86
87    /// Check if we need to demote blocks to lower tiers
88    fn maybe_demote(&mut self) {
89        let now = Utc::now();
90
91        // Demote from hot to warm
92        if self.hot_bytes > self.config.hot_max_bytes {
93            let to_demote: Vec<u64> = self
94                .hot
95                .iter()
96                .filter(|(_, b)| {
97                    let age = now.signed_duration_since(b.timestamp);
98                    age.num_seconds() > self.config.hot_threshold.as_secs() as i64
99                })
100                .map(|(&seq, _)| seq)
101                .collect();
102
103            for seq in to_demote {
104                if let Some(block) = self.hot.remove(&seq) {
105                    self.hot_bytes -= block.size_bytes as usize;
106                    self.warm_bytes += block.size_bytes as usize;
107                    self.warm.insert(seq, block);
108                }
109            }
110        }
111
112        // Demote from warm to cold
113        if self.warm_bytes > self.config.warm_max_bytes {
114            let to_demote: Vec<u64> = self
115                .warm
116                .iter()
117                .filter(|(_, b)| {
118                    let age = now.signed_duration_since(b.timestamp);
119                    age.num_seconds() > self.config.warm_threshold.as_secs() as i64
120                })
121                .map(|(&seq, _)| seq)
122                .collect();
123
124            for seq in to_demote {
125                if let Some(block) = self.warm.remove(&seq) {
126                    self.warm_bytes -= block.size_bytes as usize;
127                    let data = serde_json::to_vec(&block).unwrap_or_default();
128                    self.cold_bytes += data.len();
129                    self.cold.insert(seq, data);
130                }
131            }
132        }
133    }
134
135    /// Force archive old blocks to frozen tier
136    pub fn archive_old(&mut self, older_than: DateTime<Utc>) {
137        let cold_to_freeze: Vec<u64> = self.cold.keys().cloned().collect();
138
139        for seq in cold_to_freeze {
140            if let Some(data) = self.cold.get(&seq) {
141                if let Ok(block) = serde_json::from_slice::<Block>(data) {
142                    if block.timestamp < older_than {
143                        let data = self.cold.remove(&seq).unwrap();
144                        self.cold_bytes -= data.len();
145                        self.frozen.insert(seq, data);
146                    }
147                }
148            }
149        }
150    }
151
152    /// Get storage statistics
153    pub fn stats(&self) -> TierStats {
154        TierStats {
155            hot_blocks: self.hot.len(),
156            hot_bytes: self.hot_bytes,
157            warm_blocks: self.warm.len(),
158            warm_bytes: self.warm_bytes,
159            cold_blocks: self.cold.len(),
160            cold_bytes: self.cold_bytes,
161            frozen_blocks: self.frozen.len(),
162        }
163    }
164
165    /// Total bytes across all tiers
166    pub fn total_bytes(&self) -> usize {
167        self.hot_bytes + self.warm_bytes + self.cold_bytes
168    }
169
170    /// Check memory pressure and evict if needed
171    pub fn check_memory_pressure(&mut self, max_memory_bytes: usize) {
172        let total = self.total_bytes();
173        let pressure = total as f64 / max_memory_bytes.max(1) as f64;
174
175        if pressure > 0.9 {
176            log::warn!(
177                "Memory pressure at {:.1}%, forcing eviction",
178                pressure * 100.0
179            );
180            self.force_eviction(max_memory_bytes, 0.7);
181        } else if pressure > 0.8 {
182            self.maybe_demote();
183        }
184    }
185
186    /// Force eviction to reduce memory to target ratio
187    pub fn force_eviction(&mut self, max_memory_bytes: usize, target_ratio: f64) {
188        let target = (max_memory_bytes as f64 * target_ratio) as usize;
189
190        // Evict oldest from hot to warm
191        while self.hot_bytes > target / 3 && !self.hot.is_empty() {
192            // Find oldest block in hot tier
193            if let Some((&oldest_seq, _)) = self.hot.iter().min_by_key(|(_, b)| b.timestamp) {
194                if let Some(block) = self.hot.remove(&oldest_seq) {
195                    self.hot_bytes -= block.size_bytes as usize;
196                    self.warm_bytes += block.size_bytes as usize;
197                    self.warm.insert(oldest_seq, block);
198                }
199            } else {
200                break;
201            }
202        }
203
204        // Evict oldest from warm to cold
205        while self.warm_bytes > target / 3 && !self.warm.is_empty() {
206            if let Some((&oldest_seq, _)) = self.warm.iter().min_by_key(|(_, b)| b.timestamp) {
207                if let Some(block) = self.warm.remove(&oldest_seq) {
208                    self.warm_bytes -= block.size_bytes as usize;
209                    let data = serde_json::to_vec(&block).unwrap_or_default();
210                    self.cold_bytes += data.len();
211                    self.cold.insert(oldest_seq, data);
212                }
213            } else {
214                break;
215            }
216        }
217
218        log::info!(
219            "After eviction: hot={}KB, warm={}KB, cold={}KB",
220            self.hot_bytes / 1024,
221            self.warm_bytes / 1024,
222            self.cold_bytes / 1024
223        );
224    }
225
226    /// Get count of total blocks across all tiers
227    pub fn total_blocks(&self) -> usize {
228        self.hot.len() + self.warm.len() + self.cold.len() + self.frozen.len()
229    }
230}
231
232/// Storage tier statistics
233#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct TierStats {
235    pub hot_blocks: usize,
236    pub hot_bytes: usize,
237    pub warm_blocks: usize,
238    pub warm_bytes: usize,
239    pub cold_blocks: usize,
240    pub cold_bytes: usize,
241    pub frozen_blocks: usize,
242}