d_engine/config/
network.rs

1use config::ConfigError;
2use serde::Deserialize;
3use serde::Serialize;
4
5use crate::Error;
6use crate::Result;
7
8/// Hierarchical network configuration for different Raft connection types
9///
10/// Provides specialized tuning for three distinct communication patterns:
11/// - Control plane: Election/heartbeat (low bandwidth, high priority)
12/// - Data plane: Log replication (balanced throughput/latency)
13/// - Bulk transfer: Snapshotting (high bandwidth, tolerant to latency)
14#[derive(Debug, Serialize, Deserialize, Clone)]
15#[allow(dead_code)]
16pub struct NetworkConfig {
17    /// Configuration for control plane connections (leader election/heartbeats)
18    #[serde(default = "default_control_params")]
19    pub control: ConnectionParams,
20
21    /// Configuration for data plane connections (log replication)
22    #[serde(default = "default_data_params")]
23    pub data: ConnectionParams,
24
25    /// Configuration for bulk transfer connections (snapshot installation)
26    #[serde(default = "default_bulk_params")]
27    pub bulk: ConnectionParams,
28
29    /// Common TCP setting for all connection types
30    #[serde(default = "default_tcp_nodelay")]
31    pub tcp_nodelay: bool,
32
33    /// I/O buffer size in bytes for all connections
34    #[serde(default = "default_buffer_size")]
35    pub buffer_size: usize,
36}
37
38/// Low-level network parameters for a specific connection type
39#[derive(Debug, Serialize, Deserialize, Clone, Default)]
40pub struct ConnectionParams {
41    /// TCP connect timeout in milliseconds
42    #[serde(default = "default_connect_timeout")]
43    pub connect_timeout_in_ms: u64,
44
45    /// gRPC request completion timeout in milliseconds
46    #[serde(default = "default_request_timeout")]
47    pub request_timeout_in_ms: u64,
48
49    /// Max concurrent requests per connection
50    #[serde(default = "default_concurrency_limit")]
51    pub concurrency_limit: usize,
52
53    /// HTTP2 SETTINGS_MAX_CONCURRENT_STREAMS
54    #[serde(default = "default_max_streams")]
55    pub max_concurrent_streams: u32,
56
57    /// TCP keepalive in seconds (None to disable)
58    #[serde(default = "default_tcp_keepalive")]
59    pub tcp_keepalive_in_secs: u64,
60
61    /// HTTP2 keepalive ping interval in seconds
62    #[serde(default = "default_h2_keepalive_interval")]
63    pub http2_keep_alive_interval_in_secs: u64,
64
65    /// HTTP2 keepalive timeout in seconds
66    #[serde(default = "default_h2_keepalive_timeout")]
67    pub http2_keep_alive_timeout_in_secs: u64,
68
69    /// HTTP2 max frame size in bytes
70    #[serde(default = "default_max_frame_size")]
71    pub max_frame_size: u32,
72
73    /// Initial connection-level flow control window in bytes
74    #[serde(default = "default_conn_window_size")]
75    pub connection_window_size: u32,
76
77    /// Initial stream-level flow control window in bytes
78    #[serde(default = "default_stream_window_size")]
79    pub stream_window_size: u32,
80
81    /// Enable HTTP2 adaptive window sizing
82    #[serde(default = "default_adaptive_window")]
83    pub adaptive_window: bool,
84}
85
86impl Default for NetworkConfig {
87    fn default() -> Self {
88        Self {
89            control: default_control_params(),
90            data: default_data_params(),
91            bulk: default_bulk_params(),
92            tcp_nodelay: default_tcp_nodelay(),
93            buffer_size: default_buffer_size(),
94        }
95    }
96}
97
98impl NetworkConfig {
99    /// Validates configuration sanity across all connection types
100    pub fn validate(&self) -> Result<()> {
101        // Validate common parameters
102        if self.buffer_size < 1024 {
103            return Err(Error::Config(ConfigError::Message(format!(
104                "Buffer size {} too small, minimum 1024 bytes",
105                self.buffer_size
106            ))));
107        }
108
109        // Validate per-connection type parameters
110        self.control.validate("control")?;
111        self.data.validate("data")?;
112        self.bulk.validate("bulk")?;
113
114        Ok(())
115    }
116}
117
118impl ConnectionParams {
119    /// Type-specific validation with context for error messages
120    pub(crate) fn validate(
121        &self,
122        conn_type: &str,
123    ) -> Result<()> {
124        // Timeout validation
125        if self.connect_timeout_in_ms == 0 {
126            return Err(Error::Config(ConfigError::Message(format!(
127                "{conn_type} connection timeout must be > 0",
128            ))));
129        }
130
131        if self.request_timeout_in_ms != 0
132            && self.request_timeout_in_ms <= self.connect_timeout_in_ms
133        {
134            return Err(Error::Config(ConfigError::Message(format!(
135                "{} request timeout {}ms must exceed connect timeout {}ms",
136                conn_type, self.request_timeout_in_ms, self.connect_timeout_in_ms
137            ))));
138        }
139
140        // HTTP2 keepalive validation
141        if self.http2_keep_alive_timeout_in_secs >= self.http2_keep_alive_interval_in_secs {
142            return Err(Error::Config(ConfigError::Message(format!(
143                "{} keepalive timeout {}s must be < interval {}s",
144                conn_type,
145                self.http2_keep_alive_timeout_in_secs,
146                self.http2_keep_alive_interval_in_secs
147            ))));
148        }
149
150        // Window size validation when not using adaptive windows
151        if !self.adaptive_window {
152            const MIN_WINDOW: u32 = 65535; // HTTP2 spec minimum
153            if self.stream_window_size < MIN_WINDOW {
154                return Err(Error::Config(ConfigError::Message(format!(
155                    "{} stream window size {} below minimum {}",
156                    conn_type, self.stream_window_size, MIN_WINDOW
157                ))));
158            }
159
160            if self.connection_window_size < self.stream_window_size {
161                return Err(Error::Config(ConfigError::Message(format!(
162                    "{} connection window {} smaller than stream window {}",
163                    conn_type, self.connection_window_size, self.stream_window_size
164                ))));
165            }
166        }
167
168        Ok(())
169    }
170}
171
172// Default configuration profiles for each connection type
173
174fn default_control_params() -> ConnectionParams {
175    ConnectionParams {
176        connect_timeout_in_ms: 20,             // Fast failure for leader elections
177        request_timeout_in_ms: 100,            // Strict heartbeat timing
178        concurrency_limit: 1024,               // Moderate concurrency
179        max_concurrent_streams: 100,           // Stream limit for control operations
180        tcp_keepalive_in_secs: 300,            // 5 minute TCP keepalive
181        http2_keep_alive_interval_in_secs: 30, // Frequent pings
182        http2_keep_alive_timeout_in_secs: 5,   // Short timeout
183        max_frame_size: default_max_frame_size(),
184        connection_window_size: 1_048_576, // 1MB connection window
185        stream_window_size: 262_144,       // 256KB stream window
186        adaptive_window: false,            // Predictable behavior
187    }
188}
189
190fn default_data_params() -> ConnectionParams {
191    ConnectionParams {
192        connect_timeout_in_ms: 50,              // Balance speed and reliability
193        request_timeout_in_ms: 500,             // Accommodate log batches
194        concurrency_limit: 8192,                // High concurrency for parallel logs
195        max_concurrent_streams: 500,            // More streams for pipelining
196        tcp_keepalive_in_secs: 600,             // 10 minute TCP keepalive
197        http2_keep_alive_interval_in_secs: 120, // Moderate ping interval
198        http2_keep_alive_timeout_in_secs: 30,   // Longer grace period
199        max_frame_size: default_max_frame_size(),
200        connection_window_size: 6_291_456, // 6MB connection window
201        stream_window_size: 1_048_576,     // 1MB stream window
202        adaptive_window: true,             // Optimize for varying loads
203    }
204}
205
206fn default_bulk_params() -> ConnectionParams {
207    ConnectionParams {
208        connect_timeout_in_ms: 500000,  // Allow for slow bulk connections
209        request_timeout_in_ms: 5000000, // Disable request timeout
210        concurrency_limit: 4,           // Limit parallel bulk transfers
211        max_concurrent_streams: 2,      // Minimal stream concurrency
212        tcp_keepalive_in_secs: 3600,    // Long-lived connections
213        http2_keep_alive_interval_in_secs: 600, // 10 minute pings
214        http2_keep_alive_timeout_in_secs: 60, // 1 minute timeout
215        max_frame_size: 16_777_215,     // Max allowed frame size
216        connection_window_size: 67_108_864, // 64MB connection window
217        stream_window_size: 16_777_216, // 16MB stream window
218        adaptive_window: false,         // Stable throughput
219    }
220}
221
// Per-field fallback values referenced by the `#[serde(default = "...")]` attributes above
/// Fallback for `ConnectionParams::connect_timeout_in_ms` (milliseconds).
fn default_connect_timeout() -> u64 {
    const CONNECT_TIMEOUT_MS: u64 = 20;
    CONNECT_TIMEOUT_MS
}
/// Fallback for `ConnectionParams::request_timeout_in_ms` (milliseconds).
fn default_request_timeout() -> u64 {
    const REQUEST_TIMEOUT_MS: u64 = 100;
    REQUEST_TIMEOUT_MS
}
/// Fallback for `ConnectionParams::concurrency_limit` (requests per connection).
fn default_concurrency_limit() -> usize {
    const CONCURRENCY_LIMIT: usize = 256;
    CONCURRENCY_LIMIT
}
/// Fallback for `ConnectionParams::max_concurrent_streams`.
fn default_max_streams() -> u32 {
    const MAX_CONCURRENT_STREAMS: u32 = 500;
    MAX_CONCURRENT_STREAMS
}
/// Fallback for `NetworkConfig::tcp_nodelay`: enabled.
fn default_tcp_nodelay() -> bool {
    const TCP_NODELAY: bool = true;
    TCP_NODELAY
}
/// Fallback for `ConnectionParams::tcp_keepalive_in_secs` (one hour).
fn default_tcp_keepalive() -> u64 {
    const TCP_KEEPALIVE_SECS: u64 = 3600;
    TCP_KEEPALIVE_SECS
}
/// Fallback for `ConnectionParams::http2_keep_alive_interval_in_secs` (five minutes).
fn default_h2_keepalive_interval() -> u64 {
    const H2_KEEPALIVE_INTERVAL_SECS: u64 = 300;
    H2_KEEPALIVE_INTERVAL_SECS
}
/// Fallback for `ConnectionParams::http2_keep_alive_timeout_in_secs`.
fn default_h2_keepalive_timeout() -> u64 {
    const H2_KEEPALIVE_TIMEOUT_SECS: u64 = 20;
    H2_KEEPALIVE_TIMEOUT_SECS
}
/// Fallback for `ConnectionParams::max_frame_size`: 2^24 - 1 bytes.
fn default_max_frame_size() -> u32 {
    const MAX_FRAME_SIZE: u32 = 16_777_215;
    MAX_FRAME_SIZE
}
/// Fallback for `ConnectionParams::connection_window_size`: 20 MiB.
fn default_conn_window_size() -> u32 {
    const CONNECTION_WINDOW_BYTES: u32 = 20 * 1024 * 1024;
    CONNECTION_WINDOW_BYTES
}
/// Fallback for `ConnectionParams::stream_window_size`: 10 MiB.
fn default_stream_window_size() -> u32 {
    const STREAM_WINDOW_BYTES: u32 = 10 * 1024 * 1024;
    STREAM_WINDOW_BYTES
}
/// Fallback for `NetworkConfig::buffer_size`: 64 KiB.
fn default_buffer_size() -> usize {
    const BUFFER_BYTES: usize = 64 * 1024;
    BUFFER_BYTES
}
/// Fallback for `ConnectionParams::adaptive_window`: disabled.
fn default_adaptive_window() -> bool {
    const ADAPTIVE_WINDOW: bool = false;
    ADAPTIVE_WINDOW
}