kaccy_db/
partitioning.rs

1//! Database partitioning support for PostgreSQL
2//!
3//! Provides table partitioning strategies (range, list, hash),
4//! partition pruning optimization, and automatic partition management.
5
6use crate::error::{DbError, Result};
7use chrono::{DateTime, Datelike, Utc};
8use sqlx::PgPool;
9use std::fmt;
10
/// How a partitioned table distributes its rows across partitions.
///
/// Every variant names the partition-key column; the column name is
/// interpolated verbatim into the `PARTITION BY ...` clause built by
/// `PartitionManager::create_partitioned_table`.
///
/// The type is `Eq + Hash`, so strategies can be used as set members or map
/// keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PartitioningStrategy {
    /// Range partitioning (e.g., by date ranges)
    Range {
        /// Partition-key column.
        column: String,
    },
    /// List partitioning (e.g., by specific values)
    List {
        /// Partition-key column.
        column: String,
    },
    /// Hash partitioning (e.g., for even distribution)
    Hash {
        /// Partition-key column.
        column: String,
        /// Number of hash partitions; emitted as the `MODULUS` by
        /// `PartitionDefinition::create_sql`.
        num_partitions: usize,
    },
}
24
/// Calendar granularity used for time-based partitions.
///
/// The type is `Copy + Eq + Hash`, so it can be passed by value and used as a
/// set member or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PartitionInterval {
    /// Partition by day
    Daily,
    /// Partition by week
    Weekly,
    /// Partition by month
    Monthly,
    /// Partition by quarter
    Quarterly,
    /// Partition by year
    Yearly,
}
39
40impl PartitionInterval {
41    /// Get partition suffix for a given date
42    pub fn partition_suffix(&self, date: DateTime<Utc>) -> String {
43        match self {
44            Self::Daily => date.format("%Y%m%d").to_string(),
45            Self::Weekly => {
46                let week = date.iso_week().week();
47                format!("{}w{:02}", date.year(), week)
48            }
49            Self::Monthly => date.format("%Y%m").to_string(),
50            Self::Quarterly => {
51                let quarter = (date.month() - 1) / 3 + 1;
52                format!("{}q{}", date.year(), quarter)
53            }
54            Self::Yearly => date.format("%Y").to_string(),
55        }
56    }
57
58    /// Get start and end dates for partition
59    pub fn partition_bounds(&self, date: DateTime<Utc>) -> (DateTime<Utc>, DateTime<Utc>) {
60        use chrono::{Datelike, Duration, NaiveDate};
61
62        match self {
63            Self::Daily => {
64                let start = date.date_naive().and_hms_opt(0, 0, 0).unwrap();
65                let end = start + Duration::days(1);
66                (
67                    DateTime::from_naive_utc_and_offset(start, Utc),
68                    DateTime::from_naive_utc_and_offset(end, Utc),
69                )
70            }
71            Self::Weekly => {
72                let days_from_monday = date.weekday().num_days_from_monday();
73                let start_date = date.date_naive() - Duration::days(days_from_monday as i64);
74                let start = start_date.and_hms_opt(0, 0, 0).unwrap();
75                let end = start + Duration::weeks(1);
76                (
77                    DateTime::from_naive_utc_and_offset(start, Utc),
78                    DateTime::from_naive_utc_and_offset(end, Utc),
79                )
80            }
81            Self::Monthly => {
82                let start_date = NaiveDate::from_ymd_opt(date.year(), date.month(), 1).unwrap();
83                let start = start_date.and_hms_opt(0, 0, 0).unwrap();
84                let next_month = if date.month() == 12 {
85                    NaiveDate::from_ymd_opt(date.year() + 1, 1, 1).unwrap()
86                } else {
87                    NaiveDate::from_ymd_opt(date.year(), date.month() + 1, 1).unwrap()
88                };
89                let end = next_month.and_hms_opt(0, 0, 0).unwrap();
90                (
91                    DateTime::from_naive_utc_and_offset(start, Utc),
92                    DateTime::from_naive_utc_and_offset(end, Utc),
93                )
94            }
95            Self::Quarterly => {
96                let quarter = (date.month() - 1) / 3 + 1;
97                let start_month = (quarter - 1) * 3 + 1;
98                let start_date = NaiveDate::from_ymd_opt(date.year(), start_month, 1).unwrap();
99                let start = start_date.and_hms_opt(0, 0, 0).unwrap();
100
101                let end_month = start_month + 3;
102                let (end_year, end_month) = if end_month > 12 {
103                    (date.year() + 1, end_month - 12)
104                } else {
105                    (date.year(), end_month)
106                };
107                let end_date = NaiveDate::from_ymd_opt(end_year, end_month, 1).unwrap();
108                let end = end_date.and_hms_opt(0, 0, 0).unwrap();
109
110                (
111                    DateTime::from_naive_utc_and_offset(start, Utc),
112                    DateTime::from_naive_utc_and_offset(end, Utc),
113                )
114            }
115            Self::Yearly => {
116                let start_date = NaiveDate::from_ymd_opt(date.year(), 1, 1).unwrap();
117                let start = start_date.and_hms_opt(0, 0, 0).unwrap();
118                let end_date = NaiveDate::from_ymd_opt(date.year() + 1, 1, 1).unwrap();
119                let end = end_date.and_hms_opt(0, 0, 0).unwrap();
120                (
121                    DateTime::from_naive_utc_and_offset(start, Utc),
122                    DateTime::from_naive_utc_and_offset(end, Utc),
123                )
124            }
125        }
126    }
127}
128
129/// Partition definition
130#[derive(Debug, Clone)]
131pub struct PartitionDefinition {
132    pub table_name: String,
133    pub partition_name: String,
134    pub strategy: PartitioningStrategy,
135}
136
137impl PartitionDefinition {
138    /// Generate SQL for creating partition
139    pub fn create_sql(&self) -> String {
140        match &self.strategy {
141            PartitioningStrategy::Range { column: _ } => {
142                format!(
143                    "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES FROM (...) TO (...)",
144                    self.partition_name, self.table_name
145                )
146            }
147            PartitioningStrategy::List { column: _ } => {
148                format!(
149                    "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES IN (...)",
150                    self.partition_name, self.table_name
151                )
152            }
153            PartitioningStrategy::Hash {
154                column: _,
155                num_partitions,
156            } => {
157                format!(
158                    "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES WITH (MODULUS {}, REMAINDER ...)",
159                    self.partition_name, self.table_name, num_partitions
160                )
161            }
162        }
163    }
164
165    /// Generate SQL for dropping partition
166    pub fn drop_sql(&self) -> String {
167        format!("DROP TABLE IF EXISTS {}", self.partition_name)
168    }
169}
170
171/// Partition manager for automatic partition management
172pub struct PartitionManager {
173    pool: PgPool,
174}
175
176impl PartitionManager {
177    /// Create a new partition manager
178    pub fn new(pool: PgPool) -> Self {
179        Self { pool }
180    }
181
182    /// Create partitioned table
183    pub async fn create_partitioned_table(
184        &self,
185        _table_name: &str,
186        strategy: &PartitioningStrategy,
187        create_table_sql: &str,
188    ) -> Result<()> {
189        let partition_clause = match strategy {
190            PartitioningStrategy::Range { column } => {
191                format!("PARTITION BY RANGE ({})", column)
192            }
193            PartitioningStrategy::List { column } => {
194                format!("PARTITION BY LIST ({})", column)
195            }
196            PartitioningStrategy::Hash { column, .. } => {
197                format!("PARTITION BY HASH ({})", column)
198            }
199        };
200
201        let sql = format!("{} {}", create_table_sql, partition_clause);
202
203        sqlx::query(&sql)
204            .execute(&self.pool)
205            .await
206            .map_err(DbError::from)?;
207
208        Ok(())
209    }
210
211    /// Create a single partition
212    pub async fn create_partition(
213        &self,
214        table_name: &str,
215        partition_name: &str,
216        from_value: &str,
217        to_value: &str,
218    ) -> Result<()> {
219        let sql = format!(
220            "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES FROM ('{}') TO ('{}')",
221            partition_name, table_name, from_value, to_value
222        );
223
224        sqlx::query(&sql)
225            .execute(&self.pool)
226            .await
227            .map_err(DbError::from)?;
228
229        Ok(())
230    }
231
232    /// Create time-based partition
233    pub async fn create_time_partition(
234        &self,
235        table_name: &str,
236        date: DateTime<Utc>,
237        interval: PartitionInterval,
238    ) -> Result<String> {
239        let suffix = interval.partition_suffix(date);
240        let partition_name = format!("{}_{}", table_name, suffix);
241        let (start, end) = interval.partition_bounds(date);
242
243        let from_value = start.format("%Y-%m-%d %H:%M:%S").to_string();
244        let to_value = end.format("%Y-%m-%d %H:%M:%S").to_string();
245
246        self.create_partition(table_name, &partition_name, &from_value, &to_value)
247            .await?;
248
249        Ok(partition_name)
250    }
251
252    /// Create hash partition
253    pub async fn create_hash_partition(
254        &self,
255        table_name: &str,
256        partition_index: usize,
257        modulus: usize,
258    ) -> Result<String> {
259        let partition_name = format!("{}_p{}", table_name, partition_index);
260
261        let sql = format!(
262            "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES WITH (MODULUS {}, REMAINDER {})",
263            partition_name, table_name, modulus, partition_index
264        );
265
266        sqlx::query(&sql)
267            .execute(&self.pool)
268            .await
269            .map_err(DbError::from)?;
270
271        Ok(partition_name)
272    }
273
274    /// Drop partition
275    pub async fn drop_partition(&self, partition_name: &str) -> Result<()> {
276        let sql = format!("DROP TABLE IF EXISTS {}", partition_name);
277
278        sqlx::query(&sql)
279            .execute(&self.pool)
280            .await
281            .map_err(DbError::from)?;
282
283        Ok(())
284    }
285
286    /// List all partitions for a table
287    pub async fn list_partitions(&self, table_name: &str) -> Result<Vec<String>> {
288        let rows = sqlx::query_as::<_, (String,)>(
289            "SELECT inhrelid::regclass::text
290             FROM pg_inherits
291             WHERE inhparent = $1::regclass",
292        )
293        .bind(table_name)
294        .fetch_all(&self.pool)
295        .await
296        .map_err(DbError::from)?;
297
298        Ok(rows.into_iter().map(|(name,)| name).collect())
299    }
300
301    /// Get partition statistics
302    pub async fn partition_stats(&self, table_name: &str) -> Result<PartitionStats> {
303        let partitions = self.list_partitions(table_name).await?;
304
305        let mut total_rows = 0;
306        let mut total_size = 0;
307
308        for partition in &partitions {
309            // Get row count
310            let row = sqlx::query_as::<_, (i64,)>(&format!("SELECT COUNT(*) FROM {}", partition))
311                .fetch_one(&self.pool)
312                .await
313                .map_err(DbError::from)?;
314            total_rows += row.0;
315
316            // Get size
317            let size_row =
318                sqlx::query_as::<_, (i64,)>("SELECT pg_total_relation_size($1::regclass)")
319                    .bind(partition)
320                    .fetch_one(&self.pool)
321                    .await
322                    .map_err(DbError::from)?;
323            total_size += size_row.0;
324        }
325
326        Ok(PartitionStats {
327            partition_count: partitions.len(),
328            total_rows: total_rows as usize,
329            total_size_bytes: total_size as usize,
330            partitions,
331        })
332    }
333
334    /// Detach partition (for archival/deletion)
335    pub async fn detach_partition(&self, table_name: &str, partition_name: &str) -> Result<()> {
336        let sql = format!(
337            "ALTER TABLE {} DETACH PARTITION {}",
338            table_name, partition_name
339        );
340
341        sqlx::query(&sql)
342            .execute(&self.pool)
343            .await
344            .map_err(DbError::from)?;
345
346        Ok(())
347    }
348
349    /// Attach partition
350    pub async fn attach_partition(
351        &self,
352        table_name: &str,
353        partition_name: &str,
354        from_value: &str,
355        to_value: &str,
356    ) -> Result<()> {
357        let sql = format!(
358            "ALTER TABLE {} ATTACH PARTITION {} FOR VALUES FROM ('{}') TO ('{}')",
359            table_name, partition_name, from_value, to_value
360        );
361
362        sqlx::query(&sql)
363            .execute(&self.pool)
364            .await
365            .map_err(DbError::from)?;
366
367        Ok(())
368    }
369}
370
/// Aggregate figures for the partitions of one table.
#[derive(Clone, Debug)]
pub struct PartitionStats {
    /// Number of partitions found.
    pub partition_count: usize,
    /// Sum of the row counts of all partitions.
    pub total_rows: usize,
    /// Sum of the sizes of all partitions, in bytes.
    pub total_size_bytes: usize,
    /// Names of the partitions the figures were collected from.
    pub partitions: Vec<String>,
}

impl fmt::Display for PartitionStats {
    /// Writes a one-line summary: `Partitions: N, Rows: N, Size: N bytes`.
    /// The partition names themselves are not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            partition_count,
            total_rows,
            total_size_bytes,
            ..
        } = self;

        write!(
            f,
            "Partitions: {}, Rows: {}, Size: {} bytes",
            partition_count, total_rows, total_size_bytes
        )
    }
}
389
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Shorthand for an unambiguous UTC timestamp.
    fn utc(year: i32, month: u32, day: u32, hour: u32, min: u32, sec: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, min, sec)
            .unwrap()
    }

    #[test]
    fn test_partition_interval_suffix() {
        let date = utc(2024, 3, 15, 12, 0, 0);

        assert_eq!(PartitionInterval::Daily.partition_suffix(date), "20240315");
        assert_eq!(PartitionInterval::Monthly.partition_suffix(date), "202403");
        assert_eq!(PartitionInterval::Yearly.partition_suffix(date), "2024");
    }

    #[test]
    fn test_partition_interval_weekly_suffix() {
        // 2024-01-01 is a Monday, so 2024-03-15 falls in ISO week 11.
        let mid_march = utc(2024, 3, 15, 12, 0, 0);
        assert_eq!(
            PartitionInterval::Weekly.partition_suffix(mid_march),
            "2024w11"
        );

        // Single-digit weeks are zero-padded.
        let early_january = utc(2024, 1, 3, 0, 0, 0);
        assert_eq!(
            PartitionInterval::Weekly.partition_suffix(early_january),
            "2024w01"
        );
    }

    #[test]
    fn test_partition_interval_quarterly() {
        let q1 = utc(2024, 2, 15, 0, 0, 0);
        let q2 = utc(2024, 5, 15, 0, 0, 0);
        let q3 = utc(2024, 8, 15, 0, 0, 0);
        let q4 = utc(2024, 11, 15, 0, 0, 0);

        assert_eq!(PartitionInterval::Quarterly.partition_suffix(q1), "2024q1");
        assert_eq!(PartitionInterval::Quarterly.partition_suffix(q2), "2024q2");
        assert_eq!(PartitionInterval::Quarterly.partition_suffix(q3), "2024q3");
        assert_eq!(PartitionInterval::Quarterly.partition_suffix(q4), "2024q4");
    }

    #[test]
    fn test_partition_bounds_daily() {
        let date = utc(2024, 3, 15, 12, 30, 45);
        let (start, end) = PartitionInterval::Daily.partition_bounds(date);

        assert_eq!(start, utc(2024, 3, 15, 0, 0, 0));
        assert_eq!(end, utc(2024, 3, 16, 0, 0, 0));
    }

    #[test]
    fn test_partition_bounds_weekly() {
        // A Friday maps back to the preceding Monday.
        let friday = utc(2024, 3, 15, 12, 30, 45);
        let (start, end) = PartitionInterval::Weekly.partition_bounds(friday);
        assert_eq!(start, utc(2024, 3, 11, 0, 0, 0));
        assert_eq!(end, utc(2024, 3, 18, 0, 0, 0));

        // A Monday at midnight is the start of its own week.
        let monday = utc(2024, 3, 11, 0, 0, 0);
        let (start, end) = PartitionInterval::Weekly.partition_bounds(monday);
        assert_eq!(start, utc(2024, 3, 11, 0, 0, 0));
        assert_eq!(end, utc(2024, 3, 18, 0, 0, 0));
    }

    #[test]
    fn test_partition_bounds_monthly() {
        let date = utc(2024, 3, 15, 12, 0, 0);
        let (start, end) = PartitionInterval::Monthly.partition_bounds(date);

        assert_eq!(start, utc(2024, 3, 1, 0, 0, 0));
        assert_eq!(end, utc(2024, 4, 1, 0, 0, 0));
    }

    #[test]
    fn test_partition_bounds_monthly_december_rollover() {
        let date = utc(2024, 12, 31, 23, 59, 59);
        let (start, end) = PartitionInterval::Monthly.partition_bounds(date);

        assert_eq!(start, utc(2024, 12, 1, 0, 0, 0));
        assert_eq!(end, utc(2025, 1, 1, 0, 0, 0));
    }

    #[test]
    fn test_partition_bounds_quarterly() {
        let q2 = utc(2024, 5, 15, 8, 0, 0);
        let (start, end) = PartitionInterval::Quarterly.partition_bounds(q2);
        assert_eq!(start, utc(2024, 4, 1, 0, 0, 0));
        assert_eq!(end, utc(2024, 7, 1, 0, 0, 0));

        // The last quarter ends on January 1st of the following year.
        let q4 = utc(2024, 11, 15, 8, 0, 0);
        let (start, end) = PartitionInterval::Quarterly.partition_bounds(q4);
        assert_eq!(start, utc(2024, 10, 1, 0, 0, 0));
        assert_eq!(end, utc(2025, 1, 1, 0, 0, 0));
    }

    #[test]
    fn test_partition_bounds_yearly() {
        let date = utc(2026, 6, 15, 12, 0, 0);
        let (start, end) = PartitionInterval::Yearly.partition_bounds(date);

        assert_eq!(start, utc(2026, 1, 1, 0, 0, 0));
        assert_eq!(end, utc(2027, 1, 1, 0, 0, 0));
    }

    #[test]
    fn test_partitioning_strategy_enum() {
        let range = PartitioningStrategy::Range {
            column: "created_at".to_string(),
        };
        assert!(matches!(range, PartitioningStrategy::Range { .. }));

        let list = PartitioningStrategy::List {
            column: "status".to_string(),
        };
        assert!(matches!(list, PartitioningStrategy::List { .. }));

        let hash = PartitioningStrategy::Hash {
            column: "user_id".to_string(),
            num_partitions: 4,
        };
        assert!(matches!(hash, PartitioningStrategy::Hash { .. }));
    }

    #[test]
    fn test_partition_definition_create_sql() {
        let def = PartitionDefinition {
            table_name: "events".to_string(),
            partition_name: "events_202401".to_string(),
            strategy: PartitioningStrategy::Range {
                column: "created_at".to_string(),
            },
        };

        let sql = def.create_sql();
        assert!(sql.contains("CREATE TABLE"));
        assert!(sql.contains("PARTITION OF"));
        assert!(sql.contains("events"));
    }

    #[test]
    fn test_partition_definition_create_sql_list_and_hash() {
        let list = PartitionDefinition {
            table_name: "orders".to_string(),
            partition_name: "orders_open".to_string(),
            strategy: PartitioningStrategy::List {
                column: "status".to_string(),
            },
        };
        let sql = list.create_sql();
        assert!(sql.contains("orders_open PARTITION OF orders"));
        assert!(sql.contains("FOR VALUES IN"));

        let hash = PartitionDefinition {
            table_name: "users".to_string(),
            partition_name: "users_p0".to_string(),
            strategy: PartitioningStrategy::Hash {
                column: "user_id".to_string(),
                num_partitions: 4,
            },
        };
        let sql = hash.create_sql();
        assert!(sql.contains("users_p0 PARTITION OF users"));
        assert!(sql.contains("MODULUS 4"));
    }

    #[test]
    fn test_partition_definition_drop_sql() {
        let def = PartitionDefinition {
            table_name: "events".to_string(),
            partition_name: "events_202401".to_string(),
            strategy: PartitioningStrategy::Range {
                column: "created_at".to_string(),
            },
        };

        let sql = def.drop_sql();
        assert_eq!(sql, "DROP TABLE IF EXISTS events_202401");
    }

    #[test]
    fn test_partition_stats_display() {
        let stats = PartitionStats {
            partition_count: 3,
            total_rows: 10000,
            total_size_bytes: 1048576,
            partitions: vec!["p1".to_string(), "p2".to_string(), "p3".to_string()],
        };

        let display = format!("{}", stats);
        assert!(display.contains("Partitions: 3"));
        assert!(display.contains("Rows: 10000"));
        assert!(display.contains("Size: 1048576 bytes"));
    }

    #[test]
    fn test_partition_interval_enum() {
        assert_eq!(PartitionInterval::Daily, PartitionInterval::Daily);
        assert_ne!(PartitionInterval::Daily, PartitionInterval::Monthly);
    }
}