arkflow_core/
lib.rs

//! Rust stream processing engine.
2
3use datafusion::arrow::record_batch::RecordBatch;
4use serde::Serialize;
5use thiserror::Error;
6
7pub mod cli;
8pub mod config;
9pub mod engine;
10pub mod input;
11pub mod output;
12pub mod pipeline;
13pub mod processor;
14pub mod stream;
15
16/// Error in the stream processing engine
17#[derive(Error, Debug)]
18pub enum Error {
19    #[error("IO error: {0}")]
20    Io(#[from] std::io::Error),
21
22    #[error("Serialization error: {0}")]
23    Serialization(#[from] serde_json::Error),
24
25    #[error("Configuration error: {0}")]
26    Config(String),
27
28    #[error("Read error: {0}")]
29    Read(String),
30
31    #[error("Process errors: {0}")]
32    Process(String),
33
34    #[error("Connection error: {0}")]
35    Connection(String),
36
37    /// Reconnection should be attempted after a connection loss.
38    #[error("Connection lost")]
39    Disconnection,
40
41    #[error("Timeout error")]
42    Timeout,
43
44    #[error("Unknown error: {0}")]
45    Unknown(String),
46
47    #[error("EOF")]
48    EOF,
49}
50
/// Owned byte buffer used as the raw payload of one message.
pub type Bytes = Vec<u8>;
52
53/// Represents a message in a stream processing engine.
54
55#[derive(Clone, Debug)]
56pub struct MessageBatch {
57    /// Message content
58    pub content: Content,
59}
60
61#[derive(Clone, Debug)]
62pub enum Content {
63    Arrow(RecordBatch),
64    Binary(Vec<Bytes>),
65}
66
67impl MessageBatch {
68    pub fn new_binary(content: Vec<Bytes>) -> Self {
69        Self {
70            content: Content::Binary(content),
71        }
72    }
73    pub fn from_json<T: Serialize>(value: &T) -> Result<Self, Error> {
74        let content = serde_json::to_vec(value)?;
75        Ok(Self::new_binary(vec![content]))
76    }
77    pub fn new_arrow(content: RecordBatch) -> Self {
78        Self {
79            content: Content::Arrow(content),
80        }
81    }
82
83    /// Create a message from a string.
84    pub fn from_string(content: &str) -> Self {
85        Self::new_binary(vec![content.as_bytes().to_vec()])
86    }
87
88    /// Parse the message content into a string.
89    pub fn as_string(&self) -> Result<Vec<String>, Error> {
90        match &self.content {
91            Content::Arrow(_) => Err(Error::Process("无法解析为JSON".to_string())),
92            Content::Binary(v) => {
93                let x: Result<Vec<String>, Error> = v
94                    .iter()
95                    .map(|v| {
96                        String::from_utf8(v.clone())
97                            .map_err(|_| Error::Process("无法解析为字符串".to_string()))
98                    })
99                    .collect();
100                Ok(x?)
101            }
102        }
103    }
104    /// Get the binary content of the message.
105    pub fn as_binary(&self) -> &Vec<Bytes> {
106        match &self.content {
107            Content::Arrow(_) => panic!("Cannot get binary content from Arrow message"),
108            Content::Binary(v) => v,
109        }
110    }
111
112    pub fn is_empty(&self) -> bool {
113        self.len() == 0
114    }
115    pub fn len(&self) -> usize {
116        match &self.content {
117            Content::Arrow(v) => v.num_rows(),
118            Content::Binary(v) => v.len(),
119        }
120    }
121}