wp_connector_api/runtime/cnn.rs
1use serde::{Deserialize, Serialize};
2
3use crate::ParamMap;
4
/// Direction in which a connector moves data: reading (source) or writing (sink).
///
/// The variants are declared as `Source`, then `Sink`, so the derived
/// `PartialOrd`/`Ord` implementations order `Source` before `Sink`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectorScope {
    /// The connector reads data. This is the `Default` value.
    #[default]
    Source,
    /// The connector writes data.
    Sink,
}
14
15/// Connector definition containing metadata and default configuration.
16///
17/// This struct is used to describe a connector's capabilities and defaults.
18/// It can be serialized/deserialized for configuration files.
19///
20/// # Serialization
21/// - `kind` is serialized as `"type"`
22/// - `default_params` is serialized as `"params"`
23/// - `scope` and `origin` are runtime-only fields (not serialized)
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct ConnectorDef {
26 /// Unique identifier for this connector instance
27 pub id: String,
28 /// Connector type (e.g., "kafka", "mysql", "elasticsearch")
29 #[serde(rename = "type")]
30 pub kind: String,
31 /// Whether this definition is for source or sink (runtime only)
32 #[serde(skip, default)]
33 pub scope: ConnectorScope,
34 /// Parameter keys that can be overridden at runtime
35 #[serde(default)]
36 pub allow_override: Vec<String>,
37 /// Default parameter values
38 #[serde(default, rename = "params")]
39 pub default_params: ParamMap,
40 /// Origin identifier for tracking (runtime only)
41 #[serde(skip, default)]
42 pub origin: Option<String>,
43}
44
45impl ConnectorDef {
46 /// Set the scope and return self for chaining.
47 pub fn with_scope(mut self, scope: ConnectorScope) -> Self {
48 self.scope = scope;
49 self
50 }
51}
52
53/// Trait for connectors that can act as a data source.
54///
55/// Implement this trait to provide source connector metadata and validation.
56/// Used by [`SourceFactory`] to obtain connector definitions.
57///
58/// # Example
59/// ```ignore
60/// impl SourceDefProvider for MyConnector {
61/// fn source_def(&self) -> ConnectorDef {
62/// ConnectorDef {
63/// id: "my-source".into(),
64/// kind: "custom".into(),
65/// scope: ConnectorScope::Source,
66/// allow_override: vec!["batch_size".into()],
67/// default_params: Default::default(),
68/// origin: None,
69/// }
70/// }
71/// }
72/// ```
73pub trait SourceDefProvider: Send + Sync + 'static {
74 /// Returns the connector definition for source mode.
75 fn source_def(&self) -> ConnectorDef;
76
77 /// Validates a source connector definition.
78 ///
79 /// Override to add custom validation logic. Returns `Ok(())` by default.
80 fn validate_source(&self, _def: &ConnectorDef) -> Result<(), String> {
81 Ok(())
82 }
83}
84
85/// Trait for connectors that can act as a data sink.
86///
87/// Implement this trait to provide sink connector metadata and validation.
88/// Used by [`SinkFactory`] to obtain connector definitions.
89///
90/// # Example
91/// ```ignore
92/// impl SinkDefProvider for MyConnector {
93/// fn sink_def(&self) -> ConnectorDef {
94/// ConnectorDef {
95/// id: "my-sink".into(),
96/// kind: "custom".into(),
97/// scope: ConnectorScope::Sink,
98/// allow_override: vec![],
99/// default_params: Default::default(),
100/// origin: None,
101/// }
102/// }
103/// }
104/// ```
105pub trait SinkDefProvider: Send + Sync + 'static {
106 /// Returns the connector definition for sink mode.
107 fn sink_def(&self) -> ConnectorDef;
108
109 /// Validates a sink connector definition.
110 ///
111 /// Override to add custom validation logic. Returns `Ok(())` by default.
112 fn validate_sink(&self, _def: &ConnectorDef) -> Result<(), String> {
113 Ok(())
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture implementing only the source-side trait, showing that
    /// `SourceDefProvider` does not require `SinkDefProvider`.
    struct SourceOnlyConnector;

    impl SourceDefProvider for SourceOnlyConnector {
        fn source_def(&self) -> ConnectorDef {
            ConnectorDef {
                id: String::from("source-only"),
                kind: String::from("test"),
                scope: ConnectorScope::Source,
                allow_override: Vec::new(),
                default_params: Default::default(),
                origin: None,
            }
        }
    }

    /// Fixture implementing only the sink-side trait, showing that
    /// `SinkDefProvider` does not require `SourceDefProvider`.
    struct SinkOnlyConnector;

    impl SinkDefProvider for SinkOnlyConnector {
        fn sink_def(&self) -> ConnectorDef {
            ConnectorDef {
                id: String::from("sink-only"),
                kind: String::from("test"),
                scope: ConnectorScope::Sink,
                allow_override: Vec::new(),
                default_params: Default::default(),
                origin: None,
            }
        }
    }

    #[test]
    fn test_connector_scope_default() {
        assert_eq!(ConnectorScope::default(), ConnectorScope::Source);
    }

    #[test]
    fn test_connector_def_serde_and_with_scope() {
        let raw = r#"{"id": "mysql-prod", "type": "mysql", "params": {"host": "localhost"}}"#;
        let parsed: ConnectorDef = serde_json::from_str(raw).unwrap();

        assert_eq!(parsed.id, "mysql-prod");
        assert_eq!(parsed.kind, "mysql");
        // `scope` is skipped by serde, so it comes back as the default variant.
        assert_eq!(parsed.scope, ConnectorScope::Source);
        let host = parsed.default_params.get("host").unwrap();
        assert_eq!(host, "localhost");

        // The builder-style setter swaps the scope on the consumed value.
        let as_sink = parsed.with_scope(ConnectorScope::Sink);
        assert_eq!(as_sink.scope, ConnectorScope::Sink);
    }

    #[test]
    fn test_source_only_connector() {
        let provider = SourceOnlyConnector;
        let produced = provider.source_def();
        assert_eq!(produced.id, "source-only");
        assert_eq!(produced.scope, ConnectorScope::Source);

        // No override is supplied, so the trait's provided method is exercised.
        assert_eq!(provider.validate_source(&produced), Ok(()));
    }

    #[test]
    fn test_sink_only_connector() {
        let provider = SinkOnlyConnector;
        let produced = provider.sink_def();
        assert_eq!(produced.id, "sink-only");
        assert_eq!(produced.scope, ConnectorScope::Sink);

        // No override is supplied, so the trait's provided method is exercised.
        assert_eq!(provider.validate_sink(&produced), Ok(()));
    }
}