// alopex_embedded/lib.rs

1//! A user-friendly embedded API for the AlopexDB key-value store.
2
3#![deny(missing_docs)]
4
5/// Catalog metadata API (in-memory, primarily for Python bindings).
6pub mod catalog;
7pub mod catalog_api;
8pub mod columnar_api;
9pub mod options;
10mod sql_api;
11mod txn_manager;
12
13pub use crate::catalog::{CachedTableInfo, Catalog};
14pub use crate::catalog_api::{
15    CatalogInfo, ColumnDefinition, ColumnInfo, CreateCatalogRequest, CreateNamespaceRequest,
16    CreateTableRequest, IndexInfo, NamespaceInfo, StorageInfo, TableInfo,
17};
18pub use crate::columnar_api::{
19    ColumnarIndexInfo, ColumnarIndexType, ColumnarRowIterator, EmbeddedConfig, StorageMode,
20};
21pub use crate::options::DatabaseOptions;
22pub use crate::sql_api::{SqlStreamingResult, StreamingQueryResult, StreamingRows};
23pub use crate::txn_manager::{TransactionInfo, TransactionManager};
24pub use alopex_sql::{DataSourceFormat, TableType};
/// Return type of `Database::execute_sql()` / `Transaction::execute_sql()`.
pub type SqlResult = alopex_sql::SqlResult;
27use alopex_core::vector::hnsw::{HnswTransactionState, SearchStats as HnswSearchStats};
28use alopex_core::{
29    columnar::{
30        kvs_bridge::ColumnarKvsBridge, memory::InMemorySegmentStore, segment_v2::SegmentConfigV2,
31    },
32    kv::any::AnyKVTransaction,
33    kv::memory::MemoryKV,
34    kv::AnyKV,
35    validate_dimensions, HnswIndex, KVStore, KVTransaction, Key, LargeValueKind, LargeValueMeta,
36    LargeValueReader, LargeValueWriter, StorageFactory, VectorType, DEFAULT_CHUNK_SIZE,
37};
38pub use alopex_core::{HnswConfig, HnswSearchResult, HnswStats, MemoryStats, Metric, TxnMode};
39/// Streaming query row iterator for FR-7 compliance.
40pub use alopex_sql::executor::QueryRowIterator;
41use std::collections::HashMap;
42use std::convert::TryInto;
43use std::fs;
44use std::path::{Path, PathBuf};
45use std::result;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::sync::{Arc, RwLock};
48
49/// A convenience `Result` type for database operations.
50pub type Result<T> = result::Result<T, Error>;
51
52/// The error type for embedded database operations.
/// The error type for embedded database operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An error from the underlying core storage engine.
    #[error("core error: {0}")]
    Core(#[from] alopex_core::Error),
    /// An error from the SQL execution pipeline.
    #[error("{0}")]
    Sql(#[from] alopex_sql::SqlError),
    /// The transaction has already been completed and cannot be used.
    #[error("transaction is completed")]
    TxnCompleted,
    /// The requested catalog was not found.
    #[error("カタログが見つかりません: {0}")]
    CatalogNotFound(String),
    /// The catalog already exists.
    #[error("カタログは既に存在します: {0}")]
    CatalogAlreadyExists(String),
    /// The catalog is not empty (it still contains objects).
    #[error("カタログが空ではありません: {0}")]
    CatalogNotEmpty(String),
    /// The namespace was not found (fields: catalog, namespace).
    #[error("ネームスペースが見つかりません: {0}.{1}")]
    NamespaceNotFound(String, String),
    /// The namespace already exists (fields: catalog, namespace).
    #[error("ネームスペースは既に存在します: {0}.{1}")]
    NamespaceAlreadyExists(String, String),
    /// The namespace is not empty (fields: catalog, namespace).
    #[error("ネームスペースが空ではありません: {0}.{1}")]
    NamespaceNotEmpty(String, String),
    /// The requested table was not found or is invalid.
    #[error("table not found: {0}")]
    TableNotFound(String),
    /// The table already exists.
    #[error("テーブルは既に存在します: {0}")]
    TableAlreadyExists(String),
    /// The index was not found.
    #[error("インデックスが見つかりません: {0}")]
    IndexNotFound(String),
    /// The `default` object cannot be deleted.
    #[error("default オブジェクトは削除できません: {0}")]
    CannotDeleteDefault(String),
    /// Managed tables require a schema.
    #[error("managed テーブルにはスキーマが必要です")]
    SchemaRequired,
    /// External tables require a `storage_root`.
    #[error("external テーブルには storage_root が必要です")]
    StorageRootRequired,
    /// The transaction is read-only.
    #[error("トランザクションは読み取り専用です")]
    TxnReadOnly,
    /// Transaction ID is invalid or missing.
    #[error("invalid transaction id: {0}")]
    InvalidTransactionId(String),
    /// The operation requires in-memory columnar mode.
    #[error("not in in-memory columnar mode")]
    NotInMemoryMode,
    /// The requested data source format is not supported.
    #[error("unsupported data source format: {0}")]
    UnsupportedDataSourceFormat(String),
    /// The catalog store lock was poisoned.
    #[error("catalog lock poisoned")]
    CatalogLockPoisoned,
}
116
117impl Error {
118    /// SQL エラーの場合はエラーコード(例: `ALOPEX-S003`)を返す。
119    pub fn sql_error_code(&self) -> Option<&'static str> {
120        match self {
121            Self::Sql(err) => Some(err.code()),
122            _ => None,
123        }
124    }
125}
126
127/// The main database object.
/// The main database object.
///
/// Owns the KV store plus per-process caches (HNSW indices, decoded vectors,
/// table metadata) and the columnar storage configuration.
pub struct Database {
    /// The underlying key-value store.
    pub(crate) store: Arc<AnyKV>,
    /// Persistent SQL catalog backed by the same KV store.
    pub(crate) sql_catalog: Arc<RwLock<alopex_sql::catalog::PersistentCatalog<AnyKV>>>,
    /// In-process cache of loaded HNSW indices, keyed by index name.
    pub(crate) hnsw_cache: RwLock<HashMap<String, Arc<HnswIndex>>>,
    /// Optional cache of decoded vectors; `None` until populated.
    pub(crate) vector_cache: RwLock<Option<HashMap<Key, CachedVector>>>,
    /// Table info cache for scan/write operations.
    pub(crate) table_info_cache: RwLock<HashMap<String, CachedTableInfo>>,
    /// Cache epoch for invalidation (incremented on DDL operations).
    pub(crate) table_info_cache_epoch: AtomicU64,
    /// Whether columnar data lives in memory or on disk.
    pub(crate) columnar_mode: StorageMode,
    /// Bridge between the columnar layer and the KV store.
    pub(crate) columnar_bridge: ColumnarKvsBridge,
    /// In-memory segment store; `Some` only in `StorageMode::InMemory`.
    pub(crate) columnar_memory: Option<InMemorySegmentStore>,
    /// Configuration used when writing columnar segments.
    pub(crate) segment_config: SegmentConfigV2,
}
143
/// Maps a user-facing database path to the on-disk data directory.
///
/// v0.1 file-mode keeps its WAL/SSTables in a directory, so a `.alopex`
/// file path is mapped to a `.alopex.d` sidecar directory next to the file;
/// any other path is used as the data directory unchanged.
pub(crate) fn disk_data_dir_path(path: &Path) -> std::path::PathBuf {
    match path.extension() {
        Some(ext) if ext == "alopex" => path.with_extension("alopex.d"),
        _ => path.to_path_buf(),
    }
}
153
154#[cfg(not(target_arch = "wasm32"))]
155fn read_file_version_from_storage(path: &Path) -> alopex_core::storage::format::FileVersion {
156    use alopex_core::storage::format::{FileHeader, FileVersion, HEADER_SIZE};
157    use std::io::Read;
158
159    let Some(file_path) = resolve_format_file_path(path) else {
160        return FileVersion::CURRENT;
161    };
162
163    let mut header_bytes = [0u8; HEADER_SIZE];
164    let Ok(mut file) = fs::File::open(file_path) else {
165        return FileVersion::CURRENT;
166    };
167    if file.read_exact(&mut header_bytes).is_err() {
168        return FileVersion::CURRENT;
169    }
170    match FileHeader::from_bytes(&header_bytes) {
171        Ok(header) => header.version,
172        Err(_) => FileVersion::CURRENT,
173    }
174}
175
#[cfg(target_arch = "wasm32")]
/// wasm targets have no local storage files to inspect, so the current
/// format version is always reported.
fn read_file_version_from_storage(_path: &Path) -> alopex_core::storage::format::FileVersion {
    use alopex_core::storage::format::FileVersion;
    FileVersion::CURRENT
}
180
#[cfg(not(target_arch = "wasm32"))]
/// Locates the file whose header carries the storage format version.
///
/// Resolution order:
/// 1. `path` itself, when it is a regular file;
/// 2. for a `<name>.d` sidecar directory, the `<name>` file next to it;
/// 3. otherwise, any `*.alopex` regular file directly inside the directory.
///
/// Returns `None` when nothing matches.
fn resolve_format_file_path(path: &Path) -> Option<PathBuf> {
    if path.is_file() {
        return Some(path.to_path_buf());
    }
    if !path.is_dir() {
        return None;
    }

    // `<name>.alopex.d` → prefer the `<name>.alopex` marker file beside it.
    if path.extension().is_some_and(|ext| ext == "d") {
        let sibling = path.with_extension("");
        if sibling.is_file() {
            return Some(sibling);
        }
    }

    // Fall back to scanning the directory for a `*.alopex` regular file.
    fs::read_dir(path)
        .ok()?
        .flatten()
        .map(|entry| entry.path())
        .find(|p| p.extension().is_some_and(|ext| ext == "alopex") && p.is_file())
}
210
impl Database {
    /// Opens (or creates) a disk-backed database at the specified path.
    ///
    /// A `.alopex` file path is mapped to a `.alopex.d` sidecar data
    /// directory; any other path is used as the data directory itself.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Core`] when the storage engine cannot be created and
    /// [`Error::Sql`] when the persisted SQL catalog fails to load.
    pub fn open(path: &Path) -> Result<Self> {
        let data_dir = disk_data_dir_path(path);
        let store = StorageFactory::create(alopex_core::StorageMode::Disk {
            path: data_dir,
            config: None,
        })
        .map_err(Error::Core)?;
        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Creates a new, purely in-memory (transient) database.
    ///
    /// Note: unlike [`Database::open_in_memory`], this does not attempt to
    /// load a persisted SQL catalog.
    pub fn new() -> Self {
        let store = AnyKV::Memory(MemoryKV::new());
        Self::init(
            store,
            StorageMode::InMemory,
            None,
            SegmentConfigV2::default(),
        )
    }

    /// Opens a database in in-memory mode with default options.
    pub fn open_in_memory() -> Result<Self> {
        Self::open_in_memory_with_options(DatabaseOptions::in_memory())
    }

    /// Opens a database in in-memory mode with the given options.
    ///
    /// # Errors
    ///
    /// Fails when `opts.memory_mode()` is disabled, when the store cannot be
    /// created, or when loading the SQL catalog fails.
    pub fn open_in_memory_with_options(opts: DatabaseOptions) -> Result<Self> {
        if !opts.memory_mode() {
            return Err(Error::Core(alopex_core::Error::InvalidFormat(
                "memory_mode must be enabled for in-memory open".into(),
            )));
        }
        let store = StorageFactory::create(opts.to_storage_mode(None)).map_err(Error::Core)?;
        let mut db = Self::init(
            store,
            StorageMode::InMemory,
            opts.memory_limit(),
            SegmentConfigV2::default(),
        );
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Opens a database from a URI string.
    ///
    /// Supported URI schemes:
    /// - `file://path` or bare path: Local filesystem
    /// - `s3://bucket/prefix`: S3-compatible storage (requires `s3` feature)
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Local path
    /// let db = Database::open_with_uri("/path/to/db")?;
    ///
    /// // S3 URI (requires s3 feature and credentials)
    /// let db = Database::open_with_uri("s3://my-bucket/data")?;
    /// ```
    pub fn open_with_uri(uri: &str) -> Result<Self> {
        // Check for S3 URI; without the `s3` feature this is a hard error.
        if uri.starts_with("s3://") {
            #[cfg(feature = "s3")]
            {
                return Self::open_s3(uri);
            }
            #[cfg(not(feature = "s3"))]
            {
                return Err(Error::Core(alopex_core::Error::InvalidFormat(
                    "S3 support requires the 's3' feature".into(),
                )));
            }
        }

        // Strip file:// prefix if present; everything else is a bare path.
        let path = if let Some(stripped) = uri.strip_prefix("file://") {
            stripped
        } else {
            uri
        };

        Self::open(Path::new(path))
    }

    /// Opens a database backed by S3 storage.
    ///
    /// This method downloads data from S3 to a local cache, operates on it
    /// using LsmKV, and syncs changes back to S3 on flush/close.
    ///
    /// # Arguments
    ///
    /// * `uri` - S3 URI in the format `s3://bucket/prefix`
    ///
    /// # Environment Variables
    ///
    /// Required:
    /// * `AWS_ACCESS_KEY_ID` - AWS access key
    /// * `AWS_SECRET_ACCESS_KEY` - AWS secret key
    ///
    /// Optional:
    /// * `AWS_REGION` - AWS region (default: us-east-1)
    /// * `AWS_ENDPOINT_URL` - Custom endpoint for S3-compatible services
    #[cfg(feature = "s3")]
    pub fn open_s3(uri: &str) -> Result<Self> {
        let s3_config = alopex_core::S3Config::from_uri(uri).map_err(Error::Core)?;
        let store = StorageFactory::create(alopex_core::StorageMode::S3 { config: s3_config })
            .map_err(Error::Core)?;
        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Assembles a `Database` around an already-created store.
    ///
    /// An in-memory columnar segment store is attached only when
    /// `columnar_mode` is `StorageMode::InMemory`; `memory_limit` (bytes)
    /// caps that store when provided.
    pub(crate) fn init(
        store: AnyKV,
        columnar_mode: StorageMode,
        memory_limit: Option<usize>,
        segment_config: SegmentConfigV2,
    ) -> Self {
        let store = Arc::new(store);
        let sql_catalog = Arc::new(RwLock::new(alopex_sql::catalog::PersistentCatalog::new(
            store.clone(),
        )));
        let columnar_bridge = ColumnarKvsBridge::new(store.clone());
        let columnar_memory = if matches!(columnar_mode, StorageMode::InMemory) {
            Some(InMemorySegmentStore::new(memory_limit.map(|v| v as u64)))
        } else {
            None
        };

        Self {
            store,
            sql_catalog,
            hnsw_cache: RwLock::new(HashMap::new()),
            vector_cache: RwLock::new(None),
            table_info_cache: RwLock::new(HashMap::new()),
            table_info_cache_epoch: AtomicU64::new(0),
            columnar_mode,
            columnar_bridge,
            columnar_memory,
            segment_config,
        }
    }

    /// Replaces the SQL catalog with the one persisted in the store, falling
    /// back to an empty catalog when none has been written yet (`NotFound`).
    fn load_sql_catalog(&mut self) -> Result<()> {
        use alopex_sql::catalog::CatalogError;

        let loaded = match alopex_sql::catalog::PersistentCatalog::load(self.store.clone()) {
            Ok(catalog) => catalog,
            Err(CatalogError::Kv(alopex_core::Error::NotFound)) => {
                alopex_sql::catalog::PersistentCatalog::new(self.store.clone())
            }
            Err(other) => return Err(Error::Sql(other.into())),
        };

        self.sql_catalog = Arc::new(RwLock::new(loaded));
        Ok(())
    }

    /// Returns the cached HNSW index for `name`, if one has been loaded.
    fn hnsw_cache_get(&self, name: &str) -> Option<Arc<HnswIndex>> {
        let cache = self.hnsw_cache.read().expect("hnsw cache lock poisoned");
        cache.get(name).cloned()
    }

    /// Inserts `index` into the HNSW cache and returns the shared handle.
    fn hnsw_cache_insert(&self, name: &str, index: HnswIndex) -> Arc<HnswIndex> {
        let index = Arc::new(index);
        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
        cache.insert(name.to_string(), Arc::clone(&index));
        index
    }

    /// Evicts `name` from the HNSW cache (no-op if absent).
    fn hnsw_cache_remove(&self, name: &str) {
        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
        cache.remove(name);
    }

    /// Returns the current table info cache epoch.
    pub fn table_info_cache_epoch(&self) -> u64 {
        self.table_info_cache_epoch.load(Ordering::Relaxed)
    }

    /// Retrieves cached table info if available and epoch matches.
    pub fn get_cached_table_info(
        &self,
        catalog_name: &str,
        namespace_name: &str,
        table_name: &str,
    ) -> Option<CachedTableInfo> {
        let cache = self
            .table_info_cache
            .read()
            .expect("table info cache lock poisoned");
        // Cache key is the fully-qualified "catalog.namespace.table" name.
        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
        cache.get(&key).cloned()
    }

    /// Stores table info in the cache.
    pub fn cache_table_info(
        &self,
        catalog_name: &str,
        namespace_name: &str,
        table_name: &str,
        info: CachedTableInfo,
    ) {
        let mut cache = self
            .table_info_cache
            .write()
            .expect("table info cache lock poisoned");
        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
        cache.insert(key, info);
    }

    /// Invalidates the table info cache (called on DDL operations).
    ///
    /// Bumps the epoch and clears all cached entries.
    pub fn invalidate_table_info_cache(&self) {
        self.table_info_cache_epoch.fetch_add(1, Ordering::Relaxed);
        let mut cache = self
            .table_info_cache
            .write()
            .expect("table info cache lock poisoned");
        cache.clear();
    }

    /// Flushes the current in-memory data to an SSTable on disk (beta).
    pub fn flush(&self) -> Result<()> {
        self.store.flush().map_err(Error::Core)
    }

    /// Returns the file format version supported by the embedded engine.
    ///
    /// For disk/S3-backed stores the version is read from the storage file
    /// header; in-memory stores report the current version.
    pub fn file_format_version(&self) -> alopex_core::storage::format::FileVersion {
        use alopex_core::storage::format::FileVersion;

        match self.store.as_ref() {
            AnyKV::Memory(_) => FileVersion::CURRENT,
            AnyKV::Lsm(kv) => read_file_version_from_storage(&kv.data_dir),
            #[cfg(feature = "s3")]
            AnyKV::S3(kv) => read_file_version_from_storage(kv.cache_dir()),
        }
    }

    /// Returns current memory usage statistics (in-memory KV only).
    ///
    /// Returns `None` for disk- and S3-backed stores.
    pub fn memory_usage(&self) -> Option<MemoryStats> {
        match self.store.as_ref() {
            AnyKV::Memory(kv) => Some(kv.memory_stats()),
            AnyKV::Lsm(_) => None,
            #[cfg(feature = "s3")]
            AnyKV::S3(_) => None,
        }
    }

    /// Persists the current in-memory database to disk atomically.
    ///
    /// `wal_path` is treated as the data directory (file-mode). The snapshot
    /// is first written to a `.tmp` sibling directory, then renamed into
    /// place, so a partially-written target is never left behind.
    ///
    /// # Errors
    ///
    /// Fails with [`Error::NotInMemoryMode`] for non-memory stores and with
    /// `PathExists` when the target (or the temp directory) already exists.
    pub fn persist_to_disk(&self, wal_path: &Path) -> Result<()> {
        if !matches!(self.store.as_ref(), AnyKV::Memory(_)) {
            return Err(Error::NotInMemoryMode);
        }
        let data_dir = disk_data_dir_path(wal_path);
        if wal_path.exists() || data_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(
                wal_path.to_path_buf(),
            )));
        }

        let tmp_dir = data_dir.with_extension("tmp");
        if tmp_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(tmp_dir)));
        }

        let snapshot = self.snapshot_pairs()?;
        // Write the snapshot into the temp directory; errors are handled
        // below so the temp directory can be cleaned up.
        let write_result = (|| -> Result<()> {
            let store = StorageFactory::create(alopex_core::StorageMode::Disk {
                path: tmp_dir.clone(),
                config: None,
            })
            .map_err(Error::Core)?;

            let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
            for (key, value) in snapshot {
                txn.put(key, value).map_err(Error::Core)?;
            }
            txn.commit_self().map_err(Error::Core)?;

            Ok(())
        })();

        if let Err(e) = write_result {
            // Best-effort cleanup; the original error is what matters.
            let _ = fs::remove_dir_all(&tmp_dir);
            return Err(e);
        }

        fs::rename(&tmp_dir, &data_dir).map_err(|e| Error::Core(e.into()))?;
        if wal_path.extension().is_some_and(|e| e == "alopex") {
            // When a `.alopex` path was given, create a marker file so the
            // path itself exists for later existence checks (best-effort).
            let _ = fs::OpenOptions::new()
                .create_new(true)
                .write(true)
                .open(wal_path);
        }
        Ok(())
    }

    /// Creates a fully in-memory clone of the current database.
    ///
    /// All key-value pairs are snapshotted and replayed into a fresh
    /// in-memory database inside a single transaction.
    pub fn clone_to_memory(&self) -> Result<Self> {
        let snapshot = self.snapshot_pairs()?;
        let cloned = Database::open_in_memory()?;
        if snapshot.is_empty() {
            return Ok(cloned);
        }

        let mut txn = cloned.begin(TxnMode::ReadWrite)?;
        for (key, value) in snapshot {
            txn.put(&key, &value)?;
        }
        txn.commit()?;
        Ok(cloned)
    }

    /// Clears all data while keeping the database usable.
    ///
    /// Deletes every key present in a snapshot, within one transaction.
    pub fn clear(&self) -> Result<()> {
        let keys: Vec<Key> = self.snapshot_pairs()?.into_iter().map(|(k, _)| k).collect();
        if keys.is_empty() {
            return Ok(());
        }
        let mut txn = self.begin(TxnMode::ReadWrite)?;
        for key in keys {
            txn.delete(&key)?;
        }
        txn.commit()
    }

    /// Updates the memory limit in bytes for the underlying in-memory store.
    ///
    /// Silently ignored for non-memory stores.
    pub fn set_memory_limit(&self, bytes: Option<usize>) {
        if let AnyKV::Memory(kv) = self.store.as_ref() {
            kv.txn_manager().set_memory_limit(bytes);
        }
    }

    /// Returns a read-only snapshot of all key-value pairs.
    ///
    /// Errors are swallowed: on failure an empty vector is returned.
    pub fn snapshot(&self) -> Vec<(Key, Vec<u8>)> {
        self.snapshot_pairs().unwrap_or_default()
    }

    /// Collects every key-value pair via a read-only transaction.
    fn snapshot_pairs(&self) -> Result<Vec<(Key, Vec<u8>)>> {
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        // Empty prefix matches every key.
        let pairs: Vec<(Key, Vec<u8>)> = txn.scan_prefix(b"").map_err(Error::Core)?.collect();
        txn.commit_self().map_err(Error::Core)?;
        Ok(pairs)
    }

    /// Creates an HNSW index and persists it.
    ///
    /// The new index is also placed in the in-process cache.
    pub fn create_hnsw_index(&self, name: &str, config: HnswConfig) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::create(name, config).map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_insert(name, index);
        Ok(())
    }

    /// Drops an HNSW index and evicts it from the cache.
    pub fn drop_hnsw_index(&self, name: &str) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        index.drop(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_remove(name);
        Ok(())
    }

    /// Returns statistics for an HNSW index, loading (and caching) it if
    /// it is not already cached.
    pub fn get_hnsw_stats(&self, name: &str) -> Result<HnswStats> {
        if let Some(index) = self.hnsw_cache_get(name) {
            return Ok(index.stats());
        }
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let stats = index.stats();
        self.hnsw_cache_insert(name, index);
        Ok(stats)
    }

    /// Compacts an HNSW index, persists it, and returns the result.
    pub fn compact_hnsw_index(&self, name: &str) -> Result<alopex_core::vector::CompactionResult> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let mut index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let result = index.compact().map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_insert(name, index);
        Ok(result)
    }

    /// Searches an HNSW index for the `k` nearest neighbours of `query`.
    ///
    /// The index is served from the cache when possible, otherwise loaded
    /// from the store and cached. Setting the `ALOPEX_PROFILE_HNSW`
    /// environment variable emits timing breakdowns on stderr.
    pub fn search_hnsw(
        &self,
        name: &str,
        query: &[f32],
        k: usize,
        ef_search: Option<usize>,
    ) -> Result<(Vec<HnswSearchResult>, HnswSearchStats)> {
        let profile = std::env::var_os("ALOPEX_PROFILE_HNSW").is_some();
        let total_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Fast path: index already cached.
        if let Some(index) = self.hnsw_cache_get(name) {
            let search_start = if profile {
                Some(std::time::Instant::now())
            } else {
                None
            };
            let result = index.search(query, k, ef_search).map_err(Error::Core)?;
            if let (true, Some(total_start), Some(search_start)) =
                (profile, total_start, search_start)
            {
                let search_time = search_start.elapsed();
                let total_time = total_start.elapsed();
                eprintln!(
                    "alopex.hnsw_search cache=hit name={} k={} ef_search={:?} search_ms={:.2} total_ms={:.2}",
                    name,
                    k,
                    ef_search,
                    search_time.as_secs_f64() * 1000.0,
                    total_time.as_secs_f64() * 1000.0
                );
            }
            return Ok(result);
        }

        // Slow path: load the index from the store, cache it, then search.
        let load_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let load_time = load_start.map(|start| start.elapsed());
        let index = self.hnsw_cache_insert(name, index);
        let search_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        let result = index.search(query, k, ef_search).map_err(Error::Core)?;
        if let (true, Some(total_start), Some(search_start)) = (profile, total_start, search_start)
        {
            let search_time = search_start.elapsed();
            let total_time = total_start.elapsed();
            let load_time_ms = load_time
                .map(|elapsed| elapsed.as_secs_f64() * 1000.0)
                .unwrap_or(0.0);
            eprintln!(
                "alopex.hnsw_search cache=miss name={} k={} ef_search={:?} load_ms={:.2} search_ms={:.2} total_ms={:.2}",
                name,
                k,
                ef_search,
                load_time_ms,
                search_time.as_secs_f64() * 1000.0,
                total_time.as_secs_f64() * 1000.0
            );
        }
        Ok(result)
    }

    /// Creates a chunked large value writer for opaque blobs (beta).
    ///
    /// `chunk_size` defaults to `DEFAULT_CHUNK_SIZE` when `None`.
    pub fn create_blob_writer(
        &self,
        path: &Path,
        total_len: u64,
        chunk_size: Option<u32>,
    ) -> Result<LargeValueWriter> {
        let meta = LargeValueMeta {
            kind: LargeValueKind::Blob,
            total_len,
            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
        };
        LargeValueWriter::create(path, meta).map_err(Error::Core)
    }

    /// Creates a chunked large value writer for typed payloads (beta).
    ///
    /// `type_id` is stored in the file header; `chunk_size` defaults to
    /// `DEFAULT_CHUNK_SIZE` when `None`.
    pub fn create_typed_writer(
        &self,
        path: &Path,
        type_id: u16,
        total_len: u64,
        chunk_size: Option<u32>,
    ) -> Result<LargeValueWriter> {
        let meta = LargeValueMeta {
            kind: LargeValueKind::Typed(type_id),
            total_len,
            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
        };
        LargeValueWriter::create(path, meta).map_err(Error::Core)
    }

    /// Opens a chunked large value reader (beta). Kind/type is read from the file header.
    pub fn open_large_value(&self, path: &Path) -> Result<LargeValueReader> {
        LargeValueReader::open(path).map_err(Error::Core)
    }

    /// Begins a new transaction in the given mode, with empty staged state.
    pub fn begin(&self, mode: TxnMode) -> Result<Transaction<'_>> {
        let txn = self.store.begin(mode).map_err(Error::Core)?;
        Ok(Transaction {
            inner: Some(txn),
            db: self,
            hnsw_indices: HashMap::new(),
            overlay: alopex_sql::catalog::CatalogOverlay::new(),
            vector_cache_updates: HashMap::new(),
            vector_cache_deletes: Vec::new(),
            vector_cache_invalidated: false,
            catalog_modified: false,
        })
    }
}
730
731impl Default for Database {
732    fn default() -> Self {
733        Self::new()
734    }
735}
736
737/// A database transaction.
/// A database transaction.
///
/// Wraps the underlying KV transaction together with state staged during
/// the transaction (HNSW changes, catalog overlay, vector-cache deltas).
pub struct Transaction<'a> {
    // Underlying KV transaction; `None` once the transaction has completed.
    inner: Option<AnyKVTransaction<'a>>,
    // Owning database handle (store, caches, catalog).
    db: &'a Database,
    // Staged HNSW indices touched by this transaction, keyed by index name.
    hnsw_indices: HashMap<String, (HnswIndex, alopex_core::vector::hnsw::HnswTransactionState)>,
    // Catalog changes staged by this transaction.
    overlay: alopex_sql::catalog::CatalogOverlay,
    // Vector-cache entries staged by `upsert_vector` (presumably applied on
    // commit — confirm against the commit implementation).
    vector_cache_updates: HashMap<Key, CachedVector>,
    // Keys staged for removal from the vector cache via `delete`.
    vector_cache_deletes: Vec<Key>,
    // Set by raw `put`, which may have overwritten vector data; forces the
    // staged per-key deltas to be discarded in favor of full invalidation.
    vector_cache_invalidated: bool,
    /// Whether DDL operations were performed in this transaction.
    pub(crate) catalog_modified: bool,
}
749
750/// A search result row containing key, metadata, and similarity score.
751#[derive(Debug, Clone, PartialEq)]
752pub struct SearchResult {
753    /// User key associated with the vector.
754    pub key: Key,
755    /// Opaque metadata payload stored alongside the vector.
756    pub metadata: Vec<u8>,
757    /// Similarity score for the query/vector pair.
758    pub score: f32,
759}
760
/// Reserved KV key under which the internal index of vector keys is persisted.
const VECTOR_INDEX_KEY: &[u8] = b"__alopex_vector_index";
762
763impl<'a> Transaction<'a> {
764    pub(crate) fn catalog_overlay(&self) -> &alopex_sql::catalog::CatalogOverlay {
765        &self.overlay
766    }
767
768    pub(crate) fn catalog_overlay_mut(&mut self) -> &mut alopex_sql::catalog::CatalogOverlay {
769        &mut self.overlay
770    }
771
772    pub(crate) fn txn_mode(&self) -> Result<TxnMode> {
773        let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
774        Ok(txn.mode())
775    }
776    /// Retrieves the value for a given key.
777    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
778        self.inner_mut()?.get(&key.to_vec()).map_err(Error::Core)
779    }
780
781    /// Sets a value for a given key.
782    pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
783        self.vector_cache_invalidated = true;
784        self.vector_cache_updates.clear();
785        self.vector_cache_deletes.clear();
786        self.inner_mut()?
787            .put(key.to_vec(), value.to_vec())
788            .map_err(Error::Core)
789    }
790
791    /// Deletes a key-value pair.
792    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
793        self.vector_cache_deletes.push(key.to_vec());
794        self.inner_mut()?.delete(key.to_vec()).map_err(Error::Core)
795    }
796
797    /// Scans all key-value pairs whose keys start with the given prefix.
798    ///
799    /// Returns an iterator over (key, value) pairs.
800    pub fn scan_prefix(
801        &mut self,
802        prefix: &[u8],
803    ) -> Result<Box<dyn Iterator<Item = (Key, Vec<u8>)> + '_>> {
804        self.inner_mut()?.scan_prefix(prefix).map_err(Error::Core)
805    }
806
807    /// HNSW にベクトルをステージング挿入/更新する。
808    pub fn upsert_to_hnsw(
809        &mut self,
810        index_name: &str,
811        key: &[u8],
812        vector: &[f32],
813        metadata: &[u8],
814    ) -> Result<()> {
815        self.ensure_write_txn()?;
816        let (index, state) = self.hnsw_entry_mut(index_name)?;
817        index
818            .upsert_staged(key, vector, metadata, state)
819            .map_err(Error::Core)
820    }
821
822    /// HNSW からキーをステージング削除する。
823    pub fn delete_from_hnsw(&mut self, index_name: &str, key: &[u8]) -> Result<bool> {
824        self.ensure_write_txn()?;
825        let (index, state) = self.hnsw_entry_mut(index_name)?;
826        index.delete_staged(key, state).map_err(Error::Core)
827    }
828
829    /// Upserts a vector and metadata under the provided key after validating dimensions and metric.
830    ///
831    /// A small internal index is maintained to enable scanning for similarity search.
832    pub fn upsert_vector(
833        &mut self,
834        key: &[u8],
835        metadata: &[u8],
836        vector: &[f32],
837        metric: Metric,
838    ) -> Result<()> {
839        if vector.is_empty() {
840            return Err(Error::Core(alopex_core::Error::InvalidFormat(
841                "vector cannot be empty".into(),
842            )));
843        }
844        let vt = VectorType::new(vector.len(), metric);
845        vt.validate(vector).map_err(Error::Core)?;
846
847        let payload = encode_vector_entry(vt, metadata, vector);
848        let txn = self.inner_mut()?;
849        txn.put(key.to_vec(), payload).map_err(Error::Core)?;
850
851        let mut keys = self.load_vector_index()?;
852        if !keys.iter().any(|k| k == key) {
853            keys.push(key.to_vec());
854            self.persist_vector_index(&keys)?;
855        }
856
857        let cached = cached_vector_from_entry(metric, metadata.to_vec(), vector.to_vec());
858        self.vector_cache_updates.insert(key.to_vec(), cached);
859        self.vector_cache_deletes.retain(|k| k != key);
860        Ok(())
861    }
862
863    /// Retrieves a vector stored under the given key.
864    ///
865    /// Returns `None` if the key does not exist. If the key exists but has a different
866    /// metric than specified, returns an error.
867    pub fn get_vector(&mut self, key: &[u8], metric: Metric) -> Result<Option<Vec<f32>>> {
868        let txn = self.inner_mut()?;
869        let key_vec = key.to_vec();
870        let Some(raw) = txn.get(&key_vec).map_err(Error::Core)? else {
871            return Ok(None);
872        };
873        let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
874        if decoded.metric != metric {
875            return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
876                metric: metric.as_str().to_string(),
877            }));
878        }
879        Ok(Some(decoded.vector))
880    }
881
882    /// Retrieves vectors stored under the given keys in a single transaction call.
883    ///
884    /// Returned list order matches the input `keys` order. Each entry is:
885    /// - `None` if the key does not exist
886    /// - `Some(Vec<f32>)` if the key exists and metric matches
887    ///
888    /// If any existing entry has a different metric than specified, returns an error.
889    pub fn get_vectors(&mut self, keys: &[Key], metric: Metric) -> Result<Vec<Option<Vec<f32>>>> {
890        let txn = self.inner_mut()?;
891        let mut out = Vec::with_capacity(keys.len());
892        for key in keys {
893            let Some(raw) = txn.get(key).map_err(Error::Core)? else {
894                out.push(None);
895                continue;
896            };
897            let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
898            if decoded.metric != metric {
899                return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
900                    metric: metric.as_str().to_string(),
901                }));
902            }
903            out.push(Some(decoded.vector));
904        }
905        Ok(out)
906    }
907
    /// Executes a flat similarity search over stored vectors using the provided metric and query.
    ///
    /// The optional `filter_keys` restricts the scan to the given keys; otherwise the full
    /// vector index is scanned. Results are sorted by descending score and truncated to `top_k`.
    ///
    /// Setting the `ALOPEX_PROFILE_SEARCH_SIMILAR` environment variable emits per-phase
    /// timings on stderr. Returns an error if any scanned entry's metric differs from
    /// `metric` or its dimensionality differs from the query's.
    pub fn search_similar(
        &mut self,
        query_vector: &[f32],
        metric: Metric,
        top_k: usize,
        filter_keys: Option<&[Key]>,
    ) -> Result<Vec<SearchResult>> {
        // top_k == 0 can never yield results; skip all work.
        if top_k == 0 {
            return Ok(Vec::new());
        }

        let profile = std::env::var_os("ALOPEX_PROFILE_SEARCH_SIMILAR").is_some();
        let total_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Precompute query norms once; `query_norm` is only meaningful for cosine.
        let query_norm_sq = query_vector.iter().map(|v| v * v).sum::<f32>();
        let query_norm = if matches!(metric, Metric::Cosine) {
            query_norm_sq.sqrt()
        } else {
            0.0
        };
        // 0.0 acts as a "zero-norm query" sentinel in the cosine scoring below.
        let inv_query_norm = if query_norm == 0.0 {
            0.0
        } else {
            1.0 / query_norm
        };

        // Fast path: for unfiltered read-only transactions the shared in-memory
        // vector cache (when populated) can be scanned without touching the KV store.
        if filter_keys.is_none() && self.txn_mode()? == TxnMode::ReadOnly {
            let cache = self
                .db
                .vector_cache
                .read()
                .expect("vector cache lock poisoned");
            if let Some(cache) = cache.as_ref() {
                if cache.is_empty() {
                    return Ok(Vec::new());
                }
                let keys_len = cache.len();
                let mut rows = Vec::with_capacity(keys_len);
                let mut score_time = std::time::Duration::ZERO;
                for (key, cached) in cache.iter() {
                    if cached.metric != metric {
                        return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                            metric: metric.as_str().to_string(),
                        }));
                    }
                    validate_dimensions(cached.vector.len(), query_vector.len())
                        .map_err(Error::Core)?;
                    let score_start = if profile {
                        Some(std::time::Instant::now())
                    } else {
                        None
                    };
                    // Score using the cached precomputed norms to avoid re-deriving them.
                    let dot = dot_product(query_vector, &cached.vector);
                    let score = match metric {
                        Metric::Cosine => {
                            if cached.inv_norm == 0.0 || inv_query_norm == 0.0 {
                                0.0
                            } else {
                                dot * cached.inv_norm * inv_query_norm
                            }
                        }
                        Metric::L2 => {
                            // ||q - v||^2 = ||q||^2 + ||v||^2 - 2*q.v; negated so higher is better.
                            let dist_sq = query_norm_sq + cached.norm_sq - 2.0 * dot;
                            -dist_sq.sqrt()
                        }
                        Metric::InnerProduct => dot,
                    };
                    if let Some(score_start) = score_start {
                        score_time += score_start.elapsed();
                    }
                    rows.push(SearchResult {
                        key: key.clone(),
                        metadata: cached.metadata.clone(),
                        score,
                    });
                }

                let rows_total = rows.len();
                let sort_start = if profile {
                    Some(std::time::Instant::now())
                } else {
                    None
                };
                // Partial-select the top_k candidates first (O(n)), then fully sort
                // only the survivors. Ties break by ascending key for determinism.
                if rows.len() > top_k {
                    rows.select_nth_unstable_by(top_k - 1, |a, b| {
                        b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
                    });
                    rows.truncate(top_k);
                }
                rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
                if let (true, Some(total_start), Some(sort_start)) =
                    (profile, total_start, sort_start)
                {
                    let sort_time = sort_start.elapsed();
                    let total_time = total_start.elapsed();
                    eprintln!(
                        "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
                        keys_len,
                        rows_total,
                        top_k,
                        0.0,
                        0.0,
                        0.0,
                        score_time.as_secs_f64() * 1000.0,
                        sort_time.as_secs_f64() * 1000.0,
                        total_time.as_secs_f64() * 1000.0
                    );
                }
                return Ok(rows);
            }
        }

        // Slow path: resolve the candidate key set (explicit filter or the persisted index).
        let (keys, load_keys_time) = if profile {
            let start = std::time::Instant::now();
            let keys = match filter_keys {
                Some(keys) => keys.to_vec(),
                None => self.load_vector_index()?,
            };
            (keys, start.elapsed())
        } else {
            let keys = match filter_keys {
                Some(keys) => keys.to_vec(),
                None => self.load_vector_index()?,
            };
            (keys, std::time::Duration::ZERO)
        };
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        let keys_len = keys.len();
        let mut rows = Vec::with_capacity(keys.len());
        let txn = self.inner_mut()?;
        let mut get_time = std::time::Duration::ZERO;
        let mut decode_time = std::time::Duration::ZERO;
        let mut score_time = std::time::Duration::ZERO;
        // The scan loop is duplicated so the non-profiling path pays no
        // per-iteration `Instant::now()` overhead.
        if profile {
            for key in keys {
                let get_start = std::time::Instant::now();
                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
                    // Key present in the index but missing from the store: skip it.
                    get_time += get_start.elapsed();
                    continue;
                };
                get_time += get_start.elapsed();
                let decode_start = std::time::Instant::now();
                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
                decode_time += decode_start.elapsed();
                if decoded.metric != metric {
                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                        metric: metric.as_str().to_string(),
                    }));
                }
                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
                let score_start = std::time::Instant::now();
                let score =
                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
                score_time += score_start.elapsed();
                rows.push(SearchResult {
                    key,
                    metadata: decoded.metadata,
                    score,
                });
            }
        } else {
            for key in keys {
                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
                    continue;
                };
                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
                if decoded.metric != metric {
                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                        metric: metric.as_str().to_string(),
                    }));
                }
                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
                let score =
                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
                rows.push(SearchResult {
                    key,
                    metadata: decoded.metadata,
                    score,
                });
            }
        }

        let rows_total = rows.len();
        let sort_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Same top-k selection strategy as the cached path: O(n) select, then
        // sort only the surviving top_k rows.
        if rows.len() > top_k {
            rows.select_nth_unstable_by(top_k - 1, |a, b| {
                b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
            });
            rows.truncate(top_k);
        }
        rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
        if let (true, Some(total_start), Some(sort_start)) = (profile, total_start, sort_start) {
            let sort_time = sort_start.elapsed();
            let total_time = total_start.elapsed();
            eprintln!(
                "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
                keys_len,
                rows_total,
                top_k,
                load_keys_time.as_secs_f64() * 1000.0,
                get_time.as_secs_f64() * 1000.0,
                decode_time.as_secs_f64() * 1000.0,
                score_time.as_secs_f64() * 1000.0,
                sort_time.as_secs_f64() * 1000.0,
                total_time.as_secs_f64() * 1000.0
            );
        }
        Ok(rows)
    }
1131
1132    fn load_vector_index(&mut self) -> Result<Vec<Key>> {
1133        let txn = self.inner_mut()?;
1134        let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec()).map_err(Error::Core)? else {
1135            return Ok(Vec::new());
1136        };
1137        decode_index(&raw).map_err(Error::Core)
1138    }
1139
1140    fn persist_vector_index(&mut self, keys: &[Key]) -> Result<()> {
1141        let txn = self.inner_mut()?;
1142        let encoded = encode_index(keys)?;
1143        txn.put(VECTOR_INDEX_KEY.to_vec(), encoded)
1144            .map_err(Error::Core)
1145    }
1146
    /// Commits the transaction, applying all changes.
    ///
    /// Ordering matters here:
    /// 1. Staged HNSW mutations and the catalog overlay are first written into
    ///    the still-open KV transaction.
    /// 2. Pending vector-cache changes are folded into the shared cache.
    /// 3. Only after the KV commit succeeds are the HNSW cache and the
    ///    in-memory SQL catalog updated, so a failed commit leaves shared
    ///    state untouched.
    pub fn commit(mut self) -> Result<()> {
        {
            let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
            // Flush staged HNSW changes into the KV transaction.
            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
                index.commit_staged(txn, state).map_err(Error::Core)?;
            }
            // Persist the catalog overlay alongside the data writes.
            let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
            catalog
                .persist_overlay(txn, &self.overlay)
                .map_err(|err| Error::Sql(err.into()))?;

            let vector_cache_invalidated = self.vector_cache_invalidated;
            let vector_cache_updates = std::mem::take(&mut self.vector_cache_updates);
            let vector_cache_deletes = std::mem::take(&mut self.vector_cache_deletes);
            if vector_cache_invalidated {
                // Coarse invalidation: drop the whole cache so it is rebuilt later.
                let mut cache = self
                    .db
                    .vector_cache
                    .write()
                    .expect("vector cache lock poisoned");
                *cache = None;
            } else if !vector_cache_updates.is_empty() || !vector_cache_deletes.is_empty() {
                let needs_rebuild = {
                    let cache = self
                        .db
                        .vector_cache
                        .read()
                        .expect("vector cache lock poisoned");
                    cache.is_none()
                };
                if needs_rebuild {
                    // Cache absent: rebuild it from this transaction's view, which
                    // already includes the pending updates and deletes.
                    let rebuilt = build_vector_cache_from_txn(txn).map_err(Error::Core)?;
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    *cache = Some(rebuilt);
                } else {
                    // Cache present: apply the delta (deletes first, then upserts).
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    if let Some(cache) = cache.as_mut() {
                        for key in vector_cache_deletes {
                            cache.remove(&key);
                        }
                        for (key, cached) in vector_cache_updates {
                            cache.insert(key, cached);
                        }
                    }
                }
            }
        }
        let txn = self.inner.take().ok_or(Error::TxnCompleted)?;
        let hnsw_indices = std::mem::take(&mut self.hnsw_indices);
        txn.commit_self().map_err(Error::Core)?;
        // The KV commit succeeded; publish the committed HNSW indices.
        if !hnsw_indices.is_empty() {
            let mut cache = self
                .db
                .hnsw_cache
                .write()
                .expect("hnsw cache lock poisoned");
            for (name, (index, _state)) in hnsw_indices {
                cache.insert(name, Arc::new(index));
            }
        }

        // Apply the overlay to the in-memory catalog only after the KV commit succeeded.
        let overlay = std::mem::take(&mut self.overlay);
        let catalog_modified = self.catalog_modified;
        let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
        catalog.apply_overlay(overlay);
        drop(catalog); // Release lock before invalidating cache
        // Invalidate the table info cache only if DDL operations were performed.
        if catalog_modified {
            self.db.invalidate_table_info_cache();
        }
        Ok(())
    }
1229
    /// Rolls back the transaction in place without consuming `self`, enabling
    /// the caller to retry after a failure.
    ///
    /// NOTE(review): after this call `self.inner` is `None`, so every further
    /// operation on this `Transaction` returns `TxnCompleted`; "retry"
    /// presumably means beginning a fresh transaction — confirm intent.
    pub fn rollback_in_place(&mut self) -> Result<()> {
        let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
        txn.rollback_in_place().map_err(Error::Core)?;
        // Discard staged HNSW mutations; rollback errors are deliberately ignored.
        for (_, (index, state)) in self.hnsw_indices.iter_mut() {
            let _ = index.rollback(state);
        }
        self.hnsw_indices.clear();
        // Reset the catalog overlay and drop the inner transaction handle.
        self.overlay = alopex_sql::catalog::CatalogOverlay::default();
        self.inner = None;
        Ok(())
    }
1242
1243    /// Rolls back the transaction, discarding all changes.
1244    pub fn rollback(mut self) -> Result<()> {
1245        if let Some(txn) = self.inner.take() {
1246            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1247                let _ = index.rollback(state);
1248            }
1249            self.hnsw_indices.clear();
1250            txn.rollback_self().map_err(Error::Core)
1251        } else {
1252            Err(Error::TxnCompleted)
1253        }
1254    }
1255
1256    fn inner_mut(&mut self) -> Result<&mut AnyKVTransaction<'a>> {
1257        self.inner.as_mut().ok_or(Error::TxnCompleted)
1258    }
1259
    /// Returns the `(index, staging state)` pair for `name`, lazily loading the
    /// HNSW index from the KV store on first access within this transaction.
    fn hnsw_entry_mut(&mut self, name: &str) -> Result<&mut (HnswIndex, HnswTransactionState)> {
        // `contains_key` + `insert` instead of the entry API: loading the index
        // requires `self.inner_mut()`, which cannot be borrowed while an entry
        // handle into `self.hnsw_indices` is alive.
        if !self.hnsw_indices.contains_key(name) {
            let index = {
                let txn = self.inner_mut()?;
                HnswIndex::load(name, txn).map_err(Error::Core)?
            };
            self.hnsw_indices
                .insert(name.to_string(), (index, HnswTransactionState::default()));
        }
        // Unwrap is safe: the key was either already present or inserted above.
        Ok(self.hnsw_indices.get_mut(name).unwrap())
    }
1271
1272    fn ensure_write_txn(&self) -> Result<()> {
1273        let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
1274        if txn.mode() != TxnMode::ReadWrite {
1275            return Err(Error::Core(alopex_core::Error::TxnReadOnly));
1276        }
1277        Ok(())
1278    }
1279}
1280
1281impl<'a> Drop for Transaction<'a> {
1282    fn drop(&mut self) {
1283        if let Some(txn) = self.inner.take() {
1284            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1285                let _ = index.rollback(state);
1286            }
1287            self.hnsw_indices.clear();
1288            let _ = txn.rollback_self();
1289        }
1290    }
1291}
1292
/// Maps a `Metric` to its single-byte tag in the persisted vector-entry
/// format. Must stay in sync with `byte_to_metric`; these values end up in
/// the KV store, so they must never be renumbered.
fn metric_to_byte(metric: Metric) -> u8 {
    match metric {
        Metric::Cosine => 0,
        Metric::L2 => 1,
        Metric::InnerProduct => 2,
    }
}
1300
1301fn byte_to_metric(byte: u8) -> result::Result<Metric, alopex_core::Error> {
1302    match byte {
1303        0 => Ok(Metric::Cosine),
1304        1 => Ok(Metric::L2),
1305        2 => Ok(Metric::InnerProduct),
1306        other => Err(alopex_core::Error::UnsupportedMetric {
1307            metric: format!("unknown({other})"),
1308        }),
1309    }
1310}
1311
1312fn encode_vector_entry(vector_type: VectorType, metadata: &[u8], vector: &[f32]) -> Vec<u8> {
1313    let dim = vector_type.dim() as u32;
1314    let meta_len = metadata.len() as u32;
1315    let mut buf = Vec::with_capacity(1 + 4 + 4 + metadata.len() + std::mem::size_of_val(vector));
1316    buf.push(metric_to_byte(vector_type.metric()));
1317    buf.extend_from_slice(&dim.to_le_bytes());
1318    buf.extend_from_slice(&meta_len.to_le_bytes());
1319    buf.extend_from_slice(metadata);
1320    for v in vector {
1321        buf.extend_from_slice(&v.to_le_bytes());
1322    }
1323    buf
1324}
1325
/// Fully-decoded vector entry used by the owned decoding path
/// (`decode_vector_entry`); the metadata bytes are not retained.
struct DecodedEntry {
    // Metric the entry was stored with (from the persisted tag byte).
    metric: Metric,
    // Decoded f32 components.
    vector: Vec<f32>,
}
1330
/// In-memory cache record for one stored vector, with norms precomputed so
/// the cached search path in `search_similar` can score without re-deriving them.
#[derive(Clone)]
struct CachedVector {
    // Metric the vector was stored under; searches with another metric error out.
    metric: Metric,
    // Opaque caller-supplied metadata returned alongside search results.
    metadata: Vec<u8>,
    vector: Vec<f32>,
    // Squared L2 norm of `vector` (used by the L2 scoring identity).
    norm_sq: f32,
    // 1 / ||vector||, or 0.0 for a zero vector (sentinel checked by cosine scoring).
    inv_norm: f32,
}
1339
/// Borrowed view of a persisted vector entry; the vector payload is kept as
/// raw little-endian f32 bytes so scoring can skip an eager decode.
struct VectorEntryView<'a> {
    metric: Metric,
    // Number of f32 components encoded in `vector_bytes`.
    dim: usize,
    // Metadata is small and copied out eagerly.
    metadata: Vec<u8>,
    // Exactly `dim * 4` bytes of little-endian f32 data.
    vector_bytes: &'a [u8],
}
1346
1347fn decode_vector_entry(bytes: &[u8]) -> result::Result<DecodedEntry, alopex_core::Error> {
1348    if bytes.len() < 9 {
1349        return Err(alopex_core::Error::InvalidFormat(
1350            "vector entry too short".into(),
1351        ));
1352    }
1353    let metric = byte_to_metric(bytes[0])?;
1354    let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1355    let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1356
1357    let header = 9;
1358    let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1359    if bytes.len() < expected_len {
1360        return Err(alopex_core::Error::InvalidFormat(
1361            "vector entry truncated".into(),
1362        ));
1363    }
1364
1365    let mut vector = Vec::with_capacity(dim);
1366    let vec_bytes = &bytes[header + meta_len..expected_len];
1367    for chunk in vec_bytes.chunks_exact(4) {
1368        vector.push(f32::from_le_bytes(chunk.try_into().unwrap()));
1369    }
1370
1371    Ok(DecodedEntry { metric, vector })
1372}
1373
1374fn decode_vector_entry_view(
1375    bytes: &[u8],
1376) -> result::Result<VectorEntryView<'_>, alopex_core::Error> {
1377    if bytes.len() < 9 {
1378        return Err(alopex_core::Error::InvalidFormat(
1379            "vector entry too short".into(),
1380        ));
1381    }
1382    let metric = byte_to_metric(bytes[0])?;
1383    let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1384    let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1385
1386    let header = 9;
1387    let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1388    if bytes.len() < expected_len {
1389        return Err(alopex_core::Error::InvalidFormat(
1390            "vector entry truncated".into(),
1391        ));
1392    }
1393
1394    let metadata = bytes[header..header + meta_len].to_vec();
1395    let vector_bytes = &bytes[header + meta_len..expected_len];
1396
1397    Ok(VectorEntryView {
1398        metric,
1399        dim,
1400        metadata,
1401        vector_bytes,
1402    })
1403}
1404
/// Decodes a little-endian f32 byte buffer into an owned vector.
/// Trailing bytes that do not form a complete 4-byte chunk are ignored.
fn vector_bytes_to_vec(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
        .collect()
}
1412
1413fn cached_vector_from_entry(metric: Metric, metadata: Vec<u8>, vector: Vec<f32>) -> CachedVector {
1414    let norm_sq = vector.iter().map(|v| v * v).sum::<f32>();
1415    let inv_norm = if norm_sq == 0.0 {
1416        0.0
1417    } else {
1418        1.0 / norm_sq.sqrt()
1419    };
1420    CachedVector {
1421        metric,
1422        metadata,
1423        vector,
1424        norm_sq,
1425        inv_norm,
1426    }
1427}
1428
1429fn build_vector_cache_from_txn<'a>(
1430    txn: &mut AnyKVTransaction<'a>,
1431) -> result::Result<HashMap<Key, CachedVector>, alopex_core::Error> {
1432    let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec())? else {
1433        return Ok(HashMap::new());
1434    };
1435    let keys = decode_index(&raw)?;
1436    let mut cache = HashMap::with_capacity(keys.len());
1437    for key in keys {
1438        let Some(raw) = txn.get(&key)? else {
1439            continue;
1440        };
1441        let decoded = decode_vector_entry_view(&raw)?;
1442        let vector = vector_bytes_to_vec(decoded.vector_bytes);
1443        let cached = cached_vector_from_entry(decoded.metric, decoded.metadata, vector);
1444        cache.insert(key, cached);
1445    }
1446    Ok(cache)
1447}
1448
/// Dot product of two f32 slices, dispatching to the AVX implementation when
/// the CPU supports it at runtime and falling back to a scalar loop otherwise.
///
/// Note: the AVX path accumulates 8 partial sums, so its rounding can differ
/// from the scalar path by a few ULPs across machines.
fn dot_product(query: &[f32], item: &[f32]) -> f32 {
    #[cfg(target_arch = "x86_64")]
    {
        if std::is_x86_feature_detected!("avx") {
            // SAFETY: guarded by runtime feature detection.
            unsafe {
                return dot_product_avx(query, item);
            }
        }
    }
    dot_product_scalar(query, item)
}
1461
/// Portable scalar dot product. Iteration stops at the shorter slice, so any
/// extra elements in the longer input are ignored.
fn dot_product_scalar(query: &[f32], item: &[f32]) -> f32 {
    let mut acc = 0.0f32;
    for (q, v) in query.iter().zip(item.iter()) {
        acc += q * v;
    }
    acc
}
1465
/// AVX dot product: processes 8 f32 lanes per iteration with unaligned loads,
/// then adds the scalar tail for lengths that are not a multiple of 8.
///
/// # Safety
/// - The CPU must support AVX (`dot_product` gates this call behind
///   `is_x86_feature_detected!("avx")`).
/// - `item` must contain at least `query.len()` elements: both the vector loop
///   and the scalar tail index through raw pointers using `query.len()` only,
///   so a shorter `item` would be read out of bounds. NOTE(review): callers
///   presumably guarantee equal lengths via `validate_dimensions` — confirm.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx")]
unsafe fn dot_product_avx(query: &[f32], item: &[f32]) -> f32 {
    use std::arch::x86_64::*;

    let len = query.len();
    let mut i = 0;
    let mut acc = _mm256_setzero_ps();
    let q_ptr = query.as_ptr();
    let v_ptr = item.as_ptr();
    // Main loop: multiply-accumulate 8 lanes at a time.
    while i + 8 <= len {
        let q = _mm256_loadu_ps(q_ptr.add(i));
        let v = _mm256_loadu_ps(v_ptr.add(i));
        acc = _mm256_add_ps(acc, _mm256_mul_ps(q, v));
        i += 8;
    }

    // Horizontal reduction of the 8 partial sums, then the scalar tail.
    let mut tmp = [0f32; 8];
    _mm256_storeu_ps(tmp.as_mut_ptr(), acc);
    let mut sum = tmp.iter().sum::<f32>();
    while i < len {
        sum += *q_ptr.add(i) * *v_ptr.add(i);
        i += 1;
    }
    sum
}
1492
1493fn score_from_slice(metric: Metric, query: &[f32], query_norm: f32, item: &[f32]) -> f32 {
1494    match metric {
1495        Metric::Cosine => {
1496            if query_norm == 0.0 {
1497                return 0.0;
1498            }
1499            let mut dot = 0.0;
1500            let mut item_norm_sq = 0.0;
1501            for (q, v) in query.iter().zip(item.iter()) {
1502                dot += q * v;
1503                item_norm_sq += v * v;
1504            }
1505            let item_norm = item_norm_sq.sqrt();
1506            if item_norm == 0.0 {
1507                0.0
1508            } else {
1509                dot / (query_norm * item_norm)
1510            }
1511        }
1512        Metric::L2 => {
1513            let mut dist_sq = 0.0;
1514            for (q, v) in query.iter().zip(item.iter()) {
1515                let d = q - v;
1516                dist_sq += d * d;
1517            }
1518            -dist_sq.sqrt()
1519        }
1520        Metric::InnerProduct => query.iter().zip(item.iter()).map(|(q, v)| q * v).sum(),
1521    }
1522}
1523
/// Scores `query` against a raw little-endian f32 byte payload.
///
/// On little-endian targets with a 4-byte-aligned payload, the bytes are
/// reinterpreted in place as `&[f32]` (zero-copy) and delegated to
/// `score_from_slice`; otherwise each f32 is decoded from its bytes. Both
/// paths follow the same semantics: cosine/inner product are higher-is-better,
/// L2 returns the negated distance, and a zero query norm yields 0.0 for cosine.
fn score_from_bytes(
    metric: Metric,
    query: &[f32],
    query_norm: f32,
    vector_bytes: &[u8],
) -> result::Result<f32, alopex_core::Error> {
    let len = vector_bytes.len() / 4;
    #[cfg(target_endian = "little")]
    {
        let ptr = vector_bytes.as_ptr();
        if (ptr as usize).is_multiple_of(std::mem::align_of::<f32>()) {
            // SAFETY: the pointer is aligned for f32 (checked above), the
            // reinterpreted slice covers `len * 4` bytes of initialized memory
            // inside `vector_bytes`, and on a little-endian target the byte
            // order matches f32's in-memory representation.
            let items = unsafe { std::slice::from_raw_parts(ptr as *const f32, len) };
            return Ok(score_from_slice(metric, query, query_norm, items));
        }
    }

    // Fallback for unaligned or big-endian targets: decode each f32 from bytes.
    let mut iter = vector_bytes.chunks_exact(4);
    let score = match metric {
        Metric::Cosine => {
            if query_norm == 0.0 {
                0.0
            } else {
                let mut dot = 0.0;
                let mut item_norm_sq = 0.0;
                for (q, chunk) in query.iter().zip(&mut iter) {
                    let v = f32::from_le_bytes(chunk.try_into().unwrap());
                    dot += q * v;
                    item_norm_sq += v * v;
                }
                let item_norm = item_norm_sq.sqrt();
                if item_norm == 0.0 {
                    0.0
                } else {
                    dot / (query_norm * item_norm)
                }
            }
        }
        Metric::L2 => {
            let mut dist_sq = 0.0;
            for (q, chunk) in query.iter().zip(&mut iter) {
                let v = f32::from_le_bytes(chunk.try_into().unwrap());
                let d = q - v;
                dist_sq += d * d;
            }
            -dist_sq.sqrt()
        }
        Metric::InnerProduct => query
            .iter()
            .zip(&mut iter)
            .map(|(q, chunk)| q * f32::from_le_bytes(chunk.try_into().unwrap()))
            .sum(),
    };
    Ok(score)
}
1579
1580fn encode_index(keys: &[Key]) -> result::Result<Vec<u8>, alopex_core::Error> {
1581    let mut buf = Vec::new();
1582    let count = keys.len() as u32;
1583    buf.extend_from_slice(&count.to_le_bytes());
1584    for key in keys {
1585        let len: u32 = key
1586            .len()
1587            .try_into()
1588            .map_err(|_| alopex_core::Error::InvalidFormat("key too long".into()))?;
1589        buf.extend_from_slice(&len.to_le_bytes());
1590        buf.extend_from_slice(key);
1591    }
1592    Ok(buf)
1593}
1594
1595fn decode_index(bytes: &[u8]) -> result::Result<Vec<Key>, alopex_core::Error> {
1596    if bytes.len() < 4 {
1597        return Err(alopex_core::Error::InvalidFormat("index too short".into()));
1598    }
1599    let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
1600    let mut pos = 4;
1601    let mut keys = Vec::with_capacity(count);
1602    for _ in 0..count {
1603        if pos + 4 > bytes.len() {
1604            return Err(alopex_core::Error::InvalidFormat("index truncated".into()));
1605        }
1606        let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
1607        pos += 4;
1608        if pos + len > bytes.len() {
1609            return Err(alopex_core::Error::InvalidFormat(
1610                "index key truncated".into(),
1611            ));
1612        }
1613        keys.push(bytes[pos..pos + len].to_vec());
1614        pos += len;
1615    }
1616    Ok(keys)
1617}
1618
1619#[cfg(test)]
1620mod tests {
1621    use super::*;
1622    use std::sync::mpsc;
1623    use std::thread;
1624    use tempfile::tempdir;
1625
1626    #[test]
1627    fn test_open_and_crud() {
1628        let dir = tempdir().unwrap();
1629        let path = dir.path().join("test.db");
1630        let db = Database::open(&path).unwrap();
1631
1632        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1633        txn.put(b"key1", b"value1").unwrap();
1634        txn.commit().unwrap();
1635
1636        let mut txn2 = db.begin(TxnMode::ReadOnly).unwrap();
1637        let val = txn2.get(b"key1").unwrap();
1638        assert_eq!(val, Some(b"value1".to_vec()));
1639    }
1640
1641    #[test]
1642    fn test_not_found() {
1643        let db = Database::new();
1644        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
1645        let val = txn.get(b"non-existent-key").unwrap();
1646        assert!(val.is_none());
1647    }
1648
    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_file_format_version_reads_alopex_header() {
        use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};

        let dir = tempdir().unwrap();
        let path = dir.path().join("format-test.alopex");
        let expected = FileVersion::new(0, 0, 1);

        // Write a file that carries only the Alopex header with this version...
        let writer = AlopexFileWriter::new(path.clone(), expected, FileFlags(0)).unwrap();
        writer.finalize().unwrap();

        // ...and verify the version is surfaced after opening the database.
        let db = Database::open(&path).unwrap();
        assert_eq!(db.file_format_version(), expected);
    }
1664
1665    #[test]
1666    fn test_crash_recovery_replays_wal() {
1667        let dir = tempdir().unwrap();
1668        let path = dir.path().join("replay.db");
1669
1670        {
1671            let db = Database::open(&path).unwrap();
1672            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1673            txn.put(b"k1", b"v1").unwrap();
1674            txn.commit().unwrap();
1675
1676            let mut uncommitted = db.begin(TxnMode::ReadWrite).unwrap();
1677            uncommitted.put(b"k2", b"v2").unwrap();
1678            // Drop without commit to simulate crash before commit.
1679        }
1680
1681        let db = Database::open(&path).unwrap();
1682        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
1683        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
1684        assert_eq!(txn.get(b"k2").unwrap(), None);
1685    }
1686
1687    #[test]
1688    fn test_txn_closed() {
1689        let db = Database::new();
1690        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1691        txn.put(b"k1", b"v1").unwrap();
1692        txn.commit().unwrap();
1693        // The `commit` call consumes the transaction, so we can't call it again.
1694        // This test verifies that we can't use a transaction after it's been completed.
1695        // The `inner_mut` method will return `Error::TxnCompleted`.
1696        // This is a compile-time check in practice, but we can't write a test that fails to compile.
1697        // The logic is sound.
1698    }
1699
1700    #[test]
1701    fn test_concurrency_conflict() {
1702        let db = std::sync::Arc::new(Database::new());
1703        let mut t0 = db.begin(TxnMode::ReadWrite).unwrap();
1704        t0.put(b"k1", b"v0").unwrap();
1705        t0.commit().unwrap();
1706
1707        let (tx1, rx1) = mpsc::channel();
1708        let (tx2, rx2) = mpsc::channel();
1709
1710        let db1 = db.clone();
1711        let t1 = thread::spawn(move || {
1712            let mut txn1 = db1.begin(TxnMode::ReadWrite).unwrap();
1713            let val = txn1.get(b"k1").unwrap();
1714            assert_eq!(val.unwrap(), b"v0");
1715            tx1.send(()).unwrap();
1716            rx2.recv().unwrap();
1717            txn1.put(b"k1", b"v1").unwrap();
1718            let result = txn1.commit();
1719            assert!(matches!(
1720                result,
1721                Err(Error::Core(alopex_core::Error::TxnConflict))
1722            ));
1723        });
1724
1725        let db2 = db.clone();
1726        let t2 = thread::spawn(move || {
1727            rx1.recv().unwrap();
1728            let mut txn2 = db2.begin(TxnMode::ReadWrite).unwrap();
1729            txn2.put(b"k1", b"v2").unwrap();
1730            assert!(txn2.commit().is_ok());
1731            tx2.send(()).unwrap();
1732        });
1733
1734        t1.join().unwrap();
1735        t2.join().unwrap();
1736
1737        let mut txn3 = db.begin(TxnMode::ReadOnly).unwrap();
1738        let val = txn3.get(b"k1").unwrap();
1739        assert_eq!(val.unwrap(), b"v2");
1740    }
1741
1742    #[test]
1743    fn test_flush_and_reopen_via_embedded_api() {
1744        let dir = tempdir().unwrap();
1745        let path = dir.path().join("persist.db");
1746        {
1747            let db = Database::open(&path).unwrap();
1748            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1749            txn.put(b"k1", b"v1").unwrap();
1750            txn.commit().unwrap();
1751            db.flush().unwrap();
1752        }
1753
1754        let db = Database::open(&path).unwrap();
1755        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
1756        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
1757    }
1758
1759    #[test]
1760    fn test_large_value_blob_roundtrip() {
1761        let dir = tempdir().unwrap();
1762        let path = dir.path().join("blob.lv");
1763        let payload = b"hello large value";
1764
1765        {
1766            let db = Database::new();
1767            let mut writer = db
1768                .create_blob_writer(&path, payload.len() as u64, Some(16))
1769                .unwrap();
1770            writer.write_chunk(&payload[..5]).unwrap();
1771            writer.write_chunk(&payload[5..]).unwrap();
1772            writer.finish().unwrap();
1773        }
1774
1775        let db = Database::new();
1776        let mut reader = db.open_large_value(&path).unwrap();
1777        let mut buf = Vec::new();
1778        while let Some((_info, chunk)) = reader.next_chunk().unwrap() {
1779            buf.extend_from_slice(&chunk);
1780        }
1781        assert_eq!(buf, payload);
1782    }
1783
1784    #[test]
1785    fn upsert_and_search_same_txn() {
1786        let db = Database::new();
1787        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1788        txn.upsert_vector(b"k1", b"meta1", &[1.0, 0.0], Metric::Cosine)
1789            .unwrap();
1790
1791        let results = txn
1792            .search_similar(&[1.0, 0.0], Metric::Cosine, 1, None)
1793            .unwrap();
1794        assert_eq!(results.len(), 1);
1795        assert_eq!(results[0].key, b"k1");
1796        assert_eq!(results[0].metadata, b"meta1");
1797        txn.commit().unwrap();
1798    }
1799
1800    #[test]
1801    fn upsert_and_search_across_txn() {
1802        let db = Database::new();
1803        {
1804            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1805            txn.upsert_vector(b"k1", b"meta1", &[1.0, 1.0], Metric::Cosine)
1806                .unwrap();
1807            txn.commit().unwrap();
1808        }
1809
1810        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
1811        let results = ro
1812            .search_similar(&[1.0, 1.0], Metric::Cosine, 1, None)
1813            .unwrap();
1814        assert_eq!(results.len(), 1);
1815        assert_eq!(results[0].key, b"k1");
1816    }
1817
1818    #[test]
1819    fn read_only_upsert_rejected() {
1820        let db = Database::new();
1821        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
1822        let err = ro
1823            .upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
1824            .unwrap_err();
1825        assert!(matches!(err, Error::Core(alopex_core::Error::TxnReadOnly)));
1826    }
1827
1828    #[test]
1829    fn dimension_mismatch_on_search() {
1830        let db = Database::new();
1831        {
1832            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1833            txn.upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
1834                .unwrap();
1835            txn.commit().unwrap();
1836        }
1837        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
1838        let err = ro
1839            .search_similar(&[1.0, 0.0, 1.0], Metric::Cosine, 1, None)
1840            .unwrap_err();
1841        assert!(matches!(
1842            err,
1843            Error::Core(alopex_core::Error::DimensionMismatch { .. })
1844        ));
1845    }
1846}