1use crate::{
2 ColumnDef, Context, Dataset, Driver, DynQuery, Error, Executor, Expression, Query,
3 QueryBuilder, RawQuery, Result, Row, RowLabeled, RowsAffected, TableRef, Value, future::Either,
4 stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9 future::{self, Future},
10 pin::pin,
11 sync::Arc,
12};
13
14pub trait Entity {
18 type PrimaryKey<'a>
20 where
21 Self: 'a;
22
23 fn table() -> &'static TableRef;
25
26 fn columns() -> &'static [ColumnDef];
28
29 fn primary_key_def() -> &'static [&'static ColumnDef];
31
32 fn primary_key(&self) -> Self::PrimaryKey<'_>;
34
35 fn primary_key_expr(&self) -> impl Expression;
37
38 fn unique_defs()
40 -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
41
42 fn row_filtered(&self) -> Box<[(&'static str, Value)]>;
44
45 fn row_full(&self) -> Row;
47
48 fn row_labeled(&self) -> RowLabeled {
50 RowLabeled {
51 labels: Self::columns()
52 .into_iter()
53 .map(|v| v.name().to_string())
54 .collect::<Arc<[String]>>(),
55 values: self.row_full(),
56 }
57 }
58
59 fn from_row(row: RowLabeled) -> Result<Self>
63 where
64 Self: Sized;
65
66 fn create_table(
71 executor: &mut impl Executor,
72 if_not_exists: bool,
73 create_schema: bool,
74 ) -> impl Future<Output = Result<()>> + Send
75 where
76 Self: Sized,
77 {
78 async move {
79 let mut query = DynQuery::with_capacity(2048);
80 let writer = executor.driver().sql_writer();
81 if create_schema && !Self::table().schema.is_empty() {
82 writer.write_create_schema::<Self>(&mut query, true);
83 }
84 if !executor.accepts_multiple_statements() && !query.is_empty() {
85 let mut q = query.into_query(executor.driver());
86 executor.execute(&mut q).await?;
87 query = q.into();
89 query.buffer().clear();
90 }
91 writer.write_create_table::<Self>(&mut query, if_not_exists);
92 executor.execute(query).await.map(|_| ())
93 }
94 }
95
96 fn drop_table(
101 executor: &mut impl Executor,
102 if_exists: bool,
103 drop_schema: bool,
104 ) -> impl Future<Output = Result<()>> + Send
105 where
106 Self: Sized,
107 {
108 async move {
109 let mut query = DynQuery::with_capacity(256);
110 let writer = executor.driver().sql_writer();
111 writer.write_drop_table::<Self>(&mut query, if_exists);
112 if drop_schema && !Self::table().schema.is_empty() {
113 if !executor.accepts_multiple_statements() {
114 let mut q = query.into_query(executor.driver());
115 executor.execute(&mut q).await?;
116 query = q.into();
118 query.buffer().clear();
119 }
120 writer.write_drop_schema::<Self>(&mut query, true);
121 }
122 executor.execute(query).await.map(|_| ())
123 }
124 }
125
126 fn insert_one(
128 executor: &mut impl Executor,
129 entity: &impl Entity,
130 ) -> impl Future<Output = Result<RowsAffected>> + Send {
131 let mut query = DynQuery::with_capacity(128);
132 executor
133 .driver()
134 .sql_writer()
135 .write_insert(&mut query, [entity], false);
136 executor.execute(query)
137 }
138
139 fn insert_many<'a, It>(
141 executor: &mut impl Executor,
142 items: It,
143 ) -> impl Future<Output = Result<RowsAffected>> + Send
144 where
145 Self: Sized + 'a,
146 It: IntoIterator<Item = &'a Self> + Send,
147 <It as IntoIterator>::IntoIter: Send,
148 {
149 executor.append(items)
150 }
151
152 fn prepare_find<Exec: Executor>(
156 executor: &mut Exec,
157 condition: impl Expression,
158 limit: Option<u32>,
159 ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
160 let builder = QueryBuilder::new()
161 .select(Self::columns())
162 .from(Self::table())
163 .where_expr(condition)
164 .limit(limit);
165 let writer = executor.driver().sql_writer();
166 let mut query = DynQuery::default();
167 writer.write_select(&mut query, &builder);
168 async {
169 if let DynQuery::Raw(RawQuery(sql)) = query {
170 executor.prepare(sql).await
171 } else {
172 Ok(query.into())
173 }
174 }
175 }
176
177 fn find_one(
181 executor: &mut impl Executor,
182 condition: impl Expression,
183 ) -> impl Future<Output = Result<Option<Self>>> + Send
184 where
185 Self: Sized,
186 {
187 let stream = Self::find_many(executor, condition, Some(1));
188 async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
189 }
190
191 fn find_many(
196 executor: &mut impl Executor,
197 condition: impl Expression,
198 limit: Option<u32>,
199 ) -> impl Stream<Item = Result<Self>> + Send
200 where
201 Self: Sized,
202 {
203 let builder = QueryBuilder::new()
204 .select(Self::columns())
205 .from(Self::table())
206 .where_expr(condition)
207 .limit(limit);
208 executor
209 .fetch(builder.build(&executor.driver()))
210 .map(|result| result.and_then(Self::from_row))
211 }
212
213 fn delete_many(
217 executor: &mut impl Executor,
218 condition: impl Expression,
219 ) -> impl Future<Output = Result<RowsAffected>> + Send
220 where
221 Self: Sized,
222 {
223 let mut query = DynQuery::with_capacity(128);
224 executor
225 .driver()
226 .sql_writer()
227 .write_delete::<Self>(&mut query, condition);
228 executor.execute(query)
229 }
230
231 fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
237 where
238 Self: Sized,
239 {
240 if Self::primary_key_def().len() == 0 {
241 let error = Error::msg(
242 "Cannot save an entity without a primary key, it would always result in an insert",
243 );
244 log::error!("{:#}", error);
245 return Either::Left(future::ready(Err(error)));
246 }
247 let mut query = DynQuery::with_capacity(512);
248 executor
249 .driver()
250 .sql_writer()
251 .write_insert(&mut query, [self], true);
252 let sql = query.as_str();
253 let context = format!("While saving using the query {}", truncate_long!(sql));
254 Either::Right(executor.execute(query).map(|mut v| {
255 if let Ok(result) = v
256 && let Some(affected) = result.rows_affected
257 && affected > 2
258 {
259 v = Err(Error::msg(format!(
260 "The driver returned affected rows: {affected} (expected <= 2)"
261 )));
262 }
263 match v {
264 Ok(_) => Ok(()),
265 Err(e) => {
266 let e = e.context(context);
267 Err(e)
268 }
269 }
270 }))
271 }
272
273 fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
280 where
281 Self: Sized,
282 {
283 if Self::primary_key_def().len() == 0 {
284 let error = Error::msg(
285 "Cannot delete an entity without a primary key, it would delete nothing",
286 );
287 log::error!("{:#}", error);
288 return Either::Left(future::ready(Err(error)));
289 }
290 Either::Right(
291 Self::delete_many(executor, self.primary_key_expr()).map(|v| {
292 v.and_then(|v| {
293 if let Some(affected) = v.rows_affected {
294 if affected != 1 {
295 let error = Error::msg(format!(
296 "The query deleted {affected} rows instead of the expected 1"
297 ));
298 log::log!(
299 if affected == 0 {
300 Level::Info
301 } else {
302 Level::Error
303 },
304 "{error}",
305 );
306 return Err(error);
307 }
308 }
309 Ok(())
310 })
311 }),
312 )
313 }
314}
315
316impl<E: Entity> Dataset for E {
317 fn qualified_columns() -> bool
321 where
322 Self: Sized,
323 {
324 false
325 }
326
327 fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
329 Self::table().write_query(writer, context, out);
330 }
331
332 fn table_ref(&self) -> TableRef {
333 Self::table().clone()
334 }
335}