drasi_source_postgres/
config.rs1use serde::{Deserialize, Serialize};
21
22#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
28#[serde(rename_all = "lowercase")]
29#[derive(Default)]
30pub enum SslMode {
31 Disable,
33 #[default]
35 Prefer,
36 Require,
38}
39
40impl std::fmt::Display for SslMode {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Self::Disable => write!(f, "disable"),
44 Self::Prefer => write!(f, "prefer"),
45 Self::Require => write!(f, "require"),
46 }
47 }
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
56pub struct TableKeyConfig {
57 pub table: String,
58 pub key_columns: Vec<String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63pub struct PostgresSourceConfig {
64 #[serde(default = "default_postgres_host")]
66 pub host: String,
67
68 #[serde(default = "default_postgres_port")]
70 pub port: u16,
71
72 pub database: String,
74
75 pub user: String,
77
78 #[serde(default)]
80 pub password: String,
81
82 #[serde(default)]
84 pub tables: Vec<String>,
85
86 #[serde(default = "default_slot_name")]
88 pub slot_name: String,
89
90 #[serde(default = "default_publication_name")]
92 pub publication_name: String,
93
94 #[serde(default)]
96 pub ssl_mode: SslMode,
97
98 #[serde(default)]
100 pub table_keys: Vec<TableKeyConfig>,
101}
102
103fn default_postgres_host() -> String {
104 "localhost".to_string()
105}
106
107fn default_postgres_port() -> u16 {
108 5432
109}
110
111fn default_slot_name() -> String {
112 "drasi_slot".to_string()
113}
114
115fn default_publication_name() -> String {
116 "drasi_publication".to_string()
117}
118
119impl PostgresSourceConfig {
120 pub fn validate(&self) -> anyhow::Result<()> {
131 if self.database.is_empty() {
132 return Err(anyhow::anyhow!(
133 "Validation error: database cannot be empty. \
134 Please specify the PostgreSQL database name"
135 ));
136 }
137
138 if self.user.is_empty() {
139 return Err(anyhow::anyhow!(
140 "Validation error: user cannot be empty. \
141 Please specify the PostgreSQL user for replication"
142 ));
143 }
144
145 if self.port == 0 {
146 return Err(anyhow::anyhow!(
147 "Validation error: port cannot be 0. \
148 Please specify a valid port number (1-65535)"
149 ));
150 }
151
152 if self.slot_name.is_empty() {
153 return Err(anyhow::anyhow!(
154 "Validation error: slot_name cannot be empty. \
155 Please specify a replication slot name"
156 ));
157 }
158
159 if self.publication_name.is_empty() {
160 return Err(anyhow::anyhow!(
161 "Validation error: publication_name cannot be empty. \
162 Please specify a publication name"
163 ));
164 }
165
166 Ok(())
167 }
168}