use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use ergo_runtime::catalog::{CorePrimitiveCatalog, CoreRegistries};
use ergo_runtime::cluster::{ExpandedGraph, PrimitiveCatalog, PrimitiveKind};
use ergo_runtime::common::{ActionEffect, Value};
use ergo_runtime::runtime::{
execute_with_metadata, validate as runtime_validate, ExecError,
ExecutionContext as RuntimeExecutionContext, Registries,
};
use serde::{Deserialize, Serialize};
pub mod capture;
pub(crate) mod common;
pub mod composition;
pub mod errors;
pub mod event_binding;
pub mod fixture;
pub mod manifest;
pub mod provenance;
pub mod provides;
pub mod registry;
mod schema_materialization;
pub mod validate;
pub use composition::{
validate_action_adapter_composition, validate_capture_format,
validate_source_adapter_composition, CompositionError, ContextRequirement, SourceRequires,
};
pub use errors::InvalidAdapter;
pub use event_binding::{
bind_semantic_event_with_binder, compile_event_binder, EventBinder, EventBindingError,
};
pub use manifest::{
AcceptsSpec, AdapterManifest, CaptureSpec, ContextKeySpec, EffectSpec, EventKindSpec,
};
pub use provenance::fingerprint as adapter_fingerprint;
pub use provides::{AdapterProvides, ContextKeyProvision};
pub use registry::register;
pub use validate::validate_adapter;
/// Identifier of a graph, held as an owned string.
///
/// Serializes and deserializes as the bare string (`#[serde(transparent)]`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct GraphId(String);

impl GraphId {
    /// Builds a graph identifier from anything convertible into a `String`.
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        GraphId(raw)
    }

    /// Borrows the identifier text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
/// Identifier of an event, held as an owned string.
///
/// Serializes and deserializes as the bare string (`#[serde(transparent)]`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct EventId(String);

impl EventId {
    /// Builds an event identifier from anything convertible into a `String`.
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        Self(raw)
    }

    /// Borrows the identifier text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
/// Failure category carried by [`RunTermination::Failed`].
///
/// Within this file only `ValidationFailed`, `RuntimeError` and
/// `SemanticError` are produced (by `execute_once`); the remaining variants
/// are not constructed here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrKind {
/// NOTE(review): not produced in this file; meaning inferred from the name only.
NetworkTimeout,
/// NOTE(review): not produced in this file; meaning inferred from the name only.
AdapterUnavailable,
/// Runtime graph validation or adapter composition validation failed.
ValidationFailed,
/// Execution failed with an error that is not classified as semantic.
RuntimeError,
/// Execution failed with a compute failure, a non-finite output, a missing
/// required context key, or a context key type mismatch.
SemanticError,
/// NOTE(review): not produced in this file; meaning inferred from the name only.
DeadlineExceeded,
/// NOTE(review): not produced in this file; meaning inferred from the name only.
Cancelled,
}
/// Internal outcome of a single `execute_once` call.
#[derive(Debug, Clone)]
struct RunResult {
/// How the run ended.
termination: RunTermination,
/// Effects reported by the runtime; empty for every non-`Completed` outcome
/// built in this file.
effects: Vec<ActionEffect>,
}
/// How a single run ended, as reported by a [`RuntimeInvoker`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RunTermination {
/// The runtime executed the graph and returned a report.
Completed,
/// NOTE(review): never produced by the real runtime path in this file; it can
/// only come from a scripted `FaultRuntimeHandle` schedule or default.
TimedOut,
/// Returned without running anything when the supplied deadline is zero.
Aborted,
/// The run failed; the payload classifies the failure.
Failed(ErrKind),
}
/// Crate-facing wrapper around the runtime's own execution context.
///
/// Both the constructor and the accessor are crate-private, so code outside
/// this crate can hold and clone a context but cannot build or inspect one
/// directly.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    inner: RuntimeExecutionContext,
}

impl ExecutionContext {
    /// Wraps a runtime execution context.
    pub(crate) fn new(inner: RuntimeExecutionContext) -> Self {
        ExecutionContext { inner }
    }

    /// Borrows the wrapped runtime execution context.
    pub(crate) fn inner(&self) -> &RuntimeExecutionContext {
        let Self { inner } = self;
        inner
    }
}
/// Point in time attached to an event, stored as a [`Duration`].
///
/// The reference point the duration is measured from is not defined here.
/// The derived `Default` is the zero duration. Serializes transparently as
/// the wrapped `Duration`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
#[serde(transparent)]
pub struct EventTime(Duration);

impl EventTime {
    /// Wraps a duration as an event time.
    pub fn from_duration(duration: Duration) -> Self {
        EventTime(duration)
    }

    /// Returns the wrapped duration.
    pub fn as_duration(&self) -> Duration {
        let EventTime(elapsed) = *self;
        elapsed
    }

    /// Returns this time advanced by `duration`, saturating instead of
    /// overflowing.
    pub fn saturating_add(&self, duration: Duration) -> Self {
        let advanced = self.as_duration().saturating_add(duration);
        Self::from_duration(advanced)
    }
}

impl From<Duration> for EventTime {
    /// Same conversion as [`EventTime::from_duration`].
    fn from(value: Duration) -> Self {
        Self::from_duration(value)
    }
}
/// Raw bytes attached to an external event.
///
/// Serializes transparently as the byte vector. When an event is built with
/// `ExternalEvent::with_payload`, non-empty bytes are parsed as a JSON object;
/// an empty payload yields no context values.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(transparent)]
pub struct EventPayload {
/// The payload bytes exactly as supplied.
pub data: Vec<u8>,
}
/// Reasons an event payload cannot be turned into context values.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ExternalEventPayloadError {
/// The payload bytes failed to parse as JSON; `detail` is the parser's
/// error rendered as text.
InvalidJson { detail: String },
/// The payload parsed as JSON but its top-level value is not an object;
/// `got` is the type name reported by `common::json_type_name`.
PayloadMustBeJsonObject { got: String },
}
impl fmt::Display for ExternalEventPayloadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidJson { detail } => write!(f, "payload bytes are not valid JSON: {detail}"),
Self::PayloadMustBeJsonObject { got } => {
write!(f, "payload must be a JSON object, got {got}")
}
}
}
}
impl std::error::Error for ExternalEventPayloadError {}
/// Kind tag attached to every [`ExternalEvent`].
///
/// NOTE(review): the meaning of each kind is defined by consumers outside this
/// file; only the names are visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExternalEventKind {
/// Also accepted under the name `"Tick"` when deserializing (serde alias).
#[serde(alias = "Tick")]
Pump,
DataAvailable,
Command,
}
/// An event delivered from outside: identity, kind, execution context,
/// time and raw payload.
#[derive(Debug, Clone)]
pub struct ExternalEvent {
    event_id: EventId,
    kind: ExternalEventKind,
    context: ExecutionContext,
    at: EventTime,
    payload: EventPayload,
}

impl ExternalEvent {
    /// Assembles an event from already-prepared parts.
    pub(crate) fn new(
        event_id: EventId,
        kind: ExternalEventKind,
        context: ExecutionContext,
        at: EventTime,
        payload: EventPayload,
    ) -> Self {
        ExternalEvent {
            event_id,
            kind,
            context,
            at,
            payload,
        }
    }

    /// Builds an event at time `at` with a default execution context and an
    /// empty payload.
    pub fn mechanical_at(event_id: EventId, kind: ExternalEventKind, at: EventTime) -> Self {
        Self::new(
            event_id,
            kind,
            ExecutionContext::new(RuntimeExecutionContext::default()),
            at,
            EventPayload::default(),
        )
    }

    /// Same as [`ExternalEvent::mechanical_at`] using the default event time.
    pub fn mechanical(event_id: EventId, kind: ExternalEventKind) -> Self {
        let start = EventTime::default();
        Self::mechanical_at(event_id, kind, start)
    }

    /// Builds an event whose execution context is derived from `payload`.
    ///
    /// # Errors
    ///
    /// Returns an [`ExternalEventPayloadError`] when the payload cannot be
    /// converted into context values.
    pub fn with_payload(
        event_id: EventId,
        kind: ExternalEventKind,
        at: EventTime,
        payload: EventPayload,
    ) -> Result<Self, ExternalEventPayloadError> {
        // Derive the context first so a bad payload is rejected before the
        // event is assembled.
        let runtime_context = context_from_payload(&payload)?;
        Ok(Self::new(
            event_id,
            kind,
            ExecutionContext::new(runtime_context),
            at,
            payload,
        ))
    }

    /// Execution context associated with the event.
    pub fn context(&self) -> &ExecutionContext {
        &self.context
    }

    /// Kind of the event.
    pub fn kind(&self) -> ExternalEventKind {
        self.kind
    }

    /// Identifier of the event.
    pub fn event_id(&self) -> &EventId {
        &self.event_id
    }

    /// Time attached to the event.
    pub fn at(&self) -> EventTime {
        self.at
    }

    /// Raw payload carried by the event.
    pub fn payload(&self) -> &EventPayload {
        &self.payload
    }
}
/// Builds a runtime execution context from the values found in `payload`.
///
/// # Errors
///
/// Propagates any error from `payload_values`.
fn context_from_payload(
    payload: &EventPayload,
) -> Result<RuntimeExecutionContext, ExternalEventPayloadError> {
    payload_values(payload).map(RuntimeExecutionContext::from_values)
}
/// Extracts context values from a payload holding a JSON object.
///
/// An empty payload yields an empty map. Object members whose value
/// `json_to_value` cannot map are skipped, not reported.
///
/// # Errors
///
/// * `InvalidJson` when the bytes do not parse as JSON.
/// * `PayloadMustBeJsonObject` when the top-level JSON value is not an object.
fn payload_values(
    payload: &EventPayload,
) -> Result<HashMap<String, Value>, ExternalEventPayloadError> {
    let bytes = &payload.data;
    if bytes.is_empty() {
        return Ok(HashMap::new());
    }

    let document: serde_json::Value = serde_json::from_slice(bytes).map_err(|err| {
        ExternalEventPayloadError::InvalidJson {
            detail: err.to_string(),
        }
    })?;

    match &document {
        serde_json::Value::Object(fields) => Ok(fields
            .iter()
            .filter_map(|(name, raw)| json_to_value(raw).map(|mapped| (name.clone(), mapped)))
            .collect()),
        other => Err(ExternalEventPayloadError::PayloadMustBeJsonObject {
            got: json_type_name(other).to_string(),
        }),
    }
}
use common::json_type_name;
/// Maps a JSON value onto a runtime [`Value`], when a mapping exists.
///
/// * number → `Value::Number`
/// * string → `Value::String`
/// * boolean → `Value::Bool`
/// * array whose every element is a number → `Value::Series`
///
/// Everything else (null, objects, arrays with a non-numeric element)
/// yields `None`.
fn json_to_value(value: &serde_json::Value) -> Option<Value> {
    match value {
        serde_json::Value::Number(number) => number.as_f64().map(Value::Number),
        serde_json::Value::String(text) => Some(Value::String(text.clone())),
        serde_json::Value::Bool(flag) => Some(Value::Bool(*flag)),
        serde_json::Value::Array(items) => items
            .iter()
            .map(|item| item.as_f64())
            .collect::<Option<Vec<f64>>>()
            .map(Value::Series),
        _ => None,
    }
}
/// Everything a handle needs to validate and execute a graph.
#[derive(Clone)]
struct RuntimeState {
/// The expanded graph that is validated and executed.
graph: Arc<ExpandedGraph>,
/// Catalog used for runtime validation and for looking up primitive kinds.
catalog: Arc<CorePrimitiveCatalog>,
/// Source, compute, trigger and action registries handed to the runtime.
registries: Arc<CoreRegistries>,
/// What the adapter provides; checked against source and action manifests.
adapter_provides: AdapterProvides,
}
impl RuntimeState {
fn new(
graph: Arc<ExpandedGraph>,
catalog: Arc<CorePrimitiveCatalog>,
registries: Arc<CoreRegistries>,
adapter_provides: AdapterProvides,
) -> Self {
Self {
graph,
catalog,
registries,
adapter_provides,
}
}
fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
let mut kinds = HashSet::new();
for node in self.graph.nodes.values() {
let Some(meta) = self
.catalog
.get(&node.implementation.impl_id, &node.implementation.version)
else {
continue;
};
if meta.kind != PrimitiveKind::Action {
continue;
}
let Some(action) = self.registries.actions.get(&node.implementation.impl_id) else {
continue;
};
let emits_set_context = !action.manifest().effects.writes.is_empty()
|| action
.manifest()
.effects
.intents
.iter()
.any(|intent| !intent.mirror_writes.is_empty());
if emits_set_context {
kinds.insert("set_context".to_string());
}
for intent in &action.manifest().effects.intents {
kinds.insert(intent.name.clone());
}
}
kinds
}
fn validate_composition(
&self,
graph: &ergo_runtime::runtime::ValidatedGraph,
) -> Result<(), CompositionError> {
if !self.adapter_provides.capture_format_version.is_empty() {
validate_capture_format(&self.adapter_provides.capture_format_version)?;
}
for node in graph.nodes.values() {
if node.kind != PrimitiveKind::Source {
continue;
}
let Some(primitive) = self.registries.sources.get(&node.impl_id) else {
continue;
};
let manifest = primitive.manifest();
let source_params =
source_parameters_with_manifest_defaults(manifest, &node.parameters);
validate_source_adapter_composition(
&manifest.requires,
&self.adapter_provides,
&source_params,
)?;
}
for node in graph.nodes.values() {
if node.kind != PrimitiveKind::Action {
continue;
}
let Some(primitive) = self.registries.actions.get(&node.impl_id) else {
continue;
};
let manifest = primitive.manifest();
validate_action_adapter_composition(
&manifest.effects,
&self.adapter_provides,
&node.parameters,
)?;
}
Ok(())
}
}
#[derive(Clone)]
pub struct RuntimeHandle {
state: RuntimeState,
}
impl RuntimeHandle {
pub fn new(
graph: Arc<ExpandedGraph>,
catalog: Arc<CorePrimitiveCatalog>,
registries: Arc<CoreRegistries>,
adapter_provides: AdapterProvides,
) -> Self {
Self {
state: RuntimeState::new(graph, catalog, registries, adapter_provides),
}
}
pub fn run(
&self,
graph_id: &GraphId,
event_id: &EventId,
ctx: &ExecutionContext,
deadline: Option<Duration>,
) -> RunTermination {
execute_once(&self.state, graph_id, event_id, ctx, deadline).termination
}
pub fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
self.state.graph_emittable_effect_kinds()
}
}
#[derive(Clone)]
pub struct ReportingRuntimeHandle {
state: RuntimeState,
}
impl ReportingRuntimeHandle {
pub fn new(
graph: Arc<ExpandedGraph>,
catalog: Arc<CorePrimitiveCatalog>,
registries: Arc<CoreRegistries>,
adapter_provides: AdapterProvides,
) -> Self {
Self {
state: RuntimeState::new(graph, catalog, registries, adapter_provides),
}
}
pub fn run_reporting(
&self,
graph_id: &GraphId,
event_id: &EventId,
ctx: &ExecutionContext,
deadline: Option<Duration>,
effects_out: &mut Vec<ActionEffect>,
) -> RunTermination {
let result = execute_once(&self.state, graph_id, event_id, ctx, deadline);
*effects_out = result.effects;
result.termination
}
pub fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
self.state.graph_emittable_effect_kinds()
}
}
fn execute_once(
state: &RuntimeState,
graph_id: &GraphId,
event_id: &EventId,
ctx: &ExecutionContext,
deadline: Option<Duration>,
) -> RunResult {
if matches!(deadline, Some(d) if d.is_zero()) {
return RunResult {
termination: RunTermination::Aborted,
effects: vec![],
};
}
let validated = match runtime_validate(&state.graph, &*state.catalog) {
Ok(graph) => graph,
Err(_) => {
return RunResult {
termination: RunTermination::Failed(ErrKind::ValidationFailed),
effects: vec![],
}
}
};
if state.validate_composition(&validated).is_err() {
return RunResult {
termination: RunTermination::Failed(ErrKind::ValidationFailed),
effects: vec![],
};
}
let registries = Registries {
sources: &state.registries.sources,
computes: &state.registries.computes,
triggers: &state.registries.triggers,
actions: &state.registries.actions,
};
match execute_with_metadata(
&validated,
®istries,
ctx.inner(),
graph_id.as_str(),
event_id.as_str(),
) {
Ok(report) => RunResult {
termination: RunTermination::Completed,
effects: report.effects,
},
Err(exec_err) => {
let termination = match exec_err {
ExecError::ComputeFailed { .. }
| ExecError::NonFiniteOutput { .. }
| ExecError::MissingRequiredContextKey { .. }
| ExecError::ContextKeyTypeMismatch { .. } => {
RunTermination::Failed(ErrKind::SemanticError)
}
_ => RunTermination::Failed(ErrKind::RuntimeError),
};
RunResult {
termination,
effects: vec![],
}
}
}
}
/// Returns the node's parameters with manifest defaults filled in.
///
/// Parameters already set on the node are kept as they are. A manifest
/// parameter is added only when the node lacks it and the manifest declares a
/// default; when the manifest lists a name more than once, the first
/// declaration with a default wins.
fn source_parameters_with_manifest_defaults(
    manifest: &ergo_runtime::source::SourcePrimitiveManifest,
    node_parameters: &HashMap<String, ergo_runtime::cluster::ParameterValue>,
) -> HashMap<String, ergo_runtime::cluster::ParameterValue> {
    /// Converts a manifest-side default into the graph-side parameter type.
    fn to_graph_value(
        default: &ergo_runtime::source::ParameterValue,
    ) -> ergo_runtime::cluster::ParameterValue {
        match default {
            ergo_runtime::source::ParameterValue::Int(int) => {
                ergo_runtime::cluster::ParameterValue::Int(*int)
            }
            ergo_runtime::source::ParameterValue::Number(number) => {
                ergo_runtime::cluster::ParameterValue::Number(*number)
            }
            ergo_runtime::source::ParameterValue::Bool(flag) => {
                ergo_runtime::cluster::ParameterValue::Bool(*flag)
            }
            ergo_runtime::source::ParameterValue::String(text) => {
                ergo_runtime::cluster::ParameterValue::String(text.clone())
            }
            ergo_runtime::source::ParameterValue::Enum(choice) => {
                ergo_runtime::cluster::ParameterValue::Enum(choice.clone())
            }
        }
    }

    let mut resolved = node_parameters.clone();
    for spec in &manifest.parameters {
        if let Some(default) = &spec.default {
            // `or_insert_with` leaves an existing value untouched.
            resolved
                .entry(spec.name.clone())
                .or_insert_with(|| to_graph_value(default));
        }
    }
    resolved
}
/// Something that can run a graph for an event and report how the run ended.
///
/// Implemented in this file by `RuntimeHandle` (real execution) and
/// `FaultRuntimeHandle` (scripted outcomes).
pub trait RuntimeInvoker {
/// Runs graph `graph_id` for event `event_id` with context `ctx`.
///
/// Both implementations in this file return `RunTermination::Aborted`
/// when `deadline` is `Some` zero duration; neither enforces a non-zero
/// deadline.
fn run(
&self,
graph_id: &GraphId,
event_id: &EventId,
ctx: &ExecutionContext,
deadline: Option<Duration>,
) -> RunTermination;
}
/// Lets a `RuntimeHandle` be used wherever a `RuntimeInvoker` is expected.
impl RuntimeInvoker for RuntimeHandle {
/// Forwards to the inherent `RuntimeHandle::run` with the same arguments.
fn run(
&self,
graph_id: &GraphId,
event_id: &EventId,
ctx: &ExecutionContext,
deadline: Option<Duration>,
) -> RunTermination {
// The path form resolves to the inherent method (inherent associated
// functions take precedence over trait ones), so this does not recurse.
RuntimeHandle::run(self, graph_id, event_id, ctx, deadline)
}
}
/// Invoker that returns scripted terminations instead of running a graph.
///
/// Outcomes are scheduled per event id; clones share one schedule because
/// it lives behind an `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct FaultRuntimeHandle {
    schedule: Arc<Mutex<HashMap<EventId, Vec<RunTermination>>>>,
    default: RunTermination,
}

impl Default for FaultRuntimeHandle {
    /// An empty schedule whose fallback outcome is `Completed`.
    fn default() -> Self {
        FaultRuntimeHandle::new(RunTermination::Completed)
    }
}

impl FaultRuntimeHandle {
    /// Creates a handle with an empty schedule and the given fallback outcome.
    pub fn new(default: RunTermination) -> Self {
        Self::with_schedule(default, HashMap::new())
    }

    /// Creates a handle with a pre-populated schedule and the given fallback
    /// outcome.
    pub fn with_schedule(
        default: RunTermination,
        schedule: HashMap<EventId, Vec<RunTermination>>,
    ) -> Self {
        let schedule = Arc::new(Mutex::new(schedule));
        FaultRuntimeHandle { schedule, default }
    }

    /// Sets the outcome list for `event_id`, replacing any list already
    /// scheduled for it.
    ///
    /// # Panics
    ///
    /// Panics if the schedule mutex is poisoned.
    pub fn push_outcomes(&self, event_id: EventId, outcomes: Vec<RunTermination>) {
        self.schedule
            .lock()
            .expect("fault schedule poisoned")
            .insert(event_id, outcomes);
    }
}
/// Scripted implementation: replays scheduled outcomes instead of executing.
impl RuntimeInvoker for FaultRuntimeHandle {
    /// Returns the next scheduled outcome for `event_id`, or the fallback
    /// outcome when nothing is (or remains) scheduled for it.
    ///
    /// A zero `deadline` returns `Aborted` without consuming a scheduled
    /// outcome. `graph_id` and `ctx` are ignored.
    ///
    /// Looking up an event id that was never scheduled leaves the schedule
    /// untouched, so the shared map does not grow with unknown ids.
    ///
    /// # Panics
    ///
    /// Panics if the schedule mutex is poisoned.
    fn run(
        &self,
        graph_id: &GraphId,
        event_id: &EventId,
        ctx: &ExecutionContext,
        deadline: Option<Duration>,
    ) -> RunTermination {
        // The fault double never looks at the graph or the context.
        let _ = (graph_id, ctx);
        if matches!(deadline, Some(d) if d.is_zero()) {
            return RunTermination::Aborted;
        }
        let mut guard = self.schedule.lock().expect("fault schedule poisoned");
        match guard.get_mut(event_id) {
            // Outcomes are consumed front-to-back, in the order scheduled.
            Some(queue) if !queue.is_empty() => queue.remove(0),
            _ => self.default.clone(),
        }
    }
}
#[cfg(test)]
mod tests;