1use crate::error::{DbError, Result};
7use chrono::{DateTime, Datelike, Utc};
8use sqlx::PgPool;
9use std::fmt;
10
11#[derive(Debug, Clone, PartialEq)]
13pub enum PartitioningStrategy {
14 Range { column: String },
16 List { column: String },
18 Hash {
20 column: String,
21 num_partitions: usize,
22 },
23}
24
25#[derive(Debug, Clone, Copy, PartialEq)]
27pub enum PartitionInterval {
28 Daily,
30 Weekly,
32 Monthly,
34 Quarterly,
36 Yearly,
38}
39
40impl PartitionInterval {
41 pub fn partition_suffix(&self, date: DateTime<Utc>) -> String {
43 match self {
44 Self::Daily => date.format("%Y%m%d").to_string(),
45 Self::Weekly => {
46 let week = date.iso_week().week();
47 format!("{}w{:02}", date.year(), week)
48 }
49 Self::Monthly => date.format("%Y%m").to_string(),
50 Self::Quarterly => {
51 let quarter = (date.month() - 1) / 3 + 1;
52 format!("{}q{}", date.year(), quarter)
53 }
54 Self::Yearly => date.format("%Y").to_string(),
55 }
56 }
57
58 pub fn partition_bounds(&self, date: DateTime<Utc>) -> (DateTime<Utc>, DateTime<Utc>) {
60 use chrono::{Datelike, Duration, NaiveDate};
61
62 match self {
63 Self::Daily => {
64 let start = date.date_naive().and_hms_opt(0, 0, 0).unwrap();
65 let end = start + Duration::days(1);
66 (
67 DateTime::from_naive_utc_and_offset(start, Utc),
68 DateTime::from_naive_utc_and_offset(end, Utc),
69 )
70 }
71 Self::Weekly => {
72 let days_from_monday = date.weekday().num_days_from_monday();
73 let start_date = date.date_naive() - Duration::days(days_from_monday as i64);
74 let start = start_date.and_hms_opt(0, 0, 0).unwrap();
75 let end = start + Duration::weeks(1);
76 (
77 DateTime::from_naive_utc_and_offset(start, Utc),
78 DateTime::from_naive_utc_and_offset(end, Utc),
79 )
80 }
81 Self::Monthly => {
82 let start_date = NaiveDate::from_ymd_opt(date.year(), date.month(), 1).unwrap();
83 let start = start_date.and_hms_opt(0, 0, 0).unwrap();
84 let next_month = if date.month() == 12 {
85 NaiveDate::from_ymd_opt(date.year() + 1, 1, 1).unwrap()
86 } else {
87 NaiveDate::from_ymd_opt(date.year(), date.month() + 1, 1).unwrap()
88 };
89 let end = next_month.and_hms_opt(0, 0, 0).unwrap();
90 (
91 DateTime::from_naive_utc_and_offset(start, Utc),
92 DateTime::from_naive_utc_and_offset(end, Utc),
93 )
94 }
95 Self::Quarterly => {
96 let quarter = (date.month() - 1) / 3 + 1;
97 let start_month = (quarter - 1) * 3 + 1;
98 let start_date = NaiveDate::from_ymd_opt(date.year(), start_month, 1).unwrap();
99 let start = start_date.and_hms_opt(0, 0, 0).unwrap();
100
101 let end_month = start_month + 3;
102 let (end_year, end_month) = if end_month > 12 {
103 (date.year() + 1, end_month - 12)
104 } else {
105 (date.year(), end_month)
106 };
107 let end_date = NaiveDate::from_ymd_opt(end_year, end_month, 1).unwrap();
108 let end = end_date.and_hms_opt(0, 0, 0).unwrap();
109
110 (
111 DateTime::from_naive_utc_and_offset(start, Utc),
112 DateTime::from_naive_utc_and_offset(end, Utc),
113 )
114 }
115 Self::Yearly => {
116 let start_date = NaiveDate::from_ymd_opt(date.year(), 1, 1).unwrap();
117 let start = start_date.and_hms_opt(0, 0, 0).unwrap();
118 let end_date = NaiveDate::from_ymd_opt(date.year() + 1, 1, 1).unwrap();
119 let end = end_date.and_hms_opt(0, 0, 0).unwrap();
120 (
121 DateTime::from_naive_utc_and_offset(start, Utc),
122 DateTime::from_naive_utc_and_offset(end, Utc),
123 )
124 }
125 }
126 }
127}
128
129#[derive(Debug, Clone)]
131pub struct PartitionDefinition {
132 pub table_name: String,
133 pub partition_name: String,
134 pub strategy: PartitioningStrategy,
135}
136
137impl PartitionDefinition {
138 pub fn create_sql(&self) -> String {
140 match &self.strategy {
141 PartitioningStrategy::Range { column: _ } => {
142 format!(
143 "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES FROM (...) TO (...)",
144 self.partition_name, self.table_name
145 )
146 }
147 PartitioningStrategy::List { column: _ } => {
148 format!(
149 "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES IN (...)",
150 self.partition_name, self.table_name
151 )
152 }
153 PartitioningStrategy::Hash {
154 column: _,
155 num_partitions,
156 } => {
157 format!(
158 "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES WITH (MODULUS {}, REMAINDER ...)",
159 self.partition_name, self.table_name, num_partitions
160 )
161 }
162 }
163 }
164
165 pub fn drop_sql(&self) -> String {
167 format!("DROP TABLE IF EXISTS {}", self.partition_name)
168 }
169}
170
171pub struct PartitionManager {
173 pool: PgPool,
174}
175
176impl PartitionManager {
177 pub fn new(pool: PgPool) -> Self {
179 Self { pool }
180 }
181
182 pub async fn create_partitioned_table(
184 &self,
185 _table_name: &str,
186 strategy: &PartitioningStrategy,
187 create_table_sql: &str,
188 ) -> Result<()> {
189 let partition_clause = match strategy {
190 PartitioningStrategy::Range { column } => {
191 format!("PARTITION BY RANGE ({})", column)
192 }
193 PartitioningStrategy::List { column } => {
194 format!("PARTITION BY LIST ({})", column)
195 }
196 PartitioningStrategy::Hash { column, .. } => {
197 format!("PARTITION BY HASH ({})", column)
198 }
199 };
200
201 let sql = format!("{} {}", create_table_sql, partition_clause);
202
203 sqlx::query(&sql)
204 .execute(&self.pool)
205 .await
206 .map_err(DbError::from)?;
207
208 Ok(())
209 }
210
211 pub async fn create_partition(
213 &self,
214 table_name: &str,
215 partition_name: &str,
216 from_value: &str,
217 to_value: &str,
218 ) -> Result<()> {
219 let sql = format!(
220 "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES FROM ('{}') TO ('{}')",
221 partition_name, table_name, from_value, to_value
222 );
223
224 sqlx::query(&sql)
225 .execute(&self.pool)
226 .await
227 .map_err(DbError::from)?;
228
229 Ok(())
230 }
231
232 pub async fn create_time_partition(
234 &self,
235 table_name: &str,
236 date: DateTime<Utc>,
237 interval: PartitionInterval,
238 ) -> Result<String> {
239 let suffix = interval.partition_suffix(date);
240 let partition_name = format!("{}_{}", table_name, suffix);
241 let (start, end) = interval.partition_bounds(date);
242
243 let from_value = start.format("%Y-%m-%d %H:%M:%S").to_string();
244 let to_value = end.format("%Y-%m-%d %H:%M:%S").to_string();
245
246 self.create_partition(table_name, &partition_name, &from_value, &to_value)
247 .await?;
248
249 Ok(partition_name)
250 }
251
252 pub async fn create_hash_partition(
254 &self,
255 table_name: &str,
256 partition_index: usize,
257 modulus: usize,
258 ) -> Result<String> {
259 let partition_name = format!("{}_p{}", table_name, partition_index);
260
261 let sql = format!(
262 "CREATE TABLE IF NOT EXISTS {} PARTITION OF {} FOR VALUES WITH (MODULUS {}, REMAINDER {})",
263 partition_name, table_name, modulus, partition_index
264 );
265
266 sqlx::query(&sql)
267 .execute(&self.pool)
268 .await
269 .map_err(DbError::from)?;
270
271 Ok(partition_name)
272 }
273
274 pub async fn drop_partition(&self, partition_name: &str) -> Result<()> {
276 let sql = format!("DROP TABLE IF EXISTS {}", partition_name);
277
278 sqlx::query(&sql)
279 .execute(&self.pool)
280 .await
281 .map_err(DbError::from)?;
282
283 Ok(())
284 }
285
286 pub async fn list_partitions(&self, table_name: &str) -> Result<Vec<String>> {
288 let rows = sqlx::query_as::<_, (String,)>(
289 "SELECT inhrelid::regclass::text
290 FROM pg_inherits
291 WHERE inhparent = $1::regclass",
292 )
293 .bind(table_name)
294 .fetch_all(&self.pool)
295 .await
296 .map_err(DbError::from)?;
297
298 Ok(rows.into_iter().map(|(name,)| name).collect())
299 }
300
301 pub async fn partition_stats(&self, table_name: &str) -> Result<PartitionStats> {
303 let partitions = self.list_partitions(table_name).await?;
304
305 let mut total_rows = 0;
306 let mut total_size = 0;
307
308 for partition in &partitions {
309 let row = sqlx::query_as::<_, (i64,)>(&format!("SELECT COUNT(*) FROM {}", partition))
311 .fetch_one(&self.pool)
312 .await
313 .map_err(DbError::from)?;
314 total_rows += row.0;
315
316 let size_row =
318 sqlx::query_as::<_, (i64,)>("SELECT pg_total_relation_size($1::regclass)")
319 .bind(partition)
320 .fetch_one(&self.pool)
321 .await
322 .map_err(DbError::from)?;
323 total_size += size_row.0;
324 }
325
326 Ok(PartitionStats {
327 partition_count: partitions.len(),
328 total_rows: total_rows as usize,
329 total_size_bytes: total_size as usize,
330 partitions,
331 })
332 }
333
334 pub async fn detach_partition(&self, table_name: &str, partition_name: &str) -> Result<()> {
336 let sql = format!(
337 "ALTER TABLE {} DETACH PARTITION {}",
338 table_name, partition_name
339 );
340
341 sqlx::query(&sql)
342 .execute(&self.pool)
343 .await
344 .map_err(DbError::from)?;
345
346 Ok(())
347 }
348
349 pub async fn attach_partition(
351 &self,
352 table_name: &str,
353 partition_name: &str,
354 from_value: &str,
355 to_value: &str,
356 ) -> Result<()> {
357 let sql = format!(
358 "ALTER TABLE {} ATTACH PARTITION {} FOR VALUES FROM ('{}') TO ('{}')",
359 table_name, partition_name, from_value, to_value
360 );
361
362 sqlx::query(&sql)
363 .execute(&self.pool)
364 .await
365 .map_err(DbError::from)?;
366
367 Ok(())
368 }
369}
370
371#[derive(Debug, Clone)]
373pub struct PartitionStats {
374 pub partition_count: usize,
375 pub total_rows: usize,
376 pub total_size_bytes: usize,
377 pub partitions: Vec<String>,
378}
379
380impl fmt::Display for PartitionStats {
381 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
382 write!(
383 f,
384 "Partitions: {}, Rows: {}, Size: {} bytes",
385 self.partition_count, self.total_rows, self.total_size_bytes
386 )
387 }
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393 use chrono::TimeZone;
394
395 #[test]
396 fn test_partition_interval_suffix() {
397 let date = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
398
399 assert_eq!(PartitionInterval::Daily.partition_suffix(date), "20240315");
400 assert_eq!(PartitionInterval::Monthly.partition_suffix(date), "202403");
401 assert_eq!(PartitionInterval::Yearly.partition_suffix(date), "2024");
402 }
403
404 #[test]
405 fn test_partition_interval_quarterly() {
406 let q1 = Utc.with_ymd_and_hms(2024, 2, 15, 0, 0, 0).unwrap();
407 let q2 = Utc.with_ymd_and_hms(2024, 5, 15, 0, 0, 0).unwrap();
408 let q3 = Utc.with_ymd_and_hms(2024, 8, 15, 0, 0, 0).unwrap();
409 let q4 = Utc.with_ymd_and_hms(2024, 11, 15, 0, 0, 0).unwrap();
410
411 assert_eq!(PartitionInterval::Quarterly.partition_suffix(q1), "2024q1");
412 assert_eq!(PartitionInterval::Quarterly.partition_suffix(q2), "2024q2");
413 assert_eq!(PartitionInterval::Quarterly.partition_suffix(q3), "2024q3");
414 assert_eq!(PartitionInterval::Quarterly.partition_suffix(q4), "2024q4");
415 }
416
417 #[test]
418 fn test_partition_bounds_daily() {
419 let date = Utc.with_ymd_and_hms(2024, 3, 15, 12, 30, 45).unwrap();
420 let (start, end) = PartitionInterval::Daily.partition_bounds(date);
421
422 assert_eq!(start, Utc.with_ymd_and_hms(2024, 3, 15, 0, 0, 0).unwrap());
423 assert_eq!(end, Utc.with_ymd_and_hms(2024, 3, 16, 0, 0, 0).unwrap());
424 }
425
426 #[test]
427 fn test_partition_bounds_monthly() {
428 let date = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
429 let (start, end) = PartitionInterval::Monthly.partition_bounds(date);
430
431 assert_eq!(start, Utc.with_ymd_and_hms(2024, 3, 1, 0, 0, 0).unwrap());
432 assert_eq!(end, Utc.with_ymd_and_hms(2024, 4, 1, 0, 0, 0).unwrap());
433 }
434
435 #[test]
436 fn test_partition_bounds_yearly() {
437 let date = Utc.with_ymd_and_hms(2026, 6, 15, 12, 0, 0).unwrap();
438 let (start, end) = PartitionInterval::Yearly.partition_bounds(date);
439
440 assert_eq!(start, Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap());
441 assert_eq!(end, Utc.with_ymd_and_hms(2027, 1, 1, 0, 0, 0).unwrap());
442 }
443
444 #[test]
445 fn test_partitioning_strategy_enum() {
446 let range = PartitioningStrategy::Range {
447 column: "created_at".to_string(),
448 };
449 assert!(matches!(range, PartitioningStrategy::Range { .. }));
450
451 let list = PartitioningStrategy::List {
452 column: "status".to_string(),
453 };
454 assert!(matches!(list, PartitioningStrategy::List { .. }));
455
456 let hash = PartitioningStrategy::Hash {
457 column: "user_id".to_string(),
458 num_partitions: 4,
459 };
460 assert!(matches!(hash, PartitioningStrategy::Hash { .. }));
461 }
462
463 #[test]
464 fn test_partition_definition_create_sql() {
465 let def = PartitionDefinition {
466 table_name: "events".to_string(),
467 partition_name: "events_202401".to_string(),
468 strategy: PartitioningStrategy::Range {
469 column: "created_at".to_string(),
470 },
471 };
472
473 let sql = def.create_sql();
474 assert!(sql.contains("CREATE TABLE"));
475 assert!(sql.contains("PARTITION OF"));
476 assert!(sql.contains("events"));
477 }
478
479 #[test]
480 fn test_partition_definition_drop_sql() {
481 let def = PartitionDefinition {
482 table_name: "events".to_string(),
483 partition_name: "events_202401".to_string(),
484 strategy: PartitioningStrategy::Range {
485 column: "created_at".to_string(),
486 },
487 };
488
489 let sql = def.drop_sql();
490 assert_eq!(sql, "DROP TABLE IF EXISTS events_202401");
491 }
492
493 #[test]
494 fn test_partition_stats_display() {
495 let stats = PartitionStats {
496 partition_count: 3,
497 total_rows: 10000,
498 total_size_bytes: 1048576,
499 partitions: vec!["p1".to_string(), "p2".to_string(), "p3".to_string()],
500 };
501
502 let display = format!("{}", stats);
503 assert!(display.contains("Partitions: 3"));
504 assert!(display.contains("Rows: 10000"));
505 assert!(display.contains("Size: 1048576 bytes"));
506 }
507
508 #[test]
509 fn test_partition_interval_enum() {
510 assert_eq!(PartitionInterval::Daily, PartitionInterval::Daily);
511 assert_ne!(PartitionInterval::Daily, PartitionInterval::Monthly);
512 }
513}