edgedb_client/pool/
implementation.rs

1use async_std::channel::unbounded;
2use async_std::task;
3use async_std::sync::{Arc, Mutex, MutexGuard};
4
5use bytes::Bytes;
6
7use edgedb_protocol::QueryResult;
8use edgedb_protocol::client_message::{IoFormat, Cardinality};
9use edgedb_protocol::query_arg::QueryArgs;
10use edgedb_protocol::value::Value;
11
12use crate::ExecuteResult;
13use crate::model::Json;
14use crate::builder::Config;
15use crate::client::{Connection, StatementParams};
16use crate::errors::{Error, ErrorKind, NoDataError, NoResultExpected};
17use crate::pool::command::Command;
18use crate::pool::main;
19use crate::pool::{Client, PoolInner, PoolState, PoolConn, Options};
20
21pub enum InProgressState {
22    Connecting,
23    Comitting,
24    Done,
25}
26
27struct InProgress {
28    state: InProgressState,
29    pool: Arc<PoolInner>,
30}
31
32impl InProgress {
33    fn new(mut guard: MutexGuard<'_, main::Inner>, pool: &Arc<PoolInner>)
34        -> InProgress
35    {
36        guard.in_progress += 1;
37        InProgress { pool: pool.clone(), state: InProgressState::Connecting }
38    }
39    async fn commit(mut self) {
40        self.state = InProgressState::Comitting;
41        let mut inner = self.pool.state.inner.lock().await;
42        inner.in_progress -= 1;
43        inner.acquired_conns += 1;
44        self.state = InProgressState::Done;
45    }
46}
47
48impl Drop for InProgress {
49    fn drop(&mut self) {
50        use InProgressState::*;
51
52        match self.state {
53            Connecting => {
54                self.pool.chan.try_send(Command::ConnectionCanceled).ok();
55            }
56            Comitting => {
57                self.pool.chan.try_send(Command::ConnectionEstablished).ok();
58            }
59            Done => {}
60        }
61    }
62}
63
64impl PoolInner {
65    async fn query<R, A>(self: &Arc<Self>, request: &str, arguments: &A,
66        bld: &StatementParams)
67        -> Result<Vec<R>, Error>
68        where A: QueryArgs,
69              R: QueryResult,
70    {
71        // TODO(tailhook) retry loop
72        let mut conn = self.acquire().await?;
73        conn.query(request, arguments, bld).await
74    }
75}
76
77impl Client {
78    /// Create a new connection pool.
79    ///
80    /// Note this does not create a connection immediately.
81    /// Use [`ensure_connected()`][Client::ensure_connected] to establish a
82    /// connection and verify that the connection is usable.
83    pub fn new(builder: Config) -> Client {
84        let (chan, rcv) = unbounded();
85        let state = Arc::new(PoolState::new(builder));
86        let state2 = state.clone();
87        let task = Mutex::new(Some(task::spawn(main::main(state2, rcv))));
88        Client {
89            options: Arc::new(Options {}),
90            inner: Arc::new(PoolInner {
91                chan,
92                task,
93                state,
94            }),
95        }
96    }
97
98    /// Start shutting down the connection pool.
99    ///
100    /// Note that this waits for all connections to be released when called
101    /// for the first time. But if it is called multiple times concurrently,
102    /// only the first call will wait and subsequent call will exit
103    /// immediately.
104    pub async fn close(&self) {
105        self.inner.chan.send(Command::Close).await.ok();
106        if let Some(task) = self.inner.task.lock().await.take() {
107            task.await;
108        }
109    }
110
111    /// Ensure that there is at least one working connection to the pool.
112    ///
113    /// This can be used at application startup to ensure that you have a
114    /// working connection.
115    pub async fn ensure_connected(&self) -> Result<(), Error> {
116        self.inner.acquire().await?;
117        Ok(())
118    }
119
120    /// Execute a query and return a collection of results.
121    ///
122    /// You will usually have to specify the return type for the query:
123    ///
124    /// ```rust,ignore
125    /// let greeting = pool.query::<String, _>("SELECT 'hello'", &());
126    /// // or
127    /// let greeting: Vec<String> = pool.query("SELECT 'hello'", &());
128    /// ```
129    ///
130    /// This method can be used with both static arguments, like a tuple of
131    /// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`].
132    /// Similarly, dynamically typed results are also supported.
133    pub async fn query<R, A>(&self, request: &str, arguments: &A)
134        -> Result<Vec<R>, Error>
135        where A: QueryArgs,
136              R: QueryResult,
137    {
138        self.inner.query(request, arguments, &StatementParams::new()).await
139    }
140
141    /// Execute a query and return a single result.
142    ///
143    /// You will usually have to specify the return type for the query:
144    ///
145    /// ```rust,ignore
146    /// let greeting = pool.query_single::<String, _>("SELECT 'hello'", &());
147    /// // or
148    /// let greeting: String = pool.query_single("SELECT 'hello'", &());
149    /// ```
150    ///
151    /// The query must return exactly one element. If the query returns more
152    /// than one element, a
153    /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
154    /// is raised. If the query returns an empty set, a
155    /// [`NoDataError`][crate::errors::NoDataError] is raised.
156    ///
157    /// This method can be used with both static arguments, like a tuple of
158    /// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`].
159    /// Similarly, dynamically typed results are also supported.
160    pub async fn query_single<R, A>(&self, request: &str, arguments: &A)
161        -> Result<R, Error>
162        where A: QueryArgs,
163              R: QueryResult,
164    {
165        let result = self.inner.query(request, arguments,
166            StatementParams::new()
167            .cardinality(Cardinality::AtMostOne)
168        ).await?;
169        result.into_iter().next()
170            .ok_or_else(|| {
171                NoDataError::with_message(
172                    "query_single() returned zero results")
173            })
174    }
175
176    /// Execute a query and return the result as JSON.
177    pub async fn query_json<A>(&self, request: &str, arguments: &A)
178        -> Result<Json, Error>
179        where A: QueryArgs,
180    {
181        let result = self.inner.query(request, arguments,
182            StatementParams::new()
183            .io_format(IoFormat::Json),
184        ).await?;
185        result.into_iter().next()
186            // we trust database to produce valid json
187            .map(|v| unsafe { Json::new_unchecked(v) })
188            .ok_or_else(|| {
189                NoDataError::with_message("query row returned zero results")
190            })
191    }
192
193    /// Execute a query and return a single result as JSON.
194    ///
195    /// The query must return exactly one element. If the query returns more
196    /// than one element, a
197    /// [`ResultCardinalityMismatchError`][crate::errors::ResultCardinalityMismatchError]
198    /// is raised. If the query returns an empty set, a
199    /// [`NoDataError`][crate::errors::NoDataError] is raised.
200    pub async fn query_single_json<A>(&self, request: &str, arguments: &A)
201        -> Result<Json, Error>
202        where A: QueryArgs,
203    {
204        let result = self.inner.query(request, arguments,
205            StatementParams::new()
206            .io_format(IoFormat::Json)
207            .cardinality(Cardinality::AtMostOne)
208        ).await?;
209        result.into_iter().next()
210            // we trust database to produce valid json
211            .map(|v| unsafe { Json::new_unchecked(v) })
212            .ok_or_else(|| {
213                NoDataError::with_message("query row returned zero results")
214            })
215    }
216    /// Execute one or more EdgeQL commands.
217    ///
218    /// Note that if you want the results of query, use
219    /// [`query()`][Client::query] or [`query_single()`][Client::query_single]
220    /// instead.
221    pub async fn execute<A>(&self, request: &str, arguments: &A)
222        -> Result<ExecuteResult, Error>
223        where A: QueryArgs,
224    {
225        let result = self.inner.query::<Value, _>(request, arguments,
226                StatementParams::new()
227                .cardinality(Cardinality::Many) // TODO: NoResult
228            ).await;
229        match result {
230            // TODO(tailhook) propagate better rather than returning nothing
231            Ok(_) => Ok(ExecuteResult { marker: Bytes::from_static(b"") }),
232            Err(e) if e.is::<NoResultExpected>() => {
233                // TODO(tailhook) propagate better rather than parsing a
234                // message
235                match e.initial_message() {
236                    Some(m) => {
237                        Ok(ExecuteResult {
238                            marker: Bytes::from(m.as_bytes().to_vec()),
239                        })
240                    }
241                    None => {
242                        Ok(ExecuteResult { marker: Bytes::from_static(b"") })
243                    }
244                }
245            }
246            Err(e) => return Err(e),
247        }
248    }
249}
250
251impl PoolInner {
252    pub(crate) async fn acquire(self: &Arc<Self>) -> Result<PoolConn, Error> {
253        let mut inner = self.state.inner.lock().await;
254        loop {
255            if let Some(conn) = inner.conns.pop_front() {
256                assert!(conn.is_consistent());
257                inner.acquired_conns += 1;
258                return Ok(PoolConn { conn: Some(conn), pool: self.clone() });
259            }
260            let in_pool = inner.in_progress + inner.acquired_conns;
261            if in_pool < self.state.config.0.max_connections {
262                let guard = InProgress::new(inner, self);
263                let conn = self.state.config.private_connect().await?;
264                // Make sure that connection is wrapped before we commit,
265                // so that connection is returned into a pool if we fail
266                // to commit because of async stuff
267                let conn = PoolConn { conn: Some(conn), pool: self.clone() };
268                guard.commit().await;
269                return Ok(conn);
270            }
271            inner = self.state.connection_released.wait(inner).await;
272        }
273    }
274    pub(crate) fn release(&self, conn: Connection) {
275        self.chan.try_send(Command::Release(conn)).ok();
276    }
277}
278
279impl Drop for PoolInner {
280    fn drop(&mut self) {
281        // If task is locked (i.e. try_lock returns an error) it means
282        // somebody is currently waiting for pool to be closed, which is fine.
283        self.task.try_lock()
284            .and_then(|mut task| task.take().map(|t| t.cancel()));
285    }
286}