use datafusion::arrow::record_batch::RecordBatch;
use serde::{Deserialize, Serialize};
use thiserror::Error;
pub mod config;
pub mod input;
pub mod metrics;
pub mod output;
pub mod pipeline;
pub mod processor;
pub mod stream;
#[derive(Error, Debug)]
pub enum Error {
#[error("IO错误: {0}")]
Io(#[from] std::io::Error),
#[error("序列化错误: {0}")]
Serialization(#[from] serde_json::Error),
#[error("配置错误: {0}")]
Config(String),
#[error("读取错误: {0}")]
Reading(String),
#[error("处理错误: {0}")]
Processing(String),
#[error("连接错误: {0}")]
Connection(String),
#[error("连接断开")]
Disconnection,
#[error("超时错误")]
Timeout,
#[error("未知错误: {0}")]
Unknown(String),
#[error("完成")]
Done,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct Metadata {
fields: std::collections::HashMap<String, String>,
}
impl Metadata {
pub fn new() -> Self {
Self {
fields: std::collections::HashMap::new(),
}
}
pub fn set(&mut self, key: &str, value: &str) {
self.fields.insert(key.to_string(), value.to_string());
}
pub fn get(&self, key: &str) -> Option<&String> {
self.fields.get(key)
}
pub fn remove(&mut self, key: &str) -> Option<String> {
self.fields.remove(key)
}
}
type Bytes = Vec<u8>;
#[derive(Clone, Debug)]
pub struct MessageBatch {
content: Content,
}
#[derive(Clone, Debug)]
pub enum Content {
Arrow(RecordBatch),
Binary(Vec<Bytes>),
}
impl MessageBatch {
pub fn new_binary(content: Vec<Bytes>) -> Self {
Self {
content: Content::Binary(content),
}
}
pub fn from_json<T: Serialize>(value: &T) -> Result<Self, Error> {
let content = serde_json::to_vec(value)?;
Ok(Self::new_binary(vec![content]))
}
pub fn new_arrow(content: RecordBatch) -> Self {
Self {
content: Content::Arrow(content),
}
}
pub fn from_string(content: &str) -> Self {
Self::new_binary(vec![content.as_bytes().to_vec()])
}
pub fn as_string(&self) -> Result<Vec<String>, Error> {
match &self.content {
Content::Arrow(_) => Err(Error::Processing("无法解析为JSON".to_string())),
Content::Binary(v) => {
let x: Result<Vec<String>, Error> = v
.iter()
.map(|v| {
String::from_utf8(v.clone())
.map_err(|_| Error::Processing("无法解析为字符串".to_string()))
})
.collect();
Ok(x?)
}
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn len(&self) -> usize {
match &self.content {
Content::Arrow(v) => v.num_rows(),
Content::Binary(v) => v.len(),
}
}
}