// plexus_substrate/activations/orcha/graph_runtime.rs

1use crate::activations::arbor::ArborStorage;
2use crate::activations::lattice::{
3    GatherStrategy, LatticeEvent, LatticeEventEnvelope, LatticeStorage, NodeOutput, NodeSpec,
4    NodeStatus, ResolvedToken, Token, TokenPayload,
5};
6use futures::Stream;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use super::types::{OrchaEdgeDef, OrchaNodeDef, OrchaNodeKind, OrchaNodeSpec};
12
// ─── GraphRuntime (factory) ───────────────────────────────────────────────────
14
15/// Orcha's interface to the graph execution engine.
16///
17/// Serves as the factory for `OrchaGraph` handles.  The lattice backend is
18/// an implementation detail — callers only ever see `OrchaGraph`.
19#[derive(Clone)]
20pub struct GraphRuntime {
21    storage: Arc<LatticeStorage>,
22}
23
24impl GraphRuntime {
25    pub fn new(storage: Arc<LatticeStorage>) -> Self {
26        Self { storage }
27    }
28
29    /// Expose the underlying lattice storage.
30    pub fn storage(&self) -> Arc<LatticeStorage> {
31        self.storage.clone()
32    }
33
34    /// Create a new execution graph.
35    pub async fn create_graph(&self, metadata: Value) -> Result<OrchaGraph, String> {
36        let graph_id = self.storage.create_graph(metadata).await?;
37        Ok(OrchaGraph {
38            graph_id,
39            storage: self.storage.clone(),
40        })
41    }
42
43    /// Create a new execution graph as a child of an existing graph.
44    pub async fn create_child_graph(
45        &self,
46        parent_id: &str,
47        metadata: Value,
48    ) -> Result<OrchaGraph, String> {
49        let graph_id = self.storage.create_child_graph(parent_id, metadata).await?;
50        Ok(OrchaGraph {
51            graph_id,
52            storage: self.storage.clone(),
53        })
54    }
55
56    /// Build a child graph from node+edge definitions.
57    ///
58    /// Creates the child graph under `parent_id`, adds all nodes, and wires all edges.
59    /// Returns `(child_graph_id, ticket_id→lattice_node_id map)` on success.
60    pub async fn build_child_graph(
61        &self,
62        parent_id: &str,
63        metadata: Value,
64        nodes: Vec<OrchaNodeDef>,
65        edges: Vec<OrchaEdgeDef>,
66    ) -> Result<(String, HashMap<String, String>), String> {
67        let graph = self.create_child_graph(parent_id, metadata).await?;
68        let graph_id = graph.graph_id.clone();
69
70        let mut id_map: HashMap<String, String> = HashMap::new();
71        for OrchaNodeDef { id, spec } in nodes {
72            let result = match spec {
73                OrchaNodeSpec::Task { task, max_retries } => graph.add_task(task, max_retries).await,
74                OrchaNodeSpec::Synthesize { task, max_retries } => graph.add_synthesize(task, max_retries).await,
75                OrchaNodeSpec::Validate { command, cwd, max_retries } => graph.add_validate(command, cwd, max_retries).await,
76                OrchaNodeSpec::Gather { strategy } => graph.add_gather(strategy).await,
77                OrchaNodeSpec::Review { prompt } => graph.add_review(prompt).await,
78                OrchaNodeSpec::Plan { task } => graph.add_plan(task).await,
79            };
80            let lattice_id = result.map_err(|e| format!("Failed to add node '{}': {}", id, e))?;
81            id_map.insert(id, lattice_id);
82        }
83
84        for OrchaEdgeDef { from, to } in edges {
85            let dep_id = id_map
86                .get(&from)
87                .ok_or_else(|| format!("Unknown node id in edge.from: '{}'", from))?
88                .clone();
89            let node_id = id_map
90                .get(&to)
91                .ok_or_else(|| format!("Unknown node id in edge.to: '{}'", to))?
92                .clone();
93            graph
94                .depends_on(&node_id, &dep_id)
95                .await
96                .map_err(|e| format!("Failed to add edge {} → {}: {}", from, to, e))?;
97        }
98
99        Ok((graph_id, id_map))
100    }
101
102    /// Open an existing graph by ID.
103    pub fn open_graph(&self, graph_id: String) -> OrchaGraph {
104        OrchaGraph {
105            graph_id,
106            storage: self.storage.clone(),
107        }
108    }
109}
110
// ─── OrchaGraph ───────────────────────────────────────────────────────────────
112
113/// A handle to a single execution graph.
114///
115/// Provides Orcha's typed node-building API and graph-scoped execution control.
116/// No lattice types leak past this boundary — callers work entirely in Orcha
117/// concepts (tasks, validate steps, synthesize steps, review gates).
118#[derive(Clone)]
119pub struct OrchaGraph {
120    /// The lattice graph ID — exposed for logging / monitoring.
121    pub graph_id: String,
122    storage: Arc<LatticeStorage>,
123}
124
125impl OrchaGraph {
126    // ─── Node builders ───────────────────────────────────────────────────────
127
128    /// Add a task node.
129    pub async fn add_task(&self, task: impl Into<String>, max_retries: Option<u8>) -> Result<String, String> {
130        let kind = OrchaNodeKind::Task { task: task.into(), max_retries };
131        self.add_spec(NodeSpec::Task {
132            data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
133            handle: None,
134        })
135        .await
136    }
137
138    /// Add a synthesize node.
139    ///
140    /// Like task, but graph_runner prepends resolved input tokens as `<prior_work>` context.
141    pub async fn add_synthesize(&self, task: impl Into<String>, max_retries: Option<u8>) -> Result<String, String> {
142        let kind = OrchaNodeKind::Synthesize { task: task.into(), max_retries };
143        self.add_spec(NodeSpec::Task {
144            data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
145            handle: None,
146        })
147        .await
148    }
149
150    /// Add a validate node.
151    ///
152    /// Orcha runs `command` in a shell inside `cwd` (default `/workspace`).
153    pub async fn add_validate(
154        &self,
155        command: impl Into<String>,
156        cwd: Option<impl Into<String>>,
157        max_retries: Option<u8>,
158    ) -> Result<String, String> {
159        let kind = OrchaNodeKind::Validate {
160            command: command.into(),
161            cwd: cwd.map(|d| d.into()),
162            max_retries,
163        };
164        self.add_spec(NodeSpec::Task {
165            data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
166            handle: None,
167        })
168        .await
169    }
170
171    /// Add a gather node — engine-executed, auto-fires when all inbound tokens arrive.
172    pub async fn add_gather(&self, strategy: GatherStrategy) -> Result<String, String> {
173        self.add_spec(NodeSpec::Gather { strategy }).await
174    }
175
176    /// Add a SubGraph node — when ready, runs the child graph to completion.
177    pub async fn add_subgraph(&self, child_graph_id: impl Into<String>) -> Result<String, String> {
178        self.add_spec(NodeSpec::SubGraph { graph_id: child_graph_id.into() }).await
179    }
180
181    /// Open a sibling graph by ID sharing the same LatticeStorage.
182    pub fn open_child_graph(&self, graph_id: impl Into<String>) -> OrchaGraph {
183        OrchaGraph { graph_id: graph_id.into(), storage: self.storage.clone() }
184    }
185
186    /// Add a plan node.
187    ///
188    /// When dispatched, runs Claude to produce a ticket file, compiles it into
189    /// a child graph, and executes the child graph inline.
190    pub async fn add_plan(&self, task: impl Into<String>) -> Result<String, String> {
191        let kind = OrchaNodeKind::Plan { task: task.into() };
192        self.add_spec(NodeSpec::Task {
193            data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
194            handle: None,
195        })
196        .await
197    }
198
199    /// Add a review node.
200    pub async fn add_review(&self, prompt: impl Into<String>) -> Result<String, String> {
201        let kind = OrchaNodeKind::Review { prompt: prompt.into() };
202        self.add_spec(NodeSpec::Task {
203            data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
204            handle: None,
205        })
206        .await
207    }
208
209    /// Declare that `dependent` waits for `dependency` to complete first.
210    ///
211    /// ```text
212    /// graph.depends_on(&validate_node, &task_node)
213    /// // validate_node will not start until task_node is complete
214    /// ```
215    pub async fn depends_on(
216        &self,
217        dependent: &str,
218        dependency: &str,
219    ) -> Result<(), String> {
220        self.storage
221            .add_edge(
222                &self.graph_id,
223                &dependency.to_string(),
224                &dependent.to_string(),
225                None,
226            )
227            .await
228    }
229
230    // ─── Execution control ───────────────────────────────────────────────────
231
232    /// Watch this graph's event stream.
233    pub fn watch(
234        &self,
235        after_seq: Option<u64>,
236    ) -> impl Stream<Item = LatticeEventEnvelope> + Send + 'static {
237        LatticeStorage::execute_stream(self.storage.clone(), self.graph_id.clone(), after_seq)
238    }
239
240    /// Signal that a node started executing.
241    pub async fn start_node(&self, node_id: &str) -> Result<(), String> {
242        self.storage
243            .set_node_status(&node_id.to_string(), NodeStatus::Running, None, None)
244            .await?;
245        self.storage
246            .persist_event(&self.graph_id, &LatticeEvent::NodeStarted {
247                node_id: node_id.to_string(),
248            })
249            .await?;
250        self.storage.notify_graph(&self.graph_id);
251        Ok(())
252    }
253
254    /// Signal that a node completed successfully, optionally carrying output.
255    pub async fn complete_node(
256        &self,
257        node_id: &str,
258        output: Option<NodeOutput>,
259    ) -> Result<(), String> {
260        self.storage
261            .advance_graph(&self.graph_id, &node_id.to_string(), output, None)
262            .await
263    }
264
265    /// Signal that a node failed.
266    pub async fn fail_node(&self, node_id: &str, error: String) -> Result<(), String> {
267        self.storage
268            .advance_graph(&self.graph_id, &node_id.to_string(), None, Some(error))
269            .await
270    }
271
272    /// Get IDs of nodes that have an edge pointing into `node_id`.
273    pub async fn get_inbound_node_ids(&self, node_id: &str) -> Result<Vec<String>, String> {
274        self.storage.get_inbound_edges(&node_id.to_string()).await
275    }
276
277    /// Get the spec of an existing node.
278    pub async fn get_node_spec(&self, node_id: &str) -> Result<NodeSpec, String> {
279        self.storage.get_node(&node_id.to_string()).await.map(|n| n.spec)
280    }
281
282    /// Get the stored output of a completed node.
283    pub async fn get_node_output(&self, node_id: &str) -> Result<Option<NodeOutput>, String> {
284        self.storage.get_node(&node_id.to_string()).await.map(|n| n.output)
285    }
286
287    /// Count the total number of nodes in this graph.
288    pub async fn count_nodes(&self) -> Result<usize, String> {
289        self.storage.count_nodes(&self.graph_id).await
290    }
291
292    /// Get the IDs of nodes that have already reached a terminal state (Complete or Failed).
293    /// Used by run_graph_execution to pre-populate the dispatched set on reconnect.
294    pub async fn get_terminal_node_ids(&self) -> Result<Vec<String>, String> {
295        let nodes = self.storage.get_nodes(&self.graph_id).await?;
296        Ok(nodes.into_iter()
297            .filter(|n| n.status == NodeStatus::Complete || n.status == NodeStatus::Failed)
298            .map(|n| n.id)
299            .collect())
300    }
301
302    /// Get raw input tokens for a node (what arrived on all inbound edges).
303    pub async fn get_node_inputs(&self, node_id: &str) -> Result<Vec<Token>, String> {
304        self.storage.get_node_inputs(&node_id.to_string()).await
305    }
306
307    /// Get input tokens with Handle payloads resolved to inline Values.
308    ///
309    /// Lattice resolves handles server-side via Arbor.
310    /// Returns ResolvedToken { color, data: Option<Value> }.
311    pub async fn get_resolved_inputs(
312        &self,
313        node_id: &str,
314        arbor: &ArborStorage,
315    ) -> Result<Vec<ResolvedToken>, String> {
316        let tokens = self.storage.get_node_inputs(&node_id.to_string()).await?;
317        let mut resolved = Vec::new();
318        for token in tokens {
319            let data = match token.payload {
320                None => None,
321                Some(TokenPayload::Data { value }) => Some(value),
322                Some(TokenPayload::Handle(handle)) => {
323                    let text = resolve_handle(arbor, &handle).await?;
324                    Some(serde_json::json!({ "text": text }))
325                }
326            };
327            resolved.push(ResolvedToken { color: token.color, data });
328        }
329        Ok(resolved)
330    }
331
332    // ─── Internal ────────────────────────────────────────────────────────────
333
334    async fn add_spec(&self, spec: NodeSpec) -> Result<String, String> {
335        self.storage.add_node(&self.graph_id, None, &spec).await
336    }
337}
338
// ─── Handle Resolution ────────────────────────────────────────────────────────
340
341use crate::activations::arbor::{ArborId, NodeType};
342
343/// Resolve an arbor_tree Handle into a context string.
344pub(crate) async fn resolve_handle(
345    arbor: &ArborStorage,
346    handle: &plexus_core::types::Handle,
347) -> Result<String, String> {
348    match handle.method.as_str() {
349        "arbor_tree" => {
350            let tree_id_str = handle
351                .meta
352                .first()
353                .ok_or("arbor_tree handle missing tree_id in meta[0]")?;
354            let node_id_str = handle
355                .meta
356                .get(1)
357                .ok_or("arbor_tree handle missing node_id in meta[1]")?;
358
359            let tree_id = ArborId::parse_str(tree_id_str)
360                .map_err(|e| format!("Invalid tree_id in handle meta[0]: {}", e))?;
361            let node_id = ArborId::parse_str(node_id_str)
362                .map_err(|e| format!("Invalid node_id in handle meta[1]: {}", e))?;
363
364            let path = arbor
365                .context_get_path(&tree_id, &node_id)
366                .await
367                .map_err(|e| format!("Failed to resolve arbor_tree handle: {}", e))?;
368
369            let context = path
370                .iter()
371                .filter_map(|node| match &node.data {
372                    NodeType::Text { content } => Some(content.as_str()),
373                    _ => None,
374                })
375                .collect::<Vec<_>>()
376                .join("\n\n");
377
378            Ok(context)
379        }
380        other => Err(format!("Unknown handle method: {}", other)),
381    }
382}