1use crate::{PostgresSourceConfig, SslMode, TableKeyConfig};
18use drasi_plugin_sdk::prelude::*;
19use std::str::FromStr;
20use utoipa::OpenApi;
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
24#[schema(as = source::postgres::PostgresSourceConfig)]
25#[serde(rename_all = "camelCase", deny_unknown_fields)]
26pub struct PostgresSourceConfigDto {
27 #[serde(default = "default_postgres_host")]
28 pub host: ConfigValue<String>,
29 #[serde(default = "default_postgres_port")]
30 pub port: ConfigValue<u16>,
31 pub database: ConfigValue<String>,
32 pub user: ConfigValue<String>,
33 #[serde(default = "default_password")]
34 pub password: ConfigValue<String>,
35 #[serde(default)]
36 pub tables: Vec<String>,
37 #[serde(default = "default_slot_name")]
38 pub slot_name: String,
39 #[serde(default = "default_publication_name")]
40 pub publication_name: String,
41 #[serde(default = "default_ssl_mode")]
42 #[schema(value_type = ConfigValue<source::postgres::SslMode>)]
43 pub ssl_mode: ConfigValue<SslModeDto>,
44 #[serde(default)]
45 #[schema(value_type = Vec<source::postgres::TableKeyConfig>)]
46 pub table_keys: Vec<TableKeyConfigDto>,
47}
48
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
50#[schema(as = source::postgres::SslMode)]
51#[serde(rename_all = "lowercase")]
52#[derive(Default)]
53pub enum SslModeDto {
54 Disable,
55 #[default]
56 Prefer,
57 Require,
58}
59
60impl FromStr for SslModeDto {
61 type Err = String;
62
63 fn from_str(s: &str) -> Result<Self, Self::Err> {
64 match s.to_lowercase().as_str() {
65 "disable" => Ok(SslModeDto::Disable),
66 "prefer" => Ok(SslModeDto::Prefer),
67 "require" => Ok(SslModeDto::Require),
68 _ => Err(format!("Invalid SSL mode: {s}")),
69 }
70 }
71}
72
73impl From<SslModeDto> for SslMode {
74 fn from(dto: SslModeDto) -> Self {
75 match dto {
76 SslModeDto::Disable => SslMode::Disable,
77 SslModeDto::Prefer => SslMode::Prefer,
78 SslModeDto::Require => SslMode::Require,
79 }
80 }
81}
82
83impl From<&SslMode> for SslModeDto {
84 fn from(mode: &SslMode) -> Self {
85 match mode {
86 SslMode::Disable => SslModeDto::Disable,
87 SslMode::Prefer => SslModeDto::Prefer,
88 SslMode::Require => SslModeDto::Require,
89 }
90 }
91}
92
93impl From<&TableKeyConfig> for TableKeyConfigDto {
94 fn from(tk: &TableKeyConfig) -> Self {
95 Self {
96 table: tk.table.clone(),
97 key_columns: tk.key_columns.clone(),
98 }
99 }
100}
101
102impl From<&PostgresSourceConfig> for PostgresSourceConfigDto {
103 fn from(config: &PostgresSourceConfig) -> Self {
104 Self {
105 host: ConfigValue::Static(config.host.clone()),
106 port: ConfigValue::Static(config.port),
107 database: ConfigValue::Static(config.database.clone()),
108 user: ConfigValue::Static(config.user.clone()),
109 password: ConfigValue::Static(config.password.clone()),
110 tables: config.tables.clone(),
111 slot_name: config.slot_name.clone(),
112 publication_name: config.publication_name.clone(),
113 ssl_mode: ConfigValue::Static(SslModeDto::from(&config.ssl_mode)),
114 table_keys: config
115 .table_keys
116 .iter()
117 .map(TableKeyConfigDto::from)
118 .collect(),
119 }
120 }
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
124#[schema(as = source::postgres::TableKeyConfig)]
125#[serde(rename_all = "camelCase", deny_unknown_fields)]
126pub struct TableKeyConfigDto {
127 pub table: String,
128 pub key_columns: Vec<String>,
129}
130
131fn default_postgres_host() -> ConfigValue<String> {
132 ConfigValue::Static("localhost".to_string())
133}
134
135fn default_postgres_port() -> ConfigValue<u16> {
136 ConfigValue::Static(5432)
137}
138
139fn default_slot_name() -> String {
140 "drasi_slot".to_string()
141}
142
143fn default_publication_name() -> String {
144 "drasi_publication".to_string()
145}
146
147fn default_password() -> ConfigValue<String> {
148 ConfigValue::Static(String::new())
149}
150
151fn default_ssl_mode() -> ConfigValue<SslModeDto> {
152 ConfigValue::Static(SslModeDto::default())
153}
154
155#[derive(OpenApi)]
156#[openapi(components(schemas(PostgresSourceConfigDto, SslModeDto, TableKeyConfigDto,)))]
157struct PostgresSourceSchemas;
158
159pub struct PostgresSourceDescriptor;
161
162#[async_trait]
163impl SourcePluginDescriptor for PostgresSourceDescriptor {
164 fn kind(&self) -> &str {
165 "postgres"
166 }
167
168 fn config_version(&self) -> &str {
169 "1.0.0"
170 }
171
172 fn config_schema_name(&self) -> &str {
173 "source.postgres.PostgresSourceConfig"
174 }
175
176 fn config_schema_json(&self) -> String {
177 use drasi_plugin_sdk::schema_ui::SchemaUiAnnotator;
178 let api = PostgresSourceSchemas::openapi();
179 let schemas = serde_json::to_value(
180 &api.components
181 .as_ref()
182 .expect("OpenAPI components missing")
183 .schemas,
184 )
185 .expect("Failed to serialize config schema");
186
187 SchemaUiAnnotator::new(schemas, "source.postgres.PostgresSourceConfig")
188 .expect("root schema not found")
189 .field("host", |f| {
190 f.group("Connection").order(1).placeholder("localhost")
191 })
192 .field("port", |f| {
193 f.group("Connection").order(2).placeholder("5432")
194 })
195 .field("database", |f| {
196 f.group("Connection").order(3).placeholder("mydb")
197 })
198 .field("user", |f| {
199 f.group("Authentication").order(1).placeholder("postgres")
200 })
201 .field("password", |f| {
202 f.group("Authentication").order(2).widget("password")
203 })
204 .field("tables", |f| f.group("Tables").order(1))
205 .field("tableKeys", |f| f.group("Tables").order(2))
206 .field("slotName", |f| {
207 f.group("Replication")
208 .order(1)
209 .placeholder("drasi_slot")
210 .collapsed(true)
211 })
212 .field("publicationName", |f| {
213 f.group("Replication")
214 .order(2)
215 .placeholder("drasi_publication")
216 })
217 .field("sslMode", |f| f.group("SSL").order(1).collapsed(true))
218 .annotate()
219 }
220
221 async fn create_source(
222 &self,
223 id: &str,
224 config_json: &serde_json::Value,
225 auto_start: bool,
226 ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
227 let dto: PostgresSourceConfigDto = serde_json::from_value(config_json.clone())?;
228 let mapper = DtoMapper::new();
229
230 let config = PostgresSourceConfig {
231 host: mapper.resolve_string(&dto.host).await?,
232 port: mapper.resolve_typed(&dto.port).await?,
233 database: mapper.resolve_string(&dto.database).await?,
234 user: mapper.resolve_string(&dto.user).await?,
235 password: mapper.resolve_string(&dto.password).await?,
236 tables: dto.tables.clone(),
237 slot_name: dto.slot_name.clone(),
238 publication_name: dto.publication_name.clone(),
239 ssl_mode: mapper
240 .resolve_typed::<SslModeDto>(&dto.ssl_mode)
241 .await?
242 .into(),
243 table_keys: dto
244 .table_keys
245 .iter()
246 .map(|tk| TableKeyConfig {
247 table: tk.table.clone(),
248 key_columns: tk.key_columns.clone(),
249 })
250 .collect(),
251 };
252
253 let mut source = crate::PostgresSourceBuilder::new(id)
254 .with_config(config)
255 .with_auto_start(auto_start)
256 .build()?;
257
258 source.base.set_raw_config(config_json.clone());
259
260 Ok(Box::new(source))
261 }
262}