sochdb_storage/
namespace.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Lightweight Namespace Routing + On-Disk Layout (Task 3)
16//!
17//! This module provides physical namespace isolation with:
18//! - Namespace registry (`_namespaces.meta`)
19//! - Per-namespace directory layout
20//! - Database router for (namespace, collection) → storage handles
21//!
22//! ## On-Disk Layout
23//!
24//! ```text
25//! data/
26//! ├── _namespaces.meta          # Registry of all namespaces
27//! ├── _global/                  # Shared metadata
28//! │   └── catalog.db
29//! └── namespaces/
30//!     ├── tenant_a/
31//!     │   ├── _meta.json        # Namespace metadata
32//!     │   ├── collections/
33//!     │   │   ├── docs/
34//!     │   │   │   ├── vectors.idx
35//!     │   │   │   └── data.db
36//!     │   │   └── images/
37//!     │   │       ├── vectors.idx
38//!     │   │       └── data.db
39//!     │   └── kv/                # Key-value storage
40//!     │       └── data.db
41//!     └── tenant_b/
42//!         └── ...
43//! ```
44//!
45//! ## Name Resolution
46//!
47//! Resolution is O(1) via hash-map lookup:
48//!
49//! ```text
50//! (namespace="tenant_a", collection="docs")
51//!     → NamespaceHandle { root: "data/namespaces/tenant_a" }
52//!         → CollectionHandle { path: "data/namespaces/tenant_a/collections/docs" }
53//! ```
54
55use std::collections::HashMap;
56use std::fs::{self, File};
57use std::io::{BufReader, BufWriter};
58use std::path::{Path, PathBuf};
59use std::sync::Arc;
60use std::time::{SystemTime, UNIX_EPOCH};
61
62use parking_lot::RwLock;
63use serde::{Deserialize, Serialize};
64
65// ============================================================================
66// Namespace Error Types
67// ============================================================================
68
69/// Errors related to namespace operations
70#[derive(Debug, Clone, thiserror::Error)]
71pub enum NamespaceStorageError {
72    #[error("namespace not found: {0}")]
73    NotFound(String),
74
75    #[error("namespace already exists: {0}")]
76    AlreadyExists(String),
77
78    #[error("invalid namespace name: {0}")]
79    InvalidName(String),
80
81    #[error("collection not found: {namespace}/{collection}")]
82    CollectionNotFound { namespace: String, collection: String },
83
84    #[error("collection already exists: {namespace}/{collection}")]
85    CollectionAlreadyExists { namespace: String, collection: String },
86
87    #[error("storage I/O error: {0}")]
88    IoError(String),
89
90    #[error("namespace is read-only: {0}")]
91    ReadOnly(String),
92
93    #[error("namespace registry corrupted: {0}")]
94    RegistryCorrupted(String),
95}
96
97impl From<std::io::Error> for NamespaceStorageError {
98    fn from(e: std::io::Error) -> Self {
99        NamespaceStorageError::IoError(e.to_string())
100    }
101}
102
103pub type Result<T> = std::result::Result<T, NamespaceStorageError>;
104
105// ============================================================================
106// Namespace Metadata
107// ============================================================================
108
109/// Metadata for a namespace (stored in _meta.json)
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct NamespaceMeta {
112    /// Unique namespace identifier
113    pub name: String,
114    
115    /// Human-readable display name
116    pub display_name: Option<String>,
117    
118    /// Creation timestamp (Unix epoch millis)
119    pub created_at: u64,
120    
121    /// Last modified timestamp (Unix epoch millis)
122    pub updated_at: u64,
123    
124    /// Whether the namespace is read-only
125    pub read_only: bool,
126    
127    /// Custom metadata/labels
128    pub labels: HashMap<String, String>,
129    
130    /// Collections in this namespace
131    #[serde(default)]
132    pub collections: Vec<String>,
133}
134
135impl NamespaceMeta {
136    /// Create a new namespace metadata
137    pub fn new(name: impl Into<String>) -> Self {
138        let now = SystemTime::now()
139            .duration_since(UNIX_EPOCH)
140            .unwrap_or_default()
141            .as_millis() as u64;
142        
143        Self {
144            name: name.into(),
145            display_name: None,
146            created_at: now,
147            updated_at: now,
148            read_only: false,
149            labels: HashMap::new(),
150            collections: Vec::new(),
151        }
152    }
153    
154    /// Set display name
155    pub fn with_display_name(mut self, name: impl Into<String>) -> Self {
156        self.display_name = Some(name.into());
157        self
158    }
159    
160    /// Set labels
161    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
162        self.labels = labels;
163        self
164    }
165}
166
/// Namespace registry (stored in _namespaces.meta)
///
/// This is the JSON document the router loads at start-up and rewrites
/// whenever a namespace is created or deleted. Note that the derived
/// `Default` yields `version == 0`, while `NamespaceRouter::new` starts a
/// fresh registry at `version == 1`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NamespaceRegistry {
    /// Version for forward compatibility
    pub version: u32,

    /// List of registered namespaces, including entries that have been
    /// marked inactive
    pub namespaces: Vec<NamespaceEntry>,
}
176
/// Entry in the namespace registry
///
/// One record per namespace that has ever been created through the router.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NamespaceEntry {
    /// Namespace name (directory name)
    pub name: String,

    /// Creation timestamp (copied from the namespace metadata's
    /// `created_at` when the namespace is created)
    pub created_at: u64,

    /// Whether the namespace is active; `NamespaceRouter::delete_namespace`
    /// clears this flag instead of removing the entry
    pub active: bool,
}
189
190// ============================================================================
191// Collection Config
192// ============================================================================
193
/// Collection configuration (stored in collection directory)
///
/// The router writes this as `config.json` when a collection is created and
/// reads it back when a collection handle is loaded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionConfig {
    /// Collection name (also used as the collection's directory name)
    pub name: String,

    /// Vector dimension (if applicable)
    pub dimension: Option<usize>,

    /// Distance metric
    pub metric: DistanceMetric,

    /// Index configuration
    pub index_config: IndexConfig,

    /// Creation timestamp
    pub created_at: u64,

    /// Whether the config is frozen (immutable after creation);
    /// `NamespaceRouter::create_collection` forces this to `true` before
    /// persisting
    pub frozen: bool,
}
215
/// Distance metric for vector similarity
///
/// `Cosine` is the `Default` variant. Variant names are what serde writes
/// into persisted collection configs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum DistanceMetric {
    /// Cosine metric (the default)
    #[default]
    Cosine,
    /// Euclidean metric
    Euclidean,
    /// Dot-product metric
    DotProduct,
}
224
225/// Index configuration
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct IndexConfig {
228    /// HNSW M parameter
229    pub m: usize,
230    
231    /// HNSW ef_construction parameter
232    pub ef_construction: usize,
233    
234    /// Enable quantization
235    pub quantization: Option<QuantizationType>,
236}
237
238impl Default for IndexConfig {
239    fn default() -> Self {
240        Self {
241            m: 16,
242            ef_construction: 100,
243            quantization: None,
244        }
245    }
246}
247
/// Quantization type
///
/// Selected through `IndexConfig::quantization`; variant names are what
/// serde writes into persisted configs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QuantizationType {
    /// int8 quantization
    Scalar,
    /// Product quantization
    PQ,
}
254
255// ============================================================================
256// Namespace Handle
257// ============================================================================
258
259/// Handle to a namespace's storage
260#[derive(Debug, Clone)]
261pub struct NamespaceHandle {
262    /// Namespace name
263    pub name: String,
264    
265    /// Root directory for this namespace
266    pub root: PathBuf,
267    
268    /// Metadata
269    pub meta: Arc<RwLock<NamespaceMeta>>,
270}
271
272impl NamespaceHandle {
273    /// Get the collections directory
274    pub fn collections_dir(&self) -> PathBuf {
275        self.root.join("collections")
276    }
277    
278    /// Get the key-value storage directory
279    pub fn kv_dir(&self) -> PathBuf {
280        self.root.join("kv")
281    }
282    
283    /// Get a collection path
284    pub fn collection_path(&self, collection: &str) -> PathBuf {
285        self.collections_dir().join(collection)
286    }
287    
288    /// Check if collection exists
289    pub fn has_collection(&self, collection: &str) -> bool {
290        self.collection_path(collection).exists()
291    }
292    
293    /// List collections
294    pub fn list_collections(&self) -> Result<Vec<String>> {
295        let collections_dir = self.collections_dir();
296        if !collections_dir.exists() {
297            return Ok(Vec::new());
298        }
299        
300        let mut collections = Vec::new();
301        for entry in fs::read_dir(&collections_dir)? {
302            let entry = entry?;
303            if entry.file_type()?.is_dir() {
304                if let Some(name) = entry.file_name().to_str() {
305                    collections.push(name.to_string());
306                }
307            }
308        }
309        Ok(collections)
310    }
311}
312
313/// Handle to a collection's storage
314#[derive(Debug, Clone)]
315pub struct CollectionHandle {
316    /// Namespace this collection belongs to
317    pub namespace: String,
318    
319    /// Collection name
320    pub name: String,
321    
322    /// Root directory for this collection
323    pub root: PathBuf,
324    
325    /// Configuration (immutable after creation)
326    pub config: Arc<CollectionConfig>,
327}
328
329impl CollectionHandle {
330    /// Get the vector index path
331    pub fn vectors_path(&self) -> PathBuf {
332        self.root.join("vectors.idx")
333    }
334    
335    /// Get the data storage path
336    pub fn data_path(&self) -> PathBuf {
337        self.root.join("data.db")
338    }
339    
340    /// Get the metadata index path
341    pub fn metadata_path(&self) -> PathBuf {
342        self.root.join("metadata.idx")
343    }
344    
345    /// Get the tombstones path
346    pub fn tombstones_path(&self) -> PathBuf {
347        self.root.join("tombstones.bin")
348    }
349}
350
351// ============================================================================
352// Namespace Router (Main Interface)
353// ============================================================================
354
355/// Database router for namespace resolution
356///
357/// Resolves (namespace, collection) → storage handles with O(1) lookup.
358pub struct NamespaceRouter {
359    /// Base data directory
360    data_dir: PathBuf,
361    
362    /// Namespace registry (cached)
363    registry: RwLock<NamespaceRegistry>,
364    
365    /// Loaded namespace handles (cache)
366    namespaces: RwLock<HashMap<String, Arc<NamespaceHandle>>>,
367    
368    /// Loaded collection handles (cache)
369    collections: RwLock<HashMap<(String, String), Arc<CollectionHandle>>>,
370}
371
372impl NamespaceRouter {
373    /// Create a new namespace router
374    pub fn new(data_dir: impl AsRef<Path>) -> Result<Self> {
375        let data_dir = data_dir.as_ref().to_path_buf();
376        
377        // Ensure directories exist
378        fs::create_dir_all(&data_dir)?;
379        fs::create_dir_all(data_dir.join("namespaces"))?;
380        fs::create_dir_all(data_dir.join("_global"))?;
381        
382        // Load or create registry
383        let registry_path = data_dir.join("_namespaces.meta");
384        let registry = if registry_path.exists() {
385            let file = File::open(&registry_path)?;
386            let reader = BufReader::new(file);
387            serde_json::from_reader(reader)
388                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
389        } else {
390            NamespaceRegistry {
391                version: 1,
392                namespaces: Vec::new(),
393            }
394        };
395        
396        Ok(Self {
397            data_dir,
398            registry: RwLock::new(registry),
399            namespaces: RwLock::new(HashMap::new()),
400            collections: RwLock::new(HashMap::new()),
401        })
402    }
403    
404    /// Save the registry to disk
405    fn save_registry(&self) -> Result<()> {
406        let registry_path = self.data_dir.join("_namespaces.meta");
407        let file = File::create(&registry_path)?;
408        let writer = BufWriter::new(file);
409        serde_json::to_writer_pretty(writer, &*self.registry.read())
410            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
411        Ok(())
412    }
413    
414    // ========================================================================
415    // Namespace Operations
416    // ========================================================================
417    
418    /// Create a new namespace
419    pub fn create_namespace(&self, meta: NamespaceMeta) -> Result<Arc<NamespaceHandle>> {
420        let name = meta.name.clone();
421        
422        // Validate name
423        Self::validate_namespace_name(&name)?;
424        
425        // Check if already exists
426        {
427            let registry = self.registry.read();
428            if registry.namespaces.iter().any(|e| e.name == name) {
429                return Err(NamespaceStorageError::AlreadyExists(name));
430            }
431        }
432        
433        // Create directory structure
434        let namespace_dir = self.data_dir.join("namespaces").join(&name);
435        fs::create_dir_all(&namespace_dir)?;
436        fs::create_dir_all(namespace_dir.join("collections"))?;
437        fs::create_dir_all(namespace_dir.join("kv"))?;
438        
439        // Write namespace metadata
440        let meta_path = namespace_dir.join("_meta.json");
441        let file = File::create(&meta_path)?;
442        let writer = BufWriter::new(file);
443        serde_json::to_writer_pretty(writer, &meta)
444            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
445        
446        // Update registry
447        {
448            let mut registry = self.registry.write();
449            registry.namespaces.push(NamespaceEntry {
450                name: name.clone(),
451                created_at: meta.created_at,
452                active: true,
453            });
454        }
455        self.save_registry()?;
456        
457        // Create and cache handle
458        let handle = Arc::new(NamespaceHandle {
459            name: name.clone(),
460            root: namespace_dir,
461            meta: Arc::new(RwLock::new(meta)),
462        });
463        
464        self.namespaces.write().insert(name, handle.clone());
465        
466        Ok(handle)
467    }
468    
469    /// Get a namespace handle
470    pub fn get_namespace(&self, name: &str) -> Result<Arc<NamespaceHandle>> {
471        // Check cache first
472        if let Some(handle) = self.namespaces.read().get(name) {
473            return Ok(handle.clone());
474        }
475        
476        // Check registry
477        {
478            let registry = self.registry.read();
479            if !registry.namespaces.iter().any(|e| e.name == name && e.active) {
480                return Err(NamespaceStorageError::NotFound(name.to_string()));
481            }
482        }
483        
484        // Load from disk
485        let namespace_dir = self.data_dir.join("namespaces").join(name);
486        if !namespace_dir.exists() {
487            return Err(NamespaceStorageError::NotFound(name.to_string()));
488        }
489        
490        let meta_path = namespace_dir.join("_meta.json");
491        let meta: NamespaceMeta = if meta_path.exists() {
492            let file = File::open(&meta_path)?;
493            let reader = BufReader::new(file);
494            serde_json::from_reader(reader)
495                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
496        } else {
497            NamespaceMeta::new(name)
498        };
499        
500        let handle = Arc::new(NamespaceHandle {
501            name: name.to_string(),
502            root: namespace_dir,
503            meta: Arc::new(RwLock::new(meta)),
504        });
505        
506        self.namespaces.write().insert(name.to_string(), handle.clone());
507        
508        Ok(handle)
509    }
510    
511    /// List all namespaces
512    pub fn list_namespaces(&self) -> Vec<String> {
513        self.registry
514            .read()
515            .namespaces
516            .iter()
517            .filter(|e| e.active)
518            .map(|e| e.name.clone())
519            .collect()
520    }
521    
522    /// Delete a namespace (marks as inactive, doesn't delete files)
523    pub fn delete_namespace(&self, name: &str) -> Result<()> {
524        {
525            let mut registry = self.registry.write();
526            let entry = registry
527                .namespaces
528                .iter_mut()
529                .find(|e| e.name == name)
530                .ok_or_else(|| NamespaceStorageError::NotFound(name.to_string()))?;
531            entry.active = false;
532        }
533        
534        self.save_registry()?;
535        self.namespaces.write().remove(name);
536        
537        // Remove cached collections for this namespace
538        self.collections
539            .write()
540            .retain(|(ns, _), _| ns != name);
541        
542        Ok(())
543    }
544    
545    /// Validate namespace name
546    fn validate_namespace_name(name: &str) -> Result<()> {
547        if name.is_empty() {
548            return Err(NamespaceStorageError::InvalidName(
549                "namespace name cannot be empty".to_string(),
550            ));
551        }
552        
553        if name.len() > 256 {
554            return Err(NamespaceStorageError::InvalidName(
555                "namespace name too long (max 256 chars)".to_string(),
556            ));
557        }
558        
559        // Must start with alphanumeric
560        let first = name.chars().next().unwrap();
561        if !first.is_alphanumeric() {
562            return Err(NamespaceStorageError::InvalidName(format!(
563                "namespace name must start with alphanumeric, got '{}'",
564                first
565            )));
566        }
567        
568        // Only allow alphanumeric, underscore, hyphen, dot
569        for ch in name.chars() {
570            if !ch.is_alphanumeric() && ch != '_' && ch != '-' && ch != '.' {
571                return Err(NamespaceStorageError::InvalidName(format!(
572                    "invalid character '{}' in namespace name",
573                    ch
574                )));
575            }
576        }
577        
578        // Reserved names
579        let reserved = ["_global", "_namespaces", "_meta", "_system"];
580        if reserved.contains(&name) {
581            return Err(NamespaceStorageError::InvalidName(format!(
582                "'{}' is a reserved name",
583                name
584            )));
585        }
586        
587        Ok(())
588    }
589    
590    // ========================================================================
591    // Collection Operations
592    // ========================================================================
593    
594    /// Create a collection in a namespace
595    pub fn create_collection(
596        &self,
597        namespace: &str,
598        config: CollectionConfig,
599    ) -> Result<Arc<CollectionHandle>> {
600        let ns_handle = self.get_namespace(namespace)?;
601        
602        // Check if namespace is read-only
603        if ns_handle.meta.read().read_only {
604            return Err(NamespaceStorageError::ReadOnly(namespace.to_string()));
605        }
606        
607        let collection_name = config.name.clone();
608        let collection_dir = ns_handle.collection_path(&collection_name);
609        
610        // Check if already exists
611        if collection_dir.exists() {
612            return Err(NamespaceStorageError::CollectionAlreadyExists {
613                namespace: namespace.to_string(),
614                collection: collection_name,
615            });
616        }
617        
618        // Create directory
619        fs::create_dir_all(&collection_dir)?;
620        
621        // Freeze config and write
622        let frozen_config = CollectionConfig {
623            frozen: true,
624            ..config
625        };
626        
627        let config_path = collection_dir.join("config.json");
628        let file = File::create(&config_path)?;
629        let writer = BufWriter::new(file);
630        serde_json::to_writer_pretty(writer, &frozen_config)
631            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
632        
633        // Update namespace metadata
634        {
635            let mut meta = ns_handle.meta.write();
636            meta.collections.push(collection_name.clone());
637            meta.updated_at = SystemTime::now()
638                .duration_since(UNIX_EPOCH)
639                .unwrap_or_default()
640                .as_millis() as u64;
641        }
642        
643        // Write updated namespace meta
644        let ns_meta_path = ns_handle.root.join("_meta.json");
645        let file = File::create(&ns_meta_path)?;
646        let writer = BufWriter::new(file);
647        serde_json::to_writer_pretty(writer, &*ns_handle.meta.read())
648            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
649        
650        // Create and cache handle
651        let handle = Arc::new(CollectionHandle {
652            namespace: namespace.to_string(),
653            name: collection_name.clone(),
654            root: collection_dir,
655            config: Arc::new(frozen_config),
656        });
657        
658        self.collections
659            .write()
660            .insert((namespace.to_string(), collection_name), handle.clone());
661        
662        Ok(handle)
663    }
664    
665    /// Get a collection handle
666    pub fn get_collection(
667        &self,
668        namespace: &str,
669        collection: &str,
670    ) -> Result<Arc<CollectionHandle>> {
671        let key = (namespace.to_string(), collection.to_string());
672        
673        // Check cache
674        if let Some(handle) = self.collections.read().get(&key) {
675            return Ok(handle.clone());
676        }
677        
678        let ns_handle = self.get_namespace(namespace)?;
679        let collection_dir = ns_handle.collection_path(collection);
680        
681        if !collection_dir.exists() {
682            return Err(NamespaceStorageError::CollectionNotFound {
683                namespace: namespace.to_string(),
684                collection: collection.to_string(),
685            });
686        }
687        
688        // Load config
689        let config_path = collection_dir.join("config.json");
690        let config: CollectionConfig = if config_path.exists() {
691            let file = File::open(&config_path)?;
692            let reader = BufReader::new(file);
693            serde_json::from_reader(reader)
694                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
695        } else {
696            // Default config for legacy collections
697            CollectionConfig {
698                name: collection.to_string(),
699                dimension: None,
700                metric: DistanceMetric::Cosine,
701                index_config: IndexConfig::default(),
702                created_at: 0,
703                frozen: true,
704            }
705        };
706        
707        let handle = Arc::new(CollectionHandle {
708            namespace: namespace.to_string(),
709            name: collection.to_string(),
710            root: collection_dir,
711            config: Arc::new(config),
712        });
713        
714        self.collections.write().insert(key, handle.clone());
715        
716        Ok(handle)
717    }
718    
719    /// Delete a collection
720    pub fn delete_collection(&self, namespace: &str, collection: &str) -> Result<()> {
721        let ns_handle = self.get_namespace(namespace)?;
722        
723        // Check if namespace is read-only
724        if ns_handle.meta.read().read_only {
725            return Err(NamespaceStorageError::ReadOnly(namespace.to_string()));
726        }
727        
728        let collection_dir = ns_handle.collection_path(collection);
729        if !collection_dir.exists() {
730            return Err(NamespaceStorageError::CollectionNotFound {
731                namespace: namespace.to_string(),
732                collection: collection.to_string(),
733            });
734        }
735        
736        // Remove from cache
737        self.collections
738            .write()
739            .remove(&(namespace.to_string(), collection.to_string()));
740        
741        // Update namespace metadata
742        {
743            let mut meta = ns_handle.meta.write();
744            meta.collections.retain(|c| c != collection);
745            meta.updated_at = SystemTime::now()
746                .duration_since(UNIX_EPOCH)
747                .unwrap_or_default()
748                .as_millis() as u64;
749        }
750        
751        // Write updated namespace meta
752        let ns_meta_path = ns_handle.root.join("_meta.json");
753        let file = File::create(&ns_meta_path)?;
754        let writer = BufWriter::new(file);
755        serde_json::to_writer_pretty(writer, &*ns_handle.meta.read())
756            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
757        
758        // Remove directory (optional - could mark as deleted instead)
759        fs::remove_dir_all(&collection_dir)?;
760        
761        Ok(())
762    }
763    
764    /// Resolve (namespace, collection) to storage paths
765    ///
766    /// This is the main routing method, O(1) via hash-map lookup.
767    pub fn resolve(
768        &self,
769        namespace: &str,
770        collection: &str,
771    ) -> Result<Arc<CollectionHandle>> {
772        self.get_collection(namespace, collection)
773    }
774}
775
776// ============================================================================
777// Prefix Iteration Safety
778// ============================================================================
779
/// Iterator adapter that yields items only while their keys start with a
/// fixed byte prefix.
///
/// The first key that does not carry the prefix ends the iteration for
/// good: that item is consumed from the inner iterator and dropped, and
/// every later call returns `None` without touching the inner iterator
/// again. Wrapped around a namespace-scoped scan, this keeps keys from
/// other namespaces from leaking through.
pub struct PrefixIterator<I> {
    inner: I,
    prefix: Vec<u8>,
    exhausted: bool,
}

impl<I> PrefixIterator<I> {
    /// Wrap `inner`, accepting only keys that begin with `prefix`.
    pub fn new(inner: I, prefix: Vec<u8>) -> Self {
        let exhausted = false;
        Self {
            inner,
            prefix,
            exhausted,
        }
    }

    /// The prefix every yielded key starts with.
    pub fn prefix(&self) -> &[u8] {
        self.prefix.as_slice()
    }
}

impl<I, K, V> Iterator for PrefixIterator<I>
where
    I: Iterator<Item = (K, V)>,
    K: AsRef<[u8]>,
{
    type Item = (K, V);

    fn next(&mut self) -> Option<Self::Item> {
        if self.exhausted {
            return None;
        }

        // Pull one item; anything other than a prefixed key (including the
        // end of the inner iterator) terminates the scan.
        let hit = match self.inner.next() {
            Some(pair) if pair.0.as_ref().starts_with(&self.prefix) => Some(pair),
            _ => None,
        };

        self.exhausted = hit.is_none();
        hit
    }
}
836
/// Compute the exclusive upper bound for a prefix range scan.
///
/// Returns the smallest byte string that sorts after every key beginning
/// with `prefix`: trailing `0xFF` bytes are dropped and the last remaining
/// byte is incremented. Returns `None` when no such bound exists, i.e. for
/// an empty prefix or one made up entirely of `0xFF` bytes.
///
/// # Example
/// ```
/// use sochdb_storage::namespace::next_prefix;
/// assert_eq!(next_prefix(b"abc"), Some(b"abd".to_vec()));
/// assert_eq!(next_prefix(b"\xff\xff"), None); // No successor
/// ```
pub fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the rightmost byte that can still be incremented. `None`
    // covers both the empty prefix and the all-0xFF prefix.
    let pivot = prefix.iter().rposition(|&byte| byte != 0xFF)?;

    // Keep everything up to and including that byte, then bump it.
    let mut upper = prefix[..=pivot].to_vec();
    upper[pivot] += 1;
    Some(upper)
}
869
870// ============================================================================
871// Tests
872// ============================================================================
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877    use tempfile::TempDir;
878    
879    fn setup_router() -> (TempDir, NamespaceRouter) {
880        let temp_dir = TempDir::new().unwrap();
881        let router = NamespaceRouter::new(temp_dir.path()).unwrap();
882        (temp_dir, router)
883    }
884    
885    #[test]
886    fn test_create_namespace() {
887        let (_temp, router) = setup_router();
888        
889        let meta = NamespaceMeta::new("tenant_a")
890            .with_display_name("Tenant A")
891            .with_labels([("env".to_string(), "production".to_string())].into());
892        
893        let handle = router.create_namespace(meta).unwrap();
894        assert_eq!(handle.name, "tenant_a");
895        assert!(handle.root.exists());
896        assert!(handle.collections_dir().exists());
897    }
898    
899    #[test]
900    fn test_namespace_already_exists() {
901        let (_temp, router) = setup_router();
902        
903        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
904        
905        let result = router.create_namespace(NamespaceMeta::new("tenant_a"));
906        assert!(matches!(result, Err(NamespaceStorageError::AlreadyExists(_))));
907    }
908    
909    #[test]
910    fn test_get_namespace() {
911        let (_temp, router) = setup_router();
912        
913        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
914        
915        let handle = router.get_namespace("tenant_a").unwrap();
916        assert_eq!(handle.name, "tenant_a");
917    }
918    
919    #[test]
920    fn test_namespace_not_found() {
921        let (_temp, router) = setup_router();
922        
923        let result = router.get_namespace("nonexistent");
924        assert!(matches!(result, Err(NamespaceStorageError::NotFound(_))));
925    }
926    
/// Listing returns exactly the namespaces that were created. The listing is
/// sorted before comparison, so this test does not depend on its order.
#[test]
fn test_list_namespaces() {
    let (_guard, router) = setup_router();

    let expected = vec!["tenant_a", "tenant_b"];
    for name in &expected {
        router.create_namespace(NamespaceMeta::new(*name)).unwrap();
    }

    // Normalize the order so the comparison only checks membership.
    let mut listed = router.list_namespaces();
    listed.sort_unstable();

    assert_eq!(listed, expected);
}
939    
/// After a namespace is deleted it must no longer appear in the listing.
#[test]
fn test_delete_namespace() {
    let (_guard, router) = setup_router();
    let namespace = "tenant_a";

    router
        .create_namespace(NamespaceMeta::new(namespace))
        .unwrap();
    router.delete_namespace(namespace).unwrap();

    // No remaining entry may carry the deleted name.
    let remaining = router.list_namespaces();
    assert!(remaining.iter().all(|entry| entry.as_str() != "tenant_a"));
}
950    
/// Creating a collection inside an existing namespace returns a handle with
/// the configured name, an on-disk root, and a config flagged as frozen.
#[test]
fn test_create_collection() {
    let (_guard, router) = setup_router();
    let namespace = "tenant_a";

    router
        .create_namespace(NamespaceMeta::new(namespace))
        .unwrap();

    // The config is submitted unfrozen; the returned handle is checked below.
    let created = router
        .create_collection(
            namespace,
            CollectionConfig {
                name: String::from("documents"),
                dimension: Some(384),
                metric: DistanceMetric::Cosine,
                index_config: IndexConfig::default(),
                created_at: 0,
                frozen: false,
            },
        )
        .unwrap();

    assert_eq!(created.name, "documents");
    let root_present = created.root.exists();
    assert!(root_present);
    // Should be frozen after creation.
    let frozen_after_create = created.config.frozen;
    assert!(frozen_after_create);
}
971    
/// A created collection can be fetched by (namespace, collection) and the
/// returned handle reports both names.
#[test]
fn test_get_collection() {
    let (_guard, router) = setup_router();
    let namespace = "tenant_a";
    let collection = "documents";

    router
        .create_namespace(NamespaceMeta::new(namespace))
        .unwrap();

    let spec = CollectionConfig {
        name: String::from(collection),
        dimension: Some(384),
        metric: DistanceMetric::Cosine,
        index_config: IndexConfig::default(),
        created_at: 0,
        frozen: false,
    };
    router.create_collection(namespace, spec).unwrap();

    let fetched = router.get_collection(namespace, collection).unwrap();
    assert_eq!(fetched.name, collection);
    assert_eq!(fetched.namespace, namespace);
}
993    
/// `resolve` maps (namespace, collection) to a handle whose vectors path is
/// `vectors.idx` directly under the collection root.
#[test]
fn test_resolve() {
    let (_guard, router) = setup_router();
    let namespace = "tenant_a";

    router
        .create_namespace(NamespaceMeta::new(namespace))
        .unwrap();

    let spec = CollectionConfig {
        name: String::from("documents"),
        dimension: Some(384),
        metric: DistanceMetric::Cosine,
        index_config: IndexConfig::default(),
        created_at: 0,
        frozen: false,
    };
    router.create_collection(namespace, spec).unwrap();

    // Direct resolution; the module docs describe this as an O(1) lookup.
    let resolved = router.resolve(namespace, "documents").unwrap();

    let expected_vectors_path = resolved.root.join("vectors.idx");
    assert_eq!(resolved.vectors_path(), expected_vectors_path);
}
1015    
/// Each of the listed malformed or reserved names must be rejected by
/// `create_namespace`.
#[test]
fn test_invalid_namespace_names() {
    let (_guard, router) = setup_router();

    let rejected_names = [
        "",         // Empty name
        "-bad",     // Starts with hyphen
        "bad name", // Contains space
        "_global",  // Reserved name
    ];

    for candidate in rejected_names {
        let outcome = router.create_namespace(NamespaceMeta::new(candidate));
        assert!(outcome.is_err());
    }
}
1032    
/// Table-driven check of `next_prefix`: each input is paired with the
/// result the function is expected to return (`None` where no successor
/// is expected).
#[test]
fn test_next_prefix() {
    let cases: [(&[u8], Option<&[u8]>); 5] = [
        (b"abc", Some(b"abd")),
        (b"ab\xff", Some(b"ac")),
        (b"\xff\xff\xff", None),
        (b"", None),
        (b"tenant_a/", Some(b"tenant_a0")),
    ];

    for (input, expected) in cases {
        let expected_owned = expected.map(|bytes| bytes.to_vec());
        assert_eq!(next_prefix(input), expected_owned);
    }
}
1041    
/// `PrefixIterator` must yield only the entries whose key starts with the
/// requested prefix: here, the two `tenant_a/` keys and not `tenant_b/`.
#[test]
fn test_prefix_iterator() {
    let entries = vec![
        (b"tenant_a/doc1".to_vec(), b"v1".to_vec()),
        (b"tenant_a/doc2".to_vec(), b"v2".to_vec()),
        // Belongs to another tenant; should not be returned.
        (b"tenant_b/doc1".to_vec(), b"v3".to_vec()),
    ];
    let prefix = b"tenant_a/".to_vec();

    let matched: Vec<_> = PrefixIterator::new(entries.into_iter(), prefix).collect();

    // Exactly two survivors, and every one of them carries the prefix.
    assert_eq!(matched.len(), 2);
    assert!(matched
        .iter()
        .all(|entry| entry.0.starts_with(b"tenant_a/")));
}
1057}