Skip to main content

feldera_buffer_cache/
builder.rs

1use crate::ThreadType;
2use crate::{CacheEntry, LruCache, S3FifoCache, SharedBufferCache};
3use enum_map::{Enum, EnumMap};
4use feldera_types::config::dev_tweaks::{BufferCacheAllocationStrategy, BufferCacheStrategy};
5use std::fmt::Debug;
6use std::hash::{BuildHasher, Hash, RandomState};
7use std::marker::PhantomData;
8use tracing::warn;
9
10/// Builds the cache layout used by DBSP runtime worker pairs.
11pub struct BufferCacheBuilder<K, V, S = RandomState> {
12    /// Eviction strategy used for newly constructed caches.
13    strategy: BufferCacheStrategy,
14    /// Optional override for the sharded backend shard count.
15    max_buckets: Option<usize>,
16    /// Sharing policy across worker-pair cache slots.
17    allocation_strategy: BufferCacheAllocationStrategy,
18    /// Hash builder shared by newly constructed caches.
19    hash_builder: S,
20    /// Keeps the builder generic over the cache key and value types.
21    marker: PhantomData<fn(K) -> V>,
22}
23
24impl<K, V> BufferCacheBuilder<K, V, RandomState> {
25    /// Creates a builder that uses the default hash builder.
26    pub fn new() -> Self {
27        Self {
28            strategy: BufferCacheStrategy::default(),
29            max_buckets: None,
30            allocation_strategy: BufferCacheAllocationStrategy::default(),
31            hash_builder: RandomState::new(),
32            marker: PhantomData,
33        }
34    }
35}
36
37impl<K, V> Default for BufferCacheBuilder<K, V, RandomState> {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl<K, V, S> BufferCacheBuilder<K, V, S> {
44    /// Sets the eviction strategy for caches created by this builder.
45    pub fn with_buffer_cache_strategy(mut self, strategy: BufferCacheStrategy) -> Self {
46        self.strategy = strategy;
47        self
48    }
49
50    /// Sets the optional shard-count override for sharded backends.
51    pub fn with_buffer_max_buckets(mut self, max_buckets: Option<usize>) -> Self {
52        self.max_buckets = max_buckets;
53        self
54    }
55
56    /// Sets how caches are shared across each worker pair.
57    pub fn with_buffer_cache_allocation_strategy(
58        mut self,
59        allocation_strategy: BufferCacheAllocationStrategy,
60    ) -> Self {
61        self.allocation_strategy = allocation_strategy;
62        self
63    }
64
65    /// Sets the hash builder used for newly constructed caches.
66    pub fn with_hash_builder<NewS>(self, hash_builder: NewS) -> BufferCacheBuilder<K, V, NewS> {
67        BufferCacheBuilder {
68            strategy: self.strategy,
69            max_buckets: self.max_buckets,
70            allocation_strategy: self.allocation_strategy,
71            hash_builder,
72            marker: PhantomData,
73        }
74    }
75}
76
77impl<K, V, S> BufferCacheBuilder<K, V, S>
78where
79    K: Eq + Hash + Ord + Clone + Debug + Send + Sync + 'static,
80    V: CacheEntry + Clone + Send + Sync + 'static,
81    S: BuildHasher + Clone + Send + Sync + 'static,
82{
83    /// Builds one cache slot per [`ThreadType`] for each worker pair.
84    pub fn build(
85        &self,
86        worker_pairs: usize,
87        total_capacity_bytes: usize,
88    ) -> Vec<EnumMap<ThreadType, SharedBufferCache<K, V>>> {
89        if worker_pairs == 0 {
90            return Vec::new();
91        }
92
93        let per_thread_capacity = total_capacity_bytes / worker_pairs / ThreadType::LENGTH;
94        let per_pair_capacity = total_capacity_bytes / worker_pairs;
95
96        match self.strategy {
97            BufferCacheStrategy::Lru => {
98                if self.allocation_strategy == BufferCacheAllocationStrategy::Global {
99                    warn!(
100                        "unsupported buffer cache allocation strategy {:?} set in dev_tweaks for LRU cache, falling back to `per_thread`",
101                        self.allocation_strategy
102                    );
103                }
104                (0..worker_pairs)
105                    .map(|_| self.build_thread_slots(per_thread_capacity))
106                    .collect()
107            }
108            BufferCacheStrategy::S3Fifo => match self.allocation_strategy {
109                BufferCacheAllocationStrategy::PerThread => (0..worker_pairs)
110                    .map(|_| self.build_thread_slots(per_thread_capacity))
111                    .collect(),
112                BufferCacheAllocationStrategy::SharedPerWorkerPair => (0..worker_pairs)
113                    .map(|_| Self::shared_thread_slots(self.build_s3_fifo(per_pair_capacity)))
114                    .collect(),
115                BufferCacheAllocationStrategy::Global => {
116                    let cache = self.build_s3_fifo(total_capacity_bytes);
117                    (0..worker_pairs)
118                        .map(|_| Self::shared_thread_slots(cache.clone()))
119                        .collect()
120                }
121            },
122        }
123    }
124
125    /// Builds one cache instance using the currently selected strategy.
126    pub fn build_single(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
127        match self.strategy {
128            BufferCacheStrategy::Lru => self.build_lru(capacity_bytes),
129            BufferCacheStrategy::S3Fifo => self.build_s3_fifo(capacity_bytes),
130        }
131    }
132
133    /// Constructs a weighted LRU cache.
134    fn build_lru(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
135        let cache: SharedBufferCache<K, V> = std::sync::Arc::new(LruCache::with_hasher(
136            capacity_bytes,
137            self.hash_builder.clone(),
138        ));
139        cache
140    }
141
142    /// Constructs a sharded S3-FIFO cache.
143    fn build_s3_fifo(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
144        let shards = normalize_sharded_cache_shards(self.max_buckets);
145        let cache: SharedBufferCache<K, V> = std::sync::Arc::new(S3FifoCache::with_hasher(
146            capacity_bytes,
147            shards,
148            self.hash_builder.clone(),
149        ));
150        cache
151    }
152
153    /// Builds a separate cache slot for each thread type.
154    fn build_thread_slots(
155        &self,
156        capacity_bytes: usize,
157    ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
158        EnumMap::from_fn(|_| self.build_single(capacity_bytes))
159    }
160
161    /// Reuses a single cache backend for every thread-type slot in a worker pair.
162    fn shared_thread_slots(
163        cache: SharedBufferCache<K, V>,
164    ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
165        EnumMap::from_fn(|_| cache.clone())
166    }
167}
168
169/// Normalizes an optional bucket count to the shard count expected by the
170/// sharded S3-FIFO backend.
171fn normalize_sharded_cache_shards(max_buckets: Option<usize>) -> usize {
172    match max_buckets {
173        None | Some(0) => S3FifoCache::<(), ()>::DEFAULT_SHARDS,
174        Some(buckets) => buckets
175            .checked_next_power_of_two()
176            .unwrap_or(S3FifoCache::<(), ()>::DEFAULT_SHARDS),
177    }
178}