1use std::collections::BTreeMap;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17
18use async_trait::async_trait;
19use rusqlite::Connection;
20use serde_json::Value;
21use tokio::sync::RwLock;
22
/// Identifies the entity a set of labels is attached to.
///
/// The derived ordering places every `Mob` scope before every `Run` scope and
/// compares the contained strings within a variant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MetadataScope {
    /// A mob as a whole; holds the mob id.
    Mob(String),
    /// A single run; holds the mob id followed by the run id.
    Run(String, String),
}

impl MetadataScope {
    /// Returns the mob id, which both variants carry as their first field.
    pub fn mob_id(&self) -> &str {
        match self {
            Self::Mob(id) | Self::Run(id, _) => id,
        }
    }

    /// Returns the run id for a `Run` scope, or `None` for a `Mob` scope.
    pub fn run_id(&self) -> Option<&str> {
        if let Self::Run(_, id) = self {
            Some(id.as_str())
        } else {
            None
        }
    }
}
51
52#[derive(Debug, Clone, Default)]
57pub struct RuntimeMetadataTable {
58 inner: Arc<RwLock<BTreeMap<MetadataScope, BTreeMap<String, String>>>>,
59}
60
61impl RuntimeMetadataTable {
62 pub fn new() -> Self {
64 Self::default()
65 }
66
67 pub async fn set_labels(&self, scope: MetadataScope, labels: BTreeMap<String, String>) {
70 let mut guard = self.inner.write().await;
71 if labels.is_empty() {
72 guard.remove(&scope);
73 } else {
74 guard.insert(scope, labels);
75 }
76 }
77
78 pub async fn get_labels(&self, scope: &MetadataScope) -> BTreeMap<String, String> {
80 let guard = self.inner.read().await;
81 guard.get(scope).cloned().unwrap_or_default()
82 }
83
84 pub async fn delete_labels(&self, scope: &MetadataScope) -> Option<BTreeMap<String, String>> {
86 let mut guard = self.inner.write().await;
87 guard.remove(scope)
88 }
89
90 pub async fn list_labels_for_mob(
93 &self,
94 mob_id: &str,
95 ) -> Vec<(MetadataScope, BTreeMap<String, String>)> {
96 let guard = self.inner.read().await;
97 guard
98 .iter()
99 .filter(|(scope, _)| scope.mob_id() == mob_id)
100 .map(|(scope, labels)| (scope.clone(), labels.clone()))
101 .collect()
102 }
103}
104
105pub fn parse_labels_param(value: Option<&Value>) -> Result<BTreeMap<String, String>, String> {
111 match value {
112 None | Some(Value::Null) => Ok(BTreeMap::new()),
113 Some(v) => serde_json::from_value::<BTreeMap<String, String>>(v.clone())
114 .map_err(|err| format!("labels must be a map of string to string: {err}")),
115 }
116}
117
118pub fn labels_to_json_value(labels: &BTreeMap<String, String>) -> Value {
120 let mut map = serde_json::Map::with_capacity(labels.len());
121 for (k, v) in labels {
122 map.insert(k.clone(), Value::String(v.clone()));
123 }
124 Value::Object(map)
125}
126
/// Outcome of a label RPC handled by one of the `dispatch_labels_*` functions.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log results
/// and compare them directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LabelRpcResult {
    /// The mutation (set or delete) was applied.
    Accepted,
    /// The labels stored for the requested scope; empty when none are stored.
    Labels(BTreeMap<String, String>),
    /// The request parameters were rejected; carries the reason.
    InvalidParams(String),
}
139
140pub async fn dispatch_labels_set(
142 table: &RuntimeMetadataTable,
143 scope: MetadataScope,
144 params: &Value,
145) -> LabelRpcResult {
146 match parse_labels_param(params.get("labels")) {
147 Ok(labels) => {
148 table.set_labels(scope, labels).await;
149 LabelRpcResult::Accepted
150 }
151 Err(message) => LabelRpcResult::InvalidParams(message),
152 }
153}
154
155pub async fn dispatch_labels_get(
157 table: &RuntimeMetadataTable,
158 scope: MetadataScope,
159) -> LabelRpcResult {
160 LabelRpcResult::Labels(table.get_labels(&scope).await)
161}
162
163pub async fn dispatch_labels_delete(
165 table: &RuntimeMetadataTable,
166 scope: MetadataScope,
167) -> LabelRpcResult {
168 let _ = table.delete_labels(&scope).await;
169 LabelRpcResult::Accepted
170}
171
172pub fn parse_run_id_param(params: &Value) -> Result<&str, String> {
174 match params.get("run_id").and_then(Value::as_str) {
175 Some(s) if !s.is_empty() => Ok(s),
176 _ => Err("run_id required".to_string()),
177 }
178}
179
/// Failure reported by a [`PersistentMetadataStore`] implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetadataStoreError {
    /// The backing store could not be opened, locked, queried or written.
    Io(String),
    /// A stored value was read but could not be decoded into the expected type.
    Decode(String),
}

/// Renders the error as a fixed category prefix followed by the detail message.
impl std::fmt::Display for MetadataStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            MetadataStoreError::Decode(msg) => write!(f, "metadata store decode: {msg}"),
            MetadataStoreError::Io(msg) => write!(f, "metadata store io: {msg}"),
        }
    }
}

// No underlying source error is retained; details live in the message string.
impl std::error::Error for MetadataStoreError {}
210
211#[async_trait]
220pub trait PersistentMetadataStore: Send + Sync {
221 async fn get_subscription_cursor(
225 &self,
226 mob_id: &str,
227 ) -> Result<Option<u64>, MetadataStoreError>;
228
229 async fn set_subscription_cursor(
231 &self,
232 mob_id: &str,
233 cursor: u64,
234 ) -> Result<(), MetadataStoreError>;
235}
236
237#[derive(Debug, Default)]
246pub struct InMemoryMetadataStore {
247 cursors: RwLock<BTreeMap<String, u64>>,
248}
249
250impl InMemoryMetadataStore {
251 pub fn new() -> Self {
252 Self::default()
253 }
254}
255
256#[async_trait]
257impl PersistentMetadataStore for InMemoryMetadataStore {
258 async fn get_subscription_cursor(
259 &self,
260 mob_id: &str,
261 ) -> Result<Option<u64>, MetadataStoreError> {
262 Ok(self.cursors.read().await.get(mob_id).copied())
263 }
264
265 async fn set_subscription_cursor(
266 &self,
267 mob_id: &str,
268 cursor: u64,
269 ) -> Result<(), MetadataStoreError> {
270 self.cursors
271 .write()
272 .await
273 .insert(mob_id.to_string(), cursor);
274 Ok(())
275 }
276}
277
278pub struct SqliteMetadataStore {
301 conn: Mutex<Connection>,
302}
303
/// Value of the `key` column under which a mob's subscription cursor is stored
/// in the `mobkit_metadata` table.
const SUBSCRIPTION_CURSOR_KEY: &str = "subscription_cursor";
305
306impl SqliteMetadataStore {
307 pub fn open(path: impl AsRef<Path>) -> Result<Self, MetadataStoreError> {
313 let conn =
314 Connection::open(path).map_err(|err| MetadataStoreError::Io(format!("open: {err}")))?;
315 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")
316 .map_err(|err| MetadataStoreError::Io(format!("pragma: {err}")))?;
317 conn.execute_batch(
318 "CREATE TABLE IF NOT EXISTS mobkit_metadata (
319 mob_id TEXT NOT NULL,
320 key TEXT NOT NULL,
321 value TEXT NOT NULL,
322 PRIMARY KEY (mob_id, key)
323 );",
324 )
325 .map_err(|err| MetadataStoreError::Io(format!("schema: {err}")))?;
326 Ok(Self {
327 conn: Mutex::new(conn),
328 })
329 }
330
331 pub fn in_memory() -> Result<Self, MetadataStoreError> {
333 let conn = Connection::open_in_memory()
334 .map_err(|err| MetadataStoreError::Io(format!("in-memory open: {err}")))?;
335 conn.execute_batch(
336 "CREATE TABLE IF NOT EXISTS mobkit_metadata (
337 mob_id TEXT NOT NULL,
338 key TEXT NOT NULL,
339 value TEXT NOT NULL,
340 PRIMARY KEY (mob_id, key)
341 );",
342 )
343 .map_err(|err| MetadataStoreError::Io(format!("schema: {err}")))?;
344 Ok(Self {
345 conn: Mutex::new(conn),
346 })
347 }
348
349 fn lock_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, MetadataStoreError> {
350 self.conn
351 .lock()
352 .map_err(|err| MetadataStoreError::Io(format!("connection mutex poisoned: {err}")))
353 }
354}
355
356#[async_trait]
357impl PersistentMetadataStore for SqliteMetadataStore {
358 async fn get_subscription_cursor(
359 &self,
360 mob_id: &str,
361 ) -> Result<Option<u64>, MetadataStoreError> {
362 let conn = self.lock_conn()?;
363 let mut stmt = conn
364 .prepare_cached(
365 "SELECT value FROM mobkit_metadata WHERE mob_id = ?1 AND key = ?2 LIMIT 1",
366 )
367 .map_err(|err| MetadataStoreError::Io(format!("prepare: {err}")))?;
368 let value: Option<String> = stmt
369 .query_row(rusqlite::params![mob_id, SUBSCRIPTION_CURSOR_KEY], |row| {
370 row.get::<_, String>(0)
371 })
372 .map(Some)
373 .or_else(|err| match err {
374 rusqlite::Error::QueryReturnedNoRows => Ok(None),
375 other => Err(MetadataStoreError::Io(format!("query: {other}"))),
376 })?;
377 match value {
378 Some(s) => s
379 .parse::<u64>()
380 .map(Some)
381 .map_err(|err| MetadataStoreError::Decode(format!("cursor parse: {err}"))),
382 None => Ok(None),
383 }
384 }
385
386 async fn set_subscription_cursor(
387 &self,
388 mob_id: &str,
389 cursor: u64,
390 ) -> Result<(), MetadataStoreError> {
391 let conn = self.lock_conn()?;
392 conn.execute(
393 "INSERT INTO mobkit_metadata (mob_id, key, value) VALUES (?1, ?2, ?3) \
394 ON CONFLICT(mob_id, key) DO UPDATE SET value = excluded.value",
395 rusqlite::params![mob_id, SUBSCRIPTION_CURSOR_KEY, cursor.to_string()],
396 )
397 .map_err(|err| MetadataStoreError::Io(format!("upsert: {err}")))?;
398 Ok(())
399 }
400}
401
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds an owned label map from borrowed key/value pairs.
    fn labels(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        let mut out = BTreeMap::new();
        for &(key, value) in pairs {
            out.insert(key.to_owned(), value.to_owned());
        }
        out
    }

    /// Shorthand for a mob-level scope.
    fn mob(mob_id: &str) -> MetadataScope {
        MetadataScope::Mob(mob_id.to_owned())
    }

    /// Shorthand for a run-level scope.
    fn run(mob_id: &str, run_id: &str) -> MetadataScope {
        MetadataScope::Run(mob_id.to_owned(), run_id.to_owned())
    }

    #[tokio::test]
    async fn set_and_get_mob_labels() {
        let table = RuntimeMetadataTable::new();
        let scope = mob("mob-a");
        let wanted = labels(&[("repo", "agents"), ("env", "dev")]);
        table.set_labels(scope.clone(), wanted).await;

        let stored = table.get_labels(&scope).await;
        assert_eq!(stored.get("repo").map(String::as_str), Some("agents"));
        assert_eq!(stored.get("env").map(String::as_str), Some("dev"));
    }

    #[tokio::test]
    async fn set_replaces_rather_than_merges() {
        let table = RuntimeMetadataTable::new();
        let scope = mob("mob-a");
        let first = labels(&[("a", "1"), ("b", "2")]);
        let second = labels(&[("a", "9")]);
        table.set_labels(scope.clone(), first).await;
        table.set_labels(scope.clone(), second).await;

        let stored = table.get_labels(&scope).await;
        assert_eq!(stored.len(), 1);
        assert_eq!(stored.get("a").map(String::as_str), Some("9"));
        assert!(!stored.contains_key("b"));
    }

    #[tokio::test]
    async fn delete_clears_entry() {
        let table = RuntimeMetadataTable::new();
        let scope = run("mob-a", "run-1");
        table.set_labels(scope.clone(), labels(&[("k", "v")])).await;

        let removed = table.delete_labels(&scope).await.unwrap();
        assert_eq!(removed.get("k").map(String::as_str), Some("v"));

        let remaining = table.get_labels(&scope).await;
        assert!(remaining.is_empty());
    }

    #[tokio::test]
    async fn empty_set_clears_entry() {
        let table = RuntimeMetadataTable::new();
        let scope = mob("mob-a");
        table.set_labels(scope.clone(), labels(&[("k", "v")])).await;
        table.set_labels(scope.clone(), BTreeMap::new()).await;

        let remaining = table.get_labels(&scope).await;
        assert!(remaining.is_empty());
    }

    #[tokio::test]
    async fn list_returns_mob_and_run_entries() {
        let table = RuntimeMetadataTable::new();
        let mob_scope = mob("mob-a");
        let run_scope = run("mob-a", "run-1");
        let other_run = run("mob-b", "run-1");
        table
            .set_labels(mob_scope.clone(), labels(&[("env", "dev")]))
            .await;
        table
            .set_labels(run_scope.clone(), labels(&[("trace", "abc")]))
            .await;
        table
            .set_labels(other_run, labels(&[("trace", "xyz")]))
            .await;

        let entries = table.list_labels_for_mob("mob-a").await;
        assert_eq!(entries.len(), 2);
        assert!(entries.iter().any(|(scope, _)| *scope == mob_scope));
        assert!(entries.iter().any(|(scope, _)| *scope == run_scope));
    }

    #[tokio::test]
    async fn in_memory_persistent_store_round_trip() {
        let store = InMemoryMetadataStore::new();

        let initial = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(initial, None, "fresh store should have no cursor");

        store.set_subscription_cursor("mob-a", 42).await.unwrap();
        let stored = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(stored, Some(42));

        let unrelated = store.get_subscription_cursor("mob-b").await.unwrap();
        assert_eq!(unrelated, None);
    }

    #[tokio::test]
    async fn in_memory_persistent_store_overwrite() {
        let store = InMemoryMetadataStore::new();
        for cursor in [1, 2] {
            store.set_subscription_cursor("m", cursor).await.unwrap();
        }
        let latest = store.get_subscription_cursor("m").await.unwrap();
        assert_eq!(latest, Some(2));
    }

    #[tokio::test]
    async fn sqlite_persistent_store_round_trip() {
        let store = SqliteMetadataStore::in_memory().unwrap();

        let initial = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(initial, None);

        store.set_subscription_cursor("mob-a", 1234).await.unwrap();
        let first = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(first, Some(1234));

        // Writing again for the same mob overwrites the earlier value.
        store.set_subscription_cursor("mob-a", 9999).await.unwrap();
        let second = store.get_subscription_cursor("mob-a").await.unwrap();
        assert_eq!(second, Some(9999));

        // A different mob gets its own row and leaves the first untouched.
        store.set_subscription_cursor("mob-b", 5).await.unwrap();
        let mob_a = store.get_subscription_cursor("mob-a").await.unwrap();
        let mob_b = store.get_subscription_cursor("mob-b").await.unwrap();
        assert_eq!(mob_a, Some(9999));
        assert_eq!(mob_b, Some(5));
    }

    #[tokio::test]
    async fn sqlite_store_persists_across_handles() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("mobkit-metadata.sqlite");

        let writer = SqliteMetadataStore::open(&path).unwrap();
        writer.set_subscription_cursor("mob-x", 7777).await.unwrap();
        // Close the first handle before reopening the same file.
        drop(writer);

        let reader = SqliteMetadataStore::open(&path).unwrap();
        let stored = reader.get_subscription_cursor("mob-x").await.unwrap();
        assert_eq!(stored, Some(7777), "cursor should survive handle drop");
    }

    #[tokio::test]
    async fn run_scope_distinguishes_mobs() {
        let table = RuntimeMetadataTable::new();
        let scope_a = run("mob-a", "run-1");
        let scope_b = run("mob-b", "run-1");
        table
            .set_labels(scope_a.clone(), labels(&[("k", "a")]))
            .await;
        table
            .set_labels(scope_b.clone(), labels(&[("k", "b")]))
            .await;

        let labels_a = table.get_labels(&scope_a).await;
        let labels_b = table.get_labels(&scope_b).await;
        assert_eq!(labels_a.get("k").map(String::as_str), Some("a"));
        assert_eq!(labels_b.get("k").map(String::as_str), Some("b"));
    }
}