wb_cache/update_iterator.rs
1use fieldx_plus::child_build;
2use fieldx_plus::fx_plus;
3use tokio::sync::OwnedRwLockWriteGuard;
4
5use crate::Cache;
6use crate::DataController;
7
8type KeyGuard<DC> = (
9 <DC as DataController>::Key,
10 OwnedRwLockWriteGuard<Option<<DC as DataController>::CacheUpdate>>,
11);
12type KeyOptGuard<DC> = (
13 <DC as DataController>::Key,
14 Option<OwnedRwLockWriteGuard<Option<<DC as DataController>::CacheUpdate>>>,
15);
16
/// Iterator over updates in the cache.
///
/// This is a crucial part of the [write-back](DataController::write_back) mechanism. Despite its name reflecting
/// the primary function of the struct, it does not implement the `Iterator` trait but provides a `next()` method
/// to iterate over updates. This is because it incorporates aspects of a collection's behavior, making it more
/// than just a simple iterator.
///
/// <a id="confirmation"></a>
/// **Note** that an important part of implementing a write back method is to remember to confirm the successfully
/// processed updates. Unconfirmed ones will be put back into the update pool for later processing. Confirmation can be
/// done by calling the `confirm_all()` method or by calling `confirm()` on each item returned by the `next()` method.
/// The former approach is useful for transactional updates.
///
/// `confirm_all()` only affects items that have already been dropped and thereby handed back to the iterator; items
/// still held by the caller at that moment are not confirmed by it.
#[fx_plus(
    child(Cache<DC>, rc_strong),
    parent,
    default(off),
    sync,
    rc,
    get(off),
    builder(vis(pub(crate)))
)]
pub struct UpdateIterator<DC>
where
    DC: DataController + Send + Sync + 'static,
{
    // Keys bundled with this iterator, each optionally paired with a write guard acquired in advance (see
    // `UpdateIteratorBuilder::key_guard`). Entries are never removed; `next()` takes the guard out of an entry when it
    // visits it.
    #[fieldx(inner_mut, private, get, get_mut, builder(private))]
    unprocessed: Vec<KeyOptGuard<DC>>,

    // Index into `unprocessed` of the next entry `next()` will examine. Rewound to 0 by `reset()`.
    #[fieldx(inner_mut, private, get(copy), set, builder(off))]
    next_idx: usize,

    // Collect owned guards here. When the iterator is dropped the locks are released.
    // Only guards of items dropped while their data was still `Some` (i.e. not confirmed via the item's `confirm()`)
    // end up here; `confirm_all()` clears their data and `reset()` drops them.
    #[fieldx(inner_mut, get_mut(vis(pub(crate))), builder(off))]
    worked: Vec<KeyGuard<DC>>,
}
52
53impl<DC> UpdateIterator<DC>
54where
55 DC: DataController + Send + Sync + 'static,
56{
57 #[inline(always)]
58 fn take_back(&self, key_guard: (DC::Key, OwnedRwLockWriteGuard<Option<DC::CacheUpdate>>)) {
59 if key_guard.1.is_some() {
60 // If the update data is Some, then it means the data controller hasn't confirmed it yet.
61 // Leaving aside the hypothesis of a bug in the DC implementation, this indicates that
62 // either a transaction is in progress or there was an error while processing the update.
63 // Retain the lock for later so that the DC can confirm the entire transaction at once when it is completed.
64 self.worked_mut().push(key_guard);
65 }
66 }
67
68 /// Confirm all updates at once. Useful for transactional updates.
69 #[inline]
70 pub fn confirm_all(&self) {
71 for (_key, guard) in self.worked_mut().iter_mut() {
72 guard.take();
73 }
74 }
75
76 /// The number of update records to process.
77 #[inline(always)]
78 pub fn len(&self) -> usize {
79 self.unprocessed().len()
80 }
81
82 #[inline(always)]
83 pub fn is_empty(&self) -> bool {
84 self.unprocessed().is_empty()
85 }
86
87 /// Get the next update item to process or `None` if there are no more items left.
88 ///
89 /// It is possible that not all of the updates, bundled with the current iterator, will be returned by this method.
90 /// This can happen due to the concurrent nature of the cache and the fact that updates can be flushed by other
91 /// threads.
92 pub fn next(&self) -> Option<UpdateIteratorItem<DC>> {
93 let mut unprocessed = self.unprocessed_mut();
94 loop {
95 let next_idx = self.next_idx();
96 if next_idx >= unprocessed.len() {
97 return None;
98 }
99
100 let Some((key, guard)) = unprocessed.get_mut(next_idx).map(|(k, g)| (k.clone(), g.take()))
101 else {
102 panic!(
103 "Internal error of UpdateIterator<{}>: next update key not found at index {next_idx}",
104 std::any::type_name::<DC>()
105 );
106 };
107
108 self.set_next_idx(next_idx + 1);
109
110 let guard = if let Some(g) = guard {
111 g
112 }
113 else if let Some(update) = self.parent().updates().get(&key).cloned() {
114 // Either we're able to get a write lock immediately or we skip this update. This way two problems are
115 // avoided:
116 //
117 // 1. Deadlock: waiting for the update lock may block the entire cache.
118 // 2. A locked update is assumed to be already processed by another thread, typically as a result of
119 // flush_one call.
120 let Ok(guard) = update.data.clone().try_write_owned()
121 else {
122 continue;
123 };
124 guard
125 }
126 else {
127 // Skip if there is no update for this key. Most likely it was already flushed.
128 continue;
129 };
130
131 // If guard's content is None, it means the update was already flushed and we can skip it.
132 if guard.is_some() {
133 return Some(
134 child_build!(
135 self, UpdateIteratorItem<DC> {
136 key_guard: Some((key.clone(), guard)),
137 }
138 )
139 .unwrap(),
140 );
141 }
142 }
143 }
144
145 /// Reset the iterator to the initial state to allow re-iterating over the same updates. Mostly useful for cache
146 /// controller's own purposes.
147 #[inline]
148 pub fn reset(&self) {
149 self.set_next_idx(0);
150 self.worked_mut().truncate(0);
151 }
152}
153
154impl<DC> UpdateIteratorBuilder<DC>
155where
156 DC: DataController + Send + Sync + 'static,
157{
158 /// Setup the iterator from a list of keys. In this case it will attemp to collect the write locks from the update
159 /// records.
160 pub(crate) fn keys(self, keys: Vec<DC::Key>) -> Self {
161 let unprocessed = keys.into_iter().map(|key| (key, None)).collect::<Vec<_>>();
162 self.unprocessed(unprocessed)
163 }
164
165 /// Setup the iterator from a single key/guard pair. This is to support single entry flushes where the guard is
166 /// pre-collected.
167 pub(crate) fn key_guard(self, kg: (DC::Key, OwnedRwLockWriteGuard<Option<DC::CacheUpdate>>)) -> Self {
168 self.unprocessed(vec![(kg.0, Some(kg.1))])
169 }
170}
171
/// Update item provides access to the update record, its key, and allows confirming the update.
///
/// When the item is dropped, it returns itself to the iterator for post-processing. This is particularly important
/// when transactional updates are used and confirmed using the [`confirm_all()`](UpdateIterator::confirm_all) method,
/// because that method confirms only the items that were returned to the iterator.
///
/// While alive, the item holds the owned write guard of its update record, so no one else can lock that record.
#[fx_plus(
    child(UpdateIterator<DC>, rc_strong),
    default(off),
    sync,
)]
pub struct UpdateIteratorItem<DC>
where
    DC: DataController + Send + Sync + 'static,
{
    // Option here is to allow the Drop trait to take the guard back to the iterator.
    // `confirm()` also moves the pair out; the iterator always constructs items with `Some`.
    key_guard: Option<KeyGuard<DC>>,
}
190
191impl<DC> UpdateIteratorItem<DC>
192where
193 DC: DataController + Send + Sync + 'static,
194{
195 /// Get the update record.
196 pub fn update(&self) -> &DC::CacheUpdate {
197 // The .expect must never fire because we own the exclusive lock and the iterator is checking for None before
198 // returning this item.
199 self.key_guard
200 .as_ref()
201 .expect("Internal error: guard is None")
202 .1
203 .as_ref()
204 .expect("Internal error: update data cannot be None")
205 }
206
207 /// Get the update record's key.
208 pub fn key(&self) -> &DC::Key {
209 // The .expect must never fire because we own the exclusive lock and the iterator is checking for None before
210 // returning this item.
211 &self.key_guard.as_ref().expect("Internal error: guard is None").0
212 }
213
214 /// Confirm writing this update record to the backend.
215 pub fn confirm(mut self) {
216 if let Some(mut guard) = self.key_guard.take() {
217 guard.1.take();
218 }
219 else {
220 unreachable!("Internal error: guard is None");
221 }
222 }
223}
224
225/// Returns this item back to the iterator for post-processing.
226impl<DC> Drop for UpdateIteratorItem<DC>
227where
228 DC: DataController + Send + Sync + 'static,
229{
230 fn drop(&mut self) {
231 if let Some(key_guard) = self.key_guard.take() {
232 self.parent().take_back(key_guard);
233 }
234 }
235}