// danube_connect_core/traits.rs

1//! Connector trait definitions.
2
3use crate::{
4    ConnectorConfig, ConnectorError, ConnectorResult, ConsumerConfig, ProducerConfig, SinkRecord,
5    SourceRecord,
6};
7use async_trait::async_trait;
8use tokio::sync::mpsc;
9
/// Trait for implementing Sink Connectors (Danube → External System)
///
/// Sink connectors consume messages from Danube topics and write them to external systems.
///
/// # Example
///
/// ```rust,no_run
/// use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
/// use async_trait::async_trait;
/// use std::env;
///
/// pub struct HttpSink {
///     target_url: String,
/// }
///
/// #[async_trait]
/// impl SinkConnector for HttpSink {
///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
///         // Connectors now manage their own config
///         self.target_url = env::var("TARGET_URL")
///             .map_err(|_| danube_connect_core::ConnectorError::config("TARGET_URL required"))?;
///         Ok(())
///     }
///     
///     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
///         Ok(vec![ConsumerConfig {
///             topic: "/default/http-topic".to_string(),
///             consumer_name: "http-sink-consumer".to_string(),
///             subscription: "http-sink-sub".to_string(),
///             subscription_type: SubscriptionType::Exclusive,
///             expected_schema_subject: None,
///         }])
///     }
///     
///     async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
///         // Send a batch to the external system
///         for record in records {
///             let _ = record;
///         }
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait SinkConnector: Send + Sync {
    /// Initialize the connector with configuration
    ///
    /// This method is called once at startup before any message processing begins.
    /// Use it to:
    /// - Load connector-specific configuration
    /// - Establish connections to external systems
    /// - Validate credentials and connectivity
    /// - Initialize internal state
    ///
    /// # Errors
    ///
    /// Return `ConnectorError::Configuration` for configuration issues
    /// Return `ConnectorError::Fatal` for initialization failures
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;

    /// Get consumer configurations for topics to consume from
    ///
    /// This method should return a list of consumer configurations, one for each
    /// Danube topic the connector wants to consume from. The runtime will create
    /// and manage these consumers automatically.
    ///
    /// # Returns
    ///
    /// A vector of `ConsumerConfig` specifying which topics to consume from and
    /// their subscription settings.
    ///
    /// # Note
    ///
    /// This method is called after `initialize()` and before message processing begins.
    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>>;

    /// Process a batch of messages from Danube
    ///
    /// This method is called by the runtime whenever the configured batch size or
    /// batch timeout is reached. Connectors should treat this as the primary sink
    /// write path.
    ///
    /// # Return Value
    ///
    /// - `Ok(())`: Batch processed successfully, messages will be acknowledged
    /// - `Err(ConnectorError::Retryable)`: Transient failure, batch will be retried
    /// - `Err(ConnectorError::Fatal)`: Permanent failure, connector will stop
    /// - `Err(ConnectorError::InvalidData)`: Invalid batch content or message data
    ///
    /// # Errors
    ///
    /// Return appropriate error type based on the failure scenario
    async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()>;

    /// Optional: Called before shutdown for cleanup
    ///
    /// Use this to:
    /// - Flush any pending writes
    /// - Close connections gracefully
    /// - Save checkpoints
    /// - Clean up resources
    ///
    /// The default implementation is a no-op.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
    /// # use async_trait::async_trait;
    /// # struct MyConnector;
    /// # #[async_trait]
    /// # impl SinkConnector for MyConnector {
    /// #     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> { Ok(()) }
    /// #     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
    /// #         Ok(vec![ConsumerConfig {
    /// #             topic: "/default/test".to_string(),
    /// #             consumer_name: "test-consumer".to_string(),
    /// #             subscription: "test-sub".to_string(),
    /// #             subscription_type: SubscriptionType::Exclusive,
    /// #             expected_schema_subject: None,
    /// #         }])
    /// #     }
    /// #     async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> { let _ = records; Ok(()) }
    /// async fn shutdown(&mut self) -> ConnectorResult<()> {
    ///     // Flush buffered writes and close external connections here.
    ///     Ok(())
    /// }
    /// # }
    /// ```
    async fn shutdown(&mut self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Optional: Health check implementation
    ///
    /// This method is called periodically to verify the connector is healthy.
    /// Check connectivity to external systems and return an error if unhealthy.
    /// The default implementation always reports healthy.
    async fn health_check(&self) -> ConnectorResult<()> {
        Ok(())
    }
}
148
/// Trait for implementing Source Connectors (External System → Danube)
///
/// Source connectors read data from external systems and publish to Danube topics.
///
/// # Example
///
/// ```rust,no_run
/// use danube_connect_core::{SourceConnector, SourceEnvelope, ConnectorConfig, ConnectorResult, Offset, ProducerConfig};
/// use async_trait::async_trait;
/// use std::env;
///
/// pub struct FileSource {
///     file_path: String,
///     position: u64,
/// }
///
/// #[async_trait]
/// impl SourceConnector for FileSource {
///     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
///         // Connectors now manage their own config
///         self.file_path = env::var("FILE_PATH")
///             .map_err(|_| danube_connect_core::ConnectorError::config("FILE_PATH required"))?;
///         Ok(())
///     }
///     
///     async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>> {
///         // Define destination topics and their configurations
///         Ok(vec![
///             ProducerConfig::new("/default/file_data", 0, false)
///         ])
///     }
///     
///     async fn poll(&mut self) -> ConnectorResult<Vec<SourceEnvelope>> {
///         // Read new lines from file at self.file_path
///         Ok(vec![])
///     }
///     
///     async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
///         // Save file position
///         let _ = offsets;
///         Ok(())
///     }
/// }
/// ```
#[async_trait]
pub trait SourceConnector: Send + Sync {
    /// Initialize the connector with configuration
    ///
    /// This method is called once at startup. Use it to:
    /// - Load configuration
    /// - Establish connections
    /// - Load last checkpoint/offset
    /// - Initialize internal state
    ///
    /// # Errors
    ///
    /// Return `ConnectorError::Configuration` for configuration issues
    /// Return `ConnectorError::Fatal` for initialization failures
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;

    /// Get producer configurations for all topics this connector will publish to
    ///
    /// This method should return the complete list of Danube topics and their
    /// configurations (partitions, reliable dispatch) that this connector will use.
    /// The runtime will create all producers upfront based on this configuration.
    ///
    /// The connector doesn't need to know about producers - it just returns
    /// SourceRecords with topic information, and the runtime maps them to the
    /// appropriate pre-created producer.
    ///
    /// # Returns
    ///
    /// Vector of `ProducerConfig` objects, one for each destination topic.
    async fn producer_configs(&self) -> ConnectorResult<Vec<ProducerConfig>>;

    /// Select the runtime mode used by this source connector.
    ///
    /// Return `SourceConnectorMode::Polling` for connectors that periodically
    /// fetch data via `poll()`. Return `SourceConnectorMode::Streaming` for
    /// connectors that push records asynchronously via `start_streaming()`.
    ///
    /// The default implementation selects `Polling`.
    fn mode(&self) -> SourceConnectorMode {
        SourceConnectorMode::Polling
    }

    /// Start a streaming source connector.
    ///
    /// Connectors that return `SourceConnectorMode::Streaming` should override
    /// this method and use the provided `SourceSender` to emit records as they
    /// become available.
    ///
    /// The default implementation returns a configuration error, so streaming
    /// connectors MUST override it.
    async fn start_streaming(&mut self, _sender: SourceSender) -> ConnectorResult<()> {
        Err(ConnectorError::config(
            "start_streaming() not implemented for this source connector",
        ))
    }

    /// Poll for new data from the external system
    ///
    /// This method is called repeatedly in a loop. Return:
    /// - Non-empty vector of envelopes when data is available
    /// - Empty vector when no data is available (non-blocking)
    ///
    /// The runtime will handle publishing records to Danube and calling `commit()`
    /// after successful acknowledgment.
    ///
    /// The default implementation returns a configuration error, so polling
    /// connectors MUST override it.
    ///
    /// # Errors
    ///
    /// Return `ConnectorError::Retryable` for transient failures
    /// Return `ConnectorError::Fatal` to stop the connector
    async fn poll(&mut self) -> ConnectorResult<Vec<SourceEnvelope>> {
        Err(ConnectorError::config(
            "poll() not implemented for this source connector",
        ))
    }

    /// Optional: Commit offset/checkpoint after successful publish
    ///
    /// This method is called by the runtime after messages are successfully
    /// published and acknowledged by Danube. Use it to save checkpoints or
    /// acknowledge messages in the source system.
    ///
    /// The default implementation is a no-op.
    ///
    /// # Arguments
    ///
    /// * `offsets` - List of offsets that were successfully published
    async fn commit(&mut self, offsets: Vec<Offset>) -> ConnectorResult<()> {
        let _ = offsets; // Suppress unused warning
        Ok(())
    }

    /// Optional: Called before shutdown
    ///
    /// Use this to flush pending work, save a final checkpoint, and close
    /// connections. The default implementation is a no-op.
    async fn shutdown(&mut self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Optional: Health check implementation
    ///
    /// Called periodically by the runtime; return an error when the external
    /// system is unreachable. The default implementation always reports healthy.
    async fn health_check(&self) -> ConnectorResult<()> {
        Ok(())
    }
}
281
/// Execution model for a source connector.
///
/// This enum determines how the source connector interacts with the runtime:
/// the runtime either drives the connector by calling `poll()` repeatedly, or
/// hands it a `SourceSender` and lets it push records on its own schedule.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SourceConnectorMode {
    /// The runtime repeatedly calls `poll()` to fetch new data.
    Polling,
    /// The connector pushes new data through `start_streaming()`.
    Streaming,
}

impl Default for SourceConnectorMode {
    /// `Polling` is the default mode, mirroring the default implementation of
    /// `SourceConnector::mode()`.
    fn default() -> Self {
        Self::Polling
    }
}
292
/// Handle used by streaming connectors to emit records into the runtime.
///
/// This struct provides methods to send records to the runtime for publishing.
/// It is `Clone`, so a connector may hand copies to multiple internal tasks;
/// all clones feed the same underlying channel.
#[derive(Clone)]
pub struct SourceSender {
    // Sending half of the runtime channel; the receiving half is presumably
    // owned by the runtime's publish loop — confirm against the runtime crate.
    sender: mpsc::Sender<SourceEnvelope>,
}
300
301impl SourceSender {
302    /// Create a new `SourceSender` instance.
303    pub(crate) fn new(sender: mpsc::Sender<SourceEnvelope>) -> Self {
304        Self { sender }
305    }
306
307    /// Send a single record envelope to the runtime.
308    ///
309    /// This method is used by streaming connectors to emit records as they become
310    /// available.
311    pub async fn send(&self, envelope: impl Into<SourceEnvelope>) -> ConnectorResult<()> {
312        self.sender
313            .send(envelope.into())
314            .await
315            .map_err(|e| ConnectorError::fatal(format!("failed to emit source envelope: {}", e)))
316    }
317
318    /// Send multiple record envelopes to the runtime in sequence.
319    ///
320    /// This method is used by streaming connectors to emit multiple records at
321    /// once.
322    pub async fn send_batch<I>(&self, envelopes: I) -> ConnectorResult<()>
323    where
324        I: IntoIterator,
325        I::Item: Into<SourceEnvelope>,
326    {
327        for envelope in envelopes {
328            self.send(envelope).await?;
329        }
330
331        Ok(())
332    }
333
334    /// Return `true` when the receiving side of the runtime channel is closed.
335    pub fn is_closed(&self) -> bool {
336        self.sender.is_closed()
337    }
338}
339
/// A source record together with optional checkpoint information.
///
/// This struct represents a single record emitted by a source connector, along
/// with optional checkpoint information. The runtime publishes `record` and,
/// on successful acknowledgment, passes `offset` (when present) back through
/// `SourceConnector::commit`.
#[derive(Debug, Clone)]
pub struct SourceEnvelope {
    /// The record to publish to Danube.
    pub record: SourceRecord,
    /// Optional offset/checkpoint to commit after successful publish.
    pub offset: Option<Offset>,
}
351
352impl SourceEnvelope {
353    /// Create an envelope without checkpoint information.
354    pub fn new(record: SourceRecord) -> Self {
355        Self {
356            record,
357            offset: None,
358        }
359    }
360
361    /// Create an envelope with an offset to commit after successful delivery.
362    pub fn with_offset(record: SourceRecord, offset: Offset) -> Self {
363        Self {
364            record,
365            offset: Some(offset),
366        }
367    }
368
369    /// Borrow the record carried by this envelope.
370    pub fn record(&self) -> &SourceRecord {
371        &self.record
372    }
373
374    /// Borrow the offset carried by this envelope, if present.
375    pub fn offset(&self) -> Option<&Offset> {
376        self.offset.as_ref()
377    }
378
379    /// Split this envelope into its record and optional offset parts.
380    pub fn into_parts(self) -> (SourceRecord, Option<Offset>) {
381        (self.record, self.offset)
382    }
383}
384
385impl From<SourceRecord> for SourceEnvelope {
386    fn from(record: SourceRecord) -> Self {
387        Self::new(record)
388    }
389}
390
/// Checkpoint/offset information for source connectors
///
/// **Mandatory public API** - used by source connectors for checkpointing.
///
/// `Hash` is derived (all fields are hashable) so offsets can be used directly
/// as `HashSet`/`HashMap` keys when tracking in-flight or committed checkpoints.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Offset {
    /// The partition or source identifier
    pub partition: String,
    /// The offset value (interpretation depends on source)
    pub value: u64,
    /// Optional metadata (interpretation left to the connector)
    pub metadata: Option<String>,
}

impl Offset {
    /// Create a new offset with no metadata attached.
    pub fn new(partition: impl Into<String>, value: u64) -> Self {
        Self {
            partition: partition.into(),
            value,
            metadata: None,
        }
    }

    /// Create an offset carrying additional metadata.
    pub fn with_metadata(
        partition: impl Into<String>,
        value: u64,
        metadata: impl Into<String>,
    ) -> Self {
        Self {
            partition: partition.into(),
            value,
            metadata: Some(metadata.into()),
        }
    }
}
427
#[cfg(test)]
mod tests {
    use super::*;

    // One focused test per constructor keeps failures easy to localize.
    #[test]
    fn test_offset_without_metadata() {
        let plain = Offset::new("partition-0", 42);
        assert_eq!(plain.partition, "partition-0");
        assert_eq!(plain.value, 42);
        assert!(plain.metadata.is_none());
    }

    #[test]
    fn test_offset_with_metadata() {
        let tagged = Offset::with_metadata("partition-1", 100, "some-metadata");
        assert_eq!(tagged.metadata, Some("some-metadata".to_string()));
    }
}