// feldera_types/config/dev_tweaks.rs
1use serde::{Deserialize, Serialize};
2use utoipa::ToSchema;
3
/// Optional settings for tweaking Feldera internals.
///
/// These settings reflect experiments that may come and go and change from
/// version to version.  Users should not consider them to be stable.
///
/// All fields are optional: `#[serde(default)]` lets any subset be omitted on
/// deserialization, and `skip_serializing_if = "Option::is_none"` drops unset
/// fields on serialization, so a round-trip preserves only explicit settings.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct DevTweaks {
    /// Buffer-cache implementation to use for storage reads.
    ///
    /// The default is `s3_fifo`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_strategy: Option<BufferCacheStrategy>,

    /// Override the number of buckets/shards used by sharded buffer caches.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
    /// rounded up to the next power of two because the current implementation
    /// shards by `hash(key) & (n - 1)`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_max_buckets: Option<usize>,

    /// How S3-FIFO caches are assigned to foreground/background workers.
    ///
    /// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
    /// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,

    /// Target number of cached bytes retained in each `FBuf` slab size class.
    ///
    /// The default is 16 MiB.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fbuf_slab_bytes_per_class: Option<usize>,

    /// Whether to asynchronously fetch keys needed for the join operator from
    /// storage.  Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_join: Option<bool>,

    /// Whether to asynchronously fetch keys needed for the distinct operator
    /// from storage.  Asynchronous fetching should be faster for high-latency
    /// storage, such as object storage, but it could use excessive amounts of
    /// memory if the number of keys fetched is very large.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fetch_distinct: Option<bool>,

    /// Which merger to use.
    ///
    /// The default is `list_merger`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger: Option<MergerType>,

    /// If set, the maximum amount of storage, in MiB, for the POSIX backend to
    /// allow to be in use before failing all writes with [StorageFull].  This
    /// is useful for testing on top of storage that does not implement its own
    /// quota mechanism.
    ///
    /// [StorageFull]: std::io::ErrorKind::StorageFull
    #[serde(skip_serializing_if = "Option::is_none")]
    pub storage_mb_max: Option<u64>,

    /// Attempt to print a stack trace on stack overflow.
    ///
    /// To be used for debugging only; do not enable in production.
    // NOTE: this flag is handled manually in `adapters/src/server.rs` before
    // parsing DevTweaks. If the name or type of this field changes, make sure to
    // adjust `server.rs` accordingly.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stack_overflow_backtrace: Option<bool>,

    /// Controls the maximal number of records output by splitter operators
    /// (joins, distinct, aggregation, rolling window and group operators) at
    /// each step.
    ///
    /// The default value is 10,000 records.
    // TODO: It would be better if the value were denominated in bytes rather
    // than records, and if it were configurable per-operator.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub splitter_chunk_size_records: Option<u64>,

    /// Enable adaptive joins.
    ///
    /// Adaptive joins dynamically change their partitioning policy to avoid skew.
    ///
    /// Adaptive joins are disabled by default.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub adaptive_joins: Option<bool>,

    /// The minimum relative improvement threshold for the join balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators.  This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the relative estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_absolute_improvement_threshold` are met.
    ///
    /// The default value is 1.2.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_min_relative_improvement_threshold: Option<f64>,

    /// The minimum absolute improvement threshold for the balancer.
    ///
    /// The join balancer is a component that dynamically chooses an optimal
    /// partitioning policy for adaptive join operators.  This parameter
    /// prevents the join balancer from making changes to the partitioning
    /// policy if the improvement is not significant, since the overhead of such
    /// rebalancing, especially when performed frequently, can exceed the
    /// benefits.
    ///
    /// A rebalancing is considered significant if the absolute estimated
    /// improvement for the cluster of joins where the rebalancing is applied is
    /// at least this threshold. The cost model used by the balancer is based on
    /// the number of records in the largest partition of a collection.
    ///
    /// A rebalancing is applied if both this threshold and
    /// `balancer_min_relative_improvement_threshold` are met.
    ///
    /// The default value is 10,000.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_min_absolute_improvement_threshold: Option<u64>,

    /// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
    ///
    /// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
    /// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
    /// if the skew is `<balancer_balance_tax`.
    ///
    /// The default value is 1.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_balance_tax: Option<f64>,

    /// The balancer threshold for checking for an improved partitioning policy for a stream.
    ///
    /// Finding a good partitioning policy for a circuit involves solving an optimization problem,
    /// which can be relatively expensive. Instead of doing this on every step, the balancer only
    /// checks for an improved partitioning policy if the key distribution of a stream has changed
    /// significantly since the current solution was computed.  Specifically, it only kicks in when
    /// the size of at least one shard of at least one stream in the cluster has changed by more than
    /// this threshold.
    ///
    /// The default value is 0.1.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub balancer_key_distribution_refresh_threshold: Option<f64>,

    /// False-positive rate for Bloom filters on batches on storage, as a
    /// fraction f, where 0 < f < 1.
    ///
    /// The false-positive rate trades off between the amount of memory used by
    /// Bloom filters and how frequently storage needs to be searched for keys
    /// that are not actually present.  Typical false-positive rates and their
    /// corresponding memory costs are:
    ///
    /// - 0.1: 4.8 bits per key
    /// - 0.01: 9.6 bits per key
    /// - 0.001: 14.4 bits per key
    /// - 0.0001: 19.2 bits per key (default)
    ///
    /// Values outside the valid range, such as 0.0, disable Bloom filters.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bloom_false_positive_rate: Option<f64>,

    /// Maximum batch size in records for level 0 merges.
    // NOTE(review): no default is documented here and no accessor exists in
    // the `impl` below — presumably the consumer supplies its own fallback.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_level0_batch_size_records: Option<u16>,

    /// The number of merger threads.
    ///
    /// The default value is equal to the number of worker threads.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub merger_threads: Option<u16>,

    /// Additional bias the merger assigns to records with negative weights
    /// (retractions) to promote them to higher levels of the LSM tree sooner.
    ///
    /// Reasonable values for this parameter are in the range [0, 10].
    ///
    /// The default value is 0, which means that retractions are not given
    /// any additional bias.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub negative_weight_multiplier: Option<u16>,
}
193
194impl DevTweaks {
195    pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
196        self.buffer_cache_strategy.unwrap_or_default()
197    }
198    pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
199        self.buffer_cache_allocation_strategy.unwrap_or_default()
200    }
201    pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
202        match self.buffer_cache_strategy() {
203            BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
204            BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
205        }
206    }
207    pub fn fetch_join(&self) -> bool {
208        self.fetch_join.unwrap_or(false)
209    }
210    pub fn fetch_distinct(&self) -> bool {
211        self.fetch_distinct.unwrap_or(false)
212    }
213    pub fn merger(&self) -> MergerType {
214        self.merger.unwrap_or_default()
215    }
216    pub fn stack_overflow_backtrace(&self) -> bool {
217        self.stack_overflow_backtrace.unwrap_or(false)
218    }
219    pub fn splitter_chunk_size_records(&self) -> u64 {
220        self.splitter_chunk_size_records.unwrap_or(10_000)
221    }
222    pub fn adaptive_joins(&self) -> bool {
223        self.adaptive_joins.unwrap_or(false)
224    }
225    pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
226        self.balancer_min_relative_improvement_threshold
227            .unwrap_or(1.2)
228    }
229    pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
230        self.balancer_min_absolute_improvement_threshold
231            .unwrap_or(10_000)
232    }
233    pub fn balancer_balance_tax(&self) -> f64 {
234        self.balancer_balance_tax.unwrap_or(1.1)
235    }
236    pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
237        self.balancer_key_distribution_refresh_threshold
238            .unwrap_or(0.1)
239    }
240    pub fn bloom_false_positive_rate(&self) -> f64 {
241        self.bloom_false_positive_rate.unwrap_or(0.0001)
242    }
243    pub fn negative_weight_multiplier(&self) -> u16 {
244        self.negative_weight_multiplier.unwrap_or(0)
245    }
246}
247
/// Selects which eviction strategy backs a cache instance.
///
/// Serialized in snake_case, i.e. as `"s3_fifo"` or `"lru"`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheStrategy {
    /// Use the sharded S3-FIFO cache backed by `quick_cache`.
    #[default]
    S3Fifo,

    /// Use the mutex-protected weighted LRU cache.
    Lru,
}
259
/// Controls how caches are shared across a foreground/background worker pair.
///
/// Serialized in snake_case, e.g. as `"shared_per_worker_pair"`.  This choice
/// only takes effect for the S3-FIFO cache; the LRU cache is always
/// per-thread (see `DevTweaks::effective_buffer_cache_allocation_strategy`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheAllocationStrategy {
    /// Share one cache across a foreground/background worker pair.
    #[default]
    SharedPerWorkerPair,

    /// Create a separate cache for each foreground/background thread.
    PerThread,

    /// Share one cache across all foreground/background threads.
    Global,
}
274
/// Which merger to use.
///
/// Serialized in snake_case, i.e. as `"push_merger"` or `"list_merger"`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum MergerType {
    /// Newer merger, which should be faster for high-latency storage, such as
    /// object storage, but it likely needs tuning.
    PushMerger,

    /// The old standby, with known performance.
    #[default]
    ListMerger,
}