1use std::collections::HashMap;
42use std::path::Path;
43use std::sync::Arc;
44use std::time::Duration;
45
46use thiserror::Error;
47use tokio::sync::{broadcast, Mutex};
48use tokio::task::JoinHandle;
49use tokio::time::sleep;
50use tracing::{debug, error, info, warn};
51
52use vigy_eval::{ExtensionHandle, LispReconciler, Reconciler, VigyHost};
53use vigy_store::{Store, StoreError};
54use vigy_types::{ReconcileAction, ResultStatus, Vigy, VigyId, VigyRun};
55
56#[derive(Debug, Error)]
57pub enum RuntimeError {
58 #[error("store: {0}")]
59 Store(#[from] StoreError),
60 #[error("vigy types: {0}")]
61 Types(#[from] vigy_types::Error),
62 #[error("eval: {0}")]
63 Eval(#[from] vigy_eval::EvalErr),
64}
65
66pub type Result<T> = std::result::Result<T, RuntimeError>;
67
68#[derive(Clone)]
71pub struct RuntimeHandle {
72 inner: Arc<Inner>,
73}
74
75struct Inner {
76 store: Store,
77 tasks: Mutex<HashMap<VigyId, JoinHandle<()>>>,
78 bus: broadcast::Sender<VigyRun>,
79 extensions: Vec<ExtensionHandle>,
84}
85
/// Capacity given to the broadcast channel that carries finished runs.
const EVENT_BUS_CAPACITY: usize = 1024;

/// Longest pause `backoff_for` will ever return.
const MAX_BACKOFF: Duration = Duration::from_secs(30);
88
89impl RuntimeHandle {
90 pub async fn open(path: &Path) -> Result<Self> {
93 let store = Store::open(path).await?;
94 Self::with_store(store, vigy_eval::standard_extensions()).await
95 }
96
97 pub async fn open_in_memory() -> Result<Self> {
99 let store = Store::open_in_memory().await?;
100 Self::with_store(store, vigy_eval::standard_extensions()).await
101 }
102
103 pub async fn open_with_extensions(
109 path: &Path,
110 extensions: Vec<ExtensionHandle>,
111 ) -> Result<Self> {
112 let store = Store::open(path).await?;
113 Self::with_store(store, extensions).await
114 }
115
116 async fn with_store(store: Store, extensions: Vec<ExtensionHandle>) -> Result<Self> {
117 let (bus, _) = broadcast::channel(EVENT_BUS_CAPACITY);
118 let handle = Self {
119 inner: Arc::new(Inner {
120 store,
121 tasks: Mutex::new(HashMap::new()),
122 bus,
123 extensions,
124 }),
125 };
126 let existing = handle.inner.store.list_vigies(None).await?;
128 let count = existing.len();
129 for v in existing {
130 if v.enabled {
131 handle.spawn_task(v).await;
132 }
133 }
134 info!(resumed = count, "runtime ready");
135 Ok(handle)
136 }
137
138 pub async fn register_or_update(&self, vigy: Vigy) -> Result<Vigy> {
142 self.inner.store.upsert_vigy(&vigy).await?;
143 if vigy.enabled {
144 self.spawn_task(vigy.clone()).await;
145 } else {
146 self.cancel_task(&vigy.id).await;
147 }
148 Ok(vigy)
149 }
150
151 pub async fn enable(&self, id: &VigyId) -> Result<Vigy> {
152 self.inner.store.set_enabled(id, true).await?;
153 let v = self.inner.store.get_vigy(id).await?;
154 self.spawn_task(v.clone()).await;
155 Ok(v)
156 }
157
158 pub async fn disable(&self, id: &VigyId) -> Result<Vigy> {
159 self.inner.store.set_enabled(id, false).await?;
160 self.cancel_task(id).await;
161 self.inner.store.get_vigy(id).await.map_err(Into::into)
162 }
163
164 pub async fn delete(&self, id: &VigyId) -> Result<bool> {
165 self.cancel_task(id).await;
166 Ok(self.inner.store.delete_vigy(id).await?)
167 }
168
169 pub async fn get(&self, id: &VigyId) -> Result<Vigy> {
170 Ok(self.inner.store.get_vigy(id).await?)
171 }
172
173 pub async fn list(&self, label_selector: Option<&str>) -> Result<Vec<Vigy>> {
174 Ok(self.inner.store.list_vigies(label_selector).await?)
175 }
176
177 pub async fn recent_runs(&self, id: &VigyId, limit: u64) -> Result<Vec<VigyRun>> {
178 Ok(self.inner.store.recent_runs(id, limit).await?)
179 }
180
181 pub async fn tick_now(&self, id: &VigyId) -> Result<VigyRun> {
184 let vigy = self.inner.store.get_vigy(id).await?;
185 let run = run_once(&self.inner.store, &vigy, &self.inner.extensions).await;
186 self.inner.store.insert_run(&run).await?;
187 let _ = self.inner.bus.send(run.clone());
188 Ok(run)
189 }
190
191 pub async fn tick_now_with(
195 &self,
196 id: &VigyId,
197 reconciler: &dyn Reconciler,
198 ) -> Result<VigyRun> {
199 let vigy = self.inner.store.get_vigy(id).await?;
200 let run = run_once_with(&self.inner.store, &vigy, reconciler).await;
201 self.inner.store.insert_run(&run).await?;
202 let _ = self.inner.bus.send(run.clone());
203 Ok(run)
204 }
205
206 pub fn subscribe(&self) -> broadcast::Receiver<VigyRun> {
209 self.inner.bus.subscribe()
210 }
211
212 async fn spawn_task(&self, vigy: Vigy) {
215 self.cancel_task(&vigy.id).await;
216 let inner = self.inner.clone();
217 let handle = tokio::spawn(tick_loop(inner, vigy.clone()));
218 self.inner.tasks.lock().await.insert(vigy.id, handle);
219 }
220
221 async fn cancel_task(&self, id: &VigyId) {
222 if let Some(h) = self.inner.tasks.lock().await.remove(id) {
223 h.abort();
224 }
225 }
226}
227
228async fn tick_loop(inner: Arc<Inner>, vigy: Vigy) {
231 let id = vigy.id.clone();
232 let interval = vigy.tick_interval.as_duration();
233 let mut failures = 0u32;
234
235 info!(vigy_id = %id, name = %vigy.name, interval_ms = interval.as_millis() as u64, "vigy tick loop started");
236
237 loop {
238 sleep(interval).await;
239
240 let current = match inner.store.get_vigy(&id).await {
244 Ok(v) if v.enabled => v,
245 Ok(_) => {
246 debug!(vigy_id = %id, "vigy disabled mid-flight; exiting loop");
247 break;
248 }
249 Err(e) => {
250 error!(vigy_id = %id, err = %e, "vigy disappeared from store; exiting loop");
251 break;
252 }
253 };
254
255 let run = run_once(&inner.store, ¤t, &inner.extensions).await;
256 let failed = matches!(run.result, ResultStatus::Failed);
257
258 if let Err(e) = inner.store.insert_run(&run).await {
259 error!(vigy_id = %id, err = %e, "failed to persist VigyRun");
260 }
261 let _ = inner.bus.send(run);
262
263 if failed {
264 failures = failures.saturating_add(1);
265 let backoff = backoff_for(failures);
266 warn!(vigy_id = %id, failures, backoff_ms = backoff.as_millis() as u64, "vigy tick failed; backing off");
267 sleep(backoff).await;
268 } else {
269 failures = 0;
270 }
271 }
272}
273
274fn backoff_for(failures: u32) -> Duration {
275 let secs = 1u64.checked_shl(failures.saturating_sub(1).min(5)).unwrap_or(MAX_BACKOFF.as_secs());
278 Duration::from_secs(secs).min(MAX_BACKOFF)
279}
280
/// KV key holding the number of ticks recorded for a vigy; each tick stores
/// the previous value plus one.
const KV_TICK_COUNT: &str = "__sys::tick_count";

/// KV key holding the start time, in Unix milliseconds, of the most recent
/// tick; the following tick reads it back as its "previous tick" time.
const KV_PREV_TICK_MS: &str = "__sys::prev_tick_ms";
287
288async fn run_once_with(
298 store: &vigy_store::Store,
299 vigy: &Vigy,
300 reconciler: &dyn Reconciler,
301) -> VigyRun {
302 let span = tracing::info_span!(
303 "vigy.tick",
304 vigy_id = %vigy.id,
305 vigy_name = %vigy.name,
306 tick_interval_ms = vigy.tick_interval.as_millis(),
307 );
308 let _enter = span.enter();
309
310 let now = time::OffsetDateTime::now_utc();
311 let tick_start_ms = (now.unix_timestamp_nanos() / 1_000_000) as i64;
312
313 let kv = match store.load_kv(&vigy.id).await {
314 Ok(k) => k,
315 Err(e) => {
316 tracing::warn!(vigy_id = %vigy.id, err = %e, "kv load failed; tick proceeds with empty kv");
317 std::collections::BTreeMap::new()
318 }
319 };
320
321 let prior_tick_count = kv.get(KV_TICK_COUNT).and_then(|v| v.as_i64()).unwrap_or(0);
322 let previous_tick_ms = kv.get(KV_PREV_TICK_MS).and_then(|v| v.as_i64());
323 let tick_count = prior_tick_count + 1;
324
325 let mut host = VigyHost {
326 tick_start_ms,
327 previous_tick_ms,
328 tick_count,
329 kv,
330 ..Default::default()
331 };
332 host.kv.insert(
333 KV_TICK_COUNT.to_string(),
334 serde_json::Value::Number(tick_count.into()),
335 );
336 host.kv.insert(
337 KV_PREV_TICK_MS.to_string(),
338 serde_json::Value::Number(tick_start_ms.into()),
339 );
340 host.kv_dirty.insert(KV_TICK_COUNT.to_string());
341 host.kv_dirty.insert(KV_PREV_TICK_MS.to_string());
342
343 let run = VigyRun::started(vigy.id.clone());
344
345 match reconciler.tick(host).await {
346 Ok(populated) => {
347 for action in &populated.actions {
351 tracing::event!(
352 tracing::Level::DEBUG,
353 kind = ?action.kind,
354 has_payload = action.payload.is_some(),
355 result = ?action.result,
356 "vigy.action",
357 );
358 }
359 tracing::info!(
362 action_count = populated.actions.len(),
363 kv_writes = populated.kv_dirty.len(),
364 kv_deletes = populated.kv_deleted.len(),
365 conditions = populated.conditions.len(),
366 "vigy.tick.completed",
367 );
368
369 let dirty: std::collections::BTreeMap<String, serde_json::Value> = populated
370 .kv_dirty
371 .iter()
372 .filter_map(|k| populated.kv.get(k).map(|v| (k.clone(), v.clone())))
373 .collect();
374 if !dirty.is_empty() || !populated.kv_deleted.is_empty() {
375 if let Err(e) = store.save_kv(&vigy.id, &dirty, &populated.kv_deleted).await {
376 tracing::warn!(vigy_id = %vigy.id, err = %e, "kv save failed; in-memory state lost");
377 }
378 }
379 let actions: Vec<ReconcileAction> = populated.actions;
380 run.complete_ok(actions)
381 }
382 Err(e) => {
383 tracing::warn!(vigy_id = %vigy.id, err = %e, "vigy.tick.failed");
384 run.complete_failed(format!("{e}"))
385 }
386 }
387}
388
389async fn run_once(
392 store: &vigy_store::Store,
393 vigy: &Vigy,
394 extensions: &[ExtensionHandle],
395) -> VigyRun {
396 let reconciler = LispReconciler::with_extensions(vigy.program.clone(), extensions.to_vec());
397 run_once_with(store, vigy, &reconciler).await
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use vigy_types::TickInterval;

    /// A manual tick yields one action and the same run arrives on the bus.
    #[tokio::test]
    async fn register_and_tick_emits_an_action() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = Vigy::new(
            "test",
            "(vigy-noop)",
            TickInterval::from_millis(100).unwrap(),
        )
        .unwrap();
        let mut sub = rt.subscribe();
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();
        let run = rt.tick_now(&id).await.unwrap();
        assert_eq!(run.actions.len(), 1);
        let bus_run = sub.recv().await.unwrap();
        assert_eq!(bus_run.id, run.id);
    }

    /// Disabling flips the stored flag and no further runs are published.
    #[tokio::test]
    async fn disable_stops_ticking() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = Vigy::new(
            "test",
            "(vigy-noop)",
            TickInterval::from_millis(100).unwrap(),
        )
        .unwrap();
        let id = v.id.clone();
        let mut sub = rt.subscribe();
        rt.register_or_update(v).await.unwrap();
        rt.disable(&id).await.unwrap();
        assert!(!rt.get(&id).await.unwrap().enabled);

        // Drop anything published before the loop was cancelled, then wait
        // for several intervals and check that the bus stayed silent.
        while sub.try_recv().is_ok() {}
        sleep(Duration::from_millis(350)).await;
        assert!(
            matches!(
                sub.try_recv(),
                Err(broadcast::error::TryRecvError::Empty)
            ),
            "no run should be published after disable"
        );
    }

    /// Program-written keys and the system tick counter persist across ticks.
    #[tokio::test]
    async fn kv_persists_across_ticks() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = Vigy::new(
            "counter",
            "(vigy-incr \"hits\")",
            TickInterval::from_millis(100).unwrap(),
        )
        .unwrap();
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();

        rt.tick_now(&id).await.unwrap();
        rt.tick_now(&id).await.unwrap();
        rt.tick_now(&id).await.unwrap();

        let kv = rt.inner.store.load_kv(&id).await.unwrap();
        let hits = kv.get("hits").and_then(|v| v.as_i64()).unwrap_or(0);
        assert_eq!(hits, 3, "counter should have incremented once per tick");

        let tick_count = kv
            .get("__sys::tick_count")
            .and_then(|v| v.as_i64())
            .unwrap_or(0);
        assert_eq!(tick_count, 3);
    }

    /// `tick_now_with` uses the supplied reconciler, not the stored program.
    #[tokio::test]
    async fn tick_now_with_swaps_reconciler() {
        use vigy_eval::{ChainReconciler, LispReconciler, NoopReconciler};

        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = Vigy::new(
            "swap-test",
            "(vigy-noop)",
            TickInterval::from_millis(100).unwrap(),
        )
        .unwrap();
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();

        let r1 = rt
            .tick_now_with(&id, &NoopReconciler)
            .await
            .unwrap();
        assert!(r1.actions.is_empty());

        let chain = ChainReconciler::new(vec![
            Box::new(LispReconciler::standard("(vigy-pull \"a\")")),
            Box::new(LispReconciler::standard("(vigy-pull \"b\")")),
        ]);
        let r2 = rt.tick_now_with(&id, &chain).await.unwrap();
        assert_eq!(r2.actions.len(), 2);
        assert_eq!(
            r2.actions
                .iter()
                .map(|a| a.kind)
                .collect::<Vec<_>>(),
            vec![
                vigy_types::ReconcileKind::Pull,
                vigy_types::ReconcileKind::Pull,
            ]
        );
    }

    /// A convergence mark set in one tick is visible to the next tick.
    #[tokio::test]
    async fn convergence_flag_survives_ticks() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = Vigy::new(
            "converger",
            r#"
            (vigy-set "is_converged_now" (vigy-converged? "goal"))
            (vigy-mark-converged "goal")
            "#,
            TickInterval::from_millis(100).unwrap(),
        )
        .unwrap();
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();

        rt.tick_now(&id).await.unwrap();
        let kv1 = rt.inner.store.load_kv(&id).await.unwrap();
        assert_eq!(
            kv1.get("is_converged_now"),
            Some(&serde_json::Value::Bool(false))
        );

        rt.tick_now(&id).await.unwrap();
        let kv2 = rt.inner.store.load_kv(&id).await.unwrap();
        assert_eq!(
            kv2.get("is_converged_now"),
            Some(&serde_json::Value::Bool(true))
        );
    }

    /// A program that fails to evaluate yields a failed run carrying an
    /// error, rather than an `Err` from `tick_now`.
    #[tokio::test]
    async fn failed_run_records_error_and_keeps_loop_alive() {
        let rt = RuntimeHandle::open_in_memory().await.unwrap();
        let v = Vigy::new(
            "broken",
            "(this-symbol-does-not-exist)",
            TickInterval::from_millis(100).unwrap(),
        )
        .unwrap();
        let id = v.id.clone();
        rt.register_or_update(v).await.unwrap();
        let run = rt.tick_now(&id).await.unwrap();
        assert!(matches!(run.result, ResultStatus::Failed));
        assert!(run.error.is_some());
    }

    /// The back-off doubles from one second and is capped at thirty.
    #[test]
    fn backoff_curve() {
        // Zero failures behaves like the first failure.
        assert_eq!(backoff_for(0), Duration::from_secs(1));
        assert_eq!(backoff_for(1), Duration::from_secs(1));
        assert_eq!(backoff_for(2), Duration::from_secs(2));
        assert_eq!(backoff_for(3), Duration::from_secs(4));
        assert_eq!(backoff_for(4), Duration::from_secs(8));
        assert_eq!(backoff_for(5), Duration::from_secs(16));
        assert_eq!(backoff_for(6), Duration::from_secs(30));
        assert_eq!(backoff_for(100), Duration::from_secs(30));
    }
}