use std::sync::Arc;
use bytes::Bytes;
use futures_lite::{AsyncReadExt, StreamExt};
use smol::channel::{Receiver, Sender, unbounded};
/// Creates a new unbounded channel and returns its `(Sender, Receiver)` halves.
///
/// This is a thin wrapper over the `unbounded` constructor imported from
/// `smol::channel`; the element type `T` is chosen by the caller.
#[must_use]
pub fn create_channel<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = unbounded();
    (tx, rx)
}
/// Reads a file chunk by chunk and hands every chunk to a caller-supplied
/// processor, optionally reporting cumulative progress.
pub struct AsyncFileProcessor {
    /// Requested size, in bytes, of the buffer used for each read.
    buffer_size: usize,
    /// Called after each processed chunk with the running total of bytes read.
    progress_callback: Option<Box<dyn Fn(u64) + Send + Sync>>,
}

impl AsyncFileProcessor {
    /// Creates a processor with an 8192-byte buffer and no progress callback.
    #[must_use]
    pub fn new() -> Self {
        Self::with_config(8192, None)
    }

    /// Creates a processor with an explicit buffer size and optional progress
    /// callback.
    ///
    /// A `buffer_size` of zero is accepted; reads then fall back to a one-byte
    /// buffer (see [`AsyncFileProcessor::process_file_async`]).
    #[must_use]
    pub fn with_config(
        buffer_size: usize,
        progress_callback: Option<Box<dyn Fn(u64) + Send + Sync>>,
    ) -> Self {
        Self {
            buffer_size,
            progress_callback,
        }
    }

    /// Returns a builder initialised with the builder's own defaults.
    #[must_use]
    pub fn builder() -> AsyncFileProcessorBuilder {
        AsyncFileProcessorBuilder::new()
    }

    /// Reads the file at `path` in chunks and applies the synchronous
    /// `processor` to each chunk, collecting the results in read order.
    ///
    /// This delegates to [`AsyncFileProcessor::process_file_async`] by wrapping
    /// every result in an already-completed future, so both entry points share
    /// one read loop and behave identically with respect to chunking and
    /// progress reporting.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while opening or reading the file.
    pub async fn process_file<F, T>(
        &self,
        path: &str,
        processor: F,
    ) -> Result<Vec<T>, std::io::Error>
    where
        F: Fn(Bytes) -> T + Send + Sync,
        T: Send + 'static,
    {
        self.process_file_async(path, |chunk| {
            std::future::ready(Ok::<T, std::io::Error>(processor(chunk)))
        })
        .await
    }

    /// Reads the file at `path` in chunks and awaits the asynchronous
    /// `processor` on each chunk, collecting the successful results in read
    /// order.
    ///
    /// Each chunk holds at most `buffer_size` bytes (at least one byte is
    /// always requested). After a chunk has been processed, the progress
    /// callback, if any, is invoked with the total number of bytes read so far.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while opening or reading the file, or the
    /// first error returned by `processor`; processing stops at that point.
    pub async fn process_file_async<F, Fut, T>(
        &self,
        path: &str,
        processor: F,
    ) -> Result<Vec<T>, std::io::Error>
    where
        F: Fn(Bytes) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<T, std::io::Error>> + Send,
        T: Send + 'static,
    {
        let mut file = smol::fs::File::open(path).await?;
        // A zero-length buffer would make `read` report 0 bytes, which the loop
        // below treats as end of file, silently skipping the whole file. Always
        // read into at least one byte.
        let mut buffer = vec![0u8; self.buffer_size.max(1)];
        let mut results = Vec::new();
        let mut bytes_processed = 0u64;

        loop {
            let bytes_read = file.read(&mut buffer).await?;
            if bytes_read == 0 {
                break;
            }

            // The buffer is reused by the next read, so the chunk is copied out.
            let chunk = Bytes::copy_from_slice(&buffer[..bytes_read]);
            results.push(processor(chunk).await?);

            bytes_processed += bytes_read as u64;
            if let Some(callback) = &self.progress_callback {
                callback(bytes_processed);
            }
        }

        Ok(results)
    }
}

impl Default for AsyncFileProcessor {
    /// Equivalent to [`AsyncFileProcessor::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Step-by-step builder for an `AsyncFileProcessor`.
pub struct AsyncFileProcessorBuilder {
    /// Buffer size that the built processor will be given.
    buffer_size: usize,
    /// Progress callback that the built processor will be given, if any.
    progress_callback: Option<Box<dyn Fn(u64) + Send + Sync>>,
}

impl AsyncFileProcessorBuilder {
    /// Starts a builder with an 8192-byte buffer size and no progress callback.
    #[must_use]
    pub fn new() -> Self {
        let buffer_size = 8192;
        let progress_callback = None;
        Self {
            buffer_size,
            progress_callback,
        }
    }

    /// Replaces the buffer size, keeping every other setting.
    #[must_use]
    pub fn buffer_size(self, size: usize) -> Self {
        Self {
            buffer_size: size,
            ..self
        }
    }

    /// Installs `callback` as the progress callback, replacing any earlier one.
    #[must_use]
    pub fn progress_callback<F>(self, callback: F) -> Self
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        let boxed: Box<dyn Fn(u64) + Send + Sync> = Box::new(callback);
        Self {
            progress_callback: Some(boxed),
            ..self
        }
    }

    /// Consumes the builder and produces the configured processor.
    #[must_use]
    pub fn build(self) -> AsyncFileProcessor {
        let Self {
            buffer_size,
            progress_callback,
        } = self;
        AsyncFileProcessor {
            buffer_size,
            progress_callback,
        }
    }
}

impl Default for AsyncFileProcessorBuilder {
    /// Equivalent to [`AsyncFileProcessorBuilder::new`].
    fn default() -> Self {
        AsyncFileProcessorBuilder::new()
    }
}
/// Loads the file at `path` via `crate::io::utils::read_file_async` and passes
/// the resulting `String` to `processor`.
///
/// # Errors
///
/// Propagates the error from the read; `processor` is not called in that case.
pub async fn process_file_async<F>(path: &str, processor: F) -> Result<(), std::io::Error>
where
    F: Fn(String) + Send + Sync + 'static,
{
    processor(crate::io::utils::read_file_async(path).await?);
    Ok(())
}
/// Namespace for helpers that drain a stream to completion into a `Vec`.
pub struct AsyncStreamUtils;

impl AsyncStreamUtils {
    /// Applies `transformer` to every item of `stream` and returns the outputs
    /// in the order the items were yielded.
    pub async fn map_stream<S, F, T, U>(stream: S, transformer: F) -> Vec<U>
    where
        S: futures_lite::Stream<Item = T> + Unpin,
        F: Fn(T) -> U,
        U: Send + 'static,
    {
        stream.map(transformer).collect().await
    }

    /// Returns, in order, the items of `stream` for which `predicate` is true.
    pub async fn filter_stream<S, F, T>(stream: S, predicate: F) -> Vec<T>
    where
        S: futures_lite::Stream<Item = T> + Unpin,
        F: Fn(&T) -> bool,
        T: Send + 'static,
    {
        stream.filter(predicate).collect().await
    }

    /// Gathers every item of `stream`, in order, into a `Vec`.
    pub async fn collect_stream<S, T>(stream: S) -> Vec<T>
    where
        S: futures_lite::Stream<Item = T> + Unpin,
        T: Send + 'static,
    {
        stream.collect().await
    }
}
/// Couples a channel with a transformation that is applied to values taken
/// off the channel.
pub struct ChannelStreamProcessor<T> {
    /// Sending half used by [`ChannelStreamProcessor::send`].
    sender: Sender<T>,
    /// Receiving half used by `receive` and by the background task.
    receiver: Receiver<T>,
    /// Transformation applied to each received value.
    processor: Arc<dyn Fn(T) -> T + Send + Sync>,
}

impl<T> ChannelStreamProcessor<T>
where
    T: Send + 'static + Clone,
{
    /// Builds a processor around a fresh channel from `create_channel` and the
    /// given transformation.
    #[must_use]
    pub fn new<F>(processor: F) -> Self
    where
        F: Fn(T) -> T + Send + Sync + 'static,
    {
        let processor: Arc<dyn Fn(T) -> T + Send + Sync> = Arc::new(processor);
        let (sender, receiver) = create_channel();
        Self {
            sender,
            receiver,
            processor,
        }
    }

    /// Sends `data` into the channel unchanged; the transformation is applied
    /// on the receiving side.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError` if the send fails.
    pub async fn send(&self, data: T) -> Result<(), smol::channel::SendError<T>> {
        let outcome = self.sender.send(data).await;
        outcome
    }

    /// Waits for the next value on the channel and returns it after applying
    /// the transformation.
    ///
    /// # Errors
    ///
    /// Returns the channel's `RecvError` if the receive fails.
    pub async fn receive(&self) -> Result<T, smol::channel::RecvError> {
        self.receiver
            .recv()
            .await
            .map(|data| (self.processor)(data))
    }

    /// Spawns a detached task that keeps receiving from a clone of this
    /// processor's receiver, applies the transformation to each value and
    /// discards the result. The task ends when `recv` returns an error.
    ///
    /// NOTE(review): the task reads from the same channel as `receive`;
    /// presumably each value is delivered to only one of them -- confirm
    /// against the `smol` channel semantics.
    pub fn start_background_processing(&self) {
        let inbox = self.receiver.clone();
        let transform = Arc::clone(&self.processor);
        let worker = async move {
            loop {
                match inbox.recv().await {
                    Ok(item) => drop(transform(item)),
                    Err(_) => break,
                }
            }
        };
        smol::spawn(worker).detach();
    }
}
/// Wraps an async reader with a fixed-size buffer that callers inspect via
/// [`BufferedAsyncReader::read_buffer`] and advance via
/// [`BufferedAsyncReader::consume`].
pub struct BufferedAsyncReader<R> {
    /// Underlying source of bytes.
    reader: R,
    /// Backing storage, allocated once with `buffer_size` bytes.
    buffer: Vec<u8>,
    /// Size requested at construction, reported by `buffer_size()`.
    buffer_size: usize,
    /// Index of the first unconsumed byte in `buffer`.
    position: usize,
    /// Number of valid bytes in `buffer` from the most recent read.
    limit: usize,
}

impl<R> BufferedAsyncReader<R>
where
    R: AsyncReadExt + Unpin,
{
    /// Wraps `reader` with a zero-initialised buffer of `buffer_size` bytes.
    ///
    /// NOTE(review): with a `buffer_size` of zero every read targets an empty
    /// buffer, so `read_buffer` will presumably always return an empty slice.
    #[must_use]
    pub fn new(reader: R, buffer_size: usize) -> Self {
        Self {
            reader,
            buffer: vec![0u8; buffer_size],
            buffer_size,
            position: 0,
            limit: 0,
        }
    }

    /// Returns the unconsumed part of the buffer, refilling it from the
    /// underlying reader first when everything has been consumed.
    ///
    /// An empty slice means the refill read produced zero bytes.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the underlying reader; the buffer state is
    /// left unchanged in that case.
    pub async fn read_buffer(&mut self) -> Result<&[u8], std::io::Error> {
        if self.position >= self.limit {
            self.limit = self.reader.read(&mut self.buffer).await?;
            self.position = 0;
        }
        // After a zero-byte refill this is `buffer[0..0]`, i.e. an empty slice.
        Ok(&self.buffer[self.position..self.limit])
    }

    /// Marks `amount` bytes as consumed, clamped to the bytes currently
    /// available.
    pub fn consume(&mut self, amount: usize) {
        // Saturate instead of `+`: a huge `amount` would otherwise overflow,
        // panicking in debug builds and wrapping in release builds, where the
        // wrapped value could move `position` backwards.
        self.position = self.position.saturating_add(amount).min(self.limit);
    }

    /// Returns the buffer size passed to [`BufferedAsyncReader::new`].
    #[must_use]
    pub fn buffer_size(&self) -> usize {
        self.buffer_size
    }
}