//! feldera_buffer_cache/builder.rs — builder for the buffer-cache layout
//! used by DBSP runtime worker pairs.
1use crate::ThreadType;
2use crate::{CacheEntry, LruCache, S3FifoCache, SharedBufferCache};
3use enum_map::{Enum, EnumMap};
4use serde::{Deserialize, Serialize};
5use std::fmt::Debug;
6use std::hash::{BuildHasher, Hash, RandomState};
7use std::marker::PhantomData;
8use tracing::warn;
9
/// Selects which eviction strategy backs a cache instance.
///
/// Serialized/deserialized from snake_case config values (`s3_fifo`, `lru`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheStrategy {
    /// Use the sharded S3-FIFO cache backed by `quick_cache`.
    #[default]
    S3Fifo,

    /// Use the mutex-protected weighted LRU cache.
    Lru,
}
21
/// Controls how caches are shared across a foreground/background worker pair.
///
/// Serialized/deserialized from snake_case config values
/// (`shared_per_worker_pair`, `per_thread`, `global`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheAllocationStrategy {
    /// Share one cache across a foreground/background worker pair.
    #[default]
    SharedPerWorkerPair,

    /// Create a separate cache for each foreground/background thread.
    PerThread,

    /// Share one cache across all foreground/background threads.
    ///
    /// Only honored by the S3-FIFO strategy; with the LRU strategy the
    /// builder logs a warning and falls back to per-thread caches.
    Global,
}
36
/// Builds the cache layout used by DBSP runtime worker pairs.
///
/// Configure via the `with_*` setters, then call `build` (one cache slot per
/// [`ThreadType`] per worker pair) or `build_single` (one standalone cache).
pub struct BufferCacheBuilder<K, V, S = RandomState> {
    /// Eviction strategy used for newly constructed caches.
    strategy: BufferCacheStrategy,
    /// Optional override for the sharded backend shard count; `None` (or
    /// `Some(0)`) selects the backend's default shard count.
    max_buckets: Option<usize>,
    /// Sharing policy across worker-pair cache slots.
    allocation_strategy: BufferCacheAllocationStrategy,
    /// Hash builder cloned into each newly constructed cache.
    hash_builder: S,
    /// Keeps the builder generic over the cache key and value types without
    /// storing them; `fn(K) -> V` avoids imposing auto-trait requirements
    /// on `K` and `V`.
    marker: PhantomData<fn(K) -> V>,
}
50
51impl<K, V> BufferCacheBuilder<K, V, RandomState> {
52    /// Creates a builder that uses the default hash builder.
53    pub fn new() -> Self {
54        Self {
55            strategy: BufferCacheStrategy::default(),
56            max_buckets: None,
57            allocation_strategy: BufferCacheAllocationStrategy::default(),
58            hash_builder: RandomState::new(),
59            marker: PhantomData,
60        }
61    }
62}
63
impl<K, V> Default for BufferCacheBuilder<K, V, RandomState> {
    /// Delegates to [`BufferCacheBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
69
70impl<K, V, S> BufferCacheBuilder<K, V, S> {
71    /// Sets the eviction strategy for caches created by this builder.
72    pub fn with_buffer_cache_strategy(mut self, strategy: BufferCacheStrategy) -> Self {
73        self.strategy = strategy;
74        self
75    }
76
77    /// Sets the optional shard-count override for sharded backends.
78    pub fn with_buffer_max_buckets(mut self, max_buckets: Option<usize>) -> Self {
79        self.max_buckets = max_buckets;
80        self
81    }
82
83    /// Sets how caches are shared across each worker pair.
84    pub fn with_buffer_cache_allocation_strategy(
85        mut self,
86        allocation_strategy: BufferCacheAllocationStrategy,
87    ) -> Self {
88        self.allocation_strategy = allocation_strategy;
89        self
90    }
91
92    /// Sets the hash builder used for newly constructed caches.
93    pub fn with_hash_builder<NewS>(self, hash_builder: NewS) -> BufferCacheBuilder<K, V, NewS> {
94        BufferCacheBuilder {
95            strategy: self.strategy,
96            max_buckets: self.max_buckets,
97            allocation_strategy: self.allocation_strategy,
98            hash_builder,
99            marker: PhantomData,
100        }
101    }
102}
103
104impl<K, V, S> BufferCacheBuilder<K, V, S>
105where
106    K: Eq + Hash + Ord + Clone + Debug + Send + Sync + 'static,
107    V: CacheEntry + Clone + Send + Sync + 'static,
108    S: BuildHasher + Clone + Send + Sync + 'static,
109{
110    /// Builds one cache slot per [`ThreadType`] for each worker pair.
111    pub fn build(
112        &self,
113        worker_pairs: usize,
114        total_capacity_bytes: usize,
115    ) -> Vec<EnumMap<ThreadType, SharedBufferCache<K, V>>> {
116        if worker_pairs == 0 {
117            return Vec::new();
118        }
119
120        let per_thread_capacity = total_capacity_bytes / worker_pairs / ThreadType::LENGTH;
121        let per_pair_capacity = total_capacity_bytes / worker_pairs;
122
123        match self.strategy {
124            BufferCacheStrategy::Lru => {
125                if self.allocation_strategy == BufferCacheAllocationStrategy::Global {
126                    warn!(
127                        "unsupported buffer cache allocation strategy {:?} set in dev_tweaks for LRU cache, falling back to `per_thread`",
128                        self.allocation_strategy
129                    );
130                }
131                (0..worker_pairs)
132                    .map(|_| self.build_thread_slots(per_thread_capacity))
133                    .collect()
134            }
135            BufferCacheStrategy::S3Fifo => match self.allocation_strategy {
136                BufferCacheAllocationStrategy::PerThread => (0..worker_pairs)
137                    .map(|_| self.build_thread_slots(per_thread_capacity))
138                    .collect(),
139                BufferCacheAllocationStrategy::SharedPerWorkerPair => (0..worker_pairs)
140                    .map(|_| Self::shared_thread_slots(self.build_s3_fifo(per_pair_capacity)))
141                    .collect(),
142                BufferCacheAllocationStrategy::Global => {
143                    let cache = self.build_s3_fifo(total_capacity_bytes);
144                    (0..worker_pairs)
145                        .map(|_| Self::shared_thread_slots(cache.clone()))
146                        .collect()
147                }
148            },
149        }
150    }
151
152    /// Builds one cache instance using the currently selected strategy.
153    pub fn build_single(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
154        match self.strategy {
155            BufferCacheStrategy::Lru => self.build_lru(capacity_bytes),
156            BufferCacheStrategy::S3Fifo => self.build_s3_fifo(capacity_bytes),
157        }
158    }
159
160    /// Constructs a weighted LRU cache.
161    fn build_lru(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
162        let cache: SharedBufferCache<K, V> = std::sync::Arc::new(LruCache::with_hasher(
163            capacity_bytes,
164            self.hash_builder.clone(),
165        ));
166        cache
167    }
168
169    /// Constructs a sharded S3-FIFO cache.
170    fn build_s3_fifo(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
171        let shards = normalize_sharded_cache_shards(self.max_buckets);
172        let cache: SharedBufferCache<K, V> = std::sync::Arc::new(S3FifoCache::with_hasher(
173            capacity_bytes,
174            shards,
175            self.hash_builder.clone(),
176        ));
177        cache
178    }
179
180    /// Builds a separate cache slot for each thread type.
181    fn build_thread_slots(
182        &self,
183        capacity_bytes: usize,
184    ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
185        EnumMap::from_fn(|_| self.build_single(capacity_bytes))
186    }
187
188    /// Reuses a single cache backend for every thread-type slot in a worker pair.
189    fn shared_thread_slots(
190        cache: SharedBufferCache<K, V>,
191    ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
192        EnumMap::from_fn(|_| cache.clone())
193    }
194}
195
196/// Normalizes an optional bucket count to the shard count expected by the
197/// sharded S3-FIFO backend.
198fn normalize_sharded_cache_shards(max_buckets: Option<usize>) -> usize {
199    match max_buckets {
200        None | Some(0) => S3FifoCache::<(), ()>::DEFAULT_SHARDS,
201        Some(buckets) => buckets
202            .checked_next_power_of_two()
203            .unwrap_or(S3FifoCache::<(), ()>::DEFAULT_SHARDS),
204    }
205}