Skip to main content

sochdb_storage/
namespace.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Lightweight Namespace Routing + On-Disk Layout (Task 3)
19//!
20//! This module provides physical namespace isolation with:
21//! - Namespace registry (`_namespaces.meta`)
22//! - Per-namespace directory layout
23//! - Database router for (namespace, collection) → storage handles
24//!
25//! ## On-Disk Layout
26//!
27//! ```text
28//! data/
29//! ├── _namespaces.meta          # Registry of all namespaces
30//! ├── _global/                  # Shared metadata
31//! │   └── catalog.db
32//! └── namespaces/
33//!     ├── tenant_a/
34//!     │   ├── _meta.json        # Namespace metadata
35//!     │   ├── collections/
36//!     │   │   ├── docs/
37//!     │   │   │   ├── vectors.idx
38//!     │   │   │   └── data.db
39//!     │   │   └── images/
40//!     │   │       ├── vectors.idx
41//!     │   │       └── data.db
42//!     │   └── kv/                # Key-value storage
43//!     │       └── data.db
44//!     └── tenant_b/
45//!         └── ...
46//! ```
47//!
48//! ## Name Resolution
49//!
50//! Resolution is O(1) via hash-map lookup:
51//!
52//! ```text
53//! (namespace="tenant_a", collection="docs")
54//!     → NamespaceHandle { root: "data/namespaces/tenant_a" }
55//!         → CollectionHandle { path: "data/namespaces/tenant_a/collections/docs" }
56//! ```
57
58use std::collections::HashMap;
59use std::fs::{self, File};
60use std::io::{BufReader, BufWriter};
61use std::path::{Path, PathBuf};
62use std::sync::Arc;
63use std::time::{SystemTime, UNIX_EPOCH};
64
65use parking_lot::RwLock;
66use serde::{Deserialize, Serialize};
67
68// ============================================================================
69// Namespace Error Types
70// ============================================================================
71
72/// Errors related to namespace operations
73#[derive(Debug, Clone, thiserror::Error)]
74pub enum NamespaceStorageError {
75    #[error("namespace not found: {0}")]
76    NotFound(String),
77
78    #[error("namespace already exists: {0}")]
79    AlreadyExists(String),
80
81    #[error("invalid namespace name: {0}")]
82    InvalidName(String),
83
84    #[error("collection not found: {namespace}/{collection}")]
85    CollectionNotFound {
86        namespace: String,
87        collection: String,
88    },
89
90    #[error("collection already exists: {namespace}/{collection}")]
91    CollectionAlreadyExists {
92        namespace: String,
93        collection: String,
94    },
95
96    #[error("storage I/O error: {0}")]
97    IoError(String),
98
99    #[error("namespace is read-only: {0}")]
100    ReadOnly(String),
101
102    #[error("namespace registry corrupted: {0}")]
103    RegistryCorrupted(String),
104}
105
106impl From<std::io::Error> for NamespaceStorageError {
107    fn from(e: std::io::Error) -> Self {
108        NamespaceStorageError::IoError(e.to_string())
109    }
110}
111
112pub type Result<T> = std::result::Result<T, NamespaceStorageError>;
113
114// ============================================================================
115// Namespace Metadata
116// ============================================================================
117
118/// Metadata for a namespace (stored in _meta.json)
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct NamespaceMeta {
121    /// Unique namespace identifier
122    pub name: String,
123
124    /// Human-readable display name
125    pub display_name: Option<String>,
126
127    /// Creation timestamp (Unix epoch millis)
128    pub created_at: u64,
129
130    /// Last modified timestamp (Unix epoch millis)
131    pub updated_at: u64,
132
133    /// Whether the namespace is read-only
134    pub read_only: bool,
135
136    /// Custom metadata/labels
137    pub labels: HashMap<String, String>,
138
139    /// Collections in this namespace
140    #[serde(default)]
141    pub collections: Vec<String>,
142}
143
144impl NamespaceMeta {
145    /// Create a new namespace metadata
146    pub fn new(name: impl Into<String>) -> Self {
147        let now = SystemTime::now()
148            .duration_since(UNIX_EPOCH)
149            .unwrap_or_default()
150            .as_millis() as u64;
151
152        Self {
153            name: name.into(),
154            display_name: None,
155            created_at: now,
156            updated_at: now,
157            read_only: false,
158            labels: HashMap::new(),
159            collections: Vec::new(),
160        }
161    }
162
163    /// Set display name
164    pub fn with_display_name(mut self, name: impl Into<String>) -> Self {
165        self.display_name = Some(name.into());
166        self
167    }
168
169    /// Set labels
170    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
171        self.labels = labels;
172        self
173    }
174}
175
176/// Namespace registry (stored in _namespaces.meta)
177#[derive(Debug, Clone, Serialize, Deserialize, Default)]
178pub struct NamespaceRegistry {
179    /// Version for forward compatibility
180    pub version: u32,
181
182    /// List of registered namespaces
183    pub namespaces: Vec<NamespaceEntry>,
184}
185
186/// Entry in the namespace registry
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct NamespaceEntry {
189    /// Namespace name (directory name)
190    pub name: String,
191
192    /// Creation timestamp
193    pub created_at: u64,
194
195    /// Whether the namespace is active
196    pub active: bool,
197}
198
199// ============================================================================
200// Collection Config
201// ============================================================================
202
203/// Collection configuration (stored in collection directory)
204#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct CollectionConfig {
206    /// Collection name
207    pub name: String,
208
209    /// Vector dimension (if applicable)
210    pub dimension: Option<usize>,
211
212    /// Distance metric
213    pub metric: DistanceMetric,
214
215    /// Index configuration
216    pub index_config: IndexConfig,
217
218    /// Creation timestamp
219    pub created_at: u64,
220
221    /// Whether the config is frozen (immutable after creation)
222    pub frozen: bool,
223}
224
225/// Distance metric for vector similarity
226#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
227pub enum DistanceMetric {
228    #[default]
229    Cosine,
230    Euclidean,
231    DotProduct,
232}
233
234/// Index configuration
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct IndexConfig {
237    /// HNSW M parameter
238    pub m: usize,
239
240    /// HNSW ef_construction parameter
241    pub ef_construction: usize,
242
243    /// Enable quantization
244    pub quantization: Option<QuantizationType>,
245}
246
247impl Default for IndexConfig {
248    fn default() -> Self {
249        Self {
250            m: 16,
251            ef_construction: 100,
252            quantization: None,
253        }
254    }
255}
256
257/// Quantization type
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
259pub enum QuantizationType {
260    Scalar, // int8 quantization
261    PQ,     // Product quantization
262}
263
264// ============================================================================
265// Namespace Handle
266// ============================================================================
267
268/// Handle to a namespace's storage
269#[derive(Debug, Clone)]
270pub struct NamespaceHandle {
271    /// Namespace name
272    pub name: String,
273
274    /// Root directory for this namespace
275    pub root: PathBuf,
276
277    /// Metadata
278    pub meta: Arc<RwLock<NamespaceMeta>>,
279}
280
281impl NamespaceHandle {
282    /// Get the collections directory
283    pub fn collections_dir(&self) -> PathBuf {
284        self.root.join("collections")
285    }
286
287    /// Get the key-value storage directory
288    pub fn kv_dir(&self) -> PathBuf {
289        self.root.join("kv")
290    }
291
292    /// Get a collection path
293    pub fn collection_path(&self, collection: &str) -> PathBuf {
294        self.collections_dir().join(collection)
295    }
296
297    /// Check if collection exists
298    pub fn has_collection(&self, collection: &str) -> bool {
299        self.collection_path(collection).exists()
300    }
301
302    /// List collections
303    pub fn list_collections(&self) -> Result<Vec<String>> {
304        let collections_dir = self.collections_dir();
305        if !collections_dir.exists() {
306            return Ok(Vec::new());
307        }
308
309        let mut collections = Vec::new();
310        for entry in fs::read_dir(&collections_dir)? {
311            let entry = entry?;
312            if entry.file_type()?.is_dir() {
313                if let Some(name) = entry.file_name().to_str() {
314                    collections.push(name.to_string());
315                }
316            }
317        }
318        Ok(collections)
319    }
320}
321
322/// Handle to a collection's storage
323#[derive(Debug, Clone)]
324pub struct CollectionHandle {
325    /// Namespace this collection belongs to
326    pub namespace: String,
327
328    /// Collection name
329    pub name: String,
330
331    /// Root directory for this collection
332    pub root: PathBuf,
333
334    /// Configuration (immutable after creation)
335    pub config: Arc<CollectionConfig>,
336}
337
338impl CollectionHandle {
339    /// Get the vector index path
340    pub fn vectors_path(&self) -> PathBuf {
341        self.root.join("vectors.idx")
342    }
343
344    /// Get the data storage path
345    pub fn data_path(&self) -> PathBuf {
346        self.root.join("data.db")
347    }
348
349    /// Get the metadata index path
350    pub fn metadata_path(&self) -> PathBuf {
351        self.root.join("metadata.idx")
352    }
353
354    /// Get the tombstones path
355    pub fn tombstones_path(&self) -> PathBuf {
356        self.root.join("tombstones.bin")
357    }
358}
359
360// ============================================================================
361// Namespace Router (Main Interface)
362// ============================================================================
363
364/// Database router for namespace resolution
365///
366/// Resolves (namespace, collection) → storage handles with O(1) lookup.
367pub struct NamespaceRouter {
368    /// Base data directory
369    data_dir: PathBuf,
370
371    /// Namespace registry (cached)
372    registry: RwLock<NamespaceRegistry>,
373
374    /// Loaded namespace handles (cache)
375    namespaces: RwLock<HashMap<String, Arc<NamespaceHandle>>>,
376
377    /// Loaded collection handles (cache)
378    collections: RwLock<HashMap<(String, String), Arc<CollectionHandle>>>,
379}
380
381impl NamespaceRouter {
382    /// Create a new namespace router
383    pub fn new(data_dir: impl AsRef<Path>) -> Result<Self> {
384        let data_dir = data_dir.as_ref().to_path_buf();
385
386        // Ensure directories exist
387        fs::create_dir_all(&data_dir)?;
388        fs::create_dir_all(data_dir.join("namespaces"))?;
389        fs::create_dir_all(data_dir.join("_global"))?;
390
391        // Load or create registry
392        let registry_path = data_dir.join("_namespaces.meta");
393        let registry = if registry_path.exists() {
394            let file = File::open(&registry_path)?;
395            let reader = BufReader::new(file);
396            serde_json::from_reader(reader)
397                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
398        } else {
399            NamespaceRegistry {
400                version: 1,
401                namespaces: Vec::new(),
402            }
403        };
404
405        Ok(Self {
406            data_dir,
407            registry: RwLock::new(registry),
408            namespaces: RwLock::new(HashMap::new()),
409            collections: RwLock::new(HashMap::new()),
410        })
411    }
412
413    /// Save the registry to disk
414    fn save_registry(&self) -> Result<()> {
415        let registry_path = self.data_dir.join("_namespaces.meta");
416        let file = File::create(&registry_path)?;
417        let writer = BufWriter::new(file);
418        serde_json::to_writer_pretty(writer, &*self.registry.read())
419            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
420        Ok(())
421    }
422
423    // ========================================================================
424    // Namespace Operations
425    // ========================================================================
426
427    /// Create a new namespace
428    pub fn create_namespace(&self, meta: NamespaceMeta) -> Result<Arc<NamespaceHandle>> {
429        let name = meta.name.clone();
430
431        // Validate name
432        Self::validate_namespace_name(&name)?;
433
434        // Check if already exists
435        {
436            let registry = self.registry.read();
437            if registry.namespaces.iter().any(|e| e.name == name) {
438                return Err(NamespaceStorageError::AlreadyExists(name));
439            }
440        }
441
442        // Create directory structure
443        let namespace_dir = self.data_dir.join("namespaces").join(&name);
444        fs::create_dir_all(&namespace_dir)?;
445        fs::create_dir_all(namespace_dir.join("collections"))?;
446        fs::create_dir_all(namespace_dir.join("kv"))?;
447
448        // Write namespace metadata
449        let meta_path = namespace_dir.join("_meta.json");
450        let file = File::create(&meta_path)?;
451        let writer = BufWriter::new(file);
452        serde_json::to_writer_pretty(writer, &meta)
453            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
454
455        // Update registry
456        {
457            let mut registry = self.registry.write();
458            registry.namespaces.push(NamespaceEntry {
459                name: name.clone(),
460                created_at: meta.created_at,
461                active: true,
462            });
463        }
464        self.save_registry()?;
465
466        // Create and cache handle
467        let handle = Arc::new(NamespaceHandle {
468            name: name.clone(),
469            root: namespace_dir,
470            meta: Arc::new(RwLock::new(meta)),
471        });
472
473        self.namespaces.write().insert(name, handle.clone());
474
475        Ok(handle)
476    }
477
478    /// Get a namespace handle
479    pub fn get_namespace(&self, name: &str) -> Result<Arc<NamespaceHandle>> {
480        // Check cache first
481        if let Some(handle) = self.namespaces.read().get(name) {
482            return Ok(handle.clone());
483        }
484
485        // Check registry
486        {
487            let registry = self.registry.read();
488            if !registry
489                .namespaces
490                .iter()
491                .any(|e| e.name == name && e.active)
492            {
493                return Err(NamespaceStorageError::NotFound(name.to_string()));
494            }
495        }
496
497        // Load from disk
498        let namespace_dir = self.data_dir.join("namespaces").join(name);
499        if !namespace_dir.exists() {
500            return Err(NamespaceStorageError::NotFound(name.to_string()));
501        }
502
503        let meta_path = namespace_dir.join("_meta.json");
504        let meta: NamespaceMeta = if meta_path.exists() {
505            let file = File::open(&meta_path)?;
506            let reader = BufReader::new(file);
507            serde_json::from_reader(reader)
508                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
509        } else {
510            NamespaceMeta::new(name)
511        };
512
513        let handle = Arc::new(NamespaceHandle {
514            name: name.to_string(),
515            root: namespace_dir,
516            meta: Arc::new(RwLock::new(meta)),
517        });
518
519        self.namespaces
520            .write()
521            .insert(name.to_string(), handle.clone());
522
523        Ok(handle)
524    }
525
526    /// List all namespaces
527    pub fn list_namespaces(&self) -> Vec<String> {
528        self.registry
529            .read()
530            .namespaces
531            .iter()
532            .filter(|e| e.active)
533            .map(|e| e.name.clone())
534            .collect()
535    }
536
537    /// Delete a namespace (marks as inactive, doesn't delete files)
538    pub fn delete_namespace(&self, name: &str) -> Result<()> {
539        {
540            let mut registry = self.registry.write();
541            let entry = registry
542                .namespaces
543                .iter_mut()
544                .find(|e| e.name == name)
545                .ok_or_else(|| NamespaceStorageError::NotFound(name.to_string()))?;
546            entry.active = false;
547        }
548
549        self.save_registry()?;
550        self.namespaces.write().remove(name);
551
552        // Remove cached collections for this namespace
553        self.collections.write().retain(|(ns, _), _| ns != name);
554
555        Ok(())
556    }
557
558    /// Validate namespace name
559    fn validate_namespace_name(name: &str) -> Result<()> {
560        if name.is_empty() {
561            return Err(NamespaceStorageError::InvalidName(
562                "namespace name cannot be empty".to_string(),
563            ));
564        }
565
566        if name.len() > 256 {
567            return Err(NamespaceStorageError::InvalidName(
568                "namespace name too long (max 256 chars)".to_string(),
569            ));
570        }
571
572        // Must start with alphanumeric
573        let first = name.chars().next().unwrap();
574        if !first.is_alphanumeric() {
575            return Err(NamespaceStorageError::InvalidName(format!(
576                "namespace name must start with alphanumeric, got '{}'",
577                first
578            )));
579        }
580
581        // Only allow alphanumeric, underscore, hyphen, dot
582        for ch in name.chars() {
583            if !ch.is_alphanumeric() && ch != '_' && ch != '-' && ch != '.' {
584                return Err(NamespaceStorageError::InvalidName(format!(
585                    "invalid character '{}' in namespace name",
586                    ch
587                )));
588            }
589        }
590
591        // Reserved names
592        let reserved = ["_global", "_namespaces", "_meta", "_system"];
593        if reserved.contains(&name) {
594            return Err(NamespaceStorageError::InvalidName(format!(
595                "'{}' is a reserved name",
596                name
597            )));
598        }
599
600        Ok(())
601    }
602
603    // ========================================================================
604    // Collection Operations
605    // ========================================================================
606
607    /// Create a collection in a namespace
608    pub fn create_collection(
609        &self,
610        namespace: &str,
611        config: CollectionConfig,
612    ) -> Result<Arc<CollectionHandle>> {
613        let ns_handle = self.get_namespace(namespace)?;
614
615        // Check if namespace is read-only
616        if ns_handle.meta.read().read_only {
617            return Err(NamespaceStorageError::ReadOnly(namespace.to_string()));
618        }
619
620        let collection_name = config.name.clone();
621        let collection_dir = ns_handle.collection_path(&collection_name);
622
623        // Check if already exists
624        if collection_dir.exists() {
625            return Err(NamespaceStorageError::CollectionAlreadyExists {
626                namespace: namespace.to_string(),
627                collection: collection_name,
628            });
629        }
630
631        // Create directory
632        fs::create_dir_all(&collection_dir)?;
633
634        // Freeze config and write
635        let frozen_config = CollectionConfig {
636            frozen: true,
637            ..config
638        };
639
640        let config_path = collection_dir.join("config.json");
641        let file = File::create(&config_path)?;
642        let writer = BufWriter::new(file);
643        serde_json::to_writer_pretty(writer, &frozen_config)
644            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
645
646        // Update namespace metadata
647        {
648            let mut meta = ns_handle.meta.write();
649            meta.collections.push(collection_name.clone());
650            meta.updated_at = SystemTime::now()
651                .duration_since(UNIX_EPOCH)
652                .unwrap_or_default()
653                .as_millis() as u64;
654        }
655
656        // Write updated namespace meta
657        let ns_meta_path = ns_handle.root.join("_meta.json");
658        let file = File::create(&ns_meta_path)?;
659        let writer = BufWriter::new(file);
660        serde_json::to_writer_pretty(writer, &*ns_handle.meta.read())
661            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
662
663        // Create and cache handle
664        let handle = Arc::new(CollectionHandle {
665            namespace: namespace.to_string(),
666            name: collection_name.clone(),
667            root: collection_dir,
668            config: Arc::new(frozen_config),
669        });
670
671        self.collections
672            .write()
673            .insert((namespace.to_string(), collection_name), handle.clone());
674
675        Ok(handle)
676    }
677
678    /// Get a collection handle
679    pub fn get_collection(
680        &self,
681        namespace: &str,
682        collection: &str,
683    ) -> Result<Arc<CollectionHandle>> {
684        let key = (namespace.to_string(), collection.to_string());
685
686        // Check cache
687        if let Some(handle) = self.collections.read().get(&key) {
688            return Ok(handle.clone());
689        }
690
691        let ns_handle = self.get_namespace(namespace)?;
692        let collection_dir = ns_handle.collection_path(collection);
693
694        if !collection_dir.exists() {
695            return Err(NamespaceStorageError::CollectionNotFound {
696                namespace: namespace.to_string(),
697                collection: collection.to_string(),
698            });
699        }
700
701        // Load config
702        let config_path = collection_dir.join("config.json");
703        let config: CollectionConfig = if config_path.exists() {
704            let file = File::open(&config_path)?;
705            let reader = BufReader::new(file);
706            serde_json::from_reader(reader)
707                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
708        } else {
709            // Default config for legacy collections
710            CollectionConfig {
711                name: collection.to_string(),
712                dimension: None,
713                metric: DistanceMetric::Cosine,
714                index_config: IndexConfig::default(),
715                created_at: 0,
716                frozen: true,
717            }
718        };
719
720        let handle = Arc::new(CollectionHandle {
721            namespace: namespace.to_string(),
722            name: collection.to_string(),
723            root: collection_dir,
724            config: Arc::new(config),
725        });
726
727        self.collections.write().insert(key, handle.clone());
728
729        Ok(handle)
730    }
731
732    /// Delete a collection
733    pub fn delete_collection(&self, namespace: &str, collection: &str) -> Result<()> {
734        let ns_handle = self.get_namespace(namespace)?;
735
736        // Check if namespace is read-only
737        if ns_handle.meta.read().read_only {
738            return Err(NamespaceStorageError::ReadOnly(namespace.to_string()));
739        }
740
741        let collection_dir = ns_handle.collection_path(collection);
742        if !collection_dir.exists() {
743            return Err(NamespaceStorageError::CollectionNotFound {
744                namespace: namespace.to_string(),
745                collection: collection.to_string(),
746            });
747        }
748
749        // Remove from cache
750        self.collections
751            .write()
752            .remove(&(namespace.to_string(), collection.to_string()));
753
754        // Update namespace metadata
755        {
756            let mut meta = ns_handle.meta.write();
757            meta.collections.retain(|c| c != collection);
758            meta.updated_at = SystemTime::now()
759                .duration_since(UNIX_EPOCH)
760                .unwrap_or_default()
761                .as_millis() as u64;
762        }
763
764        // Write updated namespace meta
765        let ns_meta_path = ns_handle.root.join("_meta.json");
766        let file = File::create(&ns_meta_path)?;
767        let writer = BufWriter::new(file);
768        serde_json::to_writer_pretty(writer, &*ns_handle.meta.read())
769            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
770
771        // Remove directory (optional - could mark as deleted instead)
772        fs::remove_dir_all(&collection_dir)?;
773
774        Ok(())
775    }
776
777    /// Resolve (namespace, collection) to storage paths
778    ///
779    /// This is the main routing method, O(1) via hash-map lookup.
780    pub fn resolve(&self, namespace: &str, collection: &str) -> Result<Arc<CollectionHandle>> {
781        self.get_collection(namespace, collection)
782    }
783}
784
785// ============================================================================
786// Prefix Iteration Safety
787// ============================================================================
788
/// Iterator adaptor that yields items only while their keys start with a
/// fixed byte prefix.
///
/// Iteration ends permanently at the first key that does not match (or when
/// the wrapped iterator ends), so a key without the prefix is never yielded.
pub struct PrefixIterator<I> {
    inner: I,
    prefix: Vec<u8>,
    exhausted: bool,
}

impl<I> PrefixIterator<I> {
    /// Wraps `inner`, restricting it to keys that begin with `prefix`.
    pub fn new(inner: I, prefix: Vec<u8>) -> Self {
        PrefixIterator {
            exhausted: false,
            prefix,
            inner,
        }
    }

    /// The prefix every yielded key starts with.
    pub fn prefix(&self) -> &[u8] {
        self.prefix.as_slice()
    }
}

impl<I, K, V> Iterator for PrefixIterator<I>
where
    I: Iterator<Item = (K, V)>,
    K: AsRef<[u8]>,
{
    type Item = (K, V);

    fn next(&mut self) -> Option<Self::Item> {
        // Once stopped, the wrapped iterator is never polled again.
        if self.exhausted {
            return None;
        }

        let candidate = self.inner.next();
        let matched = candidate.filter(|(key, _)| key.as_ref().starts_with(&self.prefix));

        // Either the source ran dry or a foreign key appeared; both end the scan.
        if matched.is_none() {
            self.exhausted = true;
        }
        matched
    }
}
845
/// Compute the exclusive upper bound for a prefix scan.
///
/// Returns the smallest byte string that sorts after every key beginning
/// with `prefix`: trailing `0xFF` bytes are dropped and the last remaining
/// byte is incremented. Returns `None` when no such bound exists, i.e. the
/// prefix is empty or consists solely of `0xFF` bytes.
///
/// # Example
/// ```
/// use sochdb_storage::namespace::next_prefix;
/// assert_eq!(next_prefix(b"abc"), Some(b"abd".to_vec()));
/// assert_eq!(next_prefix(b"\xff\xff"), None); // No successor
/// ```
pub fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the rightmost byte that can still be incremented.
    let pivot = prefix.iter().rposition(|&byte| byte != 0xFF)?;

    // Keep everything up to and including that byte, then bump it.
    let mut successor = prefix[..=pivot].to_vec();
    successor[pivot] += 1;
    Some(successor)
}
878
879// ============================================================================
880// Tests
881// ============================================================================
882
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use tempfile::TempDir;
887
888    fn setup_router() -> (TempDir, NamespaceRouter) {
889        let temp_dir = TempDir::new().unwrap();
890        let router = NamespaceRouter::new(temp_dir.path()).unwrap();
891        (temp_dir, router)
892    }
893
894    #[test]
895    fn test_create_namespace() {
896        let (_temp, router) = setup_router();
897
898        let meta = NamespaceMeta::new("tenant_a")
899            .with_display_name("Tenant A")
900            .with_labels([("env".to_string(), "production".to_string())].into());
901
902        let handle = router.create_namespace(meta).unwrap();
903        assert_eq!(handle.name, "tenant_a");
904        assert!(handle.root.exists());
905        assert!(handle.collections_dir().exists());
906    }
907
908    #[test]
909    fn test_namespace_already_exists() {
910        let (_temp, router) = setup_router();
911
912        router
913            .create_namespace(NamespaceMeta::new("tenant_a"))
914            .unwrap();
915
916        let result = router.create_namespace(NamespaceMeta::new("tenant_a"));
917        assert!(matches!(
918            result,
919            Err(NamespaceStorageError::AlreadyExists(_))
920        ));
921    }
922
923    #[test]
924    fn test_get_namespace() {
925        let (_temp, router) = setup_router();
926
927        router
928            .create_namespace(NamespaceMeta::new("tenant_a"))
929            .unwrap();
930
931        let handle = router.get_namespace("tenant_a").unwrap();
932        assert_eq!(handle.name, "tenant_a");
933    }
934
935    #[test]
936    fn test_namespace_not_found() {
937        let (_temp, router) = setup_router();
938
939        let result = router.get_namespace("nonexistent");
940        assert!(matches!(result, Err(NamespaceStorageError::NotFound(_))));
941    }
942
943    #[test]
944    fn test_list_namespaces() {
945        let (_temp, router) = setup_router();
946
947        router
948            .create_namespace(NamespaceMeta::new("tenant_a"))
949            .unwrap();
950        router
951            .create_namespace(NamespaceMeta::new("tenant_b"))
952            .unwrap();
953
954        let mut namespaces = router.list_namespaces();
955        namespaces.sort();
956
957        assert_eq!(namespaces, vec!["tenant_a", "tenant_b"]);
958    }
959
/// After a namespace is deleted it no longer appears in the listing.
#[test]
fn test_delete_namespace() {
    let (_tmp, router) = setup_router();

    let meta = NamespaceMeta::new("tenant_a");
    router.create_namespace(meta).unwrap();
    router.delete_namespace("tenant_a").unwrap();

    let still_listed = router
        .list_namespaces()
        .iter()
        .any(|name| name == "tenant_a");
    assert!(!still_listed);
}
972
/// Creating a collection inside an existing namespace returns a handle whose
/// directory exists and whose configuration is marked as frozen.
#[test]
fn test_create_collection() {
    let (_tmp, router) = setup_router();

    let ns_meta = NamespaceMeta::new("tenant_a");
    router.create_namespace(ns_meta).unwrap();

    // Submitted with `frozen: false`; the handle that comes back is expected
    // to report the configuration as frozen.
    let requested = CollectionConfig {
        name: String::from("documents"),
        dimension: Some(384),
        metric: DistanceMetric::Cosine,
        index_config: IndexConfig::default(),
        created_at: 0,
        frozen: false,
    };

    let created = router.create_collection("tenant_a", requested).unwrap();
    assert_eq!(created.name, "documents");
    assert!(created.root.exists());
    assert!(created.config.frozen);
}
995
/// A created collection can be fetched by `(namespace, collection)` and the
/// returned handle carries both names.
#[test]
fn test_get_collection() {
    let (_tmp, router) = setup_router();

    let ns_meta = NamespaceMeta::new("tenant_a");
    router.create_namespace(ns_meta).unwrap();

    let requested = CollectionConfig {
        name: String::from("documents"),
        dimension: Some(384),
        metric: DistanceMetric::Cosine,
        index_config: IndexConfig::default(),
        created_at: 0,
        frozen: false,
    };
    router.create_collection("tenant_a", requested).unwrap();

    let fetched = router.get_collection("tenant_a", "documents").unwrap();
    assert_eq!(fetched.name, "documents");
    assert_eq!(fetched.namespace, "tenant_a");
}
1019
/// Resolving `(namespace, collection)` returns a handle whose vector index
/// path is `vectors.idx` directly under the collection root.
#[test]
fn test_resolve() {
    let (_tmp, router) = setup_router();

    let ns_meta = NamespaceMeta::new("tenant_a");
    router.create_namespace(ns_meta).unwrap();

    let requested = CollectionConfig {
        name: String::from("documents"),
        dimension: Some(384),
        metric: DistanceMetric::Cosine,
        index_config: IndexConfig::default(),
        created_at: 0,
        frozen: false,
    };
    router.create_collection("tenant_a", requested).unwrap();

    // O(1) resolution
    let resolved = router.resolve("tenant_a", "documents").unwrap();
    let expected_index_path = resolved.root.join("vectors.idx");
    assert_eq!(resolved.vectors_path(), expected_index_path);
}
1043
/// Namespace creation must reject every malformed or reserved name.
#[test]
fn test_invalid_namespace_names() {
    let (_tmp, router) = setup_router();

    let rejected_names = [
        "",         // empty name
        "-bad",     // starts with hyphen
        "bad name", // contains space
        "_global",  // reserved name
    ];

    for candidate in rejected_names {
        let outcome = router.create_namespace(NamespaceMeta::new(candidate));
        assert!(outcome.is_err());
    }
}
1068
/// Table-driven check of `next_prefix` on ordinary, `0xff`-terminated,
/// all-`0xff`, and empty inputs.
#[test]
fn test_next_prefix() {
    // (input prefix, expected result)
    let cases: [(&[u8], Option<Vec<u8>>); 5] = [
        // Last byte is simply incremented.
        (b"abc", Some(b"abd".to_vec())),
        // A trailing 0xff is dropped and the byte before it is incremented.
        (b"ab\xff", Some(b"ac".to_vec())),
        // Nothing can be incremented.
        (b"\xff\xff\xff", None),
        (b"", None),
        // '/' (0x2f) is followed by '0' (0x30).
        (b"tenant_a/", Some(b"tenant_a0".to_vec())),
    ];

    for (input, expected) in cases {
        assert_eq!(next_prefix(input), expected);
    }
}
1077
/// `PrefixIterator` yields only the entries whose key starts with the
/// requested prefix.
#[test]
fn test_prefix_iterator() {
    let prefix = b"tenant_a/";

    let entries = vec![
        (b"tenant_a/doc1".to_vec(), b"v1".to_vec()),
        (b"tenant_a/doc2".to_vec(), b"v2".to_vec()),
        (b"tenant_b/doc1".to_vec(), b"v3".to_vec()), // Should not be returned
    ];

    let filtered = PrefixIterator::new(entries.into_iter(), prefix.to_vec());
    let matched: Vec<_> = filtered.collect();

    // Exactly the two `tenant_a/` entries are expected to survive.
    assert_eq!(matched.len(), 2);
    assert!(matched.iter().all(|entry| entry.0.starts_with(prefix)));
}
1093}