arkflow_core/processor/
mod.rs

//! Processor component module
//!
//! A processor component transforms, filters, enriches, or otherwise processes message batches.
4
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::{Arc, RwLock};
9
10use crate::{Error, MessageBatch};
11
12mod udf;
13
14lazy_static::lazy_static! {
15    static ref PROCESSOR_BUILDERS: RwLock<HashMap<String, Arc<dyn ProcessorBuilder>>> = RwLock::new(HashMap::new());
16}
17
18/// Characteristic interface of the processor component
19#[async_trait]
20pub trait Processor: Send + Sync {
21    /// Process messages
22    async fn process(&self, batch: MessageBatch) -> Result<Vec<MessageBatch>, Error>;
23
24    /// Turn off the processor
25    async fn close(&self) -> Result<(), Error>;
26}
27
28/// Processor configuration
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct ProcessorConfig {
31    #[serde(rename = "type")]
32    pub processor_type: String,
33    #[serde(flatten)]
34    pub config: Option<serde_json::Value>,
35}
36
37impl ProcessorConfig {
38    /// Build the processor components according to the configuration
39    pub fn build(&self) -> Result<Arc<dyn Processor>, Error> {
40        let builders = PROCESSOR_BUILDERS.read().unwrap();
41
42        if let Some(builder) = builders.get(&self.processor_type) {
43            builder.build(&self.config)
44        } else {
45            Err(Error::Config(format!(
46                "Unknown processor type: {}",
47                self.processor_type
48            )))
49        }
50    }
51}
52
53pub trait ProcessorBuilder: Send + Sync {
54    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Processor>, Error>;
55}
56
57pub fn register_processor_builder(type_name: &str, builder: Arc<dyn ProcessorBuilder>) {
58    let mut builders = PROCESSOR_BUILDERS.write().unwrap();
59    if builders.contains_key(type_name) {
60        panic!("Processor type already registered: {}", type_name);
61    }
62    builders.insert(type_name.to_string(), builder);
63}
64
65pub fn get_registered_processor_types() -> Vec<String> {
66    let builders = PROCESSOR_BUILDERS.read().unwrap();
67    builders.keys().cloned().collect()
68}