Skip to main content

plexus_substrate/activations/orcha/pm/
storage.rs

1use crate::activations::storage::{activation_db_path, init_sqlite_pool};
2use sqlx::{sqlite::SqlitePool, Row};
3use std::collections::HashMap;
4use std::path::PathBuf;
5
/// Configuration for [`PmStorage`].
#[derive(Debug, Clone)]
pub struct PmStorageConfig {
    /// Database path handed to `init_sqlite_pool` when the storage is opened.
    pub db_path: PathBuf,
}
10
11impl Default for PmStorageConfig {
12    fn default() -> Self {
13        Self {
14            db_path: activation_db_path("pm", "pm.db"),
15        }
16    }
17}
18
19pub struct PmStorage {
20    pool: SqlitePool,
21}
22
23impl PmStorage {
24    pub async fn new(config: PmStorageConfig) -> Result<Self, String> {
25        let pool = init_sqlite_pool(config.db_path).await?;
26        let storage = Self { pool };
27        storage.init_schema().await?;
28        Ok(storage)
29    }
30
31    async fn init_schema(&self) -> Result<(), String> {
32        sqlx::query(
33            r#"
34            CREATE TABLE IF NOT EXISTS orcha_ticket_maps (
35                graph_id   TEXT NOT NULL,
36                ticket_id  TEXT NOT NULL,
37                node_id    TEXT NOT NULL,
38                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
39                PRIMARY KEY (graph_id, ticket_id)
40            )
41            "#,
42        )
43        .execute(&self.pool)
44        .await
45        .map_err(|e| format!("Failed to create orcha_ticket_maps table: {}", e))?;
46
47        sqlx::query(
48            r#"
49            CREATE TABLE IF NOT EXISTS orcha_ticket_sources (
50                graph_id   TEXT PRIMARY KEY,
51                source     TEXT NOT NULL,
52                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
53            )
54            "#,
55        )
56        .execute(&self.pool)
57        .await
58        .map_err(|e| format!("Failed to create orcha_ticket_sources table: {}", e))?;
59
60        // Migrate: add created_at column if it doesn't exist yet (idempotent).
61        let _ = sqlx::query(
62            "ALTER TABLE orcha_ticket_maps ADD COLUMN created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))",
63        )
64        .execute(&self.pool)
65        .await;
66
67        sqlx::query(
68            "CREATE INDEX IF NOT EXISTS idx_ticket_maps_graph ON orcha_ticket_maps(graph_id)",
69        )
70        .execute(&self.pool)
71        .await
72        .map_err(|e| format!("Failed to create graph index: {}", e))?;
73
74        sqlx::query(
75            "CREATE INDEX IF NOT EXISTS idx_ticket_maps_node ON orcha_ticket_maps(graph_id, node_id)",
76        )
77        .execute(&self.pool)
78        .await
79        .map_err(|e| format!("Failed to create node index: {}", e))?;
80
81        sqlx::query(
82            r#"
83            CREATE TABLE IF NOT EXISTS orcha_node_logs (
84                id         INTEGER PRIMARY KEY AUTOINCREMENT,
85                graph_id   TEXT NOT NULL,
86                node_id    TEXT NOT NULL,
87                ticket_id  TEXT,
88                seq        INTEGER NOT NULL,
89                event_type TEXT NOT NULL,
90                event_data TEXT NOT NULL,
91                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
92            )
93            "#,
94        )
95        .execute(&self.pool)
96        .await
97        .map_err(|e| format!("Failed to create orcha_node_logs table: {}", e))?;
98
99        sqlx::query(
100            "CREATE INDEX IF NOT EXISTS idx_node_logs ON orcha_node_logs(graph_id, node_id, seq)",
101        )
102        .execute(&self.pool)
103        .await
104        .map_err(|e| format!("Failed to create node logs index: {}", e))?;
105
106        Ok(())
107    }
108
109    /// Insert or replace all ticket→node mappings for a graph.
110    pub async fn save_ticket_map(
111        &self,
112        graph_id: &str,
113        map: &HashMap<String, String>,
114    ) -> Result<(), String> {
115        for (ticket_id, node_id) in map {
116            sqlx::query(
117                "INSERT OR REPLACE INTO orcha_ticket_maps (graph_id, ticket_id, node_id) VALUES (?, ?, ?)",
118            )
119            .bind(graph_id)
120            .bind(ticket_id)
121            .bind(node_id)
122            .execute(&self.pool)
123            .await
124            .map_err(|e| format!("Failed to save ticket map entry: {}", e))?;
125        }
126        Ok(())
127    }
128
129    /// Fetch the ticket_id→node_id map for a graph.
130    pub async fn get_ticket_map(&self, graph_id: &str) -> Result<HashMap<String, String>, String> {
131        let rows = sqlx::query(
132            "SELECT ticket_id, node_id FROM orcha_ticket_maps WHERE graph_id = ?",
133        )
134        .bind(graph_id)
135        .fetch_all(&self.pool)
136        .await
137        .map_err(|e| format!("Failed to fetch ticket map: {}", e))?;
138
139        let mut map = HashMap::new();
140        for row in rows {
141            let ticket_id: String = row.get("ticket_id");
142            let node_id: String = row.get("node_id");
143            map.insert(ticket_id, node_id);
144        }
145        Ok(map)
146    }
147
148    /// List all known graph IDs ordered by first insertion time descending.
149    /// Returns `Vec<(graph_id, created_at)>`.
150    pub async fn list_ticket_maps(&self, limit: usize) -> Result<Vec<(String, i64)>, String> {
151        let rows = sqlx::query(
152            "SELECT graph_id, MIN(created_at) AS created_at \
153             FROM orcha_ticket_maps \
154             GROUP BY graph_id \
155             ORDER BY created_at DESC \
156             LIMIT ?",
157        )
158        .bind(limit as i64)
159        .fetch_all(&self.pool)
160        .await
161        .map_err(|e| format!("Failed to list ticket maps: {}", e))?;
162
163        let result = rows
164            .into_iter()
165            .map(|row| {
166                let graph_id: String = row.get("graph_id");
167                let created_at: i64 = row.get("created_at");
168                (graph_id, created_at)
169            })
170            .collect();
171
172        Ok(result)
173    }
174
175    /// Save the raw ticket source for a graph.
176    pub async fn save_ticket_source(&self, graph_id: &str, source: &str) -> Result<(), String> {
177        sqlx::query(
178            "INSERT OR REPLACE INTO orcha_ticket_sources (graph_id, source) VALUES (?, ?)",
179        )
180        .bind(graph_id)
181        .bind(source)
182        .execute(&self.pool)
183        .await
184        .map_err(|e| format!("Failed to save ticket source: {}", e))?;
185        Ok(())
186    }
187
188    /// Fetch the raw ticket source for a graph.
189    pub async fn get_ticket_source(&self, graph_id: &str) -> Result<Option<String>, String> {
190        let row = sqlx::query(
191            "SELECT source FROM orcha_ticket_sources WHERE graph_id = ?",
192        )
193        .bind(graph_id)
194        .fetch_optional(&self.pool)
195        .await
196        .map_err(|e| format!("Failed to fetch ticket source: {}", e))?;
197        Ok(row.map(|r| r.get("source")))
198    }
199
200    /// Reverse lookup: node_id → ticket_id.
201    pub async fn get_ticket_for_node(
202        &self,
203        graph_id: &str,
204        node_id: &str,
205    ) -> Result<Option<String>, String> {
206        let row = sqlx::query(
207            "SELECT ticket_id FROM orcha_ticket_maps WHERE graph_id = ? AND node_id = ?",
208        )
209        .bind(graph_id)
210        .bind(node_id)
211        .fetch_optional(&self.pool)
212        .await
213        .map_err(|e| format!("Failed to fetch ticket for node: {}", e))?;
214
215        Ok(row.map(|r| r.get("ticket_id")))
216    }
217
218    /// Append a single log entry for a node execution event.
219    ///
220    /// `event_type` is one of: "prompt", "start", "tool_use", "tool_result",
221    /// "complete", "error", "passthrough", "outcome".
222    /// `event_data` is a JSON string.
223    pub async fn append_node_log(
224        &self,
225        graph_id: &str,
226        node_id: &str,
227        ticket_id: Option<&str>,
228        seq: i64,
229        event_type: &str,
230        event_data: &str,
231    ) -> Result<(), String> {
232        sqlx::query(
233            "INSERT INTO orcha_node_logs (graph_id, node_id, ticket_id, seq, event_type, event_data) \
234             VALUES (?, ?, ?, ?, ?, ?)",
235        )
236        .bind(graph_id)
237        .bind(node_id)
238        .bind(ticket_id)
239        .bind(seq)
240        .bind(event_type)
241        .bind(event_data)
242        .execute(&self.pool)
243        .await
244        .map_err(|e| format!("Failed to append node log: {}", e))?;
245        Ok(())
246    }
247
248    /// Fetch all log entries for a (graph_id, node_id) pair, ordered by seq.
249    pub async fn get_node_log(
250        &self,
251        graph_id: &str,
252        node_id: &str,
253    ) -> Result<Vec<NodeLogEntry>, String> {
254        let rows = sqlx::query(
255            "SELECT seq, event_type, event_data, created_at \
256             FROM orcha_node_logs \
257             WHERE graph_id = ? AND node_id = ? \
258             ORDER BY seq ASC",
259        )
260        .bind(graph_id)
261        .bind(node_id)
262        .fetch_all(&self.pool)
263        .await
264        .map_err(|e| format!("Failed to fetch node log: {}", e))?;
265
266        let entries = rows
267            .into_iter()
268            .map(|row| NodeLogEntry {
269                seq: row.get("seq"),
270                event_type: row.get("event_type"),
271                event_data: row.get("event_data"),
272                created_at: row.get("created_at"),
273            })
274            .collect();
275        Ok(entries)
276    }
277}
278
/// A single entry in the node execution log.
///
/// Mirrors the `seq`, `event_type`, `event_data` and `created_at` columns of
/// the `orcha_node_logs` table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NodeLogEntry {
    /// Caller-supplied sequence number; log reads are ordered by it.
    pub seq: i64,
    /// Kind of event, e.g. "prompt", "start", "tool_use", "complete".
    pub event_type: String,
    /// Raw JSON string for the event payload.
    pub event_data: String,
    /// Insertion time as Unix seconds (the column defaults to `strftime('%s', 'now')`).
    pub created_at: i64,
}