use std::fmt::Debug;
use std::path::PathBuf;
use std::time::Duration;
use config::ConfigError;
use serde::Deserialize;
use serde::Serialize;
use tracing::warn;
use super::lease::LeaseConfig;
use super::validate_directory;
use crate::Error;
use crate::Result;
/// Top-level Raft configuration that groups the per-subsystem settings.
///
/// Every field carries a serde default, so any of them may be omitted from
/// the deserialized input.
#[derive(Serialize, Deserialize, Clone)]
pub struct RaftConfig {
    /// Replication settings (see [`ReplicationConfig`]).
    #[serde(default)]
    pub replication: ReplicationConfig,
    /// Batching settings (see [`BatchingConfig`]).
    #[serde(default)]
    pub batching: BatchingConfig,
    /// Election settings (see [`ElectionConfig`]).
    #[serde(default)]
    pub election: ElectionConfig,
    /// Membership settings (see [`MembershipConfig`]).
    #[serde(default)]
    pub membership: MembershipConfig,
    /// State machine settings; the key `storage` is accepted as an alias.
    #[serde(default, alias = "storage")]
    pub state_machine: StateMachineConfig,
    /// Snapshot settings (see [`SnapshotConfig`]).
    #[serde(default)]
    pub snapshot: SnapshotConfig,
    /// Persistence settings (see [`PersistenceConfig`]).
    #[serde(default)]
    pub persistence: PersistenceConfig,
    /// Defaults to `default_learner_catchup_threshold()`; `validate` rejects 0.
    #[serde(default = "default_learner_catchup_threshold")]
    pub learner_catchup_threshold: u64,
    /// Defaults to `default_learner_check_throttle_ms()`.
    #[serde(default = "default_learner_check_throttle_ms")]
    pub learner_check_throttle_ms: u64,
    /// Defaults to `default_general_timeout()`; `validate` rejects values below 1.
    #[serde(default = "default_general_timeout")]
    pub general_raft_timeout_duration_in_ms: u64,
    /// Defaults to `default_snapshot_rpc_timeout_ms()`.
    #[serde(default = "default_snapshot_rpc_timeout_ms")]
    pub snapshot_rpc_timeout_ms: u64,
    /// Auto-join settings (see [`AutoJoinConfig`]).
    #[serde(default)]
    pub auto_join: AutoJoinConfig,
    /// Read consistency settings (see [`ReadConsistencyConfig`]).
    #[serde(default)]
    pub read_consistency: ReadConsistencyConfig,
    /// Backpressure limits (see [`BackpressureConfig`]).
    #[serde(default)]
    pub backpressure: BackpressureConfig,
    /// Per-service RPC compression switches (see [`RpcCompressionConfig`]).
    #[serde(default)]
    pub rpc_compression: RpcCompressionConfig,
    /// Watch settings (see [`WatchConfig`]).
    #[serde(default)]
    pub watch: WatchConfig,
    /// Metrics switches (see [`MetricsConfig`]).
    #[serde(default)]
    pub metrics: MetricsConfig,
}
impl Debug for RaftConfig {
    /// Writes only the type name; no field values are included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("RaftConfig");
        builder.finish()
    }
}
impl Default for RaftConfig {
fn default() -> Self {
Self {
replication: ReplicationConfig::default(),
batching: BatchingConfig::default(),
election: ElectionConfig::default(),
membership: MembershipConfig::default(),
state_machine: StateMachineConfig::default(),
snapshot: SnapshotConfig::default(),
persistence: PersistenceConfig::default(),
learner_catchup_threshold: default_learner_catchup_threshold(),
learner_check_throttle_ms: default_learner_check_throttle_ms(),
general_raft_timeout_duration_in_ms: default_general_timeout(),
auto_join: AutoJoinConfig::default(),
snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
read_consistency: ReadConsistencyConfig::default(),
backpressure: BackpressureConfig::default(),
rpc_compression: RpcCompressionConfig::default(),
watch: WatchConfig::default(),
metrics: MetricsConfig::default(),
}
}
}
impl RaftConfig {
pub fn validate(&self) -> Result<()> {
if self.learner_catchup_threshold == 0 {
return Err(Error::Config(ConfigError::Message(
"learner_catchup_threshold must be greater than 0".into(),
)));
}
if self.general_raft_timeout_duration_in_ms < 1 {
return Err(Error::Config(ConfigError::Message(
"general_raft_timeout_duration_in_ms must be at least 1ms".into(),
)));
}
self.replication.validate()?;
self.batching.validate()?;
self.election.validate()?;
self.membership.validate()?;
self.state_machine.validate()?;
self.snapshot.validate()?;
self.read_consistency.validate()?;
self.watch.validate()?;
if self.read_consistency.lease_duration_ms > self.election.election_timeout_min / 2 {
warn!(
"read_consistency.lease_duration_ms ({}) is greater than half of election_timeout_min ({}ms). \
This may cause lease expiration during normal operation.",
self.read_consistency.lease_duration_ms,
self.election.election_timeout_min / 2
);
}
Ok(())
}
}
/// Serde default for `RaftConfig::learner_catchup_threshold`.
fn default_learner_catchup_threshold() -> u64 {
    const THRESHOLD: u64 = 1;
    THRESHOLD
}

/// Serde default for `RaftConfig::learner_check_throttle_ms`.
fn default_learner_check_throttle_ms() -> u64 {
    1_000
}

/// Serde default for `RaftConfig::general_raft_timeout_duration_in_ms`.
fn default_general_timeout() -> u64 {
    const TIMEOUT: u64 = 50;
    TIMEOUT
}

/// Serde default for `RaftConfig::snapshot_rpc_timeout_ms` (3,600,000).
fn default_snapshot_rpc_timeout_ms() -> u64 {
    60 * 60 * 1_000
}
/// Replication settings; both fields fall back to a default function when
/// absent from the deserialized input.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ReplicationConfig {
    /// Defaults to `default_append_interval()`; `validate` rejects 0.
    #[serde(default = "default_append_interval")]
    pub rpc_append_entries_clock_in_ms: u64,
    /// Defaults to `default_entries_per_replication()`; `validate` rejects 0.
    #[serde(default = "default_entries_per_replication")]
    pub append_entries_max_entries_per_replication: u64,
}
impl Default for ReplicationConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let rpc_append_entries_clock_in_ms = default_append_interval();
        let append_entries_max_entries_per_replication = default_entries_per_replication();
        Self {
            rpc_append_entries_clock_in_ms,
            append_entries_max_entries_per_replication,
        }
    }
}
impl ReplicationConfig {
fn validate(&self) -> Result<()> {
if self.rpc_append_entries_clock_in_ms == 0 {
return Err(Error::Config(ConfigError::Message(
"rpc_append_entries_clock_in_ms cannot be 0".into(),
)));
}
if self.append_entries_max_entries_per_replication == 0 {
return Err(Error::Config(ConfigError::Message(
"append_entries_max_entries_per_replication must be > 0".into(),
)));
}
Ok(())
}
}
/// Batching settings.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BatchingConfig {
    /// Defaults to `default_max_batch_size()`; `validate` rejects 0.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: usize,
}
impl Default for BatchingConfig {
    /// Uses the same default function serde applies to an absent field.
    fn default() -> Self {
        let max_batch_size = default_max_batch_size();
        Self { max_batch_size }
    }
}
impl BatchingConfig {
fn validate(&self) -> Result<()> {
if self.max_batch_size == 0 {
return Err(Error::Config(ConfigError::Message(
"batching.max_batch_size must be > 0".into(),
)));
}
Ok(())
}
}
/// Serde default for `ReplicationConfig::rpc_append_entries_clock_in_ms`.
fn default_append_interval() -> u64 {
    const INTERVAL: u64 = 100;
    INTERVAL
}

/// Serde default for `BatchingConfig::max_batch_size`.
fn default_max_batch_size() -> usize {
    const BATCH_SIZE: usize = 100;
    BATCH_SIZE
}

/// Serde default for `ReplicationConfig::append_entries_max_entries_per_replication`.
fn default_entries_per_replication() -> u64 {
    const ENTRIES: u64 = 100;
    ENTRIES
}
/// Election settings; every field falls back to a default function when
/// absent from the deserialized input.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ElectionConfig {
    /// Defaults to `default_election_timeout_min()`; `validate` requires it to
    /// be strictly less than `election_timeout_max`.
    #[serde(default = "default_election_timeout_min")]
    pub election_timeout_min: u64,
    /// Defaults to `default_election_timeout_max()`.
    #[serde(default = "default_election_timeout_max")]
    pub election_timeout_max: u64,
    /// Defaults to `default_peer_monitor_interval()`; `validate` rejects 0.
    ///
    /// The field name carries a historical typo ("connectinon") that must be
    /// kept for compatibility. The correctly spelled key is accepted as an
    /// alias so that it is no longer silently ignored during deserialization.
    #[serde(
        default = "default_peer_monitor_interval",
        alias = "rpc_peer_connection_monitor_interval_in_sec"
    )]
    pub rpc_peer_connectinon_monitor_interval_in_sec: u64,
    /// Defaults to `default_client_request_id()`.
    #[serde(default = "default_client_request_id")]
    pub internal_rpc_client_request_id: u32,
}
impl Default for ElectionConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let election_timeout_min = default_election_timeout_min();
        let election_timeout_max = default_election_timeout_max();
        let rpc_peer_connectinon_monitor_interval_in_sec = default_peer_monitor_interval();
        let internal_rpc_client_request_id = default_client_request_id();
        Self {
            election_timeout_min,
            election_timeout_max,
            rpc_peer_connectinon_monitor_interval_in_sec,
            internal_rpc_client_request_id,
        }
    }
}
impl ElectionConfig {
fn validate(&self) -> Result<()> {
if self.election_timeout_min >= self.election_timeout_max {
return Err(Error::Config(ConfigError::Message(format!(
"election_timeout_min {}ms must be less than election_timeout_max {}ms",
self.election_timeout_min, self.election_timeout_max
))));
}
if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
return Err(Error::Config(ConfigError::Message(
"rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
)));
}
Ok(())
}
}
/// Serde default for `ElectionConfig::election_timeout_min`.
fn default_election_timeout_min() -> u64 {
    const TIMEOUT_MIN: u64 = 500;
    TIMEOUT_MIN
}

/// Serde default for `ElectionConfig::election_timeout_max`.
fn default_election_timeout_max() -> u64 {
    1_000
}

/// Serde default for `ElectionConfig::rpc_peer_connectinon_monitor_interval_in_sec`.
fn default_peer_monitor_interval() -> u64 {
    const INTERVAL: u64 = 30;
    INTERVAL
}

/// Serde default for `ElectionConfig::internal_rpc_client_request_id`.
fn default_client_request_id() -> u32 {
    const REQUEST_ID: u32 = 0;
    REQUEST_ID
}
/// Membership settings, including the nested zombie and promotion sections.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MembershipConfig {
    /// Defaults to `default_probe_service()`; `validate` rejects an empty string.
    #[serde(default = "default_probe_service")]
    pub cluster_healthcheck_probe_service_name: String,
    /// Defaults to `default_verify_leadership_persistent_timeout()`.
    #[serde(default = "default_verify_leadership_persistent_timeout")]
    pub verify_leadership_persistent_timeout: Duration,
    /// Defaults to `default_membership_maintenance_interval()`.
    #[serde(default = "default_membership_maintenance_interval")]
    pub membership_maintenance_interval: Duration,
    /// Zombie settings (see [`ZombieConfig`]).
    #[serde(default)]
    pub zombie: ZombieConfig,
    /// Promotion settings (see [`PromotionConfig`]).
    #[serde(default)]
    pub promotion: PromotionConfig,
}
impl Default for MembershipConfig {
fn default() -> Self {
Self {
cluster_healthcheck_probe_service_name: default_probe_service(),
verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
membership_maintenance_interval: default_membership_maintenance_interval(),
zombie: ZombieConfig::default(),
promotion: PromotionConfig::default(),
}
}
}
/// Serde default for `MembershipConfig::cluster_healthcheck_probe_service_name`.
fn default_probe_service() -> String {
    String::from("d_engine.server.cluster.ClusterManagementService")
}

/// Serde default for `MembershipConfig::membership_maintenance_interval` (30 seconds).
fn default_membership_maintenance_interval() -> Duration {
    const SECONDS: u64 = 30;
    Duration::from_secs(SECONDS)
}

/// Serde default for `MembershipConfig::verify_leadership_persistent_timeout` (3600 seconds).
fn default_verify_leadership_persistent_timeout() -> Duration {
    Duration::from_secs(60 * 60)
}
impl MembershipConfig {
fn validate(&self) -> Result<()> {
if self.cluster_healthcheck_probe_service_name.is_empty() {
return Err(Error::Config(ConfigError::Message(
"cluster_healthcheck_probe_service_name cannot be empty".into(),
)));
}
Ok(())
}
}
/// State machine settings.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct StateMachineConfig {
    /// Lease settings; the key `ttl` is accepted as an alias.
    #[serde(alias = "ttl")]
    pub lease: LeaseConfig,
}
impl StateMachineConfig {
    /// Validates the nested lease settings.
    ///
    /// # Errors
    /// Propagates any error returned by `LeaseConfig::validate`.
    pub fn validate(&self) -> Result<()> {
        // `?` is kept (rather than returning the call directly) because it
        // also converts the error type if the lease validator's differs.
        self.lease.validate()?;
        Ok(())
    }
}
/// Snapshot settings; every field falls back to a default function when
/// absent from the deserialized input.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SnapshotConfig {
    /// Defaults to `default_snapshot_enabled()`.
    #[serde(default = "default_snapshot_enabled")]
    pub enable: bool,
    /// Defaults to `default_max_log_entries_before_snapshot()`; `validate` rejects 0.
    #[serde(default = "default_max_log_entries_before_snapshot")]
    pub max_log_entries_before_snapshot: u64,
    /// Defaults to `default_snapshot_cool_down_since_last_check()`.
    #[serde(default = "default_snapshot_cool_down_since_last_check")]
    pub snapshot_cool_down_since_last_check: Duration,
    /// Defaults to `default_cleanup_retain_count()`; `validate` rejects 0.
    #[serde(default = "default_cleanup_retain_count")]
    pub cleanup_retain_count: u64,
    /// Defaults to `default_snapshots_dir()`; `validate` passes it to `validate_directory`.
    #[serde(default = "default_snapshots_dir")]
    pub snapshots_dir: PathBuf,
    /// Defaults to `default_snapshots_dir_prefix()`.
    #[serde(default = "default_snapshots_dir_prefix")]
    pub snapshots_dir_prefix: String,
    /// Defaults to `default_chunk_size()`; `validate` rejects 0.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    /// Defaults to `default_retained_log_entries()`; `validate` rejects 0.
    #[serde(default = "default_retained_log_entries")]
    pub retained_log_entries: u64,
    /// Defaults to `default_sender_yield_every_n_chunks()`; `validate` rejects 0.
    #[serde(default = "default_sender_yield_every_n_chunks")]
    pub sender_yield_every_n_chunks: usize,
    /// Defaults to `default_receiver_yield_every_n_chunks()`; `validate` rejects 0.
    #[serde(default = "default_receiver_yield_every_n_chunks")]
    pub receiver_yield_every_n_chunks: usize,
    /// Defaults to `default_max_bandwidth_mbps()`.
    #[serde(default = "default_max_bandwidth_mbps")]
    pub max_bandwidth_mbps: u32,
    /// Defaults to `default_push_queue_size()`; `validate` rejects 0.
    #[serde(default = "default_push_queue_size")]
    pub push_queue_size: usize,
    /// Defaults to `default_cache_size()`.
    #[serde(default = "default_cache_size")]
    pub cache_size: usize,
    /// Defaults to `default_max_retries()`.
    #[serde(default = "default_max_retries")]
    pub max_retries: u32,
    /// Defaults to `default_transfer_timeout_in_sec()`.
    #[serde(default = "default_transfer_timeout_in_sec")]
    pub transfer_timeout_in_sec: u64,
    /// Defaults to `default_retry_interval_in_ms()`.
    #[serde(default = "default_retry_interval_in_ms")]
    pub retry_interval_in_ms: u64,
    /// Defaults to `default_snapshot_push_backoff_in_ms()`.
    #[serde(default = "default_snapshot_push_backoff_in_ms")]
    pub snapshot_push_backoff_in_ms: u64,
    /// Defaults to `default_snapshot_push_max_retry()`; `validate` rejects 0.
    #[serde(default = "default_snapshot_push_max_retry")]
    pub snapshot_push_max_retry: u32,
    /// Defaults to `default_push_timeout_in_ms()`.
    #[serde(default = "default_push_timeout_in_ms")]
    pub push_timeout_in_ms: u64,
}
impl Default for SnapshotConfig {
fn default() -> Self {
Self {
max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
cleanup_retain_count: default_cleanup_retain_count(),
snapshots_dir: default_snapshots_dir(),
snapshots_dir_prefix: default_snapshots_dir_prefix(),
chunk_size: default_chunk_size(),
retained_log_entries: default_retained_log_entries(),
sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
max_bandwidth_mbps: default_max_bandwidth_mbps(),
push_queue_size: default_push_queue_size(),
cache_size: default_cache_size(),
max_retries: default_max_retries(),
transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
retry_interval_in_ms: default_retry_interval_in_ms(),
snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
snapshot_push_max_retry: default_snapshot_push_max_retry(),
push_timeout_in_ms: default_push_timeout_in_ms(),
enable: default_snapshot_enabled(),
}
}
}
impl SnapshotConfig {
    /// Validates the snapshot settings.
    ///
    /// Checks run in a fixed order and stop at the first failure: the two
    /// count fields, the snapshot directory (via `validate_directory`), then
    /// the chunk size and the remaining "must be at least 1" fields.
    ///
    /// # Errors
    /// Returns `Error::Config` describing the first offending field, or
    /// propagates the error from `validate_directory`.
    fn validate(&self) -> Result<()> {
        if self.max_log_entries_before_snapshot == 0 {
            return Err(Error::Config(ConfigError::Message(
                "max_log_entries_before_snapshot must be greater than 0".into(),
            )));
        }
        if self.cleanup_retain_count == 0 {
            return Err(Error::Config(ConfigError::Message(
                "cleanup_retain_count must be greater than 0".into(),
            )));
        }
        validate_directory(&self.snapshots_dir, "snapshots_dir")?;
        if self.chunk_size == 0 {
            // The minimum accepted value is 1; the message previously claimed
            // "at least 0 bytes", which contradicted this very check.
            return Err(Error::Config(ConfigError::Message(format!(
                "chunk_size must be at least 1 byte (got {})",
                self.chunk_size
            ))));
        }
        if self.retained_log_entries < 1 {
            return Err(Error::Config(ConfigError::Message(format!(
                "retained_log_entries must be >= 1, (got {})",
                self.retained_log_entries
            ))));
        }
        if self.sender_yield_every_n_chunks < 1 {
            return Err(Error::Config(ConfigError::Message(format!(
                "sender_yield_every_n_chunks must be >= 1, (got {})",
                self.sender_yield_every_n_chunks
            ))));
        }
        if self.receiver_yield_every_n_chunks < 1 {
            return Err(Error::Config(ConfigError::Message(format!(
                "receiver_yield_every_n_chunks must be >= 1, (got {})",
                self.receiver_yield_every_n_chunks
            ))));
        }
        if self.push_queue_size < 1 {
            return Err(Error::Config(ConfigError::Message(format!(
                "push_queue_size must be >= 1, (got {})",
                self.push_queue_size
            ))));
        }
        if self.snapshot_push_max_retry < 1 {
            return Err(Error::Config(ConfigError::Message(format!(
                "snapshot_push_max_retry must be >= 1, (got {})",
                self.snapshot_push_max_retry
            ))));
        }
        Ok(())
    }
}
/// Serde default for `SnapshotConfig::enable`.
fn default_snapshot_enabled() -> bool {
    true
}

/// Serde default for `SnapshotConfig::max_log_entries_before_snapshot`.
fn default_max_log_entries_before_snapshot() -> u64 {
    1_000
}

/// Serde default for `SnapshotConfig::snapshot_cool_down_since_last_check` (3600 seconds).
fn default_snapshot_cool_down_since_last_check() -> Duration {
    Duration::from_secs(60 * 60)
}

/// Serde default for `SnapshotConfig::cleanup_retain_count`.
fn default_cleanup_retain_count() -> u64 {
    const RETAINED: u64 = 2;
    RETAINED
}

/// Serde default for `SnapshotConfig::snapshots_dir`.
fn default_snapshots_dir() -> PathBuf {
    "/tmp/snapshots".into()
}

/// Serde default for `SnapshotConfig::snapshots_dir_prefix`.
fn default_snapshots_dir_prefix() -> String {
    String::from("snapshot-")
}

/// Serde default for `SnapshotConfig::chunk_size`.
fn default_chunk_size() -> usize {
    const CHUNK_SIZE: usize = 1024;
    CHUNK_SIZE
}
/// Auto-join settings.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AutoJoinConfig {
    /// Defaults to `default_rpc_enable_compression()`.
    #[serde(default = "default_rpc_enable_compression")]
    pub rpc_enable_compression: bool,
}
impl Default for AutoJoinConfig {
    /// Uses the same default function serde applies to an absent field.
    fn default() -> Self {
        let rpc_enable_compression = default_rpc_enable_compression();
        Self {
            rpc_enable_compression,
        }
    }
}
/// Serde default for `AutoJoinConfig::rpc_enable_compression`.
fn default_rpc_enable_compression() -> bool {
    true
}

/// Serde default for `SnapshotConfig::retained_log_entries`.
fn default_retained_log_entries() -> u64 {
    const ENTRIES: u64 = 1;
    ENTRIES
}

/// Serde default for `SnapshotConfig::sender_yield_every_n_chunks`.
fn default_sender_yield_every_n_chunks() -> usize {
    const EVERY_N: usize = 1;
    EVERY_N
}

/// Serde default for `SnapshotConfig::receiver_yield_every_n_chunks`.
fn default_receiver_yield_every_n_chunks() -> usize {
    const EVERY_N: usize = 1;
    EVERY_N
}

/// Serde default for `SnapshotConfig::max_bandwidth_mbps`.
fn default_max_bandwidth_mbps() -> u32 {
    const MBPS: u32 = 1;
    MBPS
}

/// Serde default for `SnapshotConfig::push_queue_size`.
fn default_push_queue_size() -> usize {
    const QUEUE_SIZE: usize = 100;
    QUEUE_SIZE
}

/// Serde default for `SnapshotConfig::cache_size`.
fn default_cache_size() -> usize {
    10_000
}

/// Serde default for `SnapshotConfig::max_retries`.
fn default_max_retries() -> u32 {
    const RETRIES: u32 = 1;
    RETRIES
}

/// Serde default for `SnapshotConfig::transfer_timeout_in_sec` (600).
fn default_transfer_timeout_in_sec() -> u64 {
    10 * 60
}

/// Serde default for `SnapshotConfig::retry_interval_in_ms`.
fn default_retry_interval_in_ms() -> u64 {
    const INTERVAL: u64 = 10;
    INTERVAL
}

/// Serde default for `SnapshotConfig::snapshot_push_backoff_in_ms`.
fn default_snapshot_push_backoff_in_ms() -> u64 {
    const BACKOFF: u64 = 100;
    BACKOFF
}

/// Serde default for `SnapshotConfig::snapshot_push_max_retry`.
fn default_snapshot_push_max_retry() -> u32 {
    const RETRIES: u32 = 3;
    RETRIES
}

/// Serde default for `SnapshotConfig::push_timeout_in_ms` (300,000).
fn default_push_timeout_in_ms() -> u64 {
    5 * 60 * 1_000
}
/// Zombie settings nested under [`MembershipConfig`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ZombieConfig {
    /// Defaults to `default_zombie_threshold()`.
    #[serde(default = "default_zombie_threshold")]
    pub threshold: u32,
    /// Defaults to `default_zombie_purge_interval()`.
    #[serde(default = "default_zombie_purge_interval")]
    pub purge_interval: Duration,
}
impl Default for ZombieConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let threshold = default_zombie_threshold();
        let purge_interval = default_zombie_purge_interval();
        Self {
            threshold,
            purge_interval,
        }
    }
}
/// Serde default for `ZombieConfig::threshold`.
fn default_zombie_threshold() -> u32 {
    const THRESHOLD: u32 = 3;
    THRESHOLD
}

/// Serde default for `ZombieConfig::purge_interval` (30 seconds).
fn default_zombie_purge_interval() -> Duration {
    const SECONDS: u64 = 30;
    Duration::from_secs(SECONDS)
}
/// Promotion settings nested under [`MembershipConfig`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PromotionConfig {
    /// Defaults to `default_stale_learner_threshold()`.
    #[serde(default = "default_stale_learner_threshold")]
    pub stale_learner_threshold: Duration,
    /// Defaults to `default_stale_check_interval()`.
    #[serde(default = "default_stale_check_interval")]
    pub stale_check_interval: Duration,
}
impl Default for PromotionConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let stale_learner_threshold = default_stale_learner_threshold();
        let stale_check_interval = default_stale_check_interval();
        Self {
            stale_learner_threshold,
            stale_check_interval,
        }
    }
}
/// Serde default for `PromotionConfig::stale_learner_threshold` (300 seconds).
fn default_stale_learner_threshold() -> Duration {
    Duration::from_secs(5 * 60)
}

/// Serde default for `PromotionConfig::stale_check_interval` (30 seconds).
fn default_stale_check_interval() -> Duration {
    const SECONDS: u64 = 30;
    Duration::from_secs(SECONDS)
}
/// Strategy selector stored in `PersistenceConfig::strategy`.
// NOTE(review): the variant names suggest which medium is written first;
// confirm the exact semantics against the code that consumes this enum.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PersistenceStrategy {
    /// The variant returned by `default_persistence_strategy()`.
    DiskFirst,
    /// Alternative to `DiskFirst`.
    MemFirst,
}
/// Flush policy stored in `PersistenceConfig::flush_policy`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum FlushPolicy {
    /// Variant without parameters.
    Immediate,
    /// Variant parameterised by a `threshold` and an `interval_ms`.
    // NOTE(review): presumably flushes once either bound is reached; confirm
    // against the flush implementation.
    Batch { threshold: usize, interval_ms: u64 },
}
/// Persistence settings; every field falls back to a default function when
/// absent from the deserialized input.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PersistenceConfig {
    /// Defaults to `default_persistence_strategy()`.
    #[serde(default = "default_persistence_strategy")]
    pub strategy: PersistenceStrategy,
    /// Defaults to `default_flush_policy()`.
    #[serde(default = "default_flush_policy")]
    pub flush_policy: FlushPolicy,
    /// Defaults to `default_max_buffered_entries()`.
    #[serde(default = "default_max_buffered_entries")]
    pub max_buffered_entries: usize,
    /// Defaults to `default_flush_workers()`.
    #[serde(default = "default_flush_workers")]
    pub flush_workers: usize,
    /// Defaults to `default_channel_capacity()`.
    #[serde(default = "default_channel_capacity")]
    pub channel_capacity: usize,
}
/// Serde default for `PersistenceConfig::strategy`.
fn default_persistence_strategy() -> PersistenceStrategy {
    PersistenceStrategy::DiskFirst
}

/// Serde default for `PersistenceConfig::flush_workers`.
fn default_flush_workers() -> usize {
    const WORKERS: usize = 2;
    WORKERS
}

/// Serde default for `PersistenceConfig::channel_capacity`.
fn default_channel_capacity() -> usize {
    const CAPACITY: usize = 100;
    CAPACITY
}

/// Serde default for `PersistenceConfig::flush_policy`: a batch policy with
/// a threshold of 1024 and an interval of 100.
fn default_flush_policy() -> FlushPolicy {
    let threshold = 1024;
    let interval_ms = 100;
    FlushPolicy::Batch {
        threshold,
        interval_ms,
    }
}

/// Serde default for `PersistenceConfig::max_buffered_entries`.
fn default_max_buffered_entries() -> usize {
    const ENTRIES: usize = 10_000;
    ENTRIES
}
impl Default for PersistenceConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let strategy = default_persistence_strategy();
        let flush_policy = default_flush_policy();
        let max_buffered_entries = default_max_buffered_entries();
        let flush_workers = default_flush_workers();
        let channel_capacity = default_channel_capacity();
        Self {
            strategy,
            flush_policy,
            max_buffered_entries,
            flush_workers,
            channel_capacity,
        }
    }
}
/// Limits on pending requests, consulted by `should_reject_write` and
/// `should_reject_read`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Defaults to `default_max_pending_writes()`; 0 means writes are never rejected.
    #[serde(default = "default_max_pending_writes")]
    pub max_pending_writes: usize,
    /// Defaults to `default_max_pending_reads()`; 0 means reads are never rejected.
    #[serde(default = "default_max_pending_reads")]
    pub max_pending_reads: usize,
}
impl Default for BackpressureConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let max_pending_writes = default_max_pending_writes();
        let max_pending_reads = default_max_pending_reads();
        Self {
            max_pending_writes,
            max_pending_reads,
        }
    }
}
/// Serde default for `BackpressureConfig::max_pending_writes`.
fn default_max_pending_writes() -> usize {
    const LIMIT: usize = 10_000;
    LIMIT
}

/// Serde default for `BackpressureConfig::max_pending_reads`.
fn default_max_pending_reads() -> usize {
    const LIMIT: usize = 50_000;
    LIMIT
}
impl BackpressureConfig {
    /// Returns `true` when `current_pending` has reached the write limit.
    ///
    /// A limit of 0 disables the check, so nothing is rejected.
    pub fn should_reject_write(&self, current_pending: usize) -> bool {
        match self.max_pending_writes {
            0 => false,
            limit => current_pending >= limit,
        }
    }

    /// Returns `true` when `current_pending` has reached the read limit.
    ///
    /// A limit of 0 disables the check, so nothing is rejected.
    pub fn should_reject_read(&self, current_pending: usize) -> bool {
        match self.max_pending_reads {
            0 => false,
            limit => current_pending >= limit,
        }
    }
}
/// Read consistency policy; converts to and from the
/// `d_engine_proto::client::ReadConsistencyPolicy` type of the same name.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadConsistencyPolicy {
    /// Lease-based reads.
    LeaseRead,
    /// Linearizable reads; this is the `Default` variant.
    #[default]
    LinearizableRead,
    /// Eventually consistent reads.
    EventualConsistency,
}
/// Read consistency settings.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadConsistencyConfig {
    /// Defaults to `ReadConsistencyPolicy::default()`.
    #[serde(default)]
    pub default_policy: ReadConsistencyPolicy,
    /// Defaults to `default_lease_duration_ms()`; `validate` rejects 0, and
    /// `RaftConfig::validate` warns when it exceeds half of
    /// `election_timeout_min`.
    #[serde(default = "default_lease_duration_ms")]
    pub lease_duration_ms: u64,
    /// Defaults to `default_allow_client_override()`.
    #[serde(default = "default_allow_client_override")]
    pub allow_client_override: bool,
    /// Defaults to `default_state_machine_sync_timeout_ms()`.
    #[serde(default = "default_state_machine_sync_timeout_ms")]
    pub state_machine_sync_timeout_ms: u64,
}
impl Default for ReadConsistencyConfig {
    /// Uses the policy's own `Default` and the same default functions serde
    /// applies to absent fields.
    fn default() -> Self {
        let default_policy = ReadConsistencyPolicy::default();
        let lease_duration_ms = default_lease_duration_ms();
        let allow_client_override = default_allow_client_override();
        let state_machine_sync_timeout_ms = default_state_machine_sync_timeout_ms();
        Self {
            default_policy,
            lease_duration_ms,
            allow_client_override,
            state_machine_sync_timeout_ms,
        }
    }
}
/// Serde default for `ReadConsistencyConfig::lease_duration_ms`.
fn default_lease_duration_ms() -> u64 {
    const LEASE_MS: u64 = 250;
    LEASE_MS
}

/// Serde default for `ReadConsistencyConfig::allow_client_override`.
fn default_allow_client_override() -> bool {
    true
}

/// Serde default for `ReadConsistencyConfig::state_machine_sync_timeout_ms`.
fn default_state_machine_sync_timeout_ms() -> u64 {
    const TIMEOUT_MS: u64 = 10;
    TIMEOUT_MS
}
impl ReadConsistencyConfig {
fn validate(&self) -> Result<()> {
if self.lease_duration_ms == 0 {
return Err(Error::Config(ConfigError::Message(
"read_consistency.lease_duration_ms must be greater than 0".into(),
)));
}
Ok(())
}
}
impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
    /// Maps each protobuf policy variant to the config variant of the same name.
    fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
        use d_engine_proto::client::ReadConsistencyPolicy as Proto;

        match proto_policy {
            Proto::LeaseRead => Self::LeaseRead,
            Proto::LinearizableRead => Self::LinearizableRead,
            Proto::EventualConsistency => Self::EventualConsistency,
        }
    }
}
impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
    /// Maps each config policy variant to the protobuf variant of the same name.
    fn from(config_policy: ReadConsistencyPolicy) -> Self {
        match config_policy {
            ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
            ReadConsistencyPolicy::LinearizableRead => Self::LinearizableRead,
            ReadConsistencyPolicy::EventualConsistency => Self::EventualConsistency,
        }
    }
}
/// Per-service switches for RPC response compression.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcCompressionConfig {
    /// Defaults to `default_replication_compression()`.
    #[serde(default = "default_replication_compression")]
    pub replication_response: bool,
    /// Defaults to `default_election_compression()`.
    #[serde(default = "default_election_compression")]
    pub election_response: bool,
    /// Defaults to `default_snapshot_compression()`.
    #[serde(default = "default_snapshot_compression")]
    pub snapshot_response: bool,
    /// Defaults to `default_cluster_compression()`.
    #[serde(default = "default_cluster_compression")]
    pub cluster_response: bool,
    /// Defaults to `default_client_compression()`.
    #[serde(default = "default_client_compression")]
    pub client_response: bool,
}
impl Default for RpcCompressionConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let replication_response = default_replication_compression();
        let election_response = default_election_compression();
        let snapshot_response = default_snapshot_compression();
        let cluster_response = default_cluster_compression();
        let client_response = default_client_compression();
        Self {
            replication_response,
            election_response,
            snapshot_response,
            cluster_response,
            client_response,
        }
    }
}
/// Serde default for `RpcCompressionConfig::replication_response` (off).
fn default_replication_compression() -> bool {
    const ENABLED: bool = false;
    ENABLED
}

/// Serde default for `RpcCompressionConfig::election_response` (on).
fn default_election_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Serde default for `RpcCompressionConfig::snapshot_response` (on).
fn default_snapshot_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Serde default for `RpcCompressionConfig::cluster_response` (on).
fn default_cluster_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Serde default for `RpcCompressionConfig::client_response` (off).
fn default_client_compression() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
/// Watch settings.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WatchConfig {
    /// Defaults to `default_event_queue_size()`; `validate` rejects 0 and
    /// warns above 100,000.
    #[serde(default = "default_event_queue_size")]
    pub event_queue_size: usize,
    /// Defaults to `default_watcher_buffer_size()`; `validate` rejects 0 and
    /// warns above 1000.
    #[serde(default = "default_watcher_buffer_size")]
    pub watcher_buffer_size: usize,
    /// Defaults to `default_enable_watch_metrics()`.
    #[serde(default = "default_enable_watch_metrics")]
    pub enable_metrics: bool,
}
impl Default for WatchConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let event_queue_size = default_event_queue_size();
        let watcher_buffer_size = default_watcher_buffer_size();
        let enable_metrics = default_enable_watch_metrics();
        Self {
            event_queue_size,
            watcher_buffer_size,
            enable_metrics,
        }
    }
}
impl WatchConfig {
    /// Validates the queue and buffer sizes.
    ///
    /// Sizes of 0 are rejected. Very large sizes are accepted but logged as
    /// warnings together with a rough memory estimate. The queue size is
    /// checked (and warned about) before the buffer size is checked.
    ///
    /// # Errors
    /// Returns `Error::Config` when `event_queue_size` or
    /// `watcher_buffer_size` is 0.
    pub fn validate(&self) -> Result<()> {
        if self.event_queue_size == 0 {
            return Err(Error::Config(ConfigError::Message(
                "watch.event_queue_size must be greater than 0".into(),
            )));
        }
        if self.event_queue_size > 100_000 {
            // Saturate instead of overflowing: an absurdly large configured
            // value must produce a warning, not a panic (debug builds) or a
            // wrapped, misleading estimate (release builds).
            let estimated_mb = self.event_queue_size.saturating_mul(24) / 1_000_000;
            warn!(
                "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
                self.event_queue_size,
                estimated_mb
            );
        }
        if self.watcher_buffer_size == 0 {
            return Err(Error::Config(ConfigError::Message(
                "watch.watcher_buffer_size must be greater than 0".into(),
            )));
        }
        if self.watcher_buffer_size > 1000 {
            // Same overflow guard as for the queue estimate.
            let estimated_kb = self.watcher_buffer_size.saturating_mul(240) / 1000;
            warn!(
                "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
                self.watcher_buffer_size,
                estimated_kb
            );
        }
        Ok(())
    }
}
/// Serde default for `WatchConfig::event_queue_size`.
const fn default_event_queue_size() -> usize {
    1_000
}

/// Serde default for `WatchConfig::watcher_buffer_size`.
const fn default_watcher_buffer_size() -> usize {
    const BUFFER_SIZE: usize = 10;
    BUFFER_SIZE
}

/// Serde default for `WatchConfig::enable_metrics` (off).
const fn default_enable_watch_metrics() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
/// Metrics switches and sampling rate.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MetricsConfig {
    /// Defaults to `default_enable_backpressure_metrics()`.
    #[serde(default = "default_enable_backpressure_metrics")]
    pub enable_backpressure: bool,
    /// Defaults to `default_enable_batch_metrics()`.
    #[serde(default = "default_enable_batch_metrics")]
    pub enable_batch: bool,
    /// Defaults to `default_metrics_sample_rate()`.
    #[serde(default = "default_metrics_sample_rate")]
    pub sample_rate: u32,
}
impl Default for MetricsConfig {
    /// Uses the same default functions serde applies to absent fields.
    fn default() -> Self {
        let enable_backpressure = default_enable_backpressure_metrics();
        let enable_batch = default_enable_batch_metrics();
        let sample_rate = default_metrics_sample_rate();
        Self {
            enable_backpressure,
            enable_batch,
            sample_rate,
        }
    }
}
/// Serde default for `MetricsConfig::enable_backpressure` (off).
fn default_enable_backpressure_metrics() -> bool {
    const ENABLED: bool = false;
    ENABLED
}

/// Serde default for `MetricsConfig::enable_batch` (off).
fn default_enable_batch_metrics() -> bool {
    const ENABLED: bool = false;
    ENABLED
}

/// Serde default for `MetricsConfig::sample_rate`.
fn default_metrics_sample_rate() -> u32 {
    const SAMPLE_RATE: u32 = 1;
    SAMPLE_RATE
}