Skip to main content

drasi_source_postgres/
descriptor.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! PostgreSQL source plugin descriptor and configuration DTOs.
16
17use crate::{PostgresSourceConfig, SslMode, TableKeyConfig};
18use drasi_plugin_sdk::prelude::*;
19use std::str::FromStr;
20use utoipa::OpenApi;
21
// Wire-format (serde) and OpenAPI-schema (utoipa) shape of the PostgreSQL
// source configuration. JSON keys are camelCase and unknown keys are rejected
// (`deny_unknown_fields`). The schema is published under the path
// `source::postgres::PostgresSourceConfig`, which the descriptor refers to as
// "source.postgres.PostgresSourceConfig".
//
// Fields typed `ConfigValue<T>` are resolved to concrete values by `DtoMapper`
// in `PostgresSourceDescriptor::create_source`; the remaining fields are
// copied into `PostgresSourceConfig` as-is.
//
// NOTE(review): the field notes below are plain `//` comments on purpose.
// utoipa is understood to copy `///` doc comments into the generated schema
// as descriptions, which would change `config_schema_json` output — confirm
// before converting them to doc comments.
/// PostgreSQL source configuration DTO.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::postgres::PostgresSourceConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct PostgresSourceConfigDto {
    // Server host; falls back to "localhost" when the key is omitted.
    #[serde(default = "default_postgres_host")]
    pub host: ConfigValue<String>,
    // Server port; falls back to 5432 when the key is omitted.
    #[serde(default = "default_postgres_port")]
    pub port: ConfigValue<u16>,
    // Database name. No serde default is declared for this field.
    pub database: ConfigValue<String>,
    // User name. No serde default is declared for this field.
    pub user: ConfigValue<String>,
    // Password; falls back to an empty string when the key is omitted.
    #[serde(default = "default_password")]
    pub password: ConfigValue<String>,
    // Table names; an empty list when the key is omitted.
    #[serde(default)]
    pub tables: Vec<String>,
    // JSON key `slotName`; falls back to "drasi_slot". Presumably the name of
    // the PostgreSQL replication slot — verify against the source implementation.
    #[serde(default = "default_slot_name")]
    pub slot_name: String,
    // JSON key `publicationName`; falls back to "drasi_publication".
    // Presumably the name of the PostgreSQL publication — verify as above.
    #[serde(default = "default_publication_name")]
    pub publication_name: String,
    // JSON key `sslMode`; falls back to `SslModeDto::default()` (`Prefer`).
    // The schema exposes it under the `source::postgres::SslMode` name.
    #[serde(default = "default_ssl_mode")]
    #[schema(value_type = ConfigValue<source::postgres::SslMode>)]
    pub ssl_mode: ConfigValue<SslModeDto>,
    // JSON key `tableKeys`; an empty list when the key is omitted. The schema
    // exposes the items under the `source::postgres::TableKeyConfig` name.
    #[serde(default)]
    #[schema(value_type = Vec<source::postgres::TableKeyConfig>)]
    pub table_keys: Vec<TableKeyConfigDto>,
}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
50#[schema(as = source::postgres::SslMode)]
51#[serde(rename_all = "lowercase")]
52#[derive(Default)]
53pub enum SslModeDto {
54    Disable,
55    #[default]
56    Prefer,
57    Require,
58}
59
60impl FromStr for SslModeDto {
61    type Err = String;
62
63    fn from_str(s: &str) -> Result<Self, Self::Err> {
64        match s.to_lowercase().as_str() {
65            "disable" => Ok(SslModeDto::Disable),
66            "prefer" => Ok(SslModeDto::Prefer),
67            "require" => Ok(SslModeDto::Require),
68            _ => Err(format!("Invalid SSL mode: {s}")),
69        }
70    }
71}
72
73impl From<SslModeDto> for SslMode {
74    fn from(dto: SslModeDto) -> Self {
75        match dto {
76            SslModeDto::Disable => SslMode::Disable,
77            SslModeDto::Prefer => SslMode::Prefer,
78            SslModeDto::Require => SslMode::Require,
79        }
80    }
81}
82
83impl From<&SslMode> for SslModeDto {
84    fn from(mode: &SslMode) -> Self {
85        match mode {
86            SslMode::Disable => SslModeDto::Disable,
87            SslMode::Prefer => SslModeDto::Prefer,
88            SslMode::Require => SslModeDto::Require,
89        }
90    }
91}
92
93impl From<&TableKeyConfig> for TableKeyConfigDto {
94    fn from(tk: &TableKeyConfig) -> Self {
95        Self {
96            table: tk.table.clone(),
97            key_columns: tk.key_columns.clone(),
98        }
99    }
100}
101
102impl From<&PostgresSourceConfig> for PostgresSourceConfigDto {
103    fn from(config: &PostgresSourceConfig) -> Self {
104        Self {
105            host: ConfigValue::Static(config.host.clone()),
106            port: ConfigValue::Static(config.port),
107            database: ConfigValue::Static(config.database.clone()),
108            user: ConfigValue::Static(config.user.clone()),
109            password: ConfigValue::Static(config.password.clone()),
110            tables: config.tables.clone(),
111            slot_name: config.slot_name.clone(),
112            publication_name: config.publication_name.clone(),
113            ssl_mode: ConfigValue::Static(SslModeDto::from(&config.ssl_mode)),
114            table_keys: config
115                .table_keys
116                .iter()
117                .map(TableKeyConfigDto::from)
118                .collect(),
119        }
120    }
121}
122
// DTO for one entry of `PostgresSourceConfigDto::table_keys`.
//
// JSON keys are camelCase (`table`, `keyColumns`), both are required (no
// serde defaults), and unknown keys are rejected. The schema is published as
// `source::postgres::TableKeyConfig`.
//
// NOTE(review): kept as plain `//` comments because utoipa is understood to
// turn `///` doc comments into schema descriptions — confirm before changing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::postgres::TableKeyConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct TableKeyConfigDto {
    // Name of the table this entry applies to.
    pub table: String,
    // Presumably the columns that make up the key for `table` — verify
    // against how `TableKeyConfig` is consumed.
    pub key_columns: Vec<String>,
}
130
131fn default_postgres_host() -> ConfigValue<String> {
132    ConfigValue::Static("localhost".to_string())
133}
134
135fn default_postgres_port() -> ConfigValue<u16> {
136    ConfigValue::Static(5432)
137}
138
/// Serde default for `slot_name`: `"drasi_slot"`.
fn default_slot_name() -> String {
    String::from("drasi_slot")
}
142
/// Serde default for `publication_name`: `"drasi_publication"`.
fn default_publication_name() -> String {
    String::from("drasi_publication")
}
146
147fn default_password() -> ConfigValue<String> {
148    ConfigValue::Static(String::new())
149}
150
151fn default_ssl_mode() -> ConfigValue<SslModeDto> {
152    ConfigValue::Static(SslModeDto::default())
153}
154
// Private carrier type for the utoipa `OpenApi` derive. It registers the three
// DTO schemas as components; `config_schema_json` calls
// `PostgresSourceSchemas::openapi()` and serializes `components.schemas`.
#[derive(OpenApi)]
#[openapi(components(schemas(PostgresSourceConfigDto, SslModeDto, TableKeyConfigDto,)))]
struct PostgresSourceSchemas;
158
/// Descriptor for the PostgreSQL source plugin.
///
/// Stateless unit type. Its `SourcePluginDescriptor` implementation reports
/// the plugin kind (`"postgres"`), the configuration version and schema, and
/// builds source instances from a JSON configuration.
pub struct PostgresSourceDescriptor;
161
162#[async_trait]
163impl SourcePluginDescriptor for PostgresSourceDescriptor {
164    fn kind(&self) -> &str {
165        "postgres"
166    }
167
168    fn config_version(&self) -> &str {
169        "1.0.0"
170    }
171
172    fn config_schema_name(&self) -> &str {
173        "source.postgres.PostgresSourceConfig"
174    }
175
176    fn config_schema_json(&self) -> String {
177        use drasi_plugin_sdk::schema_ui::SchemaUiAnnotator;
178        let api = PostgresSourceSchemas::openapi();
179        let schemas = serde_json::to_value(
180            &api.components
181                .as_ref()
182                .expect("OpenAPI components missing")
183                .schemas,
184        )
185        .expect("Failed to serialize config schema");
186
187        SchemaUiAnnotator::new(schemas, "source.postgres.PostgresSourceConfig")
188            .expect("root schema not found")
189            .field("host", |f| {
190                f.group("Connection").order(1).placeholder("localhost")
191            })
192            .field("port", |f| {
193                f.group("Connection").order(2).placeholder("5432")
194            })
195            .field("database", |f| {
196                f.group("Connection").order(3).placeholder("mydb")
197            })
198            .field("user", |f| {
199                f.group("Authentication").order(1).placeholder("postgres")
200            })
201            .field("password", |f| {
202                f.group("Authentication").order(2).widget("password")
203            })
204            .field("tables", |f| f.group("Tables").order(1))
205            .field("tableKeys", |f| f.group("Tables").order(2))
206            .field("slotName", |f| {
207                f.group("Replication")
208                    .order(1)
209                    .placeholder("drasi_slot")
210                    .collapsed(true)
211            })
212            .field("publicationName", |f| {
213                f.group("Replication")
214                    .order(2)
215                    .placeholder("drasi_publication")
216            })
217            .field("sslMode", |f| f.group("SSL").order(1).collapsed(true))
218            .annotate()
219    }
220
221    async fn create_source(
222        &self,
223        id: &str,
224        config_json: &serde_json::Value,
225        auto_start: bool,
226    ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
227        let dto: PostgresSourceConfigDto = serde_json::from_value(config_json.clone())?;
228        let mapper = DtoMapper::new();
229
230        let config = PostgresSourceConfig {
231            host: mapper.resolve_string(&dto.host).await?,
232            port: mapper.resolve_typed(&dto.port).await?,
233            database: mapper.resolve_string(&dto.database).await?,
234            user: mapper.resolve_string(&dto.user).await?,
235            password: mapper.resolve_string(&dto.password).await?,
236            tables: dto.tables.clone(),
237            slot_name: dto.slot_name.clone(),
238            publication_name: dto.publication_name.clone(),
239            ssl_mode: mapper
240                .resolve_typed::<SslModeDto>(&dto.ssl_mode)
241                .await?
242                .into(),
243            table_keys: dto
244                .table_keys
245                .iter()
246                .map(|tk| TableKeyConfig {
247                    table: tk.table.clone(),
248                    key_columns: tk.key_columns.clone(),
249                })
250                .collect(),
251        };
252
253        let mut source = crate::PostgresSourceBuilder::new(id)
254            .with_config(config)
255            .with_auto_start(auto_start)
256            .build()?;
257
258        source.base.set_raw_config(config_json.clone());
259
260        Ok(Box::new(source))
261    }
262}