//! Time-range partition maintenance for the observability tables
//! (`forge_runtime/observability/partitions.rs`).

use std::collections::HashMap;
use std::time::Duration;

use chrono::{DateTime, Datelike, NaiveDate, Utc};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;

use forge_core::{ForgeError, Result};
9
10#[derive(Debug, Clone, Copy, Default)]
12pub enum PartitionGranularity {
13 Daily,
15 #[default]
17 Weekly,
18 Monthly,
20}
21
22impl PartitionGranularity {
23 pub fn suffix_format(&self) -> &'static str {
25 match self {
26 Self::Daily => "%Y%m%d",
27 Self::Weekly => "%Y_w%W",
28 Self::Monthly => "%Y%m",
29 }
30 }
31
32 pub fn next_boundary(&self, from: DateTime<Utc>) -> DateTime<Utc> {
34 match self {
35 Self::Daily => {
36 let next = from.date_naive().succ_opt().unwrap_or(from.date_naive());
37 DateTime::from_naive_utc_and_offset(next.and_hms_opt(0, 0, 0).unwrap(), Utc)
38 }
39 Self::Weekly => {
40 let days_until_monday = (7 - from.weekday().num_days_from_monday()) % 7;
41 let next = from.date_naive()
42 + chrono::Duration::days(if days_until_monday == 0 {
43 7
44 } else {
45 days_until_monday as i64
46 });
47 DateTime::from_naive_utc_and_offset(next.and_hms_opt(0, 0, 0).unwrap(), Utc)
48 }
49 Self::Monthly => {
50 let (year, month) = if from.month() == 12 {
51 (from.year() + 1, 1)
52 } else {
53 (from.year(), from.month() + 1)
54 };
55 let next = NaiveDate::from_ymd_opt(year, month, 1).unwrap();
56 DateTime::from_naive_utc_and_offset(next.and_hms_opt(0, 0, 0).unwrap(), Utc)
57 }
58 }
59 }
60}
61
62#[derive(Debug, Clone)]
64pub struct PartitionConfig {
65 pub lookahead: Duration,
67 pub granularity: PartitionGranularity,
69 pub retention: HashMap<String, Duration>,
71 pub maintenance_interval: Duration,
73}
74
75impl Default for PartitionConfig {
76 fn default() -> Self {
77 let mut retention = HashMap::new();
78 retention.insert("forge_metrics".to_string(), Duration::from_secs(30 * 86400)); retention.insert("forge_logs".to_string(), Duration::from_secs(14 * 86400)); retention.insert("forge_traces".to_string(), Duration::from_secs(7 * 86400)); Self {
83 lookahead: Duration::from_secs(4 * 7 * 86400), granularity: PartitionGranularity::Weekly,
85 retention,
86 maintenance_interval: Duration::from_secs(3600), }
88 }
89}
90
91pub struct PartitionManager {
93 pool: PgPool,
94 config: PartitionConfig,
95}
96
97impl PartitionManager {
98 pub fn new(pool: PgPool, config: PartitionConfig) -> Self {
100 Self { pool, config }
101 }
102
103 pub async fn run(&self, shutdown: CancellationToken) {
105 let mut interval = tokio::time::interval(self.config.maintenance_interval);
106
107 tracing::info!(
108 granularity = ?self.config.granularity,
109 lookahead = ?self.config.lookahead,
110 "Partition manager started"
111 );
112
113 loop {
114 tokio::select! {
115 _ = interval.tick() => {
116 if let Err(e) = self.maintain().await {
117 tracing::error!(error = %e, "Partition maintenance failed");
118 }
119 }
120 _ = shutdown.cancelled() => {
121 tracing::info!("Partition manager shutting down");
122 break;
123 }
124 }
125 }
126 }
127
128 pub async fn maintain(&self) -> Result<()> {
130 self.ensure_future_partitions().await?;
131 self.drop_expired_partitions().await?;
132 Ok(())
133 }
134
135 async fn ensure_future_partitions(&self) -> Result<()> {
137 let now = Utc::now();
138 let lookahead_end =
139 now + chrono::Duration::from_std(self.config.lookahead).unwrap_or_default();
140
141 for table in self.config.retention.keys() {
142 let mut boundary = now;
143 while boundary < lookahead_end {
144 let next = self.config.granularity.next_boundary(boundary);
145 if let Err(e) = self.create_partition(table, boundary, next).await {
146 if !e.to_string().contains("already exists") {
148 tracing::warn!(
149 table = %table,
150 error = %e,
151 "Failed to create partition"
152 );
153 }
154 }
155 boundary = next;
156 }
157 }
158
159 Ok(())
160 }
161
162 async fn drop_expired_partitions(&self) -> Result<()> {
164 let now = Utc::now();
165
166 for (table, retention) in &self.config.retention {
167 let cutoff = now - chrono::Duration::from_std(*retention).unwrap_or_default();
168
169 let partitions: Vec<(String,)> = sqlx::query_as(
171 r#"
172 SELECT tablename::text FROM pg_tables
173 WHERE tablename LIKE $1 || '_%'
174 AND schemaname = 'public'
175 "#,
176 )
177 .bind(table)
178 .fetch_all(&self.pool)
179 .await
180 .map_err(|e| ForgeError::Database(e.to_string()))?;
181
182 for (partition_name,) in partitions {
183 if let Some(partition_end) = self.parse_partition_end(&partition_name) {
185 if partition_end < cutoff {
186 if let Err(e) = self.drop_partition(&partition_name).await {
187 tracing::warn!(
188 partition = %partition_name,
189 error = %e,
190 "Failed to drop expired partition"
191 );
192 } else {
193 tracing::info!(
194 partition = %partition_name,
195 "Dropped expired partition"
196 );
197 }
198 }
199 }
200 }
201 }
202
203 Ok(())
204 }
205
206 async fn create_partition(
208 &self,
209 parent_table: &str,
210 from: DateTime<Utc>,
211 to: DateTime<Utc>,
212 ) -> Result<()> {
213 let suffix = from
214 .format(self.config.granularity.suffix_format())
215 .to_string();
216 let partition_name = format!("{}_{}", parent_table, suffix);
217
218 let from_str = from.format("%Y-%m-%d %H:%M:%S").to_string();
219 let to_str = to.format("%Y-%m-%d %H:%M:%S").to_string();
220
221 let _partition_column = if parent_table == "forge_traces" {
223 "started_at"
224 } else {
225 "timestamp"
226 };
227
228 let sql = format!(
229 "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES FROM ('{}') TO ('{}')",
230 partition_name, parent_table, from_str, to_str
231 );
232
233 sqlx::query(&sql)
234 .execute(&self.pool)
235 .await
236 .map_err(|e| ForgeError::Database(e.to_string()))?;
237
238 tracing::debug!(
239 partition = %partition_name,
240 from = %from_str,
241 to = %to_str,
242 "Created partition"
243 );
244
245 Ok(())
246 }
247
248 async fn drop_partition(&self, partition_name: &str) -> Result<()> {
250 let sql = format!("DROP TABLE IF EXISTS {} CASCADE", partition_name);
251 sqlx::query(&sql)
252 .execute(&self.pool)
253 .await
254 .map_err(|e| ForgeError::Database(e.to_string()))?;
255 Ok(())
256 }
257
258 fn parse_partition_end(&self, partition_name: &str) -> Option<DateTime<Utc>> {
260 let parts: Vec<&str> = partition_name.rsplitn(2, '_').collect();
262 if parts.is_empty() {
263 return None;
264 }
265
266 let date_str = parts[0];
267
268 if let Ok(date) = NaiveDate::parse_from_str(date_str, "%Y%m%d") {
270 return Some(DateTime::from_naive_utc_and_offset(
271 date.and_hms_opt(0, 0, 0).unwrap(),
272 Utc,
273 ));
274 }
275
276 if date_str.len() == 6 {
278 if let Ok(year) = date_str[..4].parse::<i32>() {
279 if let Ok(month) = date_str[4..].parse::<u32>() {
280 if let Some(date) = NaiveDate::from_ymd_opt(year, month, 1) {
281 return Some(DateTime::from_naive_utc_and_offset(
282 date.and_hms_opt(0, 0, 0).unwrap(),
283 Utc,
284 ));
285 }
286 }
287 }
288 }
289
290 None
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::PartitionGranularity::{Daily, Monthly, Weekly};
    use super::*;

    /// Builds a UTC timestamp from calendar parts; panics on an invalid date (test helper).
    fn utc(year: i32, month: u32, day: u32, hour: u32) -> DateTime<Utc> {
        let naive = NaiveDate::from_ymd_opt(year, month, day)
            .and_then(|d| d.and_hms_opt(hour, 0, 0))
            .expect("valid test timestamp");
        DateTime::from_naive_utc_and_offset(naive, Utc)
    }

    #[test]
    fn test_partition_config_default() {
        let config = PartitionConfig::default();
        assert_eq!(config.retention.len(), 3);
        assert!(config.retention.contains_key("forge_metrics"));
        assert!(config.retention.contains_key("forge_logs"));
        assert!(config.retention.contains_key("forge_traces"));
        assert!(matches!(config.granularity, Weekly));
    }

    #[test]
    fn test_partition_granularity_next_boundary() {
        let now = Utc::now();
        for granularity in [Daily, Weekly, Monthly] {
            assert!(granularity.next_boundary(now) > now, "{:?}", granularity);
        }
    }

    #[test]
    fn test_daily_boundary_is_next_midnight() {
        assert_eq!(Daily.next_boundary(utc(2024, 2, 28, 13)), utc(2024, 2, 29, 0));
        assert_eq!(Daily.next_boundary(utc(2024, 12, 31, 0)), utc(2025, 1, 1, 0));
    }

    #[test]
    fn test_weekly_boundary_is_next_monday() {
        // 2024-01-01 is a Monday: a timestamp on a boundary advances a full week.
        assert_eq!(Weekly.next_boundary(utc(2024, 1, 1, 0)), utc(2024, 1, 8, 0));
        // Mid-week and Sunday both land on the following Monday.
        assert_eq!(Weekly.next_boundary(utc(2024, 1, 3, 9)), utc(2024, 1, 8, 0));
        assert_eq!(Weekly.next_boundary(utc(2024, 1, 7, 23)), utc(2024, 1, 8, 0));
    }

    #[test]
    fn test_monthly_boundary_rolls_over_year() {
        assert_eq!(Monthly.next_boundary(utc(2024, 1, 31, 12)), utc(2024, 2, 1, 0));
        assert_eq!(Monthly.next_boundary(utc(2024, 12, 15, 0)), utc(2025, 1, 1, 0));
    }

    #[test]
    fn test_suffix_formats() {
        let monday = utc(2024, 1, 8, 0);
        assert_eq!(monday.format(Daily.suffix_format()).to_string(), "20240108");
        assert_eq!(monday.format(Weekly.suffix_format()).to_string(), "2024_w02");
        assert_eq!(monday.format(Monthly.suffix_format()).to_string(), "202401");
    }
}