1use crate::{AuthMode, EncryptionMode, MsSqlSourceBuilder, StartPosition, TableKeyConfig};
18use drasi_plugin_sdk::prelude::*;
19use std::str::FromStr;
20use utoipa::OpenApi;
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
24#[schema(as = source::mssql::MsSqlSourceConfig)]
25#[serde(rename_all = "camelCase", deny_unknown_fields)]
26pub struct MsSqlSourceConfigDto {
27 #[serde(default = "default_mssql_host")]
28 pub host: ConfigValue<String>,
29 #[serde(default = "default_mssql_port")]
30 pub port: ConfigValue<u16>,
31 pub database: ConfigValue<String>,
32 pub user: ConfigValue<String>,
33 #[serde(default = "default_mssql_password")]
34 pub password: ConfigValue<String>,
35 #[serde(default)]
36 #[schema(value_type = AuthModeDto)]
37 pub auth_mode: ConfigValue<AuthModeDto>,
38 #[serde(default)]
39 pub tables: Vec<String>,
40 #[serde(default = "default_poll_interval_ms")]
41 pub poll_interval_ms: ConfigValue<u64>,
42 #[serde(default)]
43 #[schema(value_type = EncryptionModeDto)]
44 pub encryption: ConfigValue<EncryptionModeDto>,
45 #[serde(default)]
46 pub trust_server_certificate: ConfigValue<bool>,
47 #[serde(default)]
48 #[schema(value_type = Vec<TableKeyConfigDto>)]
49 pub table_keys: Vec<TableKeyConfigDto>,
50 #[serde(default)]
51 #[schema(value_type = StartPositionDto)]
52 pub start_position: ConfigValue<StartPositionDto>,
53}
54
55#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
56#[schema(as = source::mssql::AuthMode)]
57#[serde(rename_all = "lowercase")]
58#[derive(Default)]
59pub enum AuthModeDto {
60 #[default]
61 SqlServer,
62 Windows,
63 AzureAd,
64}
65
66impl FromStr for AuthModeDto {
67 type Err = String;
68
69 fn from_str(s: &str) -> Result<Self, Self::Err> {
70 match s.to_lowercase().as_str() {
71 "sqlserver" | "sql_server" => Ok(AuthModeDto::SqlServer),
72 "windows" => Ok(AuthModeDto::Windows),
73 "azuread" | "azure_ad" => Ok(AuthModeDto::AzureAd),
74 _ => Err(format!("Invalid auth mode: {s}")),
75 }
76 }
77}
78
79impl From<AuthModeDto> for AuthMode {
80 fn from(dto: AuthModeDto) -> Self {
81 match dto {
82 AuthModeDto::SqlServer => AuthMode::SqlServer,
83 AuthModeDto::Windows => AuthMode::Windows,
84 AuthModeDto::AzureAd => AuthMode::AzureAd,
85 }
86 }
87}
88
89#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
90#[schema(as = source::mssql::EncryptionMode)]
91#[serde(rename_all = "lowercase")]
92#[derive(Default)]
93pub enum EncryptionModeDto {
94 Off,
95 On,
96 #[default]
97 NotSupported,
98}
99
100impl FromStr for EncryptionModeDto {
101 type Err = String;
102
103 fn from_str(s: &str) -> Result<Self, Self::Err> {
104 match s.to_lowercase().as_str() {
105 "off" => Ok(EncryptionModeDto::Off),
106 "on" => Ok(EncryptionModeDto::On),
107 "notsupported" | "not_supported" => Ok(EncryptionModeDto::NotSupported),
108 _ => Err(format!("Invalid encryption mode: {s}")),
109 }
110 }
111}
112
113impl From<EncryptionModeDto> for EncryptionMode {
114 fn from(dto: EncryptionModeDto) -> Self {
115 match dto {
116 EncryptionModeDto::Off => EncryptionMode::Off,
117 EncryptionModeDto::On => EncryptionMode::On,
118 EncryptionModeDto::NotSupported => EncryptionMode::NotSupported,
119 }
120 }
121}
122
123#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
124#[schema(as = source::mssql::StartPosition)]
125#[serde(rename_all = "lowercase")]
126#[derive(Default)]
127pub enum StartPositionDto {
128 Beginning,
129 #[default]
130 Current,
131}
132
133impl FromStr for StartPositionDto {
134 type Err = String;
135
136 fn from_str(s: &str) -> Result<Self, Self::Err> {
137 match s.to_lowercase().as_str() {
138 "beginning" => Ok(StartPositionDto::Beginning),
139 "current" => Ok(StartPositionDto::Current),
140 _ => Err(format!("Invalid start position: {s}")),
141 }
142 }
143}
144
145impl From<StartPositionDto> for StartPosition {
146 fn from(dto: StartPositionDto) -> Self {
147 match dto {
148 StartPositionDto::Beginning => StartPosition::Beginning,
149 StartPositionDto::Current => StartPosition::Current,
150 }
151 }
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
155#[schema(as = source::mssql::TableKeyConfig)]
156#[serde(rename_all = "camelCase", deny_unknown_fields)]
157pub struct TableKeyConfigDto {
158 pub table: String,
159 pub key_columns: Vec<String>,
160}
161
162fn default_mssql_host() -> ConfigValue<String> {
163 ConfigValue::Static("localhost".to_string())
164}
165
166fn default_mssql_port() -> ConfigValue<u16> {
167 ConfigValue::Static(1433)
168}
169
170fn default_mssql_password() -> ConfigValue<String> {
171 ConfigValue::Static(String::new())
172}
173
174fn default_poll_interval_ms() -> ConfigValue<u64> {
175 ConfigValue::Static(1000)
176}
177
178#[derive(OpenApi)]
179#[openapi(components(schemas(
180 MsSqlSourceConfigDto,
181 AuthModeDto,
182 EncryptionModeDto,
183 StartPositionDto,
184 TableKeyConfigDto,
185)))]
186struct MsSqlSourceSchemas;
187
188pub struct MsSqlSourceDescriptor;
190
191#[async_trait]
192impl SourcePluginDescriptor for MsSqlSourceDescriptor {
193 fn kind(&self) -> &str {
194 "mssql"
195 }
196
197 fn config_version(&self) -> &str {
198 "1.0.0"
199 }
200
201 fn config_schema_name(&self) -> &str {
202 "source.mssql.MsSqlSourceConfig"
203 }
204
205 fn config_schema_json(&self) -> String {
206 let api = MsSqlSourceSchemas::openapi();
207 serde_json::to_string(
208 &api.components
209 .as_ref()
210 .expect("OpenAPI components missing")
211 .schemas,
212 )
213 .expect("Failed to serialize config schema")
214 }
215
216 async fn create_source(
217 &self,
218 id: &str,
219 config_json: &serde_json::Value,
220 _auto_start: bool,
221 ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
222 let dto: MsSqlSourceConfigDto = serde_json::from_value(config_json.clone())?;
223 let mapper = DtoMapper::new();
224
225 let host: String = mapper.resolve_string(&dto.host)?;
226 let port: u16 = mapper.resolve_typed(&dto.port)?;
227 let database: String = mapper.resolve_string(&dto.database)?;
228 let user: String = mapper.resolve_string(&dto.user)?;
229 let password: String = mapper.resolve_string(&dto.password)?;
230 let auth_mode: AuthMode = mapper.resolve_typed::<AuthModeDto>(&dto.auth_mode)?.into();
231 let poll_interval_ms: u64 = mapper.resolve_typed(&dto.poll_interval_ms)?;
232 let encryption: EncryptionMode = mapper
233 .resolve_typed::<EncryptionModeDto>(&dto.encryption)?
234 .into();
235 let trust_server_certificate: bool = mapper.resolve_typed(&dto.trust_server_certificate)?;
236 let start_position: StartPosition = mapper
237 .resolve_typed::<StartPositionDto>(&dto.start_position)?
238 .into();
239
240 let mut builder = MsSqlSourceBuilder::new(id)
241 .with_host(host)
242 .with_port(port)
243 .with_database(database)
244 .with_user(user)
245 .with_password(password)
246 .with_auth_mode(auth_mode)
247 .with_tables(dto.tables.clone())
248 .with_poll_interval_ms(poll_interval_ms)
249 .with_encryption(encryption)
250 .with_trust_server_certificate(trust_server_certificate)
251 .with_start_position(start_position);
252
253 for tk in &dto.table_keys {
254 builder = builder.with_table_key(tk.table.clone(), tk.key_columns.clone());
255 }
256
257 let source = builder.build()?;
258 Ok(Box::new(source))
259 }
260}