1#![deny(missing_docs)]
4
5pub mod catalog;
7pub mod catalog_api;
8pub mod columnar_api;
9mod dataframe_api;
10pub mod options;
11mod sql_api;
12mod txn_manager;
13
14pub use crate::catalog::{CachedTableInfo, Catalog};
15pub use crate::catalog_api::{
16 CatalogInfo, ColumnDefinition, ColumnInfo, CreateCatalogRequest, CreateNamespaceRequest,
17 CreateTableRequest, IndexInfo, NamespaceInfo, StorageInfo, TableInfo,
18};
19pub use crate::columnar_api::{
20 ColumnarIndexInfo, ColumnarIndexType, ColumnarRowIterator, EmbeddedConfig, StorageMode,
21};
22pub use crate::options::DatabaseOptions;
23pub use crate::sql_api::{SqlStreamingResult, StreamingQueryResult, StreamingRows};
24pub use crate::txn_manager::{TransactionInfo, TransactionManager};
25pub use alopex_dataframe::{DataFrame, JoinKeys, JoinType, SortOptions};
26pub use alopex_sql::{DataSourceFormat, TableType};
/// Result type of SQL statement execution, re-exported from `alopex_sql`.
pub type SqlResult = alopex_sql::SqlResult;
29use alopex_core::vector::hnsw::{HnswTransactionState, SearchStats as HnswSearchStats};
30use alopex_core::{
31 columnar::{
32 kvs_bridge::ColumnarKvsBridge, memory::InMemorySegmentStore, segment_v2::SegmentConfigV2,
33 },
34 kv::any::AnyKVTransaction,
35 kv::memory::MemoryKV,
36 kv::AnyKV,
37 validate_dimensions, HnswIndex, KVStore, KVTransaction, Key, LargeValueKind, LargeValueMeta,
38 LargeValueReader, LargeValueWriter, StorageFactory, VectorType, DEFAULT_CHUNK_SIZE,
39};
40pub use alopex_core::{HnswConfig, HnswSearchResult, HnswStats, MemoryStats, Metric, TxnMode};
41pub use alopex_sql::executor::QueryRowIterator;
43use std::collections::HashMap;
44use std::convert::TryInto;
45use std::fs;
46use std::path::{Path, PathBuf};
47use std::result;
48use std::sync::atomic::{AtomicU64, Ordering};
49use std::sync::{Arc, RwLock};
50
/// Convenience alias for `std::result::Result` specialized to this crate's [`Error`].
pub type Result<T> = result::Result<T, Error>;
53
/// Error type returned by all fallible operations in this crate.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Error propagated from the storage engine (`alopex_core`).
    #[error("core error: {0}")]
    Core(#[from] alopex_core::Error),
    /// Error propagated from the SQL layer.
    #[error("{0}")]
    Sql(#[from] alopex_sql::SqlError),
    /// Error propagated from the dataframe layer.
    #[error("{0}")]
    DataFrame(#[from] alopex_dataframe::DataFrameError),
    /// The transaction was already committed or rolled back.
    #[error("transaction is completed")]
    TxnCompleted,
    /// The named catalog does not exist. (Message is Japanese: "catalog not found".)
    #[error("カタログが見つかりません: {0}")]
    CatalogNotFound(String),
    /// A catalog with this name already exists.
    #[error("カタログは既に存在します: {0}")]
    CatalogAlreadyExists(String),
    /// The catalog still contains namespaces/tables and cannot be dropped.
    #[error("カタログが空ではありません: {0}")]
    CatalogNotEmpty(String),
    /// The namespace `{0}.{1}` does not exist.
    #[error("ネームスペースが見つかりません: {0}.{1}")]
    NamespaceNotFound(String, String),
    /// The namespace `{0}.{1}` already exists.
    #[error("ネームスペースは既に存在します: {0}.{1}")]
    NamespaceAlreadyExists(String, String),
    /// The namespace `{0}.{1}` still contains objects and cannot be dropped.
    #[error("ネームスペースが空ではありません: {0}.{1}")]
    NamespaceNotEmpty(String, String),
    /// The named table does not exist.
    #[error("table not found: {0}")]
    TableNotFound(String),
    /// A table with this name already exists.
    #[error("テーブルは既に存在します: {0}")]
    TableAlreadyExists(String),
    /// The named index does not exist.
    #[error("インデックスが見つかりません: {0}")]
    IndexNotFound(String),
    /// Built-in `default` objects cannot be deleted.
    #[error("default オブジェクトは削除できません: {0}")]
    CannotDeleteDefault(String),
    /// A managed table requires an explicit schema.
    #[error("managed テーブルにはスキーマが必要です")]
    SchemaRequired,
    /// An external table requires a `storage_root`.
    #[error("external テーブルには storage_root が必要です")]
    StorageRootRequired,
    /// A write was attempted inside a read-only transaction.
    #[error("トランザクションは読み取り専用です")]
    TxnReadOnly,
    /// The supplied transaction id could not be parsed or resolved.
    #[error("invalid transaction id: {0}")]
    InvalidTransactionId(String),
    /// The operation requires in-memory columnar mode.
    #[error("not in in-memory columnar mode")]
    NotInMemoryMode,
    /// The requested data source format is not supported.
    #[error("unsupported data source format: {0}")]
    UnsupportedDataSourceFormat(String),
    /// The catalog `RwLock` was poisoned by a panicking writer.
    #[error("catalog lock poisoned")]
    CatalogLockPoisoned,
}
121
122impl Error {
123 pub fn sql_error_code(&self) -> Option<&'static str> {
125 match self {
126 Self::Sql(err) => Some(err.code()),
127 _ => None,
128 }
129 }
130}
131
/// Embedded database handle: a shared KV store plus the SQL catalog,
/// HNSW index cache, vector/table-info caches, and columnar storage plumbing.
pub struct Database {
    // Shared key-value storage backend (memory, LSM disk, or S3).
    pub(crate) store: Arc<AnyKV>,
    // Persistent SQL catalog layered over `store`.
    pub(crate) sql_catalog: Arc<RwLock<alopex_sql::catalog::PersistentCatalog<AnyKV>>>,
    // Loaded HNSW indices keyed by index name.
    pub(crate) hnsw_cache: RwLock<HashMap<String, Arc<HnswIndex>>>,
    // Lazily built cache of all vector entries; `None` until built
    // (or after invalidation by a raw `put`).
    pub(crate) vector_cache: RwLock<Option<HashMap<Key, CachedVector>>>,
    // Cached table metadata keyed by "catalog.namespace.table".
    pub(crate) table_info_cache: RwLock<HashMap<String, CachedTableInfo>>,
    // Monotonic counter bumped whenever the table-info cache is invalidated.
    pub(crate) table_info_cache_epoch: AtomicU64,
    // Columnar storage mode this database was opened with.
    pub(crate) columnar_mode: StorageMode,
    // Bridge persisting columnar segments through the KV store.
    pub(crate) columnar_bridge: ColumnarKvsBridge,
    // In-memory segment store; present only in `StorageMode::InMemory`.
    pub(crate) columnar_memory: Option<InMemorySegmentStore>,
    // Configuration applied to newly written columnar segments.
    pub(crate) segment_config: SegmentConfigV2,
}
148
/// Maps a user-facing database path to the on-disk data directory.
///
/// A `*.alopex` file path is translated to its sibling `*.alopex.d`
/// directory; any other path is used as the data directory as-is.
pub(crate) fn disk_data_dir_path(path: &Path) -> std::path::PathBuf {
    match path.extension() {
        Some(ext) if ext == "alopex" => path.with_extension("alopex.d"),
        _ => path.to_path_buf(),
    }
}
158
159#[cfg(not(target_arch = "wasm32"))]
160fn read_file_version_from_storage(path: &Path) -> alopex_core::storage::format::FileVersion {
161 use alopex_core::storage::format::{FileHeader, FileVersion, HEADER_SIZE};
162 use std::io::Read;
163
164 let Some(file_path) = resolve_format_file_path(path) else {
165 return FileVersion::CURRENT;
166 };
167
168 let mut header_bytes = [0u8; HEADER_SIZE];
169 let Ok(mut file) = fs::File::open(file_path) else {
170 return FileVersion::CURRENT;
171 };
172 if file.read_exact(&mut header_bytes).is_err() {
173 return FileVersion::CURRENT;
174 }
175 match FileHeader::from_bytes(&header_bytes) {
176 Ok(header) => header.version,
177 Err(_) => FileVersion::CURRENT,
178 }
179}
180
/// WASM builds have no filesystem-backed storage, so the format version is
/// always reported as the current version.
#[cfg(target_arch = "wasm32")]
fn read_file_version_from_storage(_path: &Path) -> alopex_core::storage::format::FileVersion {
    alopex_core::storage::format::FileVersion::CURRENT
}
185
186#[cfg(not(target_arch = "wasm32"))]
187fn resolve_format_file_path(path: &Path) -> Option<PathBuf> {
188 if path.is_file() {
189 return Some(path.to_path_buf());
190 }
191
192 if path.is_dir() {
193 if let Some(ext) = path.extension() {
194 if ext == "d" {
195 let candidate = path.with_extension("");
196 if candidate.is_file() {
197 return Some(candidate);
198 }
199 }
200 }
201
202 if let Ok(entries) = fs::read_dir(path) {
203 for entry in entries.flatten() {
204 let entry_path = entry.path();
205 if entry_path.extension().is_some_and(|ext| ext == "alopex") && entry_path.is_file()
206 {
207 return Some(entry_path);
208 }
209 }
210 }
211 }
212
213 None
214}
215
impl Database {
    /// Opens (creating if necessary) a disk-backed database at `path`.
    ///
    /// `path` is mapped to the actual data directory by [`disk_data_dir_path`];
    /// the persisted SQL catalog is loaded eagerly.
    pub fn open(path: &Path) -> Result<Self> {
        let data_dir = disk_data_dir_path(path);
        let store = StorageFactory::create(alopex_core::StorageMode::Disk {
            path: data_dir,
            config: None,
        })
        .map_err(Error::Core)?;
        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Creates an empty in-memory database (no persisted catalog is loaded).
    pub fn new() -> Self {
        let store = AnyKV::Memory(MemoryKV::new());
        Self::init(
            store,
            StorageMode::InMemory,
            None,
            SegmentConfigV2::default(),
        )
    }

    /// Opens an in-memory database using default in-memory options.
    pub fn open_in_memory() -> Result<Self> {
        Self::open_in_memory_with_options(DatabaseOptions::in_memory())
    }

    /// Opens an in-memory database configured by `opts`.
    ///
    /// # Errors
    /// Fails if `opts` does not have memory mode enabled.
    pub fn open_in_memory_with_options(opts: DatabaseOptions) -> Result<Self> {
        if !opts.memory_mode() {
            return Err(Error::Core(alopex_core::Error::InvalidFormat(
                "memory_mode must be enabled for in-memory open".into(),
            )));
        }
        let store = StorageFactory::create(opts.to_storage_mode(None)).map_err(Error::Core)?;
        let mut db = Self::init(
            store,
            StorageMode::InMemory,
            opts.memory_limit(),
            SegmentConfigV2::default(),
        );
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Opens a database from a URI.
    ///
    /// Supports `s3://` URIs (when built with the `s3` feature), `file://`
    /// URIs, and plain filesystem paths.
    pub fn open_with_uri(uri: &str) -> Result<Self> {
        if uri.starts_with("s3://") {
            #[cfg(feature = "s3")]
            {
                return Self::open_s3(uri);
            }
            #[cfg(not(feature = "s3"))]
            {
                return Err(Error::Core(alopex_core::Error::InvalidFormat(
                    "S3 support requires the 's3' feature".into(),
                )));
            }
        }

        // `file://` is optional; anything else is treated as a bare path.
        let path = if let Some(stripped) = uri.strip_prefix("file://") {
            stripped
        } else {
            uri
        };

        Self::open(Path::new(path))
    }

    /// Opens a database backed by S3, parsing configuration from the URI.
    #[cfg(feature = "s3")]
    pub fn open_s3(uri: &str) -> Result<Self> {
        let s3_config = alopex_core::S3Config::from_uri(uri).map_err(Error::Core)?;
        let store = StorageFactory::create(alopex_core::StorageMode::S3 { config: s3_config })
            .map_err(Error::Core)?;
        let mut db = Self::init(store, StorageMode::Disk, None, SegmentConfigV2::default());
        db.load_sql_catalog()?;
        Ok(db)
    }

    /// Wires a `Database` around an already-created store: fresh catalog,
    /// empty caches, and columnar plumbing. Does NOT load the persisted
    /// catalog — callers do that via `load_sql_catalog`.
    pub(crate) fn init(
        store: AnyKV,
        columnar_mode: StorageMode,
        memory_limit: Option<usize>,
        segment_config: SegmentConfigV2,
    ) -> Self {
        let store = Arc::new(store);
        let sql_catalog = Arc::new(RwLock::new(alopex_sql::catalog::PersistentCatalog::new(
            store.clone(),
        )));
        let columnar_bridge = ColumnarKvsBridge::new(store.clone());
        // The in-memory segment store exists only in in-memory columnar mode.
        let columnar_memory = if matches!(columnar_mode, StorageMode::InMemory) {
            Some(InMemorySegmentStore::new(memory_limit.map(|v| v as u64)))
        } else {
            None
        };

        Self {
            store,
            sql_catalog,
            hnsw_cache: RwLock::new(HashMap::new()),
            vector_cache: RwLock::new(None),
            table_info_cache: RwLock::new(HashMap::new()),
            table_info_cache_epoch: AtomicU64::new(0),
            columnar_mode,
            columnar_bridge,
            columnar_memory,
            segment_config,
        }
    }

    /// Replaces the SQL catalog with the persisted one from the store.
    /// A missing catalog (fresh store) yields an empty catalog instead of
    /// an error; any other load failure is propagated.
    fn load_sql_catalog(&mut self) -> Result<()> {
        use alopex_sql::catalog::CatalogError;

        let loaded = match alopex_sql::catalog::PersistentCatalog::load(self.store.clone()) {
            Ok(catalog) => catalog,
            Err(CatalogError::Kv(alopex_core::Error::NotFound)) => {
                alopex_sql::catalog::PersistentCatalog::new(self.store.clone())
            }
            Err(other) => return Err(Error::Sql(other.into())),
        };

        self.sql_catalog = Arc::new(RwLock::new(loaded));
        Ok(())
    }

    // Looks up a loaded HNSW index in the shared cache.
    fn hnsw_cache_get(&self, name: &str) -> Option<Arc<HnswIndex>> {
        let cache = self.hnsw_cache.read().expect("hnsw cache lock poisoned");
        cache.get(name).cloned()
    }

    // Inserts an index into the shared cache and returns the shared handle.
    fn hnsw_cache_insert(&self, name: &str, index: HnswIndex) -> Arc<HnswIndex> {
        let index = Arc::new(index);
        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
        cache.insert(name.to_string(), Arc::clone(&index));
        index
    }

    // Evicts an index from the shared cache.
    fn hnsw_cache_remove(&self, name: &str) {
        let mut cache = self.hnsw_cache.write().expect("hnsw cache lock poisoned");
        cache.remove(name);
    }

    /// Current epoch of the table-info cache; bumped on every invalidation,
    /// so callers can detect staleness cheaply.
    pub fn table_info_cache_epoch(&self) -> u64 {
        self.table_info_cache_epoch.load(Ordering::Relaxed)
    }

    /// Returns cached table metadata for `catalog.namespace.table`, if present.
    pub fn get_cached_table_info(
        &self,
        catalog_name: &str,
        namespace_name: &str,
        table_name: &str,
    ) -> Option<CachedTableInfo> {
        let cache = self
            .table_info_cache
            .read()
            .expect("table info cache lock poisoned");
        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
        cache.get(&key).cloned()
    }

    /// Inserts (or replaces) cached table metadata under the key
    /// `catalog.namespace.table`.
    pub fn cache_table_info(
        &self,
        catalog_name: &str,
        namespace_name: &str,
        table_name: &str,
        info: CachedTableInfo,
    ) {
        let mut cache = self
            .table_info_cache
            .write()
            .expect("table info cache lock poisoned");
        let key = format!("{}.{}.{}", catalog_name, namespace_name, table_name);
        cache.insert(key, info);
    }

    /// Clears the table-info cache and bumps its epoch counter.
    pub fn invalidate_table_info_cache(&self) {
        self.table_info_cache_epoch.fetch_add(1, Ordering::Relaxed);
        let mut cache = self
            .table_info_cache
            .write()
            .expect("table info cache lock poisoned");
        cache.clear();
    }

    /// Flushes the underlying store.
    pub fn flush(&self) -> Result<()> {
        self.store.flush().map_err(Error::Core)
    }

    /// Reports the file format version of the backing storage.
    ///
    /// Memory stores always report the current version; disk/S3 stores read
    /// it from the storage file header, falling back to current on failure.
    pub fn file_format_version(&self) -> alopex_core::storage::format::FileVersion {
        use alopex_core::storage::format::FileVersion;

        match self.store.as_ref() {
            AnyKV::Memory(_) => FileVersion::CURRENT,
            AnyKV::Lsm(kv) => read_file_version_from_storage(&kv.data_dir),
            #[cfg(feature = "s3")]
            AnyKV::S3(kv) => read_file_version_from_storage(kv.cache_dir()),
        }
    }

    /// Memory usage statistics; `Some` only for in-memory stores.
    pub fn memory_usage(&self) -> Option<MemoryStats> {
        match self.store.as_ref() {
            AnyKV::Memory(kv) => Some(kv.memory_stats()),
            AnyKV::Lsm(_) => None,
            #[cfg(feature = "s3")]
            AnyKV::S3(_) => None,
        }
    }

    /// Persists an in-memory database to disk at `wal_path`.
    ///
    /// Writes a full snapshot into a temporary directory, then renames it
    /// into place so a partial write never becomes visible.
    ///
    /// # Errors
    /// [`Error::NotInMemoryMode`] for non-memory stores; `PathExists` when
    /// the target path, data directory, or temp directory already exists.
    pub fn persist_to_disk(&self, wal_path: &Path) -> Result<()> {
        if !matches!(self.store.as_ref(), AnyKV::Memory(_)) {
            return Err(Error::NotInMemoryMode);
        }
        let data_dir = disk_data_dir_path(wal_path);
        if wal_path.exists() || data_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(
                wal_path.to_path_buf(),
            )));
        }

        let tmp_dir = data_dir.with_extension("tmp");
        if tmp_dir.exists() {
            return Err(Error::Core(alopex_core::Error::PathExists(tmp_dir)));
        }

        let snapshot = self.snapshot_pairs()?;
        // Write the snapshot into the temp directory; clean it up on failure.
        let write_result = (|| -> Result<()> {
            let store = StorageFactory::create(alopex_core::StorageMode::Disk {
                path: tmp_dir.clone(),
                config: None,
            })
            .map_err(Error::Core)?;

            let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
            for (key, value) in snapshot {
                txn.put(key, value).map_err(Error::Core)?;
            }
            txn.commit_self().map_err(Error::Core)?;

            Ok(())
        })();

        if let Err(e) = write_result {
            let _ = fs::remove_dir_all(&tmp_dir);
            return Err(e);
        }

        fs::rename(&tmp_dir, &data_dir).map_err(|e| Error::Core(e.into()))?;
        if wal_path.extension().is_some_and(|e| e == "alopex") {
            // Best-effort creation of the `*.alopex` marker file; failures
            // are deliberately ignored.
            let _ = fs::OpenOptions::new()
                .create_new(true)
                .write(true)
                .open(wal_path);
        }
        Ok(())
    }

    /// Copies the full contents of this database into a new in-memory one.
    pub fn clone_to_memory(&self) -> Result<Self> {
        let snapshot = self.snapshot_pairs()?;
        let cloned = Database::open_in_memory()?;
        if snapshot.is_empty() {
            return Ok(cloned);
        }

        let mut txn = cloned.begin(TxnMode::ReadWrite)?;
        for (key, value) in snapshot {
            txn.put(&key, &value)?;
        }
        txn.commit()?;
        Ok(cloned)
    }

    /// Deletes every key/value pair in the store in a single transaction.
    pub fn clear(&self) -> Result<()> {
        let keys: Vec<Key> = self.snapshot_pairs()?.into_iter().map(|(k, _)| k).collect();
        if keys.is_empty() {
            return Ok(());
        }
        let mut txn = self.begin(TxnMode::ReadWrite)?;
        for key in keys {
            txn.delete(&key)?;
        }
        txn.commit()
    }

    /// Sets (or removes, with `None`) the memory limit of an in-memory
    /// store; a silent no-op for other store kinds.
    pub fn set_memory_limit(&self, bytes: Option<usize>) {
        if let AnyKV::Memory(kv) = self.store.as_ref() {
            kv.txn_manager().set_memory_limit(bytes);
        }
    }

    /// Snapshot of all key/value pairs; returns an empty vec on error.
    pub fn snapshot(&self) -> Vec<(Key, Vec<u8>)> {
        self.snapshot_pairs().unwrap_or_default()
    }

    // Reads all pairs inside a single read-only transaction.
    fn snapshot_pairs(&self) -> Result<Vec<(Key, Vec<u8>)>> {
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let pairs: Vec<(Key, Vec<u8>)> = txn.scan_prefix(b"").map_err(Error::Core)?.collect();
        txn.commit_self().map_err(Error::Core)?;
        Ok(pairs)
    }

    /// Creates, persists, and caches a new HNSW index called `name`.
    pub fn create_hnsw_index(&self, name: &str, config: HnswConfig) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::create(name, config).map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_insert(name, index);
        Ok(())
    }

    /// Removes the HNSW index `name` from storage and from the cache.
    pub fn drop_hnsw_index(&self, name: &str) -> Result<()> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        index.drop(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_remove(name);
        Ok(())
    }

    /// Statistics for HNSW index `name`, loading (and caching) it on a miss.
    pub fn get_hnsw_stats(&self, name: &str) -> Result<HnswStats> {
        if let Some(index) = self.hnsw_cache_get(name) {
            return Ok(index.stats());
        }
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let stats = index.stats();
        self.hnsw_cache_insert(name, index);
        Ok(stats)
    }

    /// Compacts HNSW index `name`, persists the result, and refreshes the
    /// cache entry.
    pub fn compact_hnsw_index(&self, name: &str) -> Result<alopex_core::vector::CompactionResult> {
        let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
        let mut index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let result = index.compact().map_err(Error::Core)?;
        index.save(&mut txn).map_err(Error::Core)?;
        txn.commit_self().map_err(Error::Core)?;
        self.hnsw_cache_insert(name, index);
        Ok(result)
    }

    /// k-nearest-neighbor search on HNSW index `name`.
    ///
    /// The index is served from the shared cache when possible, otherwise
    /// loaded from storage and cached. Setting the `ALOPEX_PROFILE_HNSW`
    /// environment variable prints stage timings to stderr.
    pub fn search_hnsw(
        &self,
        name: &str,
        query: &[f32],
        k: usize,
        ef_search: Option<usize>,
    ) -> Result<(Vec<HnswSearchResult>, HnswSearchStats)> {
        let profile = std::env::var_os("ALOPEX_PROFILE_HNSW").is_some();
        let total_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Cache hit: search directly without touching storage.
        if let Some(index) = self.hnsw_cache_get(name) {
            let search_start = if profile {
                Some(std::time::Instant::now())
            } else {
                None
            };
            let result = index.search(query, k, ef_search).map_err(Error::Core)?;
            if let (true, Some(total_start), Some(search_start)) =
                (profile, total_start, search_start)
            {
                let search_time = search_start.elapsed();
                let total_time = total_start.elapsed();
                eprintln!(
                    "alopex.hnsw_search cache=hit name={} k={} ef_search={:?} search_ms={:.2} total_ms={:.2}",
                    name,
                    k,
                    ef_search,
                    search_time.as_secs_f64() * 1000.0,
                    total_time.as_secs_f64() * 1000.0
                );
            }
            return Ok(result);
        }

        // Cache miss: load the index from storage, cache it, then search.
        let load_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
        let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
        let load_time = load_start.map(|start| start.elapsed());
        let index = self.hnsw_cache_insert(name, index);
        let search_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        let result = index.search(query, k, ef_search).map_err(Error::Core)?;
        if let (true, Some(total_start), Some(search_start)) = (profile, total_start, search_start)
        {
            let search_time = search_start.elapsed();
            let total_time = total_start.elapsed();
            let load_time_ms = load_time
                .map(|elapsed| elapsed.as_secs_f64() * 1000.0)
                .unwrap_or(0.0);
            eprintln!(
                "alopex.hnsw_search cache=miss name={} k={} ef_search={:?} load_ms={:.2} search_ms={:.2} total_ms={:.2}",
                name,
                k,
                ef_search,
                load_time_ms,
                search_time.as_secs_f64() * 1000.0,
                total_time.as_secs_f64() * 1000.0
            );
        }
        Ok(result)
    }

    /// Creates a writer for a large opaque blob value at `path`.
    ///
    /// `chunk_size` defaults to [`DEFAULT_CHUNK_SIZE`] when `None`.
    pub fn create_blob_writer(
        &self,
        path: &Path,
        total_len: u64,
        chunk_size: Option<u32>,
    ) -> Result<LargeValueWriter> {
        let meta = LargeValueMeta {
            kind: LargeValueKind::Blob,
            total_len,
            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
        };
        LargeValueWriter::create(path, meta).map_err(Error::Core)
    }

    /// Creates a writer for a large value tagged with an application-defined
    /// `type_id`. `chunk_size` defaults to [`DEFAULT_CHUNK_SIZE`].
    pub fn create_typed_writer(
        &self,
        path: &Path,
        type_id: u16,
        total_len: u64,
        chunk_size: Option<u32>,
    ) -> Result<LargeValueWriter> {
        let meta = LargeValueMeta {
            kind: LargeValueKind::Typed(type_id),
            total_len,
            chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
        };
        LargeValueWriter::create(path, meta).map_err(Error::Core)
    }

    /// Opens a previously written large value at `path` for reading.
    pub fn open_large_value(&self, path: &Path) -> Result<LargeValueReader> {
        LargeValueReader::open(path).map_err(Error::Core)
    }

    /// Begins a new transaction in the given mode.
    pub fn begin(&self, mode: TxnMode) -> Result<Transaction<'_>> {
        let txn = self.store.begin(mode).map_err(Error::Core)?;
        Ok(Transaction {
            inner: Some(txn),
            db: self,
            hnsw_indices: HashMap::new(),
            overlay: alopex_sql::catalog::CatalogOverlay::new(),
            vector_cache_updates: HashMap::new(),
            vector_cache_deletes: Vec::new(),
            vector_cache_invalidated: false,
            catalog_modified: false,
        })
    }
}
735
impl Default for Database {
    /// Equivalent to [`Database::new`]: an empty in-memory database.
    fn default() -> Self {
        Self::new()
    }
}
741
/// A transaction over a [`Database`], bundling the KV transaction with
/// staged HNSW mutations, staged catalog changes, and vector-cache
/// bookkeeping that is published on commit.
pub struct Transaction<'a> {
    // Underlying KV transaction; `None` once committed or rolled back.
    inner: Option<AnyKVTransaction<'a>>,
    // Owning database; caches/catalog are published here on commit.
    db: &'a Database,
    // HNSW indices touched in this transaction plus their staged state.
    hnsw_indices: HashMap<String, (HnswIndex, alopex_core::vector::hnsw::HnswTransactionState)>,
    // SQL catalog changes staged until commit.
    overlay: alopex_sql::catalog::CatalogOverlay,
    // Vector cache entries to upsert on commit.
    vector_cache_updates: HashMap<Key, CachedVector>,
    // Vector keys to evict from the cache on commit.
    vector_cache_deletes: Vec<Key>,
    // Set by raw `put`: the whole vector cache is dropped on commit.
    vector_cache_invalidated: bool,
    // Whether catalog-affecting operations ran; triggers table-info cache
    // invalidation on commit.
    pub(crate) catalog_modified: bool,
}
754
/// A single hit returned by [`Transaction::search_similar`].
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Key of the matching vector entry.
    pub key: Key,
    /// Opaque metadata stored alongside the vector.
    pub metadata: Vec<u8>,
    /// Similarity score; higher is better (L2 distances are negated).
    pub score: f32,
}
765
// Reserved key under which the flat list of all vector-entry keys is stored.
const VECTOR_INDEX_KEY: &[u8] = b"__alopex_vector_index";
767
impl<'a> Transaction<'a> {
    // Read-only view of the catalog changes staged in this transaction.
    pub(crate) fn catalog_overlay(&self) -> &alopex_sql::catalog::CatalogOverlay {
        &self.overlay
    }

    // Mutable access to the staged catalog changes.
    pub(crate) fn catalog_overlay_mut(&mut self) -> &mut alopex_sql::catalog::CatalogOverlay {
        &mut self.overlay
    }

    // Mode of the underlying KV transaction, or `Error::TxnCompleted` if it
    // has already been committed or rolled back.
    pub(crate) fn txn_mode(&self) -> Result<TxnMode> {
        let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
        Ok(txn.mode())
    }
    /// Reads the value stored under `key`, if any.
    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.inner_mut()?.get(&key.to_vec()).map_err(Error::Core)
    }

    /// Stores `value` under `key`.
    ///
    /// A raw put may overwrite vector entries in ways the incremental cache
    /// bookkeeping cannot track, so the whole vector cache is marked invalid
    /// and any pending incremental updates are discarded.
    pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        self.vector_cache_invalidated = true;
        self.vector_cache_updates.clear();
        self.vector_cache_deletes.clear();
        self.inner_mut()?
            .put(key.to_vec(), value.to_vec())
            .map_err(Error::Core)
    }

    /// Deletes `key` and stages its eviction from the vector cache.
    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
        self.vector_cache_deletes.push(key.to_vec());
        self.inner_mut()?.delete(key.to_vec()).map_err(Error::Core)
    }

    /// Iterates over all key/value pairs whose key starts with `prefix`.
    pub fn scan_prefix(
        &mut self,
        prefix: &[u8],
    ) -> Result<Box<dyn Iterator<Item = (Key, Vec<u8>)> + '_>> {
        self.inner_mut()?.scan_prefix(prefix).map_err(Error::Core)
    }
811
    /// Stages an upsert of (`key`, `vector`, `metadata`) into HNSW index
    /// `index_name`; the staged change is persisted when this transaction
    /// commits. Requires a read-write transaction.
    pub fn upsert_to_hnsw(
        &mut self,
        index_name: &str,
        key: &[u8],
        vector: &[f32],
        metadata: &[u8],
    ) -> Result<()> {
        self.ensure_write_txn()?;
        let (index, state) = self.hnsw_entry_mut(index_name)?;
        index
            .upsert_staged(key, vector, metadata, state)
            .map_err(Error::Core)
    }

    /// Stages a deletion of `key` from HNSW index `index_name`; returns
    /// whatever `delete_staged` reports (presumably whether the key was
    /// present — confirm against `HnswIndex` docs). Requires a read-write
    /// transaction.
    pub fn delete_from_hnsw(&mut self, index_name: &str, key: &[u8]) -> Result<bool> {
        self.ensure_write_txn()?;
        let (index, state) = self.hnsw_entry_mut(index_name)?;
        index.delete_staged(key, state).map_err(Error::Core)
    }
833
    /// Writes a vector entry under `key`: validates the vector, encodes it
    /// with its metadata, registers the key in the flat vector index, and
    /// stages an incremental vector-cache update for commit.
    ///
    /// # Errors
    /// Fails for an empty vector or when `vector` does not validate against
    /// the `VectorType` derived from its length and `metric`.
    pub fn upsert_vector(
        &mut self,
        key: &[u8],
        metadata: &[u8],
        vector: &[f32],
        metric: Metric,
    ) -> Result<()> {
        if vector.is_empty() {
            return Err(Error::Core(alopex_core::Error::InvalidFormat(
                "vector cannot be empty".into(),
            )));
        }
        let vt = VectorType::new(vector.len(), metric);
        vt.validate(vector).map_err(Error::Core)?;

        let payload = encode_vector_entry(vt, metadata, vector);
        let txn = self.inner_mut()?;
        txn.put(key.to_vec(), payload).map_err(Error::Core)?;

        // Register the key in the flat index only once.
        let mut keys = self.load_vector_index()?;
        if !keys.iter().any(|k| k == key) {
            keys.push(key.to_vec());
            self.persist_vector_index(&keys)?;
        }

        // Stage the cache update; cancel any previously staged delete.
        let cached = cached_vector_from_entry(metric, metadata.to_vec(), vector.to_vec());
        self.vector_cache_updates.insert(key.to_vec(), cached);
        self.vector_cache_deletes.retain(|k| k != key);
        Ok(())
    }
867
    /// Reads the vector stored under `key`, or `None` if the key is absent.
    ///
    /// # Errors
    /// Fails when the stored entry's metric differs from `metric`.
    pub fn get_vector(&mut self, key: &[u8], metric: Metric) -> Result<Option<Vec<f32>>> {
        let txn = self.inner_mut()?;
        let key_vec = key.to_vec();
        let Some(raw) = txn.get(&key_vec).map_err(Error::Core)? else {
            return Ok(None);
        };
        let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
        if decoded.metric != metric {
            return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                metric: metric.as_str().to_string(),
            }));
        }
        Ok(Some(decoded.vector))
    }

    /// Batch variant of [`Self::get_vector`]: one `Option` per input key,
    /// in order. Fails fast on the first metric mismatch.
    pub fn get_vectors(&mut self, keys: &[Key], metric: Metric) -> Result<Vec<Option<Vec<f32>>>> {
        let txn = self.inner_mut()?;
        let mut out = Vec::with_capacity(keys.len());
        for key in keys {
            let Some(raw) = txn.get(key).map_err(Error::Core)? else {
                out.push(None);
                continue;
            };
            let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
            if decoded.metric != metric {
                return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                    metric: metric.as_str().to_string(),
                }));
            }
            out.push(Some(decoded.vector));
        }
        Ok(out)
    }
912
    /// Brute-force similarity search over all stored vectors (or over the
    /// caller-supplied `filter_keys` subset), returning up to `top_k`
    /// results ordered by descending score with ties broken by ascending key.
    ///
    /// Scores are cosine similarity, negated L2 distance, or raw inner
    /// product depending on `metric`. Unfiltered read-only searches are
    /// served from the shared in-memory vector cache when it is populated.
    /// Setting the `ALOPEX_PROFILE_SEARCH_SIMILAR` environment variable
    /// prints stage timings to stderr.
    pub fn search_similar(
        &mut self,
        query_vector: &[f32],
        metric: Metric,
        top_k: usize,
        filter_keys: Option<&[Key]>,
    ) -> Result<Vec<SearchResult>> {
        if top_k == 0 {
            return Ok(Vec::new());
        }

        let profile = std::env::var_os("ALOPEX_PROFILE_SEARCH_SIMILAR").is_some();
        let total_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Precompute query norms: cosine needs 1/|q|, L2 needs |q|^2.
        let query_norm_sq = query_vector.iter().map(|v| v * v).sum::<f32>();
        let query_norm = if matches!(metric, Metric::Cosine) {
            query_norm_sq.sqrt()
        } else {
            0.0
        };
        let inv_query_norm = if query_norm == 0.0 {
            0.0
        } else {
            1.0 / query_norm
        };

        // Fast path: unfiltered read-only searches can be answered entirely
        // from the shared vector cache without touching the KV store.
        if filter_keys.is_none() && self.txn_mode()? == TxnMode::ReadOnly {
            let cache = self
                .db
                .vector_cache
                .read()
                .expect("vector cache lock poisoned");
            if let Some(cache) = cache.as_ref() {
                if cache.is_empty() {
                    return Ok(Vec::new());
                }
                let keys_len = cache.len();
                let mut rows = Vec::with_capacity(keys_len);
                let mut score_time = std::time::Duration::ZERO;
                for (key, cached) in cache.iter() {
                    if cached.metric != metric {
                        return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                            metric: metric.as_str().to_string(),
                        }));
                    }
                    validate_dimensions(cached.vector.len(), query_vector.len())
                        .map_err(Error::Core)?;
                    let score_start = if profile {
                        Some(std::time::Instant::now())
                    } else {
                        None
                    };
                    let dot = dot_product(query_vector, &cached.vector);
                    let score = match metric {
                        Metric::Cosine => {
                            // Zero-norm vectors score 0 instead of NaN.
                            if cached.inv_norm == 0.0 || inv_query_norm == 0.0 {
                                0.0
                            } else {
                                dot * cached.inv_norm * inv_query_norm
                            }
                        }
                        Metric::L2 => {
                            // |q - v|^2 = |q|^2 + |v|^2 - 2*q.v; negate so
                            // larger scores mean closer vectors.
                            let dist_sq = query_norm_sq + cached.norm_sq - 2.0 * dot;
                            -dist_sq.sqrt()
                        }
                        Metric::InnerProduct => dot,
                    };
                    if let Some(score_start) = score_start {
                        score_time += score_start.elapsed();
                    }
                    rows.push(SearchResult {
                        key: key.clone(),
                        metadata: cached.metadata.clone(),
                        score,
                    });
                }

                let rows_total = rows.len();
                let sort_start = if profile {
                    Some(std::time::Instant::now())
                } else {
                    None
                };
                // Partial selection of the top_k best, then a full sort of
                // just those survivors.
                if rows.len() > top_k {
                    rows.select_nth_unstable_by(top_k - 1, |a, b| {
                        b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
                    });
                    rows.truncate(top_k);
                }
                rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
                if let (true, Some(total_start), Some(sort_start)) =
                    (profile, total_start, sort_start)
                {
                    let sort_time = sort_start.elapsed();
                    let total_time = total_start.elapsed();
                    eprintln!(
                        "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
                        keys_len,
                        rows_total,
                        top_k,
                        0.0,
                        0.0,
                        0.0,
                        score_time.as_secs_f64() * 1000.0,
                        sort_time.as_secs_f64() * 1000.0,
                        total_time.as_secs_f64() * 1000.0
                    );
                }
                return Ok(rows);
            }
        }

        // Slow path: resolve the candidate key set (explicit filter or the
        // persisted flat vector index).
        let (keys, load_keys_time) = if profile {
            let start = std::time::Instant::now();
            let keys = match filter_keys {
                Some(keys) => keys.to_vec(),
                None => self.load_vector_index()?,
            };
            (keys, start.elapsed())
        } else {
            let keys = match filter_keys {
                Some(keys) => keys.to_vec(),
                None => self.load_vector_index()?,
            };
            (keys, std::time::Duration::ZERO)
        };
        if keys.is_empty() {
            return Ok(Vec::new());
        }

        let keys_len = keys.len();
        let mut rows = Vec::with_capacity(keys.len());
        let txn = self.inner_mut()?;
        let mut get_time = std::time::Duration::ZERO;
        let mut decode_time = std::time::Duration::ZERO;
        let mut score_time = std::time::Duration::ZERO;
        // Two copies of the scan loop: the profiled variant pays for an
        // Instant per stage, the plain variant stays timer-free.
        if profile {
            for key in keys {
                let get_start = std::time::Instant::now();
                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
                    get_time += get_start.elapsed();
                    continue;
                };
                get_time += get_start.elapsed();
                let decode_start = std::time::Instant::now();
                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
                decode_time += decode_start.elapsed();
                if decoded.metric != metric {
                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                        metric: metric.as_str().to_string(),
                    }));
                }
                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
                let score_start = std::time::Instant::now();
                let score =
                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
                score_time += score_start.elapsed();
                rows.push(SearchResult {
                    key,
                    metadata: decoded.metadata,
                    score,
                });
            }
        } else {
            for key in keys {
                let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
                    continue;
                };
                let decoded = decode_vector_entry_view(&raw).map_err(Error::Core)?;
                if decoded.metric != metric {
                    return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
                        metric: metric.as_str().to_string(),
                    }));
                }
                validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
                let score =
                    score_from_bytes(metric, query_vector, query_norm, decoded.vector_bytes)?;
                rows.push(SearchResult {
                    key,
                    metadata: decoded.metadata,
                    score,
                });
            }
        }

        let rows_total = rows.len();
        let sort_start = if profile {
            Some(std::time::Instant::now())
        } else {
            None
        };
        // Partial selection of the top_k best, then sort just the survivors.
        if rows.len() > top_k {
            rows.select_nth_unstable_by(top_k - 1, |a, b| {
                b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key))
            });
            rows.truncate(top_k);
        }
        rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
        if let (true, Some(total_start), Some(sort_start)) = (profile, total_start, sort_start) {
            let sort_time = sort_start.elapsed();
            let total_time = total_start.elapsed();
            eprintln!(
                "alopex.search_similar keys={} results={} top_k={} load_keys_ms={:.2} get_ms={:.2} decode_ms={:.2} score_ms={:.2} sort_ms={:.2} total_ms={:.2}",
                keys_len,
                rows_total,
                top_k,
                load_keys_time.as_secs_f64() * 1000.0,
                get_time.as_secs_f64() * 1000.0,
                decode_time.as_secs_f64() * 1000.0,
                score_time.as_secs_f64() * 1000.0,
                sort_time.as_secs_f64() * 1000.0,
                total_time.as_secs_f64() * 1000.0
            );
        }
        Ok(rows)
    }
1136
    // Loads the persisted flat list of vector keys; an absent index entry
    // yields an empty list.
    fn load_vector_index(&mut self) -> Result<Vec<Key>> {
        let txn = self.inner_mut()?;
        let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec()).map_err(Error::Core)? else {
            return Ok(Vec::new());
        };
        decode_index(&raw).map_err(Error::Core)
    }

    // Encodes and writes the flat vector-key list under VECTOR_INDEX_KEY.
    fn persist_vector_index(&mut self, keys: &[Key]) -> Result<()> {
        let txn = self.inner_mut()?;
        let encoded = encode_index(keys)?;
        txn.put(VECTOR_INDEX_KEY.to_vec(), encoded)
            .map_err(Error::Core)
    }
1151
    /// Commits the transaction.
    ///
    /// Order matters: staged HNSW changes and the catalog overlay are first
    /// written into the KV transaction, vector-cache bookkeeping is applied,
    /// then the KV transaction itself is committed; only afterwards are the
    /// HNSW cache, the in-process catalog, and (if the catalog changed) the
    /// table-info cache published.
    ///
    /// # Errors
    /// [`Error::TxnCompleted`] if already committed/rolled back, plus any
    /// error from flushing staged data or committing the KV transaction.
    pub fn commit(mut self) -> Result<()> {
        {
            let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
            // Flush staged HNSW mutations into the KV transaction.
            for (_, (index, state)) in self.hnsw_indices.iter_mut() {
                index.commit_staged(txn, state).map_err(Error::Core)?;
            }
            // Persist staged catalog changes alongside the data writes.
            let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
            catalog
                .persist_overlay(txn, &self.overlay)
                .map_err(|err| Error::Sql(err.into()))?;

            let vector_cache_invalidated = self.vector_cache_invalidated;
            let vector_cache_updates = std::mem::take(&mut self.vector_cache_updates);
            let vector_cache_deletes = std::mem::take(&mut self.vector_cache_deletes);
            if vector_cache_invalidated {
                // A raw put happened: drop the shared cache entirely.
                let mut cache = self
                    .db
                    .vector_cache
                    .write()
                    .expect("vector cache lock poisoned");
                *cache = None;
            } else if !vector_cache_updates.is_empty() || !vector_cache_deletes.is_empty() {
                let needs_rebuild = {
                    let cache = self
                        .db
                        .vector_cache
                        .read()
                        .expect("vector cache lock poisoned");
                    cache.is_none()
                };
                if needs_rebuild {
                    // No cache yet: rebuild it from this transaction's view,
                    // which already includes the staged writes.
                    let rebuilt = build_vector_cache_from_txn(txn).map_err(Error::Core)?;
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    *cache = Some(rebuilt);
                } else {
                    // Apply the incremental deletes and upserts in place.
                    let mut cache = self
                        .db
                        .vector_cache
                        .write()
                        .expect("vector cache lock poisoned");
                    if let Some(cache) = cache.as_mut() {
                        for key in vector_cache_deletes {
                            cache.remove(&key);
                        }
                        for (key, cached) in vector_cache_updates {
                            cache.insert(key, cached);
                        }
                    }
                }
            }
        }
        let txn = self.inner.take().ok_or(Error::TxnCompleted)?;
        let hnsw_indices = std::mem::take(&mut self.hnsw_indices);
        txn.commit_self().map_err(Error::Core)?;
        // Publish committed HNSW indices to the shared cache only after the
        // KV commit succeeded.
        if !hnsw_indices.is_empty() {
            let mut cache = self
                .db
                .hnsw_cache
                .write()
                .expect("hnsw cache lock poisoned");
            for (name, (index, _state)) in hnsw_indices {
                cache.insert(name, Arc::new(index));
            }
        }

        // Apply the overlay to the in-process catalog, then invalidate the
        // table-info cache if the catalog changed.
        let overlay = std::mem::take(&mut self.overlay);
        let catalog_modified = self.catalog_modified;
        let mut catalog = self.db.sql_catalog.write().expect("catalog lock poisoned");
        catalog.apply_overlay(overlay);
        drop(catalog);
        if catalog_modified {
            self.db.invalidate_table_info_cache();
        }
        Ok(())
    }
1234
    /// Rolls back the transaction without consuming `self`.
    ///
    /// Staged HNSW mutations are discarded and the catalog overlay is reset.
    /// NOTE(review): `self.inner` is set to `None` at the end, so despite the
    /// "in place" name every later operation on this handle returns
    /// `Error::TxnCompleted` — confirm this matches the intended contract.
    pub fn rollback_in_place(&mut self) -> Result<()> {
        let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
        txn.rollback_in_place().map_err(Error::Core)?;
        for (_, (index, state)) in self.hnsw_indices.iter_mut() {
            // Rollback failures are ignored: the in-memory indices are
            // discarded below either way.
            let _ = index.rollback(state);
        }
        self.hnsw_indices.clear();
        self.overlay = alopex_sql::catalog::CatalogOverlay::default();
        self.inner = None;
        Ok(())
    }
1247
1248 pub fn rollback(mut self) -> Result<()> {
1250 if let Some(txn) = self.inner.take() {
1251 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1252 let _ = index.rollback(state);
1253 }
1254 self.hnsw_indices.clear();
1255 txn.rollback_self().map_err(Error::Core)
1256 } else {
1257 Err(Error::TxnCompleted)
1258 }
1259 }
1260
1261 fn inner_mut(&mut self) -> Result<&mut AnyKVTransaction<'a>> {
1262 self.inner.as_mut().ok_or(Error::TxnCompleted)
1263 }
1264
    /// Returns the `(index, staged-state)` pair for `name`, lazily loading
    /// the HNSW index from the KV store on first access in this transaction.
    fn hnsw_entry_mut(&mut self, name: &str) -> Result<&mut (HnswIndex, HnswTransactionState)> {
        // Deliberate contains_key + insert + get_mut (instead of the entry
        // API): loading the index needs `self.inner_mut()`, which would
        // conflict with an outstanding mutable borrow of `self.hnsw_indices`.
        if !self.hnsw_indices.contains_key(name) {
            let index = {
                let txn = self.inner_mut()?;
                HnswIndex::load(name, txn).map_err(Error::Core)?
            };
            self.hnsw_indices
                .insert(name.to_string(), (index, HnswTransactionState::default()));
        }
        // Unwrap cannot fail: the entry was inserted just above if absent.
        Ok(self.hnsw_indices.get_mut(name).unwrap())
    }
1276
1277 fn ensure_write_txn(&self) -> Result<()> {
1278 let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
1279 if txn.mode() != TxnMode::ReadWrite {
1280 return Err(Error::Core(alopex_core::Error::TxnReadOnly));
1281 }
1282 Ok(())
1283 }
1284}
1285
1286impl<'a> Drop for Transaction<'a> {
1287 fn drop(&mut self) {
1288 if let Some(txn) = self.inner.take() {
1289 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
1290 let _ = index.rollback(state);
1291 }
1292 self.hnsw_indices.clear();
1293 let _ = txn.rollback_self();
1294 }
1295 }
1296}
1297
/// Maps a [`Metric`] to its stable one-byte on-disk tag; inverse of
/// [`byte_to_metric`]. Do not reorder variants — the values are persisted.
fn metric_to_byte(metric: Metric) -> u8 {
    match metric {
        Metric::Cosine => 0,
        Metric::L2 => 1,
        Metric::InnerProduct => 2,
    }
}
1305
/// Decodes a one-byte metric tag written by [`metric_to_byte`]; unknown
/// tags yield `UnsupportedMetric` so corrupted entries fail loudly.
fn byte_to_metric(byte: u8) -> result::Result<Metric, alopex_core::Error> {
    match byte {
        0 => Ok(Metric::Cosine),
        1 => Ok(Metric::L2),
        2 => Ok(Metric::InnerProduct),
        other => Err(alopex_core::Error::UnsupportedMetric {
            metric: format!("unknown({other})"),
        }),
    }
}
1316
1317fn encode_vector_entry(vector_type: VectorType, metadata: &[u8], vector: &[f32]) -> Vec<u8> {
1318 let dim = vector_type.dim() as u32;
1319 let meta_len = metadata.len() as u32;
1320 let mut buf = Vec::with_capacity(1 + 4 + 4 + metadata.len() + std::mem::size_of_val(vector));
1321 buf.push(metric_to_byte(vector_type.metric()));
1322 buf.extend_from_slice(&dim.to_le_bytes());
1323 buf.extend_from_slice(&meta_len.to_le_bytes());
1324 buf.extend_from_slice(metadata);
1325 for v in vector {
1326 buf.extend_from_slice(&v.to_le_bytes());
1327 }
1328 buf
1329}
1330
/// Owned result of [`decode_vector_entry`]: the stored metric plus the
/// decoded f32 vector (the metadata bytes are skipped by that decoder).
struct DecodedEntry {
    /// Distance metric recorded in the entry's tag byte.
    metric: Metric,
    /// Vector components decoded from little-endian f32 bytes.
    vector: Vec<f32>,
}
1335
/// In-memory form of a stored vector entry used by the shared vector cache,
/// with norms precomputed once at insert time (see
/// `cached_vector_from_entry`).
#[derive(Clone)]
struct CachedVector {
    /// Metric the vector was stored with.
    metric: Metric,
    /// Opaque per-entry metadata bytes.
    metadata: Vec<u8>,
    /// The vector components.
    vector: Vec<f32>,
    /// Squared L2 norm of `vector`.
    norm_sq: f32,
    /// `1 / sqrt(norm_sq)`, or `0.0` for the zero vector — presumably used
    /// to avoid per-query norm recomputation in cosine scoring (TODO
    /// confirm at the call sites).
    inv_norm: f32,
}
1344
/// Partially-borrowed decoding of a stored vector entry: the header fields
/// are parsed while the vector payload stays as raw little-endian bytes.
struct VectorEntryView<'a> {
    /// Metric from the entry's tag byte.
    metric: Metric,
    /// Declared number of f32 components.
    dim: usize,
    /// Entry metadata. NOTE(review): copied to an owned `Vec` even though
    /// this is a "view" type — could borrow like `vector_bytes` if callers
    /// allow it.
    metadata: Vec<u8>,
    /// Raw little-endian f32 payload, exactly `dim * 4` bytes long.
    vector_bytes: &'a [u8],
}
1351
1352fn decode_vector_entry(bytes: &[u8]) -> result::Result<DecodedEntry, alopex_core::Error> {
1353 if bytes.len() < 9 {
1354 return Err(alopex_core::Error::InvalidFormat(
1355 "vector entry too short".into(),
1356 ));
1357 }
1358 let metric = byte_to_metric(bytes[0])?;
1359 let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1360 let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1361
1362 let header = 9;
1363 let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1364 if bytes.len() < expected_len {
1365 return Err(alopex_core::Error::InvalidFormat(
1366 "vector entry truncated".into(),
1367 ));
1368 }
1369
1370 let mut vector = Vec::with_capacity(dim);
1371 let vec_bytes = &bytes[header + meta_len..expected_len];
1372 for chunk in vec_bytes.chunks_exact(4) {
1373 vector.push(f32::from_le_bytes(chunk.try_into().unwrap()));
1374 }
1375
1376 Ok(DecodedEntry { metric, vector })
1377}
1378
1379fn decode_vector_entry_view(
1380 bytes: &[u8],
1381) -> result::Result<VectorEntryView<'_>, alopex_core::Error> {
1382 if bytes.len() < 9 {
1383 return Err(alopex_core::Error::InvalidFormat(
1384 "vector entry too short".into(),
1385 ));
1386 }
1387 let metric = byte_to_metric(bytes[0])?;
1388 let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
1389 let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
1390
1391 let header = 9;
1392 let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
1393 if bytes.len() < expected_len {
1394 return Err(alopex_core::Error::InvalidFormat(
1395 "vector entry truncated".into(),
1396 ));
1397 }
1398
1399 let metadata = bytes[header..header + meta_len].to_vec();
1400 let vector_bytes = &bytes[header + meta_len..expected_len];
1401
1402 Ok(VectorEntryView {
1403 metric,
1404 dim,
1405 metadata,
1406 vector_bytes,
1407 })
1408}
1409
/// Decodes a little-endian f32 payload into a vector; trailing bytes that
/// do not form a complete 4-byte chunk are ignored.
fn vector_bytes_to_vec(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
        .collect()
}
1417
1418fn cached_vector_from_entry(metric: Metric, metadata: Vec<u8>, vector: Vec<f32>) -> CachedVector {
1419 let norm_sq = vector.iter().map(|v| v * v).sum::<f32>();
1420 let inv_norm = if norm_sq == 0.0 {
1421 0.0
1422 } else {
1423 1.0 / norm_sq.sqrt()
1424 };
1425 CachedVector {
1426 metric,
1427 metadata,
1428 vector,
1429 norm_sq,
1430 inv_norm,
1431 }
1432}
1433
1434fn build_vector_cache_from_txn<'a>(
1435 txn: &mut AnyKVTransaction<'a>,
1436) -> result::Result<HashMap<Key, CachedVector>, alopex_core::Error> {
1437 let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec())? else {
1438 return Ok(HashMap::new());
1439 };
1440 let keys = decode_index(&raw)?;
1441 let mut cache = HashMap::with_capacity(keys.len());
1442 for key in keys {
1443 let Some(raw) = txn.get(&key)? else {
1444 continue;
1445 };
1446 let decoded = decode_vector_entry_view(&raw)?;
1447 let vector = vector_bytes_to_vec(decoded.vector_bytes);
1448 let cached = cached_vector_from_entry(decoded.metric, decoded.metadata, vector);
1449 cache.insert(key, cached);
1450 }
1451 Ok(cache)
1452}
1453
/// Dot product with runtime AVX dispatch on x86_64; all other targets (or
/// CPUs without AVX) use the scalar fallback.
///
/// NOTE(review): the AVX and scalar paths accumulate in different orders, so
/// low-order float bits may differ between machines — confirm callers
/// tolerate that.
fn dot_product(query: &[f32], item: &[f32]) -> f32 {
    #[cfg(target_arch = "x86_64")]
    {
        if std::is_x86_feature_detected!("avx") {
            // SAFETY: `dot_product_avx` is gated on
            // `#[target_feature(enable = "avx")]` and we only reach this call
            // after the runtime feature check above confirmed AVX support.
            unsafe {
                return dot_product_avx(query, item);
            }
        }
    }
    dot_product_scalar(query, item)
}
1466
/// Portable scalar dot product; pairs are truncated to the shorter slice.
fn dot_product_scalar(query: &[f32], item: &[f32]) -> f32 {
    let mut acc = 0.0f32;
    for (q, v) in query.iter().zip(item) {
        acc += q * v;
    }
    acc
}
1470
/// AVX dot product: 8-lane SIMD accumulation over the bulk of the slices
/// plus a scalar tail.
///
/// Pairs are truncated to the shorter slice, matching
/// [`dot_product_scalar`]'s `zip` semantics. Previously the loop bounds came
/// from `query.len()` alone, so a shorter `item` caused out-of-bounds reads
/// (undefined behavior); clamping to the common length fixes that.
///
/// # Safety
/// The caller must ensure the CPU supports AVX (e.g. via
/// `is_x86_feature_detected!("avx")`).
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx")]
unsafe fn dot_product_avx(query: &[f32], item: &[f32]) -> f32 {
    use std::arch::x86_64::*;

    // Clamp to the shorter slice so every pointer read below is in bounds.
    let len = query.len().min(item.len());
    let mut i = 0;
    let mut acc = _mm256_setzero_ps();
    let q_ptr = query.as_ptr();
    let v_ptr = item.as_ptr();
    while i + 8 <= len {
        // SAFETY (loads): `i + 8 <= len <= min(query.len(), item.len())`,
        // and unaligned loads are used, so both reads are in bounds.
        let q = _mm256_loadu_ps(q_ptr.add(i));
        let v = _mm256_loadu_ps(v_ptr.add(i));
        acc = _mm256_add_ps(acc, _mm256_mul_ps(q, v));
        i += 8;
    }

    // Horizontal sum of the 8 accumulator lanes, then the scalar tail.
    let mut tmp = [0f32; 8];
    _mm256_storeu_ps(tmp.as_mut_ptr(), acc);
    let mut sum = tmp.iter().sum::<f32>();
    while i < len {
        sum += *q_ptr.add(i) * *v_ptr.add(i);
        i += 1;
    }
    sum
}
1497
1498fn score_from_slice(metric: Metric, query: &[f32], query_norm: f32, item: &[f32]) -> f32 {
1499 match metric {
1500 Metric::Cosine => {
1501 if query_norm == 0.0 {
1502 return 0.0;
1503 }
1504 let mut dot = 0.0;
1505 let mut item_norm_sq = 0.0;
1506 for (q, v) in query.iter().zip(item.iter()) {
1507 dot += q * v;
1508 item_norm_sq += v * v;
1509 }
1510 let item_norm = item_norm_sq.sqrt();
1511 if item_norm == 0.0 {
1512 0.0
1513 } else {
1514 dot / (query_norm * item_norm)
1515 }
1516 }
1517 Metric::L2 => {
1518 let mut dist_sq = 0.0;
1519 for (q, v) in query.iter().zip(item.iter()) {
1520 let d = q - v;
1521 dist_sq += d * d;
1522 }
1523 -dist_sq.sqrt()
1524 }
1525 Metric::InnerProduct => query.iter().zip(item.iter()).map(|(q, v)| q * v).sum(),
1526 }
1527}
1528
/// Scores `query` against a raw little-endian f32 payload without first
/// materializing a `Vec<f32>`.
///
/// On little-endian targets with a 4-byte-aligned payload the bytes are
/// reinterpreted in place and delegated to [`score_from_slice`]; otherwise
/// each component is decoded chunk-by-chunk with identical arithmetic.
fn score_from_bytes(
    metric: Metric,
    query: &[f32],
    query_norm: f32,
    vector_bytes: &[u8],
) -> result::Result<f32, alopex_core::Error> {
    let len = vector_bytes.len() / 4;
    #[cfg(target_endian = "little")]
    {
        let ptr = vector_bytes.as_ptr();
        if (ptr as usize).is_multiple_of(std::mem::align_of::<f32>()) {
            // SAFETY: the pointer was just checked to be f32-aligned, the
            // target is little-endian so the byte order matches f32's
            // in-memory layout, `len * 4 <= vector_bytes.len()` keeps the
            // slice in bounds, and every bit pattern is a valid f32.
            let items = unsafe { std::slice::from_raw_parts(ptr as *const f32, len) };
            return Ok(score_from_slice(metric, query, query_norm, items));
        }
    }

    // Fallback path (big-endian targets or misaligned payloads): decode
    // 4 bytes at a time. Mirrors `score_from_slice` exactly.
    let mut iter = vector_bytes.chunks_exact(4);
    let score = match metric {
        Metric::Cosine => {
            // Zero query norm: cosine is undefined, score as 0.0.
            if query_norm == 0.0 {
                0.0
            } else {
                let mut dot = 0.0;
                let mut item_norm_sq = 0.0;
                for (q, chunk) in query.iter().zip(&mut iter) {
                    let v = f32::from_le_bytes(chunk.try_into().unwrap());
                    dot += q * v;
                    item_norm_sq += v * v;
                }
                let item_norm = item_norm_sq.sqrt();
                if item_norm == 0.0 {
                    0.0
                } else {
                    dot / (query_norm * item_norm)
                }
            }
        }
        Metric::L2 => {
            // Negated distance: larger score means more similar.
            let mut dist_sq = 0.0;
            for (q, chunk) in query.iter().zip(&mut iter) {
                let v = f32::from_le_bytes(chunk.try_into().unwrap());
                let d = q - v;
                dist_sq += d * d;
            }
            -dist_sq.sqrt()
        }
        Metric::InnerProduct => query
            .iter()
            .zip(&mut iter)
            .map(|(q, chunk)| q * f32::from_le_bytes(chunk.try_into().unwrap()))
            .sum(),
    };
    Ok(score)
}
1584
1585fn encode_index(keys: &[Key]) -> result::Result<Vec<u8>, alopex_core::Error> {
1586 let mut buf = Vec::new();
1587 let count = keys.len() as u32;
1588 buf.extend_from_slice(&count.to_le_bytes());
1589 for key in keys {
1590 let len: u32 = key
1591 .len()
1592 .try_into()
1593 .map_err(|_| alopex_core::Error::InvalidFormat("key too long".into()))?;
1594 buf.extend_from_slice(&len.to_le_bytes());
1595 buf.extend_from_slice(key);
1596 }
1597 Ok(buf)
1598}
1599
1600fn decode_index(bytes: &[u8]) -> result::Result<Vec<Key>, alopex_core::Error> {
1601 if bytes.len() < 4 {
1602 return Err(alopex_core::Error::InvalidFormat("index too short".into()));
1603 }
1604 let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
1605 let mut pos = 4;
1606 let mut keys = Vec::with_capacity(count);
1607 for _ in 0..count {
1608 if pos + 4 > bytes.len() {
1609 return Err(alopex_core::Error::InvalidFormat("index truncated".into()));
1610 }
1611 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
1612 pos += 4;
1613 if pos + len > bytes.len() {
1614 return Err(alopex_core::Error::InvalidFormat(
1615 "index key truncated".into(),
1616 ));
1617 }
1618 keys.push(bytes[pos..pos + len].to_vec());
1619 pos += len;
1620 }
1621 Ok(keys)
1622}
1623
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::thread;
    use tempfile::tempdir;

    #[test]
    fn test_open_and_crud() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.db");
        let db = Database::open(&path).unwrap();

        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.put(b"key1", b"value1").unwrap();
        txn.commit().unwrap();

        let mut txn2 = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn2.get(b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));
    }

    #[test]
    fn test_not_found() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn.get(b"non-existent-key").unwrap();
        assert!(val.is_none());
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_file_format_version_reads_alopex_header() {
        use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};

        let dir = tempdir().unwrap();
        let path = dir.path().join("format-test.alopex");
        let expected = FileVersion::new(0, 0, 1);

        let writer = AlopexFileWriter::new(path.clone(), expected, FileFlags(0)).unwrap();
        writer.finalize().unwrap();

        let db = Database::open(&path).unwrap();
        assert_eq!(db.file_format_version(), expected);
    }

    #[test]
    fn test_crash_recovery_replays_wal() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("replay.db");

        {
            let db = Database::open(&path).unwrap();
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.put(b"k1", b"v1").unwrap();
            txn.commit().unwrap();

            // Dropped without commit: must not survive reopen.
            let mut uncommitted = db.begin(TxnMode::ReadWrite).unwrap();
            uncommitted.put(b"k2", b"v2").unwrap();
        }

        let db = Database::open(&path).unwrap();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
        assert_eq!(txn.get(b"k2").unwrap(), None);
    }

    #[test]
    fn test_txn_closed() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.put(b"k1", b"v1").unwrap();
        txn.commit().unwrap();

        // A handle whose inner transaction is gone must reject further use
        // with TxnCompleted (the previous version of this test never
        // exercised the closed state its name promises).
        let mut closed = db.begin(TxnMode::ReadWrite).unwrap();
        closed.rollback_in_place().unwrap();
        assert!(matches!(closed.get(b"k1"), Err(Error::TxnCompleted)));
        assert!(matches!(closed.put(b"k2", b"v2"), Err(Error::TxnCompleted)));
        assert!(matches!(
            closed.rollback_in_place(),
            Err(Error::TxnCompleted)
        ));
    }

    #[test]
    fn test_concurrency_conflict() {
        let db = std::sync::Arc::new(Database::new());
        let mut t0 = db.begin(TxnMode::ReadWrite).unwrap();
        t0.put(b"k1", b"v0").unwrap();
        t0.commit().unwrap();

        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();

        // txn1 reads k1, then waits while txn2 commits a conflicting write;
        // txn1's commit must then fail with TxnConflict.
        let db1 = db.clone();
        let t1 = thread::spawn(move || {
            let mut txn1 = db1.begin(TxnMode::ReadWrite).unwrap();
            let val = txn1.get(b"k1").unwrap();
            assert_eq!(val.unwrap(), b"v0");
            tx1.send(()).unwrap();
            rx2.recv().unwrap();
            txn1.put(b"k1", b"v1").unwrap();
            let result = txn1.commit();
            assert!(matches!(
                result,
                Err(Error::Core(alopex_core::Error::TxnConflict))
            ));
        });

        let db2 = db.clone();
        let t2 = thread::spawn(move || {
            rx1.recv().unwrap();
            let mut txn2 = db2.begin(TxnMode::ReadWrite).unwrap();
            txn2.put(b"k1", b"v2").unwrap();
            assert!(txn2.commit().is_ok());
            tx2.send(()).unwrap();
        });

        t1.join().unwrap();
        t2.join().unwrap();

        // The winner's write is the one that persists.
        let mut txn3 = db.begin(TxnMode::ReadOnly).unwrap();
        let val = txn3.get(b"k1").unwrap();
        assert_eq!(val.unwrap(), b"v2");
    }

    #[test]
    fn test_flush_and_reopen_via_embedded_api() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("persist.db");
        {
            let db = Database::open(&path).unwrap();
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.put(b"k1", b"v1").unwrap();
            txn.commit().unwrap();
            db.flush().unwrap();
        }

        let db = Database::open(&path).unwrap();
        let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
        assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
    }

    #[test]
    fn test_large_value_blob_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("blob.lv");
        let payload = b"hello large value";

        {
            let db = Database::new();
            let mut writer = db
                .create_blob_writer(&path, payload.len() as u64, Some(16))
                .unwrap();
            writer.write_chunk(&payload[..5]).unwrap();
            writer.write_chunk(&payload[5..]).unwrap();
            writer.finish().unwrap();
        }

        let db = Database::new();
        let mut reader = db.open_large_value(&path).unwrap();
        let mut buf = Vec::new();
        while let Some((_info, chunk)) = reader.next_chunk().unwrap() {
            buf.extend_from_slice(&chunk);
        }
        assert_eq!(buf, payload);
    }

    #[test]
    fn upsert_and_search_same_txn() {
        let db = Database::new();
        let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
        txn.upsert_vector(b"k1", b"meta1", &[1.0, 0.0], Metric::Cosine)
            .unwrap();

        // Uncommitted vectors must be visible to searches in the same txn.
        let results = txn
            .search_similar(&[1.0, 0.0], Metric::Cosine, 1, None)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, b"k1");
        assert_eq!(results[0].metadata, b"meta1");
        txn.commit().unwrap();
    }

    #[test]
    fn upsert_and_search_across_txn() {
        let db = Database::new();
        {
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.upsert_vector(b"k1", b"meta1", &[1.0, 1.0], Metric::Cosine)
                .unwrap();
            txn.commit().unwrap();
        }

        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let results = ro
            .search_similar(&[1.0, 1.0], Metric::Cosine, 1, None)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, b"k1");
    }

    #[test]
    fn read_only_upsert_rejected() {
        let db = Database::new();
        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let err = ro
            .upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
            .unwrap_err();
        assert!(matches!(err, Error::Core(alopex_core::Error::TxnReadOnly)));
    }

    #[test]
    fn dimension_mismatch_on_search() {
        let db = Database::new();
        {
            let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
            txn.upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
                .unwrap();
            txn.commit().unwrap();
        }
        let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
        let err = ro
            .search_similar(&[1.0, 0.0, 1.0], Metric::Cosine, 1, None)
            .unwrap_err();
        assert!(matches!(
            err,
            Error::Core(alopex_core::Error::DimensionMismatch { .. })
        ));
    }
}