Skip to main content

atomr_remote/
settings.rs

1//! `RemoteSettings`. akka.net: `Remote/RemoteSettings.cs`.
2//!
3//! Knobs for the remoting subsystem. Defaults are conservative and
4//! suitable for development; production deployments should tune the
5//! frame size, timeouts, and quarantine windows from `atomr-config`.
6
7use std::time::Duration;
8
9use atomr_config::Config;
10
11#[derive(Debug, Clone)]
12pub struct RemoteSettings {
13    /// Wire transport scheme — by default `"akka.tcp"`.
14    pub protocol: String,
15    /// Local hostname to advertise. `None` means bind-only.
16    pub hostname: Option<String>,
17    /// Local TCP port to bind. `0` lets the OS pick.
18    pub port: u16,
19    /// Maximum frame size in bytes (length-prefix + payload).
20    pub max_frame_size: usize,
21    /// How often the writer emits a `Heartbeat` PDU when idle.
22    pub heartbeat_interval: Duration,
23    /// Heartbeat absence after which an endpoint is considered failing.
24    pub heartbeat_timeout: Duration,
25    /// Hard cap on the time we wait for the `Associate` handshake reply.
26    pub handshake_timeout: Duration,
27    /// Time window during which a quarantined remote may not reassociate.
28    pub quarantine_duration: Duration,
29    /// Initial backoff for endpoint reconnect attempts.
30    pub backoff_initial: Duration,
31    /// Cap on reconnect backoff (with jitter).
32    pub backoff_max: Duration,
33    /// Reconnect backoff multiplier per attempt.
34    pub backoff_multiplier: f64,
35    /// Number of reconnect attempts before giving up.
36    pub max_reconnect_attempts: u32,
37    /// Sliding-window size for ack'd delivery.
38    pub ack_window: u32,
39    /// `Send` buffer length (per-endpoint, bounded mpsc).
40    pub send_buffer_len: usize,
41    /// Default serializer id used for outbound messages whose type does
42    /// not have a more specific serializer registered.
43    pub default_serializer_id: u32,
44    /// Cookie required during handshake. `None` disables cookie auth.
45    pub require_cookie: Option<String>,
46    /// Watch heartbeat tick interval (RemoteWatcher).
47    pub watch_heartbeat_interval: Duration,
48    /// Watch failure threshold (in missed heartbeats).
49    pub watch_failure_threshold: u32,
50
51    // -- Phase 5.J: phi-accrual failure-detector tuning --
52    //
53    // These mirror akka.net's `akka.remote.watch-failure-detector.*`
54    // keys. Producing a `FailureDetectorRegistry` from `RemoteSettings`
55    // honours each knob.
56    /// φ value above which the peer is considered failed (akka.net
57    /// default: 8.0 for watch, 10.0 for cluster).
58    pub phi_threshold: f64,
59    /// Maximum sample size kept in the heart-beat history.
60    pub phi_max_sample_size: usize,
61    /// Floor on the inter-arrival std-dev (avoids over-confidence on
62    /// suspiciously stable links). akka.net default: 100ms.
63    pub phi_min_std_deviation: Duration,
64    /// Pause window the detector tolerates before suspicion grows.
65    pub phi_acceptable_heartbeat_pause: Duration,
66
67    /// TLS configuration. Default is unconfigured (`!enabled()`).
68    /// Phase 5.E.
69    pub tls: crate::tls::TlsConfig,
70
71    /// Maximum payload bytes per wire frame. Larger payloads are
72    /// fragmented via `chunking::Chunker`. Phase 5.F.
73    pub maximum_payload_size: usize,
74
75    /// Bounded send-queue capacity per peer (Phase 5.G). When the
76    /// queue hits the cap the configured [`SendQueueOverflow`] policy
77    /// decides whether to drop, fail, or block the caller.
78    pub send_queue_capacity: usize,
79    /// What to do when the bounded send queue is full.
80    pub send_queue_overflow: SendQueueOverflow,
81}
82
83/// Overflow policy for the bounded outbound send queue. akka.net parity:
84/// the equivalent strategies in `Akka.Remote.SendBufferOverflowStrategy`.
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86#[non_exhaustive]
87pub enum SendQueueOverflow {
88    /// Drop the message that triggered the overflow (oldest stays).
89    DropNew,
90    /// Drop the oldest message; enqueue the new one.
91    DropOld,
92    /// Surface a `RemoteError::SendQueueFull` to the caller.
93    Fail,
94}
95
96impl Default for RemoteSettings {
97    fn default() -> Self {
98        Self {
99            protocol: "akka.tcp".into(),
100            hostname: None,
101            port: 0,
102            max_frame_size: 4 * 1024 * 1024,
103            heartbeat_interval: Duration::from_millis(1000),
104            heartbeat_timeout: Duration::from_secs(10),
105            handshake_timeout: Duration::from_secs(15),
106            quarantine_duration: Duration::from_secs(60),
107            backoff_initial: Duration::from_millis(200),
108            backoff_max: Duration::from_secs(10),
109            backoff_multiplier: 2.0,
110            max_reconnect_attempts: 10,
111            ack_window: 1000,
112            send_buffer_len: 4096,
113            default_serializer_id: crate::serialization::BINCODE_SERIALIZER_ID,
114            require_cookie: None,
115            watch_heartbeat_interval: Duration::from_secs(1),
116            watch_failure_threshold: 5,
117            phi_threshold: 8.0,
118            phi_max_sample_size: 1000,
119            phi_min_std_deviation: Duration::from_millis(100),
120            phi_acceptable_heartbeat_pause: Duration::from_secs(3),
121            tls: crate::tls::TlsConfig::default(),
122            maximum_payload_size: 256 * 1024,
123            send_queue_capacity: 4096,
124            send_queue_overflow: SendQueueOverflow::Fail,
125        }
126    }
127}
128
129impl RemoteSettings {
130    /// Read overrides from the given config. Any missing key falls back to
131    /// the default value. Layout mirrors Akka.NET's `akka.remote.dot-netty.tcp.*`.
132    pub fn from_config(_cfg: &Config) -> Self {
133        // The atomr-config crate's reader is intentionally minimal at this
134        // stage. Future versions will pull `akka.remote.*` keys here.
135        Self::default()
136    }
137
138    pub fn with_hostname(mut self, host: impl Into<String>) -> Self {
139        self.hostname = Some(host.into());
140        self
141    }
142
143    pub fn with_port(mut self, port: u16) -> Self {
144        self.port = port;
145        self
146    }
147
148    pub fn with_protocol(mut self, p: impl Into<String>) -> Self {
149        self.protocol = p.into();
150        self
151    }
152
153    /// Override the phi-accrual threshold (default 8.0).
154    pub fn with_phi_threshold(mut self, t: f64) -> Self {
155        self.phi_threshold = t;
156        self
157    }
158
159    /// Override the phi-accrual sample size (default 1000).
160    pub fn with_phi_sample_size(mut self, n: usize) -> Self {
161        self.phi_max_sample_size = n;
162        self
163    }
164
165    /// Override the std-dev floor (default 100ms).
166    pub fn with_phi_min_std_deviation(mut self, d: Duration) -> Self {
167        self.phi_min_std_deviation = d;
168        self
169    }
170
171    /// Override the acceptable heart-beat pause (default 3s).
172    pub fn with_phi_acceptable_pause(mut self, d: Duration) -> Self {
173        self.phi_acceptable_heartbeat_pause = d;
174        self
175    }
176
177    /// Override the TLS configuration (default: disabled).
178    pub fn with_tls(mut self, t: crate::tls::TlsConfig) -> Self {
179        self.tls = t;
180        self
181    }
182
183    /// Override the chunking threshold (default 256 KiB).
184    pub fn with_maximum_payload_size(mut self, n: usize) -> Self {
185        self.maximum_payload_size = n;
186        self
187    }
188
189    /// Set the bounded send-queue capacity (Phase 5.G).
190    pub fn with_send_queue_capacity(mut self, n: usize) -> Self {
191        self.send_queue_capacity = n;
192        self
193    }
194
195    /// Set the overflow policy for the bounded send queue (Phase 5.G).
196    pub fn with_send_queue_overflow(mut self, p: SendQueueOverflow) -> Self {
197        self.send_queue_overflow = p;
198        self
199    }
200}