use async_trait::async_trait;
use serde_json::Value;
use super::storage::StorageError;
use crate::contract::scope::{ScopeId, scoped_key};
use std::sync::Arc;
#[async_trait]
pub trait ConfigStore: Send + Sync {
async fn get(&self, namespace: &str, id: &str) -> Result<Option<Value>, StorageError>;
async fn list(
&self,
namespace: &str,
offset: usize,
limit: usize,
) -> Result<Vec<(String, Value)>, StorageError>;
async fn put(&self, namespace: &str, id: &str, value: &Value) -> Result<(), StorageError>;
async fn delete(&self, namespace: &str, id: &str) -> Result<(), StorageError>;
async fn put_if_absent(
&self,
namespace: &str,
id: &str,
value: &Value,
) -> Result<(), StorageError> {
if self.exists(namespace, id).await? {
return Err(StorageError::AlreadyExists(format!("{namespace}/{id}")));
}
self.put(namespace, id, value).await
}
async fn exists(&self, namespace: &str, id: &str) -> Result<bool, StorageError> {
Ok(self.get(namespace, id).await?.is_some())
}
async fn put_if_revision(
&self,
namespace: &str,
id: &str,
value: &Value,
expected_revision: u64,
) -> Result<(), StorageError> {
let actual = self
.get(namespace, id)
.await?
.as_ref()
.and_then(extract_meta_revision)
.unwrap_or(0);
if actual != expected_revision {
return Err(StorageError::VersionConflict {
expected: expected_revision,
actual,
});
}
self.put(namespace, id, value).await
}
async fn delete_if_revision(
&self,
namespace: &str,
id: &str,
expected_revision: u64,
) -> Result<(), StorageError> {
let actual = self
.get(namespace, id)
.await?
.as_ref()
.and_then(extract_meta_revision)
.unwrap_or(0);
if actual != expected_revision {
return Err(StorageError::VersionConflict {
expected: expected_revision,
actual,
});
}
self.delete(namespace, id).await
}
}
pub fn extract_meta_revision(value: &Value) -> Option<u64> {
value
.get("meta")
.and_then(|m| m.get("revision"))
.and_then(|r| r.as_u64())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConfigChangeKind {
Put,
Delete,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ConfigChangeEvent {
pub namespace: String,
pub id: String,
pub kind: ConfigChangeKind,
}
#[async_trait]
pub trait ConfigChangeSubscriber: Send {
async fn next(&mut self) -> Result<ConfigChangeEvent, StorageError>;
}
#[async_trait]
pub trait ConfigChangeNotifier: Send + Sync {
async fn subscribe(&self) -> Result<Box<dyn ConfigChangeSubscriber>, StorageError>;
}
#[derive(Clone)]
pub struct ScopedConfigStore {
inner: Arc<dyn ConfigStore>,
scope_id: ScopeId,
}
impl ScopedConfigStore {
pub fn new(inner: Arc<dyn ConfigStore>, scope_id: ScopeId) -> Self {
Self { inner, scope_id }
}
pub fn scope_id(&self) -> &ScopeId {
&self.scope_id
}
pub fn inner(&self) -> &dyn ConfigStore {
self.inner.as_ref()
}
fn scoped_namespace(&self, namespace: &str) -> String {
scoped_key(&self.scope_id, namespace)
}
}
#[async_trait]
impl ConfigStore for ScopedConfigStore {
async fn get(&self, namespace: &str, id: &str) -> Result<Option<Value>, StorageError> {
self.inner.get(&self.scoped_namespace(namespace), id).await
}
async fn list(
&self,
namespace: &str,
offset: usize,
limit: usize,
) -> Result<Vec<(String, Value)>, StorageError> {
self.inner
.list(&self.scoped_namespace(namespace), offset, limit)
.await
}
async fn put(&self, namespace: &str, id: &str, value: &Value) -> Result<(), StorageError> {
self.inner
.put(&self.scoped_namespace(namespace), id, value)
.await
}
async fn delete(&self, namespace: &str, id: &str) -> Result<(), StorageError> {
self.inner
.delete(&self.scoped_namespace(namespace), id)
.await
}
async fn put_if_absent(
&self,
namespace: &str,
id: &str,
value: &Value,
) -> Result<(), StorageError> {
self.inner
.put_if_absent(&self.scoped_namespace(namespace), id, value)
.await
}
async fn exists(&self, namespace: &str, id: &str) -> Result<bool, StorageError> {
self.inner
.exists(&self.scoped_namespace(namespace), id)
.await
}
async fn put_if_revision(
&self,
namespace: &str,
id: &str,
value: &Value,
expected_revision: u64,
) -> Result<(), StorageError> {
self.inner
.put_if_revision(
&self.scoped_namespace(namespace),
id,
value,
expected_revision,
)
.await
}
async fn delete_if_revision(
&self,
namespace: &str,
id: &str,
expected_revision: u64,
) -> Result<(), StorageError> {
self.inner
.delete_if_revision(&self.scoped_namespace(namespace), id, expected_revision)
.await
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use super::*;
#[test]
fn extract_meta_revision_returns_some_for_valid_path() {
let value = serde_json::json!({"meta": {"revision": 42}});
assert_eq!(extract_meta_revision(&value), Some(42));
}
#[test]
fn extract_meta_revision_returns_none_for_missing_meta() {
let value = serde_json::json!({"spec": {"id": "foo"}});
assert_eq!(extract_meta_revision(&value), None);
}
#[test]
fn extract_meta_revision_returns_none_for_missing_revision_key() {
let value = serde_json::json!({"meta": {"source": {"kind": "user"}}});
assert_eq!(extract_meta_revision(&value), None);
}
#[test]
fn extract_meta_revision_returns_none_for_wrong_type() {
let value = serde_json::json!({"meta": {"revision": "not-a-number"}});
assert_eq!(extract_meta_revision(&value), None);
}
#[test]
fn extract_meta_revision_returns_none_for_bare_spec() {
let value = serde_json::json!({"id": "agent-1", "name": "Alice"});
assert_eq!(extract_meta_revision(&value), None);
}
#[derive(Debug, Default)]
struct MemoryConfigStore {
data: RwLock<HashMap<String, HashMap<String, Value>>>,
}
#[async_trait]
impl ConfigStore for MemoryConfigStore {
async fn get(&self, namespace: &str, id: &str) -> Result<Option<Value>, StorageError> {
let data = self.data.read().await;
Ok(data.get(namespace).and_then(|ns| ns.get(id)).cloned())
}
async fn list(
&self,
namespace: &str,
offset: usize,
limit: usize,
) -> Result<Vec<(String, Value)>, StorageError> {
let data = self.data.read().await;
let Some(namespace_data) = data.get(namespace) else {
return Ok(Vec::new());
};
let mut items: Vec<_> = namespace_data
.iter()
.map(|(id, value)| (id.clone(), value.clone()))
.collect();
items.sort_by(|left, right| left.0.cmp(&right.0));
Ok(items.into_iter().skip(offset).take(limit).collect())
}
async fn put(&self, namespace: &str, id: &str, value: &Value) -> Result<(), StorageError> {
let mut data = self.data.write().await;
data.entry(namespace.to_string())
.or_default()
.insert(id.to_string(), value.clone());
Ok(())
}
async fn delete(&self, namespace: &str, id: &str) -> Result<(), StorageError> {
let mut data = self.data.write().await;
if let Some(namespace_data) = data.get_mut(namespace) {
namespace_data.remove(id);
}
Ok(())
}
}
#[tokio::test]
async fn config_store_round_trip() {
let store: Arc<dyn ConfigStore> = Arc::new(MemoryConfigStore::default());
let value = serde_json::json!({"id": "alpha", "label": "first"});
store.put("tests", "alpha", &value).await.unwrap();
assert_eq!(store.get("tests", "alpha").await.unwrap(), Some(value));
}
#[tokio::test]
async fn config_store_lists_sorted_entries() {
let store: Arc<dyn ConfigStore> = Arc::new(MemoryConfigStore::default());
store
.put(
"tests",
"bravo",
&serde_json::json!({"id": "bravo", "label": "second"}),
)
.await
.unwrap();
store
.put(
"tests",
"alpha",
&serde_json::json!({"id": "alpha", "label": "first"}),
)
.await
.unwrap();
let items = store.list("tests", 0, 10).await.unwrap();
assert_eq!(items[0].0, "alpha");
assert_eq!(items[1].0, "bravo");
}
}