Skip to main content

DevTweaks

Struct DevTweaks 

Source
pub struct DevTweaks {
Show 21 fields pub adaptive_joins: Option<bool>, pub balancer_balance_tax: Option<f64>, pub balancer_key_distribution_refresh_threshold: Option<f64>, pub balancer_min_absolute_improvement_threshold: Option<i64>, pub balancer_min_relative_improvement_threshold: Option<f64>, pub bloom_false_positive_rate: Option<f64>, pub buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>, pub buffer_cache_strategy: Option<BufferCacheStrategy>, pub buffer_max_buckets: Option<u64>, pub eager_evict: Option<bool>, pub enable_roaring: Option<bool>, pub fbuf_slab_bytes_per_class: Option<u64>, pub fetch_distinct: Option<bool>, pub fetch_join: Option<bool>, pub max_level0_batch_size_records: Option<i64>, pub merger: Option<MergerType>, pub merger_threads: Option<i64>, pub negative_weight_multiplier: Option<i64>, pub splitter_chunk_size_records: Option<i64>, pub stack_overflow_backtrace: Option<bool>, pub storage_mb_max: Option<i64>,
}
Expand description

Optional settings for tweaking Feldera internals.

These settings reflect experiments that may come and go and change from version to version. Users should not consider them to be stable.

JSON schema
{
 "description": "Optional settings for tweaking Feldera internals.\n\nThese settings reflect experiments that may come and go and change from\nversion to version.  Users should not consider them to be stable.",
 "type": "object",
 "properties": {
   "adaptive_joins": {
     "description": "Enable adaptive joins.\n\nAdaptive joins dynamically change their partitioning policy to avoid skew.\n\nAdaptive joins are disabled by default.",
     "type": [
       "boolean",
       "null"
     ]
   },
   "balancer_balance_tax": {
     "description": "Factor that discourages the use of the Balance policy in a perfectly balanced collection.\n\nAssuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard,\nsince it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy\nif the skew is `<balancer_balance_tax`.\n\nThe default value is 1.1.",
     "type": [
       "number",
       "null"
     ],
     "format": "double"
   },
   "balancer_key_distribution_refresh_threshold": {
     "description": "The balancer threshold for checking for an improved partitioning policy for a stream.\n\nFinding a good partitioning policy for a circuit involves solving an optimization problem,\nwhich can be relatively expensive. Instead of doing this on every step, the balancer only\nchecks for an improved partitioning policy if the key distribution of a stream has changed\nsignificantly since the current solution was computed.  Specifically, it only kicks in when\nthe size of at least one shard of at least one stream in the cluster has changed by more than\nthis threshold.\n\nThe default value is 0.1.",
     "type": [
       "number",
       "null"
     ],
     "format": "double"
   },
   "balancer_min_absolute_improvement_threshold": {
     "description": "The minimum absolute improvement threshold for the balancer.\n\nThe join balancer is a component that dynamically chooses an optimal\npartitioning policy for adaptive join operators.  This parameter\nprevents the join balancer from making changes to the partitioning\npolicy if the improvement is not significant, since the overhead of such\nrebalancing, especially when performed frequently, can exceed the\nbenefits.\n\nA rebalancing is considered significant if the absolute estimated\nimprovement for the cluster of joins where the rebalancing is applied is\nat least this threshold. The cost model used by the balancer is based on\nthe number of records in the largest partition of a collection.\n\nA rebalancing is applied if both this threshold and\n`balancer_min_relative_improvement_threshold` are met.\n\nThe default value is 10,000.",
     "type": [
       "integer",
       "null"
     ],
     "format": "int64",
     "minimum": 0.0
   },
   "balancer_min_relative_improvement_threshold": {
     "description": "The minimum relative improvement threshold for the join balancer.\n\nThe join balancer is a component that dynamically chooses an optimal\npartitioning policy for adaptive join operators.  This parameter\nprevents the join balancer from making changes to the partitioning\npolicy if the improvement is not significant, since the overhead of such\nrebalancing, especially when performed frequently, can exceed the\nbenefits.\n\nA rebalancing is considered significant if the relative estimated\nimprovement for the cluster of joins where the rebalancing is applied is\nat least this threshold.\n\nA rebalancing is applied if both this threshold and\n`balancer_min_absolute_improvement_threshold` are met.\n\nThe default value is 1.2.",
     "type": [
       "number",
       "null"
     ],
     "format": "double"
   },
   "bloom_false_positive_rate": {
     "description": "False-positive rate for Bloom filters on batches on storage, as a\nfraction f, where 0 < f < 1.\n\nThe false-positive rate trades off between the amount of memory used by\nBloom filters and how frequently storage needs to be searched for keys\nthat are not actually present.  Typical false-positive rates and their\ncorresponding memory costs are:\n\n- 0.1: 4.8 bits per key\n- 0.01: 9.6 bits per key\n- 0.001: 14.4 bits per key\n- 0.0001: 19.2 bits per key (default)\n\nValues outside the valid range, such as 0.0, disable Bloom filters.",
     "type": [
       "number",
       "null"
     ],
     "format": "double"
   },
   "buffer_cache_allocation_strategy": {
     "allOf": [
       {
         "$ref": "#/components/schemas/BufferCacheAllocationStrategy"
       }
     ]
   },
   "buffer_cache_strategy": {
     "allOf": [
       {
         "$ref": "#/components/schemas/BufferCacheStrategy"
       }
     ]
   },
   "buffer_max_buckets": {
     "description": "Override the number of buckets/shards used by sharded buffer caches.\n\nThis only applies when `buffer_cache_strategy = \"s3_fifo\"`. Values are\nrounded up to the next power of two because the current implementation\nshards by `hash(key) & (n - 1)`.",
     "type": [
       "integer",
       "null"
     ],
     "minimum": 0.0
   },
   "eager_evict": {
     "description": "Evict eagerly from buffer caches as files get deleted.\n\nThis is an optimization that drops files from\nthe cache as soon as they are deleted.\n\nIt has unknown (no?) performance benefits from what I can tell.\n\nHistorically it made sense to do this for two reasons:\na) we know with 100% guarantee that the file won't ever be\nread again.\nb) we could do this in O(logn) time with the LRU cache.\nThis is no longer true for s3-fifo where it is O(n).\n\nIf the eviction is expensive, (many small objects in the cache)\nthis can cause a regression.\n\nNew default disables this behavior by making it false.\n\nIf this doesn't cause regression we will remove this option\nin the future.",
     "type": [
       "boolean",
       "null"
     ]
   },
   "enable_roaring": {
     "description": "Whether file-backed batches may use roaring membership filters when the\nkey type supports them.",
     "type": [
       "boolean",
       "null"
     ]
   },
   "fbuf_slab_bytes_per_class": {
     "description": "Target number of cached bytes retained in each `FBuf` slab size class.\n\nThe default is 16 MiB.",
     "type": [
       "integer",
       "null"
     ],
     "minimum": 0.0
   },
   "fetch_distinct": {
     "description": "Whether to asynchronously fetch keys needed for the distinct operator\nfrom storage.  Asynchronous fetching should be faster for high-latency\nstorage, such as object storage, but it could use excessive amounts of\nmemory if the number of keys fetched is very large.",
     "type": [
       "boolean",
       "null"
     ]
   },
   "fetch_join": {
     "description": "Whether to asynchronously fetch keys needed for the join operator from\nstorage.  Asynchronous fetching should be faster for high-latency\nstorage, such as object storage, but it could use excessive amounts of\nmemory if the number of keys fetched is very large.",
     "type": [
       "boolean",
       "null"
     ]
   },
   "max_level0_batch_size_records": {
     "description": "Maximum batch size in records for level 0 merges.",
     "type": [
       "integer",
       "null"
     ],
     "format": "int64",
     "minimum": 0.0
   },
   "merger": {
     "allOf": [
       {
         "$ref": "#/components/schemas/MergerType"
       }
     ]
   },
   "merger_threads": {
     "description": "The number of merger threads.\n\nThe default value is equal to the number of worker threads.",
     "type": [
       "integer",
       "null"
     ],
     "format": "int64",
     "minimum": 0.0
   },
   "negative_weight_multiplier": {
     "description": "Additional bias the merger assigns to records with negative weights\n(retractions) to promote them to higher levels of the LSM tree sooner.\n\nReasonable values for this parameter are in the range [0, 10].\n\nThe default value is 0, which means that retractions are not given\nany additional bias.",
     "type": [
       "integer",
       "null"
     ],
     "format": "int64",
     "minimum": 0.0
   },
   "splitter_chunk_size_records": {
     "description": "Controls the maximal number of records output by splitter operators\n(joins, distinct, aggregation, rolling window and group operators) at\neach step.\n\nThe default value is 10,000 records.",
     "type": [
       "integer",
       "null"
     ],
     "format": "int64",
     "minimum": 0.0
   },
   "stack_overflow_backtrace": {
     "description": "Attempt to print a stack trace on stack overflow.\n\nTo be used for debugging only; do not enable in production.",
     "type": [
       "boolean",
       "null"
     ]
   },
   "storage_mb_max": {
     "description": "If set, the maximum amount of storage, in MiB, for the POSIX backend to\nallow to be in use before failing all writes with [StorageFull].  This\nis useful for testing on top of storage that does not implement its own\nquota mechanism.\n\n[StorageFull]: std::io::ErrorKind::StorageFull",
     "type": [
       "integer",
       "null"
     ],
     "format": "int64",
     "minimum": 0.0
   }
 },
 "additionalProperties": {
   "description": "Options not understood by this particular version.\n\nThis allows the pipeline manager to take options that a custom or old\nruntime version accepts.",
   "default": {}
 }
}

Fields§

§adaptive_joins: Option<bool>

Enable adaptive joins.

Adaptive joins dynamically change their partitioning policy to avoid skew.

Adaptive joins are disabled by default.

§balancer_balance_tax: Option<f64>

Factor that discourages the use of the Balance policy in a perfectly balanced collection.

Assuming a perfectly balanced key distribution, the Balance policy is slightly less efficient than Shard, since it requires computing the hash of the entire key/value pair. This factor discourages the use of this policy if the skew is <balancer_balance_tax.

The default value is 1.1.

§balancer_key_distribution_refresh_threshold: Option<f64>

The balancer threshold for checking for an improved partitioning policy for a stream.

Finding a good partitioning policy for a circuit involves solving an optimization problem, which can be relatively expensive. Instead of doing this on every step, the balancer only checks for an improved partitioning policy if the key distribution of a stream has changed significantly since the current solution was computed. Specifically, it only kicks in when the size of at least one shard of at least one stream in the cluster has changed by more than this threshold.

The default value is 0.1.

§balancer_min_absolute_improvement_threshold: Option<i64>

The minimum absolute improvement threshold for the balancer.

The join balancer is a component that dynamically chooses an optimal partitioning policy for adaptive join operators. This parameter prevents the join balancer from making changes to the partitioning policy if the improvement is not significant, since the overhead of such rebalancing, especially when performed frequently, can exceed the benefits.

A rebalancing is considered significant if the absolute estimated improvement for the cluster of joins where the rebalancing is applied is at least this threshold. The cost model used by the balancer is based on the number of records in the largest partition of a collection.

A rebalancing is applied if both this threshold and balancer_min_relative_improvement_threshold are met.

The default value is 10,000.

§balancer_min_relative_improvement_threshold: Option<f64>

The minimum relative improvement threshold for the join balancer.

The join balancer is a component that dynamically chooses an optimal partitioning policy for adaptive join operators. This parameter prevents the join balancer from making changes to the partitioning policy if the improvement is not significant, since the overhead of such rebalancing, especially when performed frequently, can exceed the benefits.

A rebalancing is considered significant if the relative estimated improvement for the cluster of joins where the rebalancing is applied is at least this threshold.

A rebalancing is applied if both this threshold and balancer_min_absolute_improvement_threshold are met.

The default value is 1.2.

§bloom_false_positive_rate: Option<f64>

False-positive rate for Bloom filters on batches on storage, as a fraction f, where 0 < f < 1.

The false-positive rate trades off between the amount of memory used by Bloom filters and how frequently storage needs to be searched for keys that are not actually present. Typical false-positive rates and their corresponding memory costs are:

  • 0.1: 4.8 bits per key
  • 0.01: 9.6 bits per key
  • 0.001: 14.4 bits per key
  • 0.0001: 19.2 bits per key (default)

Values outside the valid range, such as 0.0, disable Bloom filters.

§buffer_cache_allocation_strategy: Option<BufferCacheAllocationStrategy>

§buffer_cache_strategy: Option<BufferCacheStrategy>

§buffer_max_buckets: Option<u64>

Override the number of buckets/shards used by sharded buffer caches.

This only applies when buffer_cache_strategy = "s3_fifo". Values are rounded up to the next power of two because the current implementation shards by hash(key) & (n - 1).

§eager_evict: Option<bool>

Evict eagerly from buffer caches as files get deleted.

This is an optimization that drops files from the cache as soon as they are deleted.

It has unknown (no?) performance benefits from what I can tell.

Historically it made sense to do this for two reasons:

  a) we know with 100% guarantee that the file won’t ever be read again.

  b) we could do this in O(log n) time with the LRU cache. This is no longer true for s3-fifo where it is O(n).

If the eviction is expensive, (many small objects in the cache) this can cause a regression.

New default disables this behavior by making it false.

If this doesn’t cause regression we will remove this option in the future.

§enable_roaring: Option<bool>

Whether file-backed batches may use roaring membership filters when the key type supports them.

§fbuf_slab_bytes_per_class: Option<u64>

Target number of cached bytes retained in each FBuf slab size class.

The default is 16 MiB.

§fetch_distinct: Option<bool>

Whether to asynchronously fetch keys needed for the distinct operator from storage. Asynchronous fetching should be faster for high-latency storage, such as object storage, but it could use excessive amounts of memory if the number of keys fetched is very large.

§fetch_join: Option<bool>

Whether to asynchronously fetch keys needed for the join operator from storage. Asynchronous fetching should be faster for high-latency storage, such as object storage, but it could use excessive amounts of memory if the number of keys fetched is very large.

§max_level0_batch_size_records: Option<i64>

Maximum batch size in records for level 0 merges.

§merger: Option<MergerType>

§merger_threads: Option<i64>

The number of merger threads.

The default value is equal to the number of worker threads.

§negative_weight_multiplier: Option<i64>

Additional bias the merger assigns to records with negative weights (retractions) to promote them to higher levels of the LSM tree sooner.

Reasonable values for this parameter are in the range [0, 10].

The default value is 0, which means that retractions are not given any additional bias.

§splitter_chunk_size_records: Option<i64>

Controls the maximal number of records output by splitter operators (joins, distinct, aggregation, rolling window and group operators) at each step.

The default value is 10,000 records.

§stack_overflow_backtrace: Option<bool>

Attempt to print a stack trace on stack overflow.

To be used for debugging only; do not enable in production.

§storage_mb_max: Option<i64>

If set, the maximum amount of storage, in MiB, for the POSIX backend to allow to be in use before failing all writes with StorageFull. This is useful for testing on top of storage that does not implement its own quota mechanism.

Implementations§

Trait Implementations§

Source§

impl Clone for DevTweaks

Source§

fn clone(&self) -> DevTweaks

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for DevTweaks

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<'de> Deserialize<'de> for DevTweaks

Source§

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more
Source§

impl From<&DevTweaks> for DevTweaks

Source§

fn from(value: &DevTweaks) -> Self

Converts to this type from the input type.
Source§

impl From<DevTweaks> for DevTweaks

Source§

fn from(value: DevTweaks) -> Self

Converts to this type from the input type.
Source§

impl Serialize for DevTweaks

Source§

fn serialize<__S>(&self, __serializer: __S) -> Result<__S::Ok, __S::Error>
where __S: Serializer,

Serialize this value into the given Serde serializer. Read more
Source§

impl TryFrom<DevTweaks> for DevTweaks

Source§

type Error = ConversionError

The type returned in the event of a conversion error.
Source§

fn try_from(value: DevTweaks) -> Result<Self, ConversionError>

Performs the conversion.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> Serialize for T
where T: Serialize + ?Sized,

Source§

fn erased_serialize(&self, serializer: &mut dyn Serializer) -> Result<Ok, Error>

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,

Source§

impl<T> ErasedDestructor for T
where T: 'static,