forge_runtime/observability/partitions.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use chrono::{DateTime, Datelike, NaiveDate, Utc};
5use sqlx::PgPool;
6use tokio_util::sync::CancellationToken;
7
8use forge_core::{ForgeError, Result};
9
/// Size of the time range covered by a single partition.
///
/// Defaults to [`PartitionGranularity::Weekly`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum PartitionGranularity {
    /// One partition per UTC calendar day.
    Daily,
    /// One partition per week, with boundaries at Monday 00:00 UTC.
    #[default]
    Weekly,
    /// One partition per UTC calendar month.
    Monthly,
}
21
22impl PartitionGranularity {
23    /// Get the partition suffix format.
24    pub fn suffix_format(&self) -> &'static str {
25        match self {
26            Self::Daily => "%Y%m%d",
27            Self::Weekly => "%Y_w%W",
28            Self::Monthly => "%Y%m",
29        }
30    }
31
32    /// Get the next partition boundary.
33    pub fn next_boundary(&self, from: DateTime<Utc>) -> DateTime<Utc> {
34        match self {
35            Self::Daily => {
36                let next = from.date_naive().succ_opt().unwrap_or(from.date_naive());
37                DateTime::from_naive_utc_and_offset(next.and_hms_opt(0, 0, 0).unwrap(), Utc)
38            }
39            Self::Weekly => {
40                let days_until_monday = (7 - from.weekday().num_days_from_monday()) % 7;
41                let next = from.date_naive()
42                    + chrono::Duration::days(if days_until_monday == 0 {
43                        7
44                    } else {
45                        days_until_monday as i64
46                    });
47                DateTime::from_naive_utc_and_offset(next.and_hms_opt(0, 0, 0).unwrap(), Utc)
48            }
49            Self::Monthly => {
50                let (year, month) = if from.month() == 12 {
51                    (from.year() + 1, 1)
52                } else {
53                    (from.year(), from.month() + 1)
54                };
55                let next = NaiveDate::from_ymd_opt(year, month, 1).unwrap();
56                DateTime::from_naive_utc_and_offset(next.and_hms_opt(0, 0, 0).unwrap(), Utc)
57            }
58        }
59    }
60}
61
62/// Configuration for partition management.
63#[derive(Debug, Clone)]
64pub struct PartitionConfig {
65    /// How far ahead to create partitions.
66    pub lookahead: Duration,
67    /// Partition size/granularity.
68    pub granularity: PartitionGranularity,
69    /// Retention periods per table.
70    pub retention: HashMap<String, Duration>,
71    /// How often to run maintenance.
72    pub maintenance_interval: Duration,
73}
74
75impl Default for PartitionConfig {
76    fn default() -> Self {
77        let mut retention = HashMap::new();
78        retention.insert("forge_metrics".to_string(), Duration::from_secs(30 * 86400)); // 30 days
79        retention.insert("forge_logs".to_string(), Duration::from_secs(14 * 86400)); // 14 days
80        retention.insert("forge_traces".to_string(), Duration::from_secs(7 * 86400)); // 7 days
81
82        Self {
83            lookahead: Duration::from_secs(4 * 7 * 86400), // 4 weeks
84            granularity: PartitionGranularity::Weekly,
85            retention,
86            maintenance_interval: Duration::from_secs(3600), // 1 hour
87        }
88    }
89}
90
91/// Partition manager for observability tables.
92pub struct PartitionManager {
93    pool: PgPool,
94    config: PartitionConfig,
95}
96
97impl PartitionManager {
98    /// Create a new partition manager.
99    pub fn new(pool: PgPool, config: PartitionConfig) -> Self {
100        Self { pool, config }
101    }
102
103    /// Run the partition manager until shutdown.
104    pub async fn run(&self, shutdown: CancellationToken) {
105        let mut interval = tokio::time::interval(self.config.maintenance_interval);
106
107        tracing::info!(
108            granularity = ?self.config.granularity,
109            lookahead = ?self.config.lookahead,
110            "Partition manager started"
111        );
112
113        loop {
114            tokio::select! {
115                _ = interval.tick() => {
116                    if let Err(e) = self.maintain().await {
117                        tracing::error!(error = %e, "Partition maintenance failed");
118                    }
119                }
120                _ = shutdown.cancelled() => {
121                    tracing::info!("Partition manager shutting down");
122                    break;
123                }
124            }
125        }
126    }
127
128    /// Run maintenance: create future partitions and drop expired ones.
129    pub async fn maintain(&self) -> Result<()> {
130        self.ensure_future_partitions().await?;
131        self.drop_expired_partitions().await?;
132        Ok(())
133    }
134
135    /// Ensure partitions exist for the lookahead period.
136    async fn ensure_future_partitions(&self) -> Result<()> {
137        let now = Utc::now();
138        let lookahead_end =
139            now + chrono::Duration::from_std(self.config.lookahead).unwrap_or_default();
140
141        for table in self.config.retention.keys() {
142            let mut boundary = now;
143            while boundary < lookahead_end {
144                let next = self.config.granularity.next_boundary(boundary);
145                if let Err(e) = self.create_partition(table, boundary, next).await {
146                    // Ignore "already exists" errors
147                    if !e.to_string().contains("already exists") {
148                        tracing::warn!(
149                            table = %table,
150                            error = %e,
151                            "Failed to create partition"
152                        );
153                    }
154                }
155                boundary = next;
156            }
157        }
158
159        Ok(())
160    }
161
162    /// Drop partitions older than retention period.
163    async fn drop_expired_partitions(&self) -> Result<()> {
164        let now = Utc::now();
165
166        for (table, retention) in &self.config.retention {
167            let cutoff = now - chrono::Duration::from_std(*retention).unwrap_or_default();
168
169            // List partitions for this table
170            let partitions: Vec<(String,)> = sqlx::query_as(
171                r#"
172                SELECT tablename::text FROM pg_tables
173                WHERE tablename LIKE $1 || '_%'
174                AND schemaname = 'public'
175                "#,
176            )
177            .bind(table)
178            .fetch_all(&self.pool)
179            .await
180            .map_err(|e| ForgeError::Database(e.to_string()))?;
181
182            for (partition_name,) in partitions {
183                // Try to parse the partition date from the name
184                if let Some(partition_end) = self.parse_partition_end(&partition_name) {
185                    if partition_end < cutoff {
186                        if let Err(e) = self.drop_partition(&partition_name).await {
187                            tracing::warn!(
188                                partition = %partition_name,
189                                error = %e,
190                                "Failed to drop expired partition"
191                            );
192                        } else {
193                            tracing::info!(
194                                partition = %partition_name,
195                                "Dropped expired partition"
196                            );
197                        }
198                    }
199                }
200            }
201        }
202
203        Ok(())
204    }
205
206    /// Create a partition for a time range.
207    async fn create_partition(
208        &self,
209        parent_table: &str,
210        from: DateTime<Utc>,
211        to: DateTime<Utc>,
212    ) -> Result<()> {
213        let suffix = from
214            .format(self.config.granularity.suffix_format())
215            .to_string();
216        let partition_name = format!("{}_{}", parent_table, suffix);
217
218        let from_str = from.format("%Y-%m-%d %H:%M:%S").to_string();
219        let to_str = to.format("%Y-%m-%d %H:%M:%S").to_string();
220
221        // Determine the partition key column (reserved for future use)
222        let _partition_column = if parent_table == "forge_traces" {
223            "started_at"
224        } else {
225            "timestamp"
226        };
227
228        let sql = format!(
229            "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES FROM ('{}') TO ('{}')",
230            partition_name, parent_table, from_str, to_str
231        );
232
233        sqlx::query(&sql)
234            .execute(&self.pool)
235            .await
236            .map_err(|e| ForgeError::Database(e.to_string()))?;
237
238        tracing::debug!(
239            partition = %partition_name,
240            from = %from_str,
241            to = %to_str,
242            "Created partition"
243        );
244
245        Ok(())
246    }
247
248    /// Drop a partition.
249    async fn drop_partition(&self, partition_name: &str) -> Result<()> {
250        let sql = format!("DROP TABLE IF EXISTS {} CASCADE", partition_name);
251        sqlx::query(&sql)
252            .execute(&self.pool)
253            .await
254            .map_err(|e| ForgeError::Database(e.to_string()))?;
255        Ok(())
256    }
257
258    /// Parse the end date from a partition name.
259    fn parse_partition_end(&self, partition_name: &str) -> Option<DateTime<Utc>> {
260        // Try to extract date from partition name like "forge_metrics_20250101"
261        let parts: Vec<&str> = partition_name.rsplitn(2, '_').collect();
262        if parts.is_empty() {
263            return None;
264        }
265
266        let date_str = parts[0];
267
268        // Try parsing as YYYYMMDD
269        if let Ok(date) = NaiveDate::parse_from_str(date_str, "%Y%m%d") {
270            return Some(DateTime::from_naive_utc_and_offset(
271                date.and_hms_opt(0, 0, 0).unwrap(),
272                Utc,
273            ));
274        }
275
276        // Try parsing as YYYYMM
277        if date_str.len() == 6 {
278            if let Ok(year) = date_str[..4].parse::<i32>() {
279                if let Ok(month) = date_str[4..].parse::<u32>() {
280                    if let Some(date) = NaiveDate::from_ymd_opt(year, month, 1) {
281                        return Some(DateTime::from_naive_utc_and_offset(
282                            date.and_hms_opt(0, 0, 0).unwrap(),
283                            Utc,
284                        ));
285                    }
286                }
287            }
288        }
289
290        None
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a UTC timestamp at `hour`:00:00 on the given date.
    fn utc(year: i32, month: u32, day: u32, hour: u32) -> DateTime<Utc> {
        let naive = NaiveDate::from_ymd_opt(year, month, day)
            .and_then(|d| d.and_hms_opt(hour, 0, 0))
            .expect("valid test timestamp");
        DateTime::from_naive_utc_and_offset(naive, Utc)
    }

    #[test]
    fn test_partition_config_default() {
        let config = PartitionConfig::default();
        assert_eq!(config.retention.len(), 3);
        assert!(config.retention.contains_key("forge_metrics"));
        assert!(config.retention.contains_key("forge_logs"));
        assert!(config.retention.contains_key("forge_traces"));
        assert!(matches!(config.granularity, PartitionGranularity::Weekly));
    }

    #[test]
    fn test_partition_granularity_next_boundary() {
        let now = Utc::now();
        for granularity in [
            PartitionGranularity::Daily,
            PartitionGranularity::Weekly,
            PartitionGranularity::Monthly,
        ] {
            assert!(granularity.next_boundary(now) > now);
        }
    }

    #[test]
    fn test_daily_boundary() {
        let daily = PartitionGranularity::Daily;
        assert_eq!(daily.next_boundary(utc(2025, 1, 15, 13)), utc(2025, 1, 16, 0));
        // A boundary itself advances a full day.
        assert_eq!(daily.next_boundary(utc(2025, 1, 16, 0)), utc(2025, 1, 17, 0));
        // Year rollover.
        assert_eq!(daily.next_boundary(utc(2024, 12, 31, 23)), utc(2025, 1, 1, 0));
    }

    #[test]
    fn test_weekly_boundary_is_next_monday() {
        let weekly = PartitionGranularity::Weekly;
        // 2025-01-15 is a Wednesday; the next Monday is 2025-01-20.
        assert_eq!(weekly.next_boundary(utc(2025, 1, 15, 13)), utc(2025, 1, 20, 0));
        // Sunday rolls over to the following day.
        assert_eq!(weekly.next_boundary(utc(2025, 1, 19, 23)), utc(2025, 1, 20, 0));
        // A Monday boundary advances a full week.
        assert_eq!(weekly.next_boundary(utc(2025, 1, 20, 0)), utc(2025, 1, 27, 0));
    }

    #[test]
    fn test_monthly_boundary() {
        let monthly = PartitionGranularity::Monthly;
        assert_eq!(monthly.next_boundary(utc(2025, 1, 31, 12)), utc(2025, 2, 1, 0));
        // December rolls into January of the next year.
        assert_eq!(monthly.next_boundary(utc(2025, 12, 10, 0)), utc(2026, 1, 1, 0));
    }

    #[test]
    fn test_suffix_formats() {
        // 2025-01-20 is the third Monday of 2025 (%W week 03).
        let monday = utc(2025, 1, 20, 0);
        let suffix = |g: PartitionGranularity| monday.format(g.suffix_format()).to_string();
        assert_eq!(suffix(PartitionGranularity::Daily), "20250120");
        assert_eq!(suffix(PartitionGranularity::Weekly), "2025_w03");
        assert_eq!(suffix(PartitionGranularity::Monthly), "202501");
    }
}