1use crate::helpers::StringExt;
2use crate::models::column::PostgresColumn;
3use crate::models::constraint::PostgresConstraint;
4use crate::models::hypertable_retention::HypertableRetention;
5use crate::models::index::PostgresIndex;
6use crate::models::schema::PostgresSchema;
7use crate::object_id::ObjectId;
8use crate::pg_interval::Interval;
9use crate::postgres_client_wrapper::FromPgChar;
10use crate::quoting::AttemptedKeywordUsage::ColumnName;
11use crate::quoting::{
12 quote_value_string, AttemptedKeywordUsage, IdentifierQuoter, Quotable, QuotableIter,
13};
14use crate::storage::DataFormat;
15use crate::{default, ElefantToolsError, HypertableCompression, PostgresIndexType};
16use itertools::Itertools;
17use serde::{Deserialize, Serialize};
18
19#[derive(Debug, Eq, PartialEq, Default, Clone, Serialize, Deserialize)]
20pub struct PostgresTable {
21 pub name: String,
22 pub columns: Vec<PostgresColumn>,
23 pub constraints: Vec<PostgresConstraint>,
24 pub indices: Vec<PostgresIndex>,
25 pub comment: Option<String>,
26 pub storage_parameters: Vec<String>,
27 pub table_type: TableTypeDetails,
28 pub object_id: ObjectId,
29 pub depends_on: Vec<ObjectId>,
30}
31
32impl PostgresTable {
33 pub fn new(name: &str) -> Self {
34 PostgresTable {
35 name: name.to_string(),
36 ..default()
37 }
38 }
39
40 pub fn get_create_statement(
41 &self,
42 schema: &PostgresSchema,
43 identifier_quoter: &IdentifierQuoter,
44 ) -> String {
45 let escaped_relation_name = format!(
46 "{}.{}",
47 schema.name.quote(identifier_quoter, ColumnName),
48 self.name.quote(identifier_quoter, ColumnName)
49 );
50 let mut sql = "create table ".to_string();
51 sql.push_str(&escaped_relation_name);
52
53 if let TableTypeDetails::PartitionedChildTable {
54 partition_expression,
55 parent_table,
56 } = &self.table_type
57 {
58 sql.push_str(" partition of ");
59 sql.push_str(&parent_table.quote(identifier_quoter, ColumnName));
60 sql.push(' ');
61 sql.push_str(partition_expression);
62 } else {
63 sql.push_str(" (");
64
65 let mut text_row_count = 0;
66
67 for column in &self.columns {
68 if text_row_count > 0 {
69 sql.push(',');
70 }
71 sql.push_str("\n ");
72 sql.push_str(&column.name.quote(identifier_quoter, ColumnName));
73 sql.push(' ');
74 sql.push_str(&column.data_type.quote(identifier_quoter, ColumnName));
75
76 if let Some(length) = column.data_type_length {
77 sql.push_str(&format!("({})", length));
78 }
79
80 for _ in 0..column.array_dimensions {
81 sql.push_str("[]");
82 }
83
84 if !column.is_nullable {
85 sql.push_str(" not null");
86 }
87
88 if let Some(generated) = &column.generated {
89 sql.push_str(" generated always as (");
90 sql.push_str(generated);
91 sql.push_str(") stored");
92 }
93
94 text_row_count += 1;
95 }
96
97 for index in &self.indices {
98 if index.index_constraint_type == PostgresIndexType::PrimaryKey {
99 if text_row_count > 0 {
100 sql.push(',');
101 }
102
103 sql.push_str("\n constraint ");
104 sql.push_str(&index.name.quote(identifier_quoter, ColumnName));
105 sql.push_str(" primary key (");
106
107 sql.push_join(", ", index.key_columns.iter().map(|c| &c.name));
109 sql.push(')');
110 text_row_count += 1;
111 }
112 }
113
114 for constraint in &self.constraints {
115 if let PostgresConstraint::Check(check) = constraint {
116 if text_row_count > 0 {
117 sql.push(',');
118 }
119 sql.push_str("\n constraint ");
120 sql.push_str(&check.name.quote(identifier_quoter, ColumnName));
121 sql.push_str(" check ");
122 sql.push_str(&check.check_clause);
123 text_row_count += 1;
124 }
125 }
126
127 if let TableTypeDetails::PartitionedParentTable {
128 partition_strategy,
129 partition_columns,
130 ..
131 } = &self.table_type
132 {
133 sql.push_str("\n) partition by ");
134 sql.push_str(match partition_strategy {
135 TablePartitionStrategy::Hash => "hash",
136 TablePartitionStrategy::List => "list",
137 TablePartitionStrategy::Range => "range",
138 });
139 sql.push_str(" (");
140
141 match partition_columns {
142 PartitionedTableColumns::Columns(columns) => {
143 sql.push_join(
144 ", ",
145 columns
146 .iter()
147 .map(|c| c.quote(identifier_quoter, ColumnName)),
148 );
149 }
150 PartitionedTableColumns::Expression(expr) => {
151 sql.push_str(expr);
152 }
153 }
154
155 sql.push(')');
156 } else if let TableTypeDetails::InheritedTable { parent_tables } = &self.table_type {
157 sql.push_str("\n) inherits (");
158 sql.push_join(
159 ", ",
160 parent_tables.iter().map(|c| {
161 c.quote(identifier_quoter, AttemptedKeywordUsage::TypeOrFunctionName)
162 }),
163 );
164 sql.push(')');
165 } else {
166 sql.push_str("\n)");
167 }
168 }
169
170 if !self.storage_parameters.is_empty() {
171 sql.push_str("\nwith (");
172 sql.push_join(", ", self.storage_parameters.iter());
173 sql.push(')');
174 }
175
176 sql.push(';');
177
178 if let Some(c) = &self.comment {
179 sql.push_str(&format!(
180 "\ncomment on table {} is {};",
181 escaped_relation_name,
182 quote_value_string(c)
183 ));
184 }
185
186 for col in &self.columns {
187 if let Some(c) = &col.comment {
188 sql.push_str(&format!(
189 "\ncomment on column {}.{} is {};",
190 escaped_relation_name,
191 col.name.quote(identifier_quoter, ColumnName),
192 quote_value_string(c)
193 ));
194 }
195 }
196
197 for constraint in &self.constraints {
198 if let PostgresConstraint::Check(constraint) = constraint {
199 if let Some(c) = &constraint.comment {
200 sql.push_str(&format!(
201 "\ncomment on constraint {} on {} is {};",
202 constraint.name.quote(identifier_quoter, ColumnName),
203 escaped_relation_name,
204 quote_value_string(c)
205 ));
206 }
207 }
208 }
209
210 if let TableTypeDetails::TimescaleHypertable {
211 dimensions,
212 compression: _,
213 retention: _,
214 } = &self.table_type
215 {
216 for (idx, dim) in dimensions.iter().enumerate() {
218 match dim {
219 HypertableDimension::Time {
220 column_name,
221 time_interval,
222 } => {
223 if idx == 0 {
224 sql.push_str(&format!("\nselect public.create_hypertable('{}', by_range('{}', INTERVAL '{}'), create_default_indexes => false);", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), time_interval.to_postgres()));
225 } else {
226 sql.push_str(&format!("\nselect public.add_dimension('{}', by_range('{}', INTERVAL '{}'));", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), time_interval.to_postgres()));
227 }
228 }
229 HypertableDimension::SpaceInterval {
230 column_name,
231 integer_interval,
232 } => {
233 if idx == 0 {
234 sql.push_str(&format!("\nselect public.create_hypertable('{}', by_range('{}', {}), create_default_indexes => false);", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), integer_interval));
235 } else {
236 sql.push_str(&format!(
237 "\nselect public.add_dimension('{}', by_range('{}', {}));",
238 escaped_relation_name,
239 column_name.quote(identifier_quoter, ColumnName),
240 integer_interval
241 ));
242 }
243 }
244 HypertableDimension::SpacePartitions {
245 column_name,
246 num_partitions,
247 } => {
248 if idx == 0 {
249 sql.push_str(&format!("\nselect public.create_hypertable('{}', by_hash('{}', {}), create_default_indexes => false);", escaped_relation_name, column_name.quote(identifier_quoter, ColumnName), num_partitions));
250 } else {
251 sql.push_str(&format!(
252 "\nselect public.add_dimension('{}', by_hash('{}', {}));",
253 escaped_relation_name,
254 column_name.quote(identifier_quoter, ColumnName),
255 num_partitions
256 ));
257 }
258 }
259 }
260 }
261 }
262
263 sql
264 }
265
266 pub fn get_copy_in_command(
267 &self,
268 schema: &PostgresSchema,
269 data_format: &DataFormat,
270 identifier_quoter: &IdentifierQuoter,
271 ) -> String {
272 let mut s = "copy ".to_string();
273
274 s.push_str(&schema.name.quote(identifier_quoter, ColumnName));
275 s.push('.');
276 s.push_str(&self.name.quote(identifier_quoter, ColumnName));
277
278 s.push_str(" (");
279
280 let cols = self.get_copy_columns_expression(identifier_quoter);
281
282 s.push_str(&cols);
283
284 s.push_str(") from stdin with (format ");
285 match data_format {
286 DataFormat::Text => {
287 s.push_str("text");
288 }
289 DataFormat::PostgresBinary { .. } => {
290 s.push_str("binary");
291 }
292 }
293 s.push_str(", header false);");
294
295 s
296 }
297
298 pub fn get_copy_out_command(
299 &self,
300 schema: &PostgresSchema,
301 data_format: &DataFormat,
302 identifier_quoter: &IdentifierQuoter,
303 ) -> String {
304 let mut s = "copy ".to_string();
305
306 if let TableTypeDetails::TimescaleHypertable { .. } = self.table_type {
307 s.push_str("(select ");
308 let cols = self.get_copy_columns_expression(identifier_quoter);
309
310 s.push_str(&cols);
311 s.push_str(" from ");
312
313 s.push_str(&schema.name.quote(identifier_quoter, ColumnName));
314 s.push('.');
315 s.push_str(&self.name.quote(identifier_quoter, ColumnName));
316 s.push_str(") ");
317 } else {
318 s.push_str(&schema.name.quote(identifier_quoter, ColumnName));
319 s.push('.');
320 s.push_str(&self.name.quote(identifier_quoter, ColumnName));
321
322 s.push_str(" (");
323
324 let cols = self.get_copy_columns_expression(identifier_quoter);
325
326 s.push_str(&cols);
327 s.push_str(") ");
328 }
329
330 s.push_str(" to stdout with (format ");
331 match data_format {
332 DataFormat::Text => {
333 s.push_str("text");
334 }
335 DataFormat::PostgresBinary { .. } => {
336 s.push_str("binary");
337 }
338 }
339 s.push_str(", header false, encoding 'utf-8');");
340
341 s
342 }
343
344 fn get_copy_columns_expression(&self, identifier_quoter: &IdentifierQuoter) -> String {
345 self.get_writable_columns()
346 .map(|c| c.name.as_str())
347 .quote(identifier_quoter, ColumnName)
348 .join(", ")
349 }
350
351 pub fn get_writable_columns(&self) -> impl Iterator<Item = &PostgresColumn> {
352 self.columns
353 .iter()
354 .filter(|c| c.generated.is_none())
355 .sorted_by_key(|c| c.ordinal_position)
356 }
357
358 pub fn get_timescale_post_settings(
359 &self,
360 schema: &PostgresSchema,
361 identifier_quoter: &IdentifierQuoter,
362 ) -> Option<String> {
363 if let TableTypeDetails::TimescaleHypertable {
364 compression,
365 retention,
366 ..
367 } = &self.table_type
368 {
369 let escaped_relation_name = format!(
370 "{}.{}",
371 schema.name.quote(identifier_quoter, ColumnName),
372 self.name.quote(identifier_quoter, ColumnName)
373 );
374 let mut sql = String::new();
375 if let Some(compression) = compression {
376 sql.push_str("alter table ");
377 compression.add_compression_settings(
378 &mut sql,
379 &escaped_relation_name,
380 identifier_quoter,
381 );
382 }
383
384 if let Some(retention) = retention {
385 if !sql.is_empty() {
386 sql.push('\n');
387 }
388
389 retention.add_retention(&mut sql, &escaped_relation_name);
390 }
391
392 if !sql.is_empty() {
393 return Some(sql);
394 }
395 }
396
397 None
398 }
399
400 pub fn is_timescale_table(&self) -> bool {
401 matches!(
402 self.table_type,
403 TableTypeDetails::TimescaleHypertable { .. }
404 )
405 }
406}
407
408#[derive(Debug, Eq, PartialEq, Clone, Default, Serialize, Deserialize)]
409#[serde(tag = "type")]
410pub enum TableTypeDetails {
411 #[default]
412 Table,
413 PartitionedParentTable {
414 partition_strategy: TablePartitionStrategy,
415 default_partition_name: Option<String>,
416 partition_columns: PartitionedTableColumns,
417 },
418 PartitionedChildTable {
419 parent_table: String,
420 partition_expression: String,
421 },
422 InheritedTable {
423 parent_tables: Vec<String>,
424 },
425 TimescaleHypertable {
426 dimensions: Vec<HypertableDimension>,
427 compression: Option<HypertableCompression>,
428 retention: Option<HypertableRetention>,
429 },
430}
431
432#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
433#[serde(tag = "type")]
434pub enum PartitionedTableColumns {
435 Columns(Vec<String>),
436 Expression(String),
437}
438
439#[derive(Debug, Eq, PartialEq, Copy, Clone, Serialize, Deserialize)]
440pub enum TablePartitionStrategy {
441 Hash,
442 List,
443 Range,
444}
445
446impl FromPgChar for TablePartitionStrategy {
447 fn from_pg_char(c: char) -> Result<Self, ElefantToolsError> {
448 match c {
449 'h' => Ok(TablePartitionStrategy::Hash),
450 'l' => Ok(TablePartitionStrategy::List),
451 'r' => Ok(TablePartitionStrategy::Range),
452 _ => Err(ElefantToolsError::InvalidTablePartitioningStrategy(
453 c.to_string(),
454 )),
455 }
456 }
457}
458
459#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
460#[serde(tag = "type")]
461pub enum HypertableDimension {
462 Time {
463 column_name: String,
464 time_interval: Interval,
465 },
466 SpaceInterval {
467 column_name: String,
468 integer_interval: i64,
469 },
470 SpacePartitions {
471 column_name: String,
472 num_partitions: i16,
473 },
474}