plexus_substrate/activations/lattice/
activation.rs1use super::storage::{LatticeStorage, LatticeStorageConfig};
2use super::types::*;
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::activation;
6use serde_json::Value;
7use std::sync::Arc;
8
9#[derive(Clone)]
15pub struct Lattice {
16 storage: Arc<LatticeStorage>,
17}
18
19impl Lattice {
20 pub async fn new(config: LatticeStorageConfig) -> Result<Self, String> {
21 let storage = LatticeStorage::new(config).await?;
22 Ok(Self {
23 storage: Arc::new(storage),
24 })
25 }
26
27 pub fn storage(&self) -> Arc<LatticeStorage> {
29 self.storage.clone()
30 }
31}
32
33#[plexus_macros::activation(namespace = "lattice",
34version = "1.0.0",
35description = "DAG execution engine — manages graph topology and drives topological execution", crate_path = "plexus_core")]
36impl Lattice {
37 #[plexus_macros::method(params(
39 metadata = "Arbitrary metadata to attach to this graph"
40 ))]
41 async fn create(
42 &self,
43 metadata: Value,
44 ) -> impl Stream<Item = CreateResult> + Send + 'static {
45 let storage = self.storage.clone();
46 stream! {
47 match storage.create_graph(metadata).await {
48 Ok(graph_id) => yield CreateResult::Ok { graph_id },
49 Err(e) => yield CreateResult::Err { message: e },
50 }
51 }
52 }
53
54 #[plexus_macros::method(params(
59 graph_id = "ID of the graph to add the node to",
60 spec = "Node specification: typed enum (task/scatter/gather/subgraph)",
61 node_id = "Optional node ID hint; a UUID is generated if not provided"
62 ))]
63 async fn add_node(
64 &self,
65 graph_id: GraphId,
66 spec: NodeSpec,
67 node_id: Option<NodeId>,
68 ) -> impl Stream<Item = AddNodeResult> + Send + 'static {
69 let storage = self.storage.clone();
70 stream! {
71 match storage.add_node(&graph_id, node_id, &spec).await {
72 Ok(node_id) => yield AddNodeResult::Ok { node_id },
73 Err(e) => yield AddNodeResult::Err { message: e },
74 }
75 }
76 }
77
78 #[plexus_macros::method(params(
83 graph_id = "ID of the graph",
84 from_node_id = "Predecessor node — must complete before to_node becomes ready",
85 to_node_id = "Dependent node — becomes ready when all predecessors are complete",
86 condition = "Optional edge condition: filter tokens by color (null = pass any)"
87 ))]
88 async fn add_edge(
89 &self,
90 graph_id: GraphId,
91 from_node_id: NodeId,
92 to_node_id: NodeId,
93 condition: Option<EdgeCondition>,
94 ) -> impl Stream<Item = AddEdgeResult> + Send + 'static {
95 let storage = self.storage.clone();
96 stream! {
97 match storage.add_edge(&graph_id, &from_node_id, &to_node_id, condition.as_ref()).await {
98 Ok(()) => yield AddEdgeResult::Ok,
99 Err(e) => yield AddEdgeResult::Err { message: e },
100 }
101 }
102 }
103
104 #[plexus_macros::method(params(
118 graph_id = "ID of the graph to execute",
119 after_seq = "Cursor for reconnect replay — omit for fresh start, or pass last received seq"
120 ))]
121 async fn execute(
122 &self,
123 graph_id: GraphId,
124 after_seq: Option<u64>,
125 ) -> impl Stream<Item = LatticeEventEnvelope> + Send + 'static {
126 LatticeStorage::execute_stream(self.storage.clone(), graph_id, after_seq)
127 }
128
129 #[plexus_macros::method(params(
134 graph_id = "ID of the graph",
135 node_id = "ID of the completed node",
136 output = "Optional output: Single(token) or Many(tokens) for fan-out"
137 ))]
138 async fn node_complete(
139 &self,
140 graph_id: GraphId,
141 node_id: NodeId,
142 output: Option<NodeOutput>,
143 ) -> impl Stream<Item = NodeUpdateResult> + Send + 'static {
144 let storage = self.storage.clone();
145 stream! {
146 match storage.advance_graph(&graph_id, &node_id, output, None).await {
147 Ok(()) => yield NodeUpdateResult::Ok,
148 Err(e) => yield NodeUpdateResult::Err { message: e },
149 }
150 }
151 }
152
153 #[plexus_macros::method(params(
155 graph_id = "ID of the graph",
156 node_id = "ID of the failed node",
157 error = "Error message describing the failure"
158 ))]
159 async fn node_failed(
160 &self,
161 graph_id: GraphId,
162 node_id: NodeId,
163 error: String,
164 ) -> impl Stream<Item = NodeUpdateResult> + Send + 'static {
165 let storage = self.storage.clone();
166 stream! {
167 match storage.advance_graph(&graph_id, &node_id, None, Some(error)).await {
168 Ok(()) => yield NodeUpdateResult::Ok,
169 Err(e) => yield NodeUpdateResult::Err { message: e },
170 }
171 }
172 }
173
174 #[plexus_macros::method(params(
179 graph_id = "ID of the graph",
180 node_id = "ID of the node to inspect inputs for"
181 ))]
182 async fn get_node_inputs(
183 &self,
184 graph_id: GraphId,
185 node_id: NodeId,
186 ) -> impl Stream<Item = GetNodeInputsResult> + Send + 'static {
187 let storage = self.storage.clone();
188 stream! {
189 let nodes = match storage.get_nodes(&graph_id).await {
191 Ok(ns) => ns,
192 Err(e) => { yield GetNodeInputsResult::Err { message: e }; return; }
193 };
194 if !nodes.iter().any(|n| n.id == node_id) {
195 yield GetNodeInputsResult::Err {
196 message: format!("Node {} not found in graph {}", node_id, graph_id),
197 };
198 return;
199 }
200 match storage.get_node_inputs(&node_id).await {
201 Ok(inputs) => yield GetNodeInputsResult::Ok { inputs },
202 Err(e) => yield GetNodeInputsResult::Err { message: e },
203 }
204 }
205 }
206
207 #[plexus_macros::method(params(
209 graph_id = "ID of the graph to inspect"
210 ))]
211 async fn get(
212 &self,
213 graph_id: GraphId,
214 ) -> impl Stream<Item = GetGraphResult> + Send + 'static {
215 let storage = self.storage.clone();
216 stream! {
217 let graph = match storage.get_graph(&graph_id).await {
218 Ok(g) => g,
219 Err(e) => { yield GetGraphResult::Err { message: e }; return; }
220 };
221 let nodes = match storage.get_nodes(&graph_id).await {
222 Ok(n) => n,
223 Err(e) => { yield GetGraphResult::Err { message: e }; return; }
224 };
225 yield GetGraphResult::Ok { graph, nodes };
226 }
227 }
228
229 #[plexus_macros::method]
231 async fn list(&self) -> impl Stream<Item = ListGraphsResult> + Send + 'static {
232 let storage = self.storage.clone();
233 stream! {
234 match storage.list_graphs().await {
235 Ok(graphs) => yield ListGraphsResult::Ok { graphs },
236 Err(e) => yield ListGraphsResult::Err { message: e },
237 }
238 }
239 }
240
241 #[plexus_macros::method(params(
243 graph_id = "ID of the graph to cancel"
244 ))]
245 async fn cancel(
246 &self,
247 graph_id: GraphId,
248 ) -> impl Stream<Item = CancelResult> + Send + 'static {
249 let storage = self.storage.clone();
250 stream! {
251 match storage.update_graph_status(&graph_id, GraphStatus::Cancelled).await {
252 Ok(()) => yield CancelResult::Ok,
253 Err(e) => yield CancelResult::Err { message: e },
254 }
255 }
256 }
257
258 #[plexus_macros::method(params(
263 parent_id = "ID of the parent graph",
264 metadata = "Arbitrary JSON metadata attached to the graph"
265 ))]
266 async fn create_child_graph(
267 &self,
268 parent_id: String,
269 metadata: Value,
270 ) -> impl Stream<Item = CreateChildGraphResult> + Send + 'static {
271 let storage = self.storage.clone();
272 stream! {
273 match storage.create_child_graph(&parent_id, metadata).await {
274 Ok(graph_id) => yield CreateChildGraphResult::Ok { graph_id },
275 Err(e) => yield CreateChildGraphResult::Err { message: e },
276 }
277 }
278 }
279
280 #[plexus_macros::method(params(
282 parent_id = "ID of the parent graph"
283 ))]
284 async fn get_child_graphs(
285 &self,
286 parent_id: String,
287 ) -> impl Stream<Item = GetChildGraphsResult> + Send + 'static {
288 let storage = self.storage.clone();
289 stream! {
290 match storage.get_child_graphs(&parent_id).await {
291 Ok(graphs) => yield GetChildGraphsResult::Ok { graphs },
292 Err(e) => yield GetChildGraphsResult::Err { message: e },
293 }
294 }
295 }
296}