Skip to main content

tank_core/
entity.rs

1use crate::{
2    ColumnDef, Context, Dataset, Driver, DynQuery, Error, Executor, Expression, Query,
3    QueryBuilder, RawQuery, Result, Row, RowLabeled, RowsAffected, TableRef, Value, future::Either,
4    stream::Stream, truncate_long, writer::SqlWriter,
5};
6use futures::{FutureExt, StreamExt};
7use log::Level;
8use std::{
9    future::{self, Future},
10    pin::pin,
11    sync::Arc,
12};
13
14/// A table-mapped record with schema and CRUD helpers.
15pub trait Entity {
16    /// Primary key type. Tuple of the types of the fields forming the primary key.
17    type PrimaryKey<'a>
18    where
19        Self: 'a;
20
21    /// Returns the table reference backing this entity.
22    fn table() -> &'static TableRef;
23
24    /// Returns all declared column definitions in declaration order.
25    fn columns() -> &'static [ColumnDef];
26
27    /// Iterator over columns forming the primary key. Empty iterator means no PK.
28    fn primary_key_def() -> &'static [&'static ColumnDef];
29
30    /// Extracts the primary key value(s) from `self`.
31    fn primary_key(&self) -> Self::PrimaryKey<'_>;
32
33    fn primary_key_expr(&self) -> impl Expression;
34
35    /// Returns an iterator over unique constraint definitions.
36    fn unique_defs()
37    -> impl ExactSizeIterator<Item = impl ExactSizeIterator<Item = &'static ColumnDef>>;
38
39    /// Returns a filtered mapping of column name to value, typically excluding
40    /// auto-generated or default-only columns.
41    fn row_filtered(&self) -> Box<[(&'static str, Value)]>;
42
43    /// Returns a full `Row` representation including all persisted columns.
44    fn row_full(&self) -> Row;
45
46    fn row_labeled(&self) -> RowLabeled {
47        RowLabeled {
48            labels: Self::columns()
49                .into_iter()
50                .map(|v| v.name().to_string())
51                .collect::<Arc<[String]>>(),
52            values: self.row_full(),
53        }
54    }
55
56    /// Constructs `Self` from a labeled database row.
57    ///
58    /// Error if mandatory columns are missing or type conversion fails.
59    fn from_row(row: RowLabeled) -> Result<Self>
60    where
61        Self: Sized;
62
63    /// Creates the underlying table (and optionally schema) if requested.
64    ///
65    /// Parameters:
66    /// - `if_not_exists`: guards against existing table (if drivers support it, otherwise just create table).
67    /// - `create_schema`: attempt to create schema prior to table creation (if drivers support it).
68    fn create_table(
69        executor: &mut impl Executor,
70        if_not_exists: bool,
71        create_schema: bool,
72    ) -> impl Future<Output = Result<()>> + Send
73    where
74        Self: Sized,
75    {
76        async move {
77            let mut query = DynQuery::with_capacity(2048);
78            let writer = executor.driver().sql_writer();
79            if create_schema && !Self::table().schema.is_empty() {
80                writer.write_create_schema::<Self>(&mut query, true);
81            }
82            if !executor.accepts_multiple_statements() && !query.is_empty() {
83                let mut q = query.into_query(executor.driver());
84                executor.execute(&mut q).await?;
85                // To reuse the allocated buffer
86                query = q.into();
87                query.buffer().clear();
88            }
89            writer.write_create_table::<Self>(&mut query, if_not_exists);
90            executor.execute(query).await.map(|_| ())
91        }
92    }
93
94    /// Drops the underlying table (and optionally schema) if requested.
95    ///
96    /// Parameters:
97    /// - `if_exists`: guards against missing table (if drivers support it, otherwise just drop table).
98    /// - `drop_schema`: attempt to drop schema after table removal (if drivers support it).
99    fn drop_table(
100        executor: &mut impl Executor,
101        if_exists: bool,
102        drop_schema: bool,
103    ) -> impl Future<Output = Result<()>> + Send
104    where
105        Self: Sized,
106    {
107        async move {
108            let mut query = DynQuery::with_capacity(256);
109            let writer = executor.driver().sql_writer();
110            writer.write_drop_table::<Self>(&mut query, if_exists);
111            if drop_schema && !Self::table().schema.is_empty() {
112                if !executor.accepts_multiple_statements() {
113                    let mut q = query.into_query(executor.driver());
114                    executor.execute(&mut q).await?;
115                    // To reuse the allocated buffer
116                    query = q.into();
117                    query.buffer().clear();
118                }
119                writer.write_drop_schema::<Self>(&mut query, true);
120            }
121            executor.execute(query).await.map(|_| ())
122        }
123    }
124
125    /// Inserts a single entity row.
126    ///
127    /// Returns rows affected (expected: 1 on success).
128    fn insert_one(
129        executor: &mut impl Executor,
130        entity: &impl Entity,
131    ) -> impl Future<Output = Result<RowsAffected>> + Send {
132        let mut query = DynQuery::with_capacity(128);
133        executor
134            .driver()
135            .sql_writer()
136            .write_insert(&mut query, [entity], false);
137        executor.execute(query)
138    }
139
140    /// Multiple insert for a homogeneous iterator of entities.
141    ///
142    /// Returns the number of rows inserted.
143    fn insert_many<'a, It>(
144        executor: &mut impl Executor,
145        items: It,
146    ) -> impl Future<Output = Result<RowsAffected>> + Send
147    where
148        Self: Sized + 'a,
149        It: IntoIterator<Item = &'a Self> + Send,
150        <It as IntoIterator>::IntoIter: Send,
151    {
152        executor.append(items)
153    }
154
155    /// Prepare (but do not yet run) a SQL select query.
156    ///
157    /// Returns the prepared statement.
158    fn prepare_find<Exec: Executor>(
159        executor: &mut Exec,
160        condition: impl Expression,
161        limit: Option<u32>,
162    ) -> impl Future<Output = Result<Query<Exec::Driver>>> {
163        let builder = QueryBuilder::new()
164            .select(Self::columns())
165            .from(Self::table())
166            .where_expr(condition)
167            .limit(limit);
168        let writer = executor.driver().sql_writer();
169        let mut query = DynQuery::default();
170        writer.write_select(&mut query, &builder);
171        async {
172            if let DynQuery::Raw(RawQuery(sql)) = query {
173                executor.prepare(sql).await
174            } else {
175                Ok(query.into())
176            }
177        }
178    }
179
180    /// Finds the first entity matching a condition expression.
181    ///
182    /// Returns `Ok(None)` if no row matches.
183    fn find_one(
184        executor: &mut impl Executor,
185        condition: impl Expression,
186    ) -> impl Future<Output = Result<Option<Self>>> + Send
187    where
188        Self: Sized,
189    {
190        let stream = Self::find_many(executor, condition, Some(1));
191        async move { pin!(stream).into_future().map(|(v, _)| v).await.transpose() }
192    }
193
194    /// Streams entities matching a condition.
195    ///
196    /// `limit` restricts the maximum number of rows returned at a database level if `Some`
197    /// (if supported by the driver, unlimited otherwise).
198    fn find_many(
199        executor: &mut impl Executor,
200        condition: impl Expression,
201        limit: Option<u32>,
202    ) -> impl Stream<Item = Result<Self>> + Send
203    where
204        Self: Sized,
205    {
206        let builder = QueryBuilder::new()
207            .select(Self::columns())
208            .from(Self::table())
209            .where_expr(condition)
210            .limit(limit);
211        executor
212            .fetch(builder.build(&executor.driver()))
213            .map(|result| result.and_then(Self::from_row))
214    }
215
216    /// Deletes all entities matching a condition.
217    ///
218    /// Returns the number of deleted rows.
219    fn delete_many(
220        executor: &mut impl Executor,
221        condition: impl Expression,
222    ) -> impl Future<Output = Result<RowsAffected>> + Send
223    where
224        Self: Sized,
225    {
226        let mut query = DynQuery::with_capacity(128);
227        executor
228            .driver()
229            .sql_writer()
230            .write_delete::<Self>(&mut query, condition);
231        executor.execute(query)
232    }
233
234    /// Saves the entity (insert or update if available) based on primary key presence.
235    ///
236    /// Errors:
237    /// - Missing PK in the table.
238    /// - Execution failures from underlying driver.
239    fn save(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
240    where
241        Self: Sized,
242    {
243        if Self::primary_key_def().len() == 0 {
244            let error = Error::msg(
245                "Cannot save an entity without a primary key, it would always result in an insert",
246            );
247            log::error!("{:#}", error);
248            return Either::Left(future::ready(Err(error)));
249        }
250        let mut query = DynQuery::with_capacity(512);
251        executor
252            .driver()
253            .sql_writer()
254            .write_insert(&mut query, [self], true);
255        let sql = query.as_str();
256        let context = format!("While saving using the query {}", truncate_long!(sql));
257        Either::Right(executor.execute(query).map(|mut v| {
258            if let Ok(result) = v
259                && let Some(affected) = result.rows_affected
260                && affected > 2
261            {
262                v = Err(Error::msg(format!(
263                    "The driver returned affected rows: {affected} (expected <= 2)"
264                )));
265            }
266            match v {
267                Ok(_) => Ok(()),
268                Err(e) => {
269                    let e = e.context(context);
270                    log::error!("{e:#}");
271                    Err(e)
272                }
273            }
274        }))
275    }
276
277    /// Deletes this entity instance via its primary key.
278    ///
279    /// Errors:
280    /// - Missing PK in the table.
281    /// - If not exactly one row was deleted.
282    /// - Execution failures from underlying driver.
283    fn delete(&self, executor: &mut impl Executor) -> impl Future<Output = Result<()>> + Send
284    where
285        Self: Sized,
286    {
287        if Self::primary_key_def().len() == 0 {
288            let error = Error::msg(
289                "Cannot delete an entity without a primary key, it would delete nothing",
290            );
291            log::error!("{:#}", error);
292            return Either::Left(future::ready(Err(error)));
293        }
294        Either::Right(
295            Self::delete_many(executor, self.primary_key_expr()).map(|v| {
296                v.and_then(|v| {
297                    if let Some(affected) = v.rows_affected {
298                        if affected != 1 {
299                            let error = Error::msg(format!(
300                                "The query deleted {affected} rows instead of the expected 1"
301                            ));
302                            log::log!(
303                                if affected == 0 {
304                                    Level::Info
305                                } else {
306                                    Level::Error
307                                },
308                                "{error}",
309                            );
310                            return Err(error);
311                        }
312                    }
313                    Ok(())
314                })
315            }),
316        )
317    }
318}
319
320impl<E: Entity> Dataset for E {
321    /// Indicates whether column names should be fully qualified with schema and table name.
322    ///
323    /// For entities this returns `false` to keep queries concise, for joins it returns `true`.
324    fn qualified_columns() -> bool
325    where
326        Self: Sized,
327    {
328        false
329    }
330
331    /// Writes the table reference into the out string.
332    fn write_query(&self, writer: &dyn SqlWriter, context: &mut Context, out: &mut DynQuery) {
333        Self::table().write_query(writer, context, out);
334    }
335
336    fn table_ref(&self) -> TableRef {
337        Self::table().clone()
338    }
339}