use std::collections::BTreeMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use rusqlite::Connection;
use serde_json::Value;
use tokio::sync::RwLock;
/// Identifies what a set of labels is attached to: a whole mob, or one run
/// inside a mob.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MetadataScope {
    /// Scope covering a mob, identified by its mob id.
    Mob(String),
    /// Scope covering a single run: `(mob id, run id)`.
    Run(String, String),
}

impl MetadataScope {
    /// Returns the mob id, which every variant carries as its first field.
    pub fn mob_id(&self) -> &str {
        let (Self::Mob(mob_id) | Self::Run(mob_id, _)) = self;
        mob_id.as_str()
    }

    /// Returns the run id for a [`MetadataScope::Run`], or `None` for a
    /// mob-level scope.
    pub fn run_id(&self) -> Option<&str> {
        if let Self::Run(_, run_id) = self {
            Some(run_id.as_str())
        } else {
            None
        }
    }
}
#[derive(Debug, Clone, Default)]
pub struct RuntimeMetadataTable {
inner: Arc<RwLock<BTreeMap<MetadataScope, BTreeMap<String, String>>>>,
}
impl RuntimeMetadataTable {
pub fn new() -> Self {
Self::default()
}
pub async fn set_labels(&self, scope: MetadataScope, labels: BTreeMap<String, String>) {
let mut guard = self.inner.write().await;
if labels.is_empty() {
guard.remove(&scope);
} else {
guard.insert(scope, labels);
}
}
pub async fn get_labels(&self, scope: &MetadataScope) -> BTreeMap<String, String> {
let guard = self.inner.read().await;
guard.get(scope).cloned().unwrap_or_default()
}
pub async fn delete_labels(&self, scope: &MetadataScope) -> Option<BTreeMap<String, String>> {
let mut guard = self.inner.write().await;
guard.remove(scope)
}
pub async fn list_labels_for_mob(
&self,
mob_id: &str,
) -> Vec<(MetadataScope, BTreeMap<String, String>)> {
let guard = self.inner.read().await;
guard
.iter()
.filter(|(scope, _)| scope.mob_id() == mob_id)
.map(|(scope, labels)| (scope.clone(), labels.clone()))
.collect()
}
}
/// Parses an optional JSON `labels` parameter into a string-to-string map.
///
/// A missing value or JSON `null` yields an empty map. Any other value must
/// deserialize as a map of string to string.
///
/// # Errors
///
/// Returns a human-readable message when the value is not such a map.
pub fn parse_labels_param(value: Option<&Value>) -> Result<BTreeMap<String, String>, String> {
    let raw = match value {
        Some(raw) if !raw.is_null() => raw,
        _ => return Ok(BTreeMap::new()),
    };
    // `from_value` consumes its argument, hence the clone of the borrowed value.
    serde_json::from_value::<BTreeMap<String, String>>(raw.clone())
        .map_err(|err| format!("labels must be a map of string to string: {err}"))
}
/// Converts a label map into a JSON object whose values are all JSON strings.
pub fn labels_to_json_value(labels: &BTreeMap<String, String>) -> Value {
    let object: serde_json::Map<String, Value> = labels
        .iter()
        .map(|(key, text)| (key.clone(), Value::String(text.clone())))
        .collect();
    Value::Object(object)
}
/// Outcome of one of the label dispatch functions in this file.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so results can be logged and
/// compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LabelRpcResult {
    /// The request was applied; produced by the set and delete dispatchers.
    Accepted,
    /// The labels currently stored for the requested scope; produced by the
    /// get dispatcher.
    Labels(BTreeMap<String, String>),
    /// The request parameters were rejected; carries the explanatory message.
    InvalidParams(String),
}
/// Handles a "set labels" request for `scope`.
///
/// Reads the `labels` entry of `params` via [`parse_labels_param`] and stores
/// the result in `table`. Because a missing or `null` entry parses as an empty
/// map, such a request clears the scope's labels.
///
/// Returns [`LabelRpcResult::InvalidParams`] without touching `table` when
/// parsing fails, and [`LabelRpcResult::Accepted`] otherwise.
pub async fn dispatch_labels_set(
    table: &RuntimeMetadataTable,
    scope: MetadataScope,
    params: &Value,
) -> LabelRpcResult {
    let labels = match parse_labels_param(params.get("labels")) {
        Ok(labels) => labels,
        Err(message) => return LabelRpcResult::InvalidParams(message),
    };
    table.set_labels(scope, labels).await;
    LabelRpcResult::Accepted
}
/// Handles a "get labels" request: returns the labels stored for `scope`
/// (an empty map when there are none) wrapped in [`LabelRpcResult::Labels`].
pub async fn dispatch_labels_get(
    table: &RuntimeMetadataTable,
    scope: MetadataScope,
) -> LabelRpcResult {
    let current = table.get_labels(&scope).await;
    LabelRpcResult::Labels(current)
}
/// Handles a "delete labels" request for `scope`.
///
/// Always returns [`LabelRpcResult::Accepted`], whether or not an entry
/// existed; the removed labels are discarded.
pub async fn dispatch_labels_delete(
    table: &RuntimeMetadataTable,
    scope: MetadataScope,
) -> LabelRpcResult {
    let _removed = table.delete_labels(&scope).await;
    LabelRpcResult::Accepted
}
/// Extracts the `run_id` entry from `params` as a borrowed string.
///
/// # Errors
///
/// Returns `"run_id required"` when the entry is missing, is not a JSON
/// string, or is the empty string.
pub fn parse_run_id_param(params: &Value) -> Result<&str, String> {
    params
        .get("run_id")
        .and_then(Value::as_str)
        .filter(|run_id| !run_id.is_empty())
        .ok_or_else(|| "run_id required".to_string())
}
/// Failure reported by a metadata store; each variant carries a descriptive
/// message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetadataStoreError {
    /// The backing storage could not be opened, locked, read, or written.
    Io(String),
    /// A stored value was read but could not be decoded.
    Decode(String),
}

impl std::fmt::Display for MetadataStoreError {
    /// Renders the error as `metadata store <io|decode>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::Io(detail) => ("io", detail),
            Self::Decode(detail) => ("decode", detail),
        };
        write!(f, "metadata store {label}: {detail}")
    }
}

impl std::error::Error for MetadataStoreError {}
/// Storage interface for one `u64` subscription cursor per mob id.
///
/// This file provides two implementations: [`InMemoryMetadataStore`] and
/// [`SqliteMetadataStore`].
// NOTE(review): what the cursor counts (event offset, sequence number, ...) is
// not visible in this file — confirm with the subscription code before relying
// on any particular meaning.
#[async_trait]
pub trait PersistentMetadataStore: Send + Sync {
/// Returns the cursor stored for `mob_id`, or `Ok(None)` when none is stored.
async fn get_subscription_cursor(
&self,
mob_id: &str,
) -> Result<Option<u64>, MetadataStoreError>;
/// Stores `cursor` for `mob_id`. Both implementations in this file replace
/// any previously stored value.
async fn set_subscription_cursor(
&self,
mob_id: &str,
cursor: u64,
) -> Result<(), MetadataStoreError>;
}
#[derive(Debug, Default)]
pub struct InMemoryMetadataStore {
cursors: RwLock<BTreeMap<String, u64>>,
}
impl InMemoryMetadataStore {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl PersistentMetadataStore for InMemoryMetadataStore {
    /// Looks up the cursor for `mob_id` under the read lock; never fails.
    async fn get_subscription_cursor(
        &self,
        mob_id: &str,
    ) -> Result<Option<u64>, MetadataStoreError> {
        let cursors = self.cursors.read().await;
        Ok(cursors.get(mob_id).copied())
    }

    /// Inserts or overwrites the cursor for `mob_id` under the write lock;
    /// never fails.
    async fn set_subscription_cursor(
        &self,
        mob_id: &str,
        cursor: u64,
    ) -> Result<(), MetadataStoreError> {
        let mut cursors = self.cursors.write().await;
        cursors.insert(mob_id.to_string(), cursor);
        Ok(())
    }
}
/// [`PersistentMetadataStore`] implementation backed by a SQLite database.
///
/// Values live in the `mobkit_metadata` table, keyed by `(mob_id, key)`. The
/// single connection is guarded by a standard-library mutex.
pub struct SqliteMetadataStore {
    conn: Mutex<Connection>,
}

/// Value of the `key` column for the row that holds a mob's subscription cursor.
const SUBSCRIPTION_CURSOR_KEY: &str = "subscription_cursor";

impl SqliteMetadataStore {
    /// Opens the SQLite database at `path`, requests WAL journaling and a
    /// 5000 ms busy timeout, and makes sure the metadata table exists.
    ///
    /// # Errors
    ///
    /// Returns [`MetadataStoreError::Io`] when opening the database, applying
    /// the pragmas, or creating the table fails.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, MetadataStoreError> {
        let conn = match Connection::open(path) {
            Ok(conn) => conn,
            Err(err) => return Err(MetadataStoreError::Io(format!("open: {err}"))),
        };
        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")
            .map_err(|err| MetadataStoreError::Io(format!("pragma: {err}")))?;
        Self::with_schema(conn)
    }

    /// Creates a store on a fresh in-memory SQLite database with the metadata
    /// table in place. No pragmas are applied here.
    ///
    /// # Errors
    ///
    /// Returns [`MetadataStoreError::Io`] when the database cannot be opened
    /// or the table cannot be created.
    pub fn in_memory() -> Result<Self, MetadataStoreError> {
        let conn = Connection::open_in_memory()
            .map_err(|err| MetadataStoreError::Io(format!("in-memory open: {err}")))?;
        Self::with_schema(conn)
    }

    /// Ensures the `mobkit_metadata` table exists on `conn`, then wraps the
    /// connection in a store. Shared by [`Self::open`] and [`Self::in_memory`].
    fn with_schema(conn: Connection) -> Result<Self, MetadataStoreError> {
        // The literal's continuation lines are deliberately flush-left: any
        // indentation here would become part of the SQL text.
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS mobkit_metadata (
mob_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (mob_id, key)
);",
        )
        .map_err(|err| MetadataStoreError::Io(format!("schema: {err}")))?;
        Ok(Self {
            conn: Mutex::new(conn),
        })
    }

    /// Locks the connection, reporting a poisoned mutex as
    /// [`MetadataStoreError::Io`] rather than panicking.
    fn lock_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, MetadataStoreError> {
        match self.conn.lock() {
            Ok(guard) => Ok(guard),
            Err(err) => Err(MetadataStoreError::Io(format!(
                "connection mutex poisoned: {err}"
            ))),
        }
    }
}
#[async_trait]
impl PersistentMetadataStore for SqliteMetadataStore {
async fn get_subscription_cursor(
&self,
mob_id: &str,
) -> Result<Option<u64>, MetadataStoreError> {
let conn = self.lock_conn()?;
let mut stmt = conn
.prepare_cached(
"SELECT value FROM mobkit_metadata WHERE mob_id = ?1 AND key = ?2 LIMIT 1",
)
.map_err(|err| MetadataStoreError::Io(format!("prepare: {err}")))?;
let value: Option<String> = stmt
.query_row(rusqlite::params![mob_id, SUBSCRIPTION_CURSOR_KEY], |row| {
row.get::<_, String>(0)
})
.map(Some)
.or_else(|err| match err {
rusqlite::Error::QueryReturnedNoRows => Ok(None),
other => Err(MetadataStoreError::Io(format!("query: {other}"))),
})?;
match value {
Some(s) => s
.parse::<u64>()
.map(Some)
.map_err(|err| MetadataStoreError::Decode(format!("cursor parse: {err}"))),
None => Ok(None),
}
}
async fn set_subscription_cursor(
&self,
mob_id: &str,
cursor: u64,
) -> Result<(), MetadataStoreError> {
let conn = self.lock_conn()?;
conn.execute(
"INSERT INTO mobkit_metadata (mob_id, key, value) VALUES (?1, ?2, ?3) \
ON CONFLICT(mob_id, key) DO UPDATE SET value = excluded.value",
rusqlite::params![mob_id, SUBSCRIPTION_CURSOR_KEY, cursor.to_string()],
)
.map_err(|err| MetadataStoreError::Io(format!("upsert: {err}")))?;
Ok(())
}
}
// Unit tests for the label table and both cursor-store implementations.
// The clippy allowances let test code unwrap and panic on failure.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
// Builds an owned label map from borrowed `(key, value)` pairs.
fn labels(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
pairs
.iter()
.map(|(k, v)| ((*k).to_string(), (*v).to_string()))
.collect()
}
// Labels written for a mob scope can be read back.
#[tokio::test]
async fn set_and_get_mob_labels() {
let table = RuntimeMetadataTable::new();
let scope = MetadataScope::Mob("mob-a".to_string());
table
.set_labels(scope.clone(), labels(&[("repo", "agents"), ("env", "dev")]))
.await;
let got = table.get_labels(&scope).await;
assert_eq!(got.get("repo").map(String::as_str), Some("agents"));
assert_eq!(got.get("env").map(String::as_str), Some("dev"));
}
// A second `set_labels` replaces the whole map; keys absent from the new map disappear.
#[tokio::test]
async fn set_replaces_rather_than_merges() {
let table = RuntimeMetadataTable::new();
let scope = MetadataScope::Mob("mob-a".to_string());
table
.set_labels(scope.clone(), labels(&[("a", "1"), ("b", "2")]))
.await;
table.set_labels(scope.clone(), labels(&[("a", "9")])).await;
let got = table.get_labels(&scope).await;
assert_eq!(got.len(), 1);
assert_eq!(got.get("a").map(String::as_str), Some("9"));
assert!(!got.contains_key("b"));
}
// `delete_labels` returns the previous labels and leaves the scope empty.
#[tokio::test]
async fn delete_clears_entry() {
let table = RuntimeMetadataTable::new();
let scope = MetadataScope::Run("mob-a".to_string(), "run-1".to_string());
table.set_labels(scope.clone(), labels(&[("k", "v")])).await;
let prev = table.delete_labels(&scope).await;
assert_eq!(prev.unwrap().get("k").map(String::as_str), Some("v"));
let after = table.get_labels(&scope).await;
assert!(after.is_empty());
}
// Setting an empty map clears the labels for the scope.
#[tokio::test]
async fn empty_set_clears_entry() {
let table = RuntimeMetadataTable::new();
let scope = MetadataScope::Mob("mob-a".to_string());
table.set_labels(scope.clone(), labels(&[("k", "v")])).await;
table.set_labels(scope.clone(), BTreeMap::new()).await;
assert!(table.get_labels(&scope).await.is_empty());
}
// Listing by mob id returns that mob's mob-level and run-level entries only.
#[tokio::test]
async fn list_returns_mob_and_run_entries() {
let table = RuntimeMetadataTable::new();
let mob_scope = MetadataScope::Mob("mob-a".to_string());
let run_scope = MetadataScope::Run("mob-a".to_string(), "run-1".to_string());
let other_run = MetadataScope::Run("mob-b".to_string(), "run-1".to_string());
table
.set_labels(mob_scope.clone(), labels(&[("env", "dev")]))
.await;
table
.set_labels(run_scope.clone(), labels(&[("trace", "abc")]))
.await;
table
.set_labels(other_run, labels(&[("trace", "xyz")]))
.await;
let entries = table.list_labels_for_mob("mob-a").await;
assert_eq!(entries.len(), 2);
let scopes: Vec<&MetadataScope> = entries.iter().map(|(s, _)| s).collect();
assert!(scopes.contains(&&mob_scope));
assert!(scopes.contains(&&run_scope));
}
// In-memory store: a cursor is absent until set, and is kept per mob id.
#[tokio::test]
async fn in_memory_persistent_store_round_trip() {
let store = InMemoryMetadataStore::new();
assert_eq!(
store.get_subscription_cursor("mob-a").await.unwrap(),
None,
"fresh store should have no cursor",
);
store.set_subscription_cursor("mob-a", 42).await.unwrap();
assert_eq!(
store.get_subscription_cursor("mob-a").await.unwrap(),
Some(42),
);
assert_eq!(store.get_subscription_cursor("mob-b").await.unwrap(), None,);
}
// In-memory store: a later set overwrites the earlier cursor.
#[tokio::test]
async fn in_memory_persistent_store_overwrite() {
let store = InMemoryMetadataStore::new();
store.set_subscription_cursor("m", 1).await.unwrap();
store.set_subscription_cursor("m", 2).await.unwrap();
assert_eq!(store.get_subscription_cursor("m").await.unwrap(), Some(2),);
}
// SQLite store (in-memory database): set, overwrite, and per-mob isolation.
#[tokio::test]
async fn sqlite_persistent_store_round_trip() {
let store = SqliteMetadataStore::in_memory().unwrap();
assert_eq!(store.get_subscription_cursor("mob-a").await.unwrap(), None,);
store.set_subscription_cursor("mob-a", 1234).await.unwrap();
assert_eq!(
store.get_subscription_cursor("mob-a").await.unwrap(),
Some(1234),
);
store.set_subscription_cursor("mob-a", 9999).await.unwrap();
assert_eq!(
store.get_subscription_cursor("mob-a").await.unwrap(),
Some(9999),
);
store.set_subscription_cursor("mob-b", 5).await.unwrap();
assert_eq!(
store.get_subscription_cursor("mob-a").await.unwrap(),
Some(9999),
);
assert_eq!(
store.get_subscription_cursor("mob-b").await.unwrap(),
Some(5),
);
}
// SQLite store (file in a temp dir): a cursor written through one handle is
// visible after that handle is dropped and the file is reopened.
#[tokio::test]
async fn sqlite_store_persists_across_handles() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("mobkit-metadata.sqlite");
{
let store = SqliteMetadataStore::open(&path).unwrap();
store.set_subscription_cursor("mob-x", 7777).await.unwrap();
}
let store = SqliteMetadataStore::open(&path).unwrap();
assert_eq!(
store.get_subscription_cursor("mob-x").await.unwrap(),
Some(7777),
"cursor should survive handle drop",
);
}
// Run scopes with the same run id under different mobs are separate entries.
#[tokio::test]
async fn run_scope_distinguishes_mobs() {
let table = RuntimeMetadataTable::new();
let scope_a = MetadataScope::Run("mob-a".to_string(), "run-1".to_string());
let scope_b = MetadataScope::Run("mob-b".to_string(), "run-1".to_string());
table
.set_labels(scope_a.clone(), labels(&[("k", "a")]))
.await;
table
.set_labels(scope_b.clone(), labels(&[("k", "b")]))
.await;
assert_eq!(
table
.get_labels(&scope_a)
.await
.get("k")
.map(String::as_str),
Some("a")
);
assert_eq!(
table
.get_labels(&scope_b)
.await
.get("k")
.map(String::as_str),
Some("b")
);
}
}