1use crate::{Result, ServerlessError};
4use dashmap::DashMap;
5use serde::{Deserialize, Serialize};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Instant;
9
/// Tuning knobs for a `FragmentPool`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolConfig {
    /// Maximum total payload bytes the pool may hold before evicting.
    pub max_size: u64,
    /// Maximum number of fragments resident at once.
    pub max_fragments: usize,
    /// Age in whole seconds (measured from creation) after which a
    /// fragment counts as expired.
    pub expiry_seconds: u64,
    /// When true, byte-identical fragments share one buffer instead of
    /// being stored twice.
    pub dedup_enabled: bool,
    // NOTE(review): not read anywhere in this file — presumably consumed
    // by callers that drive `FragmentPrewarmer`; confirm.
    pub prewarm_common: bool,
}
24
25impl Default for PoolConfig {
26 fn default() -> Self {
27 Self {
28 max_size: 512 * 1024 * 1024, max_fragments: 1000,
30 expiry_seconds: 300, dedup_enabled: true,
32 prewarm_common: true,
33 }
34 }
35}
36
/// A single cached blob plus its bookkeeping metadata.
///
/// The payload lives behind an `Arc` so callers can keep using it cheaply
/// even after the pool evicts or replaces the entry.
#[derive(Debug)]
pub struct PooledFragment {
    /// Process-unique, monotonically assigned identifier.
    pub id: u64,
    /// Cache key this fragment was stored under.
    pub key: String,
    /// Shared, immutable payload bytes.
    pub data: Arc<Vec<u8>>,
    /// Content hash of `data` (std `DefaultHasher`), used for dedup.
    pub hash: u64,
    /// Payload length in bytes.
    pub size: u64,
    /// Creation time; expiry is measured from here.
    pub created_at: Instant,
    // NOTE(review): set once at construction and never updated afterwards
    // (`record_access` only bumps the counter), so eviction keyed on this
    // field degenerates to creation order — confirm intent.
    pub last_accessed: Instant,
    /// How many times this fragment has been handed out.
    pub access_count: AtomicU64,
}

// Manual `Clone`: `AtomicU64` is not `Clone`, so the counter is
// snapshotted into a fresh atomic; the payload `Arc` is shared, not copied.
impl Clone for PooledFragment {
    fn clone(&self) -> Self {
        let counter_snapshot = self.access_count.load(Ordering::Relaxed);
        PooledFragment {
            access_count: AtomicU64::new(counter_snapshot),
            data: Arc::clone(&self.data),
            key: self.key.clone(),
            id: self.id,
            hash: self.hash,
            size: self.size,
            created_at: self.created_at,
            last_accessed: self.last_accessed,
        }
    }
}

impl PooledFragment {
    /// Wraps `data` in a new fragment with a fresh id and an initial
    /// access count of 1.
    pub fn new(key: impl Into<String>, data: Vec<u8>) -> Self {
        // Process-wide monotonic id source.
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);

        let content_hash = Self::compute_hash(&data);
        let byte_len = data.len() as u64;
        let stamp = Instant::now();

        PooledFragment {
            id: NEXT_ID.fetch_add(1, Ordering::SeqCst),
            key: key.into(),
            data: Arc::new(data),
            hash: content_hash,
            size: byte_len,
            created_at: stamp,
            last_accessed: stamp,
            access_count: AtomicU64::new(1),
        }
    }

    /// Hashes the payload bytes with the std `DefaultHasher`.
    // NOTE(review): DefaultHasher's algorithm is unspecified across std
    // versions — fine for in-process dedup, not for persisted hashes.
    fn compute_hash(data: &[u8]) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut state = std::collections::hash_map::DefaultHasher::new();
        data.hash(&mut state);
        state.finish()
    }

    /// Bumps the access counter (relaxed ordering; it is only a statistic).
    pub fn record_access(&self) {
        self.access_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Current value of the access counter.
    pub fn get_access_count(&self) -> u64 {
        self.access_count.load(Ordering::Relaxed)
    }

    /// True once strictly more than `expiry_seconds` whole seconds have
    /// elapsed since creation.
    pub fn is_expired(&self, expiry_seconds: u64) -> bool {
        let age_secs = self.created_at.elapsed().as_secs();
        age_secs > expiry_seconds
    }
}
117
/// Concurrent cache of immutable data fragments with capacity-driven
/// eviction, optional content deduplication, and hit/miss statistics.
#[derive(Debug)]
pub struct FragmentPool {
    /// Capacity, expiry, and dedup settings.
    config: PoolConfig,
    /// Primary store: fragment key -> fragment.
    fragments: DashMap<String, PooledFragment>,
    /// Content hash -> key of the fragment owning that content
    /// (populated only when `config.dedup_enabled`).
    hash_to_key: DashMap<u64, String>,
    /// Sum of `size` over all resident fragments, in bytes.
    total_size: AtomicU64,
    /// Hit/miss/eviction/dedup counters.
    stats: PoolStats,
}
132
/// Monotonic counters describing pool effectiveness.
#[derive(Debug, Default)]
pub struct PoolStats {
    /// Lookups served from a live cached fragment.
    hits: AtomicU64,
    /// Lookups that found no live fragment.
    misses: AtomicU64,
    /// Fragments removed to make room.
    evictions: AtomicU64,
    /// Bytes saved by sharing an existing identical buffer.
    dedup_savings: AtomicU64,
}
145
146impl FragmentPool {
147 pub fn new(config: PoolConfig) -> Self {
149 Self {
150 config,
151 fragments: DashMap::new(),
152 hash_to_key: DashMap::new(),
153 total_size: AtomicU64::new(0),
154 stats: PoolStats::default(),
155 }
156 }
157
158 pub fn get_or_create<F>(&self, key: &str, create_fn: F) -> Result<Arc<Vec<u8>>>
160 where
161 F: FnOnce() -> Result<Vec<u8>>,
162 {
163 if let Some(fragment) = self.fragments.get(key) {
165 if !fragment.is_expired(self.config.expiry_seconds) {
166 fragment.record_access();
167 self.stats.hits.fetch_add(1, Ordering::Relaxed);
168 return Ok(Arc::clone(&fragment.data));
169 }
170 }
171
172 self.stats.misses.fetch_add(1, Ordering::Relaxed);
173
174 let data = create_fn()?;
176
177 if self.config.dedup_enabled {
179 let hash = PooledFragment::compute_hash(&data);
180 if let Some(existing_key) = self.hash_to_key.get(&hash) {
181 if let Some(existing) = self.fragments.get(existing_key.value()) {
182 self.stats
183 .dedup_savings
184 .fetch_add(data.len() as u64, Ordering::Relaxed);
185 return Ok(Arc::clone(&existing.data));
186 }
187 }
188 }
189
190 self.ensure_space(data.len() as u64)?;
192
193 let fragment = PooledFragment::new(key, data);
195 let data_ref = Arc::clone(&fragment.data);
196 let hash = fragment.hash;
197
198 self.total_size.fetch_add(fragment.size, Ordering::SeqCst);
199 self.fragments.insert(key.to_string(), fragment);
200
201 if self.config.dedup_enabled {
202 self.hash_to_key.insert(hash, key.to_string());
203 }
204
205 Ok(data_ref)
206 }
207
208 pub fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
210 if let Some(fragment) = self.fragments.get(key) {
211 if !fragment.is_expired(self.config.expiry_seconds) {
212 fragment.record_access();
213 self.stats.hits.fetch_add(1, Ordering::Relaxed);
214 return Some(Arc::clone(&fragment.data));
215 }
216 }
217 self.stats.misses.fetch_add(1, Ordering::Relaxed);
218 None
219 }
220
221 pub fn insert(&self, key: impl Into<String>, data: Vec<u8>) -> Result<()> {
223 let size = data.len() as u64;
224 self.ensure_space(size)?;
225
226 let fragment = PooledFragment::new(key, data);
227 let hash = fragment.hash;
228 let key = fragment.key.clone();
229
230 self.total_size.fetch_add(fragment.size, Ordering::SeqCst);
231 self.fragments.insert(key.clone(), fragment);
232
233 if self.config.dedup_enabled {
234 self.hash_to_key.insert(hash, key);
235 }
236
237 Ok(())
238 }
239
240 pub fn remove(&self, key: &str) -> Option<PooledFragment> {
242 if let Some((_, fragment)) = self.fragments.remove(key) {
243 self.total_size.fetch_sub(fragment.size, Ordering::SeqCst);
244 self.hash_to_key.remove(&fragment.hash);
245 Some(fragment)
246 } else {
247 None
248 }
249 }
250
251 fn ensure_space(&self, required: u64) -> Result<()> {
253 while self.fragments.len() >= self.config.max_fragments {
255 self.evict_one()?;
256 }
257
258 while self.total_size.load(Ordering::SeqCst) + required > self.config.max_size {
260 self.evict_one()?;
261 }
262
263 Ok(())
264 }
265
266 fn evict_one(&self) -> Result<()> {
268 let mut oldest_key = None;
269 let mut oldest_time = None;
270
271 for entry in self.fragments.iter() {
272 let time = entry.value().last_accessed;
273 if oldest_time.is_none() || time < oldest_time.unwrap() {
274 oldest_key = Some(entry.key().clone());
275 oldest_time = Some(time);
276 }
277 }
278
279 if let Some(key) = oldest_key {
280 self.remove(&key);
281 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
282 Ok(())
283 } else {
284 Err(ServerlessError::PoolError("No fragments to evict".into()))
285 }
286 }
287
288 pub fn clear_expired(&self) -> usize {
290 let mut expired = Vec::new();
291
292 for entry in self.fragments.iter() {
293 if entry.value().is_expired(self.config.expiry_seconds) {
294 expired.push(entry.key().clone());
295 }
296 }
297
298 for key in &expired {
299 self.remove(key);
300 }
301
302 expired.len()
303 }
304
305 pub fn clear(&self) {
307 self.fragments.clear();
308 self.hash_to_key.clear();
309 self.total_size.store(0, Ordering::SeqCst);
310 }
311
312 pub fn total_size(&self) -> u64 {
314 self.total_size.load(Ordering::SeqCst)
315 }
316
317 pub fn len(&self) -> usize {
319 self.fragments.len()
320 }
321
322 pub fn is_empty(&self) -> bool {
324 self.fragments.is_empty()
325 }
326
327 pub fn stats(&self) -> (u64, u64, u64, u64) {
329 (
330 self.stats.hits.load(Ordering::Relaxed),
331 self.stats.misses.load(Ordering::Relaxed),
332 self.stats.evictions.load(Ordering::Relaxed),
333 self.stats.dedup_savings.load(Ordering::Relaxed),
334 )
335 }
336
337 pub fn hit_rate(&self) -> f64 {
339 let hits = self.stats.hits.load(Ordering::Relaxed);
340 let misses = self.stats.misses.load(Ordering::Relaxed);
341 let total = hits + misses;
342
343 if total == 0 {
344 0.0
345 } else {
346 hits as f64 / total as f64
347 }
348 }
349}
350
/// Loads a prioritized list of fragments into a `FragmentPool` ahead of demand.
#[derive(Debug)]
pub struct FragmentPrewarmer {
    /// Target pool to populate.
    pool: Arc<FragmentPool>,
    /// Keys scheduled for prewarming; ordered by priority after `sort()`.
    prewarm_list: Vec<PrewarmEntry>,
}
359
/// One scheduled prewarm target.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PrewarmEntry {
    /// Pool key to load.
    key: String,
    /// Higher values are prewarmed first (see `FragmentPrewarmer::sort`).
    priority: u32,
    // NOTE(review): never read in this file (hence the allow(dead_code));
    // presumably a capacity-planning hint for callers — confirm.
    size_hint: u64,
}
368
369impl FragmentPrewarmer {
370 pub fn new(pool: Arc<FragmentPool>) -> Self {
372 Self {
373 pool,
374 prewarm_list: Vec::new(),
375 }
376 }
377
378 pub fn add(&mut self, key: impl Into<String>, priority: u32, size_hint: u64) {
380 self.prewarm_list.push(PrewarmEntry {
381 key: key.into(),
382 priority,
383 size_hint,
384 });
385 }
386
387 pub fn sort(&mut self) {
389 self.prewarm_list
390 .sort_by(|a, b| b.priority.cmp(&a.priority));
391 }
392
393 pub async fn prewarm<F>(&self, loader: F) -> Result<usize>
395 where
396 F: Fn(&str) -> Result<Vec<u8>>,
397 {
398 let mut count = 0;
399
400 for entry in &self.prewarm_list {
401 if self.pool.get(&entry.key).is_none() {
402 let data = loader(&entry.key)?;
403 self.pool.insert(entry.key.clone(), data)?;
404 count += 1;
405 }
406 }
407
408 Ok(count)
409 }
410
411 pub fn list_size(&self) -> usize {
413 self.prewarm_list.len()
414 }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    // Default config matches the documented 512 MiB / dedup-on values.
    #[test]
    fn test_config_default() {
        let config = PoolConfig::default();
        assert_eq!(config.max_size, 512 * 1024 * 1024);
        assert!(config.dedup_enabled);
    }

    // New fragments start with access_count == 1; record_access bumps it.
    #[test]
    fn test_pooled_fragment() {
        let fragment = PooledFragment::new("test", vec![1, 2, 3, 4]);

        assert_eq!(fragment.key, "test");
        assert_eq!(fragment.size, 4);
        assert_eq!(fragment.get_access_count(), 1);

        fragment.record_access();
        assert_eq!(fragment.get_access_count(), 2);
    }

    // A freshly built pool is empty with zero bytes accounted.
    #[test]
    fn test_pool_creation() {
        let config = PoolConfig::default();
        let pool = FragmentPool::new(config);

        assert!(pool.is_empty());
        assert_eq!(pool.total_size(), 0);
    }

    // insert/get round-trip, size accounting, and miss on unknown key.
    #[test]
    fn test_pool_insert_get() {
        let config = PoolConfig::default();
        let pool = FragmentPool::new(config);

        pool.insert("key1", vec![1, 2, 3, 4]).unwrap();
        assert_eq!(pool.len(), 1);
        assert_eq!(pool.total_size(), 4);

        let data = pool.get("key1").unwrap();
        assert_eq!(*data, vec![1, 2, 3, 4]);

        assert!(pool.get("nonexistent").is_none());
    }

    // Byte-identical data created under a second key should be deduped
    // and counted in dedup_savings.
    #[test]
    fn test_pool_dedup() {
        let config = PoolConfig {
            dedup_enabled: true,
            ..Default::default()
        };
        let pool = FragmentPool::new(config);

        let data = vec![1, 2, 3, 4];
        pool.insert("key1", data.clone()).unwrap();

        let result = pool.get_or_create("key2", || Ok(data.clone()));
        assert!(result.is_ok());

        let (_, _, _, dedup) = pool.stats();
        assert!(dedup > 0);
    }

    // Inserting past max_fragments evicts exactly one fragment.
    #[test]
    fn test_pool_eviction() {
        let config = PoolConfig {
            max_fragments: 2,
            ..Default::default()
        };
        let pool = FragmentPool::new(config);

        pool.insert("key1", vec![1]).unwrap();
        pool.insert("key2", vec![2]).unwrap();
        pool.insert("key3", vec![3]).unwrap();

        assert_eq!(pool.len(), 2);

        let (_, _, evictions, _) = pool.stats();
        assert_eq!(evictions, 1);
    }

    // Scheduling and sorting only; prewarm() itself needs an async runtime.
    #[test]
    fn test_prewarmer() {
        let config = PoolConfig::default();
        let pool = Arc::new(FragmentPool::new(config));
        let mut prewarmer = FragmentPrewarmer::new(Arc::clone(&pool));

        prewarmer.add("layer0", 10, 1024);
        prewarmer.add("layer1", 5, 1024);
        prewarmer.sort();

        assert_eq!(prewarmer.list_size(), 2);
    }
}