monocoque_core/options.rs
1//! Socket configuration options
2//!
3//! This module provides configuration options for `ZeroMQ` sockets, similar to
4//! libzmq's socket options (`zmq_setsockopt/zmq_getsockopt`).
5
6use std::time::Duration;
7
8/// Socket configuration options.
9///
10/// These options control socket behavior including timeouts, buffer sizes,
11/// and reliability features. This struct consolidates all socket configuration
12/// in one place, following the `MongoDB` Rust driver pattern.
13///
14/// # Examples
15///
16/// ```
17/// use monocoque_core::options::SocketOptions;
18/// use std::time::Duration;
19///
20/// // Simple case: use defaults
21/// let opts = SocketOptions::default();
22///
23/// // Customize timeouts and buffers
24/// let opts = SocketOptions::default()
25/// .with_recv_timeout(Duration::from_secs(5))
26/// .with_send_timeout(Duration::from_secs(5))
27/// .with_buffer_sizes(16384, 16384); // 16KB buffers for high-throughput
28/// ```
29#[derive(Debug, Clone)]
30pub struct SocketOptions {
31 /// Read buffer size (bytes)
32 ///
33 /// Size of arena-allocated buffer for receiving data.
34 /// - Default: 8192 (8KB) - balanced for most workloads
35 /// - Small (4KB): Low-latency with small messages (< 1KB)
36 /// - Large (16KB): High-throughput with large messages (> 8KB)
37 pub read_buffer_size: usize,
38
39 /// Write buffer size (bytes)
40 ///
41 /// Initial capacity of `BytesMut` buffer for sending data.
42 /// - Default: 8192 (8KB) - balanced for most workloads
43 /// - Small (4KB): Low-latency with small messages
44 /// - Large (16KB): High-throughput with large messages
45 pub write_buffer_size: usize,
46
47 /// Receive timeout (`ZMQ_RCVTIMEO`)
48 ///
49 /// Maximum time to wait for a receive operation.
50 /// - `None`: Block indefinitely (default)
51 /// - `Some(Duration::ZERO)`: Non-blocking (return immediately with EAGAIN)
52 /// - `Some(duration)`: Wait up to duration before returning EAGAIN
53 pub recv_timeout: Option<Duration>,
54
55 /// Send timeout (`ZMQ_SNDTIMEO`)
56 ///
57 /// Maximum time to wait for a send operation.
58 /// - `None`: Block indefinitely (default)
59 /// - `Some(Duration::ZERO)`: Non-blocking (return immediately with EAGAIN)
60 /// - `Some(duration)`: Wait up to duration before returning EAGAIN
61 pub send_timeout: Option<Duration>,
62
63 /// Handshake timeout (`ZMQ_HANDSHAKE_IVL`)
64 ///
65 /// Maximum time to complete ZMTP handshake after connection.
66 /// - Default: 30 seconds
67 /// - Set to `Duration::ZERO` to disable timeout
68 pub handshake_timeout: Duration,
69
70 /// Linger timeout (`ZMQ_LINGER`)
71 ///
72 /// Time to wait for pending messages to be sent before closing socket.
73 /// - `None`: Close immediately, discard pending messages
74 /// - `Some(Duration::ZERO)`: Same as None
75 /// - `Some(duration)`: Wait up to duration for messages to be sent
76 pub linger: Option<Duration>,
77
78 /// Reconnect interval (`ZMQ_RECONNECT_IVL`)
79 ///
80 /// Initial reconnection delay after connection loss.
81 /// - Default: 100ms
82 /// - Use with `reconnect_ivl_max` for exponential backoff
83 pub reconnect_ivl: Duration,
84
85 /// Maximum reconnect interval (`ZMQ_RECONNECT_IVL_MAX`)
86 ///
87 /// Maximum reconnection delay for exponential backoff.
88 /// - Default: 0 (no maximum, use `reconnect_ivl` always)
89 /// - When > 0: Doubles `reconnect_ivl` up to this value
90 pub reconnect_ivl_max: Duration,
91
92 /// Connection timeout (`ZMQ_CONNECT_TIMEOUT`)
93 ///
94 /// Maximum time to wait for TCP connection to complete.
95 /// - Default: 0 (use OS default)
96 pub connect_timeout: Duration,
97
98 /// High water mark for receiving (`ZMQ_RCVHWM`)
99 ///
100 /// Maximum number of messages to queue for receiving.
101 /// When reached, socket will block or drop messages depending on socket type.
102 /// - Default: 1000 messages
103 pub recv_hwm: usize,
104
105 /// High water mark for sending (`ZMQ_SNDHWM`)
106 ///
107 /// Maximum number of messages to queue for sending.
108 /// When reached, socket will block or drop messages depending on socket type.
109 /// - Default: 1000 messages
110 pub send_hwm: usize,
111
112 /// Enable immediate connect mode (`ZMQ_IMMEDIATE`)
113 ///
114 /// - `false` (default): Queue messages while connecting
115 /// - `true`: Report error if no connection established
116 pub immediate: bool,
117
118 /// Maximum message size (`ZMQ_MAXMSGSIZE`)
119 ///
120 /// Maximum size of a single message in bytes.
121 /// - `None`: No limit (default)
122 /// - `Some(size)`: Reject messages larger than size
123 pub max_msg_size: Option<usize>,
124
125 /// Socket identity / routing ID (`ZMQ_ROUTING_ID` / `ZMQ_IDENTITY`)
126 ///
127 /// Identity for ROUTER addressing. If None, a random UUID is generated.
128 /// - Default: None (auto-generate)
129 /// - Custom: Set for stable identity across reconnections
130 pub routing_id: Option<bytes::Bytes>,
131
132 /// Connect routing ID (`ZMQ_CONNECT_ROUTING_ID`)
133 ///
134 /// Identity to assign to the next outgoing connection.
135 /// Used by ROUTER sockets to assign a specific identity to a peer.
136 /// - Default: None (auto-generate)
137 /// - Custom: Assign explicit identity to next connection
138 /// - Consumed after each connect operation
139 pub connect_routing_id: Option<bytes::Bytes>,
140
141 /// ROUTER mandatory mode (`ZMQ_ROUTER_MANDATORY`)
142 ///
143 /// - `false` (default): Silently drop messages to unknown peers
144 /// - `true`: Return error when sending to unknown peer
145 pub router_mandatory: bool,
146
147 /// ROUTER handover mode (`ZMQ_ROUTER_HANDOVER`)
148 ///
149 /// - `false` (default): Disconnect old peer when new peer with same identity connects
150 /// - `true`: Hand over pending messages to new peer with same identity
151 pub router_handover: bool,
152
153 /// Probe ROUTER on connect (`ZMQ_PROBE_ROUTER`)
154 ///
155 /// - `false` (default): Normal operation
156 /// - `true`: Send empty message on connect to probe ROUTER identity
157 pub probe_router: bool,
158
159 /// XPUB verbose mode (`ZMQ_XPUB_VERBOSE`)
160 ///
161 /// - `false` (default): Only report new subscriptions
162 /// - `true`: Report all subscription messages (including duplicates)
163 pub xpub_verbose: bool,
164
165 /// XPUB manual mode (`ZMQ_XPUB_MANUAL`)
166 ///
167 /// - `false` (default): Automatic subscription management
168 /// - `true`: Manual subscription control via `send()`
169 pub xpub_manual: bool,
170
171 /// XPUB welcome message (`ZMQ_XPUB_WELCOME_MSG`)
172 ///
173 /// Message to send to new subscribers on connection.
174 /// Useful for last value cache (LVC) patterns.
175 pub xpub_welcome_msg: Option<bytes::Bytes>,
176
177 /// XSUB verbose unsubscribe (`ZMQ_XSUB_VERBOSE_UNSUBSCRIBE`)
178 ///
179 /// - `false` (default): Don't send explicit unsubscribe messages
180 /// - `true`: Send unsubscribe messages upstream
181 pub xsub_verbose_unsubs: bool,
182
183 /// Conflate messages (`ZMQ_CONFLATE`)
184 ///
185 /// - `false` (default): Queue all messages
186 /// - `true`: Keep only last message (overwrite queue)
187 pub conflate: bool,
188
189 /// TCP keepalive (`ZMQ_TCP_KEEPALIVE`)
190 ///
191 /// - `-1` (default): Use OS default
192 /// - `0`: Disable TCP keepalive
193 /// - `1`: Enable TCP keepalive
194 pub tcp_keepalive: i32,
195
196 /// TCP keepalive count (`ZMQ_TCP_KEEPALIVE_CNT`)
197 ///
198 /// Number of keepalive probes before considering connection dead.
199 /// - `-1` (default): Use OS default
200 /// - `> 0`: Number of probes
201 pub tcp_keepalive_cnt: i32,
202
203 /// TCP keepalive idle (`ZMQ_TCP_KEEPALIVE_IDLE`)
204 ///
205 /// Time in seconds before starting keepalive probes.
206 /// - `-1` (default): Use OS default
207 /// - `> 0`: Idle time in seconds
208 pub tcp_keepalive_idle: i32,
209
210 /// TCP keepalive interval (`ZMQ_TCP_KEEPALIVE_INTVL`)
211 ///
212 /// Time in seconds between keepalive probes.
213 /// - `-1` (default): Use OS default
214 /// - `> 0`: Interval in seconds
215 pub tcp_keepalive_intvl: i32,
216
217 /// REQ correlate mode (`ZMQ_REQ_CORRELATE`)
218 ///
219 /// Match replies to requests using message envelope.
220 /// - `false` (default): Accept any reply
221 /// - `true`: Match reply envelope to request
222 pub req_correlate: bool,
223
224 /// REQ relaxed mode (`ZMQ_REQ_RELAXED`)
225 ///
226 /// Allow multiple outstanding requests without strict alternation.
227 /// - `false` (default): Strict send-recv-send-recv pattern
228 /// - `true`: Allow send-send-recv-recv pattern
229 pub req_relaxed: bool,
230
231 /// Multicast rate in kilobits per second (`ZMQ_RATE`)
232 ///
233 /// Maximum send or receive data rate for multicast transports (PGM/EPGM).
234 /// - Default: 100 kbps
235 pub rate: i32,
236
237 /// Multicast recovery interval (`ZMQ_RECOVERY_IVL`)
238 ///
239 /// Maximum time to recover lost messages on multicast transports.
240 /// - Default: 10 seconds
241 pub recovery_ivl: Duration,
242
243 /// OS-level send buffer size (`ZMQ_SNDBUF`)
244 ///
245 /// Size of kernel send buffer. 0 = OS default.
246 /// - Default: 0 (use OS default)
247 pub sndbuf: i32,
248
249 /// OS-level receive buffer size (`ZMQ_RCVBUF`)
250 ///
251 /// Size of kernel receive buffer. 0 = OS default.
252 /// - Default: 0 (use OS default)
253 pub rcvbuf: i32,
254
255 /// Multicast TTL (`ZMQ_MULTICAST_HOPS`)
256 ///
257 /// Time-to-live for multicast packets.
258 /// - Default: 1 (local network only)
259 pub multicast_hops: i32,
260
261 /// IP Type of Service (`ZMQ_TOS`)
262 ///
263 /// Sets the `ToS` field in IP headers for `QoS`.
264 /// - Default: 0 (normal service)
265 pub tos: i32,
266
267 /// Maximum multicast transmission unit (`ZMQ_MULTICAST_MAXTPDU`)
268 ///
269 /// Maximum transport data unit for multicast.
270 /// - Default: 1500 bytes
271 pub multicast_maxtpdu: i32,
272
273 /// IPv6 support (`ZMQ_IPV6`)
274 ///
275 /// Enable IPv6 on socket.
276 /// - `false` (default): IPv4 only
277 /// - `true`: IPv6 support enabled
278 pub ipv6: bool,
279
280 /// Bind to device (`ZMQ_BINDTODEVICE`)
281 ///
282 /// Bind socket to specific network interface (Linux only).
283 /// - Default: None (bind to all interfaces)
284 pub bind_to_device: Option<String>,
285
286 // --- Security Options ---
287 /// PLAIN server mode (`ZMQ_PLAIN_SERVER`)
288 ///
289 /// Enable PLAIN authentication as server.
290 /// - `false` (default): Client mode
291 /// - `true`: Server mode (validate credentials)
292 pub plain_server: bool,
293
294 /// PLAIN username (`ZMQ_PLAIN_USERNAME`)
295 ///
296 /// Username for PLAIN authentication (client side).
297 /// - Default: None (no authentication)
298 pub plain_username: Option<String>,
299
300 /// PLAIN password (`ZMQ_PLAIN_PASSWORD`)
301 ///
302 /// Password for PLAIN authentication (client side).
303 /// - Default: None (no authentication)
304 pub plain_password: Option<String>,
305
306 /// CURVE server mode (`ZMQ_CURVE_SERVER`)
307 ///
308 /// Enable CURVE encryption as server.
309 /// - `false` (default): Client mode
310 /// - `true`: Server mode (provide server key)
311 pub curve_server: bool,
312
313 /// CURVE public key (`ZMQ_CURVE_PUBLICKEY`)
314 ///
315 /// Local public key for CURVE (32 bytes).
316 /// - Default: None (no encryption)
317 pub curve_publickey: Option<[u8; 32]>,
318
319 /// CURVE secret key (`ZMQ_CURVE_SECRETKEY`)
320 ///
321 /// Local secret key for CURVE (32 bytes).
322 /// - Default: None (no encryption)
323 pub curve_secretkey: Option<[u8; 32]>,
324
325 /// CURVE server key (`ZMQ_CURVE_SERVERKEY`)
326 ///
327 /// Server's public key for CURVE client (32 bytes).
328 /// - Default: None (no encryption)
329 /// - Client must set this to verify server identity
330 pub curve_serverkey: Option<[u8; 32]>,
331
332 /// ZAP domain (`ZMQ_ZAP_DOMAIN`)
333 ///
334 /// Security domain for ZAP authentication.
335 /// - Default: "" (global domain)
336 pub zap_domain: String,
337
338 /// Subscriptions (`ZMQ_SUBSCRIBE`)
339 ///
340 /// Subscription filters for SUB/XSUB sockets.
341 /// - Empty vec: No subscriptions (default) - won't receive any messages
342 /// - vec![b""] or vec![`Bytes::new()`]: Subscribe to all messages
343 /// - vec![b"topic1", b"topic2"]: Subscribe to specific topics
344 ///
345 /// Note: SUB sockets MUST subscribe to at least one topic to receive messages.
346 pub subscriptions: Vec<bytes::Bytes>,
347
348 /// Unsubscriptions (`ZMQ_UNSUBSCRIBE`)
349 ///
350 /// Subscription filters to remove for SUB/XSUB sockets.
351 /// Applied after subscriptions during socket configuration.
352 pub unsubscriptions: Vec<bytes::Bytes>,
353
354 /// Maximum reconnection attempts (`ZMQ_RECONNECT_STOP`)
355 ///
356 /// Maximum number of times to attempt reconnection after a disconnect.
357 /// - `None`: Retry indefinitely (default, matches libzmq behaviour)
358 /// - `Some(n)`: Give up and return `NotConnected` after n attempts
359 pub max_reconnect_attempts: Option<u32>,
360
361 /// ZMTP heartbeat interval (`ZMQ_HEARTBEAT_IVL` = 75)
362 ///
363 /// How often to send PING heartbeat commands on an otherwise idle connection.
364 /// - `None`: Disabled (default)
365 /// - `Some(dur)`: Send PING every `dur` of inactivity
366 pub heartbeat_ivl: Option<Duration>,
367
368 /// ZMTP heartbeat TTL (`ZMQ_HEARTBEAT_TTL` = 76)
369 ///
370 /// Time-to-live for the remote peer's heartbeat (sent in PING command).
371 /// The remote will disconnect if it doesn't receive a heartbeat within this interval.
372 /// - `None`: Use `heartbeat_ivl` (default)
373 /// - `Some(dur)`: Override TTL sent to peer
374 pub heartbeat_ttl: Option<Duration>,
375
376 /// ZMTP heartbeat timeout (`ZMQ_HEARTBEAT_TIMEOUT` = 77)
377 ///
378 /// How long to wait for a PONG reply before considering the connection dead.
379 /// - `None`: Use `heartbeat_ivl` (default)
380 /// - `Some(dur)`: Custom timeout (recommended: 2–5× heartbeat_ivl)
381 pub heartbeat_timeout: Option<Duration>,
382
383 /// ROUTER raw mode (`ZMQ_ROUTER_RAW` = 41)
384 ///
385 /// Put ROUTER socket into raw mode (no ZMTP handshake, acts like STREAM).
386 /// - `false` (default): Normal ZMTP routing
387 /// - `true`: Raw TCP bridging mode
388 pub router_raw: bool,
389
390 /// STREAM connect/disconnect notifications (`ZMQ_STREAM_NOTIFY` = 73)
391 ///
392 /// Send empty notification frames on connect and disconnect.
393 /// - `true` (default): Send notification frames
394 /// - `false`: Suppress notification frames
395 pub stream_notify: bool,
396
397 /// XPUB no-drop mode (`ZMQ_XPUB_NODROP` = 69)
398 ///
399 /// - `false` (default): Drop messages silently when HWM is reached
400 /// - `true`: Return error (`EAGAIN`) instead of dropping
401 pub xpub_nodrop: bool,
402
403 /// Invert topic matching (`ZMQ_INVERT_MATCHING` = 74)
404 ///
405 /// Invert the subscription filter logic for PUB/SUB and XPUB/XSUB.
406 /// - `false` (default): Deliver messages matching subscriptions
407 /// - `true`: Deliver messages NOT matching any subscription
408 pub invert_matching: bool,
409}
410
411impl Default for SocketOptions {
412 fn default() -> Self {
413 Self {
414 recv_timeout: None, // Block indefinitely
415 send_timeout: None, // Block indefinitely
416 handshake_timeout: Duration::from_secs(30),
417 linger: Some(Duration::from_secs(30)), // Wait 30s for pending messages
418 reconnect_ivl: Duration::from_millis(100),
419 reconnect_ivl_max: Duration::ZERO, // No maximum
420 connect_timeout: Duration::ZERO, // Use OS default
421 recv_hwm: 1000,
422 send_hwm: 1000,
423 immediate: false,
424 max_msg_size: None, // No limit
425 read_buffer_size: 8192, // 8KB - balanced default
426 write_buffer_size: 8192, // 8KB - balanced default
427 routing_id: None,
428 connect_routing_id: None,
429 router_mandatory: false,
430 router_handover: false,
431 probe_router: false,
432 xpub_verbose: false,
433 xpub_manual: false,
434 xpub_welcome_msg: None,
435 xsub_verbose_unsubs: false,
436 conflate: false,
437 tcp_keepalive: -1, // OS default
438 tcp_keepalive_cnt: -1, // OS default
439 tcp_keepalive_idle: -1, // OS default
440 tcp_keepalive_intvl: -1, // OS default
441 req_correlate: false,
442 req_relaxed: false,
443 rate: 100, // 100 kbps
444 recovery_ivl: Duration::from_secs(10),
445 sndbuf: 0, // OS default
446 rcvbuf: 0, // OS default
447 multicast_hops: 1, // Local network only
448 tos: 0, // Normal service
449 multicast_maxtpdu: 1500, // Standard MTU
450 ipv6: false, // IPv4 only
451 bind_to_device: None, // All interfaces
452 // Security
453 plain_server: false,
454 plain_username: None,
455 plain_password: None,
456 curve_server: false,
457 curve_publickey: None,
458 curve_secretkey: None,
459 curve_serverkey: None,
460 zap_domain: String::new(), // Global domain
461 subscriptions: Vec::new(), // No subscriptions
462 unsubscriptions: Vec::new(), // No unsubscriptions
463 max_reconnect_attempts: None, // Retry indefinitely
464 heartbeat_ivl: None,
465 heartbeat_ttl: None,
466 heartbeat_timeout: None,
467 router_raw: false,
468 stream_notify: true,
469 xpub_nodrop: false,
470 invert_matching: false,
471 }
472 }
473}
474
475impl SocketOptions {
476 /// Create new socket options with default values (8KB buffers).
477 #[must_use]
478 pub fn new() -> Self {
479 Self::default()
480 }
481
482 /// Create socket options optimized for small messages (< 1KB).
483 ///
484 /// Sets 4KB buffers, suitable for low-latency request-reply patterns.
485 ///
486 /// # Examples
487 ///
488 /// ```
489 /// use monocoque_core::options::SocketOptions;
490 ///
491 /// let opts = SocketOptions::small(); // 4KB buffers for REQ/REP
492 /// ```
493 #[must_use]
494 pub fn small() -> Self {
495 Self {
496 read_buffer_size: 4096,
497 write_buffer_size: 4096,
498 ..Self::default()
499 }
500 }
501
502 /// Create socket options optimized for large messages (> 8KB).
503 ///
504 /// Sets 16KB buffers, suitable for high-throughput async patterns.
505 ///
506 /// # Examples
507 ///
508 /// ```
509 /// use monocoque_core::options::SocketOptions;
510 ///
511 /// let opts = SocketOptions::large(); // 16KB buffers for DEALER/ROUTER
512 /// ```
513 #[must_use]
514 pub fn large() -> Self {
515 Self {
516 read_buffer_size: 16384,
517 write_buffer_size: 16384,
518 ..Self::default()
519 }
520 }
521
522 /// Set receive timeout.
523 ///
524 /// # Examples
525 ///
526 /// ```
527 /// use monocoque_core::options::SocketOptions;
528 /// use std::time::Duration;
529 ///
530 /// // Non-blocking receive
531 /// let opts = SocketOptions::new().with_recv_timeout(Duration::ZERO);
532 ///
533 /// // 5 second timeout
534 /// let opts = SocketOptions::new().with_recv_timeout(Duration::from_secs(5));
535 /// ```
536 pub const fn with_recv_timeout(mut self, timeout: Duration) -> Self {
537 self.recv_timeout = Some(timeout);
538 self
539 }
540
541 /// Set send timeout.
542 pub const fn with_send_timeout(mut self, timeout: Duration) -> Self {
543 self.send_timeout = Some(timeout);
544 self
545 }
546
547 /// Set handshake timeout.
548 pub const fn with_handshake_timeout(mut self, timeout: Duration) -> Self {
549 self.handshake_timeout = timeout;
550 self
551 }
552
553 /// Set linger timeout.
554 pub const fn with_linger(mut self, linger: Option<Duration>) -> Self {
555 self.linger = linger;
556 self
557 }
558
559 /// Set reconnection interval.
560 pub const fn with_reconnect_ivl(mut self, ivl: Duration) -> Self {
561 self.reconnect_ivl = ivl;
562 self
563 }
564
565 /// Set maximum reconnection interval for exponential backoff.
566 pub const fn with_reconnect_ivl_max(mut self, max: Duration) -> Self {
567 self.reconnect_ivl_max = max;
568 self
569 }
570
571 /// Set maximum number of reconnection attempts.
572 ///
573 /// `None` retries indefinitely (default); `Some(n)` gives up after n attempts.
574 pub const fn with_max_reconnect_attempts(mut self, max: Option<u32>) -> Self {
575 self.max_reconnect_attempts = max;
576 self
577 }
578
579 /// Set connection timeout.
580 pub const fn with_connect_timeout(mut self, timeout: Duration) -> Self {
581 self.connect_timeout = timeout;
582 self
583 }
584
585 /// Set heartbeat interval (`ZMQ_HEARTBEAT_IVL`).
586 pub const fn with_heartbeat_ivl(mut self, ivl: Duration) -> Self {
587 self.heartbeat_ivl = Some(ivl);
588 self
589 }
590
591 /// Set heartbeat TTL (`ZMQ_HEARTBEAT_TTL`).
592 pub const fn with_heartbeat_ttl(mut self, ttl: Duration) -> Self {
593 self.heartbeat_ttl = Some(ttl);
594 self
595 }
596
597 /// Set heartbeat timeout (`ZMQ_HEARTBEAT_TIMEOUT`).
598 pub const fn with_heartbeat_timeout(mut self, timeout: Duration) -> Self {
599 self.heartbeat_timeout = Some(timeout);
600 self
601 }
602
603 /// Enable or disable ROUTER raw mode (`ZMQ_ROUTER_RAW`).
604 pub const fn with_router_raw(mut self, raw: bool) -> Self {
605 self.router_raw = raw;
606 self
607 }
608
609 /// Enable or disable STREAM connect/disconnect notifications (`ZMQ_STREAM_NOTIFY`).
610 pub const fn with_stream_notify(mut self, notify: bool) -> Self {
611 self.stream_notify = notify;
612 self
613 }
614
615 /// Enable XPUB no-drop mode (`ZMQ_XPUB_NODROP`).
616 pub const fn with_xpub_nodrop(mut self, nodrop: bool) -> Self {
617 self.xpub_nodrop = nodrop;
618 self
619 }
620
621 /// Enable inverted topic matching (`ZMQ_INVERT_MATCHING`).
622 pub const fn with_invert_matching(mut self, invert: bool) -> Self {
623 self.invert_matching = invert;
624 self
625 }
626
627 /// Set receive high water mark.
628 pub const fn with_recv_hwm(mut self, hwm: usize) -> Self {
629 self.recv_hwm = hwm;
630 self
631 }
632
633 /// Set send high water mark.
634 pub const fn with_send_hwm(mut self, hwm: usize) -> Self {
635 self.send_hwm = hwm;
636 self
637 }
638
639 /// Enable or disable immediate mode.
640 pub const fn with_immediate(mut self, immediate: bool) -> Self {
641 self.immediate = immediate;
642 self
643 }
644
645 /// Set maximum message size.
646 pub const fn with_max_msg_size(mut self, size: Option<usize>) -> Self {
647 self.max_msg_size = size;
648 self
649 }
650
651 /// Set read buffer size.
652 ///
653 /// # Examples
654 ///
655 /// ```
656 /// use monocoque_core::options::SocketOptions;
657 ///
658 /// // Small buffers for low latency
659 /// let opts = SocketOptions::new().with_read_buffer_size(4096);
660 ///
661 /// // Large buffers for throughput
662 /// let opts = SocketOptions::new().with_read_buffer_size(16384);
663 /// ```
664 pub const fn with_read_buffer_size(mut self, size: usize) -> Self {
665 self.read_buffer_size = size;
666 self
667 }
668
669 /// Set write buffer size.
670 pub const fn with_write_buffer_size(mut self, size: usize) -> Self {
671 self.write_buffer_size = size;
672 self
673 }
674
675 /// Set both read and write buffer sizes (convenience method).
676 ///
677 /// # Examples
678 ///
679 /// ```
680 /// use monocoque_core::options::SocketOptions;
681 ///
682 /// // Small buffers for both
683 /// let opts = SocketOptions::new().with_buffer_sizes(4096, 4096);
684 /// ```
685 pub const fn with_buffer_sizes(mut self, read_size: usize, write_size: usize) -> Self {
686 self.read_buffer_size = read_size;
687 self.write_buffer_size = write_size;
688 self
689 }
690
691 /// Set socket routing ID / identity.
692 ///
693 /// # Examples
694 ///
695 /// ```
696 /// use monocoque_core::options::SocketOptions;
697 /// use bytes::Bytes;
698 ///
699 /// let opts = SocketOptions::new()
700 /// .with_routing_id(Bytes::from_static(b"worker-01"));
701 /// ```
702 pub fn with_routing_id(mut self, id: bytes::Bytes) -> Self {
703 self.routing_id = Some(id);
704 self
705 }
706
707 /// Set connect routing ID for the next connection.
708 ///
709 /// This option is consumed after each connect operation and must be set
710 /// again for subsequent connections.
711 ///
712 /// # Examples
713 ///
714 /// ```
715 /// use monocoque_core::options::SocketOptions;
716 /// use bytes::Bytes;
717 ///
718 /// let opts = SocketOptions::new()
719 /// .with_connect_routing_id(Bytes::from_static(b"client-001"));
720 /// ```
721 pub fn with_connect_routing_id(mut self, id: bytes::Bytes) -> Self {
722 self.connect_routing_id = Some(id);
723 self
724 }
725
726 /// Enable ROUTER mandatory mode.
727 pub const fn with_router_mandatory(mut self, enabled: bool) -> Self {
728 self.router_mandatory = enabled;
729 self
730 }
731
732 /// Enable ROUTER handover mode.
733 pub const fn with_router_handover(mut self, enabled: bool) -> Self {
734 self.router_handover = enabled;
735 self
736 }
737
738 /// Enable ROUTER probe on connect.
739 pub const fn with_probe_router(mut self, enabled: bool) -> Self {
740 self.probe_router = enabled;
741 self
742 }
743
744 /// Enable XPUB verbose mode.
745 pub const fn with_xpub_verbose(mut self, enabled: bool) -> Self {
746 self.xpub_verbose = enabled;
747 self
748 }
749
750 /// Enable XPUB manual mode.
751 pub const fn with_xpub_manual(mut self, enabled: bool) -> Self {
752 self.xpub_manual = enabled;
753 self
754 }
755
756 /// Set XPUB welcome message.
757 pub fn with_xpub_welcome_msg(mut self, msg: bytes::Bytes) -> Self {
758 self.xpub_welcome_msg = Some(msg);
759 self
760 }
761
762 /// Enable XSUB verbose unsubscribe.
763 pub const fn with_xsub_verbose_unsubs(mut self, enabled: bool) -> Self {
764 self.xsub_verbose_unsubs = enabled;
765 self
766 }
767
768 /// Enable message conflation (keep only last message).
769 pub const fn with_conflate(mut self, enabled: bool) -> Self {
770 self.conflate = enabled;
771 self
772 }
773
774 /// Set TCP keepalive mode.
775 ///
776 /// # Arguments
777 ///
778 /// * `mode` - `-1` for OS default, `0` to disable, `1` to enable
779 pub const fn with_tcp_keepalive(mut self, mode: i32) -> Self {
780 self.tcp_keepalive = mode;
781 self
782 }
783
784 /// Set TCP keepalive count (number of probes before timeout).
785 ///
786 /// # Arguments
787 ///
788 /// * `count` - `-1` for OS default, `> 0` for specific count
789 pub const fn with_tcp_keepalive_cnt(mut self, count: i32) -> Self {
790 self.tcp_keepalive_cnt = count;
791 self
792 }
793
794 /// Set TCP keepalive idle time (seconds before first probe).
795 ///
796 /// # Arguments
797 ///
798 /// * `seconds` - `-1` for OS default, `> 0` for specific idle time
799 pub const fn with_tcp_keepalive_idle(mut self, seconds: i32) -> Self {
800 self.tcp_keepalive_idle = seconds;
801 self
802 }
803
804 /// Set TCP keepalive interval (seconds between probes).
805 ///
806 /// # Arguments
807 ///
808 /// * `seconds` - `-1` for OS default, `> 0` for specific interval
809 pub const fn with_tcp_keepalive_intvl(mut self, seconds: i32) -> Self {
810 self.tcp_keepalive_intvl = seconds;
811 self
812 }
813
814 /// Enable REQ correlation mode (match replies to requests).
815 pub const fn with_req_correlate(mut self, enabled: bool) -> Self {
816 self.req_correlate = enabled;
817 self
818 }
819
820 /// Enable REQ relaxed mode (allow multiple outstanding requests).
821 pub const fn with_req_relaxed(mut self, enabled: bool) -> Self {
822 self.req_relaxed = enabled;
823 self
824 }
825
826 /// Set multicast rate (`ZMQ_RATE`).
827 pub const fn with_rate(mut self, rate: i32) -> Self {
828 self.rate = rate;
829 self
830 }
831
832 /// Set multicast recovery interval (`ZMQ_RECOVERY_IVL`).
833 pub const fn with_recovery_ivl(mut self, interval: Duration) -> Self {
834 self.recovery_ivl = interval;
835 self
836 }
837
838 /// Set OS send buffer size (`ZMQ_SNDBUF`).
839 pub const fn with_sndbuf(mut self, size: i32) -> Self {
840 self.sndbuf = size;
841 self
842 }
843
844 /// Set OS receive buffer size (`ZMQ_RCVBUF`).
845 pub const fn with_rcvbuf(mut self, size: i32) -> Self {
846 self.rcvbuf = size;
847 self
848 }
849
850 /// Set multicast TTL/hops (`ZMQ_MULTICAST_HOPS`).
851 pub const fn with_multicast_hops(mut self, hops: i32) -> Self {
852 self.multicast_hops = hops;
853 self
854 }
855
856 /// Set IP Type of Service (`ZMQ_TOS`).
857 pub const fn with_tos(mut self, tos: i32) -> Self {
858 self.tos = tos;
859 self
860 }
861
862 /// Set multicast maximum TPU (`ZMQ_MULTICAST_MAXTPDU`).
863 pub const fn with_multicast_maxtpdu(mut self, mtu: i32) -> Self {
864 self.multicast_maxtpdu = mtu;
865 self
866 }
867
868 /// Enable IPv6 support (`ZMQ_IPV6`).
869 pub const fn with_ipv6(mut self, enabled: bool) -> Self {
870 self.ipv6 = enabled;
871 self
872 }
873
874 /// Bind to specific device (`ZMQ_BINDTODEVICE`) - Linux only.
875 pub fn with_bind_to_device(mut self, device: impl Into<String>) -> Self {
876 self.bind_to_device = Some(device.into());
877 self
878 }
879
880 // --- Security Options ---
881
882 /// Enable PLAIN server mode.
883 ///
884 /// # Examples
885 ///
886 /// ```
887 /// use monocoque_core::options::SocketOptions;
888 ///
889 /// let opts = SocketOptions::new().with_plain_server(true);
890 /// ```
891 pub const fn with_plain_server(mut self, enabled: bool) -> Self {
892 self.plain_server = enabled;
893 self
894 }
895
896 /// Set PLAIN client credentials.
897 ///
898 /// # Examples
899 ///
900 /// ```
901 /// use monocoque_core::options::SocketOptions;
902 ///
903 /// let opts = SocketOptions::new()
904 /// .with_plain_credentials("admin", "secret123");
905 /// ```
906 pub fn with_plain_credentials(
907 mut self,
908 username: impl Into<String>,
909 password: impl Into<String>,
910 ) -> Self {
911 self.plain_username = Some(username.into());
912 self.plain_password = Some(password.into());
913 self
914 }
915
916 /// Enable CURVE server mode.
917 ///
918 /// # Examples
919 ///
920 /// ```
921 /// use monocoque_core::options::SocketOptions;
922 ///
923 /// let opts = SocketOptions::new().with_curve_server(true);
924 /// ```
925 pub const fn with_curve_server(mut self, enabled: bool) -> Self {
926 self.curve_server = enabled;
927 self
928 }
929
930 /// Set CURVE client keys (public + secret).
931 ///
932 /// # Examples
933 ///
934 /// ```
935 /// use monocoque_core::options::SocketOptions;
936 ///
937 /// let public = [0u8; 32]; // Replace with actual key
938 /// let secret = [0u8; 32]; // Replace with actual key
939 /// let opts = SocketOptions::new().with_curve_keypair(public, secret);
940 /// ```
941 pub const fn with_curve_keypair(mut self, publickey: [u8; 32], secretkey: [u8; 32]) -> Self {
942 self.curve_publickey = Some(publickey);
943 self.curve_secretkey = Some(secretkey);
944 self
945 }
946
947 /// Set CURVE server public key (for client).
948 ///
949 /// # Examples
950 ///
951 /// ```
952 /// use monocoque_core::options::SocketOptions;
953 ///
954 /// let server_key = [0u8; 32]; // Server's public key
955 /// let opts = SocketOptions::new().with_curve_serverkey(server_key);
956 /// ```
957 pub const fn with_curve_serverkey(mut self, serverkey: [u8; 32]) -> Self {
958 self.curve_serverkey = Some(serverkey);
959 self
960 }
961
962 /// Set ZAP domain for authentication.
963 ///
964 /// # Examples
965 ///
966 /// ```
967 /// use monocoque_core::options::SocketOptions;
968 ///
969 /// let opts = SocketOptions::new().with_zap_domain("production");
970 /// ```
971 pub fn with_zap_domain(mut self, domain: impl Into<String>) -> Self {
972 self.zap_domain = domain.into();
973 self
974 }
975
976 /// Add a subscription filter for SUB/XSUB sockets (`ZMQ_SUBSCRIBE`).
977 ///
978 /// SUB sockets MUST subscribe to at least one topic to receive messages.
979 /// An empty filter (b"" or `Bytes::new()`) subscribes to all messages.
980 ///
981 /// # Examples
982 ///
983 /// ```
984 /// use monocoque_core::options::SocketOptions;
985 /// use bytes::Bytes;
986 ///
987 /// // Subscribe to all messages
988 /// let opts = SocketOptions::new().with_subscribe(Bytes::new());
989 ///
990 /// // Subscribe to specific topics
991 /// let opts = SocketOptions::new()
992 /// .with_subscribe(Bytes::from("weather."))
993 /// .with_subscribe(Bytes::from("stocks."));
994 /// ```
995 pub fn with_subscribe(mut self, filter: bytes::Bytes) -> Self {
996 self.subscriptions.push(filter);
997 self
998 }
999
1000 /// Add multiple subscription filters for SUB/XSUB sockets.
1001 ///
1002 /// Convenience method to subscribe to multiple topics at once.
1003 ///
1004 /// # Examples
1005 ///
1006 /// ```
1007 /// use monocoque_core::options::SocketOptions;
1008 /// use bytes::Bytes;
1009 ///
1010 /// let opts = SocketOptions::new()
1011 /// .with_subscriptions(vec![
1012 /// Bytes::from("weather."),
1013 /// Bytes::from("stocks."),
1014 /// ]);
1015 /// ```
1016 pub fn with_subscriptions(mut self, filters: Vec<bytes::Bytes>) -> Self {
1017 self.subscriptions.extend(filters);
1018 self
1019 }
1020
1021 /// Add an unsubscription filter for SUB/XSUB sockets (`ZMQ_UNSUBSCRIBE`).
1022 ///
1023 /// Removes a previously added subscription filter.
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// use monocoque_core::options::SocketOptions;
1029 /// use bytes::Bytes;
1030 ///
1031 /// let opts = SocketOptions::new()
1032 /// .with_subscribe(Bytes::new()) // Subscribe to all
1033 /// .with_unsubscribe(Bytes::from("admin.")); // Except admin topics
1034 /// ```
1035 pub fn with_unsubscribe(mut self, filter: bytes::Bytes) -> Self {
1036 self.unsubscriptions.push(filter);
1037 self
1038 }
1039
1040 // --- Query Methods ---
1041
1042 /// Check if receive operation should be non-blocking.
1043 pub const fn is_recv_nonblocking(&self) -> bool {
1044 matches!(self.recv_timeout, Some(d) if d.is_zero())
1045 }
1046
1047 /// Check if send operation should be non-blocking.
1048 pub const fn is_send_nonblocking(&self) -> bool {
1049 matches!(self.send_timeout, Some(d) if d.is_zero())
1050 }
1051
1052 /// Validate routing ID for use with ROUTER sockets.
1053 ///
1054 /// ROUTER socket identities must:
1055 /// - Be 1-255 bytes long
1056 /// - Not start with null byte (0x00) which is reserved for auto-generated IDs
1057 pub fn validate_router_identity(id: &[u8]) -> std::io::Result<()> {
1058 if id.is_empty() {
1059 return Err(std::io::Error::new(
1060 std::io::ErrorKind::InvalidInput,
1061 "routing ID cannot be empty",
1062 ));
1063 }
1064
1065 if id.len() > 255 {
1066 return Err(std::io::Error::new(
1067 std::io::ErrorKind::InvalidInput,
1068 format!("routing ID cannot exceed 255 bytes (got {})", id.len()),
1069 ));
1070 }
1071
1072 if id[0] == 0x00 {
1073 return Err(std::io::Error::new(
1074 std::io::ErrorKind::InvalidInput,
1075 "routing ID cannot start with null byte (reserved for auto-generated IDs)",
1076 ));
1077 }
1078
1079 Ok(())
1080 }
1081
1082 /// Validate general routing ID (for DEALER, REQ, REP).
1083 ///
1084 /// Less strict than ROUTER identities - allows null prefix.
1085 pub fn validate_routing_id(id: &[u8]) -> std::io::Result<()> {
1086 if id.len() > 255 {
1087 return Err(std::io::Error::new(
1088 std::io::ErrorKind::InvalidInput,
1089 format!("routing ID cannot exceed 255 bytes (got {})", id.len()),
1090 ));
1091 }
1092 Ok(())
1093 }
1094
1095 /// Get the current reconnection interval with exponential backoff.
1096 ///
1097 /// Returns the interval to use, considering exponential backoff
1098 /// and the maximum interval setting.
1099 pub fn next_reconnect_ivl(&self, attempt: u32) -> Duration {
1100 if self.reconnect_ivl_max.is_zero() {
1101 // No exponential backoff, always use base interval
1102 return self.reconnect_ivl;
1103 }
1104
1105 // Calculate exponential backoff: base * 2^attempt
1106 let backoff = self
1107 .reconnect_ivl
1108 .saturating_mul(2u32.saturating_pow(attempt));
1109
1110 // Cap at maximum interval
1111 backoff.min(self.reconnect_ivl_max)
1112 }
1113}
1114
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: no send/recv timeouts, 30s handshake, 100ms reconnect, HWM 1000.
    #[test]
    fn test_default_options() {
        let defaults = SocketOptions::default();

        assert_eq!(defaults.recv_timeout, None);
        assert_eq!(defaults.send_timeout, None);
        assert_eq!(defaults.handshake_timeout, Duration::from_secs(30));
        assert_eq!(defaults.reconnect_ivl, Duration::from_millis(100));
        assert_eq!(defaults.recv_hwm, 1000);
        assert_eq!(defaults.send_hwm, 1000);
    }

    /// Chained builder calls each land in their own field.
    #[test]
    fn test_builder_pattern() {
        let recv_limit = Duration::from_secs(5);
        let send_limit = Duration::from_secs(10);

        let built = SocketOptions::new()
            .with_recv_timeout(recv_limit)
            .with_send_timeout(send_limit)
            .with_recv_hwm(2000);

        assert_eq!(built.recv_timeout, Some(recv_limit));
        assert_eq!(built.send_timeout, Some(send_limit));
        assert_eq!(built.recv_hwm, 2000);
    }

    /// Only an explicit zero timeout counts as non-blocking.
    #[test]
    fn test_nonblocking_checks() {
        let with_defaults = SocketOptions::new();
        assert!(!with_defaults.is_recv_nonblocking());
        assert!(!with_defaults.is_send_nonblocking());

        let zero_timeouts = SocketOptions::new()
            .with_recv_timeout(Duration::ZERO)
            .with_send_timeout(Duration::ZERO);
        assert!(zero_timeouts.is_recv_nonblocking());
        assert!(zero_timeouts.is_send_nonblocking());
    }

    /// The interval doubles per attempt and is capped by the maximum.
    #[test]
    fn test_exponential_backoff() {
        let opts = SocketOptions::new()
            .with_reconnect_ivl(Duration::from_millis(100))
            .with_reconnect_ivl_max(Duration::from_secs(10));

        // (attempt, expected interval in milliseconds); the last row hits the 10s cap.
        let expectations = [(0, 100), (1, 200), (2, 400), (10, 10_000)];
        for (attempt, expected_ms) in expectations {
            assert_eq!(
                opts.next_reconnect_ivl(attempt),
                Duration::from_millis(expected_ms)
            );
        }
    }

    /// With the default (zero) maximum, every attempt uses the base interval.
    #[test]
    fn test_no_exponential_backoff() {
        let base = Duration::from_millis(100);
        // reconnect_ivl_max is 0 by default
        let opts = SocketOptions::new().with_reconnect_ivl(base);

        for attempt in [0, 1, 10] {
            assert_eq!(opts.next_reconnect_ivl(attempt), base);
        }
    }

    /// ROUTER identities: 1-255 bytes and no leading null byte.
    #[test]
    fn test_routing_id_validation() {
        // Accepted identities
        let accepted_name = SocketOptions::validate_router_identity(b"client-001");
        assert!(accepted_name.is_ok());
        let accepted_max_len = SocketOptions::validate_router_identity(&[0x01; 255]);
        assert!(accepted_max_len.is_ok());

        // Rejected: empty
        let rejected_empty = SocketOptions::validate_router_identity(b"");
        assert!(rejected_empty.is_err());

        // Rejected: one byte over the limit
        let rejected_long = SocketOptions::validate_router_identity(&[0x01; 256]);
        assert!(rejected_long.is_err());

        // Rejected: leading null byte
        let rejected_null = SocketOptions::validate_router_identity(b"\x00client");
        assert!(rejected_null.is_err());
    }

    /// General routing IDs only have the 255-byte length limit.
    #[test]
    fn test_general_routing_id_validation() {
        // Accepted: empty, null-prefixed, and maximum-length IDs
        let accepted_empty = SocketOptions::validate_routing_id(b"");
        assert!(accepted_empty.is_ok());
        let accepted_null_prefix = SocketOptions::validate_routing_id(b"\x00client");
        assert!(accepted_null_prefix.is_ok());
        let accepted_max_len = SocketOptions::validate_routing_id(&[0x00; 255]);
        assert!(accepted_max_len.is_ok());

        // Rejected: one byte over the limit
        let rejected_long = SocketOptions::validate_routing_id(&[0x01; 256]);
        assert!(rejected_long.is_err());
    }

    /// The connect routing ID is stored as given.
    #[test]
    fn test_connect_routing_id() {
        let peer = bytes::Bytes::from_static(b"peer-123");
        let opts =
            SocketOptions::new().with_connect_routing_id(bytes::Bytes::from_static(b"peer-123"));

        assert_eq!(opts.connect_routing_id, Some(peer));
    }

    /// Both ROUTER flags can be switched on through the builder.
    #[test]
    fn test_router_options() {
        let opts = SocketOptions::new()
            .with_router_handover(true)
            .with_router_mandatory(true);

        assert!(opts.router_mandatory);
        assert!(opts.router_handover);
    }

    /// Subscriptions accumulate in order; unsubscriptions are tracked separately.
    #[test]
    fn test_subscription_options() {
        // with_subscribe: one filter per call, order preserved
        let single_adds = SocketOptions::new()
            .with_subscribe(bytes::Bytes::new()) // Subscribe to all
            .with_subscribe(bytes::Bytes::from("weather."))
            .with_subscribe(bytes::Bytes::from("stocks."));

        assert_eq!(single_adds.subscriptions.len(), 3);
        assert_eq!(single_adds.subscriptions[0], bytes::Bytes::new());
        assert_eq!(
            single_adds.subscriptions[1],
            bytes::Bytes::from("weather.")
        );
        assert_eq!(single_adds.subscriptions[2], bytes::Bytes::from("stocks."));

        // with_subscriptions: several filters in one call
        let topics = vec![bytes::Bytes::from("topic1"), bytes::Bytes::from("topic2")];
        let bulk_add = SocketOptions::new().with_subscriptions(topics);

        assert_eq!(bulk_add.subscriptions.len(), 2);

        // with_unsubscribe: recorded in its own list
        let with_removal = single_adds.with_unsubscribe(bytes::Bytes::from("admin."));
        assert_eq!(with_removal.unsubscriptions.len(), 1);
        assert_eq!(
            with_removal.unsubscriptions[0],
            bytes::Bytes::from("admin.")
        );
    }
}