#![allow(dead_code)]
use crate::{PluginLabel, PluginStateBackendConfig};
use abi_stable::traits::IntoReprRust;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug};
use std::io;
use std::sync::{Arc, RwLock};
use streamling_config::StateBackendConfig;
use streamling_state::{
StateBackendError, StateBackendFactories, StateKey, StateOperatorBackend,
StateOperatorBackendFactory,
};
/// Column name `"_gs_op"`. The `_OP` suffix of the identifier suggests a per-row
/// operation marker column — NOTE(review): confirm against the code that reads it.
pub static STREAMLING_COLUMN_NAME_OP: &str = "_gs_op";
pub struct PluginStateBackendFactory {
factories: StateBackendFactories,
application_namespace: String,
plugin_reference_name: String,
}
impl PluginStateBackendFactory {
pub fn new(config: PluginStateBackendConfig) -> Self {
let state_backend_config: StateBackendConfig =
serde_json::from_str(&config.serialized_config)
.expect("Failed to deserialize StateBackendConfig");
let factories = StateBackendFactories::new(state_backend_config)
.expect("Failed to create State Backend Factory");
PluginStateBackendFactory {
factories,
application_namespace: config.application_namespace.into_rust(),
plugin_reference_name: config.plugin_reference_name.into_rust(),
}
}
pub fn create<V>(&self) -> Arc<PluginStateBackend<V>>
where
V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Unpin + Clone + Debug + 'static,
{
let inner = self.factories.create(&self.application_namespace);
Arc::new(PluginStateBackend::new(
inner,
self.plugin_reference_name.clone(),
))
}
}
/// Typed handle onto a [`StateOperatorBackend`], scoped to one plugin.
///
/// Two addressing modes are available:
/// * a single default slot keyed by the plugin's reference name
///   ([`get`](Self::get), [`put`](Self::put), [`remove`](Self::remove),
///   [`clear`](Self::clear));
/// * keyed slots ([`get_kv`](Self::get_kv), [`put_kv`](Self::put_kv),
///   [`remove_kv`](Self::remove_kv)), whose keys are namespaced as described on
///   [`set_prefix`](Self::set_prefix).
pub struct PluginStateBackend<V>
where
    V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
{
    /// Store that holds the values.
    inner: Arc<dyn StateOperatorBackend<V>>,
    /// Key of the default slot. Also used as the kv namespace while no override is set.
    reference_name: String,
    /// Optional override for the kv namespace (see `set_prefix`).
    kv_prefix: RwLock<Option<String>>,
}

impl<V> PluginStateBackend<V>
where
    V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
{
    /// Wraps `inner`. No kv prefix override is set initially.
    fn new(inner: Arc<dyn StateOperatorBackend<V>>, reference_name: String) -> Self {
        let kv_prefix = RwLock::new(None);
        Self {
            inner,
            reference_name,
            kv_prefix,
        }
    }

    /// Key of the default slot: the reference name, verbatim.
    fn default_key(&self) -> StateKey {
        StateKey(self.reference_name.to_owned())
    }

    /// Namespaces `key` with the active prefix:
    ///
    /// * no override → `"{reference_name}:{key}"`
    /// * override `""` → `key` unchanged
    /// * override `p` → `"{p}:{key}"`
    fn build_kv_key(&self, key: &str) -> StateKey {
        let guard = self.kv_prefix.read().unwrap();
        let namespaced = match guard.as_deref() {
            Some("") => key.to_owned(),
            Some(namespace) => format!("{namespace}:{key}"),
            None => format!("{}:{key}", self.reference_name),
        };
        StateKey(namespaced)
    }

    /// Sets (`Some`) or removes (`None`) the kv namespace override used by the `*_kv`
    /// methods. Removing it restores the reference name as the namespace.
    ///
    /// With `Some("")`, keys are stored without a prefix. A kv key equal to the reference
    /// name then addresses the same entry as the default slot.
    ///
    /// # Panics
    ///
    /// Panics if the prefix lock is poisoned.
    pub fn set_prefix(&self, prefix: Option<&str>) {
        let replacement = prefix.map(String::from);
        let mut slot = self.kv_prefix.write().unwrap();
        *slot = replacement;
    }

    /// Reads the value in the default slot, if any.
    pub async fn get(&self) -> Result<Option<V>, StateBackendError> {
        let state_key = self.default_key();
        self.inner.get(state_key).await
    }

    /// Stores `value` in the default slot.
    pub async fn put(&self, value: V) -> Result<(), StateBackendError> {
        let state_key = self.default_key();
        self.inner.put(state_key, value).await
    }

    /// Removes the value in the default slot.
    pub async fn remove(&self) -> Result<(), StateBackendError> {
        let state_key = self.default_key();
        self.inner.remove(state_key).await
    }

    /// Reads the value stored under the namespaced form of `key`, if any.
    pub async fn get_kv(&self, key: &str) -> Result<Option<V>, StateBackendError> {
        let state_key = self.build_kv_key(key);
        self.inner.get(state_key).await
    }

    /// Stores `value` under the namespaced form of `key`.
    pub async fn put_kv(&self, key: &str, value: V) -> Result<(), StateBackendError> {
        let state_key = self.build_kv_key(key);
        self.inner.put(state_key, value).await
    }

    /// Removes the value stored under the namespaced form of `key`.
    pub async fn remove_kv(&self, key: &str) -> Result<(), StateBackendError> {
        let state_key = self.build_kv_key(key);
        self.inner.remove(state_key).await
    }

    /// Removes the value in the default slot. This is identical to
    /// [`remove`](Self::remove); entries written with `put_kv` are left untouched.
    pub async fn clear(&self) -> Result<(), StateBackendError> {
        self.remove().await
    }
}
/// Shows the reference name and the current kv prefix override. The backend handle is
/// not shown; `finish_non_exhaustive` marks the omission with a trailing `..`.
impl<V> Debug for PluginStateBackend<V>
where
    V: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Debug + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Use the guard even if the lock is poisoned. The only writer replaces the whole
        // `Option<String>`, so the value is always readable, and `Debug` must not panic.
        let prefix = self
            .kv_prefix
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        f.debug_struct("PluginStateBackend")
            .field("reference_name", &self.reference_name)
            .field("kv_prefix", &*prefix)
            .finish_non_exhaustive()
    }
}
/// Error type returned by the plugin traits in this module.
///
/// The wrapping variants forward the wrapped error's `Display` text and also return
/// that error from [`std::error::Error::source`]. Reporters that walk the `source`
/// chain may therefore print the underlying message twice.
#[derive(Debug)]
pub enum PluginError {
    /// Error from the Arrow library.
    ArrowError(ArrowError),
    /// I/O error.
    IoError(io::Error),
    /// Free-form message for an internal failure.
    Internal(String),
    /// Free-form message for an execution failure.
    Execution(String),
    /// Error from the state backend.
    State(StateBackendError),
}

impl fmt::Display for PluginError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Internal(text) | Self::Execution(text) => f.write_str(text),
            Self::ArrowError(err) => write!(f, "{err}"),
            Self::IoError(err) => write!(f, "{err}"),
            Self::State(err) => write!(f, "{err}"),
        }
    }
}

impl std::error::Error for PluginError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Message-only variants have no underlying cause.
        let wrapped: &(dyn std::error::Error + 'static) = match self {
            Self::Internal(_) | Self::Execution(_) => return None,
            Self::ArrowError(err) => err,
            Self::IoError(err) => err,
            Self::State(err) => err,
        };
        Some(wrapped)
    }
}
/// Checkpoint identifier passed to the `process_checkpoint_*` plugin hooks.
///
/// Epochs compare and order by the wrapped `u64`. The type is `Copy` (it is a single
/// integer) and `Hash`, so it can be passed by value freely and used as a map or set key.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct CheckpointEpoch(pub u64);
/// Lifecycle interface required as a supertrait by `SourcePlugin`, `TransformPlugin`
/// and `SinkPlugin`.
#[async_trait]
pub trait SupportsGracefulShutdown {
/// Returns whether the component reports itself as running.
fn is_running(&self) -> bool;
/// Asks the component to shut down. Failures are reported as [`PluginError`].
async fn terminate(&self) -> Result<(), PluginError>;
}
/// Plugin hook that takes a topology configuration as a `String` and returns a
/// configuration `String`, presumably rewritten. NOTE(review): the encoding of
/// `config` (JSON, YAML, …) is not visible here — confirm with the caller.
#[async_trait]
pub trait PreprocessorPlugin: Send + Sync {
/// Returns the (possibly modified) topology configuration, or a [`PluginError`].
async fn preprocess_topology(&self, config: String) -> Result<String, PluginError>;
}
/// Plugin that emits Arrow record batches. Must also implement
/// [`SupportsGracefulShutdown`] and be `Send + Sync`.
#[async_trait]
pub trait SourcePlugin: SupportsGracefulShutdown + Send + Sync {
/// Prepares the plugin. Presumably called once before `generate_batch` — NOTE(review):
/// confirm the call order with the runtime.
async fn initialize(&self) -> Result<(), PluginError>;
/// Arrow schema describing the batches this plugin emits.
fn output_schema(&self) -> Result<SchemaRef, PluginError>;
/// Labels attached to this plugin. The default implementation returns none.
fn labels(&self) -> Vec<PluginLabel> {
Vec::new()
}
/// Returns a record batch emitted by this source.
async fn generate_batch(&self) -> Result<RecordBatch, PluginError>;
/// Handles the checkpoint marker for `epoch`. NOTE(review): when this is called
/// relative to `generate_batch` is decided by the runtime — confirm.
async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
/// Handles finalization of checkpoint `epoch`.
async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
-> Result<(), PluginError>;
}
/// Plugin that turns each input record batch into an output record batch. Must also
/// implement [`SupportsGracefulShutdown`] and be `Send + Sync`.
#[async_trait]
pub trait TransformPlugin: SupportsGracefulShutdown + Send + Sync {
/// Prepares the plugin. Presumably called once before `process_batch` — NOTE(review):
/// confirm the call order with the runtime.
async fn initialize(&self) -> Result<(), PluginError>;
/// Arrow schema describing the batches returned by `process_batch`.
fn output_schema(&self) -> Result<SchemaRef, PluginError>;
/// Labels attached to this plugin. The default implementation returns none.
fn labels(&self) -> Vec<PluginLabel> {
Vec::new()
}
/// Transforms `data`, taken by value, into a new record batch.
async fn process_batch(&self, data: RecordBatch) -> Result<RecordBatch, PluginError>;
/// Handles the checkpoint marker for `epoch`.
async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
/// Handles finalization of checkpoint `epoch`.
async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
-> Result<(), PluginError>;
}
/// Plugin that consumes record batches and produces no output batches (it has no
/// `output_schema`). Must also implement [`SupportsGracefulShutdown`] and be
/// `Send + Sync`.
#[async_trait]
pub trait SinkPlugin: SupportsGracefulShutdown + Send + Sync {
/// Prepares the plugin. Presumably called once before `process_batch` — NOTE(review):
/// confirm the call order with the runtime.
async fn initialize(&self) -> Result<(), PluginError>;
/// Labels attached to this plugin. The default implementation returns none.
fn labels(&self) -> Vec<PluginLabel> {
Vec::new()
}
/// Consumes `data`, which is taken by value.
async fn process_batch(&self, data: RecordBatch) -> Result<(), PluginError>;
/// Handles the checkpoint marker for `epoch`.
async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
/// Handles finalization of checkpoint `epoch`.
async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch)
-> Result<(), PluginError>;
}
/// Synchronous (non-`async`) hook that receives record batches by reference.
/// Unlike the other plugin traits, it reports errors as plain `String`s.
pub trait SideOutputPlugin: Send + Sync {
/// Handles `batch` without taking ownership of it.
fn process_batch(&self, batch: &RecordBatch) -> Result<(), String>;
/// Shuts the plugin down. This call cannot report failure.
fn shutdown(&self);
}