// alopex_embedded/src/lib.rs

1//! A user-friendly embedded API for the AlopexDB key-value store.
2
3#![deny(missing_docs)]
4
5/// Catalog metadata API (in-memory, primarily for Python bindings).
6pub mod catalog;
7pub mod catalog_api;
8pub mod columnar_api;
9mod dataframe_api;
10pub mod options;
11mod sql_api;
12mod txn_manager;
13
14pub use crate::catalog::{CachedTableInfo, Catalog};
15pub use crate::catalog_api::{
16    CatalogInfo, ColumnDefinition, ColumnInfo, CreateCatalogRequest, CreateNamespaceRequest,
17    CreateTableRequest, IndexInfo, NamespaceInfo, StorageInfo, TableInfo,
18};
19pub use crate::columnar_api::{
20    ColumnarIndexInfo, ColumnarIndexType, ColumnarRowIterator, EmbeddedConfig, StorageMode,
21};
22pub use crate::options::DatabaseOptions;
23pub use crate::sql_api::{SqlStreamingResult, StreamingQueryResult, StreamingRows};
24pub use crate::txn_manager::{TransactionInfo, TransactionManager};
25pub use alopex_dataframe::{DataFrame, JoinKeys, JoinType, SortOptions};
26pub use alopex_sql::{DataSourceFormat, TableType};
/// Return type of `Database::execute_sql()` / `Transaction::execute_sql()`.
pub type SqlResult = alopex_sql::SqlResult;
29use alopex_core::vector::hnsw::{HnswTransactionState, SearchStats as HnswSearchStats};
30use alopex_core::{
31    columnar::{
32        kvs_bridge::ColumnarKvsBridge, memory::InMemorySegmentStore, segment_v2::SegmentConfigV2,
33    },
34    kv::any::AnyKVTransaction,
35    kv::memory::MemoryKV,
36    kv::AnyKV,
37    validate_dimensions, HnswIndex, KVStore, KVTransaction, Key, LargeValueKind, LargeValueMeta,
38    LargeValueReader, LargeValueWriter, StorageFactory, VectorType, DEFAULT_CHUNK_SIZE,
39};
40pub use alopex_core::{HnswConfig, HnswSearchResult, HnswStats, MemoryStats, Metric, TxnMode};
41/// Streaming query row iterator for FR-7 compliance.
42pub use alopex_sql::executor::QueryRowIterator;
43use std::collections::HashMap;
44use std::convert::TryInto;
45use std::fs;
46use std::path::{Path, PathBuf};
47use std::result;
48use std::sync::atomic::{AtomicU64, Ordering};
49use std::sync::{Arc, RwLock};
50
/// A convenience `Result` type for database operations.
///
/// The error type is always this crate's [`Error`].
pub type Result<T> = result::Result<T, Error>;
53
/// The error type for embedded database operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An error from the underlying core storage engine.
    #[error("core error: {0}")]
    Core(#[from] alopex_core::Error),
    /// An error from the SQL execution pipeline.
    #[error("{0}")]
    Sql(#[from] alopex_sql::SqlError),
    /// An error from DataFrame operations.
    #[error("{0}")]
    DataFrame(#[from] alopex_dataframe::DataFrameError),
    /// The transaction has already been completed and cannot be used.
    #[error("transaction is completed")]
    TxnCompleted,
    /// The requested catalog was not found.
    #[error("カタログが見つかりません: {0}")]
    CatalogNotFound(String),
    /// A catalog with the same name already exists.
    #[error("カタログは既に存在します: {0}")]
    CatalogAlreadyExists(String),
    /// The catalog is not empty and cannot be removed.
    #[error("カタログが空ではありません: {0}")]
    CatalogNotEmpty(String),
    /// The requested namespace was not found (catalog, namespace).
    #[error("ネームスペースが見つかりません: {0}.{1}")]
    NamespaceNotFound(String, String),
    /// A namespace with the same name already exists (catalog, namespace).
    #[error("ネームスペースは既に存在します: {0}.{1}")]
    NamespaceAlreadyExists(String, String),
    /// The namespace is not empty and cannot be removed (catalog, namespace).
    #[error("ネームスペースが空ではありません: {0}.{1}")]
    NamespaceNotEmpty(String, String),
    /// The requested table was not found or is invalid.
    #[error("table not found: {0}")]
    TableNotFound(String),
    /// A table with the same name already exists.
    #[error("テーブルは既に存在します: {0}")]
    TableAlreadyExists(String),
    /// The requested index was not found.
    #[error("インデックスが見つかりません: {0}")]
    IndexNotFound(String),
    /// The built-in `default` object cannot be deleted.
    #[error("default オブジェクトは削除できません: {0}")]
    CannotDeleteDefault(String),
    /// Managed tables require a schema.
    #[error("managed テーブルにはスキーマが必要です")]
    SchemaRequired,
    /// External tables require a `storage_root`.
    #[error("external テーブルには storage_root が必要です")]
    StorageRootRequired,
    /// The transaction is read-only and cannot perform writes.
    #[error("トランザクションは読み取り専用です")]
    TxnReadOnly,
    /// Transaction ID is invalid or missing.
    #[error("invalid transaction id: {0}")]
    InvalidTransactionId(String),
    /// The operation requires in-memory columnar mode.
    #[error("not in in-memory columnar mode")]
    NotInMemoryMode,
    /// The requested data source format is not supported.
    #[error("unsupported data source format: {0}")]
    UnsupportedDataSourceFormat(String),
    /// The catalog store lock was poisoned.
    #[error("catalog lock poisoned")]
    CatalogLockPoisoned,
}
121
122impl Error {
123    /// SQL エラーの場合はエラーコード(例: `ALOPEX-S003`)を返す。
124    pub fn sql_error_code(&self) -> Option<&'static str> {
125        match self {
126            Self::Sql(err) => Some(err.code()),
127            _ => None,
128        }
129    }
130}
131
/// The main database object.
pub struct Database {
    /// The underlying key-value store.
    pub(crate) store: Arc<AnyKV>,
    /// SQL catalog persisted in the same KV store.
    pub(crate) sql_catalog: Arc<RwLock<alopex_sql::catalog::PersistentCatalog<AnyKV>>>,
    /// Cache of loaded HNSW indices, keyed by index name.
    pub(crate) hnsw_cache: RwLock<HashMap<String, Arc<HnswIndex>>>,
    /// Cache of decoded vectors; `None` until first populated.
    pub(crate) vector_cache: RwLock<Option<HashMap<Key, CachedVector>>>,
    /// Table info cache for scan/write operations.
    pub(crate) table_info_cache: RwLock<HashMap<String, CachedTableInfo>>,
    /// Cache epoch for invalidation (incremented on DDL operations).
    pub(crate) table_info_cache_epoch: AtomicU64,
    /// Whether columnar data lives in memory or on disk.
    pub(crate) columnar_mode: StorageMode,
    /// Bridge exposing the KV store to the columnar engine.
    pub(crate) columnar_bridge: ColumnarKvsBridge,
    /// In-memory segment store; `Some` only when `columnar_mode` is `InMemory`.
    pub(crate) columnar_memory: Option<InMemorySegmentStore>,
    /// Configuration for columnar segment files (v2 format).
    pub(crate) segment_config: SegmentConfigV2,
}
148
/// Maps a user-supplied database path to the actual on-disk data directory.
///
/// v0.1 file-mode keeps WAL/SSTables in a directory, so a `*.alopex` file
/// path is mapped to a sidecar `*.alopex.d` directory next to it; any other
/// path is used verbatim as the data directory.
pub(crate) fn disk_data_dir_path(path: &Path) -> std::path::PathBuf {
    match path.extension() {
        Some(ext) if ext == "alopex" => path.with_extension("alopex.d"),
        _ => path.to_path_buf(),
    }
}
158
159#[cfg(not(target_arch = "wasm32"))]
160fn read_file_version_from_storage(path: &Path) -> alopex_core::storage::format::FileVersion {
161    use alopex_core::storage::format::{FileHeader, FileVersion, HEADER_SIZE};
162    use std::io::Read;
163
164    let Some(file_path) = resolve_format_file_path(path) else {
165        return FileVersion::CURRENT;
166    };
167
168    let mut header_bytes = [0u8; HEADER_SIZE];
169    let Ok(mut file) = fs::File::open(file_path) else {
170        return FileVersion::CURRENT;
171    };
172    if file.read_exact(&mut header_bytes).is_err() {
173        return FileVersion::CURRENT;
174    }
175    match FileHeader::from_bytes(&header_bytes) {
176        Ok(header) => header.version,
177        Err(_) => FileVersion::CURRENT,
178    }
179}
180
// wasm32 fallback: there is no filesystem-backed storage on this target,
// so always report the current file format version.
#[cfg(target_arch = "wasm32")]
fn read_file_version_from_storage(_path: &Path) -> alopex_core::storage::format::FileVersion {
    alopex_core::storage::format::FileVersion::CURRENT
}
185
/// Locates the file whose header carries the storage format version.
///
/// Resolution order:
/// 1. `path` itself, when it is a regular file.
/// 2. For a `*.d` sidecar directory, the sibling file with the `.d` stripped.
/// 3. The first `*.alopex` file found inside the directory.
///
/// Returns `None` when nothing suitable exists.
#[cfg(not(target_arch = "wasm32"))]
fn resolve_format_file_path(path: &Path) -> Option<PathBuf> {
    if path.is_file() {
        return Some(path.to_path_buf());
    }
    if !path.is_dir() {
        return None;
    }

    // `<name>.alopex.d` sidecar dir: prefer the sibling `<name>.alopex` file.
    if path.extension().is_some_and(|ext| ext == "d") {
        let sibling = path.with_extension("");
        if sibling.is_file() {
            return Some(sibling);
        }
    }

    // Otherwise scan the directory for the first `*.alopex` regular file.
    fs::read_dir(path)
        .ok()?
        .flatten()
        .map(|entry| entry.path())
        .find(|p| p.extension().is_some_and(|ext| ext == "alopex") && p.is_file())
}
215
216impl Database {
217    /// Opens a database at the specified path.
218    pub fn open(path: &Path) -> Result<Self> {
219        let data_dir = disk_data_dir_path(path);
220        let store = StorageFactory::create(alopex_core::StorageMode::Disk {
221            path: data_dir,
222            config: None,
223        })
224        .map_err(Error::Core)?;
225        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
226        db.load_sql_catalog()?;
227        Ok(db)
228    }
229
230    /// Creates a new, purely in-memory (transient) database.
231    pub fn new() -> Self {
232        let store = AnyKV::Memory(MemoryKV::new());
233        Self::init(
234            store,
235            StorageMode::InMemory,
236            None,
237            SegmentConfigV2::default(),
238        )
239    }
240
    /// Opens a database in in-memory mode with default options.
    ///
    /// Equivalent to `open_in_memory_with_options(DatabaseOptions::in_memory())`.
    pub fn open_in_memory() -> Result<Self> {
        Self::open_in_memory_with_options(DatabaseOptions::in_memory())
    }
245
246    /// Opens a database in in-memory mode with the given options.
247    pub fn open_in_memory_with_options(opts: DatabaseOptions) -> Result<Self> {
248        if !opts.memory_mode() {
249            return Err(Error::Core(alopex_core::Error::InvalidFormat(
250                "memory_mode must be enabled for in-memory open".into(),
251            )));
252        }
253        let store = StorageFactory::create(opts.to_storage_mode(None)).map_err(Error::Core)?;
254        let mut db = Self::init(
255            store,
256            StorageMode::InMemory,
257            opts.memory_limit(),
258            SegmentConfigV2::default(),
259        );
260        db.load_sql_catalog()?;
261        Ok(db)
262    }
263
264    /// Opens a database from a URI string.
265    ///
266    /// Supported URI schemes:
267    /// - `file://path` or bare path: Local filesystem
268    /// - `s3://bucket/prefix`: S3-compatible storage (requires `s3` feature)
269    ///
270    /// # Example
271    ///
272    /// ```ignore
273    /// // Local path
274    /// let db = Database::open_with_uri("/path/to/db")?;
275    ///
276    /// // S3 URI (requires s3 feature and credentials)
277    /// let db = Database::open_with_uri("s3://my-bucket/data")?;
278    /// ```
279    pub fn open_with_uri(uri: &str) -> Result<Self> {
280        // Check for S3 URI
281        if uri.starts_with("s3://") {
282            #[cfg(feature = "s3")]
283            {
284                return Self::open_s3(uri);
285            }
286            #[cfg(not(feature = "s3"))]
287            {
288                return Err(Error::Core(alopex_core::Error::InvalidFormat(
289                    "S3 support requires the 's3' feature".into(),
290                )));
291            }
292        }
293
294        // Strip file:// prefix if present
295        let path = if let Some(stripped) = uri.strip_prefix("file://") {
296            stripped
297        } else {
298            uri
299        };
300
301        Self::open(Path::new(path))
302    }
303
    /// Opens a database backed by S3 storage.
    ///
    /// This method downloads data from S3 to a local cache, operates on it
    /// using LsmKV, and syncs changes back to S3 on flush/close.
    ///
    /// # Arguments
    ///
    /// * `uri` - S3 URI in the format `s3://bucket/prefix`
    ///
    /// # Environment Variables
    ///
    /// Required:
    /// * `AWS_ACCESS_KEY_ID` - AWS access key
    /// * `AWS_SECRET_ACCESS_KEY` - AWS secret key
    ///
    /// Optional:
    /// * `AWS_REGION` - AWS region (default: us-east-1)
    /// * `AWS_ENDPOINT_URL` - Custom endpoint for S3-compatible services
    #[cfg(feature = "s3")]
    pub fn open_s3(uri: &str) -> Result<Self> {
        let s3_config = alopex_core::S3Config::from_uri(uri).map_err(Error::Core)?;
        let store = StorageFactory::create(alopex_core::StorageMode::S3 { config: s3_config })
            .map_err(Error::Core)?;
        // Columnar data is treated as disk-mode: it lives in the local cache.
        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
        db.load_sql_catalog()?;
        Ok(db)
    }
331
    /// Builds a `Database` around an already-created KV store.
    ///
    /// * `store` - backing key-value store.
    /// * `columnar_mode` - whether columnar data lives in memory or on disk.
    /// * `memory_limit` - optional byte limit for the in-memory segment store.
    /// * `segment_config` - columnar segment (v2) configuration.
    pub(crate) fn init(
        store: AnyKV,
        columnar_mode: StorageMode,
        memory_limit: Option<usize>,
        segment_config: SegmentConfigV2,
    ) -> Self {
        let store = Arc::new(store);
        // The SQL catalog and columnar bridge share the same store handle.
        let sql_catalog = Arc::new(RwLock::new(alopex_sql::catalog::PersistentCatalog::new(
            store.clone(),
        )));
        let columnar_bridge = ColumnarKvsBridge::new(store.clone());
        // The in-memory segment store only exists in in-memory columnar mode.
        let columnar_memory = if matches!(columnar_mode, StorageMode::InMemory) {
            Some(InMemorySegmentStore::new(memory_limit.map(|v| v as u64)))
        } else {
            None
        };

        Self {
            store,
            sql_catalog,
            hnsw_cache: RwLock::new(HashMap::new()),
            vector_cache: RwLock::new(None),
            table_info_cache: RwLock::new(HashMap::new()),
            table_info_cache_epoch: AtomicU64::new(0),
            columnar_mode,
            columnar_bridge,
            columnar_memory,
            segment_config,
        }
    }
362
    /// Replaces the SQL catalog with one loaded from the store.
    ///
    /// A missing catalog (`NotFound`) is not an error: a fresh empty catalog
    /// is installed instead, so newly-created databases open cleanly. Any
    /// other catalog error is propagated as [`Error::Sql`].
    fn load_sql_catalog(&mut self) -> Result<()> {
        use alopex_sql::catalog::CatalogError;

        let loaded = match alopex_sql::catalog::PersistentCatalog::load(self.store.clone()) {
            Ok(catalog) => catalog,
            Err(CatalogError::Kv(alopex_core::Error::NotFound)) => {
                alopex_sql::catalog::PersistentCatalog::new(self.store.clone())
            }
            Err(other) => return Err(Error::Sql(other.into())),
        };

        self.sql_catalog = Arc::new(RwLock::new(loaded));
        Ok(())
    }
377
378    fn hnsw_cache_get(&self, name: &str) -> Option<Arc<HnswIndex>> {
379        let cache = self.hnsw_cache.read().expect("hnsw cache lock poisoned");
380        cache.get(name).cloned()
381    }
382
383    fn hnsw_cache_insert(&self, name: &str, index: HnswIndex) -> Arc<HnswIndex> {
384        let index = Arc::new(index);
385        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
386        cache.insert(name.to_string(), Arc::clone(&index));
387        index
388    }
389
390    fn hnsw_cache_remove(&self, name: &str) {
391        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
392        cache.remove(name);
393    }
394
    /// Returns the current table info cache epoch.
    ///
    /// The epoch is bumped by [`Database::invalidate_table_info_cache`] on
    /// DDL operations, allowing callers to detect stale cached table info.
    pub fn table_info_cache_epoch(&self) -> u64 {
        self.table_info_cache_epoch.load(Ordering::Relaxed)
    }
399
400    /// Retrieves cached table info if available and epoch matches.
401    pub fn get_cached_table_info(
402        &self,
403        catalog_name: &str,
404        namespace_name: &str,
405        table_name: &str,
406    ) -> Option<CachedTableInfo> {
407        let cache = self
408            .table_info_cache
409            .read()
410            .expect("table info cache lock poisoned");
411        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
412        cache.get(&key).cloned()
413    }
414
415    /// Stores table info in the cache.
416    pub fn cache_table_info(
417        &self,
418        catalog_name: &str,
419        namespace_name: &str,
420        table_name: &str,
421        info: CachedTableInfo,
422    ) {
423        let mut cache = self
424            .table_info_cache
425            .write()
426            .expect("table info cache lock poisoned");
427        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
428        cache.insert(key, info);
429    }
430
431    /// Invalidates the table info cache (called on DDL operations).
432    pub fn invalidate_table_info_cache(&self) {
433        self.table_info_cache_epoch.fetch_add(1, Ordering::Relaxed);
434        let mut cache = self
435            .table_info_cache
436            .write()
437            .expect("table info cache lock poisoned");
438        cache.clear();
439    }
440
    /// Flushes the current in-memory data to an SSTable on disk (beta).
    ///
    /// Delegates to the underlying KV store's `flush`.
    pub fn flush(&self) -> Result<()> {
        self.store.flush().map_err(Error::Core)
    }
445
    /// Returns the file format version supported by the embedded engine.
    ///
    /// Disk (LSM) and S3-backed stores read the version from the on-disk
    /// file header; in-memory stores always report the current version.
    pub fn file_format_version(&self) -> alopex_core::storage::format::FileVersion {
        use alopex_core::storage::format::FileVersion;

        match self.store.as_ref() {
            AnyKV::Memory(_) => FileVersion::CURRENT,
            AnyKV::Lsm(kv) => read_file_version_from_storage(&kv.data_dir),
            #[cfg(feature = "s3")]
            AnyKV::S3(kv) => read_file_version_from_storage(kv.cache_dir()),
        }
    }
457
    /// Returns current memory usage statistics (in-memory KV only).
    ///
    /// Returns `None` for disk- or S3-backed stores.
    pub fn memory_usage(&self) -> Option<MemoryStats> {
        match self.store.as_ref() {
            AnyKV::Memory(kv) => Some(kv.memory_stats()),
            AnyKV::Lsm(_) => None,
            #[cfg(feature = "s3")]
            AnyKV::S3(_) => None,
        }
    }
467
    /// Persists the current in-memory database to disk atomically.
    ///
    /// `wal_path` is treated as the data directory (file-mode).
    ///
    /// # Errors
    /// - [`Error::NotInMemoryMode`] if the store is not in-memory.
    /// - A `PathExists` core error if the destination, its data directory,
    ///   or the temporary staging directory already exists.
    pub fn persist_to_disk(&self, wal_path: &Path) -> Result<()> {
        if !matches!(self.store.as_ref(), AnyKV::Memory(_)) {
            return Err(Error::NotInMemoryMode);
        }
        let data_dir = disk_data_dir_path(wal_path);
        if wal_path.exists() || data_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(
                wal_path.to_path_buf(),
            )));
        }

        // Stage into a temp dir first so a failed write never leaves a
        // half-written data directory at the destination.
        let tmp_dir = data_dir.with_extension("tmp");
        if tmp_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(tmp_dir)));
        }

        let snapshot = self.snapshot_pairs()?;
        let write_result = (|| -> Result<()> {
            let store = StorageFactory::create(alopex_core::StorageMode::Disk {
                path: tmp_dir.clone(),
                config: None,
            })
            .map_err(Error::Core)?;

            let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
            for (key, value) in snapshot {
                txn.put(key, value).map_err(Error::Core)?;
            }
            txn.commit_self().map_err(Error::Core)?;

            Ok(())
        })();

        // Best-effort cleanup of the staging directory on failure.
        if let Err(e) = write_result {
            let _ = fs::remove_dir_all(&tmp_dir);
            return Err(e);
        }

        // Atomic publish: rename the staging dir to the final data dir.
        fs::rename(&tmp_dir, &data_dir).map_err(|e| Error::Core(e.into()))?;
        if wal_path.extension().is_some_and(|e| e == "alopex") {
            // When a `.alopex` path was given, create a marker file so the
            // path itself exists for later existence checks (best effort).
            let _ = fs::OpenOptions::new()
                .create_new(true)
                .write(true)
                .open(wal_path);
        }
        Ok(())
    }
519
520    /// Creates a fully in-memory clone of the current database.
521    pub fn clone_to_memory(&self) -> Result<Self> {
522        let snapshot = self.snapshot_pairs()?;
523        let cloned = Database::open_in_memory()?;
524        if snapshot.is_empty() {
525            return Ok(cloned);
526        }
527
528        let mut txn = cloned.begin(TxnMode::ReadWrite)?;
529        for (key, value) in snapshot {
530            txn.put(&key, &value)?;
531        }
532        txn.commit()?;
533        Ok(cloned)
534    }
535
536    /// Clears all data while keeping the database usable.
537    pub fn clear(&self) -> Result<()> {
538        let keys: Vec<Key> = self.snapshot_pairs()?.into_iter().map(|(k, _)| k).collect();
539        if keys.is_empty() {
540            return Ok(());
541        }
542        let mut txn = self.begin(TxnMode::ReadWrite)?;
543        for key in keys {
544            txn.delete(&key)?;
545        }
546        txn.commit()
547    }
548
    /// Updates the memory limit in bytes for the underlying in-memory store.
    ///
    /// `None` removes the limit. Silently a no-op for disk/S3-backed stores.
    pub fn set_memory_limit(&self, bytes: Option<usize>) {
        if let AnyKV::Memory(kv) = self.store.as_ref() {
            kv.txn_manager().set_memory_limit(bytes);
        }
    }
555
    /// Returns a read-only snapshot of all key-value pairs.
    ///
    /// Errors are swallowed: any failure yields an empty snapshot. Use
    /// internal `snapshot_pairs` when the error matters.
    pub fn snapshot(&self) -> Vec<(Key, Vec<u8>)> {
        self.snapshot_pairs().unwrap_or_default()
    }
560
    /// Collects every key-value pair via a short-lived read-only transaction.
    fn snapshot_pairs(&self) -> Result<Vec<(Key, Vec<u8>)>> {
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        // The empty prefix matches every key in the store.
        let pairs: Vec<(Key, Vec<u8>)> = txn.scan_prefix(b"").map_err(Error::Core)?.collect();
        txn.commit_self().map_err(Error::Core)?;
        Ok(pairs)
    }
567
    /// Creates an HNSW index and persists it.
    pub fn create_hnsw_index(&self, name: &str, config: HnswConfig) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::create(name, config).map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        // Cache only after the commit succeeded.
        self.hnsw_cache_insert(name, index);
        Ok(())
    }
577
    /// Drops an HNSW index and evicts it from the in-process cache.
    pub fn drop_hnsw_index(&self, name: &str) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        index.drop(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_remove(name);
        Ok(())
    }
587
    /// Returns statistics for the named HNSW index.
    ///
    /// Served from the in-process cache when possible; otherwise the index
    /// is loaded from storage and cached for subsequent calls.
    pub fn get_hnsw_stats(&self, name: &str) -> Result<HnswStats> {
        if let Some(index) = self.hnsw_cache_get(name) {
            return Ok(index.stats());
        }
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let stats = index.stats();
        self.hnsw_cache_insert(name, index);
        Ok(stats)
    }
599
    /// Compacts the named HNSW index and returns the compaction result.
    ///
    /// The compacted index is persisted in the same transaction and then
    /// re-cached (replacing any stale cached copy).
    pub fn compact_hnsw_index(&self, name: &str) -> Result<alopex_core::vector::CompactionResult> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let mut index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let result = index.compact().map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_insert(name, index);
        Ok(result)
    }
610
611    /// HNSW インデックスに検索を行う。
612    pub fn search_hnsw(
613        &self,
614        name: &str,
615        query: &[f32],
616        k: usize,
617        ef_search: Option<usize>,
618    ) -> Result<(Vec<HnswSearchResult>, HnswSearchStats)> {
619        let profile = std::env::var_os("ALOPEX_PROFILE_HNSW").is_some();
620        let total_start = if profile {
621            Some(std::time::Instant::now())
622        } else {
623            None
624        };
625        if let Some(index) = self.hnsw_cache_get(name) {
626            let search_start = if profile {
627                Some(std::time::Instant::now())
628            } else {
629                None
630            };
631            let result = index.search(query, k, ef_search).map_err(Error::Core)?;
632            if let (true, Some(total_start), Some(search_start)) =
633                (profile, total_start, search_start)
634            {
635                let search_time = search_start.elapsed();
636                let total_time = total_start.elapsed();
637                eprintln!(
638                    "alopex.hnsw_search cache=hit name={} k={} ef_search={:?} search_ms={:.2} total_ms={:.2}",
639                    name,
640                    k,
641                    ef_search,
642                    search_time.as_secs_f64() * 1000.0,
643                    total_time.as_secs_f64() * 1000.0
644                );
645            }
646            return Ok(result);
647        }
648
649        let load_start = if profile {
650            Some(std::time::Instant::now())
651        } else {
652            None
653        };
654        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
655        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
656        let load_time = load_start.map(|start| start.elapsed());
657        let index = self.hnsw_cache_insert(name, index);
658        let search_start = if profile {
659            Some(std::time::Instant::now())
660        } else {
661            None
662        };
663        let result = index.search(query, k, ef_search).map_err(Error::Core)?;
664        if let (true, Some(total_start), Some(search_start)) = (profile, total_start, search_start)
665        {
666            let search_time = search_start.elapsed();
667            let total_time = total_start.elapsed();
668            let load_time_ms = load_time
669                .map(|elapsed| elapsed.as_secs_f64() * 1000.0)
670                .unwrap_or(0.0);
671            eprintln!(
672                "alopex.hnsw_search cache=miss name={} k={} ef_search={:?} load_ms={:.2} search_ms={:.2} total_ms={:.2}",
673                name,
674                k,
675                ef_search,
676                load_time_ms,
677                search_time.as_secs_f64() * 1000.0,
678                total_time.as_secs_f64() * 1000.0
679            );
680        }
681        Ok(result)
682    }
683
684    /// Creates a chunked large value writer for opaque blobs (beta).
685    pub fn create_blob_writer(
686        &self,
687        path: &Path,
688        total_len: u64,
689        chunk_size: Option<u32>,
690    ) -> Result<LargeValueWriter> {
691        let meta = LargeValueMeta {
692            kind: LargeValueKind::Blob,
693            total_len,
694            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
695        };
696        LargeValueWriter::create(path, meta).map_err(Error::Core)
697    }
698
699    /// Creates a chunked large value writer for typed payloads (beta).
700    pub fn create_typed_writer(
701        &self,
702        path: &Path,
703        type_id: u16,
704        total_len: u64,
705        chunk_size: Option<u32>,
706    ) -> Result<LargeValueWriter> {
707        let meta = LargeValueMeta {
708            kind: LargeValueKind::Typed(type_id),
709            total_len,
710            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
711        };
712        LargeValueWriter::create(path, meta).map_err(Error::Core)
713    }
714
    /// Opens a chunked large value reader (beta). Kind/type is read from the file header.
    ///
    /// # Errors
    /// Propagates any core error from opening or parsing the file.
    pub fn open_large_value(&self, path: &Path) -> Result<LargeValueReader> {
        LargeValueReader::open(path).map_err(Error::Core)
    }
719
    /// Begins a new transaction.
    ///
    /// The returned [`Transaction`] starts with empty HNSW staging state, an
    /// empty catalog overlay, and clean vector-cache bookkeeping.
    pub fn begin(&self, mode: TxnMode) -> Result<Transaction<'_>> {
        let txn = self.store.begin(mode).map_err(Error::Core)?;
        Ok(Transaction {
            inner: Some(txn),
            db: self,
            hnsw_indices: HashMap::new(),
            overlay: alopex_sql::catalog::CatalogOverlay::new(),
            vector_cache_updates: HashMap::new(),
            vector_cache_deletes: Vec::new(),
            vector_cache_invalidated: false,
            catalog_modified: false,
        })
    }
734}
735
// `Default` yields a transient in-memory database, same as `Database::new`.
impl Default for Database {
    fn default() -> Self {
        Self::new()
    }
}
741
742/// A database transaction.
743pub struct Transaction<'a> {
744    inner: Option<AnyKVTransaction<'a>>,
745    db: &'a Database,
746    hnsw_indices: HashMap<String, (HnswIndex, alopex_core::vector::hnsw::HnswTransactionState)>,
747    overlay: alopex_sql::catalog::CatalogOverlay,
748    vector_cache_updates: HashMap<Key, CachedVector>,
749    vector_cache_deletes: Vec<Key>,
750    vector_cache_invalidated: bool,
751    /// Whether DDL operations were performed in this transaction.
752    pub(crate) catalog_modified: bool,
753}
754
/// A search result row containing key, metadata, and similarity score.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// User key associated with the vector.
    pub key: Key,
    /// Opaque metadata payload stored alongside the vector.
    pub metadata: Vec<u8>,
    /// Similarity score for the query/vector pair.
    /// (Interpretation depends on the metric in use — see [`Metric`].)
    pub score: f32,
}
765
/// Internal KV key under which the list of vector keys is persisted.
const VECTOR_INDEX_KEY: &[u8] = b"__alopex_vector_index";
767
768impl<'a> Transaction<'a> {
    /// Read-only access to this transaction's staged catalog overlay.
    pub(crate) fn catalog_overlay(&self) -> &alopex_sql::catalog::CatalogOverlay {
        &self.overlay
    }
772
    /// Mutable access to this transaction's staged catalog overlay.
    pub(crate) fn catalog_overlay_mut(&mut self) -> &mut alopex_sql::catalog::CatalogOverlay {
        &mut self.overlay
    }
776
    /// Returns the mode of the underlying KV transaction.
    ///
    /// # Errors
    /// [`Error::TxnCompleted`] if the transaction has already finished.
    pub(crate) fn txn_mode(&self) -> Result<TxnMode> {
        let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
        Ok(txn.mode())
    }
    /// Retrieves the value for a given key.
    ///
    /// Returns `Ok(None)` when the key does not exist.
    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.inner_mut()?.get(&key.to_vec()).map_err(Error::Core)
    }
785
    /// Sets a value for a given key.
    ///
    /// A raw write may shadow a vector entry, so the per-transaction vector
    /// cache bookkeeping is conservatively reset to "fully invalidated".
    pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        // NOTE(review): cache flags are mutated before the write below can
        // fail — confirm this ordering is intended for the error path.
        self.vector_cache_invalidated = true;
        self.vector_cache_updates.clear();
        self.vector_cache_deletes.clear();
        self.inner_mut()?
            .put(key.to_vec(), value.to_vec())
            .map_err(Error::Core)
    }
795
    /// Deletes a key-value pair.
    ///
    /// The key is recorded for vector-cache maintenance.
    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
        // NOTE(review): the key is recorded (and is not removed from
        // `vector_cache_updates`) before the delete can fail — confirm
        // commit-time cache reconciliation handles both cases.
        self.vector_cache_deletes.push(key.to_vec());
        self.inner_mut()?.delete(key.to_vec()).map_err(Error::Core)
    }
801
    /// Scans all key-value pairs whose keys start with the given prefix.
    ///
    /// Returns an iterator over (key, value) pairs. The iterator borrows the
    /// transaction (`+ '_`), so it must be dropped before further mutations.
    pub fn scan_prefix(
        &mut self,
        prefix: &[u8],
    ) -> Result<Box<dyn Iterator<Item = (Key, Vec<u8>)> + '_>> {
        self.inner_mut()?.scan_prefix(prefix).map_err(Error::Core)
    }
811
    /// Stages an insert/update of a vector into the named HNSW index.
    ///
    /// Changes are buffered in per-transaction staging state rather than
    /// applied directly to the shared index.
    pub fn upsert_to_hnsw(
        &mut self,
        index_name: &str,
        key: &[u8],
        vector: &[f32],
        metadata: &[u8],
    ) -> Result<()> {
        // Rejects read-only transactions before touching any index state.
        self.ensure_write_txn()?;
        let (index, state) = self.hnsw_entry_mut(index_name)?;
        index
            .upsert_staged(key, vector, metadata, state)
            .map_err(Error::Core)
    }
826
    /// Stages removal of a key from the named HNSW index.
    ///
    /// The returned flag is forwarded from `delete_staged` (presumably
    /// whether the key was present — confirm against `HnswIndex`).
    pub fn delete_from_hnsw(&mut self, index_name: &str, key: &[u8]) -> Result<bool> {
        self.ensure_write_txn()?;
        let (index, state) = self.hnsw_entry_mut(index_name)?;
        index.delete_staged(key, state).map_err(Error::Core)
    }
833
    /// Upserts a vector and metadata under the provided key after validating dimensions and metric.
    ///
    /// A small internal index is maintained to enable scanning for similarity search.
    pub fn upsert_vector(
        &mut self,
        key: &[u8],
        metadata: &[u8],
        vector: &[f32],
        metric: Metric,
    ) -> Result<()> {
        // Empty vectors are rejected before any other work.
        if vector.is_empty() {
            return Err(Error::Core(alopex_core::Error::InvalidFormat(
                "vector cannot be empty".into(),
            )));
        }
        // Validate the vector against its (dimension, metric) type.
        let vt = VectorType::new(vector.len(), metric);
        vt.validate(vector).map_err(Error::Core)?;

        // Store the encoded (type, metadata, vector) payload under the key.
        let payload = encode_vector_entry(vt, metadata, vector);
        let txn = self.inner_mut()?;
        txn.put(key.to_vec(), payload).map_err(Error::Core)?;

        // Register the key in the internal vector index (dedup via linear scan).
        let mut keys = self.load_vector_index()?;
        if !keys.iter().any(|k| k == key) {
            keys.push(key.to_vec());
            self.persist_vector_index(&keys)?;
        }

        // Mirror the write into per-transaction vector cache bookkeeping.
        let cached = cached_vector_from_entry(metric, metadata.to_vec(), vector.to_vec());
        self.vector_cache_updates.insert(key.to_vec(), cached);
        self.vector_cache_deletes.retain(|k| k != key);
        Ok(())
    }
867
868    /// Retrieves a vector stored under the given key.
869    ///
870    /// Returns `None` if the key does not exist. If the key exists but has a different
871    /// metric than specified, returns an error.
872    pub fn get_vector(&mut self, key: &[u8], metric: Metric) -> Result<Option<Vec<f32>>> {
873        let txn = self.inner_mut()?;
874        let key_vec = key.to_vec();
875        let Some(raw) = txn.get(&key_vec).map_err(Error::Core)? else {
876            return Ok(None);
877        };
878        let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
879        if decoded.metric != metric {
880            return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
881                metric: metric.as_str().to_string(),
882            }));
883        }
884        Ok(Some(decoded.vector))
885    }
886
887    /// Retrieves vectors stored under the given keys in a single transaction call.
888    ///
889    /// Returned list order matches the input `keys` order. Each entry is:
890    /// - `None` if the key does not exist
891    /// - `Some(Vec<f32>)` if the key exists and metric matches
892    ///
893    /// If any existing entry has a different metric than specified, returns an error.
894    pub fn get_vectors(&mut self, keys: &[Key], metric: Metric) -> Result<Vec<Option<Vec<f32>>>> {
895        let txn = self.inner_mut()?;
896        let mut out = Vec::with_capacity(keys.len());
897        for key in keys {
898            let Some(raw) = txn.get(key).map_err(Error::Core)? else {
899                out.push(None);
900                continue;
901            };
902            let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
903            if decoded.metric != metric {
904                return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
905                    metric: metric.as_str().to_string(),
906                }));
907            }
908            out.push(Some(decoded.vector));
909        }
910        Ok(out)
911    }
912
913    /// Executes a flat similarity search over stored vectors using the provided metric and query.
914    ///
915    /// The optional `filter_keys` restricts the scan to the given keys; otherwise the full
916    /// vector index is scanned. Results are sorted by descending score and truncated to `top_k`.
917    pub fn search_similar(
918        &mut self,
919        query_vector: &[f32],
920        metric: Metric,
921        top_k: usize,
922        filter_keys: Option<&[Key]>,
923    ) -> Result<Vec<SearchResult>> {
924        if top_k == 0 {
925            return Ok(Vec::new());
926        }
927
928        let profile = std::env::var_os("ALOPEX_PROFILE_SEARCH_SIMILAR").is_some();
929        let total_start = if profile {
930            Some(std::time::Instant::now())
931        } else {
932            None
933        };
934        let query_norm_sq = query_vector.iter().map(|v| v * v).sum::<f32>();
935        let query_norm = if matches!(metric, Metric::Cosine) {
936            query_norm_sq.sqrt()
937        } else {
938            0.0
939        };
940        let inv_query_norm = if query_norm == 0.0 {
941            0.0
942        } else {
943            1.0 / query_norm
944        };
945
946        if filter_keys.is_none() && self.txn_mode()? == TxnMode::ReadOnly {
947            let cache = self
948                .db
949                .vector_cache
950                .read()
951                .expect("vector cache lock poisoned");
952            if let Some(cache) = cache.as_ref() {
953                if cache.is_empty() {
954                    return Ok(Vec::new());
955                }
956                let keys_len = cache.len();
957                let mut rows = Vec::with_capacity(keys_len);
958                let mut score_time = std::time::Duration::ZERO;
959                for (key, cached) in cache.iter() {
960                    if cached.metric != metric {
961                        return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
962                            metric: metric.as_str().to_string(),
963                        }));
964                    }
965                    validate_dimensions(cached.vector.len(), query_vector.len())
966                        .map_err(Error::Core)?;
967                    let score_start = if profile {
968                        Some(std::time::Instant::now())
969                    } else {
970                        None
971                    };
972                    let dot = dot_product(query_vector, &cached.vector);
973                    let score = match metric {
974                        Metric::Cosine => {
975                            if cached.inv_norm == 0.0 || inv_query_norm == 0.0 {
976                                0.0
977                            } else {
978                                dot * cached.inv_norm * inv_query_norm
979                            }
980                        }
981                        Metric::L2 => {
982                            let dist_sq = query_norm_sq + cached.norm_sq - 2.0 * dot;
983                            -dist_sq.sqrt()
984                        }
985                        Metric::InnerProduct => dot,
986                    };
987                    if let Some(score_start) = score_start {
988                        score_time += score_start.elapsed();
989                    }
990                    rows.push(SearchResult {
991                        key: key.clone(),
992                        metadata: cached.metadata.clone(),
993                        score,
994                    });
995                }
996
997                let rows_total = rows.len();
998                let sort_start = if profile {
999                    Some(std::time::Instant::now())
1000                } else {
1001                    None
1002                };
1003                if rows.len() > top_k {
1004                    rows.select_nth_unstable_by(top_k - 1, |a, b| {
1005                        b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
1006                    });
1007                    rows.truncate(top_k);
1008                }
1009                rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
1010                if let (true, Some(total_start), Some(sort_start)) =
1011                    (profile, total_start, sort_start)
1012                {
1013                    let sort_time = sort_start.elapsed();
1014                    let total_time = total_start.elapsed();
1015                    eprintln!(
1016                        "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
1017                        keys_len,
1018                        rows_total,
1019                        top_k,
1020                        0.0,
1021                        0.0,
1022                        0.0,
1023                        score_time.as_secs_f64() * 1000.0,
1024                        sort_time.as_secs_f64() * 1000.0,
1025                        total_time.as_secs_f64() * 1000.0
1026                    );
1027                }
1028                return Ok(rows);
1029            }
1030        }
1031
1032        let (keys, load_keys_time) = if profile {
1033            let start = std::time::Instant::now();
1034            let keys = match filter_keys {
1035                Some(keys) => keys.to_vec(),
1036                None => self.load_vector_index()?,
1037            };
1038            (keys, start.elapsed())
1039        } else {
1040            let keys = match filter_keys {
1041                Some(keys) => keys.to_vec(),
1042                None => self.load_vector_index()?,
1043            };
1044            (keys, std::time::Duration::ZERO)
1045        };
1046        if keys.is_empty() {
1047            return Ok(Vec::new());
1048        }
1049
1050        let keys_len = keys.len();
1051        let mut rows = Vec::with_capacity(keys.len());
1052        let txn = self.inner_mut()?;
1053        let mut get_time = std::time::Duration::ZERO;
1054        let mut decode_time = std::time::Duration::ZERO;
1055        let mut score_time = std::time::Duration::ZERO;
1056        if profile {
1057            for key in keys {
1058                let get_start = std::time::Instant::now();
1059                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
1060                    get_time += get_start.elapsed();
1061                    continue;
1062                };
1063                get_time += get_start.elapsed();
1064                let decode_start = std::time::Instant::now();
1065                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
1066                decode_time += decode_start.elapsed();
1067                if decoded.metric != metric {
1068                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
1069                        metric: metric.as_str().to_string(),
1070                    }));
1071                }
1072                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
1073                let score_start = std::time::Instant::now();
1074                let score =
1075                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
1076                score_time += score_start.elapsed();
1077                rows.push(SearchResult {
1078                    key,
1079                    metadata: decoded.metadata,
1080                    score,
1081                });
1082            }
1083        } else {
1084            for key in keys {
1085                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
1086                    continue;
1087                };
1088                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
1089                if decoded.metric != metric {
1090                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
1091                        metric: metric.as_str().to_string(),
1092                    }));
1093                }
1094                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
1095                let score =
1096                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
1097                rows.push(SearchResult {
1098                    key,
1099                    metadata: decoded.metadata,
1100                    score,
1101                });
1102            }
1103        }
1104
1105        let rows_total = rows.len();
1106        let sort_start = if profile {
1107            Some(std::time::Instant::now())
1108        } else {
1109            None
1110        };
1111        if rows.len() > top_k {
1112            rows.select_nth_unstable_by(top_k - 1, |a, b| {
1113                b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
1114            });
1115            rows.truncate(top_k);
1116        }
1117        rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
1118        if let (true, Some(total_start), Some(sort_start)) = (profile, total_start, sort_start) {
1119            let sort_time = sort_start.elapsed();
1120            let total_time = total_start.elapsed();
1121            eprintln!(
1122                "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
1123                keys_len,
1124                rows_total,
1125                top_k,
1126                load_keys_time.as_secs_f64() * 1000.0,
1127                get_time.as_secs_f64() * 1000.0,
1128                decode_time.as_secs_f64() * 1000.0,
1129                score_time.as_secs_f64() * 1000.0,
1130                sort_time.as_secs_f64() * 1000.0,
1131                total_time.as_secs_f64() * 1000.0
1132            );
1133        }
1134        Ok(rows)
1135    }
1136
1137    fn load_vector_index(&mut self) -> Result<Vec<Key>> {
1138        let txn = self.inner_mut()?;
1139        let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec()).map_err(Error::Core)? else {
1140            return Ok(Vec::new());
1141        };
1142        decode_index(&raw).map_err(Error::Core)
1143    }
1144
1145    fn persist_vector_index(&mut self, keys: &[Key]) -> Result<()> {
1146        let txn = self.inner_mut()?;
1147        let encoded = encode_index(keys)?;
1148        txn.put(VECTOR_INDEX_KEY.to_vec(), encoded)
1149            .map_err(Error::Core)
1150    }
1151
    /// Commits the transaction, applying all changes.
    ///
    /// Ordering is deliberate:
    /// 1. Staged HNSW mutations and the catalog overlay are written into the
    ///    KV transaction, and the shared vector cache is updated.
    /// 2. The KV transaction is committed.
    /// 3. Only after a successful KV commit are the shared HNSW cache and the
    ///    in-memory catalog updated, so a failed commit never advances shared
    ///    in-memory state past durable state.
    pub fn commit(mut self) -> Result<()> {
        {
            let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
            // Flush staged HNSW mutations into the KV transaction.
            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
                index.commit_staged(txn, state).map_err(Error::Core)?;
            }
            // Persist the catalog overlay (DDL staged in this transaction).
            let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
            catalog
                .persist_overlay(txn, &self.overlay)
                .map_err(|err| Error::Sql(err.into()))?;

            // Apply staged vector-cache changes to the shared cache.
            let vector_cache_invalidated = self.vector_cache_invalidated;
            let vector_cache_updates = std::mem::take(&mut self.vector_cache_updates);
            let vector_cache_deletes = std::mem::take(&mut self.vector_cache_deletes);
            if vector_cache_invalidated {
                // Something made the cache unsalvageable; drop it wholesale.
                let mut cache = self
                    .db
                    .vector_cache
                    .write()
                    .expect("vector cache lock poisoned");
                *cache = None;
            } else if !vector_cache_updates.is_empty() || !vector_cache_deletes.is_empty() {
                // Check under the read lock first to avoid write contention
                // when the cache already exists.
                let needs_rebuild = {
                    let cache = self
                        .db
                        .vector_cache
                        .read()
                        .expect("vector cache lock poisoned");
                    cache.is_none()
                };
                if needs_rebuild {
                    // No cache yet: rebuild from this transaction's view, which
                    // already includes the staged writes above.
                    let rebuilt = build_vector_cache_from_txn(txn).map_err(Error::Core)?;
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    *cache = Some(rebuilt);
                } else {
                    // Incrementally apply deletes first, then upserts.
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    if let Some(cache) = cache.as_mut() {
                        for key in vector_cache_deletes {
                            cache.remove(&key);
                        }
                        for (key, cached) in vector_cache_updates {
                            cache.insert(key, cached);
                        }
                    }
                }
            }
        }
        let txn = self.inner.take().ok_or(Error::TxnCompleted)?;
        let hnsw_indices = std::mem::take(&mut self.hnsw_indices);
        txn.commit_self().map_err(Error::Core)?;
        // Publish committed HNSW indices to the shared cache.
        if !hnsw_indices.is_empty() {
            let mut cache = self
                .db
                .hnsw_cache
                .write()
                .expect("hnsw cache lock poisoned");
            for (name, (index, _state)) in hnsw_indices {
                cache.insert(name, Arc::new(index));
            }
        }

        // Apply the overlay to the catalog only after the KV commit succeeded.
        let overlay = std::mem::take(&mut self.overlay);
        let catalog_modified = self.catalog_modified;
        let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
        catalog.apply_overlay(overlay);
        drop(catalog); // Release the catalog lock before touching the cache.
                       // Invalidate the table-info cache only when DDL ran in this txn.
        if catalog_modified {
            self.db.invalidate_table_info_cache();
        }
        Ok(())
    }
1234
1235    /// トランザクションを消費せずにロールバックする(失敗時の再試行を可能にする)。
1236    pub fn rollback_in_place(&mut self) -> Result<()> {
1237        let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
1238        txn.rollback_in_place().map_err(Error::Core)?;
1239        for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1240            let _ = index.rollback(state);
1241        }
1242        self.hnsw_indices.clear();
1243        self.overlay = alopex_sql::catalog::CatalogOverlay::default();
1244        self.inner = None;
1245        Ok(())
1246    }
1247
1248    /// Rolls back the transaction, discarding all changes.
1249    pub fn rollback(mut self) -> Result<()> {
1250        if let Some(txn) = self.inner.take() {
1251            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1252                let _ = index.rollback(state);
1253            }
1254            self.hnsw_indices.clear();
1255            txn.rollback_self().map_err(Error::Core)
1256        } else {
1257            Err(Error::TxnCompleted)
1258        }
1259    }
1260
1261    fn inner_mut(&mut self) -> Result<&mut AnyKVTransaction<'a>> {
1262        self.inner.as_mut().ok_or(Error::TxnCompleted)
1263    }
1264
    /// Returns the `(index, staging-state)` pair for `name`, loading the HNSW
    /// index from the KV store on first access within this transaction.
    ///
    /// NOTE: the contains_key / insert / get_mut sequence (rather than the
    /// entry API) keeps the `self.inner_mut()` borrow disjoint from the
    /// `self.hnsw_indices` borrow.
    fn hnsw_entry_mut(&mut self, name: &str) -> Result<&mut (HnswIndex, HnswTransactionState)> {
        if !self.hnsw_indices.contains_key(name) {
            let index = {
                let txn = self.inner_mut()?;
                HnswIndex::load(name, txn).map_err(Error::Core)?
            };
            self.hnsw_indices
                .insert(name.to_string(), (index, HnswTransactionState::default()));
        }
        // Present by construction: either pre-existing or inserted just above.
        Ok(self.hnsw_indices.get_mut(name).unwrap())
    }
1276
1277    fn ensure_write_txn(&self) -> Result<()> {
1278        let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
1279        if txn.mode() != TxnMode::ReadWrite {
1280            return Err(Error::Core(alopex_core::Error::TxnReadOnly));
1281        }
1282        Ok(())
1283    }
1284}
1285
1286impl<'a> Drop for Transaction<'a> {
1287    fn drop(&mut self) {
1288        if let Some(txn) = self.inner.take() {
1289            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1290                let _ = index.rollback(state);
1291            }
1292            self.hnsw_indices.clear();
1293            let _ = txn.rollback_self();
1294        }
1295    }
1296}
1297
/// Maps a `Metric` to its single-byte tag in the persisted vector-entry format.
///
/// These values are part of the on-disk layout; they must remain stable and
/// stay in sync with `byte_to_metric`.
fn metric_to_byte(metric: Metric) -> u8 {
    match metric {
        Metric::Cosine => 0,
        Metric::L2 => 1,
        Metric::InnerProduct => 2,
    }
}
1305
/// Inverse of `metric_to_byte`: decodes the persisted metric tag.
///
/// Returns `UnsupportedMetric` for unknown tags (e.g. data written by a newer
/// format revision).
fn byte_to_metric(byte: u8) -> result::Result<Metric, alopex_core::Error> {
    match byte {
        0 => Ok(Metric::Cosine),
        1 => Ok(Metric::L2),
        2 => Ok(Metric::InnerProduct),
        other => Err(alopex_core::Error::UnsupportedMetric {
            metric: format!("unknown({other})"),
        }),
    }
}
1316
1317fn encode_vector_entry(vector_type: VectorType, metadata: &[u8], vector: &[f32]) -> Vec<u8> {
1318    let dim = vector_type.dim() as u32;
1319    let meta_len = metadata.len() as u32;
1320    let mut buf = Vec::with_capacity(1 + 4 + 4 + metadata.len() + std::mem::size_of_val(vector));
1321    buf.push(metric_to_byte(vector_type.metric()));
1322    buf.extend_from_slice(&dim.to_le_bytes());
1323    buf.extend_from_slice(&meta_len.to_le_bytes());
1324    buf.extend_from_slice(metadata);
1325    for v in vector {
1326        buf.extend_from_slice(&v.to_le_bytes());
1327    }
1328    buf
1329}
1330
/// Owned result of `decode_vector_entry`: the stored metric plus the decoded
/// vector components (metadata is not needed by its callers).
struct DecodedEntry {
    /// Metric the vector was stored with.
    metric: Metric,
    /// Decoded f32 components.
    vector: Vec<f32>,
}
1335
/// Entry of the shared in-memory vector cache used by the read-only
/// `search_similar` fast path.
#[derive(Clone)]
struct CachedVector {
    /// Metric the vector was stored with.
    metric: Metric,
    /// Caller-supplied metadata, returned verbatim in search results.
    metadata: Vec<u8>,
    /// Decoded vector components.
    vector: Vec<f32>,
    /// Squared L2 norm, precomputed for L2 scoring.
    norm_sq: f32,
    /// `1 / sqrt(norm_sq)`, or `0.0` for a zero vector (cosine sentinel).
    inv_norm: f32,
}
1344
/// Borrowed view over an encoded vector entry; defers decoding the f32
/// payload until it is actually needed for scoring.
struct VectorEntryView<'a> {
    /// Metric tag decoded from the entry header.
    metric: Metric,
    /// Declared vector dimensionality.
    dim: usize,
    /// Owned copy of the metadata bytes.
    metadata: Vec<u8>,
    /// Raw little-endian f32 payload (`dim * 4` bytes), still borrowed.
    vector_bytes: &'a [u8],
}
1351
1352fn decode_vector_entry(bytes: &[u8]) -> result::Result<DecodedEntry, alopex_core::Error> {
1353    if bytes.len() < 9 {
1354        return Err(alopex_core::Error::InvalidFormat(
1355            "vector entry too short".into(),
1356        ));
1357    }
1358    let metric = byte_to_metric(bytes[0])?;
1359    let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1360    let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1361
1362    let header = 9;
1363    let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1364    if bytes.len() < expected_len {
1365        return Err(alopex_core::Error::InvalidFormat(
1366            "vector entry truncated".into(),
1367        ));
1368    }
1369
1370    let mut vector = Vec::with_capacity(dim);
1371    let vec_bytes = &bytes[header + meta_len..expected_len];
1372    for chunk in vec_bytes.chunks_exact(4) {
1373        vector.push(f32::from_le_bytes(chunk.try_into().unwrap()));
1374    }
1375
1376    Ok(DecodedEntry { metric, vector })
1377}
1378
1379fn decode_vector_entry_view(
1380    bytes: &[u8],
1381) -> result::Result<VectorEntryView<'_>, alopex_core::Error> {
1382    if bytes.len() < 9 {
1383        return Err(alopex_core::Error::InvalidFormat(
1384            "vector entry too short".into(),
1385        ));
1386    }
1387    let metric = byte_to_metric(bytes[0])?;
1388    let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1389    let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1390
1391    let header = 9;
1392    let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1393    if bytes.len() < expected_len {
1394        return Err(alopex_core::Error::InvalidFormat(
1395            "vector entry truncated".into(),
1396        ));
1397    }
1398
1399    let metadata = bytes[header..header + meta_len].to_vec();
1400    let vector_bytes = &bytes[header + meta_len..expected_len];
1401
1402    Ok(VectorEntryView {
1403        metric,
1404        dim,
1405        metadata,
1406        vector_bytes,
1407    })
1408}
1409
/// Decodes a little-endian f32 payload into a `Vec<f32>`.
///
/// Any trailing bytes that do not form a full 4-byte chunk are ignored.
fn vector_bytes_to_vec(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
        .collect()
}
1417
1418fn cached_vector_from_entry(metric: Metric, metadata: Vec<u8>, vector: Vec<f32>) -> CachedVector {
1419    let norm_sq = vector.iter().map(|v| v * v).sum::<f32>();
1420    let inv_norm = if norm_sq == 0.0 {
1421        0.0
1422    } else {
1423        1.0 / norm_sq.sqrt()
1424    };
1425    CachedVector {
1426        metric,
1427        metadata,
1428        vector,
1429        norm_sq,
1430        inv_norm,
1431    }
1432}
1433
1434fn build_vector_cache_from_txn<'a>(
1435    txn: &mut AnyKVTransaction<'a>,
1436) -> result::Result<HashMap<Key, CachedVector>, alopex_core::Error> {
1437    let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec())? else {
1438        return Ok(HashMap::new());
1439    };
1440    let keys = decode_index(&raw)?;
1441    let mut cache = HashMap::with_capacity(keys.len());
1442    for key in keys {
1443        let Some(raw) = txn.get(&key)? else {
1444            continue;
1445        };
1446        let decoded = decode_vector_entry_view(&raw)?;
1447        let vector = vector_bytes_to_vec(decoded.vector_bytes);
1448        let cached = cached_vector_from_entry(decoded.metric, decoded.metadata, vector);
1449        cache.insert(key, cached);
1450    }
1451    Ok(cache)
1452}
1453
/// Dot product of two f32 slices with a runtime-dispatched AVX fast path on
/// x86_64; falls back to the scalar implementation everywhere else.
fn dot_product(query: &[f32], item: &[f32]) -> f32 {
    #[cfg(target_arch = "x86_64")]
    {
        if std::is_x86_feature_detected!("avx") {
            // SAFETY: guarded by runtime feature detection.
            unsafe {
                return dot_product_avx(query, item);
            }
        }
    }
    dot_product_scalar(query, item)
}
1466
/// Scalar dot product; iteration stops at the shorter of the two slices.
fn dot_product_scalar(query: &[f32], item: &[f32]) -> f32 {
    let mut sum = 0.0;
    for (q, v) in query.iter().zip(item.iter()) {
        sum += q * v;
    }
    sum
}
1470
/// AVX-accelerated dot product: 8-lane unaligned loads with a single
/// accumulator, followed by a horizontal sum and a scalar tail loop.
///
/// # Safety
///
/// - The CPU must support AVX; the call site in `dot_product` enforces this
///   via `is_x86_feature_detected!`.
/// - The loops are bounded by `query.len()` but also read from `item`, so
///   `item` must be at least as long as `query` (callers validate dimensions
///   upstream — see `validate_dimensions` in `search_similar`).
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx")]
unsafe fn dot_product_avx(query: &[f32], item: &[f32]) -> f32 {
    use std::arch::x86_64::*;

    let len = query.len();
    let mut i = 0;
    let mut acc = _mm256_setzero_ps();
    let q_ptr = query.as_ptr();
    let v_ptr = item.as_ptr();
    while i + 8 <= len {
        let q = _mm256_loadu_ps(q_ptr.add(i));
        let v = _mm256_loadu_ps(v_ptr.add(i));
        acc = _mm256_add_ps(acc, _mm256_mul_ps(q, v));
        i += 8;
    }

    // Horizontal sum of the 8 lanes, then finish the remainder scalar-wise.
    let mut tmp = [0f32; 8];
    _mm256_storeu_ps(tmp.as_mut_ptr(), acc);
    let mut sum = tmp.iter().sum::<f32>();
    while i < len {
        sum += *q_ptr.add(i) * *v_ptr.add(i);
        i += 1;
    }
    sum
}
1497
1498fn score_from_slice(metric: Metric, query: &[f32], query_norm: f32, item: &[f32]) -> f32 {
1499    match metric {
1500        Metric::Cosine => {
1501            if query_norm == 0.0 {
1502                return 0.0;
1503            }
1504            let mut dot = 0.0;
1505            let mut item_norm_sq = 0.0;
1506            for (q, v) in query.iter().zip(item.iter()) {
1507                dot += q * v;
1508                item_norm_sq += v * v;
1509            }
1510            let item_norm = item_norm_sq.sqrt();
1511            if item_norm == 0.0 {
1512                0.0
1513            } else {
1514                dot / (query_norm * item_norm)
1515            }
1516        }
1517        Metric::L2 => {
1518            let mut dist_sq = 0.0;
1519            for (q, v) in query.iter().zip(item.iter()) {
1520                let d = q - v;
1521                dist_sq += d * d;
1522            }
1523            -dist_sq.sqrt()
1524        }
1525        Metric::InnerProduct => query.iter().zip(item.iter()).map(|(q, v)| q * v).sum(),
1526    }
1527}
1528
/// Scores `query` against a raw little-endian f32 payload without allocating
/// a decoded vector.
///
/// On little-endian targets with a 4-byte-aligned payload the bytes are
/// reinterpreted directly as `&[f32]` and delegated to `score_from_slice`;
/// otherwise each component is decoded from its bytes inline. `query_norm`
/// is only consulted for `Metric::Cosine`.
fn score_from_bytes(
    metric: Metric,
    query: &[f32],
    query_norm: f32,
    vector_bytes: &[u8],
) -> result::Result<f32, alopex_core::Error> {
    let len = vector_bytes.len() / 4;
    #[cfg(target_endian = "little")]
    {
        let ptr = vector_bytes.as_ptr();
        if (ptr as usize).is_multiple_of(std::mem::align_of::<f32>()) {
            // SAFETY: little-endian target, alignment verified above, and
            // `len` floats fit within `vector_bytes` by construction.
            let items = unsafe { std::slice::from_raw_parts(ptr as *const f32, len) };
            return Ok(score_from_slice(metric, query, query_norm, items));
        }
    }

    // Fallback for unaligned or big-endian targets: decode each f32 from bytes.
    let mut iter = vector_bytes.chunks_exact(4);
    let score = match metric {
        Metric::Cosine => {
            if query_norm == 0.0 {
                0.0
            } else {
                let mut dot = 0.0;
                let mut item_norm_sq = 0.0;
                for (q, chunk) in query.iter().zip(&mut iter) {
                    let v = f32::from_le_bytes(chunk.try_into().unwrap());
                    dot += q * v;
                    item_norm_sq += v * v;
                }
                let item_norm = item_norm_sq.sqrt();
                if item_norm == 0.0 {
                    0.0
                } else {
                    dot / (query_norm * item_norm)
                }
            }
        }
        Metric::L2 => {
            let mut dist_sq = 0.0;
            for (q, chunk) in query.iter().zip(&mut iter) {
                let v = f32::from_le_bytes(chunk.try_into().unwrap());
                let d = q - v;
                dist_sq += d * d;
            }
            -dist_sq.sqrt()
        }
        Metric::InnerProduct => query
            .iter()
            .zip(&mut iter)
            .map(|(q, chunk)| q * f32::from_le_bytes(chunk.try_into().unwrap()))
            .sum(),
    };
    Ok(score)
}
1584
1585fn encode_index(keys: &[Key]) -> result::Result<Vec<u8>, alopex_core::Error> {
1586    let mut buf = Vec::new();
1587    let count = keys.len() as u32;
1588    buf.extend_from_slice(&count.to_le_bytes());
1589    for key in keys {
1590        let len: u32 = key
1591            .len()
1592            .try_into()
1593            .map_err(|_| alopex_core::Error::InvalidFormat("key too long".into()))?;
1594        buf.extend_from_slice(&len.to_le_bytes());
1595        buf.extend_from_slice(key);
1596    }
1597    Ok(buf)
1598}
1599
1600fn decode_index(bytes: &[u8]) -> result::Result<Vec<Key>, alopex_core::Error> {
1601    if bytes.len() < 4 {
1602        return Err(alopex_core::Error::InvalidFormat("index too short".into()));
1603    }
1604    let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
1605    let mut pos = 4;
1606    let mut keys = Vec::with_capacity(count);
1607    for _ in 0..count {
1608        if pos + 4 > bytes.len() {
1609            return Err(alopex_core::Error::InvalidFormat("index truncated".into()));
1610        }
1611        let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
1612        pos += 4;
1613        if pos + len > bytes.len() {
1614            return Err(alopex_core::Error::InvalidFormat(
1615                "index key truncated".into(),
1616            ));
1617        }
1618        keys.push(bytes[pos..pos + len].to_vec());
1619        pos += len;
1620    }
1621    Ok(keys)
1622}
1623
1624#[cfg(test)]
1625mod tests {
1626    use super::*;
1627    use std::sync::mpsc;
1628    use std::thread;
1629    use tempfile::tempdir;
1630
    #[test]
    fn test_open_and_crud() {
        // End-to-end smoke test: open a file-backed database, commit a write
        // in one transaction, and observe it from a fresh read-only one.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let db = Database::open(&path).unwrap();

        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.put(b"key1", b"value1").unwrap();
        txn.commit().unwrap();

        // The committed value must be visible to a subsequent reader.
        let mut txn2 = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn2.get(b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));
    }
1645
1646    #[test]
1647    fn test_not_found() {
1648        let db = Database::new();
1649        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
1650        let val = txn.get(b"non-existent-key").unwrap();
1651        assert!(val.is_none());
1652    }
1653
    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_file_format_version_reads_alopex_header() {
        use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};

        // Write an (otherwise empty) file with an explicit header version and
        // confirm `Database::open` surfaces it via `file_format_version`.
        let dir = tempdir().unwrap();
        let path = dir.path().join("format-test.alopex");
        let expected = FileVersion::new(0, 0, 1);

        let writer = AlopexFileWriter::new(path.clone(), expected, FileFlags(0)).unwrap();
        writer.finalize().unwrap();

        let db = Database::open(&path).unwrap();
        assert_eq!(db.file_format_version(), expected);
    }
1669
1670    #[test]
1671    fn test_crash_recovery_replays_wal() {
1672        let dir = tempdir().unwrap();
1673        let path = dir.path().join("replay.db");
1674
1675        {
1676            let db = Database::open(&path).unwrap();
1677            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1678            txn.put(b"k1", b"v1").unwrap();
1679            txn.commit().unwrap();
1680
1681            let mut uncommitted = db.begin(TxnMode::ReadWrite).unwrap();
1682            uncommitted.put(b"k2", b"v2").unwrap();
1683            // Drop without commit to simulate crash before commit.
1684        }
1685
1686        let db = Database::open(&path).unwrap();
1687        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
1688        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
1689        assert_eq!(txn.get(b"k2").unwrap(), None);
1690    }
1691
1692    #[test]
1693    fn test_txn_closed() {
1694        let db = Database::new();
1695        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1696        txn.put(b"k1", b"v1").unwrap();
1697        txn.commit().unwrap();
1698        // The `commit` call consumes the transaction, so we can't call it again.
1699        // This test verifies that we can't use a transaction after it's been completed.
1700        // The `inner_mut` method will return `Error::TxnCompleted`.
1701        // This is a compile-time check in practice, but we can't write a test that fails to compile.
1702        // The logic is sound.
1703    }
1704
1705    #[test]
1706    fn test_concurrency_conflict() {
1707        let db = std::sync::Arc::new(Database::new());
1708        let mut t0 = db.begin(TxnMode::ReadWrite).unwrap();
1709        t0.put(b"k1", b"v0").unwrap();
1710        t0.commit().unwrap();
1711
1712        let (tx1, rx1) = mpsc::channel();
1713        let (tx2, rx2) = mpsc::channel();
1714
1715        let db1 = db.clone();
1716        let t1 = thread::spawn(move || {
1717            let mut txn1 = db1.begin(TxnMode::ReadWrite).unwrap();
1718            let val = txn1.get(b"k1").unwrap();
1719            assert_eq!(val.unwrap(), b"v0");
1720            tx1.send(()).unwrap();
1721            rx2.recv().unwrap();
1722            txn1.put(b"k1", b"v1").unwrap();
1723            let result = txn1.commit();
1724            assert!(matches!(
1725                result,
1726                Err(Error::Core(alopex_core::Error::TxnConflict))
1727            ));
1728        });
1729
1730        let db2 = db.clone();
1731        let t2 = thread::spawn(move || {
1732            rx1.recv().unwrap();
1733            let mut txn2 = db2.begin(TxnMode::ReadWrite).unwrap();
1734            txn2.put(b"k1", b"v2").unwrap();
1735            assert!(txn2.commit().is_ok());
1736            tx2.send(()).unwrap();
1737        });
1738
1739        t1.join().unwrap();
1740        t2.join().unwrap();
1741
1742        let mut txn3 = db.begin(TxnMode::ReadOnly).unwrap();
1743        let val = txn3.get(b"k1").unwrap();
1744        assert_eq!(val.unwrap(), b"v2");
1745    }
1746
1747    #[test]
1748    fn test_flush_and_reopen_via_embedded_api() {
1749        let dir = tempdir().unwrap();
1750        let path = dir.path().join("persist.db");
1751        {
1752            let db = Database::open(&path).unwrap();
1753            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1754            txn.put(b"k1", b"v1").unwrap();
1755            txn.commit().unwrap();
1756            db.flush().unwrap();
1757        }
1758
1759        let db = Database::open(&path).unwrap();
1760        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
1761        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
1762    }
1763
1764    #[test]
1765    fn test_large_value_blob_roundtrip() {
1766        let dir = tempdir().unwrap();
1767        let path = dir.path().join("blob.lv");
1768        let payload = b"hello large value";
1769
1770        {
1771            let db = Database::new();
1772            let mut writer = db
1773                .create_blob_writer(&path, payload.len() as u64, Some(16))
1774                .unwrap();
1775            writer.write_chunk(&payload[..5]).unwrap();
1776            writer.write_chunk(&payload[5..]).unwrap();
1777            writer.finish().unwrap();
1778        }
1779
1780        let db = Database::new();
1781        let mut reader = db.open_large_value(&path).unwrap();
1782        let mut buf = Vec::new();
1783        while let Some((_info, chunk)) = reader.next_chunk().unwrap() {
1784            buf.extend_from_slice(&chunk);
1785        }
1786        assert_eq!(buf, payload);
1787    }
1788
1789    #[test]
1790    fn upsert_and_search_same_txn() {
1791        let db = Database::new();
1792        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1793        txn.upsert_vector(b"k1", b"meta1", &[1.0, 0.0], Metric::Cosine)
1794            .unwrap();
1795
1796        let results = txn
1797            .search_similar(&[1.0, 0.0], Metric::Cosine, 1, None)
1798            .unwrap();
1799        assert_eq!(results.len(), 1);
1800        assert_eq!(results[0].key, b"k1");
1801        assert_eq!(results[0].metadata, b"meta1");
1802        txn.commit().unwrap();
1803    }
1804
1805    #[test]
1806    fn upsert_and_search_across_txn() {
1807        let db = Database::new();
1808        {
1809            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1810            txn.upsert_vector(b"k1", b"meta1", &[1.0, 1.0], Metric::Cosine)
1811                .unwrap();
1812            txn.commit().unwrap();
1813        }
1814
1815        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
1816        let results = ro
1817            .search_similar(&[1.0, 1.0], Metric::Cosine, 1, None)
1818            .unwrap();
1819        assert_eq!(results.len(), 1);
1820        assert_eq!(results[0].key, b"k1");
1821    }
1822
1823    #[test]
1824    fn read_only_upsert_rejected() {
1825        let db = Database::new();
1826        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
1827        let err = ro
1828            .upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
1829            .unwrap_err();
1830        assert!(matches!(err, Error::Core(alopex_core::Error::TxnReadOnly)));
1831    }
1832
1833    #[test]
1834    fn dimension_mismatch_on_search() {
1835        let db = Database::new();
1836        {
1837            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
1838            txn.upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
1839                .unwrap();
1840            txn.commit().unwrap();
1841        }
1842        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
1843        let err = ro
1844            .search_similar(&[1.0, 0.0, 1.0], Metric::Cosine, 1, None)
1845            .unwrap_err();
1846        assert!(matches!(
1847            err,
1848            Error::Core(alopex_core::Error::DimensionMismatch { .. })
1849        ));
1850    }
1851}