use std::path::Path;
use serde::Serialize;
use stillwater::effect::sink::{emit, SinkEffect};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use crate::errors::AnalysisError;
/// A single unit of report output that flows through the sink.
///
/// Each variant is turned into its on-the-wire text by
/// `ReportLine::to_output_string`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReportLine {
/// Pre-serialized JSON text; rendered as the text followed by a newline.
JsonLine(String),
/// Free-form text; rendered as the text followed by a newline.
TextLine(String),
/// A horizontal rule; rendered as `---` followed by a newline.
Separator,
/// A section title; rendered as `## <title>` surrounded by blank lines.
Header(String),
}
impl ReportLine {
    /// Serializes `value` to compact JSON and wraps it in [`ReportLine::JsonLine`].
    ///
    /// If serialization fails, the failure is reported in-band instead of being
    /// returned: the result is a [`ReportLine::TextLine`] holding a JSON object of
    /// the form `{"error": "<message>"}`.
    pub fn json_line<T: Serialize>(value: &T) -> Self {
        match serde_json::to_string(value) {
            Ok(json) => Self::JsonLine(json),
            Err(e) => {
                // Encode the message as a JSON string rather than pasting it between
                // quotes: error text may contain `"`, `\` or control characters, which
                // would otherwise make the fallback line itself malformed JSON.
                let message = serde_json::Value::String(e.to_string());
                Self::TextLine(format!("{{\"error\": {}}}", message))
            }
        }
    }

    /// Builds a [`ReportLine::TextLine`] from anything convertible into a `String`.
    pub fn text(content: impl Into<String>) -> Self {
        Self::TextLine(content.into())
    }

    /// Builds a [`ReportLine::Header`] from anything convertible into a `String`.
    pub fn header(title: impl Into<String>) -> Self {
        Self::Header(title.into())
    }

    /// Builds a [`ReportLine::Separator`].
    pub fn separator() -> Self {
        Self::Separator
    }

    /// Renders this line as the exact text written to a sink.
    ///
    /// Every rendering ends with a newline. Headers are additionally preceded by
    /// one blank line and followed by one blank line.
    pub fn to_output_string(&self) -> String {
        match self {
            Self::JsonLine(json) => format!("{}\n", json),
            Self::TextLine(text) => format!("{}\n", text),
            Self::Separator => "---\n".to_string(),
            Self::Header(h) => format!("\n## {}\n\n", h),
        }
    }
}
/// Settings describing how and where report output should be written.
#[derive(Debug, Clone)]
pub struct SinkConfig {
/// Requested output format.
/// NOTE(review): none of the runner functions visible in this file read this
/// field — confirm whether it is consumed elsewhere.
pub format: ReportFormat,
/// Where the output goes; this is the field `run_with_sink` dispatches on.
pub destination: SinkDestination,
/// Buffer size; the default is 8192.
/// NOTE(review): the file sink visible in this file builds its `BufWriter`
/// without consulting this value — confirm whether it is consumed elsewhere.
pub buffer_size: usize,
}
impl Default for SinkConfig {
    /// Returns the baseline configuration: JSON Lines format, standard output as
    /// the destination, and a buffer size of 8192.
    fn default() -> Self {
        let format = ReportFormat::JsonLines;
        let destination = SinkDestination::Stdout;
        let buffer_size = 8 * 1024;

        Self {
            format,
            destination,
            buffer_size,
        }
    }
}
/// The output formats a report can be requested in.
///
/// NOTE(review): the code visible in this file only stores this value (see
/// `SinkConfig::format`); how each format affects rendering is not shown here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReportFormat {
/// The format used by `SinkConfig::default()`.
JsonLines,
Text,
Markdown,
}
/// Where report output is written.
#[derive(Debug, Clone)]
pub enum SinkDestination {
/// Write to standard output (handled by `run_with_stdout_sink`).
Stdout,
/// Write to the file at the given path (handled by `run_with_file_sink`).
File(std::path::PathBuf),
}
/// Builds a sink effect that emits a single `ReportLine`.
///
/// This is a thin, type-pinning wrapper around stillwater's `emit`: it fixes
/// the emitted item type to `ReportLine` and the effect output to `()`, while
/// leaving the error type `E` and environment type `Env` up to the caller.
pub fn emit_report_line<E, Env>(
line: ReportLine,
) -> impl SinkEffect<Output = (), Error = E, Env = Env, Item = ReportLine>
where
E: Send + 'static,
Env: Clone + Send + Sync + 'static,
{
emit(line)
}
/// Builds a sink effect that emits every `ReportLine` in `lines`.
///
/// Delegates to stillwater's `emit_many`, with the item type pinned to
/// `ReportLine` and the effect output pinned to `()`.
pub fn emit_report_lines<E, Env>(
    lines: Vec<ReportLine>,
) -> impl SinkEffect<Output = (), Error = E, Env = Env, Item = ReportLine>
where
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    use stillwater::effect::sink::emit_many;

    emit_many(lines)
}
/// Builds a sink effect that emits `value` as one JSON report line.
///
/// The value is serialized immediately, when this function is called, via
/// `ReportLine::json_line`; the returned effect only carries the finished line.
pub fn emit_json_line<T, E, Env>(
    value: &T,
) -> impl SinkEffect<Output = (), Error = E, Env = Env, Item = ReportLine>
where
    T: Serialize,
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    let line = ReportLine::json_line(value);
    emit_report_line(line)
}
/// Builds a sink effect that emits `text` as a plain-text report line.
pub fn emit_text_line<E, Env>(
    text: impl Into<String>,
) -> impl SinkEffect<Output = (), Error = E, Env = Env, Item = ReportLine>
where
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    let line = ReportLine::text(text);
    emit_report_line(line)
}
/// Builds a sink effect that emits a section header carrying `title`.
pub fn emit_header<E, Env>(
    title: impl Into<String>,
) -> impl SinkEffect<Output = (), Error = E, Env = Env, Item = ReportLine>
where
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    let line = ReportLine::header(title);
    emit_report_line(line)
}
/// Builds a sink effect that emits a separator line.
pub fn emit_separator<E, Env>(
) -> impl SinkEffect<Output = (), Error = E, Env = Env, Item = ReportLine>
where
    E: Send + 'static,
    Env: Clone + Send + Sync + 'static,
{
    let line = ReportLine::separator();
    emit_report_line(line)
}
/// Runs `effect`, writing every emitted [`ReportLine`] to the file at `output_path`.
///
/// The file is created with `File::create` and written through a `BufWriter`.
/// Each emitted line is rendered with `ReportLine::to_output_string`. The writer
/// is flushed once the effect has finished, whether it succeeded or failed, so
/// lines emitted before an effect failure are not dropped with the buffer.
///
/// # Errors
///
/// Returns an error if, in order of precedence:
/// - the output file cannot be created;
/// - the effect itself fails (its error is returned unchanged);
/// - writing a line to the file fails (the first such failure is reported, and
///   no further lines are written after it);
/// - the final flush fails.
pub async fn run_with_file_sink<T, Env, E>(
    effect: E,
    output_path: &Path,
    env: &Env,
) -> Result<T, AnalysisError>
where
    T: Send,
    Env: Clone + Send + Sync + 'static,
    E: SinkEffect<Output = T, Error = AnalysisError, Env = Env, Item = ReportLine>,
{
    let file = File::create(output_path)
        .await
        .map_err(|e| AnalysisError::io(format!("Failed to create output file: {}", e)))?;

    // The sink callback has no way to return an error, so the first write failure
    // is parked next to the writer and surfaced after the effect completes.
    let state = std::sync::Arc::new(tokio::sync::Mutex::new((
        BufWriter::new(file),
        None::<std::io::Error>,
    )));

    let outcome = effect
        .run_with_sink(env, |line| {
            let state = state.clone();
            async move {
                let text = line.to_output_string();
                let mut guard = state.lock().await;
                let (writer, first_error) = &mut *guard;
                // After a failed write the file contents are already incomplete;
                // skip further writes instead of producing output with holes.
                if first_error.is_some() {
                    return;
                }
                if let Err(e) = writer.write_all(text.as_bytes()).await {
                    *first_error = Some(e);
                }
            }
        })
        .await;

    let mut guard = state.lock().await;
    let (writer, first_error) = &mut *guard;
    // Flush before inspecting `outcome`: the buffered writer is not flushed on
    // drop, so an early return here would silently discard buffered lines.
    let flushed = writer.flush().await;

    let result = outcome?;
    if let Some(e) = first_error.take() {
        return Err(AnalysisError::io(format!(
            "Failed to write to output file: {}",
            e
        )));
    }
    flushed.map_err(|e| AnalysisError::io(format!("Failed to flush output file: {}", e)))?;
    Ok(result)
}
/// Runs `effect`, printing every emitted [`ReportLine`] to standard output.
///
/// Each line is rendered with `ReportLine::to_output_string`, written, and
/// flushed immediately. Output is best-effort: write and flush failures on
/// stdout are ignored and never turn into an error.
///
/// # Errors
///
/// Returns the effect's own error if the effect fails.
pub async fn run_with_stdout_sink<T, Env, E>(effect: E, env: &Env) -> Result<T, AnalysisError>
where
    T: Send,
    Env: Clone + Send + Sync + 'static,
    E: SinkEffect<Output = T, Error = AnalysisError, Env = Env, Item = ReportLine>,
{
    // One shared handle, so each line is written and flushed under the same lock.
    let stdout = std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::stdout()));

    let value = effect
        .run_with_sink(env, |line| {
            let handle = std::sync::Arc::clone(&stdout);
            async move {
                let rendered = line.to_output_string();
                let mut out = handle.lock().await;
                // Deliberately ignored: stdout output is best-effort.
                let _ = out.write_all(rendered.as_bytes()).await;
                let _ = out.flush().await;
            }
        })
        .await?;

    Ok(value)
}
/// Runs `effect` against the sink selected by `config.destination`.
///
/// `SinkDestination::Stdout` is handled by `run_with_stdout_sink` and
/// `SinkDestination::File` by `run_with_file_sink`. Only the `destination`
/// field of `config` is consulted here.
///
/// # Errors
///
/// Returns whatever error the selected runner returns.
pub async fn run_with_sink<T, Env, E>(
    effect: E,
    config: &SinkConfig,
    env: &Env,
) -> Result<T, AnalysisError>
where
    T: Send,
    Env: Clone + Send + Sync + 'static,
    E: SinkEffect<Output = T, Error = AnalysisError, Env = Env, Item = ReportLine>,
{
    let destination = &config.destination;
    match destination {
        SinkDestination::File(path) => run_with_file_sink(effect, path.as_path(), env).await,
        SinkDestination::Stdout => run_with_stdout_sink(effect, env).await,
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for report-line construction/rendering, the emit helpers, and
    //! the file sink runner.

    use super::*;
    use stillwater::effect::sink::SinkEffectExt;

    // --- ReportLine constructors -------------------------------------------

    #[test]
    fn report_line_json_line_serializes() {
        let data = serde_json::json!({"key": "value", "count": 42});
        let line = ReportLine::json_line(&data);
        match line {
            ReportLine::JsonLine(json) => {
                assert!(json.contains("key"));
                assert!(json.contains("value"));
                assert!(json.contains("42"));
            }
            _ => panic!("Expected JsonLine"),
        }
    }

    #[test]
    fn report_line_text_creates_text_line() {
        let line = ReportLine::text("Hello, world!");
        assert_eq!(line, ReportLine::TextLine("Hello, world!".to_string()));
    }

    #[test]
    fn report_line_header_creates_header() {
        let line = ReportLine::header("Analysis Results");
        assert_eq!(line, ReportLine::Header("Analysis Results".to_string()));
    }

    #[test]
    fn report_line_separator_creates_separator() {
        let line = ReportLine::separator();
        assert_eq!(line, ReportLine::Separator);
    }

    // --- ReportLine rendering ----------------------------------------------

    #[test]
    fn report_line_to_output_string_json() {
        let line = ReportLine::JsonLine(r#"{"key":"value"}"#.to_string());
        assert_eq!(line.to_output_string(), "{\"key\":\"value\"}\n");
    }

    #[test]
    fn report_line_to_output_string_text() {
        let line = ReportLine::TextLine("Hello".to_string());
        assert_eq!(line.to_output_string(), "Hello\n");
    }

    #[test]
    fn report_line_to_output_string_separator() {
        let line = ReportLine::Separator;
        assert_eq!(line.to_output_string(), "---\n");
    }

    #[test]
    fn report_line_to_output_string_header() {
        let line = ReportLine::Header("Title".to_string());
        assert_eq!(line.to_output_string(), "\n## Title\n\n");
    }

    // --- SinkConfig --------------------------------------------------------

    #[test]
    fn sink_config_default() {
        let config = SinkConfig::default();
        assert_eq!(config.format, ReportFormat::JsonLines);
        assert!(matches!(config.destination, SinkDestination::Stdout));
        assert_eq!(config.buffer_size, 8192);
    }

    // --- Emit helpers ------------------------------------------------------

    #[tokio::test]
    async fn emit_report_line_collects_single_line() {
        let effect = emit_report_line::<AnalysisError, ()>(ReportLine::text("test line"));
        let (result, lines) = effect.run_collecting(&()).await;
        assert!(result.is_ok());
        assert_eq!(lines.len(), 1);
        assert_eq!(lines[0], ReportLine::TextLine("test line".to_string()));
    }

    #[tokio::test]
    async fn emit_report_lines_collects_multiple() {
        let lines_to_emit = vec![
            ReportLine::header("Section"),
            ReportLine::text("Line 1"),
            ReportLine::text("Line 2"),
            ReportLine::separator(),
        ];
        let effect = emit_report_lines::<AnalysisError, ()>(lines_to_emit.clone());
        let (result, collected) = effect.run_collecting(&()).await;
        assert!(result.is_ok());
        assert_eq!(collected.len(), 4);
        assert_eq!(collected[0], ReportLine::Header("Section".to_string()));
    }

    #[tokio::test]
    async fn chained_emissions_collect_in_order() {
        let effect = emit_header::<AnalysisError, ()>("Start")
            .and_then(|_| emit_text_line("Middle"))
            .and_then(|_| emit_separator())
            .and_then(|_| emit_text_line("End"));
        let (result, lines) = effect.run_collecting(&()).await;
        assert!(result.is_ok());
        assert_eq!(lines.len(), 4);
        assert_eq!(lines[0], ReportLine::Header("Start".to_string()));
        assert_eq!(lines[1], ReportLine::TextLine("Middle".to_string()));
        assert_eq!(lines[2], ReportLine::Separator);
        assert_eq!(lines[3], ReportLine::TextLine("End".to_string()));
    }

    #[tokio::test]
    async fn sink_effect_with_computation_result() {
        let effect = emit_text_line::<AnalysisError, ()>("Starting")
            .and_then(|_| emit_text_line("Processing"))
            .map(|_| 42);
        let (result, lines) = effect.run_collecting(&()).await;
        assert_eq!(result, Ok(42));
        assert_eq!(lines.len(), 2);
    }

    // --- Sink runners ------------------------------------------------------

    #[tokio::test]
    async fn run_with_file_sink_writes_to_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let output_path = temp_dir.path().join("test_output.txt");
        let effect = emit_header::<AnalysisError, ()>("Test Report")
            .and_then(|_| emit_text_line("Line 1"))
            .and_then(|_| emit_separator())
            .and_then(|_| emit_text_line("Line 2"));
        let result = run_with_file_sink(effect, &output_path, &()).await;
        assert!(result.is_ok());
        let contents = std::fs::read_to_string(&output_path).unwrap();
        assert!(contents.contains("## Test Report"));
        assert!(contents.contains("Line 1"));
        assert!(contents.contains("---"));
        assert!(contents.contains("Line 2"));
    }

    // --- JSON serialization edge cases -------------------------------------

    // Nothing here is awaited, so a plain synchronous test is sufficient.
    #[test]
    fn json_line_serialization_handles_complex_types() {
        #[derive(Serialize)]
        struct TestData {
            name: String,
            count: u32,
            nested: NestedData,
        }
        #[derive(Serialize)]
        struct NestedData {
            value: f64,
        }
        let data = TestData {
            name: "test".to_string(),
            count: 42,
            nested: NestedData { value: 99.5 },
        };
        let line = ReportLine::json_line(&data);
        if let ReportLine::JsonLine(json) = line {
            assert!(json.contains("test"));
            assert!(json.contains("42"));
            assert!(json.contains("99.5"));
        } else {
            panic!("Expected JsonLine");
        }
    }

    #[test]
    fn json_line_handles_serialization_error() {
        // serde_json only accepts string-like map keys, so a tuple key forces a
        // genuine serialization failure. (NaN does not: it is written as `null`.)
        let mut data = std::collections::BTreeMap::new();
        data.insert((1u8, 2u8), "value");

        let line = ReportLine::json_line(&data);
        match line {
            ReportLine::TextLine(text) => {
                assert!(text.contains("error"));
                // The fallback line must itself be parseable JSON with an `error` key.
                let parsed: serde_json::Value =
                    serde_json::from_str(&text).expect("fallback line must be valid JSON");
                assert!(parsed.get("error").is_some());
            }
            other => panic!("Expected TextLine fallback, got {:?}", other),
        }
    }

    #[test]
    fn json_line_non_finite_float_still_yields_json_line() {
        // A non-finite float is not a serialization error for serde_json, so the
        // regular JsonLine path is taken.
        let line = ReportLine::json_line(&f64::NAN);
        match line {
            ReportLine::JsonLine(json) => assert!(!json.is_empty()),
            other => panic!("Expected JsonLine, got {:?}", other),
        }
    }
}