use futures::{stream, stream::StreamExt};
use object_store::ObjectStore;
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::{
fmt::Debug,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;
use crate::{
collection::{Collection, CollectionConfig},
error::DBError,
schema::*,
storage::{Storage, StorageConfig, StorageStats},
unix_ms,
};
#[derive(Clone)]
pub struct AndaDB {
inner: Arc<InnerDB>,
}
struct InnerDB {
name: String,
object_store: Arc<dyn ObjectStore>,
storage: Storage,
metadata: RwLock<DBMetadata>,
collections: RwLock<BTreeMap<String, Arc<Collection>>>,
read_only: AtomicBool,
dropping_collections: RwLock<BTreeSet<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DBConfig {
pub name: String,
pub description: String,
pub storage: StorageConfig,
pub lock: Option<ByteBufB64>,
}
impl Default for DBConfig {
fn default() -> Self {
Self {
name: "anda_db".to_string(),
description: "Anda DB".to_string(),
storage: StorageConfig::default(),
lock: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DBMetadata {
pub config: DBConfig,
pub collections: BTreeSet<String>,
#[serde(default)]
pub extensions: BTreeMap<String, FieldValue>,
}
impl Debug for AndaDB {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "AndaDB({})", self.inner.name)
}
}
impl AndaDB {
const METADATA_PATH: &'static str = "db_meta.cbor";
pub fn stats(&self) -> StorageStats {
let mut stats = self.inner.storage.stats();
for collection in self.inner.collections.read().values() {
stats.merge(&collection.storage_stats());
}
stats
}
pub async fn create(
object_store: Arc<dyn ObjectStore>,
config: DBConfig,
) -> Result<Self, DBError> {
validate_field_name(config.name.as_str())?;
let storage = Storage::connect(
config.name.clone(),
object_store.clone(),
config.storage.clone(),
)
.await?;
let metadata = DBMetadata {
config,
collections: BTreeSet::new(),
extensions: BTreeMap::new(),
};
match storage.create(Self::METADATA_PATH, &metadata).await {
Ok(_) => {
storage.store_metadata(0, unix_ms()).await?;
}
Err(err) => return Err(err),
}
Ok(Self {
inner: Arc::new(InnerDB {
name: metadata.config.name.clone(),
object_store,
storage,
metadata: RwLock::new(metadata),
collections: RwLock::new(BTreeMap::new()),
read_only: AtomicBool::new(false),
dropping_collections: RwLock::new(BTreeSet::new()),
}),
})
}
pub async fn connect(
object_store: Arc<dyn ObjectStore>,
config: DBConfig,
) -> Result<Self, DBError> {
validate_field_name(config.name.as_str())?;
let storage = Storage::connect(
config.name.clone(),
object_store.clone(),
config.storage.clone(),
)
.await?;
match storage.fetch::<DBMetadata>(Self::METADATA_PATH).await {
Ok((metadata, _)) => {
let set_lock = match (&metadata.config.lock, config.lock) {
(None, Some(lock)) => Some(lock),
(Some(existing_lock), lock) => {
if lock.as_ref() != Some(existing_lock) {
return Err(DBError::Storage {
name: config.name.clone(),
source: "Database lock mismatch".into(),
});
}
None
}
_ => None,
};
let this = Self {
inner: Arc::new(InnerDB {
name: metadata.config.name.clone(),
object_store,
storage,
metadata: RwLock::new(metadata),
collections: RwLock::new(BTreeMap::new()),
read_only: AtomicBool::new(false),
dropping_collections: RwLock::new(BTreeSet::new()),
}),
};
if let Some(lock) = set_lock {
this.set_lock(lock).await?;
}
Ok(this)
}
Err(DBError::NotFound { .. }) => {
let metadata = DBMetadata {
config,
collections: BTreeSet::new(),
extensions: BTreeMap::new(),
};
match storage.create(Self::METADATA_PATH, &metadata).await {
Ok(_) => {
storage.store_metadata(0, unix_ms()).await?;
}
Err(err) => return Err(err),
}
Ok(Self {
inner: Arc::new(InnerDB {
name: metadata.config.name.clone(),
object_store,
storage,
metadata: RwLock::new(metadata),
collections: RwLock::new(BTreeMap::new()),
read_only: AtomicBool::new(false),
dropping_collections: RwLock::new(BTreeSet::new()),
}),
})
}
Err(err) => Err(err),
}
}
pub async fn open(
object_store: Arc<dyn ObjectStore>,
config: DBConfig,
) -> Result<Self, DBError> {
validate_field_name(config.name.as_str())?;
let storage = Storage::connect(
config.name.clone(),
object_store.clone(),
config.storage.clone(),
)
.await?;
match storage.fetch::<DBMetadata>(Self::METADATA_PATH).await {
Ok((metadata, _)) => {
let set_lock = match (&metadata.config.lock, config.lock) {
(None, Some(lock)) => Some(lock),
(Some(existing_lock), lock) => {
if lock.as_ref() != Some(existing_lock) {
return Err(DBError::Storage {
name: config.name.clone(),
source: "Database lock mismatch".into(),
});
}
None
}
_ => None,
};
let this = Self {
inner: Arc::new(InnerDB {
name: metadata.config.name.clone(),
object_store,
storage,
metadata: RwLock::new(metadata),
collections: RwLock::new(BTreeMap::new()),
read_only: AtomicBool::new(false),
dropping_collections: RwLock::new(BTreeSet::new()),
}),
};
if let Some(lock) = set_lock {
this.set_lock(lock).await?;
}
Ok(this)
}
Err(err) => Err(err),
}
}
pub fn name(&self) -> &str {
&self.inner.name
}
pub fn metadata(&self) -> DBMetadata {
self.inner.metadata.read().clone()
}
pub fn set_read_only(&self, read_only: bool) {
self.inner.read_only.store(read_only, Ordering::Release);
log::warn!(
action = "AndaDB::set_read_only",
database = self.inner.name;
"Database is set to read-only: {read_only}"
);
for collection in self.inner.collections.read().values() {
collection.set_read_only(read_only);
}
}
pub async fn close(&self) -> Result<(), DBError> {
self.set_read_only(true);
let collections = self
.inner
.collections
.read()
.values()
.cloned()
.collect::<Vec<_>>();
let _ = stream::iter(collections.into_iter())
.map(|collection| async move { collection.close().await })
.buffer_unordered(8) .collect::<Vec<_>>()
.await;
let start = Instant::now();
match self.flush_metadata(unix_ms()).await {
Ok(_) => {
let elapsed = start.elapsed();
log::warn!(
action = "AndaDB::close",
database = self.inner.name,
elapsed = elapsed.as_millis();
"Database closed successfully in {elapsed:?}",
);
}
Err(err) => {
let elapsed = start.elapsed();
log::error!(
action = "AndaDB::close",
database = self.inner.name,
elapsed = elapsed.as_millis();
"Failed to close database: {err:?}",
);
return Err(err);
}
}
Ok(())
}
pub async fn flush(&self) -> Result<(), DBError> {
let collections = self
.inner
.collections
.read()
.values()
.cloned()
.collect::<Vec<_>>();
let _ = stream::iter(collections.into_iter())
.map(|collection| async move { collection.flush(unix_ms()).await })
.buffer_unordered(8) .collect::<Vec<_>>()
.await;
self.flush_metadata(unix_ms()).await
}
pub async fn auto_flush(&self, cancel_token: CancellationToken, interval: Duration) {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
let _ = self.close().await;
return;
}
_ = tokio::time::sleep(interval) => {}
};
let start = Instant::now();
match self.flush().await {
Ok(_) => {
let elapsed = start.elapsed();
log::warn!(
action = "AndaDB::auto_flush",
database = self.inner.name,
elapsed = elapsed.as_millis();
"Database flushed successfully in {elapsed:?}",
);
}
Err(err) => {
let elapsed = start.elapsed();
log::error!(
action = "AndaDB::auto_flush",
database = self.inner.name,
elapsed = elapsed.as_millis();
"Failed to flush database: {err:?}",
);
}
}
}
}
pub async fn create_collection<F>(
&self,
schema: Schema,
config: CollectionConfig,
f: F,
) -> Result<Arc<Collection>, DBError>
where
F: AsyncFnOnce(&mut Collection) -> Result<(), DBError>,
{
if self.inner.read_only.load(Ordering::Relaxed) {
return Err(DBError::Generic {
name: self.inner.name.clone(),
source: "database is read-only".into(),
});
}
{
if self.inner.collections.read().contains_key(&config.name) {
return Err(DBError::AlreadyExists {
name: config.name,
path: self.inner.name.clone(),
source: "collection already exists".into(),
_id: 0,
});
}
}
{
if self
.inner
.dropping_collections
.read()
.contains(&config.name)
{
return Err(DBError::AlreadyExists {
name: config.name,
path: self.inner.name.clone(),
source: "collection is being dropped".to_string().into(),
_id: 0,
});
}
}
let start = Instant::now();
let mut collection = Collection::create(self.clone(), schema, config).await?;
f(&mut collection).await?;
let collection = Arc::new(collection);
{
let mut collections = self.inner.collections.write();
collections.insert(collection.name().to_string(), collection.clone());
self.inner
.metadata
.write()
.collections
.insert(collection.name().to_string());
}
let now = unix_ms();
collection.flush(now).await?;
self.flush_metadata(now).await?;
let elapsed = start.elapsed();
log::warn!(
action = "AndaDB::create_collection",
database = self.inner.name,
collection = collection.name(),
elapsed = elapsed.as_millis();
"Create a collection successfully in {elapsed:?}",
);
Ok(collection)
}
pub async fn open_or_create_collection<F>(
&self,
schema: Schema,
config: CollectionConfig,
f: F,
) -> Result<Arc<Collection>, DBError>
where
F: AsyncFnOnce(&mut Collection) -> Result<(), DBError>,
{
if self.inner.read_only.load(Ordering::Relaxed) {
return Err(DBError::Generic {
name: self.inner.name.clone(),
source: "database is read-only".into(),
});
}
{
if let Some(collection) = self.inner.collections.read().get(&config.name) {
return Ok(collection.clone());
}
}
{
if self
.inner
.dropping_collections
.read()
.contains(&config.name)
{
return Err(DBError::AlreadyExists {
name: config.name,
path: self.inner.name.clone(),
source: "collection is being dropped".to_string().into(),
_id: 0,
});
}
}
{
if !self
.inner
.metadata
.read()
.collections
.contains(&config.name)
{
return self.create_collection(schema, config, f).await;
}
}
self.open_collection_with_schema(config.name, Some(schema), f)
.await
}
pub async fn open_collection<F>(&self, name: String, f: F) -> Result<Arc<Collection>, DBError>
where
F: AsyncFnOnce(&mut Collection) -> Result<(), DBError>,
{
self.open_collection_with_schema(name, None, f).await
}
async fn open_collection_with_schema<F>(
&self,
name: String,
schema: Option<Schema>,
f: F,
) -> Result<Arc<Collection>, DBError>
where
F: AsyncFnOnce(&mut Collection) -> Result<(), DBError>,
{
{
if let Some(collection) = self.inner.collections.read().get(&name) {
return Ok(collection.clone());
}
}
{
if self.inner.dropping_collections.read().contains(&name) {
return Err(DBError::AlreadyExists {
name,
path: self.inner.name.clone(),
source: "collection is being dropped".to_string().into(),
_id: 0,
});
}
}
{
if !self.inner.metadata.read().collections.contains(&name) {
return Err(DBError::NotFound {
name,
path: self.inner.name.clone(),
source: "collection not found".into(),
_id: 0,
});
}
}
let collection = Collection::open(self.clone(), name, schema, f).await?;
let collection = Arc::new(collection);
{
let mut collections = self.inner.collections.write();
collections.insert(collection.name().to_string(), collection.clone());
}
let now = unix_ms();
collection.flush(now).await?;
Ok(collection)
}
pub async fn delete_collection(&self, name: &str) -> Result<(), DBError> {
if self.inner.read_only.load(Ordering::Relaxed) {
return Err(DBError::Generic {
name: self.inner.name.clone(),
source: "database is read-only".into(),
});
}
{
if !self.inner.metadata.write().collections.remove(name) {
return Ok(());
}
self.inner
.dropping_collections
.write()
.insert(name.to_string());
}
self.flush_metadata(unix_ms()).await?;
if let Some(col) = { self.inner.collections.write().remove(name) } {
let _ = col.drop_data().await;
}
self.inner.dropping_collections.write().remove(name);
Ok(())
}
async fn set_lock(&self, lock: ByteBufB64) -> Result<(), DBError> {
{
self.inner.metadata.write().config.lock = Some(lock);
}
let metadata = self.metadata();
self.inner
.storage
.put(Self::METADATA_PATH, &metadata, None)
.await?;
Ok(())
}
pub async fn flush_metadata(&self, now_ms: u64) -> Result<(), DBError> {
let metadata = self.metadata();
self.inner
.storage
.put(Self::METADATA_PATH, &metadata, None)
.await?;
self.inner.storage.store_metadata(0, now_ms).await?;
Ok(())
}
pub fn get_extension(&self, key: &str) -> Option<FieldValue> {
self.inner.metadata.read().extensions.get(key).cloned()
}
pub fn get_extension_as<T>(&self, key: &str) -> Option<T>
where
T: DeserializeOwned,
{
self.get_extension(key)
.and_then(|v| v.clone().deserialized().ok())
}
pub fn set_extension(&self, key: String, value: FieldValue) {
self.inner.metadata.write().extensions.insert(key, value);
}
pub fn set_extension_from<T>(&self, key: String, value: T)
where
T: Serialize,
{
if let Ok(value) = FieldValue::serialized(&value, None) {
self.set_extension(key, value);
}
}
pub fn set_extension_with<F>(&self, key: String, f: F) -> Option<FieldValue>
where
F: FnOnce(Option<&FieldValue>) -> Option<FieldValue>,
{
let mut meta = self.inner.metadata.write();
let old_value = meta.extensions.get(&key);
let new_value = f(old_value);
if let Some(value) = new_value {
meta.extensions.insert(key, value)
} else {
None
}
}
pub fn set_extension_from_with<F, T>(&self, key: String, f: F) -> Option<T>
where
F: FnOnce(Option<T>) -> Option<T>,
T: Serialize + DeserializeOwned,
{
let mut meta = self.inner.metadata.write();
let old_value = meta.extensions.get(&key);
let new_value = f(old_value.and_then(|v| v.clone().deserialized().ok()));
if let Some(value) = new_value
&& let Ok(value) = FieldValue::serialized(&value, None)
{
let old = meta.extensions.insert(key, value);
return old.and_then(|v| v.deserialized().ok());
}
None
}
pub async fn save_extension(&self, key: String, value: FieldValue) -> Result<(), DBError> {
{
self.inner.metadata.write().extensions.insert(key, value);
}
self.flush_metadata(unix_ms()).await
}
pub async fn save_extension_from<T>(&self, key: String, value: &T) -> Result<(), DBError>
where
T: Serialize,
{
let field_value = FieldValue::serialized(value, None)?;
self.save_extension(key, field_value).await
}
pub async fn remove_extension(&self, key: &str) -> Result<Option<FieldValue>, DBError> {
let old = { self.inner.metadata.write().extensions.remove(key) };
if old.is_some() {
self.flush_metadata(unix_ms()).await?;
}
Ok(old)
}
pub fn extensions_with<F, R>(&self, f: F) -> R
where
F: FnOnce(&BTreeMap<String, FieldValue>) -> R,
{
f(&self.inner.metadata.read().extensions)
}
pub fn object_store(&self) -> Arc<dyn ObjectStore> {
self.inner.object_store.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{ByteBufB64, Fe, FieldValue, Ft, Schema};
use object_store::memory::InMemory;
#[tokio::test]
async fn test_database_creation() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
assert_eq!(db.name(), "anda_db");
assert!(db.metadata().collections.is_empty());
}
#[tokio::test]
async fn test_database_connection() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig {
name: "test_db".to_string(),
description: "Test Database".to_string(),
storage: StorageConfig::default(),
lock: None,
};
{
let _db = AndaDB::create(object_store.clone(), config.clone())
.await
.unwrap();
}
let db = AndaDB::connect(object_store, config).await.unwrap();
assert_eq!(db.name(), "test_db");
}
#[tokio::test]
async fn test_database_open() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig {
name: "test_open_db".to_string(),
description: "Test Open Database".to_string(),
storage: StorageConfig::default(),
lock: None,
};
let err = AndaDB::open(object_store.clone(), config.clone())
.await
.unwrap_err();
match err {
DBError::NotFound { .. } => {}
_ => panic!("Expected NotFound when opening a non-existent database"),
}
let _db = AndaDB::create(object_store.clone(), config.clone())
.await
.unwrap();
let db = AndaDB::open(object_store, config).await.unwrap();
assert_eq!(db.name(), "test_open_db");
}
#[tokio::test]
async fn test_database_open_lock_mismatch() {
let object_store = Arc::new(InMemory::new());
let create_config = DBConfig {
name: "test_open_lock_db".to_string(),
description: "Test Open Lock Database".to_string(),
storage: StorageConfig::default(),
lock: Some(ByteBufB64(vec![1, 2, 3])),
};
let _db = AndaDB::create(object_store.clone(), create_config)
.await
.unwrap();
let open_config = DBConfig {
name: "test_open_lock_db".to_string(),
description: "Test Open Lock Database".to_string(),
storage: StorageConfig::default(),
lock: Some(ByteBufB64(vec![9, 9, 9])),
};
let err = AndaDB::open(object_store, open_config).await.unwrap_err();
match err {
DBError::Storage { .. } => {}
_ => panic!("Expected Storage error for lock mismatch"),
}
}
#[tokio::test]
async fn test_database_open_with_matching_lock() {
let object_store = Arc::new(InMemory::new());
let lock = ByteBufB64(vec![7, 8, 9]);
let create_config = DBConfig {
name: "test_open_match_lock_db".to_string(),
description: "Test Open Match Lock Database".to_string(),
storage: StorageConfig::default(),
lock: Some(lock.clone()),
};
let _db = AndaDB::create(object_store.clone(), create_config)
.await
.unwrap();
let open_config = DBConfig {
name: "test_open_match_lock_db".to_string(),
description: "Test Open Match Lock Database".to_string(),
storage: StorageConfig::default(),
lock: Some(lock),
};
let db = AndaDB::open(object_store, open_config).await.unwrap();
assert_eq!(db.name(), "test_open_match_lock_db");
}
#[tokio::test]
async fn test_create_collection() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
let mut schema = Schema::builder();
schema
.add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
.unwrap();
let schema = schema.build().unwrap();
let collection_config = CollectionConfig {
name: "test_collection".to_string(),
description: "Test Collection".to_string(),
};
let collection = db
.create_collection(schema.clone(), collection_config.clone(), async |_| Ok(()))
.await
.unwrap();
assert_eq!(collection.name(), "test_collection");
assert!(db.metadata().collections.contains("test_collection"));
}
#[tokio::test]
async fn test_open_collection() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
let mut schema = Schema::builder();
schema
.add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
.unwrap();
let schema = schema.build().unwrap();
let collection_config = CollectionConfig {
name: "test_collection".to_string(),
description: "Test Collection".to_string(),
};
db.create_collection(schema.clone(), collection_config.clone(), async |_| Ok(()))
.await
.unwrap();
let collection = db
.open_collection("test_collection".to_string(), async |_| Ok(()))
.await
.unwrap();
assert_eq!(collection.name(), "test_collection");
}
#[tokio::test]
async fn test_open_or_create_collection() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
let mut schema = Schema::builder();
schema
.add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
.unwrap();
let schema = schema.build().unwrap();
let collection_config = CollectionConfig {
name: "test_collection".to_string(),
description: "Test Collection".to_string(),
};
let collection1 = db
.open_or_create_collection(schema.clone(), collection_config.clone(), async |_| Ok(()))
.await
.unwrap();
assert_eq!(collection1.name(), "test_collection");
let collection2 = db
.open_or_create_collection(schema.clone(), collection_config.clone(), async |_| Ok(()))
.await
.unwrap();
assert_eq!(collection2.name(), "test_collection");
}
#[tokio::test]
async fn test_read_only_mode() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
let mut schema = Schema::builder();
schema
.add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
.unwrap();
let schema = schema.build().unwrap();
let collection_config = CollectionConfig {
name: "test_collection".to_string(),
description: "Test Collection".to_string(),
};
let _collection = db
.create_collection(schema.clone(), collection_config.clone(), async |_| Ok(()))
.await
.unwrap();
db.set_read_only(true);
let collection_config2 = CollectionConfig {
name: "test_collection2".to_string(),
description: "Test Collection 2".to_string(),
};
let result = db
.create_collection(schema, collection_config2, async |_| Ok(()))
.await;
assert!(result.is_err());
match result {
Err(DBError::Generic { .. }) => (),
_ => panic!("Expected Generic error due to read-only mode"),
}
}
#[tokio::test]
async fn test_database_close() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
let mut schema = Schema::builder();
schema
.add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
.unwrap();
let schema = schema.build().unwrap();
let collection_config = CollectionConfig {
name: "test_collection".to_string(),
description: "Test Collection".to_string(),
};
db.create_collection(schema, collection_config, async |_| Ok(()))
.await
.unwrap();
db.close().await.unwrap();
assert!(db.inner.read_only.load(Ordering::Relaxed));
}
#[tokio::test]
async fn test_delete_collection() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
let mut schema_builder = Schema::builder();
schema_builder
.add_field(Fe::new("name".to_string(), Ft::Text).unwrap())
.unwrap();
let schema = schema_builder.build().unwrap();
let collection_config = CollectionConfig {
name: "test_collection".to_string(),
description: "Test Collection".to_string(),
};
db.create_collection(schema.clone(), collection_config.clone(), async |_| Ok(()))
.await
.unwrap();
assert!(db.metadata().collections.contains("test_collection"));
db.delete_collection("test_collection").await.unwrap();
assert!(!db.metadata().collections.contains("test_collection"));
let res = db
.open_collection("test_collection".to_string(), async |_| Ok(()))
.await;
match res {
Err(DBError::NotFound { .. }) => {}
_ => panic!("expected NotFound after delete_collection"),
}
db.create_collection(schema, collection_config, async |_| Ok(()))
.await
.unwrap();
assert!(db.metadata().collections.contains("test_collection"));
}
#[tokio::test]
async fn test_db_extension_get_set_remove() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
assert!(db.get_extension("key1").is_none());
assert!(db.metadata().extensions.is_empty());
db.set_extension("key1".into(), FieldValue::Text("hello".into()));
assert_eq!(
db.get_extension("key1"),
Some(FieldValue::Text("hello".into()))
);
db.set_extension("count".into(), FieldValue::U64(42));
db.set_extension("flag".into(), FieldValue::Bool(true));
assert_eq!(db.get_extension("count"), Some(FieldValue::U64(42)));
assert_eq!(db.get_extension("flag"), Some(FieldValue::Bool(true)));
db.set_extension("key1".into(), FieldValue::I64(-1));
assert_eq!(db.get_extension("key1"), Some(FieldValue::I64(-1)));
let meta = db.metadata();
assert_eq!(meta.extensions.len(), 3);
assert_eq!(meta.extensions.get("key1"), Some(&FieldValue::I64(-1)));
let old = db.remove_extension("count").await.unwrap();
assert_eq!(old, Some(FieldValue::U64(42)));
assert!(db.get_extension("count").is_none());
let old = db.remove_extension("nonexistent").await.unwrap();
assert!(old.is_none());
db.close().await.unwrap();
}
#[tokio::test]
async fn test_db_extension_save_and_persist() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
{
let db = AndaDB::create(object_store.clone(), config.clone())
.await
.unwrap();
db.save_extension("persist_key".into(), FieldValue::Text("persisted".into()))
.await
.unwrap();
assert_eq!(
db.get_extension("persist_key"),
Some(FieldValue::Text("persisted".into()))
);
}
let db = AndaDB::connect(object_store, config).await.unwrap();
assert_eq!(
db.get_extension("persist_key"),
Some(FieldValue::Text("persisted".into()))
);
db.close().await.unwrap();
}
#[tokio::test]
async fn test_db_extension_flush_persist() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
{
let db = AndaDB::create(object_store.clone(), config.clone())
.await
.unwrap();
db.set_extension("lazy_key".into(), FieldValue::Bytes(vec![1, 2, 3]));
db.flush().await.unwrap();
}
let db = AndaDB::connect(object_store, config).await.unwrap();
assert_eq!(
db.get_extension("lazy_key"),
Some(FieldValue::Bytes(vec![1, 2, 3]))
);
db.close().await.unwrap();
}
#[tokio::test]
async fn test_db_set_extension_with() {
let object_store = Arc::new(InMemory::new());
let config = DBConfig::default();
let db = AndaDB::create(object_store, config).await.unwrap();
let key = "test_key".to_string();
let old = db.set_extension_with(key.clone(), |val| {
assert!(val.is_none());
Some(FieldValue::U64(100))
});
assert!(old.is_none());
assert_eq!(db.get_extension(&key), Some(FieldValue::U64(100)));
let old = db.set_extension_with(key.clone(), |val| {
if let Some(FieldValue::U64(v)) = val {
return Some(FieldValue::U64(v + 100));
}
None
});
assert_eq!(old, Some(FieldValue::U64(100)));
assert_eq!(db.get_extension(&key), Some(FieldValue::U64(200)));
let old = db.set_extension_with(key.clone(), |_| None);
assert!(old.is_none());
assert_eq!(db.get_extension(&key), Some(FieldValue::U64(200)));
db.close().await.unwrap();
}
}