maxwell_utils/connection/
connection_pool.rs1use std::sync::Arc;
2
3use actix::{dev::ToEnvelope, prelude::*};
4use ahash::RandomState as AHasher;
5use dashmap::{mapref::entry::Entry, DashMap};
6
7use super::{Connection, StopMsg};
9
/// Settings that control how a [`ConnectionPool`] populates its slots.
#[derive(Debug, Clone)]
pub struct ConnectionPoolOptions {
    /// How many connections are opened for each endpoint slot.
    pub slot_size: u8,
}
14
15impl Default for ConnectionPoolOptions {
16 fn default() -> Self {
17 ConnectionPoolOptions { slot_size: 8 }
18 }
19}
20
21pub struct ConnectionSlot<C: Connection> {
22 endpoint: String,
23 connections: Vec<Arc<Addr<C>>>,
24 index_seed: u16,
25}
26
27impl<C: Connection> ConnectionSlot<C> {
28 #[inline]
29 pub fn new<F>(endpoint: String, options: &ConnectionPoolOptions, init_connection: &F) -> Self
30 where F: Fn(&String) -> Addr<C> {
31 let mut connections = Vec::<Arc<Addr<C>>>::new();
32 for _ in 0..options.slot_size {
33 connections.push(Arc::new(init_connection(&endpoint)));
34 }
35 ConnectionSlot { endpoint, connections, index_seed: 0 }
36 }
37
38 #[inline]
39 pub fn clear(&mut self)
40 where <C as actix::Actor>::Context: ToEnvelope<C, StopMsg> {
41 for connection in &self.connections {
42 connection.do_send(StopMsg)
43 }
44 self.connections.clear();
45 }
46
47 #[inline]
48 pub fn get_or_init<F>(&mut self, init_connection: &F) -> Arc<Addr<C>>
49 where F: Fn(&String) -> Addr<C> {
50 let index = self.next_index();
51 self.get_or_init_with_index(index, init_connection)
52 }
53
54 #[inline]
55 pub fn get_or_init_with_index_seed<F>(
56 &mut self, index_seed: u32, init_connection: &F,
57 ) -> Arc<Addr<C>>
58 where F: Fn(&String) -> Addr<C> {
59 self.get_or_init_with_index(self.build_index(index_seed), init_connection)
60 }
61
62 #[inline]
63 fn get_or_init_with_index<F>(&mut self, index: usize, init_connection: &F) -> Arc<Addr<C>>
64 where F: Fn(&String) -> Addr<C> {
65 let connection = &self.connections[index];
66 if connection.connected() {
67 Arc::clone(connection)
68 } else {
69 self.connections[index] = Arc::new(init_connection(&self.endpoint));
70 Arc::clone(&self.connections[index])
71 }
72 }
73
74 #[inline]
75 fn next_index(&mut self) -> usize {
76 if self.index_seed as u32 + 1 > u16::MAX as u32 {
77 self.index_seed = 0;
78 } else {
79 self.index_seed += 1;
80 }
81 (self.index_seed % self.connections.len() as u16) as usize
82 }
83
84 #[inline]
85 fn build_index(&self, seed: u32) -> usize {
86 (seed % self.connections.len() as u32) as usize
87 }
88}
89
90pub struct ConnectionPool<C: Connection> {
91 options: ConnectionPoolOptions,
92 slots: DashMap<String, ConnectionSlot<C>, AHasher>,
93}
94
95impl<C: Connection> ConnectionPool<C> {
96 #[inline]
97 pub fn new(options: ConnectionPoolOptions) -> Self {
98 ConnectionPool { options, slots: DashMap::with_capacity_and_hasher(512, AHasher::new()) }
99 }
100
101 #[inline]
102 pub fn get_or_init<S, F>(&self, endpoint: S, init_connection: &F) -> Arc<Addr<C>>
103 where
104 S: AsRef<str>,
105 F: Fn(&String) -> Addr<C>, {
106 match self.slots.entry(endpoint.as_ref().to_owned()) {
107 Entry::Occupied(mut entry) => entry.get_mut().get_or_init(init_connection),
108 Entry::Vacant(entry) => {
109 let mut slot =
110 ConnectionSlot::new(endpoint.as_ref().to_owned(), &self.options, init_connection);
111 let connection = slot.get_or_init(init_connection);
112 entry.insert(slot);
113 connection
114 }
115 }
116 }
117
118 #[inline]
119 pub fn get_or_init_with_index_seed<S, F>(
120 &self, endpoint: S, index_seed: u32, init_connection: &F,
121 ) -> Arc<Addr<C>>
122 where
123 S: AsRef<str>,
124 F: Fn(&String) -> Addr<C>, {
125 match self.slots.entry(endpoint.as_ref().to_owned()) {
126 Entry::Occupied(mut entry) => {
127 entry.get_mut().get_or_init_with_index_seed(index_seed, init_connection)
128 }
129 Entry::Vacant(entry) => {
130 let mut slot =
131 ConnectionSlot::new(endpoint.as_ref().to_owned(), &self.options, init_connection);
132 let connection = slot.get_or_init_with_index_seed(index_seed, init_connection);
133 entry.insert(slot);
134 connection
135 }
136 }
137 }
138
139 #[inline]
140 pub fn remove_by_endpoint<S>(&self, endpoint: S)
141 where
142 S: AsRef<str>,
143 <C as actix::Actor>::Context: ToEnvelope<C, StopMsg>, {
144 match self.slots.entry(endpoint.as_ref().to_owned()) {
145 Entry::Occupied(mut occupied) => {
146 occupied.get_mut().clear();
147 occupied.remove();
148 }
149 Entry::Vacant(_) => {}
150 }
151 }
152}
153
#[cfg(test)]
mod tests {

    use std::{
        sync::Arc,
        time::{Duration, Instant},
    };

    use actix::prelude::*;
    use tokio::time::sleep;

    use super::*;
    use crate::connection::*;

    /// Requests 32 connections to a single endpoint from a default pool,
    /// waits three seconds, and prints the elapsed time.
    #[actix::test]
    async fn fetch_with() {
        let connection_pool: ConnectionPool<FutureStyleConnection> =
            ConnectionPool::new(ConnectionPoolOptions::default());
        let endpoint = "localhost:8081";
        // Holding the handles keeps the returned connections alive across the
        // sleep below.
        let mut connections: Vec<Arc<Addr<FutureStyleConnection>>> = Vec::new();
        let start = Instant::now();
        for _ in 0..32 {
            let connection = connection_pool.get_or_init(&endpoint, &|endpoint| {
                FutureStyleConnection::start2(endpoint.to_owned(), ConnectionOptions::default())
            });
            connections.push(connection);
        }
        sleep(Duration::from_secs(3)).await;
        let spent = start.elapsed();
        println!("Spent {}ms to create connection pool", spent.as_millis());
    }
}