#![allow(dead_code)]
#![allow(clippy::too_many_arguments)]
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
#[cfg(feature = "async")]
use futures::{Stream, StreamExt};
#[cfg(feature = "async")]
use tokio::fs::File;
#[cfg(feature = "async")]
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use crate::error::{IoError, Result};
/// Tuning knobs for the asynchronous streaming readers in this module.
///
/// NOTE(review): the tokio/futures imports above are gated on
/// `feature = "async"` while these items are not — presumably the whole
/// module is cfg-gated at its `mod` declaration; verify.
#[derive(Debug, Clone)]
pub struct AsyncStreamingConfig {
    /// Bytes per chunk for `AsyncChunkedReader`; for `AsyncLineReader` this
    /// is the number of lines per batch.
    pub chunk_size: usize,
    /// Capacity of the underlying `BufReader`.
    pub buffer_size: usize,
    /// Number of chunks processed concurrently by the `process_*_async` helpers.
    pub concurrency: usize,
    /// Backpressure flag (not consulted by the readers in this module).
    pub enable_backpressure: bool,
    /// Optional per-read timeout, in milliseconds.
    pub operation_timeout_ms: Option<u64>,
    /// Optional cap on chunks (or lines) yielded after skipping.
    pub max_chunks: Option<usize>,
    /// Chunks (or lines) to discard before yielding the first result.
    pub skip_chunks: usize,
}

impl Default for AsyncStreamingConfig {
    /// 64 KiB chunks, 8 KiB buffer, concurrency 4, backpressure enabled,
    /// no timeout, no chunk limit, nothing skipped.
    fn default() -> Self {
        AsyncStreamingConfig {
            chunk_size: 65_536,
            buffer_size: 8_192,
            concurrency: 4,
            enable_backpressure: true,
            operation_timeout_ms: None,
            max_chunks: None,
            skip_chunks: 0,
        }
    }
}

impl AsyncStreamingConfig {
    /// Equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the chunk size in bytes (lines per batch for the line reader).
    pub fn chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

    /// Sets the `BufReader` capacity in bytes.
    pub fn buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Sets the concurrency level for the processing helpers.
    pub fn concurrency(mut self, level: usize) -> Self {
        self.concurrency = level;
        self
    }

    /// Enables or disables backpressure.
    pub fn enable_backpressure(mut self, enable: bool) -> Self {
        self.enable_backpressure = enable;
        self
    }

    /// Sets the per-read timeout in milliseconds.
    pub fn timeout(mut self, timeout_ms: u64) -> Self {
        self.operation_timeout_ms = Some(timeout_ms);
        self
    }

    /// Caps the number of chunks (or lines) yielded after skipping.
    pub fn max_chunks(mut self, max: usize) -> Self {
        self.max_chunks = Some(max);
        self
    }

    /// Sets how many leading chunks (or lines) to discard.
    pub fn skip_chunks(mut self, skip: usize) -> Self {
        self.skip_chunks = skip;
        self
    }
}
/// Asynchronous chunked byte reader over a buffered file.
///
/// NOTE(review): uses `File`/`BufReader` from imports gated on
/// `feature = "async"` but carries no cfg attribute itself — presumably the
/// whole module is gated at its `mod` declaration; verify.
pub struct AsyncChunkedReader {
// Buffered tokio file handle chunks are read from.
reader: BufReader<File>,
// Streaming options (chunk size, skipping, limits, timeout).
config: AsyncStreamingConfig,
// Chunks consumed so far, including skipped ones.
chunks_read: usize,
// Total bytes consumed so far, including skipped ones.
total_bytes_read: u64,
// Set on EOF, error, or when the max_chunks limit is reached.
finished: bool,
}
impl AsyncChunkedReader {
    /// Opens `path` and wraps it in a `BufReader` sized by `config.buffer_size`.
    ///
    /// # Errors
    /// Returns `IoError::FileError` when the file cannot be opened.
    pub async fn new<P: AsRef<Path>>(path: P, config: AsyncStreamingConfig) -> Result<Self> {
        let file = File::open(path.as_ref())
            .await
            .map_err(|e| IoError::FileError(format!("Failed to open file: {}", e)))?;
        let reader = BufReader::with_capacity(config.buffer_size, file);
        Ok(Self {
            reader,
            config,
            chunks_read: 0,
            total_bytes_read: 0,
            finished: false,
        })
    }

    /// Total bytes consumed so far (including skipped chunks).
    pub fn bytes_read(&self) -> u64 {
        self.total_bytes_read
    }

    /// Number of chunks consumed so far (including skipped chunks).
    pub fn chunks_read(&self) -> usize {
        self.chunks_read
    }

    /// True once EOF, an error, or the `max_chunks` limit has been hit.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Reads and returns the next chunk, or `None` at end of stream.
    ///
    /// Honors `skip_chunks`, `max_chunks` and `operation_timeout_ms` from the
    /// config. A "chunk" is whatever a single `read` call returns, so a chunk
    /// may be shorter than `chunk_size` (always true for the final one).
    ///
    /// # Errors
    /// Returns `IoError::FileError` on read failure or timeout; the reader is
    /// marked finished in either case.
    pub async fn read_next_chunk(&mut self) -> Result<Option<Vec<u8>>> {
        if self.finished {
            return Ok(None);
        }
        // Discard leading chunks iteratively. The original recursed through
        // `Box::pin(self.read_next_chunk())`, allocating one boxed future per
        // skipped chunk; this loop does the same work without allocation or
        // recursion depth proportional to `skip_chunks`.
        while self.chunks_read < self.config.skip_chunks {
            let mut buffer = vec![0u8; self.config.chunk_size];
            match self.reader.read(&mut buffer).await {
                Ok(0) => {
                    // EOF before the skip window was exhausted.
                    self.finished = true;
                    return Ok(None);
                }
                Ok(bytes_read) => {
                    self.total_bytes_read += bytes_read as u64;
                    self.chunks_read += 1;
                }
                Err(e) => {
                    self.finished = true;
                    return Err(IoError::FileError(format!("Failed to skip chunk: {}", e)));
                }
            }
        }
        // `max_chunks` counts chunks yielded after the skip window.
        if let Some(max) = self.config.max_chunks {
            if self.chunks_read >= max + self.config.skip_chunks {
                self.finished = true;
                return Ok(None);
            }
        }
        let mut chunk = vec![0u8; self.config.chunk_size];
        let read_future = self.reader.read(&mut chunk);
        let bytes_read = if let Some(timeout_ms) = self.config.operation_timeout_ms {
            match tokio::time::timeout(tokio::time::Duration::from_millis(timeout_ms), read_future)
                .await
            {
                Ok(Ok(bytes)) => bytes,
                Ok(Err(e)) => {
                    self.finished = true;
                    return Err(IoError::FileError(format!("Failed to read chunk: {}", e)));
                }
                Err(_) => {
                    self.finished = true;
                    return Err(IoError::FileError("Read operation timed out".to_string()));
                }
            }
        } else {
            match read_future.await {
                Ok(bytes) => bytes,
                Err(e) => {
                    self.finished = true;
                    return Err(IoError::FileError(format!("Failed to read chunk: {}", e)));
                }
            }
        };
        if bytes_read == 0 {
            self.finished = true;
            Ok(None)
        } else {
            // Trim the buffer down to the bytes actually read.
            chunk.truncate(bytes_read);
            self.total_bytes_read += bytes_read as u64;
            self.chunks_read += 1;
            Ok(Some(chunk))
        }
    }
}
/// `Stream` adapter yielding chunks as `Result<Vec<u8>>`.
impl Stream for AsyncChunkedReader {
type Item = Result<Vec<u8>>;
/// Builds a fresh `read_next_chunk` future on every poll and polls it once.
///
/// NOTE(review): when the inner read returns `Pending`, the future is
/// dropped here and recreated on the next poll; the `operation_timeout_ms`
/// timer created inside `read_next_chunk` therefore restarts on each poll —
/// confirm this is the intended timeout semantics for the stream path.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.finished {
return Poll::Ready(None);
}
let read_future = self.read_next_chunk();
tokio::pin!(read_future);
match read_future.poll(cx) {
Poll::Ready(Ok(Some(chunk))) => Poll::Ready(Some(Ok(chunk))),
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
}
}
}
/// Asynchronous batched line reader over a buffered file.
pub struct AsyncLineReader {
// Buffered tokio file handle lines are read from.
reader: BufReader<File>,
// Streaming options; chunk_size / skip_chunks / max_chunks are interpreted
// in lines for this reader.
config: AsyncStreamingConfig,
// Lines consumed so far, including skipped ones.
lines_read: usize,
// Set on EOF, error, or when the max_chunks limit is reached.
finished: bool,
}
impl AsyncLineReader {
    /// Opens `path` for buffered line reading (`config.buffer_size` capacity).
    ///
    /// # Errors
    /// Returns `IoError::FileError` when the file cannot be opened.
    pub async fn new<P: AsRef<Path>>(path: P, config: AsyncStreamingConfig) -> Result<Self> {
        let file = File::open(path.as_ref())
            .await
            .map_err(|e| IoError::FileError(format!("Failed to open file: {}", e)))?;
        let reader = BufReader::with_capacity(config.buffer_size, file);
        Ok(Self {
            reader,
            config,
            lines_read: 0,
            finished: false,
        })
    }

    /// Number of lines consumed so far (including skipped lines).
    pub fn lines_read(&self) -> usize {
        self.lines_read
    }

    /// True once EOF, an error, or the `max_chunks` limit has been hit.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Reads the next batch of up to `config.chunk_size` lines (trailing
    /// `\n` / `\r\n` stripped), or `None` at end of stream.
    ///
    /// For this reader the config fields are interpreted in *lines*:
    /// `skip_chunks` lines are discarded up front and `max_chunks` caps the
    /// lines read after skipping. The cap is only checked between batches, so
    /// the final batch may run past it (unchanged from the original).
    ///
    /// # Errors
    /// Returns `IoError::FileError` on read failure or timeout.
    pub async fn read_next_lines(&mut self) -> Result<Option<Vec<String>>> {
        if self.finished {
            return Ok(None);
        }
        // Discard leading lines iteratively. The original recursed through
        // `Box::pin(self.read_next_lines())`, allocating one boxed future per
        // skipped line; this loop is allocation-free and cannot recurse deeply.
        while self.lines_read < self.config.skip_chunks {
            let mut line = String::new();
            match self.reader.read_line(&mut line).await {
                Ok(0) => {
                    // EOF before the skip window was exhausted.
                    self.finished = true;
                    return Ok(None);
                }
                Ok(_) => {
                    self.lines_read += 1;
                }
                Err(e) => {
                    self.finished = true;
                    return Err(IoError::FileError(format!("Failed to skip line: {}", e)));
                }
            }
        }
        // `max_chunks` counts lines yielded after the skip window.
        if let Some(max) = self.config.max_chunks {
            if self.lines_read >= max + self.config.skip_chunks {
                self.finished = true;
                return Ok(None);
            }
        }
        let mut lines = Vec::new();
        let target_lines = self.config.chunk_size;
        for _ in 0..target_lines {
            let mut line = String::new();
            let read_result = if let Some(timeout_ms) = self.config.operation_timeout_ms {
                match tokio::time::timeout(
                    tokio::time::Duration::from_millis(timeout_ms),
                    self.reader.read_line(&mut line),
                )
                .await
                {
                    Ok(result) => result,
                    Err(_) => {
                        self.finished = true;
                        return Err(IoError::FileError(
                            "Read line operation timed out".to_string(),
                        ));
                    }
                }
            } else {
                self.reader.read_line(&mut line).await
            };
            match read_result {
                Ok(0) => {
                    // EOF: yield whatever was accumulated, then finish.
                    self.finished = true;
                    break;
                }
                Ok(_) => {
                    // Strip the trailing newline, handling both \n and \r\n.
                    if line.ends_with('\n') {
                        line.pop();
                        if line.ends_with('\r') {
                            line.pop();
                        }
                    }
                    lines.push(line);
                    self.lines_read += 1;
                }
                Err(e) => {
                    self.finished = true;
                    return Err(IoError::FileError(format!("Failed to read line: {}", e)));
                }
            }
        }
        if lines.is_empty() {
            Ok(None)
        } else {
            Ok(Some(lines))
        }
    }
}
/// `Stream` adapter yielding line batches as `Result<Vec<String>>`.
impl Stream for AsyncLineReader {
type Item = Result<Vec<String>>;
/// Builds a fresh `read_next_lines` future on every poll and polls it once.
///
/// NOTE(review): when the inner read returns `Pending`, the future is
/// dropped and recreated on the next poll, so any `operation_timeout_ms`
/// timer restarts per poll — confirm this is the intended timeout semantics.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.finished {
return Poll::Ready(None);
}
let read_future = self.read_next_lines();
tokio::pin!(read_future);
match read_future.poll(cx) {
Poll::Ready(Ok(Some(lines))) => Poll::Ready(Some(Ok(lines))),
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
}
}
}
/// Running statistics for asynchronous streaming jobs.
#[derive(Debug, Clone, Default)]
pub struct AsyncStreamingStats {
    /// Total payload bytes processed.
    pub bytes_processed: u64,
    /// Number of chunks processed.
    pub chunks_processed: usize,
    /// Number of lines processed (line-based jobs only).
    pub lines_processed: usize,
    /// Accumulated processing time, in milliseconds.
    pub processing_time_ms: f64,
    /// Most recently reported concurrency level.
    pub concurrent_operations: usize,
    /// Running average throughput derived from bytes/time, in MB/s.
    pub avg_speed_mbps: f64,
    /// High-water mark of reported memory usage, in bytes.
    pub peak_memory_usage: usize,
}

impl AsyncStreamingStats {
    /// All-zero statistics; identical to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one processed chunk and refreshes the running MB/s average.
    pub fn update_chunk(&mut self, bytes: u64, processing_time_ms: f64) {
        self.bytes_processed += bytes;
        self.chunks_processed += 1;
        self.processing_time_ms += processing_time_ms;
        // Guard against a division by zero before any time has accumulated.
        if self.processing_time_ms > 0.0 {
            let megabytes = self.bytes_processed as f64 / (1024.0 * 1024.0);
            let seconds = self.processing_time_ms / 1000.0;
            self.avg_speed_mbps = megabytes / seconds;
        }
    }

    /// Adds `lines` to the processed-line counter.
    pub fn update_lines(&mut self, lines: usize) {
        self.lines_processed += lines;
    }

    /// Records the current number of concurrent operations.
    pub fn update_concurrency(&mut self, count: usize) {
        self.concurrent_operations = count;
    }

    /// Raises the peak-memory high-water mark if `usage` exceeds it.
    pub fn update_memory_usage(&mut self, usage: usize) {
        self.peak_memory_usage = self.peak_memory_usage.max(usage);
    }

    /// One-line human-readable summary of the collected statistics.
    pub fn summary(&self) -> String {
        format!(
            "Async processed {} bytes in {} chunks ({} lines), {:.2} MB/s, {} concurrent ops, peak memory: {} KB",
            self.bytes_processed,
            self.chunks_processed,
            self.lines_processed,
            self.avg_speed_mbps,
            self.concurrent_operations,
            self.peak_memory_usage / 1024
        )
    }
}
/// Reads `path` in chunks and applies `processor` to each chunk with bounded
/// concurrency.
///
/// Up to `config.concurrency` chunks are in flight at once via
/// `buffer_unordered`, so the returned results are in completion order, not
/// necessarily chunk order; each processor call still receives its original
/// `chunk_id`.
///
/// # Errors
/// Returns the first error produced by the reader or by `processor`.
pub async fn process_file_async<P, F, Fut, T>(
    path: P,
    config: AsyncStreamingConfig,
    processor: F,
) -> Result<(Vec<T>, AsyncStreamingStats)>
where
    P: AsRef<Path>,
    F: Fn(Vec<u8>, usize) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send,
    T: Send + 'static,
{
    let reader = AsyncChunkedReader::new(path, config.clone()).await?;
    let mut stats = AsyncStreamingStats::new();
    let mut results = Vec::new();
    let start_time = std::time::Instant::now();
    // Shared so each in-flight task holds its own handle to the processor.
    let processor = std::sync::Arc::new(processor);
    let mut chunk_stream = reader
        .enumerate()
        .map(|(chunk_id, chunk_result)| {
            let processor = processor.clone();
            async move {
                match chunk_result {
                    Ok(chunk) => {
                        // Capture the length first so the chunk can be moved
                        // into the processor without cloning it (the original
                        // cloned every chunk just to read its length after).
                        let chunk_len = chunk.len();
                        let chunk_start = std::time::Instant::now();
                        let result = processor(chunk, chunk_id).await?;
                        let processing_time = chunk_start.elapsed().as_secs_f64() * 1000.0;
                        Ok((result, chunk_len, processing_time))
                    }
                    Err(e) => Err(e),
                }
            }
        })
        .buffer_unordered(config.concurrency);
    while let Some(result) = chunk_stream.next().await {
        match result {
            Ok((processed_result, bytes, processing_time)) => {
                results.push(processed_result);
                stats.update_chunk(bytes as u64, processing_time);
                stats.update_concurrency(config.concurrency);
            }
            Err(e) => return Err(e),
        }
    }
    // Replace the accumulated per-chunk time with the actual wall-clock time.
    stats.processing_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
    Ok((results, stats))
}
/// Processes a file line-by-line with bounded concurrency.
///
/// Lines are read in batches of `config.chunk_size` lines; each batch is fed
/// to `processor` one line at a time, and up to `config.concurrency` batches
/// are in flight at once via `buffer_unordered`, so results are collected in
/// completion order rather than file order.
///
/// NOTE(review): the per-line id is `chunk_id * 1000 + line_id`, which
/// collides when a batch holds more than 1000 lines (chunk_size > 1000) —
/// confirm ids only need to be unique for smaller batch sizes.
pub async fn process_csv_async<P, F, Fut, T>(
path: P,
config: AsyncStreamingConfig,
processor: F,
) -> Result<(Vec<T>, AsyncStreamingStats)>
where
P: AsRef<Path>,
F: Fn(Vec<String>, usize) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let reader = AsyncLineReader::new(path, config.clone()).await?;
let mut stats = AsyncStreamingStats::new();
let mut results = Vec::new();
let start_time = std::time::Instant::now();
// Shared so each in-flight task holds its own handle to the processor.
let processor = std::sync::Arc::new(processor);
let mut line_stream = reader
.enumerate()
.map(|(chunk_id, lines_result)| {
let processor = processor.clone();
async move {
match lines_result {
Ok(lines) => {
let chunk_start = std::time::Instant::now();
let mut chunk_results = Vec::new();
// Each line is wrapped in a single-element Vec for the processor.
for (line_id, line) in lines.into_iter().enumerate() {
let result = processor(vec![line], chunk_id * 1000 + line_id).await?;
chunk_results.push(result);
}
let processing_time = chunk_start.elapsed().as_secs_f64() * 1000.0;
Ok((chunk_results, processing_time))
}
Err(e) => Err(e),
}
}
})
.buffer_unordered(config.concurrency);
while let Some(result) = line_stream.next().await {
match result {
Ok((chunk_results, processing_time)) => {
let line_count = chunk_results.len();
results.extend(chunk_results);
// Byte count is unknown at this layer: 0 bytes are recorded, so
// bytes_processed and avg_speed_mbps remain 0 for line jobs.
stats.update_chunk(0, processing_time); stats.update_lines(line_count);
stats.update_concurrency(config.concurrency);
}
Err(e) => return Err(e),
}
}
// Replace the accumulated per-batch time with the actual wall-clock time.
stats.processing_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
Ok((results, stats))
}
/// Cooperative, thread-safe cancellation flag.
///
/// Clones share the same underlying flag (the `Arc` is cloned, not the
/// boolean), so cancelling any handle is observed by every clone. The
/// original exposed an inherent `clone` method, which shadows the standard
/// trait and keeps generic `T: Clone` code from accepting the token
/// (clippy `should_implement_trait`); deriving `Clone` preserves the exact
/// shared-flag semantics while implementing the trait properly. `Default`
/// is likewise derived (an `AtomicBool` defaults to `false`).
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
    // Shared flag; SeqCst ordering is used for both load and store.
    cancelled: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

impl CancellationToken {
    /// Creates a token in the not-cancelled state.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the flag; visible to all clones of this token.
    pub fn cancel(&self) {
        self.cancelled
            .store(true, std::sync::atomic::Ordering::SeqCst);
    }

    /// Returns `true` once any clone has called `cancel`.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(std::sync::atomic::Ordering::SeqCst)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// 1000 bytes read in 100-byte chunks should yield exactly 10 full chunks.
// NOTE(review): asserting every chunk is exactly 100 bytes assumes each
// read() call fills the whole buffer; true for local files in practice,
// but not guaranteed by the AsyncRead contract — confirm.
#[tokio::test]
async fn test_async_chunked_reader() {
let temp_dir = tempdir().expect("Operation failed");
let file_path = temp_dir.path().join("test_async.txt");
let test_data = "0123456789".repeat(100); std::fs::write(&file_path, &test_data).expect("Operation failed");
let config = AsyncStreamingConfig::new().chunk_size(100);
let mut reader = AsyncChunkedReader::new(&file_path, config)
.await
.expect("Operation failed");
let mut chunks = Vec::new();
while let Some(chunk_result) = reader.read_next_chunk().await.expect("Operation failed") {
chunks.push(chunk_result);
}
assert_eq!(chunks.len(), 10); for chunk in &chunks {
assert_eq!(chunk.len(), 100);
}
}
// 50 newline-separated lines in batches of 10 lines => 5 full batches.
#[tokio::test]
async fn test_async_line_reader() {
let temp_dir = tempdir().expect("Operation failed");
let file_path = temp_dir.path().join("test_async_lines.txt");
let lines: Vec<String> = (0..50).map(|i| format!("Line {}", i)).collect();
std::fs::write(&file_path, lines.join("\n")).expect("Operation failed");
let config = AsyncStreamingConfig::new().chunk_size(10); let mut reader = AsyncLineReader::new(&file_path, config)
.await
.expect("Operation failed");
let mut chunks = Vec::new();
while let Some(lines_result) = reader.read_next_lines().await.expect("Operation failed") {
chunks.push(lines_result);
}
assert_eq!(chunks.len(), 5); for chunk in &chunks {
assert_eq!(chunk.len(), 10);
}
}
// Builder methods should store each value verbatim.
#[tokio::test]
async fn test_async_config() {
let config = AsyncStreamingConfig::new()
.chunk_size(1024)
.buffer_size(4096)
.concurrency(8)
.timeout(5000);
assert_eq!(config.chunk_size, 1024);
assert_eq!(config.buffer_size, 4096);
assert_eq!(config.concurrency, 8);
assert_eq!(config.operation_timeout_ms, Some(5000));
}
// A clone taken after cancellation observes the shared cancelled flag.
#[tokio::test]
async fn test_cancellation_token() {
let token = CancellationToken::new();
assert!(!token.is_cancelled());
token.cancel();
assert!(token.is_cancelled());
let cloned_token = token.clone();
assert!(cloned_token.is_cancelled());
}
}