feldera_types/config/dev_tweaks.rs
1use std::collections::BTreeMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use utoipa::ToSchema;
6
7/// Optional settings for tweaking Feldera internals.
8///
9/// These settings reflect experiments that may come and go and change from
10/// version to version. Users should not consider them to be stable.
11#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
12#[serde(default)]
13pub struct DevTweaks {
14 /// Buffer-cache implementation to use for storage reads.
15 ///
16 /// The default is `s3_fifo`.
17 #[serde(skip_serializing_if = "Option::is_none")]
18 pub buffer_cache_strategy: Option<BufferCacheStrategy>,
19
20 /// Override the number of buckets/shards used by sharded buffer caches.
21 ///
22 /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
23 /// rounded up to the next power of two because the current implementation
24 /// shards by `hash(key) & (n - 1)`.
25 #[serde(skip_serializing_if = "Option::is_none")]
26 pub buffer_max_buckets: Option<usize>,
27
28 /// How S3-FIFO caches are assigned to foreground/background workers.
29 ///
30 /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
31 /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,
34
35 /// Target number of cached bytes retained in each `FBuf` slab size class.
36 ///
37 /// The default is 16 MiB.
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub fbuf_slab_bytes_per_class: Option<usize>,
40
41 /// Whether to asynchronously fetch keys needed for the join operator from
42 /// storage. Asynchronous fetching should be faster for high-latency
43 /// storage, such as object storage, but it could use excessive amounts of
44 /// memory if the number of keys fetched is very large.
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub fetch_join: Option<bool>,
47
48 /// Whether to asynchronously fetch keys needed for the distinct operator
49 /// from storage. Asynchronous fetching should be faster for high-latency
50 /// storage, such as object storage, but it could use excessive amounts of
51 /// memory if the number of keys fetched is very large.
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub fetch_distinct: Option<bool>,
54
55 /// Which merger to use.
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub merger: Option<MergerType>,
58
59 /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
60 /// allow to be in use before failing all writes with `StorageFull`. This
61 /// is useful for testing on top of storage that does not implement its own
62 /// quota mechanism.
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub storage_mb_max: Option<u64>,
65
66 /// Attempt to print a stack trace on stack overflow.
67 ///
68 /// To be used for debugging only; do not enable in production.
69 // NOTE: this flag is handled manually in `adapters/src/server.rs` before
70 // parsing DevTweaks. If the name or type of this field changes, make sure to
71 // adjust `server.rs` accordingly.
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub stack_overflow_backtrace: Option<bool>,
74
75 /// Controls the maximal number of records output by splitter operators
76 /// (joins, distinct, aggregation, rolling window and group operators) at
77 /// each step.
78 ///
79 /// The default value is 10,000 records.
80 // TODO: It would be better if the value were denominated in bytes rather
81 // than records, and if it were configurable per-operator.
82 #[serde(skip_serializing_if = "Option::is_none")]
83 pub splitter_chunk_size_records: Option<u64>,
84
85 /// Enable adaptive joins.
86 ///
87 /// Adaptive joins dynamically change their partitioning policy to avoid skew.
88 ///
89 /// Adaptive joins are disabled by default.
90 #[serde(skip_serializing_if = "Option::is_none")]
91 pub adaptive_joins: Option<bool>,
92
93 /// Evict eagerly from buffer caches as files get deleted.
94 ///
95 /// This is an optimization that drops files from
96 /// the cache as soon as they are deleted.
97 ///
98 /// It has unknown (no?) performance benefits from what I can tell.
99 ///
100 /// Historically it made sense to do this for two reasons:
101 /// a) we know with 100% guarantee that the file won't ever be
102 /// read again.
103 /// b) we could do this in O(logn) time with the LRU cache.
104 /// This is no longer true for s3-fifo where it is O(n).
105 ///
106 /// If the eviction is expensive, (many small objects in the cache)
107 /// this can cause a regression.
108 ///
109 /// New default disables this behavior by making it false.
110 ///
111 /// If this doesn't cause regression we will remove this option
112 /// in the future.
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub eager_evict: Option<bool>,
115
116 /// The minimum relative improvement threshold for the join balancer.
117 ///
118 /// The join balancer is a component that dynamically chooses an optimal
119 /// partitioning policy for adaptive join operators. This parameter
120 /// prevents the join balancer from making changes to the partitioning
121 /// policy if the improvement is not significant, since the overhead of such
122 /// rebalancing, especially when performed frequently, can exceed the
123 /// benefits.
124 ///
125 /// A rebalancing is considered significant if the relative estimated
126 /// improvement for the cluster of joins where the rebalancing is applied is
127 /// at least this threshold.
128 ///
129 /// A rebalancing is applied if both this threshold and
130 /// `balancer_min_absolute_improvement_threshold` are met.
131 ///
132 /// The default value is 1.2.
133 #[serde(skip_serializing_if = "Option::is_none")]
134 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
135 pub balancer_min_relative_improvement_threshold: Option<f64>,
136
137 /// The minimum absolute improvement threshold for the balancer.
138 ///
139 /// The join balancer is a component that dynamically chooses an optimal
140 /// partitioning policy for adaptive join operators. This parameter
141 /// prevents the join balancer from making changes to the partitioning
142 /// policy if the improvement is not significant, since the overhead of such
143 /// rebalancing, especially when performed frequently, can exceed the
144 /// benefits.
145 ///
146 /// A rebalancing is considered significant if the absolute estimated
147 /// improvement for the cluster of joins where the rebalancing is applied is
148 /// at least this threshold. The cost model used by the balancer is based on
149 /// the number of records in the largest partition of a collection.
150 ///
151 /// A rebalancing is applied if both this threshold and
152 /// `balancer_min_relative_improvement_threshold` are met.
153 ///
154 /// The default value is 10,000.
155 #[serde(skip_serializing_if = "Option::is_none")]
156 pub balancer_min_absolute_improvement_threshold: Option<u64>,
157
158 /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
159 ///
160 /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
161 /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
162 /// if the skew is `<balancer_balance_tax`.
163 ///
164 /// The default value is 1.1.
165 #[serde(skip_serializing_if = "Option::is_none")]
166 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
167 pub balancer_balance_tax: Option<f64>,
168
169 /// The balancer threshold for checking for an improved partitioning policy for a stream.
170 ///
171 /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
172 /// which can be relatively expensive. Instead of doing this on every step, the balancer only
173 /// checks for an improved partitioning policy if the key distribution of a stream has changed
174 /// significantly since the current solution was computed. Specifically, it only kicks in when
175 /// the size of at least one shard of at least one stream in the cluster has changed by more than
176 /// this threshold.
177 ///
178 /// The default value is 0.1.
179 #[serde(skip_serializing_if = "Option::is_none")]
180 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
181 pub balancer_key_distribution_refresh_threshold: Option<f64>,
182
183 /// False-positive rate for Bloom filters on batches on storage, as a
184 /// fraction f, where 0 < f < 1.
185 ///
186 /// The false-positive rate trades off between the amount of memory used by
187 /// Bloom filters and how frequently storage needs to be searched for keys
188 /// that are not actually present. Typical false-positive rates and their
189 /// corresponding memory costs are:
190 ///
191 /// - 0.1: 4.8 bits per key
192 /// - 0.01: 9.6 bits per key
193 /// - 0.001: 14.4 bits per key
194 /// - 0.0001: 19.2 bits per key (default)
195 ///
196 /// Values outside the valid range, such as 0.0, disable Bloom filters.
197 #[serde(skip_serializing_if = "Option::is_none")]
198 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
199 pub bloom_false_positive_rate: Option<f64>,
200
201 /// Whether file-backed batches may use roaring membership filters when the
202 /// key type supports them.
203 #[serde(skip_serializing_if = "Option::is_none")]
204 pub enable_roaring: Option<bool>,
205
206 /// Maximum batch size in records for level 0 merges.
207 #[serde(skip_serializing_if = "Option::is_none")]
208 pub max_level0_batch_size_records: Option<u16>,
209
210 /// The number of merger threads.
211 ///
212 /// The default value is equal to the number of worker threads.
213 #[serde(skip_serializing_if = "Option::is_none")]
214 pub merger_threads: Option<u16>,
215
216 /// Additional bias the merger assigns to records with negative weights
217 /// (retractions) to promote them to higher levels of the LSM tree sooner.
218 ///
219 /// Reasonable values for this parameter are in the range [0, 10].
220 ///
221 /// The default value is 0, which means that retractions are not given
222 /// any additional bias.
223 #[serde(skip_serializing_if = "Option::is_none")]
224 pub negative_weight_multiplier: Option<u16>,
225
226 /// Don't automatically start a transaction for every step.
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub disable_auto_transaction: Option<bool>,
229
230 /// Override the timestamp returned by SQL `NOW()` at pipeline start.
231 ///
232 /// When set, the clock connector anchors `NOW()` to this RFC 3339
233 /// timestamp the first time the pipeline starts and advances at
234 /// wall-clock cadence from there:
235 /// `NOW() = now_offset + (wall_clock - wall_clock_at_start)`.
236 ///
237 /// Any RFC 3339 timestamp parseable by `chrono::DateTime<Utc>` is
238 /// accepted (years `0001` through `9999`), in the past or future
239 /// relative to wall clock.
240 ///
241 /// This is a testing knob for queries that depend on `NOW()`.
242 ///
243 /// On resume the clock continues from the last journaled `NOW()`;
244 /// `now_offset`'s value is honored only on a fresh start:
245 ///
246 /// | Initial run | Resume from checkpoint | Post-replay `NOW()` |
247 /// |---|---|---|
248 /// | no offset | no offset | wall clock (unchanged) |
249 /// | offset | offset | wall-clock pace from the last journaled value; the new offset value is ignored |
250 /// | offset | no offset | jumps to wall clock (explicit opt-out of the anchor) |
251 /// | no offset | offset | wall-clock pace from the last journaled value; the new offset value is ignored |
252 #[serde(skip_serializing_if = "Option::is_none")]
253 pub now_offset: Option<DateTime<Utc>>,
254
255 /// Drive `NOW()` from an external HTTP endpoint instead of wall clock.
256 ///
257 /// When `true`, the clock connector emits one initial tick (using
258 /// `now_offset` if set, otherwise wall clock) and then holds that
259 /// value. Subsequent calls to `POST /clock/advance` move `NOW()`
260 /// forward by the requested delta. Negative deltas are rejected;
261 /// the clock is forward-only.
262 #[serde(skip_serializing_if = "Option::is_none")]
263 pub now_http_driven: Option<bool>,
264
265 /// Enable streaming exchange.
266 ///
267 /// `false`
268 #[serde(skip_serializing_if = "Option::is_none")]
269 pub streaming_exchange: Option<bool>,
270
271 /// Options not understood by this particular version.
272 ///
273 /// This allows the pipeline manager to take options that a custom or old
274 /// runtime version accepts.
275 #[serde(flatten)]
276 pub other_options: BTreeMap<String, serde_json::Value>,
277}
278
279impl DevTweaks {
280 pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
281 self.buffer_cache_strategy.unwrap_or_default()
282 }
283 pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
284 self.buffer_cache_allocation_strategy.unwrap_or_default()
285 }
286 pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
287 match self.buffer_cache_strategy() {
288 BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
289 BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
290 }
291 }
292 pub fn fetch_join(&self) -> bool {
293 self.fetch_join.unwrap_or(false)
294 }
295 pub fn fetch_distinct(&self) -> bool {
296 self.fetch_distinct.unwrap_or(false)
297 }
298 pub fn merger(&self) -> MergerType {
299 self.merger.unwrap_or_default()
300 }
301 pub fn stack_overflow_backtrace(&self) -> bool {
302 self.stack_overflow_backtrace.unwrap_or(false)
303 }
304 pub fn splitter_chunk_size_records(&self) -> u64 {
305 self.splitter_chunk_size_records.unwrap_or(10_000)
306 }
307 pub fn adaptive_joins(&self) -> bool {
308 self.adaptive_joins.unwrap_or(false)
309 }
310 pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
311 self.balancer_min_relative_improvement_threshold
312 .unwrap_or(1.2)
313 }
314 pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
315 self.balancer_min_absolute_improvement_threshold
316 .unwrap_or(10_000)
317 }
318 pub fn balancer_balance_tax(&self) -> f64 {
319 self.balancer_balance_tax.unwrap_or(1.1)
320 }
321 pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
322 self.balancer_key_distribution_refresh_threshold
323 .unwrap_or(0.1)
324 }
325 pub fn bloom_false_positive_rate(&self) -> f64 {
326 self.bloom_false_positive_rate.unwrap_or(0.0001)
327 }
328 pub fn enable_roaring(&self) -> bool {
329 // Roaring is enabled by default, but `enable_roaring = false` remains
330 // available as a kill switch while the feature is still being tuned.
331 self.enable_roaring.unwrap_or(true)
332 }
333 pub fn negative_weight_multiplier(&self) -> u16 {
334 self.negative_weight_multiplier.unwrap_or(0)
335 }
336
337 pub fn disable_auto_transaction(&self) -> bool {
338 self.disable_auto_transaction.unwrap_or(false)
339 }
340
341 /// Configured `now_offset` as milliseconds since the Unix epoch,
342 /// or `None` if no override is set.
343 pub fn now_offset_ms(&self) -> Option<i64> {
344 self.now_offset.map(|target| target.timestamp_millis())
345 }
346
347 pub fn now_http_driven(&self) -> bool {
348 self.now_http_driven.unwrap_or(false)
349 }
350
351 pub fn streaming_exchange(&self) -> bool {
352 self.streaming_exchange.unwrap_or(true)
353 }
354}
355
356/// Selects which eviction strategy backs a cache instance.
357#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
358#[serde(rename_all = "snake_case")]
359pub enum BufferCacheStrategy {
360 /// Use the sharded S3-FIFO cache backed by `quick_cache`.
361 #[default]
362 S3Fifo,
363
364 /// Use the mutex-protected weighted LRU cache.
365 Lru,
366}
367
368/// Controls how caches are shared across a foreground/background worker pair.
369#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
370#[serde(rename_all = "snake_case")]
371pub enum BufferCacheAllocationStrategy {
372 /// Share one cache across a foreground/background worker pair.
373 #[default]
374 SharedPerWorkerPair,
375
376 /// Create a separate cache for each foreground/background thread.
377 PerThread,
378
379 /// Share one cache across all foreground/background threads.
380 Global,
381}
382
383/// Which merger to use.
384#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
385#[serde(rename_all = "snake_case")]
386pub enum MergerType {
387 /// Newer merger, which should be faster for high-latency storage, such as
388 /// object storage, but it likely needs tuning.
389 PushMerger,
390
391 /// The old standby, with known performance.
392 #[default]
393 ListMerger,
394}
395
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::{PipelineConfig, RuntimeConfig};

    use super::*;

    /// Checks that the four `Option<f64>` tweaks set by
    /// `dev_tweaks_f64_roundtrip_through_pipeline_config` came back unchanged.
    fn assert_f64_tweaks(dev_tweaks: &DevTweaks) {
        assert_eq!(dev_tweaks.bloom_false_positive_rate, Some(0.0));
        assert_eq!(dev_tweaks.balancer_balance_tax, Some(1.1));
        assert_eq!(
            dev_tweaks.balancer_min_relative_improvement_threshold,
            Some(1.2)
        );
        assert_eq!(
            dev_tweaks.balancer_key_distribution_refresh_threshold,
            Some(0.1)
        );
    }

    /// Regression test: `Option<f64>` fields inside `DevTweaks` must
    /// survive a JSON-string round-trip through `PipelineConfig`, which
    /// uses `#[serde(flatten)]` on `RuntimeConfig`. With `serde_json`'s
    /// `arbitrary_precision` feature enabled, the serde `Content` buffer
    /// represents numbers as maps, which breaks plain `f64`
    /// deserialization (serde-rs/json#1157). The `serde_via_value`
    /// workaround on each `Option<f64>` field fixes this.
    #[test]
    fn dev_tweaks_f64_roundtrip_through_pipeline_config() {
        let rc = RuntimeConfig {
            dev_tweaks: DevTweaks {
                bloom_false_positive_rate: Some(0.0),
                balancer_balance_tax: Some(1.1),
                balancer_min_relative_improvement_threshold: Some(1.2),
                balancer_key_distribution_refresh_threshold: Some(0.1),
                ..Default::default()
            },
            ..Default::default()
        };
        let pc = PipelineConfig {
            global: rc,
            multihost: None,
            name: Some("test-pipeline".into()),
            given_name: None,
            storage_config: None,
            secrets_dir: None,
            inputs: Default::default(),
            outputs: Default::default(),
            program_ir: None,
        };

        // JSON string round-trip (the path the pipeline process takes).
        let json = serde_json::to_string_pretty(&pc).unwrap();
        let pc2: PipelineConfig = serde_json::from_str(&json)
            .expect("JSON string round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_tweaks(&pc2.global.dev_tweaks);

        // serde_json::Value round-trip (the path the pipeline manager takes).
        // Every f64 field is checked here too, not only the Bloom filter rate.
        let value = serde_json::to_value(&pc).unwrap();
        let pc3: PipelineConfig = serde_json::from_value(value)
            .expect("Value round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_tweaks(&pc3.global.dev_tweaks);
    }

    #[test]
    fn other_options() {
        let dt =
            serde_json::from_value::<DevTweaks>(json!({"xyzzy": 1.0, "foobar": {"key": "value"}}))
                .unwrap();
        assert_eq!(
            &dt.other_options,
            &BTreeMap::from_iter([
                (String::from("xyzzy"), json!(1.0)),
                (String::from("foobar"), json!({"key": "value"}))
            ]),
        );
    }

    /// Unset tweaks must resolve to the defaults documented on each field.
    #[test]
    fn accessor_defaults() {
        let dt = DevTweaks::default();
        assert_eq!(dt.buffer_cache_strategy(), BufferCacheStrategy::S3Fifo);
        assert_eq!(
            dt.buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::SharedPerWorkerPair
        );
        assert_eq!(
            dt.effective_buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::SharedPerWorkerPair
        );
        assert!(!dt.fetch_join());
        assert!(!dt.fetch_distinct());
        assert_eq!(dt.merger(), MergerType::ListMerger);
        assert!(!dt.stack_overflow_backtrace());
        assert_eq!(dt.splitter_chunk_size_records(), 10_000);
        assert!(!dt.adaptive_joins());
        assert_eq!(dt.balancer_min_relative_improvement_threshold(), 1.2);
        assert_eq!(dt.balancer_min_absolute_improvement_threshold(), 10_000);
        assert_eq!(dt.balancer_balance_tax(), 1.1);
        assert_eq!(dt.balancer_key_distribution_refresh_threshold(), 0.1);
        assert_eq!(dt.bloom_false_positive_rate(), 0.0001);
        assert!(dt.enable_roaring());
        assert_eq!(dt.negative_weight_multiplier(), 0);
        assert!(!dt.disable_auto_transaction());
        assert_eq!(dt.now_offset_ms(), None);
        assert!(!dt.now_http_driven());

        // LRU ignores the configured allocation strategy.
        let lru = DevTweaks {
            buffer_cache_strategy: Some(BufferCacheStrategy::Lru),
            buffer_cache_allocation_strategy: Some(BufferCacheAllocationStrategy::Global),
            ..Default::default()
        };
        assert_eq!(
            lru.effective_buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::PerThread
        );
    }
}