monocoque-rs-core 0.1.1

Protocol-agnostic messaging kernel with io_uring-based I/O
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
//! Socket configuration options
//!
//! This module provides configuration options for `ZeroMQ` sockets, similar to
//! libzmq's socket options (`zmq_setsockopt` / `zmq_getsockopt`).

use std::time::Duration;

/// Socket configuration options.
///
/// These options control socket behavior including timeouts, buffer sizes,
/// and reliability features. This struct consolidates all socket configuration
/// in one place, following the `MongoDB` Rust driver pattern.
///
/// All fields are public and may be set directly. Start from
/// [`SocketOptions::default`] / [`SocketOptions::new`] (8KB buffers) or from the
/// [`SocketOptions::small`] (4KB) / [`SocketOptions::large`] (16KB) presets, then
/// refine with the consuming `with_*` builder methods.
///
/// # Examples
///
/// ```
/// use monocoque_core::options::SocketOptions;
/// use std::time::Duration;
///
/// // Simple case: use defaults
/// let opts = SocketOptions::default();
///
/// // Customize timeouts and buffers
/// let opts = SocketOptions::default()
///     .with_recv_timeout(Duration::from_secs(5))
///     .with_send_timeout(Duration::from_secs(5))
///     .with_buffer_sizes(16384, 16384);  // 16KB buffers for high-throughput
/// ```
// NOTE(review): the derived `Debug` prints every field verbatim, including
// `plain_password` and `curve_secretkey` — consider redacting before logging.
#[derive(Debug, Clone)]
pub struct SocketOptions {
    /// Read buffer size (bytes)
    ///
    /// Size of arena-allocated buffer for receiving data.
    /// - Default: 8192 (8KB) - balanced for most workloads
    /// - Small (4KB): Low-latency with small messages (< 1KB)
    /// - Large (16KB): High-throughput with large messages (> 8KB)
    pub read_buffer_size: usize,

    /// Write buffer size (bytes)
    ///
    /// Initial capacity of `BytesMut` buffer for sending data.
    /// - Default: 8192 (8KB) - balanced for most workloads
    /// - Small (4KB): Low-latency with small messages
    /// - Large (16KB): High-throughput with large messages
    pub write_buffer_size: usize,

    /// Receive timeout (`ZMQ_RCVTIMEO`)
    ///
    /// Maximum time to wait for a receive operation.
    /// - `None`: Block indefinitely (default)
    /// - `Some(Duration::ZERO)`: Non-blocking (return immediately with EAGAIN)
    /// - `Some(duration)`: Wait up to duration before returning EAGAIN
    pub recv_timeout: Option<Duration>,

    /// Send timeout (`ZMQ_SNDTIMEO`)
    ///
    /// Maximum time to wait for a send operation.
    /// - `None`: Block indefinitely (default)
    /// - `Some(Duration::ZERO)`: Non-blocking (return immediately with EAGAIN)
    /// - `Some(duration)`: Wait up to duration before returning EAGAIN
    pub send_timeout: Option<Duration>,

    /// Handshake timeout (`ZMQ_HANDSHAKE_IVL`)
    ///
    /// Maximum time to complete ZMTP handshake after connection.
    /// - Default: 30 seconds
    /// - Set to `Duration::ZERO` to disable timeout
    pub handshake_timeout: Duration,

    /// Linger timeout (`ZMQ_LINGER`)
    ///
    /// Time to wait for pending messages to be sent before closing socket.
    /// - Default: `Some(Duration::from_secs(30))`
    /// - `None`: Close immediately, discard pending messages
    /// - `Some(Duration::ZERO)`: Same as `None`
    /// - `Some(duration)`: Wait up to duration for messages to be sent
    pub linger: Option<Duration>,

    /// Reconnect interval (`ZMQ_RECONNECT_IVL`)
    ///
    /// Initial reconnection delay after connection loss.
    /// - Default: 100ms
    /// - Use with `reconnect_ivl_max` for exponential backoff
    pub reconnect_ivl: Duration,

    /// Maximum reconnect interval (`ZMQ_RECONNECT_IVL_MAX`)
    ///
    /// Maximum reconnection delay for exponential backoff.
    /// - Default: 0 (no maximum, use `reconnect_ivl` always)
    /// - When > 0: Doubles `reconnect_ivl` up to this value
    pub reconnect_ivl_max: Duration,

    /// Connection timeout (`ZMQ_CONNECT_TIMEOUT`)
    ///
    /// Maximum time to wait for TCP connection to complete.
    /// - Default: 0 (use OS default)
    pub connect_timeout: Duration,

    /// High water mark for receiving (`ZMQ_RCVHWM`)
    ///
    /// Maximum number of messages to queue for receiving.
    /// When reached, socket will block or drop messages depending on socket type.
    /// - Default: 1000 messages
    pub recv_hwm: usize,

    /// High water mark for sending (`ZMQ_SNDHWM`)
    ///
    /// Maximum number of messages to queue for sending.
    /// When reached, socket will block or drop messages depending on socket type.
    /// - Default: 1000 messages
    pub send_hwm: usize,

    /// Enable immediate connect mode (`ZMQ_IMMEDIATE`)
    ///
    /// - `false` (default): Queue messages while connecting
    /// - `true`: Report error if no connection established
    pub immediate: bool,

    /// Maximum message size (`ZMQ_MAXMSGSIZE`)
    ///
    /// Maximum size of a single message in bytes.
    /// - `None`: No limit (default)
    /// - `Some(size)`: Reject messages larger than size
    pub max_msg_size: Option<usize>,

    /// Socket identity / routing ID (`ZMQ_ROUTING_ID` / `ZMQ_IDENTITY`)
    ///
    /// Identity for ROUTER addressing. If `None`, a random UUID is generated.
    /// - Default: `None` (auto-generate)
    /// - Custom: Set for stable identity across reconnections
    pub routing_id: Option<bytes::Bytes>,

    /// Connect routing ID (`ZMQ_CONNECT_ROUTING_ID`)
    ///
    /// Identity to assign to the next outgoing connection.
    /// Used by ROUTER sockets to assign a specific identity to a peer.
    /// - Default: `None` (auto-generate)
    /// - Custom: Assign explicit identity to next connection
    /// - Consumed after each connect operation
    pub connect_routing_id: Option<bytes::Bytes>,

    /// ROUTER mandatory mode (`ZMQ_ROUTER_MANDATORY`)
    ///
    /// - `false` (default): Silently drop messages to unknown peers
    /// - `true`: Return error when sending to unknown peer
    pub router_mandatory: bool,

    /// ROUTER handover mode (`ZMQ_ROUTER_HANDOVER`)
    ///
    /// - `false` (default): Disconnect old peer when new peer with same identity connects
    /// - `true`: Hand over pending messages to new peer with same identity
    pub router_handover: bool,

    /// Probe ROUTER on connect (`ZMQ_PROBE_ROUTER`)
    ///
    /// - `false` (default): Normal operation
    /// - `true`: Send empty message on connect to probe ROUTER identity
    pub probe_router: bool,

    /// XPUB verbose mode (`ZMQ_XPUB_VERBOSE`)
    ///
    /// - `false` (default): Only report new subscriptions
    /// - `true`: Report all subscription messages (including duplicates)
    pub xpub_verbose: bool,

    /// XPUB manual mode (`ZMQ_XPUB_MANUAL`)
    ///
    /// - `false` (default): Automatic subscription management
    /// - `true`: Manual subscription control via `send()`
    pub xpub_manual: bool,

    /// XPUB welcome message (`ZMQ_XPUB_WELCOME_MSG`)
    ///
    /// Message to send to new subscribers on connection.
    /// Useful for last value cache (LVC) patterns.
    /// - Default: `None` (no welcome message)
    pub xpub_welcome_msg: Option<bytes::Bytes>,

    /// XSUB verbose unsubscribe (`ZMQ_XSUB_VERBOSE_UNSUBSCRIBE`)
    ///
    /// - `false` (default): Don't send explicit unsubscribe messages
    /// - `true`: Send unsubscribe messages upstream
    pub xsub_verbose_unsubs: bool,

    /// Conflate messages (`ZMQ_CONFLATE`)
    ///
    /// - `false` (default): Queue all messages
    /// - `true`: Keep only last message (overwrite queue)
    pub conflate: bool,

    /// TCP keepalive (`ZMQ_TCP_KEEPALIVE`)
    ///
    /// - `-1` (default): Use OS default
    /// - `0`: Disable TCP keepalive
    /// - `1`: Enable TCP keepalive
    pub tcp_keepalive: i32,

    /// TCP keepalive count (`ZMQ_TCP_KEEPALIVE_CNT`)
    ///
    /// Number of keepalive probes before considering connection dead.
    /// - `-1` (default): Use OS default
    /// - `> 0`: Number of probes
    pub tcp_keepalive_cnt: i32,

    /// TCP keepalive idle (`ZMQ_TCP_KEEPALIVE_IDLE`)
    ///
    /// Time in seconds before starting keepalive probes.
    /// - `-1` (default): Use OS default
    /// - `> 0`: Idle time in seconds
    pub tcp_keepalive_idle: i32,

    /// TCP keepalive interval (`ZMQ_TCP_KEEPALIVE_INTVL`)
    ///
    /// Time in seconds between keepalive probes.
    /// - `-1` (default): Use OS default
    /// - `> 0`: Interval in seconds
    pub tcp_keepalive_intvl: i32,

    /// REQ correlate mode (`ZMQ_REQ_CORRELATE`)
    ///
    /// Match replies to requests using message envelope.
    /// - `false` (default): Accept any reply
    /// - `true`: Match reply envelope to request
    pub req_correlate: bool,

    /// REQ relaxed mode (`ZMQ_REQ_RELAXED`)
    ///
    /// Allow multiple outstanding requests without strict alternation.
    /// - `false` (default): Strict send-recv-send-recv pattern
    /// - `true`: Allow send-send-recv-recv pattern
    pub req_relaxed: bool,

    /// Multicast rate in kilobits per second (`ZMQ_RATE`)
    ///
    /// Maximum send or receive data rate for multicast transports (PGM/EPGM).
    /// - Default: 100 kbps
    pub rate: i32,

    /// Multicast recovery interval (`ZMQ_RECOVERY_IVL`)
    ///
    /// Maximum time to recover lost messages on multicast transports.
    /// - Default: 10 seconds
    pub recovery_ivl: Duration,

    /// OS-level send buffer size (`ZMQ_SNDBUF`)
    ///
    /// Size of kernel send buffer. 0 = OS default.
    /// - Default: 0 (use OS default)
    pub sndbuf: i32,

    /// OS-level receive buffer size (`ZMQ_RCVBUF`)
    ///
    /// Size of kernel receive buffer. 0 = OS default.
    /// - Default: 0 (use OS default)
    pub rcvbuf: i32,

    /// Multicast TTL (`ZMQ_MULTICAST_HOPS`)
    ///
    /// Time-to-live for multicast packets.
    /// - Default: 1 (local network only)
    pub multicast_hops: i32,

    /// IP Type of Service (`ZMQ_TOS`)
    ///
    /// Sets the `ToS` field in IP headers for `QoS`.
    /// - Default: 0 (normal service)
    pub tos: i32,

    /// Maximum multicast transmission unit (`ZMQ_MULTICAST_MAXTPDU`)
    ///
    /// Maximum transport data unit for multicast.
    /// - Default: 1500 bytes
    pub multicast_maxtpdu: i32,

    /// IPv6 support (`ZMQ_IPV6`)
    ///
    /// Enable IPv6 on socket.
    /// - `false` (default): IPv4 only
    /// - `true`: IPv6 support enabled
    pub ipv6: bool,

    /// Bind to device (`ZMQ_BINDTODEVICE`)
    ///
    /// Bind socket to specific network interface (Linux only).
    /// - Default: `None` (bind to all interfaces)
    pub bind_to_device: Option<String>,

    // --- Security Options ---
    /// PLAIN server mode (`ZMQ_PLAIN_SERVER`)
    ///
    /// Enable PLAIN authentication as server.
    /// - `false` (default): Client mode
    /// - `true`: Server mode (validate credentials)
    pub plain_server: bool,

    /// PLAIN username (`ZMQ_PLAIN_USERNAME`)
    ///
    /// Username for PLAIN authentication (client side).
    /// - Default: `None` (no authentication)
    pub plain_username: Option<String>,

    /// PLAIN password (`ZMQ_PLAIN_PASSWORD`)
    ///
    /// Password for PLAIN authentication (client side).
    /// - Default: `None` (no authentication)
    pub plain_password: Option<String>,

    /// CURVE server mode (`ZMQ_CURVE_SERVER`)
    ///
    /// Enable CURVE encryption as server.
    /// - `false` (default): Client mode
    /// - `true`: Server mode (provide server key)
    pub curve_server: bool,

    /// CURVE public key (`ZMQ_CURVE_PUBLICKEY`)
    ///
    /// Local public key for CURVE (32 bytes).
    /// - Default: `None` (no encryption)
    pub curve_publickey: Option<[u8; 32]>,

    /// CURVE secret key (`ZMQ_CURVE_SECRETKEY`)
    ///
    /// Local secret key for CURVE (32 bytes).
    /// - Default: `None` (no encryption)
    pub curve_secretkey: Option<[u8; 32]>,

    /// CURVE server key (`ZMQ_CURVE_SERVERKEY`)
    ///
    /// Server's public key for CURVE client (32 bytes).
    /// - Default: `None` (no encryption)
    /// - Client must set this to verify server identity
    pub curve_serverkey: Option<[u8; 32]>,

    /// ZAP domain (`ZMQ_ZAP_DOMAIN`)
    ///
    /// Security domain for ZAP authentication.
    /// - Default: "" (global domain)
    pub zap_domain: String,

    /// Subscriptions (`ZMQ_SUBSCRIBE`)
    ///
    /// Subscription filters for SUB/XSUB sockets.
    /// - Empty vec: No subscriptions (default) - won't receive any messages
    /// - `vec![Bytes::new()]` (a single empty filter): Subscribe to all messages
    /// - `vec![Bytes::from_static(b"topic1"), Bytes::from_static(b"topic2")]`:
    ///   Subscribe to specific topics
    ///
    /// Note: SUB sockets MUST subscribe to at least one topic to receive messages.
    pub subscriptions: Vec<bytes::Bytes>,

    /// Unsubscriptions (`ZMQ_UNSUBSCRIBE`)
    ///
    /// Subscription filters to remove for SUB/XSUB sockets.
    /// Applied after subscriptions during socket configuration.
    /// - Default: empty (nothing to remove)
    pub unsubscriptions: Vec<bytes::Bytes>,

    /// Maximum reconnection attempts (`ZMQ_RECONNECT_STOP`)
    ///
    /// Maximum number of times to attempt reconnection after a disconnect.
    /// - `None`: Retry indefinitely (default, matches libzmq behaviour)
    /// - `Some(n)`: Give up and return `NotConnected` after n attempts
    pub max_reconnect_attempts: Option<u32>,

    /// ZMTP heartbeat interval (`ZMQ_HEARTBEAT_IVL` = 75)
    ///
    /// How often to send PING heartbeat commands on an otherwise idle connection.
    /// - `None`: Disabled (default)
    /// - `Some(dur)`: Send PING every `dur` of inactivity
    pub heartbeat_ivl: Option<Duration>,

    /// ZMTP heartbeat TTL (`ZMQ_HEARTBEAT_TTL` = 76)
    ///
    /// Time-to-live for the remote peer's heartbeat (sent in PING command).
    /// The remote will disconnect if it doesn't receive a heartbeat within this interval.
    /// - `None`: Use `heartbeat_ivl` (default)
    /// - `Some(dur)`: Override TTL sent to peer
    pub heartbeat_ttl: Option<Duration>,

    /// ZMTP heartbeat timeout (`ZMQ_HEARTBEAT_TIMEOUT` = 77)
    ///
    /// How long to wait for a PONG reply before considering the connection dead.
    /// - `None`: Use `heartbeat_ivl` (default)
    /// - `Some(dur)`: Custom timeout (recommended: 2–5× `heartbeat_ivl`)
    pub heartbeat_timeout: Option<Duration>,

    /// ROUTER raw mode (`ZMQ_ROUTER_RAW` = 41)
    ///
    /// Put ROUTER socket into raw mode (no ZMTP handshake, acts like STREAM).
    /// - `false` (default): Normal ZMTP routing
    /// - `true`: Raw TCP bridging mode
    pub router_raw: bool,

    /// STREAM connect/disconnect notifications (`ZMQ_STREAM_NOTIFY` = 73)
    ///
    /// Send empty notification frames on connect and disconnect.
    /// - `true` (default): Send notification frames
    /// - `false`: Suppress notification frames
    pub stream_notify: bool,

    /// XPUB no-drop mode (`ZMQ_XPUB_NODROP` = 69)
    ///
    /// - `false` (default): Drop messages silently when HWM is reached
    /// - `true`: Return error (`EAGAIN`) instead of dropping
    pub xpub_nodrop: bool,

    /// Invert topic matching (`ZMQ_INVERT_MATCHING` = 74)
    ///
    /// Invert the subscription filter logic for PUB/SUB and XPUB/XSUB.
    /// - `false` (default): Deliver messages matching subscriptions
    /// - `true`: Deliver messages NOT matching any subscription
    pub invert_matching: bool,
}

impl Default for SocketOptions {
    /// Returns the baseline configuration.
    ///
    /// Fields are initialised in the same order in which `SocketOptions`
    /// declares them, grouped by concern, so the two lists can be compared
    /// side by side when a new option is added.
    fn default() -> Self {
        Self {
            // --- Buffers ---
            read_buffer_size: 8192,  // 8KB, the balanced preset
            write_buffer_size: 8192, // 8KB, the balanced preset

            // --- Timeouts ---
            recv_timeout: None, // never time out a receive
            send_timeout: None, // never time out a send
            handshake_timeout: Duration::from_secs(30),
            linger: Some(Duration::from_secs(30)), // give pending messages 30s on close

            // --- Connection management ---
            reconnect_ivl: Duration::from_millis(100),
            reconnect_ivl_max: Duration::ZERO, // zero means "no maximum"
            connect_timeout: Duration::ZERO,   // zero means "OS default"

            // --- Queueing ---
            recv_hwm: 1000,
            send_hwm: 1000,
            immediate: false,
            max_msg_size: None, // unlimited

            // --- ROUTER ---
            routing_id: None,
            connect_routing_id: None,
            router_mandatory: false,
            router_handover: false,
            probe_router: false,

            // --- XPUB / XSUB ---
            xpub_verbose: false,
            xpub_manual: false,
            xpub_welcome_msg: None,
            xsub_verbose_unsubs: false,

            conflate: false,

            // --- TCP keepalive (-1 defers to the OS) ---
            tcp_keepalive: -1,
            tcp_keepalive_cnt: -1,
            tcp_keepalive_idle: -1,
            tcp_keepalive_intvl: -1,

            // --- REQ ---
            req_correlate: false,
            req_relaxed: false,

            // --- Multicast and kernel-level socket settings ---
            rate: 100, // kbps
            recovery_ivl: Duration::from_secs(10),
            sndbuf: 0, // zero means "OS default"
            rcvbuf: 0, // zero means "OS default"
            multicast_hops: 1, // stay on the local network
            tos: 0,            // normal service
            multicast_maxtpdu: 1500, // standard MTU
            ipv6: false,
            bind_to_device: None, // all interfaces

            // --- Security ---
            plain_server: false,
            plain_username: None,
            plain_password: None,
            curve_server: false,
            curve_publickey: None,
            curve_secretkey: None,
            curve_serverkey: None,
            zap_domain: String::new(), // empty string is the global domain

            // --- Subscriptions ---
            subscriptions: Vec::new(),
            unsubscriptions: Vec::new(),

            // --- Reconnect limit and heartbeats ---
            max_reconnect_attempts: None, // keep retrying forever
            heartbeat_ivl: None,
            heartbeat_ttl: None,
            heartbeat_timeout: None,

            // --- Miscellaneous socket-type switches ---
            router_raw: false,
            stream_notify: true,
            xpub_nodrop: false,
            invert_matching: false,
        }
    }
}

impl SocketOptions {
    /// Create new socket options with default values (8KB buffers).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Preset tuned for small messages (< 1KB).
    ///
    /// Starts from the defaults and shrinks both the read and write buffers to
    /// 4KB, which suits low-latency request-reply patterns.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let opts = SocketOptions::small();  // 4KB buffers for REQ/REP
    /// ```
    #[must_use]
    pub fn small() -> Self {
        const SMALL_BUFFER: usize = 4096;
        Self::default().with_buffer_sizes(SMALL_BUFFER, SMALL_BUFFER)
    }

    /// Preset tuned for large messages (> 8KB).
    ///
    /// Starts from the defaults and grows both the read and write buffers to
    /// 16KB, which suits high-throughput async patterns.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let opts = SocketOptions::large();  // 16KB buffers for DEALER/ROUTER
    /// ```
    #[must_use]
    pub fn large() -> Self {
        const LARGE_BUFFER: usize = 16384;
        Self::default().with_buffer_sizes(LARGE_BUFFER, LARGE_BUFFER)
    }

    /// Set receive timeout.
    ///
    /// Stores `Some(timeout)` in `recv_timeout`. A zero duration makes
    /// [`is_recv_nonblocking`](Self::is_recv_nonblocking) report `true`.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    /// use std::time::Duration;
    ///
    /// // Non-blocking receive
    /// let opts = SocketOptions::new().with_recv_timeout(Duration::ZERO);
    ///
    /// // 5 second timeout
    /// let opts = SocketOptions::new().with_recv_timeout(Duration::from_secs(5));
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub const fn with_recv_timeout(mut self, timeout: Duration) -> Self {
        self.recv_timeout = Some(timeout);
        self
    }

    /// Set send timeout.
    ///
    /// Stores `Some(timeout)` in `send_timeout`. A zero duration makes
    /// [`is_send_nonblocking`](Self::is_send_nonblocking) report `true`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_send_timeout(mut self, timeout: Duration) -> Self {
        self.send_timeout = Some(timeout);
        self
    }

    /// Set handshake timeout.
    ///
    /// Overwrites `handshake_timeout` with `timeout` and returns the updated options.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_handshake_timeout(mut self, timeout: Duration) -> Self {
        self.handshake_timeout = timeout;
        self
    }

    /// Set linger timeout.
    ///
    /// `linger` is stored verbatim, so passing `None` replaces any previously
    /// configured value with `None`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_linger(mut self, linger: Option<Duration>) -> Self {
        self.linger = linger;
        self
    }

    /// Set reconnection interval.
    ///
    /// This is the base interval that
    /// [`next_reconnect_ivl`](Self::next_reconnect_ivl) starts from.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_reconnect_ivl(mut self, ivl: Duration) -> Self {
        self.reconnect_ivl = ivl;
        self
    }

    /// Set maximum reconnection interval for exponential backoff.
    ///
    /// A zero duration disables backoff:
    /// [`next_reconnect_ivl`](Self::next_reconnect_ivl) then always returns the
    /// base reconnection interval.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_reconnect_ivl_max(mut self, max: Duration) -> Self {
        self.reconnect_ivl_max = max;
        self
    }

    /// Set maximum number of reconnection attempts.
    ///
    /// `None` retries indefinitely (default); `Some(n)` gives up after n attempts.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_max_reconnect_attempts(mut self, max: Option<u32>) -> Self {
        self.max_reconnect_attempts = max;
        self
    }

    /// Set connection timeout.
    ///
    /// Overwrites `connect_timeout` with `timeout` and returns the updated options.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Set heartbeat interval (`ZMQ_HEARTBEAT_IVL`).
    ///
    /// Stores `Some(ivl)`; the default is `None`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_heartbeat_ivl(mut self, ivl: Duration) -> Self {
        self.heartbeat_ivl = Some(ivl);
        self
    }

    /// Set heartbeat TTL (`ZMQ_HEARTBEAT_TTL`).
    ///
    /// Stores `Some(ttl)`; the default is `None`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_heartbeat_ttl(mut self, ttl: Duration) -> Self {
        self.heartbeat_ttl = Some(ttl);
        self
    }

    /// Set heartbeat timeout (`ZMQ_HEARTBEAT_TIMEOUT`).
    ///
    /// Stores `Some(timeout)`; the default is `None`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_heartbeat_timeout(mut self, timeout: Duration) -> Self {
        self.heartbeat_timeout = Some(timeout);
        self
    }

    /// Enable or disable ROUTER raw mode (`ZMQ_ROUTER_RAW`).
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_router_raw(mut self, raw: bool) -> Self {
        self.router_raw = raw;
        self
    }

    /// Enable or disable STREAM connect/disconnect notifications (`ZMQ_STREAM_NOTIFY`).
    ///
    /// The default is `true`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_stream_notify(mut self, notify: bool) -> Self {
        self.stream_notify = notify;
        self
    }

    /// Enable or disable XPUB no-drop mode (`ZMQ_XPUB_NODROP`).
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_xpub_nodrop(mut self, nodrop: bool) -> Self {
        self.xpub_nodrop = nodrop;
        self
    }

    /// Enable or disable inverted topic matching (`ZMQ_INVERT_MATCHING`).
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_invert_matching(mut self, invert: bool) -> Self {
        self.invert_matching = invert;
        self
    }

    /// Set receive high water mark.
    ///
    /// Overwrites `recv_hwm` with `hwm` and returns the updated options.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_recv_hwm(mut self, hwm: usize) -> Self {
        self.recv_hwm = hwm;
        self
    }

    /// Set send high water mark.
    ///
    /// Overwrites `send_hwm` with `hwm` and returns the updated options.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_send_hwm(mut self, hwm: usize) -> Self {
        self.send_hwm = hwm;
        self
    }

    /// Enable or disable immediate mode.
    ///
    /// Overwrites the `immediate` flag and returns the updated options.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_immediate(mut self, immediate: bool) -> Self {
        self.immediate = immediate;
        self
    }

    /// Set maximum message size.
    ///
    /// `None` (the default) means no limit.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_max_msg_size(mut self, size: Option<usize>) -> Self {
        self.max_msg_size = size;
        self
    }

    /// Set read buffer size.
    ///
    /// The default is 8192 bytes (8KB).
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// // Small buffers for low latency
    /// let opts = SocketOptions::new().with_read_buffer_size(4096);
    ///
    /// // Large buffers for throughput
    /// let opts = SocketOptions::new().with_read_buffer_size(16384);
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub const fn with_read_buffer_size(mut self, size: usize) -> Self {
        self.read_buffer_size = size;
        self
    }

    /// Set write buffer size.
    ///
    /// The default is 8192 bytes (8KB).
    #[must_use = "builder methods return the updated options"]
    pub const fn with_write_buffer_size(mut self, size: usize) -> Self {
        self.write_buffer_size = size;
        self
    }

    /// Set both read and write buffer sizes (convenience method).
    ///
    /// Equivalent to setting the read buffer size to `read_size` and the write
    /// buffer size to `write_size` individually.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// // Small buffers for both
    /// let opts = SocketOptions::new().with_buffer_sizes(4096, 4096);
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub const fn with_buffer_sizes(mut self, read_size: usize, write_size: usize) -> Self {
        self.read_buffer_size = read_size;
        self.write_buffer_size = write_size;
        self
    }

    /// Set socket routing ID / identity.
    ///
    /// The ID is stored as given; this builder performs no validation. Use
    /// [`validate_routing_id`](Self::validate_routing_id) or
    /// [`validate_router_identity`](Self::validate_router_identity) to check it.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    /// use bytes::Bytes;
    ///
    /// let opts = SocketOptions::new()
    ///     .with_routing_id(Bytes::from_static(b"worker-01"));
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub fn with_routing_id(mut self, id: bytes::Bytes) -> Self {
        self.routing_id = Some(id);
        self
    }

    /// Set connect routing ID for the next connection.
    ///
    /// This option is consumed after each connect operation and must be set
    /// again for subsequent connections.
    ///
    /// The ID is stored as given; this builder performs no validation.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    /// use bytes::Bytes;
    ///
    /// let opts = SocketOptions::new()
    ///     .with_connect_routing_id(Bytes::from_static(b"client-001"));
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub fn with_connect_routing_id(mut self, id: bytes::Bytes) -> Self {
        self.connect_routing_id = Some(id);
        self
    }

    /// Enable or disable ROUTER mandatory mode.
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_router_mandatory(mut self, enabled: bool) -> Self {
        self.router_mandatory = enabled;
        self
    }

    /// Enable or disable ROUTER handover mode.
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_router_handover(mut self, enabled: bool) -> Self {
        self.router_handover = enabled;
        self
    }

    /// Enable or disable ROUTER probe on connect.
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_probe_router(mut self, enabled: bool) -> Self {
        self.probe_router = enabled;
        self
    }

    /// Enable or disable XPUB verbose mode.
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_xpub_verbose(mut self, enabled: bool) -> Self {
        self.xpub_verbose = enabled;
        self
    }

    /// Enable or disable XPUB manual mode.
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_xpub_manual(mut self, enabled: bool) -> Self {
        self.xpub_manual = enabled;
        self
    }

    /// Set XPUB welcome message.
    ///
    /// Stores `Some(msg)`; the default is `None`.
    #[must_use = "builder methods return the updated options"]
    pub fn with_xpub_welcome_msg(mut self, msg: bytes::Bytes) -> Self {
        self.xpub_welcome_msg = Some(msg);
        self
    }

    /// Enable or disable XSUB verbose unsubscribe.
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_xsub_verbose_unsubs(mut self, enabled: bool) -> Self {
        self.xsub_verbose_unsubs = enabled;
        self
    }

    /// Enable or disable message conflation (keep only last message).
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_conflate(mut self, enabled: bool) -> Self {
        self.conflate = enabled;
        self
    }

    /// Set TCP keepalive mode.
    ///
    /// The value is stored as given; this builder does not range-check it.
    ///
    /// # Arguments
    ///
    /// * `mode` - `-1` for OS default (the default), `0` to disable, `1` to enable
    #[must_use = "builder methods return the updated options"]
    pub const fn with_tcp_keepalive(mut self, mode: i32) -> Self {
        self.tcp_keepalive = mode;
        self
    }

    /// Set TCP keepalive count (number of probes before timeout).
    ///
    /// The value is stored as given; this builder does not range-check it.
    ///
    /// # Arguments
    ///
    /// * `count` - `-1` for OS default (the default), `> 0` for specific count
    #[must_use = "builder methods return the updated options"]
    pub const fn with_tcp_keepalive_cnt(mut self, count: i32) -> Self {
        self.tcp_keepalive_cnt = count;
        self
    }

    /// Set TCP keepalive idle time (seconds before first probe).
    ///
    /// The value is stored as given; this builder does not range-check it.
    ///
    /// # Arguments
    ///
    /// * `seconds` - `-1` for OS default (the default), `> 0` for specific idle time
    #[must_use = "builder methods return the updated options"]
    pub const fn with_tcp_keepalive_idle(mut self, seconds: i32) -> Self {
        self.tcp_keepalive_idle = seconds;
        self
    }

    /// Set TCP keepalive interval (seconds between probes).
    ///
    /// The value is stored as given; this builder does not range-check it.
    ///
    /// # Arguments
    ///
    /// * `seconds` - `-1` for OS default (the default), `> 0` for specific interval
    #[must_use = "builder methods return the updated options"]
    pub const fn with_tcp_keepalive_intvl(mut self, seconds: i32) -> Self {
        self.tcp_keepalive_intvl = seconds;
        self
    }

    /// Enable or disable REQ correlation mode (match replies to requests).
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_req_correlate(mut self, enabled: bool) -> Self {
        self.req_correlate = enabled;
        self
    }

    /// Enable or disable REQ relaxed mode (allow multiple outstanding requests).
    ///
    /// The default is `false`.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_req_relaxed(mut self, enabled: bool) -> Self {
        self.req_relaxed = enabled;
        self
    }

    /// Set multicast rate (`ZMQ_RATE`).
    ///
    /// The default is `100` (kbps). The value is stored without range checks.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_rate(mut self, rate: i32) -> Self {
        self.rate = rate;
        self
    }

    /// Set multicast recovery interval (`ZMQ_RECOVERY_IVL`).
    ///
    /// The default is 10 seconds.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_recovery_ivl(mut self, interval: Duration) -> Self {
        self.recovery_ivl = interval;
        self
    }

    /// Set OS send buffer size (`ZMQ_SNDBUF`).
    ///
    /// The default `0` selects the OS default. The value is stored without
    /// range checks.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_sndbuf(mut self, size: i32) -> Self {
        self.sndbuf = size;
        self
    }

    /// Set OS receive buffer size (`ZMQ_RCVBUF`).
    ///
    /// The default `0` selects the OS default. The value is stored without
    /// range checks.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_rcvbuf(mut self, size: i32) -> Self {
        self.rcvbuf = size;
        self
    }

    /// Set multicast TTL/hops (`ZMQ_MULTICAST_HOPS`).
    ///
    /// The default `1` keeps traffic on the local network. The value is stored
    /// without range checks.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_multicast_hops(mut self, hops: i32) -> Self {
        self.multicast_hops = hops;
        self
    }

    /// Set IP Type of Service (`ZMQ_TOS`).
    ///
    /// The default `0` means normal service. The value is stored without
    /// range checks.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_tos(mut self, tos: i32) -> Self {
        self.tos = tos;
        self
    }

    /// Set multicast maximum transport data unit size (`ZMQ_MULTICAST_MAXTPDU`).
    ///
    /// The default is `1500` (standard MTU). The value is stored without
    /// range checks.
    #[must_use = "builder methods return the updated options"]
    pub const fn with_multicast_maxtpdu(mut self, mtu: i32) -> Self {
        self.multicast_maxtpdu = mtu;
        self
    }

    /// Enable or disable IPv6 support (`ZMQ_IPV6`).
    ///
    /// The default is `false` (IPv4 only).
    #[must_use = "builder methods return the updated options"]
    pub const fn with_ipv6(mut self, enabled: bool) -> Self {
        self.ipv6 = enabled;
        self
    }

    /// Bind to specific device (`ZMQ_BINDTODEVICE`) - Linux only.
    ///
    /// Stores `Some(device)`; the default `None` means all interfaces.
    #[must_use = "builder methods return the updated options"]
    pub fn with_bind_to_device(mut self, device: impl Into<String>) -> Self {
        self.bind_to_device = Some(device.into());
        self
    }

    // --- Security Options ---

    /// Enable or disable PLAIN server mode.
    ///
    /// The default is `false`.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let opts = SocketOptions::new().with_plain_server(true);
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub const fn with_plain_server(mut self, enabled: bool) -> Self {
        self.plain_server = enabled;
        self
    }

    /// Set PLAIN client credentials.
    ///
    /// Both the username and the password are set together; each is converted
    /// into an owned `String` and stored as `Some(..)`.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let opts = SocketOptions::new()
    ///     .with_plain_credentials("admin", "secret123");
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub fn with_plain_credentials(
        mut self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        self.plain_username = Some(username.into());
        self.plain_password = Some(password.into());
        self
    }

    /// Enable or disable CURVE server mode.
    ///
    /// The default is `false`.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let opts = SocketOptions::new().with_curve_server(true);
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub const fn with_curve_server(mut self, enabled: bool) -> Self {
        self.curve_server = enabled;
        self
    }

    /// Set CURVE client keys (public + secret).
    ///
    /// Both 32-byte keys are stored together as `Some(..)`; this builder does
    /// not check that they form a matching pair.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let public = [0u8; 32];  // Replace with actual key
    /// let secret = [0u8; 32];  // Replace with actual key
    /// let opts = SocketOptions::new().with_curve_keypair(public, secret);
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub const fn with_curve_keypair(mut self, publickey: [u8; 32], secretkey: [u8; 32]) -> Self {
        self.curve_publickey = Some(publickey);
        self.curve_secretkey = Some(secretkey);
        self
    }

    /// Set CURVE server public key (for client).
    ///
    /// Stores `Some(serverkey)`; the default is `None`.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let server_key = [0u8; 32];  // Server's public key
    /// let opts = SocketOptions::new().with_curve_serverkey(server_key);
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub const fn with_curve_serverkey(mut self, serverkey: [u8; 32]) -> Self {
        self.curve_serverkey = Some(serverkey);
        self
    }

    /// Set ZAP domain for authentication.
    ///
    /// The default is the empty string (global domain).
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    ///
    /// let opts = SocketOptions::new().with_zap_domain("production");
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub fn with_zap_domain(mut self, domain: impl Into<String>) -> Self {
        self.zap_domain = domain.into();
        self
    }

    /// Add a subscription filter for SUB/XSUB sockets (`ZMQ_SUBSCRIBE`).
    ///
    /// SUB sockets MUST subscribe to at least one topic to receive messages.
    /// An empty filter (b"" or `Bytes::new()`) subscribes to all messages.
    ///
    /// Filters accumulate: each call appends to the `subscriptions` list, and
    /// duplicates are not removed.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    /// use bytes::Bytes;
    ///
    /// // Subscribe to all messages
    /// let opts = SocketOptions::new().with_subscribe(Bytes::new());
    ///
    /// // Subscribe to specific topics
    /// let opts = SocketOptions::new()
    ///     .with_subscribe(Bytes::from("weather."))
    ///     .with_subscribe(Bytes::from("stocks."));
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub fn with_subscribe(mut self, filter: bytes::Bytes) -> Self {
        self.subscriptions.push(filter);
        self
    }

    /// Add multiple subscription filters for SUB/XSUB sockets.
    ///
    /// Convenience method to subscribe to multiple topics at once. Accepts any
    /// `IntoIterator` of filters (a `Vec`, an array, an iterator chain) and
    /// appends them, in order, to the `subscriptions` list.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    /// use bytes::Bytes;
    ///
    /// let opts = SocketOptions::new()
    ///     .with_subscriptions(vec![
    ///         Bytes::from("weather."),
    ///         Bytes::from("stocks."),
    ///     ]);
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub fn with_subscriptions(mut self, filters: impl IntoIterator<Item = bytes::Bytes>) -> Self {
        self.subscriptions.extend(filters);
        self
    }

    /// Add an unsubscription filter for SUB/XSUB sockets (`ZMQ_UNSUBSCRIBE`).
    ///
    /// Appends `filter` to the `unsubscriptions` list. This builder does not
    /// itself remove anything from `subscriptions`; how the two lists are
    /// reconciled is up to the code that consumes the options.
    ///
    /// NOTE(review): under ZeroMQ semantics an unsubscribe cancels a matching
    /// earlier subscription rather than acting as an exclusion filter — confirm
    /// the consuming socket code follows that.
    ///
    /// # Examples
    ///
    /// ```
    /// use monocoque_core::options::SocketOptions;
    /// use bytes::Bytes;
    ///
    /// let opts = SocketOptions::new()
    ///     .with_subscribe(Bytes::from("admin."))
    ///     .with_unsubscribe(Bytes::from("admin.")); // Queue removal of that filter
    /// ```
    #[must_use = "builder methods return the updated options"]
    pub fn with_unsubscribe(mut self, filter: bytes::Bytes) -> Self {
        self.unsubscriptions.push(filter);
        self
    }

    // --- Query Methods ---

    /// Report whether receives are configured as non-blocking.
    ///
    /// True only when a receive timeout is set and that timeout is zero.
    pub const fn is_recv_nonblocking(&self) -> bool {
        match self.recv_timeout {
            Some(timeout) => timeout.is_zero(),
            None => false,
        }
    }

    /// Report whether sends are configured as non-blocking.
    ///
    /// True only when a send timeout is set and that timeout is zero.
    pub const fn is_send_nonblocking(&self) -> bool {
        match self.send_timeout {
            Some(timeout) => timeout.is_zero(),
            None => false,
        }
    }

    /// Check that `id` is acceptable as a ROUTER socket identity.
    ///
    /// A ROUTER identity must:
    /// - be between 1 and 255 bytes long, and
    /// - not begin with a null byte (0x00), which is reserved for auto-generated IDs.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidInput` error describing the first rule that is
    /// violated; emptiness is checked first, then length, then the null prefix.
    pub fn validate_router_identity(id: &[u8]) -> std::io::Result<()> {
        use std::io::{Error, ErrorKind};

        let rejection = match id {
            [] => Error::new(ErrorKind::InvalidInput, "routing ID cannot be empty"),
            _ if id.len() > 255 => Error::new(
                ErrorKind::InvalidInput,
                format!("routing ID cannot exceed 255 bytes (got {})", id.len()),
            ),
            [0x00, ..] => Error::new(
                ErrorKind::InvalidInput,
                "routing ID cannot start with null byte (reserved for auto-generated IDs)",
            ),
            _ => return Ok(()),
        };

        Err(rejection)
    }

    /// Check that `id` is acceptable as a general routing ID (DEALER, REQ, REP).
    ///
    /// Looser than the ROUTER rules: an empty ID and a leading null byte are
    /// both accepted; only the 255-byte length cap is enforced.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidInput` error when `id` is longer than 255 bytes.
    pub fn validate_routing_id(id: &[u8]) -> std::io::Result<()> {
        if id.len() <= 255 {
            return Ok(());
        }

        let reason = format!("routing ID cannot exceed 255 bytes (got {})", id.len());
        Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, reason))
    }

    /// Get the reconnection interval to use for the given attempt.
    ///
    /// With backoff enabled the interval is `reconnect_ivl * 2^attempt`,
    /// capped at `reconnect_ivl_max`. All arithmetic saturates, so very large
    /// `attempt` values cannot overflow.
    ///
    /// Backoff is disabled, and the base `reconnect_ivl` is returned
    /// unchanged, when `reconnect_ivl_max` is zero or is not greater than
    /// `reconnect_ivl`. Ignoring a maximum below the base interval matches
    /// libzmq's handling of `ZMQ_RECONNECT_IVL_MAX` and prevents the "maximum"
    /// from shortening the configured base interval.
    ///
    /// # Arguments
    ///
    /// * `attempt` - zero-based retry counter; `0` yields the base interval
    #[must_use]
    pub fn next_reconnect_ivl(&self, attempt: u32) -> Duration {
        let base = self.reconnect_ivl;
        let cap = self.reconnect_ivl_max;

        // Covers both "no backoff configured" (zero cap) and a cap that is
        // at or below the base interval, which is ignored.
        if cap <= base {
            return base;
        }

        // Exponential backoff: base * 2^attempt, saturating, then capped.
        base.saturating_mul(2u32.saturating_pow(attempt)).min(cap)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Shorthand for a millisecond duration in expectations.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// Defaults carry no timeouts, a 30s handshake, 100ms reconnects and HWMs of 1000.
    #[test]
    fn test_default_options() {
        let defaults = SocketOptions::default();

        assert_eq!(defaults.recv_timeout, None);
        assert_eq!(defaults.send_timeout, None);
        assert_eq!(defaults.handshake_timeout, Duration::from_secs(30));
        assert_eq!(defaults.reconnect_ivl, ms(100));
        assert_eq!((defaults.recv_hwm, defaults.send_hwm), (1000, 1000));
    }

    /// Chained setters each land in their own field.
    #[test]
    fn test_builder_pattern() {
        let recv = Duration::from_secs(5);
        let send = Duration::from_secs(10);

        let opts = SocketOptions::new()
            .with_recv_timeout(recv)
            .with_send_timeout(send)
            .with_recv_hwm(2000);

        assert_eq!(opts.recv_timeout, Some(recv));
        assert_eq!(opts.send_timeout, Some(send));
        assert_eq!(opts.recv_hwm, 2000);
    }

    /// Only an explicit zero timeout counts as non-blocking.
    #[test]
    fn test_nonblocking_checks() {
        let blocking = SocketOptions::new();
        assert!(!blocking.is_recv_nonblocking());
        assert!(!blocking.is_send_nonblocking());

        let nonblocking = SocketOptions::new()
            .with_recv_timeout(Duration::ZERO)
            .with_send_timeout(Duration::ZERO);
        assert!(nonblocking.is_recv_nonblocking());
        assert!(nonblocking.is_send_nonblocking());
    }

    /// The interval doubles per attempt until it reaches the configured cap.
    #[test]
    fn test_exponential_backoff() {
        let opts = SocketOptions::new()
            .with_reconnect_ivl(ms(100))
            .with_reconnect_ivl_max(Duration::from_secs(10));

        // (attempt, expected interval); the last entry has hit the 10s cap.
        let expectations = [
            (0, ms(100)),
            (1, ms(200)),
            (2, ms(400)),
            (10, Duration::from_secs(10)),
        ];

        for (attempt, expected) in expectations.iter().copied() {
            assert_eq!(opts.next_reconnect_ivl(attempt), expected);
        }
    }

    /// With the default (zero) maximum, every attempt uses the base interval.
    #[test]
    fn test_no_exponential_backoff() {
        // reconnect_ivl_max is 0 by default
        let opts = SocketOptions::new().with_reconnect_ivl(ms(100));

        for attempt in [0u32, 1, 10].iter().copied() {
            assert_eq!(opts.next_reconnect_ivl(attempt), ms(100));
        }
    }

    /// ROUTER identities must be 1-255 bytes and must not start with 0x00.
    #[test]
    fn test_routing_id_validation() {
        let max_len = [0x01u8; 255];
        let too_long = [0x01u8; 256];

        let accepted: [&[u8]; 2] = [b"client-001", &max_len];
        for id in accepted.iter().copied() {
            assert!(SocketOptions::validate_router_identity(id).is_ok());
        }

        // Empty, over-long, and null-prefixed identities are all rejected.
        let rejected: [&[u8]; 3] = [b"", &too_long, b"\x00client"];
        for id in rejected.iter().copied() {
            assert!(SocketOptions::validate_router_identity(id).is_err());
        }
    }

    /// General routing IDs only enforce the 255-byte cap.
    #[test]
    fn test_general_routing_id_validation() {
        let null_filled = [0x00u8; 255];
        let too_long = [0x01u8; 256];

        // Empty and null-prefixed IDs are allowed here.
        let accepted: [&[u8]; 3] = [b"", b"\x00client", &null_filled];
        for id in accepted.iter().copied() {
            assert!(SocketOptions::validate_routing_id(id).is_ok());
        }

        assert!(SocketOptions::validate_routing_id(&too_long).is_err());
    }

    /// The connect routing ID is stored exactly as supplied.
    #[test]
    fn test_connect_routing_id() {
        let peer = Bytes::from_static(b"peer-123");

        let opts = SocketOptions::new().with_connect_routing_id(peer.clone());

        assert_eq!(opts.connect_routing_id, Some(peer));
    }

    /// Both ROUTER flags can be switched on through the builder.
    #[test]
    fn test_router_options() {
        let opts = SocketOptions::new()
            .with_router_mandatory(true)
            .with_router_handover(true);

        assert!(opts.router_mandatory);
        assert!(opts.router_handover);
    }

    /// Subscriptions accumulate in call order; unsubscriptions are tracked separately.
    #[test]
    fn test_subscription_options() {
        let all = Bytes::new();
        let weather = Bytes::from("weather.");
        let stocks = Bytes::from("stocks.");

        // with_subscribe appends one filter per call.
        let opts = SocketOptions::new()
            .with_subscribe(all.clone())
            .with_subscribe(weather.clone())
            .with_subscribe(stocks.clone());
        assert_eq!(opts.subscriptions, vec![all, weather, stocks]);

        // with_subscriptions appends a whole batch.
        let batch = vec![Bytes::from("topic1"), Bytes::from("topic2")];
        let opts2 = SocketOptions::new().with_subscriptions(batch);
        assert_eq!(opts2.subscriptions.len(), 2);

        // with_unsubscribe records the filter in its own list.
        let opts3 = opts.with_unsubscribe(Bytes::from("admin."));
        assert_eq!(opts3.unsubscriptions, vec![Bytes::from("admin.")]);
    }
}