wp_connector_api/runtime/
sink.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::{path::PathBuf, sync::Arc};
4use wp_model_core::model::DataRecord;
5
6use crate::{SinkDefProvider, SinkResult};
7
8// Reuse workspace error type to avoid duplicating an error abstraction
9
10// ---------- Core Sink Traits ----------
11
12/// Runtime control trait for managing sink lifecycle.
13///
14/// Implementors must ensure all methods are **idempotent** - calling them
15/// multiple times should be safe and produce consistent results.
16///
17/// # Example
18/// ```ignore
19/// #[async_trait]
20/// impl AsyncCtrl for MySink {
21///     async fn stop(&mut self) -> SinkResult<()> {
22///         self.connection.close().await?;
23///         Ok(())
24///     }
25///     async fn reconnect(&mut self) -> SinkResult<()> {
26///         self.connection = Connection::new(&self.config).await?;
27///         Ok(())
28///     }
29/// }
30/// ```
31#[async_trait]
32pub trait AsyncCtrl {
33    /// Gracefully stop the sink and release all resources.
34    ///
35    /// This method must be idempotent - subsequent calls should return `Ok(())`
36    /// without side effects. After calling `stop()`, the sink should not accept
37    /// any more data.
38    async fn stop(&mut self) -> SinkResult<()>;
39
40    /// Reconnect or refresh the sink's underlying connection.
41    ///
42    /// Use this to recover from transient failures without recreating the sink.
43    /// The method should preserve any configuration and state that doesn't
44    /// depend on the connection itself.
45    async fn reconnect(&mut self) -> SinkResult<()>;
46}
47
48/// Trait for sinking structured records.
49///
50/// Provides methods for writing parsed, typed data records to a destination.
51/// Implementations should handle batching efficiently when `sink_records` is called.
52#[async_trait]
53pub trait AsyncRecordSink {
54    /// Write a single data record to the sink.
55    ///
56    /// # Arguments
57    /// * `data` - Reference to the record to write
58    async fn sink_record(&mut self, data: &DataRecord) -> SinkResult<()>;
59
60    /// Write multiple data records in batch.
61    ///
62    /// Implementations should optimize for batch writes when possible.
63    /// The order of records in the vector should be preserved.
64    ///
65    /// # Arguments
66    /// * `data` - Vector of records wrapped in Arc for shared ownership
67    async fn sink_records(&mut self, data: Vec<Arc<DataRecord>>) -> SinkResult<()>;
68}
69
70/// Trait for sinking raw data (strings and bytes).
71///
72/// Provides methods for writing unstructured data to a destination.
73/// Useful for pass-through scenarios or when data doesn't need parsing.
74#[async_trait]
75pub trait AsyncRawDataSink {
76    /// Write a single string payload.
77    async fn sink_str(&mut self, data: &str) -> SinkResult<()>;
78
79    /// Write a single byte payload.
80    async fn sink_bytes(&mut self, data: &[u8]) -> SinkResult<()>;
81
82    /// Write multiple string payloads in batch.
83    ///
84    /// Order of strings should be preserved in the output.
85    async fn sink_str_batch(&mut self, data: Vec<&str>) -> SinkResult<()>;
86
87    /// Write multiple byte payloads in batch.
88    ///
89    /// Order of byte slices should be preserved in the output.
90    async fn sink_bytes_batch(&mut self, data: Vec<&[u8]>) -> SinkResult<()>;
91}
92
93/// Combined trait for full-featured async sinks.
94///
95/// This is a marker trait that combines [`AsyncRecordSink`], [`AsyncRawDataSink`],
96/// and [`AsyncCtrl`]. Types implementing all three traits automatically implement
97/// `AsyncSink` through the blanket implementation.
98///
99/// Use this trait when you need a sink that supports all data formats and
100/// lifecycle management.
101pub trait AsyncSink: AsyncRecordSink + AsyncRawDataSink + AsyncCtrl + Send + Sync {}
102
103impl<T> AsyncSink for T where T: AsyncRecordSink + AsyncRawDataSink + AsyncCtrl + Send + Sync {}
104
105// ---------- Build Ctx ----------
106
/// Build context handed to sink factories when a sink is constructed.
///
/// Carries runtime configuration — the working directory, replica placement,
/// and a rate-limiting hint — that a sink may consult while initializing.
#[derive(Clone, Debug)]
pub struct SinkBuildCtx {
    /// Root directory for sink-specific working files (state, checkpoints, etc.)
    pub work_root: PathBuf,
    /// Replica index for parallel group builds (0-based). Defaults to 0.
    pub replica_idx: usize,
    /// Replica count for the group (>=1). Defaults to 1.
    pub replica_cnt: usize,
    /// Upstream rate limit hint in requests per second. 0 means unlimited.
    pub rate_limit_rps: usize,
}

impl SinkBuildCtx {
    /// Creates a context for a single replica with no rate limit.
    ///
    /// Equivalent to `new_with_replica(work_root, 0, 1)`: `replica_idx` is 0,
    /// `replica_cnt` is 1, and `rate_limit_rps` is 0 (unlimited).
    ///
    /// # Arguments
    /// * `work_root` - Root directory for the sink's working files
    #[must_use]
    pub fn new(work_root: PathBuf) -> Self {
        // Delegate so the default values live in exactly one constructor.
        Self::new_with_replica(work_root, 0, 1)
    }

    /// Creates a context for one replica of a parallel group, with no rate limit.
    ///
    /// `replica_cnt` is clamped to a minimum of 1, so passing 0 yields a
    /// count of 1. `replica_idx` is stored exactly as given; it is **not**
    /// checked against `replica_cnt`.
    ///
    /// # Arguments
    /// * `work_root` - Root directory for the sink's working files
    /// * `replica_idx` - 0-based index of this replica
    /// * `replica_cnt` - Number of replicas in the group
    #[must_use]
    pub fn new_with_replica(work_root: PathBuf, replica_idx: usize, replica_cnt: usize) -> Self {
        Self {
            work_root,
            replica_idx,
            // A group always has at least one replica.
            replica_cnt: replica_cnt.max(1),
            rate_limit_rps: 0,
        }
    }

    /// Returns the context with its rate-limit hint replaced.
    ///
    /// Consumes `self` builder-style; the result must be used, otherwise the
    /// new limit is lost.
    ///
    /// # Arguments
    /// * `rate_limit_rps` - Requests per second; 0 means unlimited
    #[must_use]
    pub fn with_limit(mut self, rate_limit_rps: usize) -> Self {
        self.rate_limit_rps = rate_limit_rps;
        self
    }
}
145
146/// Handle wrapping a boxed async sink instance.
147///
148/// Returned by [`SinkFactory::build`] and used by the orchestrator to
149/// manage sink lifecycle and data flow.
150pub struct SinkHandle {
151    /// The boxed sink implementing [`AsyncSink`]
152    pub sink: Box<dyn AsyncSink + 'static>,
153}
154
155impl std::fmt::Debug for SinkHandle {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        // Name matches the type to avoid confusion in logs/diagnostics
158        f.debug_struct("SinkHandle")
159            .field("sink", &"Box<dyn AsyncSink>")
160            .finish()
161    }
162}
163
164impl SinkHandle {
165    pub fn new(sink: Box<dyn AsyncSink + 'static>) -> Self {
166        Self { sink }
167    }
168}
169
170// ---------- Resolved Route Spec + Factory (for runtime decoupling) ----------
171
172/// Resolved sink specification with all parameters flattened.
173///
174/// Contains the fully resolved configuration for a sink instance,
175/// with all inheritance and defaults already applied.
176#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
177pub struct ResolvedSinkSpec {
178    /// Sink group name for routing and management
179    pub group: String,
180    /// Unique sink instance name within the group
181    pub name: String,
182    /// Sink type identifier (e.g., "kafka", "elasticsearch")
183    pub kind: String,
184    /// Reference to the connector definition
185    pub connector_id: String,
186    /// Flattened runtime parameters
187    pub params: crate::types::ParamMap,
188    /// Optional filter expression for selective routing
189    pub filter: Option<String>,
190}
191
192/// Factory trait for creating sink instances.
193///
194/// Implementors must also implement [`SinkDefProvider`] to provide
195/// connector metadata. The orchestrator uses this trait to construct
196/// sinks at runtime based on resolved specifications.
197///
198/// # Example
199/// ```ignore
200/// #[async_trait]
201/// impl SinkFactory for KafkaSinkFactory {
202///     fn kind(&self) -> &'static str { "kafka" }
203///
204///     async fn build(&self, spec: &ResolvedSinkSpec, ctx: &SinkBuildCtx) -> SinkResult<SinkHandle> {
205///         let sink = KafkaSink::new(&spec.params).await?;
206///         Ok(SinkHandle::new(Box::new(sink)))
207///     }
208/// }
209/// ```
210#[async_trait]
211pub trait SinkFactory: SinkDefProvider + Send + Sync + 'static {
212    /// Returns the unique type identifier for this sink factory.
213    fn kind(&self) -> &'static str;
214
215    /// Optional lightweight validation of the sink specification.
216    ///
217    /// Called before `build()` to catch configuration errors early.
218    /// Default implementation accepts all specifications.
219    fn validate_spec(&self, _spec: &ResolvedSinkSpec) -> SinkResult<()> {
220        Ok(())
221    }
222
223    /// Construct a new sink instance from the given specification.
224    ///
225    /// # Arguments
226    /// * `spec` - Resolved sink specification with parameters
227    /// * `ctx` - Build context with runtime configuration
228    ///
229    /// # Returns
230    /// A [`SinkHandle`] wrapping the constructed sink, or an error.
231    async fn build(&self, spec: &ResolvedSinkSpec, ctx: &SinkBuildCtx) -> SinkResult<SinkHandle>;
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::{path::PathBuf, sync::Arc};
    use wp_model_core::model::DataRecord;

    /// Sink whose every operation succeeds immediately and does nothing;
    /// it exists only so the tests have a concrete `AsyncSink` to box.
    #[derive(Default)]
    struct NoopSink;

    #[async_trait]
    impl AsyncCtrl for NoopSink {
        async fn stop(&mut self) -> SinkResult<()> {
            Ok(())
        }

        async fn reconnect(&mut self) -> SinkResult<()> {
            Ok(())
        }
    }

    #[async_trait]
    impl AsyncRecordSink for NoopSink {
        async fn sink_record(&mut self, _data: &DataRecord) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_records(&mut self, _data: Vec<Arc<DataRecord>>) -> SinkResult<()> {
            Ok(())
        }
    }

    #[async_trait]
    impl AsyncRawDataSink for NoopSink {
        async fn sink_str(&mut self, _data: &str) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_bytes(&mut self, _data: &[u8]) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_str_batch(&mut self, _data: Vec<&str>) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_bytes_batch(&mut self, _data: Vec<&[u8]>) -> SinkResult<()> {
            Ok(())
        }
    }

    /// Covers the three `SinkBuildCtx` constructors/builders: plain defaults,
    /// replica-count clamping, and the rate-limit override.
    #[test]
    fn sink_build_ctx_defaults_and_limits() {
        let root = PathBuf::from("/tmp/work");

        // Plain constructor: single replica, unlimited rate.
        let base = SinkBuildCtx::new(root.clone());
        assert_eq!(base.work_root, root);
        assert_eq!(base.replica_idx, 0);
        assert_eq!(base.replica_cnt, 1);
        assert_eq!(base.rate_limit_rps, 0);

        // A replica count of zero is raised to one; the index is kept as given.
        let clamped = SinkBuildCtx::new_with_replica(root.clone(), 2, 0);
        assert_eq!(clamped.replica_idx, 2);
        assert_eq!(
            clamped.replica_cnt, 1,
            "replica count should clamp to >=1"
        );

        // The builder overrides only the rate limit.
        let throttled = SinkBuildCtx::new(root).with_limit(250);
        assert_eq!(throttled.rate_limit_rps, 250);
        assert_eq!(throttled.replica_cnt, 1);
    }

    /// A boxed `NoopSink` can be wrapped, and the handle's `Debug` output
    /// names the handle type.
    #[test]
    fn sink_handle_wraps_async_sink() {
        let boxed: Box<dyn AsyncSink> = Box::new(NoopSink);
        let handle = SinkHandle::new(boxed);
        let rendered = format!("{handle:?}");
        assert!(rendered.contains("SinkHandle"));
    }
}