agentic_memory/v3/
tiered.rs1use super::block::Block;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::time::Duration;
8
9#[derive(Clone, Debug)]
11pub struct TierConfig {
12 pub hot_threshold: Duration,
14 pub warm_threshold: Duration,
16 pub cold_threshold: Duration,
18 pub hot_max_bytes: usize,
20 pub warm_max_bytes: usize,
22}
23
24impl Default for TierConfig {
25 fn default() -> Self {
26 Self {
27 hot_threshold: Duration::from_secs(24 * 60 * 60), warm_threshold: Duration::from_secs(30 * 24 * 60 * 60), cold_threshold: Duration::from_secs(365 * 24 * 60 * 60), hot_max_bytes: 10 * 1024 * 1024, warm_max_bytes: 100 * 1024 * 1024, }
33 }
34}
35
36pub struct TieredStorage {
38 hot: HashMap<u64, Block>,
39 hot_bytes: usize,
40 warm: HashMap<u64, Block>,
41 warm_bytes: usize,
42 cold: HashMap<u64, Vec<u8>>,
43 cold_bytes: usize,
44 frozen: HashMap<u64, Vec<u8>>,
45 config: TierConfig,
46}
47
48impl TieredStorage {
49 pub fn new(config: TierConfig) -> Self {
50 Self {
51 hot: HashMap::new(),
52 hot_bytes: 0,
53 warm: HashMap::new(),
54 warm_bytes: 0,
55 cold: HashMap::new(),
56 cold_bytes: 0,
57 frozen: HashMap::new(),
58 config,
59 }
60 }
61
62 pub fn store(&mut self, block: Block) {
64 self.hot_bytes += block.size_bytes as usize;
65 self.hot.insert(block.sequence, block);
66 self.maybe_demote();
67 }
68
69 pub fn get(&self, sequence: u64) -> Option<Block> {
71 self.hot
72 .get(&sequence)
73 .cloned()
74 .or_else(|| self.warm.get(&sequence).cloned())
75 .or_else(|| {
76 self.cold
77 .get(&sequence)
78 .and_then(|data| serde_json::from_slice(data).ok())
79 })
80 .or_else(|| {
81 self.frozen
82 .get(&sequence)
83 .and_then(|data| serde_json::from_slice(data).ok())
84 })
85 }
86
87 fn maybe_demote(&mut self) {
89 let now = Utc::now();
90
91 if self.hot_bytes > self.config.hot_max_bytes {
93 let to_demote: Vec<u64> = self
94 .hot
95 .iter()
96 .filter(|(_, b)| {
97 let age = now.signed_duration_since(b.timestamp);
98 age.num_seconds() > self.config.hot_threshold.as_secs() as i64
99 })
100 .map(|(&seq, _)| seq)
101 .collect();
102
103 for seq in to_demote {
104 if let Some(block) = self.hot.remove(&seq) {
105 self.hot_bytes -= block.size_bytes as usize;
106 self.warm_bytes += block.size_bytes as usize;
107 self.warm.insert(seq, block);
108 }
109 }
110 }
111
112 if self.warm_bytes > self.config.warm_max_bytes {
114 let to_demote: Vec<u64> = self
115 .warm
116 .iter()
117 .filter(|(_, b)| {
118 let age = now.signed_duration_since(b.timestamp);
119 age.num_seconds() > self.config.warm_threshold.as_secs() as i64
120 })
121 .map(|(&seq, _)| seq)
122 .collect();
123
124 for seq in to_demote {
125 if let Some(block) = self.warm.remove(&seq) {
126 self.warm_bytes -= block.size_bytes as usize;
127 let data = serde_json::to_vec(&block).unwrap_or_default();
128 self.cold_bytes += data.len();
129 self.cold.insert(seq, data);
130 }
131 }
132 }
133 }
134
135 pub fn archive_old(&mut self, older_than: DateTime<Utc>) {
137 let cold_to_freeze: Vec<u64> = self.cold.keys().cloned().collect();
138
139 for seq in cold_to_freeze {
140 if let Some(data) = self.cold.get(&seq) {
141 if let Ok(block) = serde_json::from_slice::<Block>(data) {
142 if block.timestamp < older_than {
143 let data = self.cold.remove(&seq).unwrap();
144 self.cold_bytes -= data.len();
145 self.frozen.insert(seq, data);
146 }
147 }
148 }
149 }
150 }
151
152 pub fn stats(&self) -> TierStats {
154 TierStats {
155 hot_blocks: self.hot.len(),
156 hot_bytes: self.hot_bytes,
157 warm_blocks: self.warm.len(),
158 warm_bytes: self.warm_bytes,
159 cold_blocks: self.cold.len(),
160 cold_bytes: self.cold_bytes,
161 frozen_blocks: self.frozen.len(),
162 }
163 }
164
165 pub fn total_bytes(&self) -> usize {
167 self.hot_bytes + self.warm_bytes + self.cold_bytes
168 }
169
170 pub fn check_memory_pressure(&mut self, max_memory_bytes: usize) {
172 let total = self.total_bytes();
173 let pressure = total as f64 / max_memory_bytes.max(1) as f64;
174
175 if pressure > 0.9 {
176 log::warn!(
177 "Memory pressure at {:.1}%, forcing eviction",
178 pressure * 100.0
179 );
180 self.force_eviction(max_memory_bytes, 0.7);
181 } else if pressure > 0.8 {
182 self.maybe_demote();
183 }
184 }
185
186 pub fn force_eviction(&mut self, max_memory_bytes: usize, target_ratio: f64) {
188 let target = (max_memory_bytes as f64 * target_ratio) as usize;
189
190 while self.hot_bytes > target / 3 && !self.hot.is_empty() {
192 if let Some((&oldest_seq, _)) = self.hot.iter().min_by_key(|(_, b)| b.timestamp) {
194 if let Some(block) = self.hot.remove(&oldest_seq) {
195 self.hot_bytes -= block.size_bytes as usize;
196 self.warm_bytes += block.size_bytes as usize;
197 self.warm.insert(oldest_seq, block);
198 }
199 } else {
200 break;
201 }
202 }
203
204 while self.warm_bytes > target / 3 && !self.warm.is_empty() {
206 if let Some((&oldest_seq, _)) = self.warm.iter().min_by_key(|(_, b)| b.timestamp) {
207 if let Some(block) = self.warm.remove(&oldest_seq) {
208 self.warm_bytes -= block.size_bytes as usize;
209 let data = serde_json::to_vec(&block).unwrap_or_default();
210 self.cold_bytes += data.len();
211 self.cold.insert(oldest_seq, data);
212 }
213 } else {
214 break;
215 }
216 }
217
218 log::info!(
219 "After eviction: hot={}KB, warm={}KB, cold={}KB",
220 self.hot_bytes / 1024,
221 self.warm_bytes / 1024,
222 self.cold_bytes / 1024
223 );
224 }
225
226 pub fn total_blocks(&self) -> usize {
228 self.hot.len() + self.warm.len() + self.cold.len() + self.frozen.len()
229 }
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct TierStats {
235 pub hot_blocks: usize,
236 pub hot_bytes: usize,
237 pub warm_blocks: usize,
238 pub warm_bytes: usize,
239 pub cold_blocks: usize,
240 pub cold_bytes: usize,
241 pub frozen_blocks: usize,
242}