// elefant_tools/storage/postgres/postgres_instance_storage.rs

use crate::postgres_client_wrapper::{FromPgChar, FromRow, RowEnumExt};
use crate::quoting::AllowedKeywordUsage;
use crate::storage::postgres::parallel_copy_destination::ParallelSafePostgresInstanceCopyDestinationStorage;
use crate::storage::postgres::parallel_copy_source::ParallelSafePostgresInstanceCopySourceStorage;
use crate::storage::postgres::sequential_copy_destination::SequentialSafePostgresInstanceCopyDestinationStorage;
use crate::storage::postgres::sequential_copy_source::SequentialSafePostgresInstanceCopySourceStorage;
use crate::{
    BaseCopyTarget, CopyDestinationFactory, CopySourceFactory, DataFormat, ElefantToolsError,
    IdentifierQuoter, PostgresClientWrapper, SequentialOrParallel, SupportedParallelism,
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio_postgres::Row;
use tracing::instrument;
15
16/// A CopyTarget for Postgres.
17pub struct PostgresInstanceStorage<'a> {
18    pub(crate) connection: &'a PostgresClientWrapper,
19    pub(crate) postgres_version: String,
20    pub(crate) identifier_quoter: Arc<IdentifierQuoter>,
21}
22
23impl<'a> PostgresInstanceStorage<'a> {
24    #[instrument(skip_all)]
25    pub async fn new(connection: &'a PostgresClientWrapper) -> crate::Result<Self> {
26        let postgres_version = connection.get_single_result("select version()").await?;
27
28        let keywords = connection
29            .get_results::<Keyword>(
30                "select word, catcode from pg_get_keywords() where catcode <> 'U'",
31            )
32            .await?;
33
34        let mut keyword_info = HashMap::new();
35
36        for keyword in keywords {
37            keyword_info.insert(
38                keyword.word,
39                AllowedKeywordUsage {
40                    column_name: keyword.category == KeywordType::AllowedInColumnName
41                        || keyword.category == KeywordType::AllowedInTypeOrFunctionName,
42                    type_or_function_name: keyword.category
43                        == KeywordType::AllowedInTypeOrFunctionName,
44                },
45            );
46        }
47
48        let quoter = IdentifierQuoter::new(keyword_info);
49
50        Ok(PostgresInstanceStorage {
51            connection,
52            postgres_version,
53            identifier_quoter: Arc::new(quoter),
54        })
55    }
56
57    pub fn get_identifier_quoter(&self) -> Arc<IdentifierQuoter> {
58        self.identifier_quoter.clone()
59    }
60}
61
62struct Keyword {
63    word: String,
64    category: KeywordType,
65}
66
67impl FromRow for Keyword {
68    fn from_row(row: Row) -> crate::Result<Self> {
69        Ok(Keyword {
70            word: row.try_get(0)?,
71            category: row.try_get_enum_value(1)?,
72        })
73    }
74}
75
/// Category of a keyword, describing where it may be used as an identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeywordType {
    /// The keyword is not reserved.
    Unreserved,
    /// The keyword may be used as a column name.
    AllowedInColumnName,
    /// The keyword may be used as a type or function name.
    AllowedInTypeOrFunctionName,
    /// The keyword is reserved.
    Reserved,
}
83
84impl FromPgChar for KeywordType {
85    fn from_pg_char(c: char) -> crate::Result<Self> {
86        match c {
87            'U' => Ok(KeywordType::Unreserved),
88            'C' => Ok(KeywordType::AllowedInColumnName),
89            'T' => Ok(KeywordType::AllowedInTypeOrFunctionName),
90            'R' => Ok(KeywordType::Reserved),
91            _ => Err(ElefantToolsError::InvalidKeywordType(c.to_string())),
92        }
93    }
94}
95
96impl BaseCopyTarget for PostgresInstanceStorage<'_> {
97    async fn supported_data_format(&self) -> crate::Result<Vec<DataFormat>> {
98        Ok(vec![
99            DataFormat::Text,
100            DataFormat::PostgresBinary {
101                postgres_version: Some(self.postgres_version.clone()),
102            },
103        ])
104    }
105}
106
107impl<'a> CopySourceFactory for PostgresInstanceStorage<'a> {
108    type SequentialSource = SequentialSafePostgresInstanceCopySourceStorage<'a>;
109    type ParallelSource = ParallelSafePostgresInstanceCopySourceStorage<'a>;
110
111    async fn create_source(
112        &self,
113    ) -> crate::Result<SequentialOrParallel<Self::SequentialSource, Self::ParallelSource>> {
114        let parallel = ParallelSafePostgresInstanceCopySourceStorage::new(self).await?;
115
116        Ok(SequentialOrParallel::Parallel(parallel))
117    }
118
119    async fn create_sequential_source(&self) -> crate::Result<Self::SequentialSource> {
120        let seq = SequentialSafePostgresInstanceCopySourceStorage::new(self).await?;
121
122        Ok(seq)
123    }
124
125    fn supported_parallelism(&self) -> SupportedParallelism {
126        SupportedParallelism::Parallel
127    }
128}
129
130impl<'a> CopyDestinationFactory<'a> for PostgresInstanceStorage<'a> {
131    type SequentialDestination = SequentialSafePostgresInstanceCopyDestinationStorage<'a>;
132    type ParallelDestination = ParallelSafePostgresInstanceCopyDestinationStorage<'a>;
133
134    async fn create_destination(
135        &'a mut self,
136    ) -> crate::Result<SequentialOrParallel<Self::SequentialDestination, Self::ParallelDestination>>
137    {
138        let par = ParallelSafePostgresInstanceCopyDestinationStorage::new(self).await?;
139
140        Ok(SequentialOrParallel::Parallel(par))
141    }
142
143    async fn create_sequential_destination(
144        &'a mut self,
145    ) -> crate::Result<Self::SequentialDestination> {
146        let seq = SequentialSafePostgresInstanceCopyDestinationStorage::new(self).await?;
147
148        Ok(seq)
149    }
150
151    fn supported_parallelism(&self) -> SupportedParallelism {
152        SupportedParallelism::Parallel
153    }
154}