solana_connection_cache/
connection_cache.rs

1use {
2    crate::{
3        client_connection::ClientConnection as BlockingClientConnection,
4        connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL},
5        nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
6    },
7    crossbeam_channel::{Receiver, RecvError, Sender},
8    indexmap::map::IndexMap,
9    log::*,
10    rand::{thread_rng, Rng},
11    solana_keypair::Keypair,
12    solana_measure::measure::Measure,
13    solana_time_utils::AtomicInterval,
14    std::{
15        net::SocketAddr,
16        sync::{atomic::Ordering, Arc, RwLock},
17        thread::{Builder, JoinHandle},
18    },
19    thiserror::Error,
20};
21
22// Should be non-zero
23const MAX_CONNECTIONS: usize = 1024;
24
25/// Default connection pool size per remote address
26pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 2;
27
28#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
29pub enum Protocol {
30    UDP,
31    QUIC,
32}
33
34pub trait ConnectionManager: Send + Sync + 'static {
35    type ConnectionPool: ConnectionPool;
36    type NewConnectionConfig: NewConnectionConfig;
37
38    const PROTOCOL: Protocol;
39
40    fn new_connection_pool(&self) -> Self::ConnectionPool;
41    fn new_connection_config(&self) -> Self::NewConnectionConfig;
42    fn update_key(&self, _key: &Keypair) -> Result<(), Box<dyn std::error::Error>>;
43}
44
45pub struct ConnectionCache<
46    R, // ConnectionPool
47    S, // ConnectionManager
48    T, // NewConnectionConfig
49> {
50    name: &'static str,
51    map: Arc<RwLock<IndexMap<SocketAddr, /*ConnectionPool:*/ R>>>,
52    connection_manager: Arc<S>,
53    stats: Arc<ConnectionCacheStats>,
54    last_stats: AtomicInterval,
55    connection_pool_size: usize,
56    connection_config: Arc<T>,
57    sender: Sender<(usize, SocketAddr)>,
58}
59
60impl<P, M, C> ConnectionCache<P, M, C>
61where
62    P: ConnectionPool<NewConnectionConfig = C>,
63    M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
64    C: NewConnectionConfig,
65{
66    pub fn new(
67        name: &'static str,
68        connection_manager: M,
69        connection_pool_size: usize,
70    ) -> Result<Self, ClientError> {
71        let config = connection_manager.new_connection_config();
72        Ok(Self::new_with_config(
73            name,
74            connection_pool_size,
75            config,
76            connection_manager,
77        ))
78    }
79
80    pub fn new_with_config(
81        name: &'static str,
82        connection_pool_size: usize,
83        connection_config: C,
84        connection_manager: M,
85    ) -> Self {
86        info!("Creating ConnectionCache {name}, pool size: {connection_pool_size}");
87        let (sender, receiver) = crossbeam_channel::unbounded();
88
89        let map = Arc::new(RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)));
90        let config = Arc::new(connection_config);
91        let connection_manager = Arc::new(connection_manager);
92        let connection_pool_size = 1.max(connection_pool_size); // The minimum pool size is 1.
93
94        let stats = Arc::new(ConnectionCacheStats::default());
95
96        let _async_connection_thread =
97            Self::create_connection_async_thread(map.clone(), receiver, stats.clone());
98        Self {
99            name,
100            map,
101            stats,
102            connection_manager,
103            last_stats: AtomicInterval::default(),
104            connection_pool_size,
105            connection_config: config,
106            sender,
107        }
108    }
109
110    /// This actually triggers the connection creation by sending empty data
111    fn create_connection_async_thread(
112        map: Arc<RwLock<IndexMap<SocketAddr, /*ConnectionPool:*/ P>>>,
113        receiver: Receiver<(usize, SocketAddr)>,
114        stats: Arc<ConnectionCacheStats>,
115    ) -> JoinHandle<()> {
116        Builder::new()
117            .name("solQAsynCon".to_string())
118            .spawn(move || loop {
119                let recv_result = receiver.recv();
120                match recv_result {
121                    Err(RecvError) => {
122                        break;
123                    }
124                    Ok((idx, addr)) => {
125                        let map = map.read().unwrap();
126                        let pool = map.get(&addr);
127                        if let Some(pool) = pool {
128                            let conn = pool.get(idx);
129                            if let Ok(conn) = conn {
130                                drop(map);
131                                let conn = conn.new_blocking_connection(addr, stats.clone());
132                                let result = conn.send_data(&[]);
133                                debug!("Create async connection result {result:?} for {addr}");
134                            }
135                        }
136                    }
137                }
138            })
139            .unwrap()
140    }
141
142    pub fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
143        let mut map = self.map.write().unwrap();
144        map.clear();
145        self.connection_manager.update_key(key)
146    }
147    /// Create a lazy connection object under the exclusive lock of the cache map if there is not
148    /// enough used connections in the connection pool for the specified address.
149    /// Returns CreateConnectionResult.
150    fn create_connection(
151        &self,
152        lock_timing_ms: &mut u64,
153        addr: &SocketAddr,
154    ) -> CreateConnectionResult<<P as ConnectionPool>::BaseClientConnection> {
155        let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
156        let mut map = self.map.write().unwrap();
157        get_connection_map_lock_measure.stop();
158        *lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
159        // Read again, as it is possible that between read lock dropped and the write lock acquired
160        // another thread could have setup the connection.
161
162        let pool_status = map
163            .get(addr)
164            .map(|pool| pool.check_pool_status(self.connection_pool_size))
165            .unwrap_or(PoolStatus::Empty);
166
167        let (cache_hit, num_evictions, eviction_timing_ms) =
168            if matches!(pool_status, PoolStatus::Empty) {
169                Self::create_connection_internal(
170                    &self.connection_config,
171                    &self.connection_manager,
172                    &mut map,
173                    addr,
174                    self.connection_pool_size,
175                    None,
176                )
177            } else {
178                (true, 0, 0)
179            };
180
181        if matches!(pool_status, PoolStatus::PartiallyFull) {
182            // trigger an async connection create
183            debug!("Triggering async connection for {addr:?}");
184            Self::create_connection_internal(
185                &self.connection_config,
186                &self.connection_manager,
187                &mut map,
188                addr,
189                self.connection_pool_size,
190                Some(&self.sender),
191            );
192        }
193
194        let pool = map.get(addr).unwrap();
195        let connection = pool.borrow_connection();
196
197        CreateConnectionResult {
198            connection,
199            cache_hit,
200            connection_cache_stats: self.stats.clone(),
201            num_evictions,
202            eviction_timing_ms,
203        }
204    }
205
206    fn create_connection_internal(
207        config: &C,
208        connection_manager: &M,
209        map: &mut std::sync::RwLockWriteGuard<'_, IndexMap<SocketAddr, P>>,
210        addr: &SocketAddr,
211        connection_pool_size: usize,
212        async_connection_sender: Option<&Sender<(usize, SocketAddr)>>,
213    ) -> (bool, u64, u64) {
214        // evict a connection if the cache is reaching upper bounds
215        let mut num_evictions = 0;
216        let mut get_connection_cache_eviction_measure =
217            Measure::start("get_connection_cache_eviction_measure");
218        let existing_index = map.get_index_of(addr);
219        while map.len() >= MAX_CONNECTIONS {
220            let mut rng = thread_rng();
221            let n = rng.gen_range(0..MAX_CONNECTIONS);
222            if let Some(index) = existing_index {
223                if n == index {
224                    continue;
225                }
226            }
227            map.swap_remove_index(n);
228            num_evictions += 1;
229        }
230        get_connection_cache_eviction_measure.stop();
231
232        let mut hit_cache = false;
233        map.entry(*addr)
234            .and_modify(|pool| {
235                if matches!(
236                    pool.check_pool_status(connection_pool_size),
237                    PoolStatus::PartiallyFull
238                ) {
239                    let idx = pool.add_connection(config, addr);
240                    if let Some(sender) = async_connection_sender {
241                        debug!(
242                            "Sending async connection creation {} for {addr}",
243                            pool.num_connections() - 1
244                        );
245                        sender.send((idx, *addr)).unwrap();
246                    };
247                } else {
248                    hit_cache = true;
249                }
250            })
251            .or_insert_with(|| {
252                let mut pool = connection_manager.new_connection_pool();
253                pool.add_connection(config, addr);
254                pool
255            });
256        (
257            hit_cache,
258            num_evictions,
259            get_connection_cache_eviction_measure.as_ms(),
260        )
261    }
262
263    fn get_or_add_connection(
264        &self,
265        addr: &SocketAddr,
266    ) -> GetConnectionResult<<P as ConnectionPool>::BaseClientConnection> {
267        let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
268        let map = self.map.read().unwrap();
269        get_connection_map_lock_measure.stop();
270
271        let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
272
273        let report_stats = self
274            .last_stats
275            .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
276
277        let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
278        let CreateConnectionResult {
279            connection,
280            cache_hit,
281            connection_cache_stats,
282            num_evictions,
283            eviction_timing_ms,
284        } = match map.get(addr) {
285            Some(pool) => {
286                let pool_status = pool.check_pool_status(self.connection_pool_size);
287                match pool_status {
288                    PoolStatus::Empty => {
289                        // create more connection and put it in the pool
290                        drop(map);
291                        self.create_connection(&mut lock_timing_ms, addr)
292                    }
293                    PoolStatus::PartiallyFull | PoolStatus::Full => {
294                        let connection = pool.borrow_connection();
295                        if matches!(pool_status, PoolStatus::PartiallyFull) {
296                            debug!("Creating connection async for {addr}");
297                            drop(map);
298                            let mut map = self.map.write().unwrap();
299                            Self::create_connection_internal(
300                                &self.connection_config,
301                                &self.connection_manager,
302                                &mut map,
303                                addr,
304                                self.connection_pool_size,
305                                Some(&self.sender),
306                            );
307                        }
308                        CreateConnectionResult {
309                            connection,
310                            cache_hit: true,
311                            connection_cache_stats: self.stats.clone(),
312                            num_evictions: 0,
313                            eviction_timing_ms: 0,
314                        }
315                    }
316                }
317            }
318            None => {
319                // Upgrade to write access by dropping read lock and acquire write lock
320                drop(map);
321                self.create_connection(&mut lock_timing_ms, addr)
322            }
323        };
324        get_connection_map_measure.stop();
325
326        GetConnectionResult {
327            connection,
328            cache_hit,
329            report_stats,
330            map_timing_ms: get_connection_map_measure.as_ms(),
331            lock_timing_ms,
332            connection_cache_stats,
333            num_evictions,
334            eviction_timing_ms,
335        }
336    }
337
338    fn get_connection_and_log_stats(
339        &self,
340        addr: &SocketAddr,
341    ) -> (
342        Arc<<P as ConnectionPool>::BaseClientConnection>,
343        Arc<ConnectionCacheStats>,
344    ) {
345        let mut get_connection_measure = Measure::start("get_connection_measure");
346        let GetConnectionResult {
347            connection,
348            cache_hit,
349            report_stats,
350            map_timing_ms,
351            lock_timing_ms,
352            connection_cache_stats,
353            num_evictions,
354            eviction_timing_ms,
355        } = self.get_or_add_connection(addr);
356
357        if report_stats {
358            connection_cache_stats.report(self.name);
359        }
360
361        if cache_hit {
362            connection_cache_stats
363                .cache_hits
364                .fetch_add(1, Ordering::Relaxed);
365            connection_cache_stats
366                .get_connection_hit_ms
367                .fetch_add(map_timing_ms, Ordering::Relaxed);
368        } else {
369            connection_cache_stats
370                .cache_misses
371                .fetch_add(1, Ordering::Relaxed);
372            connection_cache_stats
373                .get_connection_miss_ms
374                .fetch_add(map_timing_ms, Ordering::Relaxed);
375            connection_cache_stats
376                .cache_evictions
377                .fetch_add(num_evictions, Ordering::Relaxed);
378            connection_cache_stats
379                .eviction_time_ms
380                .fetch_add(eviction_timing_ms, Ordering::Relaxed);
381        }
382
383        get_connection_measure.stop();
384        connection_cache_stats
385            .get_connection_lock_ms
386            .fetch_add(lock_timing_ms, Ordering::Relaxed);
387        connection_cache_stats
388            .get_connection_ms
389            .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);
390
391        (connection, connection_cache_stats)
392    }
393
394    pub fn get_connection(&self, addr: &SocketAddr) -> Arc<<<P as ConnectionPool>::BaseClientConnection as BaseClientConnection>::BlockingClientConnection>{
395        let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
396        connection.new_blocking_connection(*addr, connection_cache_stats)
397    }
398
399    pub fn get_nonblocking_connection(
400        &self,
401        addr: &SocketAddr,
402    ) -> Arc<<<P as ConnectionPool>::BaseClientConnection as BaseClientConnection>::NonblockingClientConnection>{
403        let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
404        connection.new_nonblocking_connection(*addr, connection_cache_stats)
405    }
406}
407
408#[derive(Error, Debug)]
409pub enum ConnectionPoolError {
410    #[error("connection index is out of range of the pool")]
411    IndexOutOfRange,
412}
413
414#[derive(Error, Debug)]
415pub enum ClientError {
416    #[error("IO error: {0:?}")]
417    IoError(#[from] std::io::Error),
418}
419
420pub trait NewConnectionConfig: Sized + Send + Sync + 'static {
421    fn new() -> Result<Self, ClientError>;
422}
423
424pub enum PoolStatus {
425    Empty,
426    PartiallyFull,
427    Full,
428}
429
430pub trait ConnectionPool: Send + Sync + 'static {
431    type NewConnectionConfig: NewConnectionConfig;
432    type BaseClientConnection: BaseClientConnection;
433
434    /// Add a connection to the pool and return its index
435    fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize;
436
437    /// Get the number of current connections in the pool
438    fn num_connections(&self) -> usize;
439
440    /// Get a connection based on its index in the pool, without checking if the
441    fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError>;
442
443    /// Get a connection from the pool. It must have at least one connection in the pool.
444    /// This randomly picks a connection in the pool.
445    fn borrow_connection(&self) -> Arc<Self::BaseClientConnection> {
446        let mut rng = thread_rng();
447        let n = rng.gen_range(0..self.num_connections());
448        self.get(n).expect("index is within num_connections")
449    }
450
451    /// Check if we need to create a new connection. If the count of the connections
452    /// is smaller than the pool size and if there is no connection at all.
453    fn check_pool_status(&self, required_pool_size: usize) -> PoolStatus {
454        if self.num_connections() == 0 {
455            PoolStatus::Empty
456        } else if self.num_connections() < required_pool_size {
457            PoolStatus::PartiallyFull
458        } else {
459            PoolStatus::Full
460        }
461    }
462
463    fn create_pool_entry(
464        &self,
465        config: &Self::NewConnectionConfig,
466        addr: &SocketAddr,
467    ) -> Arc<Self::BaseClientConnection>;
468}
469
470pub trait BaseClientConnection {
471    type BlockingClientConnection: BlockingClientConnection;
472    type NonblockingClientConnection: NonblockingClientConnection;
473
474    fn new_blocking_connection(
475        &self,
476        addr: SocketAddr,
477        stats: Arc<ConnectionCacheStats>,
478    ) -> Arc<Self::BlockingClientConnection>;
479
480    fn new_nonblocking_connection(
481        &self,
482        addr: SocketAddr,
483        stats: Arc<ConnectionCacheStats>,
484    ) -> Arc<Self::NonblockingClientConnection>;
485}
486
487struct GetConnectionResult<T> {
488    connection: Arc</*BaseClientConnection:*/ T>,
489    cache_hit: bool,
490    report_stats: bool,
491    map_timing_ms: u64,
492    lock_timing_ms: u64,
493    connection_cache_stats: Arc<ConnectionCacheStats>,
494    num_evictions: u64,
495    eviction_timing_ms: u64,
496}
497
498struct CreateConnectionResult<T> {
499    connection: Arc</*BaseClientConnection:*/ T>,
500    cache_hit: bool,
501    connection_cache_stats: Arc<ConnectionCacheStats>,
502    num_evictions: u64,
503    eviction_timing_ms: u64,
504}
505
506#[cfg(test)]
507mod tests {
508    use {
509        super::*,
510        crate::{
511            client_connection::ClientConnection as BlockingClientConnection,
512            nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
513        },
514        async_trait::async_trait,
515        rand::{Rng, SeedableRng},
516        rand_chacha::ChaChaRng,
517        solana_net_utils::sockets::bind_to_localhost_unique,
518        solana_transaction_error::TransportResult,
519        std::{
520            net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
521            sync::Arc,
522        },
523    };
524
525    struct MockUdpPool {
526        connections: Vec<Arc<MockUdp>>,
527    }
528    impl ConnectionPool for MockUdpPool {
529        type NewConnectionConfig = MockUdpConfig;
530        type BaseClientConnection = MockUdp;
531
532        /// Add a connection into the pool and return its index in the pool.
533        fn add_connection(
534            &mut self,
535            config: &Self::NewConnectionConfig,
536            addr: &SocketAddr,
537        ) -> usize {
538            let connection = self.create_pool_entry(config, addr);
539            let idx = self.connections.len();
540            self.connections.push(connection);
541            idx
542        }
543
544        fn num_connections(&self) -> usize {
545            self.connections.len()
546        }
547
548        fn get(
549            &self,
550            index: usize,
551        ) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
552            self.connections
553                .get(index)
554                .cloned()
555                .ok_or(ConnectionPoolError::IndexOutOfRange)
556        }
557
558        fn create_pool_entry(
559            &self,
560            config: &Self::NewConnectionConfig,
561            _addr: &SocketAddr,
562        ) -> Arc<Self::BaseClientConnection> {
563            Arc::new(MockUdp(config.udp_socket.clone()))
564        }
565    }
566
567    struct MockUdpConfig {
568        udp_socket: Arc<UdpSocket>,
569    }
570
571    impl Default for MockUdpConfig {
572        fn default() -> Self {
573            Self {
574                udp_socket: Arc::new(bind_to_localhost_unique().unwrap()),
575            }
576        }
577    }
578
579    impl NewConnectionConfig for MockUdpConfig {
580        fn new() -> Result<Self, ClientError> {
581            Ok(Self {
582                udp_socket: Arc::new(
583                    bind_to_localhost_unique().map_err(Into::<ClientError>::into)?,
584                ),
585            })
586        }
587    }
588
589    struct MockUdp(Arc<UdpSocket>);
590    impl BaseClientConnection for MockUdp {
591        type BlockingClientConnection = MockUdpConnection;
592        type NonblockingClientConnection = MockUdpConnection;
593
594        fn new_blocking_connection(
595            &self,
596            addr: SocketAddr,
597            _stats: Arc<ConnectionCacheStats>,
598        ) -> Arc<Self::BlockingClientConnection> {
599            Arc::new(MockUdpConnection {
600                _socket: self.0.clone(),
601                addr,
602            })
603        }
604
605        fn new_nonblocking_connection(
606            &self,
607            addr: SocketAddr,
608            _stats: Arc<ConnectionCacheStats>,
609        ) -> Arc<Self::NonblockingClientConnection> {
610            Arc::new(MockUdpConnection {
611                _socket: self.0.clone(),
612                addr,
613            })
614        }
615    }
616
617    struct MockUdpConnection {
618        _socket: Arc<UdpSocket>,
619        addr: SocketAddr,
620    }
621
622    #[derive(Default)]
623    struct MockConnectionManager {}
624
625    impl ConnectionManager for MockConnectionManager {
626        type ConnectionPool = MockUdpPool;
627        type NewConnectionConfig = MockUdpConfig;
628
629        const PROTOCOL: Protocol = Protocol::QUIC;
630
631        fn new_connection_pool(&self) -> Self::ConnectionPool {
632            MockUdpPool {
633                connections: Vec::default(),
634            }
635        }
636
637        fn new_connection_config(&self) -> Self::NewConnectionConfig {
638            MockUdpConfig::new().unwrap()
639        }
640
641        fn update_key(&self, _key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
642            Ok(())
643        }
644    }
645
646    impl BlockingClientConnection for MockUdpConnection {
647        fn server_addr(&self) -> &SocketAddr {
648            &self.addr
649        }
650        fn send_data(&self, _buffer: &[u8]) -> TransportResult<()> {
651            unimplemented!()
652        }
653        fn send_data_async(&self, _data: Vec<u8>) -> TransportResult<()> {
654            unimplemented!()
655        }
656        fn send_data_batch(&self, _buffers: &[Vec<u8>]) -> TransportResult<()> {
657            unimplemented!()
658        }
659        fn send_data_batch_async(&self, _buffers: Vec<Vec<u8>>) -> TransportResult<()> {
660            unimplemented!()
661        }
662    }
663
664    #[async_trait]
665    impl NonblockingClientConnection for MockUdpConnection {
666        fn server_addr(&self) -> &SocketAddr {
667            &self.addr
668        }
669        async fn send_data(&self, _data: &[u8]) -> TransportResult<()> {
670            unimplemented!()
671        }
672        async fn send_data_batch(&self, _buffers: &[Vec<u8>]) -> TransportResult<()> {
673            unimplemented!()
674        }
675    }
676
677    fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
678        let a = rng.gen_range(1..255);
679        let b = rng.gen_range(1..255);
680        let c = rng.gen_range(1..255);
681        let d = rng.gen_range(1..255);
682
683        let addr_str = format!("{a}.{b}.{c}.{d}:80");
684
685        addr_str.parse().expect("Invalid address")
686    }
687
688    #[test]
689    fn test_connection_cache() {
690        solana_logger::setup();
691        // Allow the test to run deterministically
692        // with the same pseudorandom sequence between runs
693        // and on different platforms - the cryptographic security
694        // property isn't important here but ChaChaRng provides a way
695        // to get the same pseudorandom sequence on different platforms
696        let mut rng = ChaChaRng::seed_from_u64(42);
697
698        // Generate a bunch of random addresses and create connections to them
699        // Since ClientConnection::new is infallible, it should't matter whether or not
700        // we can actually connect to those addresses - ClientConnection implementations should either
701        // be lazy and not connect until first use or handle connection errors somehow
702        // (without crashing, as would be required in a real practical validator)
703        let connection_manager = MockConnectionManager::default();
704        let connection_cache = ConnectionCache::new(
705            "connection_cache_test",
706            connection_manager,
707            DEFAULT_CONNECTION_POOL_SIZE,
708        )
709        .unwrap();
710        let addrs = (0..MAX_CONNECTIONS)
711            .map(|_| {
712                let addr = get_addr(&mut rng);
713                connection_cache.get_connection(&addr);
714                addr
715            })
716            .collect::<Vec<_>>();
717        {
718            let map = connection_cache.map.read().unwrap();
719            assert!(map.len() == MAX_CONNECTIONS);
720            addrs.iter().for_each(|addr| {
721                let conn = &map.get(addr).expect("Address not found").get(0).unwrap();
722                let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
723                assert_eq!(
724                    BlockingClientConnection::server_addr(&*conn).ip(),
725                    addr.ip(),
726                );
727                assert_eq!(
728                    NonblockingClientConnection::server_addr(&*conn).ip(),
729                    addr.ip(),
730                );
731            });
732        }
733
734        let addr = &get_addr(&mut rng);
735        connection_cache.get_connection(addr);
736
737        let port = addr.port();
738        let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
739        let map = connection_cache.map.read().unwrap();
740        assert!(map.len() == MAX_CONNECTIONS);
741        let _conn = map.get(&addr_with_quic_port).expect("Address not found");
742    }
743
744    // Test that we can get_connection with a connection cache configured
745    // on an address with a port that would overflow to
746    // an invalid port.
747    #[test]
748    fn test_overflow_address() {
749        let port = u16::MAX;
750        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
751        let connection_manager = MockConnectionManager::default();
752        let connection_cache =
753            ConnectionCache::new("connection_cache_test", connection_manager, 1).unwrap();
754
755        let conn = connection_cache.get_connection(&addr);
756        // We (intentionally) don't have an interface that allows us to distinguish between
757        // UDP and Quic connections, so check instead that the port is valid (non-zero)
758        // and is the same as the input port (falling back on UDP)
759        assert_ne!(port, 0u16);
760        assert_eq!(BlockingClientConnection::server_addr(&*conn).port(), port);
761        assert_eq!(
762            NonblockingClientConnection::server_addr(&*conn).port(),
763            port
764        );
765    }
766}