infinitree/fields/versioned/list.rs

use crate::{
    fields::{
        depth::Incremental, Collection, Intent, Load, LocalField, SparseField, Store, Strategy,
        Value,
    },
    index::{FieldWriter, Transaction},
    object::{self, serializer::SizedPointer, ObjectError},
};
use scc::{
    ebr::{AtomicShared, Guard, Ptr, Shared, Tag},
    LinkedList as SCCLinkedList,
};
use std::{
    ops::Deref,
    sync::{atomic::Ordering, Arc},
};

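/// A node of the intrusive linked list: the first field is the atomic link
/// to the next node (managed through `scc::LinkedList` below), the second
/// is the payload.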
#[derive(Clone, Default)]
pub struct Node<T: 'static>(AtomicShared<Node<T>>, T);
impl<T: 'static> SCCLinkedList for Node<T> {
    fn link_ref(&self) -> &AtomicShared<Node<T>> {
        &self.0
    }
}

#[allow(unused)]
impl<T: 'static> Node<T> {
    fn set_next(&self, next: Shared<Node<T>>, barrier: &Guard) {
        let _ = self.push_back(next, false, Ordering::Release, barrier);
    }

    fn insert(&self, value: impl Into<T>) {
        let barrier = Guard::new();
        self.set_next(
            Shared::new(Node(AtomicShared::null(), value.into())),
            &barrier,
        );
    }

    pub fn is_last(&self) -> bool {
        self.0.is_null(Ordering::Acquire)
    }

    fn next(&self) -> Option<Shared<Node<T>>> {
        let barrier = Guard::new();
        self.0.load(Ordering::Acquire, &barrier).get_shared()
    }
}

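/// Iterator over a chain of nodes: follows the `next` links and yields a
/// clone of each node's `Arc<T>` payload.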
#[derive(Default)]
struct NodeIter<T: 'static> {
    current: Option<Shared<Node<Arc<T>>>>,
}

impl<T: 'static> Iterator for NodeIter<T> {
    type Item = Arc<T>;

    fn next(&mut self) -> Option<Self::Item> {
        let value = self.current.as_ref().map(|node| node.1.clone());
        self.current = self.current.as_deref().and_then(Node::next);

        value
    }
}

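// A node holding an `Arc<T>` can be dereferenced straight to the inner `T`.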
impl<T: 'static> Deref for Node<Arc<T>> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.1.deref()
    }
}

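/// Shared state behind a [`LinkedList`] handle.
///
/// `first`/`last` track the ends of the whole list, `commit_start` points at
/// the first node pushed since the last commit, and `previous_commit_last`
/// remembers where the list ended when `commit()` was last called (used by
/// `rollback()`).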
struct LinkedListInner<T: 'static> {
    last: AtomicShared<Node<Arc<T>>>,
    commit_start: AtomicShared<Node<Arc<T>>>,
    previous_commit_last: AtomicShared<Node<Arc<T>>>,
    first: AtomicShared<Node<Arc<T>>>,
}

impl<T: 'static> Default for LinkedListInner<T> {
    fn default() -> Self {
        Self {
            last: AtomicShared::null(),
            commit_start: AtomicShared::null(),
            previous_commit_last: AtomicShared::null(),
            first: AtomicShared::null(),
        }
    }
}

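/// A clonable linked list that tracks commit boundaries; clones share the
/// same underlying list.
///
/// ```ignore
/// // Sketch of typical use; `ignore`d because the module path may not be
/// // reachable from doc tests.
/// let list = LinkedList::<usize>::default();
/// list.push(1);
/// list.commit();
/// list.push(2);
/// list.rollback();
/// assert_eq!(list.last().map(|v| *v), Some(1));
/// ```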
#[derive(Clone)]
pub struct LinkedList<T: 'static> {
    inner: Shared<LinkedListInner<T>>,
}

impl<T: 'static> Default for LinkedList<T> {
    fn default() -> Self {
        Self {
            inner: Shared::new(LinkedListInner::default()),
        }
    }
}

impl<T: 'static> LinkedList<T> {
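    /// Append `value` to the end of the list.
    ///
    /// If `commit_start` is unset (first push since the last commit,
    /// rollback, or clear), the new node is recorded there; if the list has
    /// no `first` yet, it becomes `first` as well.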
    pub fn push(&self, value: impl Into<Arc<T>>) {
        let node = Shared::new(Node(AtomicShared::default(), value.into()));
        let barrier = Guard::new();

        let _ = self
            .inner
            .commit_start
            .compare_exchange(
                Ptr::null(),
                (Some(node.clone()), Tag::None),
                Ordering::SeqCst,
                Ordering::Relaxed,
                &barrier,
            )
            .and_then(|_| {
                self.inner.first.compare_exchange(
                    Ptr::null(),
                    (Some(node.clone()), Tag::None),
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &barrier,
                )
            });

        let barrier = Guard::new();
        let ptr = self.inner.last.load(Ordering::Acquire, &barrier);
        self.inner
            .last
            .swap((Some(node.clone()), Tag::None), Ordering::Release);

        if let Some(ptr) = ptr.as_ref() {
            ptr.set_next(node, &barrier);
        }
    }

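    /// The value of the first node pushed since the last commit, if any.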
    pub fn first_in_commit(&self) -> Option<Arc<T>> {
        let barrier = Guard::new();
        self.inner
            .commit_start
            .load(Ordering::Acquire, &barrier)
            .as_ref()
            .map(|node| node.1.clone())
    }

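    /// The value at the head of the list, if any.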
    pub fn first(&self) -> Option<Arc<T>> {
        let barrier = Guard::new();
        self.inner
            .first
            .load(Ordering::Acquire, &barrier)
            .as_ref()
            .map(|node| node.1.clone())
    }

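    /// The value at the tail of the list (the most recent push), if any.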
    pub fn last(&self) -> Option<Arc<T>> {
        let barrier = Guard::new();
        self.inner
            .last
            .load(Ordering::Acquire, &barrier)
            .as_ref()
            .map(|node| node.1.clone())
    }

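    /// Record the current `last` node as `previous_commit_last` and clear
    /// `commit_start`, so `rollback()` returns to this point and the next
    /// `push` starts a new commit.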
    pub fn commit(&self) {
        let barrier = Guard::new();
        let last = self
            .inner
            .last
            .load(Ordering::SeqCst, &barrier)
            .get_shared();
        self.inner
            .commit_start
            .swap((None, Tag::None), Ordering::SeqCst);
        self.inner
            .previous_commit_last
            .swap((last, Tag::None), Ordering::SeqCst);
    }

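    /// Reset every pointer in the shared state to null, emptying the list.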
    pub fn clear(&self) {
        self.inner.first.swap((None, Tag::None), Ordering::SeqCst);
        self.inner
            .commit_start
            .swap((None, Tag::None), Ordering::SeqCst);
        self.inner
            .previous_commit_last
            .swap((None, Tag::None), Ordering::SeqCst);
        self.inner.last.swap((None, Tag::None), Ordering::SeqCst);
    }

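    /// Move `last` back to the node recorded by the last `commit()` and
    /// clear `commit_start`.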
    pub fn rollback(&self) {
        let barrier = Guard::new();
        let last = self
            .inner
            .previous_commit_last
            .load(Ordering::SeqCst, &barrier)
            .get_shared();
        self.inner.last.swap((last, Tag::None), Ordering::SeqCst);
        self.inner
            .commit_start
            .swap((None, Tag::None), Ordering::SeqCst);
    }

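    /// Iterate over all values reachable from `first`, yielding `Arc<T>`
    /// clones.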
    pub fn iter(&self) -> impl Iterator<Item = Arc<T>> {
        let barrier = Guard::new();
        NodeIter {
            current: self
                .inner
                .first
                .load(Ordering::Acquire, &barrier)
                .get_shared(),
        }
    }
}

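// `LocalField` strategy: each value in the list is written directly into the
// index transaction, and the list is marked committed after a store.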
impl<T> Store for LocalField<LinkedList<T>>
where
    T: Value,
{
    fn store(&mut self, mut transaction: &mut dyn Transaction, _object: &mut dyn object::Writer) {
        for v in self.field.iter() {
            transaction.write_next(v);
        }

        self.field.commit();
    }
}

impl<T> Collection for LocalField<LinkedList<T>>
where
    T: Value + Clone,
{
    type Depth = Incremental;
    type Key = T;
    type Serialized = T;
    type Item = T;
    fn key(from: &Self::Serialized) -> &Self::Key {
        from
    }

    fn load(from: Self::Serialized, _object: &mut dyn object::Reader) -> Self::Item {
        from
    }

    fn insert(&mut self, record: Self::Item) {
        self.field.push(record);
    }
}

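// `SparseField` strategy: values are serialized into the object store, and
// only a `SizedPointer` to each value is written into the index transaction.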
impl<T> Store for SparseField<LinkedList<T>>
where
    T: Value,
{
    fn store(&mut self, mut transaction: &mut dyn Transaction, writer: &mut dyn object::Writer) {
        for v in self.field.iter() {
            let ptr = object::serializer::write(
                writer,
                |x| {
                    crate::serialize_to_vec(&x).map_err(|e| ObjectError::Serialize {
                        source: Box::new(e),
                    })
                },
                v,
            )
            .unwrap();

            transaction.write_next(ptr);
        }

        self.field.commit();
    }
}

impl<T> Collection for SparseField<LinkedList<T>>
where
    T: Value + Clone,
{
    type Depth = Incremental;
    type Key = SizedPointer;
    type Serialized = SizedPointer;
    type Item = T;
    fn key(from: &Self::Serialized) -> &Self::Key {
        from
    }

    fn load(from: Self::Serialized, object: &mut dyn object::Reader) -> Self::Item {
        object::serializer::read(
            object,
            |x| {
                crate::deserialize_from_slice(x).map_err(|e| ObjectError::Deserialize {
                    source: Box::new(e),
                })
            },
            from,
        )
        .unwrap()
    }

    fn insert(&mut self, record: Self::Item) {
        self.field.push(record);
    }
}

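// A `LinkedList` can also act as a standalone index with a single field
// named "root".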
impl<T> crate::Index for LinkedList<T>
where
    T: 'static + Value + Clone,
{
    fn store_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Store>>>> {
        Ok(vec![Intent::new(
            "root",
            Box::new(LocalField::for_field(self)),
        )])
    }

    fn load_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Load>>>> {
        Ok(vec![Intent::new(
            "root",
            Box::new(LocalField::for_field(self)),
        )])
    }
}

#[cfg(test)]
mod test {
    use super::LinkedList;
    use crate::{
        fields::{LocalField, SparseField, Strategy},
        index::test::store_then_load,
    };

    type TestList = LinkedList<usize>;
    fn init_list(store: &TestList) {
        store.push(123454321);
        store.push(123456791);
        store.commit();
        store.push(123456790);
        store.push(987654321);
        assert_eq!(store.iter().count(), 4);
    }

    crate::len_check_test!(TestList, LocalField, init_list, |l: TestList| {
        let mut x = 0;
        for _ in l.iter() {
            x += 1;
        }
        x
    });
    crate::len_check_test!(TestList, SparseField, init_list, |l: TestList| {
        let mut x = 0;
        for _ in l.iter() {
            x += 1;
        }
        x
    });
}