1use crate::{
2 ColumnDef, Context, Dataset, Driver, DynQuery, Error, Executor, Expression, Query,
3 QueryBuilder, RawQuery, Result, Row, RowValues, RowsAffected, TableRef, future::Either,
4 stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9 future::{self, Future},
10 pin::pin,
11 sync::Arc,
12};
13
14pub trait Entity {
18 type PrimaryKey<'a>
20 where
21 Self: 'a;
22
23 fn table() -> &'static TableRef;
25
26 fn columns() -> &'static [ColumnDef];
28
29 fn primary_key_def() -> &'static [&'static ColumnDef];
31
32 fn primary_key(&self) -> Self::PrimaryKey<'_>;
34
35 fn primary_key_expr(&self) -> impl Expression;
37
38 fn unique_defs()
40 -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
41
42 fn row_values(&self) -> RowValues;
44
45 fn row(&self) -> Row {
47 Row {
48 labels: Self::columns()
49 .into_iter()
50 .map(|v| v.name().to_string())
51 .collect::<Arc<[String]>>(),
52 values: self.row_values(),
53 }
54 }
55
56 fn from_row(row: Row) -> Result<Self>
60 where
61 Self: Sized;
62
63 fn create_table(
68 executor: &mut impl Executor,
69 if_not_exists: bool,
70 create_schema: bool,
71 ) -> impl Future<Output = Result<()>> + Send
72 where
73 Self: Sized,
74 {
75 async move {
76 let mut query = DynQuery::with_capacity(2048);
77 let writer = executor.driver().sql_writer();
78 if create_schema && !Self::table().schema.is_empty() {
79 writer.write_create_schema::<Self>(&mut query, true);
80 }
81 if !executor.accepts_multiple_statements() && !query.is_empty() {
82 let mut q = query.into_query(executor.driver());
83 executor.execute(&mut q).await?;
84 query = q.into();
86 query.buffer().clear();
87 }
88 writer.write_create_table::<Self>(&mut query, if_not_exists);
89 executor.execute(query).await.map(|_| ())
90 }
91 }
92
93 fn drop_table(
98 executor: &mut impl Executor,
99 if_exists: bool,
100 drop_schema: bool,
101 ) -> impl Future<Output = Result<()>> + Send
102 where
103 Self: Sized,
104 {
105 async move {
106 let mut query = DynQuery::with_capacity(256);
107 let writer = executor.driver().sql_writer();
108 writer.write_drop_table::<Self>(&mut query, if_exists);
109 if drop_schema && !Self::table().schema.is_empty() {
110 if !executor.accepts_multiple_statements() {
111 let mut q = query.into_query(executor.driver());
112 executor.execute(&mut q).await?;
113 query = q.into();
115 query.buffer().clear();
116 }
117 writer.write_drop_schema::<Self>(&mut query, true);
118 }
119 executor.execute(query).await.map(|_| ())
120 }
121 }
122
123 fn insert_one(
125 executor: &mut impl Executor,
126 entity: &impl Entity,
127 ) -> impl Future<Output = Result<RowsAffected>> + Send {
128 let mut query = DynQuery::with_capacity(128);
129 executor
130 .driver()
131 .sql_writer()
132 .write_insert(&mut query, [entity], false);
133 executor.execute(query)
134 }
135
136 fn insert_many<'a, It>(
138 executor: &mut impl Executor,
139 items: It,
140 ) -> impl Future<Output = Result<RowsAffected>> + Send
141 where
142 Self: Sized + 'a,
143 It: IntoIterator<Item = &'a Self> + Send,
144 <It as IntoIterator>::IntoIter: Send,
145 {
146 executor.append(items)
147 }
148
149 fn prepare_find<Exec: Executor>(
153 executor: &mut Exec,
154 condition: impl Expression,
155 limit: Option<u32>,
156 ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
157 let builder = QueryBuilder::new()
158 .select(Self::columns())
159 .from(Self::table())
160 .where_expr(condition)
161 .limit(limit);
162 let writer = executor.driver().sql_writer();
163 let mut query = DynQuery::default();
164 writer.write_select(&mut query, &builder);
165 async {
166 if let DynQuery::Raw(RawQuery(sql)) = query {
167 executor.prepare(sql).await
168 } else {
169 Ok(query.into())
170 }
171 }
172 }
173
174 fn find_one(
178 executor: &mut impl Executor,
179 condition: impl Expression,
180 ) -> impl Future<Output = Result<Option<Self>>> + Send
181 where
182 Self: Sized,
183 {
184 let stream = Self::find_many(executor, condition, Some(1));
185 async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
186 }
187
188 fn find_many(
193 executor: &mut impl Executor,
194 condition: impl Expression,
195 limit: Option<u32>,
196 ) -> impl Stream<Item = Result<Self>> + Send
197 where
198 Self: Sized,
199 {
200 let builder = QueryBuilder::new()
201 .select(Self::columns())
202 .from(Self::table())
203 .where_expr(condition)
204 .limit(limit);
205 executor
206 .fetch(builder.build(&executor.driver()))
207 .map(|result| result.and_then(Self::from_row))
208 }
209
210 fn delete_many(
214 executor: &mut impl Executor,
215 condition: impl Expression,
216 ) -> impl Future<Output = Result<RowsAffected>> + Send
217 where
218 Self: Sized,
219 {
220 let mut query = DynQuery::with_capacity(128);
221 executor
222 .driver()
223 .sql_writer()
224 .write_delete::<Self>(&mut query, condition);
225 executor.execute(query)
226 }
227
228 fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
234 where
235 Self: Sized,
236 {
237 if Self::primary_key_def().len() == 0 {
238 let error = Error::msg(
239 "Cannot save an entity without a primary key, it would always result in an insert",
240 );
241 log::error!("{:#}", error);
242 return Either::Left(future::ready(Err(error)));
243 }
244 let mut query = DynQuery::with_capacity(512);
245 executor
246 .driver()
247 .sql_writer()
248 .write_insert(&mut query, [self], true);
249 let sql = query.as_str();
250 let context = format!("While saving using the query {}", truncate_long!(sql));
251 Either::Right(executor.execute(query).map(|mut v| {
252 if let Ok(result) = v
253 && let Some(affected) = result.rows_affected
254 && affected > 2
255 {
256 v = Err(Error::msg(format!(
257 "The driver returned affected rows: {affected} (expected <= 2)"
258 )));
259 }
260 match v {
261 Ok(_) => Ok(()),
262 Err(e) => {
263 let e = e.context(context);
264 Err(e)
265 }
266 }
267 }))
268 }
269
270 fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
277 where
278 Self: Sized,
279 {
280 if Self::primary_key_def().len() == 0 {
281 let error = Error::msg(
282 "Cannot delete an entity without a primary key, it would delete nothing",
283 );
284 log::error!("{:#}", error);
285 return Either::Left(future::ready(Err(error)));
286 }
287 Either::Right(
288 Self::delete_many(executor, self.primary_key_expr()).map(|v| {
289 v.and_then(|v| {
290 if let Some(affected) = v.rows_affected {
291 if affected != 1 {
292 let error = Error::msg(format!(
293 "The query deleted {affected} rows instead of the expected 1"
294 ));
295 log::log!(
296 if affected == 0 {
297 Level::Info
298 } else {
299 Level::Error
300 },
301 "{error}",
302 );
303 return Err(error);
304 }
305 }
306 Ok(())
307 })
308 }),
309 )
310 }
311}
312
313impl<E: Entity> Dataset for E {
314 fn qualified_columns() -> bool
318 where
319 Self: Sized,
320 {
321 false
322 }
323
324 fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
326 Self::table().write_query(writer, context, out);
327 }
328
329 fn table_ref(&self) -> TableRef {
330 Self::table().clone()
331 }
332}