Skip to main content

drasi_source_postgres/
config.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configuration for the PostgreSQL replication source.
16//!
17//! This source monitors PostgreSQL databases using logical replication to stream
18//! data changes as they occur.
19
20use serde::{Deserialize, Serialize};
21
22// =============================================================================
23// SSL Configuration
24// =============================================================================
25
26/// SSL mode for PostgreSQL connections
27#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
28#[serde(rename_all = "lowercase")]
29#[derive(Default)]
30pub enum SslMode {
31    /// Disable SSL encryption
32    Disable,
33    /// Prefer SSL but allow unencrypted connections
34    #[default]
35    Prefer,
36    /// Require SSL encryption
37    Require,
38}
39
40impl std::fmt::Display for SslMode {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            Self::Disable => write!(f, "disable"),
44            Self::Prefer => write!(f, "prefer"),
45            Self::Require => write!(f, "require"),
46        }
47    }
48}
49
50// =============================================================================
51// Database Table Configuration
52// =============================================================================
53
54/// Table key configuration for PostgreSQL sources
55#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
56pub struct TableKeyConfig {
57    pub table: String,
58    pub key_columns: Vec<String>,
59}
60
61/// PostgreSQL replication source configuration
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63pub struct PostgresSourceConfig {
64    /// PostgreSQL host
65    #[serde(default = "default_postgres_host")]
66    pub host: String,
67
68    /// PostgreSQL port
69    #[serde(default = "default_postgres_port")]
70    pub port: u16,
71
72    /// Database name
73    pub database: String,
74
75    /// Database user
76    pub user: String,
77
78    /// Database password
79    #[serde(default)]
80    pub password: String,
81
82    /// Tables to replicate
83    #[serde(default)]
84    pub tables: Vec<String>,
85
86    /// Replication slot name
87    #[serde(default = "default_slot_name")]
88    pub slot_name: String,
89
90    /// Publication name
91    #[serde(default = "default_publication_name")]
92    pub publication_name: String,
93
94    /// SSL mode
95    #[serde(default)]
96    pub ssl_mode: SslMode,
97
98    /// Table key configurations
99    #[serde(default)]
100    pub table_keys: Vec<TableKeyConfig>,
101}
102
103fn default_postgres_host() -> String {
104    "localhost".to_string()
105}
106
107fn default_postgres_port() -> u16 {
108    5432
109}
110
111fn default_slot_name() -> String {
112    "drasi_slot".to_string()
113}
114
115fn default_publication_name() -> String {
116    "drasi_publication".to_string()
117}
118
119impl PostgresSourceConfig {
120    /// Validate the configuration and return an error if invalid.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if:
125    /// - Database name is empty
126    /// - User is empty
127    /// - Port is 0
128    /// - Slot name is empty
129    /// - Publication name is empty
130    pub fn validate(&self) -> anyhow::Result<()> {
131        if self.database.is_empty() {
132            return Err(anyhow::anyhow!(
133                "Validation error: database cannot be empty. \
134                 Please specify the PostgreSQL database name"
135            ));
136        }
137
138        if self.user.is_empty() {
139            return Err(anyhow::anyhow!(
140                "Validation error: user cannot be empty. \
141                 Please specify the PostgreSQL user for replication"
142            ));
143        }
144
145        if self.port == 0 {
146            return Err(anyhow::anyhow!(
147                "Validation error: port cannot be 0. \
148                 Please specify a valid port number (1-65535)"
149            ));
150        }
151
152        if self.slot_name.is_empty() {
153            return Err(anyhow::anyhow!(
154                "Validation error: slot_name cannot be empty. \
155                 Please specify a replication slot name"
156            ));
157        }
158
159        if self.publication_name.is_empty() {
160            return Err(anyhow::anyhow!(
161                "Validation error: publication_name cannot be empty. \
162                 Please specify a publication name"
163            ));
164        }
165
166        Ok(())
167    }
168}