1#![allow(dead_code)]
2use crate::{PluginLabel, PluginStateBackendConfig};
7use abi_stable::traits::IntoReprRust;
8use arrow::array::RecordBatch;
9use arrow::datatypes::SchemaRef;
10use arrow::error::ArrowError;
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use std::fmt::{self, Debug};
14use std::io;
15use std::sync::{Arc, RwLock};
16use streamling_config::StateBackendConfig;
17use streamling_state::{
18 StateBackendError, StateBackendFactories, StateKey, StateOperatorBackend,
19 StateOperatorBackendFactory,
20};
21
22pub static STREAMLING_COLUMN_NAME_OP: &str = "_gs_op";
23
24pub struct PluginStateBackendFactory {
25 factories: StateBackendFactories,
26 application_namespace: String,
27 plugin_reference_name: String,
28}
29
30impl PluginStateBackendFactory {
31 pub fn new(config: PluginStateBackendConfig) -> Self {
32 let state_backend_config: StateBackendConfig =
33 serde_json::from_str(&config.serialized_config)
34 .expect("Failed to deserialize StateBackendConfig");
35
36 let factories = StateBackendFactories::new(state_backend_config)
37 .expect("Failed to create State Backend Factory");
38
39 PluginStateBackendFactory {
40 factories,
41 application_namespace: config.application_namespace.into_rust(),
42 plugin_reference_name: config.plugin_reference_name.into_rust(),
43 }
44 }
45
46 pub fn create<V>(&self) -> Arc<PluginStateBackend<V>>
47 where
48 V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Unpin + Clone + Debug + 'static,
49 {
50 let inner = self.factories.create(&self.application_namespace);
51 Arc::new(PluginStateBackend::new(
52 inner,
53 self.plugin_reference_name.clone(),
54 ))
55 }
56}
57
58pub struct PluginStateBackend<V>
65where
66 V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
67{
68 inner: Arc<dyn StateOperatorBackend<V>>,
69 reference_name: String,
70 kv_prefix: RwLock<Option<String>>,
71}
72
73impl<V> PluginStateBackend<V>
74where
75 V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
76{
77 fn new(inner: Arc<dyn StateOperatorBackend<V>>, reference_name: String) -> Self {
78 Self {
79 inner,
80 reference_name,
81 kv_prefix: RwLock::new(None),
82 }
83 }
84
85 fn default_key(&self) -> StateKey {
86 StateKey(self.reference_name.clone())
87 }
88
89 fn build_kv_key(&self, key: &str) -> StateKey {
90 let prefix = self.kv_prefix.read().unwrap();
91 match prefix.as_ref() {
92 None => StateKey(format!("{}:{}", self.reference_name, key)),
93 Some(p) if p.is_empty() => StateKey(key.to_string()),
94 Some(p) => StateKey(format!("{}:{}", p, key)),
95 }
96 }
97
98 pub fn set_prefix(&self, prefix: Option<&str>) {
103 let mut p = self.kv_prefix.write().unwrap();
104 *p = prefix.map(|s| s.to_string());
105 }
106
107 pub async fn get(&self) -> Result<Option<V>, StateBackendError> {
108 self.inner.get(self.default_key()).await
109 }
110
111 pub async fn put(&self, value: V) -> Result<(), StateBackendError> {
112 self.inner.put(self.default_key(), value).await
113 }
114
115 pub async fn remove(&self) -> Result<(), StateBackendError> {
116 self.inner.remove(self.default_key()).await
117 }
118
119 pub async fn get_kv(&self, key: &str) -> Result<Option<V>, StateBackendError> {
120 self.inner.get(self.build_kv_key(key)).await
121 }
122
123 pub async fn put_kv(&self, key: &str, value: V) -> Result<(), StateBackendError> {
124 self.inner.put(self.build_kv_key(key), value).await
125 }
126
127 pub async fn remove_kv(&self, key: &str) -> Result<(), StateBackendError> {
128 self.inner.remove(self.build_kv_key(key)).await
129 }
130
131 pub async fn clear(&self) -> Result<(), StateBackendError> {
133 self.inner.remove(self.default_key()).await
134 }
135}
136
137impl<V> Debug for PluginStateBackend<V>
138where
139 V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
140{
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 let prefix = self.kv_prefix.read().unwrap();
143 f.debug_struct("PluginStateBackend")
144 .field("reference_name", &self.reference_name)
145 .field("kv_prefix", &prefix)
146 .finish()
147 }
148}
149
150#[derive(Debug)]
151pub enum PluginError {
152 ArrowError(ArrowError),
153 IoError(io::Error),
154 Internal(String),
155 Execution(String),
156 State(StateBackendError),
157}
158
159impl fmt::Display for PluginError {
160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 match self {
162 Self::ArrowError(e) => write!(f, "{e}"),
163 Self::IoError(e) => write!(f, "{e}"),
164 Self::Internal(msg) => f.write_str(msg),
165 Self::Execution(msg) => f.write_str(msg),
166 Self::State(e) => write!(f, "{e}"),
167 }
168 }
169}
170
171impl std::error::Error for PluginError {
172 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
173 match self {
174 Self::ArrowError(e) => Some(e),
175 Self::IoError(e) => Some(e),
176 Self::State(e) => Some(e),
177 Self::Internal(_) | Self::Execution(_) => None,
178 }
179 }
180}
181
182#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
183pub struct CheckpointEpoch(pub u64);
184
185#[async_trait]
186pub trait SupportsGracefulShutdown {
187 fn is_running(&self) -> bool;
189
190 async fn terminate(&self) -> Result<(), PluginError>;
192}
193
194#[async_trait]
197pub trait PreprocessorPlugin: Send + Sync {
198 async fn preprocess_topology(&self, config: String) -> Result<String, PluginError>;
199}
200
201#[async_trait]
203pub trait SourcePlugin: SupportsGracefulShutdown + Send + Sync {
204 async fn initialize(&self) -> Result<(), PluginError>;
205 fn output_schema(&self) -> Result<SchemaRef, PluginError>;
206 fn labels(&self) -> Vec<PluginLabel> {
211 Vec::new()
212 }
213 async fn generate_batch(&self) -> Result<RecordBatch, PluginError>;
215 async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
218 async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
219 -> Result<(), PluginError>;
220}
221
222#[async_trait]
224pub trait TransformPlugin: SupportsGracefulShutdown + Send + Sync {
225 async fn initialize(&self) -> Result<(), PluginError>;
226 fn output_schema(&self) -> Result<SchemaRef, PluginError>;
227 fn labels(&self) -> Vec<PluginLabel> {
229 Vec::new()
230 }
231 async fn process_batch(&self, data: RecordBatch) -> Result<RecordBatch, PluginError>;
233 async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
236 async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
237 -> Result<(), PluginError>;
238}
239
240#[async_trait]
242pub trait SinkPlugin: SupportsGracefulShutdown + Send + Sync {
243 async fn initialize(&self) -> Result<(), PluginError>;
244 fn labels(&self) -> Vec<PluginLabel> {
246 Vec::new()
247 }
248 async fn process_batch(&self, data: RecordBatch) -> Result<(), PluginError>;
249 async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
252 async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
253 -> Result<(), PluginError>;
254}
255
256pub trait SideOutputPlugin: Send + Sync {
261 fn process_batch(&self, batch: &RecordBatch) -> Result<(), String>;
262 fn shutdown(&self);
263}