1use crate::activations::arbor::ArborStorage;
2use crate::activations::lattice::{
3 GatherStrategy, LatticeEvent, LatticeEventEnvelope, LatticeStorage, NodeOutput, NodeSpec,
4 NodeStatus, ResolvedToken, Token, TokenPayload,
5};
6use futures::Stream;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use super::types::{OrchaEdgeDef, OrchaNodeDef, OrchaNodeKind, OrchaNodeSpec};
12
13#[derive(Clone)]
20pub struct GraphRuntime {
21 storage: Arc<LatticeStorage>,
22}
23
24impl GraphRuntime {
25 pub fn new(storage: Arc<LatticeStorage>) -> Self {
26 Self { storage }
27 }
28
29 pub fn storage(&self) -> Arc<LatticeStorage> {
31 self.storage.clone()
32 }
33
34 pub async fn create_graph(&self, metadata: Value) -> Result<OrchaGraph, String> {
36 let graph_id = self.storage.create_graph(metadata).await?;
37 Ok(OrchaGraph {
38 graph_id,
39 storage: self.storage.clone(),
40 })
41 }
42
43 pub async fn create_child_graph(
45 &self,
46 parent_id: &str,
47 metadata: Value,
48 ) -> Result<OrchaGraph, String> {
49 let graph_id = self.storage.create_child_graph(parent_id, metadata).await?;
50 Ok(OrchaGraph {
51 graph_id,
52 storage: self.storage.clone(),
53 })
54 }
55
56 pub async fn build_child_graph(
61 &self,
62 parent_id: &str,
63 metadata: Value,
64 nodes: Vec<OrchaNodeDef>,
65 edges: Vec<OrchaEdgeDef>,
66 ) -> Result<(String, HashMap<String, String>), String> {
67 let graph = self.create_child_graph(parent_id, metadata).await?;
68 let graph_id = graph.graph_id.clone();
69
70 let mut id_map: HashMap<String, String> = HashMap::new();
71 for OrchaNodeDef { id, spec } in nodes {
72 let result = match spec {
73 OrchaNodeSpec::Task { task, max_retries } => graph.add_task(task, max_retries).await,
74 OrchaNodeSpec::Synthesize { task, max_retries } => graph.add_synthesize(task, max_retries).await,
75 OrchaNodeSpec::Validate { command, cwd, max_retries } => graph.add_validate(command, cwd, max_retries).await,
76 OrchaNodeSpec::Gather { strategy } => graph.add_gather(strategy).await,
77 OrchaNodeSpec::Review { prompt } => graph.add_review(prompt).await,
78 OrchaNodeSpec::Plan { task } => graph.add_plan(task).await,
79 };
80 let lattice_id = result.map_err(|e| format!("Failed to add node '{}': {}", id, e))?;
81 id_map.insert(id, lattice_id);
82 }
83
84 for OrchaEdgeDef { from, to } in edges {
85 let dep_id = id_map
86 .get(&from)
87 .ok_or_else(|| format!("Unknown node id in edge.from: '{}'", from))?
88 .clone();
89 let node_id = id_map
90 .get(&to)
91 .ok_or_else(|| format!("Unknown node id in edge.to: '{}'", to))?
92 .clone();
93 graph
94 .depends_on(&node_id, &dep_id)
95 .await
96 .map_err(|e| format!("Failed to add edge {} → {}: {}", from, to, e))?;
97 }
98
99 Ok((graph_id, id_map))
100 }
101
102 pub fn open_graph(&self, graph_id: String) -> OrchaGraph {
104 OrchaGraph {
105 graph_id,
106 storage: self.storage.clone(),
107 }
108 }
109}
110
111#[derive(Clone)]
119pub struct OrchaGraph {
120 pub graph_id: String,
122 storage: Arc<LatticeStorage>,
123}
124
125impl OrchaGraph {
126 pub async fn add_task(&self, task: impl Into<String>, max_retries: Option<u8>) -> Result<String, String> {
130 let kind = OrchaNodeKind::Task { task: task.into(), max_retries };
131 self.add_spec(NodeSpec::Task {
132 data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
133 handle: None,
134 })
135 .await
136 }
137
138 pub async fn add_synthesize(&self, task: impl Into<String>, max_retries: Option<u8>) -> Result<String, String> {
142 let kind = OrchaNodeKind::Synthesize { task: task.into(), max_retries };
143 self.add_spec(NodeSpec::Task {
144 data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
145 handle: None,
146 })
147 .await
148 }
149
150 pub async fn add_validate(
154 &self,
155 command: impl Into<String>,
156 cwd: Option<impl Into<String>>,
157 max_retries: Option<u8>,
158 ) -> Result<String, String> {
159 let kind = OrchaNodeKind::Validate {
160 command: command.into(),
161 cwd: cwd.map(|d| d.into()),
162 max_retries,
163 };
164 self.add_spec(NodeSpec::Task {
165 data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
166 handle: None,
167 })
168 .await
169 }
170
171 pub async fn add_gather(&self, strategy: GatherStrategy) -> Result<String, String> {
173 self.add_spec(NodeSpec::Gather { strategy }).await
174 }
175
176 pub async fn add_subgraph(&self, child_graph_id: impl Into<String>) -> Result<String, String> {
178 self.add_spec(NodeSpec::SubGraph { graph_id: child_graph_id.into() }).await
179 }
180
181 pub fn open_child_graph(&self, graph_id: impl Into<String>) -> OrchaGraph {
183 OrchaGraph { graph_id: graph_id.into(), storage: self.storage.clone() }
184 }
185
186 pub async fn add_plan(&self, task: impl Into<String>) -> Result<String, String> {
191 let kind = OrchaNodeKind::Plan { task: task.into() };
192 self.add_spec(NodeSpec::Task {
193 data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
194 handle: None,
195 })
196 .await
197 }
198
199 pub async fn add_review(&self, prompt: impl Into<String>) -> Result<String, String> {
201 let kind = OrchaNodeKind::Review { prompt: prompt.into() };
202 self.add_spec(NodeSpec::Task {
203 data: serde_json::to_value(&kind).map_err(|e| e.to_string())?,
204 handle: None,
205 })
206 .await
207 }
208
209 pub async fn depends_on(
216 &self,
217 dependent: &str,
218 dependency: &str,
219 ) -> Result<(), String> {
220 self.storage
221 .add_edge(
222 &self.graph_id,
223 &dependency.to_string(),
224 &dependent.to_string(),
225 None,
226 )
227 .await
228 }
229
230 pub fn watch(
234 &self,
235 after_seq: Option<u64>,
236 ) -> impl Stream<Item = LatticeEventEnvelope> + Send + 'static {
237 LatticeStorage::execute_stream(self.storage.clone(), self.graph_id.clone(), after_seq)
238 }
239
240 pub async fn start_node(&self, node_id: &str) -> Result<(), String> {
242 self.storage
243 .set_node_status(&node_id.to_string(), NodeStatus::Running, None, None)
244 .await?;
245 self.storage
246 .persist_event(&self.graph_id, &LatticeEvent::NodeStarted {
247 node_id: node_id.to_string(),
248 })
249 .await?;
250 self.storage.notify_graph(&self.graph_id);
251 Ok(())
252 }
253
254 pub async fn complete_node(
256 &self,
257 node_id: &str,
258 output: Option<NodeOutput>,
259 ) -> Result<(), String> {
260 self.storage
261 .advance_graph(&self.graph_id, &node_id.to_string(), output, None)
262 .await
263 }
264
265 pub async fn fail_node(&self, node_id: &str, error: String) -> Result<(), String> {
267 self.storage
268 .advance_graph(&self.graph_id, &node_id.to_string(), None, Some(error))
269 .await
270 }
271
272 pub async fn get_inbound_node_ids(&self, node_id: &str) -> Result<Vec<String>, String> {
274 self.storage.get_inbound_edges(&node_id.to_string()).await
275 }
276
277 pub async fn get_node_spec(&self, node_id: &str) -> Result<NodeSpec, String> {
279 self.storage.get_node(&node_id.to_string()).await.map(|n| n.spec)
280 }
281
282 pub async fn get_node_output(&self, node_id: &str) -> Result<Option<NodeOutput>, String> {
284 self.storage.get_node(&node_id.to_string()).await.map(|n| n.output)
285 }
286
287 pub async fn count_nodes(&self) -> Result<usize, String> {
289 self.storage.count_nodes(&self.graph_id).await
290 }
291
292 pub async fn get_terminal_node_ids(&self) -> Result<Vec<String>, String> {
295 let nodes = self.storage.get_nodes(&self.graph_id).await?;
296 Ok(nodes.into_iter()
297 .filter(|n| n.status == NodeStatus::Complete || n.status == NodeStatus::Failed)
298 .map(|n| n.id)
299 .collect())
300 }
301
302 pub async fn get_node_inputs(&self, node_id: &str) -> Result<Vec<Token>, String> {
304 self.storage.get_node_inputs(&node_id.to_string()).await
305 }
306
307 pub async fn get_resolved_inputs(
312 &self,
313 node_id: &str,
314 arbor: &ArborStorage,
315 ) -> Result<Vec<ResolvedToken>, String> {
316 let tokens = self.storage.get_node_inputs(&node_id.to_string()).await?;
317 let mut resolved = Vec::new();
318 for token in tokens {
319 let data = match token.payload {
320 None => None,
321 Some(TokenPayload::Data { value }) => Some(value),
322 Some(TokenPayload::Handle(handle)) => {
323 let text = resolve_handle(arbor, &handle).await?;
324 Some(serde_json::json!({ "text": text }))
325 }
326 };
327 resolved.push(ResolvedToken { color: token.color, data });
328 }
329 Ok(resolved)
330 }
331
332 async fn add_spec(&self, spec: NodeSpec) -> Result<String, String> {
335 self.storage.add_node(&self.graph_id, None, &spec).await
336 }
337}
338
339use crate::activations::arbor::{ArborId, NodeType};
342
343pub(crate) async fn resolve_handle(
345 arbor: &ArborStorage,
346 handle: &plexus_core::types::Handle,
347) -> Result<String, String> {
348 match handle.method.as_str() {
349 "arbor_tree" => {
350 let tree_id_str = handle
351 .meta
352 .first()
353 .ok_or("arbor_tree handle missing tree_id in meta[0]")?;
354 let node_id_str = handle
355 .meta
356 .get(1)
357 .ok_or("arbor_tree handle missing node_id in meta[1]")?;
358
359 let tree_id = ArborId::parse_str(tree_id_str)
360 .map_err(|e| format!("Invalid tree_id in handle meta[0]: {}", e))?;
361 let node_id = ArborId::parse_str(node_id_str)
362 .map_err(|e| format!("Invalid node_id in handle meta[1]: {}", e))?;
363
364 let path = arbor
365 .context_get_path(&tree_id, &node_id)
366 .await
367 .map_err(|e| format!("Failed to resolve arbor_tree handle: {}", e))?;
368
369 let context = path
370 .iter()
371 .filter_map(|node| match &node.data {
372 NodeType::Text { content } => Some(content.as_str()),
373 _ => None,
374 })
375 .collect::<Vec<_>>()
376 .join("\n\n");
377
378 Ok(context)
379 }
380 other => Err(format!("Unknown handle method: {}", other)),
381 }
382}