Skip to main content

drasi_source_postgres/
descriptor.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! PostgreSQL source plugin descriptor and configuration DTOs.
16
17use crate::{PostgresSourceConfig, SslMode, TableKeyConfig};
18use drasi_plugin_sdk::prelude::*;
19use std::str::FromStr;
20use utoipa::OpenApi;
21
/// PostgreSQL source configuration DTO.
//
// Deserialized with camelCase keys; unknown keys are rejected
// (`deny_unknown_fields`). `database` and `user` are the only fields without a
// serde default.
//
// NOTE(review): the field notes below are plain `//` comments on purpose —
// `utoipa::ToSchema` presumably copies `///` text into the generated schema as
// descriptions, which would change the output of `config_schema_json`; confirm
// before promoting them to doc comments.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::postgres::PostgresSourceConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct PostgresSourceConfigDto {
    // Server host; falls back to the static value "localhost".
    #[serde(default = "default_postgres_host")]
    pub host: ConfigValue<String>,
    // Server port; falls back to the static value 5432.
    #[serde(default = "default_postgres_port")]
    pub port: ConfigValue<u16>,
    // Database name; no default.
    pub database: ConfigValue<String>,
    // User name; no default.
    pub user: ConfigValue<String>,
    // Password; falls back to a static empty string.
    #[serde(default = "default_password")]
    pub password: ConfigValue<String>,
    // Table names; empty when omitted.
    // NOTE(review): presumably the tables to observe — confirm against the source implementation.
    #[serde(default)]
    pub tables: Vec<String>,
    // Falls back to "drasi_slot".
    // NOTE(review): presumably the logical replication slot name — confirm.
    #[serde(default = "default_slot_name")]
    pub slot_name: String,
    // Falls back to "drasi_publication".
    // NOTE(review): presumably the PostgreSQL publication name — confirm.
    #[serde(default = "default_publication_name")]
    pub publication_name: String,
    // SSL mode; falls back to the static `SslModeDto::default()` (`Prefer`).
    #[serde(default = "default_ssl_mode")]
    #[schema(value_type = ConfigValue<source::postgres::SslMode>)]
    pub ssl_mode: ConfigValue<SslModeDto>,
    // Per-table key column overrides; empty when omitted.
    #[serde(default)]
    #[schema(value_type = Vec<source::postgres::TableKeyConfig>)]
    pub table_keys: Vec<TableKeyConfigDto>,
}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
50#[schema(as = source::postgres::SslMode)]
51#[serde(rename_all = "lowercase")]
52#[derive(Default)]
53pub enum SslModeDto {
54    Disable,
55    #[default]
56    Prefer,
57    Require,
58}
59
60impl FromStr for SslModeDto {
61    type Err = String;
62
63    fn from_str(s: &str) -> Result<Self, Self::Err> {
64        match s.to_lowercase().as_str() {
65            "disable" => Ok(SslModeDto::Disable),
66            "prefer" => Ok(SslModeDto::Prefer),
67            "require" => Ok(SslModeDto::Require),
68            _ => Err(format!("Invalid SSL mode: {s}")),
69        }
70    }
71}
72
73impl From<SslModeDto> for SslMode {
74    fn from(dto: SslModeDto) -> Self {
75        match dto {
76            SslModeDto::Disable => SslMode::Disable,
77            SslModeDto::Prefer => SslMode::Prefer,
78            SslModeDto::Require => SslMode::Require,
79        }
80    }
81}
82
// One entry of `PostgresSourceConfigDto::table_keys`. Deserialized with
// camelCase keys (`table`, `keyColumns`); unknown keys are rejected and both
// fields are required.
//
// NOTE(review): kept as plain `//` comments because `utoipa::ToSchema`
// presumably turns `///` text into schema descriptions — confirm before
// promoting.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::postgres::TableKeyConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct TableKeyConfigDto {
    // Name of the table this entry applies to.
    pub table: String,
    // NOTE(review): presumably the columns that identify a row in `table` — confirm.
    pub key_columns: Vec<String>,
}
90
91fn default_postgres_host() -> ConfigValue<String> {
92    ConfigValue::Static("localhost".to_string())
93}
94
95fn default_postgres_port() -> ConfigValue<u16> {
96    ConfigValue::Static(5432)
97}
98
/// Serde default for `slot_name`: `"drasi_slot"`.
fn default_slot_name() -> String {
    String::from("drasi_slot")
}
102
/// Serde default for `publication_name`: `"drasi_publication"`.
fn default_publication_name() -> String {
    String::from("drasi_publication")
}
106
107fn default_password() -> ConfigValue<String> {
108    ConfigValue::Static(String::new())
109}
110
111fn default_ssl_mode() -> ConfigValue<SslModeDto> {
112    ConfigValue::Static(SslModeDto::default())
113}
114
// Private carrier type for the `OpenApi` derive. It registers the three DTO
// schemas as components so that `config_schema_json` can read them back
// through `PostgresSourceSchemas::openapi()`.
#[derive(OpenApi)]
#[openapi(components(schemas(PostgresSourceConfigDto, SslModeDto, TableKeyConfigDto,)))]
struct PostgresSourceSchemas;
118
/// Descriptor for the PostgreSQL source plugin.
///
/// A field-less unit type; everything it offers comes from its
/// [`SourcePluginDescriptor`] implementation: the plugin kind, the
/// configuration version and schema, and construction of a source from a JSON
/// configuration.
pub struct PostgresSourceDescriptor;
121
122#[async_trait]
123impl SourcePluginDescriptor for PostgresSourceDescriptor {
124    fn kind(&self) -> &str {
125        "postgres"
126    }
127
128    fn config_version(&self) -> &str {
129        "1.0.0"
130    }
131
132    fn config_schema_name(&self) -> &str {
133        "source.postgres.PostgresSourceConfig"
134    }
135
136    fn config_schema_json(&self) -> String {
137        let api = PostgresSourceSchemas::openapi();
138        serde_json::to_string(
139            &api.components
140                .as_ref()
141                .expect("OpenAPI components missing")
142                .schemas,
143        )
144        .expect("Failed to serialize config schema")
145    }
146
147    async fn create_source(
148        &self,
149        id: &str,
150        config_json: &serde_json::Value,
151        auto_start: bool,
152    ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
153        let dto: PostgresSourceConfigDto = serde_json::from_value(config_json.clone())?;
154        let mapper = DtoMapper::new();
155
156        let config = PostgresSourceConfig {
157            host: mapper.resolve_string(&dto.host)?,
158            port: mapper.resolve_typed(&dto.port)?,
159            database: mapper.resolve_string(&dto.database)?,
160            user: mapper.resolve_string(&dto.user)?,
161            password: mapper.resolve_string(&dto.password)?,
162            tables: dto.tables.clone(),
163            slot_name: dto.slot_name.clone(),
164            publication_name: dto.publication_name.clone(),
165            ssl_mode: mapper.resolve_typed::<SslModeDto>(&dto.ssl_mode)?.into(),
166            table_keys: dto
167                .table_keys
168                .iter()
169                .map(|tk| TableKeyConfig {
170                    table: tk.table.clone(),
171                    key_columns: tk.key_columns.clone(),
172                })
173                .collect(),
174        };
175
176        let source = crate::PostgresSourceBuilder::new(id)
177            .with_config(config)
178            .with_auto_start(auto_start)
179            .build()?;
180
181        Ok(Box::new(source))
182    }
183}