codex_memory/monitoring/
repository.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use sqlx::PgPool;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::debug;
7
8/// Repository abstraction for monitoring operations
9#[async_trait]
10pub trait MonitoringRepository: Send + Sync + std::fmt::Debug {
11    async fn health_check(&self) -> Result<()>;
12    async fn get_memory_tier_distribution(&self) -> Result<HashMap<String, i64>>;
13    async fn check_migration_failures(&self, hours: i64) -> Result<i64>;
14    async fn get_connection_pool_stats(&self) -> Result<ConnectionPoolStats>;
15}
16
17#[derive(Debug, Clone)]
18pub struct ConnectionPoolStats {
19    pub total_connections: u32,
20    pub idle_connections: u32,
21    pub active_connections: u32,
22}
23
24/// PostgreSQL implementation of monitoring repository
25#[derive(Debug)]
26pub struct PostgresMonitoringRepository {
27    db_pool: Arc<PgPool>,
28}
29
30impl PostgresMonitoringRepository {
31    pub fn new(db_pool: Arc<PgPool>) -> Self {
32        Self { db_pool }
33    }
34}
35
36#[async_trait]
37impl MonitoringRepository for PostgresMonitoringRepository {
38    async fn health_check(&self) -> Result<()> {
39        debug!("Performing database health check");
40
41        // Test basic connectivity
42        sqlx::query("SELECT 1 as health_check")
43            .fetch_one(self.db_pool.as_ref())
44            .await?;
45
46        // Test with a more complex query
47        sqlx::query("SELECT COUNT(*) FROM memories WHERE status = 'active'")
48            .fetch_one(self.db_pool.as_ref())
49            .await?;
50
51        debug!("Database health check passed");
52        Ok(())
53    }
54
55    async fn get_memory_tier_distribution(&self) -> Result<HashMap<String, i64>> {
56        debug!("Getting memory tier distribution");
57
58        let rows = sqlx::query_as::<_, (String, i64)>(
59            "SELECT tier, COUNT(*) FROM memories WHERE status = 'active' GROUP BY tier",
60        )
61        .fetch_all(self.db_pool.as_ref())
62        .await?;
63
64        let mut distribution = HashMap::new();
65        for (tier, count) in rows {
66            distribution.insert(tier, count);
67        }
68
69        Ok(distribution)
70    }
71
72    async fn check_migration_failures(&self, hours: i64) -> Result<i64> {
73        debug!("Checking migration failures for last {} hours", hours);
74
75        // First check if the success column exists
76        let column_exists = sqlx::query_scalar::<_, i64>(
77            "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'migration_history' AND column_name = 'success'"
78        )
79        .fetch_one(self.db_pool.as_ref())
80        .await?;
81
82        if column_exists > 0 {
83            let failure_count = sqlx::query_scalar::<_, i64>(
84                "SELECT COUNT(*) FROM migration_history WHERE success = false AND migrated_at > NOW() - INTERVAL $1 || ' hours'"
85            )
86            .bind(hours)
87            .fetch_one(self.db_pool.as_ref())
88            .await?;
89
90            Ok(failure_count)
91        } else {
92            // Column doesn't exist, return 0
93            Ok(0)
94        }
95    }
96
97    async fn get_connection_pool_stats(&self) -> Result<ConnectionPoolStats> {
98        debug!("Getting connection pool statistics");
99
100        let total_connections = self.db_pool.size();
101        let idle_connections = self.db_pool.num_idle();
102        let active_connections = total_connections - idle_connections as u32;
103
104        Ok(ConnectionPoolStats {
105            total_connections,
106            idle_connections: idle_connections as u32,
107            active_connections,
108        })
109    }
110}
111
112/// Mock repository for testing
113#[derive(Debug)]
114pub struct MockMonitoringRepository;
115
116#[async_trait]
117impl MonitoringRepository for MockMonitoringRepository {
118    async fn health_check(&self) -> Result<()> {
119        Ok(())
120    }
121
122    async fn get_memory_tier_distribution(&self) -> Result<HashMap<String, i64>> {
123        let mut distribution = HashMap::new();
124        distribution.insert("working".to_string(), 100);
125        distribution.insert("warm".to_string(), 200);
126        distribution.insert("cold".to_string(), 300);
127        Ok(distribution)
128    }
129
130    async fn check_migration_failures(&self, _hours: i64) -> Result<i64> {
131        Ok(0)
132    }
133
134    async fn get_connection_pool_stats(&self) -> Result<ConnectionPoolStats> {
135        Ok(ConnectionPoolStats {
136            total_connections: 20,
137            idle_connections: 15,
138            active_connections: 5,
139        })
140    }
141}