Skip to main content

ncp_runtime/
lib.rs

1//! NCP reference runtime.
2//!
3//! `ncp-runtime` provides the reference executor and CLI for the
4//! Neural Computation Protocol: composable, auditable WASM Brick graphs
5//! for agentic AI systems.
6//!
7//! Most users start with the `ncp` CLI:
8//!
9//! ```text
10//! cargo install ncp-runtime --version 0.3.6 --locked
11//! ncp --version
12//! ```
13//!
14//! The runtime loads graph manifests, verifies Brick digests, executes
15//! WASM Bricks through Wasmtime, routes results deterministically, and
16//! emits replayable JSONL traces.
17//!
18//! Useful links:
19//!
20//! - [Project README](https://github.com/madeinplutofabio/neural-computation-protocol)
21//! - [Install guide](https://github.com/madeinplutofabio/neural-computation-protocol/blob/main/docs/INSTALL.md)
22//! - [Adoption guide](https://github.com/madeinplutofabio/neural-computation-protocol/blob/main/docs/ADOPTION_GUIDE.md)
23//! - [Protocol spec](https://github.com/madeinplutofabio/neural-computation-protocol/blob/main/spec/ncp-v0.2.3.md)
24//!
25//! Note: the CLI/runtime is stable enough for evaluation and early
26//! integration. Treat the Rust module API as reference-runtime internals
27//! until the SDK work lands.
28
// Brick compilation and invocation (`CompiledBrick`).
pub mod engine;
// Invocation envelope construction (`build_envelope`, `Trigger`, `ROOT_TRIGGER`).
pub mod envelope;
// Graph and brick manifest types and loading (`GraphManifest`, `BrickManifest`, `Edge`).
pub mod manifest;
// Edge field mapping and CBOR/JSON conversion helpers.
pub mod mapping;
// Brick resolution from a brick directory and optional brick map.
pub mod resolver;
// Brick result types, result decoding, and runtime-generated failures.
pub mod result;
// Selection of outbound edges for a brick result (`route`).
pub mod router;
// Trace sink abstraction (`TraceSink`).
pub mod trace;
37
38use std::collections::HashMap;
39use std::path::Path;
40
41use anyhow::{bail, Context, Result};
42
43use result::CborValue;
44
/// Version of this crate, captured from `CARGO_PKG_VERSION` at compile time.
pub const RUNTIME_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Wasmtime major version string reported in the `runtime_info` trace record.
// NOTE(review): presumably kept in sync with the Wasmtime dependency by hand — confirm.
pub const WASMTIME_MAJOR: &str = "43";
47
48pub fn now_rfc3339() -> String {
49    chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
50}
51
52// ── Execute types ──────────────────────────────────────────────────
53
/// Per-invoke metric emitted to hooks.
///
/// One value is passed to [`ExecuteHooks::on_invoke`] for every executed step,
/// including steps whose brick invocation was skipped because of a pre-invoke
/// failure.
#[derive(Debug, Clone, PartialEq)]
pub struct InvokeMetric {
    /// Zero-based step counter of this invocation within the run.
    pub step: u64,
    /// ID of the graph node that was executed.
    pub node_id: String,
    /// ID of the brick bound to that node.
    pub brick_id: String,
    /// Result type string reported by the brick result.
    pub result_type: String,
    /// Wall-clock time spent in the brick invocation, in milliseconds
    /// (`0.0` when the invocation was skipped).
    pub latency_ms: f64,
    /// Size of the input envelope in bytes.
    pub envelope_bytes: usize,
    /// Size of the raw result in bytes; `None` when the brick trapped or the
    /// invocation was skipped.
    pub result_bytes: Option<usize>,
}
64
65/// Hooks for streaming execution events.
66#[derive(Default)]
67pub struct ExecuteHooks<'a> {
68    pub on_invoke: Option<&'a mut dyn FnMut(InvokeMetric)>,
69}
70
/// Options for a single execute() call.
#[derive(Debug, Clone)]
pub struct ExecuteOptions {
    /// Trace ID to stamp on envelopes and trace records; a random UUID v4 is
    /// generated when `None`.
    pub trace_id: Option<String>,
    /// Session ID to stamp on envelopes and trace records; a random UUID v4 is
    /// generated when `None`.
    pub session_id: Option<String>,
    /// Maximum number of steps to execute. When reached, the run stops and a
    /// `RESOURCE_EXCEEDED` failure is reported as a terminal. `None` disables
    /// the budget.
    pub max_steps: Option<u64>,
    /// Maximum number of pending tasks; `execute` returns an error when a
    /// routed task would be queued while the queue already holds this many.
    pub max_queued: u64,
    // NOTE(review): not read by `execute` in this file — presumably consumed
    // by callers (e.g. the CLI) when reporting terminals; confirm.
    pub all_terminals: bool,
    /// Print per-step diagnostics to stderr.
    pub verbose: bool,
}

impl Default for ExecuteOptions {
    /// No fixed IDs, no step budget, a queue budget of 10 000 tasks, and all
    /// flags off.
    fn default() -> Self {
        Self {
            trace_id: None,
            session_id: None,
            max_steps: None,
            max_queued: 10_000,
            all_terminals: false,
            verbose: false,
        }
    }
}
93
94/// A terminal result from graph execution.
95pub struct TerminalResult {
96    pub node_id: String,
97    pub brick_id: String,
98    pub step: u64,
99    pub result: result::BrickResult,
100}
101
/// Execution result counts.
///
/// Each executed step increments exactly one counter according to the
/// variant of its brick result; exhausting the `max_steps` budget also counts
/// as one failure.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ResultCounts {
    /// Number of `Success` results.
    pub success: u64,
    /// Number of `LowConfidence` results.
    pub low_confidence: u64,
    /// Number of `Failure` results, including runtime-generated failures.
    pub failure: u64,
}
108
109/// Report from a single graph execution.
110pub struct ExecutionReport {
111    pub terminals: Vec<TerminalResult>,
112    pub total_steps: u64,
113    pub counts: ResultCounts,
114}
115
/// Info about a resolved brick, for caller-side logging.
///
/// One entry is recorded per distinct `(brick_id, version)` pair the first
/// time it is resolved while building a [`RuntimeContext`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedBrickInfo {
    /// Brick ID as referenced by the graph node.
    pub brick_id: String,
    /// Concrete version taken from the resolved brick manifest.
    pub version: String,
    /// Size of the resolved WASM artifact in bytes.
    pub wasm_bytes: usize,
    /// Artifact digest taken from the resolved brick manifest.
    pub digest: String,
}
123
124// ── RuntimeContext ──────────────────────────────────────────────────
125
126/// Loaded + compiled graph context. Reusable across multiple execute() calls.
127pub struct RuntimeContext {
128    graph: manifest::GraphManifest,
129    compiled_bricks: HashMap<(String, String), engine::CompiledBrick>,
130    brick_manifests: HashMap<(String, String), manifest::BrickManifest>,
131    node_brick_key: HashMap<String, (String, String)>,
132    edges_by_source: HashMap<String, Vec<usize>>,
133    edge_by_id: HashMap<String, usize>,
134    entry_node_id: String,
135    resolved_info: Vec<ResolvedBrickInfo>,
136}
137
138/// A task in the FIFO queue.
139struct Task {
140    node_id: String,
141    input_json: serde_json::Value,
142    trigger_source_node_id: String,
143    trigger_source_step: u64,
144    trigger_edge_id: String,
145    trigger_routing_reason: String,
146}
147
148impl RuntimeContext {
149    /// Load from file paths (CLI convenience).
150    pub fn load(graph_path: &Path, brick_dir: &Path, brick_map: Option<&Path>) -> Result<Self> {
151        let graph = manifest::load_graph(graph_path)?;
152        let brick_map = brick_map.map(resolver::load_brick_map).transpose()?;
153        Self::from_graph(graph, brick_dir, &brick_map)
154    }
155
156    /// Build from a pre-parsed GraphManifest (for embedding / Phase 3A).
157    pub fn from_graph(
158        graph: manifest::GraphManifest,
159        brick_dir: &Path,
160        brick_map: &Option<resolver::BrickMap>,
161    ) -> Result<Self> {
162        if graph.nodes.is_empty() {
163            bail!("graph validation failed: graph has no nodes");
164        }
165
166        let mut compiled_bricks: HashMap<(String, String), engine::CompiledBrick> = HashMap::new();
167        let mut brick_manifests: HashMap<(String, String), manifest::BrickManifest> =
168            HashMap::new();
169        let mut node_brick_key: HashMap<String, (String, String)> = HashMap::new();
170        let mut resolved_info: Vec<ResolvedBrickInfo> = Vec::new();
171
172        for node in &graph.nodes {
173            let resolved = resolver::resolve_brick(
174                &node.brick.brick_id,
175                &node.brick.version_or_range,
176                brick_dir,
177                brick_map,
178            )?;
179            let key = (
180                node.brick.brick_id.clone(),
181                resolved.manifest.version.clone(),
182            );
183
184            if node_brick_key
185                .insert(node.node_id.clone(), key.clone())
186                .is_some()
187            {
188                bail!(
189                    "graph validation failed: duplicate node_id '{}'",
190                    node.node_id
191                );
192            }
193
194            if let std::collections::hash_map::Entry::Vacant(entry) =
195                compiled_bricks.entry(key.clone())
196            {
197                resolved_info.push(ResolvedBrickInfo {
198                    brick_id: key.0.clone(),
199                    version: key.1.clone(),
200                    wasm_bytes: resolved.wasm_bytes.len(),
201                    digest: resolved.manifest.artifact.digest.clone(),
202                });
203                let compiled = engine::CompiledBrick::new(&resolved.wasm_bytes)?;
204                entry.insert(compiled);
205                brick_manifests.insert(key, resolved.manifest);
206            }
207        }
208
209        // Detect entry node: not the target of any edge
210        let target_nodes: std::collections::HashSet<&str> =
211            graph.edges.iter().map(|e| e.target_node.as_str()).collect();
212
213        let entry_nodes: Vec<&str> = graph
214            .nodes
215            .iter()
216            .map(|n| n.node_id.as_str())
217            .filter(|id| !target_nodes.contains(id))
218            .collect();
219
220        match entry_nodes.len() {
221            0 => bail!("graph validation failed: no entry node found (all nodes are edge targets — cycle-only graph)"),
222            1 => {}
223            _ => bail!("graph validation failed: multiple entry nodes found: {:?}", entry_nodes),
224        }
225        let entry_node_id = entry_nodes[0].to_string();
226
227        // Pre-index edges by source node and by edge ID
228        let mut edges_by_source: HashMap<String, Vec<usize>> = HashMap::new();
229        let mut edge_by_id: HashMap<String, usize> = HashMap::new();
230        for (i, edge) in graph.edges.iter().enumerate() {
231            edges_by_source
232                .entry(edge.source_node.clone())
233                .or_default()
234                .push(i);
235            edge_by_id.insert(edge.edge_id.clone(), i);
236        }
237
238        Ok(Self {
239            graph,
240            compiled_bricks,
241            brick_manifests,
242            node_brick_key,
243            edges_by_source,
244            edge_by_id,
245            entry_node_id,
246            resolved_info,
247        })
248    }
249
250    /// Emit the runtime_info trace record.
251    pub fn emit_runtime_info(&self, tracer: &mut dyn trace::TraceSink) {
252        if tracer.enabled() {
253            tracer.emit_runtime_info(RUNTIME_VERSION, WASMTIME_MAJOR, &now_rfc3339());
254        }
255    }
256
257    pub fn graph_id(&self) -> &str {
258        &self.graph.graph_id
259    }
260    pub fn graph_version(&self) -> &str {
261        &self.graph.graph_version
262    }
263    pub fn node_count(&self) -> usize {
264        self.graph.nodes.len()
265    }
266    pub fn edge_count(&self) -> usize {
267        self.graph.edges.len()
268    }
269    pub fn entry_node_id(&self) -> &str {
270        &self.entry_node_id
271    }
272    pub fn resolved_bricks(&self) -> &[ResolvedBrickInfo] {
273        &self.resolved_info
274    }
275
276    /// Execute the graph with the given input.
277    pub fn execute(
278        &self,
279        json_input: &serde_json::Value,
280        tracer: &mut dyn trace::TraceSink,
281        hooks: &mut ExecuteHooks<'_>,
282        opts: &ExecuteOptions,
283    ) -> Result<ExecutionReport> {
284        let trace_id = opts
285            .trace_id
286            .as_deref()
287            .map(str::to_owned)
288            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
289        let session_id = opts
290            .session_id
291            .as_deref()
292            .map(str::to_owned)
293            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
294
295        let mut queue: std::collections::VecDeque<Task> = std::collections::VecDeque::new();
296        let mut step: u64 = 0;
297        let mut terminals: Vec<TerminalResult> = Vec::new();
298        let mut counts = ResultCounts {
299            success: 0,
300            low_confidence: 0,
301            failure: 0,
302        };
303        let verbose = opts.verbose;
304        let tracing = tracer.enabled();
305
306        // Push entry task
307        queue.push_back(Task {
308            node_id: self.entry_node_id.clone(),
309            input_json: json_input.clone(),
310            trigger_source_node_id: envelope::ROOT_TRIGGER.source_node_id.to_string(),
311            trigger_source_step: envelope::ROOT_TRIGGER.source_step,
312            trigger_edge_id: envelope::ROOT_TRIGGER.edge_id.to_string(),
313            trigger_routing_reason: "entry".to_string(),
314        });
315
316        while let Some(task) = queue.pop_front() {
317            // Safety budget: max_steps
318            if let Some(max) = opts.max_steps {
319                if step >= max {
320                    let msg = format!("max_steps budget ({max}) exhausted");
321                    if verbose {
322                        eprintln!("Safety budget: {msg}");
323                    }
324                    counts.failure += 1;
325                    terminals.push(TerminalResult {
326                        node_id: task.node_id.clone(),
327                        brick_id: self
328                            .node_brick_key
329                            .get(&task.node_id)
330                            .map(|k| k.0.clone())
331                            .unwrap_or_else(|| "__runtime__".to_string()),
332                        step,
333                        result: result::trap_failure("RESOURCE_EXCEEDED", msg),
334                    });
335                    break;
336                }
337            }
338
339            let brick_key = self
340                .node_brick_key
341                .get(&task.node_id)
342                .with_context(|| format!("no brick key recorded for node '{}'", task.node_id))?;
343            let compiled = self
344                .compiled_bricks
345                .get(brick_key)
346                .with_context(|| format!("no compiled brick for key {:?}", brick_key))?;
347            let brick_manifest = self
348                .brick_manifests
349                .get(brick_key)
350                .with_context(|| format!("no manifest for key {:?}", brick_key))?;
351
352            let trigger = envelope::Trigger {
353                source_node_id: &task.trigger_source_node_id,
354                source_step: task.trigger_source_step,
355                edge_id: &task.trigger_edge_id,
356            };
357
358            let env = envelope::build_envelope(
359                &task.input_json,
360                &self.graph.graph_id,
361                &self.graph.graph_version,
362                &task.node_id,
363                &trace_id,
364                &session_id,
365                step,
366                &trigger,
367            )?;
368
369            // Check max_input_bytes — soft failure (routes via on_error, doesn't abort run)
370            let mut pre_invoke_failure: Option<result::BrickResult> = None;
371            if let Some(max_input) = brick_manifest.limits.max_input_bytes {
372                if env.len() as u64 > max_input {
373                    let msg =
374                        format!(
375                        "envelope too large for brick '{}': {} bytes > limits.max_input_bytes {}",
376                        brick_key.0, env.len(), max_input,
377                    );
378                    if verbose {
379                        eprintln!("  {msg}");
380                    }
381                    pre_invoke_failure = Some(result::trap_failure("INVALID_INPUT", msg));
382                }
383            }
384
385            if verbose {
386                if pre_invoke_failure.is_some() {
387                    eprintln!(
388                        "[step {}] Skipping invoke for brick '{}' node '{}' (pre-invoke failure)",
389                        step, brick_key.0, task.node_id,
390                    );
391                } else {
392                    eprintln!(
393                        "[step {}] Invoking brick '{}' node '{}' ({} byte envelope)",
394                        step,
395                        brick_key.0,
396                        task.node_id,
397                        env.len(),
398                    );
399                }
400            }
401
402            // Invoke brick (or use pre-invoke failure)
403            let (brick_result, raw_result_bytes, latency_ms) = if let Some(br) = pre_invoke_failure
404            {
405                (br, None, 0.0)
406            } else {
407                let start = std::time::Instant::now();
408                let invoke_result = compiled.invoke(
409                    &env,
410                    brick_manifest.limits.max_mem_mb,
411                    brick_manifest.limits.max_output_bytes,
412                );
413                let latency = start.elapsed().as_secs_f64() * 1000.0;
414
415                match invoke_result {
416                    Ok(result_bytes) => {
417                        if verbose {
418                            eprintln!("  Got {} byte result", result_bytes.len());
419                        }
420                        let decoded = match result::decode_result(&result_bytes) {
421                            Ok(r) => r,
422                            Err(e) => {
423                                let msg = format!("{e:#}");
424                                if verbose {
425                                    eprintln!("  Result rejected: {msg}");
426                                }
427                                result::trap_failure("RUNTIME_REJECTED", msg)
428                            }
429                        };
430                        (decoded, Some(result_bytes), latency)
431                    }
432                    Err(e) => {
433                        let msg = format!("{e:#}");
434                        let m = msg.to_lowercase();
435                        let error_class = if m.contains("alloc returned 0")
436                            || m.contains("oom")
437                            || m.contains("result too large")
438                            || m.contains("memory limit")
439                            || m.contains("fuel")
440                            || m.contains("resource_exceeded")
441                        {
442                            "RESOURCE_EXCEEDED"
443                        } else {
444                            "COMPUTATION_ERROR"
445                        };
446                        if verbose {
447                            eprintln!("  Brick trap: {msg}");
448                        }
449                        (result::trap_failure(error_class, msg), None, latency)
450                    }
451                }
452            };
453
454            // Emit invoke trace record (skip entirely when tracing disabled)
455            if tracing {
456                tracer.emit_invoke(
457                    &trace_id,
458                    &session_id,
459                    step,
460                    &self.graph.graph_id,
461                    &self.graph.graph_version,
462                    &brick_key.0,
463                    &brick_key.1,
464                    &brick_manifest.artifact.digest,
465                    &task.node_id,
466                    &env,
467                    &task.trigger_source_node_id,
468                    task.trigger_source_step,
469                    &task.trigger_edge_id,
470                    &task.trigger_routing_reason,
471                    &brick_result,
472                    raw_result_bytes.as_deref(),
473                    latency_ms,
474                    &now_rfc3339(),
475                );
476            }
477
478            // Update counts
479            match &brick_result {
480                result::BrickResult::Success { .. } => counts.success += 1,
481                result::BrickResult::LowConfidence { .. } => counts.low_confidence += 1,
482                result::BrickResult::Failure { .. } => counts.failure += 1,
483            }
484
485            // Fire hook
486            if let Some(ref mut on_invoke) = hooks.on_invoke {
487                on_invoke(InvokeMetric {
488                    step,
489                    node_id: task.node_id.clone(),
490                    brick_id: brick_key.0.clone(),
491                    result_type: brick_result.result_type().to_string(),
492                    latency_ms,
493                    envelope_bytes: env.len(),
494                    result_bytes: raw_result_bytes.as_ref().map(|b| b.len()),
495                });
496            }
497
498            if verbose {
499                eprintln!("  Result type: {}", brick_result.result_type());
500            }
501
502            // ── Routing ────────────────────────────────────────────
503            let outbound_indices = self
504                .edges_by_source
505                .get(task.node_id.as_str())
506                .cloned()
507                .unwrap_or_default();
508            let outbound: Vec<&manifest::Edge> = outbound_indices
509                .iter()
510                .map(|&i| &self.graph.edges[i])
511                .collect();
512
513            let output_confidence = brick_result.output().and_then(mapping::extract_confidence);
514
515            let routed = router::route(&outbound, &brick_result, output_confidence);
516
517            if routed.is_empty() {
518                if verbose {
519                    eprintln!("  Terminal node (no outbound edges dispatched)");
520                }
521                terminals.push(TerminalResult {
522                    node_id: task.node_id.clone(),
523                    brick_id: brick_key.0.clone(),
524                    step,
525                    result: brick_result,
526                });
527            } else {
528                for routed_edge in &routed {
529                    let edge_idx = self
530                        .edge_by_id
531                        .get(routed_edge.edge_id.as_str())
532                        .with_context(|| {
533                            format!("routed edge '{}' not found in graph", routed_edge.edge_id)
534                        })?;
535                    let edge_def = &self.graph.edges[*edge_idx];
536
537                    let mapped_input = if edge_def.mapping.is_empty() {
538                        match brick_result.output() {
539                            Some(output) => {
540                                serde_json::json!({ "input": mapping::cbor_to_json(output) })
541                            }
542                            None => serde_json::json!({ "input": null }),
543                        }
544                    } else {
545                        let source_root = match brick_result.output() {
546                            Some(output) => CborValue::Map(vec![(
547                                CborValue::Text("output".to_string()),
548                                output.clone(),
549                            )]),
550                            None => CborValue::Map(vec![]),
551                        };
552
553                        let mut target_cbor = CborValue::Map(vec![]);
554                        for fm in &edge_def.mapping {
555                            let resolved = mapping::resolve_path(&source_root, &fm.from)
556                                .with_context(|| {
557                                    format!(
558                                        "mapping '{}' → '{}': source path '{}' not found in output",
559                                        fm.from, fm.to, fm.from,
560                                    )
561                                })?;
562                            let overlay = mapping::set_path(&fm.to, resolved);
563                            target_cbor = mapping::merge_maps(target_cbor, overlay);
564                        }
565
566                        mapping::cbor_to_json(&target_cbor)
567                    };
568
569                    if verbose {
570                        eprintln!(
571                            "  Route: edge '{}' → node '{}'",
572                            routed_edge.edge_id, routed_edge.target_node,
573                        );
574                    }
575
576                    // Safety budget: max_queued
577                    if queue.len() as u64 >= opts.max_queued {
578                        bail!(
579                            "safety budget exceeded: queue size {} >= max_queued {}",
580                            queue.len(),
581                            opts.max_queued,
582                        );
583                    }
584
585                    queue.push_back(Task {
586                        node_id: routed_edge.target_node.clone(),
587                        input_json: mapped_input,
588                        trigger_source_node_id: task.node_id.clone(),
589                        trigger_source_step: step,
590                        trigger_edge_id: routed_edge.edge_id.clone(),
591                        trigger_routing_reason: "routed".to_string(),
592                    });
593                }
594            }
595
596            step += 1;
597        }
598
599        Ok(ExecutionReport {
600            terminals,
601            total_steps: step,
602            counts,
603        })
604    }
605}