use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tracing::info;
use crate::common::message::SharedMessage;
use crate::error::Result;
use crate::sink::Sink;
/// Rendering mode used by the console sink when printing a message.
///
/// Variant names are (de)serialized in lowercase, i.e. `pretty`, `json`
/// and `text`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OutputFormat {
    /// Multi-line, indented JSON of the whole message. This is the default.
    #[default]
    Pretty,
    /// Single-line JSON of the whole message.
    Json,
    /// Only the message payload; string payloads are printed verbatim.
    Text,
}
/// User-facing configuration for a [`ConsoleSink`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConsoleSinkConfig {
    /// How each message is rendered before being printed.
    ///
    /// Optional when deserializing; falls back to `OutputFormat::default()`
    /// when the key is absent.
    #[serde(default)]
    pub format: OutputFormat,
}
/// A sink that prints every message it receives to standard output.
///
/// Derives `Debug` so the sink can be inspected in logs and test failure
/// output like the other public types in this module.
#[derive(Debug)]
pub struct ConsoleSink {
    /// Identifier reported through `Sink::id`.
    id: String,
    /// Rendering options applied to each processed message.
    config: ConsoleSinkConfig,
}
impl ConsoleSink {
    /// Builds a console sink with the given identifier and configuration.
    ///
    /// Emits an `info` event recording the sink id and the selected output
    /// format.
    pub fn new(id: impl Into<String>, config: ConsoleSinkConfig) -> Self {
        let sink = Self {
            id: id.into(),
            config,
        };
        info!(sink_id = %sink.id, format = ?sink.config.format, "Console sink created");
        sink
    }
}
#[async_trait]
impl Sink for ConsoleSink {
    /// Returns the identifier this sink was created with.
    fn id(&self) -> &str {
        &self.id
    }

    /// Renders `msg` according to the configured [`OutputFormat`] and prints
    /// it to standard output followed by a newline.
    ///
    /// * `Pretty` / `Json` serialize the whole message as JSON; if
    ///   serialization fails, the message's `Debug` representation is printed
    ///   instead so nothing is silently dropped.
    /// * `Text` prints only the payload: string payloads verbatim, any other
    ///   JSON value as compact JSON.
    ///
    /// Always returns `Ok(())`.
    async fn process(&self, msg: SharedMessage) -> Result<()> {
        let rendered = match self.config.format {
            OutputFormat::Pretty => serde_json::to_string_pretty(msg.as_ref())
                .unwrap_or_else(|_| format!("{:?}", msg)),
            OutputFormat::Json => {
                serde_json::to_string(msg.as_ref()).unwrap_or_else(|_| format!("{:?}", msg))
            }
            OutputFormat::Text => match &msg.payload {
                serde_json::Value::String(s) => s.clone(),
                // `Display` for `serde_json::Value` yields compact JSON; the
                // previous `{:?}` leaked Rust debug syntax such as
                // `Number(42)` into user-facing output.
                other => other.to_string(),
            },
        };
        println!("{}", rendered);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a YAML snippet into a `ConsoleSinkConfig`, panicking if
    /// the snippet is not a valid configuration.
    fn parse_config(yaml: &str) -> ConsoleSinkConfig {
        serde_yaml::from_str(yaml).unwrap()
    }

    // The default configuration selects the pretty format.
    #[test]
    fn test_config_default() {
        let defaults = ConsoleSinkConfig::default();
        assert_eq!(defaults.format, OutputFormat::Pretty);
    }

    // The constructor stores the id that `Sink::id` later reports.
    #[test]
    fn test_new() {
        let console = ConsoleSink::new("test_sink", ConsoleSinkConfig::default());
        assert_eq!(console.id(), "test_sink");
    }

    // An empty mapping falls back to the default format.
    #[test]
    fn test_config_deserialize_default() {
        let parsed = parse_config("{}");
        assert_eq!(parsed.format, OutputFormat::Pretty);
    }

    #[test]
    fn test_config_deserialize_json_format() {
        let parsed = parse_config("format: json");
        assert_eq!(parsed.format, OutputFormat::Json);
    }

    #[test]
    fn test_config_deserialize_pretty_format() {
        let parsed = parse_config("format: pretty");
        assert_eq!(parsed.format, OutputFormat::Pretty);
    }

    #[test]
    fn test_config_deserialize_text_format() {
        let parsed = parse_config("format: text");
        assert_eq!(parsed.format, OutputFormat::Text);
    }

    // Processing a well-formed message with the default config succeeds.
    #[tokio::test]
    async fn test_process_message() {
        use crate::common::message::Message;
        use std::sync::Arc;

        let console = ConsoleSink::new("test", ConsoleSinkConfig::default());
        let message = Arc::new(Message::new("source", serde_json::json!({"key": "value"})));
        let outcome = console.process(message).await;
        assert!(outcome.is_ok());
    }
}