1pub mod engine;
2pub mod envelope;
3pub mod manifest;
4pub mod mapping;
5pub mod resolver;
6pub mod result;
7pub mod router;
8pub mod trace;
9
10use std::collections::HashMap;
11use std::path::Path;
12
13use anyhow::{bail, Context, Result};
14
15use result::CborValue;
16
17pub const RUNTIME_VERSION: &str = env!("CARGO_PKG_VERSION");
18pub const WASMTIME_MAJOR: &str = "43";
19
20pub fn now_rfc3339() -> String {
21 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
22}
23
24pub struct InvokeMetric {
28 pub step: u64,
29 pub node_id: String,
30 pub brick_id: String,
31 pub result_type: String,
32 pub latency_ms: f64,
33 pub envelope_bytes: usize,
34 pub result_bytes: Option<usize>,
35}
36
37#[derive(Default)]
39pub struct ExecuteHooks<'a> {
40 pub on_invoke: Option<&'a mut dyn FnMut(InvokeMetric)>,
41}
42
43pub struct ExecuteOptions {
45 pub trace_id: Option<String>,
46 pub session_id: Option<String>,
47 pub max_steps: Option<u64>,
48 pub max_queued: u64,
49 pub all_terminals: bool,
50 pub verbose: bool,
51}
52
53impl Default for ExecuteOptions {
54 fn default() -> Self {
55 Self {
56 trace_id: None,
57 session_id: None,
58 max_steps: None,
59 max_queued: 10_000,
60 all_terminals: false,
61 verbose: false,
62 }
63 }
64}
65
66pub struct TerminalResult {
68 pub node_id: String,
69 pub brick_id: String,
70 pub step: u64,
71 pub result: result::BrickResult,
72}
73
74pub struct ResultCounts {
76 pub success: u64,
77 pub low_confidence: u64,
78 pub failure: u64,
79}
80
81pub struct ExecutionReport {
83 pub terminals: Vec<TerminalResult>,
84 pub total_steps: u64,
85 pub counts: ResultCounts,
86}
87
88pub struct ResolvedBrickInfo {
90 pub brick_id: String,
91 pub version: String,
92 pub wasm_bytes: usize,
93 pub digest: String,
94}
95
96pub struct RuntimeContext {
100 graph: manifest::GraphManifest,
101 compiled_bricks: HashMap<(String, String), engine::CompiledBrick>,
102 brick_manifests: HashMap<(String, String), manifest::BrickManifest>,
103 node_brick_key: HashMap<String, (String, String)>,
104 edges_by_source: HashMap<String, Vec<usize>>,
105 edge_by_id: HashMap<String, usize>,
106 entry_node_id: String,
107 resolved_info: Vec<ResolvedBrickInfo>,
108}
109
110struct Task {
112 node_id: String,
113 input_json: serde_json::Value,
114 trigger_source_node_id: String,
115 trigger_source_step: u64,
116 trigger_edge_id: String,
117 trigger_routing_reason: String,
118}
119
120impl RuntimeContext {
121 pub fn load(graph_path: &Path, brick_dir: &Path, brick_map: Option<&Path>) -> Result<Self> {
123 let graph = manifest::load_graph(graph_path)?;
124 let brick_map = brick_map.map(resolver::load_brick_map).transpose()?;
125 Self::from_graph(graph, brick_dir, &brick_map)
126 }
127
128 pub fn from_graph(
130 graph: manifest::GraphManifest,
131 brick_dir: &Path,
132 brick_map: &Option<resolver::BrickMap>,
133 ) -> Result<Self> {
134 if graph.nodes.is_empty() {
135 bail!("graph validation failed: graph has no nodes");
136 }
137
138 let mut compiled_bricks: HashMap<(String, String), engine::CompiledBrick> = HashMap::new();
139 let mut brick_manifests: HashMap<(String, String), manifest::BrickManifest> =
140 HashMap::new();
141 let mut node_brick_key: HashMap<String, (String, String)> = HashMap::new();
142 let mut resolved_info: Vec<ResolvedBrickInfo> = Vec::new();
143
144 for node in &graph.nodes {
145 let resolved = resolver::resolve_brick(
146 &node.brick.brick_id,
147 &node.brick.version_or_range,
148 brick_dir,
149 brick_map,
150 )?;
151 let key = (
152 node.brick.brick_id.clone(),
153 resolved.manifest.version.clone(),
154 );
155
156 if node_brick_key
157 .insert(node.node_id.clone(), key.clone())
158 .is_some()
159 {
160 bail!(
161 "graph validation failed: duplicate node_id '{}'",
162 node.node_id
163 );
164 }
165
166 if let std::collections::hash_map::Entry::Vacant(entry) =
167 compiled_bricks.entry(key.clone())
168 {
169 resolved_info.push(ResolvedBrickInfo {
170 brick_id: key.0.clone(),
171 version: key.1.clone(),
172 wasm_bytes: resolved.wasm_bytes.len(),
173 digest: resolved.manifest.artifact.digest.clone(),
174 });
175 let compiled = engine::CompiledBrick::new(&resolved.wasm_bytes)?;
176 entry.insert(compiled);
177 brick_manifests.insert(key, resolved.manifest);
178 }
179 }
180
181 let target_nodes: std::collections::HashSet<&str> =
183 graph.edges.iter().map(|e| e.target_node.as_str()).collect();
184
185 let entry_nodes: Vec<&str> = graph
186 .nodes
187 .iter()
188 .map(|n| n.node_id.as_str())
189 .filter(|id| !target_nodes.contains(id))
190 .collect();
191
192 match entry_nodes.len() {
193 0 => bail!("graph validation failed: no entry node found (all nodes are edge targets — cycle-only graph)"),
194 1 => {}
195 _ => bail!("graph validation failed: multiple entry nodes found: {:?}", entry_nodes),
196 }
197 let entry_node_id = entry_nodes[0].to_string();
198
199 let mut edges_by_source: HashMap<String, Vec<usize>> = HashMap::new();
201 let mut edge_by_id: HashMap<String, usize> = HashMap::new();
202 for (i, edge) in graph.edges.iter().enumerate() {
203 edges_by_source
204 .entry(edge.source_node.clone())
205 .or_default()
206 .push(i);
207 edge_by_id.insert(edge.edge_id.clone(), i);
208 }
209
210 Ok(Self {
211 graph,
212 compiled_bricks,
213 brick_manifests,
214 node_brick_key,
215 edges_by_source,
216 edge_by_id,
217 entry_node_id,
218 resolved_info,
219 })
220 }
221
222 pub fn emit_runtime_info(&self, tracer: &mut dyn trace::TraceSink) {
224 if tracer.enabled() {
225 tracer.emit_runtime_info(RUNTIME_VERSION, WASMTIME_MAJOR, &now_rfc3339());
226 }
227 }
228
229 pub fn graph_id(&self) -> &str {
230 &self.graph.graph_id
231 }
232 pub fn graph_version(&self) -> &str {
233 &self.graph.graph_version
234 }
235 pub fn node_count(&self) -> usize {
236 self.graph.nodes.len()
237 }
238 pub fn edge_count(&self) -> usize {
239 self.graph.edges.len()
240 }
241 pub fn entry_node_id(&self) -> &str {
242 &self.entry_node_id
243 }
244 pub fn resolved_bricks(&self) -> &[ResolvedBrickInfo] {
245 &self.resolved_info
246 }
247
248 pub fn execute(
250 &self,
251 json_input: &serde_json::Value,
252 tracer: &mut dyn trace::TraceSink,
253 hooks: &mut ExecuteHooks<'_>,
254 opts: &ExecuteOptions,
255 ) -> Result<ExecutionReport> {
256 let trace_id = opts
257 .trace_id
258 .as_deref()
259 .map(str::to_owned)
260 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
261 let session_id = opts
262 .session_id
263 .as_deref()
264 .map(str::to_owned)
265 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
266
267 let mut queue: std::collections::VecDeque<Task> = std::collections::VecDeque::new();
268 let mut step: u64 = 0;
269 let mut terminals: Vec<TerminalResult> = Vec::new();
270 let mut counts = ResultCounts {
271 success: 0,
272 low_confidence: 0,
273 failure: 0,
274 };
275 let verbose = opts.verbose;
276 let tracing = tracer.enabled();
277
278 queue.push_back(Task {
280 node_id: self.entry_node_id.clone(),
281 input_json: json_input.clone(),
282 trigger_source_node_id: envelope::ROOT_TRIGGER.source_node_id.to_string(),
283 trigger_source_step: envelope::ROOT_TRIGGER.source_step,
284 trigger_edge_id: envelope::ROOT_TRIGGER.edge_id.to_string(),
285 trigger_routing_reason: "entry".to_string(),
286 });
287
288 while let Some(task) = queue.pop_front() {
289 if let Some(max) = opts.max_steps {
291 if step >= max {
292 let msg = format!("max_steps budget ({max}) exhausted");
293 if verbose {
294 eprintln!("Safety budget: {msg}");
295 }
296 counts.failure += 1;
297 terminals.push(TerminalResult {
298 node_id: task.node_id.clone(),
299 brick_id: self
300 .node_brick_key
301 .get(&task.node_id)
302 .map(|k| k.0.clone())
303 .unwrap_or_else(|| "__runtime__".to_string()),
304 step,
305 result: result::trap_failure("RESOURCE_EXCEEDED", msg),
306 });
307 break;
308 }
309 }
310
311 let brick_key = self
312 .node_brick_key
313 .get(&task.node_id)
314 .with_context(|| format!("no brick key recorded for node '{}'", task.node_id))?;
315 let compiled = self
316 .compiled_bricks
317 .get(brick_key)
318 .with_context(|| format!("no compiled brick for key {:?}", brick_key))?;
319 let brick_manifest = self
320 .brick_manifests
321 .get(brick_key)
322 .with_context(|| format!("no manifest for key {:?}", brick_key))?;
323
324 let trigger = envelope::Trigger {
325 source_node_id: &task.trigger_source_node_id,
326 source_step: task.trigger_source_step,
327 edge_id: &task.trigger_edge_id,
328 };
329
330 let env = envelope::build_envelope(
331 &task.input_json,
332 &self.graph.graph_id,
333 &self.graph.graph_version,
334 &task.node_id,
335 &trace_id,
336 &session_id,
337 step,
338 &trigger,
339 )?;
340
341 let mut pre_invoke_failure: Option<result::BrickResult> = None;
343 if let Some(max_input) = brick_manifest.limits.max_input_bytes {
344 if env.len() as u64 > max_input {
345 let msg =
346 format!(
347 "envelope too large for brick '{}': {} bytes > limits.max_input_bytes {}",
348 brick_key.0, env.len(), max_input,
349 );
350 if verbose {
351 eprintln!(" {msg}");
352 }
353 pre_invoke_failure = Some(result::trap_failure("INVALID_INPUT", msg));
354 }
355 }
356
357 if verbose {
358 if pre_invoke_failure.is_some() {
359 eprintln!(
360 "[step {}] Skipping invoke for brick '{}' node '{}' (pre-invoke failure)",
361 step, brick_key.0, task.node_id,
362 );
363 } else {
364 eprintln!(
365 "[step {}] Invoking brick '{}' node '{}' ({} byte envelope)",
366 step,
367 brick_key.0,
368 task.node_id,
369 env.len(),
370 );
371 }
372 }
373
374 let (brick_result, raw_result_bytes, latency_ms) = if let Some(br) = pre_invoke_failure
376 {
377 (br, None, 0.0)
378 } else {
379 let start = std::time::Instant::now();
380 let invoke_result = compiled.invoke(
381 &env,
382 brick_manifest.limits.max_mem_mb,
383 brick_manifest.limits.max_output_bytes,
384 );
385 let latency = start.elapsed().as_secs_f64() * 1000.0;
386
387 match invoke_result {
388 Ok(result_bytes) => {
389 if verbose {
390 eprintln!(" Got {} byte result", result_bytes.len());
391 }
392 let decoded = match result::decode_result(&result_bytes) {
393 Ok(r) => r,
394 Err(e) => {
395 let msg = format!("{e:#}");
396 if verbose {
397 eprintln!(" Result rejected: {msg}");
398 }
399 result::trap_failure("RUNTIME_REJECTED", msg)
400 }
401 };
402 (decoded, Some(result_bytes), latency)
403 }
404 Err(e) => {
405 let msg = format!("{e:#}");
406 let m = msg.to_lowercase();
407 let error_class = if m.contains("alloc returned 0")
408 || m.contains("oom")
409 || m.contains("result too large")
410 || m.contains("memory limit")
411 || m.contains("fuel")
412 || m.contains("resource_exceeded")
413 {
414 "RESOURCE_EXCEEDED"
415 } else {
416 "COMPUTATION_ERROR"
417 };
418 if verbose {
419 eprintln!(" Brick trap: {msg}");
420 }
421 (result::trap_failure(error_class, msg), None, latency)
422 }
423 }
424 };
425
426 if tracing {
428 tracer.emit_invoke(
429 &trace_id,
430 &session_id,
431 step,
432 &self.graph.graph_id,
433 &self.graph.graph_version,
434 &brick_key.0,
435 &brick_key.1,
436 &brick_manifest.artifact.digest,
437 &task.node_id,
438 &env,
439 &task.trigger_source_node_id,
440 task.trigger_source_step,
441 &task.trigger_edge_id,
442 &task.trigger_routing_reason,
443 &brick_result,
444 raw_result_bytes.as_deref(),
445 latency_ms,
446 &now_rfc3339(),
447 );
448 }
449
450 match &brick_result {
452 result::BrickResult::Success { .. } => counts.success += 1,
453 result::BrickResult::LowConfidence { .. } => counts.low_confidence += 1,
454 result::BrickResult::Failure { .. } => counts.failure += 1,
455 }
456
457 if let Some(ref mut on_invoke) = hooks.on_invoke {
459 on_invoke(InvokeMetric {
460 step,
461 node_id: task.node_id.clone(),
462 brick_id: brick_key.0.clone(),
463 result_type: brick_result.result_type().to_string(),
464 latency_ms,
465 envelope_bytes: env.len(),
466 result_bytes: raw_result_bytes.as_ref().map(|b| b.len()),
467 });
468 }
469
470 if verbose {
471 eprintln!(" Result type: {}", brick_result.result_type());
472 }
473
474 let outbound_indices = self
476 .edges_by_source
477 .get(task.node_id.as_str())
478 .cloned()
479 .unwrap_or_default();
480 let outbound: Vec<&manifest::Edge> = outbound_indices
481 .iter()
482 .map(|&i| &self.graph.edges[i])
483 .collect();
484
485 let output_confidence = brick_result.output().and_then(mapping::extract_confidence);
486
487 let routed = router::route(&outbound, &brick_result, output_confidence);
488
489 if routed.is_empty() {
490 if verbose {
491 eprintln!(" Terminal node (no outbound edges dispatched)");
492 }
493 terminals.push(TerminalResult {
494 node_id: task.node_id.clone(),
495 brick_id: brick_key.0.clone(),
496 step,
497 result: brick_result,
498 });
499 } else {
500 for routed_edge in &routed {
501 let edge_idx = self
502 .edge_by_id
503 .get(routed_edge.edge_id.as_str())
504 .with_context(|| {
505 format!("routed edge '{}' not found in graph", routed_edge.edge_id)
506 })?;
507 let edge_def = &self.graph.edges[*edge_idx];
508
509 let mapped_input = if edge_def.mapping.is_empty() {
510 match brick_result.output() {
511 Some(output) => {
512 serde_json::json!({ "input": mapping::cbor_to_json(output) })
513 }
514 None => serde_json::json!({ "input": null }),
515 }
516 } else {
517 let source_root = match brick_result.output() {
518 Some(output) => CborValue::Map(vec![(
519 CborValue::Text("output".to_string()),
520 output.clone(),
521 )]),
522 None => CborValue::Map(vec![]),
523 };
524
525 let mut target_cbor = CborValue::Map(vec![]);
526 for fm in &edge_def.mapping {
527 let resolved = mapping::resolve_path(&source_root, &fm.from)
528 .with_context(|| {
529 format!(
530 "mapping '{}' → '{}': source path '{}' not found in output",
531 fm.from, fm.to, fm.from,
532 )
533 })?;
534 let overlay = mapping::set_path(&fm.to, resolved);
535 target_cbor = mapping::merge_maps(target_cbor, overlay);
536 }
537
538 mapping::cbor_to_json(&target_cbor)
539 };
540
541 if verbose {
542 eprintln!(
543 " Route: edge '{}' → node '{}'",
544 routed_edge.edge_id, routed_edge.target_node,
545 );
546 }
547
548 if queue.len() as u64 >= opts.max_queued {
550 bail!(
551 "safety budget exceeded: queue size {} >= max_queued {}",
552 queue.len(),
553 opts.max_queued,
554 );
555 }
556
557 queue.push_back(Task {
558 node_id: routed_edge.target_node.clone(),
559 input_json: mapped_input,
560 trigger_source_node_id: task.node_id.clone(),
561 trigger_source_step: step,
562 trigger_edge_id: routed_edge.edge_id.clone(),
563 trigger_routing_reason: "routed".to_string(),
564 });
565 }
566 }
567
568 step += 1;
569 }
570
571 Ok(ExecutionReport {
572 terminals,
573 total_steps: step,
574 counts,
575 })
576 }
577}