Skip to main content

feldera_types/config/
dev_tweaks.rs

1use std::collections::BTreeMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use utoipa::ToSchema;
6
7/// Optional settings for tweaking Feldera internals.
8///
9/// These settings reflect experiments that may come and go and change from
10/// version to version.  Users should not consider them to be stable.
11#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
12#[serde(default)]
13pub struct DevTweaks {
14    /// Buffer-cache implementation to use for storage reads.
15    ///
16    /// The default is `s3_fifo`.
17    #[serde(skip_serializing_if = "Option::is_none")]
18    pub buffer_cache_strategy: Option<BufferCacheStrategy>,
19
20    /// Override the number of buckets/shards used by sharded buffer caches.
21    ///
22    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
23    /// rounded up to the next power of two because the current implementation
24    /// shards by `hash(key) & (n - 1)`.
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub buffer_max_buckets: Option<usize>,
27
28    /// How S3-FIFO caches are assigned to foreground/background workers.
29    ///
30    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
31    /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,
34
35    /// Target number of cached bytes retained in each `FBuf` slab size class.
36    ///
37    /// The default is 16 MiB.
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub fbuf_slab_bytes_per_class: Option<usize>,
40
41    /// Whether to asynchronously fetch keys needed for the join operator from
42    /// storage.  Asynchronous fetching should be faster for high-latency
43    /// storage, such as object storage, but it could use excessive amounts of
44    /// memory if the number of keys fetched is very large.
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub fetch_join: Option<bool>,
47
48    /// Whether to asynchronously fetch keys needed for the distinct operator
49    /// from storage.  Asynchronous fetching should be faster for high-latency
50    /// storage, such as object storage, but it could use excessive amounts of
51    /// memory if the number of keys fetched is very large.
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub fetch_distinct: Option<bool>,
54
55    /// Which merger to use.
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub merger: Option<MergerType>,
58
59    /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
60    /// allow to be in use before failing all writes with `StorageFull`.  This
61    /// is useful for testing on top of storage that does not implement its own
62    /// quota mechanism.
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub storage_mb_max: Option<u64>,
65
66    /// Attempt to print a stack trace on stack overflow.
67    ///
68    /// To be used for debugging only; do not enable in production.
69    // NOTE: this flag is handled manually in `adapters/src/server.rs` before
70    // parsing DevTweaks. If the name or type of this field changes, make sure to
71    // adjust `server.rs` accordingly.
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub stack_overflow_backtrace: Option<bool>,
74
75    /// Controls the maximal number of records output by splitter operators
76    /// (joins, distinct, aggregation, rolling window and group operators) at
77    /// each step.
78    ///
79    /// The default value is 10,000 records.
80    // TODO: It would be better if the value were denominated in bytes rather
81    // than records, and if it were configurable per-operator.
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub splitter_chunk_size_records: Option<u64>,
84
85    /// Enable adaptive joins.
86    ///
87    /// Adaptive joins dynamically change their partitioning policy to avoid skew.
88    ///
89    /// Adaptive joins are disabled by default.
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub adaptive_joins: Option<bool>,
92
93    /// Evict eagerly from buffer caches as files get deleted.
94    ///
95    /// This is an optimization that drops files from
96    /// the cache as soon as they are deleted.
97    ///
98    /// It has unknown (no?) performance benefits from what I can tell.
99    ///
100    /// Historically it made sense to do this for two reasons:
101    /// a) we know with 100% guarantee that the file won't ever be
102    ///    read again.
103    /// b) we could do this in O(logn) time with the LRU cache.
104    ///    This is no longer true for s3-fifo where it is O(n).
105    ///
106    /// If the eviction is expensive, (many small objects in the cache)
107    /// this can cause a regression.
108    ///
109    /// New default disables this behavior by making it false.
110    ///
111    /// If this doesn't cause regression we will remove this option
112    /// in the future.
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub eager_evict: Option<bool>,
115
116    /// The minimum relative improvement threshold for the join balancer.
117    ///
118    /// The join balancer is a component that dynamically chooses an optimal
119    /// partitioning policy for adaptive join operators.  This parameter
120    /// prevents the join balancer from making changes to the partitioning
121    /// policy if the improvement is not significant, since the overhead of such
122    /// rebalancing, especially when performed frequently, can exceed the
123    /// benefits.
124    ///
125    /// A rebalancing is considered significant if the relative estimated
126    /// improvement for the cluster of joins where the rebalancing is applied is
127    /// at least this threshold.
128    ///
129    /// A rebalancing is applied if both this threshold and
130    /// `balancer_min_absolute_improvement_threshold` are met.
131    ///
132    /// The default value is 1.2.
133    #[serde(skip_serializing_if = "Option::is_none")]
134    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
135    pub balancer_min_relative_improvement_threshold: Option<f64>,
136
137    /// The minimum absolute improvement threshold for the balancer.
138    ///
139    /// The join balancer is a component that dynamically chooses an optimal
140    /// partitioning policy for adaptive join operators.  This parameter
141    /// prevents the join balancer from making changes to the partitioning
142    /// policy if the improvement is not significant, since the overhead of such
143    /// rebalancing, especially when performed frequently, can exceed the
144    /// benefits.
145    ///
146    /// A rebalancing is considered significant if the absolute estimated
147    /// improvement for the cluster of joins where the rebalancing is applied is
148    /// at least this threshold. The cost model used by the balancer is based on
149    /// the number of records in the largest partition of a collection.
150    ///
151    /// A rebalancing is applied if both this threshold and
152    /// `balancer_min_relative_improvement_threshold` are met.
153    ///
154    /// The default value is 10,000.
155    #[serde(skip_serializing_if = "Option::is_none")]
156    pub balancer_min_absolute_improvement_threshold: Option<u64>,
157
158    /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
159    ///
160    /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
161    /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
162    /// if the skew is `<balancer_balance_tax`.
163    ///
164    /// The default value is 1.1.
165    #[serde(skip_serializing_if = "Option::is_none")]
166    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
167    pub balancer_balance_tax: Option<f64>,
168
169    /// The balancer threshold for checking for an improved partitioning policy for a stream.
170    ///
171    /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
172    /// which can be relatively expensive. Instead of doing this on every step, the balancer only
173    /// checks for an improved partitioning policy if the key distribution of a stream has changed
174    /// significantly since the current solution was computed.  Specifically, it only kicks in when
175    /// the size of at least one shard of at least one stream in the cluster has changed by more than
176    /// this threshold.
177    ///
178    /// The default value is 0.1.
179    #[serde(skip_serializing_if = "Option::is_none")]
180    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
181    pub balancer_key_distribution_refresh_threshold: Option<f64>,
182
183    /// False-positive rate for Bloom filters on batches on storage, as a
184    /// fraction f, where 0 < f < 1.
185    ///
186    /// The false-positive rate trades off between the amount of memory used by
187    /// Bloom filters and how frequently storage needs to be searched for keys
188    /// that are not actually present.  Typical false-positive rates and their
189    /// corresponding memory costs are:
190    ///
191    /// - 0.1: 4.8 bits per key
192    /// - 0.01: 9.6 bits per key
193    /// - 0.001: 14.4 bits per key
194    /// - 0.0001: 19.2 bits per key (default)
195    ///
196    /// Values outside the valid range, such as 0.0, disable Bloom filters.
197    #[serde(skip_serializing_if = "Option::is_none")]
198    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
199    pub bloom_false_positive_rate: Option<f64>,
200
201    /// Whether file-backed batches may use roaring membership filters when the
202    /// key type supports them.
203    #[serde(skip_serializing_if = "Option::is_none")]
204    pub enable_roaring: Option<bool>,
205
206    /// Maximum batch size in records for level 0 merges.
207    #[serde(skip_serializing_if = "Option::is_none")]
208    pub max_level0_batch_size_records: Option<u16>,
209
210    /// The number of merger threads.
211    ///
212    /// The default value is equal to the number of worker threads.
213    #[serde(skip_serializing_if = "Option::is_none")]
214    pub merger_threads: Option<u16>,
215
216    /// Additional bias the merger assigns to records with negative weights
217    /// (retractions) to promote them to higher levels of the LSM tree sooner.
218    ///
219    /// Reasonable values for this parameter are in the range [0, 10].
220    ///
221    /// The default value is 0, which means that retractions are not given
222    /// any additional bias.
223    #[serde(skip_serializing_if = "Option::is_none")]
224    pub negative_weight_multiplier: Option<u16>,
225
226    /// Don't automatically start a transaction for every step.
227    #[serde(skip_serializing_if = "Option::is_none")]
228    pub disable_auto_transaction: Option<bool>,
229
230    /// Override the timestamp returned by SQL `NOW()` at pipeline start.
231    ///
232    /// When set, the clock connector anchors `NOW()` to this RFC 3339
233    /// timestamp the first time the pipeline starts and advances at
234    /// wall-clock cadence from there:
235    /// `NOW() = now_offset + (wall_clock - wall_clock_at_start)`.
236    ///
237    /// Any RFC 3339 timestamp parseable by `chrono::DateTime<Utc>` is
238    /// accepted (years `0001` through `9999`), in the past or future
239    /// relative to wall clock.
240    ///
241    /// This is a testing knob for queries that depend on `NOW()`.
242    ///
243    /// On resume the clock continues from the last journaled `NOW()`;
244    /// `now_offset`'s value is honored only on a fresh start:
245    ///
246    /// | Initial run | Resume from checkpoint | Post-replay `NOW()` |
247    /// |---|---|---|
248    /// | no offset | no offset | wall clock (unchanged) |
249    /// | offset    | offset    | wall-clock pace from the last journaled value; the new offset value is ignored |
250    /// | offset    | no offset | jumps to wall clock (explicit opt-out of the anchor) |
251    /// | no offset | offset    | wall-clock pace from the last journaled value; the new offset value is ignored |
252    #[serde(skip_serializing_if = "Option::is_none")]
253    pub now_offset: Option<DateTime<Utc>>,
254
255    /// Drive `NOW()` from an external HTTP endpoint instead of wall clock.
256    ///
257    /// When `true`, the clock connector emits one initial tick (using
258    /// `now_offset` if set, otherwise wall clock) and then holds that
259    /// value.  Subsequent calls to `POST /clock/advance` move `NOW()`
260    /// forward by the requested delta.  Negative deltas are rejected;
261    /// the clock is forward-only.
262    #[serde(skip_serializing_if = "Option::is_none")]
263    pub now_http_driven: Option<bool>,
264
265    /// Options not understood by this particular version.
266    ///
267    /// This allows the pipeline manager to take options that a custom or old
268    /// runtime version accepts.
269    #[serde(flatten)]
270    pub other_options: BTreeMap<String, serde_json::Value>,
271}
272
273impl DevTweaks {
274    pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
275        self.buffer_cache_strategy.unwrap_or_default()
276    }
277    pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
278        self.buffer_cache_allocation_strategy.unwrap_or_default()
279    }
280    pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
281        match self.buffer_cache_strategy() {
282            BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
283            BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
284        }
285    }
286    pub fn fetch_join(&self) -> bool {
287        self.fetch_join.unwrap_or(false)
288    }
289    pub fn fetch_distinct(&self) -> bool {
290        self.fetch_distinct.unwrap_or(false)
291    }
292    pub fn merger(&self) -> MergerType {
293        self.merger.unwrap_or_default()
294    }
295    pub fn stack_overflow_backtrace(&self) -> bool {
296        self.stack_overflow_backtrace.unwrap_or(false)
297    }
298    pub fn splitter_chunk_size_records(&self) -> u64 {
299        self.splitter_chunk_size_records.unwrap_or(10_000)
300    }
301    pub fn adaptive_joins(&self) -> bool {
302        self.adaptive_joins.unwrap_or(false)
303    }
304    pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
305        self.balancer_min_relative_improvement_threshold
306            .unwrap_or(1.2)
307    }
308    pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
309        self.balancer_min_absolute_improvement_threshold
310            .unwrap_or(10_000)
311    }
312    pub fn balancer_balance_tax(&self) -> f64 {
313        self.balancer_balance_tax.unwrap_or(1.1)
314    }
315    pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
316        self.balancer_key_distribution_refresh_threshold
317            .unwrap_or(0.1)
318    }
319    pub fn bloom_false_positive_rate(&self) -> f64 {
320        self.bloom_false_positive_rate.unwrap_or(0.0001)
321    }
322    pub fn enable_roaring(&self) -> bool {
323        // Roaring is enabled by default, but `enable_roaring = false` remains
324        // available as a kill switch while the feature is still being tuned.
325        self.enable_roaring.unwrap_or(true)
326    }
327    pub fn negative_weight_multiplier(&self) -> u16 {
328        self.negative_weight_multiplier.unwrap_or(0)
329    }
330
331    pub fn disable_auto_transaction(&self) -> bool {
332        self.disable_auto_transaction.unwrap_or(false)
333    }
334
335    /// Configured `now_offset` as milliseconds since the Unix epoch,
336    /// or `None` if no override is set.
337    pub fn now_offset_ms(&self) -> Option<i64> {
338        self.now_offset.map(|target| target.timestamp_millis())
339    }
340
341    pub fn now_http_driven(&self) -> bool {
342        self.now_http_driven.unwrap_or(false)
343    }
344}
345
346/// Selects which eviction strategy backs a cache instance.
347#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
348#[serde(rename_all = "snake_case")]
349pub enum BufferCacheStrategy {
350    /// Use the sharded S3-FIFO cache backed by `quick_cache`.
351    #[default]
352    S3Fifo,
353
354    /// Use the mutex-protected weighted LRU cache.
355    Lru,
356}
357
358/// Controls how caches are shared across a foreground/background worker pair.
359#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
360#[serde(rename_all = "snake_case")]
361pub enum BufferCacheAllocationStrategy {
362    /// Share one cache across a foreground/background worker pair.
363    #[default]
364    SharedPerWorkerPair,
365
366    /// Create a separate cache for each foreground/background thread.
367    PerThread,
368
369    /// Share one cache across all foreground/background threads.
370    Global,
371}
372
373/// Which merger to use.
374#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
375#[serde(rename_all = "snake_case")]
376pub enum MergerType {
377    /// Newer merger, which should be faster for high-latency storage, such as
378    /// object storage, but it likely needs tuning.
379    PushMerger,
380
381    /// The old standby, with known performance.
382    #[default]
383    ListMerger,
384}
385
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::{PipelineConfig, RuntimeConfig};

    use super::*;

    /// Asserts that the four `Option<f64>` tweaks set by
    /// `dev_tweaks_f64_roundtrip_through_pipeline_config` came back with
    /// exactly the values that were serialized.
    ///
    /// `path` names the round-trip under test and is included in failure
    /// messages so it is clear which path regressed.
    fn assert_f64_tweaks_preserved(dev_tweaks: &DevTweaks, path: &str) {
        assert_eq!(
            dev_tweaks.bloom_false_positive_rate,
            Some(0.0),
            "{path}: bloom_false_positive_rate"
        );
        assert_eq!(
            dev_tweaks.balancer_balance_tax,
            Some(1.1),
            "{path}: balancer_balance_tax"
        );
        assert_eq!(
            dev_tweaks.balancer_min_relative_improvement_threshold,
            Some(1.2),
            "{path}: balancer_min_relative_improvement_threshold"
        );
        assert_eq!(
            dev_tweaks.balancer_key_distribution_refresh_threshold,
            Some(0.1),
            "{path}: balancer_key_distribution_refresh_threshold"
        );
    }

    /// Regression test: `Option<f64>` fields inside `DevTweaks` must
    /// survive a JSON-string round-trip through `PipelineConfig`, which
    /// uses `#[serde(flatten)]` on `RuntimeConfig`. With `serde_json`'s
    /// `arbitrary_precision` feature enabled, the serde `Content` buffer
    /// represents numbers as maps, which breaks plain `f64`
    /// deserialization (serde-rs/json#1157). The `serde_via_value`
    /// workaround on each `Option<f64>` field fixes this.
    #[test]
    fn dev_tweaks_f64_roundtrip_through_pipeline_config() {
        let rc = RuntimeConfig {
            dev_tweaks: DevTweaks {
                bloom_false_positive_rate: Some(0.0),
                balancer_balance_tax: Some(1.1),
                balancer_min_relative_improvement_threshold: Some(1.2),
                balancer_key_distribution_refresh_threshold: Some(0.1),
                ..Default::default()
            },
            ..Default::default()
        };
        let pc = PipelineConfig {
            global: rc,
            multihost: None,
            name: Some("test-pipeline".into()),
            given_name: None,
            storage_config: None,
            secrets_dir: None,
            inputs: Default::default(),
            outputs: Default::default(),
            program_ir: None,
        };

        // JSON string round-trip (the path the pipeline process takes).
        let json = serde_json::to_string_pretty(&pc).unwrap();
        let pc2: PipelineConfig = serde_json::from_str(&json)
            .expect("JSON string round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_tweaks_preserved(&pc2.global.dev_tweaks, "JSON string round-trip");

        // serde_json::Value round-trip (the path the pipeline manager takes).
        // Every f64 field is checked here as well, so that a regression
        // affecting only this path cannot slip through.
        let value = serde_json::to_value(&pc).unwrap();
        let pc3: PipelineConfig = serde_json::from_value(value)
            .expect("Value round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_tweaks_preserved(&pc3.global.dev_tweaks, "serde_json::Value round-trip");
    }

    #[test]
    fn other_options() {
        let dt =
            serde_json::from_value::<DevTweaks>(json!({"xyzzy": 1.0, "foobar": {"key": "value"}}))
                .unwrap();
        assert_eq!(
            &dt.other_options,
            &BTreeMap::from_iter([
                (String::from("xyzzy"), json!(1.0)),
                (String::from("foobar"), json!({"key": "value"}))
            ]),
        );
    }
}