1#![deny(missing_docs)]
4
5pub mod catalog;
7pub mod catalog_api;
8pub mod columnar_api;
9pub mod options;
10mod sql_api;
11mod txn_manager;
12
13pub use crate::catalog::{CachedTableInfo, Catalog};
14pub use crate::catalog_api::{
15 CatalogInfo, ColumnDefinition, ColumnInfo, CreateCatalogRequest, CreateNamespaceRequest,
16 CreateTableRequest, IndexInfo, NamespaceInfo, StorageInfo, TableInfo,
17};
18pub use crate::columnar_api::{
19 ColumnarIndexInfo, ColumnarIndexType, ColumnarRowIterator, EmbeddedConfig, StorageMode,
20};
21pub use crate::options::DatabaseOptions;
22pub use crate::sql_api::{SqlStreamingResult, StreamingQueryResult, StreamingRows};
23pub use crate::txn_manager::{TransactionInfo, TransactionManager};
24pub use alopex_sql::{DataSourceFormat, TableType};
/// Convenience re-export of the SQL layer's result alias.
pub type SqlResult = alopex_sql::SqlResult;
27use alopex_core::vector::hnsw::{HnswTransactionState, SearchStats as HnswSearchStats};
28use alopex_core::{
29 columnar::{
30 kvs_bridge::ColumnarKvsBridge, memory::InMemorySegmentStore, segment_v2::SegmentConfigV2,
31 },
32 kv::any::AnyKVTransaction,
33 kv::memory::MemoryKV,
34 kv::AnyKV,
35 validate_dimensions, HnswIndex, KVStore, KVTransaction, Key, LargeValueKind, LargeValueMeta,
36 LargeValueReader, LargeValueWriter, StorageFactory, VectorType, DEFAULT_CHUNK_SIZE,
37};
38pub use alopex_core::{HnswConfig, HnswSearchResult, HnswStats, MemoryStats, Metric, TxnMode};
39pub use alopex_sql::executor::QueryRowIterator;
41use std::collections::HashMap;
42use std::convert::TryInto;
43use std::fs;
44use std::path::{Path, PathBuf};
45use std::result;
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::sync::{Arc, RwLock};
48
/// Crate-wide result type pairing `T` with this crate's [`Error`].
pub type Result<T> = result::Result<T, Error>;
51
/// Error type for all embedded-database operations in this crate.
///
/// Wraps errors from the storage core and the SQL layer, and adds
/// catalog/namespace/table management errors of its own.
/// (Several display messages are in Japanese; the variant docs below give
/// the English meaning.)
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Error propagated from the underlying storage engine.
    #[error("core error: {0}")]
    Core(#[from] alopex_core::Error),
    /// Error propagated from SQL parsing/planning/execution.
    #[error("{0}")]
    Sql(#[from] alopex_sql::SqlError),
    /// The transaction was already committed or rolled back.
    #[error("transaction is completed")]
    TxnCompleted,
    /// Catalog not found ("カタログが見つかりません").
    #[error("カタログが見つかりません: {0}")]
    CatalogNotFound(String),
    /// Catalog already exists ("カタログは既に存在します").
    #[error("カタログは既に存在します: {0}")]
    CatalogAlreadyExists(String),
    /// Catalog is not empty, so it cannot be dropped ("カタログが空ではありません").
    #[error("カタログが空ではありません: {0}")]
    CatalogNotEmpty(String),
    /// Namespace not found within a catalog ("ネームスペースが見つかりません").
    #[error("ネームスペースが見つかりません: {0}.{1}")]
    NamespaceNotFound(String, String),
    /// Namespace already exists within a catalog ("ネームスペースは既に存在します").
    #[error("ネームスペースは既に存在します: {0}.{1}")]
    NamespaceAlreadyExists(String, String),
    /// Namespace is not empty, so it cannot be dropped ("ネームスペースが空ではありません").
    #[error("ネームスペースが空ではありません: {0}.{1}")]
    NamespaceNotEmpty(String, String),
    /// Table not found.
    #[error("table not found: {0}")]
    TableNotFound(String),
    /// Table already exists ("テーブルは既に存在します").
    #[error("テーブルは既に存在します: {0}")]
    TableAlreadyExists(String),
    /// Index not found ("インデックスが見つかりません").
    #[error("インデックスが見つかりません: {0}")]
    IndexNotFound(String),
    /// The built-in `default` object cannot be deleted ("default オブジェクトは削除できません").
    #[error("default オブジェクトは削除できません: {0}")]
    CannotDeleteDefault(String),
    /// A managed table requires a schema ("managed テーブルにはスキーマが必要です").
    #[error("managed テーブルにはスキーマが必要です")]
    SchemaRequired,
    /// An external table requires a `storage_root` ("external テーブルには storage_root が必要です").
    #[error("external テーブルには storage_root が必要です")]
    StorageRootRequired,
    /// A write was attempted inside a read-only transaction ("トランザクションは読み取り専用です").
    #[error("トランザクションは読み取り専用です")]
    TxnReadOnly,
    /// The supplied transaction id could not be parsed or resolved.
    #[error("invalid transaction id: {0}")]
    InvalidTransactionId(String),
    /// Operation requires the in-memory columnar mode.
    #[error("not in in-memory columnar mode")]
    NotInMemoryMode,
    /// The requested data-source format is not supported.
    #[error("unsupported data source format: {0}")]
    UnsupportedDataSourceFormat(String),
    /// The shared catalog lock was poisoned by a panicking writer.
    #[error("catalog lock poisoned")]
    CatalogLockPoisoned,
}
116
117impl Error {
118 pub fn sql_error_code(&self) -> Option<&'static str> {
120 match self {
121 Self::Sql(err) => Some(err.code()),
122 _ => None,
123 }
124 }
125}
126
/// Embedded database handle.
///
/// Bundles the shared key-value store, the persistent SQL catalog, HNSW
/// vector indexes (with an in-process cache), and the columnar storage
/// bridge. Cheap to share via the interior `Arc`s; all caches are guarded
/// by `RwLock`s.
pub struct Database {
    /// Shared key-value backend (memory, LSM disk, or S3).
    pub(crate) store: Arc<AnyKV>,
    /// Persistent SQL catalog layered over `store`.
    pub(crate) sql_catalog: Arc<RwLock<alopex_sql::catalog::PersistentCatalog<AnyKV>>>,
    /// Loaded HNSW indexes, keyed by index name.
    pub(crate) hnsw_cache: RwLock<HashMap<String, Arc<HnswIndex>>>,
    /// Optional read cache of decoded vectors; `None` means "not built yet
    /// or invalidated" (rebuilt lazily on commit when needed).
    pub(crate) vector_cache: RwLock<Option<HashMap<Key, CachedVector>>>,
    /// Table metadata cache keyed by "catalog.namespace.table".
    pub(crate) table_info_cache: RwLock<HashMap<String, CachedTableInfo>>,
    /// Monotonic counter bumped whenever the table-info cache is invalidated.
    pub(crate) table_info_cache_epoch: AtomicU64,
    /// Columnar storage mode this handle was opened with.
    pub(crate) columnar_mode: StorageMode,
    /// Bridge exposing the KV store to the columnar engine.
    pub(crate) columnar_bridge: ColumnarKvsBridge,
    /// In-memory segment store; `Some` only in in-memory columnar mode.
    pub(crate) columnar_memory: Option<InMemorySegmentStore>,
    /// Segment layout configuration for columnar storage.
    pub(crate) segment_config: SegmentConfigV2,
}
143
/// Maps a user-facing database path to the on-disk data directory.
///
/// A `*.alopex` file path becomes its sibling `*.alopex.d` directory;
/// any other path is used as the data directory as-is.
pub(crate) fn disk_data_dir_path(path: &Path) -> std::path::PathBuf {
    match path.extension() {
        Some(ext) if ext == "alopex" => path.with_extension("alopex.d"),
        _ => path.to_path_buf(),
    }
}
153
154#[cfg(not(target_arch = "wasm32"))]
155fn read_file_version_from_storage(path: &Path) -> alopex_core::storage::format::FileVersion {
156 use alopex_core::storage::format::{FileHeader, FileVersion, HEADER_SIZE};
157 use std::io::Read;
158
159 let Some(file_path) = resolve_format_file_path(path) else {
160 return FileVersion::CURRENT;
161 };
162
163 let mut header_bytes = [0u8; HEADER_SIZE];
164 let Ok(mut file) = fs::File::open(file_path) else {
165 return FileVersion::CURRENT;
166 };
167 if file.read_exact(&mut header_bytes).is_err() {
168 return FileVersion::CURRENT;
169 }
170 match FileHeader::from_bytes(&header_bytes) {
171 Ok(header) => header.version,
172 Err(_) => FileVersion::CURRENT,
173 }
174}
175
/// wasm32 stub: there is no file storage, so the current format version is
/// always reported.
#[cfg(target_arch = "wasm32")]
fn read_file_version_from_storage(_path: &Path) -> alopex_core::storage::format::FileVersion {
    use alopex_core::storage::format::FileVersion;
    FileVersion::CURRENT
}
180
/// Locates the actual storage file whose header should be inspected.
///
/// Resolution order:
/// 1. `path` itself, when it is a regular file;
/// 2. for a `*.d` data directory, the sibling file with the `.d` stripped;
/// 3. the first `*.alopex` file found inside the directory;
/// 4. otherwise `None`.
#[cfg(not(target_arch = "wasm32"))]
fn resolve_format_file_path(path: &Path) -> Option<PathBuf> {
    if path.is_file() {
        return Some(path.to_path_buf());
    }
    if !path.is_dir() {
        return None;
    }

    // "foo.alopex.d" directory → prefer the sibling "foo.alopex" file.
    if path.extension().is_some_and(|ext| ext == "d") {
        let sibling = path.with_extension("");
        if sibling.is_file() {
            return Some(sibling);
        }
    }

    // Fall back to scanning the directory for the first *.alopex file.
    fs::read_dir(path)
        .ok()?
        .flatten()
        .map(|entry| entry.path())
        .find(|p| p.extension().is_some_and(|ext| ext == "alopex") && p.is_file())
}
210
impl Database {
    /// Opens (or creates) a disk-backed database rooted at `path`.
    ///
    /// The data directory is derived via [`disk_data_dir_path`]
    /// (`foo.alopex` → `foo.alopex.d`) and the persisted SQL catalog is
    /// loaded before the handle is returned.
    pub fn open(path: &Path) -> Result<Self> {
        let data_dir = disk_data_dir_path(path);
        let store = StorageFactory::create(alopex_core::StorageMode::Disk {
            path: data_dir,
            config: None,
        })
        .map_err(Error::Core)?;
        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Creates an empty in-memory database.
    ///
    /// Unlike [`Database::open_in_memory`], this does not attempt to load a
    /// persisted SQL catalog.
    pub fn new() -> Self {
        let store = AnyKV::Memory(MemoryKV::new());
        Self::init(
            store,
            StorageMode::InMemory,
            None,
            SegmentConfigV2::default(),
        )
    }

    /// Opens an in-memory database with default in-memory options.
    pub fn open_in_memory() -> Result<Self> {
        Self::open_in_memory_with_options(DatabaseOptions::in_memory())
    }

    /// Opens an in-memory database with explicit options.
    ///
    /// Returns an `InvalidFormat` core error when `opts.memory_mode()` is
    /// not enabled. The memory limit from `opts` is forwarded to the
    /// in-memory columnar segment store.
    pub fn open_in_memory_with_options(opts: DatabaseOptions) -> Result<Self> {
        if !opts.memory_mode() {
            return Err(Error::Core(alopex_core::Error::InvalidFormat(
                "memory_mode must be enabled for in-memory open".into(),
            )));
        }
        let store = StorageFactory::create(opts.to_storage_mode(None)).map_err(Error::Core)?;
        let mut db = Self::init(
            store,
            StorageMode::InMemory,
            opts.memory_limit(),
            SegmentConfigV2::default(),
        );
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Opens a database from a URI.
    ///
    /// `s3://…` requires the `s3` feature (error otherwise); a `file://`
    /// prefix is stripped; anything else is treated as a filesystem path
    /// and delegated to [`Database::open`].
    pub fn open_with_uri(uri: &str) -> Result<Self> {
        if uri.starts_with("s3://") {
            #[cfg(feature = "s3")]
            {
                return Self::open_s3(uri);
            }
            #[cfg(not(feature = "s3"))]
            {
                return Err(Error::Core(alopex_core::Error::InvalidFormat(
                    "S3 support requires the 's3' feature".into(),
                )));
            }
        }

        let path = if let Some(stripped) = uri.strip_prefix("file://") {
            stripped
        } else {
            uri
        };

        Self::open(Path::new(path))
    }

    /// Opens a database backed by S3, parsing the bucket/prefix from `uri`.
    #[cfg(feature = "s3")]
    pub fn open_s3(uri: &str) -> Result<Self> {
        let s3_config = alopex_core::S3Config::from_uri(uri).map_err(Error::Core)?;
        let store = StorageFactory::create(alopex_core::StorageMode::S3 { config: s3_config })
            .map_err(Error::Core)?;
        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Wires up a `Database` around an already-created store: fresh SQL
    /// catalog, empty caches, columnar bridge, and (for in-memory columnar
    /// mode only) an in-memory segment store honoring `memory_limit`.
    pub(crate) fn init(
        store: AnyKV,
        columnar_mode: StorageMode,
        memory_limit: Option<usize>,
        segment_config: SegmentConfigV2,
    ) -> Self {
        let store = Arc::new(store);
        let sql_catalog = Arc::new(RwLock::new(alopex_sql::catalog::PersistentCatalog::new(
            store.clone(),
        )));
        let columnar_bridge = ColumnarKvsBridge::new(store.clone());
        let columnar_memory = if matches!(columnar_mode, StorageMode::InMemory) {
            Some(InMemorySegmentStore::new(memory_limit.map(|v| v as u64)))
        } else {
            None
        };

        Self {
            store,
            sql_catalog,
            hnsw_cache: RwLock::new(HashMap::new()),
            vector_cache: RwLock::new(None),
            table_info_cache: RwLock::new(HashMap::new()),
            table_info_cache_epoch: AtomicU64::new(0),
            columnar_mode,
            columnar_bridge,
            columnar_memory,
            segment_config,
        }
    }

    /// Replaces the SQL catalog with the one persisted in the store.
    ///
    /// A `NotFound` KV error means "no catalog stored yet" and yields a
    /// fresh catalog; any other load error is propagated.
    fn load_sql_catalog(&mut self) -> Result<()> {
        use alopex_sql::catalog::CatalogError;

        let loaded = match alopex_sql::catalog::PersistentCatalog::load(self.store.clone()) {
            Ok(catalog) => catalog,
            Err(CatalogError::Kv(alopex_core::Error::NotFound)) => {
                alopex_sql::catalog::PersistentCatalog::new(self.store.clone())
            }
            Err(other) => return Err(Error::Sql(other.into())),
        };

        self.sql_catalog = Arc::new(RwLock::new(loaded));
        Ok(())
    }

    /// Looks up a cached HNSW index by name.
    fn hnsw_cache_get(&self, name: &str) -> Option<Arc<HnswIndex>> {
        let cache = self.hnsw_cache.read().expect("hnsw cache lock poisoned");
        cache.get(name).cloned()
    }

    /// Inserts an HNSW index into the cache and returns the shared handle.
    fn hnsw_cache_insert(&self, name: &str, index: HnswIndex) -> Arc<HnswIndex> {
        let index = Arc::new(index);
        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
        cache.insert(name.to_string(), Arc::clone(&index));
        index
    }

    /// Evicts an HNSW index from the cache (no-op if absent).
    fn hnsw_cache_remove(&self, name: &str) {
        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
        cache.remove(name);
    }

    /// Current table-info cache epoch; bumped on every invalidation so
    /// callers can detect staleness of previously fetched info.
    pub fn table_info_cache_epoch(&self) -> u64 {
        self.table_info_cache_epoch.load(Ordering::Relaxed)
    }

    /// Returns cached table metadata for `catalog.namespace.table`, if any.
    pub fn get_cached_table_info(
        &self,
        catalog_name: &str,
        namespace_name: &str,
        table_name: &str,
    ) -> Option<CachedTableInfo> {
        let cache = self
            .table_info_cache
            .read()
            .expect("table info cache lock poisoned");
        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
        cache.get(&key).cloned()
    }

    /// Stores table metadata under the key `catalog.namespace.table`.
    pub fn cache_table_info(
        &self,
        catalog_name: &str,
        namespace_name: &str,
        table_name: &str,
        info: CachedTableInfo,
    ) {
        let mut cache = self
            .table_info_cache
            .write()
            .expect("table info cache lock poisoned");
        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
        cache.insert(key, info);
    }

    /// Clears the table-info cache and bumps the epoch counter.
    pub fn invalidate_table_info_cache(&self) {
        // Bump the epoch first so readers racing with the clear observe
        // a changed epoch and refetch.
        self.table_info_cache_epoch.fetch_add(1, Ordering::Relaxed);
        let mut cache = self
            .table_info_cache
            .write()
            .expect("table info cache lock poisoned");
        cache.clear();
    }

    /// Flushes the underlying store to durable storage.
    pub fn flush(&self) -> Result<()> {
        self.store.flush().map_err(Error::Core)
    }

    /// Reports the on-disk file-format version of the backing storage.
    ///
    /// The in-memory backend always reports the current version; disk and
    /// S3 backends read the file header (falling back to current on any
    /// read failure).
    pub fn file_format_version(&self) -> alopex_core::storage::format::FileVersion {
        use alopex_core::storage::format::FileVersion;

        match self.store.as_ref() {
            AnyKV::Memory(_) => FileVersion::CURRENT,
            AnyKV::Lsm(kv) => read_file_version_from_storage(&kv.data_dir),
            #[cfg(feature = "s3")]
            AnyKV::S3(kv) => read_file_version_from_storage(kv.cache_dir()),
        }
    }

    /// Memory statistics of the in-memory backend; `None` for disk/S3.
    pub fn memory_usage(&self) -> Option<MemoryStats> {
        match self.store.as_ref() {
            AnyKV::Memory(kv) => Some(kv.memory_stats()),
            AnyKV::Lsm(_) => None,
            #[cfg(feature = "s3")]
            AnyKV::S3(_) => None,
        }
    }

    /// Persists the current in-memory contents to a new on-disk database
    /// at `wal_path`.
    ///
    /// Errors with [`Error::NotInMemoryMode`] unless the store is
    /// in-memory, and with `PathExists` when `wal_path`, the derived data
    /// directory, or the temporary directory already exist. Data is first
    /// written to a `.tmp` directory and only renamed into place on
    /// success; on failure the temporary directory is removed.
    pub fn persist_to_disk(&self, wal_path: &Path) -> Result<()> {
        if !matches!(self.store.as_ref(), AnyKV::Memory(_)) {
            return Err(Error::NotInMemoryMode);
        }
        let data_dir = disk_data_dir_path(wal_path);
        if wal_path.exists() || data_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(
                wal_path.to_path_buf(),
            )));
        }

        let tmp_dir = data_dir.with_extension("tmp");
        if tmp_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(tmp_dir)));
        }

        let snapshot = self.snapshot_pairs()?;
        // Write every key/value pair into a fresh disk store under tmp_dir.
        let write_result = (|| -> Result<()> {
            let store = StorageFactory::create(alopex_core::StorageMode::Disk {
                path: tmp_dir.clone(),
                config: None,
            })
            .map_err(Error::Core)?;

            let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
            for (key, value) in snapshot {
                txn.put(key, value).map_err(Error::Core)?;
            }
            txn.commit_self().map_err(Error::Core)?;

            Ok(())
        })();

        if let Err(e) = write_result {
            // Best-effort cleanup of the partially written directory.
            let _ = fs::remove_dir_all(&tmp_dir);
            return Err(e);
        }

        fs::rename(&tmp_dir, &data_dir).map_err(|e| Error::Core(e.into()))?;
        if wal_path.extension().is_some_and(|e| e == "alopex") {
            // Best-effort creation of the empty marker file next to the
            // data directory; errors are deliberately ignored.
            let _ = fs::OpenOptions::new()
                .create_new(true)
                .write(true)
                .open(wal_path);
        }
        Ok(())
    }

    /// Copies the entire contents of this database into a fresh in-memory
    /// database and returns it.
    ///
    /// NOTE(review): the clone's SQL catalog is loaded (empty) before the
    /// snapshot is copied in — confirm callers reload/refresh the clone's
    /// catalog if they rely on it reflecting the copied data.
    pub fn clone_to_memory(&self) -> Result<Self> {
        let snapshot = self.snapshot_pairs()?;
        let cloned = Database::open_in_memory()?;
        if snapshot.is_empty() {
            return Ok(cloned);
        }

        let mut txn = cloned.begin(TxnMode::ReadWrite)?;
        for (key, value) in snapshot {
            txn.put(&key, &value)?;
        }
        txn.commit()?;
        Ok(cloned)
    }

    /// Deletes every key in the store within a single read-write
    /// transaction. A no-op when the store is already empty.
    pub fn clear(&self) -> Result<()> {
        let keys: Vec<Key> = self.snapshot_pairs()?.into_iter().map(|(k, _)| k).collect();
        if keys.is_empty() {
            return Ok(());
        }
        let mut txn = self.begin(TxnMode::ReadWrite)?;
        for key in keys {
            txn.delete(&key)?;
        }
        txn.commit()
    }

    /// Updates the memory limit of the in-memory backend's transaction
    /// manager; silently ignored for disk/S3 backends.
    pub fn set_memory_limit(&self, bytes: Option<usize>) {
        if let AnyKV::Memory(kv) = self.store.as_ref() {
            kv.txn_manager().set_memory_limit(bytes);
        }
    }

    /// Returns all key/value pairs, or an empty vec if the snapshot
    /// transaction fails (errors are swallowed here by design).
    pub fn snapshot(&self) -> Vec<(Key, Vec<u8>)> {
        self.snapshot_pairs().unwrap_or_default()
    }

    /// Collects every key/value pair via a read-only transaction scanning
    /// the empty prefix (i.e. the whole keyspace).
    fn snapshot_pairs(&self) -> Result<Vec<(Key, Vec<u8>)>> {
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let pairs: Vec<(Key, Vec<u8>)> = txn.scan_prefix(b"").map_err(Error::Core)?.collect();
        txn.commit_self().map_err(Error::Core)?;
        Ok(pairs)
    }

    /// Creates a new HNSW index, persists it, and places it in the cache.
    pub fn create_hnsw_index(&self, name: &str, config: HnswConfig) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::create(name, config).map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_insert(name, index);
        Ok(())
    }

    /// Drops an HNSW index from storage and evicts it from the cache.
    pub fn drop_hnsw_index(&self, name: &str) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        index.drop(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_remove(name);
        Ok(())
    }

    /// Returns statistics for an HNSW index, loading (and caching) it from
    /// storage on a cache miss.
    pub fn get_hnsw_stats(&self, name: &str) -> Result<HnswStats> {
        if let Some(index) = self.hnsw_cache_get(name) {
            return Ok(index.stats());
        }
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let stats = index.stats();
        self.hnsw_cache_insert(name, index);
        Ok(stats)
    }

    /// Compacts an HNSW index in place, persists the compacted index, and
    /// refreshes the cache entry.
    pub fn compact_hnsw_index(&self, name: &str) -> Result<alopex_core::vector::CompactionResult> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let mut index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let result = index.compact().map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_insert(name, index);
        Ok(result)
    }

    /// Runs a k-nearest-neighbor search against the named HNSW index.
    ///
    /// Uses the cached index when available; otherwise loads it from
    /// storage and caches it. Setting the `ALOPEX_PROFILE_HNSW`
    /// environment variable prints per-phase timings to stderr.
    pub fn search_hnsw(
        &self,
        name: &str,
        query: &[f32],
        k: usize,
        ef_search: Option<usize>,
    ) -> Result<(Vec<HnswSearchResult>, HnswSearchStats)> {
        let profile = std::env::var_os("ALOPEX_PROFILE_HNSW").is_some();
        let total_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Fast path: index already resident in the cache.
        if let Some(index) = self.hnsw_cache_get(name) {
            let search_start = if profile {
                Some(std::time::Instant::now())
            } else {
                None
            };
            let result = index.search(query, k, ef_search).map_err(Error::Core)?;
            if let (true, Some(total_start), Some(search_start)) =
                (profile, total_start, search_start)
            {
                let search_time = search_start.elapsed();
                let total_time = total_start.elapsed();
                eprintln!(
                    "alopex.hnsw_search cache=hit name={} k={} ef_search={:?} search_ms={:.2} total_ms={:.2}",
                    name,
                    k,
                    ef_search,
                    search_time.as_secs_f64() * 1000.0,
                    total_time.as_secs_f64() * 1000.0
                );
            }
            return Ok(result);
        }

        // Slow path: load the index from storage, cache it, then search.
        let load_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let load_time = load_start.map(|start| start.elapsed());
        let index = self.hnsw_cache_insert(name, index);
        let search_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        let result = index.search(query, k, ef_search).map_err(Error::Core)?;
        if let (true, Some(total_start), Some(search_start)) = (profile, total_start, search_start)
        {
            let search_time = search_start.elapsed();
            let total_time = total_start.elapsed();
            let load_time_ms = load_time
                .map(|elapsed| elapsed.as_secs_f64() * 1000.0)
                .unwrap_or(0.0);
            eprintln!(
                "alopex.hnsw_search cache=miss name={} k={} ef_search={:?} load_ms={:.2} search_ms={:.2} total_ms={:.2}",
                name,
                k,
                ef_search,
                load_time_ms,
                search_time.as_secs_f64() * 1000.0,
                total_time.as_secs_f64() * 1000.0
            );
        }
        Ok(result)
    }

    /// Creates a streaming writer for a blob-kind large value at `path`.
    ///
    /// `chunk_size` defaults to [`DEFAULT_CHUNK_SIZE`] when `None`.
    pub fn create_blob_writer(
        &self,
        path: &Path,
        total_len: u64,
        chunk_size: Option<u32>,
    ) -> Result<LargeValueWriter> {
        let meta = LargeValueMeta {
            kind: LargeValueKind::Blob,
            total_len,
            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
        };
        LargeValueWriter::create(path, meta).map_err(Error::Core)
    }

    /// Creates a streaming writer for a typed large value (tagged with
    /// `type_id`) at `path`. `chunk_size` defaults to
    /// [`DEFAULT_CHUNK_SIZE`] when `None`.
    pub fn create_typed_writer(
        &self,
        path: &Path,
        type_id: u16,
        total_len: u64,
        chunk_size: Option<u32>,
    ) -> Result<LargeValueWriter> {
        let meta = LargeValueMeta {
            kind: LargeValueKind::Typed(type_id),
            total_len,
            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
        };
        LargeValueWriter::create(path, meta).map_err(Error::Core)
    }

    /// Opens a previously written large value for streaming reads.
    pub fn open_large_value(&self, path: &Path) -> Result<LargeValueReader> {
        LargeValueReader::open(path).map_err(Error::Core)
    }

    /// Starts a new transaction in the given mode with empty staging state
    /// (no HNSW changes, empty catalog overlay, no vector-cache edits).
    pub fn begin(&self, mode: TxnMode) -> Result<Transaction<'_>> {
        let txn = self.store.begin(mode).map_err(Error::Core)?;
        Ok(Transaction {
            inner: Some(txn),
            db: self,
            hnsw_indices: HashMap::new(),
            overlay: alopex_sql::catalog::CatalogOverlay::new(),
            vector_cache_updates: HashMap::new(),
            vector_cache_deletes: Vec::new(),
            vector_cache_invalidated: false,
            catalog_modified: false,
        })
    }
}
730
731impl Default for Database {
732 fn default() -> Self {
733 Self::new()
734 }
735}
736
/// A transaction over a [`Database`].
///
/// Buffers HNSW index mutations, SQL catalog overlay changes, and
/// vector-cache bookkeeping; all of these are applied to the shared
/// `Database` state only when the transaction commits.
pub struct Transaction<'a> {
    /// Underlying KV transaction; `None` once the transaction completed.
    inner: Option<AnyKVTransaction<'a>>,
    /// Owning database; its caches are updated on commit.
    db: &'a Database,
    /// Staged HNSW mutations per index, keyed by index name.
    hnsw_indices: HashMap<String, (HnswIndex, alopex_core::vector::hnsw::HnswTransactionState)>,
    /// Staged SQL catalog changes, merged into the shared catalog on commit.
    overlay: alopex_sql::catalog::CatalogOverlay,
    /// Vector-cache entries to upsert at commit time.
    vector_cache_updates: HashMap<Key, CachedVector>,
    /// Vector-cache keys to remove at commit time.
    vector_cache_deletes: Vec<Key>,
    /// When true (set by raw `put`), the whole vector cache is dropped on
    /// commit instead of applying the incremental updates/deletes.
    vector_cache_invalidated: bool,
    /// Set by catalog-changing operations; triggers table-info cache
    /// invalidation after a successful commit.
    pub(crate) catalog_modified: bool,
}
749
/// A single hit returned by [`Transaction::search_similar`].
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// KV key of the matching vector entry.
    pub key: Key,
    /// Metadata bytes stored alongside the vector.
    pub metadata: Vec<u8>,
    /// Similarity score; higher is more similar (for L2, the negated
    /// distance is used so the ordering is uniform across metrics).
    pub score: f32,
}
760
/// Reserved KV key holding the encoded list of all vector-entry keys.
const VECTOR_INDEX_KEY: &[u8] = b"__alopex_vector_index";
762
763impl<'a> Transaction<'a> {
    /// Read-only view of the catalog changes staged in this transaction.
    pub(crate) fn catalog_overlay(&self) -> &alopex_sql::catalog::CatalogOverlay {
        &self.overlay
    }
767
    /// Mutable access to the catalog changes staged in this transaction.
    pub(crate) fn catalog_overlay_mut(&mut self) -> &mut alopex_sql::catalog::CatalogOverlay {
        &mut self.overlay
    }
771
772 pub(crate) fn txn_mode(&self) -> Result<TxnMode> {
773 let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
774 Ok(txn.mode())
775 }
776 pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
778 self.inner_mut()?.get(&key.to_vec()).map_err(Error::Core)
779 }
780
781 pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
783 self.vector_cache_invalidated = true;
784 self.vector_cache_updates.clear();
785 self.vector_cache_deletes.clear();
786 self.inner_mut()?
787 .put(key.to_vec(), value.to_vec())
788 .map_err(Error::Core)
789 }
790
791 pub fn delete(&mut self, key: &[u8]) -> Result<()> {
793 self.vector_cache_deletes.push(key.to_vec());
794 self.inner_mut()?.delete(key.to_vec()).map_err(Error::Core)
795 }
796
797 pub fn scan_prefix(
801 &mut self,
802 prefix: &[u8],
803 ) -> Result<Box<dyn Iterator<Item = (Key, Vec<u8>)> + '_>> {
804 self.inner_mut()?.scan_prefix(prefix).map_err(Error::Core)
805 }
806
807 pub fn upsert_to_hnsw(
809 &mut self,
810 index_name: &str,
811 key: &[u8],
812 vector: &[f32],
813 metadata: &[u8],
814 ) -> Result<()> {
815 self.ensure_write_txn()?;
816 let (index, state) = self.hnsw_entry_mut(index_name)?;
817 index
818 .upsert_staged(key, vector, metadata, state)
819 .map_err(Error::Core)
820 }
821
822 pub fn delete_from_hnsw(&mut self, index_name: &str, key: &[u8]) -> Result<bool> {
824 self.ensure_write_txn()?;
825 let (index, state) = self.hnsw_entry_mut(index_name)?;
826 index.delete_staged(key, state).map_err(Error::Core)
827 }
828
829 pub fn upsert_vector(
833 &mut self,
834 key: &[u8],
835 metadata: &[u8],
836 vector: &[f32],
837 metric: Metric,
838 ) -> Result<()> {
839 if vector.is_empty() {
840 return Err(Error::Core(alopex_core::Error::InvalidFormat(
841 "vector cannot be empty".into(),
842 )));
843 }
844 let vt = VectorType::new(vector.len(), metric);
845 vt.validate(vector).map_err(Error::Core)?;
846
847 let payload = encode_vector_entry(vt, metadata, vector);
848 let txn = self.inner_mut()?;
849 txn.put(key.to_vec(), payload).map_err(Error::Core)?;
850
851 let mut keys = self.load_vector_index()?;
852 if !keys.iter().any(|k| k == key) {
853 keys.push(key.to_vec());
854 self.persist_vector_index(&keys)?;
855 }
856
857 let cached = cached_vector_from_entry(metric, metadata.to_vec(), vector.to_vec());
858 self.vector_cache_updates.insert(key.to_vec(), cached);
859 self.vector_cache_deletes.retain(|k| k != key);
860 Ok(())
861 }
862
863 pub fn get_vector(&mut self, key: &[u8], metric: Metric) -> Result<Option<Vec<f32>>> {
868 let txn = self.inner_mut()?;
869 let key_vec = key.to_vec();
870 let Some(raw) = txn.get(&key_vec).map_err(Error::Core)? else {
871 return Ok(None);
872 };
873 let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
874 if decoded.metric != metric {
875 return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
876 metric: metric.as_str().to_string(),
877 }));
878 }
879 Ok(Some(decoded.vector))
880 }
881
882 pub fn get_vectors(&mut self, keys: &[Key], metric: Metric) -> Result<Vec<Option<Vec<f32>>>> {
890 let txn = self.inner_mut()?;
891 let mut out = Vec::with_capacity(keys.len());
892 for key in keys {
893 let Some(raw) = txn.get(key).map_err(Error::Core)? else {
894 out.push(None);
895 continue;
896 };
897 let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
898 if decoded.metric != metric {
899 return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
900 metric: metric.as_str().to_string(),
901 }));
902 }
903 out.push(Some(decoded.vector));
904 }
905 Ok(out)
906 }
907
    /// Brute-force similarity search over all stored vectors (or only
    /// `filter_keys`, when given), returning the `top_k` best-scoring
    /// results sorted by descending score (ties broken by ascending key).
    ///
    /// Read-only transactions without a key filter use the database-level
    /// vector cache when it is populated; otherwise every key is fetched
    /// and decoded through the KV transaction. Setting the
    /// `ALOPEX_PROFILE_SEARCH_SIMILAR` environment variable prints
    /// per-phase timings to stderr.
    pub fn search_similar(
        &mut self,
        query_vector: &[f32],
        metric: Metric,
        top_k: usize,
        filter_keys: Option<&[Key]>,
    ) -> Result<Vec<SearchResult>> {
        if top_k == 0 {
            return Ok(Vec::new());
        }

        let profile = std::env::var_os("ALOPEX_PROFILE_SEARCH_SIMILAR").is_some();
        let total_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Precompute query norms; only cosine needs the actual norm.
        let query_norm_sq = query_vector.iter().map(|v| v * v).sum::<f32>();
        let query_norm = if matches!(metric, Metric::Cosine) {
            query_norm_sq.sqrt()
        } else {
            0.0
        };
        let inv_query_norm = if query_norm == 0.0 {
            0.0
        } else {
            1.0 / query_norm
        };

        // Fast path: unfiltered read-only search against the in-memory
        // vector cache, when it has been built.
        if filter_keys.is_none() && self.txn_mode()? == TxnMode::ReadOnly {
            let cache = self
                .db
                .vector_cache
                .read()
                .expect("vector cache lock poisoned");
            if let Some(cache) = cache.as_ref() {
                if cache.is_empty() {
                    return Ok(Vec::new());
                }
                let keys_len = cache.len();
                let mut rows = Vec::with_capacity(keys_len);
                let mut score_time = std::time::Duration::ZERO;
                for (key, cached) in cache.iter() {
                    if cached.metric != metric {
                        return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                            metric: metric.as_str().to_string(),
                        }));
                    }
                    validate_dimensions(cached.vector.len(), query_vector.len())
                        .map_err(Error::Core)?;
                    let score_start = if profile {
                        Some(std::time::Instant::now())
                    } else {
                        None
                    };
                    let dot = dot_product(query_vector, &cached.vector);
                    // Scores are "higher is better": cosine similarity,
                    // negated L2 distance, or raw inner product.
                    let score = match metric {
                        Metric::Cosine => {
                            if cached.inv_norm == 0.0 || inv_query_norm == 0.0 {
                                0.0
                            } else {
                                dot * cached.inv_norm * inv_query_norm
                            }
                        }
                        Metric::L2 => {
                            let dist_sq = query_norm_sq + cached.norm_sq - 2.0 * dot;
                            -dist_sq.sqrt()
                        }
                        Metric::InnerProduct => dot,
                    };
                    if let Some(score_start) = score_start {
                        score_time += score_start.elapsed();
                    }
                    rows.push(SearchResult {
                        key: key.clone(),
                        metadata: cached.metadata.clone(),
                        score,
                    });
                }

                let rows_total = rows.len();
                let sort_start = if profile {
                    Some(std::time::Instant::now())
                } else {
                    None
                };
                // Partial-select the top_k first, then fully sort just the
                // survivors.
                if rows.len() > top_k {
                    rows.select_nth_unstable_by(top_k - 1, |a, b| {
                        b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
                    });
                    rows.truncate(top_k);
                }
                rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
                if let (true, Some(total_start), Some(sort_start)) =
                    (profile, total_start, sort_start)
                {
                    let sort_time = sort_start.elapsed();
                    let total_time = total_start.elapsed();
                    eprintln!(
                        "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
                        keys_len,
                        rows_total,
                        top_k,
                        0.0,
                        0.0,
                        0.0,
                        score_time.as_secs_f64() * 1000.0,
                        sort_time.as_secs_f64() * 1000.0,
                        total_time.as_secs_f64() * 1000.0
                    );
                }
                return Ok(rows);
            }
        }

        // Slow path: determine the candidate key set (explicit filter or
        // the persisted vector index).
        let (keys, load_keys_time) = if profile {
            let start = std::time::Instant::now();
            let keys = match filter_keys {
                Some(keys) => keys.to_vec(),
                None => self.load_vector_index()?,
            };
            (keys, start.elapsed())
        } else {
            let keys = match filter_keys {
                Some(keys) => keys.to_vec(),
                None => self.load_vector_index()?,
            };
            (keys, std::time::Duration::ZERO)
        };
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        let keys_len = keys.len();
        let mut rows = Vec::with_capacity(keys.len());
        let txn = self.inner_mut()?;
        let mut get_time = std::time::Duration::ZERO;
        let mut decode_time = std::time::Duration::ZERO;
        let mut score_time = std::time::Duration::ZERO;
        // Two copies of the scan loop: the profiled one pays Instant::now()
        // per phase, the unprofiled one is timer-free.
        if profile {
            for key in keys {
                let get_start = std::time::Instant::now();
                // Missing keys (e.g. deleted entries still in the index)
                // are skipped silently.
                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
                    get_time += get_start.elapsed();
                    continue;
                };
                get_time += get_start.elapsed();
                let decode_start = std::time::Instant::now();
                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
                decode_time += decode_start.elapsed();
                if decoded.metric != metric {
                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                        metric: metric.as_str().to_string(),
                    }));
                }
                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
                let score_start = std::time::Instant::now();
                let score =
                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
                score_time += score_start.elapsed();
                rows.push(SearchResult {
                    key,
                    metadata: decoded.metadata,
                    score,
                });
            }
        } else {
            for key in keys {
                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
                    continue;
                };
                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
                if decoded.metric != metric {
                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                        metric: metric.as_str().to_string(),
                    }));
                }
                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
                let score =
                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
                rows.push(SearchResult {
                    key,
                    metadata: decoded.metadata,
                    score,
                });
            }
        }

        let rows_total = rows.len();
        let sort_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Partial-select the top_k first, then fully sort the survivors.
        if rows.len() > top_k {
            rows.select_nth_unstable_by(top_k - 1, |a, b| {
                b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
            });
            rows.truncate(top_k);
        }
        rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
        if let (true, Some(total_start), Some(sort_start)) = (profile, total_start, sort_start) {
            let sort_time = sort_start.elapsed();
            let total_time = total_start.elapsed();
            eprintln!(
                "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
                keys_len,
                rows_total,
                top_k,
                load_keys_time.as_secs_f64() * 1000.0,
                get_time.as_secs_f64() * 1000.0,
                decode_time.as_secs_f64() * 1000.0,
                score_time.as_secs_f64() * 1000.0,
                sort_time.as_secs_f64() * 1000.0,
                total_time.as_secs_f64() * 1000.0
            );
        }
        Ok(rows)
    }
1131
1132 fn load_vector_index(&mut self) -> Result<Vec<Key>> {
1133 let txn = self.inner_mut()?;
1134 let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec()).map_err(Error::Core)? else {
1135 return Ok(Vec::new());
1136 };
1137 decode_index(&raw).map_err(Error::Core)
1138 }
1139
1140 fn persist_vector_index(&mut self, keys: &[Key]) -> Result<()> {
1141 let txn = self.inner_mut()?;
1142 let encoded = encode_index(keys)?;
1143 txn.put(VECTOR_INDEX_KEY.to_vec(), encoded)
1144 .map_err(Error::Core)
1145 }
1146
    /// Commits the transaction, flushing all staged state in order:
    /// HNSW mutations and the catalog overlay are persisted into the KV
    /// transaction, the database-level vector cache is reconciled, the KV
    /// transaction itself commits, and finally the shared caches (HNSW
    /// indexes, catalog, table-info cache) are updated.
    ///
    /// Returns [`Error::TxnCompleted`] when already finished.
    pub fn commit(mut self) -> Result<()> {
        {
            let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
            // Persist staged HNSW mutations into this KV transaction.
            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
                index.commit_staged(txn, state).map_err(Error::Core)?;
            }
            // Persist staged catalog changes while holding the catalog
            // write lock so concurrent commits serialize.
            let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
            catalog
                .persist_overlay(txn, &self.overlay)
                .map_err(|err| Error::Sql(err.into()))?;

            // Reconcile the database-level vector cache with what this
            // transaction staged.
            let vector_cache_invalidated = self.vector_cache_invalidated;
            let vector_cache_updates = std::mem::take(&mut self.vector_cache_updates);
            let vector_cache_deletes = std::mem::take(&mut self.vector_cache_deletes);
            if vector_cache_invalidated {
                // Raw puts happened: the cache cannot be patched, drop it.
                let mut cache = self
                    .db
                    .vector_cache
                    .write()
                    .expect("vector cache lock poisoned");
                *cache = None;
            } else if !vector_cache_updates.is_empty() || !vector_cache_deletes.is_empty() {
                let needs_rebuild = {
                    let cache = self
                        .db
                        .vector_cache
                        .read()
                        .expect("vector cache lock poisoned");
                    cache.is_none()
                };
                if needs_rebuild {
                    // No cache yet: build it from the transaction's view,
                    // which already includes this transaction's writes.
                    let rebuilt = build_vector_cache_from_txn(txn).map_err(Error::Core)?;
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    *cache = Some(rebuilt);
                } else {
                    // Patch the existing cache: deletions first, then
                    // upserts.
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    if let Some(cache) = cache.as_mut() {
                        for key in vector_cache_deletes {
                            cache.remove(&key);
                        }
                        for (key, cached) in vector_cache_updates {
                            cache.insert(key, cached);
                        }
                    }
                }
            }
        }
        // Commit the underlying KV transaction, then publish the staged
        // HNSW indexes into the shared cache.
        let txn = self.inner.take().ok_or(Error::TxnCompleted)?;
        let hnsw_indices = std::mem::take(&mut self.hnsw_indices);
        txn.commit_self().map_err(Error::Core)?;
        if !hnsw_indices.is_empty() {
            let mut cache = self
                .db
                .hnsw_cache
                .write()
                .expect("hnsw cache lock poisoned");
            for (name, (index, _state)) in hnsw_indices {
                cache.insert(name, Arc::new(index));
            }
        }

        // Merge the overlay into the in-memory catalog and, if the catalog
        // changed, invalidate the table-info cache (lock released first).
        let overlay = std::mem::take(&mut self.overlay);
        let catalog_modified = self.catalog_modified;
        let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
        catalog.apply_overlay(overlay);
        drop(catalog);
        if catalog_modified {
            self.db.invalidate_table_info_cache();
        }
        Ok(())
    }
1229
1230 pub fn rollback_in_place(&mut self) -> Result<()> {
1232 let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
1233 txn.rollback_in_place().map_err(Error::Core)?;
1234 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1235 let _ = index.rollback(state);
1236 }
1237 self.hnsw_indices.clear();
1238 self.overlay = alopex_sql::catalog::CatalogOverlay::default();
1239 self.inner = None;
1240 Ok(())
1241 }
1242
1243 pub fn rollback(mut self) -> Result<()> {
1245 if let Some(txn) = self.inner.take() {
1246 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1247 let _ = index.rollback(state);
1248 }
1249 self.hnsw_indices.clear();
1250 txn.rollback_self().map_err(Error::Core)
1251 } else {
1252 Err(Error::TxnCompleted)
1253 }
1254 }
1255
1256 fn inner_mut(&mut self) -> Result<&mut AnyKVTransaction<'a>> {
1257 self.inner.as_mut().ok_or(Error::TxnCompleted)
1258 }
1259
    /// Returns the cached (index, staged-state) pair for `name`, loading the
    /// HNSW index from the KV store on first access within this transaction.
    ///
    /// NOTE: the `HashMap::entry` API cannot be used here because loading the
    /// index requires a second mutable borrow of `self` via `inner_mut` while
    /// an entry would already hold a borrow of `self.hnsw_indices`.
    fn hnsw_entry_mut(&mut self, name: &str) -> Result<&mut (HnswIndex, HnswTransactionState)> {
        if !self.hnsw_indices.contains_key(name) {
            let index = {
                let txn = self.inner_mut()?;
                HnswIndex::load(name, txn).map_err(Error::Core)?
            };
            self.hnsw_indices
                .insert(name.to_string(), (index, HnswTransactionState::default()));
        }
        // Present by construction: either pre-existing or inserted above.
        Ok(self.hnsw_indices.get_mut(name).unwrap())
    }
1271
1272 fn ensure_write_txn(&self) -> Result<()> {
1273 let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
1274 if txn.mode() != TxnMode::ReadWrite {
1275 return Err(Error::Core(alopex_core::Error::TxnReadOnly));
1276 }
1277 Ok(())
1278 }
1279}
1280
1281impl<'a> Drop for Transaction<'a> {
1282 fn drop(&mut self) {
1283 if let Some(txn) = self.inner.take() {
1284 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1285 let _ = index.rollback(state);
1286 }
1287 self.hnsw_indices.clear();
1288 let _ = txn.rollback_self();
1289 }
1290 }
1291}
1292
1293fn metric_to_byte(metric: Metric) -> u8 {
1294 match metric {
1295 Metric::Cosine => 0,
1296 Metric::L2 => 1,
1297 Metric::InnerProduct => 2,
1298 }
1299}
1300
1301fn byte_to_metric(byte: u8) -> result::Result<Metric, alopex_core::Error> {
1302 match byte {
1303 0 => Ok(Metric::Cosine),
1304 1 => Ok(Metric::L2),
1305 2 => Ok(Metric::InnerProduct),
1306 other => Err(alopex_core::Error::UnsupportedMetric {
1307 metric: format!("unknown({other})"),
1308 }),
1309 }
1310}
1311
1312fn encode_vector_entry(vector_type: VectorType, metadata: &[u8], vector: &[f32]) -> Vec<u8> {
1313 let dim = vector_type.dim() as u32;
1314 let meta_len = metadata.len() as u32;
1315 let mut buf = Vec::with_capacity(1 + 4 + 4 + metadata.len() + std::mem::size_of_val(vector));
1316 buf.push(metric_to_byte(vector_type.metric()));
1317 buf.extend_from_slice(&dim.to_le_bytes());
1318 buf.extend_from_slice(&meta_len.to_le_bytes());
1319 buf.extend_from_slice(metadata);
1320 for v in vector {
1321 buf.extend_from_slice(&v.to_le_bytes());
1322 }
1323 buf
1324}
1325
/// Minimal decoded form of a stored vector entry: metric plus components.
/// Metadata is validated but deliberately not returned by `decode_vector_entry`.
struct DecodedEntry {
    /// Distance metric the vector was stored with.
    metric: Metric,
    /// Vector components decoded from little-endian f32 bytes.
    vector: Vec<f32>,
}
1330
/// Fully decoded vector entry held in the in-memory vector cache, with
/// precomputed norms so repeated similarity scoring avoids re-deriving them.
#[derive(Clone)]
struct CachedVector {
    /// Distance metric the vector was stored with.
    metric: Metric,
    /// Opaque caller-supplied metadata bytes.
    metadata: Vec<u8>,
    /// Vector components.
    vector: Vec<f32>,
    /// Squared Euclidean norm of `vector`.
    norm_sq: f32,
    /// `1.0 / norm_sq.sqrt()`, or `0.0` for the zero vector.
    inv_norm: f32,
}
1339
/// Borrowed view over an encoded vector entry: the raw f32 payload stays
/// borrowed from the source buffer while the metadata is copied out.
struct VectorEntryView<'a> {
    /// Metric tag decoded from the entry header.
    metric: Metric,
    /// Number of f32 components in `vector_bytes`.
    dim: usize,
    /// Copied metadata bytes (owned, unlike `vector_bytes`).
    metadata: Vec<u8>,
    /// Little-endian f32 payload, exactly `dim * 4` bytes.
    vector_bytes: &'a [u8],
}
1346
1347fn decode_vector_entry(bytes: &[u8]) -> result::Result<DecodedEntry, alopex_core::Error> {
1348 if bytes.len() < 9 {
1349 return Err(alopex_core::Error::InvalidFormat(
1350 "vector entry too short".into(),
1351 ));
1352 }
1353 let metric = byte_to_metric(bytes[0])?;
1354 let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1355 let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1356
1357 let header = 9;
1358 let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1359 if bytes.len() < expected_len {
1360 return Err(alopex_core::Error::InvalidFormat(
1361 "vector entry truncated".into(),
1362 ));
1363 }
1364
1365 let mut vector = Vec::with_capacity(dim);
1366 let vec_bytes = &bytes[header + meta_len..expected_len];
1367 for chunk in vec_bytes.chunks_exact(4) {
1368 vector.push(f32::from_le_bytes(chunk.try_into().unwrap()));
1369 }
1370
1371 Ok(DecodedEntry { metric, vector })
1372}
1373
1374fn decode_vector_entry_view(
1375 bytes: &[u8],
1376) -> result::Result<VectorEntryView<'_>, alopex_core::Error> {
1377 if bytes.len() < 9 {
1378 return Err(alopex_core::Error::InvalidFormat(
1379 "vector entry too short".into(),
1380 ));
1381 }
1382 let metric = byte_to_metric(bytes[0])?;
1383 let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1384 let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1385
1386 let header = 9;
1387 let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1388 if bytes.len() < expected_len {
1389 return Err(alopex_core::Error::InvalidFormat(
1390 "vector entry truncated".into(),
1391 ));
1392 }
1393
1394 let metadata = bytes[header..header + meta_len].to_vec();
1395 let vector_bytes = &bytes[header + meta_len..expected_len];
1396
1397 Ok(VectorEntryView {
1398 metric,
1399 dim,
1400 metadata,
1401 vector_bytes,
1402 })
1403}
1404
/// Decodes a little-endian f32 byte buffer into a vector of f32 values.
/// Trailing bytes that do not form a whole 4-byte chunk are ignored.
fn vector_bytes_to_vec(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
        .collect()
}
1412
1413fn cached_vector_from_entry(metric: Metric, metadata: Vec<u8>, vector: Vec<f32>) -> CachedVector {
1414 let norm_sq = vector.iter().map(|v| v * v).sum::<f32>();
1415 let inv_norm = if norm_sq == 0.0 {
1416 0.0
1417 } else {
1418 1.0 / norm_sq.sqrt()
1419 };
1420 CachedVector {
1421 metric,
1422 metadata,
1423 vector,
1424 norm_sq,
1425 inv_norm,
1426 }
1427}
1428
1429fn build_vector_cache_from_txn<'a>(
1430 txn: &mut AnyKVTransaction<'a>,
1431) -> result::Result<HashMap<Key, CachedVector>, alopex_core::Error> {
1432 let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec())? else {
1433 return Ok(HashMap::new());
1434 };
1435 let keys = decode_index(&raw)?;
1436 let mut cache = HashMap::with_capacity(keys.len());
1437 for key in keys {
1438 let Some(raw) = txn.get(&key)? else {
1439 continue;
1440 };
1441 let decoded = decode_vector_entry_view(&raw)?;
1442 let vector = vector_bytes_to_vec(decoded.vector_bytes);
1443 let cached = cached_vector_from_entry(decoded.metric, decoded.metadata, vector);
1444 cache.insert(key, cached);
1445 }
1446 Ok(cache)
1447}
1448
1449fn dot_product(query: &[f32], item: &[f32]) -> f32 {
1450 #[cfg(target_arch = "x86_64")]
1451 {
1452 if std::is_x86_feature_detected!("avx") {
1453 unsafe {
1455 return dot_product_avx(query, item);
1456 }
1457 }
1458 }
1459 dot_product_scalar(query, item)
1460}
1461
/// Plain sequential dot product; pairs past the shorter slice are ignored.
fn dot_product_scalar(query: &[f32], item: &[f32]) -> f32 {
    let mut acc = 0.0f32;
    for (q, v) in query.iter().zip(item.iter()) {
        acc += q * v;
    }
    acc
}
1465
/// AVX dot product kernel: 8 lanes per iteration plus a scalar tail.
///
/// Pairs are only read up to the shorter of the two slices, matching the
/// `zip`-based semantics of `dot_product_scalar` (the previous bound of
/// `query.len()` alone could read past the end of a shorter `item`).
///
/// # Safety
/// The caller must guarantee the CPU supports AVX (e.g. via
/// `is_x86_feature_detected!("avx")`); executing this function on a CPU
/// without AVX is undefined behavior.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx")]
unsafe fn dot_product_avx(query: &[f32], item: &[f32]) -> f32 {
    use std::arch::x86_64::*;

    // Bound by the shorter slice so both pointers stay in bounds.
    let len = query.len().min(item.len());
    let mut i = 0;
    let mut acc = _mm256_setzero_ps();
    let q_ptr = query.as_ptr();
    let v_ptr = item.as_ptr();
    while i + 8 <= len {
        // SAFETY: `i + 8 <= len <= query.len() and item.len()`, so both
        // 8-wide unaligned loads read in-bounds memory.
        let q = _mm256_loadu_ps(q_ptr.add(i));
        let v = _mm256_loadu_ps(v_ptr.add(i));
        acc = _mm256_add_ps(acc, _mm256_mul_ps(q, v));
        i += 8;
    }

    // Horizontal sum of the 8 accumulator lanes, then the scalar tail.
    let mut tmp = [0f32; 8];
    _mm256_storeu_ps(tmp.as_mut_ptr(), acc);
    let mut sum = tmp.iter().sum::<f32>();
    while i < len {
        // SAFETY: `i < len` keeps both scalar reads in bounds.
        sum += *q_ptr.add(i) * *v_ptr.add(i);
        i += 1;
    }
    sum
}
1492
1493fn score_from_slice(metric: Metric, query: &[f32], query_norm: f32, item: &[f32]) -> f32 {
1494 match metric {
1495 Metric::Cosine => {
1496 if query_norm == 0.0 {
1497 return 0.0;
1498 }
1499 let mut dot = 0.0;
1500 let mut item_norm_sq = 0.0;
1501 for (q, v) in query.iter().zip(item.iter()) {
1502 dot += q * v;
1503 item_norm_sq += v * v;
1504 }
1505 let item_norm = item_norm_sq.sqrt();
1506 if item_norm == 0.0 {
1507 0.0
1508 } else {
1509 dot / (query_norm * item_norm)
1510 }
1511 }
1512 Metric::L2 => {
1513 let mut dist_sq = 0.0;
1514 for (q, v) in query.iter().zip(item.iter()) {
1515 let d = q - v;
1516 dist_sq += d * d;
1517 }
1518 -dist_sq.sqrt()
1519 }
1520 Metric::InnerProduct => query.iter().zip(item.iter()).map(|(q, v)| q * v).sum(),
1521 }
1522}
1523
/// Scores `query` against a raw little-endian f32 payload without first
/// materializing a `Vec<f32>`.
///
/// On little-endian targets with a 4-byte-aligned payload the bytes are
/// reinterpreted in place and scoring is delegated to `score_from_slice`;
/// otherwise each component is decoded on the fly. Both paths accumulate in
/// the same sequential order as `score_from_slice`.
fn score_from_bytes(
    metric: Metric,
    query: &[f32],
    query_norm: f32,
    vector_bytes: &[u8],
) -> result::Result<f32, alopex_core::Error> {
    let len = vector_bytes.len() / 4;
    #[cfg(target_endian = "little")]
    {
        let ptr = vector_bytes.as_ptr();
        if (ptr as usize).is_multiple_of(std::mem::align_of::<f32>()) {
            // SAFETY: `ptr` is non-null and f32-aligned (checked above), and
            // `len * 4 <= vector_bytes.len()` bytes are readable. Any bit
            // pattern is a valid f32, and on a little-endian target the
            // in-memory layout matches the stored LE encoding.
            let items = unsafe { std::slice::from_raw_parts(ptr as *const f32, len) };
            return Ok(score_from_slice(metric, query, query_norm, items));
        }
    }

    // Fallback path: decode 4-byte chunks lazily while accumulating.
    let mut iter = vector_bytes.chunks_exact(4);
    let score = match metric {
        Metric::Cosine => {
            if query_norm == 0.0 {
                // Zero query norm: cosine similarity is defined as 0 here.
                0.0
            } else {
                let mut dot = 0.0;
                let mut item_norm_sq = 0.0;
                for (q, chunk) in query.iter().zip(&mut iter) {
                    let v = f32::from_le_bytes(chunk.try_into().unwrap());
                    dot += q * v;
                    item_norm_sq += v * v;
                }
                let item_norm = item_norm_sq.sqrt();
                if item_norm == 0.0 {
                    0.0
                } else {
                    dot / (query_norm * item_norm)
                }
            }
        }
        Metric::L2 => {
            // Negated distance so that larger scores mean closer vectors.
            let mut dist_sq = 0.0;
            for (q, chunk) in query.iter().zip(&mut iter) {
                let v = f32::from_le_bytes(chunk.try_into().unwrap());
                let d = q - v;
                dist_sq += d * d;
            }
            -dist_sq.sqrt()
        }
        Metric::InnerProduct => query
            .iter()
            .zip(&mut iter)
            .map(|(q, chunk)| q * f32::from_le_bytes(chunk.try_into().unwrap()))
            .sum(),
    };
    Ok(score)
}
1579
1580fn encode_index(keys: &[Key]) -> result::Result<Vec<u8>, alopex_core::Error> {
1581 let mut buf = Vec::new();
1582 let count = keys.len() as u32;
1583 buf.extend_from_slice(&count.to_le_bytes());
1584 for key in keys {
1585 let len: u32 = key
1586 .len()
1587 .try_into()
1588 .map_err(|_| alopex_core::Error::InvalidFormat("key too long".into()))?;
1589 buf.extend_from_slice(&len.to_le_bytes());
1590 buf.extend_from_slice(key);
1591 }
1592 Ok(buf)
1593}
1594
1595fn decode_index(bytes: &[u8]) -> result::Result<Vec<Key>, alopex_core::Error> {
1596 if bytes.len() < 4 {
1597 return Err(alopex_core::Error::InvalidFormat("index too short".into()));
1598 }
1599 let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
1600 let mut pos = 4;
1601 let mut keys = Vec::with_capacity(count);
1602 for _ in 0..count {
1603 if pos + 4 > bytes.len() {
1604 return Err(alopex_core::Error::InvalidFormat("index truncated".into()));
1605 }
1606 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
1607 pos += 4;
1608 if pos + len > bytes.len() {
1609 return Err(alopex_core::Error::InvalidFormat(
1610 "index key truncated".into(),
1611 ));
1612 }
1613 keys.push(bytes[pos..pos + len].to_vec());
1614 pos += len;
1615 }
1616 Ok(keys)
1617}
1618
#[cfg(test)]
mod tests {
    //! End-to-end tests of the embedded `Database` API: KV CRUD, WAL
    //! recovery, flushing, large-value blobs, vector upsert/search, and
    //! write-write conflict behavior.
    use super::*;
    use std::sync::mpsc;
    use std::thread;
    use tempfile::tempdir;

    /// File-backed open, write+commit, then read back in a second txn.
    #[test]
    fn test_open_and_crud() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let db = Database::open(&path).unwrap();

        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.put(b"key1", b"value1").unwrap();
        txn.commit().unwrap();

        let mut txn2 = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn2.get(b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));
    }

    /// A missing key reads back as `None`, not an error.
    #[test]
    fn test_not_found() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn.get(b"non-existent-key").unwrap();
        assert!(val.is_none());
    }

    /// Opening a pre-written `.alopex` file surfaces the header's version.
    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_file_format_version_reads_alopex_header() {
        use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};

        let dir = tempdir().unwrap();
        let path = dir.path().join("format-test.alopex");
        let expected = FileVersion::new(0, 0, 1);

        let writer = AlopexFileWriter::new(path.clone(), expected, FileFlags(0)).unwrap();
        writer.finalize().unwrap();

        let db = Database::open(&path).unwrap();
        assert_eq!(db.file_format_version(), expected);
    }

    /// After a simulated crash (scope ends with an uncommitted txn),
    /// reopening replays the WAL: committed data survives, uncommitted
    /// data does not.
    #[test]
    fn test_crash_recovery_replays_wal() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("replay.db");

        {
            let db = Database::open(&path).unwrap();
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.put(b"k1", b"v1").unwrap();
            txn.commit().unwrap();

            let mut uncommitted = db.begin(TxnMode::ReadWrite).unwrap();
            uncommitted.put(b"k2", b"v2").unwrap();
        }

        let db = Database::open(&path).unwrap();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
        assert_eq!(txn.get(b"k2").unwrap(), None);
    }

    /// NOTE(review): despite the name, this only exercises a plain
    /// write+commit; it does not assert the post-completion
    /// (`TxnCompleted`) error path — consider extending.
    #[test]
    fn test_txn_closed() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.put(b"k1", b"v1").unwrap();
        txn.commit().unwrap();
    }

    /// Two writers race on the same key, synchronized via channels so txn1
    /// reads before txn2 commits: txn2's commit succeeds, txn1's commit must
    /// fail with `TxnConflict`, and the final value is txn2's.
    #[test]
    fn test_concurrency_conflict() {
        let db = std::sync::Arc::new(Database::new());
        let mut t0 = db.begin(TxnMode::ReadWrite).unwrap();
        t0.put(b"k1", b"v0").unwrap();
        t0.commit().unwrap();

        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();

        let db1 = db.clone();
        let t1 = thread::spawn(move || {
            let mut txn1 = db1.begin(TxnMode::ReadWrite).unwrap();
            let val = txn1.get(b"k1").unwrap();
            assert_eq!(val.unwrap(), b"v0");
            tx1.send(()).unwrap();
            rx2.recv().unwrap();
            txn1.put(b"k1", b"v1").unwrap();
            let result = txn1.commit();
            assert!(matches!(
                result,
                Err(Error::Core(alopex_core::Error::TxnConflict))
            ));
        });

        let db2 = db.clone();
        let t2 = thread::spawn(move || {
            rx1.recv().unwrap();
            let mut txn2 = db2.begin(TxnMode::ReadWrite).unwrap();
            txn2.put(b"k1", b"v2").unwrap();
            assert!(txn2.commit().is_ok());
            tx2.send(()).unwrap();
        });

        t1.join().unwrap();
        t2.join().unwrap();

        let mut txn3 = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn3.get(b"k1").unwrap();
        assert_eq!(val.unwrap(), b"v2");
    }

    /// Explicit `flush` then reopen preserves committed data.
    #[test]
    fn test_flush_and_reopen_via_embedded_api() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("persist.db");
        {
            let db = Database::open(&path).unwrap();
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.put(b"k1", b"v1").unwrap();
            txn.commit().unwrap();
            db.flush().unwrap();
        }

        let db = Database::open(&path).unwrap();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
    }

    /// Chunked blob write followed by chunked read returns the same bytes.
    #[test]
    fn test_large_value_blob_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let payload = b"hello large value";

        {
            let db = Database::new();
            let mut writer = db
                .create_blob_writer(&path, payload.len() as u64, Some(16))
                .unwrap();
            writer.write_chunk(&payload[..5]).unwrap();
            writer.write_chunk(&payload[5..]).unwrap();
            writer.finish().unwrap();
        }

        let db = Database::new();
        let mut reader = db.open_large_value(&path).unwrap();
        let mut buf = Vec::new();
        while let Some((_info, chunk)) = reader.next_chunk().unwrap() {
            buf.extend_from_slice(&chunk);
        }
        assert_eq!(buf, payload);
    }

    /// A vector upserted in a txn is visible to a search in that same txn.
    #[test]
    fn upsert_and_search_same_txn() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.upsert_vector(b"k1", b"meta1", &[1.0, 0.0], Metric::Cosine)
            .unwrap();

        let results = txn
            .search_similar(&[1.0, 0.0], Metric::Cosine, 1, None)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, b"k1");
        assert_eq!(results[0].metadata, b"meta1");
        txn.commit().unwrap();
    }

    /// A committed vector is visible to a later read-only search txn.
    #[test]
    fn upsert_and_search_across_txn() {
        let db = Database::new();
        {
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.upsert_vector(b"k1", b"meta1", &[1.0, 1.0], Metric::Cosine)
                .unwrap();
            txn.commit().unwrap();
        }

        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let results = ro
            .search_similar(&[1.0, 1.0], Metric::Cosine, 1, None)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, b"k1");
    }

    /// Upserting through a read-only txn is rejected with `TxnReadOnly`.
    #[test]
    fn read_only_upsert_rejected() {
        let db = Database::new();
        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let err = ro
            .upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
            .unwrap_err();
        assert!(matches!(err, Error::Core(alopex_core::Error::TxnReadOnly)));
    }

    /// Searching with a query whose dimension differs from the stored
    /// vectors fails with `DimensionMismatch`.
    #[test]
    fn dimension_mismatch_on_search() {
        let db = Database::new();
        {
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
                .unwrap();
            txn.commit().unwrap();
        }
        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let err = ro
            .search_similar(&[1.0, 0.0, 1.0], Metric::Cosine, 1, None)
            .unwrap_err();
        assert!(matches!(
            err,
            Error::Core(alopex_core::Error::DimensionMismatch { .. })
        ));
    }
}