//! SQLite-backed storage for lattice graph execution.
//! Source: plexus_substrate/activations/lattice/storage.rs
1use super::types::{
2    EdgeCondition, GatherStrategy, GraphId, GraphStatus, JoinType, LatticeEvent,
3    LatticeEventEnvelope, LatticeGraph, LatticeNode, NodeId, NodeOutput, NodeSpec, NodeStatus,
4    Token, TokenPayload,
5};
6use crate::activation_db_path_from_module;
7use crate::activations::storage::init_sqlite_pool;
8use async_stream::stream;
9use futures::Stream;
10use serde_json::Value;
11use sqlx::{sqlite::SqlitePool, Row};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::str::FromStr;
15use std::sync::{Arc, RwLock};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17use tokio::sync::Notify;
18use uuid::Uuid;
19
/// Configuration for [`LatticeStorage`].
#[derive(Debug, Clone)]
pub struct LatticeStorageConfig {
    // Filesystem path of the SQLite database file.
    pub db_path: PathBuf,
}
24
impl Default for LatticeStorageConfig {
    fn default() -> Self {
        Self {
            // Derive the default DB location from the current module path
            // via the project-wide macro.
            db_path: activation_db_path_from_module!("lattice.db"),
        }
    }
}
32
/// SQLite-backed persistence for lattice graphs, nodes, edges, token
/// deliveries (Petri-net marking), and the durable event log.
pub struct LatticeStorage {
    pool: SqlitePool,
    /// Per-graph Notify — wakes the execute() stream when new events are persisted.
    graph_notifiers: Arc<RwLock<HashMap<GraphId, Arc<Notify>>>>,
}
38
39impl LatticeStorage {
40    pub async fn new(config: LatticeStorageConfig) -> Result<Self, String> {
41        let pool = init_sqlite_pool(config.db_path).await?;
42        let storage = Self {
43            pool,
44            graph_notifiers: Arc::new(RwLock::new(HashMap::new())),
45        };
46        storage.run_migrations().await?;
47        Ok(storage)
48    }
49
    /// Create the lattice schema if absent and apply additive migrations.
    ///
    /// All CREATE statements are idempotent (`IF NOT EXISTS`). The ALTER
    /// statements below run unconditionally on every startup and their errors
    /// are discarded — that is how "duplicate column" is tolerated on re-runs.
    async fn run_migrations(&self) -> Result<(), String> {
        // NOTE(review): this relies on sqlx's SQLite driver executing the
        // whole multi-statement batch in one `execute` — confirm against the
        // pinned sqlx version if upgrading.
        sqlx::query(r#"
            CREATE TABLE IF NOT EXISTS lattice_graphs (
                id          TEXT PRIMARY KEY,
                metadata    TEXT NOT NULL DEFAULT '{}',
                status      TEXT NOT NULL DEFAULT 'pending',
                created_at  INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS lattice_nodes (
                id           TEXT PRIMARY KEY,
                graph_id     TEXT NOT NULL,
                spec         TEXT NOT NULL,
                status       TEXT NOT NULL DEFAULT 'pending',
                output       TEXT,
                error        TEXT,
                created_at   INTEGER NOT NULL,
                completed_at INTEGER,
                FOREIGN KEY (graph_id) REFERENCES lattice_graphs(id)
            );
            CREATE INDEX IF NOT EXISTS idx_lattice_nodes_graph ON lattice_nodes(graph_id);

            CREATE TABLE IF NOT EXISTS lattice_edges (
                id           TEXT PRIMARY KEY,
                graph_id     TEXT NOT NULL,
                from_node_id TEXT NOT NULL,
                to_node_id   TEXT NOT NULL,
                FOREIGN KEY (graph_id) REFERENCES lattice_graphs(id)
            );
            CREATE INDEX IF NOT EXISTS idx_lattice_edges_graph ON lattice_edges(graph_id);
            CREATE INDEX IF NOT EXISTS idx_lattice_edges_to    ON lattice_edges(to_node_id);

            -- Durable event log: every LatticeEvent is appended here.
            -- seq is the cursor callers use for reconnect replay.
            CREATE TABLE IF NOT EXISTS lattice_events (
                seq        INTEGER PRIMARY KEY AUTOINCREMENT,
                graph_id   TEXT NOT NULL,
                event      TEXT NOT NULL,
                created_at INTEGER NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_lattice_events_graph ON lattice_events(graph_id, seq);

            -- Token delivery tracking (Petri net marking)
            CREATE TABLE IF NOT EXISTS lattice_edge_tokens (
                edge_id    TEXT NOT NULL,
                graph_id   TEXT NOT NULL,
                token      TEXT NOT NULL,
                seq        INTEGER NOT NULL,
                PRIMARY KEY (edge_id, seq)
            );
            CREATE INDEX IF NOT EXISTS idx_lattice_edge_tokens_graph
                ON lattice_edge_tokens(graph_id);
        "#)
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Migration failed: {}", e))?;

        // Add columns to existing tables (ignore if already exists)
        // NOTE(review): `let _ =` also swallows genuine failures (e.g. a
        // locked database) — these columns would then silently be missing.
        let _ = sqlx::query("ALTER TABLE lattice_edges ADD COLUMN condition TEXT")
            .execute(&self.pool).await;
        let _ = sqlx::query("ALTER TABLE lattice_nodes ADD COLUMN join_type TEXT NOT NULL DEFAULT 'all'")
            .execute(&self.pool).await;
        let _ = sqlx::query(
            "ALTER TABLE lattice_graphs ADD COLUMN parent_graph_id TEXT NULL REFERENCES lattice_graphs(id)"
        ).execute(&self.pool).await;
        let _ = sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_lattice_graphs_parent ON lattice_graphs(parent_graph_id)"
        ).execute(&self.pool).await;

        Ok(())
    }
121
122    // ─── Graph / Node / Edge CRUD ────────────────────────────────────────────
123
124    pub async fn create_graph(&self, metadata: Value) -> Result<GraphId, String> {
125        let id = format!("lattice-{}", Uuid::new_v4());
126        let now = current_timestamp();
127        let metadata_json = serde_json::to_string(&metadata)
128            .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
129
130        sqlx::query(
131            "INSERT INTO lattice_graphs (id, metadata, status, created_at) VALUES (?, ?, 'pending', ?)"
132        )
133        .bind(&id)
134        .bind(&metadata_json)
135        .bind(now)
136        .execute(&self.pool)
137        .await
138        .map_err(|e| format!("Failed to create graph: {}", e))?;
139
140        Ok(id)
141    }
142
143    pub async fn create_child_graph(
144        &self,
145        parent_id: &str,
146        metadata: Value,
147    ) -> Result<String, String> {
148        let id = format!("lattice-{}", Uuid::new_v4());
149        let now = current_timestamp();
150        let metadata_json = serde_json::to_string(&metadata)
151            .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
152
153        sqlx::query(
154            "INSERT INTO lattice_graphs (id, metadata, status, created_at, parent_graph_id) VALUES (?, ?, 'pending', ?, ?)"
155        )
156        .bind(&id)
157        .bind(&metadata_json)
158        .bind(now)
159        .bind(parent_id)
160        .execute(&self.pool)
161        .await
162        .map_err(|e| format!("Failed to create child graph: {}", e))?;
163
164        Ok(id)
165    }
166
167    pub async fn get_child_graphs(&self, parent_id: &str) -> Result<Vec<LatticeGraph>, String> {
168        let rows = sqlx::query(
169            "SELECT id, metadata, status, created_at, parent_graph_id FROM lattice_graphs WHERE parent_graph_id = ? ORDER BY created_at"
170        )
171        .bind(parent_id)
172        .fetch_all(&self.pool)
173        .await
174        .map_err(|e| format!("Failed to fetch child graphs: {}", e))?;
175
176        let mut graphs = Vec::new();
177        for row in rows {
178            let graph_id: String = row.get("id");
179            let node_count: i64 = sqlx::query_scalar(
180                "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
181            )
182            .bind(&graph_id)
183            .fetch_one(&self.pool)
184            .await
185            .map_err(|e| format!("Failed to count nodes: {}", e))?;
186
187            let edge_count: i64 = sqlx::query_scalar(
188                "SELECT COUNT(*) FROM lattice_edges WHERE graph_id = ?"
189            )
190            .bind(&graph_id)
191            .fetch_one(&self.pool)
192            .await
193            .map_err(|e| format!("Failed to count edges: {}", e))?;
194
195            graphs.push(self.row_to_graph(row, node_count as usize, edge_count as usize)?);
196        }
197        Ok(graphs)
198    }
199
    /// Insert a node into `graph_id` in 'pending' status and return its id.
    ///
    /// `node_id_hint` lets the caller choose the id; otherwise a fresh UUID
    /// is generated. If the graph is already Running, the new node is
    /// immediately checked for readiness so dynamically-added work can start
    /// without waiting for another graph transition.
    pub async fn add_node(
        &self,
        graph_id: &GraphId,
        node_id_hint: Option<NodeId>,
        spec: &NodeSpec,
    ) -> Result<NodeId, String> {
        let id = node_id_hint.unwrap_or_else(|| Uuid::new_v4().to_string());
        let now = current_timestamp();
        let spec_json = serde_json::to_string(spec)
            .map_err(|e| format!("Failed to serialize spec: {}", e))?;

        sqlx::query(
            "INSERT INTO lattice_nodes (id, graph_id, spec, status, created_at) VALUES (?, ?, ?, 'pending', ?)"
        )
        .bind(&id)
        .bind(graph_id)
        .bind(&spec_json)
        .bind(now)
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to add node: {}", e))?;

        // Live graph: best-effort readiness check. Errors are deliberately
        // ignored — readiness is re-evaluated on later transitions.
        if let Ok(graph_status) = self.get_graph_status(graph_id).await {
            if graph_status == GraphStatus::Running {
                let _ = self.check_and_ready(graph_id, &id).await;
            }
        }

        Ok(id)
    }
230
    /// Insert an edge from `from_node_id` to `to_node_id`, optionally guarded
    /// by an `EdgeCondition` that filters tokens by color.
    ///
    /// Both endpoints must already exist in `graph_id`. If the graph is
    /// Running and the source node has already completed, the source's tokens
    /// are replayed onto the new edge immediately, so a late-added edge does
    /// not lose output produced before it existed.
    pub async fn add_edge(
        &self,
        graph_id: &GraphId,
        from_node_id: &NodeId,
        to_node_id: &NodeId,
        condition: Option<&EdgeCondition>,
    ) -> Result<(), String> {
        // Validate both endpoints before inserting anything.
        let from_exists: bool = sqlx::query_scalar(
            "SELECT COUNT(*) > 0 FROM lattice_nodes WHERE id = ? AND graph_id = ?"
        )
        .bind(from_node_id)
        .bind(graph_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to validate from_node: {}", e))?;

        if !from_exists {
            return Err(format!("Node {} not found in graph {}", from_node_id, graph_id));
        }

        let to_exists: bool = sqlx::query_scalar(
            "SELECT COUNT(*) > 0 FROM lattice_nodes WHERE id = ? AND graph_id = ?"
        )
        .bind(to_node_id)
        .bind(graph_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to validate to_node: {}", e))?;

        if !to_exists {
            return Err(format!("Node {} not found in graph {}", to_node_id, graph_id));
        }

        let edge_id = Uuid::new_v4().to_string();
        let condition_json = condition
            .map(|c| serde_json::to_string(c).map_err(|e| format!("Failed to serialize condition: {}", e)))
            .transpose()?;

        sqlx::query(
            "INSERT INTO lattice_edges (id, graph_id, from_node_id, to_node_id, condition) VALUES (?, ?, ?, ?, ?)"
        )
        .bind(&edge_id)
        .bind(graph_id)
        .bind(from_node_id)
        .bind(to_node_id)
        .bind(condition_json.as_deref())
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to add edge: {}", e))?;

        // Token replay: if the source completed while the graph is running,
        // deliver its tokens (filtered by this edge's condition) onto the new
        // edge, then re-check the target's readiness and wake the stream.
        if let Ok(graph_status) = self.get_graph_status(graph_id).await {
            if graph_status == GraphStatus::Running {
                if let Ok(src_node) = self.get_node(from_node_id).await {
                    if src_node.status == NodeStatus::Complete {
                        // A completed node with no recorded output yields a single Ok token.
                        let tokens: Vec<Token> = src_node.output.as_ref()
                            .map(|o| o.tokens().into_iter().cloned().collect())
                            .unwrap_or_else(|| vec![Token::ok()]);
                        for token in &tokens {
                            // No condition means the edge passes every token.
                            let matches = condition
                                .map(|c| c.matches(&token.color))
                                .unwrap_or(true);
                            if matches {
                                // seq continues from however many tokens the edge already holds.
                                let seq = self.count_tokens_on_edge(&edge_id).await? + 1;
                                self.deliver_token(&edge_id, graph_id, token, seq).await?;
                            }
                        }
                        let _ = self.check_and_ready(graph_id, to_node_id).await;
                        self.notify_graph(graph_id);
                    }
                }
            }
        }

        Ok(())
    }
306
307    pub async fn get_graph(&self, graph_id: &GraphId) -> Result<LatticeGraph, String> {
308        let row = sqlx::query(
309            "SELECT id, metadata, status, created_at, parent_graph_id FROM lattice_graphs WHERE id = ?"
310        )
311        .bind(graph_id)
312        .fetch_optional(&self.pool)
313        .await
314        .map_err(|e| format!("Failed to fetch graph: {}", e))?
315        .ok_or_else(|| format!("Graph not found: {}", graph_id))?;
316
317        let node_count: i64 = sqlx::query_scalar(
318            "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
319        )
320        .bind(graph_id)
321        .fetch_one(&self.pool)
322        .await
323        .map_err(|e| format!("Failed to count nodes: {}", e))?;
324
325        let edge_count: i64 = sqlx::query_scalar(
326            "SELECT COUNT(*) FROM lattice_edges WHERE graph_id = ?"
327        )
328        .bind(graph_id)
329        .fetch_one(&self.pool)
330        .await
331        .map_err(|e| format!("Failed to count edges: {}", e))?;
332
333        self.row_to_graph(row, node_count as usize, edge_count as usize)
334    }
335
336    /// Count the total number of nodes in a graph.
337    pub async fn count_nodes(&self, graph_id: &GraphId) -> Result<usize, String> {
338        let count: i64 = sqlx::query_scalar(
339            "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
340        )
341        .bind(graph_id)
342        .fetch_one(&self.pool)
343        .await
344        .map_err(|e| format!("Failed to count nodes: {}", e))?;
345
346        Ok(count as usize)
347    }
348
349    pub async fn get_nodes(&self, graph_id: &GraphId) -> Result<Vec<LatticeNode>, String> {
350        let rows = sqlx::query(
351            "SELECT id, graph_id, spec, status, output, error, created_at, completed_at
352             FROM lattice_nodes WHERE graph_id = ? ORDER BY created_at"
353        )
354        .bind(graph_id)
355        .fetch_all(&self.pool)
356        .await
357        .map_err(|e| format!("Failed to fetch nodes: {}", e))?;
358
359        rows.into_iter().map(|row| self.row_to_node(row)).collect()
360    }
361
362    pub async fn get_node(&self, node_id: &NodeId) -> Result<LatticeNode, String> {
363        let row = sqlx::query(
364            "SELECT id, graph_id, spec, status, output, error, created_at, completed_at
365             FROM lattice_nodes WHERE id = ?"
366        )
367        .bind(node_id)
368        .fetch_optional(&self.pool)
369        .await
370        .map_err(|e| format!("Failed to fetch node: {}", e))?
371        .ok_or_else(|| format!("Node not found: {}", node_id))?;
372
373        self.row_to_node(row)
374    }
375
376    pub async fn get_inbound_edges(&self, node_id: &NodeId) -> Result<Vec<NodeId>, String> {
377        let rows = sqlx::query(
378            "SELECT from_node_id FROM lattice_edges WHERE to_node_id = ?"
379        )
380        .bind(node_id)
381        .fetch_all(&self.pool)
382        .await
383        .map_err(|e| format!("Failed to fetch inbound edges: {}", e))?;
384
385        Ok(rows.into_iter().map(|r| r.get::<String, _>("from_node_id")).collect())
386    }
387
388    pub async fn get_outbound_edges(&self, node_id: &NodeId) -> Result<Vec<NodeId>, String> {
389        let rows = sqlx::query(
390            "SELECT to_node_id FROM lattice_edges WHERE from_node_id = ?"
391        )
392        .bind(node_id)
393        .fetch_all(&self.pool)
394        .await
395        .map_err(|e| format!("Failed to fetch outbound edges: {}", e))?;
396
397        Ok(rows.into_iter().map(|r| r.get::<String, _>("to_node_id")).collect())
398    }
399
400    /// Returns (edge_id, to_node_id, condition) for each outbound edge.
401    async fn get_outbound_edges_with_conditions(
402        &self,
403        node_id: &NodeId,
404    ) -> Result<Vec<(String, NodeId, Option<EdgeCondition>)>, String> {
405        let rows = sqlx::query(
406            "SELECT id, to_node_id, condition FROM lattice_edges WHERE from_node_id = ?"
407        )
408        .bind(node_id)
409        .fetch_all(&self.pool)
410        .await
411        .map_err(|e| format!("Failed to fetch outbound edges: {}", e))?;
412
413        rows.into_iter()
414            .map(|row| {
415                let edge_id: String = row.get("id");
416                let to_node_id: String = row.get("to_node_id");
417                let condition_json: Option<String> = row.get("condition");
418                let condition = condition_json
419                    .as_deref()
420                    .map(|s| {
421                        serde_json::from_str::<EdgeCondition>(s)
422                            .map_err(|e| format!("Failed to deserialize edge condition: {}", e))
423                    })
424                    .transpose()?;
425                Ok((edge_id, to_node_id, condition))
426            })
427            .collect()
428    }
429
430    pub async fn get_nodes_with_no_predecessors(
431        &self,
432        graph_id: &GraphId,
433    ) -> Result<Vec<LatticeNode>, String> {
434        // Exclude self-loop edges when determining root nodes
435        let rows = sqlx::query(
436            "SELECT id, graph_id, spec, status, output, error, created_at, completed_at
437             FROM lattice_nodes
438             WHERE graph_id = ?
439               AND id NOT IN (
440                   SELECT to_node_id FROM lattice_edges
441                   WHERE graph_id = ? AND from_node_id != to_node_id
442               )
443             ORDER BY created_at"
444        )
445        .bind(graph_id)
446        .bind(graph_id)
447        .fetch_all(&self.pool)
448        .await
449        .map_err(|e| format!("Failed to fetch root nodes: {}", e))?;
450
451        rows.into_iter().map(|row| self.row_to_node(row)).collect()
452    }
453
454    pub async fn set_node_status(
455        &self,
456        node_id: &NodeId,
457        status: NodeStatus,
458        output: Option<&NodeOutput>,
459        error: Option<&str>,
460    ) -> Result<(), String> {
461        let now = current_timestamp();
462        let completed_at = match status {
463            NodeStatus::Complete | NodeStatus::Failed => Some(now),
464            _ => None,
465        };
466        let output_json = output
467            .map(|o| serde_json::to_string(o).map_err(|e| format!("Failed to serialize output: {}", e)))
468            .transpose()?;
469
470        sqlx::query(
471            "UPDATE lattice_nodes SET status = ?, output = ?, error = ?, completed_at = ? WHERE id = ?"
472        )
473        .bind(status.to_string())
474        .bind(output_json.as_deref())
475        .bind(error)
476        .bind(completed_at)
477        .bind(node_id)
478        .execute(&self.pool)
479        .await
480        .map_err(|e| format!("Failed to update node status: {}", e))?;
481
482        Ok(())
483    }
484
485    pub async fn update_graph_status(
486        &self,
487        graph_id: &GraphId,
488        status: GraphStatus,
489    ) -> Result<(), String> {
490        sqlx::query("UPDATE lattice_graphs SET status = ? WHERE id = ?")
491            .bind(status.to_string())
492            .bind(graph_id)
493            .execute(&self.pool)
494            .await
495            .map_err(|e| format!("Failed to update graph status: {}", e))?;
496        Ok(())
497    }
498
499    pub async fn list_graphs(&self) -> Result<Vec<LatticeGraph>, String> {
500        let rows = sqlx::query(
501            "SELECT id, metadata, status, created_at, parent_graph_id FROM lattice_graphs ORDER BY created_at DESC"
502        )
503        .fetch_all(&self.pool)
504        .await
505        .map_err(|e| format!("Failed to list graphs: {}", e))?;
506
507        let mut graphs = Vec::new();
508        for row in rows {
509            let graph_id: String = row.get("id");
510            let node_count: i64 = sqlx::query_scalar(
511                "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
512            )
513            .bind(&graph_id)
514            .fetch_one(&self.pool)
515            .await
516            .map_err(|e| format!("Failed to count nodes: {}", e))?;
517
518            let edge_count: i64 = sqlx::query_scalar(
519                "SELECT COUNT(*) FROM lattice_edges WHERE graph_id = ?"
520            )
521            .bind(&graph_id)
522            .fetch_one(&self.pool)
523            .await
524            .map_err(|e| format!("Failed to count edges: {}", e))?;
525
526            graphs.push(self.row_to_graph(row, node_count as usize, edge_count as usize)?);
527        }
528        Ok(graphs)
529    }
530
531    /// Return graph IDs whose status is 'running'.
532    ///
533    /// Used by the startup recovery pass to find graphs that were mid-execution
534    /// when the substrate last shut down.
535    pub async fn get_running_graph_ids(&self) -> Result<Vec<GraphId>, String> {
536        let rows = sqlx::query(
537            "SELECT id FROM lattice_graphs WHERE status = 'running' ORDER BY created_at"
538        )
539        .fetch_all(&self.pool)
540        .await
541        .map_err(|e| format!("Failed to fetch running graphs: {}", e))?;
542
543        Ok(rows.into_iter().map(|r| r.get::<String, _>("id")).collect())
544    }
545
546    /// Reset a single node to 'pending', clearing output/error/completed_at.
547    ///
548    /// Used by the startup recovery pass to un-stick nodes that were left in
549    /// 'ready' state when the substrate last shut down.
550    pub async fn reset_node_to_pending(&self, node_id: &NodeId) -> Result<(), String> {
551        sqlx::query(
552            "UPDATE lattice_nodes SET status = 'pending', output = NULL, error = NULL, completed_at = NULL WHERE id = ?"
553        )
554        .bind(node_id)
555        .execute(&self.pool)
556        .await
557        .map_err(|e| format!("Failed to reset node to pending: {}", e))?;
558        Ok(())
559    }
560
561    // ─── Token Delivery ──────────────────────────────────────────────────────
562
563    /// Append a token delivery to an edge.
564    async fn deliver_token(
565        &self,
566        edge_id: &str,
567        graph_id: &GraphId,
568        token: &Token,
569        seq: i64,
570    ) -> Result<(), String> {
571        let token_json = serde_json::to_string(token)
572            .map_err(|e| format!("Failed to serialize token: {}", e))?;
573
574        sqlx::query(
575            "INSERT INTO lattice_edge_tokens (edge_id, graph_id, token, seq) VALUES (?, ?, ?, ?)"
576        )
577        .bind(edge_id)
578        .bind(graph_id)
579        .bind(&token_json)
580        .bind(seq)
581        .execute(&self.pool)
582        .await
583        .map_err(|e| format!("Failed to deliver token: {}", e))?;
584
585        Ok(())
586    }
587
588    /// Count tokens already delivered to a specific edge.
589    async fn count_tokens_on_edge(&self, edge_id: &str) -> Result<i64, String> {
590        let count: i64 = sqlx::query_scalar(
591            "SELECT COUNT(*) FROM lattice_edge_tokens WHERE edge_id = ?"
592        )
593        .bind(edge_id)
594        .fetch_one(&self.pool)
595        .await
596        .map_err(|e| format!("Failed to count edge tokens: {}", e))?;
597        Ok(count)
598    }
599
    /// How many distinct inbound edges have at least one token, vs total inbound.
    ///
    /// Self-loop edges (from == to) are excluded from both counts — the same
    /// exclusion `get_nodes_with_no_predecessors` applies when determining
    /// root nodes.
    async fn count_edges_with_tokens(
        &self,
        node_id: &NodeId,
    ) -> Result<(usize, usize), String> {
        // Total non-self-loop inbound edges.
        let total: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM lattice_edges WHERE to_node_id = ? AND from_node_id != to_node_id"
        )
        .bind(node_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to count inbound edges: {}", e))?;

        // Distinct inbound edges that have received at least one token.
        let delivered: i64 = sqlx::query_scalar(
            "SELECT COUNT(DISTINCT edge_id) FROM lattice_edge_tokens
             WHERE edge_id IN (
                 SELECT id FROM lattice_edges WHERE to_node_id = ? AND from_node_id != to_node_id
             )"
        )
        .bind(node_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to count delivered edges: {}", e))?;

        Ok((delivered as usize, total as usize))
    }
626
627    /// Collect all tokens from all inbound edges of a node (for Gather).
628    pub async fn get_node_inputs(&self, node_id: &NodeId) -> Result<Vec<Token>, String> {
629        let rows = sqlx::query(
630            "SELECT et.token FROM lattice_edge_tokens et
631             JOIN lattice_edges e ON et.edge_id = e.id
632             WHERE e.to_node_id = ?
633             ORDER BY et.seq"
634        )
635        .bind(node_id)
636        .fetch_all(&self.pool)
637        .await
638        .map_err(|e| format!("Failed to fetch node inputs: {}", e))?;
639
640        rows.into_iter()
641            .map(|row| {
642                let token_json: String = row.get("token");
643                serde_json::from_str::<Token>(&token_json)
644                    .map_err(|e| format!("Failed to deserialize token: {}", e))
645            })
646            .collect()
647    }
648
649    // ─── Event Log ───────────────────────────────────────────────────────────
650
651    /// Append an event to the durable log and return its sequence number.
652    pub async fn persist_event(
653        &self,
654        graph_id: &GraphId,
655        event: &LatticeEvent,
656    ) -> Result<u64, String> {
657        let event_json = serde_json::to_string(event)
658            .map_err(|e| format!("Failed to serialize event: {}", e))?;
659        let now = current_timestamp();
660
661        let result = sqlx::query(
662            "INSERT INTO lattice_events (graph_id, event, created_at) VALUES (?, ?, ?)"
663        )
664        .bind(graph_id)
665        .bind(&event_json)
666        .bind(now)
667        .execute(&self.pool)
668        .await
669        .map_err(|e| format!("Failed to persist event: {}", e))?;
670
671        Ok(result.last_insert_rowid() as u64)
672    }
673
674    /// Read all events for a graph with seq > after_seq, in order.
675    pub async fn get_events_after(
676        &self,
677        graph_id: &GraphId,
678        after_seq: u64,
679    ) -> Result<Vec<(u64, LatticeEvent)>, String> {
680        let rows = sqlx::query(
681            "SELECT seq, event FROM lattice_events WHERE graph_id = ? AND seq > ? ORDER BY seq"
682        )
683        .bind(graph_id)
684        .bind(after_seq as i64)
685        .fetch_all(&self.pool)
686        .await
687        .map_err(|e| format!("Failed to fetch events: {}", e))?;
688
689        rows.into_iter()
690            .map(|row| {
691                let seq: i64 = row.get("seq");
692                let event_json: String = row.get("event");
693                let event: LatticeEvent = serde_json::from_str(&event_json)
694                    .map_err(|e| format!("Failed to deserialize event: {}", e))?;
695                Ok((seq as u64, event))
696            })
697            .collect()
698    }
699
700    /// Wake the execute() stream for a graph.
701    pub fn notify_graph(&self, graph_id: &GraphId) {
702        if let Ok(notifiers) = self.graph_notifiers.read() {
703            if let Some(notifier) = notifiers.get(graph_id) {
704                notifier.notify_one();
705            }
706        }
707    }
708
709    pub fn get_or_create_notifier(&self, graph_id: &GraphId) -> Arc<Notify> {
710        let mut notifiers = self.graph_notifiers.write().unwrap();
711        notifiers
712            .entry(graph_id.clone())
713            .or_insert_with(|| Arc::new(Notify::new()))
714            .clone()
715    }
716
717    // ─── Graph Lifecycle ─────────────────────────────────────────────────────
718
    /// Atomically transition a Pending graph to Running, seed root nodes as Ready,
    /// and persist the initial NodeReady events.
    ///
    /// Idempotent: if another caller already started the graph, this is a no-op.
    pub async fn start_graph(&self, graph_id: &GraphId) -> Result<(), String> {
        // CAS: only transition if still Pending
        let result = sqlx::query(
            "UPDATE lattice_graphs SET status = 'running' WHERE id = ? AND status = 'pending'"
        )
        .bind(graph_id)
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to start graph: {}", e))?;

        if result.rows_affected() == 0 {
            // Already started by a concurrent caller — that's fine
            return Ok(());
        }

        let root_nodes = self.get_nodes_with_no_predecessors(graph_id).await?;

        if root_nodes.is_empty() {
            // A graph with no root nodes completes immediately.
            self.update_graph_status(graph_id, GraphStatus::Complete).await?;
            self.persist_event(graph_id, &LatticeEvent::GraphDone {
                graph_id: graph_id.clone(),
            }).await?;
        } else {
            for node in root_nodes {
                match &node.spec {
                    NodeSpec::Gather { .. } => {
                        // Root Gather node with no predecessors — skip NodeReady.
                        // It will stay Pending (degenerate: no tokens will ever arrive).
                    }
                    _ => {
                        // Mark Ready first, then log the event the watcher consumes.
                        self.set_node_status(&node.id, NodeStatus::Ready, None, None).await?;
                        self.persist_event(graph_id, &LatticeEvent::NodeReady {
                            node_id: node.id,
                            spec: node.spec,
                        }).await?;
                    }
                }
            }
        }

        self.notify_graph(graph_id);
        Ok(())
    }
766
767    /// Reset a zombie Running node back to Ready for crash recovery.
768    ///
769    /// Inbound edge tokens are never deleted, so the node's join condition
770    /// is still satisfied and it can be re-dispatched immediately.
771    /// Emits a fresh NodeReady event so the `run_graph_execution` watcher
772    /// picks it up.  Idempotent: no-op if node is not Running.
773    pub async fn reset_running_to_ready(
774        &self,
775        graph_id: &GraphId,
776        node_id: &NodeId,
777    ) -> Result<(), String> {
778        let node = self.get_node(node_id).await?;
779        if node.status != NodeStatus::Running {
780            return Ok(());
781        }
782        self.set_node_status(node_id, NodeStatus::Ready, None, None).await?;
783        self.persist_event(graph_id, &LatticeEvent::NodeReady {
784            node_id: node_id.clone(),
785            spec: node.spec,
786        }).await?;
787        self.notify_graph(graph_id);
788        Ok(())
789    }
790
791    /// Re-emit NodeReady events for all nodes currently in the 'ready' state.
792    ///
793    /// Called during startup recovery after stuck nodes have been reset.
794    /// The graph must already be in 'running' status.  This re-seeds the
795    /// execute() stream so that `run_graph_execution` watchers can pick up
796    /// and dispatch the pending work.
797    pub async fn reemit_ready_nodes(&self, graph_id: &GraphId) -> Result<(), String> {
798        let nodes = self.get_nodes(graph_id).await?;
799        for node in nodes {
800            if node.status == NodeStatus::Ready {
801                self.persist_event(graph_id, &LatticeEvent::NodeReady {
802                    node_id: node.id,
803                    spec: node.spec,
804                }).await?;
805            }
806        }
807        self.notify_graph(graph_id);
808        Ok(())
809    }
810
811    // ─── Transition Logic ────────────────────────────────────────────────────
812
    /// Called by node_complete / node_failed.
    ///
    /// Uses an iterative queue to handle Gather auto-execution without async recursion.
    /// Implements the colored token model:
    ///   1. Produce tokens from output/error
    ///   2. Route tokens on outbound edges (filtered by edge condition)
    ///   3. For each downstream node: check_and_ready (enables Gather auto-execution)
    ///   4. Error fallback if no tokens delivered
    ///   5. Check graph completion
    ///
    /// # Arguments
    /// * `graph_id` — graph the completed node belongs to
    /// * `completed_node_id` — node that just finished (successfully or not)
    /// * `output` — output on success; `None` produces a single plain Ok token
    /// * `error` — error message on failure; `Some(_)` selects the failure path
    ///
    /// # Errors
    /// Returns Err when persisting statuses, events, or tokens fails.
    pub async fn advance_graph(
        &self,
        graph_id: &GraphId,
        completed_node_id: &NodeId,
        output: Option<NodeOutput>,
        error: Option<String>,
    ) -> Result<(), String> {
        // Queue: (node_id, output, error) — Gather nodes are auto-executed via queue.
        // Vec::pop makes this LIFO (depth-first); processing order does not change
        // which tokens end up on which edges.
        let mut queue: Vec<(NodeId, Option<NodeOutput>, Option<String>)> =
            vec![(completed_node_id.clone(), output, error)];

        while let Some((nid, out, err)) = queue.pop() {
            // IDEMPOTENCY: skip if already terminal (also silently skips
            // nodes that fail to load — best-effort by design).
            let node = match self.get_node(&nid).await {
                Ok(n) => n,
                Err(_) => continue,
            };
            if node.status == NodeStatus::Complete || node.status == NodeStatus::Failed {
                continue;
            }

            let failed = err.is_some();

            // PRODUCE TOKENS: failure yields one error-colored token; success
            // uses the output's tokens, or a bare Ok token when there is no output.
            // Status is persisted before the event in both branches.
            let tokens: Vec<Token>;
            if failed {
                let err_msg = err.unwrap_or_default();
                self.set_node_status(&nid, NodeStatus::Failed, None, Some(&err_msg)).await?;
                self.persist_event(graph_id, &LatticeEvent::NodeFailed {
                    node_id: nid.clone(),
                    error: err_msg.clone(),
                }).await?;
                tokens = vec![Token::error(err_msg)];
            } else {
                self.set_node_status(&nid, NodeStatus::Complete, out.as_ref(), None).await?;
                self.persist_event(graph_id, &LatticeEvent::NodeDone {
                    node_id: nid.clone(),
                    output: out.clone(),
                }).await?;
                tokens = out.as_ref()
                    .map(|o| o.tokens().into_iter().cloned().collect())
                    .unwrap_or_else(|| vec![Token::ok()]);
            }

            // ROUTE TOKENS ON OUTBOUND EDGES — an edge with no condition
            // matches every token color.
            let outbound = self.get_outbound_edges_with_conditions(&nid).await?;
            let mut any_delivered = false;

            for token in &tokens {
                for (edge_id, to_node_id, condition) in &outbound {
                    let matches = condition.as_ref()
                        .map(|c| c.matches(&token.color))
                        .unwrap_or(true);

                    if matches {
                        // Per-edge sequence number: count + 1.
                        // NOTE(review): assumes deliveries to an edge are
                        // serialized (single-writer SQLite pool) — confirm.
                        let seq = self.count_tokens_on_edge(edge_id).await? + 1;
                        self.deliver_token(edge_id, graph_id, token, seq).await?;
                        any_delivered = true;

                        // Check if downstream node is now enabled; Gather nodes
                        // return their output here and are re-queued for processing.
                        if let Some(gather_output) = self.check_and_ready(graph_id, to_node_id).await? {
                            queue.push((to_node_id.clone(), Some(gather_output), None));
                        }
                    }
                }
            }

            // ERROR FALLBACK: a failed node whose error token matched no edge
            // fails the whole graph; the remaining queue is intentionally dropped.
            if failed && !any_delivered {
                let err_str = tokens.first()
                    .and_then(|t| match &t.payload {
                        Some(TokenPayload::Data { value }) => {
                            value.get("message").and_then(|v| v.as_str()).map(|s| s.to_string())
                        }
                        _ => None,
                    })
                    .unwrap_or_default();
                self.update_graph_status(graph_id, GraphStatus::Failed).await?;
                self.persist_event(graph_id, &LatticeEvent::GraphFailed {
                    graph_id: graph_id.clone(),
                    node_id: nid.clone(),
                    error: err_str,
                }).await?;
                self.notify_graph(graph_id);
                return Ok(());
            }
        }

        // GRAPH COMPLETION: check if all nodes are complete.
        // NOTE(review): a node left Failed (with its error handled downstream)
        // prevents GraphDone from ever firing — confirm this is intended.
        let all_done = self.get_nodes(graph_id).await?
            .iter()
            .all(|n| n.status == NodeStatus::Complete);
        if all_done {
            self.update_graph_status(graph_id, GraphStatus::Complete).await?;
            self.persist_event(graph_id, &LatticeEvent::GraphDone {
                graph_id: graph_id.clone(),
            }).await?;
        }

        self.notify_graph(graph_id);
        Ok(())
    }
924
925    /// Check if a node is enabled (all/any inbound edges delivered tokens).
926    /// Returns Some(output) for Gather nodes (caller should queue for processing).
927    /// Returns None for Task/Scatter (sets Ready and persists NodeReady).
928    async fn check_and_ready(
929        &self,
930        graph_id: &GraphId,
931        node_id: &NodeId,
932    ) -> Result<Option<NodeOutput>, String> {
933        let node = self.get_node(node_id).await?;
934
935        // Only transition from Pending or Complete (Complete → Ready allows self-loops)
936        if node.status != NodeStatus::Pending && node.status != NodeStatus::Complete {
937            return Ok(None);
938        }
939
940        let (delivered, total) = self.count_edges_with_tokens(node_id).await?;
941        let enabled = match &node.join_type {
942            JoinType::All => delivered == total && total > 0,
943            JoinType::Any => delivered >= 1,
944        };
945
946        if !enabled {
947            return Ok(None);
948        }
949
950        match &node.spec {
951            NodeSpec::Gather { strategy } => {
952                let tokens = self.get_node_inputs(node_id).await?;
953                let output = match strategy {
954                    GatherStrategy::All => NodeOutput::Many { tokens },
955                    GatherStrategy::First { n } => {
956                        let take = (*n).min(tokens.len());
957                        NodeOutput::Many { tokens: tokens[..take].to_vec() }
958                    }
959                };
960                self.set_node_status(node_id, NodeStatus::Running, None, None).await?;
961                Ok(Some(output))
962            }
963            _ => {
964                // Task / Scatter / SubGraph — emit NodeReady for caller to handle
965                self.set_node_status(node_id, NodeStatus::Ready, None, None).await?;
966                self.persist_event(graph_id, &LatticeEvent::NodeReady {
967                    node_id: node_id.clone(),
968                    spec: node.spec.clone(),
969                }).await?;
970                Ok(None)
971            }
972        }
973    }
974
975    async fn get_graph_status(&self, graph_id: &str) -> Result<GraphStatus, String> {
976        let row = sqlx::query("SELECT status FROM lattice_graphs WHERE id = ?")
977            .bind(graph_id)
978            .fetch_one(&self.pool)
979            .await
980            .map_err(|e| e.to_string())?;
981        let s: String = row.try_get("status").map_err(|e| e.to_string())?;
982        s.parse::<GraphStatus>()
983    }
984
985    // ─── Execute Stream ──────────────────────────────────────────────────────
986
    /// Long-lived stream of sequenced events for a graph.
    ///
    /// This is the canonical implementation — both the lattice activation and
    /// Orcha (as a library consumer) call this directly.
    ///
    /// Behavior:
    /// * `after_seq = None` on a Pending graph starts the graph, then streams
    ///   from seq 0; `Some(n)` resumes after event `n` without starting it.
    /// * Terminates after yielding GraphDone or GraphFailed.
    /// * Storage errors are surfaced in-band as synthetic GraphFailed
    ///   envelopes (empty or "timeout" node_id), not as a stream error.
    /// * If no event arrives for 1 hour, a timeout GraphFailed event is
    ///   persisted, yielded, and the stream ends.
    pub fn execute_stream(
        storage: Arc<LatticeStorage>,
        graph_id: GraphId,
        after_seq: Option<u64>,
    ) -> impl Stream<Item = LatticeEventEnvelope> + Send + 'static {
        stream! {
            let graph = match storage.get_graph(&graph_id).await {
                Ok(g) => g,
                Err(e) => {
                    // Synthetic (not persisted) failure envelope: lookup failed.
                    yield LatticeEventEnvelope {
                        seq: 0,
                        event: LatticeEvent::GraphFailed {
                            graph_id: graph_id.clone(),
                            node_id: "".to_string(),
                            error: format!("Graph not found: {}", e),
                        },
                    };
                    return;
                }
            };

            // A fresh subscription (no cursor) to a Pending graph kicks off execution.
            if after_seq.is_none() && graph.status == GraphStatus::Pending {
                if let Err(e) = storage.start_graph(&graph_id).await {
                    yield LatticeEventEnvelope {
                        seq: 0,
                        event: LatticeEvent::GraphFailed {
                            graph_id: graph_id.clone(),
                            node_id: "".to_string(),
                            error: format!("Failed to start graph: {}", e),
                        },
                    };
                    return;
                }
            }

            // Subscribe BEFORE polling so a notification fired between the
            // poll and the wait is not lost (Notify stores one permit).
            let notifier = storage.get_or_create_notifier(&graph_id);
            let mut cursor = after_seq.unwrap_or(0);

            loop {
                // Drain everything persisted after the cursor.
                let events = match storage.get_events_after(&graph_id, cursor).await {
                    Ok(evs) => evs,
                    Err(e) => {
                        yield LatticeEventEnvelope {
                            seq: cursor,
                            event: LatticeEvent::GraphFailed {
                                graph_id: graph_id.clone(),
                                node_id: "".to_string(),
                                error: format!("Event read error: {}", e),
                            },
                        };
                        return;
                    }
                };

                for (seq, event) in events {
                    // Terminal events end the stream after being yielded.
                    let done = matches!(
                        event,
                        LatticeEvent::GraphDone { .. } | LatticeEvent::GraphFailed { .. }
                    );
                    cursor = seq;
                    yield LatticeEventEnvelope { seq, event };
                    if done { return; }
                }

                // Park until new events are persisted, or give up after 1 hour.
                tokio::select! {
                    _ = notifier.notified() => continue,
                    _ = tokio::time::sleep(Duration::from_secs(3600)) => {
                        let event = LatticeEvent::GraphFailed {
                            graph_id: graph_id.clone(),
                            node_id: "timeout".to_string(),
                            error: "Execution timed out".to_string(),
                        };
                        // Best-effort persistence; fall back to a synthetic seq.
                        let seq = storage.persist_event(&graph_id, &event).await.unwrap_or(cursor + 1);
                        yield LatticeEventEnvelope { seq, event };
                        return;
                    }
                }
            }
        }
    }
1071
1072    // ─── Row Helpers ─────────────────────────────────────────────────────────
1073
1074    fn row_to_graph(
1075        &self,
1076        row: sqlx::sqlite::SqliteRow,
1077        node_count: usize,
1078        edge_count: usize,
1079    ) -> Result<LatticeGraph, String> {
1080        let metadata_json: String = row.get("metadata");
1081        let status_str: String = row.get("status");
1082
1083        Ok(LatticeGraph {
1084            id: row.get("id"),
1085            metadata: serde_json::from_str(&metadata_json).unwrap_or(serde_json::json!({})),
1086            status: GraphStatus::from_str(&status_str).unwrap_or(GraphStatus::Pending),
1087            created_at: row.get("created_at"),
1088            node_count,
1089            edge_count,
1090            parent_graph_id: row.try_get::<Option<String>, _>("parent_graph_id").unwrap_or(None),
1091        })
1092    }
1093
1094    fn row_to_node(&self, row: sqlx::sqlite::SqliteRow) -> Result<LatticeNode, String> {
1095        let spec_json: String = row.get("spec");
1096        let status_str: String = row.get("status");
1097        let output_json: Option<String> = row.get("output");
1098
1099        let spec: NodeSpec = serde_json::from_str(&spec_json)
1100            .map_err(|e| format!("Failed to deserialize spec: {}", e))?;
1101        let status = NodeStatus::from_str(&status_str).unwrap_or(NodeStatus::Pending);
1102        let output = output_json
1103            .as_deref()
1104            .map(|s| serde_json::from_str::<NodeOutput>(s))
1105            .transpose()
1106            .map_err(|e| format!("Failed to deserialize output: {}", e))?;
1107
1108        // join_type column may not exist on older databases
1109        let join_type_str: Option<String> = row.try_get("join_type").ok().flatten();
1110        let join_type = match join_type_str.as_deref().unwrap_or("all") {
1111            "any" => JoinType::Any,
1112            _ => JoinType::All,
1113        };
1114
1115        Ok(LatticeNode {
1116            id: row.get("id"),
1117            graph_id: row.get("graph_id"),
1118            spec,
1119            status,
1120            join_type,
1121            output,
1122            error: row.get("error"),
1123            created_at: row.get("created_at"),
1124            completed_at: row.get("completed_at"),
1125        })
1126    }
1127}
1128
/// Current wall-clock time as whole seconds since the Unix epoch.
fn current_timestamp() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Only fails if the system clock reads earlier than 1970 — a broken
        // environment; state the invariant instead of a bare unwrap.
        .expect("system clock before Unix epoch")
        .as_secs() as i64
}