danube_connect_core/traits.rs
//! Connector trait definitions.

use crate::{
    ConnectorConfig, ConnectorResult, ConsumerConfig, ProducerConfig, SinkRecord, SourceRecord,
};
use async_trait::async_trait;

/// Trait for implementing Sink Connectors (Danube → External System)
///
/// Sink connectors consume messages from Danube topics and write them to external systems.
///
/// # Example
///
/// ```rust,no_run
/// use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
/// use async_trait::async_trait;
/// use std::env;
///
/// pub struct HttpSink {
///     target_url: String,
/// }
///
/// #[async_trait]
/// impl SinkConnector for HttpSink {
///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
///         // Connectors manage their own configuration (e.g. from environment variables)
///         self.target_url = env::var("TARGET_URL")
///             .map_err(|_| danube_connect_core::ConnectorError::config("TARGET_URL required"))?;
///         Ok(())
///     }
///
///     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
///         Ok(vec![ConsumerConfig {
///             topic: "/default/http-topic".to_string(),
///             consumer_name: "http-sink-consumer".to_string(),
///             subscription: "http-sink-sub".to_string(),
///             subscription_type: SubscriptionType::Exclusive,
///             expected_schema_subject: None,
///         }])
///     }
///
///     async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> {
///         // Send an HTTP POST request to self.target_url
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait SinkConnector: Send + Sync {
    /// Initialize the connector with configuration
    ///
    /// This method is called once at startup before any message processing begins.
    /// Use it to:
    /// - Load connector-specific configuration
    /// - Establish connections to external systems
    /// - Validate credentials and connectivity
    /// - Initialize internal state
    ///
    /// # Errors
    ///
    /// Return `ConnectorError::Configuration` for configuration issues and
    /// `ConnectorError::Fatal` for initialization failures.
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;

    /// Get consumer configurations for topics to consume from
    ///
    /// This method should return a list of consumer configurations, one for each
    /// Danube topic the connector wants to consume from. The runtime will create
    /// and manage these consumers automatically.
    ///
    /// # Returns
    ///
    /// A vector of `ConsumerConfig` specifying which topics to consume from and
    /// their subscription settings.
    ///
    /// # Note
    ///
    /// This method is called after `initialize()` and before message processing begins.
    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>>;

    /// Process a single message from Danube
    ///
    /// This method is called for each message received from the Danube topic.
    ///
    /// # Return Value
    ///
    /// - `Ok(())`: Message processed successfully, will be acknowledged
    /// - `Err(ConnectorError::Retryable)`: Transient failure, message will be retried
    /// - `Err(ConnectorError::Fatal)`: Permanent failure, connector will stop
    /// - `Err(ConnectorError::InvalidData)`: Bad message, will be skipped or sent to the dead-letter queue (DLQ)
    ///
    /// # Errors
    ///
    /// Return the error type that matches the failure scenario described above.
    async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()>;

    /// Process a batch of messages for better throughput
    ///
    /// Override this method to implement batch processing for better performance.
    /// The default implementation calls `process()` for each record sequentially.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
    /// # use async_trait::async_trait;
    /// # struct MyConnector;
    /// # #[async_trait]
    /// # impl SinkConnector for MyConnector {
    /// #     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { Ok(()) }
    /// #     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
    /// #         Ok(vec![ConsumerConfig {
    /// #             topic: "/default/test".to_string(),
    /// #             consumer_name: "test-consumer".to_string(),
    /// #             subscription: "test-sub".to_string(),
    /// #             subscription_type: SubscriptionType::Exclusive,
    /// #             expected_schema_subject: None,
    /// #         }])
    /// #     }
    /// #     async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> { Ok(()) }
    /// async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
    ///     // Bulk insert all records in one operation
    ///     // self.database.bulk_insert(&records).await?;
    ///     Ok(())
    /// }
    /// # }
    /// ```
    async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
        for record in records {
            self.process(record).await?;
        }
        Ok(())
    }

    /// Optional: Called before shutdown for cleanup
    ///
    /// Use this to:
    /// - Flush any pending writes
    /// - Close connections gracefully
    /// - Save checkpoints
    /// - Clean up resources
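    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a hypothetical connector that only counts
    /// buffered records; a real connector would flush pending writes to its
    /// external system and close clients here:
    ///
    /// ```rust,no_run
    /// # use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig};
    /// # use async_trait::async_trait;
    /// # struct BufferedSink { pending: usize }
    /// # #[async_trait]
    /// # impl SinkConnector for BufferedSink {
    /// #     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { Ok(()) }
    /// #     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> { Ok(vec![]) }
    /// #     async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> { self.pending += 1; Ok(()) }
    /// async fn shutdown(&mut self) -> ConnectorResult<()> {
    ///     // Hypothetical cleanup: drain whatever is still buffered before the
    ///     // runtime stops the connector.
    ///     self.pending = 0;
    ///     Ok(())
    /// }
    /// # }
    /// ```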
    async fn shutdown(&mut self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Optional: Health check implementation
    ///
    /// This method is called periodically to verify the connector is healthy.
    /// Check connectivity to external systems and return an error if unhealthy.
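    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a hypothetical flag set by `initialize()`; a real
    /// connector would probe its external system (e.g. ping a database) and return a
    /// more specific error on failure:
    ///
    /// ```rust,no_run
    /// # use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, ConnectorError};
    /// # use async_trait::async_trait;
    /// # struct ProbingSink { initialized: bool }
    /// # #[async_trait]
    /// # impl SinkConnector for ProbingSink {
    /// #     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { self.initialized = true; Ok(()) }
    /// #     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> { Ok(vec![]) }
    /// #     async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> { Ok(()) }
    /// async fn health_check(&self) -> ConnectorResult<()> {
    ///     // Hypothetical check: report unhealthy until initialization has completed.
    ///     if self.initialized {
    ///         Ok(())
    ///     } else {
    ///         Err(ConnectorError::config("connector not initialized"))
    ///     }
    /// }
    /// # }
    /// ```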
    async fn health_check(&self) -> ConnectorResult<()> {
        Ok(())
    }
}

/// Trait for implementing Source Connectors (External System → Danube)
///
/// Source connectors read data from external systems and publish to Danube topics.
///
/// # Example
///
/// ```rust,no_run
/// use danube_connect_core::{SourceConnector, SourceRecord, ConnectorConfig, ConnectorResult, Offset, ProducerConfig};
/// use async_trait::async_trait;
/// use std::env;
///
/// pub struct FileSource {
///     file_path: String,
///     position: u64,
/// }
///
/// #[async_trait]
/// impl SourceConnector for FileSource {
///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
///         // Connectors manage their own configuration (e.g. from environment variables)
///         self.file_path = env::var("FILE_PATH")
///             .map_err(|_| danube_connect_core::ConnectorError::config("FILE_PATH required"))?;
///         Ok(())
///     }
///
///     async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>> {
///         // Define destination topics and their configurations
///         Ok(vec![
///             ProducerConfig::new("/default/file_data", 0, false)
///         ])
///     }
///
///     async fn poll(&mut self) -> ConnectorResult<Vec<SourceRecord>> {
///         // Read new lines from the file at self.file_path
///         Ok(vec![])
///     }
///
///     async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
///         // Save the file position
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait SourceConnector: Send + Sync {
    /// Initialize the connector with configuration
    ///
    /// This method is called once at startup. Use it to:
    /// - Load configuration
    /// - Establish connections
    /// - Load the last checkpoint/offset
    /// - Initialize internal state
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;

    /// Get producer configurations for all topics this connector will publish to
    ///
    /// This method should return the complete list of Danube topics and their
    /// configurations (partitions, reliable dispatch) that this connector will use.
    /// The runtime will create all producers upfront based on this configuration.
    ///
    /// The connector doesn't need to know about producers: it just returns
    /// `SourceRecord`s with topic information, and the runtime maps each record to the
    /// appropriate pre-created producer.
    ///
    /// # Returns
    ///
    /// A vector of `ProducerConfig` objects, one for each destination topic.
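    ///
    /// # Example
    ///
    /// A minimal sketch that declares two destination topics; the topic names are
    /// illustrative, and the `ProducerConfig::new` arguments follow the trait-level
    /// example above (presumably topic, partitions, reliable dispatch):
    ///
    /// ```rust,no_run
    /// # use danube_connect_core::{SourceConnector, SourceRecord, ConnectorConfig, ConnectorResult, ProducerConfig};
    /// # use async_trait::async_trait;
    /// # struct MultiTopicSource;
    /// # #[async_trait]
    /// # impl SourceConnector for MultiTopicSource {
    /// #     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { Ok(()) }
    /// #     async fn poll(&mut self) -> ConnectorResult<Vec<SourceRecord>> { Ok(vec![]) }
    /// async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>> {
    ///     // One ProducerConfig per destination topic.
    ///     Ok(vec![
    ///         ProducerConfig::new("/default/orders", 0, false),
    ///         ProducerConfig::new("/default/payments", 0, false),
    ///     ])
    /// }
    /// # }
    /// ```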
    async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>>;

    /// Poll for new data from the external system
    ///
    /// This method is called repeatedly in a loop. Return:
    /// - A non-empty vector of records when data is available
    /// - An empty vector when no data is available (non-blocking)
    ///
    /// The runtime will handle publishing records to Danube and calling `commit()`
    /// after successful acknowledgment.
    ///
    /// # Errors
    ///
    /// Return `ConnectorError::Retryable` for transient failures and
    /// `ConnectorError::Fatal` to stop the connector.
    async fn poll(&mut self) -> ConnectorResult<Vec<SourceRecord>>;

    /// Optional: Commit offset/checkpoint after successful publish
    ///
    /// This method is called by the runtime after messages are successfully
    /// published and acknowledged by Danube. Use it to save checkpoints or
    /// acknowledge messages in the source system.
    ///
    /// # Arguments
    ///
    /// * `offsets` - List of offsets that were successfully published
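    ///
    /// # Example
    ///
    /// A minimal sketch that remembers the highest acknowledged offset per source
    /// partition; the `committed` map is illustrative, not part of this crate:
    ///
    /// ```rust,no_run
    /// # use danube_connect_core::{SourceConnector, SourceRecord, ConnectorConfig, ConnectorResult, Offset, ProducerConfig};
    /// # use async_trait::async_trait;
    /// # use std::collections::HashMap;
    /// # struct CheckpointingSource { committed: HashMap<String, u64> }
    /// # #[async_trait]
    /// # impl SourceConnector for CheckpointingSource {
    /// #     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { Ok(()) }
    /// #     async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>> { Ok(vec![]) }
    /// #     async fn poll(&mut self) -> ConnectorResult<Vec<SourceRecord>> { Ok(vec![]) }
    /// async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
    ///     // Track the highest published offset for each partition so the source
    ///     // can resume from there after a restart.
    ///     for offset in offsets {
    ///         let entry = self.committed.entry(offset.partition).or_insert(0);
    ///         *entry = (*entry).max(offset.value);
    ///     }
    ///     Ok(())
    /// }
    /// # }
    /// ```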
    async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
        let _ = offsets; // Suppress unused-variable warning in the default impl
        Ok(())
    }

    /// Optional: Called before shutdown
    async fn shutdown(&mut self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Optional: Health check implementation
    async fn health_check(&self) -> ConnectorResult<()> {
        Ok(())
    }
}

/// Checkpoint/offset information for source connectors
///
/// **Mandatory public API** - used by source connectors for checkpointing.
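///
/// # Example
///
/// A short usage example; the partition string and metadata values are illustrative:
///
/// ```rust
/// use danube_connect_core::Offset;
///
/// let offset = Offset::new("partition-0", 42);
/// assert!(offset.metadata.is_none());
///
/// let with_meta = Offset::with_metadata("file:/var/log/app.log", 1024, "inode=42");
/// assert_eq!(with_meta.value, 1024);
/// ```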
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Offset {
    /// The partition or source identifier
    pub partition: String,
    /// The offset value (interpretation depends on source)
    pub value: u64,
    /// Optional metadata
    pub metadata: Option<String>,
}

impl Offset {
    /// Create a new offset
    pub fn new(partition: impl Into<String>, value: u64) -> Self {
        Self {
            partition: partition.into(),
            value,
            metadata: None,
        }
    }

    /// Create an offset with metadata
    pub fn with_metadata(
        partition: impl Into<String>,
        value: u64,
        metadata: impl Into<String>,
    ) -> Self {
        Self {
            partition: partition.into(),
            value,
            metadata: Some(metadata.into()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_offset_creation() {
        let offset = Offset::new("partition-0", 42);
        assert_eq!(offset.partition, "partition-0");
        assert_eq!(offset.value, 42);
        assert!(offset.metadata.is_none());

        let offset_with_meta = Offset::with_metadata("partition-1", 100, "some-metadata");
        assert_eq!(offset_with_meta.metadata, Some("some-metadata".to_string()));
    }
315}