Skip to main content

wf_connector_api/
lib.rs

//! # wf-connector-api
//!
//! Minimal Arrow-native connector API for warp-fusion.
//!
//! ## Design
//!
//! `wp-connector-api` sources produce `SourceEvent { payload: RawData }`,
//! which is designed for downstream parse pipelines. CEP engines such as
//! warp-fusion instead operate on Arrow `RecordBatch`es directly.
//!
//! `wf-connector-api` fills this gap with a single trait for sources, and is
//! meant to be extended to sinks in the future (e.g. a `BatchSink` for
//! Arrow-native output).
//!
//! ## Relationship with `wp-connector-api`
//!
//! | | wp-connector-api | wf-connector-api |
//! |---|---|---|
//! | Source data | `SourceEvent { payload: RawData }` | `RecordBatch` (columnar) |
//! | Consumer | parse pipeline (WPL) | CEP engine (warp-fusion) |
//! | Error model | `SourceResult<T>` (orion-error) | `SourceResult<T>` (orion-error) |
//! | Lifecycle | `start()` / `receive()` / `close()` | `start()` / `receive_batch()` / `close()` |
//!
//! `wp-connectors` (the implementation crate) can implement both the
//! `wp-connector-api` source trait and [`BatchSource`] for the same connector
//! (Kafka / File / TCP), sharing connection logic.

26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use orion_error::conversion::ToStructError;
29use orion_error::{OrionError, StructError, UnifiedReason};
30use std::error::Error as StdError;

// -- Error -------------------------------------------------------------------

/// Connector error reason.
///
/// Each variant identifies *why* a connector operation did not yield data.
/// Turn a reason into a [`SourceError`] with [`SourceReason::err_detail`]
/// (attaches a detail message) or [`SourceReason::err_source`] (attaches an
/// underlying error as the source).
///
/// `EOF` and `NotData` describe stream state rather than a transport or
/// decoding failure; see [`BatchSource`] for how a source reports
/// "no data yet" versus "stream ended".
#[derive(Debug, Clone, PartialEq, OrionError)]
pub enum SourceReason {
    /// End of stream — no more data will be produced.
    #[orion_error(message = "end of stream", identity = "sys.wf_connector.eof")]
    EOF,
    /// No data currently available (not EOF); caller should retry.
    ///
    /// NOTE(review): [`BatchSource::receive_batch`] documents `Ok(vec![])` as
    /// the "no data right now" signal, so sources have two ways to express the
    /// same condition. Consumers presumably treat this reason like an empty
    /// batch list — confirm and settle on one convention.
    #[orion_error(message = "no data available", identity = "sys.wf_connector.not_data")]
    NotData,
    /// I/O error from the underlying transport.
    #[orion_error(message = "I/O error", identity = "sys.wf_connector.io")]
    Io,
    /// Failed to establish connection / bind / subscribe.
    #[orion_error(message = "connection error", identity = "sys.wf_connector.connect")]
    Connect,
    /// Message / frame decoding failed.
    #[orion_error(message = "decode error", identity = "sys.wf_connector.decode")]
    Decode,
    /// Referenced connector not found in registry.
    #[orion_error(
        message = "connector not found",
        identity = "sys.wf_connector.not_found"
    )]
    NotFound,
    /// Catch-all for unexpected errors.
    ///
    /// Marked `transparent`, so presumably its message and identity are taken
    /// from the wrapped [`UnifiedReason`] — confirm against orion-error docs.
    #[orion_error(transparent)]
    General(UnifiedReason),
}
65
66impl SourceReason {
67    /// Create an error with detail message.
68    pub fn err_detail<S: Into<String>>(self, detail: S) -> SourceError {
69        self.to_err().with_detail(detail.into())
70    }
71
72    /// Create an error with a source (chained) error.
73    pub fn err_source<E>(self, source: E) -> SourceError
74    where
75        E: StdError + Send + Sync + 'static,
76    {
77        self.to_err().with_source(source)
78    }
79}
80
81pub type SourceError = StructError<SourceReason>;
82pub type SourceResult<T> = Result<T, SourceError>;

// -- Source ------------------------------------------------------------------

86/// A batch-oriented data source that produces Arrow [`RecordBatch`]es.
87///
88/// # Lifecycle
89///
90/// 1. `start()` — initialize (connect, subscribe, bind)
91/// 2. `receive_batch()` — pull data in a loop
92/// 3. `close()` — release resources (unsubscribe, close connections)
93///
94/// `close()` must be idempotent — safe to call multiple times, even before `start()`.
95///
96/// # Empty vs EOF
97///
98/// - Return `Ok(vec![])` when no data is currently available (caller should retry).
99/// - Return `Err(SourceReason::EOF.into())` when the stream has ended.
100#[async_trait]
101pub trait BatchSource: Send {
102    /// Initialize the source. Called once before the first `receive_batch()`.
103    ///
104    /// Default is a no-op.
105    async fn start(&mut self) -> SourceResult<()> {
106        Ok(())
107    }
108
109    /// Receive zero or more [`RecordBatch`]es.
110    ///
111    /// An empty `Vec` means "no data right now" — the caller should poll again.
112    /// An error with `SourceReason::EOF` means the stream has ended.
113    async fn receive_batch(&mut self) -> SourceResult<Vec<RecordBatch>>;
114
115    /// Close the source and release all resources.
116    ///
117    /// Must be idempotent — safe to call multiple times or before `start()`.
118    /// Default is a no-op.
119    async fn close(&mut self) -> SourceResult<()> {
120        Ok(())
121    }
122
123    /// Unique identifier for this source instance (logging / metrics).
124    fn identifier(&self) -> &str;
125}

// -- Sink (TBD) --------------------------------------------------------------

// Future extension: an Arrow-native sink mirroring `BatchSource`.
//
// ```ignore
// #[async_trait]
// pub trait BatchSink: Send {
//     async fn start(&mut self) -> SourceResult<()> { Ok(()) }
//     async fn send_batch(&mut self, batch: &RecordBatch) -> SourceResult<()>;
//     async fn flush(&mut self) -> SourceResult<()>;
//     async fn close(&mut self) -> SourceResult<()> { Ok(()) }
//     fn identifier(&self) -> &str;
// }
// ```