1use crate::ThreadType;
2use crate::{CacheEntry, LruCache, S3FifoCache, SharedBufferCache};
3use enum_map::{Enum, EnumMap};
4use serde::{Deserialize, Serialize};
5use std::fmt::Debug;
6use std::hash::{BuildHasher, Hash, RandomState};
7use std::marker::PhantomData;
8use tracing::warn;
9
10#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
12#[serde(rename_all = "snake_case")]
13pub enum BufferCacheStrategy {
14 #[default]
16 S3Fifo,
17
18 Lru,
20}
21
22#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "snake_case")]
25pub enum BufferCacheAllocationStrategy {
26 #[default]
28 SharedPerWorkerPair,
29
30 PerThread,
32
33 Global,
35}
36
37pub struct BufferCacheBuilder<K, V, S = RandomState> {
39 strategy: BufferCacheStrategy,
41 max_buckets: Option<usize>,
43 allocation_strategy: BufferCacheAllocationStrategy,
45 hash_builder: S,
47 marker: PhantomData<fn(K) -> V>,
49}
50
51impl<K, V> BufferCacheBuilder<K, V, RandomState> {
52 pub fn new() -> Self {
54 Self {
55 strategy: BufferCacheStrategy::default(),
56 max_buckets: None,
57 allocation_strategy: BufferCacheAllocationStrategy::default(),
58 hash_builder: RandomState::new(),
59 marker: PhantomData,
60 }
61 }
62}
63
64impl<K, V> Default for BufferCacheBuilder<K, V, RandomState> {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl<K, V, S> BufferCacheBuilder<K, V, S> {
71 pub fn with_buffer_cache_strategy(mut self, strategy: BufferCacheStrategy) -> Self {
73 self.strategy = strategy;
74 self
75 }
76
77 pub fn with_buffer_max_buckets(mut self, max_buckets: Option<usize>) -> Self {
79 self.max_buckets = max_buckets;
80 self
81 }
82
83 pub fn with_buffer_cache_allocation_strategy(
85 mut self,
86 allocation_strategy: BufferCacheAllocationStrategy,
87 ) -> Self {
88 self.allocation_strategy = allocation_strategy;
89 self
90 }
91
92 pub fn with_hash_builder<NewS>(self, hash_builder: NewS) -> BufferCacheBuilder<K, V, NewS> {
94 BufferCacheBuilder {
95 strategy: self.strategy,
96 max_buckets: self.max_buckets,
97 allocation_strategy: self.allocation_strategy,
98 hash_builder,
99 marker: PhantomData,
100 }
101 }
102}
103
104impl<K, V, S> BufferCacheBuilder<K, V, S>
105where
106 K: Eq + Hash + Ord + Clone + Debug + Send + Sync + 'static,
107 V: CacheEntry + Clone + Send + Sync + 'static,
108 S: BuildHasher + Clone + Send + Sync + 'static,
109{
110 pub fn build(
112 &self,
113 worker_pairs: usize,
114 total_capacity_bytes: usize,
115 ) -> Vec<EnumMap<ThreadType, SharedBufferCache<K, V>>> {
116 if worker_pairs == 0 {
117 return Vec::new();
118 }
119
120 let per_thread_capacity = total_capacity_bytes / worker_pairs / ThreadType::LENGTH;
121 let per_pair_capacity = total_capacity_bytes / worker_pairs;
122
123 match self.strategy {
124 BufferCacheStrategy::Lru => {
125 if self.allocation_strategy == BufferCacheAllocationStrategy::Global {
126 warn!(
127 "unsupported buffer cache allocation strategy {:?} set in dev_tweaks for LRU cache, falling back to `per_thread`",
128 self.allocation_strategy
129 );
130 }
131 (0..worker_pairs)
132 .map(|_| self.build_thread_slots(per_thread_capacity))
133 .collect()
134 }
135 BufferCacheStrategy::S3Fifo => match self.allocation_strategy {
136 BufferCacheAllocationStrategy::PerThread => (0..worker_pairs)
137 .map(|_| self.build_thread_slots(per_thread_capacity))
138 .collect(),
139 BufferCacheAllocationStrategy::SharedPerWorkerPair => (0..worker_pairs)
140 .map(|_| Self::shared_thread_slots(self.build_s3_fifo(per_pair_capacity)))
141 .collect(),
142 BufferCacheAllocationStrategy::Global => {
143 let cache = self.build_s3_fifo(total_capacity_bytes);
144 (0..worker_pairs)
145 .map(|_| Self::shared_thread_slots(cache.clone()))
146 .collect()
147 }
148 },
149 }
150 }
151
152 pub fn build_single(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
154 match self.strategy {
155 BufferCacheStrategy::Lru => self.build_lru(capacity_bytes),
156 BufferCacheStrategy::S3Fifo => self.build_s3_fifo(capacity_bytes),
157 }
158 }
159
160 fn build_lru(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
162 let cache: SharedBufferCache<K, V> = std::sync::Arc::new(LruCache::with_hasher(
163 capacity_bytes,
164 self.hash_builder.clone(),
165 ));
166 cache
167 }
168
169 fn build_s3_fifo(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
171 let shards = normalize_sharded_cache_shards(self.max_buckets);
172 let cache: SharedBufferCache<K, V> = std::sync::Arc::new(S3FifoCache::with_hasher(
173 capacity_bytes,
174 shards,
175 self.hash_builder.clone(),
176 ));
177 cache
178 }
179
180 fn build_thread_slots(
182 &self,
183 capacity_bytes: usize,
184 ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
185 EnumMap::from_fn(|_| self.build_single(capacity_bytes))
186 }
187
188 fn shared_thread_slots(
190 cache: SharedBufferCache<K, V>,
191 ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
192 EnumMap::from_fn(|_| cache.clone())
193 }
194}
195
196fn normalize_sharded_cache_shards(max_buckets: Option<usize>) -> usize {
199 match max_buckets {
200 None | Some(0) => S3FifoCache::<(), ()>::DEFAULT_SHARDS,
201 Some(buckets) => buckets
202 .checked_next_power_of_two()
203 .unwrap_or(S3FifoCache::<(), ()>::DEFAULT_SHARDS),
204 }
205}