// streamkit_core/resource_manager.rs

// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

//! Resource management for plugins.
//!
//! This module provides centralized management of shared resources (primarily ML models)
//! that can be expensive to load and should be shared across multiple node instances.
//!
//! # Key Features
//!
//! - **Automatic deduplication**: Resources are content-addressed by (plugin kind, params hash)
//! - **Reference counting**: Resources are kept alive while any node uses them
//! - **Configurable lifecycle**: Keep loaded until explicit unload, or use LRU eviction
//! - **Thread-safe**: Safe to use from multiple pipelines concurrently
//! - **Async initialization**: Resources can perform async I/O or blocking operations
//!
//! # Example
//!
//! ```rust,no_run
//! use streamkit_core::resource_manager::{
//!     ResourceManager, Resource, ResourcePolicy, ResourceKey
//! };
//! use std::sync::Arc;
//!
//! // Define a custom resource type
//! struct MyModel {
//!     data: Vec<f32>,
//! }
//!
//! impl Resource for MyModel {
//!     fn size_bytes(&self) -> usize {
//!         self.data.len() * std::mem::size_of::<f32>()
//!     }
//!
//!     fn resource_type(&self) -> &str {
//!         "ml_model"
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // Create resource manager
//!     let policy = ResourcePolicy {
//!         keep_loaded: true,
//!         max_memory_mb: None,
//!     };
//!     let manager = ResourceManager::new(policy);
//!
//!     // Get or create a resource using a ResourceKey
//!     let key = ResourceKey::new("my_plugin", "param_hash");
//!     let resource = manager.get_or_create(
//!         key,
//!         || async {
//!             // Load model (runs only once per unique key)
//!             Ok(Arc::new(MyModel { data: vec![0.0; 1000] }) as Arc<dyn Resource>)
//!         }
//!     ).await.unwrap();
//! }
//! ```

use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use tokio::sync::Mutex;

69/// A resource that can be shared across multiple node instances.
70///
71/// Resources are typically expensive to create (ML models, GPU contexts, etc.)
72/// and benefit from sharing across multiple pipeline instances.
73pub trait Resource: Send + Sync {
74 /// Returns the approximate memory footprint in bytes.
75 /// Used for LRU eviction when memory limits are configured.
76 fn size_bytes(&self) -> usize;
77
78 /// Returns a human-readable type identifier (e.g., "ml_model", "gpu_context").
79 /// Used for observability and debugging.
80 fn resource_type(&self) -> &str;
81}
82
83/// Configuration policy for resource lifecycle management.
84#[derive(Debug, Clone)]
85pub struct ResourcePolicy {
86 /// If true, resources are kept loaded until explicitly unloaded or server shutdown.
87 /// If false, resources may be evicted based on other policies (e.g., LRU).
88 pub keep_loaded: bool,
89
90 /// Optional memory limit in megabytes. When exceeded, least-recently-used
91 /// resources are evicted until memory usage is below the limit.
92 /// Only applies when keep_loaded is false.
93 pub max_memory_mb: Option<usize>,
94}
95
96impl Default for ResourcePolicy {
97 fn default() -> Self {
98 Self { keep_loaded: true, max_memory_mb: None }
99 }
100}
101
102/// A unique key identifying a resource based on plugin kind and parameters.
103#[derive(Debug, Clone, PartialEq, Eq, Hash)]
104pub struct ResourceKey {
105 /// The plugin kind (e.g., "plugin::native::kokoro", "plugin::native::whisper")
106 pub plugin_kind: String,
107
108 /// Hash of canonicalized parameters that affect resource creation
109 pub params_hash: String,
110}
111
112impl ResourceKey {
113 /// Create a new resource key from plugin kind and params hash.
114 pub fn new(plugin_kind: impl Into<String>, params_hash: impl Into<String>) -> Self {
115 Self { plugin_kind: plugin_kind.into(), params_hash: params_hash.into() }
116 }
117}
118
119impl fmt::Display for ResourceKey {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 write!(f, "{}:{}", self.plugin_kind, self.params_hash)
122 }
123}
124
125/// Entry in the resource cache with metadata for LRU eviction.
126struct ResourceEntry {
127 resource: Arc<dyn Resource>,
128 last_accessed: std::time::Instant,
129}
130
131/// Centralized manager for shared plugin resources.
132///
133/// The ResourceManager maintains a cache of resources that can be shared across
134/// multiple node instances. It handles lifecycle management, deduplication, and
135/// optional memory-based eviction.
136pub struct ResourceManager {
137 resources: Arc<Mutex<HashMap<ResourceKey, ResourceEntry>>>,
138 policy: ResourcePolicy,
139}
140
141impl ResourceManager {
142 /// Create a new ResourceManager with the specified policy.
143 pub fn new(policy: ResourcePolicy) -> Self {
144 Self { resources: Arc::new(Mutex::new(HashMap::new())), policy }
145 }
146
147 /// Get an existing resource or create it using the provided factory.
148 ///
149 /// If a resource with the given key already exists, it is returned immediately.
150 /// Otherwise, the factory is called to create a new resource, which is then
151 /// cached for future use.
152 ///
153 /// # Arguments
154 ///
155 /// * `key` - Unique identifier for the resource
156 /// * `factory` - Async function that creates the resource if needed
157 ///
158 /// # Errors
159 ///
160 /// Returns an error if the factory function fails to create the resource.
161 ///
162 /// # Example
163 ///
164 /// ```rust,no_run
165 /// use streamkit_core::resource_manager::{
166 /// ResourceManager, Resource, ResourcePolicy, ResourceKey, ResourceError
167 /// };
168 /// use std::sync::Arc;
169 ///
170 /// struct MyModel { data: Vec<f32> }
171 ///
172 /// impl Resource for MyModel {
173 /// fn size_bytes(&self) -> usize { self.data.len() * 4 }
174 /// fn resource_type(&self) -> &str { "ml_model" }
175 /// }
176 ///
177 /// async fn example() -> Result<(), ResourceError> {
178 /// let manager = ResourceManager::new(ResourcePolicy::default());
179 /// let resource = manager.get_or_create(
180 /// ResourceKey::new("my_plugin", "model_v1"),
181 /// || async {
182 /// Ok(Arc::new(MyModel { data: vec![0.0; 1000] }) as Arc<dyn Resource>)
183 /// }
184 /// ).await?;
185 /// Ok(())
186 /// }
187 /// ```
188 pub async fn get_or_create<F, Fut>(
189 &self,
190 key: ResourceKey,
191 factory: F,
192 ) -> Result<Arc<dyn Resource>, ResourceError>
193 where
194 F: FnOnce() -> Fut,
195 Fut: Future<Output = Result<Arc<dyn Resource>, ResourceError>>,
196 {
197 // Fast path: resource already exists
198 {
199 let mut cache = self.resources.lock().await;
200 if let Some(entry) = cache.get_mut(&key) {
201 entry.last_accessed = std::time::Instant::now();
202 return Ok(entry.resource.clone());
203 }
204 }
205
206 // Slow path: create new resource
207 let resource = factory().await?;
208
209 // Check if we need to evict resources due to memory limit
210 if !self.policy.keep_loaded {
211 if let Some(max_mb) = self.policy.max_memory_mb {
212 self.evict_if_needed(max_mb, resource.size_bytes()).await;
213 }
214 }
215
216 // Insert into cache
217 let mut cache = self.resources.lock().await;
218
219 // Double-check: another task might have created it while we were waiting
220 if let Some(entry) = cache.get_mut(&key) {
221 entry.last_accessed = std::time::Instant::now();
222 return Ok(entry.resource.clone());
223 }
224
225 let entry =
226 ResourceEntry { resource: resource.clone(), last_accessed: std::time::Instant::now() };
227 cache.insert(key, entry);
228 drop(cache);
229
230 Ok(resource)
231 }
232
233 /// Evict least-recently-used resources until memory usage is below the limit.
234 ///
235 /// This method minimizes lock contention by:
236 /// 1. Taking a short lock to collect metadata and calculate eviction candidates
237 /// 2. Sorting candidates outside the lock (O(n log n) operation)
238 /// 3. Re-acquiring the lock only to perform the actual removals
239 async fn evict_if_needed(&self, max_mb: usize, new_resource_bytes: usize) {
240 let max_bytes = max_mb * 1024 * 1024;
241
242 // Phase 1: Collect metadata under lock (fast)
243 let (current_bytes, entries) = {
244 let cache = self.resources.lock().await;
245 let current_bytes: usize = cache.values().map(|e| e.resource.size_bytes()).sum();
246
247 if current_bytes + new_resource_bytes <= max_bytes {
248 return; // No eviction needed
249 }
250
251 // Collect entry metadata for sorting (clone keys, copy timestamps and sizes)
252 let entries: Vec<_> = cache
253 .iter()
254 .map(|(k, v)| (k.clone(), v.last_accessed, v.resource.size_bytes()))
255 .collect();
256
257 // Explicitly drop lock before returning to avoid clippy::significant_drop_tightening
258 drop(cache);
259
260 (current_bytes, entries)
261 };
262
263 // Phase 2: Sort outside the lock (potentially expensive for large caches)
264 let mut entries = entries;
265 entries.sort_by_key(|(_, accessed, _)| *accessed);
266
267 // Phase 3: Determine which keys to evict (no lock needed)
268 let target_freed = (current_bytes + new_resource_bytes).saturating_sub(max_bytes);
269 let mut bytes_to_free = 0;
270 let keys_to_evict: Vec<_> = entries
271 .into_iter()
272 .take_while(|(_, _, size)| {
273 if bytes_to_free >= target_freed {
274 return false;
275 }
276 bytes_to_free += size;
277 true
278 })
279 .map(|(key, _, size)| (key, size))
280 .collect();
281
282 if keys_to_evict.is_empty() {
283 return;
284 }
285
286 // Phase 4: Perform evictions under lock (fast - just HashMap removals)
287 {
288 let mut cache = self.resources.lock().await;
289 for (key, size) in keys_to_evict {
290 // Re-check that the key still exists (may have been removed by another task)
291 if cache.remove(&key).is_some() {
292 tracing::info!(
293 "Evicting resource {} ({} bytes) due to memory limit",
294 key,
295 size
296 );
297 }
298 }
299 }
300 }
301
302 /// Explicitly unload a resource by key.
303 ///
304 /// This removes the resource from the cache. If other node instances still hold
305 /// references to the resource, it will remain in memory until they drop their
306 /// references.
307 ///
308 /// # Errors
309 ///
310 /// Returns an error if the resource key is not found in the cache.
311 pub async fn unload(&self, key: &ResourceKey) -> Result<(), ResourceError> {
312 let mut cache = self.resources.lock().await;
313 if cache.remove(key).is_some() {
314 tracing::info!("Unloaded resource: {}", key);
315 Ok(())
316 } else {
317 Err(ResourceError::NotFound(key.clone()))
318 }
319 }
320
321 /// Get statistics about currently loaded resources.
322 pub async fn stats(&self) -> ResourceStats {
323 let cache = self.resources.lock().await;
324
325 let total_size_bytes: usize = cache.values().map(|e| e.resource.size_bytes()).sum();
326 let resource_types: HashMap<String, usize> =
327 cache.values().fold(HashMap::new(), |mut acc, entry| {
328 *acc.entry(entry.resource.resource_type().to_string()).or_insert(0) += 1;
329 acc
330 });
331
332 ResourceStats { total_resources: cache.len(), total_size_bytes, resource_types }
333 }
334
335 /// Clear all cached resources.
336 ///
337 /// This removes all resources from the cache. Resources that are still in use
338 /// by node instances will remain in memory until dropped.
339 pub async fn clear(&self) {
340 let mut cache = self.resources.lock().await;
341 let count = cache.len();
342 cache.clear();
343 drop(cache);
344 tracing::info!("Cleared {} resources from cache", count);
345 }
346}
347
348/// Statistics about currently loaded resources.
349#[derive(Debug, Clone)]
350pub struct ResourceStats {
351 /// Total number of cached resources
352 pub total_resources: usize,
353
354 /// Total memory footprint in bytes
355 pub total_size_bytes: usize,
356
357 /// Count of resources by type
358 pub resource_types: HashMap<String, usize>,
359}
360
361/// Errors that can occur during resource management.
362#[derive(Debug, thiserror::Error)]
363pub enum ResourceError {
364 #[error("Resource not found: {0}")]
365 NotFound(ResourceKey),
366
367 #[error("Resource initialization failed: {0}")]
368 InitializationFailed(String),
369
370 #[error("Resource error: {0}")]
371 Other(String),
372}
373
374/// Type alias for boxed async resource factories.
375pub type ResourceFactory = Arc<
376 dyn Fn() -> Pin<Box<dyn Future<Output = Result<Arc<dyn Resource>, ResourceError>> + Send>>
377 + Send
378 + Sync,
379>;
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384
385 struct TestResource {
386 size: usize,
387 }
388
389 impl Resource for TestResource {
390 fn size_bytes(&self) -> usize {
391 self.size
392 }
393
394 fn resource_type(&self) -> &'static str {
395 "test_resource"
396 }
397 }
398
399 #[tokio::test]
400 #[allow(clippy::unwrap_used)]
401 async fn test_resource_deduplication() {
402 let manager = ResourceManager::new(ResourcePolicy::default());
403 let key = ResourceKey::new("test", "params1");
404
405 let mut create_count = 0;
406
407 // First call should create resource
408 let r1 = manager
409 .get_or_create(key.clone(), || async {
410 create_count += 1;
411 Ok(Arc::new(TestResource { size: 1000 }) as Arc<dyn Resource>)
412 })
413 .await
414 .unwrap();
415
416 // Second call should reuse cached resource
417 let r2 = manager
418 .get_or_create(key.clone(), || async {
419 create_count += 1;
420 Ok(Arc::new(TestResource { size: 1000 }) as Arc<dyn Resource>)
421 })
422 .await
423 .unwrap();
424
425 assert_eq!(create_count, 1, "Resource should only be created once");
426 assert!(Arc::ptr_eq(&r1, &r2), "Should return same Arc instance");
427 }
428
429 #[tokio::test]
430 #[allow(clippy::unwrap_used)]
431 async fn test_lru_eviction() {
432 let policy = ResourcePolicy {
433 keep_loaded: false,
434 max_memory_mb: Some(1), // 1 MB limit
435 };
436 let manager = ResourceManager::new(policy);
437
438 // Create three 500KB resources
439 let _r1 = manager
440 .get_or_create(ResourceKey::new("test", "1"), || async {
441 Ok(Arc::new(TestResource { size: 500_000 }) as Arc<dyn Resource>)
442 })
443 .await
444 .unwrap();
445
446 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
447
448 let _r2 = manager
449 .get_or_create(ResourceKey::new("test", "2"), || async {
450 Ok(Arc::new(TestResource { size: 500_000 }) as Arc<dyn Resource>)
451 })
452 .await
453 .unwrap();
454
455 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
456
457 // This should trigger eviction of r1 (oldest)
458 let _r3 = manager
459 .get_or_create(ResourceKey::new("test", "3"), || async {
460 Ok(Arc::new(TestResource { size: 500_000 }) as Arc<dyn Resource>)
461 })
462 .await
463 .unwrap();
464
465 let stats = manager.stats().await;
466 assert!(
467 stats.total_size_bytes <= 1_048_576,
468 "Total size should be under 1MB after eviction"
469 );
470 }
471
472 #[tokio::test]
473 #[allow(clippy::unwrap_used)]
474 async fn test_stats() {
475 let manager = ResourceManager::new(ResourcePolicy::default());
476
477 manager
478 .get_or_create(ResourceKey::new("test", "1"), || async {
479 Ok(Arc::new(TestResource { size: 1000 }) as Arc<dyn Resource>)
480 })
481 .await
482 .unwrap();
483
484 manager
485 .get_or_create(ResourceKey::new("test", "2"), || async {
486 Ok(Arc::new(TestResource { size: 2000 }) as Arc<dyn Resource>)
487 })
488 .await
489 .unwrap();
490
491 let stats = manager.stats().await;
492 assert_eq!(stats.total_resources, 2);
493 assert_eq!(stats.total_size_bytes, 3000);
494 assert_eq!(stats.resource_types.get("test_resource"), Some(&2));
495 }
496
497 #[tokio::test]
498 #[allow(clippy::unwrap_used)]
499 async fn test_unload() {
500 let manager = ResourceManager::new(ResourcePolicy::default());
501 let key = ResourceKey::new("test", "1");
502
503 manager
504 .get_or_create(key.clone(), || async {
505 Ok(Arc::new(TestResource { size: 1000 }) as Arc<dyn Resource>)
506 })
507 .await
508 .unwrap();
509
510 manager.unload(&key).await.unwrap();
511
512 let stats = manager.stats().await;
513 assert_eq!(stats.total_resources, 0);
514 }
515}