1use crate::{
2 ColumnDef, Context, Dataset, Driver, DynQuery, Error, Executor, Expression, Query,
3 QueryBuilder, RawQuery, Result, Row, RowLabeled, RowsAffected, TableRef, Value, future::Either,
4 stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9 future::{self, Future},
10 pin::pin,
11 sync::Arc,
12};
13
14pub trait Entity {
16 type PrimaryKey<'a>
18 where
19 Self: 'a;
20
21 fn table() -> &'static TableRef;
23
24 fn columns() -> &'static [ColumnDef];
26
27 fn primary_key_def() -> &'static [&'static ColumnDef];
29
30 fn primary_key(&self) -> Self::PrimaryKey<'_>;
32
33 fn primary_key_expr(&self) -> impl Expression;
34
35 fn unique_defs()
37 -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
38
39 fn row_filtered(&self) -> Box<[(&'static str, Value)]>;
42
43 fn row_full(&self) -> Row;
45
46 fn row_labeled(&self) -> RowLabeled {
47 RowLabeled {
48 labels: Self::columns()
49 .into_iter()
50 .map(|v| v.name().to_string())
51 .collect::<Arc<[String]>>(),
52 values: self.row_full(),
53 }
54 }
55
56 fn from_row(row: RowLabeled) -> Result<Self>
60 where
61 Self: Sized;
62
63 fn create_table(
69 executor: &mut impl Executor,
70 if_not_exists: bool,
71 create_schema: bool,
72 ) -> impl Future<Output = Result<()>> + Send
73 where
74 Self: Sized,
75 {
76 async move {
77 let mut query = DynQuery::with_capacity(2048);
78 let writer = executor.driver().sql_writer();
79 if create_schema && !Self::table().schema.is_empty() {
80 writer.write_create_schema::<Self>(&mut query, true);
81 }
82 if !executor.accepts_multiple_statements() && !query.is_empty() {
83 let mut q = query.into_query(executor.driver());
84 executor.execute(&mut q).await?;
85 query = q.into();
87 query.buffer().clear();
88 }
89 writer.write_create_table::<Self>(&mut query, if_not_exists);
90 executor.execute(query).await.map(|_| ())
91 }
92 }
93
94 fn drop_table(
100 executor: &mut impl Executor,
101 if_exists: bool,
102 drop_schema: bool,
103 ) -> impl Future<Output = Result<()>> + Send
104 where
105 Self: Sized,
106 {
107 async move {
108 let mut query = DynQuery::with_capacity(256);
109 let writer = executor.driver().sql_writer();
110 writer.write_drop_table::<Self>(&mut query, if_exists);
111 if drop_schema && !Self::table().schema.is_empty() {
112 if !executor.accepts_multiple_statements() {
113 let mut q = query.into_query(executor.driver());
114 executor.execute(&mut q).await?;
115 query = q.into();
117 query.buffer().clear();
118 }
119 writer.write_drop_schema::<Self>(&mut query, true);
120 }
121 executor.execute(query).await.map(|_| ())
122 }
123 }
124
125 fn insert_one(
129 executor: &mut impl Executor,
130 entity: &impl Entity,
131 ) -> impl Future<Output = Result<RowsAffected>> + Send {
132 let mut query = DynQuery::with_capacity(128);
133 executor
134 .driver()
135 .sql_writer()
136 .write_insert(&mut query, [entity], false);
137 executor.execute(query)
138 }
139
140 fn insert_many<'a, It>(
144 executor: &mut impl Executor,
145 items: It,
146 ) -> impl Future<Output = Result<RowsAffected>> + Send
147 where
148 Self: Sized + 'a,
149 It: IntoIterator<Item = &'a Self> + Send,
150 <It as IntoIterator>::IntoIter: Send,
151 {
152 executor.append(items)
153 }
154
155 fn prepare_find<Exec: Executor>(
159 executor: &mut Exec,
160 condition: impl Expression,
161 limit: Option<u32>,
162 ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
163 let builder = QueryBuilder::new()
164 .select(Self::columns())
165 .from(Self::table())
166 .where_expr(condition)
167 .limit(limit);
168 let writer = executor.driver().sql_writer();
169 let mut query = DynQuery::default();
170 writer.write_select(&mut query, &builder);
171 async {
172 if let DynQuery::Raw(RawQuery(sql)) = query {
173 executor.prepare(sql).await
174 } else {
175 Ok(query.into())
176 }
177 }
178 }
179
180 fn find_one(
184 executor: &mut impl Executor,
185 condition: impl Expression,
186 ) -> impl Future<Output = Result<Option<Self>>> + Send
187 where
188 Self: Sized,
189 {
190 let stream = Self::find_many(executor, condition, Some(1));
191 async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
192 }
193
194 fn find_many(
199 executor: &mut impl Executor,
200 condition: impl Expression,
201 limit: Option<u32>,
202 ) -> impl Stream<Item = Result<Self>> + Send
203 where
204 Self: Sized,
205 {
206 let builder = QueryBuilder::new()
207 .select(Self::columns())
208 .from(Self::table())
209 .where_expr(condition)
210 .limit(limit);
211 executor
212 .fetch(builder.build(&executor.driver()))
213 .map(|result| result.and_then(Self::from_row))
214 }
215
216 fn delete_many(
220 executor: &mut impl Executor,
221 condition: impl Expression,
222 ) -> impl Future<Output = Result<RowsAffected>> + Send
223 where
224 Self: Sized,
225 {
226 let mut query = DynQuery::with_capacity(128);
227 executor
228 .driver()
229 .sql_writer()
230 .write_delete::<Self>(&mut query, condition);
231 executor.execute(query)
232 }
233
234 fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
240 where
241 Self: Sized,
242 {
243 if Self::primary_key_def().len() == 0 {
244 let error = Error::msg(
245 "Cannot save an entity without a primary key, it would always result in an insert",
246 );
247 log::error!("{:#}", error);
248 return Either::Left(future::ready(Err(error)));
249 }
250 let mut query = DynQuery::with_capacity(512);
251 executor
252 .driver()
253 .sql_writer()
254 .write_insert(&mut query, [self], true);
255 let sql = query.as_str();
256 let context = format!("While saving using the query {}", truncate_long!(sql));
257 Either::Right(executor.execute(query).map(|mut v| {
258 if let Ok(result) = v
259 && let Some(affected) = result.rows_affected
260 && affected > 2
261 {
262 v = Err(Error::msg(format!(
263 "The driver returned affected rows: {affected} (expected <= 2)"
264 )));
265 }
266 match v {
267 Ok(_) => Ok(()),
268 Err(e) => {
269 let e = e.context(context);
270 log::error!("{e:#}");
271 Err(e)
272 }
273 }
274 }))
275 }
276
277 fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
284 where
285 Self: Sized,
286 {
287 if Self::primary_key_def().len() == 0 {
288 let error = Error::msg(
289 "Cannot delete an entity without a primary key, it would delete nothing",
290 );
291 log::error!("{:#}", error);
292 return Either::Left(future::ready(Err(error)));
293 }
294 Either::Right(
295 Self::delete_many(executor, self.primary_key_expr()).map(|v| {
296 v.and_then(|v| {
297 if let Some(affected) = v.rows_affected {
298 if affected != 1 {
299 let error = Error::msg(format!(
300 "The query deleted {affected} rows instead of the expected 1"
301 ));
302 log::log!(
303 if affected == 0 {
304 Level::Info
305 } else {
306 Level::Error
307 },
308 "{error}",
309 );
310 return Err(error);
311 }
312 }
313 Ok(())
314 })
315 }),
316 )
317 }
318}
319
320impl<E: Entity> Dataset for E {
321 fn qualified_columns() -> bool
325 where
326 Self: Sized,
327 {
328 false
329 }
330
331 fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
333 Self::table().write_query(writer, context, out);
334 }
335
336 fn table_ref(&self) -> TableRef {
337 Self::table().clone()
338 }
339}