use crate::object_id::DependencySortable;
use crate::parallel_runner::ParallelRunner;
use crate::quoting::IdentifierQuoter;
use crate::storage::DataFormat;
use crate::storage::{CopyDestination, CopySource};
use crate::*;
use itertools::Itertools;
use std::num::NonZeroUsize;
use tracing::{debug, info, instrument};

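/// Options controlling a [`copy_data`] run.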
#[derive(Debug, Default)]
pub struct CopyDataOptions {
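    /// Force a specific data format instead of negotiating one between
    /// source and destination.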
    pub data_format: Option<DataFormat>,
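
    /// Maximum number of tasks to run in parallel. When unset, the copy
    /// runs sequentially.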
    pub max_parallel: Option<NonZeroUsize>,

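    /// Restrict the copy to a single schema.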
    pub target_schema: Option<String>,

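    /// When set together with `target_schema`, the schema is created under
    /// this name in the destination.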
    pub rename_schema_to: Option<String>,

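    /// Copy only the database structure, skipping table data.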
    pub schema_only: bool,

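    /// Skip objects and data that already exist in the destination.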
    pub differential: bool,
}

const NON_ZERO_USIZE1: NonZeroUsize = NonZeroUsize::MIN;

impl CopyDataOptions {
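    /// The configured parallelism, falling back to sequential (1) when unset.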
    fn get_max_parallel_or_1(&self) -> NonZeroUsize {
        self.max_parallel.unwrap_or(NON_ZERO_USIZE1)
    }
}

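/// Copies structure and data from `source` to `destination`. The data format
/// and parallelism level are negotiated between the two sides, the pre-copy
/// structure (schemas, extensions, enums, tables, functions, views, domains)
/// is applied in a transaction, table data is copied (in parallel when both
/// sides support it), and finally the post-copy structure (indices,
/// constraints, triggers, sequence values and similar) is applied.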
#[instrument(skip_all)]
pub async fn copy_data<'d, S: CopySourceFactory, D: CopyDestinationFactory<'d>>(
    source: &S,
    destination: &'d mut D,
    options: CopyDataOptions,
) -> Result<()> {
    let data_format = get_data_format(source, destination, &options).await?;

    let expected_parallelism = if options.get_max_parallel_or_1() == NON_ZERO_USIZE1 {
        SupportedParallelism::Sequential
    } else {
        source
            .supported_parallelism()
            .negotiate_parallelism(destination.supported_parallelism())
    };

    let (source, mut destination) = match expected_parallelism {
66 SupportedParallelism::Sequential => (
            SequentialOrParallel::Sequential(source.create_sequential_source().await?),
            SequentialOrParallel::Sequential(destination.create_sequential_destination().await?),
        ),
        SupportedParallelism::Parallel => (
            source.create_source().await?,
            destination.create_destination().await?,
        ),
    };

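    // In differential mode the destination is introspected as well, so that
    // objects and data already present there can be skipped.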
    let definition = source.get_introspection().await?;
    let destination_definition = if options.differential {
        destination
            .try_get_introspeciton()
            .await?
            .unwrap_or_default()
    } else {
        default()
    };

    let source_definition = if let Some(target_schema) = &options.target_schema {
        definition.filtered_to_schema(target_schema)
    } else {
        definition
    };

    let target_definition = if let (Some(target_schema), Some(rename_to)) =
        (&options.target_schema, &options.rename_schema_to)
    {
        source_definition.with_renamed_schema(target_schema, rename_to)
    } else {
        source_definition.clone()
    };

    let destination_definition = if let Some(target_schema) = &options.target_schema {
        destination_definition.filtered_to_schema(target_schema)
    } else {
        destination_definition
    };

    destination.begin_transaction().await?;

    match &mut destination {
        SequentialOrParallel::Sequential(ref mut d) => {
            apply_pre_copy_structure(d, &target_definition, &destination_definition).await?;
        }
        SequentialOrParallel::Parallel(ref mut d) => {
            apply_pre_copy_structure(d, &target_definition, &destination_definition).await?;
        }
    }

    destination.commit_transaction().await?;

    if !options.schema_only {
        let mut parallel_runner = ParallelRunner::new(options.get_max_parallel_or_1());

        for target_schema in &target_definition.schemas {
            let Some(source_schema) = source_definition
                .schemas
                .iter()
                .find(|s| s.object_id == target_schema.object_id)
            else {
                continue;
            };

            for target_table in &target_schema.tables {
                if let TableTypeDetails::PartitionedParentTable { .. } = &target_table.table_type {
                    continue;
                }

                let Some(source_table) = source_schema
                    .tables
                    .iter()
                    .find(|t| t.object_id == target_table.object_id)
                else {
                    continue;
                };

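                // Matching source and destination pairwise gives `do_copy`
                // concrete types for each combination. Only the
                // parallel/parallel pairing is enqueued on the parallel
                // runner; the other combinations copy inline.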
                match source {
                    SequentialOrParallel::Sequential(ref source) => match &mut destination {
                        SequentialOrParallel::Sequential(ref mut destination) => {
                            do_copy(
                                source,
                                destination,
                                target_schema,
                                target_table,
                                source_schema,
                                source_table,
                                &data_format,
                                &options,
                            )
                            .await?
                        }
                        SequentialOrParallel::Parallel(ref mut destination) => {
                            do_copy(
                                source,
                                destination,
                                target_schema,
                                target_table,
                                source_schema,
                                source_table,
                                &data_format,
                                &options,
                            )
                            .await?
                        }
                    },
                    SequentialOrParallel::Parallel(ref source) => match &mut destination {
                        SequentialOrParallel::Sequential(ref mut destination) => {
                            do_copy(
                                source,
                                destination,
                                target_schema,
                                target_table,
                                source_schema,
                                source_table,
                                &data_format,
                                &options,
                            )
                            .await?
                        }
                        SequentialOrParallel::Parallel(ref mut destination) => {
                            let source = source.clone();
                            let destination = destination.clone();
                            let df = data_format.clone();
                            let opt = &options;
                            parallel_runner
                                .enqueue(async move {
                                    let source = source;
                                    let mut destination = destination;
                                    do_copy(
                                        &source,
                                        &mut destination,
                                        target_schema,
                                        target_table,
                                        source_schema,
                                        source_table,
                                        &df,
                                        opt,
                                    )
                                    .await
                                })
                                .await?;
                        }
                    },
                }
            }
        }

        parallel_runner.run_remaining().await?;
    }

    match &mut destination {
        SequentialOrParallel::Sequential(ref mut destination) => {
            apply_post_copy_structure_sequential(
                destination,
                &target_definition,
                &destination_definition,
            )
            .await?;
        }
        SequentialOrParallel::Parallel(ref mut destination) => {
            apply_post_copy_structure_parallel(
                destination,
                &target_definition,
                &options,
                &destination_definition,
            )
            .await?;
        }
    }

    destination.finish().await?;

    Ok(())
}

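/// Applies the structure that must exist before data can be copied: schemas,
/// extensions, enums, and then tables, functions, views and domains in
/// dependency order. Objects already present in `target_definition` (the
/// introspected destination) are skipped.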
#[instrument(skip_all)]
async fn apply_pre_copy_structure<D: CopyDestination>(
    destination: &mut D,
    definition: &PostgresDatabase,
    target_definition: &PostgresDatabase,
) -> Result<()> {
    let identifier_quoter = destination.get_identifier_quoter();

    for schema in &definition.schemas {
        destination
            .apply_transactional_statement(&schema.get_create_statement(&identifier_quoter))
            .await?;
    }

    for ext in &definition.enabled_extensions {
        if target_definition
            .enabled_extensions
            .iter()
            .any(|e| e.name == ext.name)
        {
            debug!("Extension {} already exists in destination", ext.name);
            continue;
        }

        destination
            .apply_transactional_statement(&ext.get_create_statement(&identifier_quoter))
            .await?;
    }

    for schema in &definition.schemas {
        let target_schema = target_definition.try_get_schema(&schema.name);

        for enumeration in &schema.enums {
            if target_schema.is_some_and(|s| s.enums.iter().any(|e| e.name == enumeration.name)) {
                debug!("Enum {} already exists in destination", enumeration.name);
                continue;
            }

            destination
                .apply_transactional_statement(
                    &enumeration.get_create_statement(&identifier_quoter),
                )
                .await?;
        }
    }

    let mut tables_and_functions: Vec<PostgresThingWithDependencies> = Vec::new();

    for schema in &definition.schemas {
        let target_schema = target_definition.try_get_schema(&schema.name);

        for function in &schema.functions {
            if target_schema.is_some_and(|s| {
                s.functions
                    .iter()
                    .any(|f| f.function_name == function.function_name)
            }) {
                debug!(
                    "Function {} already exists in destination",
                    function.function_name
                );
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::Function(function, schema));
        }

        for aggregate_function in &schema.aggregate_functions {
            if target_schema.is_some_and(|s| {
                s.aggregate_functions
                    .iter()
                    .any(|f| f.function_name == aggregate_function.function_name)
            }) {
                debug!(
                    "Aggregate function {} already exists in destination",
                    aggregate_function.function_name
                );
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::AggregateFunction(
                aggregate_function,
                schema,
            ));
        }

        for table in &schema.tables {
            if target_schema
                .and_then(|s| s.try_get_table(&table.name))
                .is_some()
            {
                debug!("Table {} already exists in destination", table.name);
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::Table(table, schema));
        }

        for view in &schema.views {
            if target_schema.is_some_and(|s| s.views.iter().any(|v| v.name == view.name)) {
                debug!("View {} already exists in destination", view.name);
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::View(view, schema));
        }

        for domain in &schema.domains {
            if target_schema.is_some_and(|s| s.domains.iter().any(|d| d.name == domain.name)) {
                debug!("Domain {} already exists in destination", domain.name);
                continue;
            }

            tables_and_functions.push(PostgresThingWithDependencies::Domain(domain, schema));
        }
    }

    let sorted = tables_and_functions.iter().sort_by_dependencies();

    for thing in sorted {
        let sql = thing.get_create_sql(&identifier_quoter);
        destination.apply_transactional_statement(&sql).await?;
    }

    Ok(())
}

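/// Copies the data of a single table. In differential mode the table is
/// skipped when it already contains data in the destination.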
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
async fn do_copy<S: CopySource, D: CopyDestination>(
    source: &S,
    destination: &mut D,
    target_schema: &PostgresSchema,
    target_table: &PostgresTable,
    source_schema: &PostgresSchema,
    source_table: &PostgresTable,
    data_format: &DataFormat,
    options: &CopyDataOptions,
) -> Result<()> {
    let has_data = options.differential
        && destination
            .has_data_in_table(target_schema, target_table)
            .await?;

    if has_data {
        info!(
            "Skipping table {} as it already has data in the destination",
            target_table.name
        );
    } else {
        let data = source
            .get_data(source_schema, source_table, data_format)
            .await?;

        destination
            .apply_data(target_schema, target_table, data)
            .await?;
    }

    Ok(())
}

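/// Builds the groups of statements to run after data has been copied:
/// indices, sequences, column defaults, unique and foreign key constraints,
/// triggers, view refreshes and Timescale settings. Statements within a group
/// can run in parallel; the groups themselves must run in order, and
/// statements that have to run alone (for example on Timescale hypertables)
/// are emitted as single-element groups.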
#[instrument(skip_all)]
fn get_post_apply_statement_groups(
    definition: &PostgresDatabase,
    identifier_quoter: &IdentifierQuoter,
    target_definition: &PostgresDatabase,
) -> Vec<Vec<String>> {
    let mut statements = Vec::new();

    for schema in &definition.schemas {
        let existing_schema = target_definition.try_get_schema(&schema.name);

        let mut group_1 = Vec::new();
        let mut group_2 = Vec::new();
        for table in &schema.tables {
            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));

            for index in &table.indices {
                if index.index_constraint_type == PostgresIndexType::PrimaryKey {
                    continue;
                }

                if existing_table.is_some_and(|t| t.indices.iter().any(|i| i.name == index.name)) {
                    debug!(
                        "Index {} on table {} already exists in destination",
                        index.name, table.name
                    );
                    continue;
                }

                let sql = index.get_create_index_command(schema, table, identifier_quoter);
                if table.is_timescale_table() {
                    statements.push(vec![sql]);
                } else {
                    group_1.push(sql);
                }
            }
        }

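        // Missing sequences are created in group 1, while their current
        // values are synchronised in group 2 so the updates run after the
        // sequences exist.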
        for sequence in &schema.sequences {
            let existing_sequence = existing_schema
                .and_then(|s| s.sequences.iter().find(|seq| seq.name == sequence.name));

            if existing_sequence.is_none() {
                group_1.push(sequence.get_create_statement(schema, identifier_quoter));
            } else {
                debug!("Sequence {} already exists in destination", sequence.name);
            }
            if existing_sequence.is_none()
                || existing_sequence.is_some_and(|s| s.last_value != sequence.last_value)
            {
                if let Some(sql) = sequence.get_set_value_statement(schema, identifier_quoter) {
                    group_2.push(sql);
                }
            }
        }

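        // Re-apply column defaults that are missing or differ in the
        // destination.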
        for table in &schema.tables {
            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));

            for column in &table.columns {
                let target_column =
                    existing_table.and_then(|t| t.columns.iter().find(|c| c.name == column.name));

                if target_column.is_some_and(|c| c.default_value == column.default_value) {
                    debug!(
                        "Default value for column {} on table {} already matches destination",
                        column.name, table.name
                    );
                    continue;
                }

                if let Some(sql) =
                    column.get_alter_table_set_default_statement(table, schema, identifier_quoter)
                {
                    group_2.push(sql);
                }
            }
        }

        statements.push(group_1);
        statements.push(group_2);
    }

    for schema in &definition.schemas {
        let existing_schema = target_definition.try_get_schema(&schema.name);

        let mut group_3 = Vec::new();
        for table in &schema.tables {
            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));
            for constraint in &table.constraints {
                if let PostgresConstraint::Unique(uk) = constraint {
                    if existing_table.is_some_and(|t| {
                        t.constraints.iter().any(|c| c.name() == constraint.name())
                    }) {
                        debug!(
                            "Unique constraint {} on table {} already exists in destination",
                            constraint.name(),
                            table.name
                        );
                        continue;
                    }
                    let sql = uk.get_create_statement(table, schema, identifier_quoter);
                    if table.is_timescale_table() {
                        statements.push(vec![sql]);
                    } else {
                        group_3.push(sql);
                    }
                }
            }
        }
        statements.push(group_3);
    }

    for schema in &definition.schemas {
        let existing_schema = target_definition.try_get_schema(&schema.name);
        for table in &schema.tables {
            let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));
            for constraint in &table.constraints {
                if let PostgresConstraint::ForeignKey(fk) = constraint {
                    if existing_table
                        .is_some_and(|t| t.constraints.iter().any(|c| c.name() == constraint.name()))
                    {
                        debug!(
                            "Foreign key constraint {} on table {} already exists in destination",
                            constraint.name(),
                            table.name
                        );
                        continue;
                    }

                    let sql = fk.get_create_statement(table, schema, identifier_quoter);
                    statements.push(vec![sql]);
                }
            }
        }
    }

    let mut group_4 = Vec::new();
    for schema in &definition.schemas {
        let existing_schema = target_definition.try_get_schema(&schema.name);

        for trigger in &schema.triggers {
            if existing_schema.is_some_and(|s| s.triggers.iter().any(|t| t.name == trigger.name)) {
                debug!(
                    "Trigger {} on table {} already exists in destination",
                    trigger.name, trigger.table_name
                );
                continue;
            }

            let sql = trigger.get_create_statement(schema, identifier_quoter);
            group_4.push(sql);
        }
    }
    statements.push(group_4);

    for schema in &definition.schemas {
        for view in schema.views.iter().sort_by_dependencies() {
            if let Some(sql) = view.get_refresh_sql(schema, identifier_quoter) {
                statements.push(vec![sql]);
            }
        }
    }

    let mut group_5 = Vec::new();
    for job in &definition.timescale_support.user_defined_jobs {
        if target_definition
            .timescale_support
            .user_defined_jobs
            .iter()
            .any(|j| {
                j.function_schema == job.function_schema
                    && j.function_name == job.function_name
                    && j.config == job.config
            })
        {
            debug!(
                "Timescale job {} already exists in destination",
                job.function_name
            );
            continue;
        }

        group_5.push(job.get_create_sql(identifier_quoter));
    }

    for schema in &definition.schemas {
        let existing_schema = target_definition.try_get_schema(&schema.name);

        for table in &schema.tables {
            if let TableTypeDetails::TimescaleHypertable {
                compression: source_compression,
                retention: source_retention,
                ..
            } = &table.table_type
            {
                let existing_table = existing_schema.and_then(|s| s.try_get_table(&table.name));

                if existing_table.is_some_and(|t| {
                    if let TableTypeDetails::TimescaleHypertable {
                        compression,
                        retention,
                        ..
                    } = &t.table_type
                    {
                        compression == source_compression && retention == source_retention
                    } else {
                        false
                    }
                }) {
                    debug!(
                        "Timescale hypertable {} already exists in destination",
                        table.name
                    );
                    continue;
                }
            }

            if let Some(timescale_post) =
                table.get_timescale_post_settings(schema, identifier_quoter)
            {
                group_5.push(timescale_post);
            }
        }
    }

    statements.push(group_5);

    statements
}

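/// Applies the post-copy structure one statement at a time.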
#[instrument(skip_all)]
async fn apply_post_copy_structure_sequential<D: CopyDestination>(
    destination: &mut D,
    definition: &PostgresDatabase,
    target_definition: &PostgresDatabase,
) -> Result<()> {
    let identifier_quoter = destination.get_identifier_quoter();

    let statement_groups =
        get_post_apply_statement_groups(definition, &identifier_quoter, target_definition);

    for group in statement_groups {
        for statement in group {
            destination
                .apply_non_transactional_statement(&statement)
                .await?;
        }
    }

    Ok(())
}

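/// Applies the post-copy structure, running the statements within each group
/// in parallel, limited by `options.max_parallel`.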
#[instrument(skip_all)]
async fn apply_post_copy_structure_parallel<D: CopyDestination + Sync + Clone>(
    destination: &mut D,
    definition: &PostgresDatabase,
    options: &CopyDataOptions,
    target_definition: &PostgresDatabase,
) -> Result<()> {
    let identifier_quoter = destination.get_identifier_quoter();

    let statement_groups =
        get_post_apply_statement_groups(definition, &identifier_quoter, target_definition);

    for group in statement_groups {
        if group.is_empty() {
            continue;
        }

        if group.len() == 1 {
            destination
                .apply_non_transactional_statement(&group[0])
                .await?;
        } else {
            let mut join_handles = ParallelRunner::new(options.get_max_parallel_or_1());

            for statement in group {
                let mut destination = destination.clone();
                join_handles
                    .enqueue(async move {
                        destination
                            .apply_non_transactional_statement(&statement)
                            .await
                    })
                    .await?;
            }

            join_handles.run_remaining().await?;
        }
    }

    Ok(())
}

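/// Negotiates the data format to use: picks a format supported by both sides,
/// preferring the Postgres binary format, and fails when there is no overlap
/// or the explicitly requested format is not supported by both.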
#[instrument(skip_all)]
async fn get_data_format(
    source: &impl CopySourceFactory,
    destination: &impl CopyDestinationFactory<'_>,
    options: &CopyDataOptions,
) -> Result<DataFormat> {
    let source_formats = source.supported_data_format().await?;
    let destination_formats = destination.supported_data_format().await?;

    let overlap = source_formats
        .iter()
        .filter(|f| destination_formats.contains(f))
        .collect_vec();

    if overlap.is_empty()
        || options
            .data_format
            .as_ref()
            .is_some_and(|d| !overlap.contains(&d))
    {
        Err(ElefantToolsError::DataFormatsNotCompatible {
            supported_by_source: source_formats,
            supported_by_target: destination_formats,
            required_format: options.data_format.clone(),
        })
    } else {
        for format in &overlap {
            if let DataFormat::PostgresBinary { .. } = format {
                return Ok((*format).clone());
            }
        }

        Ok(overlap[0].clone())
    }
}