Skip to main content

mentedb_core/
limits.rs

1//! Resource limits and backpressure for MenteDB.
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use parking_lot::RwLock;
7
8use crate::error::{MenteError, MenteResult};
9use crate::types::AgentId;
10
/// Configurable resource limits.
///
/// Every `Option` field uses `None` to mean "no limit". [`ResourceTracker`] enforces the
/// memory-count, byte, and per-agent caps; the embedding-dimension and write-rate fields
/// are carried here but are not checked by [`ResourceTracker`].
#[derive(Debug, Clone)]
pub struct ResourceLimits {
    /// Cap on the total number of stored memories across all agents (`None` = unlimited).
    pub max_memories: Option<usize>,
    /// Cap on the total number of bytes recorded for stored memories (`None` = unlimited).
    pub max_memory_bytes: Option<u64>,
    /// Cap on how many memories a single agent may hold (`None` = unlimited).
    pub max_memories_per_agent: Option<usize>,
    /// Maximum embedding dimensions (4096 in [`ResourceLimits::default`]).
    pub max_embedding_dimensions: usize,
    /// Maximum writes per second (`None` = unlimited).
    pub write_rate_limit: Option<u32>,
}
25
26impl Default for ResourceLimits {
27    fn default() -> Self {
28        Self {
29            max_memories: None,
30            max_memory_bytes: None,
31            max_memories_per_agent: None,
32            max_embedding_dimensions: 4096,
33            write_rate_limit: None,
34        }
35    }
36}
37
/// Point-in-time snapshot of tracked usage, as produced by [`ResourceTracker::usage`].
#[derive(Debug, Clone)]
pub struct ResourceUsage {
    /// Number of memories currently recorded.
    pub total_memories: u64,
    /// Bytes currently recorded across all memories.
    pub total_bytes: u64,
    /// The `max_memories` limit in effect (`None` = unlimited).
    pub limit_memories: Option<usize>,
    /// The `max_memory_bytes` limit in effect (`None` = unlimited).
    pub limit_bytes: Option<u64>,
    /// Utilization, as a percentage, of whichever configured limit is closer to being full.
    ///
    /// `0.0` when neither a count nor a byte limit is configured. This value can exceed
    /// `100.0` if recorded usage has overshot a limit.
    pub utilization_pct: f32,
}
52
53/// Tracks resource usage and enforces limits.
54pub struct ResourceTracker {
55    limits: ResourceLimits,
56    current_memory_count: AtomicU64,
57    current_bytes: AtomicU64,
58    agent_counts: RwLock<HashMap<AgentId, u64>>,
59}
60
61impl ResourceTracker {
62    /// Create a new tracker with the given limits.
63    pub fn new(limits: ResourceLimits) -> Self {
64        Self {
65            limits,
66            current_memory_count: AtomicU64::new(0),
67            current_bytes: AtomicU64::new(0),
68            agent_counts: RwLock::new(HashMap::new()),
69        }
70    }
71
72    /// Check whether a write is allowed given current usage.
73    pub fn check_can_write(&self, agent_id: AgentId, estimated_bytes: u64) -> MenteResult<()> {
74        let count = self.current_memory_count.load(Ordering::Relaxed);
75        if let Some(max) = self.limits.max_memories
76            && count as usize >= max
77        {
78            return Err(MenteError::ResourceExhausted(format!(
79                "memory limit reached: {count}/{max}"
80            )));
81        }
82
83        let bytes = self.current_bytes.load(Ordering::Relaxed);
84        if let Some(max_bytes) = self.limits.max_memory_bytes
85            && bytes + estimated_bytes > max_bytes
86        {
87            return Err(MenteError::ResourceExhausted(format!(
88                "byte limit would be exceeded: {} + {} > {}",
89                bytes, estimated_bytes, max_bytes
90            )));
91        }
92
93        if let Some(per_agent) = self.limits.max_memories_per_agent {
94            let agent_map = self.agent_counts.read();
95            let agent_count = agent_map.get(&agent_id).copied().unwrap_or(0);
96            if agent_count as usize >= per_agent {
97                return Err(MenteError::ResourceExhausted(format!(
98                    "per-agent limit reached for {agent_id}: {agent_count}/{per_agent}"
99                )));
100            }
101        }
102
103        Ok(())
104    }
105
106    /// Record that a write occurred.
107    pub fn record_write(&self, agent_id: AgentId, bytes: u64) {
108        self.current_memory_count.fetch_add(1, Ordering::Relaxed);
109        self.current_bytes.fetch_add(bytes, Ordering::Relaxed);
110        let mut agent_map = self.agent_counts.write();
111        *agent_map.entry(agent_id).or_insert(0) += 1;
112    }
113
114    /// Record that a delete occurred.
115    pub fn record_delete(&self, agent_id: AgentId, bytes: u64) {
116        self.current_memory_count.fetch_sub(
117            1.min(self.current_memory_count.load(Ordering::Relaxed)),
118            Ordering::Relaxed,
119        );
120        let current = self.current_bytes.load(Ordering::Relaxed);
121        self.current_bytes
122            .fetch_sub(bytes.min(current), Ordering::Relaxed);
123        let mut agent_map = self.agent_counts.write();
124        if let Some(count) = agent_map.get_mut(&agent_id) {
125            *count = count.saturating_sub(1);
126        }
127    }
128
129    /// Get a snapshot of current resource usage.
130    pub fn usage(&self) -> ResourceUsage {
131        let total_memories = self.current_memory_count.load(Ordering::Relaxed);
132        let total_bytes = self.current_bytes.load(Ordering::Relaxed);
133
134        let utilization_pct = match (self.limits.max_memories, self.limits.max_memory_bytes) {
135            (Some(max_mem), Some(max_bytes)) => {
136                let mem_pct = (total_memories as f32 / max_mem as f32) * 100.0;
137                let byte_pct = (total_bytes as f32 / max_bytes as f32) * 100.0;
138                mem_pct.max(byte_pct)
139            }
140            (Some(max_mem), None) => (total_memories as f32 / max_mem as f32) * 100.0,
141            (None, Some(max_bytes)) => (total_bytes as f32 / max_bytes as f32) * 100.0,
142            (None, None) => 0.0,
143        };
144
145        ResourceUsage {
146            total_memories,
147            total_bytes,
148            limit_memories: self.limits.max_memories,
149            limit_bytes: self.limits.max_memory_bytes,
150            utilization_pct,
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn within_limits_succeeds() {
        let tracker = ResourceTracker::new(ResourceLimits {
            max_memories: Some(10),
            max_memory_bytes: Some(10_000),
            ..Default::default()
        });
        let agent = AgentId::new();
        assert!(tracker.check_can_write(agent, 100).is_ok());
        tracker.record_write(agent, 100);
        assert_eq!(tracker.usage().total_memories, 1);
        assert_eq!(tracker.usage().total_bytes, 100);
    }

    #[test]
    fn exceeding_limit_returns_error() {
        let tracker = ResourceTracker::new(ResourceLimits {
            max_memories: Some(2),
            ..Default::default()
        });
        let agent = AgentId::new();

        tracker.record_write(agent, 50);
        tracker.record_write(agent, 50);
        let result = tracker.check_can_write(agent, 50);
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("memory limit reached"));
    }

    #[test]
    fn per_agent_limit_works() {
        let tracker = ResourceTracker::new(ResourceLimits {
            max_memories_per_agent: Some(1),
            ..Default::default()
        });
        let agent_a = AgentId::new();
        let agent_b = AgentId::new();

        tracker.record_write(agent_a, 50);
        // agent_a is now at its limit.
        assert!(tracker.check_can_write(agent_a, 50).is_err());
        // agent_b has its own, separate quota.
        assert!(tracker.check_can_write(agent_b, 50).is_ok());
    }

    #[test]
    fn byte_limit_rejects_oversized_write() {
        let tracker = ResourceTracker::new(ResourceLimits {
            max_memory_bytes: Some(100),
            ..Default::default()
        });
        let agent = AgentId::new();
        tracker.record_write(agent, 60);

        // Landing exactly on the limit is allowed; one byte more is not.
        assert!(tracker.check_can_write(agent, 40).is_ok());
        let result = tracker.check_can_write(agent, 41);
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("byte limit would be exceeded"));
    }

    #[test]
    fn delete_releases_capacity_and_never_underflows() {
        let tracker = ResourceTracker::new(ResourceLimits {
            max_memories: Some(1),
            max_memories_per_agent: Some(1),
            ..Default::default()
        });
        let agent = AgentId::new();
        tracker.record_write(agent, 30);
        assert!(tracker.check_can_write(agent, 30).is_err());

        // Deleting more bytes than were recorded clamps at zero.
        tracker.record_delete(agent, 100);
        let usage = tracker.usage();
        assert_eq!(usage.total_memories, 0);
        assert_eq!(usage.total_bytes, 0);
        assert!(tracker.check_can_write(agent, 30).is_ok());

        // An extra delete must not wrap the counters around.
        tracker.record_delete(agent, 1);
        let usage = tracker.usage();
        assert_eq!(usage.total_memories, 0);
        assert_eq!(usage.total_bytes, 0);
    }

    #[test]
    fn utilization_reports_the_tighter_limit() {
        let tracker = ResourceTracker::new(ResourceLimits {
            max_memories: Some(10),
            max_memory_bytes: Some(1_000),
            ..Default::default()
        });
        tracker.record_write(AgentId::new(), 500);
        let usage = tracker.usage();
        assert_eq!(usage.limit_memories, Some(10));
        assert_eq!(usage.limit_bytes, Some(1_000));
        // Count utilization is 10% and byte utilization is 50%; the larger one wins.
        assert!((usage.utilization_pct - 50.0).abs() < 1e-4);
    }

    #[test]
    fn unlimited_tracker_reports_zero_utilization() {
        let tracker = ResourceTracker::new(ResourceLimits::default());
        tracker.record_write(AgentId::new(), 1_000_000);
        assert_eq!(tracker.usage().utilization_pct, 0.0);
    }
}