Skip to main content

agentic_connect/engine/
store.rs

1//! ConnectionStore — SQLite-backed storage for connections, profiles, and credentials.
2
3use chrono::Utc;
4use rusqlite::{params, Connection as SqlConn};
5use std::path::Path;
6
7use crate::types::{
8    Connection, ConnectionId, ConnectionProfile, ConnectError, ConnectResult,
9    HealthCheck, HealthStatus, Protocol, StoredCredential,
10};
11
12/// Persistent storage for all connection data.
13pub struct ConnectionStore {
14    db: SqlConn,
15}
16
17impl ConnectionStore {
18    /// Open or create a store at the given path.
19    pub fn open(path: &Path) -> ConnectResult<Self> {
20        let db = SqlConn::open(path)?;
21        let store = Self { db };
22        store.init_tables()?;
23        Ok(store)
24    }
25
26    /// Create an in-memory store (for testing).
27    pub fn open_memory() -> ConnectResult<Self> {
28        let db = SqlConn::open_in_memory()?;
29        let store = Self { db };
30        store.init_tables()?;
31        Ok(store)
32    }
33
34    fn init_tables(&self) -> ConnectResult<()> {
35        self.db.execute_batch(
36            "CREATE TABLE IF NOT EXISTS connections (
37                id TEXT PRIMARY KEY,
38                name TEXT NOT NULL,
39                protocol TEXT NOT NULL,
40                host TEXT NOT NULL,
41                port INTEGER,
42                path TEXT,
43                auth_json TEXT,
44                tags_json TEXT NOT NULL DEFAULT '[]',
45                created_at TEXT NOT NULL,
46                last_used TEXT,
47                metadata_json TEXT NOT NULL DEFAULT 'null'
48            );
49            CREATE TABLE IF NOT EXISTS profiles (
50                connection_id TEXT PRIMARY KEY,
51                profile_json TEXT NOT NULL,
52                FOREIGN KEY (connection_id) REFERENCES connections(id)
53            );
54            CREATE TABLE IF NOT EXISTS credentials (
55                id TEXT PRIMARY KEY,
56                name TEXT NOT NULL,
57                auth_json TEXT NOT NULL,
58                created_at TEXT NOT NULL,
59                last_rotated TEXT,
60                tags_json TEXT NOT NULL DEFAULT '[]'
61            );
62            CREATE TABLE IF NOT EXISTS health_checks (
63                id INTEGER PRIMARY KEY AUTOINCREMENT,
64                connection_id TEXT NOT NULL,
65                status TEXT NOT NULL,
66                latency_ms REAL,
67                message TEXT,
68                checked_at TEXT NOT NULL,
69                FOREIGN KEY (connection_id) REFERENCES connections(id)
70            );
71            CREATE INDEX IF NOT EXISTS idx_health_conn ON health_checks(connection_id);
72            CREATE INDEX IF NOT EXISTS idx_health_time ON health_checks(checked_at);",
73        )?;
74        Ok(())
75    }
76
77    /// Save a connection.
78    pub fn save_connection(&self, conn: &Connection) -> ConnectResult<()> {
79        let auth_json = conn.auth.as_ref().map(|a| serde_json::to_string(a).unwrap_or_default());
80        let tags_json = serde_json::to_string(&conn.tags).unwrap_or_else(|_| "[]".into());
81        let meta_json = serde_json::to_string(&conn.metadata).unwrap_or_else(|_| "null".into());
82
83        self.db.execute(
84            "INSERT OR REPLACE INTO connections
85             (id, name, protocol, host, port, path, auth_json, tags_json, created_at, last_used, metadata_json)
86             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
87            params![
88                conn.id.to_string(),
89                conn.name,
90                serde_json::to_string(&conn.protocol).unwrap_or_default(),
91                conn.host,
92                conn.port,
93                conn.path,
94                auth_json,
95                tags_json,
96                conn.created_at.to_rfc3339(),
97                conn.last_used.map(|t| t.to_rfc3339()),
98                meta_json,
99            ],
100        )?;
101        Ok(())
102    }
103
104    /// Get a connection by ID.
105    pub fn get_connection(&self, id: &ConnectionId) -> ConnectResult<Option<Connection>> {
106        let mut stmt = self.db.prepare(
107            "SELECT id, name, protocol, host, port, path, auth_json, tags_json,
108                    created_at, last_used, metadata_json
109             FROM connections WHERE id = ?1",
110        )?;
111
112        let result = stmt.query_row(params![id.to_string()], |row| {
113            Ok(self.row_to_connection(row))
114        });
115
116        match result {
117            Ok(conn) => Ok(Some(conn?)),
118            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
119            Err(e) => Err(ConnectError::Database(e.to_string())),
120        }
121    }
122
123    /// List all connections, optionally filtered by tag.
124    pub fn list_connections(&self, tag: Option<&str>) -> ConnectResult<Vec<Connection>> {
125        let mut stmt = self.db.prepare(
126            "SELECT id, name, protocol, host, port, path, auth_json, tags_json,
127                    created_at, last_used, metadata_json
128             FROM connections ORDER BY name",
129        )?;
130
131        let rows = stmt.query_map([], |row| Ok(self.row_to_connection(row)))?;
132        let mut connections = Vec::new();
133        for row in rows {
134            let conn = row.map_err(|e| ConnectError::Database(e.to_string()))??;
135            if let Some(t) = tag {
136                if conn.tags.iter().any(|ct| ct == t) {
137                    connections.push(conn);
138                }
139            } else {
140                connections.push(conn);
141            }
142        }
143        Ok(connections)
144    }
145
146    /// Delete a connection by ID.
147    pub fn delete_connection(&self, id: &ConnectionId) -> ConnectResult<bool> {
148        let count = self.db.execute(
149            "DELETE FROM connections WHERE id = ?1",
150            params![id.to_string()],
151        )?;
152        Ok(count > 0)
153    }
154
155    /// Save a connection profile (soul).
156    pub fn save_profile(&self, profile: &ConnectionProfile) -> ConnectResult<()> {
157        let json = serde_json::to_string(profile)
158            .map_err(|e| ConnectError::Serialization(e.to_string()))?;
159        self.db.execute(
160            "INSERT OR REPLACE INTO profiles (connection_id, profile_json) VALUES (?1, ?2)",
161            params![profile.connection_id.to_string(), json],
162        )?;
163        Ok(())
164    }
165
166    /// Get a connection profile by connection ID.
167    pub fn get_profile(&self, conn_id: &ConnectionId) -> ConnectResult<Option<ConnectionProfile>> {
168        let mut stmt = self.db.prepare(
169            "SELECT profile_json FROM profiles WHERE connection_id = ?1",
170        )?;
171        match stmt.query_row(params![conn_id.to_string()], |row| {
172            let json: String = row.get(0)?;
173            Ok(json)
174        }) {
175            Ok(json) => {
176                let profile: ConnectionProfile = serde_json::from_str(&json)?;
177                Ok(Some(profile))
178            }
179            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
180            Err(e) => Err(ConnectError::Database(e.to_string())),
181        }
182    }
183
184    /// Save a health check result.
185    pub fn save_health_check(&self, check: &HealthCheck) -> ConnectResult<()> {
186        let status = serde_json::to_string(&check.status).unwrap_or_default();
187        self.db.execute(
188            "INSERT INTO health_checks (connection_id, status, latency_ms, message, checked_at)
189             VALUES (?1, ?2, ?3, ?4, ?5)",
190            params![
191                check.connection_id.to_string(),
192                status,
193                check.latency_ms,
194                check.message,
195                check.checked_at.to_rfc3339(),
196            ],
197        )?;
198        Ok(())
199    }
200
201    /// Get recent health checks for a connection.
202    pub fn get_health_history(
203        &self,
204        conn_id: &ConnectionId,
205        limit: usize,
206    ) -> ConnectResult<Vec<HealthCheck>> {
207        let mut stmt = self.db.prepare(
208            "SELECT connection_id, status, latency_ms, message, checked_at
209             FROM health_checks WHERE connection_id = ?1
210             ORDER BY checked_at DESC LIMIT ?2",
211        )?;
212        let rows = stmt.query_map(params![conn_id.to_string(), limit as i64], |row| {
213            let status_str: String = row.get(1)?;
214            let status: HealthStatus =
215                serde_json::from_str(&status_str).unwrap_or(HealthStatus::Unknown);
216            let checked_str: String = row.get(4)?;
217            Ok(HealthCheck {
218                connection_id: *conn_id,
219                protocol: Protocol::Http, // stored separately if needed
220                host: String::new(),
221                port: 0,
222                status,
223                latency_ms: row.get(2)?,
224                message: row.get(3)?,
225                checked_at: chrono::DateTime::parse_from_rfc3339(&checked_str)
226                    .map(|dt| dt.with_timezone(&Utc))
227                    .unwrap_or_else(|_| Utc::now()),
228            })
229        })?;
230        let mut checks = Vec::new();
231        for row in rows {
232            checks.push(row.map_err(|e| ConnectError::Database(e.to_string()))?);
233        }
234        Ok(checks)
235    }
236
237    /// Stats summary.
238    pub fn stats(&self) -> ConnectResult<StoreStats> {
239        let conn_count: i64 = self.db.query_row(
240            "SELECT COUNT(*) FROM connections", [], |row| row.get(0),
241        )?;
242        let profile_count: i64 = self.db.query_row(
243            "SELECT COUNT(*) FROM profiles", [], |row| row.get(0),
244        )?;
245        let health_count: i64 = self.db.query_row(
246            "SELECT COUNT(*) FROM health_checks", [], |row| row.get(0),
247        )?;
248        Ok(StoreStats {
249            connection_count: conn_count as usize,
250            profile_count: profile_count as usize,
251            health_check_count: health_count as usize,
252        })
253    }
254
255    fn row_to_connection(&self, row: &rusqlite::Row) -> ConnectResult<Connection> {
256        let id_str: String = row.get(0).map_err(|e| ConnectError::Database(e.to_string()))?;
257        let protocol_str: String = row.get(2).map_err(|e| ConnectError::Database(e.to_string()))?;
258        let auth_json: Option<String> = row.get(6).map_err(|e| ConnectError::Database(e.to_string()))?;
259        let tags_json: String = row.get(7).map_err(|e| ConnectError::Database(e.to_string()))?;
260        let created_str: String = row.get(8).map_err(|e| ConnectError::Database(e.to_string()))?;
261        let last_used_str: Option<String> = row.get(9).map_err(|e| ConnectError::Database(e.to_string()))?;
262        let meta_json: String = row.get(10).map_err(|e| ConnectError::Database(e.to_string()))?;
263
264        Ok(Connection {
265            id: uuid::Uuid::parse_str(&id_str)
266                .map_err(|e| ConnectError::Database(e.to_string()))?,
267            name: row.get(1).map_err(|e| ConnectError::Database(e.to_string()))?,
268            protocol: serde_json::from_str(&protocol_str).unwrap_or(Protocol::Http),
269            host: row.get(3).map_err(|e| ConnectError::Database(e.to_string()))?,
270            port: row.get(4).map_err(|e| ConnectError::Database(e.to_string()))?,
271            path: row.get(5).map_err(|e| ConnectError::Database(e.to_string()))?,
272            auth: auth_json.and_then(|j| serde_json::from_str(&j).ok()),
273            tags: serde_json::from_str(&tags_json).unwrap_or_default(),
274            created_at: chrono::DateTime::parse_from_rfc3339(&created_str)
275                .map(|dt| dt.with_timezone(&Utc))
276                .unwrap_or_else(|_| Utc::now()),
277            last_used: last_used_str.and_then(|s| {
278                chrono::DateTime::parse_from_rfc3339(&s)
279                    .map(|dt| dt.with_timezone(&Utc))
280                    .ok()
281            }),
282            metadata: serde_json::from_str(&meta_json).unwrap_or(serde_json::Value::Null),
283        })
284    }
285}
286
287/// Store statistics.
288#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
289pub struct StoreStats {
290    pub connection_count: usize,
291    pub profile_count: usize,
292    pub health_check_count: usize,
293}
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298
299    #[test]
300    fn test_open_memory() {
301        let store = ConnectionStore::open_memory().unwrap();
302        let stats = store.stats().unwrap();
303        assert_eq!(stats.connection_count, 0);
304    }
305
306    #[test]
307    fn test_save_and_get_connection() {
308        let store = ConnectionStore::open_memory().unwrap();
309        let conn = Connection::from_url("test", "https://api.example.com/v1").unwrap();
310        store.save_connection(&conn).unwrap();
311        let loaded = store.get_connection(&conn.id).unwrap();
312        assert!(loaded.is_some());
313        assert_eq!(loaded.unwrap().name, "test");
314    }
315
316    #[test]
317    fn test_list_connections() {
318        let store = ConnectionStore::open_memory().unwrap();
319        let c1 = Connection::from_url("api", "https://api.example.com").unwrap();
320        let c2 = Connection::from_url("ssh", "ssh://server.example.com").unwrap();
321        store.save_connection(&c1).unwrap();
322        store.save_connection(&c2).unwrap();
323        let all = store.list_connections(None).unwrap();
324        assert_eq!(all.len(), 2);
325    }
326
327    #[test]
328    fn test_delete_connection() {
329        let store = ConnectionStore::open_memory().unwrap();
330        let conn = Connection::from_url("test", "https://example.com").unwrap();
331        let id = conn.id;
332        store.save_connection(&conn).unwrap();
333        assert!(store.delete_connection(&id).unwrap());
334        assert!(store.get_connection(&id).unwrap().is_none());
335    }
336
337    #[test]
338    fn test_profile_roundtrip() {
339        let store = ConnectionStore::open_memory().unwrap();
340        let conn = Connection::from_url("test", "https://example.com").unwrap();
341        store.save_connection(&conn).unwrap();
342        let mut profile = ConnectionProfile::new(conn.id);
343        profile.record_latency(150.0);
344        store.save_profile(&profile).unwrap();
345        let loaded = store.get_profile(&conn.id).unwrap().unwrap();
346        assert_eq!(loaded.baseline.sample_count, 1);
347    }
348}