elefant_tools/
copy_data.rs

1use crate::object_id::DependencySortable;
2use crate::parallel_runner::ParallelRunner;
3use crate::quoting::IdentifierQuoter;
4use crate::storage::DataFormat;
5use crate::storage::{CopyDestination, CopySource};
6use crate::*;
7use itertools::Itertools;
8use std::num::NonZeroUsize;
9use tracing::{debug, info, instrument};
10
/// Options controlling a [copy_data] run.
///
/// All fields default to "off"/unset via `Default`, which yields a plain,
/// sequential, full copy of all schemas.
#[derive(Debug, Default)]
pub struct CopyDataOptions {
    /// Force this data format to be used.
    /// If the format is not supported by both source and destination the copy fails
    /// instead of falling back to a shared format.
    pub data_format: Option<DataFormat>,
    /// How many tables to copy in parallel at most.
    /// `None` behaves the same as `1` (fully sequential).
    pub max_parallel: Option<NonZeroUsize>,

    /// The schema to inspect.
    /// When set, only objects from this schema are copied.
    pub target_schema: Option<String>,

    /// If `target_schema` is specified it will be renamed to this
    /// when applied to the destination.
    pub rename_schema_to: Option<String>,

    /// Only the schema will be copied, but not any data.
    pub schema_only: bool,

    /// Only the structures missing in the destination will be copied.
    /// Data copy is only checked against "empty table" vs "non-empty table".
    /// This only works with data sources that support structural inspections, aka
    /// not sql-files.
    pub differential: bool,
}
34
/// `NonZeroUsize` constant for 1, the default parallelism level.
///
/// `NonZeroUsize::MIN` is exactly 1, so no `unsafe` construction is needed.
const NON_ZERO_USIZE1: NonZeroUsize = NonZeroUsize::MIN;
39
40impl CopyDataOptions {
41    fn get_max_parallel_or_1(&self) -> NonZeroUsize {
42        self.max_parallel.unwrap_or(NON_ZERO_USIZE1)
43    }
44}
45
46/// Copies data and structures from the provided source to the destination.
47///
48/// This is probably the main function you want to deal with when using Elefant Tools as a library.
49#[instrument(skip_all)]
50pub async fn copy_data<'d, S: CopySourceFactory, D: CopyDestinationFactory<'d>>(
51    source: &S,
52    destination: &'d mut D,
53    options: CopyDataOptions,
54) -> Result<()> {
55    let data_format = get_data_type(source, destination, &options).await?;
56
57    let expected_parallelism = if options.get_max_parallel_or_1() == NON_ZERO_USIZE1 {
58        SupportedParallelism::Sequential
59    } else {
60        source
61            .supported_parallelism()
62            .negotiate_parallelism(destination.supported_parallelism())
63    };
64
65    let (source, mut destination) = match expected_parallelism {
66        SupportedParallelism::Sequential => (
67            SequentialOrParallel::Sequential(source.create_sequential_source().await?),
68            SequentialOrParallel::Sequential(destination.create_sequential_destination().await?),
69        ),
70        SupportedParallelism::Parallel => (
71            source.create_source().await?,
72            destination.create_destination().await?,
73        ),
74    };
75
76    let definition = source.get_introspection().await?;
77    let destination_definition = if options.differential {
78        destination
79            .try_get_introspeciton()
80            .await?
81            .unwrap_or_default()
82    } else {
83        default()
84    };
85
86    let source_definition = if let Some(target_schema) = &options.target_schema {
87        definition.filtered_to_schema(target_schema)
88    } else {
89        definition
90    };
91
92    let target_definition = if let (Some(target_schema), Some(rename_to)) =
93        (&options.target_schema, &options.rename_schema_to)
94    {
95        source_definition.with_renamed_schema(target_schema, rename_to)
96    } else {
97        source_definition.clone()
98    };
99
100    if let Some(target_schema) = &options.target_schema {
101        destination_definition.filtered_to_schema(target_schema);
102    }
103
104    destination.begin_transaction().await?;
105
106    match &mut destination {
107        SequentialOrParallel::Sequential(ref mut d) => {
108            apply_pre_copy_structure(d, &target_definition, &destination_definition).await?;
109        }
110        SequentialOrParallel::Parallel(ref mut d) => {
111            apply_pre_copy_structure(d, &target_definition, &destination_definition).await?;
112        }
113    }
114
115    destination.commit_transaction().await?;
116
117    if !options.schema_only {
118        let mut parallel_runner = ParallelRunner::new(options.get_max_parallel_or_1());
119
120        for target_schema in &target_definition.schemas {
121            let source_schema = source_definition
122                .schemas
123                .iter()
124                .find(|s| s.object_id == target_schema.object_id);
125            let source_schema = match source_schema {
126                Some(s) => s,
127                None => {
128                    continue;
129                }
130            };
131
132            for target_table in &target_schema.tables {
133                if let TableTypeDetails::PartitionedParentTable { .. } = &target_table.table_type {
134                    continue;
135                }
136
137                let source_table = source_schema
138                    .tables
139                    .iter()
140                    .find(|t| t.object_id == target_table.object_id);
141                let source_table = match source_table {
142                    Some(s) => s,
143                    None => {
144                        continue;
145                    }
146                };
147
148                match source {
149                    SequentialOrParallel::Sequential(ref source) => match &mut destination {
150                        SequentialOrParallel::Sequential(ref mut destination) => {
151                            do_copy(
152                                source,
153                                destination,
154                                target_schema,
155                                target_table,
156                                source_schema,
157                                source_table,
158                                &data_format,
159                                &options,
160                            )
161                            .await?
162                        }
163                        SequentialOrParallel::Parallel(ref mut destination) => {
164                            do_copy(
165                                source,
166                                destination,
167                                target_schema,
168                                target_table,
169                                source_schema,
170                                source_table,
171                                &data_format,
172                                &options,
173                            )
174                            .await?
175                        }
176                    },
177                    SequentialOrParallel::Parallel(ref source) => match &mut destination {
178                        SequentialOrParallel::Sequential(ref mut destination) => {
179                            do_copy(
180                                source,
181                                destination,
182                                target_schema,
183                                target_table,
184                                source_schema,
185                                source_table,
186                                &data_format,
187                                &options,
188                            )
189                            .await?
190                        }
191                        SequentialOrParallel::Parallel(ref mut destination) => {
192                            let source = source.clone();
193                            let destination = destination.clone();
194                            let df = data_format.clone();
195                            let opt = &options;
196                            parallel_runner
197                                .enqueue(async move {
198                                    let source = source;
199                                    let mut destination = destination;
200                                    do_copy(
201                                        &source,
202                                        &mut destination,
203                                        target_schema,
204                                        target_table,
205                                        source_schema,
206                                        source_table,
207                                        &df,
208                                        opt,
209                                    )
210                                    .await
211                                })
212                                .await?;
213                        }
214                    },
215                }
216            }
217        }
218
219        parallel_runner.run_remaining().await?;
220    }
221
222    match &mut destination {
223        SequentialOrParallel::Sequential(ref mut destination) => {
224            apply_post_copy_structure_sequential(
225                destination,
226                &target_definition,
227                &destination_definition,
228            )
229            .await?;
230        }
231        SequentialOrParallel::Parallel(ref mut destination) => {
232            apply_post_copy_structure_parallel(
233                destination,
234                &target_definition,
235                &options,
236                &destination_definition,
237            )
238            .await?;
239        }
240    }
241
242    destination.finish().await?;
243
244    Ok(())
245}
246
/// Applies all structures needed to be able to actually insert data. This includes:
/// * Creating schemas
/// * Creating tables
/// * Creating functions
/// * Creating views
/// * Creating custom types
///
/// `definition` is what should exist; `target_definition` is what already
/// exists in the destination — anything present there is skipped, which is
/// what makes differential copies work.
#[instrument(skip_all)]
async fn apply_pre_copy_structure<D: CopyDestination>(
    destination: &mut D,
    definition: &PostgresDatabase,
    target_definition: &PostgresDatabase,
) -> Result<()> {
    let identifier_quoter = destination.get_identifier_quoter();

    // Schemas first: every other object lives inside one.
    // NOTE(review): schemas are not checked against `target_definition` here,
    // unlike extensions/enums below — presumably the create statement is
    // idempotent (`IF NOT EXISTS`); confirm.
    for schema in &definition.schemas {
        destination
            .apply_transactional_statement(&schema.get_create_statement(&identifier_quoter))
            .await?;
    }

    // Extensions, skipping any already enabled in the destination.
    for ext in &definition.enabled_extensions {
        if target_definition
            .enabled_extensions
            .iter()
            .any(|e| e.name == ext.name)
        {
            debug!("Extension {} already exists in destination", ext.name);
            continue;
        }

        destination
            .apply_transactional_statement(&ext.get_create_statement(&identifier_quoter))
            .await?;
    }

    // Enums next, since table columns may reference them.
    for schema in &definition.schemas {
        let target_schema = target_definition.try_get_schema(&schema.name);

        for enumeration in &schema.enums {
            if target_schema.is_some_and(|s| s.enums.iter().any(|e| e.name == enumeration.name)) {
                debug!("Enum {} already exists in destination", enumeration.name);
                continue;
            }

            destination
                .apply_transactional_statement(
                    &enumeration.get_create_statement(&identifier_quoter),
                )
                .await?;
        }
    }

    // Tables, functions, views and domains can depend on each other, so they
    // are collected first and then emitted in dependency order.
    let mut tables_and_functions: Vec<PostgresThingWithDependencies> = Vec::new();

    for schema in &definition.schemas {
        let target_schema = target_definition.try_get_schema(&schema.name);

        for function in &schema.functions {
            if target_schema.is_some_and(|s| {
                s.functions
                    .iter()
                    .any(|f| f.function_name == function.function_name)
            }) {
                debug!(
                    "Function {} already exists in destination",
                    function.function_name
                );
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::Function(function, schema));
        }

        for aggregate_function in &schema.aggregate_functions {
            if target_schema.is_some_and(|s| {
                s.aggregate_functions
                    .iter()
                    .any(|f| f.function_name == aggregate_function.function_name)
            }) {
                debug!(
                    "Aggregate function {} already exists in destination",
                    aggregate_function.function_name
                );
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::AggregateFunction(
                aggregate_function,
                schema,
            ));
        }

        for table in &schema.tables {
            if target_schema
                .and_then(|s| s.try_get_table(&table.name))
                .is_some()
            {
                debug!("Table {} already exists in destination", table.name);
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::Table(table, schema));
        }

        for view in &schema.views {
            if target_schema.is_some_and(|s| s.views.iter().any(|v| v.name == view.name)) {
                debug!("View {} already exists in destination", view.name);
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::View(view, schema));
        }

        for domain in &schema.domains {
            if target_schema.is_some_and(|s| s.domains.iter().any(|d| d.name == domain.name)) {
                debug!("Domain {} already exists in destination", domain.name);
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::Domain(domain, schema));
        }
    }

    // Topologically sort so that dependencies are created before dependents.
    let sorted = tables_and_functions.iter().sort_by_dependencies();

    for thing in sorted {
        let sql = thing.get_create_sql(&identifier_quoter);
        destination.apply_transactional_statement(&sql).await?;
    }

    Ok(())
}
379
380/// Actually copies data between two tables.
381#[instrument(skip_all)]
382#[allow(clippy::too_many_arguments)]
383async fn do_copy<S: CopySource, D: CopyDestination>(
384    source: &S,
385    destination: &mut D,
386    target_schema: &PostgresSchema,
387    target_table: &PostgresTable,
388    source_schema: &PostgresSchema,
389    source_table: &PostgresTable,
390    data_format: &DataFormat,
391    options: &CopyDataOptions,
392) -> Result<()> {
393    let has_data = options.differential
394        && destination
395            .has_data_in_table(target_schema, target_table)
396            .await?;
397
398    if !has_data {
399        info!(
400            "Skipping table {} as it already has data in the destination",
401            target_table.name
402        );
403        let data = source
404            .get_data(source_schema, source_table, data_format)
405            .await?;
406
407        destination
408            .apply_data(target_schema, target_table, data)
409            .await?;
410    }
411
412    Ok(())
413}
414
415/// Get instructions to apply after the data has been copied. This includes:
416/// * Creating indexes
417/// * Creating constraints
418/// * Creating triggers
419/// * Refreshing materialized views
420#[instrument(skip_all)]
421fn get_post_apply_statement_groups(
422    definition: &PostgresDatabase,
423    identifier_quoter: &IdentifierQuoter,
424    target_definition: &PostgresDatabase,
425) -> Vec<Vec<String>> {
426    let mut statements = Vec::new();
427
428    for schema in &definition.schemas {
429        let existing_schema = target_definition.try_get_schema(&schema.name);
430
431        let mut group_1 = Vec::new();
432        let mut group_2 = Vec::new();
433        for table in &schema.tables {
434            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));
435
436            for index in &table.indices {
437                if index.index_constraint_type == PostgresIndexType::PrimaryKey {
438                    continue;
439                }
440
441                if existing_table.is_some_and(|t| t.indices.iter().any(|i| i.name == index.name)) {
442                    debug!(
443                        "Index {} on table {} already exists in destination",
444                        index.name, table.name
445                    );
446                    continue;
447                }
448
449                let sql = index.get_create_index_command(schema, table, identifier_quoter);
450                if table.is_timescale_table() {
451                    statements.push(vec![sql]);
452                } else {
453                    group_1.push(sql);
454                }
455            }
456        }
457
458        for sequence in &schema.sequences {
459            let existing_sequence = existing_schema
460                .and_then(|s| s.sequences.iter().find(|seq| seq.name == sequence.name));
461
462            if existing_sequence.is_none() {
463                group_1.push(sequence.get_create_statement(schema, identifier_quoter));
464            } else {
465                debug!("Sequence {} already exists in destination", sequence.name);
466            }
467            if existing_sequence.is_none()
468                || existing_sequence.is_some_and(|s| s.last_value != sequence.last_value)
469            {
470                if let Some(sql) = sequence.get_set_value_statement(schema, identifier_quoter) {
471                    group_2.push(sql);
472                }
473            }
474        }
475
476        for table in &schema.tables {
477            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));
478
479            for column in &table.columns {
480                let target_column =
481                    existing_table.and_then(|t| t.columns.iter().find(|c| c.name == column.name));
482
483                if target_column.is_some_and(|c| c.default_value == column.default_value) {
484                    debug!(
485                        "Default value for column {} on table {} already matches destination",
486                        column.name, table.name
487                    );
488                    continue;
489                }
490
491                if let Some(sql) =
492                    column.get_alter_table_set_default_statement(table, schema, identifier_quoter)
493                {
494                    group_2.push(sql);
495                }
496            }
497        }
498
499        statements.push(group_1);
500        statements.push(group_2);
501    }
502
503    for schema in &definition.schemas {
504        let existing_schema = target_definition.try_get_schema(&schema.name);
505
506        let mut group_3 = Vec::new();
507        for table in &schema.tables {
508            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));
509            for constraint in &table.constraints {
510                if let PostgresConstraint::Unique(uk) = constraint {
511                    if existing_table.is_some_and(|t| {
512                        t.constraints.iter().any(|c| c.name() == constraint.name())
513                    }) {
514                        debug!(
515                            "Unique constraint {} on table {} already exists in destination",
516                            constraint.name(),
517                            table.name
518                        );
519                        continue;
520                    }
521                    let sql = uk.get_create_statement(table, schema, identifier_quoter);
522                    if table.is_timescale_table() {
523                        statements.push(vec![sql]);
524                    } else {
525                        group_3.push(sql);
526                    }
527                }
528            }
529        }
530        statements.push(group_3);
531    }
532
533    for schema in &definition.schemas {
534        let existing_schema = target_definition.try_get_schema(&schema.name);
535        for table in &schema.tables {
536            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));
537            for constraint in &table.constraints {
538                if existing_table
539                    .is_some_and(|t| t.constraints.iter().any(|c| c.name() == constraint.name()))
540                {
541                    debug!(
542                        "Foreign key constraint {} on table {} already exists in destination",
543                        constraint.name(),
544                        table.name
545                    );
546                    continue;
547                }
548
549                if let PostgresConstraint::ForeignKey(fk) = constraint {
550                    let sql = fk.get_create_statement(table, schema, identifier_quoter);
551                    statements.push(vec![sql]);
552                }
553            }
554        }
555    }
556
557    let mut group_4 = Vec::new();
558    for schema in &definition.schemas {
559        let existing_schema = target_definition.try_get_schema(&schema.name);
560
561        for trigger in &schema.triggers {
562            if existing_schema.is_some_and(|s| s.triggers.iter().any(|t| t.name == trigger.name)) {
563                debug!(
564                    "Trigger {} on table {} already exists in destination",
565                    trigger.name, trigger.table_name
566                );
567                continue;
568            }
569
570            let sql = trigger.get_create_statement(schema, identifier_quoter);
571            group_4.push(sql);
572        }
573    }
574    statements.push(group_4);
575
576    for schema in &definition.schemas {
577        for view in schema.views.iter().sort_by_dependencies() {
578            if let Some(sql) = view.get_refresh_sql(schema, identifier_quoter) {
579                statements.push(vec![sql]);
580            }
581        }
582    }
583
584    let mut group_5 = Vec::new();
585    for job in &definition.timescale_support.user_defined_jobs {
586        if target_definition
587            .timescale_support
588            .user_defined_jobs
589            .iter()
590            .any(|j| {
591                j.function_schema == job.function_schema
592                    && j.function_name == job.function_name
593                    && j.config == job.config
594            })
595        {
596            debug!(
597                "Timescale job {} already exists in destination",
598                job.function_name
599            );
600            continue;
601        }
602
603        group_5.push(job.get_create_sql(identifier_quoter));
604    }
605
606    for schema in &definition.schemas {
607        let existing_schema = target_definition.try_get_schema(&schema.name);
608
609        for table in &schema.tables {
610            if let TableTypeDetails::TimescaleHypertable {
611                compression: existing_compression,
612                retention: existing_retention,
613                ..
614            } = &table.table_type
615            {
616                let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));
617
618                if existing_table.is_some_and(|t| {
619                    if let TableTypeDetails::TimescaleHypertable {
620                        compression,
621                        retention,
622                        ..
623                    } = &t.table_type
624                    {
625                        compression == existing_compression && retention == existing_retention
626                    } else {
627                        false
628                    }
629                }) {
630                    debug!(
631                        "Timescale hypertable {} already exists in destination",
632                        table.name
633                    );
634                    continue;
635                }
636            }
637
638            if let Some(timescale_post) =
639                table.get_timescale_post_settings(schema, identifier_quoter)
640            {
641                group_5.push(timescale_post);
642            }
643        }
644    }
645
646    statements.push(group_5);
647
648    statements
649}
650
651/// Applies the structures generated in [get_post_apply_statement_groups] to the destination sequentially.
652#[instrument(skip_all)]
653async fn apply_post_copy_structure_sequential<D: CopyDestination>(
654    destination: &mut D,
655    definition: &PostgresDatabase,
656    target_definition: &PostgresDatabase,
657) -> Result<()> {
658    let identifier_quoter = destination.get_identifier_quoter();
659
660    let statement_groups =
661        get_post_apply_statement_groups(definition, &identifier_quoter, target_definition);
662
663    for group in statement_groups {
664        for statement in group {
665            destination
666                .apply_non_transactional_statement(&statement)
667                .await?;
668        }
669    }
670
671    Ok(())
672}
673
674/// Applies the structures generated in [get_post_apply_statement_groups] to the destination in parallel.
675#[instrument(skip_all)]
676async fn apply_post_copy_structure_parallel<D: CopyDestination + Sync + Clone>(
677    destination: &mut D,
678    definition: &PostgresDatabase,
679    options: &CopyDataOptions,
680    target_definition: &PostgresDatabase,
681) -> Result<()> {
682    let identifier_quoter = destination.get_identifier_quoter();
683
684    let statement_groups =
685        get_post_apply_statement_groups(definition, &identifier_quoter, target_definition);
686
687    for group in statement_groups {
688        if group.is_empty() {
689            continue;
690        }
691
692        if group.len() == 1 {
693            destination
694                .apply_non_transactional_statement(&group[0])
695                .await?;
696        } else {
697            let mut join_handles = ParallelRunner::new(options.get_max_parallel_or_1());
698
699            for statement in group {
700                let mut destination = destination.clone();
701                join_handles
702                    .enqueue(async move {
703                        destination
704                            .apply_non_transactional_statement(&statement)
705                            .await
706                    })
707                    .await?;
708            }
709
710            join_handles.run_remaining().await?;
711        }
712    }
713
714    Ok(())
715}
716
717/// Get the data format to use when copying data from the source to the destination, that both
718/// source and destination supports.
719#[instrument(skip_all)]
720async fn get_data_type(
721    source: &impl CopySourceFactory,
722    destination: &impl CopyDestinationFactory<'_>,
723    options: &CopyDataOptions,
724) -> Result<DataFormat> {
725    let source_formats = source.supported_data_format().await?;
726    let destination_formats = destination.supported_data_format().await?;
727
728    let overlap = source_formats
729        .iter()
730        .filter(|f| destination_formats.contains(f))
731        .collect_vec();
732
733    if overlap.is_empty()
734        || options
735            .data_format
736            .as_ref()
737            .is_some_and(|d| !overlap.contains(&d))
738    {
739        Err(ElefantToolsError::DataFormatsNotCompatible {
740            supported_by_source: source_formats,
741            supported_by_target: destination_formats,
742            required_format: options.data_format.clone(),
743        })
744    } else {
745        for format in &overlap {
746            if let DataFormat::PostgresBinary { .. } = format {
747                return Ok((*format).clone());
748            }
749        }
750
751        Ok(overlap[0].clone())
752    }
753}