Skip to main content

tank_core/
entity.rs

1use crate::{
2    ColumnDef, Context, Dataset, Driver, DynQuery, Error, Executor, Expression, Query,
3    QueryBuilder, RawQuery, Result, Row, RowLabeled, RowsAffected, TableRef, Value, future::Either,
4    stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9    future::{self, Future},
10    pin::pin,
11    sync::Arc,
12};
13
14/// Database entity mapping.
15///
16/// Use `#[derive(Entity)]` to implement this trait.
17pub trait Entity {
18    /// Primary key type. A tuple of field types (or single type) forming the PK.
19    type PrimaryKey<'a>
20    where
21        Self: 'a;
22
23    /// Table reference matching the `#[tank(...)]` attributes.
24    fn table() -> &'static TableRef;
25
26    /// All column definitions in declaration order.
27    fn columns() -> &'static [ColumnDef];
28
29    /// Primary key column definitions. Empty if no PK defined.
30    fn primary_key_def() -> &'static [&'static ColumnDef];
31
32    /// Extract PK value(s) from `self`.
33    fn primary_key(&self) -> Self::PrimaryKey<'_>;
34
35    /// Build an expression matching the PK of `self`.
36    fn primary_key_expr(&self) -> impl Expression;
37
38    /// Unique constraint definitions.
39    fn unique_defs()
40    -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
41
42    /// Column name-value pairs for persistence (excludes ignored/default fields).
43    fn row_filtered(&self) -> Box<[(&'static str, Value)]>;
44
45    /// Full row representation including all persisted columns.
46    fn row_full(&self) -> Row;
47
48    /// Full row representation with column labels.
49    fn row_labeled(&self) -> RowLabeled {
50        RowLabeled {
51            labels: Self::columns()
52                .into_iter()
53                .map(|v| v.name().to_string())
54                .collect::<Arc<[String]>>(),
55            values: self.row_full(),
56        }
57    }
58
59    /// Reconstruct `Self` from a labeled row.
60    ///
61    /// Fails if columns are missing or type conversion fails.
62    fn from_row(row: RowLabeled) -> Result<Self>
63    where
64        Self: Sized;
65
66    /// Create table (and optional schema).
67    ///
68    /// - `if_not_exists`: Emits `IF NOT EXISTS` if supported.
69    /// - `create_schema`: Attempts schema creation first.
70    fn create_table(
71        executor: &mut impl Executor,
72        if_not_exists: bool,
73        create_schema: bool,
74    ) -> impl Future<Output = Result<()>> + Send
75    where
76        Self: Sized,
77    {
78        async move {
79            let mut query = DynQuery::with_capacity(2048);
80            let writer = executor.driver().sql_writer();
81            if create_schema && !Self::table().schema.is_empty() {
82                writer.write_create_schema::<Self>(&mut query, true);
83            }
84            if !executor.accepts_multiple_statements() && !query.is_empty() {
85                let mut q = query.into_query(executor.driver());
86                executor.execute(&mut q).await?;
87                // To reuse the allocated buffer
88                query = q.into();
89                query.buffer().clear();
90            }
91            writer.write_create_table::<Self>(&mut query, if_not_exists);
92            executor.execute(query).await.map(|_| ())
93        }
94    }
95
96    /// Drop the table (and optional schema).
97    ///
98    /// - `if_exists`: Emits `IF EXISTS` if supported.
99    /// - `drop_schema`: Drops schema *after* table removal (if empty).
100    fn drop_table(
101        executor: &mut impl Executor,
102        if_exists: bool,
103        drop_schema: bool,
104    ) -> impl Future<Output = Result<()>> + Send
105    where
106        Self: Sized,
107    {
108        async move {
109            let mut query = DynQuery::with_capacity(256);
110            let writer = executor.driver().sql_writer();
111            writer.write_drop_table::<Self>(&mut query, if_exists);
112            if drop_schema && !Self::table().schema.is_empty() {
113                if !executor.accepts_multiple_statements() {
114                    let mut q = query.into_query(executor.driver());
115                    executor.execute(&mut q).await?;
116                    // To reuse the allocated buffer
117                    query = q.into();
118                    query.buffer().clear();
119                }
120                writer.write_drop_schema::<Self>(&mut query, true);
121            }
122            executor.execute(query).await.map(|_| ())
123        }
124    }
125
126    /// Insert a single entity.
127    fn insert_one(
128        executor: &mut impl Executor,
129        entity: &impl Entity,
130    ) -> impl Future<Output = Result<RowsAffected>> + Send {
131        let mut query = DynQuery::with_capacity(128);
132        executor
133            .driver()
134            .sql_writer()
135            .write_insert(&mut query, [entity], false);
136        executor.execute(query)
137    }
138
139    /// Bulk insert entities.
140    fn insert_many<'a, It>(
141        executor: &mut impl Executor,
142        items: It,
143    ) -> impl Future<Output = Result<RowsAffected>> + Send
144    where
145        Self: Sized + 'a,
146        It: IntoIterator<Item = &'a Self> + Send,
147        <It as IntoIterator>::IntoIter: Send,
148    {
149        executor.append(items)
150    }
151
152    /// Prepare (but do not yet run) a SQL select query.
153    ///
154    /// Returns the prepared statement.
155    fn prepare_find<Exec: Executor>(
156        executor: &mut Exec,
157        condition: impl Expression,
158        limit: Option<u32>,
159    ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
160        let builder = QueryBuilder::new()
161            .select(Self::columns())
162            .from(Self::table())
163            .where_expr(condition)
164            .limit(limit);
165        let writer = executor.driver().sql_writer();
166        let mut query = DynQuery::default();
167        writer.write_select(&mut query, &builder);
168        async {
169            if let DynQuery::Raw(RawQuery(sql)) = query {
170                executor.prepare(sql).await
171            } else {
172                Ok(query.into())
173            }
174        }
175    }
176
177    /// Finds the first entity matching a condition expression.
178    ///
179    /// Returns `Ok(None)` if no row matches.
180    fn find_one(
181        executor: &mut impl Executor,
182        condition: impl Expression,
183    ) -> impl Future<Output = Result<Option<Self>>> + Send
184    where
185        Self: Sized,
186    {
187        let stream = Self::find_many(executor, condition, Some(1));
188        async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
189    }
190
191    /// Streams entities matching a condition.
192    ///
193    /// `limit` restricts the maximum number of rows returned at a database level if `Some`
194    /// (if supported by the driver, unlimited otherwise).
195    fn find_many(
196        executor: &mut impl Executor,
197        condition: impl Expression,
198        limit: Option<u32>,
199    ) -> impl Stream<Item = Result<Self>> + Send
200    where
201        Self: Sized,
202    {
203        let builder = QueryBuilder::new()
204            .select(Self::columns())
205            .from(Self::table())
206            .where_expr(condition)
207            .limit(limit);
208        executor
209            .fetch(builder.build(&executor.driver()))
210            .map(|result| result.and_then(Self::from_row))
211    }
212
213    /// Deletes all entities matching a condition.
214    ///
215    /// Returns the number of deleted rows.
216    fn delete_many(
217        executor: &mut impl Executor,
218        condition: impl Expression,
219    ) -> impl Future<Output = Result<RowsAffected>> + Send
220    where
221        Self: Sized,
222    {
223        let mut query = DynQuery::with_capacity(128);
224        executor
225            .driver()
226            .sql_writer()
227            .write_delete::<Self>(&mut query, condition);
228        executor.execute(query)
229    }
230
231    /// Saves the entity (insert or update if available) based on primary key presence.
232    ///
233    /// Errors:
234    /// - Missing PK in the table.
235    /// - Execution failures from underlying driver.
236    fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
237    where
238        Self: Sized,
239    {
240        if Self::primary_key_def().len() == 0 {
241            let error = Error::msg(
242                "Cannot save an entity without a primary key, it would always result in an insert",
243            );
244            log::error!("{:#}", error);
245            return Either::Left(future::ready(Err(error)));
246        }
247        let mut query = DynQuery::with_capacity(512);
248        executor
249            .driver()
250            .sql_writer()
251            .write_insert(&mut query, [self], true);
252        let sql = query.as_str();
253        let context = format!("While saving using the query {}", truncate_long!(sql));
254        Either::Right(executor.execute(query).map(|mut v| {
255            if let Ok(result) = v
256                && let Some(affected) = result.rows_affected
257                && affected > 2
258            {
259                v = Err(Error::msg(format!(
260                    "The driver returned affected rows: {affected} (expected <= 2)"
261                )));
262            }
263            match v {
264                Ok(_) => Ok(()),
265                Err(e) => {
266                    let e = e.context(context);
267                    Err(e)
268                }
269            }
270        }))
271    }
272
273    /// Deletes this entity instance via its primary key.
274    ///
275    /// Errors:
276    /// - Missing PK in the table.
277    /// - If not exactly one row was deleted.
278    /// - Execution failures from underlying driver.
279    fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
280    where
281        Self: Sized,
282    {
283        if Self::primary_key_def().len() == 0 {
284            let error = Error::msg(
285                "Cannot delete an entity without a primary key, it would delete nothing",
286            );
287            log::error!("{:#}", error);
288            return Either::Left(future::ready(Err(error)));
289        }
290        Either::Right(
291            Self::delete_many(executor, self.primary_key_expr()).map(|v| {
292                v.and_then(|v| {
293                    if let Some(affected) = v.rows_affected {
294                        if affected != 1 {
295                            let error = Error::msg(format!(
296                                "The query deleted {affected} rows instead of the expected 1"
297                            ));
298                            log::log!(
299                                if affected == 0 {
300                                    Level::Info
301                                } else {
302                                    Level::Error
303                                },
304                                "{error}",
305                            );
306                            return Err(error);
307                        }
308                    }
309                    Ok(())
310                })
311            }),
312        )
313    }
314}
315
316impl<E: Entity> Dataset for E {
317    /// Indicates whether column names should be fully qualified with schema and table name.
318    ///
319    /// For entities this returns `false` to keep queries concise, for joins it returns `true`.
320    fn qualified_columns() -> bool
321    where
322        Self: Sized,
323    {
324        false
325    }
326
327    /// Writes the table reference into the out string.
328    fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
329        Self::table().write_query(writer, context, out);
330    }
331
332    fn table_ref(&self) -> TableRef {
333        Self::table().clone()
334    }
335}