use std::path::{Path, PathBuf};
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};
use anyhow::{Result, anyhow};
use converge_storage::object_store::ObjectStoreExt;
use converge_storage::{ObjectPath, ObjectStore};
use polars::prelude::*;
/// Fetches the Parquet object stored under `key` and returns the path of its
/// local copy inside `cache_dir`.
///
/// This is a Parquet-named entry point that forwards to [`fetch_to_cache`]
/// unchanged; it performs no Parquet-specific validation of its own.
///
/// # Errors
///
/// Returns whatever error [`fetch_to_cache`] reports.
pub async fn fetch_parquet(
    store: &dyn ObjectStore,
    key: &str,
    cache_dir: &Path,
) -> Result<PathBuf> {
    let cached = fetch_to_cache(store, key, cache_dir).await?;
    Ok(cached)
}
/// Downloads the object stored under `key` into `cache_dir` and returns the
/// path of the local copy.
///
/// If a file already exists at the cache path derived from `key`, that path is
/// returned immediately and the store is not contacted.
///
/// The downloaded bytes are first written to a uniquely named temporary file
/// inside `cache_dir` and then renamed to the final cache path. Because every
/// later call treats an existing file at the final path as a valid cache hit,
/// writing there directly could publish a truncated file if the process died
/// mid-write or a concurrent caller looked at the path too early.
///
/// # Errors
///
/// Returns an error if the cache directory cannot be created, the object
/// cannot be fetched or read from `store`, or the local file cannot be written
/// or moved into place.
pub async fn fetch_to_cache(
    store: &dyn ObjectStore,
    key: &str,
    cache_dir: &Path,
) -> Result<PathBuf> {
    // Keeps temporary file names of concurrent downloads in this process apart.
    static TMP_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let local_path = cache_path_for_key(key, cache_dir);
    if local_path.exists() {
        tracing::debug!(path = %local_path.display(), "using cached file");
        return Ok(local_path);
    }

    std::fs::create_dir_all(cache_dir)?;

    let obj_path = ObjectPath::from(key);
    let result: converge_storage::GetResult = store.get(&obj_path).await?;
    let bytes = result.bytes().await?;

    // The temporary file lives in the same directory as the final path so the
    // rename below never has to cross a filesystem boundary.
    let seq = TMP_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let tmp_path = cache_dir.join(format!(".fetch-{}-{seq}.tmp", std::process::id()));

    if let Err(err) = std::fs::write(&tmp_path, &bytes) {
        // Best-effort cleanup; the write error is the one worth reporting.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(err.into());
    }

    if let Err(err) = std::fs::rename(&tmp_path, &local_path) {
        let _ = std::fs::remove_file(&tmp_path);
        // If the final file exists anyway, another writer published it first
        // and the cache entry is usable; otherwise the failure is real.
        if !local_path.exists() {
            return Err(err.into());
        }
    }

    tracing::info!(
        key,
        path = %local_path.display(),
        bytes = bytes.len(),
        "fetched from storage to cache"
    );
    Ok(local_path)
}
/// Builds the local cache path for the object-store `key` inside `cache_dir`.
///
/// The file name has the form `<hash>-<name>`:
///
/// * `<hash>` is the 64-bit [`DefaultHasher`] digest of the whole key, printed
///   as 16 zero-padded lowercase hex digits, so keys that share a final path
///   segment still map to different files.
/// * `<name>` is the text after the last `/` in the key (the whole key when it
///   contains no `/`), or `blob` when that text is empty. The characters
///   `: < > " \ | ? *` and ASCII control characters are replaced with `_`.
fn cache_path_for_key(key: &str, cache_dir: &Path) -> PathBuf {
    let digest = {
        let mut state = DefaultHasher::new();
        Hash::hash(key, &mut state);
        state.finish()
    };

    // Last `/`-separated segment of the key, falling back to a fixed name when
    // the key is empty or ends with a slash.
    let tail = match key.rsplit_once('/') {
        Some((_, last)) => last,
        None => key,
    };
    let tail = if tail.is_empty() { "blob" } else { tail };

    let mut file_name = format!("{digest:016x}");
    file_name.push('-');
    for ch in tail.chars() {
        let needs_replacement = matches!(ch, ':' | '<' | '>' | '"' | '\\' | '|' | '?' | '*')
            || ch.is_ascii_control();
        file_name.push(if needs_replacement { '_' } else { ch });
    }

    cache_dir.join(file_name)
}
/// Serialises `df` as Parquet in memory and uploads the bytes to `store`
/// under `key`.
///
/// `df` is taken mutably because it is handed to `ParquetWriter::finish` as a
/// mutable reference.
///
/// # Errors
///
/// Returns an error if Parquet encoding fails, or if the upload fails (in
/// which case the error message is prefixed with
/// `failed to write parquet to storage`).
pub async fn write_parquet_to_store(
    store: &dyn ObjectStore,
    key: &str,
    df: &mut DataFrame,
) -> Result<()> {
    // Encode the whole frame into a byte buffer before touching the store.
    let mut encoded = Vec::<u8>::new();
    ParquetWriter::new(&mut encoded).finish(df)?;

    let destination = ObjectPath::from(key);
    let body = converge_storage::object_store::PutPayload::from(encoded);
    if let Err(e) = store.put(&destination, body).await {
        return Err(anyhow!("failed to write parquet to storage: {e}"));
    }

    tracing::info!(key, "wrote parquet to storage");
    Ok(())
}
/// Opens the Parquet file at `path` as a polars [`LazyFrame`], using the
/// default scan arguments.
///
/// # Errors
///
/// Returns an error if `path` is not valid UTF-8 or if
/// `LazyFrame::scan_parquet` fails.
pub fn scan_local_parquet(path: &Path) -> Result<LazyFrame> {
    // The polars path type is built from a string here, so non-UTF-8 paths are
    // rejected up front.
    let Some(utf8_path) = path.to_str() else {
        return Err(anyhow!("path is not valid utf-8: {:?}", path));
    };

    let frame = LazyFrame::scan_parquet(PlPath::from_str(utf8_path), Default::default())?;
    Ok(frame)
}
#[cfg(test)]
mod tests {
    use super::*;
    use converge_storage::object_store::memory::InMemory;

    /// Writes a small frame to an in-memory store, fetches it back through the
    /// cache and checks that the scanned result has the same shape.
    #[tokio::test]
    async fn roundtrip_parquet_through_store() {
        let store = InMemory::new();
        let cache_dir = tempfile::tempdir().unwrap();
        let mut df = df! {
            "id" => &[1i64, 2, 3],
            "value" => &[10.0f64, 20.0, 30.0],
        }
        .unwrap();
        write_parquet_to_store(&store, "test/data.parquet", &mut df)
            .await
            .unwrap();
        let local_path = fetch_parquet(&store, "test/data.parquet", cache_dir.path())
            .await
            .unwrap();
        // Scan and collect on a blocking thread rather than on the async test task.
        let result = tokio::task::spawn_blocking(move || {
            scan_local_parquet(&local_path).unwrap().collect().unwrap()
        })
        .await
        .unwrap();
        assert_eq!(result.height(), 3);
        assert_eq!(result.width(), 2);
    }

    /// A second fetch of the same key must be served from the local cache.
    ///
    /// Comparing the two returned paths alone would pass even if the second
    /// call downloaded again, so the remote object is overwritten between the
    /// calls and the cached bytes are checked to be the original ones.
    #[tokio::test]
    async fn fetch_uses_cache_on_second_call() {
        let store = InMemory::new();
        let cache_dir = tempfile::tempdir().unwrap();
        let obj_path = ObjectPath::from("cached.parquet");
        let payload = converge_storage::object_store::PutPayload::from(vec![1u8, 2, 3]);
        store.put(&obj_path, payload).await.unwrap();
        let path1 = fetch_to_cache(&store, "cached.parquet", cache_dir.path())
            .await
            .unwrap();
        assert!(path1.exists());
        assert_eq!(std::fs::read(&path1).unwrap(), vec![1u8, 2, 3]);

        // Change what the store holds; a cache hit must not observe this.
        let replacement = converge_storage::object_store::PutPayload::from(vec![9u8, 9, 9]);
        store.put(&obj_path, replacement).await.unwrap();

        let path2 = fetch_to_cache(&store, "cached.parquet", cache_dir.path())
            .await
            .unwrap();
        assert_eq!(path1, path2);
        assert_eq!(std::fs::read(&path2).unwrap(), vec![1u8, 2, 3]);
    }

    /// Keys that end in the same file name but differ elsewhere must map to
    /// separate cache files holding their own contents.
    #[tokio::test]
    async fn fetch_distinguishes_keys_with_same_basename() {
        let store = InMemory::new();
        let cache_dir = tempfile::tempdir().unwrap();
        let path_a = ObjectPath::from("tenant-a/data.parquet");
        let path_b = ObjectPath::from("tenant-b/data.parquet");
        store
            .put(
                &path_a,
                converge_storage::object_store::PutPayload::from(vec![1u8, 2, 3]),
            )
            .await
            .unwrap();
        store
            .put(
                &path_b,
                converge_storage::object_store::PutPayload::from(vec![4u8, 5, 6]),
            )
            .await
            .unwrap();
        let local_a = fetch_to_cache(&store, "tenant-a/data.parquet", cache_dir.path())
            .await
            .unwrap();
        let local_b = fetch_to_cache(&store, "tenant-b/data.parquet", cache_dir.path())
            .await
            .unwrap();
        assert_ne!(local_a, local_b);
        assert_eq!(std::fs::read(local_a).unwrap(), vec![1u8, 2, 3]);
        assert_eq!(std::fs::read(local_b).unwrap(), vec![4u8, 5, 6]);
    }
}