1use datafusion::arrow::record_batch::RecordBatch;
4use serde::Serialize;
5use thiserror::Error;
6
7pub mod cli;
8pub mod config;
9pub mod engine;
10pub mod input;
11pub mod output;
12pub mod pipeline;
13pub mod processor;
14pub mod stream;
15
16#[derive(Error, Debug)]
18pub enum Error {
19 #[error("IO error: {0}")]
20 Io(#[from] std::io::Error),
21
22 #[error("Serialization error: {0}")]
23 Serialization(#[from] serde_json::Error),
24
25 #[error("Configuration error: {0}")]
26 Config(String),
27
28 #[error("Read error: {0}")]
29 Read(String),
30
31 #[error("Process errors: {0}")]
32 Process(String),
33
34 #[error("Connection error: {0}")]
35 Connection(String),
36
37 #[error("Connection lost")]
39 Disconnection,
40
41 #[error("Timeout error")]
42 Timeout,
43
44 #[error("Unknown error: {0}")]
45 Unknown(String),
46
47 #[error("EOF")]
48 EOF,
49}
50
/// Owned raw bytes of a single message payload.
pub type Bytes = ::std::vec::Vec<u8>;
52
53#[derive(Clone, Debug)]
56pub struct MessageBatch {
57 pub content: Content,
59}
60
61#[derive(Clone, Debug)]
62pub enum Content {
63 Arrow(RecordBatch),
64 Binary(Vec<Bytes>),
65}
66
67impl MessageBatch {
68 pub fn new_binary(content: Vec<Bytes>) -> Self {
69 Self {
70 content: Content::Binary(content),
71 }
72 }
73 pub fn from_json<T: Serialize>(value: &T) -> Result<Self, Error> {
74 let content = serde_json::to_vec(value)?;
75 Ok(Self::new_binary(vec![content]))
76 }
77 pub fn new_arrow(content: RecordBatch) -> Self {
78 Self {
79 content: Content::Arrow(content),
80 }
81 }
82
83 pub fn from_string(content: &str) -> Self {
85 Self::new_binary(vec![content.as_bytes().to_vec()])
86 }
87
88 pub fn as_string(&self) -> Result<Vec<String>, Error> {
90 match &self.content {
91 Content::Arrow(_) => Err(Error::Process("无法解析为JSON".to_string())),
92 Content::Binary(v) => {
93 let x: Result<Vec<String>, Error> = v
94 .iter()
95 .map(|v| {
96 String::from_utf8(v.clone())
97 .map_err(|_| Error::Process("无法解析为字符串".to_string()))
98 })
99 .collect();
100 Ok(x?)
101 }
102 }
103 }
104 pub fn as_binary(&self) -> &Vec<Bytes> {
106 match &self.content {
107 Content::Arrow(_) => panic!("Cannot get binary content from Arrow message"),
108 Content::Binary(v) => v,
109 }
110 }
111
112 pub fn is_empty(&self) -> bool {
113 self.len() == 0
114 }
115 pub fn len(&self) -> usize {
116 match &self.content {
117 Content::Arrow(v) => v.num_rows(),
118 Content::Binary(v) => v.len(),
119 }
120 }
121}