plexus_substrate/activations/orcha/pm/
storage.rs1use crate::activations::storage::{activation_db_path, init_sqlite_pool};
2use sqlx::{sqlite::SqlitePool, Row};
3use std::collections::HashMap;
4use std::path::PathBuf;
5
/// Settings needed to open the PM storage database.
#[derive(Clone, Debug)]
pub struct PmStorageConfig {
    /// Location of the SQLite database; `PmStorage::new` passes it to
    /// `init_sqlite_pool`.
    pub db_path: PathBuf,
}
10
11impl Default for PmStorageConfig {
12 fn default() -> Self {
13 Self {
14 db_path: activation_db_path("pm", "pm.db"),
15 }
16 }
17}
18
19pub struct PmStorage {
20 pool: SqlitePool,
21}
22
23impl PmStorage {
24 pub async fn new(config: PmStorageConfig) -> Result<Self, String> {
25 let pool = init_sqlite_pool(config.db_path).await?;
26 let storage = Self { pool };
27 storage.init_schema().await?;
28 Ok(storage)
29 }
30
31 async fn init_schema(&self) -> Result<(), String> {
32 sqlx::query(
33 r#"
34 CREATE TABLE IF NOT EXISTS orcha_ticket_maps (
35 graph_id TEXT NOT NULL,
36 ticket_id TEXT NOT NULL,
37 node_id TEXT NOT NULL,
38 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
39 PRIMARY KEY (graph_id, ticket_id)
40 )
41 "#,
42 )
43 .execute(&self.pool)
44 .await
45 .map_err(|e| format!("Failed to create orcha_ticket_maps table: {}", e))?;
46
47 sqlx::query(
48 r#"
49 CREATE TABLE IF NOT EXISTS orcha_ticket_sources (
50 graph_id TEXT PRIMARY KEY,
51 source TEXT NOT NULL,
52 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
53 )
54 "#,
55 )
56 .execute(&self.pool)
57 .await
58 .map_err(|e| format!("Failed to create orcha_ticket_sources table: {}", e))?;
59
60 let _ = sqlx::query(
62 "ALTER TABLE orcha_ticket_maps ADD COLUMN created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))",
63 )
64 .execute(&self.pool)
65 .await;
66
67 sqlx::query(
68 "CREATE INDEX IF NOT EXISTS idx_ticket_maps_graph ON orcha_ticket_maps(graph_id)",
69 )
70 .execute(&self.pool)
71 .await
72 .map_err(|e| format!("Failed to create graph index: {}", e))?;
73
74 sqlx::query(
75 "CREATE INDEX IF NOT EXISTS idx_ticket_maps_node ON orcha_ticket_maps(graph_id, node_id)",
76 )
77 .execute(&self.pool)
78 .await
79 .map_err(|e| format!("Failed to create node index: {}", e))?;
80
81 sqlx::query(
82 r#"
83 CREATE TABLE IF NOT EXISTS orcha_node_logs (
84 id INTEGER PRIMARY KEY AUTOINCREMENT,
85 graph_id TEXT NOT NULL,
86 node_id TEXT NOT NULL,
87 ticket_id TEXT,
88 seq INTEGER NOT NULL,
89 event_type TEXT NOT NULL,
90 event_data TEXT NOT NULL,
91 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
92 )
93 "#,
94 )
95 .execute(&self.pool)
96 .await
97 .map_err(|e| format!("Failed to create orcha_node_logs table: {}", e))?;
98
99 sqlx::query(
100 "CREATE INDEX IF NOT EXISTS idx_node_logs ON orcha_node_logs(graph_id, node_id, seq)",
101 )
102 .execute(&self.pool)
103 .await
104 .map_err(|e| format!("Failed to create node logs index: {}", e))?;
105
106 Ok(())
107 }
108
109 pub async fn save_ticket_map(
111 &self,
112 graph_id: &str,
113 map: &HashMap<String, String>,
114 ) -> Result<(), String> {
115 for (ticket_id, node_id) in map {
116 sqlx::query(
117 "INSERT OR REPLACE INTO orcha_ticket_maps (graph_id, ticket_id, node_id) VALUES (?, ?, ?)",
118 )
119 .bind(graph_id)
120 .bind(ticket_id)
121 .bind(node_id)
122 .execute(&self.pool)
123 .await
124 .map_err(|e| format!("Failed to save ticket map entry: {}", e))?;
125 }
126 Ok(())
127 }
128
129 pub async fn get_ticket_map(&self, graph_id: &str) -> Result<HashMap<String, String>, String> {
131 let rows = sqlx::query(
132 "SELECT ticket_id, node_id FROM orcha_ticket_maps WHERE graph_id = ?",
133 )
134 .bind(graph_id)
135 .fetch_all(&self.pool)
136 .await
137 .map_err(|e| format!("Failed to fetch ticket map: {}", e))?;
138
139 let mut map = HashMap::new();
140 for row in rows {
141 let ticket_id: String = row.get("ticket_id");
142 let node_id: String = row.get("node_id");
143 map.insert(ticket_id, node_id);
144 }
145 Ok(map)
146 }
147
148 pub async fn list_ticket_maps(&self, limit: usize) -> Result<Vec<(String, i64)>, String> {
151 let rows = sqlx::query(
152 "SELECT graph_id, MIN(created_at) AS created_at \
153 FROM orcha_ticket_maps \
154 GROUP BY graph_id \
155 ORDER BY created_at DESC \
156 LIMIT ?",
157 )
158 .bind(limit as i64)
159 .fetch_all(&self.pool)
160 .await
161 .map_err(|e| format!("Failed to list ticket maps: {}", e))?;
162
163 let result = rows
164 .into_iter()
165 .map(|row| {
166 let graph_id: String = row.get("graph_id");
167 let created_at: i64 = row.get("created_at");
168 (graph_id, created_at)
169 })
170 .collect();
171
172 Ok(result)
173 }
174
175 pub async fn save_ticket_source(&self, graph_id: &str, source: &str) -> Result<(), String> {
177 sqlx::query(
178 "INSERT OR REPLACE INTO orcha_ticket_sources (graph_id, source) VALUES (?, ?)",
179 )
180 .bind(graph_id)
181 .bind(source)
182 .execute(&self.pool)
183 .await
184 .map_err(|e| format!("Failed to save ticket source: {}", e))?;
185 Ok(())
186 }
187
188 pub async fn get_ticket_source(&self, graph_id: &str) -> Result<Option<String>, String> {
190 let row = sqlx::query(
191 "SELECT source FROM orcha_ticket_sources WHERE graph_id = ?",
192 )
193 .bind(graph_id)
194 .fetch_optional(&self.pool)
195 .await
196 .map_err(|e| format!("Failed to fetch ticket source: {}", e))?;
197 Ok(row.map(|r| r.get("source")))
198 }
199
200 pub async fn get_ticket_for_node(
202 &self,
203 graph_id: &str,
204 node_id: &str,
205 ) -> Result<Option<String>, String> {
206 let row = sqlx::query(
207 "SELECT ticket_id FROM orcha_ticket_maps WHERE graph_id = ? AND node_id = ?",
208 )
209 .bind(graph_id)
210 .bind(node_id)
211 .fetch_optional(&self.pool)
212 .await
213 .map_err(|e| format!("Failed to fetch ticket for node: {}", e))?;
214
215 Ok(row.map(|r| r.get("ticket_id")))
216 }
217
218 pub async fn append_node_log(
224 &self,
225 graph_id: &str,
226 node_id: &str,
227 ticket_id: Option<&str>,
228 seq: i64,
229 event_type: &str,
230 event_data: &str,
231 ) -> Result<(), String> {
232 sqlx::query(
233 "INSERT INTO orcha_node_logs (graph_id, node_id, ticket_id, seq, event_type, event_data) \
234 VALUES (?, ?, ?, ?, ?, ?)",
235 )
236 .bind(graph_id)
237 .bind(node_id)
238 .bind(ticket_id)
239 .bind(seq)
240 .bind(event_type)
241 .bind(event_data)
242 .execute(&self.pool)
243 .await
244 .map_err(|e| format!("Failed to append node log: {}", e))?;
245 Ok(())
246 }
247
248 pub async fn get_node_log(
250 &self,
251 graph_id: &str,
252 node_id: &str,
253 ) -> Result<Vec<NodeLogEntry>, String> {
254 let rows = sqlx::query(
255 "SELECT seq, event_type, event_data, created_at \
256 FROM orcha_node_logs \
257 WHERE graph_id = ? AND node_id = ? \
258 ORDER BY seq ASC",
259 )
260 .bind(graph_id)
261 .bind(node_id)
262 .fetch_all(&self.pool)
263 .await
264 .map_err(|e| format!("Failed to fetch node log: {}", e))?;
265
266 let entries = rows
267 .into_iter()
268 .map(|row| NodeLogEntry {
269 seq: row.get("seq"),
270 event_type: row.get("event_type"),
271 event_data: row.get("event_data"),
272 created_at: row.get("created_at"),
273 })
274 .collect();
275 Ok(entries)
276 }
277}
278
279#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
281pub struct NodeLogEntry {
282 pub seq: i64,
283 pub event_type: String,
284 pub event_data: String,
286 pub created_at: i64,
287}