maxwell_utils/connection/
connection_pool.rs

1use std::sync::Arc;
2
3use actix::{dev::ToEnvelope, prelude::*};
4use ahash::RandomState as AHasher;
5use dashmap::{mapref::entry::Entry, DashMap};
6
7// use triomphe::Arc;
8use super::{Connection, StopMsg};
9
10#[derive(Debug, Clone)]
11pub struct ConnectionPoolOptions {
12  pub slot_size: u8,
13}
14
15impl Default for ConnectionPoolOptions {
16  fn default() -> Self {
17    ConnectionPoolOptions { slot_size: 8 }
18  }
19}
20
21pub struct ConnectionSlot<C: Connection> {
22  endpoint: String,
23  connections: Vec<Arc<Addr<C>>>,
24  index_seed: u16,
25}
26
27impl<C: Connection> ConnectionSlot<C> {
28  #[inline]
29  pub fn new<F>(endpoint: String, options: &ConnectionPoolOptions, init_connection: &F) -> Self
30  where F: Fn(&String) -> Addr<C> {
31    let mut connections = Vec::<Arc<Addr<C>>>::new();
32    for _ in 0..options.slot_size {
33      connections.push(Arc::new(init_connection(&endpoint)));
34    }
35    ConnectionSlot { endpoint, connections, index_seed: 0 }
36  }
37
38  #[inline]
39  pub fn clear(&mut self)
40  where <C as actix::Actor>::Context: ToEnvelope<C, StopMsg> {
41    for connection in &self.connections {
42      connection.do_send(StopMsg)
43    }
44    self.connections.clear();
45  }
46
47  #[inline]
48  pub fn get_or_init<F>(&mut self, init_connection: &F) -> Arc<Addr<C>>
49  where F: Fn(&String) -> Addr<C> {
50    let index = self.next_index();
51    self.get_or_init_with_index(index, init_connection)
52  }
53
54  #[inline]
55  pub fn get_or_init_with_index_seed<F>(
56    &mut self, index_seed: u32, init_connection: &F,
57  ) -> Arc<Addr<C>>
58  where F: Fn(&String) -> Addr<C> {
59    self.get_or_init_with_index(self.build_index(index_seed), init_connection)
60  }
61
62  #[inline]
63  fn get_or_init_with_index<F>(&mut self, index: usize, init_connection: &F) -> Arc<Addr<C>>
64  where F: Fn(&String) -> Addr<C> {
65    let connection = &self.connections[index];
66    if connection.connected() {
67      Arc::clone(connection)
68    } else {
69      self.connections[index] = Arc::new(init_connection(&self.endpoint));
70      Arc::clone(&self.connections[index])
71    }
72  }
73
74  #[inline]
75  fn next_index(&mut self) -> usize {
76    if self.index_seed as u32 + 1 > u16::MAX as u32 {
77      self.index_seed = 0;
78    } else {
79      self.index_seed += 1;
80    }
81    (self.index_seed % self.connections.len() as u16) as usize
82  }
83
84  #[inline]
85  fn build_index(&self, seed: u32) -> usize {
86    (seed % self.connections.len() as u32) as usize
87  }
88}
89
90pub struct ConnectionPool<C: Connection> {
91  options: ConnectionPoolOptions,
92  slots: DashMap<String, ConnectionSlot<C>, AHasher>,
93}
94
95impl<C: Connection> ConnectionPool<C> {
96  #[inline]
97  pub fn new(options: ConnectionPoolOptions) -> Self {
98    ConnectionPool { options, slots: DashMap::with_capacity_and_hasher(512, AHasher::new()) }
99  }
100
101  #[inline]
102  pub fn get_or_init<S, F>(&self, endpoint: S, init_connection: &F) -> Arc<Addr<C>>
103  where
104    S: AsRef<str>,
105    F: Fn(&String) -> Addr<C>, {
106    match self.slots.entry(endpoint.as_ref().to_owned()) {
107      Entry::Occupied(mut entry) => entry.get_mut().get_or_init(init_connection),
108      Entry::Vacant(entry) => {
109        let mut slot =
110          ConnectionSlot::new(endpoint.as_ref().to_owned(), &self.options, init_connection);
111        let connection = slot.get_or_init(init_connection);
112        entry.insert(slot);
113        connection
114      }
115    }
116  }
117
118  #[inline]
119  pub fn get_or_init_with_index_seed<S, F>(
120    &self, endpoint: S, index_seed: u32, init_connection: &F,
121  ) -> Arc<Addr<C>>
122  where
123    S: AsRef<str>,
124    F: Fn(&String) -> Addr<C>, {
125    match self.slots.entry(endpoint.as_ref().to_owned()) {
126      Entry::Occupied(mut entry) => {
127        entry.get_mut().get_or_init_with_index_seed(index_seed, init_connection)
128      }
129      Entry::Vacant(entry) => {
130        let mut slot =
131          ConnectionSlot::new(endpoint.as_ref().to_owned(), &self.options, init_connection);
132        let connection = slot.get_or_init_with_index_seed(index_seed, init_connection);
133        entry.insert(slot);
134        connection
135      }
136    }
137  }
138
139  #[inline]
140  pub fn remove_by_endpoint<S>(&self, endpoint: S)
141  where
142    S: AsRef<str>,
143    <C as actix::Actor>::Context: ToEnvelope<C, StopMsg>, {
144    match self.slots.entry(endpoint.as_ref().to_owned()) {
145      Entry::Occupied(mut occupied) => {
146        occupied.get_mut().clear();
147        occupied.remove();
148      }
149      Entry::Vacant(_) => {}
150    }
151  }
152}
153
154////////////////////////////////////////////////////////////////////////////////
155/// test cases
156////////////////////////////////////////////////////////////////////////////////
157#[cfg(test)]
158mod tests {
159
160  use std::{
161    sync::Arc,
162    time::{Duration, Instant},
163  };
164
165  use actix::prelude::*;
166  use tokio::time::sleep;
167
168  // use triomphe::Arc;
169  use super::*;
170  use crate::connection::*;
171
172  #[actix::test]
173  async fn fetch_with() {
174    let connection_pool: ConnectionPool<FutureStyleConnection> =
175      ConnectionPool::new(ConnectionPoolOptions::default());
176    let endpoint = "localhost:8081";
177    let mut connections: Vec<Arc<Addr<FutureStyleConnection>>> = Vec::new();
178    let start = Instant::now();
179    for _i in 0..32 {
180      let connection = connection_pool.get_or_init(&endpoint, &|endpoint| {
181        FutureStyleConnection::start2(endpoint.to_owned(), ConnectionOptions::default())
182      });
183      connections.push(connection);
184    }
185    sleep(Duration::from_secs(3)).await;
186    let spent = Instant::now() - start;
187    println!("Spent {:?}ms to create connetion pool", spent.as_millis());
188  }
189}