Skip to main content

ergo_adapter/
lib.rs

1//! ergo_adapter
2//!
3//! Purpose:
4//! - Define the kernel-owned adapter surface for manifests, event binding,
5//!   composition checks, capture helpers, and runtime-facing event types.
6//!
7//! Owns:
8//! - Adapter validation/composition authorities and the typed errors they emit.
9//! - The canonical event payload/context realization boundary used by replay and
10//!   host adapter-bound execution.
11//!
12//! Does not own:
13//! - Host orchestration, replay descriptors, or product-facing error shaping.
14//! - Runtime primitive semantics owned by `ergo_runtime`.
15//!
16//! Connects to:
17//! - `ergo_supervisor` capture/replay and fixture helpers.
18//! - `ergo_host`, which consumes adapter manifests, binder compilation, and
19//!   composition validation without redefining adapter meaning.
20//!
21//! Safety notes:
22//! - `ExternalEventPayloadError` is the typed authority for payload/context
23//!   realization failures that higher layers should carry instead of flattening.
24
25use std::collections::{HashMap, HashSet};
26use std::fmt;
27use std::sync::{Arc, Mutex};
28use std::time::Duration;
29
30use ergo_runtime::catalog::{CorePrimitiveCatalog, CoreRegistries};
31use ergo_runtime::cluster::{ExpandedGraph, PrimitiveCatalog, PrimitiveKind};
32use ergo_runtime::common::{ActionEffect, Value};
33use ergo_runtime::runtime::{
34    execute_with_metadata, validate as runtime_validate, ExecError,
35    ExecutionContext as RuntimeExecutionContext, Registries,
36};
37use serde::{Deserialize, Serialize};
38
39pub mod capture;
40pub(crate) mod common;
41pub mod composition;
42pub mod errors;
43pub mod event_binding;
44pub mod fixture;
45pub mod manifest;
46pub mod provenance;
47pub mod provides;
48pub mod registry;
49mod schema_materialization;
50pub mod validate;
51
52pub use composition::{
53    validate_action_adapter_composition, validate_capture_format,
54    validate_source_adapter_composition, CompositionError, ContextRequirement, SourceRequires,
55};
56pub use errors::InvalidAdapter;
57pub use event_binding::{
58    bind_semantic_event_with_binder, compile_event_binder, EventBinder, EventBindingError,
59};
60pub use manifest::{
61    AcceptsSpec, AdapterManifest, CaptureSpec, ContextKeySpec, EffectSpec, EventKindSpec,
62};
63pub use provenance::fingerprint as adapter_fingerprint;
64pub use provides::{AdapterProvides, ContextKeyProvision};
65pub use registry::register;
66pub use validate::validate_adapter;
67
68#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
69#[serde(transparent)]
70pub struct GraphId(String);
71
72impl GraphId {
73    pub fn new(id: impl Into<String>) -> Self {
74        Self(id.into())
75    }
76
77    pub fn as_str(&self) -> &str {
78        &self.0
79    }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
83#[serde(transparent)]
84pub struct EventId(String);
85
86impl EventId {
87    pub fn new(id: impl Into<String>) -> Self {
88        EventId(id.into())
89    }
90
91    pub fn as_str(&self) -> &str {
92        &self.0
93    }
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub enum ErrKind {
98    NetworkTimeout,
99    AdapterUnavailable,
100    ValidationFailed,
101    RuntimeError,
102    /// Deterministic semantic failures that should not be retried.
103    ///
104    /// Examples: DivisionByZero, NonFiniteOutput.
105    /// These will fail identically on retry, so retrying is pathological.
106    ///
107    /// See: B.2 in PHASE_INVARIANTS.md
108    SemanticError,
109    DeadlineExceeded,
110    Cancelled,
111}
112
113/// Result of a runtime invocation, carrying termination status and any effects.
114#[derive(Debug, Clone)]
115struct RunResult {
116    termination: RunTermination,
117    effects: Vec<ActionEffect>,
118}
119
120#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
121pub enum RunTermination {
122    Completed,
123    TimedOut,
124    Aborted,
125    Failed(ErrKind),
126}
127
128/// ExecutionContext is intentionally opaque to non-adapter callers.
129/// Its internals are owned by the runtime and are not constructible
130/// outside this crate to satisfy CXT-1.
131///
132/// ```compile_fail
133/// use ergo_adapter::ExecutionContext;
134/// use ergo_runtime::runtime::ExecutionContext as RuntimeExecutionContext;
135///
136/// // Constructor is not visible outside ergo-adapter.
137/// let runtime_ctx = RuntimeExecutionContext::default();
138/// let _ctx = ExecutionContext::new(runtime_ctx);
139/// ```
140///
141/// ```compile_fail
142/// use ergo_adapter::ExecutionContext;
143/// use ergo_runtime::runtime::ExecutionContext as RuntimeExecutionContext;
144///
145/// // Opaque fields cannot be set directly.
146/// let runtime_ctx = RuntimeExecutionContext::default();
147/// let _ctx = ExecutionContext { inner: runtime_ctx };
148/// ```
149#[derive(Debug, Clone)]
150pub struct ExecutionContext {
151    inner: RuntimeExecutionContext,
152}
153
154impl ExecutionContext {
155    pub(crate) fn new(inner: RuntimeExecutionContext) -> Self {
156        Self { inner }
157    }
158
159    pub(crate) fn inner(&self) -> &RuntimeExecutionContext {
160        &self.inner
161    }
162}
163
164/// Opaque absolute time used for deterministic scheduling.
165#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
166#[serde(transparent)]
167pub struct EventTime(Duration);
168
169impl EventTime {
170    pub fn from_duration(duration: Duration) -> Self {
171        Self(duration)
172    }
173
174    pub fn as_duration(&self) -> Duration {
175        self.0
176    }
177
178    pub fn saturating_add(&self, duration: Duration) -> Self {
179        Self(self.0.saturating_add(duration))
180    }
181}
182
183impl From<Duration> for EventTime {
184    fn from(value: Duration) -> Self {
185        EventTime::from_duration(value)
186    }
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
190#[serde(transparent)]
191pub struct EventPayload {
192    pub data: Vec<u8>,
193}
194
195#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
196#[non_exhaustive]
197pub enum ExternalEventPayloadError {
198    InvalidJson { detail: String },
199    PayloadMustBeJsonObject { got: String },
200}
201
202impl fmt::Display for ExternalEventPayloadError {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        match self {
205            Self::InvalidJson { detail } => write!(f, "payload bytes are not valid JSON: {detail}"),
206            Self::PayloadMustBeJsonObject { got } => {
207                write!(f, "payload must be a JSON object, got {got}")
208            }
209        }
210    }
211}
212
213impl std::error::Error for ExternalEventPayloadError {}
214
215#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
216pub enum ExternalEventKind {
217    /// Periodic event for graph evaluation cycles.
218    /// Renamed from `Tick` for domain neutrality (see TERMINOLOGY.md ยง9).
219    #[serde(alias = "Tick")]
220    Pump,
221    DataAvailable,
222    Command,
223}
224
225#[derive(Debug, Clone)]
226pub struct ExternalEvent {
227    event_id: EventId,
228    kind: ExternalEventKind,
229    context: ExecutionContext,
230    at: EventTime,
231    payload: EventPayload,
232}
233
234impl ExternalEvent {
235    pub(crate) fn new(
236        event_id: EventId,
237        kind: ExternalEventKind,
238        context: ExecutionContext,
239        at: EventTime,
240        payload: EventPayload,
241    ) -> Self {
242        Self {
243            event_id,
244            kind,
245            context,
246            at,
247            payload,
248        }
249    }
250
251    pub fn mechanical_at(event_id: EventId, kind: ExternalEventKind, at: EventTime) -> Self {
252        let context = ExecutionContext::new(RuntimeExecutionContext::default());
253        Self::new(event_id, kind, context, at, EventPayload::default())
254    }
255
256    pub fn mechanical(event_id: EventId, kind: ExternalEventKind) -> Self {
257        Self::mechanical_at(event_id, kind, EventTime::default())
258    }
259
260    pub fn with_payload(
261        event_id: EventId,
262        kind: ExternalEventKind,
263        at: EventTime,
264        payload: EventPayload,
265    ) -> Result<Self, ExternalEventPayloadError> {
266        let context = ExecutionContext::new(context_from_payload(&payload)?);
267        Ok(Self::new(event_id, kind, context, at, payload))
268    }
269
270    pub fn context(&self) -> &ExecutionContext {
271        &self.context
272    }
273
274    pub fn kind(&self) -> ExternalEventKind {
275        self.kind
276    }
277
278    pub fn event_id(&self) -> &EventId {
279        &self.event_id
280    }
281
282    pub fn at(&self) -> EventTime {
283        self.at
284    }
285
286    pub fn payload(&self) -> &EventPayload {
287        &self.payload
288    }
289}
290
291fn context_from_payload(
292    payload: &EventPayload,
293) -> Result<RuntimeExecutionContext, ExternalEventPayloadError> {
294    let values = payload_values(payload)?;
295    Ok(RuntimeExecutionContext::from_values(values))
296}
297
298fn payload_values(
299    payload: &EventPayload,
300) -> Result<HashMap<String, Value>, ExternalEventPayloadError> {
301    if payload.data.is_empty() {
302        return Ok(HashMap::new());
303    }
304
305    let parsed: serde_json::Value = match serde_json::from_slice(&payload.data) {
306        Ok(value) => value,
307        Err(err) => {
308            return Err(ExternalEventPayloadError::InvalidJson {
309                detail: err.to_string(),
310            });
311        }
312    };
313
314    let Some(object) = parsed.as_object() else {
315        return Err(ExternalEventPayloadError::PayloadMustBeJsonObject {
316            got: json_type_name(&parsed).to_string(),
317        });
318    };
319
320    let mut values = HashMap::new();
321    for (key, value) in object {
322        if let Some(mapped) = json_to_value(value) {
323            values.insert(key.clone(), mapped);
324        }
325    }
326
327    Ok(values)
328}
329
330use common::json_type_name;
331
332fn json_to_value(value: &serde_json::Value) -> Option<Value> {
333    if let Some(number) = value.as_f64() {
334        return Some(Value::Number(number));
335    }
336
337    if let Some(text) = value.as_str() {
338        return Some(Value::String(text.to_string()));
339    }
340
341    if let Some(flag) = value.as_bool() {
342        return Some(Value::Bool(flag));
343    }
344
345    let items = value.as_array()?;
346
347    let mut series = Vec::with_capacity(items.len());
348    for item in items {
349        let number = item.as_f64()?;
350        series.push(number);
351    }
352
353    Some(Value::Series(series))
354}
355
356/// Shared runtime execution dependencies used by both public handle types.
357#[derive(Clone)]
358struct RuntimeState {
359    graph: Arc<ExpandedGraph>,
360    catalog: Arc<CorePrimitiveCatalog>,
361    registries: Arc<CoreRegistries>,
362    adapter_provides: AdapterProvides,
363}
364
365impl RuntimeState {
366    fn new(
367        graph: Arc<ExpandedGraph>,
368        catalog: Arc<CorePrimitiveCatalog>,
369        registries: Arc<CoreRegistries>,
370        adapter_provides: AdapterProvides,
371    ) -> Self {
372        Self {
373            graph,
374            catalog,
375            registries,
376            adapter_provides,
377        }
378    }
379
380    fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
381        let mut kinds = HashSet::new();
382
383        for node in self.graph.nodes.values() {
384            let Some(meta) = self
385                .catalog
386                .get(&node.implementation.impl_id, &node.implementation.version)
387            else {
388                continue;
389            };
390            if meta.kind != PrimitiveKind::Action {
391                continue;
392            }
393
394            let Some(action) = self.registries.actions.get(&node.implementation.impl_id) else {
395                continue;
396            };
397
398            let emits_set_context = !action.manifest().effects.writes.is_empty()
399                || action
400                    .manifest()
401                    .effects
402                    .intents
403                    .iter()
404                    .any(|intent| !intent.mirror_writes.is_empty());
405            if emits_set_context {
406                kinds.insert("set_context".to_string());
407            }
408
409            for intent in &action.manifest().effects.intents {
410                kinds.insert(intent.name.clone());
411            }
412        }
413
414        kinds
415    }
416
417    fn validate_composition(
418        &self,
419        graph: &ergo_runtime::runtime::ValidatedGraph,
420    ) -> Result<(), CompositionError> {
421        // COMP-3: only adapter-bound runs have a capture format to validate.
422        if !self.adapter_provides.capture_format_version.is_empty() {
423            validate_capture_format(&self.adapter_provides.capture_format_version)?;
424        }
425
426        for node in graph.nodes.values() {
427            if node.kind != PrimitiveKind::Source {
428                continue;
429            }
430
431            let Some(primitive) = self.registries.sources.get(&node.impl_id) else {
432                continue;
433            };
434
435            let manifest = primitive.manifest();
436            let source_params =
437                source_parameters_with_manifest_defaults(manifest, &node.parameters);
438            validate_source_adapter_composition(
439                &manifest.requires,
440                &self.adapter_provides,
441                &source_params,
442            )?;
443        }
444
445        for node in graph.nodes.values() {
446            if node.kind != PrimitiveKind::Action {
447                continue;
448            }
449
450            let Some(primitive) = self.registries.actions.get(&node.impl_id) else {
451                continue;
452            };
453
454            let manifest = primitive.manifest();
455            validate_action_adapter_composition(
456                &manifest.effects,
457                &self.adapter_provides,
458                &node.parameters,
459            )?;
460        }
461
462        Ok(())
463    }
464}
465
466/// RuntimeHandle holds the execution dependencies needed to invoke the runtime.
467/// It is constructed with an expanded graph, primitive catalog, registries, and adapter provides.
468#[derive(Clone)]
469pub struct RuntimeHandle {
470    state: RuntimeState,
471}
472
473impl RuntimeHandle {
474    pub fn new(
475        graph: Arc<ExpandedGraph>,
476        catalog: Arc<CorePrimitiveCatalog>,
477        registries: Arc<CoreRegistries>,
478        adapter_provides: AdapterProvides,
479    ) -> Self {
480        Self {
481            state: RuntimeState::new(graph, catalog, registries, adapter_provides),
482        }
483    }
484
485    pub fn run(
486        &self,
487        graph_id: &GraphId,
488        event_id: &EventId,
489        ctx: &ExecutionContext,
490        deadline: Option<Duration>,
491    ) -> RunTermination {
492        execute_once(&self.state, graph_id, event_id, ctx, deadline).termination
493    }
494
495    /// Derive effect kinds that this composed graph can emit based on registered action manifests.
496    pub fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
497        self.state.graph_emittable_effect_kinds()
498    }
499}
500
501/// ReportingRuntimeHandle holds the same execution dependencies as
502/// `RuntimeHandle`, but exposes the low-level reporting seam used by the host
503/// buffering wrapper.
504#[derive(Clone)]
505pub struct ReportingRuntimeHandle {
506    state: RuntimeState,
507}
508
509impl ReportingRuntimeHandle {
510    pub fn new(
511        graph: Arc<ExpandedGraph>,
512        catalog: Arc<CorePrimitiveCatalog>,
513        registries: Arc<CoreRegistries>,
514        adapter_provides: AdapterProvides,
515    ) -> Self {
516        Self {
517            state: RuntimeState::new(graph, catalog, registries, adapter_provides),
518        }
519    }
520
521    pub fn run_reporting(
522        &self,
523        graph_id: &GraphId,
524        event_id: &EventId,
525        ctx: &ExecutionContext,
526        deadline: Option<Duration>,
527        effects_out: &mut Vec<ActionEffect>,
528    ) -> RunTermination {
529        let result = execute_once(&self.state, graph_id, event_id, ctx, deadline);
530        *effects_out = result.effects;
531        result.termination
532    }
533
534    pub fn graph_emittable_effect_kinds(&self) -> HashSet<String> {
535        self.state.graph_emittable_effect_kinds()
536    }
537}
538
539fn execute_once(
540    state: &RuntimeState,
541    graph_id: &GraphId,
542    event_id: &EventId,
543    ctx: &ExecutionContext,
544    deadline: Option<Duration>,
545) -> RunResult {
546    if matches!(deadline, Some(d) if d.is_zero()) {
547        return RunResult {
548            termination: RunTermination::Aborted,
549            effects: vec![],
550        };
551    }
552
553    let validated = match runtime_validate(&state.graph, &*state.catalog) {
554        Ok(graph) => graph,
555        Err(_) => {
556            return RunResult {
557                termination: RunTermination::Failed(ErrKind::ValidationFailed),
558                effects: vec![],
559            }
560        }
561    };
562
563    if state.validate_composition(&validated).is_err() {
564        return RunResult {
565            termination: RunTermination::Failed(ErrKind::ValidationFailed),
566            effects: vec![],
567        };
568    }
569
570    let registries = Registries {
571        sources: &state.registries.sources,
572        computes: &state.registries.computes,
573        triggers: &state.registries.triggers,
574        actions: &state.registries.actions,
575    };
576
577    match execute_with_metadata(
578        &validated,
579        &registries,
580        ctx.inner(),
581        graph_id.as_str(),
582        event_id.as_str(),
583    ) {
584        Ok(report) => RunResult {
585            termination: RunTermination::Completed,
586            effects: report.effects,
587        },
588        Err(exec_err) => {
589            let termination = match exec_err {
590                ExecError::ComputeFailed { .. }
591                | ExecError::NonFiniteOutput { .. }
592                | ExecError::MissingRequiredContextKey { .. }
593                | ExecError::ContextKeyTypeMismatch { .. } => {
594                    RunTermination::Failed(ErrKind::SemanticError)
595                }
596                _ => RunTermination::Failed(ErrKind::RuntimeError),
597            };
598            RunResult {
599                termination,
600                effects: vec![],
601            }
602        }
603    }
604}
605
606fn source_parameters_with_manifest_defaults(
607    manifest: &ergo_runtime::source::SourcePrimitiveManifest,
608    node_parameters: &HashMap<String, ergo_runtime::cluster::ParameterValue>,
609) -> HashMap<String, ergo_runtime::cluster::ParameterValue> {
610    let mut resolved = node_parameters.clone();
611
612    for spec in &manifest.parameters {
613        if resolved.contains_key(&spec.name) {
614            continue;
615        }
616        let Some(default) = &spec.default else {
617            continue;
618        };
619
620        let mapped = match default {
621            ergo_runtime::source::ParameterValue::Int(i) => {
622                ergo_runtime::cluster::ParameterValue::Int(*i)
623            }
624            ergo_runtime::source::ParameterValue::Number(n) => {
625                ergo_runtime::cluster::ParameterValue::Number(*n)
626            }
627            ergo_runtime::source::ParameterValue::Bool(b) => {
628                ergo_runtime::cluster::ParameterValue::Bool(*b)
629            }
630            ergo_runtime::source::ParameterValue::String(s) => {
631                ergo_runtime::cluster::ParameterValue::String(s.clone())
632            }
633            ergo_runtime::source::ParameterValue::Enum(e) => {
634                ergo_runtime::cluster::ParameterValue::Enum(e.clone())
635            }
636        };
637        resolved.insert(spec.name.clone(), mapped);
638    }
639
640    resolved
641}
642
643pub trait RuntimeInvoker {
644    fn run(
645        &self,
646        graph_id: &GraphId,
647        event_id: &EventId,
648        ctx: &ExecutionContext,
649        deadline: Option<Duration>,
650    ) -> RunTermination;
651}
652
653impl RuntimeInvoker for RuntimeHandle {
654    fn run(
655        &self,
656        graph_id: &GraphId,
657        event_id: &EventId,
658        ctx: &ExecutionContext,
659        deadline: Option<Duration>,
660    ) -> RunTermination {
661        RuntimeHandle::run(self, graph_id, event_id, ctx, deadline)
662    }
663}
664
665#[derive(Clone)]
666pub struct FaultRuntimeHandle {
667    schedule: Arc<Mutex<HashMap<EventId, Vec<RunTermination>>>>,
668    default: RunTermination,
669}
670
671impl Default for FaultRuntimeHandle {
672    fn default() -> Self {
673        Self::new(RunTermination::Completed)
674    }
675}
676
677impl FaultRuntimeHandle {
678    pub fn new(default: RunTermination) -> Self {
679        Self {
680            schedule: Arc::new(Mutex::new(HashMap::new())),
681            default,
682        }
683    }
684
685    pub fn with_schedule(
686        default: RunTermination,
687        schedule: HashMap<EventId, Vec<RunTermination>>,
688    ) -> Self {
689        Self {
690            schedule: Arc::new(Mutex::new(schedule)),
691            default,
692        }
693    }
694
695    pub fn push_outcomes(&self, event_id: EventId, outcomes: Vec<RunTermination>) {
696        let mut guard = self.schedule.lock().expect("fault schedule poisoned");
697        guard.insert(event_id, outcomes);
698    }
699}
700
701impl RuntimeInvoker for FaultRuntimeHandle {
702    fn run(
703        &self,
704        graph_id: &GraphId,
705        event_id: &EventId,
706        ctx: &ExecutionContext,
707        deadline: Option<Duration>,
708    ) -> RunTermination {
709        let _ = graph_id;
710        let _ = ctx.inner();
711
712        if matches!(deadline, Some(d) if d.is_zero()) {
713            return RunTermination::Aborted;
714        }
715
716        let mut guard = self.schedule.lock().expect("fault schedule poisoned");
717        let queue = guard.entry(event_id.clone()).or_default();
718        if !queue.is_empty() {
719            queue.remove(0)
720        } else {
721            self.default.clone()
722        }
723    }
724}
725
726#[cfg(test)]
727mod tests;