Skip to main content

feldera_types/config/
dev_tweaks.rs

1use std::collections::BTreeMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use utoipa::ToSchema;
6
7/// Optional settings for tweaking Feldera internals.
8///
9/// These settings reflect experiments that may come and go and change from
10/// version to version.  Users should not consider them to be stable.
11#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
12#[serde(default)]
13pub struct DevTweaks {
14    /// Buffer-cache implementation to use for storage reads.
15    ///
16    /// The default is `s3_fifo`.
17    #[serde(skip_serializing_if = "Option::is_none")]
18    pub buffer_cache_strategy: Option<BufferCacheStrategy>,
19
20    /// Override the number of buckets/shards used by sharded buffer caches.
21    ///
22    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
23    /// rounded up to the next power of two because the current implementation
24    /// shards by `hash(key) & (n - 1)`.
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub buffer_max_buckets: Option<usize>,
27
28    /// How S3-FIFO caches are assigned to foreground/background workers.
29    ///
30    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
31    /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,
34
35    /// Target number of cached bytes retained in each `FBuf` slab size class.
36    ///
37    /// The default is 16 MiB.
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub fbuf_slab_bytes_per_class: Option<usize>,
40
41    /// Whether to asynchronously fetch keys needed for the join operator from
42    /// storage.  Asynchronous fetching should be faster for high-latency
43    /// storage, such as object storage, but it could use excessive amounts of
44    /// memory if the number of keys fetched is very large.
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub fetch_join: Option<bool>,
47
48    /// Whether to asynchronously fetch keys needed for the distinct operator
49    /// from storage.  Asynchronous fetching should be faster for high-latency
50    /// storage, such as object storage, but it could use excessive amounts of
51    /// memory if the number of keys fetched is very large.
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub fetch_distinct: Option<bool>,
54
55    /// Which merger to use.
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub merger: Option<MergerType>,
58
59    /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
60    /// allow to be in use before failing all writes with `StorageFull`.  This
61    /// is useful for testing on top of storage that does not implement its own
62    /// quota mechanism.
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub storage_mb_max: Option<u64>,
65
66    /// Attempt to print a stack trace on stack overflow.
67    ///
68    /// To be used for debugging only; do not enable in production.
69    // NOTE: this flag is handled manually in `adapters/src/server.rs` before
70    // parsing DevTweaks. If the name or type of this field changes, make sure to
71    // adjust `server.rs` accordingly.
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub stack_overflow_backtrace: Option<bool>,
74
75    /// Controls the maximal number of records output by splitter operators
76    /// (joins, distinct, aggregation, rolling window and group operators) at
77    /// each step.
78    ///
79    /// The default value is 10,000 records.
80    // TODO: It would be better if the value were denominated in bytes rather
81    // than records, and if it were configurable per-operator.
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub splitter_chunk_size_records: Option<u64>,
84
85    /// Enable adaptive joins.
86    ///
87    /// Adaptive joins dynamically change their partitioning policy to avoid skew.
88    ///
89    /// Adaptive joins are disabled by default.
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub adaptive_joins: Option<bool>,
92
93    /// Evict eagerly from buffer caches as files get deleted.
94    ///
95    /// This is an optimization that drops files from
96    /// the cache as soon as they are deleted.
97    ///
98    /// It has unknown (no?) performance benefits from what I can tell.
99    ///
100    /// Historically it made sense to do this for two reasons:
101    /// a) we know with 100% guarantee that the file won't ever be
102    ///    read again.
103    /// b) we could do this in O(logn) time with the LRU cache.
104    ///    This is no longer true for s3-fifo where it is O(n).
105    ///
106    /// If the eviction is expensive, (many small objects in the cache)
107    /// this can cause a regression.
108    ///
109    /// New default disables this behavior by making it false.
110    ///
111    /// If this doesn't cause regression we will remove this option
112    /// in the future.
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub eager_evict: Option<bool>,
115
116    /// The minimum relative improvement threshold for the join balancer.
117    ///
118    /// The join balancer is a component that dynamically chooses an optimal
119    /// partitioning policy for adaptive join operators.  This parameter
120    /// prevents the join balancer from making changes to the partitioning
121    /// policy if the improvement is not significant, since the overhead of such
122    /// rebalancing, especially when performed frequently, can exceed the
123    /// benefits.
124    ///
125    /// A rebalancing is considered significant if the relative estimated
126    /// improvement for the cluster of joins where the rebalancing is applied is
127    /// at least this threshold.
128    ///
129    /// A rebalancing is applied if both this threshold and
130    /// `balancer_min_absolute_improvement_threshold` are met.
131    ///
132    /// The default value is 1.2.
133    #[serde(skip_serializing_if = "Option::is_none")]
134    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
135    pub balancer_min_relative_improvement_threshold: Option<f64>,
136
137    /// The minimum absolute improvement threshold for the balancer.
138    ///
139    /// The join balancer is a component that dynamically chooses an optimal
140    /// partitioning policy for adaptive join operators.  This parameter
141    /// prevents the join balancer from making changes to the partitioning
142    /// policy if the improvement is not significant, since the overhead of such
143    /// rebalancing, especially when performed frequently, can exceed the
144    /// benefits.
145    ///
146    /// A rebalancing is considered significant if the absolute estimated
147    /// improvement for the cluster of joins where the rebalancing is applied is
148    /// at least this threshold. The cost model used by the balancer is based on
149    /// the number of records in the largest partition of a collection.
150    ///
151    /// A rebalancing is applied if both this threshold and
152    /// `balancer_min_relative_improvement_threshold` are met.
153    ///
154    /// The default value is 10,000.
155    #[serde(skip_serializing_if = "Option::is_none")]
156    pub balancer_min_absolute_improvement_threshold: Option<u64>,
157
158    /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
159    ///
160    /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
161    /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
162    /// if the skew is `<balancer_balance_tax`.
163    ///
164    /// The default value is 1.1.
165    #[serde(skip_serializing_if = "Option::is_none")]
166    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
167    pub balancer_balance_tax: Option<f64>,
168
169    /// The balancer threshold for checking for an improved partitioning policy for a stream.
170    ///
171    /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
172    /// which can be relatively expensive. Instead of doing this on every step, the balancer only
173    /// checks for an improved partitioning policy if the key distribution of a stream has changed
174    /// significantly since the current solution was computed.  Specifically, it only kicks in when
175    /// the size of at least one shard of at least one stream in the cluster has changed by more than
176    /// this threshold.
177    ///
178    /// The default value is 0.1.
179    #[serde(skip_serializing_if = "Option::is_none")]
180    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
181    pub balancer_key_distribution_refresh_threshold: Option<f64>,
182
183    /// False-positive rate for Bloom filters on batches on storage, as a
184    /// fraction f, where 0 < f < 1.
185    ///
186    /// The false-positive rate trades off between the amount of memory used by
187    /// Bloom filters and how frequently storage needs to be searched for keys
188    /// that are not actually present.  Typical false-positive rates and their
189    /// corresponding memory costs are:
190    ///
191    /// - 0.1: 4.8 bits per key
192    /// - 0.01: 9.6 bits per key
193    /// - 0.001: 14.4 bits per key
194    /// - 0.0001: 19.2 bits per key (default)
195    ///
196    /// Values outside the valid range, such as 0.0, disable Bloom filters.
197    #[serde(skip_serializing_if = "Option::is_none")]
198    #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
199    pub bloom_false_positive_rate: Option<f64>,
200
201    /// Whether file-backed batches may use roaring membership filters when the
202    /// key type supports them.
203    #[serde(skip_serializing_if = "Option::is_none")]
204    pub enable_roaring: Option<bool>,
205
206    /// Maximum batch size in records for level 0 merges.
207    #[serde(skip_serializing_if = "Option::is_none")]
208    pub max_level0_batch_size_records: Option<u16>,
209
210    /// The number of merger threads.
211    ///
212    /// The default value is equal to the number of worker threads.
213    #[serde(skip_serializing_if = "Option::is_none")]
214    pub merger_threads: Option<u16>,
215
216    /// Additional bias the merger assigns to records with negative weights
217    /// (retractions) to promote them to higher levels of the LSM tree sooner.
218    ///
219    /// Reasonable values for this parameter are in the range [0, 10].
220    ///
221    /// The default value is 0, which means that retractions are not given
222    /// any additional bias.
223    #[serde(skip_serializing_if = "Option::is_none")]
224    pub negative_weight_multiplier: Option<u16>,
225
226    /// Don't automatically start a transaction for every step.
227    #[serde(skip_serializing_if = "Option::is_none")]
228    pub disable_auto_transaction: Option<bool>,
229
230    /// Override the timestamp returned by SQL `NOW()` at pipeline start.
231    ///
232    /// When set, the clock connector anchors `NOW()` to this RFC 3339
233    /// timestamp the first time the pipeline starts and advances at
234    /// wall-clock cadence from there:
235    /// `NOW() = now_offset + (wall_clock - wall_clock_at_start)`.
236    ///
237    /// Any RFC 3339 timestamp parseable by `chrono::DateTime<Utc>` is
238    /// accepted (years `0001` through `9999`), in the past or future
239    /// relative to wall clock.
240    ///
241    /// This is a testing knob for queries that depend on `NOW()`.
242    ///
243    /// On resume the clock continues from the last journaled `NOW()`;
244    /// `now_offset`'s value is honored only on a fresh start:
245    ///
246    /// | Initial run | Resume from checkpoint | Post-replay `NOW()` |
247    /// |---|---|---|
248    /// | no offset | no offset | wall clock (unchanged) |
249    /// | offset    | offset    | wall-clock pace from the last journaled value; the new offset value is ignored |
250    /// | offset    | no offset | jumps to wall clock (explicit opt-out of the anchor) |
251    /// | no offset | offset    | wall-clock pace from the last journaled value; the new offset value is ignored |
252    #[serde(skip_serializing_if = "Option::is_none")]
253    pub now_offset: Option<DateTime<Utc>>,
254
255    /// Drive `NOW()` from an external HTTP endpoint instead of wall clock.
256    ///
257    /// When `true`, the clock connector emits one initial tick (using
258    /// `now_offset` if set, otherwise wall clock) and then holds that
259    /// value.  Subsequent calls to `POST /clock/advance` move `NOW()`
260    /// forward by the requested delta.  Negative deltas are rejected;
261    /// the clock is forward-only.
262    #[serde(skip_serializing_if = "Option::is_none")]
263    pub now_http_driven: Option<bool>,
264
265    /// Enable streaming exchange.
266    ///
267    /// `false`
268    #[serde(skip_serializing_if = "Option::is_none")]
269    pub streaming_exchange: Option<bool>,
270
271    /// Options not understood by this particular version.
272    ///
273    /// This allows the pipeline manager to take options that a custom or old
274    /// runtime version accepts.
275    #[serde(flatten)]
276    pub other_options: BTreeMap<String, serde_json::Value>,
277}
278
279impl DevTweaks {
280    pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
281        self.buffer_cache_strategy.unwrap_or_default()
282    }
283    pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
284        self.buffer_cache_allocation_strategy.unwrap_or_default()
285    }
286    pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
287        match self.buffer_cache_strategy() {
288            BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
289            BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
290        }
291    }
292    pub fn fetch_join(&self) -> bool {
293        self.fetch_join.unwrap_or(false)
294    }
295    pub fn fetch_distinct(&self) -> bool {
296        self.fetch_distinct.unwrap_or(false)
297    }
298    pub fn merger(&self) -> MergerType {
299        self.merger.unwrap_or_default()
300    }
301    pub fn stack_overflow_backtrace(&self) -> bool {
302        self.stack_overflow_backtrace.unwrap_or(false)
303    }
304    pub fn splitter_chunk_size_records(&self) -> u64 {
305        self.splitter_chunk_size_records.unwrap_or(10_000)
306    }
307    pub fn adaptive_joins(&self) -> bool {
308        self.adaptive_joins.unwrap_or(false)
309    }
310    pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
311        self.balancer_min_relative_improvement_threshold
312            .unwrap_or(1.2)
313    }
314    pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
315        self.balancer_min_absolute_improvement_threshold
316            .unwrap_or(10_000)
317    }
318    pub fn balancer_balance_tax(&self) -> f64 {
319        self.balancer_balance_tax.unwrap_or(1.1)
320    }
321    pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
322        self.balancer_key_distribution_refresh_threshold
323            .unwrap_or(0.1)
324    }
325    pub fn bloom_false_positive_rate(&self) -> f64 {
326        self.bloom_false_positive_rate.unwrap_or(0.0001)
327    }
328    pub fn enable_roaring(&self) -> bool {
329        // Roaring is enabled by default, but `enable_roaring = false` remains
330        // available as a kill switch while the feature is still being tuned.
331        self.enable_roaring.unwrap_or(true)
332    }
333    pub fn negative_weight_multiplier(&self) -> u16 {
334        self.negative_weight_multiplier.unwrap_or(0)
335    }
336
337    pub fn disable_auto_transaction(&self) -> bool {
338        self.disable_auto_transaction.unwrap_or(false)
339    }
340
341    /// Configured `now_offset` as milliseconds since the Unix epoch,
342    /// or `None` if no override is set.
343    pub fn now_offset_ms(&self) -> Option<i64> {
344        self.now_offset.map(|target| target.timestamp_millis())
345    }
346
347    pub fn now_http_driven(&self) -> bool {
348        self.now_http_driven.unwrap_or(false)
349    }
350
351    pub fn streaming_exchange(&self) -> bool {
352        self.streaming_exchange.unwrap_or(true)
353    }
354}
355
356/// Selects which eviction strategy backs a cache instance.
357#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
358#[serde(rename_all = "snake_case")]
359pub enum BufferCacheStrategy {
360    /// Use the sharded S3-FIFO cache backed by `quick_cache`.
361    #[default]
362    S3Fifo,
363
364    /// Use the mutex-protected weighted LRU cache.
365    Lru,
366}
367
368/// Controls how caches are shared across a foreground/background worker pair.
369#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
370#[serde(rename_all = "snake_case")]
371pub enum BufferCacheAllocationStrategy {
372    /// Share one cache across a foreground/background worker pair.
373    #[default]
374    SharedPerWorkerPair,
375
376    /// Create a separate cache for each foreground/background thread.
377    PerThread,
378
379    /// Share one cache across all foreground/background threads.
380    Global,
381}
382
383/// Which merger to use.
384#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
385#[serde(rename_all = "snake_case")]
386pub enum MergerType {
387    /// Newer merger, which should be faster for high-latency storage, such as
388    /// object storage, but it likely needs tuning.
389    PushMerger,
390
391    /// The old standby, with known performance.
392    #[default]
393    ListMerger,
394}
395
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::{PipelineConfig, RuntimeConfig};

    use super::*;

    /// Asserts that the `Option<f64>` fields set by
    /// `dev_tweaks_f64_roundtrip_through_pipeline_config` survived intact.
    fn assert_f64_tweaks(dt: &DevTweaks) {
        assert_eq!(dt.bloom_false_positive_rate, Some(0.0));
        assert_eq!(dt.balancer_balance_tax, Some(1.1));
        assert_eq!(dt.balancer_min_relative_improvement_threshold, Some(1.2));
        assert_eq!(dt.balancer_key_distribution_refresh_threshold, Some(0.1));
    }

    /// Regression test: `Option<f64>` fields inside `DevTweaks` must
    /// survive a JSON-string round-trip through `PipelineConfig`, which
    /// uses `#[serde(flatten)]` on `RuntimeConfig`. With `serde_json`'s
    /// `arbitrary_precision` feature enabled, the serde `Content` buffer
    /// represents numbers as maps, which breaks plain `f64`
    /// deserialization (serde-rs/json#1157). The `serde_via_value`
    /// workaround on each `Option<f64>` field fixes this.
    #[test]
    fn dev_tweaks_f64_roundtrip_through_pipeline_config() {
        let rc = RuntimeConfig {
            dev_tweaks: DevTweaks {
                bloom_false_positive_rate: Some(0.0),
                balancer_balance_tax: Some(1.1),
                balancer_min_relative_improvement_threshold: Some(1.2),
                balancer_key_distribution_refresh_threshold: Some(0.1),
                ..Default::default()
            },
            ..Default::default()
        };
        let pc = PipelineConfig {
            global: rc,
            multihost: None,
            name: Some("test-pipeline".into()),
            given_name: None,
            storage_config: None,
            secrets_dir: None,
            inputs: Default::default(),
            outputs: Default::default(),
            program_ir: None,
        };

        // JSON string round-trip (the path the pipeline process takes).
        let json = serde_json::to_string_pretty(&pc).unwrap();
        let pc2: PipelineConfig = serde_json::from_str(&json)
            .expect("JSON string round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_tweaks(&pc2.global.dev_tweaks);

        // serde_json::Value round-trip (the path the pipeline manager takes).
        // Check every f64 field here too, not just one of them.
        let value = serde_json::to_value(&pc).unwrap();
        let pc3: PipelineConfig = serde_json::from_value(value)
            .expect("Value round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_tweaks(&pc3.global.dev_tweaks);
    }

    #[test]
    fn other_options() {
        let dt =
            serde_json::from_value::<DevTweaks>(json!({"xyzzy": 1.0, "foobar": {"key": "value"}}))
                .unwrap();
        assert_eq!(
            &dt.other_options,
            &BTreeMap::from_iter([
                (String::from("xyzzy"), json!(1.0)),
                (String::from("foobar"), json!({"key": "value"}))
            ]),
        );
    }

    /// The accessors must return the defaults documented on each field.
    #[test]
    fn accessor_defaults() {
        let dt = DevTweaks::default();
        assert_eq!(dt.buffer_cache_strategy(), BufferCacheStrategy::S3Fifo);
        assert_eq!(
            dt.buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::SharedPerWorkerPair
        );
        assert_eq!(
            dt.effective_buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::SharedPerWorkerPair
        );
        assert_eq!(dt.merger(), MergerType::ListMerger);
        assert!(!dt.fetch_join());
        assert!(!dt.fetch_distinct());
        assert!(!dt.stack_overflow_backtrace());
        assert_eq!(dt.splitter_chunk_size_records(), 10_000);
        assert!(!dt.adaptive_joins());
        assert_eq!(dt.balancer_min_relative_improvement_threshold(), 1.2);
        assert_eq!(dt.balancer_min_absolute_improvement_threshold(), 10_000);
        assert_eq!(dt.balancer_balance_tax(), 1.1);
        assert_eq!(dt.balancer_key_distribution_refresh_threshold(), 0.1);
        assert_eq!(dt.bloom_false_positive_rate(), 0.0001);
        assert!(dt.enable_roaring());
        assert_eq!(dt.negative_weight_multiplier(), 0);
        assert!(!dt.disable_auto_transaction());
        assert_eq!(dt.now_offset_ms(), None);
        assert!(!dt.now_http_driven());
        assert!(dt.streaming_exchange());
    }

    /// LRU caches ignore the configured allocation strategy.
    #[test]
    fn lru_always_uses_per_thread_allocation() {
        let dt = DevTweaks {
            buffer_cache_strategy: Some(BufferCacheStrategy::Lru),
            buffer_cache_allocation_strategy: Some(BufferCacheAllocationStrategy::Global),
            ..Default::default()
        };
        assert_eq!(
            dt.effective_buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::PerThread
        );

        let dt = DevTweaks {
            buffer_cache_strategy: Some(BufferCacheStrategy::S3Fifo),
            buffer_cache_allocation_strategy: Some(BufferCacheAllocationStrategy::Global),
            ..Default::default()
        };
        assert_eq!(
            dt.effective_buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::Global
        );
    }

    /// Enum values are spelled in snake_case, and known keys do not leak into
    /// `other_options`.
    #[test]
    fn enum_values_use_snake_case() {
        let dt = serde_json::from_value::<DevTweaks>(json!({
            "buffer_cache_strategy": "lru",
            "buffer_cache_allocation_strategy": "per_thread",
            "merger": "push_merger",
        }))
        .unwrap();
        assert_eq!(dt.buffer_cache_strategy, Some(BufferCacheStrategy::Lru));
        assert_eq!(
            dt.buffer_cache_allocation_strategy,
            Some(BufferCacheAllocationStrategy::PerThread)
        );
        assert_eq!(dt.merger, Some(MergerType::PushMerger));
        assert!(dt.other_options.is_empty());
    }

    #[test]
    fn now_offset_ms_converts_rfc3339() {
        let dt = serde_json::from_value::<DevTweaks>(json!({
            "now_offset": "1970-01-01T00:00:01Z",
        }))
        .unwrap();
        assert_eq!(dt.now_offset_ms(), Some(1000));
    }

    /// Unset options are skipped entirely when serializing.
    #[test]
    fn unset_options_are_not_serialized() {
        assert_eq!(serde_json::to_value(DevTweaks::default()).unwrap(), json!({}));
    }
}