wb_cache/test/simulation/db/
cache.rs

1//! Common implementation of the `DataController` methods and types for the simulation code.
2use crate::prelude::*;
3use crate::test::simulation::types::simerr;
4use crate::test::simulation::types::Result;
5use crate::test::simulation::types::SimErrorAny;
6use crate::update_iterator::UpdateIterator;
7use crate::update_iterator::UpdateIteratorItem;
8use crate::wbdc_response;
9use sea_orm::entity::Iterable;
10use sea_orm::ActiveModelTrait;
11use sea_orm::DatabaseConnection;
12use sea_orm::DeleteMany;
13use sea_orm::EntityTrait;
14use sea_orm::IntoActiveModel;
15use sea_orm::TransactionTrait;
16use sea_orm_migration::async_trait::async_trait;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::sync::Weak;
20use tracing::instrument;
21
22use super::driver::DatabaseDriver;
23
/// The type of cache controller update pool records.
///
/// Each variant describes what has to happen to the corresponding database record at write-back time. The
/// `Insert` and `Update` variants carry the active model to be written; `Delete` carries no payload.
#[derive(Debug)]
pub enum CacheUpdates<AM>
where
    AM: ActiveModelTrait + Sized + Send + Sync + 'static,
{
    /// The active model of this variant is to be inserted into the database.
    Insert(AM),
    /// The database record is to be updated with this active model.
    Update(AM),
    /// Delete the record from the database.
    Delete,
}
37
38impl<AM> CacheUpdates<AM>
39where
40    AM: ActiveModelTrait + Sized + Send + Sync + 'static,
41{
42    /// Create a new ActiveModel with the same discriminant as the current one. To an external observer, it functions
43    /// as replacing a container value.
44    pub fn replace(&self, am: AM) -> CacheUpdates<AM> {
45        match self {
46            CacheUpdates::Insert(_) => CacheUpdates::Insert(am),
47            CacheUpdates::Update(_) => CacheUpdates::Update(am),
48            CacheUpdates::Delete => CacheUpdates::Delete,
49        }
50    }
51}
52
/// Source of database access for the simulation's data controllers.
///
/// A data controller obtains its database connection from an object implementing this trait. Implementors must be
/// `Sync + Send + 'static`.
pub trait DBProvider: Sync + Send + 'static {
    /// Returns the database driver wrapped in an [`Arc`], or an error if it cannot be provided.
    fn db_driver(&self) -> Result<Arc<impl DatabaseDriver>>;
    /// Returns a database connection, or an error if it cannot be provided.
    fn db_connection(&self) -> Result<DatabaseConnection>;
}
57
58/// Common implementation of DataController methods.
59///
/// All models in this simulation share many properties, primarily because they are all built on
/// the SeaORM framework. Because of this, most of the `DataController` functionality can be
/// shared among them, and that is what this trait is for.
63///
/// Although this documentation tries to describe the implementation as fully as possible, it is
/// still recommended to read the source code, because some of the in-code comments can only be
/// understood in their context.
67///
68/// # Type parameters
69///
70/// | Parameter | Description |
71/// |-----------|-------------|
72/// | `T`       | The SeaORM `Entity` type that this data controller is responsible for. It must implement the [`EntityTrait`] trait.
73/// | `PARENT`  | The data controller implementing `DCCommon` is expected to be a child of an entity that implements the [`DBProvider`] trait, thereby providing a database connection.
74/// | `IMMUTABLE` | Setting this to `true` indicates that the database does not modify records when they are written to it; in particular, the primary key is configured with auto-increment disabled.
75#[async_trait]
76pub trait DCCommon<T, PARENT, const IMMUTABLE: bool = false>:
77    DataController<CacheUpdate = CacheUpdates<T::ActiveModel>, Error = SimErrorAny> + Send + Debug
78where
79    T: EntityTrait + Send + Sync + 'static,
80    T::Model: IntoActiveModel<T::ActiveModel>,
81    T::Column: Iterable,
82    T::ActiveModel: ActiveModelTrait + Send + Sync + 'static + From<Self::Value>,
83    PARENT: DBProvider,
84    Self::Value: Send + Sync + 'static,
85    Self::Key: ToString + Send + Sync + 'static,
86    Self: ::fieldx_plus::Child<
87        WeakParent = Weak<PARENT>,
88        RcParent = Result<Arc<PARENT>, Self::Error>,
89        FXPParent = Weak<PARENT>,
90    >,
91{
    /// Provide the correct condition for SeaORM's [`DeleteMany`] operation. See, for example, the source code of
    /// [`CustomerManager::delete_many_condition`](crate::test::simulation::db::entity::customer::Manager::delete_many_condition).
    ///
    /// # Parameters
    ///
    /// - `dm`: the delete statement to attach the condition to; the write-back code passes a fresh
    ///   `T::delete_many()` here.
    /// - `keys`: the keys of the records to be deleted; the write-back code passes one batch of keys per call.
    ///
    /// # Returns
    ///
    /// The delete statement that the write-back code then executes inside its transaction. Translating `keys` into a
    /// filter is left to the implementor because only the concrete data controller knows which columns make up
    /// the key of its model.
    fn delete_many_condition(dm: DeleteMany<T>, keys: Vec<Self::Key>) -> DeleteMany<T>;
96
    /// Where we get our database connection from.
    ///
    /// The default implementation returns whatever `self.parent()` yields. Per the `fieldx_plus::Child` bound on
    /// this trait that is a `Result<Arc<PARENT>, Self::Error>`, where `PARENT` implements [`DBProvider`].
    // NOTE(review): the error case is presumably a parent that has already been dropped (the parent is held as a
    // `Weak`) — confirm against the `fieldx_plus` documentation.
    fn db_provider(&self) -> <Self as ::fieldx_plus::Child>::RcParent {
        self.parent()
    }
101
102    /// Try to send given update records to the database in the most efficient way. This task is accomplished by:
103    ///
104    /// - using a transaction to group all updates together and possibly avoid re-indexing overhead;
105    /// - sorting updates into inserts, updates, and deletes;
106    /// - batching inserts and deletes.
107    ///
108    /// Updates are performed on a per-record basis because they are, by nature, not batchable.
109    ///
110    /// To be on the safe side, the batches are limited to 1000 records each. Technically, the PostgreSQL protocol
111    /// allows for up to 65,535 records in a single batch. However, practically, even half of that was causing errors.
112    /// Considering that even with the limit of 1000 the simulation demonstrates an 80-100 times improvement over the
113    /// non-cached approach, this is considered a good trade-off.
114    #[instrument(level = "trace", skip(update_records))]
115    async fn wbdc_write_back(&self, update_records: Arc<UpdateIterator<Self>>) -> Result<(), Self::Error> {
116        let conn_provider = self.db_provider()?;
117        let db_conn = conn_provider.db_connection()?;
118
119        // The data safety of this method is ensured by the following critical implementation details:
120        //
121        // - All errors are immediately propagated.
122        // - The transaction object automatically rolls back when dropped.
123        // - Update records are not confirmed until the transaction is successfully committed.
124
125        let transaction = db_conn.begin().await?;
126        let mut inserts = vec![];
127        let mut deletes = vec![];
128
129        loop {
130            let update_item = (*update_records).next();
131            let last_loop = update_item.is_none();
132
133            // Send inserts and deletes to the DB either when done iterating over the updates or when the batch
134            // size reaches 1000 records.
135            if (last_loop && !inserts.is_empty()) || inserts.len() >= 1000 {
136                let am_list = inserts
137                    .iter()
138                    .map(|i: &UpdateIteratorItem<Self>| {
139                        let CacheUpdates::<T::ActiveModel>::Insert(am) = i.update()
140                        else {
141                            unreachable!("Expected insert update, but got: {:?}", i.update())
142                        };
143                        am
144                    })
145                    .cloned()
146                    .collect::<Vec<_>>();
147
148                T::insert_many(am_list).exec_without_returning(&transaction).await?;
149                inserts.clear();
150            }
151
152            if (last_loop && !deletes.is_empty()) || deletes.len() >= 1000 {
153                let delete_chunk = deletes
154                    .iter()
155                    .map(|i: &UpdateIteratorItem<Self>| {
156                        let CacheUpdates::<T::ActiveModel>::Delete = i.update()
157                        else {
158                            unreachable!("Expected delete update, but got: {:?}", i.update())
159                        };
160                        i.key()
161                    })
162                    .cloned()
163                    .collect::<Vec<_>>();
164                // To implement correct deletion, we delegate the responsibility of determining the filter condition to
165                // the data controller itself, because only the data controller knows the important specifics of the
166                // model, such as which keys are to be used.
167                Self::delete_many_condition(T::delete_many(), delete_chunk)
168                    .exec(&transaction)
169                    .await?;
170                deletes.clear();
171            }
172
173            if last_loop {
174                break;
175            }
176
177            let update_item = update_item.unwrap();
178            let upd = update_item.update();
179
180            match upd {
181                CacheUpdates::Insert(_) => {
182                    inserts.push(update_item);
183                }
184                CacheUpdates::Update(a) => {
185                    // Updates cannot be done all at once. Do them right in place then.
186                    let am: T::ActiveModel = a.clone();
187                    am.update(&transaction).await?;
188                }
189                CacheUpdates::Delete => deletes.push(update_item),
190            };
191        }
192
193        transaction.commit().await?;
194
195        // As long as the transaction succeeded, we can confirm all updates.
196        update_records.confirm_all();
197
198        Ok(())
199    }
200
201    /// For every new data record added this method respond with [`CacheUpdates::Insert(record_active_mode)`] update.
202    /// The [`DataControllerOp`] depends on the `IMMUTABLE` flag: if it is set to `true`, the operation is
203    /// [`DataControllerOp::Insert`], otherwise it is [`DataControllerOp::Nop`].
204    #[instrument(level = "trace", skip(value))]
205    async fn wbdbc_on_new<AM>(&self, _key: &Self::Key, value: &AM) -> Result<DataControllerResponse<Self>, Self::Error>
206    where
207        AM: Into<T::ActiveModel> + Clone + Send + Sync + 'static,
208    {
209        let op = if IMMUTABLE {
210            DataControllerOp::Insert
211        }
212        else {
213            DataControllerOp::Nop
214        };
215        Ok(wbdc_response!(op, Some(CacheUpdates::Insert(value.clone().into()))))
216    }
217
218    /// Execute record deletion.
219    ///
220    /// A special perk of this method is that when a new record is inserted into the cache, its associated update record
221    /// remains as [`CacheUpdates::Insert`] until the next flush, even if the record is later modified by the user.
222    /// This means that for `IMMUTABLE` data controllers, writing the record to the backend and then deleting it has no
223    /// side effects and can be safely collapsed into a single operation.  This is exactly what this method does: when
224    /// it finds that the previous update state of an immutable model for the key is an insert, it simply returns a
225    /// [`DataControllerOp::Drop`] operation, meaning that both data and update records
226    /// are removed without having wasted time writing to the backend.
227    #[instrument(level = "trace")]
228    async fn wbdc_on_delete(
229        &self,
230        _key: &Self::Key,
231        update: Option<&CacheUpdates<T::ActiveModel>>,
232    ) -> Result<DataControllerResponse<Self>, Self::Error> {
233        let op = if IMMUTABLE && update.is_some() && matches!(update.unwrap(), CacheUpdates::Insert(_)) {
234            // The perfect case where an insert update hasn't been written to the backend yet and can be just dropped
235            // altogether.
236            DataControllerOp::Drop
237        }
238        else {
239            // In other cases we only request for cache removal and expect the next flush to remove the data from the
240            // backend.
241            DataControllerOp::Revoke
242        };
243        Ok(wbdc_response!(op, Some(CacheUpdates::Delete)))
244    }
245
246    /// Called when a record is modified by the user.
247    ///
248    /// This method creates a "diff" active model representing the differences between the unmodified and modified
249    /// values, where only the changed fields are set. If an update record for the key already exists, it is merged with
250    /// the diff, and the resulting update record is returned.
251    ///
252    /// The new update record always maintains the same discriminant as the previous one unless no prior update exists.
253    ///
254    /// The [`DataControllerOp`] is set to [`DataControllerOp::Insert`] if the `IMMUTABLE` flag is set to `true`;
255    /// otherwise, it is [`DataControllerOp::Nop`].
256    #[instrument(level = "trace", skip(value, old_value, prev_update))]
257    async fn wbdc_on_change(
258        &self,
259        key: &Self::Key,
260        value: &Self::Value,
261        old_value: Self::Value,
262        prev_update: Option<Self::CacheUpdate>,
263    ) -> Result<DataControllerResponse<Self>, Self::Error> {
264        // We use the previous update state as the base when it exists. Otherwise the previous cached value would
265        // implement this role.
266        let mut prev_am: T::ActiveModel = if let Some(ref prev) = prev_update {
267            match prev {
268                CacheUpdates::Delete => Err(simerr!("Attempt to modify a previously deleted key: '{key}'"))?,
269                CacheUpdates::Insert(a) => a.clone(),
270                CacheUpdates::Update(a) => a.clone(),
271            }
272        }
273        else {
274            old_value.into()
275        };
276
277        let mut changed = false;
278        let new_am: T::ActiveModel = value.clone().into();
279
280        for c in T::Column::iter() {
281            let new_v = new_am.get(c);
282            if prev_am.get(c) != new_v {
283                changed = true;
284                if let Some(v) = new_v.into_value() {
285                    prev_am.set(c, v);
286                }
287                else {
288                    prev_am.not_set(c);
289                }
290            }
291        }
292
293        // With IMMUTABLE model we can safely put the changed data record back into the cache.
294        let op = if changed && IMMUTABLE {
295            DataControllerOp::Insert
296        }
297        else {
298            DataControllerOp::Nop
299        };
300
301        // If there is no difference between the old and new values, then the previous update record remains
302        // intact. Otherwise, we ensure that the previous update only modifies the contained active model and
303        // is not mistakenly replaced with the [`CacheUpdates::Update`] variant.
304        let update = if changed {
305            Some(if prev_update.is_none() {
306                CacheUpdates::Update(prev_am)
307            }
308            else {
309                prev_update.unwrap().replace(prev_am)
310            })
311        }
312        else {
313            prev_update
314        };
315
316        Ok(wbdc_response!(op, update))
317    }
318}