pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
#![cfg(feature = "file")]

use std::io::Write;
use std::time::Duration;

use pipeflow::source::{
    MessageSender, Source,
    file::{FileSource, FileSourceConfig},
};
use tempfile::NamedTempFile;
use tokio::sync::broadcast;

#[tokio::test]
async fn test_file_source_oneshot_reads_all_lines() {
    let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
    writeln!(temp_file, "line 1").unwrap();
    writeln!(temp_file, "line 2").unwrap();
    writeln!(temp_file, "line 3").unwrap();
    temp_file.flush().unwrap();

    let config = FileSourceConfig {
        path: temp_file.path().to_string_lossy().to_string(),
        mode: "oneshot".to_string(),
        interval: Duration::from_millis(100),
    };

    let source = FileSource::new("file_source", config).expect("Source should build");

    let (sender, mut receiver) = broadcast::channel(16);
    let sender = MessageSender::new(sender, None);
    let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);

    let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });

    // Receive all 3 lines
    let msg1 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
        .await
        .expect("Timed out waiting for line 1")
        .expect("Failed to receive message");
    let msg2 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
        .await
        .expect("Timed out waiting for line 2")
        .expect("Failed to receive message");
    let msg3 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
        .await
        .expect("Timed out waiting for line 3")
        .expect("Failed to receive message");

    // Wait for source to complete
    let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("Timed out waiting for file source to complete")
        .expect("File source task panicked");
    assert!(result.is_ok(), "File source should complete cleanly");

    assert_eq!(msg1.meta.source_node, "file_source");
    assert_eq!(msg1.payload["content"], "line 1");
    assert_eq!(msg1.payload["line_number"], 1);

    assert_eq!(msg2.payload["content"], "line 2");
    assert_eq!(msg2.payload["line_number"], 2);

    assert_eq!(msg3.payload["content"], "line 3");
    assert_eq!(msg3.payload["line_number"], 3);
}

#[tokio::test]
async fn test_file_source_oneshot_empty_file() {
    let temp_file = NamedTempFile::new().expect("Failed to create temp file");

    let config = FileSourceConfig {
        path: temp_file.path().to_string_lossy().to_string(),
        mode: "oneshot".to_string(),
        interval: Duration::from_millis(100),
    };

    let source = FileSource::new("file_source", config).expect("Source should build");

    let (sender, mut receiver) = broadcast::channel(16);
    let sender = MessageSender::new(sender, None);
    let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);

    let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });

    // Source should complete immediately for empty file
    let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("Timed out waiting for file source to complete")
        .expect("File source task panicked");
    assert!(result.is_ok(), "File source should complete cleanly");

    // For empty file, source completes immediately.
    // Either we timeout (source very fast), or we get RecvError (sender dropped).
    // Both indicate no actual lines were received.
    let recv_result = receiver.try_recv();
    assert!(
        recv_result.is_err(),
        "Should not receive any messages from empty file"
    );
}

#[tokio::test]
async fn test_file_source_tail_detects_new_lines() {
    use std::fs::OpenOptions;

    let temp_file = NamedTempFile::new().expect("Failed to create temp file");
    let file_path = temp_file.path().to_path_buf();

    let config = FileSourceConfig {
        path: file_path.to_string_lossy().to_string(),
        mode: "tail".to_string(),
        interval: Duration::from_millis(50),
    };

    let source = FileSource::new("file_source", config).expect("Source should build");

    let (sender, mut receiver) = broadcast::channel(16);
    let sender = MessageSender::new(sender, None);
    let (shutdown_tx, shutdown_rx) = broadcast::channel(1);

    let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });

    // Wait a bit for source to start and seek to end
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Append a new line to the file
    {
        let mut file = OpenOptions::new()
            .append(true)
            .open(&file_path)
            .expect("Failed to open file for appending");
        writeln!(file, "new line appended").unwrap();
    }

    // Should receive the new line
    let msg = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
        .await
        .expect("Timed out waiting for appended line")
        .expect("Failed to receive message");

    assert_eq!(msg.payload["content"], "new line appended");
    assert_eq!(msg.payload["line_number"], 1);

    // Shutdown
    let _ = shutdown_tx.send(());
    let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("Timed out waiting for file source to shut down")
        .expect("File source task panicked");
    assert!(result.is_ok(), "File source should shut down cleanly");
}

#[tokio::test]
async fn test_file_source_tail_graceful_shutdown() {
    let temp_file = NamedTempFile::new().expect("Failed to create temp file");

    let config = FileSourceConfig {
        path: temp_file.path().to_string_lossy().to_string(),
        mode: "tail".to_string(),
        interval: Duration::from_millis(100),
    };

    let source = FileSource::new("file_source", config).expect("Source should build");

    let (sender, _receiver) = broadcast::channel(16);
    let sender = MessageSender::new(sender, None);
    let (shutdown_tx, shutdown_rx) = broadcast::channel(1);

    let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });

    // Wait a bit for source to start
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Shutdown
    let _ = shutdown_tx.send(());

    let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("Timed out waiting for file source to shut down")
        .expect("File source task panicked");
    assert!(result.is_ok(), "File source should shut down cleanly");
}