use sqlx::SqlitePool;
use crate::storage::StorageError;
/// DDL that creates the `metric_series` table when it does not already exist.
///
/// Each row describes one series, keyed by `series_id` (`INTEGER PRIMARY KEY`).
/// `category` is restricted by a `CHECK` constraint to the fixed list below, and
/// `name`, `target`, `created_at` and `updated_at` are mandatory.
/// `static_tags` defaults to the text `'{}'` (presumably a serialized JSON
/// object -- confirm against the writer) and `description` is optional.
///
/// NOTE(review): the unit of `created_at` / `updated_at` is not stated in this
/// file; confirm against the code that populates them.
pub const METRIC_SERIES_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS metric_series (
series_id INTEGER PRIMARY KEY,
category TEXT NOT NULL CHECK(category IN (
'network.tcp', 'network.ping', 'network.http',
'crypto', 'polymarket', 'stock', 'custom'
)),
name TEXT NOT NULL,
target TEXT NOT NULL,
static_tags TEXT DEFAULT '{}',
description TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
"#;
/// DDL that creates the `metric_values` table and its indexes when they do not
/// already exist.
///
/// Each row is one sample: a timestamp `ts`, the `series_id` it belongs to, and
/// a mandatory `REAL` `value`. `success` defaults to `1`, `dynamic_tags`
/// defaults to the text `'{}'`, and `unit` / `duration_ms` are optional.
///
/// The table declares no primary key and no foreign-key constraint on
/// `series_id`. Two single-column indexes are created, one on `ts` and one on
/// `series_id`.
///
/// This string contains several SQL statements; `init_schema` submits it as a
/// single query.
pub const METRIC_VALUES_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS metric_values (
ts INTEGER NOT NULL,
series_id INTEGER NOT NULL,
value REAL NOT NULL,
unit TEXT,
success INTEGER NOT NULL DEFAULT 1,
duration_ms INTEGER,
dynamic_tags TEXT DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_metric_values_ts ON metric_values(ts);
CREATE INDEX IF NOT EXISTS idx_metric_values_series_id ON metric_values(series_id);
"#;
/// DDL that creates the `events` table and its indexes when they do not already
/// exist.
///
/// Rows get an auto-incrementing `id`. `source`, `kind` and `severity` are each
/// restricted by a `CHECK` constraint to the literal values listed below, so
/// inserting any other value is rejected by the database. `message` is
/// mandatory and `payload` defaults to the text `'{}'`.
///
/// Two single-column indexes are created, one on `ts` and one on `source`.
///
/// This string contains several SQL statements; `init_schema` submits it as a
/// single query.
pub const EVENTS_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
source TEXT NOT NULL CHECK(source IN (
'collector.network.tcp', 'collector.network.ping', 'collector.network.http',
'rule.engine', 'system'
)),
kind TEXT NOT NULL CHECK(kind IN ('alert', 'error', 'system', 'audit')),
severity TEXT NOT NULL CHECK(severity IN ('debug', 'info', 'warn', 'error', 'critical')),
message TEXT NOT NULL,
payload TEXT DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events(ts);
CREATE INDEX IF NOT EXISTS idx_events_source ON events(source);
"#;
/// DDL that creates the `collectors` table when it does not already exist.
///
/// Rows get an auto-incrementing `id`. `type` must be one of `tcp`, `ping` or
/// `http`, and `source` must be `config` or `api`; both are enforced by `CHECK`
/// constraints. `enabled` defaults to `1` and `group_name` defaults to
/// `'default'`. `config` is mandatory text whose format is not defined here.
///
/// The `UNIQUE(type, name)` constraint means a collector name may be reused
/// across types but not within one type.
pub const COLLECTORS_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS collectors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL CHECK(type IN ('tcp', 'ping', 'http')),
name TEXT NOT NULL,
source TEXT NOT NULL CHECK(source IN ('config', 'api')),
enabled INTEGER NOT NULL DEFAULT 1,
group_name TEXT NOT NULL DEFAULT 'default',
config TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
UNIQUE(type, name)
);
"#;
/// Creates every table and index defined in this module.
///
/// All DDL is executed inside one transaction, on a single connection taken
/// from `pool`. If any statement fails, the transaction is dropped without
/// being committed and is rolled back, so a failed call never leaves a
/// partially created schema behind.
///
/// Every statement uses `IF NOT EXISTS`, so running this against a database
/// that already has these tables and indexes leaves them untouched.
///
/// # Errors
///
/// Returns a [`StorageError`] (converted from the underlying `sqlx` error) when
/// the transaction cannot be started, when any DDL statement fails, or when the
/// commit fails.
pub async fn init_schema(pool: &SqlitePool) -> Result<(), StorageError> {
    let mut tx = pool.begin().await?;

    // Order is kept as declared: series first, then values, events, collectors.
    for ddl in [
        METRIC_SERIES_TABLE_DDL,
        METRIC_VALUES_TABLE_DDL,
        EVENTS_TABLE_DDL,
        COLLECTORS_TABLE_DDL,
    ] {
        sqlx::query(ddl).execute(&mut *tx).await?;
    }

    tx.commit().await?;
    tracing::info!("Database schema initialized");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::db::SqlitePool;

    /// After `init_schema` runs against a fresh `sqlite::memory:` pool, each of
    /// the four tables must be listed exactly once in `sqlite_master`.
    #[tokio::test]
    async fn test_schema_initialization() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        init_schema(pool.inner()).await.unwrap();

        // One existence probe per table, checked in the order they are created.
        let probes = [
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='metric_series'",
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='metric_values'",
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='events'",
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='collectors'",
        ];
        for probe in probes {
            let (found,): (i32,) = sqlx::query_as(probe)
                .fetch_one(pool.inner())
                .await
                .unwrap();
            assert_eq!(found, 1);
        }

        pool.close().await;
    }

    /// Writing the same `series_id` twice with `INSERT OR REPLACE` must leave a
    /// single row that carries the values from the second write.
    #[tokio::test]
    async fn test_metric_series_upsert() {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let db = pool.inner();
        init_schema(db).await.unwrap();

        // First write: no description supplied.
        let first_insert = sqlx::query(
            "INSERT INTO metric_series (series_id, category, name, target, created_at, updated_at)
VALUES (12345, 'network.tcp', 'latency', '127.0.0.1:6379', 1000, 1000)",
        );
        first_insert.execute(db).await.unwrap();

        // Second write reuses the primary key and adds a description.
        let replacement = sqlx::query(
            "INSERT OR REPLACE INTO metric_series (series_id, category, name, target, description, created_at, updated_at)
VALUES (12345, 'network.tcp', 'latency', '127.0.0.1:6379', 'Updated desc', 1000, 2000)",
        );
        replacement.execute(db).await.unwrap();

        let (row_count,): (i32,) = sqlx::query_as("SELECT COUNT(*) FROM metric_series")
            .fetch_one(db)
            .await
            .unwrap();
        assert_eq!(row_count, 1);

        let (description,): (String,) =
            sqlx::query_as("SELECT description FROM metric_series WHERE series_id = 12345")
                .fetch_one(db)
                .await
                .unwrap();
        assert_eq!(description, "Updated desc");

        pool.close().await;
    }
}