use anyhow::{Context, Result};
use bytes::Bytes;
use object_store::path::Path as ObjectPath;
use object_store::{ObjectStore, PutPayload};
use parquet::file::metadata::ParquetMetaDataReader;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};
use url::Url;
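/// Key for the process-wide store cache: one client per (scheme, bucket) pair.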
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CacheKey {
scheme: String,
bucket: String,
}
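/// Lazily-initialized, process-wide cache of object store clients, so repeated
/// operations against the same bucket reuse configuration and connections.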
static STORE_CACHE: OnceLock<Mutex<HashMap<CacheKey, Arc<dyn ObjectStore>>>> = OnceLock::new();
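/// Returns the cached client for `key`, building one with `create` on first use.
/// The mutex is held across `create()`, which serializes cold-start construction
/// but guarantees at most one client is built per key.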
fn get_cached_store(
key: CacheKey,
create: impl FnOnce() -> Result<Arc<dyn ObjectStore>>,
) -> Result<Arc<dyn ObjectStore>> {
let cache = STORE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
let mut cache = cache.lock().unwrap();
if let Some(store) = cache.get(&key) {
return Ok(Arc::clone(store));
}
let store = create()?;
cache.insert(key, Arc::clone(&store));
Ok(store)
}
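/// Resolves a read path to an object store client plus a store-relative path.
/// URL-shaped inputs (scheme longer than one character, which excludes Windows
/// drive letters) are dispatched to `parse_url`; everything else is
/// canonicalized and served from the local filesystem.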
fn parse_path_for_read(path: &str) -> Result<(Arc<dyn ObjectStore>, ObjectPath)> {
    // A single-letter scheme is a Windows drive letter (`C:\...`), not a URL.
    if let Ok(url) = Url::parse(path) {
        if url.scheme().len() > 1 {
            return parse_url(&url, path);
        }
    }
let local_path = std::path::Path::new(path)
.canonicalize()
.with_context(|| format!("Failed to resolve path: {}", path))?;
let store = object_store::local::LocalFileSystem::new();
    let path_str = local_path.to_string_lossy();
    // On Windows, `canonicalize` returns a verbatim path (`\\?\C:\...`); strip
    // the prefix and normalize separators (as the `file://` branch does) so
    // `ObjectPath` splits segments correctly.
    let path_str = path_str.strip_prefix(r"\\?\").unwrap_or(&path_str);
    let object_path = ObjectPath::from(path_str.replace('\\', "/").as_str());
Ok((Arc::new(store), object_path))
}
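/// Like `parse_path_for_read`, but for targets that may not exist yet: relative
/// paths are resolved against the current directory instead of `canonicalize`,
/// which fails for missing files.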
fn parse_path_for_write(path: &str) -> Result<(Arc<dyn ObjectStore>, ObjectPath)> {
    // A single-letter scheme is a Windows drive letter (`C:\...`), not a URL.
    if let Ok(url) = Url::parse(path) {
        if url.scheme().len() > 1 {
            return parse_url(&url, path);
        }
    }
let local_path = if std::path::Path::new(path).is_absolute() {
std::path::PathBuf::from(path)
} else {
std::env::current_dir()
.context("Failed to get current directory")?
.join(path)
};
let store = object_store::local::LocalFileSystem::new();
    let path_str = local_path.to_string_lossy();
    // Mirror the read path: strip any Windows verbatim prefix and normalize
    // separators before building the `ObjectPath`.
    let path_str = path_str.strip_prefix(r"\\?\").unwrap_or(&path_str);
    let object_path = ObjectPath::from(path_str.replace('\\', "/").as_str());
Ok((Arc::new(store), object_path))
}
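/// Builds a client for a URL-shaped path. Cloud clients (S3, GCS, Azure) are
/// configured from the environment and cached per bucket/container; `file://`
/// URLs map to the local filesystem.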
fn parse_url(url: &Url, original: &str) -> Result<(Arc<dyn ObjectStore>, ObjectPath)> {
    // `Url::path()` keeps its leading `/`; `ObjectPath::from` drops it along
    // with any empty segments.
    let object_path = ObjectPath::from(url.path());
match url.scheme() {
"s3" => {
let bucket = url.host_str().context("S3 URL missing bucket")?;
let key = CacheKey {
scheme: "s3".to_string(),
bucket: bucket.to_string(),
};
let store = get_cached_store(key, || {
let s3 = object_store::aws::AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.build()
.context("Failed to create S3 client")?;
Ok(Arc::new(s3) as Arc<dyn ObjectStore>)
})?;
Ok((store, object_path))
}
"gs" => {
let bucket = url.host_str().context("GCS URL missing bucket")?;
let key = CacheKey {
scheme: "gs".to_string(),
bucket: bucket.to_string(),
};
let store = get_cached_store(key, || {
let gcs = object_store::gcp::GoogleCloudStorageBuilder::from_env()
.with_bucket_name(bucket)
.build()
.context("Failed to create GCS client")?;
Ok(Arc::new(gcs) as Arc<dyn ObjectStore>)
})?;
Ok((store, object_path))
}
"az" | "azure" | "abfs" | "abfss" => {
let container = url.host_str().context("Azure URL missing container")?;
let key = CacheKey {
scheme: url.scheme().to_string(),
bucket: container.to_string(),
};
let store = get_cached_store(key, || {
let azure = object_store::azure::MicrosoftAzureBuilder::from_env()
.with_container_name(container)
.build()
.context("Failed to create Azure client")?;
Ok(Arc::new(azure) as Arc<dyn ObjectStore>)
})?;
Ok((store, object_path))
}
"file" => {
let local_path = url
.to_file_path()
.map_err(|_| anyhow::anyhow!("Invalid file URL: {}", original))?;
let store = object_store::local::LocalFileSystem::new();
let path_str = local_path.to_string_lossy();
let path_str = path_str.strip_prefix(r"\\?\").unwrap_or(&path_str);
let path_str = path_str.replace('\\', "/");
let object_path = ObjectPath::from(path_str.as_str());
Ok((Arc::new(store), object_path))
}
scheme => {
anyhow::bail!("Unsupported URL scheme: {scheme}");
}
}
}
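/// Reads an entire object into memory.
///
/// A minimal usage sketch; the bucket and key below are illustrative, and the
/// fence is `ignore` because it needs live credentials and an async runtime:
///
/// ```ignore
/// let bytes = read_all("s3://my-bucket/data/part-0.parquet").await?;
/// println!("{} bytes", bytes.len());
/// ```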
pub async fn read_all(path: &str) -> Result<Bytes> {
let (store, object_path) = parse_path_for_read(path)?;
let result = store
.get(&object_path)
.await
.with_context(|| format!("Failed to read: {}", path))?;
let bytes = result
.bytes()
.await
.with_context(|| format!("Failed to read bytes: {}", path))?;
Ok(bytes)
}
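/// Fetches object metadata (size, last-modified, etag) without reading the body.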
pub async fn head(path: &str) -> Result<object_store::ObjectMeta> {
let (store, object_path) = parse_path_for_read(path)?;
let meta = store
.head(&object_path)
.await
.with_context(|| format!("Failed to get metadata: {}", path))?;
Ok(meta)
}
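/// Reads `length` bytes starting at `start` via a single range request.
///
/// Sketch (illustrative path, `ignore`d for the same reasons as `read_all`):
///
/// ```ignore
/// // The first 4 bytes of a Parquet file are the `PAR1` magic.
/// let magic = read_range("s3://my-bucket/file.parquet", 0, 4).await?;
/// assert_eq!(&magic[..], b"PAR1");
/// ```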
pub async fn read_range(path: &str, start: usize, length: usize) -> Result<Bytes> {
let (store, object_path) = parse_path_for_read(path)?;
    let range = start..start + length;
let bytes = store
.get_range(&object_path, range)
.await
.with_context(|| format!("Failed to read range from: {}", path))?;
Ok(bytes)
}
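/// Writes `data` as a single object, replacing any existing object at `path`.
///
/// Illustrative sketch, `ignore`d as above:
///
/// ```ignore
/// write_all("s3://my-bucket/out/result.bin", Bytes::from(vec![0u8; 16])).await?;
/// ```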
pub async fn write_all(path: &str, data: Bytes) -> Result<()> {
let (store, object_path) = parse_path_for_write(path)?;
store
.put(&object_path, PutPayload::from_bytes(data))
.await
.with_context(|| format!("Failed to write: {}", path))?;
Ok(())
}
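/// Recursively lists objects under `path`, optionally keeping only paths that
/// end with `suffix`. Returned paths are store-relative (bucket-relative for
/// cloud stores) and normalized to start with `/`.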
pub async fn list_files(path: &str, suffix: Option<&str>) -> Result<Vec<String>> {
let (store, object_path) = parse_path_for_read(path)?;
use futures::TryStreamExt;
let mut files = Vec::new();
let mut stream = store.list(Some(&object_path));
while let Some(meta) = stream.try_next().await? {
        // `ObjectMeta::location` never carries a leading slash, so prepend one
        // to restore an absolute-style path (cloud locations are bucket-relative
        // and get the same normalization).
        let file_path = meta.location.to_string();
        let file_path = if file_path.starts_with('/') {
            file_path
        } else {
            format!("/{}", file_path)
        };
if let Some(s) = suffix {
if file_path.ends_with(s) {
files.push(file_path);
}
} else {
files.push(file_path);
}
}
Ok(files)
}
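/// Fetches a Parquet file's footer bytes (the Thrift-encoded metadata that sits
/// just before the trailing 8 bytes: a little-endian u32 footer length plus the
/// `PAR1` magic) together with the total file size.
///
/// When `PARX_SUFFIX_RANGE_KB` is set to a positive integer, a single suffix
/// read of that many KiB is tried first; if the footer fits in the window, one
/// request suffices. Otherwise the trailer is read to learn the footer length,
/// followed by an exact-size footer read.
///
/// ```ignore
/// let (footer, size) = extract_parquet_footer("s3://my-bucket/file.parquet").await?;
/// ```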
pub async fn extract_parquet_footer(path: &str) -> Result<(Bytes, u64)> {
let meta = head(path).await?;
    let file_size = meta.size as u64;
    // The smallest possible trailer is 8 bytes: a u32 footer length plus `PAR1`.
    if file_size < 8 {
        anyhow::bail!("Invalid Parquet file: too small to contain footer");
    }
    // Optional fast path: read a configurable suffix window in one request and
    // try to find the footer inside it, avoiding a second round trip.
    if let Some(window_kb) = std::env::var("PARX_SUFFIX_RANGE_KB")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
    {
        if window_kb > 0 {
            let window_bytes = window_kb.saturating_mul(1024);
            let start = file_size.saturating_sub(window_bytes) as usize;
            let tail = read_range(path, start, (file_size as usize).saturating_sub(start)).await?;
            // Trailer layout: ...[footer][footer_len: u32 LE][b"PAR1"].
            if tail.len() >= 8 && &tail[tail.len() - 4..] == b"PAR1" {
                let footer_len = u32::from_le_bytes([
                    tail[tail.len() - 8],
                    tail[tail.len() - 7],
                    tail[tail.len() - 6],
                    tail[tail.len() - 5],
                ]) as u64;
                // Serve from the window only if the whole footer landed inside
                // it; otherwise fall through to the exact two-request path below.
                if footer_len <= file_size - 8 && footer_len + 8 <= tail.len() as u64 {
                    let footer_start = tail.len() - 8 - footer_len as usize;
                    let footer_bytes = tail.slice(footer_start..tail.len() - 8);
                    return Ok((footer_bytes, file_size));
                }
            }
        }
    }
    // Standard path: read the 8-byte trailer to learn the footer length, then
    // fetch exactly the footer bytes.
    let tail = read_range(path, (file_size - 8) as usize, 8).await?;
if &tail[4..8] != b"PAR1" {
anyhow::bail!("Invalid Parquet file: missing PAR1 magic at end");
}
let footer_len = u32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]) as u64;
if footer_len > file_size - 8 {
anyhow::bail!("Invalid Parquet file: footer length out of bounds");
}
let footer_start = (file_size - 8 - footer_len) as usize;
let footer_bytes = read_range(path, footer_start, footer_len as usize).await?;
Ok((footer_bytes, file_size))
}
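/// Reads the page index sections (column indexes and offset indexes) referenced
/// by a previously fetched footer, coalescing overlapping or adjacent byte
/// ranges so each contiguous run costs one range request. Returns an empty
/// buffer when the file carries no page indexes.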
pub async fn extract_parquet_page_indexes(path: &str, footer_bytes: &[u8]) -> Result<Bytes> {
let metadata = ParquetMetaDataReader::decode_metadata(footer_bytes)
.with_context(|| format!("Failed to decode Parquet metadata for {}", path))?;
    // Collect the absolute byte ranges of every column index and offset index
    // referenced by the footer.
    let mut ranges = Vec::new();
for row_group in metadata.row_groups() {
for col in row_group.columns() {
if let (Some(offset), Some(length)) =
(col.column_index_offset(), col.column_index_length())
{
if offset >= 0 && length > 0 {
let start =
usize::try_from(offset).context("column index offset out of range")?;
let len =
usize::try_from(length).context("column index length out of range")?;
ranges.push(start..start + len);
}
}
if let (Some(offset), Some(length)) =
(col.offset_index_offset(), col.offset_index_length())
{
if offset >= 0 && length > 0 {
let start =
usize::try_from(offset).context("offset index offset out of range")?;
let len =
usize::try_from(length).context("offset index length out of range")?;
ranges.push(start..start + len);
}
}
}
}
if ranges.is_empty() {
return Ok(Bytes::new());
}
    // Coalesce overlapping and back-to-back ranges so each merged run costs a
    // single range request.
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<std::ops::Range<usize>> = Vec::with_capacity(ranges.len());
for range in ranges {
if let Some(last) = merged.last_mut() {
if range.start <= last.end {
if range.end > last.end {
last.end = range.end;
}
continue;
}
}
merged.push(range);
}
let total_size: usize = merged.iter().map(|r| r.end - r.start).sum();
let mut output = Vec::with_capacity(total_size);
for range in merged {
let chunk = read_range(path, range.start, range.end - range.start).await?;
output.extend_from_slice(&chunk);
}
Ok(Bytes::from(output))
}
#[cfg(test)]
mod tests {
use super::*;
    // Single-letter schemes are how Windows drive letters masquerade as URLs;
    // the parse functions treat them as local paths rather than remote stores.
    #[test]
    fn test_url_scheme_detection() {
        let windows_path = "C:\\Users\\test\\file.parquet";
        if let Ok(url) = Url::parse(windows_path) {
            assert_eq!(url.scheme().len(), 1);
            assert_eq!(url.scheme(), "c");
        }
assert_eq!(Url::parse("s3://bucket/key").unwrap().scheme().len(), 2);
assert_eq!(Url::parse("file:///path").unwrap().scheme().len(), 4);
assert_eq!(Url::parse("gs://bucket/key").unwrap().scheme().len(), 2);
}
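    // Local round-trip sketch for `write_all`/`read_all`. Assumes `tokio` is
    // available as a dev-dependency with the `macros` and `rt` features; uses a
    // unique directory under the OS temp dir so runs don't collide.
    #[tokio::test]
    async fn test_local_write_read_roundtrip() -> Result<()> {
        let dir = std::env::temp_dir().join(format!("object-store-util-test-{}", std::process::id()));
        std::fs::create_dir_all(&dir)?;
        let file = dir.join("roundtrip.bin").to_string_lossy().to_string();
        let payload = Bytes::from_static(b"hello object store");
        write_all(&file, payload.clone()).await?;
        assert_eq!(read_all(&file).await?, payload);
        assert_eq!(head(&file).await?.size as usize, payload.len());
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }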
}