1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
//! `RemoteSettings`.
//!
//! Knobs for the remoting subsystem. Defaults are conservative and
//! suitable for development; production deployments should tune the
//! frame size, timeouts, and quarantine windows from `atomr-config`.
use std::time::Duration;
use atomr_config::Config;
/// Configuration knobs for the remoting subsystem.
///
/// Every field is public. `RemoteSettings::default()` supplies the baseline
/// values quoted on each field below, and the `with_*` builder methods
/// override individual knobs.
#[derive(Debug, Clone)]
pub struct RemoteSettings {
/// Wire transport scheme. Default: `"akka.tcp"`.
pub protocol: String,
/// Local hostname to advertise. `None` (the default) means bind-only.
pub hostname: Option<String>,
/// Local TCP port to bind. `0` (the default) lets the OS pick.
pub port: u16,
/// Maximum frame size in bytes (length-prefix + payload). Default: 4 MiB.
pub max_frame_size: usize,
/// How often the writer emits a `Heartbeat` PDU when idle. Default: 1s.
pub heartbeat_interval: Duration,
/// Heartbeat absence after which an endpoint is considered failing.
/// Default: 10s.
pub heartbeat_timeout: Duration,
/// Hard cap on the time we wait for the `Associate` handshake reply.
/// Default: 15s.
pub handshake_timeout: Duration,
/// Time window during which a quarantined remote may not reassociate.
/// Default: 60s.
pub quarantine_duration: Duration,
/// Initial backoff for endpoint reconnect attempts. Default: 200ms.
pub backoff_initial: Duration,
/// Cap on reconnect backoff (with jitter). Default: 10s.
pub backoff_max: Duration,
/// Reconnect backoff multiplier per attempt. Default: 2.0.
pub backoff_multiplier: f64,
/// Number of reconnect attempts before giving up. Default: 10.
pub max_reconnect_attempts: u32,
/// Sliding-window size for ack'd delivery. Default: 1000.
pub ack_window: u32,
/// `Send` buffer length (per-endpoint, bounded mpsc). Default: 4096.
pub send_buffer_len: usize,
/// Default serializer id used for outbound messages whose type does
/// not have a more specific serializer registered. Default:
/// `crate::serialization::BINCODE_SERIALIZER_ID`.
pub default_serializer_id: u32,
/// Cookie required during handshake. `None` (the default) disables
/// cookie auth.
pub require_cookie: Option<String>,
/// Watch heartbeat tick interval (RemoteWatcher). Default: 1s.
pub watch_heartbeat_interval: Duration,
/// Watch failure threshold (in missed heartbeats). Default: 5.
pub watch_failure_threshold: u32,
// -- Phase 5.J: phi-accrual failure-detector tuning --
//
// The four `phi_*` fields below tune the phi-accrual failure detector.
// Producing a `FailureDetectorRegistry` from `RemoteSettings` honours
// each knob.
// NOTE(review): this comment originally said the fields "mirror" a set of
// configuration keys, but the name of that key set is missing from the
// text — restore the reference if it is known.
/// φ value above which the peer is considered failed (default: 8.0 for
/// watch, 10.0 for cluster). `RemoteSettings::default()` uses 8.0.
pub phi_threshold: f64,
/// Maximum sample size kept in the heart-beat history. Default: 1000.
pub phi_max_sample_size: usize,
/// Floor on the inter-arrival std-dev (avoids over-confidence on
/// suspiciously stable links). Default: 100ms.
pub phi_min_std_deviation: Duration,
/// Pause window the detector tolerates before suspicion grows.
/// Default: 3s.
pub phi_acceptable_heartbeat_pause: Duration,
/// TLS configuration. Default is unconfigured (`!enabled()`).
/// Phase 5.E.
pub tls: crate::tls::TlsConfig,
/// Maximum payload bytes per wire frame. Larger payloads are
/// fragmented via `chunking::Chunker`. Phase 5.F. Default: 256 KiB.
pub maximum_payload_size: usize,
/// Bounded send-queue capacity per peer (Phase 5.G). When the
/// queue hits the cap the configured [`SendQueueOverflow`] policy
/// decides whether to drop the new message, drop the oldest one, or
/// fail the send. Default: 4096.
pub send_queue_capacity: usize,
/// What to do when the bounded send queue is full. Default:
/// [`SendQueueOverflow::Fail`].
pub send_queue_overflow: SendQueueOverflow,
}
/// Overflow policy for the bounded outbound send queue.
///
/// Stored in `RemoteSettings::send_queue_overflow` and consulted when the
/// per-peer queue reaches `RemoteSettings::send_queue_capacity`.
///
/// The enum is `#[non_exhaustive]`, so code in other crates that matches on
/// it needs a wildcard arm.
///
/// NOTE(review): the original comment pointed at "equivalent strategies" in
/// another system, but the name of that system is missing from the text —
/// restore the reference if it is known.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SendQueueOverflow {
/// Drop the message that triggered the overflow (oldest stays).
DropNew,
/// Drop the oldest message; enqueue the new one.
DropOld,
/// Surface a `RemoteError::SendQueueFull` to the caller.
Fail,
}
impl Default for RemoteSettings {
    /// Baseline settings: conservative values suitable for development.
    ///
    /// Nothing is advertised (`hostname` is `None`), the OS picks the port,
    /// cookie auth is off, TLS is left at its own default, and a full send
    /// queue fails the send rather than dropping messages.
    fn default() -> Self {
        // Byte-size units so the size limits below read as sizes.
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        Self {
            // Addressing.
            protocol: String::from("akka.tcp"),
            hostname: None,
            port: 0,

            // Framing and liveness.
            max_frame_size: 4 * MIB,
            heartbeat_interval: Duration::from_secs(1),
            heartbeat_timeout: Duration::from_secs(10),
            handshake_timeout: Duration::from_secs(15),
            quarantine_duration: Duration::from_secs(60),

            // Reconnect policy.
            backoff_initial: Duration::from_millis(200),
            backoff_max: Duration::from_secs(10),
            backoff_multiplier: 2.0,
            max_reconnect_attempts: 10,

            // Delivery and serialization.
            ack_window: 1_000,
            send_buffer_len: 4_096,
            default_serializer_id: crate::serialization::BINCODE_SERIALIZER_ID,
            require_cookie: None,

            // Remote watch.
            watch_heartbeat_interval: Duration::from_secs(1),
            watch_failure_threshold: 5,

            // Phi-accrual failure detector.
            phi_threshold: 8.0,
            phi_max_sample_size: 1_000,
            phi_min_std_deviation: Duration::from_millis(100),
            phi_acceptable_heartbeat_pause: Duration::from_secs(3),

            // TLS, chunking, and the bounded send queue.
            tls: crate::tls::TlsConfig::default(),
            maximum_payload_size: 256 * KIB,
            send_queue_capacity: 4_096,
            send_queue_overflow: SendQueueOverflow::Fail,
        }
    }
}
impl RemoteSettings {
    /// Build settings from the given config.
    ///
    /// The config is not read yet: the `atomr-config` crate's reader is
    /// intentionally minimal at this stage, so this currently returns
    /// [`RemoteSettings::default`] unchanged. Future versions will pull
    /// `akka.remote.*` keys here, with any missing key falling back to its
    /// default value.
    pub fn from_config(_cfg: &Config) -> Self {
        Self::default()
    }

    // Every `with_*` method below consumes `self` and returns the updated
    // value. They are `#[must_use]` because dropping the return value
    // silently discards the override.

    /// Set the hostname to advertise (stored as `Some(host)`).
    #[must_use]
    pub fn with_hostname(mut self, host: impl Into<String>) -> Self {
        self.hostname = Some(host.into());
        self
    }

    /// Set the local TCP port to bind.
    #[must_use]
    pub fn with_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Set the wire transport scheme (default `"akka.tcp"`).
    #[must_use]
    pub fn with_protocol(mut self, p: impl Into<String>) -> Self {
        self.protocol = p.into();
        self
    }

    /// Override the phi-accrual threshold (default 8.0).
    #[must_use]
    pub fn with_phi_threshold(mut self, t: f64) -> Self {
        self.phi_threshold = t;
        self
    }

    /// Override the phi-accrual sample size (default 1000).
    #[must_use]
    pub fn with_phi_sample_size(mut self, n: usize) -> Self {
        self.phi_max_sample_size = n;
        self
    }

    /// Override the std-dev floor (default 100ms).
    #[must_use]
    pub fn with_phi_min_std_deviation(mut self, d: Duration) -> Self {
        self.phi_min_std_deviation = d;
        self
    }

    /// Override the acceptable heart-beat pause (default 3s).
    #[must_use]
    pub fn with_phi_acceptable_pause(mut self, d: Duration) -> Self {
        self.phi_acceptable_heartbeat_pause = d;
        self
    }

    /// Override the TLS configuration (default: disabled).
    #[must_use]
    pub fn with_tls(mut self, t: crate::tls::TlsConfig) -> Self {
        self.tls = t;
        self
    }

    /// Override the chunking threshold (default 256 KiB).
    #[must_use]
    pub fn with_maximum_payload_size(mut self, n: usize) -> Self {
        self.maximum_payload_size = n;
        self
    }

    /// Set the bounded send-queue capacity (Phase 5.G, default 4096).
    #[must_use]
    pub fn with_send_queue_capacity(mut self, n: usize) -> Self {
        self.send_queue_capacity = n;
        self
    }

    /// Set the overflow policy for the bounded send queue (Phase 5.G,
    /// default [`SendQueueOverflow::Fail`]).
    #[must_use]
    pub fn with_send_queue_overflow(mut self, p: SendQueueOverflow) -> Self {
        self.send_queue_overflow = p;
        self
    }
}