use ahash::RandomState;
use bytes::Bytes;
use scc::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::constants::Operation;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};
use crate::storage::write_buffer::WriteBuffer;
use super::FeoxStore;
impl FeoxStore {
pub(super) fn update_record_with_ttl(
&self,
old_record: &Record,
value: &[u8],
timestamp: u64,
ttl_expiry: u64,
) -> Result<bool> {
let new_record = if ttl_expiry > 0 && self.enable_ttl {
Arc::new(Record::new_with_timestamp_ttl(
old_record.key.clone(),
value.to_vec(),
timestamp,
ttl_expiry,
))
} else {
Arc::new(Record::new(
old_record.key.clone(),
value.to_vec(),
timestamp,
))
};
let old_value_len = old_record.value_len;
let old_size = old_record.calculate_size();
let new_size = self.calculate_record_size(old_record.key.len(), value.len());
let old_record_arc =
if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
entry
} else {
return Err(FeoxError::KeyNotFound);
};
let key_vec = new_record.key.clone();
self.hash_table
.upsert(key_vec.clone(), Arc::clone(&new_record));
self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
if new_size > old_size {
self.stats
.memory_usage
.fetch_add(new_size - old_size, Ordering::AcqRel);
} else {
self.stats
.memory_usage
.fetch_sub(old_size - new_size, Ordering::AcqRel);
}
if !self.memory_only {
if self.enable_caching {
if let Some(ref cache) = self.cache {
cache.remove(&key_vec);
}
}
if let Some(ref wb) = self.write_buffer {
if let Err(e) =
wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
{
let _ = e;
}
if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
let _ = e;
}
}
}
Ok(false)
}
pub(super) fn update_record_with_ttl_bytes(
&self,
old_record: &Record,
value: Bytes,
timestamp: u64,
ttl_expiry: u64,
) -> Result<bool> {
let new_record = if ttl_expiry > 0 && self.enable_ttl {
Arc::new(Record::new_from_bytes_with_ttl(
old_record.key.clone(),
value,
timestamp,
ttl_expiry,
))
} else {
Arc::new(Record::new_from_bytes(
old_record.key.clone(),
value,
timestamp,
))
};
let old_value_len = old_record.value_len;
let old_size = old_record.calculate_size();
let new_size = new_record.calculate_size();
let old_record_arc =
if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
entry
} else {
return Err(FeoxError::KeyNotFound);
};
let key_vec = new_record.key.clone();
self.hash_table
.upsert(key_vec.clone(), Arc::clone(&new_record));
self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
if new_size > old_size {
self.stats
.memory_usage
.fetch_add(new_size - old_size, Ordering::AcqRel);
} else {
self.stats
.memory_usage
.fetch_sub(old_size - new_size, Ordering::AcqRel);
}
if !self.memory_only {
if self.enable_caching {
if let Some(ref cache) = self.cache {
cache.remove(&key_vec);
}
}
if let Some(ref wb) = self.write_buffer {
if let Err(e) =
wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
{
let _ = e;
}
if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
let _ = e;
}
}
}
Ok(false)
}
pub(crate) fn get_hash_table(&self) -> &HashMap<Vec<u8>, Arc<Record>, RandomState> {
&self.hash_table
}
pub(crate) fn remove_from_tree(&self, key: &[u8]) {
self.tree.remove(key);
}
pub(crate) fn get_write_buffer(&self) -> Option<&Arc<WriteBuffer>> {
self.write_buffer.as_ref()
}
}