arkflow_core/output/
mod.rs

1//! Output component module
2//!
3//! The output component is responsible for sending the processed data to the target system.
4
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::{Arc, RwLock};
9
10use crate::{Error, MessageBatch};
11
// Process-wide registry mapping an output type name to the builder that
// constructs outputs of that type. It is written by `register_output_builder`
// and read by `OutputConfig::build` and `get_registered_output_types`.
lazy_static::lazy_static! {
    static ref OUTPUT_BUILDERS: RwLock<HashMap<String, Arc<dyn OutputBuilder>>> = RwLock::new(HashMap::new());
}
/// Feature interface of the output component.
///
/// An output is the sink side of the pipeline: it receives processed
/// [`MessageBatch`]es and delivers them to a target system. Implementors must
/// be `Send + Sync`, and every method takes `&self`.
#[async_trait]
pub trait Output: Send + Sync {
    /// Connect to the output destination.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the connection cannot be established.
    async fn connect(&self) -> Result<(), Error>;

    /// Write a message batch to the output destination.
    ///
    /// The batch is only borrowed; the caller keeps ownership of `msg`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the batch cannot be written.
    async fn write(&self, msg: &MessageBatch) -> Result<(), Error>;

    /// Close the output destination connection.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if closing the connection fails.
    async fn close(&self) -> Result<(), Error>;
}
27
/// Output configuration.
///
/// Pairs the name of a registered output type with that type's own settings.
/// Use [`OutputConfig::build`] to turn it into a concrete [`Output`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputConfig {
    /// Name of the output type, (de)serialized under the key `type`.
    /// This is the key used to look up the builder in the registry.
    #[serde(rename = "type")]
    pub output_type: String,
    /// Type-specific settings, handed unchanged to the selected builder.
    /// Because of `#[serde(flatten)]` these keys live at the same level as
    /// `type` instead of being nested under a `config` key.
    #[serde(flatten)]
    pub config: Option<serde_json::Value>,
}
36
37impl OutputConfig {
38    /// Build the output component according to the configuration
39    pub fn build(&self) -> Result<Arc<dyn Output>, Error> {
40        let builders = OUTPUT_BUILDERS.read().unwrap();
41
42        if let Some(builder) = builders.get(&self.output_type) {
43            builder.build(&self.config)
44        } else {
45            Err(Error::Config(format!(
46                "Unknown output type: {}",
47                self.output_type
48            )))
49        }
50    }
51}
52
/// Factory for [`Output`] instances of a single output type.
///
/// Builders are stored in the module's registry by
/// `register_output_builder` and invoked by `OutputConfig::build`.
pub trait OutputBuilder: Send + Sync {
    /// Build an output from the optional type-specific configuration.
    ///
    /// `config` is the `config` field of the `OutputConfig` being built,
    /// passed through as-is; it may be `None`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the output cannot be built.
    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error>;
}
56
57pub fn register_output_builder(type_name: &str, builder: Arc<dyn OutputBuilder>) {
58    let mut builders = OUTPUT_BUILDERS.write().unwrap();
59    if builders.contains_key(type_name) {
60        panic!("Output type already registered: {}", type_name);
61    }
62    builders.insert(type_name.to_string(), builder);
63}
64
65pub fn get_registered_output_types() -> Vec<String> {
66    let builders = OUTPUT_BUILDERS.read().unwrap();
67    builders.keys().cloned().collect()
68}