#![cfg(feature = "file")]
use std::io::Write;
use std::time::Duration;
use pipeflow::source::{
MessageSender, Source,
file::{FileSource, FileSourceConfig},
};
use tempfile::NamedTempFile;
use tokio::sync::broadcast;
#[tokio::test]
async fn test_file_source_oneshot_reads_all_lines() {
let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
writeln!(temp_file, "line 1").unwrap();
writeln!(temp_file, "line 2").unwrap();
writeln!(temp_file, "line 3").unwrap();
temp_file.flush().unwrap();
let config = FileSourceConfig {
path: temp_file.path().to_string_lossy().to_string(),
mode: "oneshot".to_string(),
interval: Duration::from_millis(100),
};
let source = FileSource::new("file_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
let msg1 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for line 1")
.expect("Failed to receive message");
let msg2 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for line 2")
.expect("Failed to receive message");
let msg3 = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for line 3")
.expect("Failed to receive message");
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for file source to complete")
.expect("File source task panicked");
assert!(result.is_ok(), "File source should complete cleanly");
assert_eq!(msg1.meta.source_node, "file_source");
assert_eq!(msg1.payload["content"], "line 1");
assert_eq!(msg1.payload["line_number"], 1);
assert_eq!(msg2.payload["content"], "line 2");
assert_eq!(msg2.payload["line_number"], 2);
assert_eq!(msg3.payload["content"], "line 3");
assert_eq!(msg3.payload["line_number"], 3);
}
#[tokio::test]
async fn test_file_source_oneshot_empty_file() {
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let config = FileSourceConfig {
path: temp_file.path().to_string_lossy().to_string(),
mode: "oneshot".to_string(),
interval: Duration::from_millis(100),
};
let source = FileSource::new("file_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for file source to complete")
.expect("File source task panicked");
assert!(result.is_ok(), "File source should complete cleanly");
let recv_result = receiver.try_recv();
assert!(
recv_result.is_err(),
"Should not receive any messages from empty file"
);
}
#[tokio::test]
async fn test_file_source_tail_detects_new_lines() {
use std::fs::OpenOptions;
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let file_path = temp_file.path().to_path_buf();
let config = FileSourceConfig {
path: file_path.to_string_lossy().to_string(),
mode: "tail".to_string(),
interval: Duration::from_millis(50),
};
let source = FileSource::new("file_source", config).expect("Source should build");
let (sender, mut receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
tokio::time::sleep(Duration::from_millis(100)).await;
{
let mut file = OpenOptions::new()
.append(true)
.open(&file_path)
.expect("Failed to open file for appending");
writeln!(file, "new line appended").unwrap();
}
let msg = tokio::time::timeout(Duration::from_secs(2), receiver.recv())
.await
.expect("Timed out waiting for appended line")
.expect("Failed to receive message");
assert_eq!(msg.payload["content"], "new line appended");
assert_eq!(msg.payload["line_number"], 1);
let _ = shutdown_tx.send(());
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for file source to shut down")
.expect("File source task panicked");
assert!(result.is_ok(), "File source should shut down cleanly");
}
#[tokio::test]
async fn test_file_source_tail_graceful_shutdown() {
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let config = FileSourceConfig {
path: temp_file.path().to_string_lossy().to_string(),
mode: "tail".to_string(),
interval: Duration::from_millis(100),
};
let source = FileSource::new("file_source", config).expect("Source should build");
let (sender, _receiver) = broadcast::channel(16);
let sender = MessageSender::new(sender, None);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let handle = tokio::spawn(async move { source.run(sender, shutdown_rx).await });
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = shutdown_tx.send(());
let result: pipeflow::Result<()> = tokio::time::timeout(Duration::from_secs(2), handle)
.await
.expect("Timed out waiting for file source to shut down")
.expect("File source task panicked");
assert!(result.is_ok(), "File source should shut down cleanly");
}