elefant_tools/storage/postgres/
postgres_instance_storage.rs1use crate::postgres_client_wrapper::{FromPgChar, FromRow, RowEnumExt};
2use crate::quoting::AllowedKeywordUsage;
3use crate::storage::postgres::parallel_copy_destination::ParallelSafePostgresInstanceCopyDestinationStorage;
4use crate::storage::postgres::parallel_copy_source::ParallelSafePostgresInstanceCopySourceStorage;
5use crate::storage::postgres::sequential_copy_destination::SequentialSafePostgresInstanceCopyDestinationStorage;
6use crate::storage::postgres::sequential_copy_source::SequentialSafePostgresInstanceCopySourceStorage;
7use crate::{
8 BaseCopyTarget, CopyDestinationFactory, CopySourceFactory, DataFormat, ElefantToolsError,
9 IdentifierQuoter, PostgresClientWrapper, SequentialOrParallel, SupportedParallelism,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio_postgres::Row;
14use tracing::instrument;
15
16pub struct PostgresInstanceStorage<'a> {
18 pub(crate) connection: &'a PostgresClientWrapper,
19 pub(crate) postgres_version: String,
20 pub(crate) identifier_quoter: Arc<IdentifierQuoter>,
21}
22
23impl<'a> PostgresInstanceStorage<'a> {
24 #[instrument(skip_all)]
25 pub async fn new(connection: &'a PostgresClientWrapper) -> crate::Result<Self> {
26 let postgres_version = connection.get_single_result("select version()").await?;
27
28 let keywords = connection
29 .get_results::<Keyword>(
30 "select word, catcode from pg_get_keywords() where catcode <> 'U'",
31 )
32 .await?;
33
34 let mut keyword_info = HashMap::new();
35
36 for keyword in keywords {
37 keyword_info.insert(
38 keyword.word,
39 AllowedKeywordUsage {
40 column_name: keyword.category == KeywordType::AllowedInColumnName
41 || keyword.category == KeywordType::AllowedInTypeOrFunctionName,
42 type_or_function_name: keyword.category
43 == KeywordType::AllowedInTypeOrFunctionName,
44 },
45 );
46 }
47
48 let quoter = IdentifierQuoter::new(keyword_info);
49
50 Ok(PostgresInstanceStorage {
51 connection,
52 postgres_version,
53 identifier_quoter: Arc::new(quoter),
54 })
55 }
56
57 pub fn get_identifier_quoter(&self) -> Arc<IdentifierQuoter> {
58 self.identifier_quoter.clone()
59 }
60}
61
62struct Keyword {
63 word: String,
64 category: KeywordType,
65}
66
67impl FromRow for Keyword {
68 fn from_row(row: Row) -> crate::Result<Self> {
69 Ok(Keyword {
70 word: row.try_get(0)?,
71 category: row.try_get_enum_value(1)?,
72 })
73 }
74}
75
/// Category of a keyword, decoded from a single category character.
#[derive(Debug, PartialEq, Eq)]
enum KeywordType {
    /// Decoded from `'U'`.
    Unreserved,
    /// Decoded from `'C'`.
    AllowedInColumnName,
    /// Decoded from `'T'`.
    AllowedInTypeOrFunctionName,
    /// Decoded from `'R'`.
    Reserved,
}
83
84impl FromPgChar for KeywordType {
85 fn from_pg_char(c: char) -> crate::Result<Self> {
86 match c {
87 'U' => Ok(KeywordType::Unreserved),
88 'C' => Ok(KeywordType::AllowedInColumnName),
89 'T' => Ok(KeywordType::AllowedInTypeOrFunctionName),
90 'R' => Ok(KeywordType::Reserved),
91 _ => Err(ElefantToolsError::InvalidKeywordType(c.to_string())),
92 }
93 }
94}
95
96impl BaseCopyTarget for PostgresInstanceStorage<'_> {
97 async fn supported_data_format(&self) -> crate::Result<Vec<DataFormat>> {
98 Ok(vec![
99 DataFormat::Text,
100 DataFormat::PostgresBinary {
101 postgres_version: Some(self.postgres_version.clone()),
102 },
103 ])
104 }
105}
106
107impl<'a> CopySourceFactory for PostgresInstanceStorage<'a> {
108 type SequentialSource = SequentialSafePostgresInstanceCopySourceStorage<'a>;
109 type ParallelSource = ParallelSafePostgresInstanceCopySourceStorage<'a>;
110
111 async fn create_source(
112 &self,
113 ) -> crate::Result<SequentialOrParallel<Self::SequentialSource, Self::ParallelSource>> {
114 let parallel = ParallelSafePostgresInstanceCopySourceStorage::new(self).await?;
115
116 Ok(SequentialOrParallel::Parallel(parallel))
117 }
118
119 async fn create_sequential_source(&self) -> crate::Result<Self::SequentialSource> {
120 let seq = SequentialSafePostgresInstanceCopySourceStorage::new(self).await?;
121
122 Ok(seq)
123 }
124
125 fn supported_parallelism(&self) -> SupportedParallelism {
126 SupportedParallelism::Parallel
127 }
128}
129
130impl<'a> CopyDestinationFactory<'a> for PostgresInstanceStorage<'a> {
131 type SequentialDestination = SequentialSafePostgresInstanceCopyDestinationStorage<'a>;
132 type ParallelDestination = ParallelSafePostgresInstanceCopyDestinationStorage<'a>;
133
134 async fn create_destination(
135 &'a mut self,
136 ) -> crate::Result<SequentialOrParallel<Self::SequentialDestination, Self::ParallelDestination>>
137 {
138 let par = ParallelSafePostgresInstanceCopyDestinationStorage::new(self).await?;
139
140 Ok(SequentialOrParallel::Parallel(par))
141 }
142
143 async fn create_sequential_destination(
144 &'a mut self,
145 ) -> crate::Result<Self::SequentialDestination> {
146 let seq = SequentialSafePostgresInstanceCopyDestinationStorage::new(self).await?;
147
148 Ok(seq)
149 }
150
151 fn supported_parallelism(&self) -> SupportedParallelism {
152 SupportedParallelism::Parallel
153 }
154}