firebird_wire/pool.rs
1//! Pool de conexões reutilizáveis ao servidor Firebird.
2//!
3//! [`Pool`] mantém um conjunto de [`Connection`]s ociosas prontas para uso, limitando
4//! o total de conexões simultâneas ao servidor pelo campo [`PoolConfig::max_size`].
5//! O pool não executa `ping` antes de entregar uma conexão ociosa; ele só filtra
6//! conexões que já foram marcadas localmente como quebradas. Se o servidor fechar
7//! um socket em silêncio, a primeira operação real vai detectar o erro e a conexão
8//! será descartada ao retornar ao pool.
9//!
10//! ```text
11//! let pool = Pool::new(config, PoolConfig::default());
12//! let mut conn = pool.get()?; // pega do pool ou cria uma nova
13//! conn.ping()?; // usa normalmente via Deref
14//! drop(conn); // devolve ao pool automaticamente
15//! ```
16//!
17//! # Compartilhamento
18//!
19//! [`Pool`] é barato de clonar (`Arc` interno) — compartilhe o mesmo pool entre
20//! tarefas sem custo.
21//!
22//! # Ciclo de vida
23//!
24//! A conexão é devolvida ao pool ao cair fora de escopo. Se o chamador precisar
25//! descartar uma conexão (ex.: após um erro irrecuperável), chame
26//! [`PooledConnection::discard`] antes de deixá-la cair.
27
28use std::collections::VecDeque;
29use std::sync::{Arc, Condvar, Mutex};
30use std::time::{Duration, Instant};
31
32use crate::config::ConnectConfig;
33use crate::connection::Connection;
34use crate::error::{Error, Result};
35
36/// Parâmetros do pool. Use [`Default`] para os valores recomendados.
37#[derive(Debug, Clone)]
38pub struct PoolConfig {
39 /// Número máximo de conexões simultâneas (ociosas + em uso). Padrão: 10.
40 pub max_size: usize,
41 /// Tempo máximo de espera por uma conexão disponível.
42 /// `None` espera indefinidamente. Padrão: 30 s.
43 pub acquisition_timeout: Option<Duration>,
44}
45
46impl Default for PoolConfig {
47 fn default() -> Self {
48 PoolConfig {
49 max_size: 10,
50 acquisition_timeout: Some(Duration::from_secs(30)),
51 }
52 }
53}
54
55// Internos compartilhados entre todos os clones do pool.
56struct PoolState {
57 config: ConnectConfig,
58 idle: Mutex<VecDeque<Connection>>,
59 permits: Arc<PermitPool>,
60 acquisition_timeout: Option<Duration>,
61}
62
63struct PermitPool {
64 available: Mutex<usize>,
65 changed: Condvar,
66}
67
68impl PermitPool {
69 fn new(max: usize) -> Self {
70 PermitPool {
71 available: Mutex::new(max),
72 changed: Condvar::new(),
73 }
74 }
75
76 fn acquire(self: &Arc<Self>, timeout: Option<Duration>) -> Result<Permit> {
77 let mut available = self
78 .available
79 .lock()
80 .map_err(|_| Error::Pool("pool lock poisoned".into()))?;
81
82 match timeout {
83 None => {
84 while *available == 0 {
85 available = self
86 .changed
87 .wait(available)
88 .map_err(|_| Error::Pool("pool lock poisoned".into()))?;
89 }
90 }
91 Some(timeout) => {
92 let deadline = Instant::now() + timeout;
93 while *available == 0 {
94 let now = Instant::now();
95 if now >= deadline {
96 return Err(Error::Timeout);
97 }
98 let wait = deadline.saturating_duration_since(now);
99 let (guard, result) = self
100 .changed
101 .wait_timeout(available, wait)
102 .map_err(|_| Error::Pool("pool lock poisoned".into()))?;
103 available = guard;
104 if result.timed_out() && *available == 0 {
105 return Err(Error::Timeout);
106 }
107 }
108 }
109 }
110
111 *available -= 1;
112 Ok(Permit {
113 permits: Arc::clone(self),
114 })
115 }
116
117 fn release(&self) {
118 if let Ok(mut available) = self.available.lock() {
119 *available += 1;
120 self.changed.notify_one();
121 }
122 }
123}
124
125struct Permit {
126 permits: Arc<PermitPool>,
127}
128
129impl Drop for Permit {
130 fn drop(&mut self) {
131 self.permits.release();
132 }
133}
134
135/// Um pool de conexões ao Firebird. Clone livremente para compartilhar entre tarefas.
136#[derive(Clone)]
137pub struct Pool(Arc<PoolState>);
138
139impl Pool {
140 /// Cria um pool vazio com a configuração fornecida. As conexões são criadas sob
141 /// demanda na primeira chamada a [`Self::get`].
142 pub fn new(config: ConnectConfig, pool_config: PoolConfig) -> Self {
143 Pool(Arc::new(PoolState {
144 config,
145 idle: Mutex::new(VecDeque::new()),
146 permits: Arc::new(PermitPool::new(pool_config.max_size)),
147 acquisition_timeout: pool_config.acquisition_timeout,
148 }))
149 }
150
151 /// Obtém uma conexão do pool. Bloqueia (até o `acquisition_timeout`) se o
152 /// número máximo de conexões já estiver em uso.
153 ///
154 /// Sempre que há uma conexão ociosa no pool, ela é reutilizada. Caso contrário,
155 /// uma nova conexão é aberta. A conexão é devolvida ao pool ao cair fora de escopo.
156 pub fn get(&self) -> Result<PooledConnection> {
157 let permit = self.acquire_permit()?;
158
159 // Tenta reutilizar uma conexão ociosa, descartando as mortas.
160 while let Some(conn) = self.pop_idle() {
161 if conn_is_alive(&conn) {
162 return Ok(PooledConnection {
163 conn: Some(conn),
164 pool: self.clone(),
165 permit: Some(permit),
166 });
167 }
168 // Conexão morta — descarta e continua tentando; o permit permanece.
169 }
170
171 // Nenhuma ociosa disponível: abre uma nova.
172 let conn = Connection::connect(&self.0.config)?;
173 Ok(PooledConnection {
174 conn: Some(conn),
175 pool: self.clone(),
176 permit: Some(permit),
177 })
178 }
179
180 // Devolve uma conexão à fila de ociosas. Chamado pelo Drop de PooledConnection.
181 fn return_conn(&self, conn: Connection) {
182 // Não recicla conexões com erro de I/O ou desync: seriam veneno para o
183 // próximo usuário. Descarta-as (o socket fecha ao soltar a Connection).
184 if !conn.is_healthy() {
185 return;
186 }
187 if let Ok(mut idle) = self.0.idle.lock() {
188 idle.push_back(conn);
189 }
190 // Se o lock estiver envenenado, descarta silenciosamente.
191 }
192
193 fn pop_idle(&self) -> Option<Connection> {
194 self.0.idle.lock().ok()?.pop_front()
195 }
196
197 fn acquire_permit(&self) -> Result<Permit> {
198 self.0.permits.acquire(self.0.acquisition_timeout)
199 }
200}
201
202/// Verifica superficialmente se uma conexão ainda parece viva (sem ida ao servidor).
203/// Filtra conexões já marcadas com erro de I/O ou desync; não detecta o servidor
204/// ter derrubado o socket de forma silenciosa (a primeira operação revelará isso,
205/// e aí a conexão é marcada e descartada na devolução).
206fn conn_is_alive(conn: &Connection) -> bool {
207 conn.is_healthy()
208}
209
210/// Guard que representa uma conexão retirada do pool.
211///
212/// Use via [`std::ops::Deref`]/[`std::ops::DerefMut`] para acessar a [`Connection`].
213/// Ao cair fora de escopo, a conexão é devolvida ao pool automaticamente.
214/// Se a conexão estiver com falha, chame [`Self::discard`] para descartá-la
215/// sem devolvê-la.
216pub struct PooledConnection {
217 conn: Option<Connection>,
218 pool: Pool,
219 permit: Option<Permit>,
220}
221
222impl PooledConnection {
223 /// Descarta a conexão em vez de devolvê-la ao pool. Use após um erro
224 /// irrecuperável na conexão para evitar contaminar o pool.
225 pub fn discard(mut self) {
226 self.conn = None; // descarta a conexão aqui; Drop vai notar que é None.
227 }
228}
229
230impl std::ops::Deref for PooledConnection {
231 type Target = Connection;
232 fn deref(&self) -> &Connection {
233 self.conn.as_ref().expect("conexão já descartada")
234 }
235}
236
237impl std::ops::DerefMut for PooledConnection {
238 fn deref_mut(&mut self) -> &mut Connection {
239 self.conn.as_mut().expect("conexão já descartada")
240 }
241}
242
243impl Drop for PooledConnection {
244 fn drop(&mut self) {
245 // Devolve a conexão à fila de ociosas (se não foi descartada).
246 if let Some(conn) = self.conn.take() {
247 self.pool.return_conn(conn);
248 }
249 // O permit é liberado aqui, abrindo espaço no semáforo.
250 drop(self.permit.take());
251 }
252}