infinitree/fields/versioned/
list.rs

1//! A concurrent, incremental linked list implementation
2use crate::{
3    fields::{
4        depth::Incremental, Collection, Intent, Load, LocalField, SparseField, Store, Strategy,
5        Value,
6    },
7    index::{FieldWriter, Transaction},
8    object::{self, serializer::SizedPointer, ObjectError},
9};
10use scc::{
11    ebr::{AtomicShared, Guard, Ptr, Shared, Tag},
12    LinkedList as SCCLinkedList,
13};
14use std::{
15    ops::Deref,
16    sync::{atomic::Ordering, Arc},
17};
18
/// A single link of the list.
///
/// Field `0` is the pointer to the following node (the link handed out by
/// `link_ref`), field `1` is the value stored at this position.
#[derive(Clone, Default)]
pub struct Node<T: 'static>(AtomicShared<Node<T>>, T);
21impl<T: 'static> SCCLinkedList for Node<T> {
22    fn link_ref(&self) -> &AtomicShared<Node<T>> {
23        &self.0
24    }
25}
26
27#[allow(unused)]
28impl<T: 'static> Node<T> {
29    fn set_next(&self, next: Shared<Node<T>>, barrier: &Guard) {
30        let _ = self.push_back(next, false, Ordering::Release, barrier);
31    }
32
33    fn insert(&self, value: impl Into<T>) {
34        let barrier = Guard::new();
35        self.set_next(
36            Shared::new(Node(AtomicShared::null(), value.into())),
37            &barrier,
38        );
39    }
40
41    pub fn is_last(&self) -> bool {
42        self.0.is_null(Ordering::Acquire)
43    }
44
45    fn next(&self) -> Option<Shared<Node<T>>> {
46        let barrier = Guard::new();
47        self.0.load(Ordering::Acquire, &barrier).get_shared()
48    }
49}
50
/// Iterator state for walking a chain of nodes.
#[derive(Default)]
struct NodeIter<T: 'static> {
    /// The node whose value is yielded by the following call to `next`;
    /// `None` once the chain is exhausted.
    current: Option<Shared<Node<Arc<T>>>>,
}
55
56impl<T: 'static> Iterator for NodeIter<T> {
57    type Item = Arc<T>;
58
59    fn next(&mut self) -> Option<Self::Item> {
60        let value = self.current.as_ref().map(|node| node.1.clone());
61        self.current = self.current.as_deref().and_then(Node::next);
62
63        value
64    }
65}
66
67impl<T: 'static> Deref for Node<Arc<T>> {
68    type Target = T;
69
70    #[inline]
71    fn deref(&self) -> &Self::Target {
72        self.1.deref()
73    }
74}
75
/// Shared state behind every [`LinkedList`] handle: four cursors into one
/// chain of nodes.
struct LinkedListInner<T: 'static> {
    /// The most recently pushed node; `push` replaces it with every new
    /// node and `rollback` resets it to `previous_commit_last`.
    last: AtomicShared<Node<Arc<T>>>,
    /// The first node pushed since this cursor was last reset by `commit`,
    /// `rollback` or `clear`; null while nothing is pending.
    commit_start: AtomicShared<Node<Arc<T>>>,
    /// The value `last` had when `commit` was most recently called.
    previous_commit_last: AtomicShared<Node<Arc<T>>>,
    /// The head of the chain; iteration starts here.
    first: AtomicShared<Node<Arc<T>>>,
}
82
83impl<T: 'static> Default for LinkedListInner<T> {
84    fn default() -> Self {
85        Self {
86            last: AtomicShared::null(),
87            commit_start: AtomicShared::null(),
88            previous_commit_last: AtomicShared::null(),
89            first: AtomicShared::null(),
90        }
91    }
92}
93
94/// Append-only linked list that only commits incremental changes
95#[derive(Clone)]
96pub struct LinkedList<T: 'static> {
97    inner: Shared<LinkedListInner<T>>,
98}
99
100impl<T: 'static> Default for LinkedList<T> {
101    fn default() -> Self {
102        Self {
103            inner: Shared::new(LinkedListInner::default()),
104        }
105    }
106}
107
108impl<T: 'static> LinkedList<T> {
109    /// Add a new item to the list
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// use infinitree::fields::LinkedList;
115    ///
116    /// let list = LinkedList::default();
117    /// list.push(123456);
118    ///
119    /// assert_eq!(list.last(), Some(123456.into()))
120    ///
121    /// ```
122    pub fn push(&self, value: impl Into<Arc<T>>) {
123        let node = Shared::new(Node(AtomicShared::default(), value.into()));
124        let barrier = Guard::new();
125
126        let _ = self
127            .inner
128            .commit_start
129            .compare_exchange(
130                Ptr::null(),
131                (Some(node.clone()), Tag::None),
132                Ordering::SeqCst,
133                Ordering::Relaxed,
134                &barrier,
135            )
136            .and_then(|_| {
137                self.inner.first.compare_exchange(
138                    Ptr::null(),
139                    (Some(node.clone()), Tag::None),
140                    Ordering::SeqCst,
141                    Ordering::Relaxed,
142                    &barrier,
143                )
144            });
145
146        let barrier = Guard::new();
147        let ptr = self.inner.last.load(Ordering::Acquire, &barrier);
148        self.inner
149            .last
150            .swap((Some(node.clone()), Tag::None), Ordering::Release);
151
152        if let Some(ptr) = ptr.as_ref() {
153            ptr.set_next(node, &barrier);
154        }
155    }
156
157    /// Gets the first item of the current commit
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// use infinitree::fields::LinkedList;
163    ///
164    /// let list = LinkedList::default();
165    ///
166    /// list.push(123456);
167    /// assert_eq!(list.first_in_commit(), Some(123456.into()));
168
169    ///
170    /// list.push(111111);
171    /// assert_eq!(list.first_in_commit(), Some(123456.into()));
172    ///
173    /// list.commit();
174    /// assert_eq!(list.first_in_commit(), None);
175    ///
176    /// list.push(654321);
177    /// assert_eq!(list.first_in_commit(), Some(654321.into()));
178    ///
179    /// ```
180    pub fn first_in_commit(&self) -> Option<Arc<T>> {
181        let barrier = Guard::new();
182        self.inner
183            .commit_start
184            .load(Ordering::Acquire, &barrier)
185            .as_ref()
186            .map(|node| node.1.clone())
187    }
188
189    /// Gets the first item of the linked list
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use infinitree::fields::LinkedList;
195    ///
196    /// let list = LinkedList::default();
197    /// list.push(123456);
198    ///
199    /// assert_eq!(list.first(), Some(123456.into()));
200    ///
201    /// list.push(111111);
202    ///
203    /// assert_eq!(list.first(), Some(123456.into()));
204    /// ```
205    pub fn first(&self) -> Option<Arc<T>> {
206        let barrier = Guard::new();
207        self.inner
208            .first
209            .load(Ordering::Acquire, &barrier)
210            .as_ref()
211            .map(|node| node.1.clone())
212    }
213
214    /// Gets the last item of the linked list
215    ///
216    /// # Examples
217    ///
218    /// ```
219    /// use infinitree::fields::LinkedList;
220    ///
221    /// let list = LinkedList::default();
222    ///
223    /// list.push(123456);
224    /// assert_eq!(list.last(), Some(123456.into()));
225    ///
226    /// list.push(111111);
227    /// assert_eq!(list.last(), Some(111111.into()));
228    /// ```
229    pub fn last(&self) -> Option<Arc<T>> {
230        let barrier = Guard::new();
231        self.inner
232            .last
233            .load(Ordering::Acquire, &barrier)
234            .as_ref()
235            .map(|node| node.1.clone())
236    }
237
238    /// Move the commit pointer to the last item in the list
239    ///
240    /// # Examples
241    ///
242    /// ```
243    /// use infinitree::fields::LinkedList;
244    ///
245    /// let list = LinkedList::default();
246    ///
247    /// list.push(123456);
248    /// assert_eq!(list.first_in_commit(), Some(123456.into()));
249    ///
250    /// list.push(111111);
251    /// assert_eq!(list.first_in_commit(), Some(123456.into()));
252    ///
253    /// list.commit();
254    /// assert_eq!(list.first_in_commit(), None);
255    ///
256    /// list.push(654321);
257    /// assert_eq!(list.first_in_commit(), Some(654321.into()));
258    /// ```
259    pub fn commit(&self) {
260        let barrier = Guard::new();
261        let last = self
262            .inner
263            .last
264            .load(Ordering::SeqCst, &barrier)
265            .get_shared();
266        self.inner
267            .commit_start
268            .swap((None, Tag::None), Ordering::SeqCst);
269        self.inner
270            .previous_commit_last
271            .swap((last, Tag::None), Ordering::SeqCst);
272    }
273
274    /// Move the commit pointer to the last item in the list
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// use infinitree::fields::LinkedList;
280    ///
281    /// let list = LinkedList::default();
282    ///
283    /// list.push(123456);
284    /// list.push(111111);
285    /// list.commit();
286    /// list.push(654321);
287    /// assert_eq!(list.first_in_commit(), Some(654321.into()));
288    /// assert_eq!(list.last(), Some(654321.into()));
289    /// assert_eq!(list.first(), Some(123456.into()));
290    ///
291    /// list.clear();
292    /// assert_eq!(list.first_in_commit(), None);
293    /// assert_eq!(list.first(),None);
294    /// assert_eq!(list.last(), None);
295    /// ```
296    pub fn clear(&self) {
297        self.inner.first.swap((None, Tag::None), Ordering::SeqCst);
298        self.inner
299            .commit_start
300            .swap((None, Tag::None), Ordering::SeqCst);
301        self.inner
302            .previous_commit_last
303            .swap((None, Tag::None), Ordering::SeqCst);
304        self.inner.last.swap((None, Tag::None), Ordering::SeqCst);
305    }
306
307    /// Move the commit pointer to the last item in the list
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use infinitree::fields::LinkedList;
313    ///
314    /// let list = LinkedList::default();
315    ///
316    /// list.push(123456);
317    /// list.push(111111);
318    /// list.commit();
319    /// list.push(654321);
320    /// assert_eq!(list.first_in_commit(), Some(654321.into()));
321    /// assert_eq!(list.last(), Some(654321.into()));
322    /// assert_eq!(list.first(), Some(123456.into()));
323    ///
324    /// list.rollback();
325    /// assert_eq!(list.first_in_commit(), None);
326    /// assert_eq!(list.first(), Some(123456.into()));
327    /// assert_eq!(list.last(), Some(111111.into()));
328    /// ```
329    pub fn rollback(&self) {
330        let barrier = Guard::new();
331        let last = self
332            .inner
333            .previous_commit_last
334            .load(Ordering::SeqCst, &barrier)
335            .get_shared();
336        self.inner.last.swap((last, Tag::None), Ordering::SeqCst);
337        self.inner
338            .commit_start
339            .swap((None, Tag::None), Ordering::SeqCst);
340    }
341
342    pub fn iter(&self) -> impl Iterator<Item = Arc<T>> {
343        let barrier = Guard::new();
344        NodeIter {
345            current: self
346                .inner
347                .first
348                .load(Ordering::Acquire, &barrier)
349                .get_shared(),
350        }
351    }
352}
353
354impl<T> Store for LocalField<LinkedList<T>>
355where
356    T: Value,
357{
358    fn store(&mut self, mut transaction: &mut dyn Transaction, _object: &mut dyn object::Writer) {
359        for v in self.field.iter() {
360            transaction.write_next(v);
361        }
362
363        self.field.commit();
364    }
365}
366
367impl<T> Collection for LocalField<LinkedList<T>>
368where
369    T: Value + Clone,
370{
371    type Depth = Incremental;
372    type Key = T;
373    type Serialized = T;
374    type Item = T;
375
376    fn key(from: &Self::Serialized) -> &Self::Key {
377        from
378    }
379
380    fn load(from: Self::Serialized, _object: &mut dyn object::Reader) -> Self::Item {
381        from
382    }
383
384    fn insert(&mut self, record: Self::Item) {
385        self.field.push(record);
386    }
387}
388
389impl<T> Store for SparseField<LinkedList<T>>
390where
391    T: Value,
392{
393    fn store(&mut self, mut transaction: &mut dyn Transaction, writer: &mut dyn object::Writer) {
394        for v in self.field.iter() {
395            let ptr = object::serializer::write(
396                writer,
397                |x| {
398                    crate::serialize_to_vec(&x).map_err(|e| ObjectError::Serialize {
399                        source: Box::new(e),
400                    })
401                },
402                v,
403            )
404            .unwrap();
405
406            transaction.write_next(ptr);
407        }
408
409        self.field.commit();
410    }
411}
412
413impl<T> Collection for SparseField<LinkedList<T>>
414where
415    T: Value + Clone,
416{
417    type Depth = Incremental;
418    type Key = SizedPointer;
419    type Serialized = SizedPointer;
420    type Item = T;
421
422    fn key(from: &Self::Serialized) -> &Self::Key {
423        from
424    }
425
426    fn load(from: Self::Serialized, object: &mut dyn object::Reader) -> Self::Item {
427        object::serializer::read(
428            object,
429            |x| {
430                crate::deserialize_from_slice(x).map_err(|e| ObjectError::Deserialize {
431                    source: Box::new(e),
432                })
433            },
434            from,
435        )
436        .unwrap()
437    }
438
439    fn insert(&mut self, record: Self::Item) {
440        self.field.push(record);
441    }
442}
443
444impl<T> crate::Index for LinkedList<T>
445where
446    T: 'static + Value + Clone,
447{
448    fn store_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Store>>>> {
449        Ok(vec![Intent::new(
450            "root",
451            Box::new(LocalField::for_field(self)),
452        )])
453    }
454
455    fn load_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Load>>>> {
456        Ok(vec![Intent::new(
457            "root",
458            Box::new(LocalField::for_field(self)),
459        )])
460    }
461}
462
#[cfg(test)]
mod test {
    use super::LinkedList;
    use crate::{
        fields::{LocalField, SparseField, Strategy},
        index::test::store_then_load,
    };

    type TestList = LinkedList<usize>;

    /// Fills `store` with two committed and two pending items, and checks
    /// that all four are visible through iteration.
    fn init_list(store: &TestList) {
        for committed in [123454321, 123456791] {
            store.push(committed);
        }
        store.commit();

        for pending in [123456790, 987654321] {
            store.push(pending);
        }
        assert_eq!(store.iter().count(), 4);
    }

    // The closure handed to each check counts the items of a list.
    crate::len_check_test!(TestList, LocalField, init_list, |l: TestList| {
        l.iter().fold(0, |seen, _| seen + 1)
    });
    crate::len_check_test!(TestList, SparseField, init_list, |l: TestList| {
        l.iter().fold(0, |seen, _| seen + 1)
    });
}