codex_memory/monitoring/
repository.rs1use anyhow::Result;
2use async_trait::async_trait;
3use sqlx::PgPool;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::debug;
7
8#[async_trait]
10pub trait MonitoringRepository: Send + Sync + std::fmt::Debug {
11 async fn health_check(&self) -> Result<()>;
12 async fn get_memory_tier_distribution(&self) -> Result<HashMap<String, i64>>;
13 async fn check_migration_failures(&self, hours: i64) -> Result<i64>;
14 async fn get_connection_pool_stats(&self) -> Result<ConnectionPoolStats>;
15}
16
/// Point-in-time snapshot of connection pool usage.
///
/// This is a plain value type: it is cheap to copy and can be compared for
/// equality, which keeps assertions in tests and change detection simple.
/// `Default` yields an all-zero snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ConnectionPoolStats {
    /// Total number of connections in the pool.
    pub total_connections: u32,
    /// Number of connections that are currently idle.
    pub idle_connections: u32,
    /// Number of connections that are currently in use.
    pub active_connections: u32,
}
23
24#[derive(Debug)]
26pub struct PostgresMonitoringRepository {
27 db_pool: Arc<PgPool>,
28}
29
30impl PostgresMonitoringRepository {
31 pub fn new(db_pool: Arc<PgPool>) -> Self {
32 Self { db_pool }
33 }
34}
35
36#[async_trait]
37impl MonitoringRepository for PostgresMonitoringRepository {
38 async fn health_check(&self) -> Result<()> {
39 debug!("Performing database health check");
40
41 sqlx::query("SELECT 1 as health_check")
43 .fetch_one(self.db_pool.as_ref())
44 .await?;
45
46 sqlx::query("SELECT COUNT(*) FROM memories WHERE status = 'active'")
48 .fetch_one(self.db_pool.as_ref())
49 .await?;
50
51 debug!("Database health check passed");
52 Ok(())
53 }
54
55 async fn get_memory_tier_distribution(&self) -> Result<HashMap<String, i64>> {
56 debug!("Getting memory tier distribution");
57
58 let rows = sqlx::query_as::<_, (String, i64)>(
59 "SELECT tier, COUNT(*) FROM memories WHERE status = 'active' GROUP BY tier",
60 )
61 .fetch_all(self.db_pool.as_ref())
62 .await?;
63
64 let mut distribution = HashMap::new();
65 for (tier, count) in rows {
66 distribution.insert(tier, count);
67 }
68
69 Ok(distribution)
70 }
71
72 async fn check_migration_failures(&self, hours: i64) -> Result<i64> {
73 debug!("Checking migration failures for last {} hours", hours);
74
75 let column_exists = sqlx::query_scalar::<_, i64>(
77 "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = 'migration_history' AND column_name = 'success'"
78 )
79 .fetch_one(self.db_pool.as_ref())
80 .await?;
81
82 if column_exists > 0 {
83 let failure_count = sqlx::query_scalar::<_, i64>(
84 "SELECT COUNT(*) FROM migration_history WHERE success = false AND migrated_at > NOW() - INTERVAL $1 || ' hours'"
85 )
86 .bind(hours)
87 .fetch_one(self.db_pool.as_ref())
88 .await?;
89
90 Ok(failure_count)
91 } else {
92 Ok(0)
94 }
95 }
96
97 async fn get_connection_pool_stats(&self) -> Result<ConnectionPoolStats> {
98 debug!("Getting connection pool statistics");
99
100 let total_connections = self.db_pool.size();
101 let idle_connections = self.db_pool.num_idle();
102 let active_connections = total_connections - idle_connections as u32;
103
104 Ok(ConnectionPoolStats {
105 total_connections,
106 idle_connections: idle_connections as u32,
107 active_connections,
108 })
109 }
110}
111
112#[derive(Debug)]
114pub struct MockMonitoringRepository;
115
116#[async_trait]
117impl MonitoringRepository for MockMonitoringRepository {
118 async fn health_check(&self) -> Result<()> {
119 Ok(())
120 }
121
122 async fn get_memory_tier_distribution(&self) -> Result<HashMap<String, i64>> {
123 let mut distribution = HashMap::new();
124 distribution.insert("working".to_string(), 100);
125 distribution.insert("warm".to_string(), 200);
126 distribution.insert("cold".to_string(), 300);
127 Ok(distribution)
128 }
129
130 async fn check_migration_failures(&self, _hours: i64) -> Result<i64> {
131 Ok(0)
132 }
133
134 async fn get_connection_pool_stats(&self) -> Result<ConnectionPoolStats> {
135 Ok(ConnectionPoolStats {
136 total_connections: 20,
137 idle_connections: 15,
138 active_connections: 5,
139 })
140 }
141}