use crate::{data_cache::DataCache, sync::Arc};
use mountpoint_s3_client::ObjectClient;
use crate::{Runtime, mem_limiter::MemoryLimiter};
use super::caching_stream::CachingPartStream;
use super::part_stream::{ClientPartStream, PartStream};
use super::{Prefetcher, PrefetcherConfig};
/// Builder that produces a [`Prefetcher`] for a given `Client` type.
///
/// The concrete construction strategy is stored as a boxed `PrefetcherBuild`
/// trait object, so this type is generic over `Client` only.
pub struct PrefetcherBuilder<Client> {
    /// Type-erased build strategy.
    inner: Box<dyn PrefetcherBuild<Client>>,
}
impl<Client> PrefetcherBuilder<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
pub fn default_builder(client: Client) -> Self {
Self {
inner: Box::new(DefaultPrefetcherBuilder { client }),
}
}
pub fn caching_builder<Cache>(cache: Cache, client: Client) -> Self
where
Cache: DataCache + Send + Sync + 'static,
{
Self {
inner: Box::new(CachingPrefetcherBuilder { cache, client }),
}
}
pub fn build(
self,
runtime: Runtime,
mem_limiter: Arc<MemoryLimiter>,
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client> {
self.inner.build(runtime, mem_limiter, prefetcher_config)
}
}
/// Private strategy trait behind the boxed `dyn PrefetcherBuild<Client>` values
/// used in this module.
///
/// `build` takes `self: Box<Self>` so that it can be called on a boxed trait
/// object while still consuming the implementor.
trait PrefetcherBuild<Client: ObjectClient + Clone + Send + Sync + 'static> {
    /// Consumes the boxed strategy and returns a [`Prefetcher`] for `Client`.
    fn build(
        self: Box<Self>,
        runtime: Runtime,
        mem_limiter: Arc<MemoryLimiter>,
        prefetcher_config: PrefetcherConfig,
    ) -> Prefetcher<Client>;
}
/// Build strategy that holds only a client.
///
/// Its `PrefetcherBuild` implementation wraps the client in a `ClientPartStream`.
struct DefaultPrefetcherBuilder<Client> {
    /// Client moved into the part stream at build time.
    client: Client,
}
impl<Client> PrefetcherBuild<Client> for DefaultPrefetcherBuilder<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
fn build(
self: Box<Self>,
runtime: Runtime,
mem_limiter: Arc<MemoryLimiter>,
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client> {
let part_stream = ClientPartStream::new(runtime, self.client, mem_limiter.clone());
Prefetcher::new(PartStream::new(part_stream), prefetcher_config, mem_limiter)
}
}
/// Build strategy that holds a cache together with a client.
///
/// Its `PrefetcherBuild` implementation passes both to a `CachingPartStream`.
struct CachingPrefetcherBuilder<Cache, Client> {
    /// Cache moved into the part stream at build time.
    cache: Cache,
    /// Client moved into the part stream at build time.
    client: Client,
}
impl<Cache, Client> PrefetcherBuild<Client> for CachingPrefetcherBuilder<Cache, Client>
where
Cache: DataCache + Send + Sync + 'static,
Client: ObjectClient + Clone + Send + Sync + 'static,
{
fn build(
self: Box<Self>,
runtime: Runtime,
mem_limiter: Arc<MemoryLimiter>,
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client> {
let part_stream = CachingPartStream::new(runtime, self.client, mem_limiter.clone(), self.cache);
Prefetcher::new(PartStream::new(part_stream), prefetcher_config, mem_limiter)
}
}