drasi_source_postgres/
descriptor.rs1use crate::{PostgresSourceConfig, SslMode, TableKeyConfig};
18use drasi_plugin_sdk::prelude::*;
19use std::str::FromStr;
20use utoipa::OpenApi;
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
24#[schema(as = source::postgres::PostgresSourceConfig)]
25#[serde(rename_all = "camelCase", deny_unknown_fields)]
26pub struct PostgresSourceConfigDto {
27 #[serde(default = "default_postgres_host")]
28 pub host: ConfigValue<String>,
29 #[serde(default = "default_postgres_port")]
30 pub port: ConfigValue<u16>,
31 pub database: ConfigValue<String>,
32 pub user: ConfigValue<String>,
33 #[serde(default = "default_password")]
34 pub password: ConfigValue<String>,
35 #[serde(default)]
36 pub tables: Vec<String>,
37 #[serde(default = "default_slot_name")]
38 pub slot_name: String,
39 #[serde(default = "default_publication_name")]
40 pub publication_name: String,
41 #[serde(default = "default_ssl_mode")]
42 #[schema(value_type = ConfigValue<source::postgres::SslMode>)]
43 pub ssl_mode: ConfigValue<SslModeDto>,
44 #[serde(default)]
45 #[schema(value_type = Vec<source::postgres::TableKeyConfig>)]
46 pub table_keys: Vec<TableKeyConfigDto>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
50#[schema(as = source::postgres::SslMode)]
51#[serde(rename_all = "lowercase")]
52#[derive(Default)]
53pub enum SslModeDto {
54 Disable,
55 #[default]
56 Prefer,
57 Require,
58}
59
60impl FromStr for SslModeDto {
61 type Err = String;
62
63 fn from_str(s: &str) -> Result<Self, Self::Err> {
64 match s.to_lowercase().as_str() {
65 "disable" => Ok(SslModeDto::Disable),
66 "prefer" => Ok(SslModeDto::Prefer),
67 "require" => Ok(SslModeDto::Require),
68 _ => Err(format!("Invalid SSL mode: {s}")),
69 }
70 }
71}
72
73impl From<SslModeDto> for SslMode {
74 fn from(dto: SslModeDto) -> Self {
75 match dto {
76 SslModeDto::Disable => SslMode::Disable,
77 SslModeDto::Prefer => SslMode::Prefer,
78 SslModeDto::Require => SslMode::Require,
79 }
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
84#[schema(as = source::postgres::TableKeyConfig)]
85#[serde(rename_all = "camelCase", deny_unknown_fields)]
86pub struct TableKeyConfigDto {
87 pub table: String,
88 pub key_columns: Vec<String>,
89}
90
91fn default_postgres_host() -> ConfigValue<String> {
92 ConfigValue::Static("localhost".to_string())
93}
94
95fn default_postgres_port() -> ConfigValue<u16> {
96 ConfigValue::Static(5432)
97}
98
99fn default_slot_name() -> String {
100 "drasi_slot".to_string()
101}
102
103fn default_publication_name() -> String {
104 "drasi_publication".to_string()
105}
106
107fn default_password() -> ConfigValue<String> {
108 ConfigValue::Static(String::new())
109}
110
111fn default_ssl_mode() -> ConfigValue<SslModeDto> {
112 ConfigValue::Static(SslModeDto::default())
113}
114
115#[derive(OpenApi)]
116#[openapi(components(schemas(PostgresSourceConfigDto, SslModeDto, TableKeyConfigDto,)))]
117struct PostgresSourceSchemas;
118
119pub struct PostgresSourceDescriptor;
121
122#[async_trait]
123impl SourcePluginDescriptor for PostgresSourceDescriptor {
124 fn kind(&self) -> &str {
125 "postgres"
126 }
127
128 fn config_version(&self) -> &str {
129 "1.0.0"
130 }
131
132 fn config_schema_name(&self) -> &str {
133 "source.postgres.PostgresSourceConfig"
134 }
135
136 fn config_schema_json(&self) -> String {
137 let api = PostgresSourceSchemas::openapi();
138 serde_json::to_string(
139 &api.components
140 .as_ref()
141 .expect("OpenAPI components missing")
142 .schemas,
143 )
144 .expect("Failed to serialize config schema")
145 }
146
147 async fn create_source(
148 &self,
149 id: &str,
150 config_json: &serde_json::Value,
151 auto_start: bool,
152 ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
153 let dto: PostgresSourceConfigDto = serde_json::from_value(config_json.clone())?;
154 let mapper = DtoMapper::new();
155
156 let config = PostgresSourceConfig {
157 host: mapper.resolve_string(&dto.host)?,
158 port: mapper.resolve_typed(&dto.port)?,
159 database: mapper.resolve_string(&dto.database)?,
160 user: mapper.resolve_string(&dto.user)?,
161 password: mapper.resolve_string(&dto.password)?,
162 tables: dto.tables.clone(),
163 slot_name: dto.slot_name.clone(),
164 publication_name: dto.publication_name.clone(),
165 ssl_mode: mapper.resolve_typed::<SslModeDto>(&dto.ssl_mode)?.into(),
166 table_keys: dto
167 .table_keys
168 .iter()
169 .map(|tk| TableKeyConfig {
170 table: tk.table.clone(),
171 key_columns: tk.key_columns.clone(),
172 })
173 .collect(),
174 };
175
176 let source = crate::PostgresSourceBuilder::new(id)
177 .with_config(config)
178 .with_auto_start(auto_start)
179 .build()?;
180
181 Ok(Box::new(source))
182 }
183}