use axum::{
body::Body,
http::{HeaderMap, StatusCode, header},
response::{IntoResponse, Response},
};
use bytes::Bytes;
use futures::stream;
use std::io::{Cursor, Write};
use tokio_stream::Stream;
use zip::{CompressionMethod, ZipWriter, write::FileOptions};
pub fn stream_parquet_zip_response(
file_buffers: Vec<Vec<u8>>,
base_name: &str,
) -> Result<Response, crate::error::ServerError> {
let zip_buffer = create_zip_from_buffers(file_buffers, base_name)?;
let stream = create_chunked_stream(zip_buffer);
let mut headers = HeaderMap::new();
headers.insert(header::CONTENT_TYPE, "application/zip".parse().unwrap());
headers.insert(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}.zip\"", base_name)
.parse()
.unwrap(),
);
let body = Body::from_stream(stream);
Ok((StatusCode::OK, headers, body).into_response())
}
fn create_zip_from_buffers(
file_buffers: Vec<Vec<u8>>,
base_name: &str,
) -> Result<Vec<u8>, crate::error::ServerError> {
let mut zip_buffer = Vec::new();
let cursor = Cursor::new(&mut zip_buffer);
let mut zip = ZipWriter::new(cursor);
let options = FileOptions::<()>::default()
.compression_method(CompressionMethod::Stored)
.large_file(true);
for (i, buffer) in file_buffers.iter().enumerate() {
let file_name = if i == 0 {
format!("{}.parquet", base_name)
} else {
format!("{}_{:03}.parquet", base_name, i + 1)
};
zip.start_file(file_name, options).map_err(|e| {
crate::error::ServerError::InternalError(format!(
"Failed to start ZIP file entry: {}",
e
))
})?;
zip.write_all(buffer).map_err(|e| {
crate::error::ServerError::InternalError(format!("Failed to write to ZIP: {}", e))
})?;
}
zip.finish().map_err(|e| {
crate::error::ServerError::InternalError(format!("Failed to finish ZIP: {}", e))
})?;
Ok(zip_buffer)
}
fn create_chunked_stream(
buffer: Vec<u8>,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static {
const CHUNK_SIZE: usize = 65536;
let chunks: Vec<Bytes> = buffer
.chunks(CHUNK_SIZE)
.map(Bytes::copy_from_slice)
.collect();
stream::iter(chunks.into_iter().map(Ok))
}
/// Returns `parquet_data` as a chunked `application/parquet` attachment with
/// the fixed file name `data.parquet`.
///
/// The visible code path always produces `Ok`; the `Result` return type is
/// kept for signature compatibility with callers.
pub fn stream_single_parquet_response(
    parquet_data: Vec<u8>,
) -> Result<Response, crate::error::ServerError> {
    let body = Body::from_stream(create_chunked_stream(parquet_data));

    // Both header values are constants, so they are built directly.
    let mut response_headers = HeaderMap::new();
    response_headers.insert(
        header::CONTENT_TYPE,
        header::HeaderValue::from_static("application/parquet"),
    );
    response_headers.insert(
        header::CONTENT_DISPOSITION,
        header::HeaderValue::from_static("attachment; filename=\"data.parquet\""),
    );

    let response = (StatusCode::OK, response_headers, body).into_response();
    Ok(response)
}
/// Returns the combined length, in bytes, of all the given buffers.
///
/// An empty slice yields `0`.
#[allow(dead_code)]
pub fn calculate_total_size(buffers: &[Vec<u8>]) -> usize {
    let lengths = buffers.iter().map(Vec::len);
    lengths.sum()
}
/// Reports whether a payload of `total_size` bytes is strictly larger than the
/// 10 MiB streaming threshold.
///
/// A payload of exactly 10 MiB returns `false`.
pub fn should_use_streaming(total_size: usize) -> bool {
    const MIB: usize = 1024 * 1024;
    const STREAMING_THRESHOLD: usize = 10 * MIB;

    STREAMING_THRESHOLD < total_size
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The total is the sum of the individual buffer lengths.
    #[test]
    fn test_calculate_total_size() {
        let buffers: Vec<Vec<u8>> = [1000usize, 2000, 3000]
            .iter()
            .map(|&len| vec![0u8; len])
            .collect();

        assert_eq!(calculate_total_size(&buffers), 6000);
    }

    /// Small payloads stay below the threshold; large ones exceed it.
    #[test]
    fn test_should_use_streaming() {
        let one_kib = 1024;
        let twenty_mib = 20 * 1024 * 1024;

        assert!(!should_use_streaming(one_kib));
        assert!(should_use_streaming(twenty_mib));
    }

    /// The produced archive is non-empty and starts with the `PK` signature.
    #[test]
    fn test_create_zip_from_buffers() {
        let first = vec![1u8, 2, 3, 4];
        let second = vec![5u8, 6, 7, 8];

        let zip_data = create_zip_from_buffers(vec![first, second], "test").unwrap();

        assert!(!zip_data.is_empty());
        assert!(zip_data.starts_with(b"PK"));
    }
}