// meerkat_mobkit/runtime/metadata.rs

//! Mobkit-side sidecar table for mob- and run-level labels.
//!
//! Member-level labels are owned by `meerkat-mob` (they flow in through
//! `SpawnMemberSpec.with_labels()` and out via `MobMemberListEntry.labels`).
//! Mob-level and run-level labels — which associate external context such as
//! `repo`, `branch`, `customer`, `deployment`, or `environment` with a mob or
//! a flow run — have nowhere to live in the upstream model. This module owns
//! that side table.
//!
//! For v1 the label table is in-memory only. Persistence behind `MobStorage`
//! is a future enhancement; restarts wipe the labels. The table is keyed by
//! [`MetadataScope`] so the same surface can serve mobs and runs uniformly.
//!
//! The module also hosts [`PersistentMetadataStore`], the storage abstraction
//! for the structural-events subscription cursor, which (unlike labels) is
//! meant to survive a gateway restart.
13
14use std::collections::BTreeMap;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17
18use async_trait::async_trait;
19use rusqlite::Connection;
20use serde_json::Value;
21use tokio::sync::RwLock;
22
/// Identifies which label set an operation targets.
///
/// A mob-scoped key carries only the `mob_id`. A run-scoped key carries the
/// `(mob_id, run_id)` pair; including the mob id keeps runs from different
/// mobs apart even when their run identifiers collide.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MetadataScope {
    /// Labels attached to a whole mob: `Mob(mob_id)`.
    Mob(String),
    /// Labels attached to a single run: `Run(mob_id, run_id)`.
    Run(String, String),
}

impl MetadataScope {
    /// The mob id carried by this scope (present for both variants).
    pub fn mob_id(&self) -> &str {
        match self {
            Self::Mob(mob) | Self::Run(mob, _) => mob.as_str(),
        }
    }

    /// The run id for a run-scoped key; `None` for a mob-scoped key.
    pub fn run_id(&self) -> Option<&str> {
        if let Self::Run(_, run) = self {
            Some(run.as_str())
        } else {
            None
        }
    }
}
51
52/// In-memory label table keyed by [`MetadataScope`].
53///
54/// Operations replace label sets wholesale (no merge). Callers wanting
55/// merge semantics should read first, mutate the map, then write it back.
56#[derive(Debug, Clone, Default)]
57pub struct RuntimeMetadataTable {
58    inner: Arc<RwLock<BTreeMap<MetadataScope, BTreeMap<String, String>>>>,
59}
60
61impl RuntimeMetadataTable {
62    /// Create an empty table.
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    /// Replace the label set for `scope`. An empty `labels` map clears
68    /// the entry.
69    pub async fn set_labels(&self, scope: MetadataScope, labels: BTreeMap<String, String>) {
70        let mut guard = self.inner.write().await;
71        if labels.is_empty() {
72            guard.remove(&scope);
73        } else {
74            guard.insert(scope, labels);
75        }
76    }
77
78    /// Return the label set for `scope`, or an empty map if none is set.
79    pub async fn get_labels(&self, scope: &MetadataScope) -> BTreeMap<String, String> {
80        let guard = self.inner.read().await;
81        guard.get(scope).cloned().unwrap_or_default()
82    }
83
84    /// Remove the label set for `scope`. Returns the previous value if any.
85    pub async fn delete_labels(&self, scope: &MetadataScope) -> Option<BTreeMap<String, String>> {
86        let mut guard = self.inner.write().await;
87        guard.remove(scope)
88    }
89
90    /// Return all label sets associated with a mob — both the mob-scoped
91    /// entry (if any) and every run-scoped entry whose mob id matches.
92    pub async fn list_labels_for_mob(
93        &self,
94        mob_id: &str,
95    ) -> Vec<(MetadataScope, BTreeMap<String, String>)> {
96        let guard = self.inner.read().await;
97        guard
98            .iter()
99            .filter(|(scope, _)| scope.mob_id() == mob_id)
100            .map(|(scope, labels)| (scope.clone(), labels.clone()))
101            .collect()
102    }
103}
104
105/// Parse a JSON `labels` field as a string→string map.
106///
107/// Accepts a missing field, `null`, or an empty object — all yield an empty
108/// map. Anything else must deserialize cleanly or returns a human-readable
109/// error string suitable for a JSON-RPC `Invalid params` reply.
110pub fn parse_labels_param(value: Option<&Value>) -> Result<BTreeMap<String, String>, String> {
111    match value {
112        None | Some(Value::Null) => Ok(BTreeMap::new()),
113        Some(v) => serde_json::from_value::<BTreeMap<String, String>>(v.clone())
114            .map_err(|err| format!("labels must be a map of string to string: {err}")),
115    }
116}
117
118/// Render a label map as a JSON object suitable for the wire format.
119pub fn labels_to_json_value(labels: &BTreeMap<String, String>) -> Value {
120    let mut map = serde_json::Map::with_capacity(labels.len());
121    for (k, v) in labels {
122        map.insert(k.clone(), Value::String(v.clone()));
123    }
124    Value::Object(map)
125}
126
/// Outcome of dispatching a label RPC against a [`RuntimeMetadataTable`].
///
/// Both transports (the unified-runtime JSON-RPC and the HTTP-console JSON-RPC)
/// project this into their own response envelope.
///
/// The type derives `Debug`, `Clone`, `PartialEq` and `Eq` so transports and
/// tests can log, copy and compare outcomes directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LabelRpcResult {
    /// `set` / `delete`: returns `{"accepted": true}`.
    Accepted,
    /// `get`: returns `{"labels": {...}}`.
    Labels(BTreeMap<String, String>),
    /// Validation error — `Invalid params: <message>`.
    InvalidParams(String),
}
139
140/// Replace the label set for `scope`, parsing `labels` from RPC params.
141pub async fn dispatch_labels_set(
142    table: &RuntimeMetadataTable,
143    scope: MetadataScope,
144    params: &Value,
145) -> LabelRpcResult {
146    match parse_labels_param(params.get("labels")) {
147        Ok(labels) => {
148            table.set_labels(scope, labels).await;
149            LabelRpcResult::Accepted
150        }
151        Err(message) => LabelRpcResult::InvalidParams(message),
152    }
153}
154
155/// Read the label set for `scope`.
156pub async fn dispatch_labels_get(
157    table: &RuntimeMetadataTable,
158    scope: MetadataScope,
159) -> LabelRpcResult {
160    LabelRpcResult::Labels(table.get_labels(&scope).await)
161}
162
163/// Remove the label set for `scope`.
164pub async fn dispatch_labels_delete(
165    table: &RuntimeMetadataTable,
166    scope: MetadataScope,
167) -> LabelRpcResult {
168    let _ = table.delete_labels(&scope).await;
169    LabelRpcResult::Accepted
170}
171
172/// Pull a non-empty `run_id` string from RPC params.
173pub fn parse_run_id_param(params: &Value) -> Result<&str, String> {
174    match params.get("run_id").and_then(Value::as_str) {
175        Some(s) if !s.is_empty() => Ok(s),
176        _ => Err("run_id required".to_string()),
177    }
178}
179
// ---------------------------------------------------------------------------
// Persistent metadata adapter
// ---------------------------------------------------------------------------
//
// Distinct from the in-memory `RuntimeMetadataTable` above. The label sidecar
// resets on restart, which is acceptable: labels are app-injected runtime
// metadata. The structural-events subscription cursor, by contrast, must
// survive a restart so that a restarted gateway resumes where it left off
// instead of dropping events emitted between processes. This adapter owns
// that durable state.
189
/// Failure modes reported by [`PersistentMetadataStore`] implementations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetadataStoreError {
    /// The underlying storage failed (sqlite open, schema, query, ...).
    Io(String),
    /// A stored value could not be parsed back into its typed form — most
    /// likely the store was written by a future mobkit version.
    Decode(String),
}

impl std::fmt::Display for MetadataStoreError {
    /// Renders as `metadata store io: <detail>` or
    /// `metadata store decode: <detail>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (prefix, detail) = match self {
            Self::Io(detail) => ("metadata store io: ", detail),
            Self::Decode(detail) => ("metadata store decode: ", detail),
        };
        f.write_str(prefix)?;
        f.write_str(detail)
    }
}

impl std::error::Error for MetadataStoreError {}
210
211/// Persistent storage for mobkit runtime metadata that must survive a
212/// gateway restart — currently the structural-events subscription cursor.
213///
214/// Two impls live in this module: [`InMemoryMetadataStore`] (no
215/// persistence; used when no SQLite mob storage is configured) and
216/// [`SqliteMetadataStore`] (writes a small `mobkit_metadata` table next
217/// to the mob's own SQLite store). The `UnifiedRuntime` builder picks
218/// the impl based on the configured `MobBootstrapSpec`.
219#[async_trait]
220pub trait PersistentMetadataStore: Send + Sync {
221    /// Read the last-projected mob events cursor for `mob_id`. Returns
222    /// `Ok(None)` when no cursor has been written yet (fresh deploy or
223    /// in-memory deployment that just started).
224    async fn get_subscription_cursor(
225        &self,
226        mob_id: &str,
227    ) -> Result<Option<u64>, MetadataStoreError>;
228
229    /// Persist the last-projected mob events cursor for `mob_id`.
230    async fn set_subscription_cursor(
231        &self,
232        mob_id: &str,
233        cursor: u64,
234    ) -> Result<(), MetadataStoreError>;
235}
236
237/// In-memory persistent metadata store.
238///
239/// "Persistent" is aspirational here — the values survive `Arc<...>` clones
240/// but reset to empty on process restart. Used when no SQLite mob storage
241/// is configured. The structural-events subscription falls back to "start
242/// at latest" on restart in this case, which is the right behaviour:
243/// in-memory deployments don't have a persistent ledger to replay against
244/// either.
245#[derive(Debug, Default)]
246pub struct InMemoryMetadataStore {
247    cursors: RwLock<BTreeMap<String, u64>>,
248}
249
250impl InMemoryMetadataStore {
251    pub fn new() -> Self {
252        Self::default()
253    }
254}
255
256#[async_trait]
257impl PersistentMetadataStore for InMemoryMetadataStore {
258    async fn get_subscription_cursor(
259        &self,
260        mob_id: &str,
261    ) -> Result<Option<u64>, MetadataStoreError> {
262        Ok(self.cursors.read().await.get(mob_id).copied())
263    }
264
265    async fn set_subscription_cursor(
266        &self,
267        mob_id: &str,
268        cursor: u64,
269    ) -> Result<(), MetadataStoreError> {
270        self.cursors
271            .write()
272            .await
273            .insert(mob_id.to_string(), cursor);
274        Ok(())
275    }
276}
277
278/// SQLite-backed persistent metadata store.
279///
280/// Opens its own `rusqlite::Connection` to the supplied database path —
281/// the same path the mob's `MobStorage` uses, but with a separate handle.
282/// Cross-handle access is safe; meerkat #445's `notify`-based event-store
283/// watcher already runs in this configuration. The `mobkit_metadata`
284/// table is created on init and is independent of meerkat-mob's own
285/// schema, so opening order doesn't matter.
286///
287/// Schema:
288/// ```text
289/// CREATE TABLE mobkit_metadata (
290///     mob_id  TEXT NOT NULL,
291///     key     TEXT NOT NULL,
292///     value   TEXT NOT NULL,
293///     PRIMARY KEY (mob_id, key)
294/// )
295/// ```
296///
297/// The subscription cursor lives at `key = "subscription_cursor"`,
298/// stored as a base-10 string for simple human inspection. Future
299/// metadata fields land here under their own keys.
300pub struct SqliteMetadataStore {
301    conn: Mutex<Connection>,
302}
303
/// Value of the `key` column for the row holding a mob's subscription cursor.
const SUBSCRIPTION_CURSOR_KEY: &str = "subscription_cursor";
305
306impl SqliteMetadataStore {
307    /// Open (or create) a SQLite metadata store at `path`.
308    ///
309    /// `path` should typically be the same database the mob's `MobStorage`
310    /// uses; the table is `mobkit_metadata` and won't collide with
311    /// meerkat-mob's own tables.
312    pub fn open(path: impl AsRef<Path>) -> Result<Self, MetadataStoreError> {
313        let conn =
314            Connection::open(path).map_err(|err| MetadataStoreError::Io(format!("open: {err}")))?;
315        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")
316            .map_err(|err| MetadataStoreError::Io(format!("pragma: {err}")))?;
317        conn.execute_batch(
318            "CREATE TABLE IF NOT EXISTS mobkit_metadata (
319                mob_id TEXT NOT NULL,
320                key    TEXT NOT NULL,
321                value  TEXT NOT NULL,
322                PRIMARY KEY (mob_id, key)
323            );",
324        )
325        .map_err(|err| MetadataStoreError::Io(format!("schema: {err}")))?;
326        Ok(Self {
327            conn: Mutex::new(conn),
328        })
329    }
330
331    /// Open an in-memory SQLite store (for tests).
332    pub fn in_memory() -> Result<Self, MetadataStoreError> {
333        let conn = Connection::open_in_memory()
334            .map_err(|err| MetadataStoreError::Io(format!("in-memory open: {err}")))?;
335        conn.execute_batch(
336            "CREATE TABLE IF NOT EXISTS mobkit_metadata (
337                mob_id TEXT NOT NULL,
338                key    TEXT NOT NULL,
339                value  TEXT NOT NULL,
340                PRIMARY KEY (mob_id, key)
341            );",
342        )
343        .map_err(|err| MetadataStoreError::Io(format!("schema: {err}")))?;
344        Ok(Self {
345            conn: Mutex::new(conn),
346        })
347    }
348
349    fn lock_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, MetadataStoreError> {
350        self.conn
351            .lock()
352            .map_err(|err| MetadataStoreError::Io(format!("connection mutex poisoned: {err}")))
353    }
354}
355
356#[async_trait]
357impl PersistentMetadataStore for SqliteMetadataStore {
358    async fn get_subscription_cursor(
359        &self,
360        mob_id: &str,
361    ) -> Result<Option<u64>, MetadataStoreError> {
362        let conn = self.lock_conn()?;
363        let mut stmt = conn
364            .prepare_cached(
365                "SELECT value FROM mobkit_metadata WHERE mob_id = ?1 AND key = ?2 LIMIT 1",
366            )
367            .map_err(|err| MetadataStoreError::Io(format!("prepare: {err}")))?;
368        let value: Option<String> = stmt
369            .query_row(rusqlite::params![mob_id, SUBSCRIPTION_CURSOR_KEY], |row| {
370                row.get::<_, String>(0)
371            })
372            .map(Some)
373            .or_else(|err| match err {
374                rusqlite::Error::QueryReturnedNoRows => Ok(None),
375                other => Err(MetadataStoreError::Io(format!("query: {other}"))),
376            })?;
377        match value {
378            Some(s) => s
379                .parse::<u64>()
380                .map(Some)
381                .map_err(|err| MetadataStoreError::Decode(format!("cursor parse: {err}"))),
382            None => Ok(None),
383        }
384    }
385
386    async fn set_subscription_cursor(
387        &self,
388        mob_id: &str,
389        cursor: u64,
390    ) -> Result<(), MetadataStoreError> {
391        let conn = self.lock_conn()?;
392        conn.execute(
393            "INSERT INTO mobkit_metadata (mob_id, key, value) VALUES (?1, ?2, ?3) \
394             ON CONFLICT(mob_id, key) DO UPDATE SET value = excluded.value",
395            rusqlite::params![mob_id, SUBSCRIPTION_CURSOR_KEY, cursor.to_string()],
396        )
397        .map_err(|err| MetadataStoreError::Io(format!("upsert: {err}")))?;
398        Ok(())
399    }
400}
401
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Build an owned label map from borrowed key/value pairs.
    fn labels(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        let mut map = BTreeMap::new();
        for (key, value) in pairs {
            map.insert((*key).to_string(), (*value).to_string());
        }
        map
    }

    /// Shorthand for a mob-scoped key.
    fn mob(mob_id: &str) -> MetadataScope {
        MetadataScope::Mob(mob_id.to_string())
    }

    /// Shorthand for a run-scoped key.
    fn run(mob_id: &str, run_id: &str) -> MetadataScope {
        MetadataScope::Run(mob_id.to_string(), run_id.to_string())
    }

    // ----- RuntimeMetadataTable tests -----------------------------------

    #[tokio::test]
    async fn set_and_get_mob_labels() {
        let table = RuntimeMetadataTable::new();
        let scope = mob("mob-a");
        let written = labels(&[("repo", "agents"), ("env", "dev")]);
        table.set_labels(scope.clone(), written).await;

        let stored = table.get_labels(&scope).await;
        assert_eq!(stored.get("repo").map(String::as_str), Some("agents"));
        assert_eq!(stored.get("env").map(String::as_str), Some("dev"));
    }

    #[tokio::test]
    async fn set_replaces_rather_than_merges() {
        let table = RuntimeMetadataTable::new();
        let scope = mob("mob-a");
        let first = labels(&[("a", "1"), ("b", "2")]);
        let second = labels(&[("a", "9")]);
        table.set_labels(scope.clone(), first).await;
        table.set_labels(scope.clone(), second).await;

        let stored = table.get_labels(&scope).await;
        assert_eq!(stored.len(), 1);
        assert_eq!(stored.get("a").map(String::as_str), Some("9"));
        assert!(!stored.contains_key("b"));
    }

    #[tokio::test]
    async fn delete_clears_entry() {
        let table = RuntimeMetadataTable::new();
        let scope = run("mob-a", "run-1");
        table.set_labels(scope.clone(), labels(&[("k", "v")])).await;

        let removed = table.delete_labels(&scope).await.unwrap();
        assert_eq!(removed.get("k").map(String::as_str), Some("v"));
        assert!(table.get_labels(&scope).await.is_empty());
    }

    #[tokio::test]
    async fn empty_set_clears_entry() {
        let table = RuntimeMetadataTable::new();
        let scope = mob("mob-a");
        table.set_labels(scope.clone(), labels(&[("k", "v")])).await;
        table.set_labels(scope.clone(), BTreeMap::new()).await;

        let stored = table.get_labels(&scope).await;
        assert!(stored.is_empty());
    }

    #[tokio::test]
    async fn list_returns_mob_and_run_entries() {
        let table = RuntimeMetadataTable::new();
        let mob_scope = mob("mob-a");
        let run_scope = run("mob-a", "run-1");
        let other_run = run("mob-b", "run-1");
        table
            .set_labels(mob_scope.clone(), labels(&[("env", "dev")]))
            .await;
        table
            .set_labels(run_scope.clone(), labels(&[("trace", "abc")]))
            .await;
        table
            .set_labels(other_run, labels(&[("trace", "xyz")]))
            .await;

        let entries = table.list_labels_for_mob("mob-a").await;
        assert_eq!(entries.len(), 2);
        assert!(entries.iter().any(|(scope, _)| *scope == mob_scope));
        assert!(entries.iter().any(|(scope, _)| *scope == run_scope));
    }

    #[tokio::test]
    async fn run_scope_distinguishes_mobs() {
        let table = RuntimeMetadataTable::new();
        let scope_a = run("mob-a", "run-1");
        let scope_b = run("mob-b", "run-1");
        table
            .set_labels(scope_a.clone(), labels(&[("k", "a")]))
            .await;
        table
            .set_labels(scope_b.clone(), labels(&[("k", "b")]))
            .await;

        let stored_a = table.get_labels(&scope_a).await;
        let stored_b = table.get_labels(&scope_b).await;
        assert_eq!(stored_a.get("k").map(String::as_str), Some("a"));
        assert_eq!(stored_b.get("k").map(String::as_str), Some("b"));
    }

    // ----- PersistentMetadataStore tests --------------------------------

    #[tokio::test]
    async fn in_memory_persistent_store_round_trip() {
        let store = InMemoryMetadataStore::new();
        let before = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(before, None, "fresh store should have no cursor");

        store.set_subscription_cursor("mob-a", 42).await.unwrap();
        let after = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(after, Some(42));

        // Per-mob isolation.
        let other = store.get_subscription_cursor("mob-b").await.unwrap();
        assert_eq!(other, None);
    }

    #[tokio::test]
    async fn in_memory_persistent_store_overwrite() {
        let store = InMemoryMetadataStore::new();
        for cursor in [1, 2] {
            store.set_subscription_cursor("m", cursor).await.unwrap();
        }
        let latest = store.get_subscription_cursor("m").await.unwrap();
        assert_eq!(latest, Some(2));
    }

    #[tokio::test]
    async fn sqlite_persistent_store_round_trip() {
        let store = SqliteMetadataStore::in_memory().unwrap();
        let before = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(before, None);

        store.set_subscription_cursor("mob-a", 1234).await.unwrap();
        let first = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(first, Some(1234));

        // Overwrite via UPSERT.
        store.set_subscription_cursor("mob-a", 9999).await.unwrap();
        let second = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(second, Some(9999));

        // Per-mob isolation.
        store.set_subscription_cursor("mob-b", 5).await.unwrap();
        let mob_a = store.get_subscription_cursor("mob-a").await.unwrap();
        let mob_b = store.get_subscription_cursor("mob-b").await.unwrap();
        assert_eq!(mob_a, Some(9999));
        assert_eq!(mob_b, Some(5));
    }

    #[tokio::test]
    async fn sqlite_store_persists_across_handles() {
        // The point of the SQLite-backed store: a new handle onto the same
        // database sees what an earlier handle wrote. An in-memory database
        // vanishes with its connection, so this goes through a temp file:
        // write, drop the handle, reopen, read.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mobkit-metadata.sqlite");

        let writer = SqliteMetadataStore::open(&path).unwrap();
        writer.set_subscription_cursor("mob-x", 7777).await.unwrap();
        drop(writer);

        // Reopen.
        let reader = SqliteMetadataStore::open(&path).unwrap();
        let recovered = reader.get_subscription_cursor("mob-x").await.unwrap();
        assert_eq!(recovered, Some(7777), "cursor should survive handle drop");
    }
}