//! aisle 0.2.0
//!
//! Metadata-driven Parquet pruning for Rust: skip irrelevant data before reading.
//! See the crate documentation for details.
use std::{collections::HashMap, future::Future};

use parquet::{
    arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStreamBuilder},
    bloom_filter::Sbbf,
};

/// Async source of Parquet split-block bloom filters ([`Sbbf`]).
///
/// Implementing this trait lets callers decide *how* bloom filters are
/// fetched: one request at a time, concurrently, from an in-memory cache,
/// through a connection pool, or as coalesced byte-range reads.
///
/// # Default Implementation
///
/// [`ParquetRecordBatchStreamBuilder`] already implements this trait,
/// loading each filter sequentially through the Parquet async API.
///
/// # Custom Implementations
///
/// A custom provider can be tuned for a specific storage backend
/// (S3, GCS, Azure, etc.):
///
/// ```rust,ignore
/// use aisle::AsyncBloomFilterProvider;
/// use parquet::bloom_filter::Sbbf;
/// use std::collections::HashMap;
///
/// struct ConcurrentBloomProvider {
///     // Connection pool, cache, etc.
/// }
///
/// impl AsyncBloomFilterProvider for ConcurrentBloomProvider {
///     async fn bloom_filter(
///         &mut self,
///         row_group_idx: usize,
///         column_idx: usize,
///     ) -> Option<Sbbf> {
///         // Custom logic: check cache, load if needed
///         self.cache.get(&(row_group_idx, column_idx)).cloned()
///     }
///
///     // Override the batch method to fetch filters concurrently
///     async fn bloom_filters_batch<'a>(
///         &'a mut self,
///         requests: &'a [(usize, usize)],
///     ) -> HashMap<(usize, usize), Sbbf> {
///         // Load all filters concurrently via get_byte_ranges
///         self.load_batch_concurrent(requests).await
///     }
/// }
/// ```
///
/// # Performance Considerations
///
/// The stock implementation issues **one request per filter**, sequentially
/// within each row group — costly on remote object stores (S3/GCS).
/// Providers can do better by:
///
/// - **Batching**: override `bloom_filters_batch` and use `get_byte_ranges`
///   to fetch everything in one round trip
/// - **Caching**: keep frequently used bloom filters in memory
/// - **Connection pooling**: issue parallel requests over several connections
/// - **Prefetching**: fetch filters for row groups that will be read next
///
/// # Example: Cached Provider
///
/// ```rust,ignore
/// use std::collections::HashMap;
/// use parquet::bloom_filter::Sbbf;
///
/// struct CachedBloomProvider {
///     reader: AsyncFileReader,
///     cache: HashMap<(usize, usize), Sbbf>,
/// }
///
/// impl AsyncBloomFilterProvider for CachedBloomProvider {
///     async fn bloom_filter(
///         &mut self,
///         row_group_idx: usize,
///         column_idx: usize,
///     ) -> Option<Sbbf> {
///         let key = (row_group_idx, column_idx);
///
///         // Serve from cache when possible
///         if let Some(filter) = self.cache.get(&key) {
///             return Some(filter.clone());
///         }
///
///         // Otherwise load, then remember for next time
///         let filter = self.load_from_reader(key).await?;
///         self.cache.insert(key, filter.clone());
///         Some(filter)
///     }
/// }
/// ```
pub trait AsyncBloomFilterProvider {
    /// Fetch the bloom filter for one column of one row group.
    ///
    /// # Parameters
    ///
    /// - `row_group_idx`: zero-based row group index
    /// - `column_idx`: zero-based column index in the Parquet schema
    ///
    /// # Returns
    ///
    /// - `Some(Sbbf)`: successfully loaded bloom filter
    /// - `None`: filter unavailable (missing, corrupted, or I/O error)
    ///
    /// # Concurrency
    ///
    /// The `&mut self` receiver rules out concurrent calls at the trait
    /// level. Providers that want parallelism should layer it internally
    /// (channels, pools) or override `bloom_filters_batch` for batching.
    fn bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> impl Future<Output = Option<Sbbf>> + '_;

    /// Fetch bloom filters for several `(row_group, column)` pairs at once.
    ///
    /// The provided default simply awaits `bloom_filter` for each request in
    /// turn. **Custom providers should override this method** to batch or
    /// parallelize the underlying I/O.
    ///
    /// # Parameters
    ///
    /// - `requests`: slice of `(row_group_idx, column_idx)` pairs to resolve
    ///
    /// # Returns
    ///
    /// A map from each successfully loaded pair to its filter. Pairs that
    /// failed to load, or had no filter, are simply absent from the map.
    ///
    /// # Performance
    ///
    /// The sequential default is simple but slow on remote storage.
    /// Overrides can:
    ///
    /// - Use `AsyncFileReader::get_byte_ranges` for a single HTTP request
    /// - Load filters concurrently via a connection pool
    /// - Implement request coalescing or deduplication
    ///
    /// # Example: Batched Implementation
    ///
    /// ```rust,ignore
    /// async fn bloom_filters_batch<'a>(
    ///     &'a mut self,
    ///     requests: &'a [(usize, usize)],
    /// ) -> HashMap<(usize, usize), Sbbf> {
    ///     // One byte range per requested filter
    ///     let ranges: Vec<Range<u64>> = requests
    ///         .iter()
    ///         .map(|&(rg, col)| self.get_bloom_range(rg, col))
    ///         .collect();
    ///
    ///     // Single batched HTTP request
    ///     let bytes_vec = self.reader.get_byte_ranges(ranges).await.unwrap();
    ///
    ///     // Decode each filter, dropping any that fail to parse
    ///     requests.iter()
    ///         .zip(bytes_vec)
    ///         .filter_map(|(key, bytes)| {
    ///             Sbbf::from_bytes(&bytes).ok().map(|f| (*key, f))
    ///         })
    ///         .collect()
    /// }
    /// ```
    fn bloom_filters_batch<'a>(
        &'a mut self,
        requests: &'a [(usize, usize)],
    ) -> impl Future<Output = HashMap<(usize, usize), Sbbf>> + 'a {
        async move {
            // Sequential fallback: resolve each request one after another,
            // keeping only the filters that actually loaded.
            let mut loaded = HashMap::with_capacity(requests.len());
            for &key in requests {
                let (rg_idx, col_idx) = key;
                if let Some(filter) = self.bloom_filter(rg_idx, col_idx).await {
                    loaded.insert(key, filter);
                }
            }
            loaded
        }
    }
}

impl<T: AsyncFileReader + Send + 'static> AsyncBloomFilterProvider
    for ParquetRecordBatchStreamBuilder<T>
{
    /// Sequential loading through the Parquet async API.
    ///
    /// Any read error is folded into `None`, matching the trait's contract
    /// that unavailable filters are reported as absent rather than failing.
    fn bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> impl Future<Output = Option<Sbbf>> + '_ {
        async move {
            // Ok(Some(f)) -> Some(f); Ok(None) and Err(_) -> None.
            self.get_row_group_column_bloom_filter(row_group_idx, column_idx)
                .await
                .ok()
                .flatten()
        }
    }
}