1use super::types::{
2 EdgeCondition, GatherStrategy, GraphId, GraphStatus, JoinType, LatticeEvent,
3 LatticeEventEnvelope, LatticeGraph, LatticeNode, NodeId, NodeOutput, NodeSpec, NodeStatus,
4 Token, TokenPayload,
5};
6use crate::activation_db_path_from_module;
7use crate::activations::storage::init_sqlite_pool;
8use async_stream::stream;
9use futures::Stream;
10use serde_json::Value;
11use sqlx::{sqlite::SqlitePool, Row};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::str::FromStr;
15use std::sync::{Arc, RwLock};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17use tokio::sync::Notify;
18use uuid::Uuid;
19
20#[derive(Debug, Clone)]
21pub struct LatticeStorageConfig {
22 pub db_path: PathBuf,
23}
24
25impl Default for LatticeStorageConfig {
26 fn default() -> Self {
27 Self {
28 db_path: activation_db_path_from_module!("lattice.db"),
29 }
30 }
31}
32
33pub struct LatticeStorage {
34 pool: SqlitePool,
35 graph_notifiers: Arc<RwLock<HashMap<GraphId, Arc<Notify>>>>,
37}
38
39impl LatticeStorage {
40 pub async fn new(config: LatticeStorageConfig) -> Result<Self, String> {
41 let pool = init_sqlite_pool(config.db_path).await?;
42 let storage = Self {
43 pool,
44 graph_notifiers: Arc::new(RwLock::new(HashMap::new())),
45 };
46 storage.run_migrations().await?;
47 Ok(storage)
48 }
49
50 async fn run_migrations(&self) -> Result<(), String> {
51 sqlx::query(r#"
52 CREATE TABLE IF NOT EXISTS lattice_graphs (
53 id TEXT PRIMARY KEY,
54 metadata TEXT NOT NULL DEFAULT '{}',
55 status TEXT NOT NULL DEFAULT 'pending',
56 created_at INTEGER NOT NULL
57 );
58
59 CREATE TABLE IF NOT EXISTS lattice_nodes (
60 id TEXT PRIMARY KEY,
61 graph_id TEXT NOT NULL,
62 spec TEXT NOT NULL,
63 status TEXT NOT NULL DEFAULT 'pending',
64 output TEXT,
65 error TEXT,
66 created_at INTEGER NOT NULL,
67 completed_at INTEGER,
68 FOREIGN KEY (graph_id) REFERENCES lattice_graphs(id)
69 );
70 CREATE INDEX IF NOT EXISTS idx_lattice_nodes_graph ON lattice_nodes(graph_id);
71
72 CREATE TABLE IF NOT EXISTS lattice_edges (
73 id TEXT PRIMARY KEY,
74 graph_id TEXT NOT NULL,
75 from_node_id TEXT NOT NULL,
76 to_node_id TEXT NOT NULL,
77 FOREIGN KEY (graph_id) REFERENCES lattice_graphs(id)
78 );
79 CREATE INDEX IF NOT EXISTS idx_lattice_edges_graph ON lattice_edges(graph_id);
80 CREATE INDEX IF NOT EXISTS idx_lattice_edges_to ON lattice_edges(to_node_id);
81
82 -- Durable event log: every LatticeEvent is appended here.
83 -- seq is the cursor callers use for reconnect replay.
84 CREATE TABLE IF NOT EXISTS lattice_events (
85 seq INTEGER PRIMARY KEY AUTOINCREMENT,
86 graph_id TEXT NOT NULL,
87 event TEXT NOT NULL,
88 created_at INTEGER NOT NULL
89 );
90 CREATE INDEX IF NOT EXISTS idx_lattice_events_graph ON lattice_events(graph_id, seq);
91
92 -- Token delivery tracking (Petri net marking)
93 CREATE TABLE IF NOT EXISTS lattice_edge_tokens (
94 edge_id TEXT NOT NULL,
95 graph_id TEXT NOT NULL,
96 token TEXT NOT NULL,
97 seq INTEGER NOT NULL,
98 PRIMARY KEY (edge_id, seq)
99 );
100 CREATE INDEX IF NOT EXISTS idx_lattice_edge_tokens_graph
101 ON lattice_edge_tokens(graph_id);
102 "#)
103 .execute(&self.pool)
104 .await
105 .map_err(|e| format!("Migration failed: {}", e))?;
106
107 let _ = sqlx::query("ALTER TABLE lattice_edges ADD COLUMN condition TEXT")
109 .execute(&self.pool).await;
110 let _ = sqlx::query("ALTER TABLE lattice_nodes ADD COLUMN join_type TEXT NOT NULL DEFAULT 'all'")
111 .execute(&self.pool).await;
112 let _ = sqlx::query(
113 "ALTER TABLE lattice_graphs ADD COLUMN parent_graph_id TEXT NULL REFERENCES lattice_graphs(id)"
114 ).execute(&self.pool).await;
115 let _ = sqlx::query(
116 "CREATE INDEX IF NOT EXISTS idx_lattice_graphs_parent ON lattice_graphs(parent_graph_id)"
117 ).execute(&self.pool).await;
118
119 Ok(())
120 }
121
122 pub async fn create_graph(&self, metadata: Value) -> Result<GraphId, String> {
125 let id = format!("lattice-{}", Uuid::new_v4());
126 let now = current_timestamp();
127 let metadata_json = serde_json::to_string(&metadata)
128 .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
129
130 sqlx::query(
131 "INSERT INTO lattice_graphs (id, metadata, status, created_at) VALUES (?, ?, 'pending', ?)"
132 )
133 .bind(&id)
134 .bind(&metadata_json)
135 .bind(now)
136 .execute(&self.pool)
137 .await
138 .map_err(|e| format!("Failed to create graph: {}", e))?;
139
140 Ok(id)
141 }
142
143 pub async fn create_child_graph(
144 &self,
145 parent_id: &str,
146 metadata: Value,
147 ) -> Result<String, String> {
148 let id = format!("lattice-{}", Uuid::new_v4());
149 let now = current_timestamp();
150 let metadata_json = serde_json::to_string(&metadata)
151 .map_err(|e| format!("Failed to serialize metadata: {}", e))?;
152
153 sqlx::query(
154 "INSERT INTO lattice_graphs (id, metadata, status, created_at, parent_graph_id) VALUES (?, ?, 'pending', ?, ?)"
155 )
156 .bind(&id)
157 .bind(&metadata_json)
158 .bind(now)
159 .bind(parent_id)
160 .execute(&self.pool)
161 .await
162 .map_err(|e| format!("Failed to create child graph: {}", e))?;
163
164 Ok(id)
165 }
166
167 pub async fn get_child_graphs(&self, parent_id: &str) -> Result<Vec<LatticeGraph>, String> {
168 let rows = sqlx::query(
169 "SELECT id, metadata, status, created_at, parent_graph_id FROM lattice_graphs WHERE parent_graph_id = ? ORDER BY created_at"
170 )
171 .bind(parent_id)
172 .fetch_all(&self.pool)
173 .await
174 .map_err(|e| format!("Failed to fetch child graphs: {}", e))?;
175
176 let mut graphs = Vec::new();
177 for row in rows {
178 let graph_id: String = row.get("id");
179 let node_count: i64 = sqlx::query_scalar(
180 "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
181 )
182 .bind(&graph_id)
183 .fetch_one(&self.pool)
184 .await
185 .map_err(|e| format!("Failed to count nodes: {}", e))?;
186
187 let edge_count: i64 = sqlx::query_scalar(
188 "SELECT COUNT(*) FROM lattice_edges WHERE graph_id = ?"
189 )
190 .bind(&graph_id)
191 .fetch_one(&self.pool)
192 .await
193 .map_err(|e| format!("Failed to count edges: {}", e))?;
194
195 graphs.push(self.row_to_graph(row, node_count as usize, edge_count as usize)?);
196 }
197 Ok(graphs)
198 }
199
200 pub async fn add_node(
201 &self,
202 graph_id: &GraphId,
203 node_id_hint: Option<NodeId>,
204 spec: &NodeSpec,
205 ) -> Result<NodeId, String> {
206 let id = node_id_hint.unwrap_or_else(|| Uuid::new_v4().to_string());
207 let now = current_timestamp();
208 let spec_json = serde_json::to_string(spec)
209 .map_err(|e| format!("Failed to serialize spec: {}", e))?;
210
211 sqlx::query(
212 "INSERT INTO lattice_nodes (id, graph_id, spec, status, created_at) VALUES (?, ?, ?, 'pending', ?)"
213 )
214 .bind(&id)
215 .bind(graph_id)
216 .bind(&spec_json)
217 .bind(now)
218 .execute(&self.pool)
219 .await
220 .map_err(|e| format!("Failed to add node: {}", e))?;
221
222 if let Ok(graph_status) = self.get_graph_status(graph_id).await {
223 if graph_status == GraphStatus::Running {
224 let _ = self.check_and_ready(graph_id, &id).await;
225 }
226 }
227
228 Ok(id)
229 }
230
231 pub async fn add_edge(
232 &self,
233 graph_id: &GraphId,
234 from_node_id: &NodeId,
235 to_node_id: &NodeId,
236 condition: Option<&EdgeCondition>,
237 ) -> Result<(), String> {
238 let from_exists: bool = sqlx::query_scalar(
239 "SELECT COUNT(*) > 0 FROM lattice_nodes WHERE id = ? AND graph_id = ?"
240 )
241 .bind(from_node_id)
242 .bind(graph_id)
243 .fetch_one(&self.pool)
244 .await
245 .map_err(|e| format!("Failed to validate from_node: {}", e))?;
246
247 if !from_exists {
248 return Err(format!("Node {} not found in graph {}", from_node_id, graph_id));
249 }
250
251 let to_exists: bool = sqlx::query_scalar(
252 "SELECT COUNT(*) > 0 FROM lattice_nodes WHERE id = ? AND graph_id = ?"
253 )
254 .bind(to_node_id)
255 .bind(graph_id)
256 .fetch_one(&self.pool)
257 .await
258 .map_err(|e| format!("Failed to validate to_node: {}", e))?;
259
260 if !to_exists {
261 return Err(format!("Node {} not found in graph {}", to_node_id, graph_id));
262 }
263
264 let edge_id = Uuid::new_v4().to_string();
265 let condition_json = condition
266 .map(|c| serde_json::to_string(c).map_err(|e| format!("Failed to serialize condition: {}", e)))
267 .transpose()?;
268
269 sqlx::query(
270 "INSERT INTO lattice_edges (id, graph_id, from_node_id, to_node_id, condition) VALUES (?, ?, ?, ?, ?)"
271 )
272 .bind(&edge_id)
273 .bind(graph_id)
274 .bind(from_node_id)
275 .bind(to_node_id)
276 .bind(condition_json.as_deref())
277 .execute(&self.pool)
278 .await
279 .map_err(|e| format!("Failed to add edge: {}", e))?;
280
281 if let Ok(graph_status) = self.get_graph_status(graph_id).await {
282 if graph_status == GraphStatus::Running {
283 if let Ok(src_node) = self.get_node(from_node_id).await {
284 if src_node.status == NodeStatus::Complete {
285 let tokens: Vec<Token> = src_node.output.as_ref()
286 .map(|o| o.tokens().into_iter().cloned().collect())
287 .unwrap_or_else(|| vec![Token::ok()]);
288 for token in &tokens {
289 let matches = condition
290 .map(|c| c.matches(&token.color))
291 .unwrap_or(true);
292 if matches {
293 let seq = self.count_tokens_on_edge(&edge_id).await? + 1;
294 self.deliver_token(&edge_id, graph_id, token, seq).await?;
295 }
296 }
297 let _ = self.check_and_ready(graph_id, to_node_id).await;
298 self.notify_graph(graph_id);
299 }
300 }
301 }
302 }
303
304 Ok(())
305 }
306
307 pub async fn get_graph(&self, graph_id: &GraphId) -> Result<LatticeGraph, String> {
308 let row = sqlx::query(
309 "SELECT id, metadata, status, created_at, parent_graph_id FROM lattice_graphs WHERE id = ?"
310 )
311 .bind(graph_id)
312 .fetch_optional(&self.pool)
313 .await
314 .map_err(|e| format!("Failed to fetch graph: {}", e))?
315 .ok_or_else(|| format!("Graph not found: {}", graph_id))?;
316
317 let node_count: i64 = sqlx::query_scalar(
318 "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
319 )
320 .bind(graph_id)
321 .fetch_one(&self.pool)
322 .await
323 .map_err(|e| format!("Failed to count nodes: {}", e))?;
324
325 let edge_count: i64 = sqlx::query_scalar(
326 "SELECT COUNT(*) FROM lattice_edges WHERE graph_id = ?"
327 )
328 .bind(graph_id)
329 .fetch_one(&self.pool)
330 .await
331 .map_err(|e| format!("Failed to count edges: {}", e))?;
332
333 self.row_to_graph(row, node_count as usize, edge_count as usize)
334 }
335
336 pub async fn count_nodes(&self, graph_id: &GraphId) -> Result<usize, String> {
338 let count: i64 = sqlx::query_scalar(
339 "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
340 )
341 .bind(graph_id)
342 .fetch_one(&self.pool)
343 .await
344 .map_err(|e| format!("Failed to count nodes: {}", e))?;
345
346 Ok(count as usize)
347 }
348
349 pub async fn get_nodes(&self, graph_id: &GraphId) -> Result<Vec<LatticeNode>, String> {
350 let rows = sqlx::query(
351 "SELECT id, graph_id, spec, status, output, error, created_at, completed_at
352 FROM lattice_nodes WHERE graph_id = ? ORDER BY created_at"
353 )
354 .bind(graph_id)
355 .fetch_all(&self.pool)
356 .await
357 .map_err(|e| format!("Failed to fetch nodes: {}", e))?;
358
359 rows.into_iter().map(|row| self.row_to_node(row)).collect()
360 }
361
362 pub async fn get_node(&self, node_id: &NodeId) -> Result<LatticeNode, String> {
363 let row = sqlx::query(
364 "SELECT id, graph_id, spec, status, output, error, created_at, completed_at
365 FROM lattice_nodes WHERE id = ?"
366 )
367 .bind(node_id)
368 .fetch_optional(&self.pool)
369 .await
370 .map_err(|e| format!("Failed to fetch node: {}", e))?
371 .ok_or_else(|| format!("Node not found: {}", node_id))?;
372
373 self.row_to_node(row)
374 }
375
376 pub async fn get_inbound_edges(&self, node_id: &NodeId) -> Result<Vec<NodeId>, String> {
377 let rows = sqlx::query(
378 "SELECT from_node_id FROM lattice_edges WHERE to_node_id = ?"
379 )
380 .bind(node_id)
381 .fetch_all(&self.pool)
382 .await
383 .map_err(|e| format!("Failed to fetch inbound edges: {}", e))?;
384
385 Ok(rows.into_iter().map(|r| r.get::<String, _>("from_node_id")).collect())
386 }
387
388 pub async fn get_outbound_edges(&self, node_id: &NodeId) -> Result<Vec<NodeId>, String> {
389 let rows = sqlx::query(
390 "SELECT to_node_id FROM lattice_edges WHERE from_node_id = ?"
391 )
392 .bind(node_id)
393 .fetch_all(&self.pool)
394 .await
395 .map_err(|e| format!("Failed to fetch outbound edges: {}", e))?;
396
397 Ok(rows.into_iter().map(|r| r.get::<String, _>("to_node_id")).collect())
398 }
399
400 async fn get_outbound_edges_with_conditions(
402 &self,
403 node_id: &NodeId,
404 ) -> Result<Vec<(String, NodeId, Option<EdgeCondition>)>, String> {
405 let rows = sqlx::query(
406 "SELECT id, to_node_id, condition FROM lattice_edges WHERE from_node_id = ?"
407 )
408 .bind(node_id)
409 .fetch_all(&self.pool)
410 .await
411 .map_err(|e| format!("Failed to fetch outbound edges: {}", e))?;
412
413 rows.into_iter()
414 .map(|row| {
415 let edge_id: String = row.get("id");
416 let to_node_id: String = row.get("to_node_id");
417 let condition_json: Option<String> = row.get("condition");
418 let condition = condition_json
419 .as_deref()
420 .map(|s| {
421 serde_json::from_str::<EdgeCondition>(s)
422 .map_err(|e| format!("Failed to deserialize edge condition: {}", e))
423 })
424 .transpose()?;
425 Ok((edge_id, to_node_id, condition))
426 })
427 .collect()
428 }
429
430 pub async fn get_nodes_with_no_predecessors(
431 &self,
432 graph_id: &GraphId,
433 ) -> Result<Vec<LatticeNode>, String> {
434 let rows = sqlx::query(
436 "SELECT id, graph_id, spec, status, output, error, created_at, completed_at
437 FROM lattice_nodes
438 WHERE graph_id = ?
439 AND id NOT IN (
440 SELECT to_node_id FROM lattice_edges
441 WHERE graph_id = ? AND from_node_id != to_node_id
442 )
443 ORDER BY created_at"
444 )
445 .bind(graph_id)
446 .bind(graph_id)
447 .fetch_all(&self.pool)
448 .await
449 .map_err(|e| format!("Failed to fetch root nodes: {}", e))?;
450
451 rows.into_iter().map(|row| self.row_to_node(row)).collect()
452 }
453
454 pub async fn set_node_status(
455 &self,
456 node_id: &NodeId,
457 status: NodeStatus,
458 output: Option<&NodeOutput>,
459 error: Option<&str>,
460 ) -> Result<(), String> {
461 let now = current_timestamp();
462 let completed_at = match status {
463 NodeStatus::Complete | NodeStatus::Failed => Some(now),
464 _ => None,
465 };
466 let output_json = output
467 .map(|o| serde_json::to_string(o).map_err(|e| format!("Failed to serialize output: {}", e)))
468 .transpose()?;
469
470 sqlx::query(
471 "UPDATE lattice_nodes SET status = ?, output = ?, error = ?, completed_at = ? WHERE id = ?"
472 )
473 .bind(status.to_string())
474 .bind(output_json.as_deref())
475 .bind(error)
476 .bind(completed_at)
477 .bind(node_id)
478 .execute(&self.pool)
479 .await
480 .map_err(|e| format!("Failed to update node status: {}", e))?;
481
482 Ok(())
483 }
484
485 pub async fn update_graph_status(
486 &self,
487 graph_id: &GraphId,
488 status: GraphStatus,
489 ) -> Result<(), String> {
490 sqlx::query("UPDATE lattice_graphs SET status = ? WHERE id = ?")
491 .bind(status.to_string())
492 .bind(graph_id)
493 .execute(&self.pool)
494 .await
495 .map_err(|e| format!("Failed to update graph status: {}", e))?;
496 Ok(())
497 }
498
499 pub async fn list_graphs(&self) -> Result<Vec<LatticeGraph>, String> {
500 let rows = sqlx::query(
501 "SELECT id, metadata, status, created_at, parent_graph_id FROM lattice_graphs ORDER BY created_at DESC"
502 )
503 .fetch_all(&self.pool)
504 .await
505 .map_err(|e| format!("Failed to list graphs: {}", e))?;
506
507 let mut graphs = Vec::new();
508 for row in rows {
509 let graph_id: String = row.get("id");
510 let node_count: i64 = sqlx::query_scalar(
511 "SELECT COUNT(*) FROM lattice_nodes WHERE graph_id = ?"
512 )
513 .bind(&graph_id)
514 .fetch_one(&self.pool)
515 .await
516 .map_err(|e| format!("Failed to count nodes: {}", e))?;
517
518 let edge_count: i64 = sqlx::query_scalar(
519 "SELECT COUNT(*) FROM lattice_edges WHERE graph_id = ?"
520 )
521 .bind(&graph_id)
522 .fetch_one(&self.pool)
523 .await
524 .map_err(|e| format!("Failed to count edges: {}", e))?;
525
526 graphs.push(self.row_to_graph(row, node_count as usize, edge_count as usize)?);
527 }
528 Ok(graphs)
529 }
530
531 pub async fn get_running_graph_ids(&self) -> Result<Vec<GraphId>, String> {
536 let rows = sqlx::query(
537 "SELECT id FROM lattice_graphs WHERE status = 'running' ORDER BY created_at"
538 )
539 .fetch_all(&self.pool)
540 .await
541 .map_err(|e| format!("Failed to fetch running graphs: {}", e))?;
542
543 Ok(rows.into_iter().map(|r| r.get::<String, _>("id")).collect())
544 }
545
546 pub async fn reset_node_to_pending(&self, node_id: &NodeId) -> Result<(), String> {
551 sqlx::query(
552 "UPDATE lattice_nodes SET status = 'pending', output = NULL, error = NULL, completed_at = NULL WHERE id = ?"
553 )
554 .bind(node_id)
555 .execute(&self.pool)
556 .await
557 .map_err(|e| format!("Failed to reset node to pending: {}", e))?;
558 Ok(())
559 }
560
561 async fn deliver_token(
565 &self,
566 edge_id: &str,
567 graph_id: &GraphId,
568 token: &Token,
569 seq: i64,
570 ) -> Result<(), String> {
571 let token_json = serde_json::to_string(token)
572 .map_err(|e| format!("Failed to serialize token: {}", e))?;
573
574 sqlx::query(
575 "INSERT INTO lattice_edge_tokens (edge_id, graph_id, token, seq) VALUES (?, ?, ?, ?)"
576 )
577 .bind(edge_id)
578 .bind(graph_id)
579 .bind(&token_json)
580 .bind(seq)
581 .execute(&self.pool)
582 .await
583 .map_err(|e| format!("Failed to deliver token: {}", e))?;
584
585 Ok(())
586 }
587
588 async fn count_tokens_on_edge(&self, edge_id: &str) -> Result<i64, String> {
590 let count: i64 = sqlx::query_scalar(
591 "SELECT COUNT(*) FROM lattice_edge_tokens WHERE edge_id = ?"
592 )
593 .bind(edge_id)
594 .fetch_one(&self.pool)
595 .await
596 .map_err(|e| format!("Failed to count edge tokens: {}", e))?;
597 Ok(count)
598 }
599
600 async fn count_edges_with_tokens(
602 &self,
603 node_id: &NodeId,
604 ) -> Result<(usize, usize), String> {
605 let total: i64 = sqlx::query_scalar(
606 "SELECT COUNT(*) FROM lattice_edges WHERE to_node_id = ? AND from_node_id != to_node_id"
607 )
608 .bind(node_id)
609 .fetch_one(&self.pool)
610 .await
611 .map_err(|e| format!("Failed to count inbound edges: {}", e))?;
612
613 let delivered: i64 = sqlx::query_scalar(
614 "SELECT COUNT(DISTINCT edge_id) FROM lattice_edge_tokens
615 WHERE edge_id IN (
616 SELECT id FROM lattice_edges WHERE to_node_id = ? AND from_node_id != to_node_id
617 )"
618 )
619 .bind(node_id)
620 .fetch_one(&self.pool)
621 .await
622 .map_err(|e| format!("Failed to count delivered edges: {}", e))?;
623
624 Ok((delivered as usize, total as usize))
625 }
626
627 pub async fn get_node_inputs(&self, node_id: &NodeId) -> Result<Vec<Token>, String> {
629 let rows = sqlx::query(
630 "SELECT et.token FROM lattice_edge_tokens et
631 JOIN lattice_edges e ON et.edge_id = e.id
632 WHERE e.to_node_id = ?
633 ORDER BY et.seq"
634 )
635 .bind(node_id)
636 .fetch_all(&self.pool)
637 .await
638 .map_err(|e| format!("Failed to fetch node inputs: {}", e))?;
639
640 rows.into_iter()
641 .map(|row| {
642 let token_json: String = row.get("token");
643 serde_json::from_str::<Token>(&token_json)
644 .map_err(|e| format!("Failed to deserialize token: {}", e))
645 })
646 .collect()
647 }
648
649 pub async fn persist_event(
653 &self,
654 graph_id: &GraphId,
655 event: &LatticeEvent,
656 ) -> Result<u64, String> {
657 let event_json = serde_json::to_string(event)
658 .map_err(|e| format!("Failed to serialize event: {}", e))?;
659 let now = current_timestamp();
660
661 let result = sqlx::query(
662 "INSERT INTO lattice_events (graph_id, event, created_at) VALUES (?, ?, ?)"
663 )
664 .bind(graph_id)
665 .bind(&event_json)
666 .bind(now)
667 .execute(&self.pool)
668 .await
669 .map_err(|e| format!("Failed to persist event: {}", e))?;
670
671 Ok(result.last_insert_rowid() as u64)
672 }
673
674 pub async fn get_events_after(
676 &self,
677 graph_id: &GraphId,
678 after_seq: u64,
679 ) -> Result<Vec<(u64, LatticeEvent)>, String> {
680 let rows = sqlx::query(
681 "SELECT seq, event FROM lattice_events WHERE graph_id = ? AND seq > ? ORDER BY seq"
682 )
683 .bind(graph_id)
684 .bind(after_seq as i64)
685 .fetch_all(&self.pool)
686 .await
687 .map_err(|e| format!("Failed to fetch events: {}", e))?;
688
689 rows.into_iter()
690 .map(|row| {
691 let seq: i64 = row.get("seq");
692 let event_json: String = row.get("event");
693 let event: LatticeEvent = serde_json::from_str(&event_json)
694 .map_err(|e| format!("Failed to deserialize event: {}", e))?;
695 Ok((seq as u64, event))
696 })
697 .collect()
698 }
699
700 pub fn notify_graph(&self, graph_id: &GraphId) {
702 if let Ok(notifiers) = self.graph_notifiers.read() {
703 if let Some(notifier) = notifiers.get(graph_id) {
704 notifier.notify_one();
705 }
706 }
707 }
708
709 pub fn get_or_create_notifier(&self, graph_id: &GraphId) -> Arc<Notify> {
710 let mut notifiers = self.graph_notifiers.write().unwrap();
711 notifiers
712 .entry(graph_id.clone())
713 .or_insert_with(|| Arc::new(Notify::new()))
714 .clone()
715 }
716
717 pub async fn start_graph(&self, graph_id: &GraphId) -> Result<(), String> {
724 let result = sqlx::query(
726 "UPDATE lattice_graphs SET status = 'running' WHERE id = ? AND status = 'pending'"
727 )
728 .bind(graph_id)
729 .execute(&self.pool)
730 .await
731 .map_err(|e| format!("Failed to start graph: {}", e))?;
732
733 if result.rows_affected() == 0 {
734 return Ok(());
736 }
737
738 let root_nodes = self.get_nodes_with_no_predecessors(graph_id).await?;
739
740 if root_nodes.is_empty() {
741 self.update_graph_status(graph_id, GraphStatus::Complete).await?;
742 self.persist_event(graph_id, &LatticeEvent::GraphDone {
743 graph_id: graph_id.clone(),
744 }).await?;
745 } else {
746 for node in root_nodes {
747 match &node.spec {
748 NodeSpec::Gather { .. } => {
749 }
752 _ => {
753 self.set_node_status(&node.id, NodeStatus::Ready, None, None).await?;
754 self.persist_event(graph_id, &LatticeEvent::NodeReady {
755 node_id: node.id,
756 spec: node.spec,
757 }).await?;
758 }
759 }
760 }
761 }
762
763 self.notify_graph(graph_id);
764 Ok(())
765 }
766
767 pub async fn reset_running_to_ready(
774 &self,
775 graph_id: &GraphId,
776 node_id: &NodeId,
777 ) -> Result<(), String> {
778 let node = self.get_node(node_id).await?;
779 if node.status != NodeStatus::Running {
780 return Ok(());
781 }
782 self.set_node_status(node_id, NodeStatus::Ready, None, None).await?;
783 self.persist_event(graph_id, &LatticeEvent::NodeReady {
784 node_id: node_id.clone(),
785 spec: node.spec,
786 }).await?;
787 self.notify_graph(graph_id);
788 Ok(())
789 }
790
791 pub async fn reemit_ready_nodes(&self, graph_id: &GraphId) -> Result<(), String> {
798 let nodes = self.get_nodes(graph_id).await?;
799 for node in nodes {
800 if node.status == NodeStatus::Ready {
801 self.persist_event(graph_id, &LatticeEvent::NodeReady {
802 node_id: node.id,
803 spec: node.spec,
804 }).await?;
805 }
806 }
807 self.notify_graph(graph_id);
808 Ok(())
809 }
810
811 pub async fn advance_graph(
823 &self,
824 graph_id: &GraphId,
825 completed_node_id: &NodeId,
826 output: Option<NodeOutput>,
827 error: Option<String>,
828 ) -> Result<(), String> {
829 let mut queue: Vec<(NodeId, Option<NodeOutput>, Option<String>)> =
831 vec![(completed_node_id.clone(), output, error)];
832
833 while let Some((nid, out, err)) = queue.pop() {
834 let node = match self.get_node(&nid).await {
836 Ok(n) => n,
837 Err(_) => continue,
838 };
839 if node.status == NodeStatus::Complete || node.status == NodeStatus::Failed {
840 continue;
841 }
842
843 let failed = err.is_some();
844
845 let tokens: Vec<Token>;
847 if failed {
848 let err_msg = err.unwrap_or_default();
849 self.set_node_status(&nid, NodeStatus::Failed, None, Some(&err_msg)).await?;
850 self.persist_event(graph_id, &LatticeEvent::NodeFailed {
851 node_id: nid.clone(),
852 error: err_msg.clone(),
853 }).await?;
854 tokens = vec![Token::error(err_msg)];
855 } else {
856 self.set_node_status(&nid, NodeStatus::Complete, out.as_ref(), None).await?;
857 self.persist_event(graph_id, &LatticeEvent::NodeDone {
858 node_id: nid.clone(),
859 output: out.clone(),
860 }).await?;
861 tokens = out.as_ref()
862 .map(|o| o.tokens().into_iter().cloned().collect())
863 .unwrap_or_else(|| vec![Token::ok()]);
864 }
865
866 let outbound = self.get_outbound_edges_with_conditions(&nid).await?;
868 let mut any_delivered = false;
869
870 for token in &tokens {
871 for (edge_id, to_node_id, condition) in &outbound {
872 let matches = condition.as_ref()
873 .map(|c| c.matches(&token.color))
874 .unwrap_or(true);
875
876 if matches {
877 let seq = self.count_tokens_on_edge(edge_id).await? + 1;
878 self.deliver_token(edge_id, graph_id, token, seq).await?;
879 any_delivered = true;
880
881 if let Some(gather_output) = self.check_and_ready(graph_id, to_node_id).await? {
883 queue.push((to_node_id.clone(), Some(gather_output), None));
884 }
885 }
886 }
887 }
888
889 if failed && !any_delivered {
891 let err_str = tokens.first()
892 .and_then(|t| match &t.payload {
893 Some(TokenPayload::Data { value }) => {
894 value.get("message").and_then(|v| v.as_str()).map(|s| s.to_string())
895 }
896 _ => None,
897 })
898 .unwrap_or_default();
899 self.update_graph_status(graph_id, GraphStatus::Failed).await?;
900 self.persist_event(graph_id, &LatticeEvent::GraphFailed {
901 graph_id: graph_id.clone(),
902 node_id: nid.clone(),
903 error: err_str,
904 }).await?;
905 self.notify_graph(graph_id);
906 return Ok(());
907 }
908 }
909
910 let all_done = self.get_nodes(graph_id).await?
912 .iter()
913 .all(|n| n.status == NodeStatus::Complete);
914 if all_done {
915 self.update_graph_status(graph_id, GraphStatus::Complete).await?;
916 self.persist_event(graph_id, &LatticeEvent::GraphDone {
917 graph_id: graph_id.clone(),
918 }).await?;
919 }
920
921 self.notify_graph(graph_id);
922 Ok(())
923 }
924
925 async fn check_and_ready(
929 &self,
930 graph_id: &GraphId,
931 node_id: &NodeId,
932 ) -> Result<Option<NodeOutput>, String> {
933 let node = self.get_node(node_id).await?;
934
935 if node.status != NodeStatus::Pending && node.status != NodeStatus::Complete {
937 return Ok(None);
938 }
939
940 let (delivered, total) = self.count_edges_with_tokens(node_id).await?;
941 let enabled = match &node.join_type {
942 JoinType::All => delivered == total && total > 0,
943 JoinType::Any => delivered >= 1,
944 };
945
946 if !enabled {
947 return Ok(None);
948 }
949
950 match &node.spec {
951 NodeSpec::Gather { strategy } => {
952 let tokens = self.get_node_inputs(node_id).await?;
953 let output = match strategy {
954 GatherStrategy::All => NodeOutput::Many { tokens },
955 GatherStrategy::First { n } => {
956 let take = (*n).min(tokens.len());
957 NodeOutput::Many { tokens: tokens[..take].to_vec() }
958 }
959 };
960 self.set_node_status(node_id, NodeStatus::Running, None, None).await?;
961 Ok(Some(output))
962 }
963 _ => {
964 self.set_node_status(node_id, NodeStatus::Ready, None, None).await?;
966 self.persist_event(graph_id, &LatticeEvent::NodeReady {
967 node_id: node_id.clone(),
968 spec: node.spec.clone(),
969 }).await?;
970 Ok(None)
971 }
972 }
973 }
974
975 async fn get_graph_status(&self, graph_id: &str) -> Result<GraphStatus, String> {
976 let row = sqlx::query("SELECT status FROM lattice_graphs WHERE id = ?")
977 .bind(graph_id)
978 .fetch_one(&self.pool)
979 .await
980 .map_err(|e| e.to_string())?;
981 let s: String = row.try_get("status").map_err(|e| e.to_string())?;
982 s.parse::<GraphStatus>()
983 }
984
985 pub fn execute_stream(
992 storage: Arc<LatticeStorage>,
993 graph_id: GraphId,
994 after_seq: Option<u64>,
995 ) -> impl Stream<Item = LatticeEventEnvelope> + Send + 'static {
996 stream! {
997 let graph = match storage.get_graph(&graph_id).await {
998 Ok(g) => g,
999 Err(e) => {
1000 yield LatticeEventEnvelope {
1001 seq: 0,
1002 event: LatticeEvent::GraphFailed {
1003 graph_id: graph_id.clone(),
1004 node_id: "".to_string(),
1005 error: format!("Graph not found: {}", e),
1006 },
1007 };
1008 return;
1009 }
1010 };
1011
1012 if after_seq.is_none() && graph.status == GraphStatus::Pending {
1013 if let Err(e) = storage.start_graph(&graph_id).await {
1014 yield LatticeEventEnvelope {
1015 seq: 0,
1016 event: LatticeEvent::GraphFailed {
1017 graph_id: graph_id.clone(),
1018 node_id: "".to_string(),
1019 error: format!("Failed to start graph: {}", e),
1020 },
1021 };
1022 return;
1023 }
1024 }
1025
1026 let notifier = storage.get_or_create_notifier(&graph_id);
1027 let mut cursor = after_seq.unwrap_or(0);
1028
1029 loop {
1030 let events = match storage.get_events_after(&graph_id, cursor).await {
1031 Ok(evs) => evs,
1032 Err(e) => {
1033 yield LatticeEventEnvelope {
1034 seq: cursor,
1035 event: LatticeEvent::GraphFailed {
1036 graph_id: graph_id.clone(),
1037 node_id: "".to_string(),
1038 error: format!("Event read error: {}", e),
1039 },
1040 };
1041 return;
1042 }
1043 };
1044
1045 for (seq, event) in events {
1046 let done = matches!(
1047 event,
1048 LatticeEvent::GraphDone { .. } | LatticeEvent::GraphFailed { .. }
1049 );
1050 cursor = seq;
1051 yield LatticeEventEnvelope { seq, event };
1052 if done { return; }
1053 }
1054
1055 tokio::select! {
1056 _ = notifier.notified() => continue,
1057 _ = tokio::time::sleep(Duration::from_secs(3600)) => {
1058 let event = LatticeEvent::GraphFailed {
1059 graph_id: graph_id.clone(),
1060 node_id: "timeout".to_string(),
1061 error: "Execution timed out".to_string(),
1062 };
1063 let seq = storage.persist_event(&graph_id, &event).await.unwrap_or(cursor + 1);
1064 yield LatticeEventEnvelope { seq, event };
1065 return;
1066 }
1067 }
1068 }
1069 }
1070 }
1071
1072 fn row_to_graph(
1075 &self,
1076 row: sqlx::sqlite::SqliteRow,
1077 node_count: usize,
1078 edge_count: usize,
1079 ) -> Result<LatticeGraph, String> {
1080 let metadata_json: String = row.get("metadata");
1081 let status_str: String = row.get("status");
1082
1083 Ok(LatticeGraph {
1084 id: row.get("id"),
1085 metadata: serde_json::from_str(&metadata_json).unwrap_or(serde_json::json!({})),
1086 status: GraphStatus::from_str(&status_str).unwrap_or(GraphStatus::Pending),
1087 created_at: row.get("created_at"),
1088 node_count,
1089 edge_count,
1090 parent_graph_id: row.try_get::<Option<String>, _>("parent_graph_id").unwrap_or(None),
1091 })
1092 }
1093
1094 fn row_to_node(&self, row: sqlx::sqlite::SqliteRow) -> Result<LatticeNode, String> {
1095 let spec_json: String = row.get("spec");
1096 let status_str: String = row.get("status");
1097 let output_json: Option<String> = row.get("output");
1098
1099 let spec: NodeSpec = serde_json::from_str(&spec_json)
1100 .map_err(|e| format!("Failed to deserialize spec: {}", e))?;
1101 let status = NodeStatus::from_str(&status_str).unwrap_or(NodeStatus::Pending);
1102 let output = output_json
1103 .as_deref()
1104 .map(|s| serde_json::from_str::<NodeOutput>(s))
1105 .transpose()
1106 .map_err(|e| format!("Failed to deserialize output: {}", e))?;
1107
1108 let join_type_str: Option<String> = row.try_get("join_type").ok().flatten();
1110 let join_type = match join_type_str.as_deref().unwrap_or("all") {
1111 "any" => JoinType::Any,
1112 _ => JoinType::All,
1113 };
1114
1115 Ok(LatticeNode {
1116 id: row.get("id"),
1117 graph_id: row.get("graph_id"),
1118 spec,
1119 status,
1120 join_type,
1121 output,
1122 error: row.get("error"),
1123 created_at: row.get("created_at"),
1124 completed_at: row.get("completed_at"),
1125 })
1126 }
1127}
1128
1129fn current_timestamp() -> i64 {
1130 SystemTime::now()
1131 .duration_since(UNIX_EPOCH)
1132 .unwrap()
1133 .as_secs() as i64
1134}