oximedia-net 0.1.5

Network streaming for OxiMedia
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
//! SRT Connection Factory — unified caller/listener/rendezvous mode management.
//!
//! Provides a single entry point for establishing SRT connections in any of
//! the three protocol modes:
//!
//! | Mode          | Initiator         | Use-case                         |
//! |---------------|-------------------|----------------------------------|
//! | Caller        | This endpoint     | Push a stream to a known server  |
//! | Listener      | Remote endpoint   | Accept inbound streams           |
//! | Rendezvous    | Both endpoints    | NAT traversal / peer-to-peer     |
//!
//! # Architecture
//!
//! [`SrtConnectionFactory`] drives all three state machines
//! (`CallerState`, `ListenerState`, `RendezvousState`) through their handshake
//! sequences.  Because actual UDP I/O is outside the scope of the pure-Rust
//! model (no `std::net::UdpSocket` wrapping here), the factory exposes a
//! *packet-driven* API: callers feed raw [`SrtPacket`] objects and receive
//! response packets to send over UDP.
//!
//! A higher-level `connect_*` family of async helpers simulates the handshake
//! over a loopback pair, which is used in tests and in integration with the
//! existing [`SrtConnection`] async transport.

use super::{
    connection_mode::{CallerState, ConnectionMode, ListenerState, RendezvousState},
    packet::SrtPacket,
    socket::{ConnectionState, SrtConfig},
};
use crate::error::{NetError, NetResult};
use std::net::SocketAddr;
use std::time::{Duration, Instant};

// ─── Connection Mode Descriptor ───────────────────────────────────────────────

/// Parameters controlling how a factory instance establishes a connection.
#[derive(Debug, Clone)]
pub struct ConnectionParams {
    /// Which SRT connection mode to use.
    pub mode: ConnectionMode,
    /// Local bind address (all modes need this).
    pub local_addr: SocketAddr,
    /// Remote peer address (Caller and Rendezvous modes).
    pub peer_addr: Option<SocketAddr>,
    /// SRT socket configuration (latency, passphrase, etc.).
    pub config: SrtConfig,
    /// How long to attempt the handshake before giving up.
    pub connect_timeout: Duration,
    /// Maximum handshake retries (Caller mode).
    pub max_retries: u32,
    /// Retry/wave interval.
    pub retry_interval: Duration,
}

impl ConnectionParams {
    /// Builds caller parameters targeting `peer_addr`.
    #[must_use]
    pub fn caller(local_addr: SocketAddr, peer_addr: SocketAddr, config: SrtConfig) -> Self {
        Self {
            mode: ConnectionMode::Caller,
            local_addr,
            peer_addr: Some(peer_addr),
            config,
            connect_timeout: Duration::from_secs(5),
            max_retries: 20,
            retry_interval: Duration::from_millis(250),
        }
    }

    /// Builds listener parameters binding to `local_addr`.
    #[must_use]
    pub fn listener(local_addr: SocketAddr, config: SrtConfig) -> Self {
        Self {
            mode: ConnectionMode::Listener,
            local_addr,
            peer_addr: None,
            config,
            connect_timeout: Duration::from_secs(30),
            max_retries: 128,
            retry_interval: Duration::from_millis(500),
        }
    }

    /// Builds rendezvous parameters (both sides must use the same remote addr).
    #[must_use]
    pub fn rendezvous(local_addr: SocketAddr, peer_addr: SocketAddr, config: SrtConfig) -> Self {
        Self {
            mode: ConnectionMode::Rendezvous,
            local_addr,
            peer_addr: Some(peer_addr),
            config,
            connect_timeout: Duration::from_secs(10),
            max_retries: 40,
            retry_interval: Duration::from_millis(250),
        }
    }

    /// Overrides the connection timeout.
    #[must_use]
    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Overrides the maximum retries.
    #[must_use]
    pub const fn with_max_retries(mut self, max: u32) -> Self {
        self.max_retries = max;
        self
    }
}

// ─── Handshake Result ─────────────────────────────────────────────────────────

/// The outcome of a packet-driven handshake step.
#[derive(Debug, Clone)]
pub enum HandshakeStep {
    /// Handshake still in progress; send these packets.
    InProgress { packets_to_send: Vec<SrtPacket> },
    /// Handshake completed successfully.
    Connected,
    /// Handshake failed.
    Failed(String),
}

// ─── SrtConnectionFactory ────────────────────────────────────────────────────

/// Factory for establishing SRT connections in any of the three protocol modes.
///
/// Drives the handshake state machine; the caller is responsible for
/// transmitting/receiving packets via UDP.
pub struct SrtConnectionFactory {
    params: ConnectionParams,
    /// Caller state (Some if mode == Caller).
    caller: Option<CallerState>,
    /// Listener state (Some if mode == Listener).
    listener: Option<ListenerState>,
    /// Rendezvous state (Some if mode == Rendezvous).
    rendezvous: Option<RendezvousState>,
    /// Time when the factory was started.
    started_at: Instant,
    /// Whether the connection is established.
    connected: bool,
}

impl SrtConnectionFactory {
    /// Creates a new factory ready to start the handshake.
    #[must_use]
    pub fn new(params: ConnectionParams) -> Self {
        let mut factory = Self {
            params: params.clone(),
            caller: None,
            listener: None,
            rendezvous: None,
            started_at: Instant::now(),
            connected: false,
        };

        match params.mode {
            ConnectionMode::Caller => {
                use std::net::{IpAddr, Ipv4Addr, SocketAddr};
                let peer = params
                    .peer_addr
                    .unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0));
                let mut caller = CallerState::new(params.config.clone(), peer);
                caller.set_max_retries(params.max_retries);
                factory.caller = Some(caller);
            }
            ConnectionMode::Listener => {
                let mut listener = ListenerState::new(params.config.clone(), params.local_addr);
                listener.set_max_pending(params.max_retries as usize);
                factory.listener = Some(listener);
            }
            ConnectionMode::Rendezvous => {
                use std::net::{IpAddr, Ipv4Addr, SocketAddr};
                let peer = params
                    .peer_addr
                    .unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0));
                factory.rendezvous = Some(RendezvousState::new(params.config.clone(), peer));
            }
        }

        factory
    }

    /// Returns the connection mode.
    #[must_use]
    pub fn mode(&self) -> ConnectionMode {
        self.params.mode
    }

    /// Returns whether the connection has been established.
    #[must_use]
    pub fn is_connected(&self) -> bool {
        self.connected
    }

    /// Returns whether the connection timeout has elapsed.
    #[must_use]
    pub fn is_timed_out(&self) -> bool {
        self.started_at.elapsed() > self.params.connect_timeout
    }

    /// Returns the elapsed time since the factory was started.
    #[must_use]
    pub fn elapsed(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Generates the initial outbound packet(s) to start the handshake.
    ///
    /// For **Caller** and **Rendezvous** modes this returns the first
    /// Waveahand/Induction packet.  For **Listener** mode this returns an
    /// empty vector because the listener waits for incoming packets.
    pub fn start_handshake(&mut self) -> Vec<SrtPacket> {
        match self.params.mode {
            ConnectionMode::Caller => {
                if let Some(ref mut caller) = self.caller {
                    vec![caller.generate_initial_handshake()]
                } else {
                    Vec::new()
                }
            }
            ConnectionMode::Rendezvous => {
                if let Some(ref mut rdv) = self.rendezvous {
                    vec![rdv.generate_wave()]
                } else {
                    Vec::new()
                }
            }
            ConnectionMode::Listener => Vec::new(),
        }
    }

    /// Processes an incoming packet and returns the next handshake step.
    ///
    /// For **Caller** mode `from` is the listener address.
    /// For **Listener** mode `from` is the caller address.
    /// For **Rendezvous** mode `from` is the peer address.
    pub fn process_packet(&mut self, from: SocketAddr, packet: SrtPacket) -> HandshakeStep {
        if self.connected {
            return HandshakeStep::Connected;
        }

        match self.params.mode {
            ConnectionMode::Caller => self.process_caller_packet(packet),
            ConnectionMode::Listener => self.process_listener_packet(from, packet),
            ConnectionMode::Rendezvous => self.process_rendezvous_packet(packet),
        }
    }

    /// Drives the retry/wave timer for Caller and Rendezvous modes.
    ///
    /// Should be called periodically when no packet has been received.
    pub fn tick(&mut self) -> Vec<SrtPacket> {
        if self.connected || self.is_timed_out() {
            return Vec::new();
        }

        match self.params.mode {
            ConnectionMode::Caller => {
                if let Some(ref mut caller) = self.caller {
                    if caller.needs_retry() {
                        if let Some(pkt) = caller.retry_handshake() {
                            return vec![pkt];
                        }
                    }
                }
                Vec::new()
            }
            ConnectionMode::Rendezvous => {
                if let Some(ref mut rdv) = self.rendezvous {
                    if rdv.needs_wave() {
                        return vec![rdv.generate_wave()];
                    }
                }
                Vec::new()
            }
            ConnectionMode::Listener => Vec::new(),
        }
    }

    /// Cleans up timed-out pending connections (Listener mode only).
    pub fn cleanup_pending(&mut self) {
        if let Some(ref mut listener) = self.listener {
            listener.cleanup_pending();
        }
    }

    /// Returns the number of established connections (Listener mode).
    #[must_use]
    pub fn established_count(&self) -> usize {
        self.listener
            .as_ref()
            .map(|l| l.established_count())
            .unwrap_or(0)
    }

    /// Returns the total connections accepted (Listener mode).
    #[must_use]
    pub fn total_accepted(&self) -> u64 {
        self.listener
            .as_ref()
            .map(|l| l.total_accepted())
            .unwrap_or(0)
    }

    // ── private ──────────────────────────────────────────────────────────────

    fn process_caller_packet(&mut self, packet: SrtPacket) -> HandshakeStep {
        if let Some(ref mut caller) = self.caller {
            match caller.process_response(packet) {
                Ok(responses) => {
                    if caller.is_connected() {
                        self.connected = true;
                        HandshakeStep::Connected
                    } else {
                        HandshakeStep::InProgress {
                            packets_to_send: responses,
                        }
                    }
                }
                Err(e) => HandshakeStep::Failed(e.to_string()),
            }
        } else {
            HandshakeStep::Failed("Caller state not initialised".to_owned())
        }
    }

    fn process_listener_packet(&mut self, from: SocketAddr, packet: SrtPacket) -> HandshakeStep {
        if let Some(ref mut listener) = self.listener {
            match listener.process_incoming(from, packet) {
                Ok(responses) => {
                    // If any connection just became established, signal it.
                    if listener.established_count() > 0 {
                        self.connected = true;
                        HandshakeStep::Connected
                    } else {
                        HandshakeStep::InProgress {
                            packets_to_send: responses,
                        }
                    }
                }
                Err(e) => HandshakeStep::Failed(e.to_string()),
            }
        } else {
            HandshakeStep::Failed("Listener state not initialised".to_owned())
        }
    }

    fn process_rendezvous_packet(&mut self, packet: SrtPacket) -> HandshakeStep {
        if let Some(ref mut rdv) = self.rendezvous {
            match rdv.process_packet(packet) {
                Ok(responses) => {
                    if rdv.is_connected() {
                        self.connected = true;
                        HandshakeStep::Connected
                    } else {
                        HandshakeStep::InProgress {
                            packets_to_send: responses,
                        }
                    }
                }
                Err(e) => HandshakeStep::Failed(e.to_string()),
            }
        } else {
            HandshakeStep::Failed("Rendezvous state not initialised".to_owned())
        }
    }
}

impl std::fmt::Debug for SrtConnectionFactory {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SrtConnectionFactory")
            .field("mode", &self.params.mode)
            .field("connected", &self.connected)
            .field("elapsed_ms", &self.elapsed().as_millis())
            .finish()
    }
}

// ─── Loopback Handshake Simulator ────────────────────────────────────────────

/// Drives a loopback caller↔listener or rendezvous↔rendezvous handshake to
/// completion within a fixed number of rounds.
///
/// Returns `Ok(())` when both sides reach `Connected`, or an error after
/// exceeding `max_rounds`.
///
/// This is intended for unit tests and integration smoke-tests.
pub fn simulate_handshake(
    side_a: &mut SrtConnectionFactory,
    side_b: &mut SrtConnectionFactory,
    max_rounds: u32,
) -> NetResult<()> {
    // Seed the handshake.
    let a_addr = side_a.params.local_addr;
    let b_addr = side_b.params.local_addr;

    let mut a_to_b = side_a.start_handshake();
    let mut b_to_a = side_b.start_handshake();

    for _ in 0..max_rounds {
        if side_a.is_connected() && side_b.is_connected() {
            return Ok(());
        }

        // Deliver A's packets to B.
        for pkt in a_to_b.drain(..) {
            match side_b.process_packet(a_addr, pkt) {
                HandshakeStep::Connected => {
                    // B connected; one last round for A.
                }
                HandshakeStep::InProgress { packets_to_send } => {
                    b_to_a.extend(packets_to_send);
                }
                HandshakeStep::Failed(msg) => {
                    return Err(NetError::handshake(format!("Side B failed: {msg}")));
                }
            }
        }

        // Deliver B's packets to A.
        for pkt in b_to_a.drain(..) {
            match side_a.process_packet(b_addr, pkt) {
                HandshakeStep::Connected => {
                    // A connected; one last round for B.
                }
                HandshakeStep::InProgress { packets_to_send } => {
                    a_to_b.extend(packets_to_send);
                }
                HandshakeStep::Failed(msg) => {
                    return Err(NetError::handshake(format!("Side A failed: {msg}")));
                }
            }
        }

        // Drive timers.
        a_to_b.extend(side_a.tick());
        b_to_a.extend(side_b.tick());
    }

    if side_a.is_connected() && side_b.is_connected() {
        Ok(())
    } else {
        Err(NetError::timeout(format!(
            "Handshake did not complete in {max_rounds} rounds (mode={:?})",
            side_a.mode()
        )))
    }
}

// ─── Tests ────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    fn addr(port: u16) -> SocketAddr {
        format!("127.0.0.1:{port}").parse().expect("valid addr")
    }

    fn default_config() -> SrtConfig {
        SrtConfig::default()
    }

    // 1. ConnectionParams caller mode
    #[test]
    fn test_caller_params() {
        let p = ConnectionParams::caller(addr(9000), addr(9001), default_config());
        assert_eq!(p.mode, ConnectionMode::Caller);
        assert!(p.peer_addr.is_some());
    }

    // 2. ConnectionParams listener mode
    #[test]
    fn test_listener_params() {
        let p = ConnectionParams::listener(addr(9000), default_config());
        assert_eq!(p.mode, ConnectionMode::Listener);
        assert!(p.peer_addr.is_none());
    }

    // 3. ConnectionParams rendezvous mode
    #[test]
    fn test_rendezvous_params() {
        let p = ConnectionParams::rendezvous(addr(9000), addr(9001), default_config());
        assert_eq!(p.mode, ConnectionMode::Rendezvous);
        assert_eq!(p.peer_addr, Some(addr(9001)));
    }

    // 4. ConnectionParams with_timeout override
    #[test]
    fn test_params_with_timeout() {
        let p = ConnectionParams::caller(addr(9000), addr(9001), default_config())
            .with_timeout(Duration::from_secs(10));
        assert_eq!(p.connect_timeout, Duration::from_secs(10));
    }

    // 5. ConnectionParams with_max_retries override
    #[test]
    fn test_params_with_max_retries() {
        let p =
            ConnectionParams::caller(addr(9000), addr(9001), default_config()).with_max_retries(5);
        assert_eq!(p.max_retries, 5);
    }

    // 6. Factory caller creation
    #[test]
    fn test_factory_caller_new() {
        let p = ConnectionParams::caller(addr(9000), addr(9001), default_config());
        let factory = SrtConnectionFactory::new(p);
        assert_eq!(factory.mode(), ConnectionMode::Caller);
        assert!(!factory.is_connected());
    }

    // 7. Factory listener creation
    #[test]
    fn test_factory_listener_new() {
        let p = ConnectionParams::listener(addr(9000), default_config());
        let factory = SrtConnectionFactory::new(p);
        assert_eq!(factory.mode(), ConnectionMode::Listener);
        assert_eq!(factory.established_count(), 0);
    }

    // 8. Factory rendezvous creation
    #[test]
    fn test_factory_rendezvous_new() {
        let p = ConnectionParams::rendezvous(addr(9000), addr(9001), default_config());
        let factory = SrtConnectionFactory::new(p);
        assert_eq!(factory.mode(), ConnectionMode::Rendezvous);
    }

    // 9. Factory caller start_handshake returns packet
    #[test]
    fn test_caller_start_handshake() {
        let p = ConnectionParams::caller(addr(9000), addr(9001), default_config());
        let mut factory = SrtConnectionFactory::new(p);
        let pkts = factory.start_handshake();
        assert_eq!(pkts.len(), 1);
        assert!(pkts[0].is_control());
    }

    // 10. Factory listener start_handshake returns empty
    #[test]
    fn test_listener_start_handshake_empty() {
        let p = ConnectionParams::listener(addr(9000), default_config());
        let mut factory = SrtConnectionFactory::new(p);
        let pkts = factory.start_handshake();
        assert!(pkts.is_empty());
    }

    // 11. Factory rendezvous start_handshake returns wave packet
    #[test]
    fn test_rendezvous_start_handshake() {
        let p = ConnectionParams::rendezvous(addr(9000), addr(9001), default_config());
        let mut factory = SrtConnectionFactory::new(p);
        let pkts = factory.start_handshake();
        assert_eq!(pkts.len(), 1);
        assert!(pkts[0].is_control());
    }

    // 12. Factory tick returns packets for caller
    #[test]
    fn test_caller_tick_retries() {
        let p =
            ConnectionParams::caller(addr(9000), addr(9001), default_config()).with_max_retries(3);
        let mut factory = SrtConnectionFactory::new(p);
        let _initial = factory.start_handshake();
        // Tick should return a retry (retry_interval is 250ms, elapsed < that,
        // but first tick after start should work because needs_retry checks both).
        // Just verify tick doesn't panic.
        let _ = factory.tick();
    }

    // 13. Factory is_timed_out with very short timeout
    #[test]
    fn test_factory_timeout() {
        let p = ConnectionParams::caller(addr(9000), addr(9001), default_config())
            .with_timeout(Duration::from_nanos(1));
        let factory = SrtConnectionFactory::new(p);
        // After 1ns the timeout should have elapsed.
        std::thread::sleep(Duration::from_micros(10));
        assert!(factory.is_timed_out());
    }

    // 14. Factory elapsed grows over time
    #[test]
    fn test_factory_elapsed() {
        let p = ConnectionParams::caller(addr(9000), addr(9001), default_config());
        let factory = SrtConnectionFactory::new(p);
        assert!(factory.elapsed().as_nanos() > 0);
    }

    // 15. Factory listener process_incoming creates pending connection
    #[test]
    fn test_listener_process_incoming() {
        let lp = ConnectionParams::listener(addr(9000), default_config());
        let mut listener_factory = SrtConnectionFactory::new(lp);

        let cp = ConnectionParams::caller(addr(9001), addr(9000), default_config());
        let mut caller_factory = SrtConnectionFactory::new(cp);

        let pkts = caller_factory.start_handshake();
        assert!(!pkts.is_empty());

        let step =
            listener_factory.process_packet(addr(9001), pkts.into_iter().next().expect("pkt"));
        match step {
            HandshakeStep::Connected | HandshakeStep::InProgress { .. } => {}
            HandshakeStep::Failed(msg) => panic!("Should not fail: {msg}"),
        }
    }

    // 16. simulate_handshake caller/listener
    #[test]
    fn test_simulate_caller_listener() {
        let lp = ConnectionParams::listener(addr(9010), default_config());
        let cp = ConnectionParams::caller(addr(9011), addr(9010), default_config());

        let mut listener_factory = SrtConnectionFactory::new(lp);
        let mut caller_factory = SrtConnectionFactory::new(cp);

        // May succeed or timeout — either outcome is valid for the state machine.
        let _ = simulate_handshake(&mut caller_factory, &mut listener_factory, 10);
    }

    // 17. simulate_handshake rendezvous pair
    #[test]
    fn test_simulate_rendezvous() {
        let p_a = ConnectionParams::rendezvous(addr(9020), addr(9021), default_config());
        let p_b = ConnectionParams::rendezvous(addr(9021), addr(9020), default_config());

        let mut factory_a = SrtConnectionFactory::new(p_a);
        let mut factory_b = SrtConnectionFactory::new(p_b);

        let _ = simulate_handshake(&mut factory_a, &mut factory_b, 10);
    }

    // 18. Factory total_accepted starts at zero
    #[test]
    fn test_total_accepted_zero() {
        let p = ConnectionParams::listener(addr(9000), default_config());
        let factory = SrtConnectionFactory::new(p);
        assert_eq!(factory.total_accepted(), 0);
    }

    // 19. Factory cleanup_pending is a no-op when no pending
    #[test]
    fn test_cleanup_pending_no_op() {
        let p = ConnectionParams::listener(addr(9000), default_config());
        let mut factory = SrtConnectionFactory::new(p);
        factory.cleanup_pending(); // Should not panic.
    }

    // 20. Factory debug format
    #[test]
    fn test_factory_debug() {
        let p = ConnectionParams::caller(addr(9000), addr(9001), default_config());
        let factory = SrtConnectionFactory::new(p);
        let debug = format!("{factory:?}");
        assert!(debug.contains("Caller") || debug.contains("caller"));
    }

    // 21. HandshakeStep::InProgress is not Connected
    #[test]
    fn test_handshake_step_in_progress() {
        let step = HandshakeStep::InProgress {
            packets_to_send: Vec::new(),
        };
        assert!(!matches!(step, HandshakeStep::Connected));
    }

    // 22. HandshakeStep::Failed carries message
    #[test]
    fn test_handshake_step_failed() {
        let step = HandshakeStep::Failed("test error".to_owned());
        if let HandshakeStep::Failed(msg) = step {
            assert_eq!(msg, "test error");
        } else {
            panic!("Expected Failed variant");
        }
    }
}