// feldera_types/config/dev_tweaks.rs
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

/// Optional settings for tweaking Feldera internals.
///
/// These settings reflect experiments that may come and go and change from
/// version to version. Users should not consider them to be stable.
//
// Every field is optional: `#[serde(default)]` fills in `None` for keys
// absent from the input, and per-field `skip_serializing_if` keeps unset
// fields out of serialized output, so "unset" round-trips exactly.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct DevTweaks {
    /// Buffer-cache implementation to use for storage reads.
    ///
    /// The default is `s3_fifo`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_strategy: Option<BufferCacheStrategy>,

    /// Override the number of buckets/shards used by sharded buffer caches.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
    /// rounded up to the next power of two because the current implementation
    /// shards by `hash(key) & (n - 1)`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_max_buckets: Option<usize>,

    /// How S3-FIFO caches are assigned to foreground/background workers.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
    /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,

    /// Target number of cached bytes retained in each `FBuf` slab size class.
    ///
    /// The default is 16 MiB.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fbuf_slab_bytes_per_class: Option<usize>,

    /// Whether to asynchronously fetch keys needed for the join operator from
    /// storage. Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_join: Option<bool>,

    /// Whether to asynchronously fetch keys needed for the distinct operator
    /// from storage. Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_distinct: Option<bool>,

    /// Which merger to use.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger: Option<MergerType>,

    /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
    /// allow to be in use before failing all writes with [StorageFull]. This
    /// is useful for testing on top of storage that does not implement its own
    /// quota mechanism.
    ///
    /// [StorageFull]: std::io::ErrorKind::StorageFull
    #[serde(skip_serializing_if = "Option::is_none")]
    pub storage_mb_max: Option<u64>,

    /// Attempt to print a stack trace on stack overflow.
    ///
    /// To be used for debugging only; do not enable in production.
    // NOTE: this flag is handled manually in `adapters/src/server.rs` before
    // parsing DevTweaks. If the name or type of this field changes, make sure to
    // adjust `server.rs` accordingly.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stack_overflow_backtrace: Option<bool>,

    /// Controls the maximal number of records output by splitter operators
    /// (joins, distinct, aggregation, rolling window and group operators) at
    /// each step.
    ///
    /// The default value is 10,000 records.
    // TODO: It would be better if the value were denominated in bytes rather
    // than records, and if it were configurable per-operator.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub splitter_chunk_size_records: Option<u64>,

    /// Enable adaptive joins.
    ///
    /// Adaptive joins dynamically change their partitioning policy to avoid skew.
    ///
    /// Adaptive joins are disabled by default.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub adaptive_joins: Option<bool>,

    /// The minimum relative improvement threshold for the join balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators. This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the relative estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_absolute_improvement_threshold` are met.
    ///
    /// The default value is 1.2.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_min_relative_improvement_threshold: Option<f64>,

    /// The minimum absolute improvement threshold for the balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators. This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the absolute estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold. The cost model used by the balancer is based on
    /// the number of records in the largest partition of a collection.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_relative_improvement_threshold` are met.
    ///
    /// The default value is 10,000.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_min_absolute_improvement_threshold: Option<u64>,

    /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
    ///
    /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
    /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
    /// if the skew is `<balancer_balance_tax`.
    ///
    /// The default value is 1.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_balance_tax: Option<f64>,

    /// The balancer threshold for checking for an improved partitioning policy for a stream.
    ///
    /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
    /// which can be relatively expensive. Instead of doing this on every step, the balancer only
    /// checks for an improved partitioning policy if the key distribution of a stream has changed
    /// significantly since the current solution was computed. Specifically, it only kicks in when
    /// the size of at least one shard of at least one stream in the cluster has changed by more than
    /// this threshold.
    ///
    /// The default value is 0.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_key_distribution_refresh_threshold: Option<f64>,

    /// False-positive rate for Bloom filters on batches on storage, as a
    /// fraction f, where 0 < f < 1.
    ///
    /// The false-positive rate trades off between the amount of memory used by
    /// Bloom filters and how frequently storage needs to be searched for keys
    /// that are not actually present. Typical false-positive rates and their
    /// corresponding memory costs are:
    ///
    /// - 0.1: 4.8 bits per key
    /// - 0.01: 9.6 bits per key
    /// - 0.001: 14.4 bits per key
    /// - 0.0001: 19.2 bits per key (default)
    ///
    /// Values outside the valid range, such as 0.0, disable Bloom filters.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bloom_false_positive_rate: Option<f64>,

    /// Maximum batch size in records for level 0 merges.
    // NOTE(review): `u16` caps this at 65,535 records — confirm the cap is
    // intended before relying on larger values here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_level0_batch_size_records: Option<u16>,

    /// The number of merger threads.
    ///
    /// The default value is equal to the number of worker threads.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger_threads: Option<u16>,

    /// Additional bias the merger assigns to records with negative weights
    /// (retractions) to promote them to higher levels of the LSM tree sooner.
    ///
    /// Reasonable values for this parameter are in the range [0, 10].
    ///
    /// The default value is 0, which means that retractions are not given
    /// any additional bias.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub negative_weight_multiplier: Option<u16>,
}

194impl DevTweaks {
195 pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
196 self.buffer_cache_strategy.unwrap_or_default()
197 }
198 pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
199 self.buffer_cache_allocation_strategy.unwrap_or_default()
200 }
201 pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
202 match self.buffer_cache_strategy() {
203 BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
204 BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
205 }
206 }
207 pub fn fetch_join(&self) -> bool {
208 self.fetch_join.unwrap_or(false)
209 }
210 pub fn fetch_distinct(&self) -> bool {
211 self.fetch_distinct.unwrap_or(false)
212 }
213 pub fn merger(&self) -> MergerType {
214 self.merger.unwrap_or_default()
215 }
216 pub fn stack_overflow_backtrace(&self) -> bool {
217 self.stack_overflow_backtrace.unwrap_or(false)
218 }
219 pub fn splitter_chunk_size_records(&self) -> u64 {
220 self.splitter_chunk_size_records.unwrap_or(10_000)
221 }
222 pub fn adaptive_joins(&self) -> bool {
223 self.adaptive_joins.unwrap_or(false)
224 }
225 pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
226 self.balancer_min_relative_improvement_threshold
227 .unwrap_or(1.2)
228 }
229 pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
230 self.balancer_min_absolute_improvement_threshold
231 .unwrap_or(10_000)
232 }
233 pub fn balancer_balance_tax(&self) -> f64 {
234 self.balancer_balance_tax.unwrap_or(1.1)
235 }
236 pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
237 self.balancer_key_distribution_refresh_threshold
238 .unwrap_or(0.1)
239 }
240 pub fn bloom_false_positive_rate(&self) -> f64 {
241 self.bloom_false_positive_rate.unwrap_or(0.0001)
242 }
243 pub fn negative_weight_multiplier(&self) -> u16 {
244 self.negative_weight_multiplier.unwrap_or(0)
245 }
246}
247
/// Selects which eviction strategy backs a cache instance.
///
/// Serialized in `snake_case`, i.e. as `s3_fifo` or `lru`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheStrategy {
    /// Use the sharded S3-FIFO cache backed by `quick_cache`.
    #[default]
    S3Fifo,

    /// Use the mutex-protected weighted LRU cache.
    Lru,
}

/// Controls how caches are shared across a foreground/background worker pair.
///
/// Serialized in `snake_case`, e.g. as `shared_per_worker_pair`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheAllocationStrategy {
    /// Share one cache across a foreground/background worker pair.
    #[default]
    SharedPerWorkerPair,

    /// Create a separate cache for each foreground/background thread.
    PerThread,

    /// Share one cache across all foreground/background threads.
    Global,
}

/// Which merger to use.
///
/// Serialized in `snake_case`, i.e. as `push_merger` or `list_merger`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum MergerType {
    /// Newer merger, which should be faster for high-latency storage, such as
    /// object storage, but it likely needs tuning.
    PushMerger,

    /// The old standby, with known performance.
    #[default]
    ListMerger,
}