alopex_embedded/
lib.rs

1//! A user-friendly embedded API for the AlopexDB key-value store.
2
3#![deny(missing_docs)]
4
5pub mod columnar_api;
6pub mod options;
7
8pub use crate::columnar_api::{EmbeddedConfig, StorageMode};
9pub use crate::options::DatabaseOptions;
10use alopex_core::vector::hnsw::{HnswTransactionState, SearchStats as HnswSearchStats};
11use alopex_core::{
12    columnar::{
13        kvs_bridge::ColumnarKvsBridge, memory::InMemorySegmentStore, segment_v2::SegmentConfigV2,
14    },
15    kv::any::AnyKVTransaction,
16    kv::memory::MemoryKV,
17    kv::AnyKV,
18    score, validate_dimensions, HnswConfig, HnswIndex, HnswSearchResult, HnswStats, KVStore,
19    KVTransaction, Key, LargeValueKind, LargeValueMeta, LargeValueReader, LargeValueWriter,
20    StorageFactory, VectorType, DEFAULT_CHUNK_SIZE,
21};
22pub use alopex_core::{MemoryStats, Metric, TxnMode};
23use std::collections::HashMap;
24use std::convert::TryInto;
25use std::fs;
26use std::path::Path;
27use std::result;
28use std::sync::Arc;
29
30/// A convenience `Result` type for database operations.
31pub type Result<T> = result::Result<T, Error>;
32
33/// The error type for embedded database operations.
34#[derive(Debug, thiserror::Error)]
35pub enum Error {
36    /// An error from the underlying core storage engine.
37    #[error("core error: {0}")]
38    Core(#[from] alopex_core::Error),
39    /// The transaction has already been completed and cannot be used.
40    #[error("transaction is completed")]
41    TxnCompleted,
42    /// The requested table was not found or is invalid.
43    #[error("table not found: {0}")]
44    TableNotFound(String),
45    /// The operation requires in-memory columnar mode.
46    #[error("not in in-memory columnar mode")]
47    NotInMemoryMode,
48}
49
50/// The main database object.
51pub struct Database {
52    /// The underlying key-value store.
53    pub(crate) store: Arc<AnyKV>,
54    pub(crate) columnar_mode: StorageMode,
55    pub(crate) columnar_bridge: ColumnarKvsBridge,
56    pub(crate) columnar_memory: Option<InMemorySegmentStore>,
57    pub(crate) segment_config: SegmentConfigV2,
58}
59
/// Maps a user-supplied database path to the directory that holds the engine's files.
///
/// A path with the `.alopex` extension maps to a sidecar directory next to it
/// (`db.alopex` → `db.alopex.d`); any other path is used as the data directory as-is.
pub(crate) fn disk_data_dir_path(path: &Path) -> std::path::PathBuf {
    // v0.1 file mode keeps its WAL/SSTable files in a directory, so for an `.alopex`
    // path the data goes into a sidecar directory beside it instead.
    let is_alopex_file = matches!(path.extension(), Some(ext) if ext == "alopex");
    if !is_alopex_file {
        return path.to_path_buf();
    }
    path.with_extension("alopex.d")
}
69
70impl Database {
71    /// Opens a database at the specified path.
72    pub fn open(path: &Path) -> Result<Self> {
73        let data_dir = disk_data_dir_path(path);
74        let store = StorageFactory::create(alopex_core::StorageMode::Disk {
75            path: data_dir,
76            config: None,
77        })
78        .map_err(Error::Core)?;
79        Ok(Self::init(
80            store,
81            StorageMode::Disk,
82            None,
83            SegmentConfigV2::default(),
84        ))
85    }
86
87    /// Creates a new, purely in-memory (transient) database.
88    pub fn new() -> Self {
89        let store = AnyKV::Memory(MemoryKV::new());
90        Self::init(
91            store,
92            StorageMode::InMemory,
93            None,
94            SegmentConfigV2::default(),
95        )
96    }
97
98    /// Opens a database in in-memory mode with default options.
99    pub fn open_in_memory() -> Result<Self> {
100        Self::open_in_memory_with_options(DatabaseOptions::in_memory())
101    }
102
103    /// Opens a database in in-memory mode with the given options.
104    pub fn open_in_memory_with_options(opts: DatabaseOptions) -> Result<Self> {
105        if !opts.memory_mode() {
106            return Err(Error::Core(alopex_core::Error::InvalidFormat(
107                "memory_mode must be enabled for in-memory open".into(),
108            )));
109        }
110        let store = StorageFactory::create(opts.to_storage_mode(None)).map_err(Error::Core)?;
111        Ok(Self::init(
112            store,
113            StorageMode::InMemory,
114            opts.memory_limit(),
115            SegmentConfigV2::default(),
116        ))
117    }
118
119    pub(crate) fn init(
120        store: AnyKV,
121        columnar_mode: StorageMode,
122        memory_limit: Option<usize>,
123        segment_config: SegmentConfigV2,
124    ) -> Self {
125        let store = Arc::new(store);
126        let columnar_bridge = ColumnarKvsBridge::new(store.clone());
127        let columnar_memory = if matches!(columnar_mode, StorageMode::InMemory) {
128            Some(InMemorySegmentStore::new(memory_limit.map(|v| v as u64)))
129        } else {
130            None
131        };
132
133        Self {
134            store,
135            columnar_mode,
136            columnar_bridge,
137            columnar_memory,
138            segment_config,
139        }
140    }
141
142    /// Flushes the current in-memory data to an SSTable on disk (beta).
143    pub fn flush(&self) -> Result<()> {
144        self.store.flush().map_err(Error::Core)
145    }
146
147    /// Returns current memory usage statistics (in-memory KV only).
148    pub fn memory_usage(&self) -> Option<MemoryStats> {
149        match self.store.as_ref() {
150            AnyKV::Memory(kv) => Some(kv.memory_stats()),
151            AnyKV::Lsm(_) => None,
152        }
153    }
154
155    /// Persists the current in-memory database to disk atomically.
156    ///
157    /// `wal_path` は「データディレクトリ」として扱う(file-mode)。
158    pub fn persist_to_disk(&self, wal_path: &Path) -> Result<()> {
159        if !matches!(self.store.as_ref(), AnyKV::Memory(_)) {
160            return Err(Error::NotInMemoryMode);
161        }
162        let data_dir = disk_data_dir_path(wal_path);
163        if wal_path.exists() || data_dir.exists() {
164            return Err(Error::Core(alopex_core::Error::PathExists(
165                wal_path.to_path_buf(),
166            )));
167        }
168
169        let tmp_dir = data_dir.with_extension("tmp");
170        if tmp_dir.exists() {
171            return Err(Error::Core(alopex_core::Error::PathExists(tmp_dir)));
172        }
173
174        let snapshot = self.snapshot_pairs()?;
175        let write_result = (|| -> Result<()> {
176            let store = StorageFactory::create(alopex_core::StorageMode::Disk {
177                path: tmp_dir.clone(),
178                config: None,
179            })
180            .map_err(Error::Core)?;
181
182            let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
183            for (key, value) in snapshot {
184                txn.put(key, value).map_err(Error::Core)?;
185            }
186            txn.commit_self().map_err(Error::Core)?;
187
188            Ok(())
189        })();
190
191        if let Err(e) = write_result {
192            let _ = fs::remove_dir_all(&tmp_dir);
193            return Err(e);
194        }
195
196        fs::rename(&tmp_dir, &data_dir).map_err(|e| Error::Core(e.into()))?;
197        if wal_path.extension().is_some_and(|e| e == "alopex") {
198            // `.alopex` パスを渡した場合は、存在確認用のマーカーを作る。
199            let _ = fs::OpenOptions::new()
200                .create_new(true)
201                .write(true)
202                .open(wal_path);
203        }
204        Ok(())
205    }
206
207    /// Creates a fully in-memory clone of the current database.
208    pub fn clone_to_memory(&self) -> Result<Self> {
209        let snapshot = self.snapshot_pairs()?;
210        let cloned = Database::open_in_memory()?;
211        if snapshot.is_empty() {
212            return Ok(cloned);
213        }
214
215        let mut txn = cloned.begin(TxnMode::ReadWrite)?;
216        for (key, value) in snapshot {
217            txn.put(&key, &value)?;
218        }
219        txn.commit()?;
220        Ok(cloned)
221    }
222
223    /// Clears all data while keeping the database usable.
224    pub fn clear(&self) -> Result<()> {
225        let keys: Vec<Key> = self.snapshot_pairs()?.into_iter().map(|(k, _)| k).collect();
226        if keys.is_empty() {
227            return Ok(());
228        }
229        let mut txn = self.begin(TxnMode::ReadWrite)?;
230        for key in keys {
231            txn.delete(&key)?;
232        }
233        txn.commit()
234    }
235
236    /// Updates the memory limit in bytes for the underlying in-memory store.
237    pub fn set_memory_limit(&self, bytes: Option<usize>) {
238        if let AnyKV::Memory(kv) = self.store.as_ref() {
239            kv.txn_manager().set_memory_limit(bytes);
240        }
241    }
242
243    /// Returns a read-only snapshot of all key-value pairs.
244    pub fn snapshot(&self) -> Vec<(Key, Vec<u8>)> {
245        self.snapshot_pairs().unwrap_or_default()
246    }
247
248    fn snapshot_pairs(&self) -> Result<Vec<(Key, Vec<u8>)>> {
249        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
250        let pairs: Vec<(Key, Vec<u8>)> = txn.scan_prefix(b"").map_err(Error::Core)?.collect();
251        txn.commit_self().map_err(Error::Core)?;
252        Ok(pairs)
253    }
254
255    /// HNSW インデックスを作成し、永続化する。
256    pub fn create_hnsw_index(&self, name: &str, config: HnswConfig) -> Result<()> {
257        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
258        let index = HnswIndex::create(name, config).map_err(Error::Core)?;
259        index.save(&mut txn).map_err(Error::Core)?;
260        txn.commit_self().map_err(Error::Core)
261    }
262
263    /// HNSW インデックスを削除する。
264    pub fn drop_hnsw_index(&self, name: &str) -> Result<()> {
265        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
266        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
267        index.drop(&mut txn).map_err(Error::Core)?;
268        txn.commit_self().map_err(Error::Core)
269    }
270
271    /// HNSW 統計情報を取得する。
272    pub fn get_hnsw_stats(&self, name: &str) -> Result<HnswStats> {
273        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
274        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
275        Ok(index.stats())
276    }
277
278    /// HNSW インデックスをコンパクションし、結果を返す。
279    pub fn compact_hnsw_index(&self, name: &str) -> Result<alopex_core::vector::CompactionResult> {
280        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
281        let mut index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
282        let result = index.compact().map_err(Error::Core)?;
283        index.save(&mut txn).map_err(Error::Core)?;
284        txn.commit_self().map_err(Error::Core)?;
285        Ok(result)
286    }
287
288    /// HNSW インデックスに検索を行う。
289    pub fn search_hnsw(
290        &self,
291        name: &str,
292        query: &[f32],
293        k: usize,
294        ef_search: Option<usize>,
295    ) -> Result<(Vec<HnswSearchResult>, HnswSearchStats)> {
296        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
297        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
298        index.search(query, k, ef_search).map_err(Error::Core)
299    }
300
301    /// Creates a chunked large value writer for opaque blobs (beta).
302    pub fn create_blob_writer(
303        &self,
304        path: &Path,
305        total_len: u64,
306        chunk_size: Option<u32>,
307    ) -> Result<LargeValueWriter> {
308        let meta = LargeValueMeta {
309            kind: LargeValueKind::Blob,
310            total_len,
311            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
312        };
313        LargeValueWriter::create(path, meta).map_err(Error::Core)
314    }
315
316    /// Creates a chunked large value writer for typed payloads (beta).
317    pub fn create_typed_writer(
318        &self,
319        path: &Path,
320        type_id: u16,
321        total_len: u64,
322        chunk_size: Option<u32>,
323    ) -> Result<LargeValueWriter> {
324        let meta = LargeValueMeta {
325            kind: LargeValueKind::Typed(type_id),
326            total_len,
327            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
328        };
329        LargeValueWriter::create(path, meta).map_err(Error::Core)
330    }
331
332    /// Opens a chunked large value reader (beta). Kind/type is read from the file header.
333    pub fn open_large_value(&self, path: &Path) -> Result<LargeValueReader> {
334        LargeValueReader::open(path).map_err(Error::Core)
335    }
336
337    /// Begins a new transaction.
338    pub fn begin(&self, mode: TxnMode) -> Result<Transaction<'_>> {
339        let txn = self.store.begin(mode).map_err(Error::Core)?;
340        Ok(Transaction {
341            inner: Some(txn),
342            db: self,
343            hnsw_indices: HashMap::new(),
344        })
345    }
346}
347
348impl Default for Database {
349    fn default() -> Self {
350        Self::new()
351    }
352}
353
354/// A database transaction.
355pub struct Transaction<'a> {
356    inner: Option<AnyKVTransaction<'a>>,
357    db: &'a Database,
358    hnsw_indices: HashMap<String, (HnswIndex, alopex_core::vector::hnsw::HnswTransactionState)>,
359}
360
361/// A search result row containing key, metadata, and similarity score.
362#[derive(Debug, Clone, PartialEq)]
363pub struct SearchResult {
364    /// User key associated with the vector.
365    pub key: Key,
366    /// Opaque metadata payload stored alongside the vector.
367    pub metadata: Vec<u8>,
368    /// Similarity score for the query/vector pair.
369    pub score: f32,
370}
371
/// Reserved KV key holding the list of keys written by `Transaction::upsert_vector`.
const VECTOR_INDEX_KEY: &[u8] = b"__alopex_vector_index";
373
374impl<'a> Transaction<'a> {
375    /// Retrieves the value for a given key.
376    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
377        self.inner_mut()?.get(&key.to_vec()).map_err(Error::Core)
378    }
379
380    /// Sets a value for a given key.
381    pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
382        self.inner_mut()?
383            .put(key.to_vec(), value.to_vec())
384            .map_err(Error::Core)
385    }
386
387    /// Deletes a key-value pair.
388    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
389        self.inner_mut()?.delete(key.to_vec()).map_err(Error::Core)
390    }
391
392    /// HNSW にベクトルをステージング挿入/更新する。
393    pub fn upsert_to_hnsw(
394        &mut self,
395        index_name: &str,
396        key: &[u8],
397        vector: &[f32],
398        metadata: &[u8],
399    ) -> Result<()> {
400        self.ensure_write_txn()?;
401        let (index, state) = self.hnsw_entry_mut(index_name)?;
402        index
403            .upsert_staged(key, vector, metadata, state)
404            .map_err(Error::Core)
405    }
406
407    /// HNSW からキーをステージング削除する。
408    pub fn delete_from_hnsw(&mut self, index_name: &str, key: &[u8]) -> Result<bool> {
409        self.ensure_write_txn()?;
410        let (index, state) = self.hnsw_entry_mut(index_name)?;
411        index.delete_staged(key, state).map_err(Error::Core)
412    }
413
414    /// Upserts a vector and metadata under the provided key after validating dimensions and metric.
415    ///
416    /// A small internal index is maintained to enable scanning for similarity search.
417    pub fn upsert_vector(
418        &mut self,
419        key: &[u8],
420        metadata: &[u8],
421        vector: &[f32],
422        metric: Metric,
423    ) -> Result<()> {
424        if vector.is_empty() {
425            return Err(Error::Core(alopex_core::Error::InvalidFormat(
426                "vector cannot be empty".into(),
427            )));
428        }
429        let vt = VectorType::new(vector.len(), metric);
430        vt.validate(vector).map_err(Error::Core)?;
431
432        let payload = encode_vector_entry(vt, metadata, vector);
433        let txn = self.inner_mut()?;
434        txn.put(key.to_vec(), payload).map_err(Error::Core)?;
435
436        let mut keys = self.load_vector_index()?;
437        if !keys.iter().any(|k| k == key) {
438            keys.push(key.to_vec());
439            self.persist_vector_index(&keys)?;
440        }
441        Ok(())
442    }
443
444    /// Executes a flat similarity search over stored vectors using the provided metric and query.
445    ///
446    /// The optional `filter_keys` restricts the scan to the given keys; otherwise the full
447    /// vector index is scanned. Results are sorted by descending score and truncated to `top_k`.
448    pub fn search_similar(
449        &mut self,
450        query_vector: &[f32],
451        metric: Metric,
452        top_k: usize,
453        filter_keys: Option<&[Key]>,
454    ) -> Result<Vec<SearchResult>> {
455        if top_k == 0 {
456            return Ok(Vec::new());
457        }
458
459        let mut keys = match filter_keys {
460            Some(keys) => keys.to_vec(),
461            None => self.load_vector_index()?,
462        };
463        if keys.is_empty() {
464            return Ok(Vec::new());
465        }
466
467        let mut rows = Vec::new();
468        let txn = self.inner_mut()?;
469        for key in keys.drain(..) {
470            let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
471                continue;
472            };
473            let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
474            if decoded.metric != metric {
475                return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
476                    metric: metric.as_str().to_string(),
477                }));
478            }
479            validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
480            let score = score(metric, query_vector, &decoded.vector).map_err(Error::Core)?;
481            rows.push(SearchResult {
482                key,
483                metadata: decoded.metadata,
484                score,
485            });
486        }
487
488        rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
489        if rows.len() > top_k {
490            rows.truncate(top_k);
491        }
492        Ok(rows)
493    }
494
495    fn load_vector_index(&mut self) -> Result<Vec<Key>> {
496        let txn = self.inner_mut()?;
497        let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec()).map_err(Error::Core)? else {
498            return Ok(Vec::new());
499        };
500        decode_index(&raw).map_err(Error::Core)
501    }
502
503    fn persist_vector_index(&mut self, keys: &[Key]) -> Result<()> {
504        let txn = self.inner_mut()?;
505        let encoded = encode_index(keys)?;
506        txn.put(VECTOR_INDEX_KEY.to_vec(), encoded)
507            .map_err(Error::Core)
508    }
509
510    /// Commits the transaction, applying all changes.
511    pub fn commit(mut self) -> Result<()> {
512        {
513            let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
514            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
515                index.commit_staged(txn, state).map_err(Error::Core)?;
516            }
517        }
518        let txn = self.inner.take().ok_or(Error::TxnCompleted)?;
519        self.hnsw_indices.clear();
520        txn.commit_self().map_err(Error::Core)
521    }
522
523    /// Rolls back the transaction, discarding all changes.
524    pub fn rollback(mut self) -> Result<()> {
525        if let Some(txn) = self.inner.take() {
526            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
527                let _ = index.rollback(state);
528            }
529            self.hnsw_indices.clear();
530            txn.rollback_self().map_err(Error::Core)
531        } else {
532            Err(Error::TxnCompleted)
533        }
534    }
535
536    fn inner_mut(&mut self) -> Result<&mut AnyKVTransaction<'a>> {
537        self.inner.as_mut().ok_or(Error::TxnCompleted)
538    }
539
540    fn hnsw_entry_mut(&mut self, name: &str) -> Result<&mut (HnswIndex, HnswTransactionState)> {
541        if !self.hnsw_indices.contains_key(name) {
542            let index = {
543                let txn = self.inner_mut()?;
544                HnswIndex::load(name, txn).map_err(Error::Core)?
545            };
546            self.hnsw_indices
547                .insert(name.to_string(), (index, HnswTransactionState::default()));
548        }
549        Ok(self.hnsw_indices.get_mut(name).unwrap())
550    }
551
552    fn ensure_write_txn(&self) -> Result<()> {
553        let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
554        if txn.mode() != TxnMode::ReadWrite {
555            return Err(Error::Core(alopex_core::Error::TxnConflict));
556        }
557        Ok(())
558    }
559}
560
561impl<'a> Drop for Transaction<'a> {
562    fn drop(&mut self) {
563        if let Some(txn) = self.inner.take() {
564            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
565                let _ = index.rollback(state);
566            }
567            self.hnsw_indices.clear();
568            let _ = txn.rollback_self();
569        }
570    }
571}
572
573fn metric_to_byte(metric: Metric) -> u8 {
574    match metric {
575        Metric::Cosine => 0,
576        Metric::L2 => 1,
577        Metric::InnerProduct => 2,
578    }
579}
580
581fn byte_to_metric(byte: u8) -> result::Result<Metric, alopex_core::Error> {
582    match byte {
583        0 => Ok(Metric::Cosine),
584        1 => Ok(Metric::L2),
585        2 => Ok(Metric::InnerProduct),
586        other => Err(alopex_core::Error::UnsupportedMetric {
587            metric: format!("unknown({other})"),
588        }),
589    }
590}
591
592fn encode_vector_entry(vector_type: VectorType, metadata: &[u8], vector: &[f32]) -> Vec<u8> {
593    let dim = vector_type.dim() as u32;
594    let meta_len = metadata.len() as u32;
595    let mut buf = Vec::with_capacity(1 + 4 + 4 + metadata.len() + std::mem::size_of_val(vector));
596    buf.push(metric_to_byte(vector_type.metric()));
597    buf.extend_from_slice(&dim.to_le_bytes());
598    buf.extend_from_slice(&meta_len.to_le_bytes());
599    buf.extend_from_slice(metadata);
600    for v in vector {
601        buf.extend_from_slice(&v.to_le_bytes());
602    }
603    buf
604}
605
606struct DecodedEntry {
607    metric: Metric,
608    dim: usize,
609    metadata: Vec<u8>,
610    vector: Vec<f32>,
611}
612
613fn decode_vector_entry(bytes: &[u8]) -> result::Result<DecodedEntry, alopex_core::Error> {
614    if bytes.len() < 9 {
615        return Err(alopex_core::Error::InvalidFormat(
616            "vector entry too short".into(),
617        ));
618    }
619    let metric = byte_to_metric(bytes[0])?;
620    let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
621    let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
622
623    let header = 9;
624    let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
625    if bytes.len() < expected_len {
626        return Err(alopex_core::Error::InvalidFormat(
627            "vector entry truncated".into(),
628        ));
629    }
630
631    let metadata = bytes[header..header + meta_len].to_vec();
632    let mut vector = Vec::with_capacity(dim);
633    let vec_bytes = &bytes[header + meta_len..expected_len];
634    for chunk in vec_bytes.chunks_exact(4) {
635        vector.push(f32::from_le_bytes(chunk.try_into().unwrap()));
636    }
637
638    Ok(DecodedEntry {
639        metric,
640        dim,
641        metadata,
642        vector,
643    })
644}
645
646fn encode_index(keys: &[Key]) -> result::Result<Vec<u8>, alopex_core::Error> {
647    let mut buf = Vec::new();
648    let count = keys.len() as u32;
649    buf.extend_from_slice(&count.to_le_bytes());
650    for key in keys {
651        let len: u32 = key
652            .len()
653            .try_into()
654            .map_err(|_| alopex_core::Error::InvalidFormat("key too long".into()))?;
655        buf.extend_from_slice(&len.to_le_bytes());
656        buf.extend_from_slice(key);
657    }
658    Ok(buf)
659}
660
661fn decode_index(bytes: &[u8]) -> result::Result<Vec<Key>, alopex_core::Error> {
662    if bytes.len() < 4 {
663        return Err(alopex_core::Error::InvalidFormat("index too short".into()));
664    }
665    let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
666    let mut pos = 4;
667    let mut keys = Vec::with_capacity(count);
668    for _ in 0..count {
669        if pos + 4 > bytes.len() {
670            return Err(alopex_core::Error::InvalidFormat("index truncated".into()));
671        }
672        let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
673        pos += 4;
674        if pos + len > bytes.len() {
675            return Err(alopex_core::Error::InvalidFormat(
676                "index key truncated".into(),
677            ));
678        }
679        keys.push(bytes[pos..pos + len].to_vec());
680        pos += len;
681    }
682    Ok(keys)
683}
684
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::thread;
    use tempfile::tempdir;

    #[test]
    fn test_open_and_crud() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let db = Database::open(&path).unwrap();

        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.put(b"key1", b"value1").unwrap();
        txn.commit().unwrap();

        let mut txn2 = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn2.get(b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));
    }

    #[test]
    fn test_not_found() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn.get(b"non-existent-key").unwrap();
        assert!(val.is_none());
    }

    #[test]
    fn test_crash_recovery_replays_wal() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("replay.db");

        {
            let db = Database::open(&path).unwrap();
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.put(b"k1", b"v1").unwrap();
            txn.commit().unwrap();

            let mut uncommitted = db.begin(TxnMode::ReadWrite).unwrap();
            uncommitted.put(b"k2", b"v2").unwrap();
            // Drop without commit to simulate crash before commit.
        }

        let db = Database::open(&path).unwrap();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
        assert_eq!(txn.get(b"k2").unwrap(), None);
    }

    #[test]
    fn test_txn_closed() {
        // `commit` and `rollback` consume the transaction, so reusing a finished transaction
        // is rejected at compile time. What can be checked at runtime is that each way of
        // finishing a transaction has the expected effect on the store.
        let db = Database::new();

        let mut committed = db.begin(TxnMode::ReadWrite).unwrap();
        committed.put(b"k1", b"v1").unwrap();
        committed.commit().unwrap();

        let mut rolled_back = db.begin(TxnMode::ReadWrite).unwrap();
        rolled_back.put(b"k2", b"v2").unwrap();
        rolled_back.rollback().unwrap();

        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        assert_eq!(ro.get(b"k1").unwrap(), Some(b"v1".to_vec()));
        assert_eq!(ro.get(b"k2").unwrap(), None);
    }

    #[test]
    fn test_concurrency_conflict() {
        let db = std::sync::Arc::new(Database::new());
        let mut t0 = db.begin(TxnMode::ReadWrite).unwrap();
        t0.put(b"k1", b"v0").unwrap();
        t0.commit().unwrap();

        // Sequence: txn1 reads k1, then txn2 writes and commits k1, then txn1 writes k1
        // and must fail to commit with a conflict.
        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();

        let db1 = db.clone();
        let t1 = thread::spawn(move || {
            let mut txn1 = db1.begin(TxnMode::ReadWrite).unwrap();
            let val = txn1.get(b"k1").unwrap();
            assert_eq!(val.unwrap(), b"v0");
            tx1.send(()).unwrap();
            rx2.recv().unwrap();
            txn1.put(b"k1", b"v1").unwrap();
            let result = txn1.commit();
            assert!(matches!(
                result,
                Err(Error::Core(alopex_core::Error::TxnConflict))
            ));
        });

        let db2 = db.clone();
        let t2 = thread::spawn(move || {
            rx1.recv().unwrap();
            let mut txn2 = db2.begin(TxnMode::ReadWrite).unwrap();
            txn2.put(b"k1", b"v2").unwrap();
            assert!(txn2.commit().is_ok());
            tx2.send(()).unwrap();
        });

        t1.join().unwrap();
        t2.join().unwrap();

        let mut txn3 = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn3.get(b"k1").unwrap();
        assert_eq!(val.unwrap(), b"v2");
    }

    #[test]
    fn test_flush_and_reopen_via_embedded_api() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("persist.db");
        {
            let db = Database::open(&path).unwrap();
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.put(b"k1", b"v1").unwrap();
            txn.commit().unwrap();
            db.flush().unwrap();
        }

        let db = Database::open(&path).unwrap();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
    }

    #[test]
    fn test_large_value_blob_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let payload = b"hello large value";

        {
            let db = Database::new();
            let mut writer = db
                .create_blob_writer(&path, payload.len() as u64, Some(16))
                .unwrap();
            writer.write_chunk(&payload[..5]).unwrap();
            writer.write_chunk(&payload[5..]).unwrap();
            writer.finish().unwrap();
        }

        let db = Database::new();
        let mut reader = db.open_large_value(&path).unwrap();
        let mut buf = Vec::new();
        while let Some((_info, chunk)) = reader.next_chunk().unwrap() {
            buf.extend_from_slice(&chunk);
        }
        assert_eq!(buf, payload);
    }

    #[test]
    fn test_disk_data_dir_path_sidecar() {
        use std::path::{Path, PathBuf};

        assert_eq!(
            disk_data_dir_path(Path::new("db.alopex")),
            PathBuf::from("db.alopex.d")
        );
        assert_eq!(disk_data_dir_path(Path::new("data")), PathBuf::from("data"));
    }

    #[test]
    fn upsert_and_search_same_txn() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.upsert_vector(b"k1", b"meta1", &[1.0, 0.0], Metric::Cosine)
            .unwrap();

        let results = txn
            .search_similar(&[1.0, 0.0], Metric::Cosine, 1, None)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, b"k1");
        assert_eq!(results[0].metadata, b"meta1");
        txn.commit().unwrap();
    }

    #[test]
    fn upsert_and_search_across_txn() {
        let db = Database::new();
        {
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.upsert_vector(b"k1", b"meta1", &[1.0, 1.0], Metric::Cosine)
                .unwrap();
            txn.commit().unwrap();
        }

        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let results = ro
            .search_similar(&[1.0, 1.0], Metric::Cosine, 1, None)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, b"k1");
    }

    #[test]
    fn read_only_upsert_rejected() {
        let db = Database::new();
        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let err = ro
            .upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
            .unwrap_err();
        assert!(matches!(err, Error::Core(alopex_core::Error::TxnConflict)));
    }

    #[test]
    fn dimension_mismatch_on_search() {
        let db = Database::new();
        {
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
                .unwrap();
            txn.commit().unwrap();
        }
        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let err = ro
            .search_similar(&[1.0, 0.0, 1.0], Metric::Cosine, 1, None)
            .unwrap_err();
        assert!(matches!(
            err,
            Error::Core(alopex_core::Error::DimensionMismatch { .. })
        ));
    }
}