use async_std::sync::RwLock;
use async_trait::async_trait;
use log::{debug, trace};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use zenoh::prelude::r#async::*;
use zenoh::time::Timestamp;
use zenoh_backend_traits::config::{StorageConfig, VolumeConfig};
use zenoh_backend_traits::*;
use zenoh_collections::{Timed, TimedEvent, TimedHandle, Timer};
use zenoh_core::Result as ZResult;
pub fn create_memory_backend(config: VolumeConfig) -> ZResult<Box<dyn Volume>> {
Ok(Box::new(MemoryBackend { config }))
}
pub struct MemoryBackend {
config: VolumeConfig,
}
#[async_trait]
impl Volume for MemoryBackend {
fn get_admin_status(&self) -> serde_json::Value {
self.config.to_json_value()
}
async fn create_storage(&mut self, properties: StorageConfig) -> ZResult<Box<dyn Storage>> {
debug!("Create Memory Storage with configuration: {:?}", properties);
Ok(Box::new(MemoryStorage::new(properties).await?))
}
fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
None
}
fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
None
}
}
impl Drop for MemoryBackend {
fn drop(&mut self) {
trace!("MemoryBackend::drop()");
}
}
#[allow(clippy::large_enum_variant)]
enum StoredValue {
Present {
ts: Timestamp,
sample: Sample,
},
Removed {
ts: Timestamp,
cleanup_handle: TimedHandle,
},
}
impl StoredValue {
fn ts(&self) -> &Timestamp {
match self {
Present { ts, sample: _ } => ts,
Removed {
ts,
cleanup_handle: _,
} => ts,
}
}
}
use StoredValue::{Present, Removed};
struct MemoryStorage {
config: StorageConfig,
map: Arc<RwLock<HashMap<OwnedKeyExpr, StoredValue>>>,
timer: Timer,
}
impl MemoryStorage {
async fn new(properties: StorageConfig) -> ZResult<MemoryStorage> {
Ok(MemoryStorage {
config: properties,
map: Arc::new(RwLock::new(HashMap::new())),
timer: Timer::new(false),
})
}
}
impl MemoryStorage {
async fn schedule_cleanup(&self, key: OwnedKeyExpr) -> TimedHandle {
let event = TimedEvent::once(
Instant::now() + Duration::from_millis(CLEANUP_TIMEOUT_MS),
TimedCleanup {
map: self.map.clone(),
key,
},
);
let handle = event.get_handle();
self.timer.add_async(event).await;
handle
}
}
#[async_trait]
impl Storage for MemoryStorage {
fn get_admin_status(&self) -> serde_json::Value {
self.config.to_json_value()
}
async fn on_sample(&mut self, mut sample: Sample) -> ZResult<StorageInsertionResult> {
trace!("on_sample for {}", sample.key_expr);
sample.ensure_timestamp();
let timestamp = sample.timestamp.unwrap();
match sample.kind {
SampleKind::Put => match self.map.write().await.entry(sample.key_expr.clone().into()) {
Entry::Vacant(v) => {
v.insert(Present {
sample,
ts: timestamp,
});
return Ok(StorageInsertionResult::Inserted);
}
Entry::Occupied(mut o) => {
let old_val = o.get();
if old_val.ts() < ×tamp {
if let Removed {
ts: _,
cleanup_handle,
} = old_val
{
cleanup_handle.clone().defuse();
}
o.insert(Present {
sample,
ts: timestamp,
});
return Ok(StorageInsertionResult::Replaced);
} else {
debug!("PUT on {} dropped: out-of-date", sample.key_expr);
return Ok(StorageInsertionResult::Outdated);
}
}
},
SampleKind::Delete => {
match self.map.write().await.entry(sample.key_expr.clone().into()) {
Entry::Vacant(v) => {
let cleanup_handle = self.schedule_cleanup(sample.key_expr.into()).await;
v.insert(Removed {
ts: timestamp,
cleanup_handle,
});
return Ok(StorageInsertionResult::Deleted);
}
Entry::Occupied(mut o) => match o.get() {
Removed {
ts,
cleanup_handle: _,
} => {
if ts < ×tamp {
let cleanup_handle =
self.schedule_cleanup(sample.key_expr.into()).await;
o.insert(Removed {
ts: timestamp,
cleanup_handle,
});
return Ok(StorageInsertionResult::Deleted);
} else {
debug!("DEL on {} dropped: out-of-date", sample.key_expr);
return Ok(StorageInsertionResult::Outdated);
}
}
Present { sample: _, ts } => {
if ts < ×tamp {
let cleanup_handle =
self.schedule_cleanup(sample.key_expr.into()).await;
o.insert(Removed {
ts: timestamp,
cleanup_handle,
});
return Ok(StorageInsertionResult::Deleted);
} else {
debug!("DEL on {} dropped: out-of-date", sample.key_expr);
return Ok(StorageInsertionResult::Outdated);
}
}
},
}
}
}
}
async fn on_query(&mut self, query: Query) -> ZResult<()> {
trace!("on_query for {}", query.key_expr());
if !query.key_expr().is_wild() {
if let Some(Present { sample, ts: _ }) =
self.map.read().await.get(query.key_expr().as_keyexpr())
{
query.reply(sample.clone()).res().await?;
}
} else {
for (_, stored_value) in self.map.read().await.iter() {
if let Present { sample, ts: _ } = stored_value {
if query.key_expr().intersects(&sample.key_expr) {
let s: Sample = sample.clone();
query.reply(s).res().await?;
}
}
}
}
Ok(())
}
async fn get_all_entries(&self) -> ZResult<Vec<(OwnedKeyExpr, Timestamp)>> {
let map = self.map.read().await;
let mut result = Vec::with_capacity(map.len());
for (k, v) in map.iter() {
result.push((k.clone(), *v.ts()));
}
Ok(result)
}
}
impl Drop for MemoryStorage {
fn drop(&mut self) {
trace!("MemoryStorage::drop()");
}
}
const CLEANUP_TIMEOUT_MS: u64 = 5000;
struct TimedCleanup {
map: Arc<RwLock<HashMap<OwnedKeyExpr, StoredValue>>>,
key: OwnedKeyExpr,
}
#[async_trait]
impl Timed for TimedCleanup {
async fn run(&mut self) {
self.map.write().await.remove(&self.key);
}
}