mod cache;
mod memory;
pub mod migration;
mod sqlite;
#[cfg(feature = "clickhouse")]
mod clickhouse;
#[cfg(feature = "postgres")]
mod postgres;
#[cfg(feature = "redis_backend")]
mod redis_cache;
pub use cache::InMemoryCacheLayer;
pub use memory::{InMemoryMetadataRepository, InMemoryTraceRepository};
pub use sqlite::{SqliteMetadataRepository, SqliteTraceRepository};
#[cfg(feature = "clickhouse")]
pub use self::clickhouse::ClickHouseTraceRepository;
#[cfg(feature = "postgres")]
pub use self::postgres::PostgresMetadataRepository;
#[cfg(feature = "redis_backend")]
pub use self::redis_cache::RedisCacheLayer;
use llmtrace_core::{Result, Storage};
use std::sync::Arc;
/// Selects which concrete backends a [`Storage`] instance is assembled from.
///
/// Build the chosen profile into a ready-to-use [`Storage`] with
/// [`StorageProfile::build`].
pub enum StorageProfile {
    /// Embedded single-node profile: SQLite-backed trace and metadata
    /// repositories plus an in-process cache.
    Lite {
        /// Filesystem path to the SQLite database file; it is prefixed with
        /// `sqlite:` before being handed to the pool opener.
        database_path: String,
    },
    /// Fully in-memory profile — nothing persists across restarts; intended
    /// for tests and local experimentation.
    Memory,
    /// Distributed production profile: ClickHouse for traces, PostgreSQL for
    /// metadata, and Redis for caching. Only available when all three backend
    /// features are enabled.
    #[cfg(all(
        feature = "clickhouse",
        feature = "postgres",
        feature = "redis_backend"
    ))]
    Production {
        /// ClickHouse server URL.
        clickhouse_url: String,
        /// ClickHouse database (schema) name to use for trace storage.
        clickhouse_database: String,
        /// PostgreSQL connection URL for the metadata repository.
        postgres_url: String,
        /// Redis connection URL for the cache layer.
        redis_url: String,
    },
}
impl StorageProfile {
    /// Consume the profile and assemble a [`Storage`] wired to the
    /// corresponding backends.
    ///
    /// # Errors
    ///
    /// Returns an error when any backend fails to initialise: opening the
    /// SQLite pool, constructing a repository, or connecting to ClickHouse /
    /// PostgreSQL / Redis for the production profile.
    pub async fn build(self) -> Result<Storage> {
        match self {
            StorageProfile::Memory => Ok(Storage {
                traces: Arc::new(InMemoryTraceRepository::new()),
                metadata: Arc::new(InMemoryMetadataRepository::new()),
                cache: Arc::new(InMemoryCacheLayer::new()),
            }),
            StorageProfile::Lite { database_path } => {
                // Both repositories share a single SQLite pool: the trace
                // repository clones the handle, the metadata repository takes
                // ownership of the last copy.
                let pool = sqlite::open_pool(&format!("sqlite:{database_path}")).await?;
                let traces = SqliteTraceRepository::from_pool(pool.clone()).await?;
                let metadata = SqliteMetadataRepository::from_pool(pool).await?;
                Ok(Storage {
                    traces: Arc::new(traces),
                    metadata: Arc::new(metadata),
                    cache: Arc::new(InMemoryCacheLayer::new()),
                })
            }
            #[cfg(all(
                feature = "clickhouse",
                feature = "postgres",
                feature = "redis_backend"
            ))]
            StorageProfile::Production {
                clickhouse_url,
                clickhouse_database,
                postgres_url,
                redis_url,
            } => {
                // Each backend is brought up in turn; a failure in any of
                // them aborts the whole build via `?`.
                let traces =
                    ClickHouseTraceRepository::new(&clickhouse_url, &clickhouse_database).await?;
                let metadata = PostgresMetadataRepository::new(&postgres_url).await?;
                let cache = RedisCacheLayer::new(&redis_url).await?;
                Ok(Storage {
                    traces: Arc::new(traces),
                    metadata: Arc::new(metadata),
                    cache: Arc::new(cache),
                })
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use llmtrace_core::{LLMProvider, Tenant, TenantId, TraceEvent, TraceQuery, TraceSpan};
    use std::time::Duration;
    use uuid::Uuid;

    /// Build a minimal single-span trace for `tenant_id`, suitable for
    /// store/query round-trip assertions.
    fn make_trace(tenant_id: TenantId) -> TraceEvent {
        let trace_id = Uuid::new_v4();
        TraceEvent {
            trace_id,
            tenant_id,
            spans: vec![TraceSpan::new(
                trace_id,
                tenant_id,
                "chat_completion".to_string(),
                LLMProvider::OpenAI,
                "gpt-4".to_string(),
                "hello".to_string(),
            )],
            created_at: Utc::now(),
        }
    }

    /// Memory profile: exercises all three components (traces, metadata,
    /// cache) through a store/read round trip each.
    #[tokio::test]
    async fn test_storage_profile_memory() {
        let storage = StorageProfile::Memory.build().await.unwrap();
        let tenant = TenantId::new();
        // Trace round trip: one stored trace must come back for this tenant.
        storage
            .traces
            .store_trace(&make_trace(tenant))
            .await
            .unwrap();
        let traces = storage
            .traces
            .query_traces(&TraceQuery::new(tenant))
            .await
            .unwrap();
        assert_eq!(traces.len(), 1);
        // Metadata round trip: create a tenant, then fetch it by id.
        let t = Tenant {
            id: tenant,
            name: "Test".to_string(),
            api_token: "test-token".to_string(),
            plan: "free".to_string(),
            created_at: Utc::now(),
            config: serde_json::json!({}),
        };
        storage.metadata.create_tenant(&t).await.unwrap();
        let retrieved = storage.metadata.get_tenant(tenant).await.unwrap();
        assert!(retrieved.is_some());
        // Cache round trip: set a key with a TTL and read it back.
        storage
            .cache
            .set("k", b"v", Duration::from_secs(60))
            .await
            .unwrap();
        assert_eq!(storage.cache.get("k").await.unwrap(), Some(b"v".to_vec()));
    }

    /// Lite profile: same round trips as the memory test, but backed by a
    /// real SQLite file in a temp dir, plus health checks on all components.
    #[tokio::test]
    async fn test_storage_profile_lite() {
        // tempdir is dropped (and the db file deleted) at the end of the test.
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db").to_string_lossy().to_string();
        let storage = StorageProfile::Lite {
            database_path: db_path,
        }
        .build()
        .await
        .unwrap();
        let tenant = TenantId::new();
        storage
            .traces
            .store_trace(&make_trace(tenant))
            .await
            .unwrap();
        let traces = storage
            .traces
            .query_traces(&TraceQuery::new(tenant))
            .await
            .unwrap();
        assert_eq!(traces.len(), 1);
        let t = Tenant {
            id: tenant,
            name: "LiteTenant".to_string(),
            api_token: "lite-token".to_string(),
            plan: "pro".to_string(),
            created_at: Utc::now(),
            config: serde_json::json!({}),
        };
        storage.metadata.create_tenant(&t).await.unwrap();
        let retrieved = storage.metadata.get_tenant(tenant).await.unwrap().unwrap();
        assert_eq!(retrieved.name, "LiteTenant");
        // All three backends must report healthy after real use.
        assert!(storage.traces.health_check().await.is_ok());
        assert!(storage.metadata.health_check().await.is_ok());
        assert!(storage.cache.health_check().await.is_ok());
    }

    /// Production profile smoke test. Ignored by default: requires live
    /// ClickHouse, PostgreSQL, and Redis instances, with their URLs supplied
    /// via the LLMTRACE_* environment variables below.
    // NOTE(review): the #[cfg] attribute is placed after #[tokio::test];
    // this appears to compile because the macro passes attributes through,
    // but convention puts #[cfg] first — worth confirming/normalising.
    #[tokio::test]
    #[ignore = "requires running ClickHouse, PostgreSQL, and Redis instances"]
    #[cfg(all(
        feature = "clickhouse",
        feature = "postgres",
        feature = "redis_backend"
    ))]
    async fn test_storage_profile_production() {
        use std::env;
        let clickhouse_url =
            env::var("LLMTRACE_CLICKHOUSE_URL").expect("LLMTRACE_CLICKHOUSE_URL must be set");
        let postgres_url =
            env::var("LLMTRACE_POSTGRES_URL").expect("LLMTRACE_POSTGRES_URL must be set");
        let redis_url = env::var("LLMTRACE_REDIS_URL").expect("LLMTRACE_REDIS_URL must be set");
        // Unique database name per run so concurrent/repeated runs don't
        // collide in the shared ClickHouse instance.
        let db_name = format!("llmtrace_prod_test_{}", Uuid::new_v4().simple());
        let storage = StorageProfile::Production {
            clickhouse_url,
            clickhouse_database: db_name,
            postgres_url,
            redis_url,
        }
        .build()
        .await
        .unwrap();
        // Fail fast if any backend is unreachable before exercising data paths.
        storage.traces.health_check().await.unwrap();
        storage.metadata.health_check().await.unwrap();
        storage.cache.health_check().await.unwrap();
        let tenant = TenantId::new();
        storage
            .traces
            .store_trace(&make_trace(tenant))
            .await
            .unwrap();
        let traces = storage
            .traces
            .query_traces(&TraceQuery::new(tenant))
            .await
            .unwrap();
        assert_eq!(traces.len(), 1);
        assert_eq!(traces[0].spans.len(), 1);
        let t = Tenant {
            id: tenant,
            name: "ProdTest".to_string(),
            api_token: "prod-token".to_string(),
            plan: "enterprise".to_string(),
            created_at: Utc::now(),
            config: serde_json::json!({}),
        };
        storage.metadata.create_tenant(&t).await.unwrap();
        let retrieved = storage.metadata.get_tenant(tenant).await.unwrap().unwrap();
        assert_eq!(retrieved.name, "ProdTest");
        // Cache set / get / invalidate lifecycle against the live Redis.
        storage
            .cache
            .set("prod:test:k", b"v", Duration::from_secs(60))
            .await
            .unwrap();
        assert_eq!(
            storage.cache.get("prod:test:k").await.unwrap(),
            Some(b"v".to_vec())
        );
        storage.cache.invalidate("prod:test:k").await.unwrap();
        assert!(storage.cache.get("prod:test:k").await.unwrap().is_none());
    }
}