wb_cache/test/simulation/db/cache.rs
1//! Common implementation of the `DataController` methods and types for the simulation code.
2use crate::prelude::*;
3use crate::test::simulation::types::simerr;
4use crate::test::simulation::types::Result;
5use crate::test::simulation::types::SimErrorAny;
6use crate::update_iterator::UpdateIterator;
7use crate::update_iterator::UpdateIteratorItem;
8use crate::wbdc_response;
9use sea_orm::entity::Iterable;
10use sea_orm::ActiveModelTrait;
11use sea_orm::DatabaseConnection;
12use sea_orm::DeleteMany;
13use sea_orm::EntityTrait;
14use sea_orm::IntoActiveModel;
15use sea_orm::TransactionTrait;
16use sea_orm_migration::async_trait::async_trait;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::sync::Weak;
20use tracing::instrument;
21
22use super::driver::DatabaseDriver;
23
24/// The type of cache controller update pool records.
25#[derive(Debug)]
26pub enum CacheUpdates<AM>
27where
28 AM: ActiveModelTrait + Sized + Send + Sync + 'static,
29{
30 /// The active model of this variant is to be inserted into the the database.
31 Insert(AM),
32 /// Database record is to be update with this active model.
33 Update(AM),
34 /// Delete the record from the database.
35 Delete,
36}
37
38impl<AM> CacheUpdates<AM>
39where
40 AM: ActiveModelTrait + Sized + Send + Sync + 'static,
41{
42 /// Create a new ActiveModel with the same discriminant as the current one. To an external observer, it functions
43 /// as replacing a container value.
44 pub fn replace(&self, am: AM) -> CacheUpdates<AM> {
45 match self {
46 CacheUpdates::Insert(_) => CacheUpdates::Insert(am),
47 CacheUpdates::Update(_) => CacheUpdates::Update(am),
48 CacheUpdates::Delete => CacheUpdates::Delete,
49 }
50 }
51}
52
53pub trait DBProvider: Sync + Send + 'static {
54 fn db_driver(&self) -> Result<Arc<impl DatabaseDriver>>;
55 fn db_connection(&self) -> Result<DatabaseConnection>;
56}
57
58/// Common implementation of DataController methods.
59///
/// All models in this simulation share a lot of properties, primarily due to the use
/// of the SeaORM framework. Because of this, most of the `DataController` functionality
/// can be shared among them, and this is what this trait is made for.
63///
64/// Though an attempt to provide as much information about the implementation as possible
65/// in this documentation will be made, it is recommended to look into the source code
66/// as some comments in the code require the context to be understood.
67///
68/// # Type parameters
69///
70/// | Parameter | Description |
71/// |-----------|-------------|
72/// | `T` | The SeaORM `Entity` type that this data controller is responsible for. It must implement the [`EntityTrait`] trait.
73/// | `PARENT` | The data controller implementing `DCCommon` is expected to be a child of an entity that implements the [`DBProvider`] trait, thereby providing a database connection.
74/// | `IMMUTABLE` | Setting this to `true` indicates that the database does not modify records when they are written to it; in particular, the primary key is configured with auto-increment disabled.
75#[async_trait]
76pub trait DCCommon<T, PARENT, const IMMUTABLE: bool = false>:
77 DataController<CacheUpdate = CacheUpdates<T::ActiveModel>, Error = SimErrorAny> + Send + Debug
78where
79 T: EntityTrait + Send + Sync + 'static,
80 T::Model: IntoActiveModel<T::ActiveModel>,
81 T::Column: Iterable,
82 T::ActiveModel: ActiveModelTrait + Send + Sync + 'static + From<Self::Value>,
83 PARENT: DBProvider,
84 Self::Value: Send + Sync + 'static,
85 Self::Key: ToString + Send + Sync + 'static,
86 Self: ::fieldx_plus::Child<
87 WeakParent = Weak<PARENT>,
88 RcParent = Result<Arc<PARENT>, Self::Error>,
89 FXPParent = Weak<PARENT>,
90 >,
91{
/// Provide the correct condition for SeaORM's [`DeleteMany`] operation.
///
/// This is the only method of this trait without a default implementation. The write-back code calls it with a
/// fresh `T::delete_many()` statement as `dm` and a batch of keys of the records marked for deletion as `keys`,
/// and then executes the returned statement. Implementors are expected to return `dm` filtered down to the
/// records identified by `keys`.
///
/// See, for example,
/// [`CustomerManager::delete_many_condition`](crate::test::simulation::db::entity::customer::Manager::delete_many_condition)
/// source code.
fn delete_many_condition(dm: DeleteMany<T>, keys: Vec<Self::Key>) -> DeleteMany<T>;
96
/// Where we get our database connection from.
///
/// Returns whatever `self.parent()` yields. Per the bounds on this trait, that is
/// `Result<Arc<PARENT>, Self::Error>`, where `PARENT` implements [`DBProvider`].
fn db_provider(&self) -> <Self as ::fieldx_plus::Child>::RcParent {
    self.parent()
}
101
102 /// Try to send given update records to the database in the most efficient way. This task is accomplished by:
103 ///
104 /// - using a transaction to group all updates together and possibly avoid re-indexing overhead;
105 /// - sorting updates into inserts, updates, and deletes;
106 /// - batching inserts and deletes.
107 ///
108 /// Updates are performed on a per-record basis because they are, by nature, not batchable.
109 ///
110 /// To be on the safe side, the batches are limited to 1000 records each. Technically, the PostgreSQL protocol
111 /// allows for up to 65,535 records in a single batch. However, practically, even half of that was causing errors.
112 /// Considering that even with the limit of 1000 the simulation demonstrates an 80-100 times improvement over the
113 /// non-cached approach, this is considered a good trade-off.
114 #[instrument(level = "trace", skip(update_records))]
115 async fn wbdc_write_back(&self, update_records: Arc<UpdateIterator<Self>>) -> Result<(), Self::Error> {
116 let conn_provider = self.db_provider()?;
117 let db_conn = conn_provider.db_connection()?;
118
119 // The data safety of this method is ensured by the following critical implementation details:
120 //
121 // - All errors are immediately propagated.
122 // - The transaction object automatically rolls back when dropped.
123 // - Update records are not confirmed until the transaction is successfully committed.
124
125 let transaction = db_conn.begin().await?;
126 let mut inserts = vec![];
127 let mut deletes = vec![];
128
129 loop {
130 let update_item = (*update_records).next();
131 let last_loop = update_item.is_none();
132
133 // Send inserts and deletes to the DB either when done iterating over the updates or when the batch
134 // size reaches 1000 records.
135 if (last_loop && !inserts.is_empty()) || inserts.len() >= 1000 {
136 let am_list = inserts
137 .iter()
138 .map(|i: &UpdateIteratorItem<Self>| {
139 let CacheUpdates::<T::ActiveModel>::Insert(am) = i.update()
140 else {
141 unreachable!("Expected insert update, but got: {:?}", i.update())
142 };
143 am
144 })
145 .cloned()
146 .collect::<Vec<_>>();
147
148 T::insert_many(am_list).exec_without_returning(&transaction).await?;
149 inserts.clear();
150 }
151
152 if (last_loop && !deletes.is_empty()) || deletes.len() >= 1000 {
153 let delete_chunk = deletes
154 .iter()
155 .map(|i: &UpdateIteratorItem<Self>| {
156 let CacheUpdates::<T::ActiveModel>::Delete = i.update()
157 else {
158 unreachable!("Expected delete update, but got: {:?}", i.update())
159 };
160 i.key()
161 })
162 .cloned()
163 .collect::<Vec<_>>();
164 // To implement correct deletion, we delegate the responsibility of determining the filter condition to
165 // the data controller itself, because only the data controller knows the important specifics of the
166 // model, such as which keys are to be used.
167 Self::delete_many_condition(T::delete_many(), delete_chunk)
168 .exec(&transaction)
169 .await?;
170 deletes.clear();
171 }
172
173 if last_loop {
174 break;
175 }
176
177 let update_item = update_item.unwrap();
178 let upd = update_item.update();
179
180 match upd {
181 CacheUpdates::Insert(_) => {
182 inserts.push(update_item);
183 }
184 CacheUpdates::Update(a) => {
185 // Updates cannot be done all at once. Do them right in place then.
186 let am: T::ActiveModel = a.clone();
187 am.update(&transaction).await?;
188 }
189 CacheUpdates::Delete => deletes.push(update_item),
190 };
191 }
192
193 transaction.commit().await?;
194
195 // As long as the transaction succeeded, we can confirm all updates.
196 update_records.confirm_all();
197
198 Ok(())
199 }
200
201 /// For every new data record added this method respond with [`CacheUpdates::Insert(record_active_mode)`] update.
202 /// The [`DataControllerOp`] depends on the `IMMUTABLE` flag: if it is set to `true`, the operation is
203 /// [`DataControllerOp::Insert`], otherwise it is [`DataControllerOp::Nop`].
204 #[instrument(level = "trace", skip(value))]
205 async fn wbdbc_on_new<AM>(&self, _key: &Self::Key, value: &AM) -> Result<DataControllerResponse<Self>, Self::Error>
206 where
207 AM: Into<T::ActiveModel> + Clone + Send + Sync + 'static,
208 {
209 let op = if IMMUTABLE {
210 DataControllerOp::Insert
211 }
212 else {
213 DataControllerOp::Nop
214 };
215 Ok(wbdc_response!(op, Some(CacheUpdates::Insert(value.clone().into()))))
216 }
217
218 /// Execute record deletion.
219 ///
220 /// A special perk of this method is that when a new record is inserted into the cache, its associated update record
221 /// remains as [`CacheUpdates::Insert`] until the next flush, even if the record is later modified by the user.
222 /// This means that for `IMMUTABLE` data controllers, writing the record to the backend and then deleting it has no
223 /// side effects and can be safely collapsed into a single operation. This is exactly what this method does: when
224 /// it finds that the previous update state of an immutable model for the key is an insert, it simply returns a
225 /// [`DataControllerOp::Drop`] operation, meaning that both data and update records
226 /// are removed without having wasted time writing to the backend.
227 #[instrument(level = "trace")]
228 async fn wbdc_on_delete(
229 &self,
230 _key: &Self::Key,
231 update: Option<&CacheUpdates<T::ActiveModel>>,
232 ) -> Result<DataControllerResponse<Self>, Self::Error> {
233 let op = if IMMUTABLE && update.is_some() && matches!(update.unwrap(), CacheUpdates::Insert(_)) {
234 // The perfect case where an insert update hasn't been written to the backend yet and can be just dropped
235 // altogether.
236 DataControllerOp::Drop
237 }
238 else {
239 // In other cases we only request for cache removal and expect the next flush to remove the data from the
240 // backend.
241 DataControllerOp::Revoke
242 };
243 Ok(wbdc_response!(op, Some(CacheUpdates::Delete)))
244 }
245
246 /// Called when a record is modified by the user.
247 ///
248 /// This method creates a "diff" active model representing the differences between the unmodified and modified
249 /// values, where only the changed fields are set. If an update record for the key already exists, it is merged with
250 /// the diff, and the resulting update record is returned.
251 ///
252 /// The new update record always maintains the same discriminant as the previous one unless no prior update exists.
253 ///
254 /// The [`DataControllerOp`] is set to [`DataControllerOp::Insert`] if the `IMMUTABLE` flag is set to `true`;
255 /// otherwise, it is [`DataControllerOp::Nop`].
256 #[instrument(level = "trace", skip(value, old_value, prev_update))]
257 async fn wbdc_on_change(
258 &self,
259 key: &Self::Key,
260 value: &Self::Value,
261 old_value: Self::Value,
262 prev_update: Option<Self::CacheUpdate>,
263 ) -> Result<DataControllerResponse<Self>, Self::Error> {
264 // We use the previous update state as the base when it exists. Otherwise the previous cached value would
265 // implement this role.
266 let mut prev_am: T::ActiveModel = if let Some(ref prev) = prev_update {
267 match prev {
268 CacheUpdates::Delete => Err(simerr!("Attempt to modify a previously deleted key: '{key}'"))?,
269 CacheUpdates::Insert(a) => a.clone(),
270 CacheUpdates::Update(a) => a.clone(),
271 }
272 }
273 else {
274 old_value.into()
275 };
276
277 let mut changed = false;
278 let new_am: T::ActiveModel = value.clone().into();
279
280 for c in T::Column::iter() {
281 let new_v = new_am.get(c);
282 if prev_am.get(c) != new_v {
283 changed = true;
284 if let Some(v) = new_v.into_value() {
285 prev_am.set(c, v);
286 }
287 else {
288 prev_am.not_set(c);
289 }
290 }
291 }
292
293 // With IMMUTABLE model we can safely put the changed data record back into the cache.
294 let op = if changed && IMMUTABLE {
295 DataControllerOp::Insert
296 }
297 else {
298 DataControllerOp::Nop
299 };
300
301 // If there is no difference between the old and new values, then the previous update record remains
302 // intact. Otherwise, we ensure that the previous update only modifies the contained active model and
303 // is not mistakenly replaced with the [`CacheUpdates::Update`] variant.
304 let update = if changed {
305 Some(if prev_update.is_none() {
306 CacheUpdates::Update(prev_am)
307 }
308 else {
309 prev_update.unwrap().replace(prev_am)
310 })
311 }
312 else {
313 prev_update
314 };
315
316 Ok(wbdc_response!(op, update))
317 }
318}