Skip to main content

vortex_layout/
sequence.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::collections::BTreeSet;
6use std::fmt;
7use std::hash::Hash;
8use std::hash::Hasher;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::Context;
12use std::task::Poll;
13use std::task::Waker;
14
15use futures::Stream;
16use futures::StreamExt;
17use parking_lot::Mutex;
18use pin_project_lite::pin_project;
19use vortex_array::ArrayRef;
20use vortex_array::dtype::DType;
21use vortex_array::stream::ArrayStream;
22use vortex_error::VortexExpect;
23use vortex_error::VortexResult;
24use vortex_utils::aliases::hash_map::HashMap;
25
26/// A hierarchical sequence identifier that exists within a shared universe.
27///
28/// SequenceIds form a collision-free universe where each ID is represented as a vector
29/// of indices (e.g., `[0, 1, 2]`). The API design prevents collisions by only allowing
30/// new IDs to be created through controlled advancement or descent operations.
31///
32/// # Hierarchy and Ordering
33///
34/// IDs are hierarchical and lexicographically ordered:
35/// - `[0]` < `[0, 0]` < `[0, 1]` < `[1]` < `[1, 0]`
36/// - A parent ID like `[0, 1]` can spawn children `[0, 1, 0]`, `[0, 1, 1]`, etc.
37/// - Sibling IDs are created by advancing: `[0, 0]` → `[0, 1]` → `[0, 2]`
38///
39/// # Drop Ordering
40///
41/// When a SequenceId is dropped, it may wake futures waiting for ordering guarantees.
42/// The `collapse()` method leverages this to provide deterministic ordering of
43/// recursively created sequence IDs.
44pub struct SequenceId {
45    id: Vec<usize>,
46    universe: Arc<Mutex<SequenceUniverse>>,
47}
48
49impl PartialEq for SequenceId {
50    fn eq(&self, other: &Self) -> bool {
51        self.id == other.id
52    }
53}
54
55impl Eq for SequenceId {}
56
57impl PartialOrd for SequenceId {
58    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
59        Some(self.cmp(other))
60    }
61}
62
63impl Ord for SequenceId {
64    fn cmp(&self, other: &Self) -> Ordering {
65        self.id.cmp(&other.id)
66    }
67}
68
69impl Hash for SequenceId {
70    fn hash<H: Hasher>(&self, state: &mut H) {
71        self.id.hash(state);
72    }
73}
74
75impl fmt::Debug for SequenceId {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        f.debug_struct("SequenceId").field("id", &self.id).finish()
78    }
79}
80
81impl SequenceId {
82    /// Creates a new root sequence universe starting with ID `[0]`.
83    ///
84    /// Each call to `root()` creates an independent universe with no ordering
85    /// guarantees between separate root instances. Within a single universe,
86    /// all IDs are strictly ordered.
87    pub fn root() -> SequencePointer {
88        SequencePointer(SequenceId::new(vec![0], Default::default()))
89    }
90
91    /// Creates a child sequence by descending one level in the hierarchy.
92    ///
93    /// If this SequenceId has ID `[1, 2]`, this method creates the first child
94    /// `[1, 2, 0]` and returns a `SequencePointer` that can generate siblings
95    /// `[1, 2, 1]`, `[1, 2, 2]`, etc.
96    ///
97    /// # Ownership
98    ///
99    /// This method consumes `self`, as the parent ID is no longer needed once
100    /// we've descended to work with its children.
101    pub fn descend(self) -> SequencePointer {
102        let mut id = self.id.clone();
103        id.push(0);
104        SequencePointer(SequenceId::new(id, Arc::clone(&self.universe)))
105    }
106
107    /// Waits until all SequenceIds with IDs lexicographically smaller than this one are dropped.
108    ///
109    /// This async method provides ordering guarantees by ensuring all "prior" sequences
110    /// in the universe have been dropped before returning. Combined with the collision-free
111    /// API, this guarantees that for this universe no sequences lexicographically smaller than
112    /// this one will ever be created again.
113    ///
114    /// # Ordering Guarantee
115    ///
116    /// Once `collapse()` returns, you can be certain that:
117    /// - All sequences with smaller IDs have been dropped
118    /// - No new sequences with smaller IDs can ever be created (due to collision prevention)
119    ///
120    /// # Use Cases
121    ///
122    /// This is particularly useful for ordering recursively created work:
123    /// - Recursive algorithms that spawn child tasks
124    /// - Ensuring deterministic processing order across concurrent operations  
125    /// - Converting hierarchical sequence identifiers to linear segment identifiers
126    ///
127    /// # Returns
128    ///
129    /// The [`SequenceId`] once all other segment IDs before it have been dropped. The caller can hold
130    /// onto the sequence ID essentially as a lock on future calls to [`SequenceId::collapse`]
131    /// in order to perform ordered operations.
132    pub async fn collapse(&mut self) {
133        WaitSequenceFuture(self).await;
134    }
135
136    /// This is intentionally not pub. [SequencePointer::advance] is the only allowed way to create
137    /// [SequenceId] instances
138    fn new(id: Vec<usize>, universe: Arc<Mutex<SequenceUniverse>>) -> Self {
139        // NOTE: This is the only place we construct a SequenceId, and
140        // we immediately add it to the universe.
141        let res = Self { id, universe };
142        res.universe.lock().add(&res);
143        res
144    }
145}
146
147impl Drop for SequenceId {
148    fn drop(&mut self) {
149        let waker = self.universe.lock().remove(self);
150        if let Some(w) = waker {
151            w.wake();
152        }
153    }
154}
155
156/// A pointer that can advance through sibling sequence IDs.
157///
158/// SequencePointer is the only mechanism for creating new SequenceIds within
159/// a universe.
160#[derive(Debug)]
161pub struct SequencePointer(SequenceId);
162
163impl SequencePointer {
164    /// Splits this pointer into two, where the second is strictly greater than the first.
165    pub fn split(mut self) -> (SequencePointer, SequencePointer) {
166        (self.split_off(), self)
167    }
168
169    /// Split off a pointer to appear before the current one.
170    ///
171    /// The current pointer is advanced to the next sibling, and we return a new pointer.
172    pub fn split_off(&mut self) -> SequencePointer {
173        // Advance ourselves to the next sibling, and return a new pointer to the previous one.
174        self.advance().descend()
175    }
176
177    /// Advances to the next sibling sequence and returns the current one.
178    ///
179    /// # Ownership
180    ///
181    /// This method requires `&mut self` because it advances the internal state
182    /// to point to the next sibling position.
183    pub fn advance(&mut self) -> SequenceId {
184        let mut next_id = self.0.id.clone();
185
186        // increment x.y.z -> x.y.(z + 1)
187        let last = next_id.last_mut();
188        let last = last.vortex_expect("must have at least one element");
189        *last += 1;
190        let next_sibling = SequenceId::new(next_id, Arc::clone(&self.0.universe));
191        std::mem::replace(&mut self.0, next_sibling)
192    }
193
194    /// Converts this pointer into its current SequenceId, consuming the pointer.
195    ///
196    /// This method is useful when you want to access the current SequenceId
197    /// without advancing to the next sibling. Once downgraded, you cannot
198    /// create additional siblings from this pointer.
199    pub fn downgrade(self) -> SequenceId {
200        self.0
201    }
202}
203
204#[derive(Default)]
205struct SequenceUniverse {
206    active: BTreeSet<Vec<usize>>,
207    wakers: HashMap<Vec<usize>, Waker>,
208}
209
210impl SequenceUniverse {
211    fn add(&mut self, sequence_id: &SequenceId) {
212        self.active.insert(sequence_id.id.clone());
213    }
214
215    fn remove(&mut self, sequence_id: &SequenceId) -> Option<Waker> {
216        self.active.remove(&sequence_id.id);
217        let Some(first) = self.active.first() else {
218            // last sequence finished, we must have no pending futures
219            assert!(self.wakers.is_empty(), "all wakers must have been removed");
220            return None;
221        };
222        self.wakers.remove(first)
223    }
224}
225
226struct WaitSequenceFuture<'a>(&'a mut SequenceId);
227
228impl Future for WaitSequenceFuture<'_> {
229    type Output = ();
230
231    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
232        let mut guard = self.0.universe.lock();
233        let current_first = guard
234            .active
235            .first()
236            .cloned()
237            .vortex_expect("if we have a future, we must have at least one active sequence");
238        if self.0.id == current_first {
239            guard.wakers.remove(&self.0.id);
240            return Poll::Ready(());
241        }
242
243        guard.wakers.insert(self.0.id.clone(), cx.waker().clone());
244        Poll::Pending
245    }
246}
247
248/// If the future itself is dropped, we don't want to orphan the waker
249impl Drop for WaitSequenceFuture<'_> {
250    fn drop(&mut self) {
251        self.0.universe.lock().wakers.remove(&self.0.id);
252    }
253}
254
255pub trait SequentialStream: Stream<Item = VortexResult<(SequenceId, ArrayRef)>> {
256    fn dtype(&self) -> &DType;
257}
258
259pub type SendableSequentialStream = Pin<Box<dyn SequentialStream + Send>>;
260
261impl SequentialStream for SendableSequentialStream {
262    fn dtype(&self) -> &DType {
263        (**self).dtype()
264    }
265}
266
267pub trait SequentialStreamExt: SequentialStream {
268    // not named boxed to prevent clashing with StreamExt
269    fn sendable(self) -> SendableSequentialStream
270    where
271        Self: Sized + Send + 'static,
272    {
273        Box::pin(self)
274    }
275}
276
277impl<S: SequentialStream> SequentialStreamExt for S {}
278
279pin_project! {
280    pub struct SequentialStreamAdapter<S> {
281        dtype: DType,
282        #[pin]
283        inner: S,
284    }
285}
286
287impl<S> SequentialStreamAdapter<S> {
288    pub fn new(dtype: DType, inner: S) -> Self {
289        Self { dtype, inner }
290    }
291}
292
293impl<S> SequentialStream for SequentialStreamAdapter<S>
294where
295    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
296{
297    fn dtype(&self) -> &DType {
298        &self.dtype
299    }
300}
301
302impl<S> Stream for SequentialStreamAdapter<S>
303where
304    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
305{
306    type Item = VortexResult<(SequenceId, ArrayRef)>;
307
308    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309        let this = self.project();
310        let array = futures::ready!(this.inner.poll_next(cx));
311        if let Some(Ok((_, array))) = array.as_ref() {
312            assert_eq!(
313                array.dtype(),
314                this.dtype,
315                "Sequential stream of {} got chunk of {}.",
316                array.dtype(),
317                this.dtype
318            );
319        }
320
321        Poll::Ready(array)
322    }
323
324    fn size_hint(&self) -> (usize, Option<usize>) {
325        self.inner.size_hint()
326    }
327}
328
329pub trait SequentialArrayStreamExt: ArrayStream {
330    /// Converts the stream to a [`SendableSequentialStream`].
331    fn sequenced(self, mut pointer: SequencePointer) -> SendableSequentialStream
332    where
333        Self: Sized + Send + 'static,
334    {
335        Box::pin(SequentialStreamAdapter::new(
336            self.dtype().clone(),
337            StreamExt::map(self, move |item| {
338                item.map(|array| (pointer.advance(), array))
339            }),
340        ))
341    }
342}
343
344impl<S: ArrayStream> SequentialArrayStreamExt for S {}