//! `wp_connector_api/runtime/sink.rs` — runtime sink traits, build context, sink handle and factory.
1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::{path::PathBuf, sync::Arc};
4use wp_model_core::model::DataRecord;
5
6use crate::{SinkDefProvider, SinkResult};
7
// `SinkResult` is imported from the crate root so every sink shares the workspace error type
// instead of defining its own error abstraction.
9
10// ---------- Core Sink Traits ----------
11
12/// Runtime control trait for managing sink lifecycle.
13///
14/// Implementors must ensure all methods are **idempotent** - calling them
15/// multiple times should be safe and produce consistent results.
16///
17/// # Example
18/// ```ignore
19/// #[async_trait]
20/// impl AsyncCtrl for MySink {
21/// async fn stop(&mut self) -> SinkResult<()> {
22/// self.connection.close().await?;
23/// Ok(())
24/// }
25/// async fn reconnect(&mut self) -> SinkResult<()> {
26/// self.connection = Connection::new(&self.config).await?;
27/// Ok(())
28/// }
29/// }
30/// ```
31#[async_trait]
32pub trait AsyncCtrl {
33 /// Gracefully stop the sink and release all resources.
34 ///
35 /// This method must be idempotent - subsequent calls should return `Ok(())`
36 /// without side effects. After calling `stop()`, the sink should not accept
37 /// any more data.
38 async fn stop(&mut self) -> SinkResult<()>;
39
40 /// Reconnect or refresh the sink's underlying connection.
41 ///
42 /// Use this to recover from transient failures without recreating the sink.
43 /// The method should preserve any configuration and state that doesn't
44 /// depend on the connection itself.
45 async fn reconnect(&mut self) -> SinkResult<()>;
46}
47
48/// Trait for sinking structured records.
49///
50/// Provides methods for writing parsed, typed data records to a destination.
51/// Implementations should handle batching efficiently when `sink_records` is called.
52#[async_trait]
53pub trait AsyncRecordSink {
54 /// Write a single data record to the sink.
55 ///
56 /// # Arguments
57 /// * `data` - Reference to the record to write
58 async fn sink_record(&mut self, data: &DataRecord) -> SinkResult<()>;
59
60 /// Write multiple data records in batch.
61 ///
62 /// Implementations should optimize for batch writes when possible.
63 /// The order of records in the vector should be preserved.
64 ///
65 /// # Arguments
66 /// * `data` - Vector of records wrapped in Arc for shared ownership
67 async fn sink_records(&mut self, data: Vec<Arc<DataRecord>>) -> SinkResult<()>;
68}
69
70/// Trait for sinking raw data (strings and bytes).
71///
72/// Provides methods for writing unstructured data to a destination.
73/// Useful for pass-through scenarios or when data doesn't need parsing.
74#[async_trait]
75pub trait AsyncRawDataSink {
76 /// Write a single string payload.
77 async fn sink_str(&mut self, data: &str) -> SinkResult<()>;
78
79 /// Write a single byte payload.
80 async fn sink_bytes(&mut self, data: &[u8]) -> SinkResult<()>;
81
82 /// Write multiple string payloads in batch.
83 ///
84 /// Order of strings should be preserved in the output.
85 async fn sink_str_batch(&mut self, data: Vec<&str>) -> SinkResult<()>;
86
87 /// Write multiple byte payloads in batch.
88 ///
89 /// Order of byte slices should be preserved in the output.
90 async fn sink_bytes_batch(&mut self, data: Vec<&[u8]>) -> SinkResult<()>;
91}
92
93/// Combined trait for full-featured async sinks.
94///
95/// This is a marker trait that combines [`AsyncRecordSink`], [`AsyncRawDataSink`],
96/// and [`AsyncCtrl`]. Types implementing all three traits automatically implement
97/// `AsyncSink` through the blanket implementation.
98///
99/// Use this trait when you need a sink that supports all data formats and
100/// lifecycle management.
101pub trait AsyncSink: AsyncRecordSink + AsyncRawDataSink + AsyncCtrl + Send + Sync {}
102
103impl<T> AsyncSink for T where T: AsyncRecordSink + AsyncRawDataSink + AsyncCtrl + Send + Sync {}
104
105// ---------- Build Ctx ----------
106
/// Build context passed to sink factories during construction.
///
/// Carries runtime configuration that a sink may consult while initializing:
/// - the working directory;
/// - replica placement;
/// - an optional rate-limit hint.
///
/// The fields are public. The `replica_cnt >= 1` guarantee therefore only holds
/// for values produced by the constructors below, not for values built or
/// mutated by hand.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SinkBuildCtx {
    /// Root directory for sink-specific working files (state, checkpoints, etc.)
    pub work_root: PathBuf,
    /// Replica index for parallel group builds (0-based). Defaults to 0.
    pub replica_idx: usize,
    /// Replica count for the group (>=1). Defaults to 1.
    pub replica_cnt: usize,
    /// Upstream rate limit hint in requests per second. 0 means unlimited.
    pub rate_limit_rps: usize,
}

impl SinkBuildCtx {
    /// Creates a context for a single, unreplicated sink with no rate limit.
    ///
    /// Equivalent to `SinkBuildCtx::new_with_replica(work_root, 0, 1)`.
    pub fn new(work_root: PathBuf) -> Self {
        Self::new_with_replica(work_root, 0, 1)
    }

    /// Creates a context for replica `replica_idx` of a group of `replica_cnt`.
    ///
    /// A `replica_cnt` of 0 is clamped to 1, so the count is always at least one.
    /// `replica_idx` is stored as given and is *not* checked against
    /// `replica_cnt`. The rate limit starts at 0 (unlimited); chain
    /// [`SinkBuildCtx::with_limit`] to set it.
    pub fn new_with_replica(work_root: PathBuf, replica_idx: usize, replica_cnt: usize) -> Self {
        Self {
            work_root,
            replica_idx,
            // Guard against a zero count so callers can safely divide/modulo by it.
            replica_cnt: replica_cnt.max(1),
            rate_limit_rps: 0,
        }
    }

    /// Returns this context with its rate-limit hint set to `rate_limit_rps`.
    ///
    /// All other fields are left unchanged. Passing 0 means unlimited.
    #[must_use = "`with_limit` consumes the context and returns the updated one"]
    pub fn with_limit(mut self, rate_limit_rps: usize) -> Self {
        self.rate_limit_rps = rate_limit_rps;
        self
    }
}
145
146/// Handle wrapping a boxed async sink instance.
147///
148/// Returned by [`SinkFactory::build`] and used by the orchestrator to
149/// manage sink lifecycle and data flow.
150pub struct SinkHandle {
151 /// The boxed sink implementing [`AsyncSink`]
152 pub sink: Box<dyn AsyncSink + 'static>,
153}
154
155impl std::fmt::Debug for SinkHandle {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 // Name matches the type to avoid confusion in logs/diagnostics
158 f.debug_struct("SinkHandle")
159 .field("sink", &"Box<dyn AsyncSink>")
160 .finish()
161 }
162}
163
164impl SinkHandle {
165 pub fn new(sink: Box<dyn AsyncSink + 'static>) -> Self {
166 Self { sink }
167 }
168}
169
170// ---------- Resolved Route Spec + Factory (for runtime decoupling) ----------
171
172/// Resolved sink specification with all parameters flattened.
173///
174/// Contains the fully resolved configuration for a sink instance,
175/// with all inheritance and defaults already applied.
176#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
177pub struct ResolvedSinkSpec {
178 /// Sink group name for routing and management
179 pub group: String,
180 /// Unique sink instance name within the group
181 pub name: String,
182 /// Sink type identifier (e.g., "kafka", "elasticsearch")
183 pub kind: String,
184 /// Reference to the connector definition
185 pub connector_id: String,
186 /// Flattened runtime parameters
187 pub params: crate::types::ParamMap,
188 /// Optional filter expression for selective routing
189 pub filter: Option<String>,
190}
191
192/// Factory trait for creating sink instances.
193///
194/// Implementors must also implement [`SinkDefProvider`] to provide
195/// connector metadata. The orchestrator uses this trait to construct
196/// sinks at runtime based on resolved specifications.
197///
198/// # Example
199/// ```ignore
200/// #[async_trait]
201/// impl SinkFactory for KafkaSinkFactory {
202/// fn kind(&self) -> &'static str { "kafka" }
203///
204/// async fn build(&self, spec: &ResolvedSinkSpec, ctx: &SinkBuildCtx) -> SinkResult<SinkHandle> {
205/// let sink = KafkaSink::new(&spec.params).await?;
206/// Ok(SinkHandle::new(Box::new(sink)))
207/// }
208/// }
209/// ```
210#[async_trait]
211pub trait SinkFactory: SinkDefProvider + Send + Sync + 'static {
212 /// Returns the unique type identifier for this sink factory.
213 fn kind(&self) -> &'static str;
214
215 /// Optional lightweight validation of the sink specification.
216 ///
217 /// Called before `build()` to catch configuration errors early.
218 /// Default implementation accepts all specifications.
219 fn validate_spec(&self, _spec: &ResolvedSinkSpec) -> SinkResult<()> {
220 Ok(())
221 }
222
223 /// Construct a new sink instance from the given specification.
224 ///
225 /// # Arguments
226 /// * `spec` - Resolved sink specification with parameters
227 /// * `ctx` - Build context with runtime configuration
228 ///
229 /// # Returns
230 /// A [`SinkHandle`] wrapping the constructed sink, or an error.
231 async fn build(&self, spec: &ResolvedSinkSpec, ctx: &SinkBuildCtx) -> SinkResult<SinkHandle>;
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::{path::PathBuf, sync::Arc};
    use wp_model_core::model::DataRecord;

    /// Sink that accepts every call and does nothing. It lets the tests exercise
    /// the `AsyncSink` blanket impl and `SinkHandle` without real I/O.
    #[derive(Default)]
    struct NoopSink;

    #[async_trait]
    impl AsyncCtrl for NoopSink {
        async fn stop(&mut self) -> SinkResult<()> {
            Ok(())
        }

        async fn reconnect(&mut self) -> SinkResult<()> {
            Ok(())
        }
    }

    #[async_trait]
    impl AsyncRecordSink for NoopSink {
        async fn sink_record(&mut self, _data: &DataRecord) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_records(&mut self, _data: Vec<Arc<DataRecord>>) -> SinkResult<()> {
            Ok(())
        }
    }

    #[async_trait]
    impl AsyncRawDataSink for NoopSink {
        async fn sink_str(&mut self, _data: &str) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_bytes(&mut self, _data: &[u8]) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_str_batch(&mut self, _data: Vec<&str>) -> SinkResult<()> {
            Ok(())
        }

        async fn sink_bytes_batch(&mut self, _data: Vec<&[u8]>) -> SinkResult<()> {
            Ok(())
        }
    }

    #[test]
    fn sink_build_ctx_defaults_and_limits() {
        let ctx = SinkBuildCtx::new(PathBuf::from("/tmp/work"));
        assert_eq!(ctx.work_root, PathBuf::from("/tmp/work"));
        assert_eq!(ctx.replica_idx, 0);
        assert_eq!(ctx.replica_cnt, 1);
        assert_eq!(ctx.rate_limit_rps, 0);

        let replica_ctx = SinkBuildCtx::new_with_replica(PathBuf::from("/tmp/work"), 2, 0);
        assert_eq!(replica_ctx.replica_idx, 2);
        assert_eq!(
            replica_ctx.replica_cnt, 1,
            "replica count should clamp to >=1"
        );

        let limited = SinkBuildCtx::new(PathBuf::from("/tmp/work")).with_limit(250);
        assert_eq!(limited.rate_limit_rps, 250);
        assert_eq!(limited.replica_cnt, 1);
    }

    #[test]
    fn sink_build_ctx_limit_keeps_replica_settings() {
        // `with_limit` must only touch the rate limit, even on a replicated context.
        let ctx =
            SinkBuildCtx::new_with_replica(PathBuf::from("/tmp/work"), 3, 4).with_limit(10);
        assert_eq!(ctx.work_root, PathBuf::from("/tmp/work"));
        assert_eq!(ctx.replica_idx, 3);
        assert_eq!(ctx.replica_cnt, 4);
        assert_eq!(ctx.rate_limit_rps, 10);
    }

    #[test]
    fn sink_handle_wraps_async_sink() {
        let handle = SinkHandle::new(Box::new(NoopSink));
        let rendered = format!("{handle:?}");
        assert!(rendered.contains("SinkHandle"));
        // The inner sink is not `Debug`; the handle prints a fixed placeholder instead.
        assert!(rendered.contains("Box<dyn AsyncSink>"));
    }

    #[test]
    fn resolved_sink_spec_default_is_empty() {
        let spec = ResolvedSinkSpec::default();
        assert!(spec.group.is_empty());
        assert!(spec.name.is_empty());
        assert!(spec.kind.is_empty());
        assert!(spec.connector_id.is_empty());
        assert!(spec.filter.is_none());

        let copy = spec.clone();
        assert_eq!(copy, spec);
    }
}