use super::{Namespace, Result};
use std::sync::Arc;
impl Namespace {
    /// Looks up the value stored under a string key.
    ///
    /// Convenience wrapper around [`Self::get_bytes`] using the key's UTF-8 bytes.
    pub async fn get(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.get_bytes(key.as_bytes()).await
    }

    /// Looks up the value stored under a raw byte key.
    ///
    /// Returns `None` when the shard's `get_value` yields nothing for `key`,
    /// and also when the shard lookup or the read-lock acquisition fails; the
    /// caller cannot distinguish those failures from a missing key.
    pub async fn get_bytes(&self, key: &[u8]) -> Option<Arc<Vec<u8>>> {
        let (_idx, shard) = self.get_shard(key).await.ok()?;
        let data = self.timeout_read_lock(&shard, "get_bytes").await.ok()?;
        data.get_value(key)
    }

    /// Stores `value` under a string key, inserting or overwriting.
    ///
    /// Convenience wrapper around [`Self::set_bytes`] using the key's UTF-8 bytes.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::set_bytes`].
    pub async fn set(&self, key: &str, value: Vec<u8>) -> Result<Option<Arc<Vec<u8>>>> {
        self.set_bytes(key.as_bytes(), value).await
    }

    /// Stores `value` under a raw byte key, inserting or overwriting.
    ///
    /// Returns `Ok(Some(previous))` when an existing value was replaced and
    /// `Ok(None)` when the key was newly inserted. The namespace metadata
    /// (key count and total size) is adjusted accordingly.
    ///
    /// # Errors
    ///
    /// * `RkvsError::Storage` if `value` is larger than the configured
    ///   maximum value size.
    /// * `RkvsError::Storage` if the namespace key count has reached the
    ///   configured maximum and `key` is not already present in its shard.
    /// * Any error returned by the shard lookup or write-lock acquisition.
    pub async fn set_bytes(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Arc<Vec<u8>>>> {
        // Record the length up front so `value` can be moved into the shard
        // without cloning the whole buffer.
        let new_len = value.len();
        let max_value_size = self.config.max_value_size();
        if new_len > max_value_size {
            return Err(crate::RkvsError::Storage(format!(
                "Value size {} exceeds maximum allowed size {}",
                new_len, max_value_size
            )));
        }

        let max_keys = self.config.max_keys();
        let (_idx, shard) = self.get_shard(key).await?;
        let mut data = self.timeout_write_lock(&shard, "set_bytes").await?;

        // Overwriting an existing key is still allowed at the limit; only a
        // brand-new key is rejected. `has_key` is consulted only when the
        // limit has been reached.
        // NOTE(review): the count comes from `self.metadata` while only this
        // shard's lock is held — confirm whether concurrent inserts into
        // other shards can exceed the limit.
        if self.metadata.key_count() >= max_keys && !data.has_key(key) {
            return Err(crate::RkvsError::Storage(format!(
                "Maximum number of keys ({}) reached for this namespace",
                max_keys
            )));
        }

        match data.set_value(key.to_vec(), value) {
            Some(old) => {
                // Replacement: key count is unchanged, size moves by the delta.
                self.metadata
                    .update_total_size(new_len as isize - old.len() as isize);
                Ok(Some(old))
            }
            None => {
                self.metadata.increment_key_count();
                self.metadata.update_total_size(new_len as isize);
                Ok(None)
            }
        }
    }

    /// Replaces the value of an existing string key.
    ///
    /// Convenience wrapper around [`Self::update_bytes`] using the key's UTF-8 bytes.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::update_bytes`].
    pub async fn update(&self, key: &str, value: Vec<u8>) -> Result<Arc<Vec<u8>>> {
        self.update_bytes(key.as_bytes(), value).await
    }

    /// Replaces the value of an existing raw byte key and returns the old value.
    ///
    /// Unlike [`Self::set_bytes`], this never inserts a new key.
    ///
    /// # Errors
    ///
    /// * `RkvsError::Storage` if `value` is larger than the configured
    ///   maximum value size.
    /// * `RkvsError::Storage` if `key` is not present in its shard.
    /// * `RkvsError::Internal` if the shard reports no previous value even
    ///   though the key was just seen under the same write lock.
    /// * Any error returned by the shard lookup or write-lock acquisition.
    pub async fn update_bytes(&self, key: &[u8], value: Vec<u8>) -> Result<Arc<Vec<u8>>> {
        // Record the length up front so `value` can be moved into the shard
        // without cloning the whole buffer.
        let new_len = value.len();
        let max_value_size = self.config.max_value_size();
        if new_len > max_value_size {
            return Err(crate::RkvsError::Storage(format!(
                "Value size {} exceeds maximum allowed size {}",
                new_len, max_value_size
            )));
        }

        let (_idx, shard) = self.get_shard(key).await?;
        let mut data = self.timeout_write_lock(&shard, "update_bytes").await?;

        // Check first so a missing key is reported instead of being created.
        if !data.has_key(key) {
            return Err(crate::RkvsError::Storage(format!(
                "Key not found for update: '{}'",
                String::from_utf8_lossy(key)
            )));
        }

        match data.set_value(key.to_vec(), value) {
            Some(old_value) => {
                self.metadata
                    .update_total_size(new_len as isize - old_value.len() as isize);
                Ok(old_value)
            }
            None => Err(crate::RkvsError::Internal(
                "Failed to update a key that should exist.".to_string(),
            )),
        }
    }

    /// Removes a string key, reporting whether a value was removed.
    ///
    /// Convenience wrapper around [`Self::delete_bytes`] using the key's UTF-8 bytes.
    pub async fn delete(&self, key: &str) -> bool {
        self.delete_bytes(key.as_bytes()).await
    }

    /// Removes a raw byte key, reporting whether a value was removed.
    ///
    /// Returns `false` whenever [`Self::consume_bytes`] returns `None`, which
    /// includes shard-lookup and lock-acquisition failures.
    pub async fn delete_bytes(&self, key: &[u8]) -> bool {
        self.consume_bytes(key).await.is_some()
    }

    /// Removes a string key and returns the value it held.
    ///
    /// Convenience wrapper around [`Self::consume_bytes`] using the key's UTF-8 bytes.
    pub async fn consume(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.consume_bytes(key.as_bytes()).await
    }

    /// Removes a raw byte key and returns the value it held.
    ///
    /// On success the namespace key count is decremented and the total size
    /// is reduced by the removed value's length. Returns `None` when nothing
    /// was removed, and also when the shard lookup or the write-lock
    /// acquisition fails.
    pub async fn consume_bytes(&self, key: &[u8]) -> Option<Arc<Vec<u8>>> {
        let (_idx, shard) = self.get_shard(key).await.ok()?;
        let mut data = self.timeout_write_lock(&shard, "consume_bytes").await.ok()?;

        let value = data.delete_value(key)?;
        self.metadata.decrement_key_count();
        self.metadata.update_total_size(-(value.len() as isize));
        Some(value)
    }

    /// Reports whether a string key is present.
    ///
    /// Convenience wrapper around [`Self::exists_bytes`] using the key's UTF-8 bytes.
    pub async fn exists(&self, key: &str) -> bool {
        self.exists_bytes(key.as_bytes()).await
    }

    /// Reports whether a raw byte key is present in its shard.
    ///
    /// Returns `false` when the shard lookup or the read-lock acquisition
    /// fails, so `false` does not strictly prove absence.
    pub async fn exists_bytes(&self, key: &[u8]) -> bool {
        let shard = match self.get_shard(key).await {
            Ok((_idx, shard)) => shard,
            Err(_) => return false,
        };
        match self.timeout_read_lock(&shard, "exists_bytes").await {
            Ok(data) => data.has_key(key),
            Err(_) => false,
        }
    }
}