Skip to main content

tank_core/
entity.rs

1use crate::{
2    ColumnDef, Context, Dataset, Driver, DynQuery, Error, Executor, Expression, Query,
3    QueryBuilder, RawQuery, Result, Row, RowValues, RowsAffected, TableRef, future::Either,
4    stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9    future::{self, Future},
10    pin::pin,
11    sync::Arc,
12};
13
14/// Database entity mapping.
15///
16/// Use `#[derive(Entity)]` to implement this trait.
17pub trait Entity {
18    /// Primary key type. A tuple of field types (or single type) forming the PK.
19    type PrimaryKey<'a>
20    where
21        Self: 'a;
22
23    /// Table reference matching the `#[tank(...)]` attributes.
24    fn table() -> &'static TableRef;
25
26    /// All column definitions in declaration order.
27    fn columns() -> &'static [ColumnDef];
28
29    /// Primary key column definitions. Empty if no PK defined.
30    fn primary_key_def() -> &'static [&'static ColumnDef];
31
32    /// Extract PK value(s) from `self`.
33    fn primary_key(&self) -> Self::PrimaryKey<'_>;
34
35    /// Build an expression matching the PK of `self`.
36    fn primary_key_expr(&self) -> impl Expression;
37
38    /// Unique constraint definitions.
39    fn unique_defs()
40    -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
41
42    /// Full row representation including all persisted columns.
43    fn row_values(&self) -> RowValues;
44
45    /// Full row representation with column labels.
46    fn row(&self) -> Row {
47        Row {
48            labels: Self::columns()
49                .into_iter()
50                .map(|v| v.name().to_string())
51                .collect::<Arc<[String]>>(),
52            values: self.row_values(),
53        }
54    }
55
56    /// Reconstruct `Self` from a labeled row.
57    ///
58    /// Fails if columns are missing or type conversion fails.
59    fn from_row(row: Row) -> Result<Self>
60    where
61        Self: Sized;
62
63    /// Create table (and optional schema).
64    ///
65    /// - `if_not_exists`: Emits `IF NOT EXISTS` if supported.
66    /// - `create_schema`: Attempts schema creation first.
67    fn create_table(
68        executor: &mut impl Executor,
69        if_not_exists: bool,
70        create_schema: bool,
71    ) -> impl Future<Output = Result<()>> + Send
72    where
73        Self: Sized,
74    {
75        async move {
76            let mut query = DynQuery::with_capacity(2048);
77            let writer = executor.driver().sql_writer();
78            if create_schema && !Self::table().schema.is_empty() {
79                writer.write_create_schema::<Self>(&mut query, true);
80            }
81            if !executor.accepts_multiple_statements() && !query.is_empty() {
82                let mut q = query.into_query(executor.driver());
83                executor.execute(&mut q).await?;
84                // To reuse the allocated buffer
85                query = q.into();
86                query.buffer().clear();
87            }
88            writer.write_create_table::<Self>(&mut query, if_not_exists);
89            executor.execute(query).await.map(|_| ())
90        }
91    }
92
93    /// Drop the table (and optional schema).
94    ///
95    /// - `if_exists`: Emits `IF EXISTS` if supported.
96    /// - `drop_schema`: Drops schema *after* table removal (if empty).
97    fn drop_table(
98        executor: &mut impl Executor,
99        if_exists: bool,
100        drop_schema: bool,
101    ) -> impl Future<Output = Result<()>> + Send
102    where
103        Self: Sized,
104    {
105        async move {
106            let mut query = DynQuery::with_capacity(256);
107            let writer = executor.driver().sql_writer();
108            writer.write_drop_table::<Self>(&mut query, if_exists);
109            if drop_schema && !Self::table().schema.is_empty() {
110                if !executor.accepts_multiple_statements() {
111                    let mut q = query.into_query(executor.driver());
112                    executor.execute(&mut q).await?;
113                    // To reuse the allocated buffer
114                    query = q.into();
115                    query.buffer().clear();
116                }
117                writer.write_drop_schema::<Self>(&mut query, true);
118            }
119            executor.execute(query).await.map(|_| ())
120        }
121    }
122
123    /// Insert a single entity.
124    fn insert_one(
125        executor: &mut impl Executor,
126        entity: &impl Entity,
127    ) -> impl Future<Output = Result<RowsAffected>> + Send {
128        let mut query = DynQuery::with_capacity(128);
129        executor
130            .driver()
131            .sql_writer()
132            .write_insert(&mut query, [entity], false);
133        executor.execute(query)
134    }
135
136    /// Bulk insert entities.
137    fn insert_many<'a, It>(
138        executor: &mut impl Executor,
139        items: It,
140    ) -> impl Future<Output = Result<RowsAffected>> + Send
141    where
142        Self: Sized + 'a,
143        It: IntoIterator<Item = &'a Self> + Send,
144        <It as IntoIterator>::IntoIter: Send,
145    {
146        executor.append(items)
147    }
148
149    /// Prepare (but do not yet run) a SQL select query.
150    ///
151    /// Returns the prepared statement.
152    fn prepare_find<Exec: Executor>(
153        executor: &mut Exec,
154        condition: impl Expression,
155        limit: Option<u32>,
156    ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
157        let builder = QueryBuilder::new()
158            .select(Self::columns())
159            .from(Self::table())
160            .where_expr(condition)
161            .limit(limit);
162        let writer = executor.driver().sql_writer();
163        let mut query = DynQuery::default();
164        writer.write_select(&mut query, &builder);
165        async {
166            if let DynQuery::Raw(RawQuery(sql)) = query {
167                executor.prepare(sql).await
168            } else {
169                Ok(query.into())
170            }
171        }
172    }
173
174    /// Finds the first entity matching a condition expression.
175    ///
176    /// Returns `Ok(None)` if no row matches.
177    fn find_one(
178        executor: &mut impl Executor,
179        condition: impl Expression,
180    ) -> impl Future<Output = Result<Option<Self>>> + Send
181    where
182        Self: Sized,
183    {
184        let stream = Self::find_many(executor, condition, Some(1));
185        async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
186    }
187
188    /// Streams entities matching a condition.
189    ///
190    /// `limit` restricts the maximum number of rows returned at a database level if `Some`
191    /// (if supported by the driver, unlimited otherwise).
192    fn find_many(
193        executor: &mut impl Executor,
194        condition: impl Expression,
195        limit: Option<u32>,
196    ) -> impl Stream<Item = Result<Self>> + Send
197    where
198        Self: Sized,
199    {
200        let builder = QueryBuilder::new()
201            .select(Self::columns())
202            .from(Self::table())
203            .where_expr(condition)
204            .limit(limit);
205        executor
206            .fetch(builder.build(&executor.driver()))
207            .map(|result| result.and_then(Self::from_row))
208    }
209
210    /// Deletes all entities matching a condition.
211    ///
212    /// Returns the number of deleted rows.
213    fn delete_many(
214        executor: &mut impl Executor,
215        condition: impl Expression,
216    ) -> impl Future<Output = Result<RowsAffected>> + Send
217    where
218        Self: Sized,
219    {
220        let mut query = DynQuery::with_capacity(128);
221        executor
222            .driver()
223            .sql_writer()
224            .write_delete::<Self>(&mut query, condition);
225        executor.execute(query)
226    }
227
228    /// Saves the entity (insert or update if available) based on primary key presence.
229    ///
230    /// Errors:
231    /// - Missing PK in the table.
232    /// - Execution failures from underlying driver.
233    fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
234    where
235        Self: Sized,
236    {
237        if Self::primary_key_def().len() == 0 {
238            let error = Error::msg(
239                "Cannot save an entity without a primary key, it would always result in an insert",
240            );
241            log::error!("{:#}", error);
242            return Either::Left(future::ready(Err(error)));
243        }
244        let mut query = DynQuery::with_capacity(512);
245        executor
246            .driver()
247            .sql_writer()
248            .write_insert(&mut query, [self], true);
249        let sql = query.as_str();
250        let context = format!("While saving using the query {}", truncate_long!(sql));
251        Either::Right(executor.execute(query).map(|mut v| {
252            if let Ok(result) = v
253                && let Some(affected) = result.rows_affected
254                && affected > 2
255            {
256                v = Err(Error::msg(format!(
257                    "The driver returned affected rows: {affected} (expected <= 2)"
258                )));
259            }
260            match v {
261                Ok(_) => Ok(()),
262                Err(e) => {
263                    let e = e.context(context);
264                    Err(e)
265                }
266            }
267        }))
268    }
269
270    /// Deletes this entity instance via its primary key.
271    ///
272    /// Errors:
273    /// - Missing PK in the table.
274    /// - If not exactly one row was deleted.
275    /// - Execution failures from underlying driver.
276    fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
277    where
278        Self: Sized,
279    {
280        if Self::primary_key_def().len() == 0 {
281            let error = Error::msg(
282                "Cannot delete an entity without a primary key, it would delete nothing",
283            );
284            log::error!("{:#}", error);
285            return Either::Left(future::ready(Err(error)));
286        }
287        Either::Right(
288            Self::delete_many(executor, self.primary_key_expr()).map(|v| {
289                v.and_then(|v| {
290                    if let Some(affected) = v.rows_affected {
291                        if affected != 1 {
292                            let error = Error::msg(format!(
293                                "The query deleted {affected} rows instead of the expected 1"
294                            ));
295                            log::log!(
296                                if affected == 0 {
297                                    Level::Info
298                                } else {
299                                    Level::Error
300                                },
301                                "{error}",
302                            );
303                            return Err(error);
304                        }
305                    }
306                    Ok(())
307                })
308            }),
309        )
310    }
311}
312
313impl<E: Entity> Dataset for E {
314    /// Indicates whether column names should be fully qualified with schema and table name.
315    ///
316    /// For entities this returns `false` to keep queries concise, for joins it returns `true`.
317    fn qualified_columns() -> bool
318    where
319        Self: Sized,
320    {
321        false
322    }
323
324    /// Writes the table reference into the out string.
325    fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
326        Self::table().write_query(writer, context, out);
327    }
328
329    fn table_ref(&self) -> TableRef {
330        Self::table().clone()
331    }
332}