//! plexus_substrate/activations/lattice/activation.rs
//!
//! Lattice activation — DAG execution engine surface.

use super::storage::{LatticeStorage, LatticeStorageConfig};
use super::types::*;

use std::sync::Arc;

use async_stream::stream;
use futures::Stream;
use plexus_macros::activation;
use serde_json::Value;

9/// Lattice — DAG execution engine
10///
11/// Manages graph topology and drives topological execution.
12/// Nodes become "ready" when all predecessor nodes are complete.
13/// The caller (e.g. Orcha) interprets node specs and drives actual execution.
14#[derive(Clone)]
15pub struct Lattice {
16    storage: Arc<LatticeStorage>,
17}
18
19impl Lattice {
20    pub async fn new(config: LatticeStorageConfig) -> Result<Self, String> {
21        let storage = LatticeStorage::new(config).await?;
22        Ok(Self {
23            storage: Arc::new(storage),
24        })
25    }
26
27    /// Expose the underlying storage for library consumers (e.g. Orcha).
28    pub fn storage(&self) -> Arc<LatticeStorage> {
29        self.storage.clone()
30    }
31}
32
33#[plexus_macros::activation(namespace = "lattice",
34version = "1.0.0",
35description = "DAG execution engine — manages graph topology and drives topological execution", crate_path = "plexus_core")]
36impl Lattice {
37    /// Create an empty graph
38    #[plexus_macros::method(params(
39        metadata = "Arbitrary metadata to attach to this graph"
40    ))]
41    async fn create(
42        &self,
43        metadata: Value,
44    ) -> impl Stream<Item = CreateResult> + Send + 'static {
45        let storage = self.storage.clone();
46        stream! {
47            match storage.create_graph(metadata).await {
48                Ok(graph_id) => yield CreateResult::Ok { graph_id },
49                Err(e) => yield CreateResult::Err { message: e },
50            }
51        }
52    }
53
54    /// Add a node to the graph
55    ///
56    /// spec carries the typed node execution semantics (Task, Scatter, Gather, SubGraph).
57    /// node_id is optional — a UUID is generated if not provided.
58    #[plexus_macros::method(params(
59        graph_id = "ID of the graph to add the node to",
60        spec = "Node specification: typed enum (task/scatter/gather/subgraph)",
61        node_id = "Optional node ID hint; a UUID is generated if not provided"
62    ))]
63    async fn add_node(
64        &self,
65        graph_id: GraphId,
66        spec: NodeSpec,
67        node_id: Option<NodeId>,
68    ) -> impl Stream<Item = AddNodeResult> + Send + 'static {
69        let storage = self.storage.clone();
70        stream! {
71            match storage.add_node(&graph_id, node_id, &spec).await {
72                Ok(node_id) => yield AddNodeResult::Ok { node_id },
73                Err(e) => yield AddNodeResult::Err { message: e },
74            }
75        }
76    }
77
78    /// Add a dependency edge: to_node waits for from_node to complete
79    ///
80    /// condition optionally filters which token colors are routed on this edge.
81    /// None (default) passes any token; Some(color) routes only matching-color tokens.
82    #[plexus_macros::method(params(
83        graph_id = "ID of the graph",
84        from_node_id = "Predecessor node — must complete before to_node becomes ready",
85        to_node_id = "Dependent node — becomes ready when all predecessors are complete",
86        condition = "Optional edge condition: filter tokens by color (null = pass any)"
87    ))]
88    async fn add_edge(
89        &self,
90        graph_id: GraphId,
91        from_node_id: NodeId,
92        to_node_id: NodeId,
93        condition: Option<EdgeCondition>,
94    ) -> impl Stream<Item = AddEdgeResult> + Send + 'static {
95        let storage = self.storage.clone();
96        stream! {
97            match storage.add_edge(&graph_id, &from_node_id, &to_node_id, condition.as_ref()).await {
98                Ok(()) => yield AddEdgeResult::Ok,
99                Err(e) => yield AddEdgeResult::Err { message: e },
100            }
101        }
102    }
103
104    /// Start execution — long-lived stream of sequenced events.
105    ///
106    /// **Fresh start** (`after_seq` omitted, graph is Pending):
107    /// Seeds root nodes as Ready, persists NodeReady events, then streams live.
108    ///
109    /// **Reconnect** (`after_seq = <last seq received>`):
110    /// Replays every event that occurred after that sequence number, then streams live.
111    /// Pass the last `seq` from a `LatticeEventEnvelope` you successfully processed.
112    ///
113    /// **Replay from beginning** (`after_seq = 0`, or omitted on an already-Running graph):
114    /// Replays the complete event history then streams live.
115    ///
116    /// The stream closes when `GraphDone` or `GraphFailed` is emitted.
117    #[plexus_macros::method(params(
118        graph_id = "ID of the graph to execute",
119        after_seq = "Cursor for reconnect replay — omit for fresh start, or pass last received seq"
120    ))]
121    async fn execute(
122        &self,
123        graph_id: GraphId,
124        after_seq: Option<u64>,
125    ) -> impl Stream<Item = LatticeEventEnvelope> + Send + 'static {
126        LatticeStorage::execute_stream(self.storage.clone(), graph_id, after_seq)
127    }
128
129    /// Signal that a node finished successfully
130    ///
131    /// output carries typed token(s) to route to successor nodes.
132    /// Triggers NodeReady for any newly unblocked successors.
133    #[plexus_macros::method(params(
134        graph_id = "ID of the graph",
135        node_id = "ID of the completed node",
136        output = "Optional output: Single(token) or Many(tokens) for fan-out"
137    ))]
138    async fn node_complete(
139        &self,
140        graph_id: GraphId,
141        node_id: NodeId,
142        output: Option<NodeOutput>,
143    ) -> impl Stream<Item = NodeUpdateResult> + Send + 'static {
144        let storage = self.storage.clone();
145        stream! {
146            match storage.advance_graph(&graph_id, &node_id, output, None).await {
147                Ok(()) => yield NodeUpdateResult::Ok,
148                Err(e) => yield NodeUpdateResult::Err { message: e },
149            }
150        }
151    }
152
153    /// Signal that a node failed — triggers GraphFailed
154    #[plexus_macros::method(params(
155        graph_id = "ID of the graph",
156        node_id = "ID of the failed node",
157        error = "Error message describing the failure"
158    ))]
159    async fn node_failed(
160        &self,
161        graph_id: GraphId,
162        node_id: NodeId,
163        error: String,
164    ) -> impl Stream<Item = NodeUpdateResult> + Send + 'static {
165        let storage = self.storage.clone();
166        stream! {
167            match storage.advance_graph(&graph_id, &node_id, None, Some(error)).await {
168                Ok(()) => yield NodeUpdateResult::Ok,
169                Err(e) => yield NodeUpdateResult::Err { message: e },
170            }
171        }
172    }
173
174    /// Get raw input tokens for a node — what arrived on all inbound edges.
175    ///
176    /// Returns Token { color, payload: Data { value } | Handle | None }.
177    /// Callers that need handle resolution should use Orcha's resolve_node_inputs instead.
178    #[plexus_macros::method(params(
179        graph_id = "ID of the graph",
180        node_id = "ID of the node to inspect inputs for"
181    ))]
182    async fn get_node_inputs(
183        &self,
184        graph_id: GraphId,
185        node_id: NodeId,
186    ) -> impl Stream<Item = GetNodeInputsResult> + Send + 'static {
187        let storage = self.storage.clone();
188        stream! {
189            // Validate node belongs to graph
190            let nodes = match storage.get_nodes(&graph_id).await {
191                Ok(ns) => ns,
192                Err(e) => { yield GetNodeInputsResult::Err { message: e }; return; }
193            };
194            if !nodes.iter().any(|n| n.id == node_id) {
195                yield GetNodeInputsResult::Err {
196                    message: format!("Node {} not found in graph {}", node_id, graph_id),
197                };
198                return;
199            }
200            match storage.get_node_inputs(&node_id).await {
201                Ok(inputs) => yield GetNodeInputsResult::Ok { inputs },
202                Err(e) => yield GetNodeInputsResult::Err { message: e },
203            }
204        }
205    }
206
207    /// Get graph state and all its nodes
208    #[plexus_macros::method(params(
209        graph_id = "ID of the graph to inspect"
210    ))]
211    async fn get(
212        &self,
213        graph_id: GraphId,
214    ) -> impl Stream<Item = GetGraphResult> + Send + 'static {
215        let storage = self.storage.clone();
216        stream! {
217            let graph = match storage.get_graph(&graph_id).await {
218                Ok(g) => g,
219                Err(e) => { yield GetGraphResult::Err { message: e }; return; }
220            };
221            let nodes = match storage.get_nodes(&graph_id).await {
222                Ok(n) => n,
223                Err(e) => { yield GetGraphResult::Err { message: e }; return; }
224            };
225            yield GetGraphResult::Ok { graph, nodes };
226        }
227    }
228
229    /// List all graphs
230    #[plexus_macros::method]
231    async fn list(&self) -> impl Stream<Item = ListGraphsResult> + Send + 'static {
232        let storage = self.storage.clone();
233        stream! {
234            match storage.list_graphs().await {
235                Ok(graphs) => yield ListGraphsResult::Ok { graphs },
236                Err(e) => yield ListGraphsResult::Err { message: e },
237            }
238        }
239    }
240
241    /// Cancel a running graph
242    #[plexus_macros::method(params(
243        graph_id = "ID of the graph to cancel"
244    ))]
245    async fn cancel(
246        &self,
247        graph_id: GraphId,
248    ) -> impl Stream<Item = CancelResult> + Send + 'static {
249        let storage = self.storage.clone();
250        stream! {
251            match storage.update_graph_status(&graph_id, GraphStatus::Cancelled).await {
252                Ok(()) => yield CancelResult::Ok,
253                Err(e) => yield CancelResult::Err { message: e },
254            }
255        }
256    }
257
258    /// Add a SubGraph node — when dispatched, runs the child graph to completion.
259    ///
260    /// On child success, the parent node receives `{"child_graph_id": "..."}` as output.
261    /// On child failure, the parent node is failed (error edge fires if present).
262    #[plexus_macros::method(params(
263        parent_id = "ID of the parent graph",
264        metadata = "Arbitrary JSON metadata attached to the graph"
265    ))]
266    async fn create_child_graph(
267        &self,
268        parent_id: String,
269        metadata: Value,
270    ) -> impl Stream<Item = CreateChildGraphResult> + Send + 'static {
271        let storage = self.storage.clone();
272        stream! {
273            match storage.create_child_graph(&parent_id, metadata).await {
274                Ok(graph_id) => yield CreateChildGraphResult::Ok { graph_id },
275                Err(e) => yield CreateChildGraphResult::Err { message: e },
276            }
277        }
278    }
279
280    /// List all child graphs of a parent graph
281    #[plexus_macros::method(params(
282        parent_id = "ID of the parent graph"
283    ))]
284    async fn get_child_graphs(
285        &self,
286        parent_id: String,
287    ) -> impl Stream<Item = GetChildGraphsResult> + Send + 'static {
288        let storage = self.storage.clone();
289        stream! {
290            match storage.get_child_graphs(&parent_id).await {
291                Ok(graphs) => yield GetChildGraphsResult::Ok { graphs },
292                Err(e) => yield GetChildGraphsResult::Err { message: e },
293            }
294        }
295    }
296}