1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
/// Optional settings for tweaking Feldera internals.
///
/// These settings reflect experiments that may come and go and change from
/// version to version. Users should not consider them to be stable.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct DevTweaks {
/// Buffer-cache implementation to use for storage reads.
///
/// The default is `s3_fifo`.
#[serde(skip_serializing_if = "Option::is_none")]
pub buffer_cache_strategy: Option<BufferCacheStrategy>,
/// Override the number of buckets/shards used by sharded buffer caches.
///
/// This only applies when `buffer_cache_strategy = "s3_fifo"`. Values are
/// rounded up to the next power of two because the current implementation
/// shards by `hash(key) & (n - 1)`.
#[serde(skip_serializing_if = "Option::is_none")]
pub buffer_max_buckets: Option<usize>,
/// How S3-FIFO caches are assigned to foreground/background workers.
///
/// This only applies when `buffer_cache_strategy = "s3_fifo"`. The
/// default is `shared_per_worker_pair`; LRU always uses `per_thread`.
#[serde(skip_serializing_if = "Option::is_none")]
pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>,
/// Target number of cached bytes retained in each `FBuf` slab size class.
///
/// The default is 16 MiB.
#[serde(skip_serializing_if = "Option::is_none")]
pub fbuf_slab_bytes_per_class: Option<usize>,
/// Whether to asynchronously fetch keys needed for the join operator from
/// storage. Asynchronous fetching should be faster for high-latency
/// storage, such as object storage, but it could use excessive amounts of
/// memory if the number of keys fetched is very large.
#[serde(skip_serializing_if = "Option::is_none")]
pub fetch_join: Option<bool>,
/// Whether to asynchronously fetch keys needed for the distinct operator
/// from storage. Asynchronous fetching should be faster for high-latency
/// storage, such as object storage, but it could use excessive amounts of
/// memory if the number of keys fetched is very large.
#[serde(skip_serializing_if = "Option::is_none")]
pub fetch_distinct: Option<bool>,
/// Which merger to use.
#[serde(skip_serializing_if = "Option::is_none")]
pub merger: Option<MergerType>,
/// If set, the maximum amount of storage, in MiB, for the POSIX backend to
/// allow to be in use before failing all writes with [StorageFull]. This
/// is useful for testing on top of storage that does not implement its own
/// quota mechanism.
///
/// [StorageFull]: std::io::ErrorKind::StorageFull
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_mb_max: Option<u64>,
/// Attempt to print a stack trace on stack overflow.
///
/// To be used for debugging only; do not enable in production.
// NOTE: this flag is handled manually in `adapters/src/server.rs` before
// parsing DevTweaks. If the name or type of this field changes, make sure to
// adjust `server.rs` accordingly.
#[serde(skip_serializing_if = "Option::is_none")]
pub stack_overflow_backtrace: Option<bool>,
/// Controls the maximal number of records output by splitter operators
/// (joins, distinct, aggregation, rolling window and group operators) at
/// each step.
///
/// The default value is 10,000 records.
// TODO: It would be better if the value were denominated in bytes rather
// than records, and if it were configurable per-operator.
#[serde(skip_serializing_if = "Option::is_none")]
pub splitter_chunk_size_records: Option<u64>,
/// Enable adaptive joins.
///
/// Adaptive joins dynamically change their partitioning policy to avoid skew.
///
/// Adaptive joins are disabled by default.
#[serde(skip_serializing_if = "Option::is_none")]
pub adaptive_joins: Option<bool>,
/// The minimum relative improvement threshold for the join balancer.
///
/// The join balancer is a component that dynamically chooses an optimal
/// partitioning policy for adaptive join operators. This parameter
/// prevents the join balancer from making changes to the partitioning
/// policy if the improvement is not significant, since the overhead of such
/// rebalancing, especially when performed frequently, can exceed the
/// benefits.
///
/// A rebalancing is considered significant if the relative estimated
/// improvement for the cluster of joins where the rebalancing is applied is
/// at least this threshold.
///
/// A rebalancing is applied if both this threshold and
/// `balancer_min_absolute_improvement_threshold` are met.
///
/// The default value is 1.2.
#[serde(skip_serializing_if = "Option::is_none")]
pub balancer_min_relative_improvement_threshold: Option<f64>,
/// The minimum absolute improvement threshold for the balancer.
///
/// The join balancer is a component that dynamically chooses an optimal
/// partitioning policy for adaptive join operators. This parameter
/// prevents the join balancer from making changes to the partitioning
/// policy if the improvement is not significant, since the overhead of such
/// rebalancing, especially when performed frequently, can exceed the
/// benefits.
///
/// A rebalancing is considered significant if the absolute estimated
/// improvement for the cluster of joins where the rebalancing is applied is
/// at least this threshold. The cost model used by the balancer is based on
/// the number of records in the largest partition of a collection.
///
/// A rebalancing is applied if both this threshold and
/// `balancer_min_relative_improvement_threshold` are met.
///
/// The default value is 10,000.
#[serde(skip_serializing_if = "Option::is_none")]
pub balancer_min_absolute_improvement_threshold: Option<u64>,
/// Factor that discourages the use of the Balance policy in a perfectly balanced collection.
///
/// Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,
/// since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy
/// if the skew is `<balancer_balance_tax`.
///
/// The default value is 1.1.
#[serde(skip_serializing_if = "Option::is_none")]
pub balancer_balance_tax: Option<f64>,
/// The balancer threshold for checking for an improved partitioning policy for a stream.
///
/// Finding a good partitioning policy for a circuit involves solving an optimization problem,
/// which can be relatively expensive. Instead of doing this on every step, the balancer only
/// checks for an improved partitioning policy if the key distribution of a stream has changed
/// significantly since the current solution was computed. Specifically, it only kicks in when
/// the size of at least one shard of at least one stream in the cluster has changed by more than
/// this threshold.
///
/// The default value is 0.1.
#[serde(skip_serializing_if = "Option::is_none")]
pub balancer_key_distribution_refresh_threshold: Option<f64>,
/// False-positive rate for Bloom filters on batches on storage, as a
/// fraction f, where 0 < f < 1.
///
/// The false-positive rate trades off between the amount of memory used by
/// Bloom filters and how frequently storage needs to be searched for keys
/// that are not actually present. Typical false-positive rates and their
/// corresponding memory costs are:
///
/// - 0.1: 4.8 bits per key
/// - 0.01: 9.6 bits per key
/// - 0.001: 14.4 bits per key
/// - 0.0001: 19.2 bits per key (default)
///
/// Values outside the valid range, such as 0.0, disable Bloom filters.
#[serde(skip_serializing_if = "Option::is_none")]
pub bloom_false_positive_rate: Option<f64>,
/// Maximum batch size in records for level 0 merges.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_level0_batch_size_records: Option<u16>,
/// The number of merger threads.
///
/// The default value is equal to the number of worker threads.
#[serde(skip_serializing_if = "Option::is_none")]
pub merger_threads: Option<u16>,
/// Additional bias the merger assigns to records with negative weights
/// (retractions) to promote them to higher levels of the LSM tree sooner.
///
/// Reasonable values for this parameter are in the range [0, 10].
///
/// The default value is 0, which means that retractions are not given
/// any additional bias.
#[serde(skip_serializing_if = "Option::is_none")]
pub negative_weight_multiplier: Option<u16>,
}
impl DevTweaks {
pub fn buffer_cache_strategy(&self) -> BufferCacheStrategy {
self.buffer_cache_strategy.unwrap_or_default()
}
pub fn buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
self.buffer_cache_allocation_strategy.unwrap_or_default()
}
pub fn effective_buffer_cache_allocation_strategy(&self) -> BufferCacheAllocationStrategy {
match self.buffer_cache_strategy() {
BufferCacheStrategy::S3Fifo => self.buffer_cache_allocation_strategy(),
BufferCacheStrategy::Lru => BufferCacheAllocationStrategy::PerThread,
}
}
pub fn fetch_join(&self) -> bool {
self.fetch_join.unwrap_or(false)
}
pub fn fetch_distinct(&self) -> bool {
self.fetch_distinct.unwrap_or(false)
}
pub fn merger(&self) -> MergerType {
self.merger.unwrap_or_default()
}
pub fn stack_overflow_backtrace(&self) -> bool {
self.stack_overflow_backtrace.unwrap_or(false)
}
pub fn splitter_chunk_size_records(&self) -> u64 {
self.splitter_chunk_size_records.unwrap_or(10_000)
}
pub fn adaptive_joins(&self) -> bool {
self.adaptive_joins.unwrap_or(false)
}
pub fn balancer_min_relative_improvement_threshold(&self) -> f64 {
self.balancer_min_relative_improvement_threshold
.unwrap_or(1.2)
}
pub fn balancer_min_absolute_improvement_threshold(&self) -> u64 {
self.balancer_min_absolute_improvement_threshold
.unwrap_or(10_000)
}
pub fn balancer_balance_tax(&self) -> f64 {
self.balancer_balance_tax.unwrap_or(1.1)
}
pub fn balancer_key_distribution_refresh_threshold(&self) -> f64 {
self.balancer_key_distribution_refresh_threshold
.unwrap_or(0.1)
}
pub fn bloom_false_positive_rate(&self) -> f64 {
self.bloom_false_positive_rate.unwrap_or(0.0001)
}
pub fn negative_weight_multiplier(&self) -> u16 {
self.negative_weight_multiplier.unwrap_or(0)
}
}
/// Selects which eviction strategy backs a cache instance.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheStrategy {
/// Use the sharded S3-FIFO cache backed by `quick_cache`.
#[default]
S3Fifo,
/// Use the mutex-protected weighted LRU cache.
Lru,
}
/// Controls how caches are shared across a foreground/background worker pair.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BufferCacheAllocationStrategy {
/// Share one cache across a foreground/background worker pair.
#[default]
SharedPerWorkerPair,
/// Create a separate cache for each foreground/background thread.
PerThread,
/// Share one cache across all foreground/background threads.
Global,
}
/// Which merger to use.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum MergerType {
/// Newer merger, which should be faster for high-latency storage, such as
/// object storage, but it likely needs tuning.
PushMerger,
/// The old standby, with known performance.
#[default]
ListMerger,
}