1pub mod engine;
30pub mod envelope;
31pub mod manifest;
32pub mod mapping;
33pub mod resolver;
34pub mod result;
35pub mod router;
36pub mod trace;
37
38use std::collections::HashMap;
39use std::path::Path;
40
41use anyhow::{bail, Context, Result};
42
43use result::CborValue;
44
45pub const RUNTIME_VERSION: &str = env!("CARGO_PKG_VERSION");
46pub const WASMTIME_MAJOR: &str = "43";
47
48pub fn now_rfc3339() -> String {
49 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
50}
51
/// Per-step measurement handed to the `on_invoke` hook by `RuntimeContext::execute`.
#[derive(Debug, Clone, PartialEq)]
pub struct InvokeMetric {
    /// Zero-based step counter of the task this metric describes.
    pub step: u64,
    /// Identifier of the graph node that was processed.
    pub node_id: String,
    /// Identifier of the brick bound to that node.
    pub brick_id: String,
    /// Result type label as reported by the brick result's `result_type()`.
    pub result_type: String,
    /// Wall-clock duration of the brick invocation in milliseconds
    /// (`0.0` when the invocation was skipped because of a pre-invoke failure).
    pub latency_ms: f64,
    /// Size of the encoded envelope passed to the brick, in bytes.
    pub envelope_bytes: usize,
    /// Size of the raw result returned by the brick, in bytes; `None` when the
    /// invocation was skipped or failed before producing a result.
    pub result_bytes: Option<usize>,
}
64
65#[derive(Default)]
67pub struct ExecuteHooks<'a> {
68 pub on_invoke: Option<&'a mut dyn FnMut(InvokeMetric)>,
69}
70
/// Tunables for a single `RuntimeContext::execute` run.
#[derive(Debug, Clone)]
pub struct ExecuteOptions {
    /// Trace identifier to use; a random UUID v4 is generated when `None`.
    pub trace_id: Option<String>,
    /// Session identifier to use; a random UUID v4 is generated when `None`.
    pub session_id: Option<String>,
    /// Optional step budget. When the step counter reaches this value the run
    /// stops and a `RESOURCE_EXCEEDED` failure terminal is recorded.
    pub max_steps: Option<u64>,
    /// Maximum number of pending tasks; the run aborts with an error when a new
    /// task would be queued while the queue already holds this many.
    pub max_queued: u64,
    /// NOTE(review): not read by the execution code in this module — presumably
    /// consumed by callers when reporting terminals; confirm.
    pub all_terminals: bool,
    /// When `true`, progress diagnostics are written to stderr.
    pub verbose: bool,
}

impl Default for ExecuteOptions {
    /// No fixed identifiers, no step budget, a queue cap of 10 000, quiet output.
    fn default() -> Self {
        Self {
            trace_id: None,
            session_id: None,
            max_steps: None,
            max_queued: 10_000,
            all_terminals: false,
            verbose: false,
        }
    }
}
93
94pub struct TerminalResult {
96 pub node_id: String,
97 pub brick_id: String,
98 pub step: u64,
99 pub result: result::BrickResult,
100}
101
/// Tally of brick results by outcome over one execution.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ResultCounts {
    /// Number of `Success` results.
    pub success: u64,
    /// Number of `LowConfidence` results.
    pub low_confidence: u64,
    /// Number of `Failure` results, including the synthetic failure recorded when
    /// the step budget is exhausted.
    pub failure: u64,
}
108
109pub struct ExecutionReport {
111 pub terminals: Vec<TerminalResult>,
112 pub total_steps: u64,
113 pub counts: ResultCounts,
114}
115
/// Description of one distinct brick that was resolved and compiled for a graph.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedBrickInfo {
    /// Brick identifier as referenced by the graph node.
    pub brick_id: String,
    /// Concrete version taken from the resolved brick manifest.
    pub version: String,
    /// Length of the resolved WASM artifact in bytes.
    pub wasm_bytes: usize,
    /// Artifact digest string from the resolved brick manifest.
    pub digest: String,
}
123
124pub struct RuntimeContext {
128 graph: manifest::GraphManifest,
129 compiled_bricks: HashMap<(String, String), engine::CompiledBrick>,
130 brick_manifests: HashMap<(String, String), manifest::BrickManifest>,
131 node_brick_key: HashMap<String, (String, String)>,
132 edges_by_source: HashMap<String, Vec<usize>>,
133 edge_by_id: HashMap<String, usize>,
134 entry_node_id: String,
135 resolved_info: Vec<ResolvedBrickInfo>,
136}
137
138struct Task {
140 node_id: String,
141 input_json: serde_json::Value,
142 trigger_source_node_id: String,
143 trigger_source_step: u64,
144 trigger_edge_id: String,
145 trigger_routing_reason: String,
146}
147
148impl RuntimeContext {
149 pub fn load(graph_path: &Path, brick_dir: &Path, brick_map: Option<&Path>) -> Result<Self> {
151 let graph = manifest::load_graph(graph_path)?;
152 let brick_map = brick_map.map(resolver::load_brick_map).transpose()?;
153 Self::from_graph(graph, brick_dir, &brick_map)
154 }
155
156 pub fn from_graph(
158 graph: manifest::GraphManifest,
159 brick_dir: &Path,
160 brick_map: &Option<resolver::BrickMap>,
161 ) -> Result<Self> {
162 if graph.nodes.is_empty() {
163 bail!("graph validation failed: graph has no nodes");
164 }
165
166 let mut compiled_bricks: HashMap<(String, String), engine::CompiledBrick> = HashMap::new();
167 let mut brick_manifests: HashMap<(String, String), manifest::BrickManifest> =
168 HashMap::new();
169 let mut node_brick_key: HashMap<String, (String, String)> = HashMap::new();
170 let mut resolved_info: Vec<ResolvedBrickInfo> = Vec::new();
171
172 for node in &graph.nodes {
173 let resolved = resolver::resolve_brick(
174 &node.brick.brick_id,
175 &node.brick.version_or_range,
176 brick_dir,
177 brick_map,
178 )?;
179 let key = (
180 node.brick.brick_id.clone(),
181 resolved.manifest.version.clone(),
182 );
183
184 if node_brick_key
185 .insert(node.node_id.clone(), key.clone())
186 .is_some()
187 {
188 bail!(
189 "graph validation failed: duplicate node_id '{}'",
190 node.node_id
191 );
192 }
193
194 if let std::collections::hash_map::Entry::Vacant(entry) =
195 compiled_bricks.entry(key.clone())
196 {
197 resolved_info.push(ResolvedBrickInfo {
198 brick_id: key.0.clone(),
199 version: key.1.clone(),
200 wasm_bytes: resolved.wasm_bytes.len(),
201 digest: resolved.manifest.artifact.digest.clone(),
202 });
203 let compiled = engine::CompiledBrick::new(&resolved.wasm_bytes)?;
204 entry.insert(compiled);
205 brick_manifests.insert(key, resolved.manifest);
206 }
207 }
208
209 let target_nodes: std::collections::HashSet<&str> =
211 graph.edges.iter().map(|e| e.target_node.as_str()).collect();
212
213 let entry_nodes: Vec<&str> = graph
214 .nodes
215 .iter()
216 .map(|n| n.node_id.as_str())
217 .filter(|id| !target_nodes.contains(id))
218 .collect();
219
220 match entry_nodes.len() {
221 0 => bail!("graph validation failed: no entry node found (all nodes are edge targets — cycle-only graph)"),
222 1 => {}
223 _ => bail!("graph validation failed: multiple entry nodes found: {:?}", entry_nodes),
224 }
225 let entry_node_id = entry_nodes[0].to_string();
226
227 let mut edges_by_source: HashMap<String, Vec<usize>> = HashMap::new();
229 let mut edge_by_id: HashMap<String, usize> = HashMap::new();
230 for (i, edge) in graph.edges.iter().enumerate() {
231 edges_by_source
232 .entry(edge.source_node.clone())
233 .or_default()
234 .push(i);
235 edge_by_id.insert(edge.edge_id.clone(), i);
236 }
237
238 Ok(Self {
239 graph,
240 compiled_bricks,
241 brick_manifests,
242 node_brick_key,
243 edges_by_source,
244 edge_by_id,
245 entry_node_id,
246 resolved_info,
247 })
248 }
249
250 pub fn emit_runtime_info(&self, tracer: &mut dyn trace::TraceSink) {
252 if tracer.enabled() {
253 tracer.emit_runtime_info(RUNTIME_VERSION, WASMTIME_MAJOR, &now_rfc3339());
254 }
255 }
256
257 pub fn graph_id(&self) -> &str {
258 &self.graph.graph_id
259 }
260 pub fn graph_version(&self) -> &str {
261 &self.graph.graph_version
262 }
263 pub fn node_count(&self) -> usize {
264 self.graph.nodes.len()
265 }
266 pub fn edge_count(&self) -> usize {
267 self.graph.edges.len()
268 }
269 pub fn entry_node_id(&self) -> &str {
270 &self.entry_node_id
271 }
272 pub fn resolved_bricks(&self) -> &[ResolvedBrickInfo] {
273 &self.resolved_info
274 }
275
276 pub fn execute(
278 &self,
279 json_input: &serde_json::Value,
280 tracer: &mut dyn trace::TraceSink,
281 hooks: &mut ExecuteHooks<'_>,
282 opts: &ExecuteOptions,
283 ) -> Result<ExecutionReport> {
284 let trace_id = opts
285 .trace_id
286 .as_deref()
287 .map(str::to_owned)
288 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
289 let session_id = opts
290 .session_id
291 .as_deref()
292 .map(str::to_owned)
293 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
294
295 let mut queue: std::collections::VecDeque<Task> = std::collections::VecDeque::new();
296 let mut step: u64 = 0;
297 let mut terminals: Vec<TerminalResult> = Vec::new();
298 let mut counts = ResultCounts {
299 success: 0,
300 low_confidence: 0,
301 failure: 0,
302 };
303 let verbose = opts.verbose;
304 let tracing = tracer.enabled();
305
306 queue.push_back(Task {
308 node_id: self.entry_node_id.clone(),
309 input_json: json_input.clone(),
310 trigger_source_node_id: envelope::ROOT_TRIGGER.source_node_id.to_string(),
311 trigger_source_step: envelope::ROOT_TRIGGER.source_step,
312 trigger_edge_id: envelope::ROOT_TRIGGER.edge_id.to_string(),
313 trigger_routing_reason: "entry".to_string(),
314 });
315
316 while let Some(task) = queue.pop_front() {
317 if let Some(max) = opts.max_steps {
319 if step >= max {
320 let msg = format!("max_steps budget ({max}) exhausted");
321 if verbose {
322 eprintln!("Safety budget: {msg}");
323 }
324 counts.failure += 1;
325 terminals.push(TerminalResult {
326 node_id: task.node_id.clone(),
327 brick_id: self
328 .node_brick_key
329 .get(&task.node_id)
330 .map(|k| k.0.clone())
331 .unwrap_or_else(|| "__runtime__".to_string()),
332 step,
333 result: result::trap_failure("RESOURCE_EXCEEDED", msg),
334 });
335 break;
336 }
337 }
338
339 let brick_key = self
340 .node_brick_key
341 .get(&task.node_id)
342 .with_context(|| format!("no brick key recorded for node '{}'", task.node_id))?;
343 let compiled = self
344 .compiled_bricks
345 .get(brick_key)
346 .with_context(|| format!("no compiled brick for key {:?}", brick_key))?;
347 let brick_manifest = self
348 .brick_manifests
349 .get(brick_key)
350 .with_context(|| format!("no manifest for key {:?}", brick_key))?;
351
352 let trigger = envelope::Trigger {
353 source_node_id: &task.trigger_source_node_id,
354 source_step: task.trigger_source_step,
355 edge_id: &task.trigger_edge_id,
356 };
357
358 let env = envelope::build_envelope(
359 &task.input_json,
360 &self.graph.graph_id,
361 &self.graph.graph_version,
362 &task.node_id,
363 &trace_id,
364 &session_id,
365 step,
366 &trigger,
367 )?;
368
369 let mut pre_invoke_failure: Option<result::BrickResult> = None;
371 if let Some(max_input) = brick_manifest.limits.max_input_bytes {
372 if env.len() as u64 > max_input {
373 let msg =
374 format!(
375 "envelope too large for brick '{}': {} bytes > limits.max_input_bytes {}",
376 brick_key.0, env.len(), max_input,
377 );
378 if verbose {
379 eprintln!(" {msg}");
380 }
381 pre_invoke_failure = Some(result::trap_failure("INVALID_INPUT", msg));
382 }
383 }
384
385 if verbose {
386 if pre_invoke_failure.is_some() {
387 eprintln!(
388 "[step {}] Skipping invoke for brick '{}' node '{}' (pre-invoke failure)",
389 step, brick_key.0, task.node_id,
390 );
391 } else {
392 eprintln!(
393 "[step {}] Invoking brick '{}' node '{}' ({} byte envelope)",
394 step,
395 brick_key.0,
396 task.node_id,
397 env.len(),
398 );
399 }
400 }
401
402 let (brick_result, raw_result_bytes, latency_ms) = if let Some(br) = pre_invoke_failure
404 {
405 (br, None, 0.0)
406 } else {
407 let start = std::time::Instant::now();
408 let invoke_result = compiled.invoke(
409 &env,
410 brick_manifest.limits.max_mem_mb,
411 brick_manifest.limits.max_output_bytes,
412 );
413 let latency = start.elapsed().as_secs_f64() * 1000.0;
414
415 match invoke_result {
416 Ok(result_bytes) => {
417 if verbose {
418 eprintln!(" Got {} byte result", result_bytes.len());
419 }
420 let decoded = match result::decode_result(&result_bytes) {
421 Ok(r) => r,
422 Err(e) => {
423 let msg = format!("{e:#}");
424 if verbose {
425 eprintln!(" Result rejected: {msg}");
426 }
427 result::trap_failure("RUNTIME_REJECTED", msg)
428 }
429 };
430 (decoded, Some(result_bytes), latency)
431 }
432 Err(e) => {
433 let msg = format!("{e:#}");
434 let m = msg.to_lowercase();
435 let error_class = if m.contains("alloc returned 0")
436 || m.contains("oom")
437 || m.contains("result too large")
438 || m.contains("memory limit")
439 || m.contains("fuel")
440 || m.contains("resource_exceeded")
441 {
442 "RESOURCE_EXCEEDED"
443 } else {
444 "COMPUTATION_ERROR"
445 };
446 if verbose {
447 eprintln!(" Brick trap: {msg}");
448 }
449 (result::trap_failure(error_class, msg), None, latency)
450 }
451 }
452 };
453
454 if tracing {
456 tracer.emit_invoke(
457 &trace_id,
458 &session_id,
459 step,
460 &self.graph.graph_id,
461 &self.graph.graph_version,
462 &brick_key.0,
463 &brick_key.1,
464 &brick_manifest.artifact.digest,
465 &task.node_id,
466 &env,
467 &task.trigger_source_node_id,
468 task.trigger_source_step,
469 &task.trigger_edge_id,
470 &task.trigger_routing_reason,
471 &brick_result,
472 raw_result_bytes.as_deref(),
473 latency_ms,
474 &now_rfc3339(),
475 );
476 }
477
478 match &brick_result {
480 result::BrickResult::Success { .. } => counts.success += 1,
481 result::BrickResult::LowConfidence { .. } => counts.low_confidence += 1,
482 result::BrickResult::Failure { .. } => counts.failure += 1,
483 }
484
485 if let Some(ref mut on_invoke) = hooks.on_invoke {
487 on_invoke(InvokeMetric {
488 step,
489 node_id: task.node_id.clone(),
490 brick_id: brick_key.0.clone(),
491 result_type: brick_result.result_type().to_string(),
492 latency_ms,
493 envelope_bytes: env.len(),
494 result_bytes: raw_result_bytes.as_ref().map(|b| b.len()),
495 });
496 }
497
498 if verbose {
499 eprintln!(" Result type: {}", brick_result.result_type());
500 }
501
502 let outbound_indices = self
504 .edges_by_source
505 .get(task.node_id.as_str())
506 .cloned()
507 .unwrap_or_default();
508 let outbound: Vec<&manifest::Edge> = outbound_indices
509 .iter()
510 .map(|&i| &self.graph.edges[i])
511 .collect();
512
513 let output_confidence = brick_result.output().and_then(mapping::extract_confidence);
514
515 let routed = router::route(&outbound, &brick_result, output_confidence);
516
517 if routed.is_empty() {
518 if verbose {
519 eprintln!(" Terminal node (no outbound edges dispatched)");
520 }
521 terminals.push(TerminalResult {
522 node_id: task.node_id.clone(),
523 brick_id: brick_key.0.clone(),
524 step,
525 result: brick_result,
526 });
527 } else {
528 for routed_edge in &routed {
529 let edge_idx = self
530 .edge_by_id
531 .get(routed_edge.edge_id.as_str())
532 .with_context(|| {
533 format!("routed edge '{}' not found in graph", routed_edge.edge_id)
534 })?;
535 let edge_def = &self.graph.edges[*edge_idx];
536
537 let mapped_input = if edge_def.mapping.is_empty() {
538 match brick_result.output() {
539 Some(output) => {
540 serde_json::json!({ "input": mapping::cbor_to_json(output) })
541 }
542 None => serde_json::json!({ "input": null }),
543 }
544 } else {
545 let source_root = match brick_result.output() {
546 Some(output) => CborValue::Map(vec![(
547 CborValue::Text("output".to_string()),
548 output.clone(),
549 )]),
550 None => CborValue::Map(vec![]),
551 };
552
553 let mut target_cbor = CborValue::Map(vec![]);
554 for fm in &edge_def.mapping {
555 let resolved = mapping::resolve_path(&source_root, &fm.from)
556 .with_context(|| {
557 format!(
558 "mapping '{}' → '{}': source path '{}' not found in output",
559 fm.from, fm.to, fm.from,
560 )
561 })?;
562 let overlay = mapping::set_path(&fm.to, resolved);
563 target_cbor = mapping::merge_maps(target_cbor, overlay);
564 }
565
566 mapping::cbor_to_json(&target_cbor)
567 };
568
569 if verbose {
570 eprintln!(
571 " Route: edge '{}' → node '{}'",
572 routed_edge.edge_id, routed_edge.target_node,
573 );
574 }
575
576 if queue.len() as u64 >= opts.max_queued {
578 bail!(
579 "safety budget exceeded: queue size {} >= max_queued {}",
580 queue.len(),
581 opts.max_queued,
582 );
583 }
584
585 queue.push_back(Task {
586 node_id: routed_edge.target_node.clone(),
587 input_json: mapped_input,
588 trigger_source_node_id: task.node_id.clone(),
589 trigger_source_step: step,
590 trigger_edge_id: routed_edge.edge_id.clone(),
591 trigger_routing_reason: "routed".to_string(),
592 });
593 }
594 }
595
596 step += 1;
597 }
598
599 Ok(ExecutionReport {
600 terminals,
601 total_steps: step,
602 counts,
603 })
604 }
605}