1#![deny(missing_docs)]
4
5pub mod columnar_api;
6pub mod options;
7
8pub use crate::columnar_api::{EmbeddedConfig, StorageMode};
9pub use crate::options::DatabaseOptions;
10use alopex_core::vector::hnsw::{HnswTransactionState, SearchStats as HnswSearchStats};
11use alopex_core::{
12 columnar::{
13 kvs_bridge::ColumnarKvsBridge, memory::InMemorySegmentStore, segment_v2::SegmentConfigV2,
14 },
15 kv::any::AnyKVTransaction,
16 kv::memory::MemoryKV,
17 kv::AnyKV,
18 score, validate_dimensions, HnswConfig, HnswIndex, HnswSearchResult, HnswStats, KVStore,
19 KVTransaction, Key, LargeValueKind, LargeValueMeta, LargeValueReader, LargeValueWriter,
20 StorageFactory, VectorType, DEFAULT_CHUNK_SIZE,
21};
22pub use alopex_core::{MemoryStats, Metric, TxnMode};
23use std::collections::HashMap;
24use std::convert::TryInto;
25use std::fs;
26use std::path::Path;
27use std::result;
28use std::sync::Arc;
29
30pub type Result<T> = result::Result<T, Error>;
32
33#[derive(Debug, thiserror::Error)]
35pub enum Error {
36 #[error("core error: {0}")]
38 Core(#[from] alopex_core::Error),
39 #[error("transaction is completed")]
41 TxnCompleted,
42 #[error("table not found: {0}")]
44 TableNotFound(String),
45 #[error("not in in-memory columnar mode")]
47 NotInMemoryMode,
48}
49
50pub struct Database {
52 pub(crate) store: Arc<AnyKV>,
54 pub(crate) columnar_mode: StorageMode,
55 pub(crate) columnar_bridge: ColumnarKvsBridge,
56 pub(crate) columnar_memory: Option<InMemorySegmentStore>,
57 pub(crate) segment_config: SegmentConfigV2,
58}
59
60pub(crate) fn disk_data_dir_path(path: &Path) -> std::path::PathBuf {
61 if path.extension().is_some_and(|e| e == "alopex") {
62 path.with_extension("alopex.d")
65 } else {
66 path.to_path_buf()
67 }
68}
69
70impl Database {
71 pub fn open(path: &Path) -> Result<Self> {
73 let data_dir = disk_data_dir_path(path);
74 let store = StorageFactory::create(alopex_core::StorageMode::Disk {
75 path: data_dir,
76 config: None,
77 })
78 .map_err(Error::Core)?;
79 Ok(Self::init(
80 store,
81 StorageMode::Disk,
82 None,
83 SegmentConfigV2::default(),
84 ))
85 }
86
87 pub fn new() -> Self {
89 let store = AnyKV::Memory(MemoryKV::new());
90 Self::init(
91 store,
92 StorageMode::InMemory,
93 None,
94 SegmentConfigV2::default(),
95 )
96 }
97
98 pub fn open_in_memory() -> Result<Self> {
100 Self::open_in_memory_with_options(DatabaseOptions::in_memory())
101 }
102
103 pub fn open_in_memory_with_options(opts: DatabaseOptions) -> Result<Self> {
105 if !opts.memory_mode() {
106 return Err(Error::Core(alopex_core::Error::InvalidFormat(
107 "memory_mode must be enabled for in-memory open".into(),
108 )));
109 }
110 let store = StorageFactory::create(opts.to_storage_mode(None)).map_err(Error::Core)?;
111 Ok(Self::init(
112 store,
113 StorageMode::InMemory,
114 opts.memory_limit(),
115 SegmentConfigV2::default(),
116 ))
117 }
118
119 pub(crate) fn init(
120 store: AnyKV,
121 columnar_mode: StorageMode,
122 memory_limit: Option<usize>,
123 segment_config: SegmentConfigV2,
124 ) -> Self {
125 let store = Arc::new(store);
126 let columnar_bridge = ColumnarKvsBridge::new(store.clone());
127 let columnar_memory = if matches!(columnar_mode, StorageMode::InMemory) {
128 Some(InMemorySegmentStore::new(memory_limit.map(|v| v as u64)))
129 } else {
130 None
131 };
132
133 Self {
134 store,
135 columnar_mode,
136 columnar_bridge,
137 columnar_memory,
138 segment_config,
139 }
140 }
141
142 pub fn flush(&self) -> Result<()> {
144 self.store.flush().map_err(Error::Core)
145 }
146
147 pub fn memory_usage(&self) -> Option<MemoryStats> {
149 match self.store.as_ref() {
150 AnyKV::Memory(kv) => Some(kv.memory_stats()),
151 AnyKV::Lsm(_) => None,
152 }
153 }
154
155 pub fn persist_to_disk(&self, wal_path: &Path) -> Result<()> {
159 if !matches!(self.store.as_ref(), AnyKV::Memory(_)) {
160 return Err(Error::NotInMemoryMode);
161 }
162 let data_dir = disk_data_dir_path(wal_path);
163 if wal_path.exists() || data_dir.exists() {
164 return Err(Error::Core(alopex_core::Error::PathExists(
165 wal_path.to_path_buf(),
166 )));
167 }
168
169 let tmp_dir = data_dir.with_extension("tmp");
170 if tmp_dir.exists() {
171 return Err(Error::Core(alopex_core::Error::PathExists(tmp_dir)));
172 }
173
174 let snapshot = self.snapshot_pairs()?;
175 let write_result = (|| -> Result<()> {
176 let store = StorageFactory::create(alopex_core::StorageMode::Disk {
177 path: tmp_dir.clone(),
178 config: None,
179 })
180 .map_err(Error::Core)?;
181
182 let mut txn = store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
183 for (key, value) in snapshot {
184 txn.put(key, value).map_err(Error::Core)?;
185 }
186 txn.commit_self().map_err(Error::Core)?;
187
188 Ok(())
189 })();
190
191 if let Err(e) = write_result {
192 let _ = fs::remove_dir_all(&tmp_dir);
193 return Err(e);
194 }
195
196 fs::rename(&tmp_dir, &data_dir).map_err(|e| Error::Core(e.into()))?;
197 if wal_path.extension().is_some_and(|e| e == "alopex") {
198 let _ = fs::OpenOptions::new()
200 .create_new(true)
201 .write(true)
202 .open(wal_path);
203 }
204 Ok(())
205 }
206
207 pub fn clone_to_memory(&self) -> Result<Self> {
209 let snapshot = self.snapshot_pairs()?;
210 let cloned = Database::open_in_memory()?;
211 if snapshot.is_empty() {
212 return Ok(cloned);
213 }
214
215 let mut txn = cloned.begin(TxnMode::ReadWrite)?;
216 for (key, value) in snapshot {
217 txn.put(&key, &value)?;
218 }
219 txn.commit()?;
220 Ok(cloned)
221 }
222
223 pub fn clear(&self) -> Result<()> {
225 let keys: Vec<Key> = self.snapshot_pairs()?.into_iter().map(|(k, _)| k).collect();
226 if keys.is_empty() {
227 return Ok(());
228 }
229 let mut txn = self.begin(TxnMode::ReadWrite)?;
230 for key in keys {
231 txn.delete(&key)?;
232 }
233 txn.commit()
234 }
235
236 pub fn set_memory_limit(&self, bytes: Option<usize>) {
238 if let AnyKV::Memory(kv) = self.store.as_ref() {
239 kv.txn_manager().set_memory_limit(bytes);
240 }
241 }
242
243 pub fn snapshot(&self) -> Vec<(Key, Vec<u8>)> {
245 self.snapshot_pairs().unwrap_or_default()
246 }
247
248 fn snapshot_pairs(&self) -> Result<Vec<(Key, Vec<u8>)>> {
249 let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
250 let pairs: Vec<(Key, Vec<u8>)> = txn.scan_prefix(b"").map_err(Error::Core)?.collect();
251 txn.commit_self().map_err(Error::Core)?;
252 Ok(pairs)
253 }
254
255 pub fn create_hnsw_index(&self, name: &str, config: HnswConfig) -> Result<()> {
257 let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
258 let index = HnswIndex::create(name, config).map_err(Error::Core)?;
259 index.save(&mut txn).map_err(Error::Core)?;
260 txn.commit_self().map_err(Error::Core)
261 }
262
263 pub fn drop_hnsw_index(&self, name: &str) -> Result<()> {
265 let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
266 let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
267 index.drop(&mut txn).map_err(Error::Core)?;
268 txn.commit_self().map_err(Error::Core)
269 }
270
271 pub fn get_hnsw_stats(&self, name: &str) -> Result<HnswStats> {
273 let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
274 let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
275 Ok(index.stats())
276 }
277
278 pub fn compact_hnsw_index(&self, name: &str) -> Result<alopex_core::vector::CompactionResult> {
280 let mut txn = self.store.begin(TxnMode::ReadWrite).map_err(Error::Core)?;
281 let mut index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
282 let result = index.compact().map_err(Error::Core)?;
283 index.save(&mut txn).map_err(Error::Core)?;
284 txn.commit_self().map_err(Error::Core)?;
285 Ok(result)
286 }
287
288 pub fn search_hnsw(
290 &self,
291 name: &str,
292 query: &[f32],
293 k: usize,
294 ef_search: Option<usize>,
295 ) -> Result<(Vec<HnswSearchResult>, HnswSearchStats)> {
296 let mut txn = self.store.begin(TxnMode::ReadOnly).map_err(Error::Core)?;
297 let index = HnswIndex::load(name, &mut txn).map_err(Error::Core)?;
298 index.search(query, k, ef_search).map_err(Error::Core)
299 }
300
301 pub fn create_blob_writer(
303 &self,
304 path: &Path,
305 total_len: u64,
306 chunk_size: Option<u32>,
307 ) -> Result<LargeValueWriter> {
308 let meta = LargeValueMeta {
309 kind: LargeValueKind::Blob,
310 total_len,
311 chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
312 };
313 LargeValueWriter::create(path, meta).map_err(Error::Core)
314 }
315
316 pub fn create_typed_writer(
318 &self,
319 path: &Path,
320 type_id: u16,
321 total_len: u64,
322 chunk_size: Option<u32>,
323 ) -> Result<LargeValueWriter> {
324 let meta = LargeValueMeta {
325 kind: LargeValueKind::Typed(type_id),
326 total_len,
327 chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
328 };
329 LargeValueWriter::create(path, meta).map_err(Error::Core)
330 }
331
332 pub fn open_large_value(&self, path: &Path) -> Result<LargeValueReader> {
334 LargeValueReader::open(path).map_err(Error::Core)
335 }
336
337 pub fn begin(&self, mode: TxnMode) -> Result<Transaction<'_>> {
339 let txn = self.store.begin(mode).map_err(Error::Core)?;
340 Ok(Transaction {
341 inner: Some(txn),
342 db: self,
343 hnsw_indices: HashMap::new(),
344 })
345 }
346}
347
348impl Default for Database {
349 fn default() -> Self {
350 Self::new()
351 }
352}
353
354pub struct Transaction<'a> {
356 inner: Option<AnyKVTransaction<'a>>,
357 db: &'a Database,
358 hnsw_indices: HashMap<String, (HnswIndex, alopex_core::vector::hnsw::HnswTransactionState)>,
359}
360
361#[derive(Debug, Clone, PartialEq)]
363pub struct SearchResult {
364 pub key: Key,
366 pub metadata: Vec<u8>,
368 pub score: f32,
370}
371
372const VECTOR_INDEX_KEY: &[u8] = b"__alopex_vector_index";
373
374impl<'a> Transaction<'a> {
375 pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
377 self.inner_mut()?.get(&key.to_vec()).map_err(Error::Core)
378 }
379
380 pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
382 self.inner_mut()?
383 .put(key.to_vec(), value.to_vec())
384 .map_err(Error::Core)
385 }
386
387 pub fn delete(&mut self, key: &[u8]) -> Result<()> {
389 self.inner_mut()?.delete(key.to_vec()).map_err(Error::Core)
390 }
391
392 pub fn upsert_to_hnsw(
394 &mut self,
395 index_name: &str,
396 key: &[u8],
397 vector: &[f32],
398 metadata: &[u8],
399 ) -> Result<()> {
400 self.ensure_write_txn()?;
401 let (index, state) = self.hnsw_entry_mut(index_name)?;
402 index
403 .upsert_staged(key, vector, metadata, state)
404 .map_err(Error::Core)
405 }
406
407 pub fn delete_from_hnsw(&mut self, index_name: &str, key: &[u8]) -> Result<bool> {
409 self.ensure_write_txn()?;
410 let (index, state) = self.hnsw_entry_mut(index_name)?;
411 index.delete_staged(key, state).map_err(Error::Core)
412 }
413
414 pub fn upsert_vector(
418 &mut self,
419 key: &[u8],
420 metadata: &[u8],
421 vector: &[f32],
422 metric: Metric,
423 ) -> Result<()> {
424 if vector.is_empty() {
425 return Err(Error::Core(alopex_core::Error::InvalidFormat(
426 "vector cannot be empty".into(),
427 )));
428 }
429 let vt = VectorType::new(vector.len(), metric);
430 vt.validate(vector).map_err(Error::Core)?;
431
432 let payload = encode_vector_entry(vt, metadata, vector);
433 let txn = self.inner_mut()?;
434 txn.put(key.to_vec(), payload).map_err(Error::Core)?;
435
436 let mut keys = self.load_vector_index()?;
437 if !keys.iter().any(|k| k == key) {
438 keys.push(key.to_vec());
439 self.persist_vector_index(&keys)?;
440 }
441 Ok(())
442 }
443
444 pub fn search_similar(
449 &mut self,
450 query_vector: &[f32],
451 metric: Metric,
452 top_k: usize,
453 filter_keys: Option<&[Key]>,
454 ) -> Result<Vec<SearchResult>> {
455 if top_k == 0 {
456 return Ok(Vec::new());
457 }
458
459 let mut keys = match filter_keys {
460 Some(keys) => keys.to_vec(),
461 None => self.load_vector_index()?,
462 };
463 if keys.is_empty() {
464 return Ok(Vec::new());
465 }
466
467 let mut rows = Vec::new();
468 let txn = self.inner_mut()?;
469 for key in keys.drain(..) {
470 let Some(raw) = txn.get(&key).map_err(Error::Core)? else {
471 continue;
472 };
473 let decoded = decode_vector_entry(&raw).map_err(Error::Core)?;
474 if decoded.metric != metric {
475 return Err(Error::Core(alopex_core::Error::UnsupportedMetric {
476 metric: metric.as_str().to_string(),
477 }));
478 }
479 validate_dimensions(decoded.dim, query_vector.len()).map_err(Error::Core)?;
480 let score = score(metric, query_vector, &decoded.vector).map_err(Error::Core)?;
481 rows.push(SearchResult {
482 key,
483 metadata: decoded.metadata,
484 score,
485 });
486 }
487
488 rows.sort_by(|a, b| b.score.total_cmp(&a.score).then_with(|| a.key.cmp(&b.key)));
489 if rows.len() > top_k {
490 rows.truncate(top_k);
491 }
492 Ok(rows)
493 }
494
495 fn load_vector_index(&mut self) -> Result<Vec<Key>> {
496 let txn = self.inner_mut()?;
497 let Some(raw) = txn.get(&VECTOR_INDEX_KEY.to_vec()).map_err(Error::Core)? else {
498 return Ok(Vec::new());
499 };
500 decode_index(&raw).map_err(Error::Core)
501 }
502
503 fn persist_vector_index(&mut self, keys: &[Key]) -> Result<()> {
504 let txn = self.inner_mut()?;
505 let encoded = encode_index(keys)?;
506 txn.put(VECTOR_INDEX_KEY.to_vec(), encoded)
507 .map_err(Error::Core)
508 }
509
510 pub fn commit(mut self) -> Result<()> {
512 {
513 let txn = self.inner.as_mut().ok_or(Error::TxnCompleted)?;
514 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
515 index.commit_staged(txn, state).map_err(Error::Core)?;
516 }
517 }
518 let txn = self.inner.take().ok_or(Error::TxnCompleted)?;
519 self.hnsw_indices.clear();
520 txn.commit_self().map_err(Error::Core)
521 }
522
523 pub fn rollback(mut self) -> Result<()> {
525 if let Some(txn) = self.inner.take() {
526 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
527 let _ = index.rollback(state);
528 }
529 self.hnsw_indices.clear();
530 txn.rollback_self().map_err(Error::Core)
531 } else {
532 Err(Error::TxnCompleted)
533 }
534 }
535
536 fn inner_mut(&mut self) -> Result<&mut AnyKVTransaction<'a>> {
537 self.inner.as_mut().ok_or(Error::TxnCompleted)
538 }
539
540 fn hnsw_entry_mut(&mut self, name: &str) -> Result<&mut (HnswIndex, HnswTransactionState)> {
541 if !self.hnsw_indices.contains_key(name) {
542 let index = {
543 let txn = self.inner_mut()?;
544 HnswIndex::load(name, txn).map_err(Error::Core)?
545 };
546 self.hnsw_indices
547 .insert(name.to_string(), (index, HnswTransactionState::default()));
548 }
549 Ok(self.hnsw_indices.get_mut(name).unwrap())
550 }
551
552 fn ensure_write_txn(&self) -> Result<()> {
553 let txn = self.inner.as_ref().ok_or(Error::TxnCompleted)?;
554 if txn.mode() != TxnMode::ReadWrite {
555 return Err(Error::Core(alopex_core::Error::TxnConflict));
556 }
557 Ok(())
558 }
559}
560
561impl<'a> Drop for Transaction<'a> {
562 fn drop(&mut self) {
563 if let Some(txn) = self.inner.take() {
564 for (_, (index, state)) in self.hnsw_indices.iter_mut() {
565 let _ = index.rollback(state);
566 }
567 self.hnsw_indices.clear();
568 let _ = txn.rollback_self();
569 }
570 }
571}
572
573fn metric_to_byte(metric: Metric) -> u8 {
574 match metric {
575 Metric::Cosine => 0,
576 Metric::L2 => 1,
577 Metric::InnerProduct => 2,
578 }
579}
580
581fn byte_to_metric(byte: u8) -> result::Result<Metric, alopex_core::Error> {
582 match byte {
583 0 => Ok(Metric::Cosine),
584 1 => Ok(Metric::L2),
585 2 => Ok(Metric::InnerProduct),
586 other => Err(alopex_core::Error::UnsupportedMetric {
587 metric: format!("unknown({other})"),
588 }),
589 }
590}
591
592fn encode_vector_entry(vector_type: VectorType, metadata: &[u8], vector: &[f32]) -> Vec<u8> {
593 let dim = vector_type.dim() as u32;
594 let meta_len = metadata.len() as u32;
595 let mut buf = Vec::with_capacity(1 + 4 + 4 + metadata.len() + std::mem::size_of_val(vector));
596 buf.push(metric_to_byte(vector_type.metric()));
597 buf.extend_from_slice(&dim.to_le_bytes());
598 buf.extend_from_slice(&meta_len.to_le_bytes());
599 buf.extend_from_slice(metadata);
600 for v in vector {
601 buf.extend_from_slice(&v.to_le_bytes());
602 }
603 buf
604}
605
606struct DecodedEntry {
607 metric: Metric,
608 dim: usize,
609 metadata: Vec<u8>,
610 vector: Vec<f32>,
611}
612
613fn decode_vector_entry(bytes: &[u8]) -> result::Result<DecodedEntry, alopex_core::Error> {
614 if bytes.len() < 9 {
615 return Err(alopex_core::Error::InvalidFormat(
616 "vector entry too short".into(),
617 ));
618 }
619 let metric = byte_to_metric(bytes[0])?;
620 let dim = u32::from_le_bytes(bytes[1..5].try_into().unwrap()) as usize;
621 let meta_len = u32::from_le_bytes(bytes[5..9].try_into().unwrap()) as usize;
622
623 let header = 9;
624 let expected_len = header + meta_len + dim * std::mem::size_of::<f32>();
625 if bytes.len() < expected_len {
626 return Err(alopex_core::Error::InvalidFormat(
627 "vector entry truncated".into(),
628 ));
629 }
630
631 let metadata = bytes[header..header + meta_len].to_vec();
632 let mut vector = Vec::with_capacity(dim);
633 let vec_bytes = &bytes[header + meta_len..expected_len];
634 for chunk in vec_bytes.chunks_exact(4) {
635 vector.push(f32::from_le_bytes(chunk.try_into().unwrap()));
636 }
637
638 Ok(DecodedEntry {
639 metric,
640 dim,
641 metadata,
642 vector,
643 })
644}
645
646fn encode_index(keys: &[Key]) -> result::Result<Vec<u8>, alopex_core::Error> {
647 let mut buf = Vec::new();
648 let count = keys.len() as u32;
649 buf.extend_from_slice(&count.to_le_bytes());
650 for key in keys {
651 let len: u32 = key
652 .len()
653 .try_into()
654 .map_err(|_| alopex_core::Error::InvalidFormat("key too long".into()))?;
655 buf.extend_from_slice(&len.to_le_bytes());
656 buf.extend_from_slice(key);
657 }
658 Ok(buf)
659}
660
661fn decode_index(bytes: &[u8]) -> result::Result<Vec<Key>, alopex_core::Error> {
662 if bytes.len() < 4 {
663 return Err(alopex_core::Error::InvalidFormat("index too short".into()));
664 }
665 let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
666 let mut pos = 4;
667 let mut keys = Vec::with_capacity(count);
668 for _ in 0..count {
669 if pos + 4 > bytes.len() {
670 return Err(alopex_core::Error::InvalidFormat("index truncated".into()));
671 }
672 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
673 pos += 4;
674 if pos + len > bytes.len() {
675 return Err(alopex_core::Error::InvalidFormat(
676 "index key truncated".into(),
677 ));
678 }
679 keys.push(bytes[pos..pos + len].to_vec());
680 pos += len;
681 }
682 Ok(keys)
683}
684
685#[cfg(test)]
686mod tests {
687 use super::*;
688 use std::sync::mpsc;
689 use std::thread;
690 use tempfile::tempdir;
691
692 #[test]
693 fn test_open_and_crud() {
694 let dir = tempdir().unwrap();
695 let path = dir.path().join("test.db");
696 let db = Database::open(&path).unwrap();
697
698 let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
699 txn.put(b"key1", b"value1").unwrap();
700 txn.commit().unwrap();
701
702 let mut txn2 = db.begin(TxnMode::ReadOnly).unwrap();
703 let val = txn2.get(b"key1").unwrap();
704 assert_eq!(val, Some(b"value1".to_vec()));
705 }
706
707 #[test]
708 fn test_not_found() {
709 let db = Database::new();
710 let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
711 let val = txn.get(b"non-existent-key").unwrap();
712 assert!(val.is_none());
713 }
714
715 #[test]
716 fn test_crash_recovery_replays_wal() {
717 let dir = tempdir().unwrap();
718 let path = dir.path().join("replay.db");
719
720 {
721 let db = Database::open(&path).unwrap();
722 let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
723 txn.put(b"k1", b"v1").unwrap();
724 txn.commit().unwrap();
725
726 let mut uncommitted = db.begin(TxnMode::ReadWrite).unwrap();
727 uncommitted.put(b"k2", b"v2").unwrap();
728 }
730
731 let db = Database::open(&path).unwrap();
732 let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
733 assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
734 assert_eq!(txn.get(b"k2").unwrap(), None);
735 }
736
737 #[test]
738 fn test_txn_closed() {
739 let db = Database::new();
740 let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
741 txn.put(b"k1", b"v1").unwrap();
742 txn.commit().unwrap();
743 }
749
750 #[test]
751 fn test_concurrency_conflict() {
752 let db = std::sync::Arc::new(Database::new());
753 let mut t0 = db.begin(TxnMode::ReadWrite).unwrap();
754 t0.put(b"k1", b"v0").unwrap();
755 t0.commit().unwrap();
756
757 let (tx1, rx1) = mpsc::channel();
758 let (tx2, rx2) = mpsc::channel();
759
760 let db1 = db.clone();
761 let t1 = thread::spawn(move || {
762 let mut txn1 = db1.begin(TxnMode::ReadWrite).unwrap();
763 let val = txn1.get(b"k1").unwrap();
764 assert_eq!(val.unwrap(), b"v0");
765 tx1.send(()).unwrap();
766 rx2.recv().unwrap();
767 txn1.put(b"k1", b"v1").unwrap();
768 let result = txn1.commit();
769 assert!(matches!(
770 result,
771 Err(Error::Core(alopex_core::Error::TxnConflict))
772 ));
773 });
774
775 let db2 = db.clone();
776 let t2 = thread::spawn(move || {
777 rx1.recv().unwrap();
778 let mut txn2 = db2.begin(TxnMode::ReadWrite).unwrap();
779 txn2.put(b"k1", b"v2").unwrap();
780 assert!(txn2.commit().is_ok());
781 tx2.send(()).unwrap();
782 });
783
784 t1.join().unwrap();
785 t2.join().unwrap();
786
787 let mut txn3 = db.begin(TxnMode::ReadOnly).unwrap();
788 let val = txn3.get(b"k1").unwrap();
789 assert_eq!(val.unwrap(), b"v2");
790 }
791
792 #[test]
793 fn test_flush_and_reopen_via_embedded_api() {
794 let dir = tempdir().unwrap();
795 let path = dir.path().join("persist.db");
796 {
797 let db = Database::open(&path).unwrap();
798 let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
799 txn.put(b"k1", b"v1").unwrap();
800 txn.commit().unwrap();
801 db.flush().unwrap();
802 }
803
804 let db = Database::open(&path).unwrap();
805 let mut txn = db.begin(TxnMode::ReadOnly).unwrap();
806 assert_eq!(txn.get(b"k1").unwrap(), Some(b"v1".to_vec()));
807 }
808
809 #[test]
810 fn test_large_value_blob_roundtrip() {
811 let dir = tempdir().unwrap();
812 let path = dir.path().join("blob.lv");
813 let payload = b"hello large value";
814
815 {
816 let db = Database::new();
817 let mut writer = db
818 .create_blob_writer(&path, payload.len() as u64, Some(16))
819 .unwrap();
820 writer.write_chunk(&payload[..5]).unwrap();
821 writer.write_chunk(&payload[5..]).unwrap();
822 writer.finish().unwrap();
823 }
824
825 let db = Database::new();
826 let mut reader = db.open_large_value(&path).unwrap();
827 let mut buf = Vec::new();
828 while let Some((_info, chunk)) = reader.next_chunk().unwrap() {
829 buf.extend_from_slice(&chunk);
830 }
831 assert_eq!(buf, payload);
832 }
833
834 #[test]
835 fn upsert_and_search_same_txn() {
836 let db = Database::new();
837 let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
838 txn.upsert_vector(b"k1", b"meta1", &[1.0, 0.0], Metric::Cosine)
839 .unwrap();
840
841 let results = txn
842 .search_similar(&[1.0, 0.0], Metric::Cosine, 1, None)
843 .unwrap();
844 assert_eq!(results.len(), 1);
845 assert_eq!(results[0].key, b"k1");
846 assert_eq!(results[0].metadata, b"meta1");
847 txn.commit().unwrap();
848 }
849
850 #[test]
851 fn upsert_and_search_across_txn() {
852 let db = Database::new();
853 {
854 let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
855 txn.upsert_vector(b"k1", b"meta1", &[1.0, 1.0], Metric::Cosine)
856 .unwrap();
857 txn.commit().unwrap();
858 }
859
860 let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
861 let results = ro
862 .search_similar(&[1.0, 1.0], Metric::Cosine, 1, None)
863 .unwrap();
864 assert_eq!(results.len(), 1);
865 assert_eq!(results[0].key, b"k1");
866 }
867
868 #[test]
869 fn read_only_upsert_rejected() {
870 let db = Database::new();
871 let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
872 let err = ro
873 .upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
874 .unwrap_err();
875 assert!(matches!(err, Error::Core(alopex_core::Error::TxnConflict)));
876 }
877
878 #[test]
879 fn dimension_mismatch_on_search() {
880 let db = Database::new();
881 {
882 let mut txn = db.begin(TxnMode::ReadWrite).unwrap();
883 txn.upsert_vector(b"k1", b"m", &[1.0, 0.0], Metric::Cosine)
884 .unwrap();
885 txn.commit().unwrap();
886 }
887 let mut ro = db.begin(TxnMode::ReadOnly).unwrap();
888 let err = ro
889 .search_similar(&[1.0, 0.0, 1.0], Metric::Cosine, 1, None)
890 .unwrap_err();
891 assert!(matches!(
892 err,
893 Error::Core(alopex_core::Error::DimensionMismatch { .. })
894 ));
895 }
896}