feldera_types/config/dev_tweaks.rs
1use std::collections::BTreeMap;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use utoipa::ToSchema;
6
7/// Optional settings for tweaking Feldera internals.
8///
9/// These settings reflect experiments that may come and go and change from
10/// version to version. Users should not consider them to be stable.
11#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
12#[serde(default)]
13pub struct DevTweaks {
14 /// Buffer-cache implementation to use for storage reads.
15 ///
16 /// The default is `s3_fifo`.
17 #[serde(skip_serializing_if = "Option::is_none")]
18 pub buffer_cache_strategy: Option<BufferCacheStrategy>,
19
20 /// Override the number of buckets/shards used by sharded buffer caches.
21 ///
22 /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
23 /// rounded up to the next power of two because the current implementation
24 /// shards by `hash(key) & (n - 1)`.
25 #[serde(skip_serializing_if = "Option::is_none")]
26 pub buffer_max_buckets: Option<usize>,
27
28 /// How S3-FIFO caches are assigned to foreground/background workers.
29 ///
30 /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
31 /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,
34
35 /// Target number of cached bytes retained in each `FBuf` slab size class.
36 ///
37 /// The default is 16 MiB.
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub fbuf_slab_bytes_per_class: Option<usize>,
40
41 /// Whether to asynchronously fetch keys needed for the join operator from
42 /// storage. Asynchronous fetching should be faster for high-latency
43 /// storage, such as object storage, but it could use excessive amounts of
44 /// memory if the number of keys fetched is very large.
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub fetch_join: Option<bool>,
47
48 /// Whether to asynchronously fetch keys needed for the distinct operator
49 /// from storage. Asynchronous fetching should be faster for high-latency
50 /// storage, such as object storage, but it could use excessive amounts of
51 /// memory if the number of keys fetched is very large.
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub fetch_distinct: Option<bool>,
54
55 /// Which merger to use.
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub merger: Option<MergerType>,
58
59 /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
60 /// allow to be in use before failing all writes with `StorageFull`. This
61 /// is useful for testing on top of storage that does not implement its own
62 /// quota mechanism.
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub storage_mb_max: Option<u64>,
65
66 /// Attempt to print a stack trace on stack overflow.
67 ///
68 /// To be used for debugging only; do not enable in production.
69 // NOTE: this flag is handled manually in `adapters/src/server.rs` before
70 // parsing DevTweaks. If the name or type of this field changes, make sure to
71 // adjust `server.rs` accordingly.
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub stack_overflow_backtrace: Option<bool>,
74
75 /// Controls the maximal number of records output by splitter operators
76 /// (joins, distinct, aggregation, rolling window and group operators) at
77 /// each step.
78 ///
79 /// The default value is 10,000 records.
80 // TODO: It would be better if the value were denominated in bytes rather
81 // than records, and if it were configurable per-operator.
82 #[serde(skip_serializing_if = "Option::is_none")]
83 pub splitter_chunk_size_records: Option<u64>,
84
85 /// Enable adaptive joins.
86 ///
87 /// Adaptive joins dynamically change their partitioning policy to avoid skew.
88 ///
89 /// Adaptive joins are disabled by default.
90 #[serde(skip_serializing_if = "Option::is_none")]
91 pub adaptive_joins: Option<bool>,
92
93 /// Evict eagerly from buffer caches as files get deleted.
94 ///
95 /// This is an optimization that drops files from
96 /// the cache as soon as they are deleted.
97 ///
98 /// It has unknown (no?) performance benefits from what I can tell.
99 ///
100 /// Historically it made sense to do this for two reasons:
101 /// a) we know with 100% guarantee that the file won't ever be
102 /// read again.
103 /// b) we could do this in O(logn) time with the LRU cache.
104 /// This is no longer true for s3-fifo where it is O(n).
105 ///
106 /// If the eviction is expensive, (many small objects in the cache)
107 /// this can cause a regression.
108 ///
109 /// New default disables this behavior by making it false.
110 ///
111 /// If this doesn't cause regression we will remove this option
112 /// in the future.
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub eager_evict: Option<bool>,
115
116 /// The minimum relative improvement threshold for the join balancer.
117 ///
118 /// The join balancer is a component that dynamically chooses an optimal
119 /// partitioning policy for adaptive join operators. This parameter
120 /// prevents the join balancer from making changes to the partitioning
121 /// policy if the improvement is not significant, since the overhead of such
122 /// rebalancing, especially when performed frequently, can exceed the
123 /// benefits.
124 ///
125 /// A rebalancing is considered significant if the relative estimated
126 /// improvement for the cluster of joins where the rebalancing is applied is
127 /// at least this threshold.
128 ///
129 /// A rebalancing is applied if both this threshold and
130 /// `balancer_min_absolute_improvement_threshold` are met.
131 ///
132 /// The default value is 1.2.
133 #[serde(skip_serializing_if = "Option::is_none")]
134 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
135 pub balancer_min_relative_improvement_threshold: Option<f64>,
136
137 /// The minimum absolute improvement threshold for the balancer.
138 ///
139 /// The join balancer is a component that dynamically chooses an optimal
140 /// partitioning policy for adaptive join operators. This parameter
141 /// prevents the join balancer from making changes to the partitioning
142 /// policy if the improvement is not significant, since the overhead of such
143 /// rebalancing, especially when performed frequently, can exceed the
144 /// benefits.
145 ///
146 /// A rebalancing is considered significant if the absolute estimated
147 /// improvement for the cluster of joins where the rebalancing is applied is
148 /// at least this threshold. The cost model used by the balancer is based on
149 /// the number of records in the largest partition of a collection.
150 ///
151 /// A rebalancing is applied if both this threshold and
152 /// `balancer_min_relative_improvement_threshold` are met.
153 ///
154 /// The default value is 10,000.
155 #[serde(skip_serializing_if = "Option::is_none")]
156 pub balancer_min_absolute_improvement_threshold: Option<u64>,
157
158 /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
159 ///
160 /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
161 /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
162 /// if the skew is `<balancer_balance_tax`.
163 ///
164 /// The default value is 1.1.
165 #[serde(skip_serializing_if = "Option::is_none")]
166 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
167 pub balancer_balance_tax: Option<f64>,
168
169 /// The balancer threshold for checking for an improved partitioning policy for a stream.
170 ///
171 /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
172 /// which can be relatively expensive. Instead of doing this on every step, the balancer only
173 /// checks for an improved partitioning policy if the key distribution of a stream has changed
174 /// significantly since the current solution was computed. Specifically, it only kicks in when
175 /// the size of at least one shard of at least one stream in the cluster has changed by more than
176 /// this threshold.
177 ///
178 /// The default value is 0.1.
179 #[serde(skip_serializing_if = "Option::is_none")]
180 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
181 pub balancer_key_distribution_refresh_threshold: Option<f64>,
182
183 /// False-positive rate for Bloom filters on batches on storage, as a
184 /// fraction f, where 0 < f < 1.
185 ///
186 /// The false-positive rate trades off between the amount of memory used by
187 /// Bloom filters and how frequently storage needs to be searched for keys
188 /// that are not actually present. Typical false-positive rates and their
189 /// corresponding memory costs are:
190 ///
191 /// - 0.1: 4.8 bits per key
192 /// - 0.01: 9.6 bits per key
193 /// - 0.001: 14.4 bits per key
194 /// - 0.0001: 19.2 bits per key (default)
195 ///
196 /// Values outside the valid range, such as 0.0, disable Bloom filters.
197 #[serde(skip_serializing_if = "Option::is_none")]
198 #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")]
199 pub bloom_false_positive_rate: Option<f64>,
200
201 /// Whether file-backed batches may use roaring membership filters when the
202 /// key type supports them.
203 #[serde(skip_serializing_if = "Option::is_none")]
204 pub enable_roaring: Option<bool>,
205
206 /// Maximum batch size in records for level 0 merges.
207 #[serde(skip_serializing_if = "Option::is_none")]
208 pub max_level0_batch_size_records: Option<u16>,
209
210 /// The number of merger threads.
211 ///
212 /// The default value is equal to the number of worker threads.
213 #[serde(skip_serializing_if = "Option::is_none")]
214 pub merger_threads: Option<u16>,
215
216 /// Additional bias the merger assigns to records with negative weights
217 /// (retractions) to promote them to higher levels of the LSM tree sooner.
218 ///
219 /// Reasonable values for this parameter are in the range [0, 10].
220 ///
221 /// The default value is 0, which means that retractions are not given
222 /// any additional bias.
223 #[serde(skip_serializing_if = "Option::is_none")]
224 pub negative_weight_multiplier: Option<u16>,
225
226 /// Don't automatically start a transaction for every step.
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub disable_auto_transaction: Option<bool>,
229
230 /// Override the timestamp returned by SQL `NOW()` at pipeline start.
231 ///
232 /// When set, the clock connector anchors `NOW()` to this RFC 3339
233 /// timestamp the first time the pipeline starts and advances at
234 /// wall-clock cadence from there:
235 /// `NOW() = now_offset + (wall_clock - wall_clock_at_start)`.
236 ///
237 /// Any RFC 3339 timestamp parseable by `chrono::DateTime<Utc>` is
238 /// accepted (years `0001` through `9999`), in the past or future
239 /// relative to wall clock.
240 ///
241 /// This is a testing knob for queries that depend on `NOW()`.
242 ///
243 /// On resume the clock continues from the last journaled `NOW()`;
244 /// `now_offset`'s value is honored only on a fresh start:
245 ///
246 /// | Initial run | Resume from checkpoint | Post-replay `NOW()` |
247 /// |---|---|---|
248 /// | no offset | no offset | wall clock (unchanged) |
249 /// | offset | offset | wall-clock pace from the last journaled value; the new offset value is ignored |
250 /// | offset | no offset | jumps to wall clock (explicit opt-out of the anchor) |
251 /// | no offset | offset | wall-clock pace from the last journaled value; the new offset value is ignored |
252 #[serde(skip_serializing_if = "Option::is_none")]
253 pub now_offset: Option<DateTime<Utc>>,
254
255 /// Drive `NOW()` from an external HTTP endpoint instead of wall clock.
256 ///
257 /// When `true`, the clock connector emits one initial tick (using
258 /// `now_offset` if set, otherwise wall clock) and then holds that
259 /// value. Subsequent calls to `POST /clock/advance` move `NOW()`
260 /// forward by the requested delta. Negative deltas are rejected;
261 /// the clock is forward-only.
262 #[serde(skip_serializing_if = "Option::is_none")]
263 pub now_http_driven: Option<bool>,
264
265 /// Options not understood by this particular version.
266 ///
267 /// This allows the pipeline manager to take options that a custom or old
268 /// runtime version accepts.
269 #[serde(flatten)]
270 pub other_options: BTreeMap<String, serde_json::Value>,
271}
272
273impl DevTweaks {
274 pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
275 self.buffer_cache_strategy.unwrap_or_default()
276 }
277 pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
278 self.buffer_cache_allocation_strategy.unwrap_or_default()
279 }
280 pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
281 match self.buffer_cache_strategy() {
282 BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
283 BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
284 }
285 }
286 pub fn fetch_join(&self) -> bool {
287 self.fetch_join.unwrap_or(false)
288 }
289 pub fn fetch_distinct(&self) -> bool {
290 self.fetch_distinct.unwrap_or(false)
291 }
292 pub fn merger(&self) -> MergerType {
293 self.merger.unwrap_or_default()
294 }
295 pub fn stack_overflow_backtrace(&self) -> bool {
296 self.stack_overflow_backtrace.unwrap_or(false)
297 }
298 pub fn splitter_chunk_size_records(&self) -> u64 {
299 self.splitter_chunk_size_records.unwrap_or(10_000)
300 }
301 pub fn adaptive_joins(&self) -> bool {
302 self.adaptive_joins.unwrap_or(false)
303 }
304 pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
305 self.balancer_min_relative_improvement_threshold
306 .unwrap_or(1.2)
307 }
308 pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
309 self.balancer_min_absolute_improvement_threshold
310 .unwrap_or(10_000)
311 }
312 pub fn balancer_balance_tax(&self) -> f64 {
313 self.balancer_balance_tax.unwrap_or(1.1)
314 }
315 pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
316 self.balancer_key_distribution_refresh_threshold
317 .unwrap_or(0.1)
318 }
319 pub fn bloom_false_positive_rate(&self) -> f64 {
320 self.bloom_false_positive_rate.unwrap_or(0.0001)
321 }
322 pub fn enable_roaring(&self) -> bool {
323 // Roaring is enabled by default, but `enable_roaring = false` remains
324 // available as a kill switch while the feature is still being tuned.
325 self.enable_roaring.unwrap_or(true)
326 }
327 pub fn negative_weight_multiplier(&self) -> u16 {
328 self.negative_weight_multiplier.unwrap_or(0)
329 }
330
331 pub fn disable_auto_transaction(&self) -> bool {
332 self.disable_auto_transaction.unwrap_or(false)
333 }
334
335 /// Configured `now_offset` as milliseconds since the Unix epoch,
336 /// or `None` if no override is set.
337 pub fn now_offset_ms(&self) -> Option<i64> {
338 self.now_offset.map(|target| target.timestamp_millis())
339 }
340
341 pub fn now_http_driven(&self) -> bool {
342 self.now_http_driven.unwrap_or(false)
343 }
344}
345
346/// Selects which eviction strategy backs a cache instance.
347#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
348#[serde(rename_all = "snake_case")]
349pub enum BufferCacheStrategy {
350 /// Use the sharded S3-FIFO cache backed by `quick_cache`.
351 #[default]
352 S3Fifo,
353
354 /// Use the mutex-protected weighted LRU cache.
355 Lru,
356}
357
358/// Controls how caches are shared across a foreground/background worker pair.
359#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
360#[serde(rename_all = "snake_case")]
361pub enum BufferCacheAllocationStrategy {
362 /// Share one cache across a foreground/background worker pair.
363 #[default]
364 SharedPerWorkerPair,
365
366 /// Create a separate cache for each foreground/background thread.
367 PerThread,
368
369 /// Share one cache across all foreground/background threads.
370 Global,
371}
372
373/// Which merger to use.
374#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
375#[serde(rename_all = "snake_case")]
376pub enum MergerType {
377 /// Newer merger, which should be faster for high-latency storage, such as
378 /// object storage, but it likely needs tuning.
379 PushMerger,
380
381 /// The old standby, with known performance.
382 #[default]
383 ListMerger,
384}
385
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::{PipelineConfig, RuntimeConfig};

    use super::*;

    /// Asserts that the four `Option<f64>` fields populated by
    /// `dev_tweaks_f64_roundtrip_through_pipeline_config` kept their values.
    fn assert_f64_fields_preserved(dt: &DevTweaks) {
        assert_eq!(dt.bloom_false_positive_rate, Some(0.0));
        assert_eq!(dt.balancer_balance_tax, Some(1.1));
        assert_eq!(dt.balancer_min_relative_improvement_threshold, Some(1.2));
        assert_eq!(dt.balancer_key_distribution_refresh_threshold, Some(0.1));
    }

    /// Regression test: `Option<f64>` fields inside `DevTweaks` must
    /// survive a JSON round-trip through `PipelineConfig`, which
    /// uses `#[serde(flatten)]` on `RuntimeConfig`. With `serde_json`'s
    /// `arbitrary_precision` feature enabled, the serde `Content` buffer
    /// represents numbers as maps, which breaks plain `f64`
    /// deserialization (serde-rs/json#1157). The `serde_via_value`
    /// workaround on each `Option<f64>` field fixes this.
    ///
    /// Both round-trip paths check every `Option<f64>` field, not just one.
    #[test]
    fn dev_tweaks_f64_roundtrip_through_pipeline_config() {
        let rc = RuntimeConfig {
            dev_tweaks: DevTweaks {
                bloom_false_positive_rate: Some(0.0),
                balancer_balance_tax: Some(1.1),
                balancer_min_relative_improvement_threshold: Some(1.2),
                balancer_key_distribution_refresh_threshold: Some(0.1),
                ..Default::default()
            },
            ..Default::default()
        };
        let pc = PipelineConfig {
            global: rc,
            multihost: None,
            name: Some("test-pipeline".into()),
            given_name: None,
            storage_config: None,
            secrets_dir: None,
            inputs: Default::default(),
            outputs: Default::default(),
            program_ir: None,
        };

        // JSON string round-trip (the path the pipeline process takes).
        let json = serde_json::to_string_pretty(&pc).unwrap();
        let pc2: PipelineConfig = serde_json::from_str(&json)
            .expect("JSON string round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_fields_preserved(&pc2.global.dev_tweaks);

        // serde_json::Value round-trip (the path the pipeline manager takes).
        let value = serde_json::to_value(&pc).unwrap();
        let pc3: PipelineConfig = serde_json::from_value(value)
            .expect("Value round-trip of PipelineConfig with f64 dev_tweaks must succeed");
        assert_f64_fields_preserved(&pc3.global.dev_tweaks);
    }

    #[test]
    fn other_options() {
        let dt =
            serde_json::from_value::<DevTweaks>(json!({"xyzzy": 1.0, "foobar": {"key": "value"}}))
                .unwrap();
        assert_eq!(
            &dt.other_options,
            &BTreeMap::from_iter([
                (String::from("xyzzy"), json!(1.0)),
                (String::from("foobar"), json!({"key": "value"}))
            ]),
        );
    }

    /// Unset options are skipped on serialization, so the default
    /// configuration serializes to an empty object.
    #[test]
    fn default_serializes_to_empty_object() {
        assert_eq!(
            serde_json::to_value(DevTweaks::default()).unwrap(),
            json!({})
        );
    }

    /// Accessors fall back to the documented defaults when options are unset.
    #[test]
    fn default_accessors() {
        let dt = DevTweaks::default();
        assert_eq!(dt.buffer_cache_strategy(), BufferCacheStrategy::S3Fifo);
        assert_eq!(
            dt.buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::SharedPerWorkerPair
        );
        assert_eq!(dt.merger(), MergerType::ListMerger);
        assert!(!dt.fetch_join());
        assert!(!dt.fetch_distinct());
        assert!(!dt.stack_overflow_backtrace());
        assert!(!dt.adaptive_joins());
        assert!(dt.enable_roaring());
        assert!(!dt.disable_auto_transaction());
        assert!(!dt.now_http_driven());
        assert_eq!(dt.splitter_chunk_size_records(), 10_000);
        assert_eq!(dt.balancer_min_absolute_improvement_threshold(), 10_000);
        assert_eq!(dt.balancer_min_relative_improvement_threshold(), 1.2);
        assert_eq!(dt.balancer_balance_tax(), 1.1);
        assert_eq!(dt.balancer_key_distribution_refresh_threshold(), 0.1);
        assert_eq!(dt.bloom_false_positive_rate(), 0.0001);
        assert_eq!(dt.negative_weight_multiplier(), 0);
        assert_eq!(dt.now_offset_ms(), None);
    }

    /// LRU caches always use per-thread allocation, regardless of the
    /// configured allocation strategy.
    #[test]
    fn effective_buffer_cache_allocation_strategy() {
        let s3_fifo = DevTweaks {
            buffer_cache_allocation_strategy: Some(BufferCacheAllocationStrategy::Global),
            ..Default::default()
        };
        assert_eq!(
            s3_fifo.effective_buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::Global
        );

        let lru = DevTweaks {
            buffer_cache_strategy: Some(BufferCacheStrategy::Lru),
            buffer_cache_allocation_strategy: Some(BufferCacheAllocationStrategy::Global),
            ..Default::default()
        };
        assert_eq!(
            lru.effective_buffer_cache_allocation_strategy(),
            BufferCacheAllocationStrategy::PerThread
        );
    }

    /// Enum options use snake_case names on the wire.
    #[test]
    fn enum_wire_names() {
        let dt = serde_json::from_value::<DevTweaks>(json!({
            "buffer_cache_strategy": "lru",
            "buffer_cache_allocation_strategy": "per_thread",
            "merger": "push_merger"
        }))
        .unwrap();
        assert_eq!(dt.buffer_cache_strategy, Some(BufferCacheStrategy::Lru));
        assert_eq!(
            dt.buffer_cache_allocation_strategy,
            Some(BufferCacheAllocationStrategy::PerThread)
        );
        assert_eq!(dt.merger, Some(MergerType::PushMerger));
        assert!(dt.other_options.is_empty());

        assert_eq!(
            serde_json::to_value(BufferCacheStrategy::S3Fifo).unwrap(),
            json!("s3_fifo")
        );
        assert_eq!(
            serde_json::to_value(BufferCacheAllocationStrategy::SharedPerWorkerPair).unwrap(),
            json!("shared_per_worker_pair")
        );
        assert_eq!(
            serde_json::to_value(MergerType::ListMerger).unwrap(),
            json!("list_merger")
        );
    }

    /// `now_offset` parses from RFC 3339 and is exposed in epoch milliseconds.
    #[test]
    fn now_offset_ms() {
        let dt = serde_json::from_value::<DevTweaks>(json!({"now_offset": "2020-01-01T00:00:00Z"}))
            .unwrap();
        assert_eq!(dt.now_offset_ms(), Some(1_577_836_800_000));
    }
}
464}