// pipeflow 0.0.4
//
// A lightweight, configuration-driven data pipeline framework
// Documentation
//! Console sink for printing messages to stdout
//!
//! Formats and prints pipeline messages to the console for debugging and monitoring.
//!
//! # Output Formats
//!
//! - **pretty**: Pretty-printed JSON with indentation (full message with metadata)
//! - **json**: Compact single-line JSON (full message with metadata)
//! - **text**: Plain text output (payload only, no metadata, no JSON formatting)

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tracing::info;

use crate::common::message::SharedMessage;
use crate::error::Result;
use crate::sink::Sink;

/// Output format for the console sink.
///
/// Variants are (de)serialized in lowercase, so configuration files select a
/// format with `pretty`, `json` or `text`. [`OutputFormat::Pretty`] is the
/// default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OutputFormat {
    /// Pretty-printed JSON with indentation (full message with metadata).
    ///
    /// Used when no format is configured.
    #[default]
    Pretty,
    /// Compact single-line JSON (full message with metadata).
    Json,
    /// Plain text format (payload only, no metadata).
    ///
    /// A string payload is printed as-is, without surrounding quotes.
    Text,
}

/// Console sink configuration.
///
/// Every field is optional when deserializing: an empty mapping yields the
/// same value as [`ConsoleSinkConfig::default`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConsoleSinkConfig {
    /// Output format used when rendering each message.
    ///
    /// Falls back to the [`OutputFormat`] default (`pretty`) when the key is
    /// absent from the configuration.
    #[serde(default)]
    pub format: OutputFormat,
}

/// Console sink that prints messages to stdout.
///
/// Each processed message is rendered according to the configured
/// [`OutputFormat`] and written as one `println!` call.
pub struct ConsoleSink {
    /// Identifier of this sink, returned by `Sink::id`.
    id: String,
    /// Settings that select how messages are rendered.
    config: ConsoleSinkConfig,
}

impl ConsoleSink {
    /// Builds a console sink with the given identifier and configuration.
    ///
    /// `id` accepts anything convertible into a `String`. An `info`-level
    /// tracing event carrying the sink id and the selected format is emitted
    /// before the sink is returned.
    pub fn new(id: impl Into<String>, config: ConsoleSinkConfig) -> Self {
        let sink = Self {
            id: id.into(),
            config,
        };
        info!(sink_id = %sink.id, format = ?sink.config.format, "Console sink created");
        sink
    }
}

#[async_trait]
impl Sink for ConsoleSink {
    /// Returns the identifier this sink was created with.
    fn id(&self) -> &str {
        &self.id
    }

    /// Renders `msg` according to the configured format and prints it to
    /// stdout as a single line group.
    ///
    /// - `Pretty` / `Json`: the whole message is serialized as indented or
    ///   compact JSON. If serialization fails, the message's `Debug`
    ///   representation is printed instead so that nothing is silently lost.
    /// - `Text`: only the payload is printed. A string payload is emitted
    ///   verbatim (no quotes, no escaping); any other payload is emitted via
    ///   its `Display` form, i.e. a bare scalar (`42`, `true`, `null`) or
    ///   compact JSON for arrays and objects.
    ///
    /// Always returns `Ok(())`.
    async fn process(&self, msg: SharedMessage) -> Result<()> {
        let output = match self.config.format {
            OutputFormat::Pretty => {
                serde_json::to_string_pretty(msg.as_ref()).unwrap_or_else(|_| format!("{:?}", msg))
            }
            OutputFormat::Json => {
                serde_json::to_string(msg.as_ref()).unwrap_or_else(|_| format!("{:?}", msg))
            }
            OutputFormat::Text => match &msg.payload {
                serde_json::Value::String(s) => s.clone(),
                // `Debug` would leak the enum structure (`Number(42)`,
                // `Object {..}`) into user-facing output; `Display` yields the
                // plain value instead.
                other => other.to_string(),
            },
        };

        println!("{}", output);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a YAML snippet into a sink config, panicking on invalid input.
    fn parse_config(yaml: &str) -> ConsoleSinkConfig {
        serde_yaml::from_str(yaml).unwrap()
    }

    /// The derived default selects the pretty format.
    #[test]
    fn test_config_default() {
        assert_eq!(ConsoleSinkConfig::default().format, OutputFormat::Pretty);
    }

    /// The constructor stores the id that `Sink::id` later reports.
    #[test]
    fn test_new() {
        let created = ConsoleSink::new("test_sink", ConsoleSinkConfig::default());
        assert_eq!(created.id(), "test_sink");
    }

    /// An empty mapping falls back to the default format.
    #[test]
    fn test_config_deserialize_default() {
        assert_eq!(parse_config("{}").format, OutputFormat::Pretty);
    }

    /// `format: json` selects the compact JSON format.
    #[test]
    fn test_config_deserialize_json_format() {
        assert_eq!(parse_config("format: json").format, OutputFormat::Json);
    }

    /// `format: pretty` selects the pretty JSON format.
    #[test]
    fn test_config_deserialize_pretty_format() {
        assert_eq!(parse_config("format: pretty").format, OutputFormat::Pretty);
    }

    /// `format: text` selects the plain text format.
    #[test]
    fn test_config_deserialize_text_format() {
        assert_eq!(parse_config("format: text").format, OutputFormat::Text);
    }

    /// Processing a message with the default config completes without error.
    #[tokio::test]
    async fn test_process_message() {
        use crate::common::message::Message;
        use std::sync::Arc;

        let console = ConsoleSink::new("test", ConsoleSinkConfig::default());
        let payload = serde_json::json!({"key": "value"});
        let message = Arc::new(Message::new("source", payload));

        // Should not error
        assert!(console.process(message).await.is_ok());
    }
}