use std::fs::File;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::path::Path;
use crate::compression::CompressionAlgorithm;
use crate::error::{IoError, Result};
/// Configuration shared by the streaming readers in this module.
///
/// Note the unit of `chunk_size` depends on the reader: it is a *byte*
/// count for [`ChunkedReader`] and a *line* count for [`LineChunkedReader`]
/// and [`StreamingCsvReader`].
#[derive(Debug, Clone)]
pub struct StreamingConfig {
/// Size of each chunk (bytes for byte readers, lines for line readers).
pub chunk_size: usize,
/// Capacity of the underlying `BufReader`, in bytes.
pub buffer_size: usize,
/// Whether compression should be auto-detected.
/// NOTE(review): not consulted by the readers in this file — presumably
/// used by compression-aware callers elsewhere; confirm.
pub auto_detect_compression: bool,
/// Explicit compression algorithm override, if any.
/// NOTE(review): also not consulted by the readers in this file.
pub compression: Option<CompressionAlgorithm>,
/// Maximum number of chunks to yield after skipping; `None` = unlimited.
pub max_chunks: Option<usize>,
/// Number of leading chunks (or lines, for line readers) to skip.
pub skip_chunks: usize,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
chunk_size: 64 * 1024, buffer_size: 8 * 1024, auto_detect_compression: true,
compression: None,
max_chunks: None,
skip_chunks: 0,
}
}
}
impl StreamingConfig {
pub fn new() -> Self {
Self::default()
}
pub fn chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size;
self
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
pub fn auto_detect_compression(mut self, enable: bool) -> Self {
self.auto_detect_compression = enable;
self
}
pub fn compression(mut self, algorithm: CompressionAlgorithm) -> Self {
self.compression = Some(algorithm);
self
}
pub fn max_chunks(mut self, max: usize) -> Self {
self.max_chunks = Some(max);
self
}
pub fn skip_chunks(mut self, skip: usize) -> Self {
self.skip_chunks = skip;
self
}
}
/// Iterator over byte chunks of a file.
///
/// Yields `Result<Vec<u8>>` items of up to `config.chunk_size` bytes each.
pub struct ChunkedReader {
/// Buffered handle to the underlying file.
reader: BufReader<File>,
/// Chunking parameters (`chunk_size` is a byte count here).
config: StreamingConfig,
/// Chunks consumed so far, including skipped ones.
chunks_read: usize,
/// Bytes consumed so far, including bytes skipped via `skip_bytes`.
total_bytes_read: u64,
/// Set once EOF or an unrecoverable read error terminates iteration.
finished: bool,
}
impl ChunkedReader {
/// Opens `path` and prepares buffered chunked reading per `config`.
///
/// # Errors
/// Returns `IoError::FileError` when the file cannot be opened.
pub fn new<P: AsRef<Path>>(path: P, config: StreamingConfig) -> Result<Self> {
let file = File::open(path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to open file: {e}")))?;
let reader = BufReader::with_capacity(config.buffer_size, file);
Ok(Self {
reader,
config,
chunks_read: 0,
total_bytes_read: 0,
finished: false,
})
}
/// Total bytes consumed so far (reads plus `skip_bytes` skips).
pub fn bytes_read(&self) -> u64 {
self.total_bytes_read
}
/// Number of chunks consumed so far (including skipped ones).
pub fn chunks_read(&self) -> usize {
self.chunks_read
}
/// True once EOF or a read error has terminated iteration.
pub fn is_finished(&self) -> bool {
self.finished
}
/// Seeks forward by `bytes` without reading them.
///
/// Returns the new absolute stream position — that is what `Seek::seek`
/// yields for a relative seek — *not* the number of bytes skipped.
/// `total_bytes_read` is advanced by the requested count even if the
/// seek lands past EOF (seeking beyond EOF is not an error).
/// NOTE(review): `bytes as i64` wraps for values above `i64::MAX`.
pub fn skip_bytes(&mut self, bytes: u64) -> Result<u64> {
let skipped = self
.reader
.seek(SeekFrom::Current(bytes as i64))
.map_err(|e| IoError::FileError(format!("Failed to skip bytes: {e}")))?;
self.total_bytes_read += bytes;
Ok(skipped)
}
/// Current absolute position in the underlying file, in bytes.
pub fn position(&mut self) -> Result<u64> {
self.reader
.stream_position()
.map_err(|e| IoError::FileError(format!("Failed to get position: {e}")))
}
}
impl Iterator for ChunkedReader {
    type Item = Result<Vec<u8>>;

    /// Yields the next chunk of up to `config.chunk_size` bytes.
    ///
    /// Leading chunks are seeked over per `skip_chunks`; iteration stops
    /// after `max_chunks` chunks (counted after skipping) or at EOF.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }
        // Skip leading chunks iteratively. The previous recursive
        // `return self.next()` formulation recursed once per skipped chunk
        // and could overflow the stack for very large `skip_chunks`.
        while self.chunks_read < self.config.skip_chunks {
            if let Err(e) = self.skip_bytes(self.config.chunk_size as u64) {
                return Some(Err(e));
            }
            self.chunks_read += 1;
        }
        if let Some(max) = self.config.max_chunks {
            // `chunks_read` also counts skipped chunks, so offset the limit.
            if self.chunks_read >= max + self.config.skip_chunks {
                self.finished = true;
                return None;
            }
        }
        // Fill the chunk completely: `Read::read` may legally return fewer
        // bytes than requested, so loop until the buffer is full or EOF.
        // This guarantees every chunk except the last is exactly
        // `chunk_size` bytes, matching the skip accounting above.
        let mut chunk = vec![0u8; self.config.chunk_size];
        let mut filled = 0;
        while filled < chunk.len() {
            match self.reader.read(&mut chunk[filled..]) {
                Ok(0) => break, // EOF
                Ok(n) => filled += n,
                // Interrupted reads are retryable by contract.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    self.finished = true;
                    return Some(Err(IoError::FileError(format!(
                        "Failed to read chunk: {e}"
                    ))));
                }
            }
        }
        if filled == 0 {
            self.finished = true;
            return None;
        }
        chunk.truncate(filled);
        self.total_bytes_read += filled as u64;
        self.chunks_read += 1;
        Some(Ok(chunk))
    }
}
/// Iterator over batches of text lines from a file.
///
/// For this reader `config.chunk_size` is interpreted as a *line count*
/// per batch, and `config.skip_chunks` as a number of leading *lines*
/// (not batches) to skip.
pub struct LineChunkedReader {
/// Buffered handle to the underlying file.
reader: BufReader<File>,
/// Batching parameters (see struct docs for unit reinterpretation).
config: StreamingConfig,
/// Lines consumed so far, including skipped ones.
lines_read: usize,
/// Set once EOF or a read error terminates iteration.
finished: bool,
}
impl LineChunkedReader {
    /// Opens `path` for line-oriented batched reading.
    ///
    /// # Errors
    /// Returns `IoError::FileError` when the file cannot be opened.
    pub fn new<P: AsRef<Path>>(path: P, config: StreamingConfig) -> Result<Self> {
        let file = File::open(path.as_ref())
            .map_err(|e| IoError::FileError(format!("Failed to open file: {e}")))?;
        Ok(Self {
            reader: BufReader::with_capacity(config.buffer_size, file),
            config,
            lines_read: 0,
            finished: false,
        })
    }

    /// Total lines consumed so far, skipped lines included.
    pub fn lines_read(&self) -> usize {
        self.lines_read
    }

    /// True once EOF or a read error has terminated iteration.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}
impl Iterator for LineChunkedReader {
    type Item = Result<Vec<String>>;

    /// Yields the next batch of up to `chunk_size` lines, with trailing
    /// `\n` / `\r\n` stripped from each line.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }
        // Skip leading lines iteratively. The previous recursive
        // `return self.next()` formulation recursed once per skipped line
        // and could overflow the stack for a large `skip_chunks`.
        // One buffer is reused so skipping does not allocate per line.
        let mut skip_buf = String::new();
        while self.lines_read < self.config.skip_chunks {
            skip_buf.clear();
            match self.reader.read_line(&mut skip_buf) {
                Ok(0) => {
                    self.finished = true;
                    return None;
                }
                Ok(_) => self.lines_read += 1,
                // As in the original, a skip error does not mark the
                // reader finished; a later call may retry.
                Err(e) => {
                    return Some(Err(IoError::FileError(format!(
                        "Failed to skip line: {e}"
                    ))))
                }
            }
        }
        if let Some(max) = self.config.max_chunks {
            // `lines_read` also counts skipped lines, so offset the limit.
            if self.lines_read >= max + self.config.skip_chunks {
                self.finished = true;
                return None;
            }
        }
        // `chunk_size` is a *line* count for this reader.
        let mut lines = Vec::with_capacity(self.config.chunk_size);
        for _ in 0..self.config.chunk_size {
            // Each line needs its own String because it is moved into `lines`.
            let mut line = String::new();
            match self.reader.read_line(&mut line) {
                Ok(0) => {
                    self.finished = true;
                    break;
                }
                Ok(_) => {
                    // Strip a trailing "\n" or "\r\n".
                    if line.ends_with('\n') {
                        line.pop();
                        if line.ends_with('\r') {
                            line.pop();
                        }
                    }
                    lines.push(line);
                    self.lines_read += 1;
                }
                Err(e) => {
                    self.finished = true;
                    return Some(Err(IoError::FileError(format!(
                        "Failed to read line: {e}"
                    ))));
                }
            }
        }
        if lines.is_empty() {
            None
        } else {
            Some(Ok(lines))
        }
    }
}
/// Streaming CSV reader that yields batches of parsed rows.
///
/// Fields are split naively on `delimiter` — quoted fields and embedded
/// delimiters are not handled.
pub struct StreamingCsvReader {
/// Underlying line-batch reader; each line batch becomes one row batch.
line_reader: LineChunkedReader,
/// Parsed header row, populated lazily from the first batch when
/// `has_header` is set.
header: Option<Vec<String>>,
/// Field separator; ',' by default.
delimiter: char,
/// Whether the first line should be consumed as a header row.
has_header: bool,
}
impl StreamingCsvReader {
    /// Builds a CSV reader over `path`, defaulting to ',' and no header row.
    ///
    /// # Errors
    /// Returns `IoError::FileError` when the file cannot be opened.
    pub fn new<P: AsRef<Path>>(path: P, config: StreamingConfig) -> Result<Self> {
        Ok(Self {
            line_reader: LineChunkedReader::new(path, config)?,
            header: None,
            delimiter: ',',
            has_header: false,
        })
    }

    /// Sets the field delimiter (builder style).
    pub fn with_delimiter(self, delimiter: char) -> Self {
        Self { delimiter, ..self }
    }

    /// Declares whether the first row is a header (builder style).
    pub fn with_header(self, hasheader: bool) -> Self {
        Self { has_header: hasheader, ..self }
    }

    /// Returns the parsed header row, if one has been read yet.
    pub fn header(&self) -> Option<&Vec<String>> {
        self.header.as_ref()
    }

    /// Splits a line on the delimiter, trimming whitespace from each field.
    /// NOTE(review): quoted fields are not supported by this parser.
    fn parse_line(&self, line: &str) -> Vec<String> {
        line.split(self.delimiter)
            .map(|field| field.trim().to_owned())
            .collect()
    }
}
impl Iterator for StreamingCsvReader {
    type Item = Result<Vec<Vec<String>>>;

    /// Yields batches of parsed rows. When a header is configured it is
    /// peeled off the first batch and exposed via `header()` instead of
    /// being returned as data.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let lines = match self.line_reader.next()? {
                Ok(lines) => lines,
                Err(e) => return Some(Err(e)),
            };
            if self.has_header && self.header.is_none() {
                // First batch with a header: record the header row, then
                // return the remaining lines as data.
                if let Some(first) = lines.first() {
                    self.header = Some(self.parse_line(first));
                }
                let rows: Vec<Vec<String>> = lines
                    .iter()
                    .skip(1)
                    .map(|line| self.parse_line(line))
                    .collect();
                if rows.is_empty() {
                    // The batch held only the header; fetch the next batch.
                    continue;
                }
                return Some(Ok(rows));
            }
            // Normal path: every line in the batch is a data row.
            return Some(Ok(lines.iter().map(|line| self.parse_line(line)).collect()));
        }
    }
}
/// Running statistics collected during a streaming pass over a file.
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
    /// Total bytes recorded via `update_chunk`.
    pub bytes_processed: u64,
    /// Number of chunks recorded so far.
    pub chunks_processed: usize,
    /// Number of lines recorded via `update_lines`.
    pub lines_processed: usize,
    /// Accumulated processing time in milliseconds.
    pub processing_time_ms: f64,
    /// Mean chunk size in bytes.
    pub avg_bytes_per_chunk: f64,
    /// Mean throughput in megabytes per second.
    pub avg_speed_mbps: f64,
}

impl StreamingStats {
    /// Creates an empty, zeroed statistics record.
    pub fn new() -> Self {
        StreamingStats::default()
    }

    /// Records one processed chunk and refreshes the derived averages.
    pub fn update_chunk(&mut self, bytes: u64, processing_time_ms: f64) {
        self.bytes_processed += bytes;
        self.chunks_processed += 1;
        self.processing_time_ms += processing_time_ms;
        self.avg_bytes_per_chunk =
            self.bytes_processed as f64 / self.chunks_processed as f64;
        // Throughput is only meaningful once some time has been recorded;
        // the guard also avoids a division by zero.
        if self.processing_time_ms > 0.0 {
            let megabytes = self.bytes_processed as f64 / (1024.0 * 1024.0);
            let seconds = self.processing_time_ms / 1000.0;
            self.avg_speed_mbps = megabytes / seconds;
        }
    }

    /// Adds `lines` to the running line count.
    pub fn update_lines(&mut self, lines: usize) {
        self.lines_processed += lines;
    }

    /// Human-readable one-line summary of the collected statistics.
    pub fn summary(&self) -> String {
        format!(
            "Processed {} bytes in {} chunks ({} lines), avg {:.2} MB/s",
            self.bytes_processed, self.chunks_processed, self.lines_processed, self.avg_speed_mbps
        )
    }
}
/// Streams `path` chunk-by-chunk through `processor`, returning the result
/// of the final invocation together with the collected [`StreamingStats`].
///
/// `processor` receives each chunk's bytes and its zero-based chunk index.
/// Only its *last* return value is kept, so accumulate results via the
/// closure's captured state if a running total is needed.
///
/// # Errors
/// Propagates open/read failures and any error returned by `processor`.
#[allow(dead_code)]
pub fn process_file_chunked<P, F, T>(
    path: P,
    config: StreamingConfig,
    mut processor: F,
) -> Result<(T, StreamingStats)>
where
    P: AsRef<Path>,
    F: FnMut(&[u8], usize) -> Result<T>,
    T: Default,
{
    let reader = ChunkedReader::new(path, config)?;
    let mut stats = StreamingStats::new();
    let mut result = T::default();
    let start_time = std::time::Instant::now();
    for (chunk_id, chunk_result) in reader.enumerate() {
        let chunk_data = chunk_result?;
        let chunk_start = std::time::Instant::now();
        result = processor(&chunk_data, chunk_id)?;
        stats.update_chunk(
            chunk_data.len() as u64,
            chunk_start.elapsed().as_secs_f64() * 1000.0,
        );
    }
    // Report wall-clock time for the whole pass. Overwriting the per-chunk
    // accumulation would otherwise leave `avg_speed_mbps` stale (it was
    // derived from the per-chunk timings), so refresh it here too.
    stats.processing_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
    if stats.processing_time_ms > 0.0 {
        let total_mb = stats.bytes_processed as f64 / (1024.0 * 1024.0);
        stats.avg_speed_mbps = total_mb / (stats.processing_time_ms / 1000.0);
    }
    Ok((result, stats))
}
/// Streams `path` through `processor` in batches of parsed CSV rows.
///
/// `processor` receives each row batch, its zero-based batch index, and the
/// header row (if `has_header` is set and one has been read). Only its
/// *last* return value is kept, so accumulate via captured state.
///
/// # Errors
/// Propagates open/read failures and any error returned by `processor`.
#[allow(dead_code)]
pub fn process_csv_chunked<P, F, T>(
    path: P,
    config: StreamingConfig,
    has_header: bool,
    mut processor: F,
) -> Result<(T, StreamingStats)>
where
    P: AsRef<Path>,
    F: FnMut(&[Vec<String>], usize, Option<&Vec<String>>) -> Result<T>,
    T: Default,
{
    let mut reader = StreamingCsvReader::new(path, config)?.with_header(has_header);
    let mut stats = StreamingStats::new();
    let mut result = T::default();
    let start_time = std::time::Instant::now();
    let mut chunk_id = 0;
    // A `for` loop would move `reader`, but `reader.header()` must remain
    // callable inside the body, so iterate with `while let` instead.
    while let Some(chunk_result) = reader.next() {
        let rows = chunk_result?;
        let chunk_start = std::time::Instant::now();
        result = processor(&rows, chunk_id, reader.header())?;
        // Byte counts are unknown at this layer, so only timing and line
        // counts are recorded; throughput-related fields stay at zero.
        stats.update_chunk(0, chunk_start.elapsed().as_secs_f64() * 1000.0);
        stats.update_lines(rows.len());
        chunk_id += 1;
    }
    stats.processing_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
    Ok((result, stats))
}
pub mod cdc;
pub mod checkpoint;
pub mod log_parsing;
pub mod time_series_ingestion;
pub mod watermark;
pub mod windows;
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::tempdir;
// 1000 bytes in 100-byte chunks should yield exactly ten full chunks.
#[test]
fn test_chunked_reader() {
let temp_dir = tempdir().expect("Operation failed");
let file_path = temp_dir.path().join("test_data.txt");
let test_data = "0123456789".repeat(100); std::fs::write(&file_path, &test_data).expect("Operation failed");
let config = StreamingConfig::new().chunk_size(100);
let reader = ChunkedReader::new(&file_path, config).expect("Operation failed");
let chunks: Result<Vec<_>> = reader.collect();
let chunks = chunks.expect("Operation failed");
assert_eq!(chunks.len(), 10); for chunk in &chunks {
assert_eq!(chunk.len(), 100);
}
}
// 50 lines (no trailing newline) in batches of 10 -> five equal batches.
#[test]
fn test_line_chunked_reader() {
let temp_dir = tempdir().expect("Operation failed");
let file_path = temp_dir.path().join("test_lines.txt");
let lines: Vec<String> = (0..50).map(|i| format!("Line {i}")).collect();
std::fs::write(&file_path, lines.join("\n")).expect("Operation failed");
let config = StreamingConfig::new().chunk_size(10); let reader = LineChunkedReader::new(&file_path, config).expect("Operation failed");
let chunks: Result<Vec<_>> = reader.collect();
let chunks = chunks.expect("Operation failed");
assert_eq!(chunks.len(), 5); for chunk in &chunks {
assert_eq!(chunk.len(), 10);
}
}
// 1 header + 20 data rows in 5-line batches: the header is consumed from
// the first batch, leaving 5 batches (4+5+5+5+1 = 20 data rows total).
#[test]
fn test_streaming_csv_reader() {
let temp_dir = tempdir().expect("Operation failed");
let file_path = temp_dir.path().join("test.csv");
let mut file = File::create(&file_path).expect("Operation failed");
writeln!(file, "name,age,city").expect("Operation failed");
for i in 0..20 {
writeln!(file, "Person{},{},City{}", i, 20 + i, i % 5).expect("Operation failed");
}
let config = StreamingConfig::new().chunk_size(5); let reader = StreamingCsvReader::new(&file_path, config)
.expect("Operation failed")
.with_header(true);
let chunks: Result<Vec<_>> = reader.collect();
let chunks = chunks.expect("Operation failed");
assert_eq!(chunks.len(), 5);
let total_rows: usize = chunks.iter().map(|chunk| chunk.len()).sum();
assert_eq!(total_rows, 20);
// Every row should have one field per header column.
for chunk in &chunks {
for row in chunk {
assert_eq!(row.len(), 3); }
}
}
// The builder methods should store exactly the values they were given.
#[test]
fn test_streaming_config() {
let config = StreamingConfig::new()
.chunk_size(1024)
.buffer_size(4096)
.max_chunks(10)
.skip_chunks(2);
assert_eq!(config.chunk_size, 1024);
assert_eq!(config.buffer_size, 4096);
assert_eq!(config.max_chunks, Some(10));
assert_eq!(config.skip_chunks, 2);
}
// process_file_chunked keeps only the last processor result: 1200 bytes in
// 100-byte chunks means the final chunk (and thus `total_size`) is 100.
#[test]
fn test_process_file_chunked() {
let temp_dir = tempdir().expect("Operation failed");
let file_path = temp_dir.path().join("test_process.txt");
let test_data = "Hello World!".repeat(100);
std::fs::write(&file_path, &test_data).expect("Operation failed");
let config = StreamingConfig::new().chunk_size(100);
let (total_size, stats) =
process_file_chunked(&file_path, config, |chunk, _chunk_id| -> Result<usize> {
Ok(chunk.len())
})
.expect("Operation failed");
assert_eq!(total_size, 100); assert!(stats.bytes_processed > 0);
assert!(stats.chunks_processed > 0);
}
}
pub mod relational;