elefant_tools/models/
table.rs

1use crate::helpers::StringExt;
2use crate::models::column::PostgresColumn;
3use crate::models::constraint::PostgresConstraint;
4use crate::models::hypertable_retention::HypertableRetention;
5use crate::models::index::PostgresIndex;
6use crate::models::schema::PostgresSchema;
7use crate::object_id::ObjectId;
8use crate::pg_interval::Interval;
9use crate::postgres_client_wrapper::FromPgChar;
10use crate::quoting::AttemptedKeywordUsage::ColumnName;
11use crate::quoting::{
12    quote_value_string, AttemptedKeywordUsage, IdentifierQuoter, Quotable, QuotableIter,
13};
14use crate::storage::DataFormat;
15use crate::{default, ElefantToolsError, HypertableCompression, PostgresIndexType};
16use itertools::Itertools;
17use serde::{Deserialize, Serialize};
18
19#[derive(Debug, Eq, PartialEq, Default, Clone, Serialize, Deserialize)]
20pub struct PostgresTable {
21    pub name: String,
22    pub columns: Vec<PostgresColumn>,
23    pub constraints: Vec<PostgresConstraint>,
24    pub indices: Vec<PostgresIndex>,
25    pub comment: Option<String>,
26    pub storage_parameters: Vec<String>,
27    pub table_type: TableTypeDetails,
28    pub object_id: ObjectId,
29    pub depends_on: Vec<ObjectId>,
30}
31
32impl PostgresTable {
33    pub fn new(name: &str) -> Self {
34        PostgresTable {
35            name: name.to_string(),
36            ..default()
37        }
38    }
39
40    pub fn get_create_statement(
41        &self,
42        schema: &PostgresSchema,
43        identifier_quoter: &IdentifierQuoter,
44    ) -> String {
45        let escaped_relation_name = format!(
46            "{}.{}",
47            schema.name.quote(identifier_quoter, ColumnName),
48            self.name.quote(identifier_quoter, ColumnName)
49        );
50        let mut sql = "create table ".to_string();
51        sql.push_str(&escaped_relation_name);
52
53        if let TableTypeDetails::PartitionedChildTable {
54            partition_expression,
55            parent_table,
56        } = &self.table_type
57        {
58            sql.push_str(" partition of ");
59            sql.push_str(&parent_table.quote(identifier_quoter, ColumnName));
60            sql.push(' ');
61            sql.push_str(partition_expression);
62        } else {
63            sql.push_str(" (");
64
65            let mut text_row_count = 0;
66
67            for column in &self.columns {
68                if text_row_count > 0 {
69                    sql.push(',');
70                }
71                sql.push_str("\n    ");
72                sql.push_str(&column.name.quote(identifier_quoter, ColumnName));
73                sql.push(' ');
74                sql.push_str(&column.data_type.quote(identifier_quoter, ColumnName));
75
76                if let Some(length) = column.data_type_length {
77                    sql.push_str(&format!("({})", length));
78                }
79
80                for _ in 0..column.array_dimensions {
81                    sql.push_str("[]");
82                }
83
84                if !column.is_nullable {
85                    sql.push_str(" not null");
86                }
87
88                if let Some(generated) = &column.generated {
89                    sql.push_str(" generated always as (");
90                    sql.push_str(generated);
91                    sql.push_str(") stored");
92                }
93
94                text_row_count += 1;
95            }
96
97            for index in &self.indices {
98                if index.index_constraint_type == PostgresIndexType::PrimaryKey {
99                    if text_row_count > 0 {
100                        sql.push(',');
101                    }
102
103                    sql.push_str("\n    constraint ");
104                    sql.push_str(&index.name.quote(identifier_quoter, ColumnName));
105                    sql.push_str(" primary key (");
106
107                    // We don't need to escape the column names here as they are already escaped in the index definition.
108                    sql.push_join(", ", index.key_columns.iter().map(|c| &c.name));
109                    sql.push(')');
110                    text_row_count += 1;
111                }
112            }
113
114            for constraint in &self.constraints {
115                if let PostgresConstraint::Check(check) = constraint {
116                    if text_row_count > 0 {
117                        sql.push(',');
118                    }
119                    sql.push_str("\n    constraint ");
120                    sql.push_str(&check.name.quote(identifier_quoter, ColumnName));
121                    sql.push_str(" check ");
122                    sql.push_str(&check.check_clause);
123                    text_row_count += 1;
124                }
125            }
126
127            if let TableTypeDetails::PartitionedParentTable {
128                partition_strategy,
129                partition_columns,
130                ..
131            } = &self.table_type
132            {
133                sql.push_str("\n) partition by ");
134                sql.push_str(match partition_strategy {
135                    TablePartitionStrategy::Hash => "hash",
136                    TablePartitionStrategy::List => "list",
137                    TablePartitionStrategy::Range => "range",
138                });
139                sql.push_str(" (");
140
141                match partition_columns {
142                    PartitionedTableColumns::Columns(columns) => {
143                        sql.push_join(
144                            ", ",
145                            columns
146                                .iter()
147                                .map(|c| c.quote(identifier_quoter, ColumnName)),
148                        );
149                    }
150                    PartitionedTableColumns::Expression(expr) => {
151                        sql.push_str(expr);
152                    }
153                }
154
155                sql.push(')');
156            } else if let TableTypeDetails::InheritedTable { parent_tables } = &self.table_type {
157                sql.push_str("\n) inherits (");
158                sql.push_join(
159                    ", ",
160                    parent_tables.iter().map(|c| {
161                        c.quote(identifier_quoter, AttemptedKeywordUsage::TypeOrFunctionName)
162                    }),
163                );
164                sql.push(')');
165            } else {
166                sql.push_str("\n)");
167            }
168        }
169
170        if !self.storage_parameters.is_empty() {
171            sql.push_str("\nwith (");
172            sql.push_join(", ", self.storage_parameters.iter());
173            sql.push(')');
174        }
175
176        sql.push(';');
177
178        if let Some(c) = &self.comment {
179            sql.push_str(&format!(
180                "\ncomment on table {} is {};",
181                escaped_relation_name,
182                quote_value_string(c)
183            ));
184        }
185
186        for col in &self.columns {
187            if let Some(c) = &col.comment {
188                sql.push_str(&format!(
189                    "\ncomment on column {}.{} is {};",
190                    escaped_relation_name,
191                    col.name.quote(identifier_quoter, ColumnName),
192                    quote_value_string(c)
193                ));
194            }
195        }
196
197        for constraint in &self.constraints {
198            if let PostgresConstraint::Check(constraint) = constraint {
199                if let Some(c) = &constraint.comment {
200                    sql.push_str(&format!(
201                        "\ncomment on constraint {} on {} is {};",
202                        constraint.name.quote(identifier_quoter, ColumnName),
203                        escaped_relation_name,
204                        quote_value_string(c)
205                    ));
206                }
207            }
208        }
209
210        if let TableTypeDetails::TimescaleHypertable {
211            dimensions,
212            compression: _,
213            retention: _,
214        } = &self.table_type
215        {
216            // We don't need timescale to create the indices as we do it later on again based on what was exported.
217            for (idx, dim) in dimensions.iter().enumerate() {
218                match dim {
219                    HypertableDimension::Time {
220                        column_name,
221                        time_interval,
222                    } => {
223                        if idx == 0 {
224                            sql.push_str(&format!("\nselect public.create_hypertable('{}', by_range('{}', INTERVAL '{}'), create_default_indexes => false);", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), time_interval.to_postgres()));
225                        } else {
226                            sql.push_str(&format!("\nselect public.add_dimension('{}', by_range('{}', INTERVAL '{}'));", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), time_interval.to_postgres()));
227                        }
228                    }
229                    HypertableDimension::SpaceInterval {
230                        column_name,
231                        integer_interval,
232                    } => {
233                        if idx == 0 {
234                            sql.push_str(&format!("\nselect public.create_hypertable('{}', by_range('{}', {}), create_default_indexes => false);", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), integer_interval));
235                        } else {
236                            sql.push_str(&format!(
237                                "\nselect public.add_dimension('{}', by_range('{}', {}));",
238                                escaped_relation_name,
239                                column_name.quote(identifier_quoter, ColumnName),
240                                integer_interval
241                            ));
242                        }
243                    }
244                    HypertableDimension::SpacePartitions {
245                        column_name,
246                        num_partitions,
247                    } => {
248                        if idx == 0 {
249                            sql.push_str(&format!("\nselect public.create_hypertable('{}', by_hash('{}', {}), create_default_indexes => false);", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), num_partitions));
250                        } else {
251                            sql.push_str(&format!(
252                                "\nselect public.add_dimension('{}', by_hash('{}', {}));",
253                                escaped_relation_name,
254                                column_name.quote(identifier_quoter, ColumnName),
255                                num_partitions
256                            ));
257                        }
258                    }
259                }
260            }
261        }
262
263        sql
264    }
265
266    pub fn get_copy_in_command(
267        &self,
268        schema: &PostgresSchema,
269        data_format: &DataFormat,
270        identifier_quoter: &IdentifierQuoter,
271    ) -> String {
272        let mut s = "copy ".to_string();
273
274        s.push_str(&schema.name.quote(identifier_quoter, ColumnName));
275        s.push('.');
276        s.push_str(&self.name.quote(identifier_quoter, ColumnName));
277
278        s.push_str(" (");
279
280        let cols = self.get_copy_columns_expression(identifier_quoter);
281
282        s.push_str(&cols);
283
284        s.push_str(") from stdin with (format ");
285        match data_format {
286            DataFormat::Text => {
287                s.push_str("text");
288            }
289            DataFormat::PostgresBinary { .. } => {
290                s.push_str("binary");
291            }
292        }
293        s.push_str(", header false);");
294
295        s
296    }
297
298    pub fn get_copy_out_command(
299        &self,
300        schema: &PostgresSchema,
301        data_format: &DataFormat,
302        identifier_quoter: &IdentifierQuoter,
303    ) -> String {
304        let mut s = "copy ".to_string();
305
306        if let TableTypeDetails::TimescaleHypertable { .. } = self.table_type {
307            s.push_str("(select ");
308            let cols = self.get_copy_columns_expression(identifier_quoter);
309
310            s.push_str(&cols);
311            s.push_str(" from ");
312
313            s.push_str(&schema.name.quote(identifier_quoter, ColumnName));
314            s.push('.');
315            s.push_str(&self.name.quote(identifier_quoter, ColumnName));
316            s.push_str(") ");
317        } else {
318            s.push_str(&schema.name.quote(identifier_quoter, ColumnName));
319            s.push('.');
320            s.push_str(&self.name.quote(identifier_quoter, ColumnName));
321
322            s.push_str(" (");
323
324            let cols = self.get_copy_columns_expression(identifier_quoter);
325
326            s.push_str(&cols);
327            s.push_str(") ");
328        }
329
330        s.push_str(" to stdout with (format ");
331        match data_format {
332            DataFormat::Text => {
333                s.push_str("text");
334            }
335            DataFormat::PostgresBinary { .. } => {
336                s.push_str("binary");
337            }
338        }
339        s.push_str(", header false, encoding 'utf-8');");
340
341        s
342    }
343
344    fn get_copy_columns_expression(&self, identifier_quoter: &IdentifierQuoter) -> String {
345        self.get_writable_columns()
346            .map(|c| c.name.as_str())
347            .quote(identifier_quoter, ColumnName)
348            .join(", ")
349    }
350
351    pub fn get_writable_columns(&self) -> impl Iterator<Item = &PostgresColumn> {
352        self.columns
353            .iter()
354            .filter(|c| c.generated.is_none())
355            .sorted_by_key(|c| c.ordinal_position)
356    }
357
358    pub fn get_timescale_post_settings(
359        &self,
360        schema: &PostgresSchema,
361        identifier_quoter: &IdentifierQuoter,
362    ) -> Option<String> {
363        if let TableTypeDetails::TimescaleHypertable {
364            compression,
365            retention,
366            ..
367        } = &self.table_type
368        {
369            let escaped_relation_name = format!(
370                "{}.{}",
371                schema.name.quote(identifier_quoter, ColumnName),
372                self.name.quote(identifier_quoter, ColumnName)
373            );
374            let mut sql = String::new();
375            if let Some(compression) = compression {
376                sql.push_str("alter table ");
377                compression.add_compression_settings(
378                    &mut sql,
379                    &escaped_relation_name,
380                    identifier_quoter,
381                );
382            }
383
384            if let Some(retention) = retention {
385                if !sql.is_empty() {
386                    sql.push('\n');
387                }
388
389                retention.add_retention(&mut sql, &escaped_relation_name);
390            }
391
392            if !sql.is_empty() {
393                return Some(sql);
394            }
395        }
396
397        None
398    }
399
400    pub fn is_timescale_table(&self) -> bool {
401        matches!(
402            self.table_type,
403            TableTypeDetails::TimescaleHypertable { .. }
404        )
405    }
406}
407
408#[derive(Debug, Eq, PartialEq, Clone, Default, Serialize, Deserialize)]
409#[serde(tag = "type")]
410pub enum TableTypeDetails {
411    #[default]
412    Table,
413    PartitionedParentTable {
414        partition_strategy: TablePartitionStrategy,
415        default_partition_name: Option<String>,
416        partition_columns: PartitionedTableColumns,
417    },
418    PartitionedChildTable {
419        parent_table: String,
420        partition_expression: String,
421    },
422    InheritedTable {
423        parent_tables: Vec<String>,
424    },
425    TimescaleHypertable {
426        dimensions: Vec<HypertableDimension>,
427        compression: Option<HypertableCompression>,
428        retention: Option<HypertableRetention>,
429    },
430}
431
432#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
433#[serde(tag = "type")]
434pub enum PartitionedTableColumns {
435    Columns(Vec<String>),
436    Expression(String),
437}
438
439#[derive(Debug, Eq, PartialEq, Copy, Clone, Serialize, Deserialize)]
440pub enum TablePartitionStrategy {
441    Hash,
442    List,
443    Range,
444}
445
446impl FromPgChar for TablePartitionStrategy {
447    fn from_pg_char(c: char) -> Result<Self, ElefantToolsError> {
448        match c {
449            'h' => Ok(TablePartitionStrategy::Hash),
450            'l' => Ok(TablePartitionStrategy::List),
451            'r' => Ok(TablePartitionStrategy::Range),
452            _ => Err(ElefantToolsError::InvalidTablePartitioningStrategy(
453                c.to_string(),
454            )),
455        }
456    }
457}
458
459#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
460#[serde(tag = "type")]
461pub enum HypertableDimension {
462    Time {
463        column_name: String,
464        time_interval: Interval,
465    },
466    SpaceInterval {
467        column_name: String,
468        integer_interval: i64,
469    },
470    SpacePartitions {
471        column_name: String,
472        num_partitions: i16,
473    },
474}