Skip to main content

storage/
quota.rs

1use common::{
2    DakeraError, NamespaceId, QuotaCheckResult, QuotaConfig, QuotaEnforcement, QuotaStatus,
3    QuotaUsage, Result, Vector,
4};
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8
9/// Manager for namespace storage quotas
10pub struct QuotaManager {
11    /// Quota configurations per namespace
12    configs: RwLock<HashMap<NamespaceId, QuotaConfig>>,
13    /// Current usage per namespace
14    usage: RwLock<HashMap<NamespaceId, QuotaUsage>>,
15    /// Default quota for new namespaces (None = unlimited)
16    default_config: RwLock<Option<QuotaConfig>>,
17}
18
19impl QuotaManager {
20    /// Create a new quota manager
21    pub fn new() -> Self {
22        Self {
23            configs: RwLock::new(HashMap::new()),
24            usage: RwLock::new(HashMap::new()),
25            default_config: RwLock::new(None),
26        }
27    }
28
29    /// Create a quota manager with a default configuration for new namespaces
30    pub fn with_default(default_config: QuotaConfig) -> Self {
31        Self {
32            configs: RwLock::new(HashMap::new()),
33            usage: RwLock::new(HashMap::new()),
34            default_config: RwLock::new(Some(default_config)),
35        }
36    }
37
38    /// Set the default quota configuration for new namespaces
39    pub fn set_default_config(&self, config: Option<QuotaConfig>) {
40        *self.default_config.write() = config;
41    }
42
43    /// Get the default quota configuration
44    pub fn get_default_config(&self) -> Option<QuotaConfig> {
45        self.default_config.read().clone()
46    }
47
48    /// Set quota configuration for a namespace
49    pub fn set_quota(&self, namespace: &NamespaceId, config: QuotaConfig) {
50        self.configs.write().insert(namespace.clone(), config);
51        tracing::info!(namespace = %namespace, "Quota configuration updated");
52    }
53
54    /// Get quota configuration for a namespace
55    pub fn get_quota(&self, namespace: &NamespaceId) -> Option<QuotaConfig> {
56        self.configs
57            .read()
58            .get(namespace)
59            .cloned()
60            .or_else(|| self.default_config.read().clone())
61    }
62
63    /// Remove quota configuration for a namespace
64    pub fn remove_quota(&self, namespace: &NamespaceId) -> Option<QuotaConfig> {
65        self.configs.write().remove(namespace)
66    }
67
68    /// Initialize usage tracking for a namespace
69    pub fn init_usage(&self, namespace: &NamespaceId, vector_count: u64, storage_bytes: u64) {
70        let usage = QuotaUsage::new(vector_count, storage_bytes);
71        self.usage.write().insert(namespace.clone(), usage);
72
73        // Apply default config if none exists
74        if !self.configs.read().contains_key(namespace) {
75            if let Some(default) = self.default_config.read().clone() {
76                self.configs.write().insert(namespace.clone(), default);
77            }
78        }
79    }
80
81    /// Get current usage for a namespace
82    pub fn get_usage(&self, namespace: &NamespaceId) -> Option<QuotaUsage> {
83        self.usage.read().get(namespace).cloned()
84    }
85
86    /// Update usage for a namespace
87    pub fn update_usage(&self, namespace: &NamespaceId, vector_count: u64, storage_bytes: u64) {
88        let mut usage_map = self.usage.write();
89        if let Some(usage) = usage_map.get_mut(namespace) {
90            usage.vector_count = vector_count;
91            usage.storage_bytes = storage_bytes;
92            usage.touch();
93        } else {
94            usage_map.insert(
95                namespace.clone(),
96                QuotaUsage::new(vector_count, storage_bytes),
97            );
98        }
99    }
100
101    /// Increment usage after successful upsert
102    pub fn increment_usage(&self, namespace: &NamespaceId, added_vectors: u64, added_bytes: u64) {
103        let mut usage_map = self.usage.write();
104        if let Some(usage) = usage_map.get_mut(namespace) {
105            usage.vector_count += added_vectors;
106            usage.storage_bytes += added_bytes;
107            usage.touch();
108        } else {
109            usage_map.insert(
110                namespace.clone(),
111                QuotaUsage::new(added_vectors, added_bytes),
112            );
113        }
114    }
115
116    /// Decrement usage after successful delete
117    pub fn decrement_usage(
118        &self,
119        namespace: &NamespaceId,
120        removed_vectors: u64,
121        removed_bytes: u64,
122    ) {
123        let mut usage_map = self.usage.write();
124        if let Some(usage) = usage_map.get_mut(namespace) {
125            usage.vector_count = usage.vector_count.saturating_sub(removed_vectors);
126            usage.storage_bytes = usage.storage_bytes.saturating_sub(removed_bytes);
127            usage.touch();
128        }
129    }
130
131    /// Remove usage tracking for a namespace
132    pub fn remove_usage(&self, namespace: &NamespaceId) {
133        self.usage.write().remove(namespace);
134    }
135
136    /// Get full quota status for a namespace
137    pub fn get_status(&self, namespace: &NamespaceId) -> Option<QuotaStatus> {
138        let config = self.get_quota(namespace)?;
139        let usage = self.get_usage(namespace).unwrap_or_default();
140        Some(QuotaStatus::new(namespace.clone(), config, usage))
141    }
142
143    /// Get quota status for all namespaces
144    pub fn get_all_status(&self) -> Vec<QuotaStatus> {
145        let configs = self.configs.read();
146        let usage = self.usage.read();
147
148        // Combine all namespaces from both maps
149        let mut namespaces: std::collections::HashSet<_> = configs.keys().cloned().collect();
150        namespaces.extend(usage.keys().cloned());
151
152        namespaces
153            .into_iter()
154            .filter_map(|ns| {
155                let config = configs
156                    .get(&ns)
157                    .cloned()
158                    .or_else(|| self.default_config.read().clone())?;
159                let usage = usage.get(&ns).cloned().unwrap_or_default();
160                Some(QuotaStatus::new(ns, config, usage))
161            })
162            .collect()
163    }
164
165    /// Estimate storage size for vectors
166    pub fn estimate_storage_size(vectors: &[Vector]) -> u64 {
167        vectors
168            .iter()
169            .map(|v| {
170                // Base: id + values
171                let id_size = v.id.len() as u64;
172                let values_size = (v.values.len() * std::mem::size_of::<f32>()) as u64;
173
174                // Metadata (estimate JSON size)
175                let metadata_size = v
176                    .metadata
177                    .as_ref()
178                    .map(|m| serde_json::to_string(m).map(|s| s.len()).unwrap_or(0))
179                    .unwrap_or(0) as u64;
180
181                // TTL fields
182                let ttl_size = 16u64; // 2 x Option<u64>
183
184                id_size + values_size + metadata_size + ttl_size + 64 // 64 bytes overhead
185            })
186            .sum()
187    }
188
189    /// Check if an upsert operation would exceed quotas
190    pub fn check_upsert(&self, namespace: &NamespaceId, vectors: &[Vector]) -> QuotaCheckResult {
191        let config = match self.get_quota(namespace) {
192            Some(c) => c,
193            None => {
194                // No quota configured, allow
195                return QuotaCheckResult {
196                    allowed: true,
197                    reason: None,
198                    usage: self.get_usage(namespace).unwrap_or_default(),
199                    exceeded_quota: None,
200                };
201            }
202        };
203
204        // Skip check if enforcement is None
205        if config.enforcement == QuotaEnforcement::None {
206            return QuotaCheckResult {
207                allowed: true,
208                reason: None,
209                usage: self.get_usage(namespace).unwrap_or_default(),
210                exceeded_quota: None,
211            };
212        }
213
214        let usage = self.get_usage(namespace).unwrap_or_default();
215        let new_vectors = vectors.len() as u64;
216        let new_bytes = Self::estimate_storage_size(vectors);
217
218        // Check vector count quota
219        if let Some(max_vectors) = config.max_vectors {
220            let projected = usage.vector_count + new_vectors;
221            if projected > max_vectors {
222                let reason = format!(
223                    "Would exceed max vectors: {} + {} = {} > {}",
224                    usage.vector_count, new_vectors, projected, max_vectors
225                );
226
227                if config.enforcement == QuotaEnforcement::Hard {
228                    return QuotaCheckResult {
229                        allowed: false,
230                        reason: Some(reason),
231                        usage,
232                        exceeded_quota: Some("max_vectors".to_string()),
233                    };
234                } else {
235                    tracing::warn!(
236                        namespace = %namespace,
237                        reason = %reason,
238                        "Soft quota exceeded"
239                    );
240                }
241            }
242        }
243
244        // Check storage quota
245        if let Some(max_storage) = config.max_storage_bytes {
246            let projected = usage.storage_bytes + new_bytes;
247            if projected > max_storage {
248                let reason = format!(
249                    "Would exceed max storage: {} + {} = {} > {} bytes",
250                    usage.storage_bytes, new_bytes, projected, max_storage
251                );
252
253                if config.enforcement == QuotaEnforcement::Hard {
254                    return QuotaCheckResult {
255                        allowed: false,
256                        reason: Some(reason),
257                        usage,
258                        exceeded_quota: Some("max_storage_bytes".to_string()),
259                    };
260                } else {
261                    tracing::warn!(
262                        namespace = %namespace,
263                        reason = %reason,
264                        "Soft quota exceeded"
265                    );
266                }
267            }
268        }
269
270        // Check per-vector constraints
271        if let Some(max_dim) = config.max_dimensions {
272            for v in vectors {
273                if v.values.len() > max_dim {
274                    let reason = format!(
275                        "Vector '{}' exceeds max dimensions: {} > {}",
276                        v.id,
277                        v.values.len(),
278                        max_dim
279                    );
280
281                    if config.enforcement == QuotaEnforcement::Hard {
282                        return QuotaCheckResult {
283                            allowed: false,
284                            reason: Some(reason),
285                            usage,
286                            exceeded_quota: Some("max_dimensions".to_string()),
287                        };
288                    } else {
289                        tracing::warn!(
290                            namespace = %namespace,
291                            vector_id = %v.id,
292                            reason = %reason,
293                            "Soft quota exceeded"
294                        );
295                    }
296                }
297            }
298        }
299
300        // Check metadata size
301        if let Some(max_meta) = config.max_metadata_bytes {
302            for v in vectors {
303                if let Some(meta) = &v.metadata {
304                    let meta_size = serde_json::to_string(meta).map(|s| s.len()).unwrap_or(0);
305                    if meta_size > max_meta {
306                        let reason = format!(
307                            "Vector '{}' exceeds max metadata size: {} > {} bytes",
308                            v.id, meta_size, max_meta
309                        );
310
311                        if config.enforcement == QuotaEnforcement::Hard {
312                            return QuotaCheckResult {
313                                allowed: false,
314                                reason: Some(reason),
315                                usage,
316                                exceeded_quota: Some("max_metadata_bytes".to_string()),
317                            };
318                        } else {
319                            tracing::warn!(
320                                namespace = %namespace,
321                                vector_id = %v.id,
322                                reason = %reason,
323                                "Soft quota exceeded"
324                            );
325                        }
326                    }
327                }
328            }
329        }
330
331        QuotaCheckResult {
332            allowed: true,
333            reason: None,
334            usage,
335            exceeded_quota: None,
336        }
337    }
338
339    /// Check quota and return error if exceeded (for use in upsert operations)
340    pub fn enforce_upsert(&self, namespace: &NamespaceId, vectors: &[Vector]) -> Result<()> {
341        let check = self.check_upsert(namespace, vectors);
342        if !check.allowed {
343            return Err(DakeraError::QuotaExceeded {
344                namespace: namespace.clone(),
345                reason: check.reason.unwrap_or_else(|| "Quota exceeded".to_string()),
346            });
347        }
348        Ok(())
349    }
350}
351
352impl Default for QuotaManager {
353    fn default() -> Self {
354        Self::new()
355    }
356}
357
358/// Thread-safe shared quota manager
359pub type SharedQuotaManager = Arc<QuotaManager>;
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Vector with the given id and values, no metadata and no TTL.
    fn vector(id: &str, values: Vec<f32>) -> Vector {
        Vector {
            id: id.to_string(),
            values,
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// Quota with every limit unset; each test overrides the one limit it exercises.
    fn no_limits(enforcement: QuotaEnforcement) -> QuotaConfig {
        QuotaConfig {
            max_vectors: None,
            max_storage_bytes: None,
            max_dimensions: None,
            max_metadata_bytes: None,
            enforcement,
        }
    }

    #[test]
    fn test_quota_manager_basic() {
        let manager = QuotaManager::new();
        let namespace = String::from("test");

        // Nothing is configured for the namespace, so the upsert must pass.
        let batch = [vector("v1", vec![1.0, 2.0, 3.0])];

        assert!(manager.check_upsert(&namespace, &batch).allowed);
    }

    #[test]
    fn test_quota_enforcement_hard() {
        let manager = QuotaManager::new();
        let namespace = String::from("test");

        // Hard limit of 2 vectors, with 1 already stored.
        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_vectors: Some(2),
                ..no_limits(QuotaEnforcement::Hard)
            },
        );
        manager.init_usage(&namespace, 1, 100);

        // 1 + 1 = 2 stays within the limit.
        let one_more = [vector("v1", vec![1.0, 2.0])];
        assert!(manager.check_upsert(&namespace, &one_more).allowed);

        // 1 + 2 = 3 goes over the limit and must be rejected.
        let two_more = [
            vector("v2", vec![1.0, 2.0]),
            vector("v3", vec![1.0, 2.0]),
        ];
        let check = manager.check_upsert(&namespace, &two_more);
        assert!(!check.allowed);
        assert_eq!(check.exceeded_quota.as_deref(), Some("max_vectors"));
    }

    #[test]
    fn test_quota_enforcement_soft() {
        let manager = QuotaManager::new();
        let namespace = String::from("test");

        // Soft limit of 1 vector, already fully used.
        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_vectors: Some(1),
                ..no_limits(QuotaEnforcement::Soft)
            },
        );
        manager.init_usage(&namespace, 1, 100);

        // Going over a soft limit is still allowed.
        let batch = [vector("v1", vec![1.0, 2.0])];
        assert!(manager.check_upsert(&namespace, &batch).allowed);
    }

    #[test]
    fn test_dimension_quota() {
        let manager = QuotaManager::new();
        let namespace = String::from("test");

        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_dimensions: Some(3),
                ..no_limits(QuotaEnforcement::Hard)
            },
        );

        // Four dimensions against a limit of three must be rejected.
        let batch = [vector("v1", vec![1.0, 2.0, 3.0, 4.0])];

        let check = manager.check_upsert(&namespace, &batch);
        assert!(!check.allowed);
        assert_eq!(check.exceeded_quota.as_deref(), Some("max_dimensions"));
    }

    #[test]
    fn test_quota_status() {
        let manager = QuotaManager::new();
        let namespace = String::from("test");

        manager.set_quota(
            &namespace,
            QuotaConfig {
                max_vectors: Some(100),
                max_storage_bytes: Some(1_000_000),
                ..no_limits(QuotaEnforcement::Hard)
            },
        );

        // Exactly half of each limit is in use.
        manager.init_usage(&namespace, 50, 500_000);

        let status = manager.get_status(&namespace).unwrap();
        assert_eq!(status.namespace, namespace);
        assert!(!status.is_exceeded);
        assert_eq!(status.vector_usage_percent, Some(50.0));
        assert_eq!(status.storage_usage_percent, Some(50.0));
    }
}