1use common::{
2 DakeraError, NamespaceId, QuotaCheckResult, QuotaConfig, QuotaEnforcement, QuotaStatus,
3 QuotaUsage, Result, Vector,
4};
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8
/// Tracks quota configuration and usage counters per namespace and checks
/// upserts against them.
///
/// Every piece of state sits behind its own `parking_lot::RwLock`, and all
/// methods take `&self`.
pub struct QuotaManager {
    /// Explicit per-namespace quota configuration. It is written by `set_quota`,
    /// and `init_usage` also copies the default in here for new namespaces.
    configs: RwLock<HashMap<NamespaceId, QuotaConfig>>,
    /// Current usage counters (vector count / storage bytes) per namespace.
    usage: RwLock<HashMap<NamespaceId, QuotaUsage>>,
    /// Fallback configuration used when a namespace has no entry in `configs`.
    default_config: RwLock<Option<QuotaConfig>>,
}
18
19impl QuotaManager {
20 pub fn new() -> Self {
22 Self {
23 configs: RwLock::new(HashMap::new()),
24 usage: RwLock::new(HashMap::new()),
25 default_config: RwLock::new(None),
26 }
27 }
28
29 pub fn with_default(default_config: QuotaConfig) -> Self {
31 Self {
32 configs: RwLock::new(HashMap::new()),
33 usage: RwLock::new(HashMap::new()),
34 default_config: RwLock::new(Some(default_config)),
35 }
36 }
37
38 pub fn set_default_config(&self, config: Option<QuotaConfig>) {
40 *self.default_config.write() = config;
41 }
42
43 pub fn get_default_config(&self) -> Option<QuotaConfig> {
45 self.default_config.read().clone()
46 }
47
48 pub fn set_quota(&self, namespace: &NamespaceId, config: QuotaConfig) {
50 self.configs.write().insert(namespace.clone(), config);
51 tracing::info!(namespace = %namespace, "Quota configuration updated");
52 }
53
54 pub fn get_quota(&self, namespace: &NamespaceId) -> Option<QuotaConfig> {
56 self.configs
57 .read()
58 .get(namespace)
59 .cloned()
60 .or_else(|| self.default_config.read().clone())
61 }
62
63 pub fn remove_quota(&self, namespace: &NamespaceId) -> Option<QuotaConfig> {
65 self.configs.write().remove(namespace)
66 }
67
68 pub fn init_usage(&self, namespace: &NamespaceId, vector_count: u64, storage_bytes: u64) {
70 let usage = QuotaUsage::new(vector_count, storage_bytes);
71 self.usage.write().insert(namespace.clone(), usage);
72
73 if !self.configs.read().contains_key(namespace) {
75 if let Some(default) = self.default_config.read().clone() {
76 self.configs.write().insert(namespace.clone(), default);
77 }
78 }
79 }
80
81 pub fn get_usage(&self, namespace: &NamespaceId) -> Option<QuotaUsage> {
83 self.usage.read().get(namespace).cloned()
84 }
85
86 pub fn update_usage(&self, namespace: &NamespaceId, vector_count: u64, storage_bytes: u64) {
88 let mut usage_map = self.usage.write();
89 if let Some(usage) = usage_map.get_mut(namespace) {
90 usage.vector_count = vector_count;
91 usage.storage_bytes = storage_bytes;
92 usage.touch();
93 } else {
94 usage_map.insert(
95 namespace.clone(),
96 QuotaUsage::new(vector_count, storage_bytes),
97 );
98 }
99 }
100
101 pub fn increment_usage(&self, namespace: &NamespaceId, added_vectors: u64, added_bytes: u64) {
103 let mut usage_map = self.usage.write();
104 if let Some(usage) = usage_map.get_mut(namespace) {
105 usage.vector_count += added_vectors;
106 usage.storage_bytes += added_bytes;
107 usage.touch();
108 } else {
109 usage_map.insert(
110 namespace.clone(),
111 QuotaUsage::new(added_vectors, added_bytes),
112 );
113 }
114 }
115
116 pub fn decrement_usage(
118 &self,
119 namespace: &NamespaceId,
120 removed_vectors: u64,
121 removed_bytes: u64,
122 ) {
123 let mut usage_map = self.usage.write();
124 if let Some(usage) = usage_map.get_mut(namespace) {
125 usage.vector_count = usage.vector_count.saturating_sub(removed_vectors);
126 usage.storage_bytes = usage.storage_bytes.saturating_sub(removed_bytes);
127 usage.touch();
128 }
129 }
130
131 pub fn remove_usage(&self, namespace: &NamespaceId) {
133 self.usage.write().remove(namespace);
134 }
135
136 pub fn get_status(&self, namespace: &NamespaceId) -> Option<QuotaStatus> {
138 let config = self.get_quota(namespace)?;
139 let usage = self.get_usage(namespace).unwrap_or_default();
140 Some(QuotaStatus::new(namespace.clone(), config, usage))
141 }
142
143 pub fn get_all_status(&self) -> Vec<QuotaStatus> {
145 let configs = self.configs.read();
146 let usage = self.usage.read();
147
148 let mut namespaces: std::collections::HashSet<_> = configs.keys().cloned().collect();
150 namespaces.extend(usage.keys().cloned());
151
152 namespaces
153 .into_iter()
154 .filter_map(|ns| {
155 let config = configs
156 .get(&ns)
157 .cloned()
158 .or_else(|| self.default_config.read().clone())?;
159 let usage = usage.get(&ns).cloned().unwrap_or_default();
160 Some(QuotaStatus::new(ns, config, usage))
161 })
162 .collect()
163 }
164
165 pub fn estimate_storage_size(vectors: &[Vector]) -> u64 {
167 vectors
168 .iter()
169 .map(|v| {
170 let id_size = v.id.len() as u64;
172 let values_size = (v.values.len() * std::mem::size_of::<f32>()) as u64;
173
174 let metadata_size = v
176 .metadata
177 .as_ref()
178 .map(|m| serde_json::to_string(m).map(|s| s.len()).unwrap_or(0))
179 .unwrap_or(0) as u64;
180
181 let ttl_size = 16u64; id_size + values_size + metadata_size + ttl_size + 64 })
186 .sum()
187 }
188
189 pub fn check_upsert(&self, namespace: &NamespaceId, vectors: &[Vector]) -> QuotaCheckResult {
191 let config = match self.get_quota(namespace) {
192 Some(c) => c,
193 None => {
194 return QuotaCheckResult {
196 allowed: true,
197 reason: None,
198 usage: self.get_usage(namespace).unwrap_or_default(),
199 exceeded_quota: None,
200 };
201 }
202 };
203
204 if config.enforcement == QuotaEnforcement::None {
206 return QuotaCheckResult {
207 allowed: true,
208 reason: None,
209 usage: self.get_usage(namespace).unwrap_or_default(),
210 exceeded_quota: None,
211 };
212 }
213
214 let usage = self.get_usage(namespace).unwrap_or_default();
215 let new_vectors = vectors.len() as u64;
216 let new_bytes = Self::estimate_storage_size(vectors);
217
218 if let Some(max_vectors) = config.max_vectors {
220 let projected = usage.vector_count + new_vectors;
221 if projected > max_vectors {
222 let reason = format!(
223 "Would exceed max vectors: {} + {} = {} > {}",
224 usage.vector_count, new_vectors, projected, max_vectors
225 );
226
227 if config.enforcement == QuotaEnforcement::Hard {
228 return QuotaCheckResult {
229 allowed: false,
230 reason: Some(reason),
231 usage,
232 exceeded_quota: Some("max_vectors".to_string()),
233 };
234 } else {
235 tracing::warn!(
236 namespace = %namespace,
237 reason = %reason,
238 "Soft quota exceeded"
239 );
240 }
241 }
242 }
243
244 if let Some(max_storage) = config.max_storage_bytes {
246 let projected = usage.storage_bytes + new_bytes;
247 if projected > max_storage {
248 let reason = format!(
249 "Would exceed max storage: {} + {} = {} > {} bytes",
250 usage.storage_bytes, new_bytes, projected, max_storage
251 );
252
253 if config.enforcement == QuotaEnforcement::Hard {
254 return QuotaCheckResult {
255 allowed: false,
256 reason: Some(reason),
257 usage,
258 exceeded_quota: Some("max_storage_bytes".to_string()),
259 };
260 } else {
261 tracing::warn!(
262 namespace = %namespace,
263 reason = %reason,
264 "Soft quota exceeded"
265 );
266 }
267 }
268 }
269
270 if let Some(max_dim) = config.max_dimensions {
272 for v in vectors {
273 if v.values.len() > max_dim {
274 let reason = format!(
275 "Vector '{}' exceeds max dimensions: {} > {}",
276 v.id,
277 v.values.len(),
278 max_dim
279 );
280
281 if config.enforcement == QuotaEnforcement::Hard {
282 return QuotaCheckResult {
283 allowed: false,
284 reason: Some(reason),
285 usage,
286 exceeded_quota: Some("max_dimensions".to_string()),
287 };
288 } else {
289 tracing::warn!(
290 namespace = %namespace,
291 vector_id = %v.id,
292 reason = %reason,
293 "Soft quota exceeded"
294 );
295 }
296 }
297 }
298 }
299
300 if let Some(max_meta) = config.max_metadata_bytes {
302 for v in vectors {
303 if let Some(meta) = &v.metadata {
304 let meta_size = serde_json::to_string(meta).map(|s| s.len()).unwrap_or(0);
305 if meta_size > max_meta {
306 let reason = format!(
307 "Vector '{}' exceeds max metadata size: {} > {} bytes",
308 v.id, meta_size, max_meta
309 );
310
311 if config.enforcement == QuotaEnforcement::Hard {
312 return QuotaCheckResult {
313 allowed: false,
314 reason: Some(reason),
315 usage,
316 exceeded_quota: Some("max_metadata_bytes".to_string()),
317 };
318 } else {
319 tracing::warn!(
320 namespace = %namespace,
321 vector_id = %v.id,
322 reason = %reason,
323 "Soft quota exceeded"
324 );
325 }
326 }
327 }
328 }
329 }
330
331 QuotaCheckResult {
332 allowed: true,
333 reason: None,
334 usage,
335 exceeded_quota: None,
336 }
337 }
338
339 pub fn enforce_upsert(&self, namespace: &NamespaceId, vectors: &[Vector]) -> Result<()> {
341 let check = self.check_upsert(namespace, vectors);
342 if !check.allowed {
343 return Err(DakeraError::QuotaExceeded {
344 namespace: namespace.clone(),
345 reason: check.reason.unwrap_or_else(|| "Quota exceeded".to_string()),
346 });
347 }
348 Ok(())
349 }
350}
351
352impl Default for QuotaManager {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
/// Shared-ownership handle to a [`QuotaManager`].
///
/// Every manager method takes `&self`, so a single instance can be used
/// through the `Arc`.
pub type SharedQuotaManager = Arc<QuotaManager>;
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a vector with `dims` values and no metadata or TTL.
    fn vector(id: &str, dims: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: vec![1.0; dims],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    #[test]
    fn test_quota_manager_basic() {
        let manager = QuotaManager::new();
        let namespace = "test".to_string();

        // No quota configured anywhere: everything is allowed.
        let check = manager.check_upsert(&namespace, &[vector("v1", 3)]);
        assert!(check.allowed);
    }

    #[test]
    fn test_quota_enforcement_hard() {
        let manager = QuotaManager::new();
        let namespace = "test".to_string();
        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_vectors: Some(2),
                max_storage_bytes: None,
                max_dimensions: None,
                max_metadata_bytes: None,
                enforcement: QuotaEnforcement::Hard,
            },
        );
        manager.init_usage(&namespace, 1, 100);

        // 1 existing + 1 new reaches the limit exactly: allowed.
        assert!(manager.check_upsert(&namespace, &[vector("v1", 2)]).allowed);

        // 1 existing + 2 new goes past the limit: rejected on vector count.
        let rejected = manager.check_upsert(&namespace, &[vector("v2", 2), vector("v3", 2)]);
        assert!(!rejected.allowed);
        assert_eq!(rejected.exceeded_quota.as_deref(), Some("max_vectors"));
    }

    #[test]
    fn test_quota_enforcement_soft() {
        let manager = QuotaManager::new();
        let namespace = "test".to_string();
        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_vectors: Some(1),
                max_storage_bytes: None,
                max_dimensions: None,
                max_metadata_bytes: None,
                enforcement: QuotaEnforcement::Soft,
            },
        );
        manager.init_usage(&namespace, 1, 100);

        // Over the limit, but soft enforcement only warns.
        assert!(manager.check_upsert(&namespace, &[vector("v1", 2)]).allowed);
    }

    #[test]
    fn test_dimension_quota() {
        let manager = QuotaManager::new();
        let namespace = "test".to_string();
        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_vectors: None,
                max_storage_bytes: None,
                max_dimensions: Some(3),
                max_metadata_bytes: None,
                enforcement: QuotaEnforcement::Hard,
            },
        );

        let rejected = manager.check_upsert(&namespace, &[vector("v1", 4)]);
        assert!(!rejected.allowed);
        assert_eq!(rejected.exceeded_quota.as_deref(), Some("max_dimensions"));
    }

    #[test]
    fn test_quota_status() {
        let manager = QuotaManager::new();
        let namespace = "test".to_string();
        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_vectors: Some(100),
                max_storage_bytes: Some(1_000_000),
                max_dimensions: None,
                max_metadata_bytes: None,
                enforcement: QuotaEnforcement::Hard,
            },
        );
        manager.init_usage(&namespace, 50, 500_000);

        let status = manager
            .get_status(&namespace)
            .expect("a quota was configured for this namespace");
        assert_eq!(status.namespace, namespace);
        assert!(!status.is_exceeded);
        assert_eq!(status.vector_usage_percent, Some(50.0));
        assert_eq!(status.storage_usage_percent, Some(50.0));
    }
}