Skip to main content

streamling_plugin/
api.rs

1#![allow(dead_code)]
2//! This module defines the API that can be used for implementing plugins.
3//! NOTE: this API is NOT FFI-safe and is intended for use in the plugin AFTER the FFI layer.
4//! See `plugin_interface::ffi` for the FFI-safe types and traits.
5
6use crate::{PluginLabel, PluginStateBackendConfig};
7use abi_stable::traits::IntoReprRust;
8use arrow::array::RecordBatch;
9use arrow::datatypes::SchemaRef;
10use arrow::error::ArrowError;
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use std::fmt::{self, Debug};
14use std::io;
15use std::sync::{Arc, RwLock};
16use streamling_config::StateBackendConfig;
17use streamling_state::{
18    StateBackendError, StateBackendFactories, StateKey, StateOperatorBackend,
19    StateOperatorBackendFactory,
20};
21
/// Column name `_gs_op`, exported so plugin code does not have to hard-code the literal.
/// NOTE(review): presumably the per-row operation marker column — confirm against the host runtime.
pub static STREAMLING_COLUMN_NAME_OP: &str = "_gs_op";
23
24pub struct PluginStateBackendFactory {
25    factories: StateBackendFactories,
26    application_namespace: String,
27    plugin_reference_name: String,
28}
29
30impl PluginStateBackendFactory {
31    pub fn new(config: PluginStateBackendConfig) -> Self {
32        let state_backend_config: StateBackendConfig =
33            serde_json::from_str(&config.serialized_config)
34                .expect("Failed to deserialize StateBackendConfig");
35
36        let factories = StateBackendFactories::new(state_backend_config)
37            .expect("Failed to create State Backend Factory");
38
39        PluginStateBackendFactory {
40            factories,
41            application_namespace: config.application_namespace.into_rust(),
42            plugin_reference_name: config.plugin_reference_name.into_rust(),
43        }
44    }
45
46    pub fn create<V>(&self) -> Arc<PluginStateBackend<V>>
47    where
48        V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Unpin + Clone + Debug + 'static,
49    {
50        let inner = self.factories.create(&self.application_namespace);
51        Arc::new(PluginStateBackend::new(
52            inner,
53            self.plugin_reference_name.clone(),
54        ))
55    }
56}
57
58/// State backend for plugins.
59/// - `get()` / `put(value)` use the default key: `{reference_name}`
60/// - `get_kv(key)` / `put_kv(key, value)` use key: `{prefix}:{key}` (prefix defaults to reference_name)
61/// - `set_prefix(None)` resets to default (reference_name)
62/// - `set_prefix(Some("custom"))` sets prefix to "custom"
63/// - `set_prefix(Some(""))` removes prefix (global state)
64pub struct PluginStateBackend<V>
65where
66    V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
67{
68    inner: Arc<dyn StateOperatorBackend<V>>,
69    reference_name: String,
70    kv_prefix: RwLock<Option<String>>,
71}
72
73impl<V> PluginStateBackend<V>
74where
75    V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
76{
77    fn new(inner: Arc<dyn StateOperatorBackend<V>>, reference_name: String) -> Self {
78        Self {
79            inner,
80            reference_name,
81            kv_prefix: RwLock::new(None),
82        }
83    }
84
85    fn default_key(&self) -> StateKey {
86        StateKey(self.reference_name.clone())
87    }
88
89    fn build_kv_key(&self, key: &str) -> StateKey {
90        let prefix = self.kv_prefix.read().unwrap();
91        match prefix.as_ref() {
92            None => StateKey(format!("{}:{}", self.reference_name, key)),
93            Some(p) if p.is_empty() => StateKey(key.to_string()),
94            Some(p) => StateKey(format!("{}:{}", p, key)),
95        }
96    }
97
98    /// Set the prefix for `_kv` methods.
99    /// - `None` -> reset to default, keys become `{reference_name}:{key}`
100    /// - `Some("custom")` -> keys become `custom:{key}`
101    /// - `Some("")` -> keys become `{key}` (global state, no prefix)
102    pub fn set_prefix(&self, prefix: Option<&str>) {
103        let mut p = self.kv_prefix.write().unwrap();
104        *p = prefix.map(|s| s.to_string());
105    }
106
107    pub async fn get(&self) -> Result<Option<V>, StateBackendError> {
108        self.inner.get(self.default_key()).await
109    }
110
111    pub async fn put(&self, value: V) -> Result<(), StateBackendError> {
112        self.inner.put(self.default_key(), value).await
113    }
114
115    pub async fn remove(&self) -> Result<(), StateBackendError> {
116        self.inner.remove(self.default_key()).await
117    }
118
119    pub async fn get_kv(&self, key: &str) -> Result<Option<V>, StateBackendError> {
120        self.inner.get(self.build_kv_key(key)).await
121    }
122
123    pub async fn put_kv(&self, key: &str, value: V) -> Result<(), StateBackendError> {
124        self.inner.put(self.build_kv_key(key), value).await
125    }
126
127    pub async fn remove_kv(&self, key: &str) -> Result<(), StateBackendError> {
128        self.inner.remove(self.build_kv_key(key)).await
129    }
130
131    /// Clear the state for the current reference_name (removes the default key)
132    pub async fn clear(&self) -> Result<(), StateBackendError> {
133        self.inner.remove(self.default_key()).await
134    }
135}
136
137impl<V> Debug for PluginStateBackend<V>
138where
139    V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
140{
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        let prefix = self.kv_prefix.read().unwrap();
143        f.debug_struct("PluginStateBackend")
144            .field("reference_name", &self.reference_name)
145            .field("kv_prefix", &prefix)
146            .finish()
147    }
148}
149
150#[derive(Debug)]
151pub enum PluginError {
152    ArrowError(ArrowError),
153    IoError(io::Error),
154    Internal(String),
155    Execution(String),
156    State(StateBackendError),
157}
158
159impl fmt::Display for PluginError {
160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        match self {
162            Self::ArrowError(e) => write!(f, "{e}"),
163            Self::IoError(e) => write!(f, "{e}"),
164            Self::Internal(msg) => f.write_str(msg),
165            Self::Execution(msg) => f.write_str(msg),
166            Self::State(e) => write!(f, "{e}"),
167        }
168    }
169}
170
171impl std::error::Error for PluginError {
172    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
173        match self {
174            Self::ArrowError(e) => Some(e),
175            Self::IoError(e) => Some(e),
176            Self::State(e) => Some(e),
177            Self::Internal(_) | Self::Execution(_) => None,
178        }
179    }
180}
181
/// Checkpoint epoch identifier, a thin wrapper around a `u64`.
///
/// Equality and ordering follow the wrapped integer. The type is `Copy`, so the same epoch
/// can be passed by value to several checkpoint hooks without cloning, and `Hash`, so it
/// can be used as a key in hashed collections.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct CheckpointEpoch(pub u64);
184
185#[async_trait]
186pub trait SupportsGracefulShutdown {
187    /// Returns true if the plugin is still running.
188    fn is_running(&self) -> bool;
189
190    /// Attempts to gracefully shut down the plugin.
191    async fn terminate(&self) -> Result<(), PluginError>;
192}
193
194/// Optional, non-FFI trait for the plugins to implement preprocessor support.
195/// Preprocessors transform the raw topology config string before parsing.
196#[async_trait]
197pub trait PreprocessorPlugin: Send + Sync {
198    async fn preprocess_topology(&self, config: String) -> Result<String, PluginError>;
199}
200
201/// Optional, non-FFI trait for the plugins to implement source support.
202#[async_trait]
203pub trait SourcePlugin: SupportsGracefulShutdown + Send + Sync {
204    async fn initialize(&self) -> Result<(), PluginError>;
205    fn output_schema(&self) -> Result<SchemaRef, PluginError>;
206    /// Identity labels for this plugin instance. Typically derived from options at
207    /// construction time (e.g. `chain_slug`, `network`, `topic`). Returned labels flow
208    /// through `PluginResult::labels` into the metrics subsystem as Prometheus labels.
209    /// Default: no labels.
210    fn labels(&self) -> Vec<PluginLabel> {
211        Vec::new()
212    }
213    /// Return an empty batch to indicate missing data if needed.
214    async fn generate_batch(&self) -> Result<RecordBatch, PluginError>;
215    /// Returning a successful result indicates that the checkpoint marker was processed
216    /// successfully, and the mark should be propagated downstream.
217    async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
218    async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
219    -> Result<(), PluginError>;
220}
221
222/// Optional, non-FFI trait for the plugins to implement transform support.
223#[async_trait]
224pub trait TransformPlugin: SupportsGracefulShutdown + Send + Sync {
225    async fn initialize(&self) -> Result<(), PluginError>;
226    fn output_schema(&self) -> Result<SchemaRef, PluginError>;
227    /// See `SourcePlugin::labels`.
228    fn labels(&self) -> Vec<PluginLabel> {
229        Vec::new()
230    }
231    /// Return an empty batch to indicate missing data if needed.
232    async fn process_batch(&self, data: RecordBatch) -> Result<RecordBatch, PluginError>;
233    /// Returning a successful result indicates that the checkpoint marker was processed
234    /// successfully, and the mark should be propagated downstream.
235    async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
236    async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
237    -> Result<(), PluginError>;
238}
239
240/// Optional, non-FFI trait for the plugins to implement sink support.
241#[async_trait]
242pub trait SinkPlugin: SupportsGracefulShutdown + Send + Sync {
243    async fn initialize(&self) -> Result<(), PluginError>;
244    /// See `SourcePlugin::labels`.
245    fn labels(&self) -> Vec<PluginLabel> {
246        Vec::new()
247    }
248    async fn process_batch(&self, data: RecordBatch) -> Result<(), PluginError>;
249    /// Returning a successful result indicates that the checkpoint marker was processed
250    /// successfully, and an acknowledgment should be sent back to the source.
251    async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
252    async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
253    -> Result<(), PluginError>;
254}
255
256/// Trait for plugins to implement side output support.
257/// Side outputs observe data from all sources without modifying the pipeline.
258/// Unlike other plugin types, side outputs use direct FFI invocation (no channels).
259/// One instance is created per source via `new(source_name, schema, options, metrics_recorder)`.
260pub trait SideOutputPlugin: Send + Sync {
261    fn process_batch(&self, batch: &RecordBatch) -> Result<(), String>;
262    fn shutdown(&self);
263}