use std::{
io::{BufReader, BufWriter, Cursor, Read, Write},
path::Path,
};
use bytes::Bytes;
use polars::{
io::cloud::{BlockingCloudWriter, CloudOptions, build_object_store, object_path_from_str},
prelude::PlPathRef,
};
use serde::{Deserialize, Serialize};
use strum::{Display, EnumString, IntoStaticStr};
use crate::error::{ChapatyError, ChapatyResult, IoError};
/// In-memory reader over the contents of a single cloud object.
///
/// The object's bytes are held in a [`Bytes`] buffer wrapped in a [`Cursor`], which tracks the
/// current read position.
#[derive(Default, Debug, Clone)]
pub(crate) struct CloudReader {
/// Object bytes together with the current read position.
inner: Cursor<Bytes>,
}
impl CloudReader {
    /// Fetches the object addressed by `uri` and wraps its bytes in an in-memory reader.
    ///
    /// The whole object is collected into memory before this function returns; subsequent
    /// reads are served from that buffer.
    ///
    /// # Errors
    ///
    /// * [`IoError::ObjectStoreBuild`] if the object store for `uri` cannot be built.
    /// * [`IoError::ObjectPathBuild`] if the location's prefix is not a valid object path.
    /// * The error produced by `map_object_store_err` if the request or the body download
    ///   fails.
    pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> ChapatyResult<Self> {
        let (location, store) = build_object_store(PlPathRef::new(uri), cloud_options, false)
            .await
            .map_err(|cause| IoError::ObjectStoreBuild(cause.to_string()))?;

        let object_path = object_path_from_str(&location.prefix)
            .map_err(|cause| IoError::ObjectPathBuild(cause.to_string()))?;

        let dyn_store = store.to_dyn_object_store().await;
        let response = dyn_store
            .get(&object_path)
            .await
            .map_err(map_object_store_err)?;
        let payload = response.bytes().await.map_err(map_object_store_err)?;

        Ok(Self {
            inner: Cursor::new(payload),
        })
    }
}
impl Read for CloudReader {
    /// Copies bytes from the in-memory buffer into `buf`, advancing the read position.
    ///
    /// Delegates to the wrapped cursor and returns whatever it reports.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        Read::read(&mut self.inner, buf)
    }
}
fn map_object_store_err(err: object_store::Error) -> ChapatyError {
IoError::ReadBytesFailed(err.to_string()).into()
}
/// Where files are written to or read from: a cloud location or a local directory.
#[derive(Debug, Clone)]
pub enum StorageLocation<'a> {
/// Cloud storage, addressed by a base URI plus the options used to access it.
Cloud {
/// Base URI that file names are appended to.
path: &'a str,
/// Options handed to the cloud writer and the cloud reader.
options: CloudOptions,
},
/// Local directory that file names are joined onto.
Local(&'a Path),
}
impl<'a> StorageLocation<'a> {
    /// Joins `base` and `file_name` with exactly one `/` between them.
    ///
    /// A base that already ends in `/` is not given a second separator: `base//file_name`
    /// contains an empty path segment, which object-store path parsing is expected to reject.
    fn cloud_uri(base: &str, file_name: &str) -> String {
        if base.ends_with('/') {
            format!("{base}{file_name}")
        } else {
            format!("{base}/{file_name}")
        }
    }

    /// Opens a buffered writer for `file_name` at this location.
    ///
    /// * `file_name` - name of the file, relative to the location.
    /// * `buffer_size` - capacity of the [`BufWriter`] wrapped around the destination.
    ///
    /// For [`StorageLocation::Local`] the directory (and any missing parents) is created
    /// first, and an existing file of the same name is truncated by `File::create`.
    ///
    /// # Errors
    ///
    /// Returns [`IoError::WriterCreation`] if the cloud writer cannot be constructed, the
    /// local directory cannot be created, or the local file cannot be created.
    // NOTE(review): the local branch uses blocking `std::fs` calls inside an `async fn`.
    pub(crate) async fn writer(
        &self,
        file_name: &str,
        buffer_size: usize,
    ) -> ChapatyResult<Box<dyn Write + Send>> {
        match self {
            Self::Cloud { path, options } => {
                let full_path = Self::cloud_uri(path, file_name);
                BlockingCloudWriter::new(PlPathRef::new(&full_path), Some(options))
                    .await
                    .map(|writer| {
                        Box::new(BufWriter::with_capacity(buffer_size, writer))
                            as Box<dyn Write + Send>
                    })
                    .map_err(|e| ChapatyError::Io(IoError::WriterCreation(e.to_string())))
            }
            Self::Local(path) => {
                // `create_dir_all` succeeds when the directory already exists, so no prior
                // `exists()` probe is needed; skipping it also removes the check-then-create
                // race.
                std::fs::create_dir_all(path).map_err(|e| {
                    ChapatyError::Io(IoError::WriterCreation(format!(
                        "Failed to create directory {:?}: {}",
                        path, e
                    )))
                })?;
                let full_path = path.join(file_name);
                std::fs::File::create(full_path)
                    .map(|file| {
                        Box::new(BufWriter::with_capacity(buffer_size, file))
                            as Box<dyn Write + Send>
                    })
                    .map_err(|e| ChapatyError::Io(IoError::WriterCreation(e.to_string())))
            }
        }
    }

    /// Opens a buffered reader for `file_name` at this location, plus its size when known.
    ///
    /// * `file_name` - name of the file, relative to the location.
    /// * `buffer_size` - capacity of the [`BufReader`] wrapped around the source.
    ///
    /// The second tuple element is `Some(len)` in bytes for local files and `None` for cloud
    /// objects.
    ///
    /// # Errors
    ///
    /// Returns [`IoError::ReaderCreation`] if the local file cannot be opened or inspected,
    /// and whatever `CloudReader::new` reports for cloud objects.
    // NOTE(review): the local branch uses blocking `std::fs` calls inside an `async fn`.
    pub(crate) async fn reader_with_size(
        &self,
        file_name: &str,
        buffer_size: usize,
    ) -> ChapatyResult<(Box<dyn Read + Send>, Option<u64>)> {
        match self {
            Self::Cloud { path, options } => {
                let full_path = Self::cloud_uri(path, file_name);
                let cloud_reader = CloudReader::new(&full_path, Some(options)).await?;
                Ok((
                    Box::new(BufReader::with_capacity(buffer_size, cloud_reader))
                        as Box<dyn Read + Send>,
                    // No size is reported for cloud objects.
                    None,
                ))
            }
            Self::Local(path) => {
                let full_path = path.join(file_name);
                let file = std::fs::File::open(full_path)
                    .map_err(|e| ChapatyError::Io(IoError::ReaderCreation(e.to_string())))?;
                // Ask the open handle for its size so the length always describes the file
                // that is actually read, even if the path is replaced concurrently.
                let size = file
                    .metadata()
                    .map_err(|e| ChapatyError::Io(IoError::ReaderCreation(e.to_string())))?
                    .len();
                Ok((
                    Box::new(BufReader::with_capacity(buffer_size, file)) as Box<dyn Read + Send>,
                    Some(size),
                ))
            }
        }
    }
}
/// Serialization format of a file, as inferred from its extension by
/// [`SerdeFormat::from_path`].
///
/// The `strum` attribute below makes the derived string conversions use the lowercased
/// variant name (e.g. `postcard`).
#[derive(
Debug,
Clone,
Copy,
PartialEq,
PartialOrd,
Eq,
Hash,
Ord,
Serialize,
Deserialize,
EnumString,
Display,
IntoStaticStr,
Default,
)]
#[strum(serialize_all = "lowercase")]
pub enum SerdeFormat {
/// Postcard encoding; this is the default format.
#[default]
Postcard,
}
impl SerdeFormat {
    /// Infers the serialization format from the extension of `path`.
    ///
    /// Only the final path component (the text after the last `/` or `\`) is inspected, so a
    /// dot in a parent directory or bucket name is never mistaken for an extension. The
    /// extension is compared case-insensitively.
    ///
    /// # Errors
    ///
    /// Both failures are built by `err`:
    ///
    /// * the "missing or invalid extension" error, carrying the full `path`, when the final
    ///   component has no `.` or nothing follows its last `.`;
    /// * the "unsupported format" error, carrying the lowercased extension, when the
    ///   extension is not a known format.
    pub fn from_path(path: &str) -> ChapatyResult<Self> {
        // `rsplit` always yields at least one item, so the fallback is never taken in practice.
        let file_name = path.rsplit(['/', '\\']).next().unwrap_or(path);

        let extension = file_name
            .rsplit_once('.')
            .map(|(_, ext)| ext)
            .filter(|ext| !ext.is_empty())
            .ok_or_else(|| err(path, true))?
            .to_lowercase();

        match extension.as_str() {
            "postcard" => Ok(Self::Postcard),
            ext => Err(err(ext, false)),
        }
    }
}
fn err(s: &str, missing_extension: bool) -> ChapatyError {
let msg = if missing_extension {
format!("Unsupported file format: missing or invalid extension in path '{s}'")
} else {
format!("Unsupported file format: '{s}'")
};
IoError::UnsupportedFormat(msg).into()
}