// danube_connect_core/traits.rs
1//! Connector trait definitions.
2
3use crate::{
4 ConnectorConfig, ConnectorError, ConnectorResult, ConsumerConfig, ProducerConfig, SinkRecord,
5 SourceRecord,
6};
7use async_trait::async_trait;
8use tokio::sync::mpsc;
9
/// Trait for implementing Sink Connectors (Danube → External System)
///
/// Sink connectors consume messages from Danube topics and write them to external systems.
/// The runtime drives the lifecycle: `initialize()` is called once, then
/// `consumer_configs()` to create the consumers, then `process_batch()` repeatedly,
/// and finally `shutdown()` before the connector stops.
///
/// # Example
///
/// ```rust,no_run
/// use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
/// use async_trait::async_trait;
/// use std::env;
///
/// pub struct HttpSink {
///     target_url: String,
/// }
///
/// #[async_trait]
/// impl SinkConnector for HttpSink {
///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
///         // Connectors now manage their own config
///         self.target_url = env::var("TARGET_URL")
///             .map_err(|_| danube_connect_core::ConnectorError::config("TARGET_URL required"))?;
///         Ok(())
///     }
///
///     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
///         Ok(vec![ConsumerConfig {
///             topic: "/default/http-topic".to_string(),
///             consumer_name: "http-sink-consumer".to_string(),
///             subscription: "http-sink-sub".to_string(),
///             subscription_type: SubscriptionType::Exclusive,
///             expected_schema_subject: None,
///         }])
///     }
///
///     async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
///         // Send a batch to the external system
///         for record in records {
///             let _ = record;
///         }
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait SinkConnector: Send + Sync {
    /// Initialize the connector with configuration
    ///
    /// This method is called once at startup before any message processing begins.
    /// Use it to:
    /// - Load connector-specific configuration
    /// - Establish connections to external systems
    /// - Validate credentials and connectivity
    /// - Initialize internal state
    ///
    /// # Errors
    ///
    /// Return `ConnectorError::Configuration` for configuration issues.
    /// Return `ConnectorError::Fatal` for initialization failures.
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;

    /// Get consumer configurations for topics to consume from
    ///
    /// This method should return a list of consumer configurations, one for each
    /// Danube topic the connector wants to consume from. The runtime will create
    /// and manage these consumers automatically.
    ///
    /// # Returns
    ///
    /// A vector of `ConsumerConfig` specifying which topics to consume from and
    /// their subscription settings.
    ///
    /// # Note
    ///
    /// This method is called after `initialize()` and before message processing begins.
    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>>;

    /// Process a batch of messages from Danube
    ///
    /// This method is called by the runtime whenever the configured batch size or
    /// batch timeout is reached. Connectors should treat this as the primary sink
    /// write path.
    ///
    /// # Return Value
    ///
    /// - `Ok(())`: Batch processed successfully, messages will be acknowledged
    /// - `Err(ConnectorError::Retryable)`: Transient failure, batch will be retried
    /// - `Err(ConnectorError::Fatal)`: Permanent failure, connector will stop
    /// - `Err(ConnectorError::InvalidData)`: Invalid batch content or message data
    ///
    /// # Errors
    ///
    /// Return the appropriate error type based on the failure scenario.
    async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()>;

    /// Optional: Called before shutdown for cleanup
    ///
    /// Use this to:
    /// - Flush any pending writes
    /// - Close connections gracefully
    /// - Save checkpoints
    /// - Clean up resources
    ///
    /// The default implementation is a no-op.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig};
    /// # use async_trait::async_trait;
    /// # struct MyConnector;
    /// # #[async_trait]
    /// # impl SinkConnector for MyConnector {
    /// # async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { Ok(()) }
    /// # async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> { Ok(vec![]) }
    /// # async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> { Ok(()) }
    /// async fn shutdown(&mut self) -> ConnectorResult<()> {
    ///     // Flush buffered writes and close connections before the runtime stops
    ///     Ok(())
    /// }
    /// # }
    /// ```
    async fn shutdown(&mut self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Optional: Health check implementation
    ///
    /// This method is called periodically to verify the connector is healthy.
    /// Check connectivity to external systems and return an error if unhealthy.
    /// The default implementation always reports healthy.
    async fn health_check(&self) -> ConnectorResult<()> {
        Ok(())
    }
}
148
/// Trait for implementing Source Connectors (External System → Danube)
///
/// Source connectors read data from external systems and publish to Danube topics.
/// The runtime drives the lifecycle: `initialize()` is called once, then
/// `producer_configs()` to create the producers, then either a `poll()` loop or
/// `start_streaming()` depending on `mode()`, with `commit()` invoked after
/// successful publishes and `shutdown()` before the connector stops.
///
/// # Example
///
/// ```rust,no_run
/// use danube_connect_core::{SourceConnector, SourceEnvelope, ConnectorConfig, ConnectorResult, Offset, ProducerConfig};
/// use async_trait::async_trait;
/// use std::env;
///
/// pub struct FileSource {
///     file_path: String,
///     position: u64,
/// }
///
/// #[async_trait]
/// impl SourceConnector for FileSource {
///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
///         // Connectors now manage their own config
///         self.file_path = env::var("FILE_PATH")
///             .map_err(|_| danube_connect_core::ConnectorError::config("FILE_PATH required"))?;
///         Ok(())
///     }
///
///     async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>> {
///         // Define destination topics and their configurations
///         Ok(vec![
///             ProducerConfig::new("/default/file_data", 0, false)
///         ])
///     }
///
///     async fn poll(&mut self) -> ConnectorResult<Vec<SourceEnvelope>> {
///         // Read new lines from file at self.file_path
///         Ok(vec![])
///     }
///
///     async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
///         // Save file position
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait SourceConnector: Send + Sync {
    /// Initialize the connector with configuration
    ///
    /// This method is called once at startup. Use it to:
    /// - Load configuration
    /// - Establish connections
    /// - Load last checkpoint/offset
    /// - Initialize internal state
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;

    /// Get producer configurations for all topics this connector will publish to
    ///
    /// This method should return the complete list of Danube topics and their
    /// configurations (partitions, reliable dispatch) that this connector will use.
    /// The runtime will create all producers upfront based on this configuration.
    ///
    /// The connector doesn't need to know about producers - it just returns
    /// SourceRecords with topic information, and the runtime maps them to the
    /// appropriate pre-created producer.
    ///
    /// # Returns
    ///
    /// Vector of `ProducerConfig` objects, one for each destination topic.
    async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>>;

    /// Select the runtime mode used by this source connector.
    ///
    /// Return `SourceConnectorMode::Polling` for connectors that periodically
    /// fetch data via `poll()`. Return `SourceConnectorMode::Streaming` for
    /// connectors that push records asynchronously via `start_streaming()`.
    ///
    /// The default mode is `Polling`.
    fn mode(&self) -> SourceConnectorMode {
        SourceConnectorMode::Polling
    }

    /// Start a streaming source connector.
    ///
    /// Connectors that return `SourceConnectorMode::Streaming` should override
    /// this method and use the provided `SourceSender` to emit records as they
    /// become available.
    ///
    /// The default implementation returns a configuration error, so a connector
    /// that declares `Streaming` mode without overriding this method will fail.
    async fn start_streaming(&mut self, _sender: SourceSender) -> ConnectorResult<()> {
        Err(ConnectorError::config(
            "start_streaming() not implemented for this source connector",
        ))
    }

    /// Poll for new data from the external system
    ///
    /// This method is called repeatedly in a loop. Return:
    /// - Non-empty vector of envelopes when data is available
    /// - Empty vector when no data is available (non-blocking)
    ///
    /// The runtime will handle publishing records to Danube and calling `commit()`
    /// after successful acknowledgment.
    ///
    /// The default implementation returns a configuration error, so a connector
    /// that uses `Polling` mode (the default) must override this method.
    ///
    /// # Errors
    ///
    /// Return `ConnectorError::Retryable` for transient failures.
    /// Return `ConnectorError::Fatal` to stop the connector.
    async fn poll(&mut self) -> ConnectorResult<Vec<SourceEnvelope>> {
        Err(ConnectorError::config(
            "poll() not implemented for this source connector",
        ))
    }

    /// Optional: Commit offset/checkpoint after successful publish
    ///
    /// This method is called by the runtime after messages are successfully
    /// published and acknowledged by Danube. Use it to save checkpoints or
    /// acknowledge messages in the source system. The default implementation
    /// discards the offsets.
    ///
    /// # Arguments
    ///
    /// * `offsets` - List of offsets that were successfully published
    async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
        let _ = offsets; // Suppress unused warning
        Ok(())
    }

    /// Optional: Called before shutdown. Flush state and release resources here;
    /// the default implementation is a no-op.
    async fn shutdown(&mut self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Optional: Health check implementation. Called periodically by the runtime;
    /// the default implementation always reports healthy.
    async fn health_check(&self) -> ConnectorResult<()> {
        Ok(())
    }
}
281
/// Execution model for a source connector.
///
/// This enum determines how the source connector interacts with the runtime:
/// the runtime either drives the connector (polling) or the connector drives
/// the runtime (streaming). See `SourceConnector::mode()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceConnectorMode {
    /// The runtime repeatedly calls `poll()` to fetch new data.
    Polling,
    /// The connector pushes new data through `start_streaming()`.
    Streaming,
}
292
/// Handle used by streaming connectors to emit records into the runtime.
///
/// This struct provides methods to send records to the runtime for publishing.
/// It is `Clone`, so a connector may hand copies to multiple producer tasks;
/// all clones feed the same runtime channel.
#[derive(Clone)]
pub struct SourceSender {
    // Sending half of the runtime's envelope channel; closed when the runtime
    // drops its receiver (see `is_closed`).
    sender: mpsc::Sender<SourceEnvelope>,
}
300
301impl SourceSender {
302 /// Create a new `SourceSender` instance.
303 pub(crate) fn new(sender: mpsc::Sender<SourceEnvelope>) -> Self {
304 Self { sender }
305 }
306
307 /// Send a single record envelope to the runtime.
308 ///
309 /// This method is used by streaming connectors to emit records as they become
310 /// available.
311 pub async fn send(&self, envelope: impl Into<SourceEnvelope>) -> ConnectorResult<()> {
312 self.sender
313 .send(envelope.into())
314 .await
315 .map_err(|e| ConnectorError::fatal(format!("failed to emit source envelope: {}", e)))
316 }
317
318 /// Send multiple record envelopes to the runtime in sequence.
319 ///
320 /// This method is used by streaming connectors to emit multiple records at
321 /// once.
322 pub async fn send_batch<I>(&self, envelopes: I) -> ConnectorResult<()>
323 where
324 I: IntoIterator,
325 I::Item: Into<SourceEnvelope>,
326 {
327 for envelope in envelopes {
328 self.send(envelope).await?;
329 }
330
331 Ok(())
332 }
333
334 /// Return `true` when the receiving side of the runtime channel is closed.
335 pub fn is_closed(&self) -> bool {
336 self.sender.is_closed()
337 }
338}
339
/// A source record together with optional checkpoint information.
///
/// This struct represents a single record emitted by a source connector, along
/// with optional checkpoint information. When `offset` is present, the runtime
/// passes it to `SourceConnector::commit()` after the record is successfully
/// published and acknowledged.
#[derive(Debug, Clone)]
pub struct SourceEnvelope {
    /// The record to publish to Danube.
    pub record: SourceRecord,
    /// Optional offset/checkpoint to commit after successful publish.
    pub offset: Option<Offset>,
}
351
352impl SourceEnvelope {
353 /// Create an envelope without checkpoint information.
354 pub fn new(record: SourceRecord) -> Self {
355 Self {
356 record,
357 offset: None,
358 }
359 }
360
361 /// Create an envelope with an offset to commit after successful delivery.
362 pub fn with_offset(record: SourceRecord, offset: Offset) -> Self {
363 Self {
364 record,
365 offset: Some(offset),
366 }
367 }
368
369 /// Borrow the record carried by this envelope.
370 pub fn record(&self) -> &SourceRecord {
371 &self.record
372 }
373
374 /// Borrow the offset carried by this envelope, if present.
375 pub fn offset(&self) -> Option<&Offset> {
376 self.offset.as_ref()
377 }
378
379 /// Split this envelope into its record and optional offset parts.
380 pub fn into_parts(self) -> (SourceRecord, Option<Offset>) {
381 (self.record, self.offset)
382 }
383}
384
385impl From<SourceRecord> for SourceEnvelope {
386 fn from(record: SourceRecord) -> Self {
387 Self::new(record)
388 }
389}
390
/// Checkpoint/offset information for source connectors
///
/// **Mandatory public API** - used by source connectors for checkpointing.
/// The runtime collects these from published `SourceEnvelope`s and hands them
/// back to `SourceConnector::commit()` after successful acknowledgment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Offset {
    /// The partition or source identifier
    pub partition: String,
    /// The offset value (interpretation depends on source)
    pub value: u64,
    /// Optional metadata
    pub metadata: Option<String>,
}
403
404impl Offset {
405 /// Create a new offset
406 pub fn new(partition: impl Into<String>, value: u64) -> Self {
407 Self {
408 partition: partition.into(),
409 value,
410 metadata: None,
411 }
412 }
413
414 /// Create an offset with metadata
415 pub fn with_metadata(
416 partition: impl Into<String>,
417 value: u64,
418 metadata: impl Into<String>,
419 ) -> Self {
420 Self {
421 partition: partition.into(),
422 value,
423 metadata: Some(metadata.into()),
424 }
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Verify both constructors populate every field.
    ///
    /// Previously `with_metadata` was only checked for its metadata field;
    /// the partition and value assertions were missing, so a regression in
    /// those fields would have gone unnoticed.
    #[test]
    fn test_offset_creation() {
        // Plain constructor: fields set, metadata absent.
        let offset = Offset::new("partition-0", 42);
        assert_eq!(offset.partition, "partition-0");
        assert_eq!(offset.value, 42);
        assert!(offset.metadata.is_none());

        // Metadata constructor must preserve partition and value too.
        let offset_with_meta = Offset::with_metadata("partition-1", 100, "some-metadata");
        assert_eq!(offset_with_meta.partition, "partition-1");
        assert_eq!(offset_with_meta.value, 100);
        assert_eq!(offset_with_meta.metadata, Some("some-metadata".to_string()));
    }
}