1use super::*;
2
3impl<K, V, C, P, S> AsyncTm<K, V, C, P, S>
4where
5 C: Cm<Key = K>,
6 P: Pwm<Key = K, Value = V>,
7 S: AsyncSpawner,
8{
9 pub async fn write_with_blocking_cm_and_pwm(
12 &self,
13 pending_manager_opts: P::Options,
14 conflict_manager_opts: C::Options,
15 ) -> Result<AsyncWtm<K, V, C, P, S>, TransactionError<C::Error, P::Error>> {
16 let read_ts = self.inner.read_ts().await;
17 Ok(AsyncWtm {
18 orc: self.inner.clone(),
19 read_ts,
20 size: 0,
21 count: 0,
22 conflict_manager: Some(C::new(conflict_manager_opts).map_err(TransactionError::conflict)?),
23 pending_writes: Some(P::new(pending_manager_opts).map_err(TransactionError::pending)?),
24 duplicate_writes: OneOrMore::new(),
25 discarded: false,
26 done_read: false,
27 })
28 }
29}
30
31impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
32where
33 C: CmComparable<Key = K>,
34 S: AsyncSpawner,
35{
36 pub fn mark_read_comparable_blocking<Q>(&mut self, k: &Q)
38 where
39 K: Borrow<Q>,
40 Q: ?Sized + Ord,
41 {
42 if let Some(ref mut conflict_manager) = self.conflict_manager {
43 conflict_manager.mark_read_comparable(k);
44 }
45 }
46
47 pub fn mark_conflict_comparable_blocking<Q>(&mut self, k: &Q)
49 where
50 K: Borrow<Q>,
51 Q: ?Sized + Ord,
52 {
53 if let Some(ref mut conflict_manager) = self.conflict_manager {
54 conflict_manager.mark_conflict_comparable(k);
55 }
56 }
57}
58
59impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
60where
61 C: CmEquivalent<Key = K>,
62 S: AsyncSpawner,
63{
64 pub fn mark_read_equivalent_blocking<Q>(&mut self, k: &Q)
66 where
67 K: Borrow<Q>,
68 Q: ?Sized + Hash + Eq,
69 {
70 if let Some(ref mut conflict_manager) = self.conflict_manager {
71 conflict_manager.mark_read_equivalent(k);
72 }
73 }
74
75 pub fn mark_conflict_equivalent_blocking<Q>(&mut self, k: &Q)
77 where
78 K: Borrow<Q>,
79 Q: ?Sized + Hash + Eq,
80 {
81 if let Some(ref mut conflict_manager) = self.conflict_manager {
82 conflict_manager.mark_conflict_equivalent(k);
83 }
84 }
85}
86
87impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
88where
89 C: Cm<Key = K>,
90 S: AsyncSpawner,
91{
92 pub fn blocking_marker(&mut self) -> Option<Marker<'_, C>> {
98 self.conflict_manager.as_mut().map(Marker::new)
99 }
100
101 pub fn blocking_marker_with_pm(&mut self) -> Option<(Marker<'_, C>, &P)> {
108 self
109 .conflict_manager
110 .as_mut()
111 .map(|marker| (Marker::new(marker), self.pending_writes.as_ref().unwrap()))
112 }
113
114 pub fn mark_read_blocking(&mut self, k: &K) {
116 if let Some(ref mut conflict_manager) = self.conflict_manager {
117 conflict_manager.mark_read(k);
118 }
119 }
120
121 pub fn mark_conflict_blocking(&mut self, k: &K) {
123 if let Some(ref mut conflict_manager) = self.conflict_manager {
124 conflict_manager.mark_conflict(k);
125 }
126 }
127}
128
129impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
130where
131 C: Cm<Key = K>,
132 P: Pwm<Key = K, Value = V>,
133 S: AsyncSpawner,
134{
135 #[inline]
137 pub fn rollback_blocking(&mut self) -> Result<(), TransactionError<C::Error, P::Error>> {
138 if self.discarded {
139 return Err(TransactionError::Discard);
140 }
141
142 self
143 .pending_writes
144 .as_mut()
145 .unwrap()
146 .rollback()
147 .map_err(TransactionError::Pwm)?;
148 self
149 .conflict_manager
150 .as_mut()
151 .unwrap()
152 .rollback()
153 .map_err(TransactionError::Cm)?;
154 Ok(())
155 }
156
157 pub fn insert_blocking(
159 &mut self,
160 key: K,
161 value: V,
162 ) -> Result<(), TransactionError<C::Error, P::Error>> {
163 self.insert_with_blocking_in(key, value)
164 }
165
166 pub fn remove_blocking(&mut self, key: K) -> Result<(), TransactionError<C::Error, P::Error>> {
172 self.modify_blocking(Entry {
173 data: EntryData::Remove(key),
174 version: 0,
175 })
176 }
177
178 pub fn contains_key_blocking(
180 &mut self,
181 key: &K,
182 ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>> {
183 if self.discarded {
184 return Err(TransactionError::Discard);
185 }
186
187 match self
188 .pending_writes
189 .as_ref()
190 .unwrap()
191 .get(key)
192 .map_err(TransactionError::pending)?
193 {
194 Some(ent) => {
195 if ent.value.is_none() {
197 return Ok(Some(false));
198 }
199
200 Ok(Some(true))
202 }
203 None => {
204 if let Some(ref mut conflict_manager) = self.conflict_manager {
207 conflict_manager.mark_read(key);
208 }
209
210 Ok(None)
211 }
212 }
213 }
214
215 pub fn get_blocking<'a, 'b: 'a>(
218 &'a mut self,
219 key: &'b K,
220 ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>> {
221 if self.discarded {
222 return Err(TransactionError::Discard);
223 }
224
225 if let Some(e) = self
226 .pending_writes
227 .as_ref()
228 .unwrap()
229 .get(key)
230 .map_err(TransactionError::Pwm)?
231 {
232 if e.value.is_none() {
234 return Ok(None);
235 }
236
237 Ok(Some(EntryRef {
239 data: match &e.value {
240 Some(value) => EntryDataRef::Insert { key, value },
241 None => EntryDataRef::Remove(key),
242 },
243 version: e.version,
244 }))
245 } else {
246 if let Some(ref mut conflict_manager) = self.conflict_manager {
249 conflict_manager.mark_read(key);
250 }
251
252 Ok(None)
253 }
254 }
255
256 fn insert_with_blocking_in(
257 &mut self,
258 key: K,
259 value: V,
260 ) -> Result<(), TransactionError<C::Error, P::Error>> {
261 let ent = Entry {
262 data: EntryData::Insert { key, value },
263 version: self.read_ts,
264 };
265
266 self.modify_blocking(ent)
267 }
268
269 fn modify_blocking(
270 &mut self,
271 ent: Entry<K, V>,
272 ) -> Result<(), TransactionError<C::Error, P::Error>> {
273 if self.discarded {
274 return Err(TransactionError::Discard);
275 }
276
277 let pending_writes = self.pending_writes.as_mut().unwrap();
278 pending_writes
279 .validate_entry(&ent)
280 .map_err(TransactionError::Pwm)?;
281
282 let cnt = self.count + 1;
283 let size = self.size + pending_writes.estimate_size(&ent);
285 if cnt >= pending_writes.max_batch_entries() || size >= pending_writes.max_batch_size() {
286 return Err(TransactionError::LargeTxn);
287 }
288
289 self.count = cnt;
290 self.size = size;
291
292 if let Some(ref mut conflict_manager) = self.conflict_manager {
295 conflict_manager.mark_conflict(ent.key());
296 }
297
298 let eversion = ent.version;
302 let (ek, ev) = ent.split();
303
304 if let Some((old_key, old_value)) = pending_writes
305 .remove_entry(&ek)
306 .map_err(TransactionError::Pwm)?
307 {
308 if old_value.version != eversion {
309 self
310 .duplicate_writes
311 .push(Entry::unsplit(old_key, old_value));
312 }
313 }
314 pending_writes
315 .insert(ek, ev)
316 .map_err(TransactionError::Pwm)?;
317
318 Ok(())
319 }
320}
321
322impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
323where
324 C: CmComparable<Key = K>,
325 P: PwmEquivalent<Key = K, Value = V>,
326 S: AsyncSpawner,
327{
328 pub fn contains_key_comparable_cm_equivalent_pm_blocking<'a, 'b: 'a, Q>(
334 &'a mut self,
335 key: &'b Q,
336 ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
337 where
338 K: Borrow<Q>,
339 Q: ?Sized + Eq + Ord + Hash,
340 {
341 match self
342 .pending_writes
343 .as_ref()
344 .unwrap()
345 .get_equivalent(key)
346 .map_err(TransactionError::pending)?
347 {
348 Some(ent) => {
349 if ent.value.is_none() {
351 return Ok(Some(false));
352 }
353
354 Ok(Some(true))
356 }
357 None => {
358 if let Some(ref mut conflict_manager) = self.conflict_manager {
361 conflict_manager.mark_read_comparable(key);
362 }
363
364 Ok(None)
365 }
366 }
367 }
368
369 pub fn get_comparable_cm_equivalent_pm_blocking<'a, 'b: 'a, Q>(
372 &'a mut self,
373 key: &'b Q,
374 ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
375 where
376 K: Borrow<Q>,
377 Q: ?Sized + Eq + Ord + Hash,
378 {
379 if let Some((k, e)) = self
380 .pending_writes
381 .as_ref()
382 .unwrap()
383 .get_entry_equivalent(key)
384 .map_err(TransactionError::Pwm)?
385 {
386 if e.value.is_none() {
388 return Ok(None);
389 }
390
391 Ok(Some(EntryRef {
393 data: match &e.value {
394 Some(value) => EntryDataRef::Insert { key: k, value },
395 None => EntryDataRef::Remove(k),
396 },
397 version: e.version,
398 }))
399 } else {
400 if let Some(ref mut conflict_manager) = self.conflict_manager {
403 conflict_manager.mark_read_comparable(key);
404 }
405
406 Ok(None)
407 }
408 }
409}
410
411impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
412where
413 C: CmEquivalent<Key = K>,
414 P: PwmComparable<Key = K, Value = V>,
415 S: AsyncSpawner,
416{
417 pub fn contains_key_equivalent_cm_comparable_pm_blocking<'a, 'b: 'a, Q>(
423 &'a mut self,
424 key: &'b Q,
425 ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
426 where
427 K: Borrow<Q>,
428 Q: ?Sized + Eq + Ord + Hash,
429 {
430 match self
431 .pending_writes
432 .as_ref()
433 .unwrap()
434 .get_comparable(key)
435 .map_err(TransactionError::pending)?
436 {
437 Some(ent) => {
438 if ent.value.is_none() {
440 return Ok(Some(false));
441 }
442
443 Ok(Some(true))
445 }
446 None => {
447 if let Some(ref mut conflict_manager) = self.conflict_manager {
450 conflict_manager.mark_read_equivalent(key);
451 }
452
453 Ok(None)
454 }
455 }
456 }
457
458 pub fn get_equivalent_cm_comparable_pm_blocking<'a, 'b: 'a, Q>(
461 &'a mut self,
462 key: &'b Q,
463 ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
464 where
465 K: Borrow<Q>,
466 Q: ?Sized + Eq + Ord + Hash,
467 {
468 if let Some((k, e)) = self
469 .pending_writes
470 .as_ref()
471 .unwrap()
472 .get_entry_comparable(key)
473 .map_err(TransactionError::Pwm)?
474 {
475 if e.value.is_none() {
477 return Ok(None);
478 }
479
480 Ok(Some(EntryRef {
482 data: match &e.value {
483 Some(value) => EntryDataRef::Insert { key: k, value },
484 None => EntryDataRef::Remove(k),
485 },
486 version: e.version,
487 }))
488 } else {
489 if let Some(ref mut conflict_manager) = self.conflict_manager {
492 conflict_manager.mark_read_equivalent(key);
493 }
494
495 Ok(None)
496 }
497 }
498}
499
500impl<K, V, C, P, S> AsyncWtm<K, V, C, P, S>
501where
502 C: CmComparable<Key = K>,
503 P: PwmComparable<Key = K, Value = V>,
504 S: AsyncSpawner,
505{
506 pub fn contains_key_comparable_blocking<'a, 'b: 'a, Q>(
512 &'a mut self,
513 key: &'b Q,
514 ) -> Result<Option<bool>, TransactionError<C::Error, P::Error>>
515 where
516 K: Borrow<Q>,
517 Q: ?Sized + Ord,
518 {
519 match self
520 .pending_writes
521 .as_ref()
522 .unwrap()
523 .get_comparable(key)
524 .map_err(TransactionError::pending)?
525 {
526 Some(ent) => {
527 if ent.value.is_none() {
529 return Ok(Some(false));
530 }
531
532 Ok(Some(true))
534 }
535 None => {
536 if let Some(ref mut conflict_manager) = self.conflict_manager {
539 conflict_manager.mark_read_comparable(key);
540 }
541
542 Ok(None)
543 }
544 }
545 }
546
547 pub fn get_comparable_blocking<'a, 'b: 'a, Q>(
550 &'a mut self,
551 key: &'b Q,
552 ) -> Result<Option<EntryRef<'a, K, V>>, TransactionError<C::Error, P::Error>>
553 where
554 K: Borrow<Q>,
555 Q: ?Sized + Ord,
556 {
557 if let Some((k, e)) = self
558 .pending_writes
559 .as_ref()
560 .unwrap()
561 .get_entry_comparable(key)
562 .map_err(TransactionError::Pwm)?
563 {
564 if e.value.is_none() {
566 return Ok(None);
567 }
568
569 Ok(Some(EntryRef {
571 data: match &e.value {
572 Some(value) => EntryDataRef::Insert { key: k, value },
573 None => EntryDataRef::Remove(k),
574 },
575 version: e.version,
576 }))
577 } else {
578 if let Some(ref mut conflict_manager) = self.conflict_manager {
581 conflict_manager.mark_read_comparable(key);
582 }
583
584 Ok(None)
585 }
586 }
587}