1use crate::{
2 ColumnDef, Context, DataSet, Driver, DynQuery, Error, Executor, Expression, Query,
3 QueryBuilder, Result, Row, RowLabeled, RowsAffected, TableRef, Value, future::Either,
4 stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9 future::{self, Future},
10 pin::pin,
11};
12
13pub trait Entity {
15 type PrimaryKey<'a>;
17
18 fn table() -> &'static TableRef;
20
21 fn columns() -> &'static [ColumnDef];
23
24 fn primary_key_def() -> &'static [&'static ColumnDef];
26
27 fn primary_key(&self) -> Self::PrimaryKey<'_>;
29
30 fn unique_defs()
32 -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
33
34 fn row_filtered(&self) -> Box<[(&'static str, Value)]>;
37
38 fn row_full(&self) -> Row;
40
41 fn from_row(row: RowLabeled) -> Result<Self>
45 where
46 Self: Sized;
47
48 fn create_table(
54 executor: &mut impl Executor,
55 if_not_exists: bool,
56 create_schema: bool,
57 ) -> impl Future<Output = Result<()>> + Send
58 where
59 Self: Sized,
60 {
61 async move {
62 let mut query = DynQuery::with_capacity(2048);
63 let writer = executor.driver().sql_writer();
64 if create_schema && !Self::table().schema.is_empty() {
65 writer.write_create_schema::<Self>(&mut query, true);
66 }
67 if !executor.accepts_multiple_statements() && !query.is_empty() {
68 let mut q = query.into_query(executor.driver());
69 executor.execute(&mut q).boxed().await?;
70 query = q.into();
72 query.buffer().clear();
73 }
74 writer.write_create_table::<Self>(&mut query, if_not_exists);
75 executor.execute(query).boxed().await.map(|_| ())
77 }
78 }
79
80 fn drop_table(
86 executor: &mut impl Executor,
87 if_exists: bool,
88 drop_schema: bool,
89 ) -> impl Future<Output = Result<()>> + Send
90 where
91 Self: Sized,
92 {
93 async move {
94 let mut query = DynQuery::with_capacity(256);
95 let writer = executor.driver().sql_writer();
96 writer.write_drop_table::<Self>(&mut query, if_exists);
97 if drop_schema && !Self::table().schema.is_empty() {
98 if !executor.accepts_multiple_statements() {
99 let mut q = query.into_query(executor.driver());
100 executor.execute(&mut q).boxed().await?;
101 query = q.into();
103 query.buffer().clear();
104 }
105 writer.write_drop_schema::<Self>(&mut query, true);
106 }
107 executor.execute(query).boxed().await.map(|_| ())
109 }
110 }
111
112 fn insert_one(
116 executor: &mut impl Executor,
117 entity: &impl Entity,
118 ) -> impl Future<Output = Result<RowsAffected>> + Send {
119 let mut query = DynQuery::with_capacity(128);
120 executor
121 .driver()
122 .sql_writer()
123 .write_insert(&mut query, [entity], false);
124 executor.execute(query)
125 }
126
127 fn insert_many<'a, It>(
131 executor: &mut impl Executor,
132 items: It,
133 ) -> impl Future<Output = Result<RowsAffected>> + Send
134 where
135 Self: Sized + 'a,
136 It: IntoIterator<Item = &'a Self> + Send,
137 <It as IntoIterator>::IntoIter: Send,
138 {
139 executor.append(items)
140 }
141
142 fn prepare_find<Exec: Executor>(
146 executor: &mut Exec,
147 condition: impl Expression,
148 limit: Option<u32>,
149 ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
150 let builder = QueryBuilder::new()
151 .select(Self::columns())
152 .from(Self::table())
153 .where_condition(condition)
154 .limit(limit);
155 let writer = executor.driver().sql_writer();
156 let mut query = DynQuery::default();
157 writer.write_select(&mut query, &builder);
158 executor.prepare(query.into_buffer())
159 }
160
161 fn find_pk(
165 executor: &mut impl Executor,
166 primary_key: &Self::PrimaryKey<'_>,
167 ) -> impl Future<Output = Result<Option<Self>>> + Send
168 where
169 Self: Sized;
170
171 fn find_one(
175 executor: &mut impl Executor,
176 condition: impl Expression,
177 ) -> impl Future<Output = Result<Option<Self>>> + Send
178 where
179 Self: Sized,
180 {
181 let stream = Self::find_many(executor, condition, Some(1));
182 async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
183 }
184
185 fn find_many(
190 executor: &mut impl Executor,
191 condition: impl Expression,
192 limit: Option<u32>,
193 ) -> impl Stream<Item = Result<Self>> + Send
194 where
195 Self: Sized,
196 {
197 let builder = QueryBuilder::new()
198 .select(Self::columns())
199 .from(Self::table())
200 .where_condition(condition)
201 .limit(limit);
202 executor
203 .fetch(builder.build(&executor.driver()))
204 .map(|result| result.and_then(Self::from_row))
205 }
206
207 fn delete_one(
211 executor: &mut impl Executor,
212 primary_key: Self::PrimaryKey<'_>,
213 ) -> impl Future<Output = Result<RowsAffected>> + Send
214 where
215 Self: Sized;
216
217 fn delete_many(
221 executor: &mut impl Executor,
222 condition: impl Expression,
223 ) -> impl Future<Output = Result<RowsAffected>> + Send
224 where
225 Self: Sized,
226 {
227 let mut query = DynQuery::with_capacity(128);
228 executor
229 .driver()
230 .sql_writer()
231 .write_delete::<Self>(&mut query, condition);
232 executor.execute(query)
233 }
234
235 fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
241 where
242 Self: Sized,
243 {
244 if Self::primary_key_def().len() == 0 {
245 let error = Error::msg(
246 "Cannot save an entity without a primary key, it would always result in an insert",
247 );
248 log::error!("{:#}", error);
249 return Either::Left(future::ready(Err(error)));
250 }
251 let mut query = DynQuery::with_capacity(512);
252 executor
253 .driver()
254 .sql_writer()
255 .write_insert(&mut query, [self], true);
256 let sql = query.as_str();
257 let context = format!("While saving using the query: {}", truncate_long!(sql));
258 Either::Right(executor.execute(query).map(|mut v| {
259 if let Ok(result) = v
260 && let Some(affected) = result.rows_affected
261 && affected > 2
262 {
263 v = Err(Error::msg(format!(
264 "The driver returned affected rows: {affected} (expected <= 2)"
265 )));
266 }
267 match v {
268 Ok(_) => Ok(()),
269 Err(e) => {
270 let e = e.context(context);
271 log::error!("{e:#}");
272 Err(e)
273 }
274 }
275 }))
276 }
277
278 fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
285 where
286 Self: Sized,
287 {
288 if Self::primary_key_def().len() == 0 {
289 let error = Error::msg(
290 "Cannot delete an entity without a primary key, it would delete nothing",
291 );
292 log::error!("{:#}", error);
293 return Either::Left(future::ready(Err(error)));
294 }
295 Either::Right(Self::delete_one(executor, self.primary_key()).map(|v| {
296 v.and_then(|v| {
297 if let Some(affected) = v.rows_affected {
298 if affected != 1 {
299 let error = Error::msg(format!(
300 "The query deleted {affected} rows instead of the expected 1"
301 ));
302 log::log!(
303 if affected == 0 {
304 Level::Info
305 } else {
306 Level::Error
307 },
308 "{error}",
309 );
310 return Err(error);
311 }
312 }
313 Ok(())
314 })
315 }))
316 }
317}
318
319impl<E: Entity> DataSet for E {
320 fn qualified_columns() -> bool
324 where
325 Self: Sized,
326 {
327 false
328 }
329
330 fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
332 Self::table().write_query(writer, context, out);
333 }
334
335 fn table_ref(&self) -> TableRef {
336 Self::table().clone()
337 }
338}