wp_connector_api/runtime/cnn.rs
1use serde::{Deserialize, Serialize};
2
3use crate::ParamMap;
4
5/// Defines whether a connector operates as a data source or sink.
6#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
7pub enum ConnectorScope {
8 /// Connector reads data (default)
9 #[default]
10 Source,
11 /// Connector writes data
12 Sink,
13}
14
15/// Connector definition containing metadata and default configuration.
16///
17/// This struct is used to describe a connector's capabilities and defaults.
18/// It can be serialized/deserialized for configuration files.
19///
20/// # Serialization
21/// - `kind` is serialized as `"type"`
22/// - `default_params` is serialized as `"params"`
23/// - `scope` and `origin` are runtime-only fields (not serialized)
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct ConnectorDef {
26 /// Unique identifier for this connector instance
27 pub id: String,
28 /// Connector type (e.g., "kafka", "mysql", "elasticsearch")
29 #[serde(rename = "type")]
30 pub kind: String,
31 /// Whether this definition is for source or sink (runtime only)
32 #[serde(skip, default)]
33 pub scope: ConnectorScope,
34 /// Parameter keys that can be overridden at runtime
35 #[serde(default)]
36 pub allow_override: Vec<String>,
37 /// Default parameter values
38 #[serde(default, rename = "params")]
39 pub default_params: ParamMap,
40 /// Origin identifier for tracking (runtime only)
41 #[serde(skip, default)]
42 pub origin: Option<String>,
43}
44
45impl ConnectorDef {
46 /// Set the scope and return self for chaining.
47 pub fn with_scope(mut self, scope: ConnectorScope) -> Self {
48 self.scope = scope;
49 self
50 }
51}
52
53/// Trait for connectors that can act as a data source.
54///
55/// Implement this trait to provide source connector metadata and validation.
56/// Used by [`SourceFactory`] to obtain connector definitions.
57///
58/// # Example
59/// ```ignore
60/// impl SourceDefProvider for MyConnector {
61/// fn source_def(&self) -> ConnectorDef {
62/// ConnectorDef {
63/// id: "my-source".into(),
64/// kind: "custom".into(),
65/// scope: ConnectorScope::Source,
66/// allow_override: vec!["batch_size".into()],
67/// default_params: Default::default(),
68/// origin: None,
69/// }
70/// }
71/// }
72/// ```
73pub trait SourceDefProvider: Send + Sync + 'static {
74 /// Returns the connector definition for source mode.
75 fn source_def(&self) -> ConnectorDef;
76
77 fn source_defs(&self) -> Vec<ConnectorDef> {
78 [self.source_def()].into()
79 }
80 /// Validates a source connector definition.
81 ///
82 /// Override to add custom validation logic. Returns `Ok(())` by default.
83 fn validate_source(&self, _def: &ConnectorDef) -> Result<(), String> {
84 Ok(())
85 }
86}
87
88/// Trait for connectors that can act as a data sink.
89///
90/// Implement this trait to provide sink connector metadata and validation.
91/// Used by [`SinkFactory`] to obtain connector definitions.
92///
93/// # Example
94/// ```ignore
95/// impl SinkDefProvider for MyConnector {
96/// fn sink_def(&self) -> ConnectorDef {
97/// ConnectorDef {
98/// id: "my-sink".into(),
99/// kind: "custom".into(),
100/// scope: ConnectorScope::Sink,
101/// allow_override: vec![],
102/// default_params: Default::default(),
103/// origin: None,
104/// }
105/// }
106/// }
107/// ```
108pub trait SinkDefProvider: Send + Sync + 'static {
109 /// Returns the connector definition for sink mode.
110 fn sink_def(&self) -> ConnectorDef;
111
112 fn sink_defs(&self) -> Vec<ConnectorDef> {
113 [self.sink_def()].into()
114 }
115 /// Validates a sink connector definition.
116 ///
117 /// Override to add custom validation logic. Returns `Ok(())` by default.
118 fn validate_sink(&self, _def: &ConnectorDef) -> Result<(), String> {
119 Ok(())
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126
127 #[test]
128 fn test_connector_scope_default() {
129 let scope = ConnectorScope::default();
130 assert_eq!(scope, ConnectorScope::Source);
131 }
132
133 #[test]
134 fn test_connector_def_serde_and_with_scope() {
135 let json = r#"{"id": "mysql-prod", "type": "mysql", "params": {"host": "localhost"}}"#;
136 let def: ConnectorDef = serde_json::from_str(json).unwrap();
137
138 assert_eq!(def.id, "mysql-prod");
139 assert_eq!(def.kind, "mysql");
140 assert_eq!(def.scope, ConnectorScope::Source); // default
141 assert_eq!(def.default_params.get("host").unwrap(), "localhost");
142
143 let def = def.with_scope(ConnectorScope::Sink);
144 assert_eq!(def.scope, ConnectorScope::Sink);
145 }
146
147 // Test that SourceDefProvider can be implemented independently
148 struct SourceOnlyConnector;
149
150 impl SourceDefProvider for SourceOnlyConnector {
151 fn source_def(&self) -> ConnectorDef {
152 ConnectorDef {
153 id: "source-only".into(),
154 kind: "test".into(),
155 scope: ConnectorScope::Source,
156 allow_override: vec![],
157 default_params: Default::default(),
158 origin: None,
159 }
160 }
161 }
162
163 #[test]
164 fn test_source_only_connector() {
165 let connector = SourceOnlyConnector;
166 let def = connector.source_def();
167 assert_eq!(def.id, "source-only");
168 assert_eq!(def.scope, ConnectorScope::Source);
169
170 // validate_source has default implementation
171 assert!(connector.validate_source(&def).is_ok());
172 }
173
174 // Test that SinkDefProvider can be implemented independently
175 struct SinkOnlyConnector;
176
177 impl SinkDefProvider for SinkOnlyConnector {
178 fn sink_def(&self) -> ConnectorDef {
179 ConnectorDef {
180 id: "sink-only".into(),
181 kind: "test".into(),
182 scope: ConnectorScope::Sink,
183 allow_override: vec![],
184 default_params: Default::default(),
185 origin: None,
186 }
187 }
188 }
189
190 #[test]
191 fn test_sink_only_connector() {
192 let connector = SinkOnlyConnector;
193 let def = connector.sink_def();
194 assert_eq!(def.id, "sink-only");
195 assert_eq!(def.scope, ConnectorScope::Sink);
196
197 // validate_sink has default implementation
198 assert!(connector.validate_sink(&def).is_ok());
199 }
200}