Skip to main content

drasi_source_postgres/
config.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Configuration for the PostgreSQL replication source.
//!
//! This source monitors PostgreSQL databases using logical replication to stream
//! data changes as they occur.
19
20use serde::{Deserialize, Serialize};
21
// =============================================================================
// SSL Configuration
// =============================================================================
25
26/// SSL mode for PostgreSQL connections
27#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
28#[serde(rename_all = "lowercase")]
29pub enum SslMode {
30    /// Disable SSL encryption
31    Disable,
32    /// Prefer SSL but allow unencrypted connections
33    Prefer,
34    /// Require SSL encryption
35    Require,
36}
37
38impl Default for SslMode {
39    fn default() -> Self {
40        Self::Prefer
41    }
42}
43
44impl std::fmt::Display for SslMode {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            Self::Disable => write!(f, "disable"),
48            Self::Prefer => write!(f, "prefer"),
49            Self::Require => write!(f, "require"),
50        }
51    }
52}
53
// =============================================================================
// Database Table Configuration
// =============================================================================
57
58/// Table key configuration for PostgreSQL sources
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
60pub struct TableKeyConfig {
61    pub table: String,
62    pub key_columns: Vec<String>,
63}
64
65/// PostgreSQL replication source configuration
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
67pub struct PostgresSourceConfig {
68    /// PostgreSQL host
69    #[serde(default = "default_postgres_host")]
70    pub host: String,
71
72    /// PostgreSQL port
73    #[serde(default = "default_postgres_port")]
74    pub port: u16,
75
76    /// Database name
77    pub database: String,
78
79    /// Database user
80    pub user: String,
81
82    /// Database password
83    #[serde(default)]
84    pub password: String,
85
86    /// Tables to replicate
87    #[serde(default)]
88    pub tables: Vec<String>,
89
90    /// Replication slot name
91    #[serde(default = "default_slot_name")]
92    pub slot_name: String,
93
94    /// Publication name
95    #[serde(default = "default_publication_name")]
96    pub publication_name: String,
97
98    /// SSL mode
99    #[serde(default)]
100    pub ssl_mode: SslMode,
101
102    /// Table key configurations
103    #[serde(default)]
104    pub table_keys: Vec<TableKeyConfig>,
105}
106
/// Serde fallback for the `host` field: `localhost`.
fn default_postgres_host() -> String {
    String::from("localhost")
}
110
/// Serde fallback for the `port` field: `5432`.
fn default_postgres_port() -> u16 {
    const FALLBACK_PORT: u16 = 5432;
    FALLBACK_PORT
}
114
/// Serde fallback for the `slot_name` field: `drasi_slot`.
fn default_slot_name() -> String {
    "drasi_slot".to_owned()
}
118
/// Serde fallback for the `publication_name` field: `drasi_publication`.
fn default_publication_name() -> String {
    String::from("drasi_publication")
}
122
123impl PostgresSourceConfig {
124    /// Validate the configuration and return an error if invalid.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if:
129    /// - Database name is empty
130    /// - User is empty
131    /// - Port is 0
132    /// - Slot name is empty
133    /// - Publication name is empty
134    pub fn validate(&self) -> anyhow::Result<()> {
135        if self.database.is_empty() {
136            return Err(anyhow::anyhow!(
137                "Validation error: database cannot be empty. \
138                 Please specify the PostgreSQL database name"
139            ));
140        }
141
142        if self.user.is_empty() {
143            return Err(anyhow::anyhow!(
144                "Validation error: user cannot be empty. \
145                 Please specify the PostgreSQL user for replication"
146            ));
147        }
148
149        if self.port == 0 {
150            return Err(anyhow::anyhow!(
151                "Validation error: port cannot be 0. \
152                 Please specify a valid port number (1-65535)"
153            ));
154        }
155
156        if self.slot_name.is_empty() {
157            return Err(anyhow::anyhow!(
158                "Validation error: slot_name cannot be empty. \
159                 Please specify a replication slot name"
160            ));
161        }
162
163        if self.publication_name.is_empty() {
164            return Err(anyhow::anyhow!(
165                "Validation error: publication_name cannot be empty. \
166                 Please specify a publication name"
167            ));
168        }
169
170        Ok(())
171    }
172}