Skip to main content

ncp_runtime/
lib.rs

1pub mod engine;
2pub mod envelope;
3pub mod manifest;
4pub mod mapping;
5pub mod resolver;
6pub mod result;
7pub mod router;
8pub mod trace;
9
10use std::collections::HashMap;
11use std::path::Path;
12
13use anyhow::{bail, Context, Result};
14
15use result::CborValue;
16
17pub const RUNTIME_VERSION: &str = env!("CARGO_PKG_VERSION");
18pub const WASMTIME_MAJOR: &str = "43";
19
20pub fn now_rfc3339() -> String {
21    chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
22}
23
24// ── Execute types ──────────────────────────────────────────────────
25
/// Per-invoke metric emitted to hooks (`ExecuteHooks::on_invoke`).
///
/// Derives `Debug`/`Clone`/`PartialEq` so hook consumers can log, buffer and compare metrics.
/// (`Eq` is not derived because of the `f64` latency field.)
#[derive(Debug, Clone, PartialEq)]
pub struct InvokeMetric {
    /// Zero-based step number of this invocation within the run.
    pub step: u64,
    /// Graph node that was processed.
    pub node_id: String,
    /// Brick bound to the node.
    pub brick_id: String,
    /// Result variant name, as reported by the brick result's `result_type()`.
    pub result_type: String,
    /// Wall-clock invoke time in milliseconds; `0.0` when the invoke was skipped.
    pub latency_ms: f64,
    /// Size of the input envelope in bytes.
    pub envelope_bytes: usize,
    /// Size of the raw result in bytes; `None` when no result bytes were produced
    /// (brick trap or pre-invoke failure).
    pub result_bytes: Option<usize>,
}
36
37/// Hooks for streaming execution events.
38#[derive(Default)]
39pub struct ExecuteHooks<'a> {
40    pub on_invoke: Option<&'a mut dyn FnMut(InvokeMetric)>,
41}
42
/// Options for a single `RuntimeContext::execute` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecuteOptions {
    /// Trace id stamped on envelopes and trace records; a fresh UUID v4 is used when `None`.
    pub trace_id: Option<String>,
    /// Session id stamped on envelopes and trace records; a fresh UUID v4 is used when `None`.
    pub session_id: Option<String>,
    /// Maximum number of steps. Once reached, the next dequeued task is recorded as a
    /// `RESOURCE_EXCEEDED` failure terminal and execution stops. `None` means unlimited.
    pub max_steps: Option<u64>,
    /// Maximum queue length. Enqueueing a routed task while the queue already holds this
    /// many tasks aborts execution with an error.
    pub max_queued: u64,
    /// NOTE(review): not read by `RuntimeContext::execute`; presumably interpreted by
    /// callers when reporting terminals — confirm.
    pub all_terminals: bool,
    /// Print per-step diagnostics to stderr.
    pub verbose: bool,
}

impl Default for ExecuteOptions {
    /// No step limit, a queue limit of 10 000 tasks, generated ids, quiet output.
    fn default() -> Self {
        Self {
            trace_id: None,
            session_id: None,
            max_steps: None,
            max_queued: 10_000,
            all_terminals: false,
            verbose: false,
        }
    }
}
65
66/// A terminal result from graph execution.
67pub struct TerminalResult {
68    pub node_id: String,
69    pub brick_id: String,
70    pub step: u64,
71    pub result: result::BrickResult,
72}
73
/// Per-run tally of brick results, by result variant.
///
/// `Default` is the all-zero tally that a run starts from.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ResultCounts {
    /// Number of `Success` results.
    pub success: u64,
    /// Number of `LowConfidence` results.
    pub low_confidence: u64,
    /// Number of `Failure` results, including pre-invoke failures and the synthetic
    /// failure recorded when `max_steps` is exhausted.
    pub failure: u64,
}
80
81/// Report from a single graph execution.
82pub struct ExecutionReport {
83    pub terminals: Vec<TerminalResult>,
84    pub total_steps: u64,
85    pub counts: ResultCounts,
86}
87
/// Info about a resolved brick, for caller-side logging.
///
/// One entry exists per distinct `(brick_id, version)` pair that the graph uses.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedBrickInfo {
    /// Brick identifier.
    pub brick_id: String,
    /// Concrete version the node's version/range resolved to.
    pub version: String,
    /// Size of the brick's wasm artifact in bytes.
    pub wasm_bytes: usize,
    /// Artifact digest from the brick manifest.
    pub digest: String,
}
95
96// ── RuntimeContext ──────────────────────────────────────────────────
97
98/// Loaded + compiled graph context. Reusable across multiple execute() calls.
99pub struct RuntimeContext {
100    graph: manifest::GraphManifest,
101    compiled_bricks: HashMap<(String, String), engine::CompiledBrick>,
102    brick_manifests: HashMap<(String, String), manifest::BrickManifest>,
103    node_brick_key: HashMap<String, (String, String)>,
104    edges_by_source: HashMap<String, Vec<usize>>,
105    edge_by_id: HashMap<String, usize>,
106    entry_node_id: String,
107    resolved_info: Vec<ResolvedBrickInfo>,
108}
109
110/// A task in the FIFO queue.
111struct Task {
112    node_id: String,
113    input_json: serde_json::Value,
114    trigger_source_node_id: String,
115    trigger_source_step: u64,
116    trigger_edge_id: String,
117    trigger_routing_reason: String,
118}
119
120impl RuntimeContext {
121    /// Load from file paths (CLI convenience).
122    pub fn load(graph_path: &Path, brick_dir: &Path, brick_map: Option<&Path>) -> Result<Self> {
123        let graph = manifest::load_graph(graph_path)?;
124        let brick_map = brick_map.map(resolver::load_brick_map).transpose()?;
125        Self::from_graph(graph, brick_dir, &brick_map)
126    }
127
128    /// Build from a pre-parsed GraphManifest (for embedding / Phase 3A).
129    pub fn from_graph(
130        graph: manifest::GraphManifest,
131        brick_dir: &Path,
132        brick_map: &Option<resolver::BrickMap>,
133    ) -> Result<Self> {
134        if graph.nodes.is_empty() {
135            bail!("graph validation failed: graph has no nodes");
136        }
137
138        let mut compiled_bricks: HashMap<(String, String), engine::CompiledBrick> = HashMap::new();
139        let mut brick_manifests: HashMap<(String, String), manifest::BrickManifest> =
140            HashMap::new();
141        let mut node_brick_key: HashMap<String, (String, String)> = HashMap::new();
142        let mut resolved_info: Vec<ResolvedBrickInfo> = Vec::new();
143
144        for node in &graph.nodes {
145            let resolved = resolver::resolve_brick(
146                &node.brick.brick_id,
147                &node.brick.version_or_range,
148                brick_dir,
149                brick_map,
150            )?;
151            let key = (
152                node.brick.brick_id.clone(),
153                resolved.manifest.version.clone(),
154            );
155
156            if node_brick_key
157                .insert(node.node_id.clone(), key.clone())
158                .is_some()
159            {
160                bail!(
161                    "graph validation failed: duplicate node_id '{}'",
162                    node.node_id
163                );
164            }
165
166            if let std::collections::hash_map::Entry::Vacant(entry) =
167                compiled_bricks.entry(key.clone())
168            {
169                resolved_info.push(ResolvedBrickInfo {
170                    brick_id: key.0.clone(),
171                    version: key.1.clone(),
172                    wasm_bytes: resolved.wasm_bytes.len(),
173                    digest: resolved.manifest.artifact.digest.clone(),
174                });
175                let compiled = engine::CompiledBrick::new(&resolved.wasm_bytes)?;
176                entry.insert(compiled);
177                brick_manifests.insert(key, resolved.manifest);
178            }
179        }
180
181        // Detect entry node: not the target of any edge
182        let target_nodes: std::collections::HashSet<&str> =
183            graph.edges.iter().map(|e| e.target_node.as_str()).collect();
184
185        let entry_nodes: Vec<&str> = graph
186            .nodes
187            .iter()
188            .map(|n| n.node_id.as_str())
189            .filter(|id| !target_nodes.contains(id))
190            .collect();
191
192        match entry_nodes.len() {
193            0 => bail!("graph validation failed: no entry node found (all nodes are edge targets — cycle-only graph)"),
194            1 => {}
195            _ => bail!("graph validation failed: multiple entry nodes found: {:?}", entry_nodes),
196        }
197        let entry_node_id = entry_nodes[0].to_string();
198
199        // Pre-index edges by source node and by edge ID
200        let mut edges_by_source: HashMap<String, Vec<usize>> = HashMap::new();
201        let mut edge_by_id: HashMap<String, usize> = HashMap::new();
202        for (i, edge) in graph.edges.iter().enumerate() {
203            edges_by_source
204                .entry(edge.source_node.clone())
205                .or_default()
206                .push(i);
207            edge_by_id.insert(edge.edge_id.clone(), i);
208        }
209
210        Ok(Self {
211            graph,
212            compiled_bricks,
213            brick_manifests,
214            node_brick_key,
215            edges_by_source,
216            edge_by_id,
217            entry_node_id,
218            resolved_info,
219        })
220    }
221
222    /// Emit the runtime_info trace record.
223    pub fn emit_runtime_info(&self, tracer: &mut dyn trace::TraceSink) {
224        if tracer.enabled() {
225            tracer.emit_runtime_info(RUNTIME_VERSION, WASMTIME_MAJOR, &now_rfc3339());
226        }
227    }
228
229    pub fn graph_id(&self) -> &str {
230        &self.graph.graph_id
231    }
232    pub fn graph_version(&self) -> &str {
233        &self.graph.graph_version
234    }
235    pub fn node_count(&self) -> usize {
236        self.graph.nodes.len()
237    }
238    pub fn edge_count(&self) -> usize {
239        self.graph.edges.len()
240    }
241    pub fn entry_node_id(&self) -> &str {
242        &self.entry_node_id
243    }
244    pub fn resolved_bricks(&self) -> &[ResolvedBrickInfo] {
245        &self.resolved_info
246    }
247
248    /// Execute the graph with the given input.
249    pub fn execute(
250        &self,
251        json_input: &serde_json::Value,
252        tracer: &mut dyn trace::TraceSink,
253        hooks: &mut ExecuteHooks<'_>,
254        opts: &ExecuteOptions,
255    ) -> Result<ExecutionReport> {
256        let trace_id = opts
257            .trace_id
258            .as_deref()
259            .map(str::to_owned)
260            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
261        let session_id = opts
262            .session_id
263            .as_deref()
264            .map(str::to_owned)
265            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
266
267        let mut queue: std::collections::VecDeque<Task> = std::collections::VecDeque::new();
268        let mut step: u64 = 0;
269        let mut terminals: Vec<TerminalResult> = Vec::new();
270        let mut counts = ResultCounts {
271            success: 0,
272            low_confidence: 0,
273            failure: 0,
274        };
275        let verbose = opts.verbose;
276        let tracing = tracer.enabled();
277
278        // Push entry task
279        queue.push_back(Task {
280            node_id: self.entry_node_id.clone(),
281            input_json: json_input.clone(),
282            trigger_source_node_id: envelope::ROOT_TRIGGER.source_node_id.to_string(),
283            trigger_source_step: envelope::ROOT_TRIGGER.source_step,
284            trigger_edge_id: envelope::ROOT_TRIGGER.edge_id.to_string(),
285            trigger_routing_reason: "entry".to_string(),
286        });
287
288        while let Some(task) = queue.pop_front() {
289            // Safety budget: max_steps
290            if let Some(max) = opts.max_steps {
291                if step >= max {
292                    let msg = format!("max_steps budget ({max}) exhausted");
293                    if verbose {
294                        eprintln!("Safety budget: {msg}");
295                    }
296                    counts.failure += 1;
297                    terminals.push(TerminalResult {
298                        node_id: task.node_id.clone(),
299                        brick_id: self
300                            .node_brick_key
301                            .get(&task.node_id)
302                            .map(|k| k.0.clone())
303                            .unwrap_or_else(|| "__runtime__".to_string()),
304                        step,
305                        result: result::trap_failure("RESOURCE_EXCEEDED", msg),
306                    });
307                    break;
308                }
309            }
310
311            let brick_key = self
312                .node_brick_key
313                .get(&task.node_id)
314                .with_context(|| format!("no brick key recorded for node '{}'", task.node_id))?;
315            let compiled = self
316                .compiled_bricks
317                .get(brick_key)
318                .with_context(|| format!("no compiled brick for key {:?}", brick_key))?;
319            let brick_manifest = self
320                .brick_manifests
321                .get(brick_key)
322                .with_context(|| format!("no manifest for key {:?}", brick_key))?;
323
324            let trigger = envelope::Trigger {
325                source_node_id: &task.trigger_source_node_id,
326                source_step: task.trigger_source_step,
327                edge_id: &task.trigger_edge_id,
328            };
329
330            let env = envelope::build_envelope(
331                &task.input_json,
332                &self.graph.graph_id,
333                &self.graph.graph_version,
334                &task.node_id,
335                &trace_id,
336                &session_id,
337                step,
338                &trigger,
339            )?;
340
341            // Check max_input_bytes — soft failure (routes via on_error, doesn't abort run)
342            let mut pre_invoke_failure: Option<result::BrickResult> = None;
343            if let Some(max_input) = brick_manifest.limits.max_input_bytes {
344                if env.len() as u64 > max_input {
345                    let msg =
346                        format!(
347                        "envelope too large for brick '{}': {} bytes > limits.max_input_bytes {}",
348                        brick_key.0, env.len(), max_input,
349                    );
350                    if verbose {
351                        eprintln!("  {msg}");
352                    }
353                    pre_invoke_failure = Some(result::trap_failure("INVALID_INPUT", msg));
354                }
355            }
356
357            if verbose {
358                if pre_invoke_failure.is_some() {
359                    eprintln!(
360                        "[step {}] Skipping invoke for brick '{}' node '{}' (pre-invoke failure)",
361                        step, brick_key.0, task.node_id,
362                    );
363                } else {
364                    eprintln!(
365                        "[step {}] Invoking brick '{}' node '{}' ({} byte envelope)",
366                        step,
367                        brick_key.0,
368                        task.node_id,
369                        env.len(),
370                    );
371                }
372            }
373
374            // Invoke brick (or use pre-invoke failure)
375            let (brick_result, raw_result_bytes, latency_ms) = if let Some(br) = pre_invoke_failure
376            {
377                (br, None, 0.0)
378            } else {
379                let start = std::time::Instant::now();
380                let invoke_result = compiled.invoke(
381                    &env,
382                    brick_manifest.limits.max_mem_mb,
383                    brick_manifest.limits.max_output_bytes,
384                );
385                let latency = start.elapsed().as_secs_f64() * 1000.0;
386
387                match invoke_result {
388                    Ok(result_bytes) => {
389                        if verbose {
390                            eprintln!("  Got {} byte result", result_bytes.len());
391                        }
392                        let decoded = match result::decode_result(&result_bytes) {
393                            Ok(r) => r,
394                            Err(e) => {
395                                let msg = format!("{e:#}");
396                                if verbose {
397                                    eprintln!("  Result rejected: {msg}");
398                                }
399                                result::trap_failure("RUNTIME_REJECTED", msg)
400                            }
401                        };
402                        (decoded, Some(result_bytes), latency)
403                    }
404                    Err(e) => {
405                        let msg = format!("{e:#}");
406                        let m = msg.to_lowercase();
407                        let error_class = if m.contains("alloc returned 0")
408                            || m.contains("oom")
409                            || m.contains("result too large")
410                            || m.contains("memory limit")
411                            || m.contains("fuel")
412                            || m.contains("resource_exceeded")
413                        {
414                            "RESOURCE_EXCEEDED"
415                        } else {
416                            "COMPUTATION_ERROR"
417                        };
418                        if verbose {
419                            eprintln!("  Brick trap: {msg}");
420                        }
421                        (result::trap_failure(error_class, msg), None, latency)
422                    }
423                }
424            };
425
426            // Emit invoke trace record (skip entirely when tracing disabled)
427            if tracing {
428                tracer.emit_invoke(
429                    &trace_id,
430                    &session_id,
431                    step,
432                    &self.graph.graph_id,
433                    &self.graph.graph_version,
434                    &brick_key.0,
435                    &brick_key.1,
436                    &brick_manifest.artifact.digest,
437                    &task.node_id,
438                    &env,
439                    &task.trigger_source_node_id,
440                    task.trigger_source_step,
441                    &task.trigger_edge_id,
442                    &task.trigger_routing_reason,
443                    &brick_result,
444                    raw_result_bytes.as_deref(),
445                    latency_ms,
446                    &now_rfc3339(),
447                );
448            }
449
450            // Update counts
451            match &brick_result {
452                result::BrickResult::Success { .. } => counts.success += 1,
453                result::BrickResult::LowConfidence { .. } => counts.low_confidence += 1,
454                result::BrickResult::Failure { .. } => counts.failure += 1,
455            }
456
457            // Fire hook
458            if let Some(ref mut on_invoke) = hooks.on_invoke {
459                on_invoke(InvokeMetric {
460                    step,
461                    node_id: task.node_id.clone(),
462                    brick_id: brick_key.0.clone(),
463                    result_type: brick_result.result_type().to_string(),
464                    latency_ms,
465                    envelope_bytes: env.len(),
466                    result_bytes: raw_result_bytes.as_ref().map(|b| b.len()),
467                });
468            }
469
470            if verbose {
471                eprintln!("  Result type: {}", brick_result.result_type());
472            }
473
474            // ── Routing ────────────────────────────────────────────
475            let outbound_indices = self
476                .edges_by_source
477                .get(task.node_id.as_str())
478                .cloned()
479                .unwrap_or_default();
480            let outbound: Vec<&manifest::Edge> = outbound_indices
481                .iter()
482                .map(|&i| &self.graph.edges[i])
483                .collect();
484
485            let output_confidence = brick_result.output().and_then(mapping::extract_confidence);
486
487            let routed = router::route(&outbound, &brick_result, output_confidence);
488
489            if routed.is_empty() {
490                if verbose {
491                    eprintln!("  Terminal node (no outbound edges dispatched)");
492                }
493                terminals.push(TerminalResult {
494                    node_id: task.node_id.clone(),
495                    brick_id: brick_key.0.clone(),
496                    step,
497                    result: brick_result,
498                });
499            } else {
500                for routed_edge in &routed {
501                    let edge_idx = self
502                        .edge_by_id
503                        .get(routed_edge.edge_id.as_str())
504                        .with_context(|| {
505                            format!("routed edge '{}' not found in graph", routed_edge.edge_id)
506                        })?;
507                    let edge_def = &self.graph.edges[*edge_idx];
508
509                    let mapped_input = if edge_def.mapping.is_empty() {
510                        match brick_result.output() {
511                            Some(output) => {
512                                serde_json::json!({ "input": mapping::cbor_to_json(output) })
513                            }
514                            None => serde_json::json!({ "input": null }),
515                        }
516                    } else {
517                        let source_root = match brick_result.output() {
518                            Some(output) => CborValue::Map(vec![(
519                                CborValue::Text("output".to_string()),
520                                output.clone(),
521                            )]),
522                            None => CborValue::Map(vec![]),
523                        };
524
525                        let mut target_cbor = CborValue::Map(vec![]);
526                        for fm in &edge_def.mapping {
527                            let resolved = mapping::resolve_path(&source_root, &fm.from)
528                                .with_context(|| {
529                                    format!(
530                                        "mapping '{}' → '{}': source path '{}' not found in output",
531                                        fm.from, fm.to, fm.from,
532                                    )
533                                })?;
534                            let overlay = mapping::set_path(&fm.to, resolved);
535                            target_cbor = mapping::merge_maps(target_cbor, overlay);
536                        }
537
538                        mapping::cbor_to_json(&target_cbor)
539                    };
540
541                    if verbose {
542                        eprintln!(
543                            "  Route: edge '{}' → node '{}'",
544                            routed_edge.edge_id, routed_edge.target_node,
545                        );
546                    }
547
548                    // Safety budget: max_queued
549                    if queue.len() as u64 >= opts.max_queued {
550                        bail!(
551                            "safety budget exceeded: queue size {} >= max_queued {}",
552                            queue.len(),
553                            opts.max_queued,
554                        );
555                    }
556
557                    queue.push_back(Task {
558                        node_id: routed_edge.target_node.clone(),
559                        input_json: mapped_input,
560                        trigger_source_node_id: task.node_id.clone(),
561                        trigger_source_step: step,
562                        trigger_edge_id: routed_edge.edge_id.clone(),
563                        trigger_routing_reason: "routed".to_string(),
564                    });
565                }
566            }
567
568            step += 1;
569        }
570
571        Ok(ExecutionReport {
572            terminals,
573            total_steps: step,
574            counts,
575        })
576    }
577}