wf_connector_api/lib.rs
//! # wf-connector-api
//!
//! Minimal Arrow-native connector API for warp-fusion.
//!
//! ## Design
//!
//! `wp-connector-api` sources produce `SourceEvent { payload: RawData }`,
//! which is designed for downstream parse pipelines. CEP engines such as
//! warp-fusion operate on Arrow `RecordBatch`es directly.
//!
//! `wf-connector-api` fills this gap: it defines one trait for sources
//! (`BatchSource`) and can be extended to sinks in the future (e.g. a
//! `BatchSink` for Arrow-native output).
//!
//! ## Relationship with `wp-connector-api`
//!
//! | | wp-connector-api | wf-connector-api |
//! |---|---|---|
//! | Source data | `SourceEvent { payload: RawData }` | `RecordBatch` (columnar) |
//! | Consumer | parse pipeline (WPL) | CEP engine (warp-fusion) |
//! | Error model | `SourceResult<T>` (orion-error) | `SourceResult<T>` (orion-error) |
//! | Lifecycle | `start()` / `receive()` / `close()` | `start()` / `receive_batch()` / `close()` |
//!
//! `wp-connectors` (the implementation crate) can implement both source
//! traits — the event-based one from `wp-connector-api` and `BatchSource`
//! from this crate — for the same connector (Kafka / File / TCP), sharing
//! connection logic.
25
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use orion_error::conversion::ToStructError;
29use orion_error::{OrionError, StructError, UnifiedReason};
30use std::error::Error as StdError;
31
// -- Error -------------------------------------------------------------------

/// Reason codes for connector (source) failures.
///
/// Each variant is mapped by the `OrionError` derive to a short `message`
/// and a stable `identity` string (see the `#[orion_error(..)]` attributes).
/// A reason by itself carries no detail. To build a [`SourceError`], use
/// [`SourceReason::err_detail`] to attach a human-readable detail string, or
/// [`SourceReason::err_source`] to attach an underlying error as the cause.
#[derive(Debug, Clone, PartialEq, OrionError)]
pub enum SourceReason {
    /// End of stream — no more data will be produced.
    #[orion_error(message = "end of stream", identity = "sys.wf_connector.eof")]
    EOF,
    /// No data currently available (not EOF); caller should retry.
    ///
    /// NOTE(review): `BatchSource::receive_batch` documents `Ok(vec![])` for
    /// this same situation. Confirm whether sources are also expected to return
    /// this variant, and whether callers must treat it as retryable.
    #[orion_error(message = "no data available", identity = "sys.wf_connector.not_data")]
    NotData,
    /// I/O error from the underlying transport.
    #[orion_error(message = "I/O error", identity = "sys.wf_connector.io")]
    Io,
    /// Failed to establish connection / bind / subscribe.
    #[orion_error(message = "connection error", identity = "sys.wf_connector.connect")]
    Connect,
    /// Message / frame decoding failed.
    #[orion_error(message = "decode error", identity = "sys.wf_connector.decode")]
    Decode,
    /// Referenced connector not found in registry.
    #[orion_error(
        message = "connector not found",
        identity = "sys.wf_connector.not_found"
    )]
    NotFound,
    /// Catch-all for unexpected errors, wrapping an orion-error `UnifiedReason`.
    ///
    /// `transparent` presumably forwards message/identity to the wrapped
    /// reason — confirm against the orion-error derive documentation.
    #[orion_error(transparent)]
    General(UnifiedReason),
}
65
66impl SourceReason {
67 /// Create an error with detail message.
68 pub fn err_detail<S: Into<String>>(self, detail: S) -> SourceError {
69 self.to_err().with_detail(detail.into())
70 }
71
72 /// Create an error with a source (chained) error.
73 pub fn err_source<E>(self, source: E) -> SourceError
74 where
75 E: StdError + Send + Sync + 'static,
76 {
77 self.to_err().with_source(source)
78 }
79}
80
81pub type SourceError = StructError<SourceReason>;
82pub type SourceResult<T> = Result<T, SourceError>;
83
84// -- Source ------------------------------------------------------------------
85
86/// A batch-oriented data source that produces Arrow [`RecordBatch`]es.
87///
88/// # Lifecycle
89///
90/// 1. `start()` — initialize (connect, subscribe, bind)
91/// 2. `receive_batch()` — pull data in a loop
92/// 3. `close()` — release resources (unsubscribe, close connections)
93///
94/// `close()` must be idempotent — safe to call multiple times, even before `start()`.
95///
96/// # Empty vs EOF
97///
98/// - Return `Ok(vec![])` when no data is currently available (caller should retry).
99/// - Return `Err(SourceReason::EOF.into())` when the stream has ended.
100#[async_trait]
101pub trait BatchSource: Send {
102 /// Initialize the source. Called once before the first `receive_batch()`.
103 ///
104 /// Default is a no-op.
105 async fn start(&mut self) -> SourceResult<()> {
106 Ok(())
107 }
108
109 /// Receive zero or more [`RecordBatch`]es.
110 ///
111 /// An empty `Vec` means "no data right now" — the caller should poll again.
112 /// An error with `SourceReason::EOF` means the stream has ended.
113 async fn receive_batch(&mut self) -> SourceResult<Vec<RecordBatch>>;
114
115 /// Close the source and release all resources.
116 ///
117 /// Must be idempotent — safe to call multiple times or before `start()`.
118 /// Default is a no-op.
119 async fn close(&mut self) -> SourceResult<()> {
120 Ok(())
121 }
122
123 /// Unique identifier for this source instance (logging / metrics).
124 fn identifier(&self) -> &str;
125}
126
// -- Sink (TBD) --------------------------------------------------------------

// Future extension (not yet implemented):
//
// ```ignore
// #[async_trait]
// pub trait BatchSink: Send {
//     async fn start(&mut self) -> SourceResult<()> { Ok(()) }
//     async fn send_batch(&mut self, batch: &RecordBatch) -> SourceResult<()>;
//     async fn flush(&mut self) -> SourceResult<()>;
//     async fn close(&mut self) -> SourceResult<()> { Ok(()) }
//     fn identifier(&self) -> &str;
// }
// ```
//
// NOTE(review): this sketch reuses `SourceResult` for sink operations;
// confirm whether sinks should get a dedicated error/result type.
140// ```