// vigy_runtime/lib.rs

1//! Tokio-driven tick scheduler + registry for vigies.
2//!
3//! ## Topology
4//!
5//! ```text
6//!                 ┌──────────────────────────┐
7//!                 │  RuntimeHandle (Clone)   │  ← public surface used by
8//!                 └────────────┬─────────────┘    vigy-cli / vigy-rpc /
9//!                              │                  vigy-graphql / vigy-rest
10//!                              ▼
11//!                 ┌──────────────────────────┐
12//!                 │      Inner (Arc'd)       │
13//!                 │  - store: Store          │
14//!                 │  - tasks: Mutex<HashMap> │
15//!                 │  - bus:   broadcast tx   │
16//!                 └────────────┬─────────────┘
17//!                              │ spawn per-vigy
18//!                              ▼
19//!     ┌─────────────────────────────────────────────┐
20//!     │  tick_loop(vigy_id) — one tokio task each   │
21//!     │  loop {                                      │
22//!     │    sleep(interval).                          │
23//!     │    evaluate(vigy.program, fresh host).       │
24//!     │    persist VigyRun.                          │
25//!     │    broadcast to bus.                         │
26//!     │    backoff on error.                         │
27//!     │  }                                           │
28//!     └─────────────────────────────────────────────┘
29//! ```
30//!
31//! ## Invariants
32//!
33//! - **One task per vigy.** `register_or_update` cancels the old task
34//!   before spawning the new one, so there's no double-tick risk.
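//! - **Backoff on errors.** Every consecutive failing tick adds an extra
//!   sleep on top of the regular interval: 1s, 2s, 4s, 8s, 16s, then
//!   capped at 30s from the sixth failure on. One successful tick resets it.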
37//! - **Store is source of truth.** On startup, `RuntimeHandle::open`
38//!   reads existing vigies from the store and spawns tasks for the
39//!   enabled ones.
40
41use std::collections::HashMap;
42use std::path::Path;
43use std::sync::Arc;
44use std::time::Duration;
45
46use thiserror::Error;
47use tokio::sync::{broadcast, Mutex};
48use tokio::task::JoinHandle;
49use tokio::time::sleep;
50use tracing::{debug, error, info, warn};
51
52use vigy_eval::{ExtensionHandle, LispReconciler, Reconciler, VigyHost};
53use vigy_store::{Store, StoreError};
54use vigy_types::{ReconcileAction, ResultStatus, Vigy, VigyId, VigyRun};
55
56#[derive(Debug, Error)]
57pub enum RuntimeError {
58    #[error("store: {0}")]
59    Store(#[from] StoreError),
60    #[error("vigy types: {0}")]
61    Types(#[from] vigy_types::Error),
62    #[error("eval: {0}")]
63    Eval(#[from] vigy_eval::EvalErr),
64}
65
66pub type Result<T> = std::result::Result<T, RuntimeError>;
67
68/// Public, cheaply-cloneable handle. All API surfaces drive the runtime
69/// through this — no direct access to internals.
70#[derive(Clone)]
71pub struct RuntimeHandle {
72    inner: Arc<Inner>,
73}
74
75struct Inner {
76    store: Store,
77    tasks: Mutex<HashMap<VigyId, JoinHandle<()>>>,
78    bus: broadcast::Sender<VigyRun>,
79    /// Shared extension bundle every LispReconciler in this runtime
80    /// gets. Hosts (mado, tear-daemon) construct the runtime with
81    /// `with_extensions(...)` to add their own intrinsics; the default
82    /// is `vigy_eval::standard_extensions()`.
83    extensions: Vec<ExtensionHandle>,
84}
85
/// Capacity passed to `broadcast::channel` when the event bus is created.
const EVENT_BUS_CAPACITY: usize = 1024;
/// Upper bound on the extra sleep inserted after a failed tick.
const MAX_BACKOFF: Duration = Duration::from_secs(30);
88
89impl RuntimeHandle {
90    /// Open / create the persistent store at `path`, then re-spawn
91    /// tick tasks for any enabled vigies recorded there.
92    pub async fn open(path: &Path) -> Result<Self> {
93        let store = Store::open(path).await?;
94        Self::with_store(store, vigy_eval::standard_extensions()).await
95    }
96
97    /// In-memory store — only useful in tests + ephemeral one-shot runs.
98    pub async fn open_in_memory() -> Result<Self> {
99        let store = Store::open_in_memory().await?;
100        Self::with_store(store, vigy_eval::standard_extensions()).await
101    }
102
103    /// Open with a custom extension bundle. mado calls this with
104    /// `standard_extensions() ++ [MadoTearExtension]`; tear-daemon
105    /// calls it with `standard_extensions() ++ [TearCoreExtension]`.
106    /// Pure operators (`vigy serve`) take the default via [`open`] which
107    /// uses `vigy_eval::standard_extensions()` only.
108    pub async fn open_with_extensions(
109        path: &Path,
110        extensions: Vec<ExtensionHandle>,
111    ) -> Result<Self> {
112        let store = Store::open(path).await?;
113        Self::with_store(store, extensions).await
114    }
115
116    async fn with_store(store: Store, extensions: Vec<ExtensionHandle>) -> Result<Self> {
117        let (bus, _) = broadcast::channel(EVENT_BUS_CAPACITY);
118        let handle = Self {
119            inner: Arc::new(Inner {
120                store,
121                tasks: Mutex::new(HashMap::new()),
122                bus,
123                extensions,
124            }),
125        };
126        // Resume any pre-existing vigies.
127        let existing = handle.inner.store.list_vigies(None).await?;
128        let count = existing.len();
129        for v in existing {
130            if v.enabled {
131                handle.spawn_task(v).await;
132            }
133        }
134        info!(resumed = count, "runtime ready");
135        Ok(handle)
136    }
137
138    /// Register a fresh vigy (or replace if its id matches an existing one).
139    /// The new task spawns immediately; the old task (if any) is cancelled
140    /// before the new one starts to guarantee no concurrent ticks.
141    pub async fn register_or_update(&self, vigy: Vigy) -> Result<Vigy> {
142        self.inner.store.upsert_vigy(&vigy).await?;
143        if vigy.enabled {
144            self.spawn_task(vigy.clone()).await;
145        } else {
146            self.cancel_task(&vigy.id).await;
147        }
148        Ok(vigy)
149    }
150
151    pub async fn enable(&self, id: &VigyId) -> Result<Vigy> {
152        self.inner.store.set_enabled(id, true).await?;
153        let v = self.inner.store.get_vigy(id).await?;
154        self.spawn_task(v.clone()).await;
155        Ok(v)
156    }
157
158    pub async fn disable(&self, id: &VigyId) -> Result<Vigy> {
159        self.inner.store.set_enabled(id, false).await?;
160        self.cancel_task(id).await;
161        self.inner.store.get_vigy(id).await.map_err(Into::into)
162    }
163
164    pub async fn delete(&self, id: &VigyId) -> Result<bool> {
165        self.cancel_task(id).await;
166        Ok(self.inner.store.delete_vigy(id).await?)
167    }
168
169    pub async fn get(&self, id: &VigyId) -> Result<Vigy> {
170        Ok(self.inner.store.get_vigy(id).await?)
171    }
172
173    pub async fn list(&self, label_selector: Option<&str>) -> Result<Vec<Vigy>> {
174        Ok(self.inner.store.list_vigies(label_selector).await?)
175    }
176
177    pub async fn recent_runs(&self, id: &VigyId, limit: u64) -> Result<Vec<VigyRun>> {
178        Ok(self.inner.store.recent_runs(id, limit).await?)
179    }
180
181    /// Force-tick a vigy now, regardless of its schedule. Useful for
182    /// `carve gate`-style CI hooks + the `vigy <id> tick` CLI.
183    pub async fn tick_now(&self, id: &VigyId) -> Result<VigyRun> {
184        let vigy = self.inner.store.get_vigy(id).await?;
185        let run = run_once(&self.inner.store, &vigy, &self.inner.extensions).await;
186        self.inner.store.insert_run(&run).await?;
187        let _ = self.inner.bus.send(run.clone());
188        Ok(run)
189    }
190
191    /// Force-tick a vigy through a caller-supplied reconciler. Useful
192    /// for tests that swap in NoopReconciler / ChainReconciler / a
193    /// custom impl without changing the vigy's stored program.
194    pub async fn tick_now_with(
195        &self,
196        id: &VigyId,
197        reconciler: &dyn Reconciler,
198    ) -> Result<VigyRun> {
199        let vigy = self.inner.store.get_vigy(id).await?;
200        let run = run_once_with(&self.inner.store, &vigy, reconciler).await;
201        self.inner.store.insert_run(&run).await?;
202        let _ = self.inner.bus.send(run.clone());
203        Ok(run)
204    }
205
206    /// Subscribe to the reconcile event bus. Caller gets every tick's
207    /// finalised `VigyRun` until they drop the receiver.
208    pub fn subscribe(&self) -> broadcast::Receiver<VigyRun> {
209        self.inner.bus.subscribe()
210    }
211
212    // ---------- internals ----------
213
214    async fn spawn_task(&self, vigy: Vigy) {
215        self.cancel_task(&vigy.id).await;
216        let inner = self.inner.clone();
217        let handle = tokio::spawn(tick_loop(inner, vigy.clone()));
218        self.inner.tasks.lock().await.insert(vigy.id, handle);
219    }
220
221    async fn cancel_task(&self, id: &VigyId) {
222        if let Some(h) = self.inner.tasks.lock().await.remove(id) {
223            h.abort();
224        }
225    }
226}
227
228/// The per-vigy reconcile loop. Lives in its own task; one per registered
229/// vigy. Aborts when the runtime drops it.
230async fn tick_loop(inner: Arc<Inner>, vigy: Vigy) {
231    let id = vigy.id.clone();
232    let interval = vigy.tick_interval.as_duration();
233    let mut failures = 0u32;
234
235    info!(vigy_id = %id, name = %vigy.name, interval_ms = interval.as_millis() as u64, "vigy tick loop started");
236
237    loop {
238        sleep(interval).await;
239
240        // Always re-read the vigy from the store: lets edits land without
241        // having to restart the task. (We still re-register on update to
242        // pick up new tick intervals; this is belt-and-suspenders.)
243        let current = match inner.store.get_vigy(&id).await {
244            Ok(v) if v.enabled => v,
245            Ok(_) => {
246                debug!(vigy_id = %id, "vigy disabled mid-flight; exiting loop");
247                break;
248            }
249            Err(e) => {
250                error!(vigy_id = %id, err = %e, "vigy disappeared from store; exiting loop");
251                break;
252            }
253        };
254
255        let run = run_once(&inner.store, &current, &inner.extensions).await;
256        let failed = matches!(run.result, ResultStatus::Failed);
257
258        if let Err(e) = inner.store.insert_run(&run).await {
259            error!(vigy_id = %id, err = %e, "failed to persist VigyRun");
260        }
261        let _ = inner.bus.send(run);
262
263        if failed {
264            failures = failures.saturating_add(1);
265            let backoff = backoff_for(failures);
266            warn!(vigy_id = %id, failures, backoff_ms = backoff.as_millis() as u64, "vigy tick failed; backing off");
267            sleep(backoff).await;
268        } else {
269            failures = 0;
270        }
271    }
272}
273
274fn backoff_for(failures: u32) -> Duration {
275    // Exponential backoff capped at MAX_BACKOFF. Failures: 1 → 1s,
276    // 2 → 2s, 3 → 4s, 4 → 8s, 5 → 16s, ≥6 → 30s.
277    let secs = 1u64.checked_shl(failures.saturating_sub(1).min(5)).unwrap_or(MAX_BACKOFF.as_secs());
278    Duration::from_secs(secs).min(MAX_BACKOFF)
279}
280
// Reserved kv keys that the runtime itself writes on every tick. Operator
// programs SHOULD NOT write these — read via `(vigy-tick-count)` etc.
// They live in the same kv table as operator state, so they are loaded
// and saved together with it.

/// Number of ticks recorded for the vigy, including the current one.
const KV_TICK_COUNT: &str = "__sys::tick_count";
/// Start time of the most recent tick, in milliseconds since the Unix epoch.
const KV_PREV_TICK_MS: &str = "__sys::prev_tick_ms";
287
288/// Execute one tick of a vigy through the supplied reconciler.
289/// Handles the persistence sandwich — hydrate kv before, save dirty
290/// after — so reconciler impls can be pure with respect to storage.
291///
292/// Each tick runs inside a `tracing::info_span!("vigy.tick", …)` so
293/// distributed tracing falls out of the existing tracing-subscriber
294/// configuration. Action emissions emit a structured event inside
295/// that span so traces show every reconcile action with kind +
296/// result + message.
297async fn run_once_with(
298    store: &vigy_store::Store,
299    vigy: &Vigy,
300    reconciler: &dyn Reconciler,
301) -> VigyRun {
302    let span = tracing::info_span!(
303        "vigy.tick",
304        vigy_id = %vigy.id,
305        vigy_name = %vigy.name,
306        tick_interval_ms = vigy.tick_interval.as_millis(),
307    );
308    let _enter = span.enter();
309
310    let now = time::OffsetDateTime::now_utc();
311    let tick_start_ms = (now.unix_timestamp_nanos() / 1_000_000) as i64;
312
313    let kv = match store.load_kv(&vigy.id).await {
314        Ok(k) => k,
315        Err(e) => {
316            tracing::warn!(vigy_id = %vigy.id, err = %e, "kv load failed; tick proceeds with empty kv");
317            std::collections::BTreeMap::new()
318        }
319    };
320
321    let prior_tick_count = kv.get(KV_TICK_COUNT).and_then(|v| v.as_i64()).unwrap_or(0);
322    let previous_tick_ms = kv.get(KV_PREV_TICK_MS).and_then(|v| v.as_i64());
323    let tick_count = prior_tick_count + 1;
324
325    let mut host = VigyHost {
326        tick_start_ms,
327        previous_tick_ms,
328        tick_count,
329        kv,
330        ..Default::default()
331    };
332    host.kv.insert(
333        KV_TICK_COUNT.to_string(),
334        serde_json::Value::Number(tick_count.into()),
335    );
336    host.kv.insert(
337        KV_PREV_TICK_MS.to_string(),
338        serde_json::Value::Number(tick_start_ms.into()),
339    );
340    host.kv_dirty.insert(KV_TICK_COUNT.to_string());
341    host.kv_dirty.insert(KV_PREV_TICK_MS.to_string());
342
343    let run = VigyRun::started(vigy.id.clone());
344
345    match reconciler.tick(host).await {
346        Ok(populated) => {
347            // Emit a structured event per action so distributed traces
348            // see every reconcile decision with its typed kind, payload
349            // size, and result (if the action was pre-resolved).
350            for action in &populated.actions {
351                tracing::event!(
352                    tracing::Level::DEBUG,
353                    kind = ?action.kind,
354                    has_payload = action.payload.is_some(),
355                    result = ?action.result,
356                    "vigy.action",
357                );
358            }
359            // Per-tick summary at INFO so even a tracing filter that
360            // suppresses DEBUG sees one line per tick.
361            tracing::info!(
362                action_count = populated.actions.len(),
363                kv_writes = populated.kv_dirty.len(),
364                kv_deletes = populated.kv_deleted.len(),
365                conditions = populated.conditions.len(),
366                "vigy.tick.completed",
367            );
368
369            let dirty: std::collections::BTreeMap<String, serde_json::Value> = populated
370                .kv_dirty
371                .iter()
372                .filter_map(|k| populated.kv.get(k).map(|v| (k.clone(), v.clone())))
373                .collect();
374            if !dirty.is_empty() || !populated.kv_deleted.is_empty() {
375                if let Err(e) = store.save_kv(&vigy.id, &dirty, &populated.kv_deleted).await {
376                    tracing::warn!(vigy_id = %vigy.id, err = %e, "kv save failed; in-memory state lost");
377                }
378            }
379            let actions: Vec<ReconcileAction> = populated.actions;
380            run.complete_ok(actions)
381        }
382        Err(e) => {
383            tracing::warn!(vigy_id = %vigy.id, err = %e, "vigy.tick.failed");
384            run.complete_failed(format!("{e}"))
385        }
386    }
387}
388
389/// Backwards-compatible default tick — builds a LispReconciler with the
390/// runtime's extensions and dispatches through [`run_once_with`].
391async fn run_once(
392    store: &vigy_store::Store,
393    vigy: &Vigy,
394    extensions: &[ExtensionHandle],
395) -> VigyRun {
396    let reconciler = LispReconciler::with_extensions(vigy.program.clone(), extensions.to_vec());
397    run_once_with(store, vigy, &reconciler).await
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use vigy_types::TickInterval;

    /// Builds a vigy with the 100ms tick interval every test here uses.
    fn vigy_with(name: &str, program: &str) -> Vigy {
        Vigy::new(name, program, TickInterval::from_millis(100).unwrap()).unwrap()
    }

    #[tokio::test]
    async fn register_and_tick_emits_an_action() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = vigy_with("test", "(vigy-noop)");
        let mut sub = rt.subscribe();
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();
        // Force a tick directly instead of waiting on the scheduled one
        // — keeps the test deterministic + fast.
        let run = rt.tick_now(&id).await.unwrap();
        assert_eq!(run.actions.len(), 1);
        // The event bus also saw it.
        let bus_run = sub.recv().await.unwrap();
        assert_eq!(bus_run.id, run.id);
    }

    #[tokio::test]
    async fn disable_stops_ticking() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = vigy_with("test", "(vigy-noop)");
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();
        assert!(rt.inner.tasks.lock().await.contains_key(&id));

        rt.disable(&id).await.unwrap();
        assert!(!rt.get(&id).await.unwrap().enabled);
        // The scheduler must have let go of the task...
        assert!(!rt.inner.tasks.lock().await.contains_key(&id));

        // ...and nothing may reach the bus over the next couple of
        // intervals.
        let mut sub = rt.subscribe();
        sleep(Duration::from_millis(250)).await;
        assert!(matches!(
            sub.try_recv(),
            Err(broadcast::error::TryRecvError::Empty)
        ));
    }

    #[tokio::test]
    async fn kv_persists_across_ticks() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        // A vigy that increments a counter on each tick.
        let v = vigy_with("counter", "(vigy-incr \"hits\")");
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();

        // Tick three times, then read the kv state back via the store.
        rt.tick_now(&id).await.unwrap();
        rt.tick_now(&id).await.unwrap();
        rt.tick_now(&id).await.unwrap();

        let kv = rt.inner.store.load_kv(&id).await.unwrap();
        let hits = kv.get("hits").and_then(|v| v.as_i64()).unwrap_or(0);
        assert_eq!(hits, 3, "counter should have incremented once per tick");

        // Reserved sys keys should also be populated.
        let tick_count = kv
            .get("__sys::tick_count")
            .and_then(|v| v.as_i64())
            .unwrap_or(0);
        assert_eq!(tick_count, 3);
    }

    #[tokio::test]
    async fn tick_now_with_swaps_reconciler() {
        // Register a vigy whose stored program would emit a Noop. Then
        // force-tick it with NoopReconciler (no actions), then with a
        // ChainReconciler that runs two LispReconcilers in sequence —
        // proving the runtime really dispatches through the trait, not
        // through the stored program.
        use vigy_eval::{ChainReconciler, LispReconciler, NoopReconciler};

        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = vigy_with("swap-test", "(vigy-noop)");
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();

        // NoopReconciler: no actions.
        let r1 = rt.tick_now_with(&id, &NoopReconciler).await.unwrap();
        assert!(r1.actions.is_empty());

        // ChainReconciler: two pulls in sequence.
        let chain = ChainReconciler::new(vec![
            Box::new(LispReconciler::standard("(vigy-pull \"a\")")),
            Box::new(LispReconciler::standard("(vigy-pull \"b\")")),
        ]);
        let r2 = rt.tick_now_with(&id, &chain).await.unwrap();
        assert_eq!(r2.actions.len(), 2);
        assert_eq!(
            r2.actions.iter().map(|a| a.kind).collect::<Vec<_>>(),
            vec![
                vigy_types::ReconcileKind::Pull,
                vigy_types::ReconcileKind::Pull,
            ]
        );
    }

    #[tokio::test]
    async fn convergence_flag_survives_ticks() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        // A vigy that converges on its first tick + records the state.
        let v = vigy_with(
            "converger",
            r#"
            (vigy-set "is_converged_now" (vigy-converged? "goal"))
            (vigy-mark-converged "goal")
            "#,
        );
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();

        rt.tick_now(&id).await.unwrap();
        let kv1 = rt.inner.store.load_kv(&id).await.unwrap();
        // First tick: marked AFTER reading, so reads false.
        assert_eq!(
            kv1.get("is_converged_now"),
            Some(&serde_json::Value::Bool(false))
        );

        rt.tick_now(&id).await.unwrap();
        let kv2 = rt.inner.store.load_kv(&id).await.unwrap();
        // Second tick: already marked, so reads true.
        assert_eq!(
            kv2.get("is_converged_now"),
            Some(&serde_json::Value::Bool(true))
        );
    }

    #[tokio::test]
    async fn failed_run_records_error_and_keeps_loop_alive() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        // Unbound symbol → eval error → run.result = Failed.
        let v = vigy_with("broken", "(this-symbol-does-not-exist)");
        let id = v.id.clone();
        let mut sub = rt.subscribe();
        rt.register_or_update(v).await.unwrap();

        let run = rt.tick_now(&id).await.unwrap();
        assert!(matches!(run.result, ResultStatus::Failed));
        assert!(run.error.is_some());

        // Two failed runs must reach the bus: the forced one above and the
        // scheduler's own first tick.
        for _ in 0..2 {
            let seen = tokio::time::timeout(Duration::from_secs(5), sub.recv())
                .await
                .expect("a run should reach the bus in time")
                .unwrap();
            assert!(matches!(seen.result, ResultStatus::Failed));
        }

        // After its failing tick the loop is backing off, not gone.
        let tasks = rt.inner.tasks.lock().await;
        let task = tasks.get(&id).expect("tick task should still be registered");
        assert!(!task.is_finished());
    }

    #[test]
    fn backoff_curve() {
        assert_eq!(backoff_for(1), Duration::from_secs(1));
        assert_eq!(backoff_for(2), Duration::from_secs(2));
        assert_eq!(backoff_for(3), Duration::from_secs(4));
        assert_eq!(backoff_for(4), Duration::from_secs(8));
        assert_eq!(backoff_for(5), Duration::from_secs(16));
        assert_eq!(backoff_for(6), Duration::from_secs(30));
        assert_eq!(backoff_for(100), Duration::from_secs(30));
    }
}