// sdd/linked_list.rs
//! [`LinkedList`] is a trait that implements lock-free concurrent singly linked list operations.

use std::fmt::{self, Debug, Display};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed};

use super::{AtomicShared, Guard, Ptr, Shared, Tag};

/// [`LinkedList`] is a trait that implements lock-free singly linked list operations.
///
/// An entry's state is encoded in the tag of its forward link: `Tag::None` is a plain entry,
/// `Tag::First` is a marked entry, and `Tag::Second` (or `Tag::Both`) denotes a deleted entry.
pub trait LinkedList: Sized {
    /// Returns a reference to the forward link.
    ///
    /// The pointer value may be tagged if [`Self::mark`] or [`Self::delete_self`] has been
    /// invoked. The [`AtomicShared`] must only be updated through [`LinkedList`] in order to keep
    /// the linked list consistent.
    fn link_ref(&self) -> &AtomicShared<Self>;

    /// Returns `true` if `self` is reachable and not marked.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::Relaxed;
    ///
    /// use sdd::{AtomicShared, LinkedList, Tag};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let head: L = L::default();
    /// assert!(head.is_clear(Relaxed));
    /// assert!(head.mark(Relaxed));
    /// assert!(!head.is_clear(Relaxed));
    /// assert!(head.delete_self(Relaxed));
    /// assert!(!head.is_clear(Relaxed));
    /// ```
    #[inline]
    fn is_clear(&self, order: Ordering) -> bool {
        // `Tag::None` means neither marked (`Tag::First`) nor deleted (`Tag::Second`/`Tag::Both`).
        self.link_ref().tag(order) == Tag::None
    }

    /// Marks `self` with an internal flag to denote that `self` is in a special state.
    ///
    /// Returns `false` if a flag has already been set on `self`.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::Relaxed;
    ///
    /// use sdd::{AtomicShared, LinkedList};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let head: L = L::default();
    /// assert!(head.mark(Relaxed));
    /// ```
    #[inline]
    fn mark(&self, order: Ordering) -> bool {
        // Only the `Tag::None -> Tag::First` transition is allowed: marking fails if `self` is
        // already marked or deleted.
        self.link_ref()
            .update_tag_if(Tag::First, |ptr| ptr.tag() == Tag::None, order, Relaxed)
    }

    /// Removes any mark from `self`.
    ///
    /// Returns `false` if no flag has been set on `self`.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::Relaxed;
    ///
    /// use sdd::{AtomicShared, LinkedList};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let head: L = L::default();
    /// assert!(!head.unmark(Relaxed));
    /// assert!(head.mark(Relaxed));
    /// assert!(head.unmark(Relaxed));
    /// assert!(!head.is_marked(Relaxed));
    /// ```
    #[inline]
    fn unmark(&self, order: Ordering) -> bool {
        // Only the `Tag::First -> Tag::None` transition is allowed; a deleted entry stays deleted.
        self.link_ref()
            .update_tag_if(Tag::None, |ptr| ptr.tag() == Tag::First, order, Relaxed)
    }

    /// Returns `true` if `self` has a mark on it.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::Relaxed;
    ///
    /// use sdd::{AtomicShared, LinkedList};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let head: L = L::default();
    /// assert!(!head.is_marked(Relaxed));
    /// assert!(head.mark(Relaxed));
    /// assert!(head.is_marked(Relaxed));
    /// ```
    #[inline]
    fn is_marked(&self, order: Ordering) -> bool {
        self.link_ref().tag(order) == Tag::First
    }

    /// Deletes `self`.
    ///
    /// Returns `false` if `self` is already marked as deleted.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::Relaxed;
    ///
    /// use sdd::{AtomicShared, Guard, LinkedList, Shared};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let guard = Guard::new();
    ///
    /// let head: L = L::default();
    /// let tail: Shared<L> = Shared::new(L::default());
    /// assert!(head.push_back(tail.clone(), false, Relaxed, &guard).is_ok());
    ///
    /// tail.delete_self(Relaxed);
    /// assert!(head.next_ptr(Relaxed, &guard).as_ref().is_none());
    /// ```
    #[inline]
    fn delete_self(&self, order: Ordering) -> bool {
        // `Tag::None`/`Tag::First -> Tag::Second`: note that an existing mark (`Tag::First`) is
        // replaced by the deletion tag, not combined with it.
        self.link_ref().update_tag_if(
            Tag::Second,
            |ptr| {
                let tag = ptr.tag();
                tag == Tag::None || tag == Tag::First
            },
            order,
            Relaxed,
        )
    }

    /// Returns `true` if `self` has been deleted.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::Relaxed;
    ///
    /// use sdd::{AtomicShared, LinkedList};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let entry: L = L::default();
    /// assert!(!entry.is_deleted(Relaxed));
    /// entry.delete_self(Relaxed);
    /// assert!(entry.is_deleted(Relaxed));
    /// ```
    #[inline]
    fn is_deleted(&self, order: Ordering) -> bool {
        // `Tag::Both` is also treated as deleted, defensively covering a marked-and-deleted link.
        let tag = self.link_ref().tag(order);
        tag == Tag::Second || tag == Tag::Both
    }

    /// Appends the given entry to `self` and returns a pointer to the entry.
    ///
    /// If `mark` is `true`, it atomically sets an internal flag on `self` when updating
    /// the linked list, otherwise it removes the mark.
    ///
    /// # Errors
    ///
    /// Returns the supplied [`Shared`] when it finds `self` deleted.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::{Relaxed, Release};
    ///
    /// use sdd::{AtomicShared, Guard, LinkedList, Shared};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let guard = Guard::new();
    ///
    /// let head: L = L::default();
    /// assert!(head.push_back(Shared::new(L::default()), true, Release, &guard).is_ok());
    /// assert!(head.is_marked(Relaxed));
    /// assert!(head.push_back(Shared::new(L::default()), false, Release, &guard).is_ok());
    /// assert!(!head.is_marked(Relaxed));
    ///
    /// head.delete_self(Relaxed);
    /// assert!(!head.is_marked(Relaxed));
    /// assert!(head.push_back(Shared::new(L::default()), false, Release, &guard).is_err());
    /// ```
    #[inline]
    fn push_back<'g>(
        &self,
        mut entry: Shared<Self>,
        mark: bool,
        order: Ordering,
        guard: &'g Guard,
    ) -> Result<Ptr<'g, Self>, Shared<Self>> {
        let new_tag = if mark { Tag::First } else { Tag::None };
        // A `Relaxed` initial load suffices: the CAS below re-validates the observed value.
        let mut next_ptr = self.link_ref().load(Relaxed, guard);

        loop {
            let tag = next_ptr.tag();
            if tag == Tag::Second || tag == Tag::Both {
                // `self` is deleted: nothing may be appended to it.
                break;
            }

            // Link the new entry to the current successor before publishing it.
            entry
                .link_ref()
                .swap((next_ptr.get_shared(), Tag::None), Relaxed);
            match self.link_ref().compare_exchange_weak(
                next_ptr,
                (Some(entry), new_tag),
                order,
                Relaxed,
                guard,
            ) {
                Ok((_, updated)) => {
                    return Ok(updated);
                }
                Err((passed, actual)) => {
                    // The CAS failed, so the ownership of `entry` is handed back via `passed`.
                    entry = unsafe { passed.unwrap_unchecked() };
                    next_ptr = actual;
                }
            }
        }

        // `self` has been deleted: return the entry to the caller.
        Err(entry)
    }

    /// Returns the closest next valid entry.
    ///
    /// It unlinks deleted entries until it reaches a valid one.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
    ///
    /// use sdd::{AtomicShared, Guard, LinkedList, Shared};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let guard = Guard::new();
    ///
    /// let head: L = L::default();
    /// assert!(
    ///     head.push_back(Shared::new(L(AtomicShared::null(), 1)), false, Release, &guard).is_ok());
    /// head.mark(Relaxed);
    ///
    /// let next_ptr = head.next_ptr(Acquire, &guard);
    /// assert_eq!(next_ptr.as_ref().unwrap().1, 1);
    /// assert!(head.is_marked(Relaxed));
    /// ```
    #[inline]
    fn next_ptr<'g>(&self, order: Ordering, guard: &'g Guard) -> Ptr<'g, Self> {
        // `64` bounds the recursion depth while unlinking chains of deleted entries.
        next_ptr_recursive(self, order, 64, guard)
    }

    /// Returns a [`Shared`] handle to the closest next valid entry.
    ///
    /// It unlinks deleted entries until it reaches a valid one.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
    ///
    /// use sdd::{AtomicShared, Guard, LinkedList, Shared};
    ///
    /// #[derive(Default)]
    /// struct L(AtomicShared<L>, usize);
    /// impl LinkedList for L {
    ///     fn link_ref(&self) -> &AtomicShared<L> {
    ///         &self.0
    ///     }
    /// }
    ///
    /// let guard = Guard::new();
    ///
    /// let head: L = L::default();
    /// assert!(
    ///     head.push_back(Shared::new(L(AtomicShared::null(), 1)), false, Release, &guard).is_ok());
    /// head.mark(Relaxed);
    ///
    /// let next_shared = head.next_shared(Acquire, &guard);
    /// assert_eq!(next_shared.unwrap().1, 1);
    /// assert!(head.is_marked(Relaxed));
    /// ```
    #[inline]
    fn next_shared(&self, order: Ordering, guard: &Guard) -> Option<Shared<Self>> {
        let mut next_ptr = self.next_ptr(order, guard);
        let mut next_entry = next_ptr.get_shared();
        while !next_ptr.is_null() && next_entry.is_none() {
            // The entry was released in the meantime: advance to its successor and retry.
            next_ptr = next_ptr
                .as_ref()
                .map_or_else(Ptr::null, |n| n.next_ptr(Acquire, guard));
            next_entry = next_ptr.get_shared();
        }
        next_entry
    }
}

/// [`LinkedEntry`] stores an instance of `T` and a link to the next entry.
pub struct LinkedEntry<T> {
    /// `instance` is always `Some` unless [`Self::take_inner`] is called; the accessor
    /// implementations (`AsRef`, `AsMut`, `Deref`, `DerefMut`) rely on this invariant.
    instance: Option<T>,
    /// `next` points to the next entry in a linked list; it also serves as the
    /// [`LinkedList::link_ref`] link, so its tag encodes the entry's marked/deleted state.
    next: AtomicShared<Self>,
}

367impl<T> LinkedEntry<T> {
368    /// Extracts the inner instance of `T`.
369    ///
370    /// # Safety
371    ///
372    /// This method has to be called at most once per [`LinkedEntry`], and the caller needs to make
373    /// sure that the [`LinkedEntry`] is not accessed via [`LinkedList`] methods.
374    #[inline]
375    pub(crate) unsafe fn take_inner(&mut self) -> T {
376        unsafe { self.instance.take().unwrap_unchecked() }
377    }
378
379    #[inline]
380    pub(super) fn new(val: T) -> Self {
381        Self {
382            instance: Some(val),
383            next: AtomicShared::default(),
384        }
385    }
386
387    /// Returns a reference to `next`.
388    #[inline]
389    pub(super) fn next(&self) -> &AtomicShared<Self> {
390        &self.next
391    }
392}
393
394impl<T> AsRef<T> for LinkedEntry<T> {
395    #[inline]
396    fn as_ref(&self) -> &T {
397        unsafe { self.instance.as_ref().unwrap_unchecked() }
398    }
399}
400
401impl<T> AsMut<T> for LinkedEntry<T> {
402    #[inline]
403    fn as_mut(&mut self) -> &mut T {
404        unsafe { self.instance.as_mut().unwrap_unchecked() }
405    }
406}
407
408impl<T: Clone> Clone for LinkedEntry<T> {
409    #[inline]
410    fn clone(&self) -> Self {
411        Self {
412            instance: self.instance.clone(),
413            next: AtomicShared::default(),
414        }
415    }
416}
417
418impl<T: Debug> Debug for LinkedEntry<T> {
419    #[inline]
420    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
421        f.debug_struct("Entry")
422            .field("instance", &self.instance)
423            .field("next", &self.next)
424            .field("marked", &self.is_marked(Relaxed))
425            .field("removed", &self.is_deleted(Relaxed))
426            .finish()
427    }
428}
429
430impl<T> Deref for LinkedEntry<T> {
431    type Target = T;
432
433    #[inline]
434    fn deref(&self) -> &Self::Target {
435        unsafe { self.instance.as_ref().unwrap_unchecked() }
436    }
437}
438
439impl<T> DerefMut for LinkedEntry<T> {
440    #[inline]
441    fn deref_mut(&mut self) -> &mut Self::Target {
442        unsafe { self.instance.as_mut().unwrap_unchecked() }
443    }
444}
445
impl<T> Drop for LinkedEntry<T> {
    #[inline]
    fn drop(&mut self) {
        // Nothing to clean up if this entry has no successor.
        if !self.next.is_null(Relaxed) {
            let guard = Guard::new();
            if let Some(next_entry) = self.next.load(Relaxed, &guard).as_ref() {
                // Unlink deleted entries reachable from the successor before this entry's own
                // link is released. NOTE(review): presumably this prevents long chains of
                // logically deleted entries from lingering (and from being released as one deep
                // chain) — confirm against the crate's reclamation semantics.
                next_ptr_recursive(next_entry, Relaxed, 64, &guard);
            }
        }
    }
}

458impl<T: Display> Display for LinkedEntry<T> {
459    #[inline]
460    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461        if let Some(instance) = self.instance.as_ref() {
462            write!(f, "Some({instance})")
463        } else {
464            write!(f, "None")
465        }
466    }
467}
468
// Equality is delegated entirely to the stored instance (see `PartialEq` below), so `Eq` holds
// whenever `T: Eq`.
impl<T: Eq> Eq for LinkedEntry<T> {}

471impl<T> LinkedList for LinkedEntry<T> {
472    #[inline]
473    fn link_ref(&self) -> &AtomicShared<Self> {
474        &self.next
475    }
476}
477
478impl<T: PartialEq> PartialEq for LinkedEntry<T> {
479    #[inline]
480    fn eq(&self, other: &Self) -> bool {
481        self.instance == other.instance
482    }
483}
484
/// Recursively cleans up the linked list starting from the supplied head.
///
/// Returns a pointer to the closest successor of `head` that is not marked for deletion
/// (`Tag::Second` on its own link), unlinking the deleted entries it skips over. `depth` bounds
/// the nesting of recursive calls so a long chain of deleted entries cannot exhaust the stack.
fn next_ptr_recursive<'g, T: LinkedList>(
    head: &T,
    order: Ordering,
    mut depth: usize,
    guard: &'g Guard,
) -> Ptr<'g, T> {
    let mut head_next_ptr = head.link_ref().load(order, guard);
    let mut head_tag = head_next_ptr.tag();
    loop {
        // Walk forward until a non-deleted entry (or the end of the list) is found.
        let mut next_ptr = head_next_ptr;
        let next_valid_ptr = loop {
            if let Some(next_ref) = next_ptr.as_ref() {
                let next_next_ptr = next_ref.link_ref().load(order, guard);
                if next_next_ptr.tag() != Tag::Second {
                    // `next_ptr` refers to a valid (not-deleted) entry.
                    break next_ptr;
                }
                if depth != 0 {
                    // Let the deleted entry unlink its own deleted successors first.
                    break next_ptr_recursive(next_ref, order, depth - 1, guard);
                }
                // Recursion budget exhausted: continue iteratively.
                next_ptr = next_next_ptr;
            } else {
                break Ptr::null();
            }
        };

        // Do not allow a recursive call more than once.
        depth = 0;

        // Updates its link if there is an invalid entry between itself and the next valid one,
        // preserving the head's own tag (`head_tag`) across the update.
        if next_valid_ptr.with_tag(head_tag) != head_next_ptr {
            let next_valid_entry = next_valid_ptr.get_shared();
            if !next_valid_ptr.is_null() && next_valid_entry.is_none() {
                // The entry was unlinked in the meantime: reload the head's link and retry.
                head_next_ptr = head.link_ref().load(order, guard);
                head_tag = head_next_ptr.tag();
                continue;
            }
            match head.link_ref().compare_exchange(
                head_next_ptr,
                (next_valid_entry, head_tag),
                AcqRel,
                Acquire,
                guard,
            ) {
                Ok((prev, _)) => {
                    // The skipped-over entry is released here; later reclamation is handled by
                    // the guard/epoch machinery.
                    if let Some(removed) = prev {
                        let _: bool = removed.release();
                    }
                }
                Err((_, actual)) => {
                    // Lost the race against a concurrent update: retry with the fresh value.
                    head_next_ptr = actual;
                    head_tag = actual.tag();
                    continue;
                }
            }
        }

        return next_valid_ptr;
    }
}