Skip to main content

plexus_substrate/activations/lattice/
types.rs

1use plexus_core::types::Handle;
2use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5
/// Identifier of a lattice graph; a plain `String` under the hood.
pub type GraphId = String;
/// Identifier of a node inside a lattice graph; a plain `String` under the hood.
pub type NodeId = String;
8
9// ─── Token Model ──────────────────────────────────────────────────────────────
10
11/// Token color — the routing discriminant (Petri net "color").
12/// Ok/Error are lattice primitives; Named is application vocabulary.
13#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
14#[serde(tag = "type", rename_all = "snake_case")]
15pub enum TokenColor {
16    Ok,
17    Error,
18    Named { name: String },
19}
20
21impl Default for TokenColor {
22    fn default() -> Self {
23        TokenColor::Ok
24    }
25}
26
27/// Token payload — data OR a handle, never both.
28/// A token can also carry no payload (color-only signal for pure routing).
29#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
30#[serde(tag = "type", rename_all = "snake_case")]
31pub enum TokenPayload {
32    Data { value: Value },
33    Handle(Handle),
34}
35
36/// Atomic unit flowing on edges.
37/// payload is optional — a token may be a color-only routing signal.
38#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
39pub struct Token {
40    #[serde(default)]
41    pub color: TokenColor,
42    pub payload: Option<TokenPayload>,
43}
44
45impl Token {
46    pub fn ok() -> Self {
47        Self { color: TokenColor::Ok, payload: None }
48    }
49
50    pub fn ok_data(data: Value) -> Self {
51        Self {
52            color: TokenColor::Ok,
53            payload: Some(TokenPayload::Data { value: data }),
54        }
55    }
56
57    pub fn ok_handle(handle: Handle) -> Self {
58        Self {
59            color: TokenColor::Ok,
60            payload: Some(TokenPayload::Handle(handle)),
61        }
62    }
63
64    pub fn error(message: String) -> Self {
65        Self {
66            color: TokenColor::Error,
67            payload: Some(TokenPayload::Data {
68                value: serde_json::json!({ "message": message }),
69            }),
70        }
71    }
72}
73
74/// Output produced when completing a node.
75/// Many triggers fan-out — one downstream execution per token (used by Scatter).
76#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
77#[serde(tag = "type", rename_all = "snake_case")]
78pub enum NodeOutput {
79    Single(Token),
80    Many { tokens: Vec<Token> },
81}
82
83impl NodeOutput {
84    pub fn tokens(&self) -> Vec<&Token> {
85        match self {
86            NodeOutput::Single(t) => vec![t],
87            NodeOutput::Many { tokens } => tokens.iter().collect(),
88        }
89    }
90}
91
92/// A token with its payload fully resolved to an inline Value.
93/// Handles have been fetched from their backing store server-side.
94#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
95pub struct ResolvedToken {
96    pub color: TokenColor,
97    pub data: Option<Value>,
98}
99
100// ─── Graph Structure ──────────────────────────────────────────────────────────
101
102/// How a node becomes enabled by its predecessors.
103#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
104#[serde(rename_all = "snake_case")]
105pub enum JoinType {
106    #[default]
107    All, // AND-join: every inbound edge must deliver a token
108    Any, // OR-join: any inbound edge token is enough
109}
110
111/// What to produce from a Gather node (auto-executed by the engine).
112#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
113#[serde(tag = "type", rename_all = "snake_case")]
114pub enum GatherStrategy {
115    All,
116    First { n: usize },
117}
118
119/// Node execution semantics (the "place type" in colored Petri net terms).
120#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
121#[serde(tag = "type", rename_all = "snake_case")]
122pub enum NodeSpec {
123    /// Caller-executed: engine emits NodeReady, caller drives and reports output.
124    Task { data: Value, handle: Option<Handle> },
125
126    /// Like Task but expected to produce NodeOutput::Many for fan-out.
127    Scatter { data: Value, handle: Option<Handle> },
128
129    /// Engine-executed: collects inbound tokens per strategy, produces Many output.
130    Gather { strategy: GatherStrategy },
131
132    /// Engine-executed: launch nested graph. Not yet implemented (reserved).
133    SubGraph { graph_id: String },
134}
135
136/// Edge condition — filter tokens by color.
137/// None = pass any token; Some(color) = only route if token.color == color.
138#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
139pub struct EdgeCondition(pub Option<TokenColor>);
140
141impl EdgeCondition {
142    pub fn matches(&self, color: &TokenColor) -> bool {
143        match &self.0 {
144            None => true,
145            Some(c) => c == color,
146        }
147    }
148}
149
150// ─── Node / Graph Status ──────────────────────────────────────────────────────
151
152#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
153#[serde(rename_all = "snake_case")]
154pub enum NodeStatus {
155    Pending,
156    Ready,
157    Running,
158    Complete,
159    Failed,
160}
161
162impl std::fmt::Display for NodeStatus {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        match self {
165            NodeStatus::Pending => write!(f, "pending"),
166            NodeStatus::Ready => write!(f, "ready"),
167            NodeStatus::Running => write!(f, "running"),
168            NodeStatus::Complete => write!(f, "complete"),
169            NodeStatus::Failed => write!(f, "failed"),
170        }
171    }
172}
173
174impl std::str::FromStr for NodeStatus {
175    type Err = String;
176
177    fn from_str(s: &str) -> Result<Self, Self::Err> {
178        match s {
179            "pending" => Ok(NodeStatus::Pending),
180            "ready" => Ok(NodeStatus::Ready),
181            "running" => Ok(NodeStatus::Running),
182            "complete" => Ok(NodeStatus::Complete),
183            "failed" => Ok(NodeStatus::Failed),
184            other => Err(format!("Unknown node status: {}", other)),
185        }
186    }
187}
188
189#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
190#[serde(rename_all = "snake_case")]
191pub enum GraphStatus {
192    Pending,
193    Running,
194    Complete,
195    Failed,
196    Cancelled,
197}
198
199impl std::fmt::Display for GraphStatus {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        match self {
202            GraphStatus::Pending => write!(f, "pending"),
203            GraphStatus::Running => write!(f, "running"),
204            GraphStatus::Complete => write!(f, "complete"),
205            GraphStatus::Failed => write!(f, "failed"),
206            GraphStatus::Cancelled => write!(f, "cancelled"),
207        }
208    }
209}
210
211impl std::str::FromStr for GraphStatus {
212    type Err = String;
213
214    fn from_str(s: &str) -> Result<Self, Self::Err> {
215        match s {
216            "pending" => Ok(GraphStatus::Pending),
217            "running" => Ok(GraphStatus::Running),
218            "complete" => Ok(GraphStatus::Complete),
219            "failed" => Ok(GraphStatus::Failed),
220            "cancelled" => Ok(GraphStatus::Cancelled),
221            other => Err(format!("Unknown graph status: {}", other)),
222        }
223    }
224}
225
226// ─── Graph / Node Data Models ─────────────────────────────────────────────────
227
228#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
229pub struct LatticeNode {
230    pub id: NodeId,
231    pub graph_id: GraphId,
232    pub spec: NodeSpec,
233    pub status: NodeStatus,
234    pub join_type: JoinType,
235    pub output: Option<NodeOutput>,
236    pub error: Option<String>,
237    pub created_at: i64,
238    pub completed_at: Option<i64>,
239}
240
241#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
242pub struct LatticeGraph {
243    pub id: GraphId,
244    pub metadata: Value,
245    pub status: GraphStatus,
246    pub created_at: i64,
247    pub node_count: usize,
248    pub edge_count: usize,
249    pub parent_graph_id: Option<String>,
250}
251
252// ─── Events ───────────────────────────────────────────────────────────────────
253
254/// Events emitted by the execute() stream
255#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
256#[serde(tag = "type", rename_all = "snake_case")]
257pub enum LatticeEvent {
258    NodeReady { node_id: NodeId, spec: NodeSpec },
259    NodeStarted { node_id: NodeId },
260    NodeDone { node_id: NodeId, output: Option<NodeOutput> },
261    NodeFailed { node_id: NodeId, error: String },
262    GraphDone { graph_id: GraphId },
263    GraphFailed { graph_id: GraphId, node_id: NodeId, error: String },
264}
265
266/// An event paired with its durable sequence number.
267///
268/// Callers should persist the last `seq` they successfully processed.
269/// On reconnect, pass it as `after_seq` to `execute()` to replay everything
270/// that happened while disconnected — no gaps, correct ordering.
271#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
272pub struct LatticeEventEnvelope {
273    /// Monotonically increasing sequence number assigned at persistence time.
274    pub seq: u64,
275    pub event: LatticeEvent,
276}
277
278// ─── Result Types ─────────────────────────────────────────────────────────────
279
280#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
281#[serde(tag = "type", rename_all = "snake_case")]
282pub enum CreateResult {
283    Ok { graph_id: GraphId },
284    Err { message: String },
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
288#[serde(tag = "type", rename_all = "snake_case")]
289pub enum AddNodeResult {
290    Ok { node_id: NodeId },
291    Err { message: String },
292}
293
294#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
295#[serde(tag = "type", rename_all = "snake_case")]
296pub enum AddEdgeResult {
297    Ok,
298    Err { message: String },
299}
300
301#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
302#[serde(tag = "type", rename_all = "snake_case")]
303pub enum NodeUpdateResult {
304    Ok,
305    Err { message: String },
306}
307
308#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
309#[serde(tag = "type", rename_all = "snake_case")]
310pub enum CancelResult {
311    Ok,
312    Err { message: String },
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
316#[serde(tag = "type", rename_all = "snake_case")]
317pub enum GetGraphResult {
318    Ok { graph: LatticeGraph, nodes: Vec<LatticeNode> },
319    Err { message: String },
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
323#[serde(tag = "type", rename_all = "snake_case")]
324pub enum ListGraphsResult {
325    Ok { graphs: Vec<LatticeGraph> },
326    Err { message: String },
327}
328
329#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
330#[serde(tag = "type", rename_all = "snake_case")]
331pub enum GetNodeInputsResult {
332    Ok { inputs: Vec<Token> },
333    Err { message: String },
334}
335
336#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
337#[serde(tag = "type", rename_all = "snake_case")]
338pub enum CreateChildGraphResult {
339    Ok { graph_id: GraphId },
340    Err { message: String },
341}
342
343#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
344#[serde(tag = "type", rename_all = "snake_case")]
345pub enum GetChildGraphsResult {
346    Ok { graphs: Vec<LatticeGraph> },
347    Err { message: String },
348}