wb_cache/
update_iterator.rs

1use fieldx_plus::child_build;
2use fieldx_plus::fx_plus;
3use tokio::sync::OwnedRwLockWriteGuard;
4
5use crate::Cache;
6use crate::DataController;
7
/// An update key paired with the owned write guard over that key's update slot.
///
/// The slot holds `Some(update)` while the update is pending; confirming an update sets the slot to `None`.
type KeyGuard<DC> = (
    <DC as DataController>::Key,
    OwnedRwLockWriteGuard<Option<<DC as DataController>::CacheUpdate>>,
);
/// Same as [`KeyGuard`], but the guard itself is optional.
///
/// `None` in the guard position means the write lock has not been collected yet; `UpdateIterator::next()` then tries
/// to obtain it from the cache's update records.
type KeyOptGuard<DC> = (
    <DC as DataController>::Key,
    Option<OwnedRwLockWriteGuard<Option<<DC as DataController>::CacheUpdate>>>,
);
16
/// Iterator over updates in the cache.
///
/// This is a crucial part of the [write-back](DataController::write_back) mechanism. Despite its name reflecting
/// the primary function of the struct, it does not implement the `Iterator` trait but provides a `next()` method
/// to iterate over updates. This is because it incorporates aspects of a collection's behavior, making it more
/// than just a simple iterator.
///
/// <a id="confirmation"></a>
/// **Note** that an important part of implementing a write-back method is to remember to confirm the successfully
/// processed updates. Unconfirmed ones will be put back into the update pool for later processing. Confirmation can be
/// done by calling the `confirm_all()` method or by calling `confirm()` on each item returned by the `next()` method.
/// The former approach is useful for transactional updates.
#[fx_plus(
    child(Cache<DC>, rc_strong),
    parent,
    default(off),
    sync,
    rc,
    get(off),
    builder(vis(pub(crate)))
)]
pub struct UpdateIterator<DC>
where
    DC: DataController + Send + Sync + 'static,
{
    // Keys of the updates bundled with this iterator, each optionally paired with a pre-collected write guard.
    // `next()` takes the guard out of an entry when it visits it; the entry itself (and its key) stays in the list.
    #[fieldx(inner_mut, private, get, get_mut, builder(private))]
    unprocessed: Vec<KeyOptGuard<DC>>,

    // Index into `unprocessed` of the entry that `next()` will examine on its next call.
    #[fieldx(inner_mut, private, get(copy), set, builder(off))]
    next_idx: usize,

    // Collect owned guards here. When the iterator is dropped the locks are released.
    // Only guards of updates that were handed back by a dropped item without being confirmed end up in this list.
    #[fieldx(inner_mut, get_mut(vis(pub(crate))), builder(off))]
    worked: Vec<KeyGuard<DC>>,
}
52
53impl<DC> UpdateIterator<DC>
54where
55    DC: DataController + Send + Sync + 'static,
56{
57    #[inline(always)]
58    fn take_back(&self, key_guard: (DC::Key, OwnedRwLockWriteGuard<Option<DC::CacheUpdate>>)) {
59        if key_guard.1.is_some() {
60            // If the update data is Some, then it means the data controller hasn't confirmed it yet.
61            // Leaving aside the hypothesis of a bug in the DC implementation, this indicates that
62            // either a transaction is in progress or there was an error while processing the update.
63            // Retain the lock for later so that the DC can confirm the entire transaction at once when it is completed.
64            self.worked_mut().push(key_guard);
65        }
66    }
67
68    /// Confirm all updates at once. Useful for transactional updates.
69    #[inline]
70    pub fn confirm_all(&self) {
71        for (_key, guard) in self.worked_mut().iter_mut() {
72            guard.take();
73        }
74    }
75
76    /// The number of update records to process.
77    #[inline(always)]
78    pub fn len(&self) -> usize {
79        self.unprocessed().len()
80    }
81
82    #[inline(always)]
83    pub fn is_empty(&self) -> bool {
84        self.unprocessed().is_empty()
85    }
86
87    /// Get the next update item to process or `None` if there are no more items left.
88    ///
89    /// It is possible that not all of the updates, bundled with the current iterator, will be returned by this method.
90    /// This can happen due to the concurrent nature of the cache and the fact that updates can be flushed by other
91    /// threads.
92    pub fn next(&self) -> Option<UpdateIteratorItem<DC>> {
93        let mut unprocessed = self.unprocessed_mut();
94        loop {
95            let next_idx = self.next_idx();
96            if next_idx >= unprocessed.len() {
97                return None;
98            }
99
100            let Some((key, guard)) = unprocessed.get_mut(next_idx).map(|(k, g)| (k.clone(), g.take()))
101            else {
102                panic!(
103                    "Internal error of UpdateIterator<{}>: next update key not found at index {next_idx}",
104                    std::any::type_name::<DC>()
105                );
106            };
107
108            self.set_next_idx(next_idx + 1);
109
110            let guard = if let Some(g) = guard {
111                g
112            }
113            else if let Some(update) = self.parent().updates().get(&key).cloned() {
114                // Either we're able to get a write lock immediately or we skip this update. This way two problems are
115                // avoided:
116                //
117                // 1. Deadlock: waiting for the update lock may block the entire cache.
118                // 2. A locked update is assumed to be already processed by another thread, typically as a result of
119                //    flush_one call.
120                let Ok(guard) = update.data.clone().try_write_owned()
121                else {
122                    continue;
123                };
124                guard
125            }
126            else {
127                // Skip if there is no update for this key. Most likely it was already flushed.
128                continue;
129            };
130
131            // If guard's content is None, it means the update was already flushed and we can skip it.
132            if guard.is_some() {
133                return Some(
134                    child_build!(
135                        self, UpdateIteratorItem<DC> {
136                            key_guard: Some((key.clone(), guard)),
137                        }
138                    )
139                    .unwrap(),
140                );
141            }
142        }
143    }
144
145    /// Reset the iterator to the initial state to allow re-iterating over the same updates. Mostly useful for cache
146    /// controller's own purposes.
147    #[inline]
148    pub fn reset(&self) {
149        self.set_next_idx(0);
150        self.worked_mut().truncate(0);
151    }
152}
153
154impl<DC> UpdateIteratorBuilder<DC>
155where
156    DC: DataController + Send + Sync + 'static,
157{
158    /// Setup the iterator from a list of keys. In this case it will attemp to collect the write locks from the update
159    /// records.
160    pub(crate) fn keys(self, keys: Vec<DC::Key>) -> Self {
161        let unprocessed = keys.into_iter().map(|key| (key, None)).collect::<Vec<_>>();
162        self.unprocessed(unprocessed)
163    }
164
165    /// Setup the iterator from a single key/guard pair. This is to support single entry flushes where the guard is
166    /// pre-collected.
167    pub(crate) fn key_guard(self, kg: (DC::Key, OwnedRwLockWriteGuard<Option<DC::CacheUpdate>>)) -> Self {
168        self.unprocessed(vec![(kg.0, Some(kg.1))])
169    }
170}
171
/// Update item provides access to the update record, its key, and allows confirming the update.
///
/// When the item is dropped, it returns itself to the iterator for post-processing. This is particularly important
/// when transactional updates are used and confirmed using the [`confirm_all()`](UpdateIterator::confirm_all) method,
/// because that method confirms only the items that were returned to the iterator.
#[fx_plus(
    child(UpdateIterator<DC>, rc_strong),
    default(off),
    sync,
)]
pub struct UpdateIteratorItem<DC>
where
    DC: DataController + Send + Sync + 'static,
{
    // Option here is to allow the Drop trait to take the guard back to the iterator.
    // It is `None` only after `confirm()` or `drop()` has moved the pair out.
    key_guard: Option<KeyGuard<DC>>,
}
190
191impl<DC> UpdateIteratorItem<DC>
192where
193    DC: DataController + Send + Sync + 'static,
194{
195    /// Get the update record.
196    pub fn update(&self) -> &DC::CacheUpdate {
197        // The .expect must never fire because we own the exclusive lock and the iterator is checking for None before
198        // returning this item.
199        self.key_guard
200            .as_ref()
201            .expect("Internal error: guard is None")
202            .1
203            .as_ref()
204            .expect("Internal error: update data cannot be None")
205    }
206
207    /// Get the update record's key.
208    pub fn key(&self) -> &DC::Key {
209        // The .expect must never fire because we own the exclusive lock and the iterator is checking for None before
210        // returning this item.
211        &self.key_guard.as_ref().expect("Internal error: guard is None").0
212    }
213
214    /// Confirm writing this update record to the backend.
215    pub fn confirm(mut self) {
216        if let Some(mut guard) = self.key_guard.take() {
217            guard.1.take();
218        }
219        else {
220            unreachable!("Internal error: guard is None");
221        }
222    }
223}
224
225/// Returns this item back to the iterator for post-processing.
226impl<DC> Drop for UpdateIteratorItem<DC>
227where
228    DC: DataController + Send + Sync + 'static,
229{
230    fn drop(&mut self) {
231        if let Some(key_guard) = self.key_guard.take() {
232            self.parent().take_back(key_guard);
233        }
234    }
235}