wp_connector_api/runtime/
cnn.rs

1use serde::{Deserialize, Serialize};
2
3use crate::ParamMap;
4
5/// Defines whether a connector operates as a data source or sink.
6#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
7pub enum ConnectorScope {
8    /// Connector reads data (default)
9    #[default]
10    Source,
11    /// Connector writes data
12    Sink,
13}
14
15/// Connector definition containing metadata and default configuration.
16///
17/// This struct is used to describe a connector's capabilities and defaults.
18/// It can be serialized/deserialized for configuration files.
19///
20/// # Serialization
21/// - `kind` is serialized as `"type"`
22/// - `default_params` is serialized as `"params"`
23/// - `scope` and `origin` are runtime-only fields (not serialized)
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct ConnectorDef {
26    /// Unique identifier for this connector instance
27    pub id: String,
28    /// Connector type (e.g., "kafka", "mysql", "elasticsearch")
29    #[serde(rename = "type")]
30    pub kind: String,
31    /// Whether this definition is for source or sink (runtime only)
32    #[serde(skip, default)]
33    pub scope: ConnectorScope,
34    /// Parameter keys that can be overridden at runtime
35    #[serde(default)]
36    pub allow_override: Vec<String>,
37    /// Default parameter values
38    #[serde(default, rename = "params")]
39    pub default_params: ParamMap,
40    /// Origin identifier for tracking (runtime only)
41    #[serde(skip, default)]
42    pub origin: Option<String>,
43}
44
45impl ConnectorDef {
46    /// Set the scope and return self for chaining.
47    pub fn with_scope(mut self, scope: ConnectorScope) -> Self {
48        self.scope = scope;
49        self
50    }
51}
52
53/// Trait for connectors that can act as a data source.
54///
55/// Implement this trait to provide source connector metadata and validation.
56/// Used by [`SourceFactory`] to obtain connector definitions.
57///
58/// # Example
59/// ```ignore
60/// impl SourceDefProvider for MyConnector {
61///     fn source_def(&self) -> ConnectorDef {
62///         ConnectorDef {
63///             id: "my-source".into(),
64///             kind: "custom".into(),
65///             scope: ConnectorScope::Source,
66///             allow_override: vec!["batch_size".into()],
67///             default_params: Default::default(),
68///             origin: None,
69///         }
70///     }
71/// }
72/// ```
73pub trait SourceDefProvider: Send + Sync + 'static {
74    /// Returns the connector definition for source mode.
75    fn source_def(&self) -> ConnectorDef;
76
77    fn source_defs(&self) -> Vec<ConnectorDef> {
78        [self.source_def()].into()
79    }
80    /// Validates a source connector definition.
81    ///
82    /// Override to add custom validation logic. Returns `Ok(())` by default.
83    fn validate_source(&self, _def: &ConnectorDef) -> Result<(), String> {
84        Ok(())
85    }
86}
87
88/// Trait for connectors that can act as a data sink.
89///
90/// Implement this trait to provide sink connector metadata and validation.
91/// Used by [`SinkFactory`] to obtain connector definitions.
92///
93/// # Example
94/// ```ignore
95/// impl SinkDefProvider for MyConnector {
96///     fn sink_def(&self) -> ConnectorDef {
97///         ConnectorDef {
98///             id: "my-sink".into(),
99///             kind: "custom".into(),
100///             scope: ConnectorScope::Sink,
101///             allow_override: vec![],
102///             default_params: Default::default(),
103///             origin: None,
104///         }
105///     }
106/// }
107/// ```
108pub trait SinkDefProvider: Send + Sync + 'static {
109    /// Returns the connector definition for sink mode.
110    fn sink_def(&self) -> ConnectorDef;
111
112    fn sink_defs(&self) -> Vec<ConnectorDef> {
113        [self.sink_def()].into()
114    }
115    /// Validates a sink connector definition.
116    ///
117    /// Override to add custom validation logic. Returns `Ok(())` by default.
118    fn validate_sink(&self, _def: &ConnectorDef) -> Result<(), String> {
119        Ok(())
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126
127    #[test]
128    fn test_connector_scope_default() {
129        let scope = ConnectorScope::default();
130        assert_eq!(scope, ConnectorScope::Source);
131    }
132
133    #[test]
134    fn test_connector_def_serde_and_with_scope() {
135        let json = r#"{"id": "mysql-prod", "type": "mysql", "params": {"host": "localhost"}}"#;
136        let def: ConnectorDef = serde_json::from_str(json).unwrap();
137
138        assert_eq!(def.id, "mysql-prod");
139        assert_eq!(def.kind, "mysql");
140        assert_eq!(def.scope, ConnectorScope::Source); // default
141        assert_eq!(def.default_params.get("host").unwrap(), "localhost");
142
143        let def = def.with_scope(ConnectorScope::Sink);
144        assert_eq!(def.scope, ConnectorScope::Sink);
145    }
146
147    // Test that SourceDefProvider can be implemented independently
148    struct SourceOnlyConnector;
149
150    impl SourceDefProvider for SourceOnlyConnector {
151        fn source_def(&self) -> ConnectorDef {
152            ConnectorDef {
153                id: "source-only".into(),
154                kind: "test".into(),
155                scope: ConnectorScope::Source,
156                allow_override: vec![],
157                default_params: Default::default(),
158                origin: None,
159            }
160        }
161    }
162
163    #[test]
164    fn test_source_only_connector() {
165        let connector = SourceOnlyConnector;
166        let def = connector.source_def();
167        assert_eq!(def.id, "source-only");
168        assert_eq!(def.scope, ConnectorScope::Source);
169
170        // validate_source has default implementation
171        assert!(connector.validate_source(&def).is_ok());
172    }
173
174    // Test that SinkDefProvider can be implemented independently
175    struct SinkOnlyConnector;
176
177    impl SinkDefProvider for SinkOnlyConnector {
178        fn sink_def(&self) -> ConnectorDef {
179            ConnectorDef {
180                id: "sink-only".into(),
181                kind: "test".into(),
182                scope: ConnectorScope::Sink,
183                allow_override: vec![],
184                default_params: Default::default(),
185                origin: None,
186            }
187        }
188    }
189
190    #[test]
191    fn test_sink_only_connector() {
192        let connector = SinkOnlyConnector;
193        let def = connector.sink_def();
194        assert_eq!(def.id, "sink-only");
195        assert_eq!(def.scope, ConnectorScope::Sink);
196
197        // validate_sink has default implementation
198        assert!(connector.validate_sink(&def).is_ok());
199    }
200}