arkflow_core/processor/
mod.rs1use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::{Arc, RwLock};
9
10use crate::{Error, MessageBatch};
11
12mod udf;
13
14lazy_static::lazy_static! {
15 static ref PROCESSOR_BUILDERS: RwLock<HashMap<String, Arc<dyn ProcessorBuilder>>> = RwLock::new(HashMap::new());
16}
17
18#[async_trait]
20pub trait Processor: Send + Sync {
21 async fn process(&self, batch: MessageBatch) -> Result<Vec<MessageBatch>, Error>;
23
24 async fn close(&self) -> Result<(), Error>;
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct ProcessorConfig {
31 #[serde(rename = "type")]
32 pub processor_type: String,
33 #[serde(flatten)]
34 pub config: Option<serde_json::Value>,
35}
36
37impl ProcessorConfig {
38 pub fn build(&self) -> Result<Arc<dyn Processor>, Error> {
40 let builders = PROCESSOR_BUILDERS.read().unwrap();
41
42 if let Some(builder) = builders.get(&self.processor_type) {
43 builder.build(&self.config)
44 } else {
45 Err(Error::Config(format!(
46 "Unknown processor type: {}",
47 self.processor_type
48 )))
49 }
50 }
51}
52
53pub trait ProcessorBuilder: Send + Sync {
54 fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Processor>, Error>;
55}
56
57pub fn register_processor_builder(type_name: &str, builder: Arc<dyn ProcessorBuilder>) {
58 let mut builders = PROCESSOR_BUILDERS.write().unwrap();
59 if builders.contains_key(type_name) {
60 panic!("Processor type already registered: {}", type_name);
61 }
62 builders.insert(type_name.to_string(), builder);
63}
64
65pub fn get_registered_processor_types() -> Vec<String> {
66 let builders = PROCESSOR_BUILDERS.read().unwrap();
67 builders.keys().cloned().collect()
68}