// streamkit_core/resource_manager.rs

// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

//! Resource management for plugins.
//!
//! This module provides centralized management of shared resources (primarily ML models)
//! that can be expensive to load and should be shared across multiple node instances.
//!
//! # Key Features
//!
//! - **Automatic deduplication**: Resources are content-addressed by (plugin kind, params hash)
//! - **Reference counting**: Resources are kept alive while any node uses them
//! - **Configurable lifecycle**: Keep loaded until explicit unload, or use LRU eviction
//! - **Thread-safe**: Safe to use from multiple pipelines concurrently
//! - **Async initialization**: Resources can perform async I/O or blocking operations
//!
//! # Example
//!
//! ```rust,no_run
//! use streamkit_core::resource_manager::{
//!     ResourceManager, Resource, ResourcePolicy, ResourceKey
//! };
//! use std::sync::Arc;
//!
//! // Define a custom resource type
//! struct MyModel {
//!     data: Vec<f32>,
//! }
//!
//! impl Resource for MyModel {
//!     fn size_bytes(&self) -> usize {
//!         self.data.len() * std::mem::size_of::<f32>()
//!     }
//!
//!     fn resource_type(&self) -> &str {
//!         "ml_model"
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // Create resource manager
//!     let policy = ResourcePolicy {
//!         keep_loaded: true,
//!         max_memory_mb: None,
//!     };
//!     let manager = ResourceManager::new(policy);
//!
//!     // Get or create a resource using a ResourceKey
//!     let key = ResourceKey::new("my_plugin", "param_hash");
//!     let resource = manager.get_or_create(
//!         key,
//!         || async {
//!             // Load model (runs only once per unique key)
//!             Ok(Arc::new(MyModel { data: vec![0.0; 1000] }) as Arc<dyn Resource>)
//!         }
//!     ).await.unwrap();
//! }
//! ```

62use std::collections::HashMap;
63use std::fmt;
64use std::future::Future;
65use std::pin::Pin;
66use std::sync::Arc;
67use tokio::sync::Mutex;
68
/// A resource that can be shared across multiple node instances.
///
/// Resources are typically expensive to create (ML models, GPU contexts, etc.)
/// and benefit from sharing across multiple pipeline instances.
///
/// Implementors must be `Send + Sync`: the manager hands out `Arc<dyn Resource>`
/// handles that may be used concurrently from multiple tasks and threads.
pub trait Resource: Send + Sync {
    /// Returns the approximate memory footprint in bytes.
    /// Used for LRU eviction when memory limits are configured.
    /// An estimate is acceptable; it only needs to be consistent per instance.
    fn size_bytes(&self) -> usize;

    /// Returns a human-readable type identifier (e.g., "ml_model", "gpu_context").
    /// Used for observability and debugging.
    fn resource_type(&self) -> &str;
}
82
/// Configuration policy for resource lifecycle management.
///
/// The default (`keep_loaded: true`, no memory cap) pins every resource in
/// memory until it is explicitly unloaded or the process exits.
#[derive(Debug, Clone)]
pub struct ResourcePolicy {
    /// If true, resources are kept loaded until explicitly unloaded or server shutdown.
    /// If false, resources may be evicted based on other policies (e.g., LRU).
    pub keep_loaded: bool,

    /// Optional memory limit in megabytes. When exceeded, least-recently-used
    /// resources are evicted until memory usage is below the limit.
    /// Only applies when keep_loaded is false.
    pub max_memory_mb: Option<usize>,
}
95
96impl Default for ResourcePolicy {
97    fn default() -> Self {
98        Self { keep_loaded: true, max_memory_mb: None }
99    }
100}
101
/// A unique key identifying a resource based on plugin kind and parameters.
///
/// Derives `Eq + Hash` so it can be used directly as a `HashMap` key; the
/// `Display` impl renders it as `"{plugin_kind}:{params_hash}"` for logging.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ResourceKey {
    /// The plugin kind (e.g., "plugin::native::kokoro", "plugin::native::whisper")
    pub plugin_kind: String,

    /// Hash of canonicalized parameters that affect resource creation
    pub params_hash: String,
}
111
112impl ResourceKey {
113    /// Create a new resource key from plugin kind and params hash.
114    pub fn new(plugin_kind: impl Into<String>, params_hash: impl Into<String>) -> Self {
115        Self { plugin_kind: plugin_kind.into(), params_hash: params_hash.into() }
116    }
117}
118
119impl fmt::Display for ResourceKey {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        write!(f, "{}:{}", self.plugin_kind, self.params_hash)
122    }
123}
124
/// Entry in the resource cache with metadata for LRU eviction.
struct ResourceEntry {
    // Shared handle; clones of this Arc are handed out to callers, so the
    // underlying resource outlives eviction while anyone still holds one.
    resource: Arc<dyn Resource>,
    // Refreshed on every cache hit; oldest timestamps are evicted first.
    last_accessed: std::time::Instant,
}
130
/// Centralized manager for shared plugin resources.
///
/// The ResourceManager maintains a cache of resources that can be shared across
/// multiple node instances. It handles lifecycle management, deduplication, and
/// optional memory-based eviction.
pub struct ResourceManager {
    // Key → cached entry. The async `tokio::sync::Mutex` allows the lock to be
    // acquired from async contexts; the Arc lets the map be shared if the
    // manager is cloned or referenced across tasks.
    resources: Arc<Mutex<HashMap<ResourceKey, ResourceEntry>>>,
    // Lifecycle policy consulted on every insertion.
    policy: ResourcePolicy,
}
140
141impl ResourceManager {
142    /// Create a new ResourceManager with the specified policy.
143    pub fn new(policy: ResourcePolicy) -> Self {
144        Self { resources: Arc::new(Mutex::new(HashMap::new())), policy }
145    }
146
147    /// Get an existing resource or create it using the provided factory.
148    ///
149    /// If a resource with the given key already exists, it is returned immediately.
150    /// Otherwise, the factory is called to create a new resource, which is then
151    /// cached for future use.
152    ///
153    /// # Arguments
154    ///
155    /// * `key` - Unique identifier for the resource
156    /// * `factory` - Async function that creates the resource if needed
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the factory function fails to create the resource.
161    ///
162    /// # Example
163    ///
164    /// ```rust,no_run
165    /// use streamkit_core::resource_manager::{
166    ///     ResourceManager, Resource, ResourcePolicy, ResourceKey, ResourceError
167    /// };
168    /// use std::sync::Arc;
169    ///
170    /// struct MyModel { data: Vec<f32> }
171    ///
172    /// impl Resource for MyModel {
173    ///     fn size_bytes(&self) -> usize { self.data.len() * 4 }
174    ///     fn resource_type(&self) -> &str { "ml_model" }
175    /// }
176    ///
177    /// async fn example() -> Result<(), ResourceError> {
178    ///     let manager = ResourceManager::new(ResourcePolicy::default());
179    ///     let resource = manager.get_or_create(
180    ///         ResourceKey::new("my_plugin", "model_v1"),
181    ///         || async {
182    ///             Ok(Arc::new(MyModel { data: vec![0.0; 1000] }) as Arc<dyn Resource>)
183    ///         }
184    ///     ).await?;
185    ///     Ok(())
186    /// }
187    /// ```
188    pub async fn get_or_create<F, Fut>(
189        &self,
190        key: ResourceKey,
191        factory: F,
192    ) -> Result<Arc<dyn Resource>, ResourceError>
193    where
194        F: FnOnce() -> Fut,
195        Fut: Future<Output = Result<Arc<dyn Resource>, ResourceError>>,
196    {
197        // Fast path: resource already exists
198        {
199            let mut cache = self.resources.lock().await;
200            if let Some(entry) = cache.get_mut(&key) {
201                entry.last_accessed = std::time::Instant::now();
202                return Ok(entry.resource.clone());
203            }
204        }
205
206        // Slow path: create new resource
207        let resource = factory().await?;
208
209        // Check if we need to evict resources due to memory limit
210        if !self.policy.keep_loaded {
211            if let Some(max_mb) = self.policy.max_memory_mb {
212                self.evict_if_needed(max_mb, resource.size_bytes()).await;
213            }
214        }
215
216        // Insert into cache
217        let mut cache = self.resources.lock().await;
218
219        // Double-check: another task might have created it while we were waiting
220        if let Some(entry) = cache.get_mut(&key) {
221            entry.last_accessed = std::time::Instant::now();
222            return Ok(entry.resource.clone());
223        }
224
225        let entry =
226            ResourceEntry { resource: resource.clone(), last_accessed: std::time::Instant::now() };
227        cache.insert(key, entry);
228        drop(cache);
229
230        Ok(resource)
231    }
232
233    /// Evict least-recently-used resources until memory usage is below the limit.
234    ///
235    /// This method minimizes lock contention by:
236    /// 1. Taking a short lock to collect metadata and calculate eviction candidates
237    /// 2. Sorting candidates outside the lock (O(n log n) operation)
238    /// 3. Re-acquiring the lock only to perform the actual removals
239    async fn evict_if_needed(&self, max_mb: usize, new_resource_bytes: usize) {
240        let max_bytes = max_mb * 1024 * 1024;
241
242        // Phase 1: Collect metadata under lock (fast)
243        let (current_bytes, entries) = {
244            let cache = self.resources.lock().await;
245            let current_bytes: usize = cache.values().map(|e| e.resource.size_bytes()).sum();
246
247            if current_bytes + new_resource_bytes <= max_bytes {
248                return; // No eviction needed
249            }
250
251            // Collect entry metadata for sorting (clone keys, copy timestamps and sizes)
252            let entries: Vec<_> = cache
253                .iter()
254                .map(|(k, v)| (k.clone(), v.last_accessed, v.resource.size_bytes()))
255                .collect();
256
257            // Explicitly drop lock before returning to avoid clippy::significant_drop_tightening
258            drop(cache);
259
260            (current_bytes, entries)
261        };
262
263        // Phase 2: Sort outside the lock (potentially expensive for large caches)
264        let mut entries = entries;
265        entries.sort_by_key(|(_, accessed, _)| *accessed);
266
267        // Phase 3: Determine which keys to evict (no lock needed)
268        let target_freed = (current_bytes + new_resource_bytes).saturating_sub(max_bytes);
269        let mut bytes_to_free = 0;
270        let keys_to_evict: Vec<_> = entries
271            .into_iter()
272            .take_while(|(_, _, size)| {
273                if bytes_to_free >= target_freed {
274                    return false;
275                }
276                bytes_to_free += size;
277                true
278            })
279            .map(|(key, _, size)| (key, size))
280            .collect();
281
282        if keys_to_evict.is_empty() {
283            return;
284        }
285
286        // Phase 4: Perform evictions under lock (fast - just HashMap removals)
287        {
288            let mut cache = self.resources.lock().await;
289            for (key, size) in keys_to_evict {
290                // Re-check that the key still exists (may have been removed by another task)
291                if cache.remove(&key).is_some() {
292                    tracing::info!(
293                        "Evicting resource {} ({} bytes) due to memory limit",
294                        key,
295                        size
296                    );
297                }
298            }
299        }
300    }
301
302    /// Explicitly unload a resource by key.
303    ///
304    /// This removes the resource from the cache. If other node instances still hold
305    /// references to the resource, it will remain in memory until they drop their
306    /// references.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the resource key is not found in the cache.
311    pub async fn unload(&self, key: &ResourceKey) -> Result<(), ResourceError> {
312        let mut cache = self.resources.lock().await;
313        if cache.remove(key).is_some() {
314            tracing::info!("Unloaded resource: {}", key);
315            Ok(())
316        } else {
317            Err(ResourceError::NotFound(key.clone()))
318        }
319    }
320
321    /// Get statistics about currently loaded resources.
322    pub async fn stats(&self) -> ResourceStats {
323        let cache = self.resources.lock().await;
324
325        let total_size_bytes: usize = cache.values().map(|e| e.resource.size_bytes()).sum();
326        let resource_types: HashMap<String, usize> =
327            cache.values().fold(HashMap::new(), |mut acc, entry| {
328                *acc.entry(entry.resource.resource_type().to_string()).or_insert(0) += 1;
329                acc
330            });
331
332        ResourceStats { total_resources: cache.len(), total_size_bytes, resource_types }
333    }
334
335    /// Clear all cached resources.
336    ///
337    /// This removes all resources from the cache. Resources that are still in use
338    /// by node instances will remain in memory until dropped.
339    pub async fn clear(&self) {
340        let mut cache = self.resources.lock().await;
341        let count = cache.len();
342        cache.clear();
343        drop(cache);
344        tracing::info!("Cleared {} resources from cache", count);
345    }
346}
347
/// Statistics about currently loaded resources.
///
/// A point-in-time snapshot produced by [`ResourceManager::stats`]; values may
/// be stale by the time the caller inspects them.
#[derive(Debug, Clone)]
pub struct ResourceStats {
    /// Total number of cached resources
    pub total_resources: usize,

    /// Total memory footprint in bytes (sum of `Resource::size_bytes`)
    pub total_size_bytes: usize,

    /// Count of resources by type (keyed by `Resource::resource_type`)
    pub resource_types: HashMap<String, usize>,
}
360
/// Errors that can occur during resource management.
#[derive(Debug, thiserror::Error)]
pub enum ResourceError {
    /// The requested key was not present in the cache (e.g. on `unload`).
    #[error("Resource not found: {0}")]
    NotFound(ResourceKey),

    /// A resource factory failed while creating the resource.
    #[error("Resource initialization failed: {0}")]
    InitializationFailed(String),

    /// Catch-all for other resource-related failures.
    #[error("Resource error: {0}")]
    Other(String),
}
373
/// Type alias for boxed async resource factories.
///
/// A cloneable (`Arc`-wrapped) callable returning a boxed future that resolves
/// to a shared resource; useful when factories must be stored (e.g. in a
/// registry) rather than passed as generic closures.
pub type ResourceFactory = Arc<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<Arc<dyn Resource>, ResourceError>> + Send>>
        + Send
        + Sync,
>;
380
#[cfg(test)]
mod tests {
    use super::*;

    /// Trivial `Resource` whose reported footprint is configurable.
    struct TestResource {
        size: usize,
    }

    impl Resource for TestResource {
        fn size_bytes(&self) -> usize {
            self.size
        }

        fn resource_type(&self) -> &'static str {
            "test_resource"
        }
    }

    #[tokio::test]
    #[allow(clippy::unwrap_used)]
    async fn test_resource_deduplication() {
        let manager = ResourceManager::new(ResourcePolicy::default());
        let key = ResourceKey::new("test", "params1");
        let mut create_count = 0;

        // Initial lookup misses the cache, so the factory must run.
        let first = manager
            .get_or_create(key.clone(), || async {
                create_count += 1;
                Ok(Arc::new(TestResource { size: 1000 }) as Arc<dyn Resource>)
            })
            .await
            .unwrap();

        // Repeat lookup with the same key must hit the cache instead.
        let second = manager
            .get_or_create(key.clone(), || async {
                create_count += 1;
                Ok(Arc::new(TestResource { size: 1000 }) as Arc<dyn Resource>)
            })
            .await
            .unwrap();

        assert_eq!(create_count, 1, "Resource should only be created once");
        assert!(Arc::ptr_eq(&first, &second), "Should return same Arc instance");
    }

    #[tokio::test]
    #[allow(clippy::unwrap_used)]
    async fn test_lru_eviction() {
        let policy = ResourcePolicy {
            keep_loaded: false,
            max_memory_mb: Some(1), // 1 MB limit
        };
        let manager = ResourceManager::new(policy);

        // Insert three 500KB resources, sleeping between inserts so their
        // last-accessed timestamps are strictly ordered. The third insert
        // pushes usage past 1MB and should evict the oldest entry.
        for (idx, hash) in ["1", "2", "3"].iter().enumerate() {
            if idx > 0 {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
            manager
                .get_or_create(ResourceKey::new("test", *hash), || async {
                    Ok(Arc::new(TestResource { size: 500_000 }) as Arc<dyn Resource>)
                })
                .await
                .unwrap();
        }

        let stats = manager.stats().await;
        assert!(
            stats.total_size_bytes <= 1_048_576,
            "Total size should be under 1MB after eviction"
        );
    }

    #[tokio::test]
    #[allow(clippy::unwrap_used)]
    async fn test_stats() {
        let manager = ResourceManager::new(ResourcePolicy::default());

        // Two resources of different sizes under distinct keys.
        for (hash, bytes) in [("1", 1000usize), ("2", 2000)] {
            manager
                .get_or_create(ResourceKey::new("test", hash), || async move {
                    Ok(Arc::new(TestResource { size: bytes }) as Arc<dyn Resource>)
                })
                .await
                .unwrap();
        }

        let stats = manager.stats().await;
        assert_eq!(stats.total_resources, 2);
        assert_eq!(stats.total_size_bytes, 3000);
        assert_eq!(stats.resource_types.get("test_resource"), Some(&2));
    }

    #[tokio::test]
    #[allow(clippy::unwrap_used)]
    async fn test_unload() {
        let manager = ResourceManager::new(ResourcePolicy::default());
        let key = ResourceKey::new("test", "1");

        manager
            .get_or_create(key.clone(), || async {
                Ok(Arc::new(TestResource { size: 1000 }) as Arc<dyn Resource>)
            })
            .await
            .unwrap();

        // Explicit unload must succeed and leave the cache empty.
        manager.unload(&key).await.unwrap();
        assert_eq!(manager.stats().await.total_resources, 0);
    }
}