1use chrono::Utc;
4use rusqlite::{params, Connection as SqlConn};
5use std::path::Path;
6
7use crate::types::{
8 Connection, ConnectionId, ConnectionProfile, ConnectError, ConnectResult,
9 HealthCheck, HealthStatus, Protocol, StoredCredential,
10};
11
/// SQLite-backed store for connections, their profiles, and health-check history.
///
/// Construct it with `ConnectionStore::open` (file-backed) or
/// `ConnectionStore::open_memory` (in-memory); both create the schema on first use.
pub struct ConnectionStore {
    /// Underlying rusqlite connection handle; every query in this store runs through it.
    db: SqlConn,
}
16
17impl ConnectionStore {
18 pub fn open(path: &Path) -> ConnectResult<Self> {
20 let db = SqlConn::open(path)?;
21 let store = Self { db };
22 store.init_tables()?;
23 Ok(store)
24 }
25
26 pub fn open_memory() -> ConnectResult<Self> {
28 let db = SqlConn::open_in_memory()?;
29 let store = Self { db };
30 store.init_tables()?;
31 Ok(store)
32 }
33
34 fn init_tables(&self) -> ConnectResult<()> {
35 self.db.execute_batch(
36 "CREATE TABLE IF NOT EXISTS connections (
37 id TEXT PRIMARY KEY,
38 name TEXT NOT NULL,
39 protocol TEXT NOT NULL,
40 host TEXT NOT NULL,
41 port INTEGER,
42 path TEXT,
43 auth_json TEXT,
44 tags_json TEXT NOT NULL DEFAULT '[]',
45 created_at TEXT NOT NULL,
46 last_used TEXT,
47 metadata_json TEXT NOT NULL DEFAULT 'null'
48 );
49 CREATE TABLE IF NOT EXISTS profiles (
50 connection_id TEXT PRIMARY KEY,
51 profile_json TEXT NOT NULL,
52 FOREIGN KEY (connection_id) REFERENCES connections(id)
53 );
54 CREATE TABLE IF NOT EXISTS credentials (
55 id TEXT PRIMARY KEY,
56 name TEXT NOT NULL,
57 auth_json TEXT NOT NULL,
58 created_at TEXT NOT NULL,
59 last_rotated TEXT,
60 tags_json TEXT NOT NULL DEFAULT '[]'
61 );
62 CREATE TABLE IF NOT EXISTS health_checks (
63 id INTEGER PRIMARY KEY AUTOINCREMENT,
64 connection_id TEXT NOT NULL,
65 status TEXT NOT NULL,
66 latency_ms REAL,
67 message TEXT,
68 checked_at TEXT NOT NULL,
69 FOREIGN KEY (connection_id) REFERENCES connections(id)
70 );
71 CREATE INDEX IF NOT EXISTS idx_health_conn ON health_checks(connection_id);
72 CREATE INDEX IF NOT EXISTS idx_health_time ON health_checks(checked_at);",
73 )?;
74 Ok(())
75 }
76
77 pub fn save_connection(&self, conn: &Connection) -> ConnectResult<()> {
79 let auth_json = conn.auth.as_ref().map(|a| serde_json::to_string(a).unwrap_or_default());
80 let tags_json = serde_json::to_string(&conn.tags).unwrap_or_else(|_| "[]".into());
81 let meta_json = serde_json::to_string(&conn.metadata).unwrap_or_else(|_| "null".into());
82
83 self.db.execute(
84 "INSERT OR REPLACE INTO connections
85 (id, name, protocol, host, port, path, auth_json, tags_json, created_at, last_used, metadata_json)
86 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
87 params![
88 conn.id.to_string(),
89 conn.name,
90 serde_json::to_string(&conn.protocol).unwrap_or_default(),
91 conn.host,
92 conn.port,
93 conn.path,
94 auth_json,
95 tags_json,
96 conn.created_at.to_rfc3339(),
97 conn.last_used.map(|t| t.to_rfc3339()),
98 meta_json,
99 ],
100 )?;
101 Ok(())
102 }
103
104 pub fn get_connection(&self, id: &ConnectionId) -> ConnectResult<Option<Connection>> {
106 let mut stmt = self.db.prepare(
107 "SELECT id, name, protocol, host, port, path, auth_json, tags_json,
108 created_at, last_used, metadata_json
109 FROM connections WHERE id = ?1",
110 )?;
111
112 let result = stmt.query_row(params![id.to_string()], |row| {
113 Ok(self.row_to_connection(row))
114 });
115
116 match result {
117 Ok(conn) => Ok(Some(conn?)),
118 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
119 Err(e) => Err(ConnectError::Database(e.to_string())),
120 }
121 }
122
123 pub fn list_connections(&self, tag: Option<&str>) -> ConnectResult<Vec<Connection>> {
125 let mut stmt = self.db.prepare(
126 "SELECT id, name, protocol, host, port, path, auth_json, tags_json,
127 created_at, last_used, metadata_json
128 FROM connections ORDER BY name",
129 )?;
130
131 let rows = stmt.query_map([], |row| Ok(self.row_to_connection(row)))?;
132 let mut connections = Vec::new();
133 for row in rows {
134 let conn = row.map_err(|e| ConnectError::Database(e.to_string()))??;
135 if let Some(t) = tag {
136 if conn.tags.iter().any(|ct| ct == t) {
137 connections.push(conn);
138 }
139 } else {
140 connections.push(conn);
141 }
142 }
143 Ok(connections)
144 }
145
146 pub fn delete_connection(&self, id: &ConnectionId) -> ConnectResult<bool> {
148 let count = self.db.execute(
149 "DELETE FROM connections WHERE id = ?1",
150 params![id.to_string()],
151 )?;
152 Ok(count > 0)
153 }
154
155 pub fn save_profile(&self, profile: &ConnectionProfile) -> ConnectResult<()> {
157 let json = serde_json::to_string(profile)
158 .map_err(|e| ConnectError::Serialization(e.to_string()))?;
159 self.db.execute(
160 "INSERT OR REPLACE INTO profiles (connection_id, profile_json) VALUES (?1, ?2)",
161 params![profile.connection_id.to_string(), json],
162 )?;
163 Ok(())
164 }
165
166 pub fn get_profile(&self, conn_id: &ConnectionId) -> ConnectResult<Option<ConnectionProfile>> {
168 let mut stmt = self.db.prepare(
169 "SELECT profile_json FROM profiles WHERE connection_id = ?1",
170 )?;
171 match stmt.query_row(params![conn_id.to_string()], |row| {
172 let json: String = row.get(0)?;
173 Ok(json)
174 }) {
175 Ok(json) => {
176 let profile: ConnectionProfile = serde_json::from_str(&json)?;
177 Ok(Some(profile))
178 }
179 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
180 Err(e) => Err(ConnectError::Database(e.to_string())),
181 }
182 }
183
184 pub fn save_health_check(&self, check: &HealthCheck) -> ConnectResult<()> {
186 let status = serde_json::to_string(&check.status).unwrap_or_default();
187 self.db.execute(
188 "INSERT INTO health_checks (connection_id, status, latency_ms, message, checked_at)
189 VALUES (?1, ?2, ?3, ?4, ?5)",
190 params![
191 check.connection_id.to_string(),
192 status,
193 check.latency_ms,
194 check.message,
195 check.checked_at.to_rfc3339(),
196 ],
197 )?;
198 Ok(())
199 }
200
201 pub fn get_health_history(
203 &self,
204 conn_id: &ConnectionId,
205 limit: usize,
206 ) -> ConnectResult<Vec<HealthCheck>> {
207 let mut stmt = self.db.prepare(
208 "SELECT connection_id, status, latency_ms, message, checked_at
209 FROM health_checks WHERE connection_id = ?1
210 ORDER BY checked_at DESC LIMIT ?2",
211 )?;
212 let rows = stmt.query_map(params![conn_id.to_string(), limit as i64], |row| {
213 let status_str: String = row.get(1)?;
214 let status: HealthStatus =
215 serde_json::from_str(&status_str).unwrap_or(HealthStatus::Unknown);
216 let checked_str: String = row.get(4)?;
217 Ok(HealthCheck {
218 connection_id: *conn_id,
219 protocol: Protocol::Http, host: String::new(),
221 port: 0,
222 status,
223 latency_ms: row.get(2)?,
224 message: row.get(3)?,
225 checked_at: chrono::DateTime::parse_from_rfc3339(&checked_str)
226 .map(|dt| dt.with_timezone(&Utc))
227 .unwrap_or_else(|_| Utc::now()),
228 })
229 })?;
230 let mut checks = Vec::new();
231 for row in rows {
232 checks.push(row.map_err(|e| ConnectError::Database(e.to_string()))?);
233 }
234 Ok(checks)
235 }
236
237 pub fn stats(&self) -> ConnectResult<StoreStats> {
239 let conn_count: i64 = self.db.query_row(
240 "SELECT COUNT(*) FROM connections", [], |row| row.get(0),
241 )?;
242 let profile_count: i64 = self.db.query_row(
243 "SELECT COUNT(*) FROM profiles", [], |row| row.get(0),
244 )?;
245 let health_count: i64 = self.db.query_row(
246 "SELECT COUNT(*) FROM health_checks", [], |row| row.get(0),
247 )?;
248 Ok(StoreStats {
249 connection_count: conn_count as usize,
250 profile_count: profile_count as usize,
251 health_check_count: health_count as usize,
252 })
253 }
254
255 fn row_to_connection(&self, row: &rusqlite::Row) -> ConnectResult<Connection> {
256 let id_str: String = row.get(0).map_err(|e| ConnectError::Database(e.to_string()))?;
257 let protocol_str: String = row.get(2).map_err(|e| ConnectError::Database(e.to_string()))?;
258 let auth_json: Option<String> = row.get(6).map_err(|e| ConnectError::Database(e.to_string()))?;
259 let tags_json: String = row.get(7).map_err(|e| ConnectError::Database(e.to_string()))?;
260 let created_str: String = row.get(8).map_err(|e| ConnectError::Database(e.to_string()))?;
261 let last_used_str: Option<String> = row.get(9).map_err(|e| ConnectError::Database(e.to_string()))?;
262 let meta_json: String = row.get(10).map_err(|e| ConnectError::Database(e.to_string()))?;
263
264 Ok(Connection {
265 id: uuid::Uuid::parse_str(&id_str)
266 .map_err(|e| ConnectError::Database(e.to_string()))?,
267 name: row.get(1).map_err(|e| ConnectError::Database(e.to_string()))?,
268 protocol: serde_json::from_str(&protocol_str).unwrap_or(Protocol::Http),
269 host: row.get(3).map_err(|e| ConnectError::Database(e.to_string()))?,
270 port: row.get(4).map_err(|e| ConnectError::Database(e.to_string()))?,
271 path: row.get(5).map_err(|e| ConnectError::Database(e.to_string()))?,
272 auth: auth_json.and_then(|j| serde_json::from_str(&j).ok()),
273 tags: serde_json::from_str(&tags_json).unwrap_or_default(),
274 created_at: chrono::DateTime::parse_from_rfc3339(&created_str)
275 .map(|dt| dt.with_timezone(&Utc))
276 .unwrap_or_else(|_| Utc::now()),
277 last_used: last_used_str.and_then(|s| {
278 chrono::DateTime::parse_from_rfc3339(&s)
279 .map(|dt| dt.with_timezone(&Utc))
280 .ok()
281 }),
282 metadata: serde_json::from_str(&meta_json).unwrap_or(serde_json::Value::Null),
283 })
284 }
285}
286
/// Row counts reported by `ConnectionStore::stats`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoreStats {
    /// Number of rows in the `connections` table.
    pub connection_count: usize,
    /// Number of rows in the `profiles` table.
    pub profile_count: usize,
    /// Number of rows in the `health_checks` table.
    pub health_check_count: usize,
}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created in-memory store reports zero connections.
    #[test]
    fn test_open_memory() {
        let store = ConnectionStore::open_memory().unwrap();
        let counts = store.stats().unwrap();
        assert_eq!(counts.connection_count, 0);
    }

    /// A saved connection can be read back by id with its name intact.
    #[test]
    fn test_save_and_get_connection() {
        let store = ConnectionStore::open_memory().unwrap();
        let original = Connection::from_url("test", "https://api.example.com/v1").unwrap();
        store.save_connection(&original).unwrap();

        let fetched = store.get_connection(&original.id).unwrap();
        assert!(fetched.is_some());
        let found = fetched.unwrap();
        assert_eq!(found.name, "test");
    }

    /// Listing without a tag filter returns every saved connection.
    #[test]
    fn test_list_connections() {
        let store = ConnectionStore::open_memory().unwrap();
        let api = Connection::from_url("api", "https://api.example.com").unwrap();
        let ssh = Connection::from_url("ssh", "ssh://server.example.com").unwrap();
        for conn in [&api, &ssh] {
            store.save_connection(conn).unwrap();
        }

        let listed = store.list_connections(None).unwrap();
        assert_eq!(listed.len(), 2);
    }

    /// Deleting a saved connection reports success and makes later lookups return `None`.
    #[test]
    fn test_delete_connection() {
        let store = ConnectionStore::open_memory().unwrap();
        let doomed = Connection::from_url("test", "https://example.com").unwrap();
        let doomed_id = doomed.id;
        store.save_connection(&doomed).unwrap();

        let was_deleted = store.delete_connection(&doomed_id).unwrap();
        assert!(was_deleted);
        let after = store.get_connection(&doomed_id).unwrap();
        assert!(after.is_none());
    }

    /// A profile with one recorded latency sample survives a save/load round trip.
    #[test]
    fn test_profile_roundtrip() {
        let store = ConnectionStore::open_memory().unwrap();
        let owner = Connection::from_url("test", "https://example.com").unwrap();
        store.save_connection(&owner).unwrap();

        let mut outgoing = ConnectionProfile::new(owner.id);
        outgoing.record_latency(150.0);
        store.save_profile(&outgoing).unwrap();

        let restored = store.get_profile(&owner.id).unwrap().unwrap();
        assert_eq!(restored.baseline.sample_count, 1);
    }
}