// haagenti_serverless/fragment_pool.rs

//! Pre-warmed fragment pools for fast cold starts
2
3use crate::{Result, ServerlessError};
4use dashmap::DashMap;
5use serde::{Deserialize, Serialize};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Instant;
9
10/// Pool configuration
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct PoolConfig {
13    /// Maximum pool size in bytes
14    pub max_size: u64,
15    /// Maximum fragments per pool
16    pub max_fragments: usize,
17    /// Fragment expiry time in seconds
18    pub expiry_seconds: u64,
19    /// Enable deduplication
20    pub dedup_enabled: bool,
21    /// Pre-warm common fragments
22    pub prewarm_common: bool,
23}
24
25impl Default for PoolConfig {
26    fn default() -> Self {
27        Self {
28            max_size: 512 * 1024 * 1024, // 512MB
29            max_fragments: 1000,
30            expiry_seconds: 300, // 5 minutes
31            dedup_enabled: true,
32            prewarm_common: true,
33        }
34    }
35}
36
/// One cached payload together with the bookkeeping the pool keeps about it.
#[derive(Debug)]
pub struct PooledFragment {
    /// Numeric identifier assigned when the fragment is constructed.
    pub id: u64,
    /// Key under which the fragment is stored in the pool.
    pub key: String,
    /// The payload itself; reference-counted so callers can share it without copying.
    pub data: Arc<Vec<u8>>,
    /// Hash of `data`, used by the pool to spot identical payloads.
    pub hash: u64,
    /// Length of `data` in bytes.
    pub size: u64,
    /// Moment the fragment was constructed; expiry is measured from here.
    pub created_at: Instant,
    /// Moment of the most recent access; starts out equal to `created_at`.
    pub last_accessed: Instant,
    /// Number of recorded accesses. Atomic so it can be bumped through `&self`.
    pub access_count: AtomicU64,
}
57
58impl Clone for PooledFragment {
59    fn clone(&self) -> Self {
60        Self {
61            id: self.id,
62            key: self.key.clone(),
63            data: self.data.clone(),
64            hash: self.hash,
65            size: self.size,
66            created_at: self.created_at,
67            last_accessed: self.last_accessed,
68            access_count: AtomicU64::new(self.access_count.load(Ordering::Relaxed)),
69        }
70    }
71}
72
73impl PooledFragment {
74    /// Create new fragment
75    pub fn new(key: impl Into<String>, data: Vec<u8>) -> Self {
76        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
77
78        let hash = Self::compute_hash(&data);
79        let size = data.len() as u64;
80        let now = Instant::now();
81
82        Self {
83            id: NEXT_ID.fetch_add(1, Ordering::SeqCst),
84            key: key.into(),
85            data: Arc::new(data),
86            hash,
87            size,
88            created_at: now,
89            last_accessed: now,
90            access_count: AtomicU64::new(1),
91        }
92    }
93
94    /// Compute content hash
95    fn compute_hash(data: &[u8]) -> u64 {
96        use std::hash::{Hash, Hasher};
97        let mut hasher = std::collections::hash_map::DefaultHasher::new();
98        data.hash(&mut hasher);
99        hasher.finish()
100    }
101
102    /// Record access
103    pub fn record_access(&self) {
104        self.access_count.fetch_add(1, Ordering::Relaxed);
105    }
106
107    /// Get access count
108    pub fn get_access_count(&self) -> u64 {
109        self.access_count.load(Ordering::Relaxed)
110    }
111
112    /// Check if expired
113    pub fn is_expired(&self, expiry_seconds: u64) -> bool {
114        self.created_at.elapsed().as_secs() > expiry_seconds
115    }
116}
117
118/// Fragment pool for pre-warmed fragments
119#[derive(Debug)]
120pub struct FragmentPool {
121    /// Configuration
122    config: PoolConfig,
123    /// Fragments by key
124    fragments: DashMap<String, PooledFragment>,
125    /// Hash to key mapping for deduplication
126    hash_to_key: DashMap<u64, String>,
127    /// Total size
128    total_size: AtomicU64,
129    /// Pool statistics
130    stats: PoolStats,
131}
132
/// Monotonic counters describing how a pool has been used; all start at zero.
#[derive(Debug, Default)]
pub struct PoolStats {
    /// Lookups that were served from the pool.
    hits: AtomicU64,
    /// Lookups that were not served from the pool.
    misses: AtomicU64,
    /// Fragments removed to make room for new ones.
    evictions: AtomicU64,
    /// Bytes that did not need to be stored because identical content was already pooled.
    dedup_savings: AtomicU64,
}
145
146impl FragmentPool {
147    /// Create new pool
148    pub fn new(config: PoolConfig) -> Self {
149        Self {
150            config,
151            fragments: DashMap::new(),
152            hash_to_key: DashMap::new(),
153            total_size: AtomicU64::new(0),
154            stats: PoolStats::default(),
155        }
156    }
157
158    /// Get or create fragment
159    pub fn get_or_create<F>(&self, key: &str, create_fn: F) -> Result<Arc<Vec<u8>>>
160    where
161        F: FnOnce() -> Result<Vec<u8>>,
162    {
163        // Check for existing fragment
164        if let Some(fragment) = self.fragments.get(key) {
165            if !fragment.is_expired(self.config.expiry_seconds) {
166                fragment.record_access();
167                self.stats.hits.fetch_add(1, Ordering::Relaxed);
168                return Ok(Arc::clone(&fragment.data));
169            }
170        }
171
172        self.stats.misses.fetch_add(1, Ordering::Relaxed);
173
174        // Create new fragment
175        let data = create_fn()?;
176
177        // Check for dedup
178        if self.config.dedup_enabled {
179            let hash = PooledFragment::compute_hash(&data);
180            if let Some(existing_key) = self.hash_to_key.get(&hash) {
181                if let Some(existing) = self.fragments.get(existing_key.value()) {
182                    self.stats
183                        .dedup_savings
184                        .fetch_add(data.len() as u64, Ordering::Relaxed);
185                    return Ok(Arc::clone(&existing.data));
186                }
187            }
188        }
189
190        // Ensure space
191        self.ensure_space(data.len() as u64)?;
192
193        // Insert fragment
194        let fragment = PooledFragment::new(key, data);
195        let data_ref = Arc::clone(&fragment.data);
196        let hash = fragment.hash;
197
198        self.total_size.fetch_add(fragment.size, Ordering::SeqCst);
199        self.fragments.insert(key.to_string(), fragment);
200
201        if self.config.dedup_enabled {
202            self.hash_to_key.insert(hash, key.to_string());
203        }
204
205        Ok(data_ref)
206    }
207
208    /// Get fragment by key
209    pub fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
210        if let Some(fragment) = self.fragments.get(key) {
211            if !fragment.is_expired(self.config.expiry_seconds) {
212                fragment.record_access();
213                self.stats.hits.fetch_add(1, Ordering::Relaxed);
214                return Some(Arc::clone(&fragment.data));
215            }
216        }
217        self.stats.misses.fetch_add(1, Ordering::Relaxed);
218        None
219    }
220
221    /// Insert fragment directly
222    pub fn insert(&self, key: impl Into<String>, data: Vec<u8>) -> Result<()> {
223        let size = data.len() as u64;
224        self.ensure_space(size)?;
225
226        let fragment = PooledFragment::new(key, data);
227        let hash = fragment.hash;
228        let key = fragment.key.clone();
229
230        self.total_size.fetch_add(fragment.size, Ordering::SeqCst);
231        self.fragments.insert(key.clone(), fragment);
232
233        if self.config.dedup_enabled {
234            self.hash_to_key.insert(hash, key);
235        }
236
237        Ok(())
238    }
239
240    /// Remove fragment
241    pub fn remove(&self, key: &str) -> Option<PooledFragment> {
242        if let Some((_, fragment)) = self.fragments.remove(key) {
243            self.total_size.fetch_sub(fragment.size, Ordering::SeqCst);
244            self.hash_to_key.remove(&fragment.hash);
245            Some(fragment)
246        } else {
247            None
248        }
249    }
250
251    /// Ensure space for new fragment
252    fn ensure_space(&self, required: u64) -> Result<()> {
253        // Check fragment count
254        while self.fragments.len() >= self.config.max_fragments {
255            self.evict_one()?;
256        }
257
258        // Check size
259        while self.total_size.load(Ordering::SeqCst) + required > self.config.max_size {
260            self.evict_one()?;
261        }
262
263        Ok(())
264    }
265
266    /// Evict one fragment (LRU)
267    fn evict_one(&self) -> Result<()> {
268        let mut oldest_key = None;
269        let mut oldest_time = None;
270
271        for entry in self.fragments.iter() {
272            let time = entry.value().last_accessed;
273            if oldest_time.is_none() || time < oldest_time.unwrap() {
274                oldest_key = Some(entry.key().clone());
275                oldest_time = Some(time);
276            }
277        }
278
279        if let Some(key) = oldest_key {
280            self.remove(&key);
281            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
282            Ok(())
283        } else {
284            Err(ServerlessError::PoolError("No fragments to evict".into()))
285        }
286    }
287
288    /// Clear expired fragments
289    pub fn clear_expired(&self) -> usize {
290        let mut expired = Vec::new();
291
292        for entry in self.fragments.iter() {
293            if entry.value().is_expired(self.config.expiry_seconds) {
294                expired.push(entry.key().clone());
295            }
296        }
297
298        for key in &expired {
299            self.remove(key);
300        }
301
302        expired.len()
303    }
304
305    /// Clear all fragments
306    pub fn clear(&self) {
307        self.fragments.clear();
308        self.hash_to_key.clear();
309        self.total_size.store(0, Ordering::SeqCst);
310    }
311
312    /// Total size
313    pub fn total_size(&self) -> u64 {
314        self.total_size.load(Ordering::SeqCst)
315    }
316
317    /// Fragment count
318    pub fn len(&self) -> usize {
319        self.fragments.len()
320    }
321
322    /// Is empty
323    pub fn is_empty(&self) -> bool {
324        self.fragments.is_empty()
325    }
326
327    /// Get statistics
328    pub fn stats(&self) -> (u64, u64, u64, u64) {
329        (
330            self.stats.hits.load(Ordering::Relaxed),
331            self.stats.misses.load(Ordering::Relaxed),
332            self.stats.evictions.load(Ordering::Relaxed),
333            self.stats.dedup_savings.load(Ordering::Relaxed),
334        )
335    }
336
337    /// Hit rate
338    pub fn hit_rate(&self) -> f64 {
339        let hits = self.stats.hits.load(Ordering::Relaxed);
340        let misses = self.stats.misses.load(Ordering::Relaxed);
341        let total = hits + misses;
342
343        if total == 0 {
344            0.0
345        } else {
346            hits as f64 / total as f64
347        }
348    }
349}
350
351/// Pre-warmer for common fragments
352#[derive(Debug)]
353pub struct FragmentPrewarmer {
354    /// Pool to pre-warm
355    pool: Arc<FragmentPool>,
356    /// Pre-warm list
357    prewarm_list: Vec<PrewarmEntry>,
358}
359
/// A single item on the pre-warm list; private to `FragmentPrewarmer`.
#[derive(Debug, Clone)]
// `size_hint` is recorded but never read in this file, hence the lint exemption.
#[allow(dead_code)]
struct PrewarmEntry {
    /// Pool key of the fragment to load.
    key: String,
    /// Ordering weight; larger values are placed first when the list is sorted.
    priority: u32,
    /// Caller-supplied size estimate for the fragment.
    size_hint: u64,
}
368
369impl FragmentPrewarmer {
370    /// Create new pre-warmer
371    pub fn new(pool: Arc<FragmentPool>) -> Self {
372        Self {
373            pool,
374            prewarm_list: Vec::new(),
375        }
376    }
377
378    /// Add fragment to pre-warm list
379    pub fn add(&mut self, key: impl Into<String>, priority: u32, size_hint: u64) {
380        self.prewarm_list.push(PrewarmEntry {
381            key: key.into(),
382            priority,
383            size_hint,
384        });
385    }
386
387    /// Sort by priority
388    pub fn sort(&mut self) {
389        self.prewarm_list
390            .sort_by(|a, b| b.priority.cmp(&a.priority));
391    }
392
393    /// Pre-warm fragments
394    pub async fn prewarm<F>(&self, loader: F) -> Result<usize>
395    where
396        F: Fn(&str) -> Result<Vec<u8>>,
397    {
398        let mut count = 0;
399
400        for entry in &self.prewarm_list {
401            if self.pool.get(&entry.key).is_none() {
402                let data = loader(&entry.key)?;
403                self.pool.insert(entry.key.clone(), data)?;
404                count += 1;
405            }
406        }
407
408        Ok(count)
409    }
410
411    /// Get pre-warm list size
412    pub fn list_size(&self) -> usize {
413        self.prewarm_list.len()
414    }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration keeps the documented size limit and dedup switch.
    #[test]
    fn test_config_default() {
        let PoolConfig {
            max_size,
            dedup_enabled,
            ..
        } = PoolConfig::default();

        assert_eq!(max_size, 512 * 1024 * 1024);
        assert!(dedup_enabled);
    }

    /// A new fragment records key, size and a starting access count of one.
    #[test]
    fn test_pooled_fragment() {
        let payload = vec![1, 2, 3, 4];
        let fragment = PooledFragment::new("test", payload);

        assert_eq!(fragment.key, "test");
        assert_eq!(fragment.size, 4);
        assert_eq!(fragment.get_access_count(), 1);

        fragment.record_access();
        assert_eq!(fragment.get_access_count(), 2);
    }

    /// A freshly built pool holds nothing.
    #[test]
    fn test_pool_creation() {
        let pool = FragmentPool::new(PoolConfig::default());

        assert!(pool.is_empty());
        assert_eq!(pool.total_size(), 0);
    }

    /// Inserted data can be read back; unknown keys yield `None`.
    #[test]
    fn test_pool_insert_get() {
        let pool = FragmentPool::new(PoolConfig::default());

        pool.insert("key1", vec![1, 2, 3, 4]).unwrap();
        assert_eq!(pool.len(), 1);
        assert_eq!(pool.total_size(), 4);

        let fetched = pool.get("key1").unwrap();
        assert_eq!(*fetched, vec![1, 2, 3, 4]);

        assert!(pool.get("nonexistent").is_none());
    }

    /// Creating identical content under a second key registers dedup savings.
    #[test]
    fn test_pool_dedup() {
        let pool = FragmentPool::new(PoolConfig {
            dedup_enabled: true,
            ..Default::default()
        });

        // Insert same data with different keys
        let payload = vec![1, 2, 3, 4];
        pool.insert("key1", payload.clone()).unwrap();

        let outcome = pool.get_or_create("key2", || Ok(payload.clone()));
        assert!(outcome.is_ok());

        // Should have dedup savings
        let dedup_bytes = pool.stats().3;
        assert!(dedup_bytes > 0);
    }

    /// Exceeding the fragment limit evicts exactly one entry.
    #[test]
    fn test_pool_eviction() {
        let pool = FragmentPool::new(PoolConfig {
            max_fragments: 2,
            ..Default::default()
        });

        pool.insert("key1", vec![1]).unwrap();
        pool.insert("key2", vec![2]).unwrap();
        pool.insert("key3", vec![3]).unwrap();

        // Should have evicted one
        assert_eq!(pool.len(), 2);

        let evictions = pool.stats().2;
        assert_eq!(evictions, 1);
    }

    /// Entries added to the pre-warmer are all retained after sorting.
    #[test]
    fn test_prewarmer() {
        let pool = Arc::new(FragmentPool::new(PoolConfig::default()));
        let mut prewarmer = FragmentPrewarmer::new(Arc::clone(&pool));

        prewarmer.add("layer0", 10, 1024);
        prewarmer.add("layer1", 5, 1024);
        prewarmer.sort();

        assert_eq!(prewarmer.list_size(), 2);
    }
}