//! Developer tweaks: unstable, experimental runtime configuration knobs.
//!
//! (feldera_types/config/dev_tweaks.rs)

1use std::collections::BTreeMap;
2
3use serde::{Deserialize, Serialize};
4use utoipa::ToSchema;
5
/// Optional settings for tweaking Feldera internals.
///
/// These settings reflect experiments that may come and go and change from
/// version to version.  Users should not consider them to be stable.
///
/// Every field is optional; `None` means "use the built-in default".  Unknown
/// keys are preserved in [`other_options`](Self::other_options) rather than
/// rejected, so newer/older runtimes can exchange configurations.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct DevTweaks {
    /// Buffer-cache implementation to use for storage reads.
    ///
    /// The default is `s3_fifo`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_strategy: Option<BufferCacheStrategy>,

    /// Override the number of buckets/shards used by sharded buffer caches.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
    /// rounded up to the next power of two because the current implementation
    /// shards by `hash(key) & (n - 1)`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_max_buckets: Option<usize>,

    /// How S3-FIFO caches are assigned to foreground/background workers.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
    /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,

    /// Target number of cached bytes retained in each `FBuf` slab size class.
    ///
    /// The default is 16 MiB.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fbuf_slab_bytes_per_class: Option<usize>,

    /// Whether to asynchronously fetch keys needed for the join operator from
    /// storage.  Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_join: Option<bool>,

    /// Whether to asynchronously fetch keys needed for the distinct operator
    /// from storage.  Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_distinct: Option<bool>,

    /// Which merger to use.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger: Option<MergerType>,

    /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
    /// allow to be in use before failing all writes with [StorageFull].  This
    /// is useful for testing on top of storage that does not implement its own
    /// quota mechanism.
    ///
    /// [StorageFull]: std::io::ErrorKind::StorageFull
    #[serde(skip_serializing_if = "Option::is_none")]
    pub storage_mb_max: Option<u64>,

    /// Attempt to print a stack trace on stack overflow.
    ///
    /// To be used for debugging only; do not enable in production.
    // NOTE: this flag is handled manually in `adapters/src/server.rs` before
    // parsing DevTweaks. If the name or type of this field changes, make sure to
    // adjust `server.rs` accordingly.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stack_overflow_backtrace: Option<bool>,

    /// Controls the maximal number of records output by splitter operators
    /// (joins, distinct, aggregation, rolling window and group operators) at
    /// each step.
    ///
    /// The default value is 10,000 records.
    // TODO: It would be better if the value were denominated in bytes rather
    // than records, and if it were configurable per-operator.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub splitter_chunk_size_records: Option<u64>,

    /// Enable adaptive joins.
    ///
    /// Adaptive joins dynamically change their partitioning policy to avoid skew.
    ///
    /// Adaptive joins are disabled by default.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub adaptive_joins: Option<bool>,

    /// Evict eagerly from buffer caches as files get deleted.
    ///
    /// This is an optimization that drops files from the cache as soon as
    /// they are deleted.  Its performance benefit is unclear (possibly none).
    ///
    /// Historically it made sense to do this for two reasons:
    /// a) we know with 100% guarantee that the file won't ever be
    ///    read again.
    /// b) we could do this in O(logn) time with the LRU cache.
    ///    This is no longer true for s3-fifo where it is O(n).
    ///
    /// If the eviction is expensive (many small objects in the cache),
    /// this can cause a regression.
    ///
    /// The default is `false` (disabled).
    ///
    /// If this doesn't cause regression we will remove this option
    /// in the future.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub eager_evict: Option<bool>,

    /// The minimum relative improvement threshold for the join balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators.  This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the relative estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_absolute_improvement_threshold` are met.
    ///
    /// The default value is 1.2.
    #[serde(skip_serializing_if = "Option::is_none")]
    // `serde_via_value` works around serde-rs/json#1157: with the
    // `arbitrary_precision` feature, flattened `f64` fields fail to
    // deserialize from serde's internal `Content` buffer (see the regression
    // test at the bottom of this file).
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub balancer_min_relative_improvement_threshold: Option<f64>,

    /// The minimum absolute improvement threshold for the balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators.  This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the absolute estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold. The cost model used by the balancer is based on
    /// the number of records in the largest partition of a collection.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_relative_improvement_threshold` are met.
    ///
    /// The default value is 10,000.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_min_absolute_improvement_threshold: Option<u64>,

    /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
    ///
    /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
    /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
    /// if the skew is `<balancer_balance_tax`.
    ///
    /// The default value is 1.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub balancer_balance_tax: Option<f64>,

    /// The balancer threshold for checking for an improved partitioning policy for a stream.
    ///
    /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
    /// which can be relatively expensive. Instead of doing this on every step, the balancer only
    /// checks for an improved partitioning policy if the key distribution of a stream has changed
    /// significantly since the current solution was computed.  Specifically, it only kicks in when
    /// the size of at least one shard of at least one stream in the cluster has changed by more than
    /// this threshold.
    ///
    /// The default value is 0.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub balancer_key_distribution_refresh_threshold: Option<f64>,

    /// False-positive rate for Bloom filters on batches on storage, as a
    /// fraction f, where 0 < f < 1.
    ///
    /// The false-positive rate trades off between the amount of memory used by
    /// Bloom filters and how frequently storage needs to be searched for keys
    /// that are not actually present.  Typical false-positive rates and their
    /// corresponding memory costs are:
    ///
    /// - 0.1: 4.8 bits per key
    /// - 0.01: 9.6 bits per key
    /// - 0.001: 14.4 bits per key
    /// - 0.0001: 19.2 bits per key (default)
    ///
    /// Values outside the valid range, such as 0.0, disable Bloom filters.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
    pub bloom_false_positive_rate: Option<f64>,

    /// Whether file-backed batches may use roaring membership filters when the
    /// key type supports them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub enable_roaring: Option<bool>,

    /// Maximum batch size in records for level 0 merges.
    // NOTE(review): `u16` caps this setting at 65,535 records — confirm that
    // range is intended.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_level0_batch_size_records: Option<u16>,

    /// The number of merger threads.
    ///
    /// The default value is equal to the number of worker threads.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger_threads: Option<u16>,

    /// Additional bias the merger assigns to records with negative weights
    /// (retractions) to promote them to higher levels of the LSM tree sooner.
    ///
    /// Reasonable values for this parameter are in the range [0, 10].
    ///
    /// The default value is 0, which means that retractions are not given
    /// any additional bias.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub negative_weight_multiplier: Option<u16>,

    /// Options not understood by this particular version.
    ///
    /// This allows the pipeline manager to take options that a custom or old
    /// runtime version accepts.
    #[serde(flatten)]
    pub other_options: BTreeMap<String, serde_json::Value>,
}
234
235impl DevTweaks {
236    pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
237        self.buffer_cache_strategy.unwrap_or_default()
238    }
239    pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
240        self.buffer_cache_allocation_strategy.unwrap_or_default()
241    }
242    pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
243        match self.buffer_cache_strategy() {
244            BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
245            BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
246        }
247    }
248    pub fn fetch_join(&self) -> bool {
249        self.fetch_join.unwrap_or(false)
250    }
251    pub fn fetch_distinct(&self) -> bool {
252        self.fetch_distinct.unwrap_or(false)
253    }
254    pub fn merger(&self) -> MergerType {
255        self.merger.unwrap_or_default()
256    }
257    pub fn stack_overflow_backtrace(&self) -> bool {
258        self.stack_overflow_backtrace.unwrap_or(false)
259    }
260    pub fn splitter_chunk_size_records(&self) -> u64 {
261        self.splitter_chunk_size_records.unwrap_or(10_000)
262    }
263    pub fn adaptive_joins(&self) -> bool {
264        self.adaptive_joins.unwrap_or(false)
265    }
266    pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
267        self.balancer_min_relative_improvement_threshold
268            .unwrap_or(1.2)
269    }
270    pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
271        self.balancer_min_absolute_improvement_threshold
272            .unwrap_or(10_000)
273    }
274    pub fn balancer_balance_tax(&self) -> f64 {
275        self.balancer_balance_tax.unwrap_or(1.1)
276    }
277    pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
278        self.balancer_key_distribution_refresh_threshold
279            .unwrap_or(0.1)
280    }
281    pub fn bloom_false_positive_rate(&self) -> f64 {
282        self.bloom_false_positive_rate.unwrap_or(0.0001)
283    }
284    pub fn enable_roaring(&self) -> bool {
285        // Roaring is enabled by default, but `enable_roaring = false` remains
286        // available as a kill switch while the feature is still being tuned.
287        self.enable_roaring.unwrap_or(true)
288    }
289    pub fn negative_weight_multiplier(&self) -> u16 {
290        self.negative_weight_multiplier.unwrap_or(0)
291    }
292}
293
/// Selects which eviction strategy backs a cache instance.
///
/// Serialized in `snake_case` (`"s3_fifo"`, `"lru"`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheStrategy {
    /// Use the sharded S3-FIFO cache backed by `quick_cache`.
    ///
    /// This is the default.
    #[default]
    S3Fifo,

    /// Use the mutex-protected weighted LRU cache.
    Lru,
}
305
/// Controls how caches are shared across a foreground/background worker pair.
///
/// Only honored for the S3-FIFO strategy; LRU always behaves as `PerThread`
/// (see `DevTweaks::effective_buffer_cache_allocation_strategy`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheAllocationStrategy {
    /// Share one cache across a foreground/background worker pair.
    ///
    /// This is the default.
    #[default]
    SharedPerWorkerPair,

    /// Create a separate cache for each foreground/background thread.
    PerThread,

    /// Share one cache across all foreground/background threads.
    Global,
}
320
/// Which merger to use.
///
/// Serialized in `snake_case` (`"push_merger"`, `"list_merger"`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum MergerType {
    /// Newer merger, which should be faster for high-latency storage, such as
    /// object storage, but it likely needs tuning.
    PushMerger,

    /// The old standby, with known performance.
    ///
    /// This is the default.
    #[default]
    ListMerger,
}
333
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::{PipelineConfig, RuntimeConfig};

    use super::*;

    /// Regression test: `Option<f64>` fields inside `DevTweaks` must
    /// survive a JSON-string round-trip through `PipelineConfig`, which
    /// uses `#[serde(flatten)]` on `RuntimeConfig`. With `serde_json`'s
    /// `arbitrary_precision` feature enabled, the serde `Content` buffer
    /// represents numbers as maps, which breaks plain `f64`
    /// deserialization (serde-rs/json#1157). The `serde_via_value`
    /// workaround on each `Option<f64>` field fixes this.
    #[test]
    fn dev_tweaks_f64_roundtrip_through_pipeline_config() {
        // Populate every `Option<f64>` tweak so each one exercises the
        // `serde_via_value` deserializer path.
        let mut rc = RuntimeConfig::default();
        rc.dev_tweaks = DevTweaks {
            bloom_false_positive_rate: Some(0.0),
            balancer_balance_tax: Some(1.1),
            balancer_min_relative_improvement_threshold: Some(1.2),
            balancer_key_distribution_refresh_threshold: Some(0.1),
            ..Default::default()
        };
        let pc = PipelineConfig {
            global: rc,
            multihost: None,
            name: Some("test-pipeline".into()),
            given_name: None,
            storage_config: None,
            secrets_dir: None,
            inputs: Default::default(),
            outputs: Default::default(),
            program_ir: None,
        };

        // JSON string round-trip (the path the pipeline process takes).
        let json = serde_json::to_string_pretty(&pc).unwrap();
        let pc2: PipelineConfig = serde_json::from_str(&json)
            .expect("JSON string round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_eq!(pc2.global.dev_tweaks.bloom_false_positive_rate, Some(0.0));
        assert_eq!(pc2.global.dev_tweaks.balancer_balance_tax, Some(1.1));
        assert_eq!(
            pc2.global
                .dev_tweaks
                .balancer_min_relative_improvement_threshold,
            Some(1.2)
        );
        assert_eq!(
            pc2.global
                .dev_tweaks
                .balancer_key_distribution_refresh_threshold,
            Some(0.1)
        );

        // serde_json::Value round-trip (the path the pipeline manager takes).
        let value = serde_json::to_value(&pc).unwrap();
        let pc3: PipelineConfig = serde_json::from_value(value)
            .expect("Value round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_eq!(pc3.global.dev_tweaks.bloom_false_positive_rate, Some(0.0));
    }

    /// Keys that don't match any declared `DevTweaks` field must land in the
    /// `#[serde(flatten)]` catch-all map instead of being rejected.
    #[test]
    fn other_options() {
        let dt =
            serde_json::from_value::<DevTweaks>(json!({"xyzzy": 1.0, "foobar": {"key": "value"}}))
                .unwrap();
        assert_eq!(
            &dt.other_options,
            &BTreeMap::from_iter([
                (String::from("xyzzy"), json!(1.0)),
                (String::from("foobar"), json!({"key": "value"}))
            ]),
        );
    }
}
410}