use crate::{PostgresSourceConfig, SslMode, TableKeyConfig};
use drasi_plugin_sdk::prelude::*;
use std::str::FromStr;
use utoipa::OpenApi;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::postgres::PostgresSourceConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct PostgresSourceConfigDto {
#[serde(default = "default_postgres_host")]
pub host: ConfigValue<String>,
#[serde(default = "default_postgres_port")]
pub port: ConfigValue<u16>,
pub database: ConfigValue<String>,
pub user: ConfigValue<String>,
#[serde(default = "default_password")]
pub password: ConfigValue<String>,
#[serde(default)]
pub tables: Vec<String>,
#[serde(default = "default_slot_name")]
pub slot_name: String,
#[serde(default = "default_publication_name")]
pub publication_name: String,
#[serde(default = "default_ssl_mode")]
#[schema(value_type = ConfigValue<source::postgres::SslMode>)]
pub ssl_mode: ConfigValue<SslModeDto>,
#[serde(default)]
#[schema(value_type = Vec<source::postgres::TableKeyConfig>)]
pub table_keys: Vec<TableKeyConfigDto>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
#[schema(as = source::postgres::SslMode)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum SslModeDto {
Disable,
#[default]
Prefer,
Require,
}
impl FromStr for SslModeDto {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"disable" => Ok(SslModeDto::Disable),
"prefer" => Ok(SslModeDto::Prefer),
"require" => Ok(SslModeDto::Require),
_ => Err(format!("Invalid SSL mode: {s}")),
}
}
}
impl From<SslModeDto> for SslMode {
fn from(dto: SslModeDto) -> Self {
match dto {
SslModeDto::Disable => SslMode::Disable,
SslModeDto::Prefer => SslMode::Prefer,
SslModeDto::Require => SslMode::Require,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::postgres::TableKeyConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct TableKeyConfigDto {
pub table: String,
pub key_columns: Vec<String>,
}
fn default_postgres_host() -> ConfigValue<String> {
ConfigValue::Static("localhost".to_string())
}
fn default_postgres_port() -> ConfigValue<u16> {
ConfigValue::Static(5432)
}
fn default_slot_name() -> String {
"drasi_slot".to_string()
}
fn default_publication_name() -> String {
"drasi_publication".to_string()
}
fn default_password() -> ConfigValue<String> {
ConfigValue::Static(String::new())
}
fn default_ssl_mode() -> ConfigValue<SslModeDto> {
ConfigValue::Static(SslModeDto::default())
}
#[derive(OpenApi)]
#[openapi(components(schemas(PostgresSourceConfigDto, SslModeDto, TableKeyConfigDto,)))]
struct PostgresSourceSchemas;
pub struct PostgresSourceDescriptor;
#[async_trait]
impl SourcePluginDescriptor for PostgresSourceDescriptor {
fn kind(&self) -> &str {
"postgres"
}
fn config_version(&self) -> &str {
"1.0.0"
}
fn config_schema_name(&self) -> &str {
"source.postgres.PostgresSourceConfig"
}
fn config_schema_json(&self) -> String {
let api = PostgresSourceSchemas::openapi();
serde_json::to_string(
&api.components
.as_ref()
.expect("OpenAPI components missing")
.schemas,
)
.expect("Failed to serialize config schema")
}
async fn create_source(
&self,
id: &str,
config_json: &serde_json::Value,
auto_start: bool,
) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
let dto: PostgresSourceConfigDto = serde_json::from_value(config_json.clone())?;
let mapper = DtoMapper::new();
let config = PostgresSourceConfig {
host: mapper.resolve_string(&dto.host)?,
port: mapper.resolve_typed(&dto.port)?,
database: mapper.resolve_string(&dto.database)?,
user: mapper.resolve_string(&dto.user)?,
password: mapper.resolve_string(&dto.password)?,
tables: dto.tables.clone(),
slot_name: dto.slot_name.clone(),
publication_name: dto.publication_name.clone(),
ssl_mode: mapper.resolve_typed::<SslModeDto>(&dto.ssl_mode)?.into(),
table_keys: dto
.table_keys
.iter()
.map(|tk| TableKeyConfig {
table: tk.table.clone(),
key_columns: tk.key_columns.clone(),
})
.collect(),
};
let source = crate::PostgresSourceBuilder::new(id)
.with_config(config)
.with_auto_start(auto_start)
.build()?;
Ok(Box::new(source))
}
}