arkflow_core/input/
mod.rs

1//! Input component module
2//!
3//! The input component is responsible for receiving data from various sources such as message queues, file systems, HTTP endpoints, and so on.
4
5use crate::{Error, MessageBatch};
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10
11lazy_static::lazy_static! {
12    static ref INPUT_BUILDERS: RwLock<HashMap<String, Arc<dyn InputBuilder>>> = RwLock::new(HashMap::new());
13}
14
15pub trait InputBuilder: Send + Sync {
16    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Input>, Error>;
17}
18
19#[async_trait]
20pub trait Ack: Send + Sync {
21    async fn ack(&self);
22}
23
24#[async_trait]
25pub trait Input: Send + Sync {
26    /// Connect to the input source
27    async fn connect(&self) -> Result<(), Error>;
28
29    /// Read the message from the input source
30    async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error>;
31
32    /// Close the input source connection
33    async fn close(&self) -> Result<(), Error>;
34}
35
36pub struct NoopAck;
37
38#[async_trait]
39impl Ack for NoopAck {
40    async fn ack(&self) {}
41}
42
43/// Input configuration
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct InputConfig {
46    #[serde(rename = "type")]
47    pub input_type: String,
48    #[serde(flatten)]
49    pub config: Option<serde_json::Value>,
50}
51
52impl InputConfig {
53    /// Building input components
54    pub fn build(&self) -> Result<Arc<dyn Input>, Error> {
55        let builders = INPUT_BUILDERS.read().unwrap();
56
57        if let Some(builder) = builders.get(&self.input_type) {
58            builder.build(&self.config)
59        } else {
60            Err(Error::Config(format!(
61                "Unknown input type: {}",
62                self.input_type
63            )))
64        }
65    }
66}
67
68pub fn register_input_builder(type_name: &str, builder: Arc<dyn InputBuilder>) {
69    let mut builders = INPUT_BUILDERS.write().unwrap();
70    if builders.contains_key(type_name) {
71        panic!("Input type already registered: {}", type_name)
72    }
73    builders.insert(type_name.to_string(), builder);
74}
75
76pub fn get_registered_input_types() -> Vec<String> {
77    let builders = INPUT_BUILDERS.read().unwrap();
78    builders.keys().cloned().collect()
79}