use core::{cmp::max, fmt::Debug, marker::PhantomData, ptr::null_mut, sync::atomic::Ordering};
extern crate alloc;
use alloc::alloc::{Allocator, Global};

use self::{reader::ReadHandle, writer::WriteHandle};
use crate::{
    max_len, split_idx,
    sync::{Arc, AtomicPtr, AtomicUsize},
    Inner,
};

///Iterate over a Stele by Reference or by Value (for copy types)
pub mod iter;
///Implementation details for [`ReadHandle`]
pub mod reader;
///Implementation details for [`WriteHandle`]
pub mod writer;

/// A [`Stele`] is an append-only data structure that never copies existing elements:
/// instead of reallocating and moving old data on growth, it keeps a table of
/// pointers to power-of-two sized blocks of `T`, so the capacity still doubles on
/// each growth step while elements already written stay at a stable address.
///
/// The trade-off for this is that the [`Stele`] must hold a slot for up to [`usize::BITS`]
/// pointers, which does increase the memory footprint.
//NOTE(review): the pointer table below is hard-coded to 32 slots, which caps the
//capacity at 2^32 elements even where `usize::BITS` is 64 — confirm this is intended.
#[derive(Debug)]
pub struct Stele<T, A: Allocator = Global> {
    //Block pointer table: slot `i` points to a block of `max_len(i)` elements
    //(null until that block is allocated by `push`/`allocate`).
    inners: [AtomicPtr<Inner<T>>; 32],
    //Number of initialized elements; readers gate all access on Acquire loads of this.
    len: AtomicUsize,
    //Allocator used for every block allocation and deallocation.
    allocator: A,
}

//SAFETY: If `T` is both `Send` and `Sync`, it is safe to both move the
//array of inners and hand out references to the contained elements.
//The `allocator: A` field travels with (or is shared through) the Stele as well,
//so `A` must itself be `Send` for the `Send` impl and `Sync` for the `Sync` impl;
//without these bounds a non-thread-safe allocator could cross threads unsoundly.
unsafe impl<T, A: Allocator> Send for Stele<T, A>
where
    T: Send + Sync,
    A: Send,
{
}
unsafe impl<T, A: Allocator> Sync for Stele<T, A>
where
    T: Send + Sync,
    A: Sync,
{
}

impl<T> Stele<T> {
    #[allow(clippy::new_ret_no_self)]
    #[must_use]
    /// Creates a new, empty [`Stele`] backed by the global allocator and hands
    /// back the single [`WriteHandle`] together with an initial [`ReadHandle`].
    pub fn new() -> (WriteHandle<T>, ReadHandle<T>) {
        let shared = Arc::new(Self {
            //Every block pointer starts out null; blocks are allocated on demand.
            inners: [(); 32].map(|_| crate::sync::AtomicPtr::new(null_mut())),
            len: AtomicUsize::new(0),
            allocator: Global,
        });
        let reader = ReadHandle {
            handle: Arc::clone(&shared),
        };
        let writer = WriteHandle {
            handle: shared,
            _unsync: PhantomData,
        };
        (writer, reader)
    }
}

impl<T, A: Allocator> Stele<T, A> {
    //Taken from the standard libraries small vector optimization
    //Index of the highest block that gets allocated eagerly on the first push:
    //byte-sized elements warrant more up-front blocks, large elements fewer.
    const INITIAL_SIZE: usize = {
        match core::mem::size_of::<T>() {
            1 => 3,
            2..=1023 => 2,
            _ => 1,
        }
    };

    /// Creates a new Stele with the given allocator and returns a [`WriteHandle`] and [`ReadHandle`]
    pub fn new_in(allocator: A) -> (WriteHandle<T, A>, ReadHandle<T, A>) {
        let s = Arc::new(Self {
            //All 32 block pointers start out null; `push` allocates them on demand.
            inners: [(); 32].map(|_| crate::sync::AtomicPtr::new(null_mut())),
            len: AtomicUsize::new(0),
            allocator,
        });
        let h = WriteHandle {
            handle: Arc::clone(&s),
            //Marker field — presumably keeps the write side !Sync so that only one
            //thread can push at a time; see the `writer` module to confirm.
            _unsync: PhantomData,
        };
        let r = ReadHandle { handle: s };
        (h, r)
    }

    /// Creates a pair of handles from an owned Stele after using [`FromIterator`](core::iter::FromIterator)
    pub fn to_handles(self) -> (WriteHandle<T, A>, ReadHandle<T, A>) {
        let s = Arc::new(self);
        let h = WriteHandle {
            handle: Arc::clone(&s),
            _unsync: PhantomData,
        };
        let r = ReadHandle { handle: s };
        (h, r)
    }

    /// Appends `val` at the current end, allocating a fresh block when `val` is
    /// the first element of one.
    ///
    /// SAFETY: You must only call `push` once at a time to avoid write-write conflicts
    unsafe fn push(&self, val: T) {
        let idx = self.len.load(Ordering::Acquire);
        //Split the flat index into the block number and the offset inside that block.
        let (outer_idx, inner_idx) = split_idx(idx);
        //SAFETY: By only incrementing the index after appending the element we ensure that we never allow reads to access unwritten memory
        //and by the safety contract of `push` we know we aren't writing to the same spot multiple times
        unsafe {
            //Allocate when this index opens a new block past the eager range
            //(flat index is a power of two), or on the very first push, where
            //`allocate(0, _)` sets up all blocks 0..=INITIAL_SIZE at once.
            if (idx.is_power_of_two() && outer_idx > Self::INITIAL_SIZE)
                || (outer_idx <= Self::INITIAL_SIZE && self.is_empty())
            {
                self.allocate(outer_idx, max_len(outer_idx));
            }
            *self.inners[outer_idx]
                .load(Ordering::Acquire)
                .add(inner_idx) = crate::Inner::new(val);
        }
        //Publish the element: this Release store pairs with the Acquire loads in
        //`len`/`read`/`try_read`, so readers never see an unwritten slot.
        self.len.store(idx + 1, Ordering::Release);
    }

    /// Installs a freshly allocated block of `len` slots at table index `idx`.
    ///
    /// `idx == 0` is special-cased: the first call fills every slot in
    /// `0..=INITIAL_SIZE`. The `expect` panics if a slot was already non-null,
    /// which would mean a double allocation.
    fn allocate(&self, idx: usize, len: usize) {
        if idx == 0 {
            (0..=Self::INITIAL_SIZE).for_each(|i| {
                self.inners[i].compare_exchange(
                    core::ptr::null_mut(),
                    unsafe { crate::mem::alloc_inner(&self.allocator, max_len(i))},
                    Ordering::AcqRel,
                    Ordering::Relaxed)
                    .expect("The pointer is null because we have just incremented the cap to the head of this pointer");
            });
        } else {
            self.inners[idx]
            .compare_exchange(
                core::ptr::null_mut(),
                unsafe { crate::mem::alloc_inner(&self.allocator, len) },
                Ordering::AcqRel,
                Ordering::Relaxed,
            )
            .expect("The pointer is null because we have just incremented the cap to the head of this pointer");
        }
    }

    /// Returns a reference to the element at `idx`.
    ///
    /// The bounds check is only a `debug_assert`; release builds rely on the
    /// caller passing an in-bounds index.
    pub(crate) fn read(&self, idx: usize) -> &T {
        debug_assert!(self.len.load(Ordering::Acquire) > idx);
        unsafe { (*self.read_raw(idx)).read() }
    }

    /// Returns a reference to the element at `idx`, or `None` when `idx` is out
    /// of bounds (or the slot's block pointer is still null).
    pub(crate) fn try_read(&self, idx: usize) -> Option<&T> {
        if idx >= self.len() {
            None
        } else {
            //SAFETY: Null pointers return None from mut_ptr::as_ref()
            unsafe { Some(self.read_raw(idx).as_ref()?.read()) }
        }
    }

    /// Current number of fully initialized elements (Acquire load).
    pub(crate) fn len(&self) -> usize {
        self.len.load(Ordering::Acquire)
    }

    /// `true` while nothing has been pushed yet.
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Computes the raw slot pointer for flat index `idx`.
    ///
    /// # Safety
    /// `idx` must be below the current length so that the block pointer loaded
    /// here is non-null and `add(inner_idx)` stays inside that block's allocation.
    unsafe fn read_raw(&self, idx: usize) -> *mut crate::Inner<T> {
        let (outer_idx, inner_idx) = crate::split_idx(idx);
        unsafe {
            self.inners[outer_idx]
                .load(Ordering::Acquire)
                .add(inner_idx)
        }
    }
}

impl<T: Copy, A: Allocator> Stele<T, A> {
    /// Returns a copy of the element at `idx`.
    ///
    /// Bounds are checked only in debug builds, so release callers must pass an
    /// index strictly below the current length.
    pub(crate) fn get(&self, idx: usize) -> T {
        debug_assert!(idx < self.len.load(Ordering::Acquire));
        //SAFETY: an in-bounds index guarantees the slot was written and published.
        unsafe { (*self.read_raw(idx)).get() }
    }
}

impl<T> core::iter::FromIterator<T> for Stele<T> {
    /// Builds a [`Stele`] by appending every item of the iterator in order.
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let stele = Stele {
            inners: [(); 32].map(|_| AtomicPtr::new(null_mut())),
            len: AtomicUsize::new(0),
            allocator: Global,
        };
        iter.into_iter().for_each(|item| {
            //SAFETY: no handles exist yet, so this thread is the sole writer.
            unsafe { stele.push(item) };
        });
        stele
    }
}

impl<T, A: Allocator> Drop for Stele<T, A> {
    fn drop(&mut self) {
        //`&mut self` gives exclusive access, so plain unsynchronized reads of the
        //atomics are sound here (loom exposes that as `unsync_load`).
        #[cfg(not(loom))]
        let size = *self.len.get_mut();
        #[cfg(loom)]
        let size = unsafe { self.len.unsync_load() };
        //Nothing was ever pushed, so no blocks were ever allocated.
        if size == 0 {
            return;
        }
        //Number of blocks that were allocated: at least the eagerly-created ones
        //(0..=INITIAL_SIZE), plus one per doubling beyond that — computed as the
        //bit length of the element count rounded up to a power of two.
        let num_inners = max(
            (usize::BITS as usize) - (size.next_power_of_two().leading_zeros() as usize),
            Self::INITIAL_SIZE + 1,
        );
        //NOTE(review): this only frees the blocks; whether `dealloc_inner` also runs
        //the destructors of the initialized `T`s is not visible here — confirm that
        //`T: Drop` values are not leaked.
        for idx in 0..num_inners {
            //SAFETY: every block below `num_inners` was allocated with capacity
            //`max_len(idx)`, and no handles remain, so deallocation is sound.
            #[cfg(not(loom))]
            unsafe {
                crate::mem::dealloc_inner(
                    &self.allocator,
                    *self.inners[idx].get_mut(),
                    max_len(idx),
                );
            }
            #[cfg(loom)]
            unsafe {
                crate::mem::dealloc_inner(
                    &self.allocator,
                    self.inners[idx].unsync_load(),
                    max_len(idx),
                );
            }
        }
    }
}