1use std::collections::{HashMap, HashSet};
26use std::fmt;
27use std::sync::{Arc, Mutex};
28use std::time::Duration;
29
30use ergo_runtime::catalog::{CorePrimitiveCatalog, CoreRegistries};
31use ergo_runtime::cluster::{ExpandedGraph, PrimitiveCatalog, PrimitiveKind};
32use ergo_runtime::common::{ActionEffect, Value};
33use ergo_runtime::runtime::{
34 execute_with_metadata, validate as runtime_validate, ExecError,
35 ExecutionContext as RuntimeExecutionContext, Registries,
36};
37use serde::{Deserialize, Serialize};
38
39pub mod capture;
40pub(crate) mod common;
41pub mod composition;
42pub mod errors;
43pub mod event_binding;
44pub mod fixture;
45pub mod manifest;
46pub mod provenance;
47pub mod provides;
48pub mod registry;
49mod schema_materialization;
50pub mod validate;
51
52pub use composition::{
53 validate_action_adapter_composition, validate_capture_format,
54 validate_source_adapter_composition, CompositionError, ContextRequirement, SourceRequires,
55};
56pub use errors::InvalidAdapter;
57pub use event_binding::{
58 bind_semantic_event_with_binder, compile_event_binder, EventBinder, EventBindingError,
59};
60pub use manifest::{
61 AcceptsSpec, AdapterManifest, CaptureSpec, ContextKeySpec, EffectSpec, EventKindSpec,
62};
63pub use provenance::fingerprint as adapter_fingerprint;
64pub use provides::{AdapterProvides, ContextKeyProvision};
65pub use registry::register;
66pub use validate::validate_adapter;
67
68#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
69#[serde(transparent)]
70pub struct GraphId(String);
71
72impl GraphId {
73 pub fn new(id: impl Into<String>) -> Self {
74 Self(id.into())
75 }
76
77 pub fn as_str(&self) -> &str {
78 &self.0
79 }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
83#[serde(transparent)]
84pub struct EventId(String);
85
86impl EventId {
87 pub fn new(id: impl Into<String>) -> Self {
88 EventId(id.into())
89 }
90
91 pub fn as_str(&self) -> &str {
92 &self.0
93 }
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub enum ErrKind {
98 NetworkTimeout,
99 AdapterUnavailable,
100 ValidationFailed,
101 RuntimeError,
102 SemanticError,
109 DeadlineExceeded,
110 Cancelled,
111}
112
113#[derive(Debug, Clone)]
115struct RunResult {
116 termination: RunTermination,
117 effects: Vec<ActionEffect>,
118}
119
120#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
121pub enum RunTermination {
122 Completed,
123 TimedOut,
124 Aborted,
125 Failed(ErrKind),
126}
127
128#[derive(Debug, Clone)]
150pub struct ExecutionContext {
151 inner: RuntimeExecutionContext,
152}
153
154impl ExecutionContext {
155 pub(crate) fn new(inner: RuntimeExecutionContext) -> Self {
156 Self { inner }
157 }
158
159 pub(crate) fn inner(&self) -> &RuntimeExecutionContext {
160 &self.inner
161 }
162}
163
164#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
166#[serde(transparent)]
167pub struct EventTime(Duration);
168
169impl EventTime {
170 pub fn from_duration(duration: Duration) -> Self {
171 Self(duration)
172 }
173
174 pub fn as_duration(&self) -> Duration {
175 self.0
176 }
177
178 pub fn saturating_add(&self, duration: Duration) -> Self {
179 Self(self.0.saturating_add(duration))
180 }
181}
182
183impl From<Duration> for EventTime {
184 fn from(value: Duration) -> Self {
185 EventTime::from_duration(value)
186 }
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
190#[serde(transparent)]
191pub struct EventPayload {
192 pub data: Vec<u8>,
193}
194
195#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
196#[non_exhaustive]
197pub enum ExternalEventPayloadError {
198 InvalidJson { detail: String },
199 PayloadMustBeJsonObject { got: String },
200}
201
202impl fmt::Display for ExternalEventPayloadError {
203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 match self {
205 Self::InvalidJson { detail } => write!(f, "payload bytes are not valid JSON: {detail}"),
206 Self::PayloadMustBeJsonObject { got } => {
207 write!(f, "payload must be a JSON object, got {got}")
208 }
209 }
210 }
211}
212
213impl std::error::Error for ExternalEventPayloadError {}
214
215#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
216pub enum ExternalEventKind {
217 #[serde(alias = "Tick")]
220 Pump,
221 DataAvailable,
222 Command,
223}
224
225#[derive(Debug, Clone)]
226pub struct ExternalEvent {
227 event_id: EventId,
228 kind: ExternalEventKind,
229 context: ExecutionContext,
230 at: EventTime,
231 payload: EventPayload,
232}
233
234impl ExternalEvent {
235 pub(crate) fn new(
236 event_id: EventId,
237 kind: ExternalEventKind,
238 context: ExecutionContext,
239 at: EventTime,
240 payload: EventPayload,
241 ) -> Self {
242 Self {
243 event_id,
244 kind,
245 context,
246 at,
247 payload,
248 }
249 }
250
251 pub fn mechanical_at(event_id: EventId, kind: ExternalEventKind, at: EventTime) -> Self {
252 let context = ExecutionContext::new(RuntimeExecutionContext::default());
253 Self::new(event_id, kind, context, at, EventPayload::default())
254 }
255
256 pub fn mechanical(event_id: EventId, kind: ExternalEventKind) -> Self {
257 Self::mechanical_at(event_id, kind, EventTime::default())
258 }
259
260 pub fn with_payload(
261 event_id: EventId,
262 kind: ExternalEventKind,
263 at: EventTime,
264 payload: EventPayload,
265 ) -> Result<Self, ExternalEventPayloadError> {
266 let context = ExecutionContext::new(context_from_payload(&payload)?);
267 Ok(Self::new(event_id, kind, context, at, payload))
268 }
269
270 pub fn context(&self) -> &ExecutionContext {
271 &self.context
272 }
273
274 pub fn kind(&self) -> ExternalEventKind {
275 self.kind
276 }
277
278 pub fn event_id(&self) -> &EventId {
279 &self.event_id
280 }
281
282 pub fn at(&self) -> EventTime {
283 self.at
284 }
285
286 pub fn payload(&self) -> &EventPayload {
287 &self.payload
288 }
289}
290
291fn context_from_payload(
292 payload: &EventPayload,
293) -> Result<RuntimeExecutionContext, ExternalEventPayloadError> {
294 let values = payload_values(payload)?;
295 Ok(RuntimeExecutionContext::from_values(values))
296}
297
298fn payload_values(
299 payload: &EventPayload,
300) -> Result<HashMap<String, Value>, ExternalEventPayloadError> {
301 if payload.data.is_empty() {
302 return Ok(HashMap::new());
303 }
304
305 let parsed: serde_json::Value = match serde_json::from_slice(&payload.data) {
306 Ok(value) => value,
307 Err(err) => {
308 return Err(ExternalEventPayloadError::InvalidJson {
309 detail: err.to_string(),
310 });
311 }
312 };
313
314 let Some(object) = parsed.as_object() else {
315 return Err(ExternalEventPayloadError::PayloadMustBeJsonObject {
316 got: json_type_name(&parsed).to_string(),
317 });
318 };
319
320 let mut values = HashMap::new();
321 for (key, value) in object {
322 if let Some(mapped) = json_to_value(value) {
323 values.insert(key.clone(), mapped);
324 }
325 }
326
327 Ok(values)
328}
329
330use common::json_type_name;
331
332fn json_to_value(value: &serde_json::Value) -> Option<Value> {
333 if let Some(number) = value.as_f64() {
334 return Some(Value::Number(number));
335 }
336
337 if let Some(text) = value.as_str() {
338 return Some(Value::String(text.to_string()));
339 }
340
341 if let Some(flag) = value.as_bool() {
342 return Some(Value::Bool(flag));
343 }
344
345 let items = value.as_array()?;
346
347 let mut series = Vec::with_capacity(items.len());
348 for item in items {
349 let number = item.as_f64()?;
350 series.push(number);
351 }
352
353 Some(Value::Series(series))
354}
355
356#[derive(Clone)]
358struct RuntimeState {
359 graph: Arc<ExpandedGraph>,
360 catalog: Arc<CorePrimitiveCatalog>,
361 registries: Arc<CoreRegistries>,
362 adapter_provides: AdapterProvides,
363}
364
365impl RuntimeState {
366 fn new(
367 graph: Arc<ExpandedGraph>,
368 catalog: Arc<CorePrimitiveCatalog>,
369 registries: Arc<CoreRegistries>,
370 adapter_provides: AdapterProvides,
371 ) -> Self {
372 Self {
373 graph,
374 catalog,
375 registries,
376 adapter_provides,
377 }
378 }
379
380 fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
381 let mut kinds = HashSet::new();
382
383 for node in self.graph.nodes.values() {
384 let Some(meta) = self
385 .catalog
386 .get(&node.implementation.impl_id, &node.implementation.version)
387 else {
388 continue;
389 };
390 if meta.kind != PrimitiveKind::Action {
391 continue;
392 }
393
394 let Some(action) = self.registries.actions.get(&node.implementation.impl_id) else {
395 continue;
396 };
397
398 let emits_set_context = !action.manifest().effects.writes.is_empty()
399 || action
400 .manifest()
401 .effects
402 .intents
403 .iter()
404 .any(|intent| !intent.mirror_writes.is_empty());
405 if emits_set_context {
406 kinds.insert("set_context".to_string());
407 }
408
409 for intent in &action.manifest().effects.intents {
410 kinds.insert(intent.name.clone());
411 }
412 }
413
414 kinds
415 }
416
417 fn validate_composition(
418 &self,
419 graph: &ergo_runtime::runtime::ValidatedGraph,
420 ) -> Result<(), CompositionError> {
421 if !self.adapter_provides.capture_format_version.is_empty() {
423 validate_capture_format(&self.adapter_provides.capture_format_version)?;
424 }
425
426 for node in graph.nodes.values() {
427 if node.kind != PrimitiveKind::Source {
428 continue;
429 }
430
431 let Some(primitive) = self.registries.sources.get(&node.impl_id) else {
432 continue;
433 };
434
435 let manifest = primitive.manifest();
436 let source_params =
437 source_parameters_with_manifest_defaults(manifest, &node.parameters);
438 validate_source_adapter_composition(
439 &manifest.requires,
440 &self.adapter_provides,
441 &source_params,
442 )?;
443 }
444
445 for node in graph.nodes.values() {
446 if node.kind != PrimitiveKind::Action {
447 continue;
448 }
449
450 let Some(primitive) = self.registries.actions.get(&node.impl_id) else {
451 continue;
452 };
453
454 let manifest = primitive.manifest();
455 validate_action_adapter_composition(
456 &manifest.effects,
457 &self.adapter_provides,
458 &node.parameters,
459 )?;
460 }
461
462 Ok(())
463 }
464}
465
466#[derive(Clone)]
469pub struct RuntimeHandle {
470 state: RuntimeState,
471}
472
473impl RuntimeHandle {
474 pub fn new(
475 graph: Arc<ExpandedGraph>,
476 catalog: Arc<CorePrimitiveCatalog>,
477 registries: Arc<CoreRegistries>,
478 adapter_provides: AdapterProvides,
479 ) -> Self {
480 Self {
481 state: RuntimeState::new(graph, catalog, registries, adapter_provides),
482 }
483 }
484
485 pub fn run(
486 &self,
487 graph_id: &GraphId,
488 event_id: &EventId,
489 ctx: &ExecutionContext,
490 deadline: Option<Duration>,
491 ) -> RunTermination {
492 execute_once(&self.state, graph_id, event_id, ctx, deadline).termination
493 }
494
495 pub fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
497 self.state.graph_emittable_effect_kinds()
498 }
499}
500
501#[derive(Clone)]
505pub struct ReportingRuntimeHandle {
506 state: RuntimeState,
507}
508
509impl ReportingRuntimeHandle {
510 pub fn new(
511 graph: Arc<ExpandedGraph>,
512 catalog: Arc<CorePrimitiveCatalog>,
513 registries: Arc<CoreRegistries>,
514 adapter_provides: AdapterProvides,
515 ) -> Self {
516 Self {
517 state: RuntimeState::new(graph, catalog, registries, adapter_provides),
518 }
519 }
520
521 pub fn run_reporting(
522 &self,
523 graph_id: &GraphId,
524 event_id: &EventId,
525 ctx: &ExecutionContext,
526 deadline: Option<Duration>,
527 effects_out: &mut Vec<ActionEffect>,
528 ) -> RunTermination {
529 let result = execute_once(&self.state, graph_id, event_id, ctx, deadline);
530 *effects_out = result.effects;
531 result.termination
532 }
533
534 pub fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
535 self.state.graph_emittable_effect_kinds()
536 }
537}
538
539fn execute_once(
540 state: &RuntimeState,
541 graph_id: &GraphId,
542 event_id: &EventId,
543 ctx: &ExecutionContext,
544 deadline: Option<Duration>,
545) -> RunResult {
546 if matches!(deadline, Some(d) if d.is_zero()) {
547 return RunResult {
548 termination: RunTermination::Aborted,
549 effects: vec![],
550 };
551 }
552
553 let validated = match runtime_validate(&state.graph, &*state.catalog) {
554 Ok(graph) => graph,
555 Err(_) => {
556 return RunResult {
557 termination: RunTermination::Failed(ErrKind::ValidationFailed),
558 effects: vec![],
559 }
560 }
561 };
562
563 if state.validate_composition(&validated).is_err() {
564 return RunResult {
565 termination: RunTermination::Failed(ErrKind::ValidationFailed),
566 effects: vec![],
567 };
568 }
569
570 let registries = Registries {
571 sources: &state.registries.sources,
572 computes: &state.registries.computes,
573 triggers: &state.registries.triggers,
574 actions: &state.registries.actions,
575 };
576
577 match execute_with_metadata(
578 &validated,
579 ®istries,
580 ctx.inner(),
581 graph_id.as_str(),
582 event_id.as_str(),
583 ) {
584 Ok(report) => RunResult {
585 termination: RunTermination::Completed,
586 effects: report.effects,
587 },
588 Err(exec_err) => {
589 let termination = match exec_err {
590 ExecError::ComputeFailed { .. }
591 | ExecError::NonFiniteOutput { .. }
592 | ExecError::MissingRequiredContextKey { .. }
593 | ExecError::ContextKeyTypeMismatch { .. } => {
594 RunTermination::Failed(ErrKind::SemanticError)
595 }
596 _ => RunTermination::Failed(ErrKind::RuntimeError),
597 };
598 RunResult {
599 termination,
600 effects: vec![],
601 }
602 }
603 }
604}
605
606fn source_parameters_with_manifest_defaults(
607 manifest: &ergo_runtime::source::SourcePrimitiveManifest,
608 node_parameters: &HashMap<String, ergo_runtime::cluster::ParameterValue>,
609) -> HashMap<String, ergo_runtime::cluster::ParameterValue> {
610 let mut resolved = node_parameters.clone();
611
612 for spec in &manifest.parameters {
613 if resolved.contains_key(&spec.name) {
614 continue;
615 }
616 let Some(default) = &spec.default else {
617 continue;
618 };
619
620 let mapped = match default {
621 ergo_runtime::source::ParameterValue::Int(i) => {
622 ergo_runtime::cluster::ParameterValue::Int(*i)
623 }
624 ergo_runtime::source::ParameterValue::Number(n) => {
625 ergo_runtime::cluster::ParameterValue::Number(*n)
626 }
627 ergo_runtime::source::ParameterValue::Bool(b) => {
628 ergo_runtime::cluster::ParameterValue::Bool(*b)
629 }
630 ergo_runtime::source::ParameterValue::String(s) => {
631 ergo_runtime::cluster::ParameterValue::String(s.clone())
632 }
633 ergo_runtime::source::ParameterValue::Enum(e) => {
634 ergo_runtime::cluster::ParameterValue::Enum(e.clone())
635 }
636 };
637 resolved.insert(spec.name.clone(), mapped);
638 }
639
640 resolved
641}
642
643pub trait RuntimeInvoker {
644 fn run(
645 &self,
646 graph_id: &GraphId,
647 event_id: &EventId,
648 ctx: &ExecutionContext,
649 deadline: Option<Duration>,
650 ) -> RunTermination;
651}
652
653impl RuntimeInvoker for RuntimeHandle {
654 fn run(
655 &self,
656 graph_id: &GraphId,
657 event_id: &EventId,
658 ctx: &ExecutionContext,
659 deadline: Option<Duration>,
660 ) -> RunTermination {
661 RuntimeHandle::run(self, graph_id, event_id, ctx, deadline)
662 }
663}
664
665#[derive(Clone)]
666pub struct FaultRuntimeHandle {
667 schedule: Arc<Mutex<HashMap<EventId, Vec<RunTermination>>>>,
668 default: RunTermination,
669}
670
671impl Default for FaultRuntimeHandle {
672 fn default() -> Self {
673 Self::new(RunTermination::Completed)
674 }
675}
676
677impl FaultRuntimeHandle {
678 pub fn new(default: RunTermination) -> Self {
679 Self {
680 schedule: Arc::new(Mutex::new(HashMap::new())),
681 default,
682 }
683 }
684
685 pub fn with_schedule(
686 default: RunTermination,
687 schedule: HashMap<EventId, Vec<RunTermination>>,
688 ) -> Self {
689 Self {
690 schedule: Arc::new(Mutex::new(schedule)),
691 default,
692 }
693 }
694
695 pub fn push_outcomes(&self, event_id: EventId, outcomes: Vec<RunTermination>) {
696 let mut guard = self.schedule.lock().expect("fault schedule poisoned");
697 guard.insert(event_id, outcomes);
698 }
699}
700
701impl RuntimeInvoker for FaultRuntimeHandle {
702 fn run(
703 &self,
704 graph_id: &GraphId,
705 event_id: &EventId,
706 ctx: &ExecutionContext,
707 deadline: Option<Duration>,
708 ) -> RunTermination {
709 let _ = graph_id;
710 let _ = ctx.inner();
711
712 if matches!(deadline, Some(d) if d.is_zero()) {
713 return RunTermination::Aborted;
714 }
715
716 let mut guard = self.schedule.lock().expect("fault schedule poisoned");
717 let queue = guard.entry(event_id.clone()).or_default();
718 if !queue.is_empty() {
719 queue.remove(0)
720 } else {
721 self.default.clone()
722 }
723 }
724}
725
726#[cfg(test)]
727mod tests;