1use crate::{
2 ColumnDef, Context, DataSet, Driver, Error, Executor, Expression, Query, Result, Row,
3 RowLabeled, RowsAffected, TableRef, Value, future::Either, stream::Stream, truncate_long,
4 writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9 future::{self, Future},
10 mem,
11 pin::pin,
12};
13
14pub trait Entity {
16 type PrimaryKey<'a>;
18
19 fn table() -> &'static TableRef;
21
22 fn columns() -> &'static [ColumnDef];
24
25 fn primary_key_def() -> &'static [&'static ColumnDef];
27
28 fn primary_key(&self) -> Self::PrimaryKey<'_>;
30
31 fn unique_defs()
33 -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
34
35 fn row_filtered(&self) -> Box<[(&'static str, Value)]>;
38
39 fn row_full(&self) -> Row;
41
42 fn from_row(row: RowLabeled) -> Result<Self>
46 where
47 Self: Sized;
48
49 fn create_table(
55 executor: &mut impl Executor,
56 if_not_exists: bool,
57 create_schema: bool,
58 ) -> impl Future<Output = Result<()>> + Send
59 where
60 Self: Sized,
61 {
62 async move {
63 let mut query = String::with_capacity(2048);
64 let writer = executor.driver().sql_writer();
65 if create_schema && !Self::table().schema().is_empty() {
66 writer.write_create_schema::<Self>(&mut query, true);
67 }
68 if !executor.accepts_multiple_statements() && !query.is_empty() {
69 let mut q = Query::Raw(query);
70 executor.execute(&mut q).boxed().await?;
71 let Query::Raw(mut q) = q else {
72 return Err(Error::msg(
73 "The executor was borrowed a raw query but it did not return it",
74 ));
75 };
76 query = mem::take(&mut q);
78 query.clear();
79 }
80 writer.write_create_table::<Self>(&mut query, if_not_exists);
81 executor.execute(query).boxed().await.map(|_| ())
83 }
84 }
85
86 fn drop_table(
92 executor: &mut impl Executor,
93 if_exists: bool,
94 drop_schema: bool,
95 ) -> impl Future<Output = Result<()>> + Send
96 where
97 Self: Sized,
98 {
99 async move {
100 let mut query = String::with_capacity(256);
101 let writer = executor.driver().sql_writer();
102 writer.write_drop_table::<Self>(&mut query, if_exists);
103 if drop_schema && !Self::table().schema().is_empty() {
104 if !executor.accepts_multiple_statements() {
105 let mut q = Query::Raw(query);
106 executor.execute(&mut q).boxed().await?;
107 let Query::Raw(mut q) = q else {
108 return Err(Error::msg(
109 "The executor was borrowed a raw query but it did not return it",
110 ));
111 };
112 query = mem::take(&mut q);
114 query.clear();
115 }
116 writer.write_drop_schema::<Self>(&mut query, true);
117 }
118 executor.execute(query).boxed().await.map(|_| ())
120 }
121 }
122
123 fn insert_one(
127 executor: &mut impl Executor,
128 entity: &impl Entity,
129 ) -> impl Future<Output = Result<RowsAffected>> + Send {
130 let mut query = String::with_capacity(128);
131 executor
132 .driver()
133 .sql_writer()
134 .write_insert(&mut query, [entity], false);
135 executor.execute(query)
136 }
137
138 fn insert_many<'a, It>(
142 executor: &mut impl Executor,
143 items: It,
144 ) -> impl Future<Output = Result<RowsAffected>> + Send
145 where
146 Self: 'a + Sized,
147 It: IntoIterator<Item = &'a Self> + Send,
148 <It as IntoIterator>::IntoIter: Send,
149 {
150 executor.append(items)
151 }
152
153 fn prepare_find<Exec: Executor>(
157 executor: &mut Exec,
158 condition: impl Expression,
159 limit: Option<u32>,
160 ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
161 Self::table().prepare(executor, Self::columns(), condition, limit)
162 }
163
164 fn find_pk(
168 executor: &mut impl Executor,
169 primary_key: &Self::PrimaryKey<'_>,
170 ) -> impl Future<Output = Result<Option<Self>>> + Send
171 where
172 Self: Sized;
173
174 fn find_one(
178 executor: &mut impl Executor,
179 condition: impl Expression,
180 ) -> impl Future<Output = Result<Option<Self>>> + Send
181 where
182 Self: Sized,
183 {
184 let stream = Self::find_many(executor, condition, Some(1));
185 async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
186 }
187
188 fn find_many(
193 executor: &mut impl Executor,
194 condition: impl Expression,
195 limit: Option<u32>,
196 ) -> impl Stream<Item = Result<Self>> + Send
197 where
198 Self: Sized,
199 {
200 Self::table()
201 .select(
202 executor,
203 Self::columns()
204 .iter()
205 .map(|c| &c.column_ref as &dyn Expression),
206 condition,
207 limit,
208 )
209 .map(|result| result.and_then(Self::from_row))
210 }
211
212 fn delete_one(
216 executor: &mut impl Executor,
217 primary_key: Self::PrimaryKey<'_>,
218 ) -> impl Future<Output = Result<RowsAffected>> + Send
219 where
220 Self: Sized;
221
222 fn delete_many(
226 executor: &mut impl Executor,
227 condition: impl Expression,
228 ) -> impl Future<Output = Result<RowsAffected>> + Send
229 where
230 Self: Sized,
231 {
232 let mut query = String::with_capacity(128);
233 executor
234 .driver()
235 .sql_writer()
236 .write_delete::<Self>(&mut query, condition);
237 executor.execute(query)
238 }
239
240 fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
246 where
247 Self: Sized,
248 {
249 if Self::primary_key_def().len() == 0 {
250 let error = Error::msg(
251 "Cannot save an entity without a primary key, it would always result in an insert",
252 );
253 log::error!("{:#}", error);
254 return Either::Left(future::ready(Err(error)));
255 }
256 let mut query = String::with_capacity(512);
257 executor
258 .driver()
259 .sql_writer()
260 .write_insert(&mut query, [self], true);
261 let context = format!("While saving using the query: {}", truncate_long!(query));
262 Either::Right(executor.execute(query).map(|mut v| {
263 if let Ok(result) = v
264 && let Some(affected) = result.rows_affected
265 && affected > 2
266 {
267 v = Err(Error::msg(format!(
268 "The driver returned affected rows: {affected} (expected <= 2)"
269 )));
270 }
271 match v {
272 Ok(_) => Ok(()),
273 Err(e) => {
274 let e = e.context(context);
275 log::error!("{e:#}");
276 Err(e)
277 }
278 }
279 }))
280 }
281
282 fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
289 where
290 Self: Sized,
291 {
292 if Self::primary_key_def().len() == 0 {
293 let error = Error::msg(
294 "Cannot delete an entity without a primary key, it would delete nothing",
295 );
296 log::error!("{:#}", error);
297 return Either::Left(future::ready(Err(error)));
298 }
299 Either::Right(Self::delete_one(executor, self.primary_key()).map(|v| {
300 v.and_then(|v| {
301 if let Some(affected) = v.rows_affected {
302 if affected != 1 {
303 let error = Error::msg(format!(
304 "The query deleted {affected} rows instead of the expected 1"
305 ));
306 log::log!(
307 if affected == 0 {
308 Level::Info
309 } else {
310 Level::Error
311 },
312 "{error}",
313 );
314 return Err(error);
315 }
316 }
317 Ok(())
318 })
319 }))
320 }
321}
322
323impl<E: Entity> DataSet for E {
324 fn qualified_columns() -> bool
328 where
329 Self: Sized,
330 {
331 false
332 }
333
334 fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut String) {
336 Self::table().write_query(writer, context, out);
337 }
338}