edgedb_client/pool/
implementation.rs1use async_std::channel::unbounded;
2use async_std::task;
3use async_std::sync::{Arc, Mutex, MutexGuard};
4
5use bytes::Bytes;
6
7use edgedb_protocol::QueryResult;
8use edgedb_protocol::client_message::{IoFormat, Cardinality};
9use edgedb_protocol::query_arg::QueryArgs;
10use edgedb_protocol::value::Value;
11
12use crate::ExecuteResult;
13use crate::model::Json;
14use crate::builder::Config;
15use crate::client::{Connection, StatementParams};
16use crate::errors::{Error, ErrorKind, NoDataError, NoResultExpected};
17use crate::pool::command::Command;
18use crate::pool::main;
19use crate::pool::{Client, PoolInner, PoolState, PoolConn, Options};
20
21pub enum InProgressState {
22 Connecting,
23 Comitting,
24 Done,
25}
26
27struct InProgress {
28 state: InProgressState,
29 pool: Arc<PoolInner>,
30}
31
32impl InProgress {
33 fn new(mut guard: MutexGuard<'_, main::Inner>, pool: &Arc<PoolInner>)
34 -> InProgress
35 {
36 guard.in_progress += 1;
37 InProgress { pool: pool.clone(), state: InProgressState::Connecting }
38 }
39 async fn commit(mut self) {
40 self.state = InProgressState::Comitting;
41 let mut inner = self.pool.state.inner.lock().await;
42 inner.in_progress -= 1;
43 inner.acquired_conns += 1;
44 self.state = InProgressState::Done;
45 }
46}
47
48impl Drop for InProgress {
49 fn drop(&mut self) {
50 use InProgressState::*;
51
52 match self.state {
53 Connecting => {
54 self.pool.chan.try_send(Command::ConnectionCanceled).ok();
55 }
56 Comitting => {
57 self.pool.chan.try_send(Command::ConnectionEstablished).ok();
58 }
59 Done => {}
60 }
61 }
62}
63
64impl PoolInner {
65 async fn query<R, A>(self: &Arc<Self>, request: &str, arguments: &A,
66 bld: &StatementParams)
67 -> Result<Vec<R>, Error>
68 where A: QueryArgs,
69 R: QueryResult,
70 {
71 let mut conn = self.acquire().await?;
73 conn.query(request, arguments, bld).await
74 }
75}
76
77impl Client {
78 pub fn new(builder: Config) -> Client {
84 let (chan, rcv) = unbounded();
85 let state = Arc::new(PoolState::new(builder));
86 let state2 = state.clone();
87 let task = Mutex::new(Some(task::spawn(main::main(state2, rcv))));
88 Client {
89 options: Arc::new(Options {}),
90 inner: Arc::new(PoolInner {
91 chan,
92 task,
93 state,
94 }),
95 }
96 }
97
98 pub async fn close(&self) {
105 self.inner.chan.send(Command::Close).await.ok();
106 if let Some(task) = self.inner.task.lock().await.take() {
107 task.await;
108 }
109 }
110
111 pub async fn ensure_connected(&self) -> Result<(), Error> {
116 self.inner.acquire().await?;
117 Ok(())
118 }
119
120 pub async fn query<R, A>(&self, request: &str, arguments: &A)
134 -> Result<Vec<R>, Error>
135 where A: QueryArgs,
136 R: QueryResult,
137 {
138 self.inner.query(request, arguments, &StatementParams::new()).await
139 }
140
141 pub async fn query_single<R, A>(&self, request: &str, arguments: &A)
161 -> Result<R, Error>
162 where A: QueryArgs,
163 R: QueryResult,
164 {
165 let result = self.inner.query(request, arguments,
166 StatementParams::new()
167 .cardinality(Cardinality::AtMostOne)
168 ).await?;
169 result.into_iter().next()
170 .ok_or_else(|| {
171 NoDataError::with_message(
172 "query_single() returned zero results")
173 })
174 }
175
176 pub async fn query_json<A>(&self, request: &str, arguments: &A)
178 -> Result<Json, Error>
179 where A: QueryArgs,
180 {
181 let result = self.inner.query(request, arguments,
182 StatementParams::new()
183 .io_format(IoFormat::Json),
184 ).await?;
185 result.into_iter().next()
186 .map(|v| unsafe { Json::new_unchecked(v) })
188 .ok_or_else(|| {
189 NoDataError::with_message("query row returned zero results")
190 })
191 }
192
193 pub async fn query_single_json<A>(&self, request: &str, arguments: &A)
201 -> Result<Json, Error>
202 where A: QueryArgs,
203 {
204 let result = self.inner.query(request, arguments,
205 StatementParams::new()
206 .io_format(IoFormat::Json)
207 .cardinality(Cardinality::AtMostOne)
208 ).await?;
209 result.into_iter().next()
210 .map(|v| unsafe { Json::new_unchecked(v) })
212 .ok_or_else(|| {
213 NoDataError::with_message("query row returned zero results")
214 })
215 }
216 pub async fn execute<A>(&self, request: &str, arguments: &A)
222 -> Result<ExecuteResult, Error>
223 where A: QueryArgs,
224 {
225 let result = self.inner.query::<Value, _>(request, arguments,
226 StatementParams::new()
227 .cardinality(Cardinality::Many) ).await;
229 match result {
230 Ok(_) => Ok(ExecuteResult { marker: Bytes::from_static(b"") }),
232 Err(e) if e.is::<NoResultExpected>() => {
233 match e.initial_message() {
236 Some(m) => {
237 Ok(ExecuteResult {
238 marker: Bytes::from(m.as_bytes().to_vec()),
239 })
240 }
241 None => {
242 Ok(ExecuteResult { marker: Bytes::from_static(b"") })
243 }
244 }
245 }
246 Err(e) => return Err(e),
247 }
248 }
249}
250
251impl PoolInner {
252 pub(crate) async fn acquire(self: &Arc<Self>) -> Result<PoolConn, Error> {
253 let mut inner = self.state.inner.lock().await;
254 loop {
255 if let Some(conn) = inner.conns.pop_front() {
256 assert!(conn.is_consistent());
257 inner.acquired_conns += 1;
258 return Ok(PoolConn { conn: Some(conn), pool: self.clone() });
259 }
260 let in_pool = inner.in_progress + inner.acquired_conns;
261 if in_pool < self.state.config.0.max_connections {
262 let guard = InProgress::new(inner, self);
263 let conn = self.state.config.private_connect().await?;
264 let conn = PoolConn { conn: Some(conn), pool: self.clone() };
268 guard.commit().await;
269 return Ok(conn);
270 }
271 inner = self.state.connection_released.wait(inner).await;
272 }
273 }
274 pub(crate) fn release(&self, conn: Connection) {
275 self.chan.try_send(Command::Release(conn)).ok();
276 }
277}
278
279impl Drop for PoolInner {
280 fn drop(&mut self) {
281 self.task.try_lock()
284 .and_then(|mut task| task.take().map(|t| t.cancel()));
285 }
286}