use chrono::Utc;
use rusqlite::{params, Connection as SqlConn};
use std::path::Path;
use crate::types::{
Connection, ConnectionId, ConnectionProfile, ConnectError, ConnectResult,
HealthCheck, HealthStatus, Protocol, StoredCredential,
};
pub struct ConnectionStore {
db: SqlConn,
}
impl ConnectionStore {
pub fn open(path: &Path) -> ConnectResult<Self> {
let db = SqlConn::open(path)?;
let store = Self { db };
store.init_tables()?;
Ok(store)
}
pub fn open_memory() -> ConnectResult<Self> {
let db = SqlConn::open_in_memory()?;
let store = Self { db };
store.init_tables()?;
Ok(store)
}
fn init_tables(&self) -> ConnectResult<()> {
self.db.execute_batch(
"CREATE TABLE IF NOT EXISTS connections (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
protocol TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER,
path TEXT,
auth_json TEXT,
tags_json TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_used TEXT,
metadata_json TEXT NOT NULL DEFAULT 'null'
);
CREATE TABLE IF NOT EXISTS profiles (
connection_id TEXT PRIMARY KEY,
profile_json TEXT NOT NULL,
FOREIGN KEY (connection_id) REFERENCES connections(id)
);
CREATE TABLE IF NOT EXISTS credentials (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
auth_json TEXT NOT NULL,
created_at TEXT NOT NULL,
last_rotated TEXT,
tags_json TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE IF NOT EXISTS health_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
connection_id TEXT NOT NULL,
status TEXT NOT NULL,
latency_ms REAL,
message TEXT,
checked_at TEXT NOT NULL,
FOREIGN KEY (connection_id) REFERENCES connections(id)
);
CREATE INDEX IF NOT EXISTS idx_health_conn ON health_checks(connection_id);
CREATE INDEX IF NOT EXISTS idx_health_time ON health_checks(checked_at);",
)?;
Ok(())
}
pub fn save_connection(&self, conn: &Connection) -> ConnectResult<()> {
let auth_json = conn.auth.as_ref().map(|a| serde_json::to_string(a).unwrap_or_default());
let tags_json = serde_json::to_string(&conn.tags).unwrap_or_else(|_| "[]".into());
let meta_json = serde_json::to_string(&conn.metadata).unwrap_or_else(|_| "null".into());
self.db.execute(
"INSERT OR REPLACE INTO connections
(id, name, protocol, host, port, path, auth_json, tags_json, created_at, last_used, metadata_json)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
params![
conn.id.to_string(),
conn.name,
serde_json::to_string(&conn.protocol).unwrap_or_default(),
conn.host,
conn.port,
conn.path,
auth_json,
tags_json,
conn.created_at.to_rfc3339(),
conn.last_used.map(|t| t.to_rfc3339()),
meta_json,
],
)?;
Ok(())
}
pub fn get_connection(&self, id: &ConnectionId) -> ConnectResult<Option<Connection>> {
let mut stmt = self.db.prepare(
"SELECT id, name, protocol, host, port, path, auth_json, tags_json,
created_at, last_used, metadata_json
FROM connections WHERE id = ?1",
)?;
let result = stmt.query_row(params![id.to_string()], |row| {
Ok(self.row_to_connection(row))
});
match result {
Ok(conn) => Ok(Some(conn?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(ConnectError::Database(e.to_string())),
}
}
pub fn list_connections(&self, tag: Option<&str>) -> ConnectResult<Vec<Connection>> {
let mut stmt = self.db.prepare(
"SELECT id, name, protocol, host, port, path, auth_json, tags_json,
created_at, last_used, metadata_json
FROM connections ORDER BY name",
)?;
let rows = stmt.query_map([], |row| Ok(self.row_to_connection(row)))?;
let mut connections = Vec::new();
for row in rows {
let conn = row.map_err(|e| ConnectError::Database(e.to_string()))??;
if let Some(t) = tag {
if conn.tags.iter().any(|ct| ct == t) {
connections.push(conn);
}
} else {
connections.push(conn);
}
}
Ok(connections)
}
pub fn delete_connection(&self, id: &ConnectionId) -> ConnectResult<bool> {
let count = self.db.execute(
"DELETE FROM connections WHERE id = ?1",
params![id.to_string()],
)?;
Ok(count > 0)
}
pub fn save_profile(&self, profile: &ConnectionProfile) -> ConnectResult<()> {
let json = serde_json::to_string(profile)
.map_err(|e| ConnectError::Serialization(e.to_string()))?;
self.db.execute(
"INSERT OR REPLACE INTO profiles (connection_id, profile_json) VALUES (?1, ?2)",
params![profile.connection_id.to_string(), json],
)?;
Ok(())
}
pub fn get_profile(&self, conn_id: &ConnectionId) -> ConnectResult<Option<ConnectionProfile>> {
let mut stmt = self.db.prepare(
"SELECT profile_json FROM profiles WHERE connection_id = ?1",
)?;
match stmt.query_row(params![conn_id.to_string()], |row| {
let json: String = row.get(0)?;
Ok(json)
}) {
Ok(json) => {
let profile: ConnectionProfile = serde_json::from_str(&json)?;
Ok(Some(profile))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(ConnectError::Database(e.to_string())),
}
}
pub fn save_health_check(&self, check: &HealthCheck) -> ConnectResult<()> {
let status = serde_json::to_string(&check.status).unwrap_or_default();
self.db.execute(
"INSERT INTO health_checks (connection_id, status, latency_ms, message, checked_at)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
check.connection_id.to_string(),
status,
check.latency_ms,
check.message,
check.checked_at.to_rfc3339(),
],
)?;
Ok(())
}
pub fn get_health_history(
&self,
conn_id: &ConnectionId,
limit: usize,
) -> ConnectResult<Vec<HealthCheck>> {
let mut stmt = self.db.prepare(
"SELECT connection_id, status, latency_ms, message, checked_at
FROM health_checks WHERE connection_id = ?1
ORDER BY checked_at DESC LIMIT ?2",
)?;
let rows = stmt.query_map(params![conn_id.to_string(), limit as i64], |row| {
let status_str: String = row.get(1)?;
let status: HealthStatus =
serde_json::from_str(&status_str).unwrap_or(HealthStatus::Unknown);
let checked_str: String = row.get(4)?;
Ok(HealthCheck {
connection_id: *conn_id,
protocol: Protocol::Http, host: String::new(),
port: 0,
status,
latency_ms: row.get(2)?,
message: row.get(3)?,
checked_at: chrono::DateTime::parse_from_rfc3339(&checked_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now()),
})
})?;
let mut checks = Vec::new();
for row in rows {
checks.push(row.map_err(|e| ConnectError::Database(e.to_string()))?);
}
Ok(checks)
}
pub fn stats(&self) -> ConnectResult<StoreStats> {
let conn_count: i64 = self.db.query_row(
"SELECT COUNT(*) FROM connections", [], |row| row.get(0),
)?;
let profile_count: i64 = self.db.query_row(
"SELECT COUNT(*) FROM profiles", [], |row| row.get(0),
)?;
let health_count: i64 = self.db.query_row(
"SELECT COUNT(*) FROM health_checks", [], |row| row.get(0),
)?;
Ok(StoreStats {
connection_count: conn_count as usize,
profile_count: profile_count as usize,
health_check_count: health_count as usize,
})
}
fn row_to_connection(&self, row: &rusqlite::Row) -> ConnectResult<Connection> {
let id_str: String = row.get(0).map_err(|e| ConnectError::Database(e.to_string()))?;
let protocol_str: String = row.get(2).map_err(|e| ConnectError::Database(e.to_string()))?;
let auth_json: Option<String> = row.get(6).map_err(|e| ConnectError::Database(e.to_string()))?;
let tags_json: String = row.get(7).map_err(|e| ConnectError::Database(e.to_string()))?;
let created_str: String = row.get(8).map_err(|e| ConnectError::Database(e.to_string()))?;
let last_used_str: Option<String> = row.get(9).map_err(|e| ConnectError::Database(e.to_string()))?;
let meta_json: String = row.get(10).map_err(|e| ConnectError::Database(e.to_string()))?;
Ok(Connection {
id: uuid::Uuid::parse_str(&id_str)
.map_err(|e| ConnectError::Database(e.to_string()))?,
name: row.get(1).map_err(|e| ConnectError::Database(e.to_string()))?,
protocol: serde_json::from_str(&protocol_str).unwrap_or(Protocol::Http),
host: row.get(3).map_err(|e| ConnectError::Database(e.to_string()))?,
port: row.get(4).map_err(|e| ConnectError::Database(e.to_string()))?,
path: row.get(5).map_err(|e| ConnectError::Database(e.to_string()))?,
auth: auth_json.and_then(|j| serde_json::from_str(&j).ok()),
tags: serde_json::from_str(&tags_json).unwrap_or_default(),
created_at: chrono::DateTime::parse_from_rfc3339(&created_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now()),
last_used: last_used_str.and_then(|s| {
chrono::DateTime::parse_from_rfc3339(&s)
.map(|dt| dt.with_timezone(&Utc))
.ok()
}),
metadata: serde_json::from_str(&meta_json).unwrap_or(serde_json::Value::Null),
})
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoreStats {
pub connection_count: usize,
pub profile_count: usize,
pub health_check_count: usize,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_open_memory() {
let store = ConnectionStore::open_memory().unwrap();
let stats = store.stats().unwrap();
assert_eq!(stats.connection_count, 0);
}
#[test]
fn test_save_and_get_connection() {
let store = ConnectionStore::open_memory().unwrap();
let conn = Connection::from_url("test", "https://api.example.com/v1").unwrap();
store.save_connection(&conn).unwrap();
let loaded = store.get_connection(&conn.id).unwrap();
assert!(loaded.is_some());
assert_eq!(loaded.unwrap().name, "test");
}
#[test]
fn test_list_connections() {
let store = ConnectionStore::open_memory().unwrap();
let c1 = Connection::from_url("api", "https://api.example.com").unwrap();
let c2 = Connection::from_url("ssh", "ssh://server.example.com").unwrap();
store.save_connection(&c1).unwrap();
store.save_connection(&c2).unwrap();
let all = store.list_connections(None).unwrap();
assert_eq!(all.len(), 2);
}
#[test]
fn test_delete_connection() {
let store = ConnectionStore::open_memory().unwrap();
let conn = Connection::from_url("test", "https://example.com").unwrap();
let id = conn.id;
store.save_connection(&conn).unwrap();
assert!(store.delete_connection(&id).unwrap());
assert!(store.get_connection(&id).unwrap().is_none());
}
#[test]
fn test_profile_roundtrip() {
let store = ConnectionStore::open_memory().unwrap();
let conn = Connection::from_url("test", "https://example.com").unwrap();
store.save_connection(&conn).unwrap();
let mut profile = ConnectionProfile::new(conn.id);
profile.record_latency(150.0);
store.save_profile(&profile).unwrap();
let loaded = store.get_profile(&conn.id).unwrap().unwrap();
assert_eq!(loaded.baseline.sample_count, 1);
}
}