Skip to main content

monocoque_core/
options.rs

1//! Socket configuration options
2//!
3//! This module provides configuration options for `ZeroMQ` sockets, similar to
4//! libzmq's socket options (`zmq_setsockopt/zmq_getsockopt`).
5
6use std::time::Duration;
7
8/// Socket configuration options.
9///
10/// These options control socket behavior including timeouts, buffer sizes,
11/// and reliability features. This struct consolidates all socket configuration
12/// in one place, following the `MongoDB` Rust driver pattern.
13///
14/// # Examples
15///
16/// ```
17/// use monocoque_core::options::SocketOptions;
18/// use std::time::Duration;
19///
20/// // Simple case: use defaults
21/// let opts = SocketOptions::default();
22///
23/// // Customize timeouts and buffers
24/// let opts = SocketOptions::default()
25///     .with_recv_timeout(Duration::from_secs(5))
26///     .with_send_timeout(Duration::from_secs(5))
27///     .with_buffer_sizes(16384, 16384);  // 16KB buffers for high-throughput
28/// ```
29#[derive(Debug, Clone)]
30pub struct SocketOptions {
31    /// Read buffer size (bytes)
32    ///
33    /// Size of arena-allocated buffer for receiving data.
34    /// - Default: 8192 (8KB) - balanced for most workloads
35    /// - Small (4KB): Low-latency with small messages (< 1KB)
36    /// - Large (16KB): High-throughput with large messages (> 8KB)
37    pub read_buffer_size: usize,
38
39    /// Write buffer size (bytes)
40    ///
41    /// Initial capacity of `BytesMut` buffer for sending data.
42    /// - Default: 8192 (8KB) - balanced for most workloads
43    /// - Small (4KB): Low-latency with small messages
44    /// - Large (16KB): High-throughput with large messages
45    pub write_buffer_size: usize,
46
47    /// Receive timeout (`ZMQ_RCVTIMEO`)
48    ///
49    /// Maximum time to wait for a receive operation.
50    /// - `None`: Block indefinitely (default)
51    /// - `Some(Duration::ZERO)`: Non-blocking (return immediately with EAGAIN)
52    /// - `Some(duration)`: Wait up to duration before returning EAGAIN
53    pub recv_timeout: Option<Duration>,
54
55    /// Send timeout (`ZMQ_SNDTIMEO`)
56    ///
57    /// Maximum time to wait for a send operation.
58    /// - `None`: Block indefinitely (default)
59    /// - `Some(Duration::ZERO)`: Non-blocking (return immediately with EAGAIN)
60    /// - `Some(duration)`: Wait up to duration before returning EAGAIN
61    pub send_timeout: Option<Duration>,
62
63    /// Handshake timeout (`ZMQ_HANDSHAKE_IVL`)
64    ///
65    /// Maximum time to complete ZMTP handshake after connection.
66    /// - Default: 30 seconds
67    /// - Set to `Duration::ZERO` to disable timeout
68    pub handshake_timeout: Duration,
69
70    /// Linger timeout (`ZMQ_LINGER`)
71    ///
72    /// Time to wait for pending messages to be sent before closing socket.
73    /// - `None`: Close immediately, discard pending messages
74    /// - `Some(Duration::ZERO)`: Same as None
75    /// - `Some(duration)`: Wait up to duration for messages to be sent
76    pub linger: Option<Duration>,
77
78    /// Reconnect interval (`ZMQ_RECONNECT_IVL`)
79    ///
80    /// Initial reconnection delay after connection loss.
81    /// - Default: 100ms
82    /// - Use with `reconnect_ivl_max` for exponential backoff
83    pub reconnect_ivl: Duration,
84
85    /// Maximum reconnect interval (`ZMQ_RECONNECT_IVL_MAX`)
86    ///
87    /// Maximum reconnection delay for exponential backoff.
88    /// - Default: 0 (no maximum, use `reconnect_ivl` always)
89    /// - When > 0: Doubles `reconnect_ivl` up to this value
90    pub reconnect_ivl_max: Duration,
91
92    /// Connection timeout (`ZMQ_CONNECT_TIMEOUT`)
93    ///
94    /// Maximum time to wait for TCP connection to complete.
95    /// - Default: 0 (use OS default)
96    pub connect_timeout: Duration,
97
98    /// High water mark for receiving (`ZMQ_RCVHWM`)
99    ///
100    /// Maximum number of messages to queue for receiving.
101    /// When reached, socket will block or drop messages depending on socket type.
102    /// - Default: 1000 messages
103    pub recv_hwm: usize,
104
105    /// High water mark for sending (`ZMQ_SNDHWM`)
106    ///
107    /// Maximum number of messages to queue for sending.
108    /// When reached, socket will block or drop messages depending on socket type.
109    /// - Default: 1000 messages
110    pub send_hwm: usize,
111
112    /// Enable immediate connect mode (`ZMQ_IMMEDIATE`)
113    ///
114    /// - `false` (default): Queue messages while connecting
115    /// - `true`: Report error if no connection established
116    pub immediate: bool,
117
118    /// Maximum message size (`ZMQ_MAXMSGSIZE`)
119    ///
120    /// Maximum size of a single message in bytes.
121    /// - `None`: No limit (default)
122    /// - `Some(size)`: Reject messages larger than size
123    pub max_msg_size: Option<usize>,
124
125    /// Socket identity / routing ID (`ZMQ_ROUTING_ID` / `ZMQ_IDENTITY`)
126    ///
127    /// Identity for ROUTER addressing. If None, a random UUID is generated.
128    /// - Default: None (auto-generate)
129    /// - Custom: Set for stable identity across reconnections
130    pub routing_id: Option<bytes::Bytes>,
131
132    /// Connect routing ID (`ZMQ_CONNECT_ROUTING_ID`)
133    ///
134    /// Identity to assign to the next outgoing connection.
135    /// Used by ROUTER sockets to assign a specific identity to a peer.
136    /// - Default: None (auto-generate)
137    /// - Custom: Assign explicit identity to next connection
138    /// - Consumed after each connect operation
139    pub connect_routing_id: Option<bytes::Bytes>,
140
141    /// ROUTER mandatory mode (`ZMQ_ROUTER_MANDATORY`)
142    ///
143    /// - `false` (default): Silently drop messages to unknown peers
144    /// - `true`: Return error when sending to unknown peer
145    pub router_mandatory: bool,
146
147    /// ROUTER handover mode (`ZMQ_ROUTER_HANDOVER`)
148    ///
149    /// - `false` (default): Disconnect old peer when new peer with same identity connects
150    /// - `true`: Hand over pending messages to new peer with same identity
151    pub router_handover: bool,
152
153    /// Probe ROUTER on connect (`ZMQ_PROBE_ROUTER`)
154    ///
155    /// - `false` (default): Normal operation
156    /// - `true`: Send empty message on connect to probe ROUTER identity
157    pub probe_router: bool,
158
159    /// XPUB verbose mode (`ZMQ_XPUB_VERBOSE`)
160    ///
161    /// - `false` (default): Only report new subscriptions
162    /// - `true`: Report all subscription messages (including duplicates)
163    pub xpub_verbose: bool,
164
165    /// XPUB manual mode (`ZMQ_XPUB_MANUAL`)
166    ///
167    /// - `false` (default): Automatic subscription management
168    /// - `true`: Manual subscription control via `send()`
169    pub xpub_manual: bool,
170
171    /// XPUB welcome message (`ZMQ_XPUB_WELCOME_MSG`)
172    ///
173    /// Message to send to new subscribers on connection.
174    /// Useful for last value cache (LVC) patterns.
175    pub xpub_welcome_msg: Option<bytes::Bytes>,
176
177    /// XSUB verbose unsubscribe (`ZMQ_XSUB_VERBOSE_UNSUBSCRIBE`)
178    ///
179    /// - `false` (default): Don't send explicit unsubscribe messages
180    /// - `true`: Send unsubscribe messages upstream
181    pub xsub_verbose_unsubs: bool,
182
183    /// Conflate messages (`ZMQ_CONFLATE`)
184    ///
185    /// - `false` (default): Queue all messages
186    /// - `true`: Keep only last message (overwrite queue)
187    pub conflate: bool,
188
189    /// TCP keepalive (`ZMQ_TCP_KEEPALIVE`)
190    ///
191    /// - `-1` (default): Use OS default
192    /// - `0`: Disable TCP keepalive
193    /// - `1`: Enable TCP keepalive
194    pub tcp_keepalive: i32,
195
196    /// TCP keepalive count (`ZMQ_TCP_KEEPALIVE_CNT`)
197    ///
198    /// Number of keepalive probes before considering connection dead.
199    /// - `-1` (default): Use OS default
200    /// - `> 0`: Number of probes
201    pub tcp_keepalive_cnt: i32,
202
203    /// TCP keepalive idle (`ZMQ_TCP_KEEPALIVE_IDLE`)
204    ///
205    /// Time in seconds before starting keepalive probes.
206    /// - `-1` (default): Use OS default
207    /// - `> 0`: Idle time in seconds
208    pub tcp_keepalive_idle: i32,
209
210    /// TCP keepalive interval (`ZMQ_TCP_KEEPALIVE_INTVL`)
211    ///
212    /// Time in seconds between keepalive probes.
213    /// - `-1` (default): Use OS default
214    /// - `> 0`: Interval in seconds
215    pub tcp_keepalive_intvl: i32,
216
217    /// REQ correlate mode (`ZMQ_REQ_CORRELATE`)
218    ///
219    /// Match replies to requests using message envelope.
220    /// - `false` (default): Accept any reply
221    /// - `true`: Match reply envelope to request
222    pub req_correlate: bool,
223
224    /// REQ relaxed mode (`ZMQ_REQ_RELAXED`)
225    ///
226    /// Allow multiple outstanding requests without strict alternation.
227    /// - `false` (default): Strict send-recv-send-recv pattern
228    /// - `true`: Allow send-send-recv-recv pattern
229    pub req_relaxed: bool,
230
231    /// Multicast rate in kilobits per second (`ZMQ_RATE`)
232    ///
233    /// Maximum send or receive data rate for multicast transports (PGM/EPGM).
234    /// - Default: 100 kbps
235    pub rate: i32,
236
237    /// Multicast recovery interval (`ZMQ_RECOVERY_IVL`)
238    ///
239    /// Maximum time to recover lost messages on multicast transports.
240    /// - Default: 10 seconds
241    pub recovery_ivl: Duration,
242
243    /// OS-level send buffer size (`ZMQ_SNDBUF`)
244    ///
245    /// Size of kernel send buffer. 0 = OS default.
246    /// - Default: 0 (use OS default)
247    pub sndbuf: i32,
248
249    /// OS-level receive buffer size (`ZMQ_RCVBUF`)
250    ///
251    /// Size of kernel receive buffer. 0 = OS default.
252    /// - Default: 0 (use OS default)
253    pub rcvbuf: i32,
254
255    /// Multicast TTL (`ZMQ_MULTICAST_HOPS`)
256    ///
257    /// Time-to-live for multicast packets.
258    /// - Default: 1 (local network only)
259    pub multicast_hops: i32,
260
261    /// IP Type of Service (`ZMQ_TOS`)
262    ///
263    /// Sets the `ToS` field in IP headers for `QoS`.
264    /// - Default: 0 (normal service)
265    pub tos: i32,
266
267    /// Maximum multicast transmission unit (`ZMQ_MULTICAST_MAXTPDU`)
268    ///
269    /// Maximum transport data unit for multicast.
270    /// - Default: 1500 bytes
271    pub multicast_maxtpdu: i32,
272
273    /// IPv6 support (`ZMQ_IPV6`)
274    ///
275    /// Enable IPv6 on socket.
276    /// - `false` (default): IPv4 only
277    /// - `true`: IPv6 support enabled
278    pub ipv6: bool,
279
280    /// Bind to device (`ZMQ_BINDTODEVICE`)
281    ///
282    /// Bind socket to specific network interface (Linux only).
283    /// - Default: None (bind to all interfaces)
284    pub bind_to_device: Option<String>,
285
286    // --- Security Options ---
287    /// PLAIN server mode (`ZMQ_PLAIN_SERVER`)
288    ///
289    /// Enable PLAIN authentication as server.
290    /// - `false` (default): Client mode
291    /// - `true`: Server mode (validate credentials)
292    pub plain_server: bool,
293
294    /// PLAIN username (`ZMQ_PLAIN_USERNAME`)
295    ///
296    /// Username for PLAIN authentication (client side).
297    /// - Default: None (no authentication)
298    pub plain_username: Option<String>,
299
300    /// PLAIN password (`ZMQ_PLAIN_PASSWORD`)
301    ///
302    /// Password for PLAIN authentication (client side).
303    /// - Default: None (no authentication)
304    pub plain_password: Option<String>,
305
306    /// CURVE server mode (`ZMQ_CURVE_SERVER`)
307    ///
308    /// Enable CURVE encryption as server.
309    /// - `false` (default): Client mode
310    /// - `true`: Server mode (provide server key)
311    pub curve_server: bool,
312
313    /// CURVE public key (`ZMQ_CURVE_PUBLICKEY`)
314    ///
315    /// Local public key for CURVE (32 bytes).
316    /// - Default: None (no encryption)
317    pub curve_publickey: Option<[u8; 32]>,
318
319    /// CURVE secret key (`ZMQ_CURVE_SECRETKEY`)
320    ///
321    /// Local secret key for CURVE (32 bytes).
322    /// - Default: None (no encryption)
323    pub curve_secretkey: Option<[u8; 32]>,
324
325    /// CURVE server key (`ZMQ_CURVE_SERVERKEY`)
326    ///
327    /// Server's public key for CURVE client (32 bytes).
328    /// - Default: None (no encryption)
329    /// - Client must set this to verify server identity
330    pub curve_serverkey: Option<[u8; 32]>,
331
332    /// ZAP domain (`ZMQ_ZAP_DOMAIN`)
333    ///
334    /// Security domain for ZAP authentication.
335    /// - Default: "" (global domain)
336    pub zap_domain: String,
337
338    /// Subscriptions (`ZMQ_SUBSCRIBE`)
339    ///
340    /// Subscription filters for SUB/XSUB sockets.
341    /// - Empty vec: No subscriptions (default) - won't receive any messages
342    /// - vec![b""] or vec![`Bytes::new()`]: Subscribe to all messages
343    /// - vec![b"topic1", b"topic2"]: Subscribe to specific topics
344    ///
345    /// Note: SUB sockets MUST subscribe to at least one topic to receive messages.
346    pub subscriptions: Vec<bytes::Bytes>,
347
348    /// Unsubscriptions (`ZMQ_UNSUBSCRIBE`)
349    ///
350    /// Subscription filters to remove for SUB/XSUB sockets.
351    /// Applied after subscriptions during socket configuration.
352    pub unsubscriptions: Vec<bytes::Bytes>,
353
354    /// Maximum reconnection attempts (`ZMQ_RECONNECT_STOP`)
355    ///
356    /// Maximum number of times to attempt reconnection after a disconnect.
357    /// - `None`: Retry indefinitely (default, matches libzmq behaviour)
358    /// - `Some(n)`: Give up and return `NotConnected` after n attempts
359    pub max_reconnect_attempts: Option<u32>,
360
361    /// ZMTP heartbeat interval (`ZMQ_HEARTBEAT_IVL` = 75)
362    ///
363    /// How often to send PING heartbeat commands on an otherwise idle connection.
364    /// - `None`: Disabled (default)
365    /// - `Some(dur)`: Send PING every `dur` of inactivity
366    pub heartbeat_ivl: Option<Duration>,
367
368    /// ZMTP heartbeat TTL (`ZMQ_HEARTBEAT_TTL` = 76)
369    ///
370    /// Time-to-live for the remote peer's heartbeat (sent in PING command).
371    /// The remote will disconnect if it doesn't receive a heartbeat within this interval.
372    /// - `None`: Use `heartbeat_ivl` (default)
373    /// - `Some(dur)`: Override TTL sent to peer
374    pub heartbeat_ttl: Option<Duration>,
375
376    /// ZMTP heartbeat timeout (`ZMQ_HEARTBEAT_TIMEOUT` = 77)
377    ///
378    /// How long to wait for a PONG reply before considering the connection dead.
379    /// - `None`: Use `heartbeat_ivl` (default)
380    /// - `Some(dur)`: Custom timeout (recommended: 2–5× heartbeat_ivl)
381    pub heartbeat_timeout: Option<Duration>,
382
383    /// ROUTER raw mode (`ZMQ_ROUTER_RAW` = 41)
384    ///
385    /// Put ROUTER socket into raw mode (no ZMTP handshake, acts like STREAM).
386    /// - `false` (default): Normal ZMTP routing
387    /// - `true`: Raw TCP bridging mode
388    pub router_raw: bool,
389
390    /// STREAM connect/disconnect notifications (`ZMQ_STREAM_NOTIFY` = 73)
391    ///
392    /// Send empty notification frames on connect and disconnect.
393    /// - `true` (default): Send notification frames
394    /// - `false`: Suppress notification frames
395    pub stream_notify: bool,
396
397    /// XPUB no-drop mode (`ZMQ_XPUB_NODROP` = 69)
398    ///
399    /// - `false` (default): Drop messages silently when HWM is reached
400    /// - `true`: Return error (`EAGAIN`) instead of dropping
401    pub xpub_nodrop: bool,
402
403    /// Invert topic matching (`ZMQ_INVERT_MATCHING` = 74)
404    ///
405    /// Invert the subscription filter logic for PUB/SUB and XPUB/XSUB.
406    /// - `false` (default): Deliver messages matching subscriptions
407    /// - `true`: Deliver messages NOT matching any subscription
408    pub invert_matching: bool,
409}
410
411impl Default for SocketOptions {
412    fn default() -> Self {
413        Self {
414            recv_timeout: None, // Block indefinitely
415            send_timeout: None, // Block indefinitely
416            handshake_timeout: Duration::from_secs(30),
417            linger: Some(Duration::from_secs(30)), // Wait 30s for pending messages
418            reconnect_ivl: Duration::from_millis(100),
419            reconnect_ivl_max: Duration::ZERO, // No maximum
420            connect_timeout: Duration::ZERO,   // Use OS default
421            recv_hwm: 1000,
422            send_hwm: 1000,
423            immediate: false,
424            max_msg_size: None,      // No limit
425            read_buffer_size: 8192,  // 8KB - balanced default
426            write_buffer_size: 8192, // 8KB - balanced default
427            routing_id: None,
428            connect_routing_id: None,
429            router_mandatory: false,
430            router_handover: false,
431            probe_router: false,
432            xpub_verbose: false,
433            xpub_manual: false,
434            xpub_welcome_msg: None,
435            xsub_verbose_unsubs: false,
436            conflate: false,
437            tcp_keepalive: -1,       // OS default
438            tcp_keepalive_cnt: -1,   // OS default
439            tcp_keepalive_idle: -1,  // OS default
440            tcp_keepalive_intvl: -1, // OS default
441            req_correlate: false,
442            req_relaxed: false,
443            rate: 100, // 100 kbps
444            recovery_ivl: Duration::from_secs(10),
445            sndbuf: 0,               // OS default
446            rcvbuf: 0,               // OS default
447            multicast_hops: 1,       // Local network only
448            tos: 0,                  // Normal service
449            multicast_maxtpdu: 1500, // Standard MTU
450            ipv6: false,             // IPv4 only
451            bind_to_device: None,    // All interfaces
452            // Security
453            plain_server: false,
454            plain_username: None,
455            plain_password: None,
456            curve_server: false,
457            curve_publickey: None,
458            curve_secretkey: None,
459            curve_serverkey: None,
460            zap_domain: String::new(),    // Global domain
461            subscriptions: Vec::new(),    // No subscriptions
462            unsubscriptions: Vec::new(),  // No unsubscriptions
463            max_reconnect_attempts: None, // Retry indefinitely
464            heartbeat_ivl: None,
465            heartbeat_ttl: None,
466            heartbeat_timeout: None,
467            router_raw: false,
468            stream_notify: true,
469            xpub_nodrop: false,
470            invert_matching: false,
471        }
472    }
473}
474
475impl SocketOptions {
476    /// Create new socket options with default values (8KB buffers).
477    #[must_use]
478    pub fn new() -> Self {
479        Self::default()
480    }
481
482    /// Create socket options optimized for small messages (< 1KB).
483    ///
484    /// Sets 4KB buffers, suitable for low-latency request-reply patterns.
485    ///
486    /// # Examples
487    ///
488    /// ```
489    /// use monocoque_core::options::SocketOptions;
490    ///
491    /// let opts = SocketOptions::small();  // 4KB buffers for REQ/REP
492    /// ```
493    #[must_use]
494    pub fn small() -> Self {
495        Self {
496            read_buffer_size: 4096,
497            write_buffer_size: 4096,
498            ..Self::default()
499        }
500    }
501
502    /// Create socket options optimized for large messages (> 8KB).
503    ///
504    /// Sets 16KB buffers, suitable for high-throughput async patterns.
505    ///
506    /// # Examples
507    ///
508    /// ```
509    /// use monocoque_core::options::SocketOptions;
510    ///
511    /// let opts = SocketOptions::large();  // 16KB buffers for DEALER/ROUTER
512    /// ```
513    #[must_use]
514    pub fn large() -> Self {
515        Self {
516            read_buffer_size: 16384,
517            write_buffer_size: 16384,
518            ..Self::default()
519        }
520    }
521
522    /// Set receive timeout.
523    ///
524    /// # Examples
525    ///
526    /// ```
527    /// use monocoque_core::options::SocketOptions;
528    /// use std::time::Duration;
529    ///
530    /// // Non-blocking receive
531    /// let opts = SocketOptions::new().with_recv_timeout(Duration::ZERO);
532    ///
533    /// // 5 second timeout
534    /// let opts = SocketOptions::new().with_recv_timeout(Duration::from_secs(5));
535    /// ```
536    pub const fn with_recv_timeout(mut self, timeout: Duration) -> Self {
537        self.recv_timeout = Some(timeout);
538        self
539    }
540
541    /// Set send timeout.
542    pub const fn with_send_timeout(mut self, timeout: Duration) -> Self {
543        self.send_timeout = Some(timeout);
544        self
545    }
546
547    /// Set handshake timeout.
548    pub const fn with_handshake_timeout(mut self, timeout: Duration) -> Self {
549        self.handshake_timeout = timeout;
550        self
551    }
552
553    /// Set linger timeout.
554    pub const fn with_linger(mut self, linger: Option<Duration>) -> Self {
555        self.linger = linger;
556        self
557    }
558
559    /// Set reconnection interval.
560    pub const fn with_reconnect_ivl(mut self, ivl: Duration) -> Self {
561        self.reconnect_ivl = ivl;
562        self
563    }
564
565    /// Set maximum reconnection interval for exponential backoff.
566    pub const fn with_reconnect_ivl_max(mut self, max: Duration) -> Self {
567        self.reconnect_ivl_max = max;
568        self
569    }
570
571    /// Set maximum number of reconnection attempts.
572    ///
573    /// `None` retries indefinitely (default); `Some(n)` gives up after n attempts.
574    pub const fn with_max_reconnect_attempts(mut self, max: Option<u32>) -> Self {
575        self.max_reconnect_attempts = max;
576        self
577    }
578
579    /// Set connection timeout.
580    pub const fn with_connect_timeout(mut self, timeout: Duration) -> Self {
581        self.connect_timeout = timeout;
582        self
583    }
584
585    /// Set heartbeat interval (`ZMQ_HEARTBEAT_IVL`).
586    pub const fn with_heartbeat_ivl(mut self, ivl: Duration) -> Self {
587        self.heartbeat_ivl = Some(ivl);
588        self
589    }
590
591    /// Set heartbeat TTL (`ZMQ_HEARTBEAT_TTL`).
592    pub const fn with_heartbeat_ttl(mut self, ttl: Duration) -> Self {
593        self.heartbeat_ttl = Some(ttl);
594        self
595    }
596
597    /// Set heartbeat timeout (`ZMQ_HEARTBEAT_TIMEOUT`).
598    pub const fn with_heartbeat_timeout(mut self, timeout: Duration) -> Self {
599        self.heartbeat_timeout = Some(timeout);
600        self
601    }
602
603    /// Enable or disable ROUTER raw mode (`ZMQ_ROUTER_RAW`).
604    pub const fn with_router_raw(mut self, raw: bool) -> Self {
605        self.router_raw = raw;
606        self
607    }
608
609    /// Enable or disable STREAM connect/disconnect notifications (`ZMQ_STREAM_NOTIFY`).
610    pub const fn with_stream_notify(mut self, notify: bool) -> Self {
611        self.stream_notify = notify;
612        self
613    }
614
615    /// Enable XPUB no-drop mode (`ZMQ_XPUB_NODROP`).
616    pub const fn with_xpub_nodrop(mut self, nodrop: bool) -> Self {
617        self.xpub_nodrop = nodrop;
618        self
619    }
620
621    /// Enable inverted topic matching (`ZMQ_INVERT_MATCHING`).
622    pub const fn with_invert_matching(mut self, invert: bool) -> Self {
623        self.invert_matching = invert;
624        self
625    }
626
627    /// Set receive high water mark.
628    pub const fn with_recv_hwm(mut self, hwm: usize) -> Self {
629        self.recv_hwm = hwm;
630        self
631    }
632
633    /// Set send high water mark.
634    pub const fn with_send_hwm(mut self, hwm: usize) -> Self {
635        self.send_hwm = hwm;
636        self
637    }
638
639    /// Enable or disable immediate mode.
640    pub const fn with_immediate(mut self, immediate: bool) -> Self {
641        self.immediate = immediate;
642        self
643    }
644
645    /// Set maximum message size.
646    pub const fn with_max_msg_size(mut self, size: Option<usize>) -> Self {
647        self.max_msg_size = size;
648        self
649    }
650
651    /// Set read buffer size.
652    ///
653    /// # Examples
654    ///
655    /// ```
656    /// use monocoque_core::options::SocketOptions;
657    ///
658    /// // Small buffers for low latency
659    /// let opts = SocketOptions::new().with_read_buffer_size(4096);
660    ///
661    /// // Large buffers for throughput
662    /// let opts = SocketOptions::new().with_read_buffer_size(16384);
663    /// ```
664    pub const fn with_read_buffer_size(mut self, size: usize) -> Self {
665        self.read_buffer_size = size;
666        self
667    }
668
669    /// Set write buffer size.
670    pub const fn with_write_buffer_size(mut self, size: usize) -> Self {
671        self.write_buffer_size = size;
672        self
673    }
674
675    /// Set both read and write buffer sizes (convenience method).
676    ///
677    /// # Examples
678    ///
679    /// ```
680    /// use monocoque_core::options::SocketOptions;
681    ///
682    /// // Small buffers for both
683    /// let opts = SocketOptions::new().with_buffer_sizes(4096, 4096);
684    /// ```
685    pub const fn with_buffer_sizes(mut self, read_size: usize, write_size: usize) -> Self {
686        self.read_buffer_size = read_size;
687        self.write_buffer_size = write_size;
688        self
689    }
690
691    /// Set socket routing ID / identity.
692    ///
693    /// # Examples
694    ///
695    /// ```
696    /// use monocoque_core::options::SocketOptions;
697    /// use bytes::Bytes;
698    ///
699    /// let opts = SocketOptions::new()
700    ///     .with_routing_id(Bytes::from_static(b"worker-01"));
701    /// ```
702    pub fn with_routing_id(mut self, id: bytes::Bytes) -> Self {
703        self.routing_id = Some(id);
704        self
705    }
706
707    /// Set connect routing ID for the next connection.
708    ///
709    /// This option is consumed after each connect operation and must be set
710    /// again for subsequent connections.
711    ///
712    /// # Examples
713    ///
714    /// ```
715    /// use monocoque_core::options::SocketOptions;
716    /// use bytes::Bytes;
717    ///
718    /// let opts = SocketOptions::new()
719    ///     .with_connect_routing_id(Bytes::from_static(b"client-001"));
720    /// ```
721    pub fn with_connect_routing_id(mut self, id: bytes::Bytes) -> Self {
722        self.connect_routing_id = Some(id);
723        self
724    }
725
726    /// Enable ROUTER mandatory mode.
727    pub const fn with_router_mandatory(mut self, enabled: bool) -> Self {
728        self.router_mandatory = enabled;
729        self
730    }
731
732    /// Enable ROUTER handover mode.
733    pub const fn with_router_handover(mut self, enabled: bool) -> Self {
734        self.router_handover = enabled;
735        self
736    }
737
738    /// Enable ROUTER probe on connect.
739    pub const fn with_probe_router(mut self, enabled: bool) -> Self {
740        self.probe_router = enabled;
741        self
742    }
743
744    /// Enable XPUB verbose mode.
745    pub const fn with_xpub_verbose(mut self, enabled: bool) -> Self {
746        self.xpub_verbose = enabled;
747        self
748    }
749
750    /// Enable XPUB manual mode.
751    pub const fn with_xpub_manual(mut self, enabled: bool) -> Self {
752        self.xpub_manual = enabled;
753        self
754    }
755
756    /// Set XPUB welcome message.
757    pub fn with_xpub_welcome_msg(mut self, msg: bytes::Bytes) -> Self {
758        self.xpub_welcome_msg = Some(msg);
759        self
760    }
761
762    /// Enable XSUB verbose unsubscribe.
763    pub const fn with_xsub_verbose_unsubs(mut self, enabled: bool) -> Self {
764        self.xsub_verbose_unsubs = enabled;
765        self
766    }
767
768    /// Enable message conflation (keep only last message).
769    pub const fn with_conflate(mut self, enabled: bool) -> Self {
770        self.conflate = enabled;
771        self
772    }
773
774    /// Set TCP keepalive mode.
775    ///
776    /// # Arguments
777    ///
778    /// * `mode` - `-1` for OS default, `0` to disable, `1` to enable
779    pub const fn with_tcp_keepalive(mut self, mode: i32) -> Self {
780        self.tcp_keepalive = mode;
781        self
782    }
783
784    /// Set TCP keepalive count (number of probes before timeout).
785    ///
786    /// # Arguments
787    ///
788    /// * `count` - `-1` for OS default, `> 0` for specific count
789    pub const fn with_tcp_keepalive_cnt(mut self, count: i32) -> Self {
790        self.tcp_keepalive_cnt = count;
791        self
792    }
793
794    /// Set TCP keepalive idle time (seconds before first probe).
795    ///
796    /// # Arguments
797    ///
798    /// * `seconds` - `-1` for OS default, `> 0` for specific idle time
799    pub const fn with_tcp_keepalive_idle(mut self, seconds: i32) -> Self {
800        self.tcp_keepalive_idle = seconds;
801        self
802    }
803
804    /// Set TCP keepalive interval (seconds between probes).
805    ///
806    /// # Arguments
807    ///
808    /// * `seconds` - `-1` for OS default, `> 0` for specific interval
809    pub const fn with_tcp_keepalive_intvl(mut self, seconds: i32) -> Self {
810        self.tcp_keepalive_intvl = seconds;
811        self
812    }
813
814    /// Enable REQ correlation mode (match replies to requests).
815    pub const fn with_req_correlate(mut self, enabled: bool) -> Self {
816        self.req_correlate = enabled;
817        self
818    }
819
820    /// Enable REQ relaxed mode (allow multiple outstanding requests).
821    pub const fn with_req_relaxed(mut self, enabled: bool) -> Self {
822        self.req_relaxed = enabled;
823        self
824    }
825
826    /// Set multicast rate (`ZMQ_RATE`).
827    pub const fn with_rate(mut self, rate: i32) -> Self {
828        self.rate = rate;
829        self
830    }
831
832    /// Set multicast recovery interval (`ZMQ_RECOVERY_IVL`).
833    pub const fn with_recovery_ivl(mut self, interval: Duration) -> Self {
834        self.recovery_ivl = interval;
835        self
836    }
837
838    /// Set OS send buffer size (`ZMQ_SNDBUF`).
839    pub const fn with_sndbuf(mut self, size: i32) -> Self {
840        self.sndbuf = size;
841        self
842    }
843
844    /// Set OS receive buffer size (`ZMQ_RCVBUF`).
845    pub const fn with_rcvbuf(mut self, size: i32) -> Self {
846        self.rcvbuf = size;
847        self
848    }
849
850    /// Set multicast TTL/hops (`ZMQ_MULTICAST_HOPS`).
851    pub const fn with_multicast_hops(mut self, hops: i32) -> Self {
852        self.multicast_hops = hops;
853        self
854    }
855
856    /// Set IP Type of Service (`ZMQ_TOS`).
857    pub const fn with_tos(mut self, tos: i32) -> Self {
858        self.tos = tos;
859        self
860    }
861
862    /// Set multicast maximum TPU (`ZMQ_MULTICAST_MAXTPDU`).
863    pub const fn with_multicast_maxtpdu(mut self, mtu: i32) -> Self {
864        self.multicast_maxtpdu = mtu;
865        self
866    }
867
868    /// Enable IPv6 support (`ZMQ_IPV6`).
869    pub const fn with_ipv6(mut self, enabled: bool) -> Self {
870        self.ipv6 = enabled;
871        self
872    }
873
874    /// Bind to specific device (`ZMQ_BINDTODEVICE`) - Linux only.
875    pub fn with_bind_to_device(mut self, device: impl Into<String>) -> Self {
876        self.bind_to_device = Some(device.into());
877        self
878    }
879
880    // --- Security Options ---
881
882    /// Enable PLAIN server mode.
883    ///
884    /// # Examples
885    ///
886    /// ```
887    /// use monocoque_core::options::SocketOptions;
888    ///
889    /// let opts = SocketOptions::new().with_plain_server(true);
890    /// ```
891    pub const fn with_plain_server(mut self, enabled: bool) -> Self {
892        self.plain_server = enabled;
893        self
894    }
895
896    /// Set PLAIN client credentials.
897    ///
898    /// # Examples
899    ///
900    /// ```
901    /// use monocoque_core::options::SocketOptions;
902    ///
903    /// let opts = SocketOptions::new()
904    ///     .with_plain_credentials("admin", "secret123");
905    /// ```
906    pub fn with_plain_credentials(
907        mut self,
908        username: impl Into<String>,
909        password: impl Into<String>,
910    ) -> Self {
911        self.plain_username = Some(username.into());
912        self.plain_password = Some(password.into());
913        self
914    }
915
916    /// Enable CURVE server mode.
917    ///
918    /// # Examples
919    ///
920    /// ```
921    /// use monocoque_core::options::SocketOptions;
922    ///
923    /// let opts = SocketOptions::new().with_curve_server(true);
924    /// ```
925    pub const fn with_curve_server(mut self, enabled: bool) -> Self {
926        self.curve_server = enabled;
927        self
928    }
929
930    /// Set CURVE client keys (public + secret).
931    ///
932    /// # Examples
933    ///
934    /// ```
935    /// use monocoque_core::options::SocketOptions;
936    ///
937    /// let public = [0u8; 32];  // Replace with actual key
938    /// let secret = [0u8; 32];  // Replace with actual key
939    /// let opts = SocketOptions::new().with_curve_keypair(public, secret);
940    /// ```
941    pub const fn with_curve_keypair(mut self, publickey: [u8; 32], secretkey: [u8; 32]) -> Self {
942        self.curve_publickey = Some(publickey);
943        self.curve_secretkey = Some(secretkey);
944        self
945    }
946
947    /// Set CURVE server public key (for client).
948    ///
949    /// # Examples
950    ///
951    /// ```
952    /// use monocoque_core::options::SocketOptions;
953    ///
954    /// let server_key = [0u8; 32];  // Server's public key
955    /// let opts = SocketOptions::new().with_curve_serverkey(server_key);
956    /// ```
957    pub const fn with_curve_serverkey(mut self, serverkey: [u8; 32]) -> Self {
958        self.curve_serverkey = Some(serverkey);
959        self
960    }
961
962    /// Set ZAP domain for authentication.
963    ///
964    /// # Examples
965    ///
966    /// ```
967    /// use monocoque_core::options::SocketOptions;
968    ///
969    /// let opts = SocketOptions::new().with_zap_domain("production");
970    /// ```
971    pub fn with_zap_domain(mut self, domain: impl Into<String>) -> Self {
972        self.zap_domain = domain.into();
973        self
974    }
975
976    /// Add a subscription filter for SUB/XSUB sockets (`ZMQ_SUBSCRIBE`).
977    ///
978    /// SUB sockets MUST subscribe to at least one topic to receive messages.
979    /// An empty filter (b"" or `Bytes::new()`) subscribes to all messages.
980    ///
981    /// # Examples
982    ///
983    /// ```
984    /// use monocoque_core::options::SocketOptions;
985    /// use bytes::Bytes;
986    ///
987    /// // Subscribe to all messages
988    /// let opts = SocketOptions::new().with_subscribe(Bytes::new());
989    ///
990    /// // Subscribe to specific topics
991    /// let opts = SocketOptions::new()
992    ///     .with_subscribe(Bytes::from("weather."))
993    ///     .with_subscribe(Bytes::from("stocks."));
994    /// ```
995    pub fn with_subscribe(mut self, filter: bytes::Bytes) -> Self {
996        self.subscriptions.push(filter);
997        self
998    }
999
1000    /// Add multiple subscription filters for SUB/XSUB sockets.
1001    ///
1002    /// Convenience method to subscribe to multiple topics at once.
1003    ///
1004    /// # Examples
1005    ///
1006    /// ```
1007    /// use monocoque_core::options::SocketOptions;
1008    /// use bytes::Bytes;
1009    ///
1010    /// let opts = SocketOptions::new()
1011    ///     .with_subscriptions(vec![
1012    ///         Bytes::from("weather."),
1013    ///         Bytes::from("stocks."),
1014    ///     ]);
1015    /// ```
1016    pub fn with_subscriptions(mut self, filters: Vec<bytes::Bytes>) -> Self {
1017        self.subscriptions.extend(filters);
1018        self
1019    }
1020
1021    /// Add an unsubscription filter for SUB/XSUB sockets (`ZMQ_UNSUBSCRIBE`).
1022    ///
1023    /// Removes a previously added subscription filter.
1024    ///
1025    /// # Examples
1026    ///
1027    /// ```
1028    /// use monocoque_core::options::SocketOptions;
1029    /// use bytes::Bytes;
1030    ///
1031    /// let opts = SocketOptions::new()
1032    ///     .with_subscribe(Bytes::new())  // Subscribe to all
1033    ///     .with_unsubscribe(Bytes::from("admin.")); // Except admin topics
1034    /// ```
1035    pub fn with_unsubscribe(mut self, filter: bytes::Bytes) -> Self {
1036        self.unsubscriptions.push(filter);
1037        self
1038    }
1039
1040    // --- Query Methods ---
1041
1042    /// Check if receive operation should be non-blocking.
1043    pub const fn is_recv_nonblocking(&self) -> bool {
1044        matches!(self.recv_timeout, Some(d) if d.is_zero())
1045    }
1046
1047    /// Check if send operation should be non-blocking.
1048    pub const fn is_send_nonblocking(&self) -> bool {
1049        matches!(self.send_timeout, Some(d) if d.is_zero())
1050    }
1051
/// Check that `id` is acceptable as a ROUTER socket identity.
///
/// A ROUTER identity must:
/// - be between 1 and 255 bytes long, and
/// - not begin with a null byte (0x00), which is reserved for
///   auto-generated IDs.
///
/// # Errors
///
/// Returns an `InvalidInput` I/O error describing the first violated rule.
/// Rules are checked in this order: empty, too long, null prefix.
pub fn validate_router_identity(id: &[u8]) -> std::io::Result<()> {
    use std::io::{Error, ErrorKind};

    // Every rejection shares the same error kind; only the message differs.
    let invalid = |reason: String| -> std::io::Result<()> {
        Err(Error::new(ErrorKind::InvalidInput, reason))
    };

    match id {
        [] => invalid("routing ID cannot be empty".to_owned()),
        _ if id.len() > 255 => invalid(format!(
            "routing ID cannot exceed 255 bytes (got {})",
            id.len()
        )),
        [0x00, ..] => invalid(
            "routing ID cannot start with null byte (reserved for auto-generated IDs)".to_owned(),
        ),
        _ => Ok(()),
    }
}
1081
/// Check that `id` is acceptable as a general routing ID (DEALER, REQ, REP).
///
/// This is more lenient than the ROUTER identity check: an empty ID and a
/// leading null byte are both accepted. The only rule is the length limit.
///
/// # Errors
///
/// Returns an `InvalidInput` I/O error when `id` is longer than 255 bytes.
pub fn validate_routing_id(id: &[u8]) -> std::io::Result<()> {
    let len = id.len();
    if len <= 255 {
        return Ok(());
    }

    Err(std::io::Error::new(
        std::io::ErrorKind::InvalidInput,
        format!("routing ID cannot exceed 255 bytes (got {})", len),
    ))
}
1094
1095    /// Get the current reconnection interval with exponential backoff.
1096    ///
1097    /// Returns the interval to use, considering exponential backoff
1098    /// and the maximum interval setting.
1099    pub fn next_reconnect_ivl(&self, attempt: u32) -> Duration {
1100        if self.reconnect_ivl_max.is_zero() {
1101            // No exponential backoff, always use base interval
1102            return self.reconnect_ivl;
1103        }
1104
1105        // Calculate exponential backoff: base * 2^attempt
1106        let backoff = self
1107            .reconnect_ivl
1108            .saturating_mul(2u32.saturating_pow(attempt));
1109
1110        // Cap at maximum interval
1111        backoff.min(self.reconnect_ivl_max)
1112    }
1113}
1114
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Default options: no send/receive timeouts, plus the expected
    /// handshake timeout, reconnect interval, and high-water marks.
    #[test]
    fn test_default_options() {
        let defaults = SocketOptions::default();

        assert_eq!(defaults.recv_timeout, None);
        assert_eq!(defaults.send_timeout, None);
        assert_eq!(defaults.handshake_timeout, Duration::from_secs(30));
        assert_eq!(defaults.reconnect_ivl, Duration::from_millis(100));
        assert_eq!(defaults.recv_hwm, 1000);
        assert_eq!(defaults.send_hwm, 1000);
    }

    /// Chained builder calls each land in their own field.
    #[test]
    fn test_builder_pattern() {
        let recv = Duration::from_secs(5);
        let send = Duration::from_secs(10);

        let opts = SocketOptions::new()
            .with_recv_timeout(recv)
            .with_send_timeout(send)
            .with_recv_hwm(2000);

        assert_eq!(opts.recv_timeout, Some(recv));
        assert_eq!(opts.send_timeout, Some(send));
        assert_eq!(opts.recv_hwm, 2000);
    }

    /// Only a zero timeout counts as non-blocking.
    #[test]
    fn test_nonblocking_checks() {
        let nonblocking = SocketOptions::new()
            .with_recv_timeout(Duration::ZERO)
            .with_send_timeout(Duration::ZERO);
        assert!(nonblocking.is_recv_nonblocking());
        assert!(nonblocking.is_send_nonblocking());

        let blocking = SocketOptions::new();
        assert!(!blocking.is_recv_nonblocking());
        assert!(!blocking.is_send_nonblocking());
    }

    /// With a maximum configured, the interval doubles per attempt and is capped.
    #[test]
    fn test_exponential_backoff() {
        let opts = SocketOptions::new()
            .with_reconnect_ivl(Duration::from_millis(100))
            .with_reconnect_ivl_max(Duration::from_secs(10));

        // (attempt, expected delay): 100ms, 200ms, 400ms, then capped at 10s.
        let expectations = [
            (0, Duration::from_millis(100)),
            (1, Duration::from_millis(200)),
            (2, Duration::from_millis(400)),
            (10, Duration::from_secs(10)),
        ];

        for (attempt, expected) in expectations {
            assert_eq!(opts.next_reconnect_ivl(attempt), expected);
        }
    }

    /// With the default (zero) maximum, every attempt uses the base interval.
    #[test]
    fn test_no_exponential_backoff() {
        let base = Duration::from_millis(100);
        // reconnect_ivl_max is 0 by default
        let opts = SocketOptions::new().with_reconnect_ivl(base);

        for attempt in [0, 1, 10] {
            assert_eq!(opts.next_reconnect_ivl(attempt), base);
        }
    }

    /// ROUTER identities: 1-255 bytes, no leading null byte.
    #[test]
    fn test_routing_id_validation() {
        let accepted: [&[u8]; 2] = [b"client-001", &[0x01; 255]];
        for id in accepted {
            assert!(SocketOptions::validate_router_identity(id).is_ok());
        }

        // Empty, too long, and null-prefixed identities are all rejected.
        let rejected: [&[u8]; 3] = [b"", &[0x01; 256], b"\x00client"];
        for id in rejected {
            assert!(SocketOptions::validate_router_identity(id).is_err());
        }
    }

    /// General routing IDs: only the 255-byte limit applies.
    #[test]
    fn test_general_routing_id_validation() {
        // Empty, null-prefixed, and maximum-length IDs are allowed.
        let accepted: [&[u8]; 3] = [b"", b"\x00client", &[0x00; 255]];
        for id in accepted {
            assert!(SocketOptions::validate_routing_id(id).is_ok());
        }

        // Too long
        assert!(SocketOptions::validate_routing_id(&[0x01; 256]).is_err());
    }

    /// The connect routing ID is stored as given.
    #[test]
    fn test_connect_routing_id() {
        let peer = Bytes::from_static(b"peer-123");
        let opts = SocketOptions::new().with_connect_routing_id(peer.clone());

        assert_eq!(opts.connect_routing_id, Some(peer));
    }

    /// Both ROUTER flags can be switched on through the builder.
    #[test]
    fn test_router_options() {
        let opts = SocketOptions::new()
            .with_router_handover(true)
            .with_router_mandatory(true);

        assert!(opts.router_mandatory);
        assert!(opts.router_handover);
    }

    /// Subscription and unsubscription filters accumulate in call order.
    #[test]
    fn test_subscription_options() {
        let weather = Bytes::from("weather.");
        let stocks = Bytes::from("stocks.");
        let admin = Bytes::from("admin.");

        // with_subscribe: one filter per call, starting with "subscribe to all".
        let opts = SocketOptions::new()
            .with_subscribe(Bytes::new())
            .with_subscribe(weather.clone())
            .with_subscribe(stocks.clone());

        assert_eq!(opts.subscriptions.len(), 3);
        assert_eq!(opts.subscriptions[0], Bytes::new());
        assert_eq!(opts.subscriptions[1], weather);
        assert_eq!(opts.subscriptions[2], stocks);

        // with_subscriptions: several filters in one call.
        let topics = vec![Bytes::from("topic1"), Bytes::from("topic2")];
        let opts2 = SocketOptions::new().with_subscriptions(topics);

        assert_eq!(opts2.subscriptions.len(), 2);

        // with_unsubscribe: recorded in its own list.
        let opts3 = opts.with_unsubscribe(admin.clone());
        assert_eq!(opts3.unsubscriptions.len(), 1);
        assert_eq!(opts3.unsubscriptions[0], admin);
    }
}