tank_core/
entity.rs

1use crate::{
2    ColumnDef, Context, DataSet, Driver, DynQuery, Error, Executor, Expression, Query,
3    QueryBuilder, Result, Row, RowLabeled, RowsAffected, TableRef, Value, future::Either,
4    stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9    future::{self, Future},
10    pin::pin,
11};
12
13/// A table-mapped record with schema and CRUD helpers.
14pub trait Entity {
15    /// Primary key type. Tuple of the types of the fields forming the primary key.
16    type PrimaryKey<'a>;
17
18    /// Returns the table reference backing this entity.
19    fn table() -> &'static TableRef;
20
21    /// Returns all declared column definitions in declaration order.
22    fn columns() -> &'static [ColumnDef];
23
24    /// Iterator over columns forming the primary key. Empty iterator means no PK.
25    fn primary_key_def() -> &'static [&'static ColumnDef];
26
27    /// Extracts the primary key value(s) from `self`.
28    fn primary_key(&self) -> Self::PrimaryKey<'_>;
29
30    /// Returns an iterator over unique constraint definitions.
31    fn unique_defs()
32    -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
33
34    /// Returns a filtered mapping of column name to value, typically excluding
35    /// auto-generated or default-only columns.
36    fn row_filtered(&self) -> Box<[(&'static str, Value)]>;
37
38    /// Returns a full `Row` representation including all persisted columns.
39    fn row_full(&self) -> Row;
40
41    /// Constructs `Self` from a labeled database row.
42    ///
43    /// Error if mandatory columns are missing or type conversion fails.
44    fn from_row(row: RowLabeled) -> Result<Self>
45    where
46        Self: Sized;
47
48    /// Creates the underlying table (and optionally schema) if requested.
49    ///
50    /// Parameters:
51    /// - `if_not_exists`: guards against existing table (if drivers support it, otherwise just create table).
52    /// - `create_schema`: attempt to create schema prior to table creation (if drivers support it).
53    fn create_table(
54        executor: &mut impl Executor,
55        if_not_exists: bool,
56        create_schema: bool,
57    ) -> impl Future<Output = Result<()>> + Send
58    where
59        Self: Sized,
60    {
61        async move {
62            let mut query = DynQuery::with_capacity(2048);
63            let writer = executor.driver().sql_writer();
64            if create_schema && !Self::table().schema.is_empty() {
65                writer.write_create_schema::<Self>(&mut query, true);
66            }
67            if !executor.accepts_multiple_statements() && !query.is_empty() {
68                let mut q = query.into_query(executor.driver());
69                executor.execute(&mut q).boxed().await?;
70                // To reuse the allocated buffer
71                query = q.into();
72                query.buffer().clear();
73            }
74            writer.write_create_table::<Self>(&mut query, if_not_exists);
75            // TODO: Remove boxed() once https://github.com/rust-lang/rust/issues/100013 is fixed
76            executor.execute(query).boxed().await.map(|_| ())
77        }
78    }
79
80    /// Drops the underlying table (and optionally schema) if requested.
81    ///
82    /// Parameters:
83    /// - `if_exists`: guards against missing table (if drivers support it, otherwise just drop table).
84    /// - `drop_schema`: attempt to drop schema after table removal (if drivers support it).
85    fn drop_table(
86        executor: &mut impl Executor,
87        if_exists: bool,
88        drop_schema: bool,
89    ) -> impl Future<Output = Result<()>> + Send
90    where
91        Self: Sized,
92    {
93        async move {
94            let mut query = DynQuery::with_capacity(256);
95            let writer = executor.driver().sql_writer();
96            writer.write_drop_table::<Self>(&mut query, if_exists);
97            if drop_schema && !Self::table().schema.is_empty() {
98                if !executor.accepts_multiple_statements() {
99                    let mut q = query.into_query(executor.driver());
100                    executor.execute(&mut q).boxed().await?;
101                    // To reuse the allocated buffer
102                    query = q.into();
103                    query.buffer().clear();
104                }
105                writer.write_drop_schema::<Self>(&mut query, true);
106            }
107            // TODO: Remove boxed() once https://github.com/rust-lang/rust/issues/100013 is fixed
108            executor.execute(query).boxed().await.map(|_| ())
109        }
110    }
111
112    /// Inserts a single entity row.
113    ///
114    /// Returns rows affected (expected: 1 on success).
115    fn insert_one(
116        executor: &mut impl Executor,
117        entity: &impl Entity,
118    ) -> impl Future<Output = Result<RowsAffected>> + Send {
119        let mut query = DynQuery::with_capacity(128);
120        executor
121            .driver()
122            .sql_writer()
123            .write_insert(&mut query, [entity], false);
124        executor.execute(query)
125    }
126
127    /// Multiple insert for a homogeneous iterator of entities.
128    ///
129    /// Returns the number of rows inserted.
130    fn insert_many<'a, It>(
131        executor: &mut impl Executor,
132        items: It,
133    ) -> impl Future<Output = Result<RowsAffected>> + Send
134    where
135        Self: Sized + 'a,
136        It: IntoIterator<Item = &'a Self> + Send,
137        <It as IntoIterator>::IntoIter: Send,
138    {
139        executor.append(items)
140    }
141
142    /// Prepare (but do not yet run) a SQL select query.
143    ///
144    /// Returns the prepared statement.
145    fn prepare_find<Exec: Executor>(
146        executor: &mut Exec,
147        condition: impl Expression,
148        limit: Option<u32>,
149    ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
150        let builder = QueryBuilder::new()
151            .select(Self::columns())
152            .from(Self::table())
153            .where_condition(condition)
154            .limit(limit);
155        let writer = executor.driver().sql_writer();
156        let mut query = DynQuery::default();
157        writer.write_select(&mut query, &builder);
158        executor.prepare(query.into_buffer())
159    }
160
161    /// Finds an entity by primary key.
162    ///
163    /// Returns `Ok(None)` if no row matches.
164    fn find_pk(
165        executor: &mut impl Executor,
166        primary_key: &Self::PrimaryKey<'_>,
167    ) -> impl Future<Output = Result<Option<Self>>> + Send
168    where
169        Self: Sized;
170
171    /// Finds the first entity matching a condition expression.
172    ///
173    /// Returns `Ok(None)` if no row matches.
174    fn find_one(
175        executor: &mut impl Executor,
176        condition: impl Expression,
177    ) -> impl Future<Output = Result<Option<Self>>> + Send
178    where
179        Self: Sized,
180    {
181        let stream = Self::find_many(executor, condition, Some(1));
182        async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
183    }
184
185    /// Streams entities matching a condition.
186    ///
187    /// `limit` restricts the maximum number of rows returned at a database level if `Some`
188    /// (if supported by the driver, unlimited otherwise).
189    fn find_many(
190        executor: &mut impl Executor,
191        condition: impl Expression,
192        limit: Option<u32>,
193    ) -> impl Stream<Item = Result<Self>> + Send
194    where
195        Self: Sized,
196    {
197        let builder = QueryBuilder::new()
198            .select(Self::columns())
199            .from(Self::table())
200            .where_condition(condition)
201            .limit(limit);
202        executor
203            .fetch(builder.build(&executor.driver()))
204            .map(|result| result.and_then(Self::from_row))
205    }
206
207    /// Deletes exactly one entity by primary key.
208    ///
209    /// Returns rows affected (0 if not found).
210    fn delete_one(
211        executor: &mut impl Executor,
212        primary_key: Self::PrimaryKey<'_>,
213    ) -> impl Future<Output = Result<RowsAffected>> + Send
214    where
215        Self: Sized;
216
217    /// Deletes all entities matching a condition.
218    ///
219    /// Returns the number of rows deleted.
220    fn delete_many(
221        executor: &mut impl Executor,
222        condition: impl Expression,
223    ) -> impl Future<Output = Result<RowsAffected>> + Send
224    where
225        Self: Sized,
226    {
227        let mut query = DynQuery::with_capacity(128);
228        executor
229            .driver()
230            .sql_writer()
231            .write_delete::<Self>(&mut query, condition);
232        executor.execute(query)
233    }
234
235    /// Saves the entity (insert or update if available) based on primary key presence.
236    ///
237    /// Errors:
238    /// - Missing PK in the table.
239    /// - Execution failures from underlying driver.
240    fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
241    where
242        Self: Sized,
243    {
244        if Self::primary_key_def().len() == 0 {
245            let error = Error::msg(
246                "Cannot save an entity without a primary key, it would always result in an insert",
247            );
248            log::error!("{:#}", error);
249            return Either::Left(future::ready(Err(error)));
250        }
251        let mut query = DynQuery::with_capacity(512);
252        executor
253            .driver()
254            .sql_writer()
255            .write_insert(&mut query, [self], true);
256        let sql = query.as_str();
257        let context = format!("While saving using the query: {}", truncate_long!(sql));
258        Either::Right(executor.execute(query).map(|mut v| {
259            if let Ok(result) = v
260                && let Some(affected) = result.rows_affected
261                && affected > 2
262            {
263                v = Err(Error::msg(format!(
264                    "The driver returned affected rows: {affected} (expected <= 2)"
265                )));
266            }
267            match v {
268                Ok(_) => Ok(()),
269                Err(e) => {
270                    let e = e.context(context);
271                    log::error!("{e:#}");
272                    Err(e)
273                }
274            }
275        }))
276    }
277
278    /// Deletes this entity instance via its primary key.
279    ///
280    /// Errors:
281    /// - Missing PK in the table.
282    /// - If not exactly one row was deleted.
283    /// - Execution failures from underlying driver.
284    fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
285    where
286        Self: Sized,
287    {
288        if Self::primary_key_def().len() == 0 {
289            let error = Error::msg(
290                "Cannot delete an entity without a primary key, it would delete nothing",
291            );
292            log::error!("{:#}", error);
293            return Either::Left(future::ready(Err(error)));
294        }
295        Either::Right(Self::delete_one(executor, self.primary_key()).map(|v| {
296            v.and_then(|v| {
297                if let Some(affected) = v.rows_affected {
298                    if affected != 1 {
299                        let error = Error::msg(format!(
300                            "The query deleted {affected} rows instead of the expected 1"
301                        ));
302                        log::log!(
303                            if affected == 0 {
304                                Level::Info
305                            } else {
306                                Level::Error
307                            },
308                            "{error}",
309                        );
310                        return Err(error);
311                    }
312                }
313                Ok(())
314            })
315        }))
316    }
317}
318
319impl<E: Entity> DataSet for E {
320    /// Indicates whether column names should be fully qualified with schema and table name.
321    ///
322    /// For entities this returns `false` to keep queries concise, for joins it returns `true`.
323    fn qualified_columns() -> bool
324    where
325        Self: Sized,
326    {
327        false
328    }
329
330    /// Writes the table reference into the out string.
331    fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
332        Self::table().write_query(writer, context, out);
333    }
334
335    fn table_ref(&self) -> TableRef {
336        Self::table().clone()
337    }
338}