Skip to main content

drasi_source_mssql/
descriptor.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! MS SQL source plugin descriptor and configuration DTOs.
16
17use crate::{AuthMode, EncryptionMode, MsSqlSourceBuilder, StartPosition, TableKeyConfig};
18use drasi_plugin_sdk::prelude::*;
19use std::str::FromStr;
20use utoipa::OpenApi;
21
22/// MS SQL source configuration DTO.
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
24#[schema(as = source::mssql::MsSqlSourceConfig)]
25#[serde(rename_all = "camelCase", deny_unknown_fields)]
26pub struct MsSqlSourceConfigDto {
27    #[serde(default = "default_mssql_host")]
28    pub host: ConfigValue<String>,
29    #[serde(default = "default_mssql_port")]
30    pub port: ConfigValue<u16>,
31    pub database: ConfigValue<String>,
32    pub user: ConfigValue<String>,
33    #[serde(default = "default_mssql_password")]
34    pub password: ConfigValue<String>,
35    #[serde(default)]
36    #[schema(value_type = AuthModeDto)]
37    pub auth_mode: ConfigValue<AuthModeDto>,
38    #[serde(default)]
39    pub tables: Vec<String>,
40    #[serde(default = "default_poll_interval_ms")]
41    pub poll_interval_ms: ConfigValue<u64>,
42    #[serde(default)]
43    #[schema(value_type = EncryptionModeDto)]
44    pub encryption: ConfigValue<EncryptionModeDto>,
45    #[serde(default)]
46    pub trust_server_certificate: ConfigValue<bool>,
47    #[serde(default)]
48    #[schema(value_type = Vec<TableKeyConfigDto>)]
49    pub table_keys: Vec<TableKeyConfigDto>,
50    #[serde(default)]
51    #[schema(value_type = StartPositionDto)]
52    pub start_position: ConfigValue<StartPositionDto>,
53}
54
55#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
56#[schema(as = source::mssql::AuthMode)]
57#[serde(rename_all = "lowercase")]
58#[derive(Default)]
59pub enum AuthModeDto {
60    #[default]
61    SqlServer,
62    Windows,
63    AzureAd,
64}
65
66impl FromStr for AuthModeDto {
67    type Err = String;
68
69    fn from_str(s: &str) -> Result<Self, Self::Err> {
70        match s.to_lowercase().as_str() {
71            "sqlserver" | "sql_server" => Ok(AuthModeDto::SqlServer),
72            "windows" => Ok(AuthModeDto::Windows),
73            "azuread" | "azure_ad" => Ok(AuthModeDto::AzureAd),
74            _ => Err(format!("Invalid auth mode: {s}")),
75        }
76    }
77}
78
79impl From<AuthModeDto> for AuthMode {
80    fn from(dto: AuthModeDto) -> Self {
81        match dto {
82            AuthModeDto::SqlServer => AuthMode::SqlServer,
83            AuthModeDto::Windows => AuthMode::Windows,
84            AuthModeDto::AzureAd => AuthMode::AzureAd,
85        }
86    }
87}
88
89#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
90#[schema(as = source::mssql::EncryptionMode)]
91#[serde(rename_all = "lowercase")]
92#[derive(Default)]
93pub enum EncryptionModeDto {
94    Off,
95    On,
96    #[default]
97    NotSupported,
98}
99
100impl FromStr for EncryptionModeDto {
101    type Err = String;
102
103    fn from_str(s: &str) -> Result<Self, Self::Err> {
104        match s.to_lowercase().as_str() {
105            "off" => Ok(EncryptionModeDto::Off),
106            "on" => Ok(EncryptionModeDto::On),
107            "notsupported" | "not_supported" => Ok(EncryptionModeDto::NotSupported),
108            _ => Err(format!("Invalid encryption mode: {s}")),
109        }
110    }
111}
112
113impl From<EncryptionModeDto> for EncryptionMode {
114    fn from(dto: EncryptionModeDto) -> Self {
115        match dto {
116            EncryptionModeDto::Off => EncryptionMode::Off,
117            EncryptionModeDto::On => EncryptionMode::On,
118            EncryptionModeDto::NotSupported => EncryptionMode::NotSupported,
119        }
120    }
121}
122
123#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
124#[schema(as = source::mssql::StartPosition)]
125#[serde(rename_all = "lowercase")]
126#[derive(Default)]
127pub enum StartPositionDto {
128    Beginning,
129    #[default]
130    Current,
131}
132
133impl FromStr for StartPositionDto {
134    type Err = String;
135
136    fn from_str(s: &str) -> Result<Self, Self::Err> {
137        match s.to_lowercase().as_str() {
138            "beginning" => Ok(StartPositionDto::Beginning),
139            "current" => Ok(StartPositionDto::Current),
140            _ => Err(format!("Invalid start position: {s}")),
141        }
142    }
143}
144
145impl From<StartPositionDto> for StartPosition {
146    fn from(dto: StartPositionDto) -> Self {
147        match dto {
148            StartPositionDto::Beginning => StartPosition::Beginning,
149            StartPositionDto::Current => StartPosition::Current,
150        }
151    }
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
155#[schema(as = source::mssql::TableKeyConfig)]
156#[serde(rename_all = "camelCase", deny_unknown_fields)]
157pub struct TableKeyConfigDto {
158    pub table: String,
159    pub key_columns: Vec<String>,
160}
161
162fn default_mssql_host() -> ConfigValue<String> {
163    ConfigValue::Static("localhost".to_string())
164}
165
166fn default_mssql_port() -> ConfigValue<u16> {
167    ConfigValue::Static(1433)
168}
169
170fn default_mssql_password() -> ConfigValue<String> {
171    ConfigValue::Static(String::new())
172}
173
174fn default_poll_interval_ms() -> ConfigValue<u64> {
175    ConfigValue::Static(1000)
176}
177
178#[derive(OpenApi)]
179#[openapi(components(schemas(
180    MsSqlSourceConfigDto,
181    AuthModeDto,
182    EncryptionModeDto,
183    StartPositionDto,
184    TableKeyConfigDto,
185)))]
186struct MsSqlSourceSchemas;
187
188/// Descriptor for the MS SQL source plugin.
189pub struct MsSqlSourceDescriptor;
190
191#[async_trait]
192impl SourcePluginDescriptor for MsSqlSourceDescriptor {
193    fn kind(&self) -> &str {
194        "mssql"
195    }
196
197    fn config_version(&self) -> &str {
198        "1.0.0"
199    }
200
201    fn config_schema_name(&self) -> &str {
202        "source.mssql.MsSqlSourceConfig"
203    }
204
205    fn config_schema_json(&self) -> String {
206        let api = MsSqlSourceSchemas::openapi();
207        serde_json::to_string(
208            &api.components
209                .as_ref()
210                .expect("OpenAPI components missing")
211                .schemas,
212        )
213        .expect("Failed to serialize config schema")
214    }
215
216    async fn create_source(
217        &self,
218        id: &str,
219        config_json: &serde_json::Value,
220        _auto_start: bool,
221    ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
222        let dto: MsSqlSourceConfigDto = serde_json::from_value(config_json.clone())?;
223        let mapper = DtoMapper::new();
224
225        let host: String = mapper.resolve_string(&dto.host)?;
226        let port: u16 = mapper.resolve_typed(&dto.port)?;
227        let database: String = mapper.resolve_string(&dto.database)?;
228        let user: String = mapper.resolve_string(&dto.user)?;
229        let password: String = mapper.resolve_string(&dto.password)?;
230        let auth_mode: AuthMode = mapper.resolve_typed::<AuthModeDto>(&dto.auth_mode)?.into();
231        let poll_interval_ms: u64 = mapper.resolve_typed(&dto.poll_interval_ms)?;
232        let encryption: EncryptionMode = mapper
233            .resolve_typed::<EncryptionModeDto>(&dto.encryption)?
234            .into();
235        let trust_server_certificate: bool = mapper.resolve_typed(&dto.trust_server_certificate)?;
236        let start_position: StartPosition = mapper
237            .resolve_typed::<StartPositionDto>(&dto.start_position)?
238            .into();
239
240        let mut builder = MsSqlSourceBuilder::new(id)
241            .with_host(host)
242            .with_port(port)
243            .with_database(database)
244            .with_user(user)
245            .with_password(password)
246            .with_auth_mode(auth_mode)
247            .with_tables(dto.tables.clone())
248            .with_poll_interval_ms(poll_interval_ms)
249            .with_encryption(encryption)
250            .with_trust_server_certificate(trust_server_certificate)
251            .with_start_position(start_position);
252
253        for tk in &dto.table_keys {
254            builder = builder.with_table_key(tk.table.clone(), tk.key_columns.clone());
255        }
256
257        let source = builder.build()?;
258        Ok(Box::new(source))
259    }
260}