1use bytes::Bytes;
2
3use edgedb_protocol::QueryResult;
4use edgedb_protocol::query_arg::QueryArgs;
5use edgedb_protocol::descriptors::OutputTypedesc;
6use edgedb_protocol::client_message::{Cardinality, IoFormat};
7
8use crate::errors::{Error, NoResultExpected, NoDataError, ErrorKind};
9use crate::Client;
10use crate::client::StatementParams;
11use crate::model::Json;
12
13
14#[derive(Debug, Clone)]
16pub struct ExecuteResult {
17 #[allow(dead_code)] pub(crate) marker: Bytes,
19}
20
21struct Statement<'a, A> {
22 params: StatementParams,
23 query: &'a str,
24 arguments: &'a A,
25}
26
27pub struct GenericResult {
28 pub(crate) descriptor: OutputTypedesc,
29 pub(crate) data: Vec<Bytes>,
30 pub(crate) completion: Bytes,
31}
32
33pub trait GenericQuery: Send + Sync {
34 fn query(&self) -> &str;
35 fn arguments(&self) -> &dyn QueryArgs;
36 fn params(&self) -> &StatementParams;
37}
38
39pub trait Encoder: Send + Sync {
40}
41pub trait Decoder: Send + Sync {
42}
43pub trait Decodable {
44}
45#[async_trait::async_trait]
46pub trait Sealed {
47 async fn query_dynamic(&mut self, query: &dyn GenericQuery)
48 -> Result<GenericResult, Error>;
49}
50
51pub trait Executor: Sealed {
68}
69
70#[async_trait::async_trait]
71impl Sealed for Client {
72 async fn query_dynamic(&mut self, query: &dyn GenericQuery)
73 -> Result<GenericResult, Error>
74 {
75 let mut conn = self.inner.acquire().await?;
77 conn.query_dynamic(query).await
78 }
79}
80
81impl Executor for Client {}
82
83impl dyn Executor + '_ {
84 pub async fn query<R, A>(&mut self, query: &str, arguments: &A)
98 -> Result<Vec<R>, Error>
99 where A: QueryArgs,
100 R: QueryResult,
101 {
102 let result = self.query_dynamic(&Statement {
103 params: StatementParams::new(),
104 query,
105 arguments,
106 }).await?;
107 match result.descriptor.root_pos() {
108 Some(root_pos) => {
109 let ctx = result.descriptor.as_queryable_context();
110 let mut state = R::prepare(&ctx, root_pos)?;
111 let mut res = Vec::with_capacity(result.data.len());
112 for datum in result.data.into_iter() {
113 res.push(R::decode(&mut state, &datum)?);
114 }
115 Ok(res)
116 }
117 None => {
118 Err(NoResultExpected::with_message(
119 String::from_utf8_lossy(&result.completion[..])
120 .to_string()))?
121 }
122 }
123 }
124
125 pub async fn query_single<R, A>(&mut self, query: &str, arguments: &A)
145 -> Result<R, Error>
146 where A: QueryArgs,
147 R: QueryResult,
148 {
149 let result = self.query_dynamic(&Statement {
150 params: StatementParams::new()
151 .cardinality(Cardinality::AtMostOne)
152 .clone(),
153 query,
154 arguments,
155 }).await?;
156 match result.descriptor.root_pos() {
157 Some(root_pos) => {
158 let ctx = result.descriptor.as_queryable_context();
159 let mut state = R::prepare(&ctx, root_pos)?;
160 if result.data.len() == 0 {
161 return Err(NoDataError::with_message(
162 "query_single() returned zero results"))
163 }
164 return Ok(R::decode(&mut state, &result.data[0])?)
165 }
166 None => {
167 Err(NoResultExpected::with_message(
168 String::from_utf8_lossy(&result.completion[..])
169 .to_string()))?
170 }
171 }
172 }
173
174 pub async fn query_json<A>(&mut self, query: &str, arguments: &A)
176 -> Result<Json, Error>
177 where A: QueryArgs,
178 {
179 let result = self.query_dynamic(&Statement {
180 params: StatementParams::new()
181 .io_format(IoFormat::Json)
182 .clone(),
183 query,
184 arguments,
185 }).await?;
186 match result.descriptor.root_pos() {
187 Some(root_pos) => {
188 let ctx = result.descriptor.as_queryable_context();
189 let mut state = <String as QueryResult>
190 ::prepare(&ctx, root_pos)?;
191 if result.data.len() == 0 {
192 return Err(NoDataError::with_message(
193 "query_json() returned zero results"))
194 }
195 let data = <String as QueryResult>::decode(
196 &mut state, &result.data[0])?;
197 let json = unsafe { Json::new_unchecked(data) };
199 return Ok(json)
200 }
201 None => {
202 Err(NoResultExpected::with_message(
203 String::from_utf8_lossy(&result.completion[..])
204 .to_string()))?
205 }
206 }
207 }
208
209 pub async fn query_single_json<A>(&mut self, query: &str, arguments: &A)
217 -> Result<Json, Error>
218 where A: QueryArgs,
219 {
220 let result = self.query_dynamic(&Statement {
221 params: StatementParams::new()
222 .io_format(IoFormat::Json)
223 .cardinality(Cardinality::AtMostOne)
224 .clone(),
225 query,
226 arguments,
227 }).await?;
228 match result.descriptor.root_pos() {
229 Some(root_pos) => {
230 let ctx = result.descriptor.as_queryable_context();
231 let mut state = <String as QueryResult>
232 ::prepare(&ctx, root_pos)?;
233 if result.data.len() == 0 {
234 return Err(NoDataError::with_message(
235 "query_single_json() returned zero results"))
236 }
237 let data = <String as QueryResult>::decode(
238 &mut state, &result.data[0])?;
239 let json = unsafe { Json::new_unchecked(data) };
241 return Ok(json)
242 }
243 None => {
244 Err(NoResultExpected::with_message(
245 String::from_utf8_lossy(&result.completion[..])
246 .to_string()))?
247 }
248 }
249 }
250 pub async fn execute<A>(&mut self, query: &str, arguments: &A)
256 -> Result<ExecuteResult, Error>
257 where A: QueryArgs,
258 {
259 let result = self.query_dynamic(&Statement {
260 params: StatementParams::new(),
261 query,
262 arguments,
263 }).await?;
264 return Ok(ExecuteResult { marker: result.completion });
266 }
267}
268
269impl<A: QueryArgs + Send + Sync + Sized> GenericQuery for Statement<'_, A> {
270 fn query(&self) -> &str {
271 &self.query
272 }
273 fn arguments(&self) -> &dyn QueryArgs {
274 self.arguments
275 }
276 fn params(&self) -> &StatementParams {
277 &self.params
278 }
279}