// drasi_source_postgres/config.rs
use serde::{Deserialize, Serialize};

22#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
28#[serde(rename_all = "lowercase")]
29pub enum SslMode {
30 Disable,
32 Prefer,
34 Require,
36}
37
38impl Default for SslMode {
39 fn default() -> Self {
40 Self::Prefer
41 }
42}
43
44impl std::fmt::Display for SslMode {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::Disable => write!(f, "disable"),
48 Self::Prefer => write!(f, "prefer"),
49 Self::Require => write!(f, "require"),
50 }
51 }
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
60pub struct TableKeyConfig {
61 pub table: String,
62 pub key_columns: Vec<String>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
67pub struct PostgresSourceConfig {
68 #[serde(default = "default_postgres_host")]
70 pub host: String,
71
72 #[serde(default = "default_postgres_port")]
74 pub port: u16,
75
76 pub database: String,
78
79 pub user: String,
81
82 #[serde(default)]
84 pub password: String,
85
86 #[serde(default)]
88 pub tables: Vec<String>,
89
90 #[serde(default = "default_slot_name")]
92 pub slot_name: String,
93
94 #[serde(default = "default_publication_name")]
96 pub publication_name: String,
97
98 #[serde(default)]
100 pub ssl_mode: SslMode,
101
102 #[serde(default)]
104 pub table_keys: Vec<TableKeyConfig>,
105}
106
/// Serde default for `PostgresSourceConfig::host`: `"localhost"`.
fn default_postgres_host() -> String {
    String::from("localhost")
}
110
/// Serde default for `PostgresSourceConfig::port`: `5432`.
fn default_postgres_port() -> u16 {
    const DEFAULT_PORT: u16 = 5432;
    DEFAULT_PORT
}
114
/// Serde default for `PostgresSourceConfig::slot_name`: `"drasi_slot"`.
fn default_slot_name() -> String {
    String::from("drasi_slot")
}
118
/// Serde default for `PostgresSourceConfig::publication_name`: `"drasi_publication"`.
fn default_publication_name() -> String {
    String::from("drasi_publication")
}
122
123impl PostgresSourceConfig {
124 pub fn validate(&self) -> anyhow::Result<()> {
135 if self.database.is_empty() {
136 return Err(anyhow::anyhow!(
137 "Validation error: database cannot be empty. \
138 Please specify the PostgreSQL database name"
139 ));
140 }
141
142 if self.user.is_empty() {
143 return Err(anyhow::anyhow!(
144 "Validation error: user cannot be empty. \
145 Please specify the PostgreSQL user for replication"
146 ));
147 }
148
149 if self.port == 0 {
150 return Err(anyhow::anyhow!(
151 "Validation error: port cannot be 0. \
152 Please specify a valid port number (1-65535)"
153 ));
154 }
155
156 if self.slot_name.is_empty() {
157 return Err(anyhow::anyhow!(
158 "Validation error: slot_name cannot be empty. \
159 Please specify a replication slot name"
160 ));
161 }
162
163 if self.publication_name.is_empty() {
164 return Err(anyhow::anyhow!(
165 "Validation error: publication_name cannot be empty. \
166 Please specify a publication name"
167 ));
168 }
169
170 Ok(())
171 }
172}