// feldera_types/config/dev_tweaks.rs
1use std::collections::BTreeMap;
2
3use serde::{Deserialize, Serialize};
4use utoipa::ToSchema;
5
/// Optional settings for tweaking Feldera internals.
///
/// These settings reflect experiments that may come and go and change from
/// version to version. Users should not consider them to be stable.
///
/// Every field is optional; unset fields fall back to the defaults documented
/// on the corresponding accessor in `impl DevTweaks`. Unknown keys are
/// preserved in [`DevTweaks::other_options`] rather than rejected.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct DevTweaks {
    /// Buffer-cache implementation to use for storage reads.
    ///
    /// The default is `s3_fifo`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_strategy: Option<BufferCacheStrategy>,

    /// Override the number of buckets/shards used by sharded buffer caches.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
    /// rounded up to the next power of two because the current implementation
    /// shards by `hash(key) & (n - 1)`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_max_buckets: Option<usize>,

    /// How S3-FIFO caches are assigned to foreground/background workers.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
    /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,

    /// Target number of cached bytes retained in each `FBuf` slab size class.
    ///
    /// The default is 16 MiB.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fbuf_slab_bytes_per_class: Option<usize>,

    /// Whether to asynchronously fetch keys needed for the join operator from
    /// storage. Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_join: Option<bool>,

    /// Whether to asynchronously fetch keys needed for the distinct operator
    /// from storage. Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_distinct: Option<bool>,

    /// Which merger to use.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger: Option<MergerType>,

    /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
    /// allow to be in use before failing all writes with [StorageFull]. This
    /// is useful for testing on top of storage that does not implement its own
    /// quota mechanism.
    ///
    /// [StorageFull]: std::io::ErrorKind::StorageFull
    #[serde(skip_serializing_if = "Option::is_none")]
    pub storage_mb_max: Option<u64>,

    /// Attempt to print a stack trace on stack overflow.
    ///
    /// To be used for debugging only; do not enable in production.
    // NOTE: this flag is handled manually in `adapters/src/server.rs` before
    // parsing DevTweaks. If the name or type of this field changes, make sure to
    // adjust `server.rs` accordingly.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stack_overflow_backtrace: Option<bool>,

    /// Controls the maximal number of records output by splitter operators
    /// (joins, distinct, aggregation, rolling window and group operators) at
    /// each step.
    ///
    /// The default value is 10,000 records.
    // TODO: It would be better if the value were denominated in bytes rather
    // than records, and if it were configurable per-operator.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub splitter_chunk_size_records: Option<u64>,

    /// Enable adaptive joins.
    ///
    /// Adaptive joins dynamically change their partitioning policy to avoid skew.
    ///
    /// Adaptive joins are disabled by default.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub adaptive_joins: Option<bool>,

    /// Evict eagerly from buffer caches as files get deleted.
    ///
    /// This is an optimization that drops files from the cache as soon as
    /// they are deleted.
    ///
    /// Its performance benefit is unclear (possibly none).
    ///
    /// Historically it made sense to do this for two reasons:
    /// a) we know with 100% guarantee that the file won't ever be
    ///    read again.
    /// b) we could do this in O(log n) time with the LRU cache.
    ///    This is no longer true for s3-fifo where it is O(n).
    ///
    /// If the eviction is expensive (many small objects in the cache),
    /// this can cause a regression.
    ///
    /// The default is now `false`, which disables this behavior.
    ///
    /// If disabling it does not cause regressions, this option will be
    /// removed in the future.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub eager_evict: Option<bool>,

    /// The minimum relative improvement threshold for the join balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators. This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the relative estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_absolute_improvement_threshold` are met.
    ///
    /// The default value is 1.2.
    #[serde(skip_serializing_if = "Option::is_none")]
    // `serde_via_value` works around serde-rs/json#1157: with serde_json's
    // `arbitrary_precision` feature, flattened `f64` fields fail to
    // deserialize without it.
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub balancer_min_relative_improvement_threshold: Option<f64>,

    /// The minimum absolute improvement threshold for the balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators. This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the absolute estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold. The cost model used by the balancer is based on
    /// the number of records in the largest partition of a collection.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_relative_improvement_threshold` are met.
    ///
    /// The default value is 10,000.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_min_absolute_improvement_threshold: Option<u64>,

    /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
    ///
    /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
    /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
    /// if the skew is `<balancer_balance_tax`.
    ///
    /// The default value is 1.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub balancer_balance_tax: Option<f64>,

    /// The balancer threshold for checking for an improved partitioning policy for a stream.
    ///
    /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
    /// which can be relatively expensive. Instead of doing this on every step, the balancer only
    /// checks for an improved partitioning policy if the key distribution of a stream has changed
    /// significantly since the current solution was computed. Specifically, it only kicks in when
    /// the size of at least one shard of at least one stream in the cluster has changed by more than
    /// this threshold.
    ///
    /// The default value is 0.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub balancer_key_distribution_refresh_threshold: Option<f64>,

    /// False-positive rate for Bloom filters on batches on storage, as a
    /// fraction f, where 0 < f < 1.
    ///
    /// The false-positive rate trades off between the amount of memory used by
    /// Bloom filters and how frequently storage needs to be searched for keys
    /// that are not actually present. Typical false-positive rates and their
    /// corresponding memory costs are:
    ///
    /// - 0.1: 4.8 bits per key
    /// - 0.01: 9.6 bits per key
    /// - 0.001: 14.4 bits per key
    /// - 0.0001: 19.2 bits per key (default)
    ///
    /// Values outside the valid range, such as 0.0, disable Bloom filters.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub bloom_false_positive_rate: Option<f64>,

    /// Whether file-backed batches may use roaring membership filters when the
    /// key type supports them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub enable_roaring: Option<bool>,

    /// Maximum batch size in records for level 0 merges.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_level0_batch_size_records: Option<u16>,

    /// The number of merger threads.
    ///
    /// The default value is equal to the number of worker threads.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger_threads: Option<u16>,

    /// Additional bias the merger assigns to records with negative weights
    /// (retractions) to promote them to higher levels of the LSM tree sooner.
    ///
    /// Reasonable values for this parameter are in the range [0, 10].
    ///
    /// The default value is 0, which means that retractions are not given
    /// any additional bias.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub negative_weight_multiplier: Option<u16>,

    /// Options not understood by this particular version.
    ///
    /// This allows the pipeline manager to take options that a custom or old
    /// runtime version accepts.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, serde_json::Value>,
}
234
235impl DevTweaks {
236 pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
237 self.buffer_cache_strategy.unwrap_or_default()
238 }
239 pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
240 self.buffer_cache_allocation_strategy.unwrap_or_default()
241 }
242 pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
243 match self.buffer_cache_strategy() {
244 BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
245 BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
246 }
247 }
248 pub fn fetch_join(&self) -> bool {
249 self.fetch_join.unwrap_or(false)
250 }
251 pub fn fetch_distinct(&self) -> bool {
252 self.fetch_distinct.unwrap_or(false)
253 }
254 pub fn merger(&self) -> MergerType {
255 self.merger.unwrap_or_default()
256 }
257 pub fn stack_overflow_backtrace(&self) -> bool {
258 self.stack_overflow_backtrace.unwrap_or(false)
259 }
260 pub fn splitter_chunk_size_records(&self) -> u64 {
261 self.splitter_chunk_size_records.unwrap_or(10_000)
262 }
263 pub fn adaptive_joins(&self) -> bool {
264 self.adaptive_joins.unwrap_or(false)
265 }
266 pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
267 self.balancer_min_relative_improvement_threshold
268 .unwrap_or(1.2)
269 }
270 pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
271 self.balancer_min_absolute_improvement_threshold
272 .unwrap_or(10_000)
273 }
274 pub fn balancer_balance_tax(&self) -> f64 {
275 self.balancer_balance_tax.unwrap_or(1.1)
276 }
277 pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
278 self.balancer_key_distribution_refresh_threshold
279 .unwrap_or(0.1)
280 }
281 pub fn bloom_false_positive_rate(&self) -> f64 {
282 self.bloom_false_positive_rate.unwrap_or(0.0001)
283 }
284 pub fn enable_roaring(&self) -> bool {
285 // Roaring is enabled by default, but `enable_roaring = false` remains
286 // available as a kill switch while the feature is still being tuned.
287 self.enable_roaring.unwrap_or(true)
288 }
289 pub fn negative_weight_multiplier(&self) -> u16 {
290 self.negative_weight_multiplier.unwrap_or(0)
291 }
292}
293
/// Selects which eviction strategy backs a cache instance.
///
/// See [`DevTweaks::buffer_cache_strategy`]. Serialized in snake_case
/// (`s3_fifo`, `lru`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheStrategy {
    /// Use the sharded S3-FIFO cache backed by `quick_cache`.
    #[default]
    S3Fifo,

    /// Use the mutex-protected weighted LRU cache.
    Lru,
}
305
/// Controls how caches are shared across a foreground/background worker pair.
///
/// See [`DevTweaks::buffer_cache_allocation_strategy`]. Serialized in
/// snake_case (`shared_per_worker_pair`, `per_thread`, `global`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheAllocationStrategy {
    /// Share one cache across a foreground/background worker pair.
    #[default]
    SharedPerWorkerPair,

    /// Create a separate cache for each foreground/background thread.
    PerThread,

    /// Share one cache across all foreground/background threads.
    Global,
}
320
/// Which merger to use.
///
/// See [`DevTweaks::merger`]. Serialized in snake_case (`push_merger`,
/// `list_merger`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum MergerType {
    /// Newer merger, which should be faster for high-latency storage, such as
    /// object storage, but it likely needs tuning.
    PushMerger,

    /// The old standby, with known performance.
    #[default]
    ListMerger,
}
333
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::{PipelineConfig, RuntimeConfig};

    use super::*;

    /// Regression test: `Option<f64>` fields inside `DevTweaks` must survive
    /// a JSON-string round-trip through `PipelineConfig`, which applies
    /// `#[serde(flatten)]` to `RuntimeConfig`. When `serde_json` is built
    /// with the `arbitrary_precision` feature, serde's internal `Content`
    /// buffer models numbers as maps, which breaks plain `f64`
    /// deserialization (serde-rs/json#1157). The `serde_via_value`
    /// workaround on each `Option<f64>` field fixes this.
    #[test]
    fn dev_tweaks_f64_roundtrip_through_pipeline_config() {
        let tweaks = DevTweaks {
            bloom_false_positive_rate: Some(0.0),
            balancer_balance_tax: Some(1.1),
            balancer_min_relative_improvement_threshold: Some(1.2),
            balancer_key_distribution_refresh_threshold: Some(0.1),
            ..Default::default()
        };
        let mut runtime = RuntimeConfig::default();
        runtime.dev_tweaks = tweaks;
        let config = PipelineConfig {
            global: runtime,
            multihost: None,
            name: Some("test-pipeline".into()),
            given_name: None,
            storage_config: None,
            secrets_dir: None,
            inputs: Default::default(),
            outputs: Default::default(),
            program_ir: None,
        };

        // Round-trip through a JSON string (the path the pipeline process
        // takes).
        let serialized = serde_json::to_string_pretty(&config).unwrap();
        let from_string: PipelineConfig = serde_json::from_str(&serialized)
            .expect("JSON string round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        let dt = &from_string.global.dev_tweaks;
        assert_eq!(dt.bloom_false_positive_rate, Some(0.0));
        assert_eq!(dt.balancer_balance_tax, Some(1.1));
        assert_eq!(dt.balancer_min_relative_improvement_threshold, Some(1.2));
        assert_eq!(dt.balancer_key_distribution_refresh_threshold, Some(0.1));

        // Round-trip through `serde_json::Value` (the path the pipeline
        // manager takes).
        let as_value = serde_json::to_value(&config).unwrap();
        let from_value: PipelineConfig = serde_json::from_value(as_value)
            .expect("Value round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_eq!(
            from_value.global.dev_tweaks.bloom_false_positive_rate,
            Some(0.0)
        );
    }

    /// Unknown keys must be preserved verbatim in `other_options` via
    /// `#[serde(flatten)]` rather than rejected.
    #[test]
    fn other_options() {
        let parsed: DevTweaks =
            serde_json::from_value(json!({"xyzzy": 1.0, "foobar": {"key": "value"}})).unwrap();
        let expected = BTreeMap::from_iter([
            (String::from("xyzzy"), json!(1.0)),
            (String::from("foobar"), json!({"key": "value"})),
        ]);
        assert_eq!(parsed.other_options, expected);
    }
}