wp_connector_api/runtime/
cnn.rs

1use serde::{Deserialize, Serialize};
2
3use crate::ParamMap;
4
/// Direction in which a connector moves data: reading (source) or writing (sink).
///
/// The derived ordering follows declaration order, so `Source` sorts before
/// `Sink`. `Source` is also the value produced by `Default`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectorScope {
    /// The connector reads data. This is the default scope.
    #[default]
    Source,
    /// The connector writes data.
    Sink,
}
14
15/// Connector definition containing metadata and default configuration.
16///
17/// This struct is used to describe a connector's capabilities and defaults.
18/// It can be serialized/deserialized for configuration files.
19///
20/// # Serialization
21/// - `kind` is serialized as `"type"`
22/// - `default_params` is serialized as `"params"`
23/// - `scope` and `origin` are runtime-only fields (not serialized)
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct ConnectorDef {
26    /// Unique identifier for this connector instance
27    pub id: String,
28    /// Connector type (e.g., "kafka", "mysql", "elasticsearch")
29    #[serde(rename = "type")]
30    pub kind: String,
31    /// Whether this definition is for source or sink (runtime only)
32    #[serde(skip, default)]
33    pub scope: ConnectorScope,
34    /// Parameter keys that can be overridden at runtime
35    #[serde(default)]
36    pub allow_override: Vec<String>,
37    /// Default parameter values
38    #[serde(default, rename = "params")]
39    pub default_params: ParamMap,
40    /// Origin identifier for tracking (runtime only)
41    #[serde(skip, default)]
42    pub origin: Option<String>,
43}
44
45impl ConnectorDef {
46    /// Set the scope and return self for chaining.
47    pub fn with_scope(mut self, scope: ConnectorScope) -> Self {
48        self.scope = scope;
49        self
50    }
51}
52
53/// Trait for connectors that can act as a data source.
54///
55/// Implement this trait to provide source connector metadata and validation.
56/// Used by [`SourceFactory`] to obtain connector definitions.
57///
58/// # Example
59/// ```ignore
60/// impl SourceDefProvider for MyConnector {
61///     fn source_def(&self) -> ConnectorDef {
62///         ConnectorDef {
63///             id: "my-source".into(),
64///             kind: "custom".into(),
65///             scope: ConnectorScope::Source,
66///             allow_override: vec!["batch_size".into()],
67///             default_params: Default::default(),
68///             origin: None,
69///         }
70///     }
71/// }
72/// ```
73pub trait SourceDefProvider: Send + Sync + 'static {
74    /// Returns the connector definition for source mode.
75    fn source_def(&self) -> ConnectorDef;
76
77    /// Validates a source connector definition.
78    ///
79    /// Override to add custom validation logic. Returns `Ok(())` by default.
80    fn validate_source(&self, _def: &ConnectorDef) -> Result<(), String> {
81        Ok(())
82    }
83}
84
85/// Trait for connectors that can act as a data sink.
86///
87/// Implement this trait to provide sink connector metadata and validation.
88/// Used by [`SinkFactory`] to obtain connector definitions.
89///
90/// # Example
91/// ```ignore
92/// impl SinkDefProvider for MyConnector {
93///     fn sink_def(&self) -> ConnectorDef {
94///         ConnectorDef {
95///             id: "my-sink".into(),
96///             kind: "custom".into(),
97///             scope: ConnectorScope::Sink,
98///             allow_override: vec![],
99///             default_params: Default::default(),
100///             origin: None,
101///         }
102///     }
103/// }
104/// ```
105pub trait SinkDefProvider: Send + Sync + 'static {
106    /// Returns the connector definition for sink mode.
107    fn sink_def(&self) -> ConnectorDef;
108
109    /// Validates a sink connector definition.
110    ///
111    /// Override to add custom validation logic. Returns `Ok(())` by default.
112    fn validate_sink(&self, _def: &ConnectorDef) -> Result<(), String> {
113        Ok(())
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// `Source` is the documented default scope.
    #[test]
    fn test_connector_scope_default() {
        let scope = ConnectorScope::default();
        assert_eq!(scope, ConnectorScope::Source);
    }

    /// Deserialization honours the `"type"` / `"params"` renames, leaves the
    /// skipped `scope` at its default, and `with_scope` replaces it.
    #[test]
    fn test_connector_def_serde_and_with_scope() {
        let json = r#"{"id": "mysql-prod", "type": "mysql", "params": {"host": "localhost"}}"#;
        let def: ConnectorDef = serde_json::from_str(json).unwrap();

        assert_eq!(def.id, "mysql-prod");
        assert_eq!(def.kind, "mysql");
        assert_eq!(def.scope, ConnectorScope::Source); // default
        assert_eq!(def.default_params.get("host").unwrap(), "localhost");

        let def = def.with_scope(ConnectorScope::Sink);
        assert_eq!(def.scope, ConnectorScope::Sink);
    }

    /// Serialize-side counterpart of the test above: optional fields default
    /// when absent from the input, the renamed keys are the ones written
    /// out, and the runtime-only fields never reach the output even when
    /// they hold non-default values.
    #[test]
    fn test_connector_def_serialize_omits_runtime_fields() {
        let json = r#"{"id": "mysql-prod", "type": "mysql"}"#;
        let def: ConnectorDef = serde_json::from_str(json).unwrap();

        // Fields missing from the input fall back to their defaults.
        assert!(def.allow_override.is_empty());
        assert!(def.origin.is_none());

        // Give the runtime-only fields non-default values so that their
        // absence from the output is meaningful.
        let mut def = def.with_scope(ConnectorScope::Sink);
        def.origin = Some("unit-test".to_string());
        def.allow_override = vec!["host".to_string()];

        let value = serde_json::to_value(&def).unwrap();
        assert_eq!(value["id"], "mysql-prod");
        assert_eq!(value["type"], "mysql");
        assert_eq!(value["allow_override"], serde_json::json!(["host"]));
        assert!(value.get("params").is_some());

        // Rust-side field names must not leak into the serialized form.
        assert!(value.get("kind").is_none());
        assert!(value.get("default_params").is_none());

        // Runtime-only fields are skipped.
        assert!(value.get("scope").is_none());
        assert!(value.get("origin").is_none());
    }

    // Test that SourceDefProvider can be implemented independently
    struct SourceOnlyConnector;

    impl SourceDefProvider for SourceOnlyConnector {
        fn source_def(&self) -> ConnectorDef {
            ConnectorDef {
                id: "source-only".into(),
                kind: "test".into(),
                scope: ConnectorScope::Source,
                allow_override: vec![],
                default_params: Default::default(),
                origin: None,
            }
        }
    }

    #[test]
    fn test_source_only_connector() {
        let connector = SourceOnlyConnector;
        let def = connector.source_def();
        assert_eq!(def.id, "source-only");
        assert_eq!(def.scope, ConnectorScope::Source);

        // validate_source has default implementation
        assert!(connector.validate_source(&def).is_ok());
    }

    // Test that SinkDefProvider can be implemented independently
    struct SinkOnlyConnector;

    impl SinkDefProvider for SinkOnlyConnector {
        fn sink_def(&self) -> ConnectorDef {
            ConnectorDef {
                id: "sink-only".into(),
                kind: "test".into(),
                scope: ConnectorScope::Sink,
                allow_override: vec![],
                default_params: Default::default(),
                origin: None,
            }
        }
    }

    #[test]
    fn test_sink_only_connector() {
        let connector = SinkOnlyConnector;
        let def = connector.sink_def();
        assert_eq!(def.id, "sink-only");
        assert_eq!(def.scope, ConnectorScope::Sink);

        // validate_sink has default implementation
        assert!(connector.validate_sink(&def).is_ok());
    }
}