danube_connect_core/
traits.rs

1//! Connector trait definitions.
2
3use crate::{
4    ConnectorConfig, ConnectorResult, ConsumerConfig, ProducerConfig, SinkRecord, SourceRecord,
5};
6use async_trait::async_trait;
7
8/// Trait for implementing Sink Connectors (Danube → External System)
9///
10/// Sink connectors consume messages from Danube topics and write them to external systems.
11///
12/// # Example
13///
14/// ```rust,no_run
15/// use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
16/// use async_trait::async_trait;
17/// use std::env;
18///
19/// pub struct HttpSink {
20///     target_url: String,
21/// }
22///
23/// #[async_trait]
24/// impl SinkConnector for HttpSink {
25///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
26///         // Connectors now manage their own config
27///         self.target_url = env::var("TARGET_URL")
28///             .map_err(|_| danube_connect_core::ConnectorError::config("TARGET_URL required"))?;
29///         Ok(())
30///     }
31///     
32///     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
33///         Ok(vec![ConsumerConfig {
34///             topic: "/default/http-topic".to_string(),
35///             consumer_name: "http-sink-consumer".to_string(),
36///             subscription: "http-sink-sub".to_string(),
37///             subscription_type: SubscriptionType::Exclusive,
38///             expected_schema_subject: None,
39///         }])
40///     }
41///     
42///     async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> {
43///         // Send HTTP POST request with self.target_url
44///         Ok(())
45///     }
46/// }
47/// ```
48#[async_trait]
49pub trait SinkConnector: Send + Sync {
50    /// Initialize the connector with configuration
51    ///
52    /// This method is called once at startup before any message processing begins.
53    /// Use it to:
54    /// - Load connector-specific configuration
55    /// - Establish connections to external systems
56    /// - Validate credentials and connectivity
57    /// - Initialize internal state
58    ///
59    /// # Errors
60    ///
61    /// Return `ConnectorError::Configuration` for configuration issues
62    /// Return `ConnectorError::Fatal` for initialization failures
63    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;
64
65    /// Get consumer configurations for topics to consume from
66    ///
67    /// This method should return a list of consumer configurations, one for each
68    /// Danube topic the connector wants to consume from. The runtime will create
69    /// and manage these consumers automatically.
70    ///
71    /// # Returns
72    ///
73    /// A vector of `ConsumerConfig` specifying which topics to consume from and
74    /// their subscription settings.
75    ///
76    /// # Note
77    ///
78    /// This method is called after `initialize()` and before message processing begins.
79    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>>;
80
81    /// Process a single message from Danube
82    ///
83    /// This method is called for each message received from the Danube topic.
84    ///
85    /// # Return Value
86    ///
87    /// - `Ok(())`: Message processed successfully, will be acknowledged
88    /// - `Err(ConnectorError::Retryable)`: Transient failure, message will be retried
89    /// - `Err(ConnectorError::Fatal)`: Permanent failure, connector will stop
90    /// - `Err(ConnectorError::InvalidData)`: Bad message, will be skipped or sent to DLQ
91    ///
92    /// # Errors
93    ///
94    /// Return appropriate error type based on the failure scenario
95    async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()>;
96
97    /// Process a batch of messages for better throughput
98    ///
99    /// Override this method to implement batch processing for better performance.
100    /// The default implementation calls `process()` for each record sequentially.
101    ///
102    /// # Example
103    ///
104    /// ```rust,no_run
105    /// # use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
106    /// # use async_trait::async_trait;
107    /// # struct MyConnector;
108    /// # #[async_trait]
109    /// # impl SinkConnector for MyConnector {
110    /// #     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { Ok(()) }
111    /// #     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
112    /// #         Ok(vec![ConsumerConfig {
113    /// #             topic: "/default/test".to_string(),
114    /// #             consumer_name: "test-consumer".to_string(),
115    /// #             subscription: "test-sub".to_string(),
116    /// #             subscription_type: SubscriptionType::Exclusive,
117    /// #             expected_schema_subject: None,
118    /// #         }])
119    /// #     }
120    /// #     async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> { Ok(()) }
121    /// async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
122    ///     // Bulk insert all records in one operation
123    ///     // self.database.bulk_insert(&records).await?;
124    ///     Ok(())
125    /// }
126    /// # }
127    /// ```
128    async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
129        for record in records {
130            self.process(record).await?;
131        }
132        Ok(())
133    }
134
135    /// Optional: Called before shutdown for cleanup
136    ///
137    /// Use this to:
138    /// - Flush any pending writes
139    /// - Close connections gracefully
140    /// - Save checkpoints
141    /// - Clean up resources
142    async fn shutdown(&mut self) -> ConnectorResult<()> {
143        Ok(())
144    }
145
146    /// Optional: Health check implementation
147    ///
148    /// This method is called periodically to verify the connector is healthy.
149    /// Check connectivity to external systems and return an error if unhealthy.
150    async fn health_check(&self) -> ConnectorResult<()> {
151        Ok(())
152    }
153}
154
155/// Trait for implementing Source Connectors (External System → Danube)
156///
157/// Source connectors read data from external systems and publish to Danube topics.
158///
159/// # Example
160///
161/// ```rust,no_run
162/// use danube_connect_core::{SourceConnector, SourceRecord, ConnectorConfig, ConnectorResult, Offset, ProducerConfig};
163/// use async_trait::async_trait;
164/// use std::env;
165///
166/// pub struct FileSource {
167///     file_path: String,
168///     position: u64,
169/// }
170///
171/// #[async_trait]
172/// impl SourceConnector for FileSource {
173///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
174///         // Connectors now manage their own config
175///         self.file_path = env::var("FILE_PATH")
176///             .map_err(|_| danube_connect_core::ConnectorError::config("FILE_PATH required"))?;
177///         Ok(())
178///     }
179///     
180///     async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>> {
181///         // Define destination topics and their configurations
182///         Ok(vec![
183///             ProducerConfig::new("/default/file_data", 0, false)
184///         ])
185///     }
186///     
187///     async fn poll(&mut self) -> ConnectorResult<Vec<SourceRecord>> {
188///         // Read new lines from file at self.file_path
189///         Ok(vec![])
190///     }
191///     
192///     async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
193///         // Save file position
194///         Ok(())
195///     }
196/// }
197/// ```
198#[async_trait]
199pub trait SourceConnector: Send + Sync {
200    /// Initialize the connector with configuration
201    ///
202    /// This method is called once at startup. Use it to:
203    /// - Load configuration
204    /// - Establish connections
205    /// - Load last checkpoint/offset
206    /// - Initialize internal state
207    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;
208
209    /// Get producer configurations for all topics this connector will publish to
210    ///
211    /// This method should return the complete list of Danube topics and their
212    /// configurations (partitions, reliable dispatch) that this connector will use.
213    /// The runtime will create all producers upfront based on this configuration.
214    ///
215    /// The connector doesn't need to know about producers - it just returns
216    /// SourceRecords with topic information, and the runtime maps them to the
217    /// appropriate pre-created producer.
218    ///
219    /// # Returns
220    ///
221    /// Vector of `ProducerConfig` objects, one for each destination topic.
222    async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>>;
223
224    /// Poll for new data from the external system
225    ///
226    /// This method is called repeatedly in a loop. Return:
227    /// - Non-empty vector of records when data is available
228    /// - Empty vector when no data is available (non-blocking)
229    ///
230    /// The runtime will handle publishing records to Danube and calling `commit()`
231    /// after successful acknowledgment.
232    ///
233    /// # Errors
234    ///
235    /// Return `ConnectorError::Retryable` for transient failures
236    /// Return `ConnectorError::Fatal` to stop the connector
237    async fn poll(&mut self) -> ConnectorResult<Vec<SourceRecord>>;
238
239    /// Optional: Commit offset/checkpoint after successful publish
240    ///
241    /// This method is called by the runtime after messages are successfully
242    /// published and acknowledged by Danube. Use it to save checkpoints or
243    /// acknowledge messages in the source system.
244    ///
245    /// # Arguments
246    ///
247    /// * `offsets` - List of offsets that were successfully published
248    async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
249        let _ = offsets; // Suppress unused warning
250        Ok(())
251    }
252
253    /// Optional: Called before shutdown
254    async fn shutdown(&mut self) -> ConnectorResult<()> {
255        Ok(())
256    }
257
258    /// Optional: Health check implementation
259    async fn health_check(&self) -> ConnectorResult<()> {
260        Ok(())
261    }
262}
263
/// Checkpoint/offset information for source connectors
///
/// **Mandatory public API** - used by source connectors for checkpointing.
///
/// Equality and hashing are derived and therefore consider all three fields,
/// so an `Offset` can be used as a key in `HashMap`/`HashSet` (for example to
/// de-duplicate offsets before committing them).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Offset {
    /// The partition or source identifier
    pub partition: String,
    /// The offset value (interpretation depends on source)
    pub value: u64,
    /// Optional metadata; `None` when no metadata is attached
    pub metadata: Option<String>,
}
276
277impl Offset {
278    /// Create a new offset
279    pub fn new(partition: impl Into<String>, value: u64) -> Self {
280        Self {
281            partition: partition.into(),
282            value,
283            metadata: None,
284        }
285    }
286
287    /// Create an offset with metadata
288    pub fn with_metadata(
289        partition: impl Into<String>,
290        value: u64,
291        metadata: impl Into<String>,
292    ) -> Self {
293        Self {
294            partition: partition.into(),
295            value,
296            metadata: Some(metadata.into()),
297        }
298    }
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises both `Offset` constructors and checks the fields they populate.
    #[test]
    fn test_offset_creation() {
        // Plain constructor: partition and value are stored, metadata stays empty.
        let plain = Offset::new("partition-0", 42);
        assert_eq!(plain.partition, "partition-0");
        assert_eq!(plain.value, 42);
        assert_eq!(plain.metadata, None);

        // Metadata constructor: the supplied text is kept as the metadata.
        let annotated = Offset::with_metadata("partition-1", 100, "some-metadata");
        assert_eq!(annotated.metadata.as_deref(), Some("some-metadata"));
    }
}