1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
use crate::postgres::table_mode::TableMode;
use anyhow::Result;
use async_trait::async_trait;
#[cfg(test)]
use mockall::automock;
/// Identifies the table targeted by a DataFrame insert.
#[derive(Debug)]
pub struct InsertDataframePayload {
    /// The name of the database.
    pub database_name: String,
    /// The name of the schema.
    pub schema_name: String,
    /// The name of the table.
    pub table_name: String,
}

impl InsertDataframePayload {
    /// Build a string key for this payload.
    ///
    /// # Returns
    ///
    /// The database, schema and table names joined with `:`, in that order
    /// (`database:schema:table`). The components are not escaped.
    pub fn as_key(&self) -> String {
        let Self {
            database_name,
            schema_name,
            table_name,
        } = self;
        format!("{}:{}:{}", database_name, schema_name, table_name)
    }
}
/// Identifies the table targeted by a DataFrame upsert, together with its
/// primary key.
#[derive(Debug)]
pub struct UpsertDataframePayload {
    /// The name of the database.
    pub database_name: String,
    /// The name of the schema.
    pub schema_name: String,
    /// The name of the table.
    pub table_name: String,
    /// The primary key of the table.
    pub primary_key: String,
}
/// Asynchronous operations on a Postgres target database: schema and table
/// management, DataFrame loading and raw SQL execution.
///
/// Under `cfg(test)` the trait is annotated with `mockall`'s `automock`.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait PostgresOperator {
    /// Get the columns of a table.
    ///
    /// # Arguments
    ///
    /// * `schema_name` - The name of the schema.
    /// * `table_name` - The name of the table.
    ///
    /// # Returns
    ///
    /// An `IndexMap` containing the column names and their data types.
    async fn get_table_columns(
        &self,
        schema_name: &str,
        table_name: &str,
    ) -> Result<indexmap::IndexMap<String, String>>;

    /// Get the primary key of a table.
    ///
    /// Note the argument order: unlike the other methods of this trait,
    /// `table_name` comes before `schema_name`.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The name of the table.
    /// * `schema_name` - The name of the schema.
    ///
    /// # Returns
    ///
    /// The primary key of the table, as a list of strings.
    async fn get_primary_key(&self, table_name: &str, schema_name: &str) -> Result<Vec<String>>;

    /// Create a schema in the target database.
    ///
    /// # Arguments
    ///
    /// * `schema_name` - The name of the schema.
    ///
    /// # Returns
    ///
    /// A Result indicating success or failure.
    async fn create_schema(&self, schema_name: &str) -> Result<()>;

    /// Create a table in the target database.
    ///
    /// # Arguments
    ///
    /// * `column_data_types` - The data types of the columns in the table.
    /// * `primary_key` - The primary key of the table.
    /// * `schema_name` - The name of the schema.
    /// * `table_name` - The name of the table.
    ///
    /// # Returns
    ///
    /// A Result indicating success or failure.
    async fn create_table(
        &self,
        column_data_types: &indexmap::IndexMap<String, String>,
        primary_key: &[String],
        schema_name: &str,
        table_name: &str,
    ) -> Result<()>;

    /// Get the tables in a schema.
    ///
    /// # Arguments
    ///
    /// * `schema_name` - The name of the schema.
    /// * `included_tables` - The tables to include.
    /// * `excluded_tables` - The tables to exclude.
    /// * `table_mode` - The mode to use for the tables.
    ///
    /// # Returns
    ///
    /// A Vec containing the tables in the schema.
    async fn get_tables_in_schema(
        &self,
        schema_name: &str,
        included_tables: &[String],
        excluded_tables: &[String],
        table_mode: &TableMode,
    ) -> Result<Vec<String>>;

    /// Insert a DataFrame into the target database.
    ///
    /// # Arguments
    ///
    /// * `df` - The DataFrame to insert.
    /// * `payload` - The database, schema and table names identifying the
    ///   target table (see [`InsertDataframePayload`]).
    ///
    /// # Returns
    ///
    /// A Result indicating success or failure.
    async fn insert_dataframe_in_target_db(
        &self,
        df: &polars::frame::DataFrame,
        payload: &InsertDataframePayload,
    ) -> Result<()>;

    /// Upsert a DataFrame into the target database.
    ///
    /// # Arguments
    ///
    /// * `df` - The DataFrame to upsert.
    /// * `payload` - The database, schema and table names identifying the
    ///   target table, plus the primary key of the table
    ///   (see [`UpsertDataframePayload`]).
    ///
    /// # Returns
    ///
    /// A Result indicating success or failure.
    async fn upsert_dataframe_in_target_db(
        &self,
        df: &polars::frame::DataFrame,
        payload: &UpsertDataframePayload,
    ) -> Result<()>;

    /// Drop schema in the target database.
    ///
    /// # Arguments
    ///
    /// * `schema_name` - The name of the schema.
    ///
    /// # Returns
    ///
    /// A Result indicating success or failure.
    async fn drop_schema(&self, schema_name: &str) -> Result<()>;

    /// Run a generic SQL command.
    ///
    /// # Arguments
    ///
    /// * `sql_command` - The SQL command to run.
    ///
    /// # Returns
    ///
    /// A Result indicating success or failure.
    async fn run_sql_command(&self, sql_command: &str) -> Result<()>;

    /// Close the connection pool.
    ///
    /// This method returns no value, so it cannot report a failure to the
    /// caller.
    async fn close_connection_pool(&self);
}