use std::collections::HashMap;
use std::sync::Mutex;
use crate::face::{
FaceError, FaceWatchEvent, FaceWatchEventKind, FaceWatchStream, ResourceFormat, ResourceRef,
};
use crate::format::AdapterRegistry;
pub struct InMemoryStore {
face_name: String,
store: Mutex<HashMap<ResourceRef, Vec<u8>>>,
subscribers: Mutex<Vec<WatchSubscriber>>,
adapters: AdapterRegistry,
}
struct WatchSubscriber {
sender: Option<std::sync::mpsc::Sender<FaceWatchEvent>>,
kind_filter: String,
namespace_filter: Option<String>,
}
impl InMemoryStore {
#[must_use]
pub fn new(face_name: impl Into<String>) -> Self {
Self {
face_name: face_name.into(),
store: Mutex::new(HashMap::new()),
subscribers: Mutex::new(Vec::new()),
adapters: AdapterRegistry::default(),
}
}
pub fn set_adapters(&mut self, adapters: AdapterRegistry) {
self.adapters = adapters;
}
#[must_use]
pub fn adapters(&self) -> &AdapterRegistry {
&self.adapters
}
fn select_adapter(
&self,
format: ResourceFormat,
verb: &'static str,
) -> Result<std::sync::Arc<dyn crate::format::FormatAdapter>, FaceError> {
self.adapters
.select(format)
.map_err(|e| FaceError::Unsupported(format!("{verb}: {e}")))
}
fn broadcast(&self, reference: &ResourceRef, event_kind: FaceWatchEventKind, body: Vec<u8>) {
let mut subs = self.subscribers.lock().expect("subscribers mutex poisoned");
subs.retain_mut(|sub| {
if sub.kind_filter != reference.kind {
return true;
}
if let Some(ns) = &sub.namespace_filter
&& reference.namespace.as_deref() != Some(ns.as_str())
{
return true;
}
let Some(sender) = &sub.sender else { return false };
let event = FaceWatchEvent {
kind: event_kind,
body: body.clone(),
};
sender.send(event).is_ok()
});
}
pub fn apply(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
let adapter = self.select_adapter(format, "apply_resource")?;
let reference = adapter
.extract_ref(format, body)
.map_err(|e| FaceError::Unsupported(format!("apply_resource: {e}")))?;
let envelope = adapter
.to_native(format, body)
.map_err(|e| FaceError::Unsupported(format!("apply_resource: {e}")))?;
let mut store = self.store.lock().expect("store mutex poisoned");
let event_kind = if store.contains_key(&reference) {
FaceWatchEventKind::Modified
} else {
FaceWatchEventKind::Added
};
store.insert(reference.clone(), envelope.clone());
drop(store);
self.broadcast(&reference, event_kind, envelope);
Ok(())
}
pub fn get(
&self,
reference: &ResourceRef,
format: ResourceFormat,
) -> Result<Vec<u8>, FaceError> {
let adapter = self.select_adapter(format, "get_resource")?;
let store = self.store.lock().expect("store mutex poisoned");
let envelope = store.get(reference).cloned().ok_or_else(|| {
FaceError::Unsupported(format!(
"get_resource on {}: no resource at {reference:?}",
self.face_name
))
})?;
drop(store);
adapter
.from_native(format, &envelope)
.map_err(|e| FaceError::Unsupported(format!("get_resource: {e}")))
}
pub fn list(
&self,
kind: &str,
namespace: Option<&str>,
format: ResourceFormat,
) -> Result<Vec<Vec<u8>>, FaceError> {
let adapter = self.select_adapter(format, "list_resources")?;
let store = self.store.lock().expect("store mutex poisoned");
let envelopes: Vec<Vec<u8>> = store
.iter()
.filter(|(r, _)| r.kind == kind)
.filter(|(r, _)| match namespace {
Some(ns) => r.namespace.as_deref() == Some(ns),
None => true,
})
.map(|(_, bytes)| bytes.clone())
.collect();
drop(store);
let mut out = Vec::with_capacity(envelopes.len());
for env in envelopes {
out.push(
adapter
.from_native(format, &env)
.map_err(|e| FaceError::Unsupported(format!("list_resources: {e}")))?,
);
}
Ok(out)
}
pub fn delete(&self, reference: &ResourceRef) -> Result<(), FaceError> {
let mut store = self.store.lock().expect("store mutex poisoned");
let body = store.remove(reference).ok_or_else(|| {
FaceError::Unsupported(format!(
"delete_resource on {}: no resource at {reference:?}",
self.face_name
))
})?;
drop(store);
self.broadcast(reference, FaceWatchEventKind::Deleted, body);
Ok(())
}
pub fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
let store = self.store.lock().expect("store mutex poisoned");
let mut entries: Vec<(ResourceRef, Vec<u8>)> = store
.iter()
.map(|(r, body)| (r.clone(), body.clone()))
.collect();
drop(store);
entries.sort_by(|a, b| {
a.0.kind
.cmp(&b.0.kind)
.then(a.0.namespace.cmp(&b.0.namespace))
.then(a.0.name.cmp(&b.0.name))
});
let mut out = Vec::new();
ciborium::into_writer(&entries, &mut out).map_err(|e| {
FaceError::Unsupported(format!("snapshot: cbor encode failed: {e}"))
})?;
Ok(out)
}
pub fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
let entries: Vec<(ResourceRef, Vec<u8>)> =
ciborium::from_reader(snapshot_bytes).map_err(|e| {
FaceError::Unsupported(format!("restore: cbor decode failed: {e}"))
})?;
let mut store = self.store.lock().expect("store mutex poisoned");
store.clear();
for (r, body) in entries {
store.insert(r, body);
}
Ok(())
}
#[must_use]
pub fn len(&self) -> usize {
self.store.lock().map(|s| s.len()).unwrap_or(0)
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.subscribers.lock().map(|s| s.len()).unwrap_or(0)
}
pub fn watch(
&self,
kind: &str,
namespace: Option<&str>,
format: ResourceFormat,
) -> Result<Box<dyn FaceWatchStream>, FaceError> {
let _ = self.select_adapter(format, "watch_resources")?;
let (tx, rx) = std::sync::mpsc::channel();
let store = self.store.lock().expect("store mutex poisoned");
for (r, body) in store.iter() {
if r.kind != kind {
continue;
}
if let Some(ns) = namespace
&& r.namespace.as_deref() != Some(ns)
{
continue;
}
let _ = tx.send(FaceWatchEvent {
kind: FaceWatchEventKind::Added,
body: body.clone(),
});
}
drop(store);
let mut subs = self.subscribers.lock().expect("subscribers mutex poisoned");
subs.push(WatchSubscriber {
sender: Some(tx),
kind_filter: kind.to_string(),
namespace_filter: namespace.map(str::to_string),
});
Ok(Box::new(MpscWatchStream { rx }))
}
}
impl std::fmt::Debug for InMemoryStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryStore")
.field("face_name", &self.face_name)
.field(
"store_len",
&self.store.lock().map(|s| s.len()).unwrap_or(0),
)
.field(
"subscriber_count",
&self.subscribers.lock().map(|s| s.len()).unwrap_or(0),
)
.finish()
}
}
struct MpscWatchStream {
rx: std::sync::mpsc::Receiver<FaceWatchEvent>,
}
impl FaceWatchStream for MpscWatchStream {
fn next_event(&mut self) -> Result<Option<FaceWatchEvent>, FaceError> {
match self.rx.recv() {
Ok(event) => Ok(Some(event)),
Err(_) => Ok(None),
}
}
}