Skip to main content

sochdb_storage/
namespace.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Lightweight Namespace Routing + On-Disk Layout (Task 3)
19//!
20//! This module provides physical namespace isolation with:
21//! - Namespace registry (`_namespaces.meta`)
22//! - Per-namespace directory layout
23//! - Database router for (namespace, collection) → storage handles
24//!
25//! ## On-Disk Layout
26//!
27//! ```text
28//! data/
29//! ├── _namespaces.meta          # Registry of all namespaces
30//! ├── _global/                  # Shared metadata
31//! │   └── catalog.db
32//! └── namespaces/
33//!     ├── tenant_a/
34//!     │   ├── _meta.json        # Namespace metadata
35//!     │   ├── collections/
36//!     │   │   ├── docs/
37//!     │   │   │   ├── vectors.idx
38//!     │   │   │   └── data.db
39//!     │   │   └── images/
40//!     │   │       ├── vectors.idx
41//!     │   │       └── data.db
42//!     │   └── kv/                # Key-value storage
43//!     │       └── data.db
44//!     └── tenant_b/
45//!         └── ...
46//! ```
47//!
48//! ## Name Resolution
49//!
50//! Resolution is O(1) via hash-map lookup:
51//!
52//! ```text
53//! (namespace="tenant_a", collection="docs")
54//!     → NamespaceHandle { root: "data/namespaces/tenant_a" }
55//!         → CollectionHandle { path: "data/namespaces/tenant_a/collections/docs" }
56//! ```
57
58use std::collections::HashMap;
59use std::fs::{self, File};
60use std::io::{BufReader, BufWriter};
61use std::path::{Path, PathBuf};
62use std::sync::Arc;
63use std::time::{SystemTime, UNIX_EPOCH};
64
65use parking_lot::RwLock;
66use serde::{Deserialize, Serialize};
67
68// ============================================================================
69// Namespace Error Types
70// ============================================================================
71
72/// Errors related to namespace operations
73#[derive(Debug, Clone, thiserror::Error)]
74pub enum NamespaceStorageError {
75    #[error("namespace not found: {0}")]
76    NotFound(String),
77
78    #[error("namespace already exists: {0}")]
79    AlreadyExists(String),
80
81    #[error("invalid namespace name: {0}")]
82    InvalidName(String),
83
84    #[error("collection not found: {namespace}/{collection}")]
85    CollectionNotFound { namespace: String, collection: String },
86
87    #[error("collection already exists: {namespace}/{collection}")]
88    CollectionAlreadyExists { namespace: String, collection: String },
89
90    #[error("storage I/O error: {0}")]
91    IoError(String),
92
93    #[error("namespace is read-only: {0}")]
94    ReadOnly(String),
95
96    #[error("namespace registry corrupted: {0}")]
97    RegistryCorrupted(String),
98}
99
100impl From<std::io::Error> for NamespaceStorageError {
101    fn from(e: std::io::Error) -> Self {
102        NamespaceStorageError::IoError(e.to_string())
103    }
104}
105
106pub type Result<T> = std::result::Result<T, NamespaceStorageError>;
107
108// ============================================================================
109// Namespace Metadata
110// ============================================================================
111
112/// Metadata for a namespace (stored in _meta.json)
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct NamespaceMeta {
115    /// Unique namespace identifier
116    pub name: String,
117    
118    /// Human-readable display name
119    pub display_name: Option<String>,
120    
121    /// Creation timestamp (Unix epoch millis)
122    pub created_at: u64,
123    
124    /// Last modified timestamp (Unix epoch millis)
125    pub updated_at: u64,
126    
127    /// Whether the namespace is read-only
128    pub read_only: bool,
129    
130    /// Custom metadata/labels
131    pub labels: HashMap<String, String>,
132    
133    /// Collections in this namespace
134    #[serde(default)]
135    pub collections: Vec<String>,
136}
137
138impl NamespaceMeta {
139    /// Create a new namespace metadata
140    pub fn new(name: impl Into<String>) -> Self {
141        let now = SystemTime::now()
142            .duration_since(UNIX_EPOCH)
143            .unwrap_or_default()
144            .as_millis() as u64;
145        
146        Self {
147            name: name.into(),
148            display_name: None,
149            created_at: now,
150            updated_at: now,
151            read_only: false,
152            labels: HashMap::new(),
153            collections: Vec::new(),
154        }
155    }
156    
157    /// Set display name
158    pub fn with_display_name(mut self, name: impl Into<String>) -> Self {
159        self.display_name = Some(name.into());
160        self
161    }
162    
163    /// Set labels
164    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
165        self.labels = labels;
166        self
167    }
168}
169
170/// Namespace registry (stored in _namespaces.meta)
171#[derive(Debug, Clone, Serialize, Deserialize, Default)]
172pub struct NamespaceRegistry {
173    /// Version for forward compatibility
174    pub version: u32,
175    
176    /// List of registered namespaces
177    pub namespaces: Vec<NamespaceEntry>,
178}
179
180/// Entry in the namespace registry
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct NamespaceEntry {
183    /// Namespace name (directory name)
184    pub name: String,
185    
186    /// Creation timestamp
187    pub created_at: u64,
188    
189    /// Whether the namespace is active
190    pub active: bool,
191}
192
193// ============================================================================
194// Collection Config
195// ============================================================================
196
197/// Collection configuration (stored in collection directory)
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct CollectionConfig {
200    /// Collection name
201    pub name: String,
202    
203    /// Vector dimension (if applicable)
204    pub dimension: Option<usize>,
205    
206    /// Distance metric
207    pub metric: DistanceMetric,
208    
209    /// Index configuration
210    pub index_config: IndexConfig,
211    
212    /// Creation timestamp
213    pub created_at: u64,
214    
215    /// Whether the config is frozen (immutable after creation)
216    pub frozen: bool,
217}
218
219/// Distance metric for vector similarity
220#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
221pub enum DistanceMetric {
222    #[default]
223    Cosine,
224    Euclidean,
225    DotProduct,
226}
227
228/// Index configuration
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct IndexConfig {
231    /// HNSW M parameter
232    pub m: usize,
233    
234    /// HNSW ef_construction parameter
235    pub ef_construction: usize,
236    
237    /// Enable quantization
238    pub quantization: Option<QuantizationType>,
239}
240
241impl Default for IndexConfig {
242    fn default() -> Self {
243        Self {
244            m: 16,
245            ef_construction: 100,
246            quantization: None,
247        }
248    }
249}
250
251/// Quantization type
252#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
253pub enum QuantizationType {
254    Scalar,  // int8 quantization
255    PQ,      // Product quantization
256}
257
258// ============================================================================
259// Namespace Handle
260// ============================================================================
261
262/// Handle to a namespace's storage
263#[derive(Debug, Clone)]
264pub struct NamespaceHandle {
265    /// Namespace name
266    pub name: String,
267    
268    /// Root directory for this namespace
269    pub root: PathBuf,
270    
271    /// Metadata
272    pub meta: Arc<RwLock<NamespaceMeta>>,
273}
274
275impl NamespaceHandle {
276    /// Get the collections directory
277    pub fn collections_dir(&self) -> PathBuf {
278        self.root.join("collections")
279    }
280    
281    /// Get the key-value storage directory
282    pub fn kv_dir(&self) -> PathBuf {
283        self.root.join("kv")
284    }
285    
286    /// Get a collection path
287    pub fn collection_path(&self, collection: &str) -> PathBuf {
288        self.collections_dir().join(collection)
289    }
290    
291    /// Check if collection exists
292    pub fn has_collection(&self, collection: &str) -> bool {
293        self.collection_path(collection).exists()
294    }
295    
296    /// List collections
297    pub fn list_collections(&self) -> Result<Vec<String>> {
298        let collections_dir = self.collections_dir();
299        if !collections_dir.exists() {
300            return Ok(Vec::new());
301        }
302        
303        let mut collections = Vec::new();
304        for entry in fs::read_dir(&collections_dir)? {
305            let entry = entry?;
306            if entry.file_type()?.is_dir() {
307                if let Some(name) = entry.file_name().to_str() {
308                    collections.push(name.to_string());
309                }
310            }
311        }
312        Ok(collections)
313    }
314}
315
316/// Handle to a collection's storage
317#[derive(Debug, Clone)]
318pub struct CollectionHandle {
319    /// Namespace this collection belongs to
320    pub namespace: String,
321    
322    /// Collection name
323    pub name: String,
324    
325    /// Root directory for this collection
326    pub root: PathBuf,
327    
328    /// Configuration (immutable after creation)
329    pub config: Arc<CollectionConfig>,
330}
331
332impl CollectionHandle {
333    /// Get the vector index path
334    pub fn vectors_path(&self) -> PathBuf {
335        self.root.join("vectors.idx")
336    }
337    
338    /// Get the data storage path
339    pub fn data_path(&self) -> PathBuf {
340        self.root.join("data.db")
341    }
342    
343    /// Get the metadata index path
344    pub fn metadata_path(&self) -> PathBuf {
345        self.root.join("metadata.idx")
346    }
347    
348    /// Get the tombstones path
349    pub fn tombstones_path(&self) -> PathBuf {
350        self.root.join("tombstones.bin")
351    }
352}
353
354// ============================================================================
355// Namespace Router (Main Interface)
356// ============================================================================
357
358/// Database router for namespace resolution
359///
360/// Resolves (namespace, collection) → storage handles with O(1) lookup.
361pub struct NamespaceRouter {
362    /// Base data directory
363    data_dir: PathBuf,
364    
365    /// Namespace registry (cached)
366    registry: RwLock<NamespaceRegistry>,
367    
368    /// Loaded namespace handles (cache)
369    namespaces: RwLock<HashMap<String, Arc<NamespaceHandle>>>,
370    
371    /// Loaded collection handles (cache)
372    collections: RwLock<HashMap<(String, String), Arc<CollectionHandle>>>,
373}
374
375impl NamespaceRouter {
376    /// Create a new namespace router
377    pub fn new(data_dir: impl AsRef<Path>) -> Result<Self> {
378        let data_dir = data_dir.as_ref().to_path_buf();
379        
380        // Ensure directories exist
381        fs::create_dir_all(&data_dir)?;
382        fs::create_dir_all(data_dir.join("namespaces"))?;
383        fs::create_dir_all(data_dir.join("_global"))?;
384        
385        // Load or create registry
386        let registry_path = data_dir.join("_namespaces.meta");
387        let registry = if registry_path.exists() {
388            let file = File::open(&registry_path)?;
389            let reader = BufReader::new(file);
390            serde_json::from_reader(reader)
391                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
392        } else {
393            NamespaceRegistry {
394                version: 1,
395                namespaces: Vec::new(),
396            }
397        };
398        
399        Ok(Self {
400            data_dir,
401            registry: RwLock::new(registry),
402            namespaces: RwLock::new(HashMap::new()),
403            collections: RwLock::new(HashMap::new()),
404        })
405    }
406    
407    /// Save the registry to disk
408    fn save_registry(&self) -> Result<()> {
409        let registry_path = self.data_dir.join("_namespaces.meta");
410        let file = File::create(&registry_path)?;
411        let writer = BufWriter::new(file);
412        serde_json::to_writer_pretty(writer, &*self.registry.read())
413            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
414        Ok(())
415    }
416    
417    // ========================================================================
418    // Namespace Operations
419    // ========================================================================
420    
421    /// Create a new namespace
422    pub fn create_namespace(&self, meta: NamespaceMeta) -> Result<Arc<NamespaceHandle>> {
423        let name = meta.name.clone();
424        
425        // Validate name
426        Self::validate_namespace_name(&name)?;
427        
428        // Check if already exists
429        {
430            let registry = self.registry.read();
431            if registry.namespaces.iter().any(|e| e.name == name) {
432                return Err(NamespaceStorageError::AlreadyExists(name));
433            }
434        }
435        
436        // Create directory structure
437        let namespace_dir = self.data_dir.join("namespaces").join(&name);
438        fs::create_dir_all(&namespace_dir)?;
439        fs::create_dir_all(namespace_dir.join("collections"))?;
440        fs::create_dir_all(namespace_dir.join("kv"))?;
441        
442        // Write namespace metadata
443        let meta_path = namespace_dir.join("_meta.json");
444        let file = File::create(&meta_path)?;
445        let writer = BufWriter::new(file);
446        serde_json::to_writer_pretty(writer, &meta)
447            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
448        
449        // Update registry
450        {
451            let mut registry = self.registry.write();
452            registry.namespaces.push(NamespaceEntry {
453                name: name.clone(),
454                created_at: meta.created_at,
455                active: true,
456            });
457        }
458        self.save_registry()?;
459        
460        // Create and cache handle
461        let handle = Arc::new(NamespaceHandle {
462            name: name.clone(),
463            root: namespace_dir,
464            meta: Arc::new(RwLock::new(meta)),
465        });
466        
467        self.namespaces.write().insert(name, handle.clone());
468        
469        Ok(handle)
470    }
471    
472    /// Get a namespace handle
473    pub fn get_namespace(&self, name: &str) -> Result<Arc<NamespaceHandle>> {
474        // Check cache first
475        if let Some(handle) = self.namespaces.read().get(name) {
476            return Ok(handle.clone());
477        }
478        
479        // Check registry
480        {
481            let registry = self.registry.read();
482            if !registry.namespaces.iter().any(|e| e.name == name && e.active) {
483                return Err(NamespaceStorageError::NotFound(name.to_string()));
484            }
485        }
486        
487        // Load from disk
488        let namespace_dir = self.data_dir.join("namespaces").join(name);
489        if !namespace_dir.exists() {
490            return Err(NamespaceStorageError::NotFound(name.to_string()));
491        }
492        
493        let meta_path = namespace_dir.join("_meta.json");
494        let meta: NamespaceMeta = if meta_path.exists() {
495            let file = File::open(&meta_path)?;
496            let reader = BufReader::new(file);
497            serde_json::from_reader(reader)
498                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
499        } else {
500            NamespaceMeta::new(name)
501        };
502        
503        let handle = Arc::new(NamespaceHandle {
504            name: name.to_string(),
505            root: namespace_dir,
506            meta: Arc::new(RwLock::new(meta)),
507        });
508        
509        self.namespaces.write().insert(name.to_string(), handle.clone());
510        
511        Ok(handle)
512    }
513    
514    /// List all namespaces
515    pub fn list_namespaces(&self) -> Vec<String> {
516        self.registry
517            .read()
518            .namespaces
519            .iter()
520            .filter(|e| e.active)
521            .map(|e| e.name.clone())
522            .collect()
523    }
524    
525    /// Delete a namespace (marks as inactive, doesn't delete files)
526    pub fn delete_namespace(&self, name: &str) -> Result<()> {
527        {
528            let mut registry = self.registry.write();
529            let entry = registry
530                .namespaces
531                .iter_mut()
532                .find(|e| e.name == name)
533                .ok_or_else(|| NamespaceStorageError::NotFound(name.to_string()))?;
534            entry.active = false;
535        }
536        
537        self.save_registry()?;
538        self.namespaces.write().remove(name);
539        
540        // Remove cached collections for this namespace
541        self.collections
542            .write()
543            .retain(|(ns, _), _| ns != name);
544        
545        Ok(())
546    }
547    
548    /// Validate namespace name
549    fn validate_namespace_name(name: &str) -> Result<()> {
550        if name.is_empty() {
551            return Err(NamespaceStorageError::InvalidName(
552                "namespace name cannot be empty".to_string(),
553            ));
554        }
555        
556        if name.len() > 256 {
557            return Err(NamespaceStorageError::InvalidName(
558                "namespace name too long (max 256 chars)".to_string(),
559            ));
560        }
561        
562        // Must start with alphanumeric
563        let first = name.chars().next().unwrap();
564        if !first.is_alphanumeric() {
565            return Err(NamespaceStorageError::InvalidName(format!(
566                "namespace name must start with alphanumeric, got '{}'",
567                first
568            )));
569        }
570        
571        // Only allow alphanumeric, underscore, hyphen, dot
572        for ch in name.chars() {
573            if !ch.is_alphanumeric() && ch != '_' && ch != '-' && ch != '.' {
574                return Err(NamespaceStorageError::InvalidName(format!(
575                    "invalid character '{}' in namespace name",
576                    ch
577                )));
578            }
579        }
580        
581        // Reserved names
582        let reserved = ["_global", "_namespaces", "_meta", "_system"];
583        if reserved.contains(&name) {
584            return Err(NamespaceStorageError::InvalidName(format!(
585                "'{}' is a reserved name",
586                name
587            )));
588        }
589        
590        Ok(())
591    }
592    
593    // ========================================================================
594    // Collection Operations
595    // ========================================================================
596    
597    /// Create a collection in a namespace
598    pub fn create_collection(
599        &self,
600        namespace: &str,
601        config: CollectionConfig,
602    ) -> Result<Arc<CollectionHandle>> {
603        let ns_handle = self.get_namespace(namespace)?;
604        
605        // Check if namespace is read-only
606        if ns_handle.meta.read().read_only {
607            return Err(NamespaceStorageError::ReadOnly(namespace.to_string()));
608        }
609        
610        let collection_name = config.name.clone();
611        let collection_dir = ns_handle.collection_path(&collection_name);
612        
613        // Check if already exists
614        if collection_dir.exists() {
615            return Err(NamespaceStorageError::CollectionAlreadyExists {
616                namespace: namespace.to_string(),
617                collection: collection_name,
618            });
619        }
620        
621        // Create directory
622        fs::create_dir_all(&collection_dir)?;
623        
624        // Freeze config and write
625        let frozen_config = CollectionConfig {
626            frozen: true,
627            ..config
628        };
629        
630        let config_path = collection_dir.join("config.json");
631        let file = File::create(&config_path)?;
632        let writer = BufWriter::new(file);
633        serde_json::to_writer_pretty(writer, &frozen_config)
634            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
635        
636        // Update namespace metadata
637        {
638            let mut meta = ns_handle.meta.write();
639            meta.collections.push(collection_name.clone());
640            meta.updated_at = SystemTime::now()
641                .duration_since(UNIX_EPOCH)
642                .unwrap_or_default()
643                .as_millis() as u64;
644        }
645        
646        // Write updated namespace meta
647        let ns_meta_path = ns_handle.root.join("_meta.json");
648        let file = File::create(&ns_meta_path)?;
649        let writer = BufWriter::new(file);
650        serde_json::to_writer_pretty(writer, &*ns_handle.meta.read())
651            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
652        
653        // Create and cache handle
654        let handle = Arc::new(CollectionHandle {
655            namespace: namespace.to_string(),
656            name: collection_name.clone(),
657            root: collection_dir,
658            config: Arc::new(frozen_config),
659        });
660        
661        self.collections
662            .write()
663            .insert((namespace.to_string(), collection_name), handle.clone());
664        
665        Ok(handle)
666    }
667    
668    /// Get a collection handle
669    pub fn get_collection(
670        &self,
671        namespace: &str,
672        collection: &str,
673    ) -> Result<Arc<CollectionHandle>> {
674        let key = (namespace.to_string(), collection.to_string());
675        
676        // Check cache
677        if let Some(handle) = self.collections.read().get(&key) {
678            return Ok(handle.clone());
679        }
680        
681        let ns_handle = self.get_namespace(namespace)?;
682        let collection_dir = ns_handle.collection_path(collection);
683        
684        if !collection_dir.exists() {
685            return Err(NamespaceStorageError::CollectionNotFound {
686                namespace: namespace.to_string(),
687                collection: collection.to_string(),
688            });
689        }
690        
691        // Load config
692        let config_path = collection_dir.join("config.json");
693        let config: CollectionConfig = if config_path.exists() {
694            let file = File::open(&config_path)?;
695            let reader = BufReader::new(file);
696            serde_json::from_reader(reader)
697                .map_err(|e| NamespaceStorageError::RegistryCorrupted(e.to_string()))?
698        } else {
699            // Default config for legacy collections
700            CollectionConfig {
701                name: collection.to_string(),
702                dimension: None,
703                metric: DistanceMetric::Cosine,
704                index_config: IndexConfig::default(),
705                created_at: 0,
706                frozen: true,
707            }
708        };
709        
710        let handle = Arc::new(CollectionHandle {
711            namespace: namespace.to_string(),
712            name: collection.to_string(),
713            root: collection_dir,
714            config: Arc::new(config),
715        });
716        
717        self.collections.write().insert(key, handle.clone());
718        
719        Ok(handle)
720    }
721    
722    /// Delete a collection
723    pub fn delete_collection(&self, namespace: &str, collection: &str) -> Result<()> {
724        let ns_handle = self.get_namespace(namespace)?;
725        
726        // Check if namespace is read-only
727        if ns_handle.meta.read().read_only {
728            return Err(NamespaceStorageError::ReadOnly(namespace.to_string()));
729        }
730        
731        let collection_dir = ns_handle.collection_path(collection);
732        if !collection_dir.exists() {
733            return Err(NamespaceStorageError::CollectionNotFound {
734                namespace: namespace.to_string(),
735                collection: collection.to_string(),
736            });
737        }
738        
739        // Remove from cache
740        self.collections
741            .write()
742            .remove(&(namespace.to_string(), collection.to_string()));
743        
744        // Update namespace metadata
745        {
746            let mut meta = ns_handle.meta.write();
747            meta.collections.retain(|c| c != collection);
748            meta.updated_at = SystemTime::now()
749                .duration_since(UNIX_EPOCH)
750                .unwrap_or_default()
751                .as_millis() as u64;
752        }
753        
754        // Write updated namespace meta
755        let ns_meta_path = ns_handle.root.join("_meta.json");
756        let file = File::create(&ns_meta_path)?;
757        let writer = BufWriter::new(file);
758        serde_json::to_writer_pretty(writer, &*ns_handle.meta.read())
759            .map_err(|e| NamespaceStorageError::IoError(e.to_string()))?;
760        
761        // Remove directory (optional - could mark as deleted instead)
762        fs::remove_dir_all(&collection_dir)?;
763        
764        Ok(())
765    }
766    
767    /// Resolve (namespace, collection) to storage paths
768    ///
769    /// This is the main routing method, O(1) via hash-map lookup.
770    pub fn resolve(
771        &self,
772        namespace: &str,
773        collection: &str,
774    ) -> Result<Arc<CollectionHandle>> {
775        self.get_collection(namespace, collection)
776    }
777}
778
779// ============================================================================
780// Prefix Iteration Safety
781// ============================================================================
782
783/// Safe prefix iterator that guarantees all keys match the prefix
784///
785/// This is the correct primitive for namespace-scoped iteration.
786/// Unlike range scans, this cannot accidentally include keys from other namespaces.
787pub struct PrefixIterator<I> {
788    inner: I,
789    prefix: Vec<u8>,
790    exhausted: bool,
791}
792
793impl<I> PrefixIterator<I> {
794    /// Create a new prefix iterator
795    pub fn new(inner: I, prefix: Vec<u8>) -> Self {
796        Self {
797            inner,
798            prefix,
799            exhausted: false,
800        }
801    }
802    
803    /// Get the prefix
804    pub fn prefix(&self) -> &[u8] {
805        &self.prefix
806    }
807}
808
809impl<I, K, V> Iterator for PrefixIterator<I>
810where
811    I: Iterator<Item = (K, V)>,
812    K: AsRef<[u8]>,
813{
814    type Item = (K, V);
815    
816    fn next(&mut self) -> Option<Self::Item> {
817        if self.exhausted {
818            return None;
819        }
820        
821        match self.inner.next() {
822            Some((key, value)) => {
823                if key.as_ref().starts_with(&self.prefix) {
824                    Some((key, value))
825                } else {
826                    // Key doesn't match prefix - we're done
827                    // This is the safety guarantee: we stop at the first non-matching key
828                    self.exhausted = true;
829                    None
830                }
831            }
832            None => {
833                self.exhausted = true;
834                None
835            }
836        }
837    }
838}
839
840/// Compute the exclusive end key for a prefix scan
841///
842/// Given a prefix, returns the smallest key that is greater than all keys
843/// starting with that prefix. This enables efficient range scans.
844///
845/// # Example
846/// ```
847/// use sochdb_storage::namespace::next_prefix;
848/// assert_eq!(next_prefix(b"abc"), Some(b"abd".to_vec()));
849/// assert_eq!(next_prefix(b"\xff\xff"), None); // No successor
850/// ```
851pub fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
852    if prefix.is_empty() {
853        return None;
854    }
855    
856    let mut result = prefix.to_vec();
857    
858    // Find rightmost byte that isn't 0xFF
859    while let Some(&last) = result.last() {
860        if last == 0xFF {
861            result.pop();
862        } else {
863            // Increment this byte
864            *result.last_mut().unwrap() += 1;
865            return Some(result);
866        }
867    }
868    
869    // All bytes were 0xFF - no successor exists
870    None
871}
872
873// ============================================================================
874// Tests
875// ============================================================================
876
877#[cfg(test)]
878mod tests {
879    use super::*;
880    use tempfile::TempDir;
881    
882    fn setup_router() -> (TempDir, NamespaceRouter) {
883        let temp_dir = TempDir::new().unwrap();
884        let router = NamespaceRouter::new(temp_dir.path()).unwrap();
885        (temp_dir, router)
886    }
887    
888    #[test]
889    fn test_create_namespace() {
890        let (_temp, router) = setup_router();
891        
892        let meta = NamespaceMeta::new("tenant_a")
893            .with_display_name("Tenant A")
894            .with_labels([("env".to_string(), "production".to_string())].into());
895        
896        let handle = router.create_namespace(meta).unwrap();
897        assert_eq!(handle.name, "tenant_a");
898        assert!(handle.root.exists());
899        assert!(handle.collections_dir().exists());
900    }
901    
902    #[test]
903    fn test_namespace_already_exists() {
904        let (_temp, router) = setup_router();
905        
906        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
907        
908        let result = router.create_namespace(NamespaceMeta::new("tenant_a"));
909        assert!(matches!(result, Err(NamespaceStorageError::AlreadyExists(_))));
910    }
911    
912    #[test]
913    fn test_get_namespace() {
914        let (_temp, router) = setup_router();
915        
916        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
917        
918        let handle = router.get_namespace("tenant_a").unwrap();
919        assert_eq!(handle.name, "tenant_a");
920    }
921    
922    #[test]
923    fn test_namespace_not_found() {
924        let (_temp, router) = setup_router();
925        
926        let result = router.get_namespace("nonexistent");
927        assert!(matches!(result, Err(NamespaceStorageError::NotFound(_))));
928    }
929    
930    #[test]
931    fn test_list_namespaces() {
932        let (_temp, router) = setup_router();
933        
934        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
935        router.create_namespace(NamespaceMeta::new("tenant_b")).unwrap();
936        
937        let mut namespaces = router.list_namespaces();
938        namespaces.sort();
939        
940        assert_eq!(namespaces, vec!["tenant_a", "tenant_b"]);
941    }
942    
943    #[test]
944    fn test_delete_namespace() {
945        let (_temp, router) = setup_router();
946        
947        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
948        router.delete_namespace("tenant_a").unwrap();
949        
950        let namespaces = router.list_namespaces();
951        assert!(!namespaces.contains(&"tenant_a".to_string()));
952    }
953    
954    #[test]
955    fn test_create_collection() {
956        let (_temp, router) = setup_router();
957        
958        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
959        
960        let config = CollectionConfig {
961            name: "documents".to_string(),
962            dimension: Some(384),
963            metric: DistanceMetric::Cosine,
964            index_config: IndexConfig::default(),
965            created_at: 0,
966            frozen: false,
967        };
968        
969        let handle = router.create_collection("tenant_a", config).unwrap();
970        assert_eq!(handle.name, "documents");
971        assert!(handle.root.exists());
972        assert!(handle.config.frozen); // Should be frozen after creation
973    }
974    
975    #[test]
976    fn test_get_collection() {
977        let (_temp, router) = setup_router();
978        
979        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
980        
981        let config = CollectionConfig {
982            name: "documents".to_string(),
983            dimension: Some(384),
984            metric: DistanceMetric::Cosine,
985            index_config: IndexConfig::default(),
986            created_at: 0,
987            frozen: false,
988        };
989        
990        router.create_collection("tenant_a", config).unwrap();
991        
992        let handle = router.get_collection("tenant_a", "documents").unwrap();
993        assert_eq!(handle.name, "documents");
994        assert_eq!(handle.namespace, "tenant_a");
995    }
996    
997    #[test]
998    fn test_resolve() {
999        let (_temp, router) = setup_router();
1000        
1001        router.create_namespace(NamespaceMeta::new("tenant_a")).unwrap();
1002        
1003        let config = CollectionConfig {
1004            name: "documents".to_string(),
1005            dimension: Some(384),
1006            metric: DistanceMetric::Cosine,
1007            index_config: IndexConfig::default(),
1008            created_at: 0,
1009            frozen: false,
1010        };
1011        
1012        router.create_collection("tenant_a", config).unwrap();
1013        
1014        // O(1) resolution
1015        let handle = router.resolve("tenant_a", "documents").unwrap();
1016        assert_eq!(handle.vectors_path(), handle.root.join("vectors.idx"));
1017    }
1018    
1019    #[test]
1020    fn test_invalid_namespace_names() {
1021        let (_temp, router) = setup_router();
1022        
1023        // Empty name
1024        assert!(router.create_namespace(NamespaceMeta::new("")).is_err());
1025        
1026        // Starts with hyphen
1027        assert!(router.create_namespace(NamespaceMeta::new("-bad")).is_err());
1028        
1029        // Contains space
1030        assert!(router.create_namespace(NamespaceMeta::new("bad name")).is_err());
1031        
1032        // Reserved name
1033        assert!(router.create_namespace(NamespaceMeta::new("_global")).is_err());
1034    }
1035    
1036    #[test]
1037    fn test_next_prefix() {
1038        assert_eq!(next_prefix(b"abc"), Some(b"abd".to_vec()));
1039        assert_eq!(next_prefix(b"ab\xff"), Some(b"ac".to_vec()));
1040        assert_eq!(next_prefix(b"\xff\xff\xff"), None);
1041        assert_eq!(next_prefix(b""), None);
1042        assert_eq!(next_prefix(b"tenant_a/"), Some(b"tenant_a0".to_vec()));
1043    }
1044    
1045    #[test]
1046    fn test_prefix_iterator() {
1047        let data = vec![
1048            (b"tenant_a/doc1".to_vec(), b"v1".to_vec()),
1049            (b"tenant_a/doc2".to_vec(), b"v2".to_vec()),
1050            (b"tenant_b/doc1".to_vec(), b"v3".to_vec()), // Should not be returned
1051        ];
1052        
1053        let iter = PrefixIterator::new(data.into_iter(), b"tenant_a/".to_vec());
1054        let results: Vec<_> = iter.collect();
1055        
1056        assert_eq!(results.len(), 2);
1057        assert!(results[0].0.starts_with(b"tenant_a/"));
1058        assert!(results[1].0.starts_with(b"tenant_a/"));
1059    }
1060}