1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use crate::{
  clients::RedisClient,
  error::{RedisError, RedisErrorKind},
  interfaces::ClientLike,
  types::{ConnectHandle, PerformanceConfig, ReconnectPolicy, RedisConfig},
  utils,
};
use futures::future::{join_all, try_join_all};
use std::{
  fmt,
  ops::Deref,
  sync::{atomic::AtomicUsize, Arc},
};

#[cfg(feature = "dns")]
use crate::types::Resolve;

/// The inner state used by a `RedisPool`.
///
/// Every clone of a `RedisPool` points at the same instance of this struct through an `Arc`.
#[derive(Clone)]
pub(crate) struct RedisPoolInner {
  /// The clients that make up the pool.
  clients: Vec<RedisClient>,
  /// Shared counter from which `RedisPool::next` and `RedisPool::last` derive a client index (modulo the pool
  /// size).
  last:    Arc<AtomicUsize>,
}

/// A struct to pool multiple Redis clients together into one interface that will round-robin requests among clients,
/// preferring clients with an active connection if specified.
///
/// The pool state lives behind an `Arc`, so clones of a `RedisPool` share the same clients and the same round-robin
/// counter.
#[derive(Clone)]
pub struct RedisPool {
  /// Shared state: the pooled clients and the round-robin counter.
  inner: Arc<RedisPoolInner>,
}

impl fmt::Debug for RedisPool {
  /// Render the pool as a `RedisPool` struct with a single `size` field holding the number of clients.
  ///
  /// The clients themselves are not printed.
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    let size = self.inner.clients.len();

    let mut out = f.debug_struct("RedisPool");
    out.field("size", &size);
    out.finish()
  }
}

impl Deref for RedisPool {
  type Target = RedisClient;

  fn deref(&self) -> &Self::Target {
    self.next()
  }
}

impl<'a> From<&'a RedisPool> for &'a RedisClient {
  fn from(p: &'a RedisPool) -> &'a RedisClient {
    p.next()
  }
}

impl From<&RedisPool> for RedisClient {
  /// Produce an owned client by cloning the one selected by `RedisPool::next`.
  fn from(p: &RedisPool) -> Self {
    let selected = p.next();
    selected.clone()
  }
}

impl RedisPool {
  /// Build a pool of `size` clients without connecting any of them to the server.
  ///
  /// Each client is created from its own clone of `config`, `perf`, and `policy`.
  ///
  /// # Errors
  ///
  /// Returns a `RedisErrorKind::Config` error when `size` is zero.
  pub fn new(
    config: RedisConfig,
    perf: Option<PerformanceConfig>,
    policy: Option<ReconnectPolicy>,
    size: usize,
  ) -> Result<Self, RedisError> {
    // `next` and `last` index with `% clients.len()`, which would divide by zero on an empty pool.
    if size == 0 {
      return Err(RedisError::new(RedisErrorKind::Config, "Pool cannot be empty."));
    }

    let clients: Vec<RedisClient> = (0 .. size)
      .map(|_| RedisClient::new(config.clone(), perf.clone(), policy.clone()))
      .collect();
    let inner = RedisPoolInner {
      clients,
      last: Arc::new(AtomicUsize::new(0)),
    };

    Ok(RedisPool { inner: Arc::new(inner) })
  }

  /// Borrow every client in the pool as a slice.
  pub fn clients(&self) -> &[RedisClient] {
    self.inner.clients.as_slice()
  }

  /// Call `connect` on each client, returning one `ConnectHandle` per client in pool order.
  ///
  /// The caller is responsible for calling any `on_*` functions on each client.
  pub fn connect(&self) -> Vec<ConnectHandle> {
    let pool = &self.inner.clients;
    let mut handles = Vec::with_capacity(pool.len());
    for client in pool {
      handles.push(client.connect());
    }
    handles
  }

  /// Wait until every client in the pool has connected to the server.
  ///
  /// # Errors
  ///
  /// Resolves to the first error reported by any client's `wait_for_connect`.
  pub async fn wait_for_connect(&self) -> Result<(), RedisError> {
    let pending = self.inner.clients.iter().map(|client| client.wait_for_connect());
    try_join_all(pending).await?;

    Ok(())
  }

  /// Override the DNS resolution logic for all clients in the pool.
  ///
  /// Clients are updated one after another, each receiving its own handle to the same `resolver`.
  #[cfg(feature = "dns")]
  #[cfg_attr(docsrs, doc(cfg(feature = "dns")))]
  pub async fn set_resolver(&self, resolver: Arc<dyn Resolve>) {
    for client in &self.inner.clients {
      client.set_resolver(Arc::clone(&resolver)).await;
    }
  }

  /// Read the number of clients in the pool.
  pub fn size(&self) -> usize {
    self.clients().len()
  }

  /// Read the client that should run the next command.
  ///
  /// Advances the shared counter once to pick a starting position, then walks the pool from there (wrapping
  /// around) and returns the first client that reports `is_connected()`. If no client is connected the client at
  /// the starting position is returned.
  #[cfg(feature = "pool-prefer-active")]
  pub fn next(&self) -> &RedisClient {
    let clients = &self.inner.clients;
    let len = clients.len();
    let start = utils::incr_atomic(&self.inner.last) % len;

    (0 .. len)
      .map(move |offset| &clients[(start + offset) % len])
      .find(|client| client.is_connected())
      // A full lap wraps back to `start`, so that is the fallback when nothing is connected.
      .unwrap_or(&clients[start])
  }

  /// Read the client that should run the next command.
  ///
  /// Advances the shared counter and returns the client at that position modulo the pool size.
  #[cfg(not(feature = "pool-prefer-active"))]
  pub fn next(&self) -> &RedisClient {
    let clients = &self.inner.clients;
    let idx = utils::incr_atomic(&self.inner.last) % clients.len();
    &clients[idx]
  }

  /// Read the client that ran the last command.
  ///
  /// This returns the client at the position currently stored in the shared counter (modulo the pool size) without
  /// advancing it.
  ///
  /// NOTE(review): with `pool-prefer-active` enabled, `next` can skip ahead to a connected client without recording
  /// that position, so this may not be the client `next` actually returned - confirm the intended semantics.
  pub fn last(&self) -> &RedisClient {
    let clients = &self.inner.clients;
    let idx = utils::read_atomic(&self.inner.last) % clients.len();
    &clients[idx]
  }

  /// Call `QUIT` on each client in the pool.
  ///
  /// All `quit` calls are awaited together and their individual results are discarded.
  pub async fn quit_pool(&self) {
    let pending = self.inner.clients.iter().map(|client| client.quit());
    let _ = join_all(pending).await;
  }
}