vortex_layout/
sequence.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::collections::BTreeSet;
6use std::fmt;
7use std::hash::Hash;
8use std::hash::Hasher;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::Context;
12use std::task::Poll;
13use std::task::Waker;
14
15use futures::Stream;
16use futures::StreamExt;
17use parking_lot::Mutex;
18use pin_project_lite::pin_project;
19use vortex_array::Array;
20use vortex_array::ArrayRef;
21use vortex_array::stream::ArrayStream;
22use vortex_dtype::DType;
23use vortex_error::VortexExpect;
24use vortex_error::VortexResult;
25use vortex_utils::aliases::hash_map::HashMap;
26
27/// A hierarchical sequence identifier that exists within a shared universe.
28///
29/// SequenceIds form a collision-free universe where each ID is represented as a vector
30/// of indices (e.g., `[0, 1, 2]`). The API design prevents collisions by only allowing
31/// new IDs to be created through controlled advancement or descent operations.
32///
33/// # Hierarchy and Ordering
34///
35/// IDs are hierarchical and lexicographically ordered:
36/// - `[0]` < `[0, 0]` < `[0, 1]` < `[1]` < `[1, 0]`
37/// - A parent ID like `[0, 1]` can spawn children `[0, 1, 0]`, `[0, 1, 1]`, etc.
38/// - Sibling IDs are created by advancing: `[0, 0]` → `[0, 1]` → `[0, 2]`
39///
40/// # Drop Ordering
41///
42/// When a SequenceId is dropped, it may wake futures waiting for ordering guarantees.
43/// The `collapse()` method leverages this to provide deterministic ordering of
44/// recursively created sequence IDs.
45pub struct SequenceId {
46    id: Vec<usize>,
47    universe: Arc<Mutex<SequenceUniverse>>,
48}
49
50impl PartialEq for SequenceId {
51    fn eq(&self, other: &Self) -> bool {
52        self.id == other.id
53    }
54}
55
56impl Eq for SequenceId {}
57
58impl PartialOrd for SequenceId {
59    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
60        Some(self.cmp(other))
61    }
62}
63
64impl Ord for SequenceId {
65    fn cmp(&self, other: &Self) -> Ordering {
66        self.id.cmp(&other.id)
67    }
68}
69
70impl Hash for SequenceId {
71    fn hash<H: Hasher>(&self, state: &mut H) {
72        self.id.hash(state);
73    }
74}
75
76impl fmt::Debug for SequenceId {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        f.debug_struct("SequenceId").field("id", &self.id).finish()
79    }
80}
81
82impl SequenceId {
83    /// Creates a new root sequence universe starting with ID `[0]`.
84    ///
85    /// Each call to `root()` creates an independent universe with no ordering
86    /// guarantees between separate root instances. Within a single universe,
87    /// all IDs are strictly ordered.
88    pub fn root() -> SequencePointer {
89        SequencePointer(SequenceId::new(vec![0], Default::default()))
90    }
91
92    /// Creates a child sequence by descending one level in the hierarchy.
93    ///
94    /// If this SequenceId has ID `[1, 2]`, this method creates the first child
95    /// `[1, 2, 0]` and returns a `SequencePointer` that can generate siblings
96    /// `[1, 2, 1]`, `[1, 2, 2]`, etc.
97    ///
98    /// # Ownership
99    ///
100    /// This method consumes `self`, as the parent ID is no longer needed once
101    /// we've descended to work with its children.
102    pub fn descend(self) -> SequencePointer {
103        let mut id = self.id.clone();
104        id.push(0);
105        SequencePointer(SequenceId::new(id, self.universe.clone()))
106    }
107
108    /// Waits until all SequenceIds with IDs lexicographically smaller than this one are dropped.
109    ///
110    /// This async method provides ordering guarantees by ensuring all "prior" sequences
111    /// in the universe have been dropped before returning. Combined with the collision-free
112    /// API, this guarantees that for this universe no sequences lexicographically smaller than
113    /// this one will ever be created again.
114    ///
115    /// # Ordering Guarantee
116    ///
117    /// Once `collapse()` returns, you can be certain that:
118    /// - All sequences with smaller IDs have been dropped
119    /// - No new sequences with smaller IDs can ever be created (due to collision prevention)
120    ///
121    /// # Use Cases
122    ///
123    /// This is particularly useful for ordering recursively created work:
124    /// - Recursive algorithms that spawn child tasks
125    /// - Ensuring deterministic processing order across concurrent operations  
126    /// - Converting hierarchical sequence identifiers to linear segment identifiers
127    ///
128    /// # Returns
129    ///
130    /// The [`SequenceId`] once all other segment IDs before it have been dropped. The caller can hold
131    /// onto the sequence ID essentially as a lock on future calls to [`SequenceId::collapse`]
132    /// in order to perform ordered operations.
133    pub async fn collapse(&mut self) {
134        WaitSequenceFuture(self).await;
135    }
136
137    /// This is intentionally not pub. [SequencePointer::advance] is the only allowed way to create
138    /// [SequenceId] instances
139    fn new(id: Vec<usize>, universe: Arc<Mutex<SequenceUniverse>>) -> Self {
140        // NOTE: This is the only place we construct a SequenceId, and
141        // we immediately add it to the universe.
142        let res = Self { id, universe };
143        res.universe.lock().add(&res);
144        res
145    }
146}
147
148impl Drop for SequenceId {
149    fn drop(&mut self) {
150        let waker = self.universe.lock().remove(self);
151        if let Some(w) = waker {
152            w.wake();
153        }
154    }
155}
156
157/// A pointer that can advance through sibling sequence IDs.
158///
159/// SequencePointer is the only mechanism for creating new SequenceIds within
160/// a universe.
161#[derive(Debug)]
162pub struct SequencePointer(SequenceId);
163
164impl SequencePointer {
165    /// Splits this pointer into two, where the second is strictly greater than the first.
166    pub fn split(mut self) -> (SequencePointer, SequencePointer) {
167        (self.split_off(), self)
168    }
169
170    /// Split off a pointer to appear before the current one.
171    ///
172    /// The current pointer is advanced to the next sibling, and we return a new pointer.
173    pub fn split_off(&mut self) -> SequencePointer {
174        // Advance ourselves to the next sibling, and return a new pointer to the previous one.
175        self.advance().descend()
176    }
177
178    /// Advances to the next sibling sequence and returns the current one.
179    ///
180    /// # Ownership
181    ///
182    /// This method requires `&mut self` because it advances the internal state
183    /// to point to the next sibling position.
184    pub fn advance(&mut self) -> SequenceId {
185        let mut next_id = self.0.id.clone();
186
187        // increment x.y.z -> x.y.(z + 1)
188        let last = next_id.last_mut();
189        let last = last.vortex_expect("must have at least one element");
190        *last += 1;
191        let next_sibling = SequenceId::new(next_id, self.0.universe.clone());
192        std::mem::replace(&mut self.0, next_sibling)
193    }
194
195    /// Converts this pointer into its current SequenceId, consuming the pointer.
196    ///
197    /// This method is useful when you want to access the current SequenceId
198    /// without advancing to the next sibling. Once downgraded, you cannot
199    /// create additional siblings from this pointer.
200    pub fn downgrade(self) -> SequenceId {
201        self.0
202    }
203}
204
205#[derive(Default)]
206struct SequenceUniverse {
207    active: BTreeSet<Vec<usize>>,
208    wakers: HashMap<Vec<usize>, Waker>,
209}
210
211impl SequenceUniverse {
212    fn add(&mut self, sequence_id: &SequenceId) {
213        self.active.insert(sequence_id.id.clone());
214    }
215
216    fn remove(&mut self, sequence_id: &SequenceId) -> Option<Waker> {
217        self.active.remove(&sequence_id.id);
218        let Some(first) = self.active.first() else {
219            // last sequence finished, we must have no pending futures
220            assert!(self.wakers.is_empty(), "all wakers must have been removed");
221            return None;
222        };
223        self.wakers.remove(first)
224    }
225}
226
227struct WaitSequenceFuture<'a>(&'a mut SequenceId);
228
229impl Future for WaitSequenceFuture<'_> {
230    type Output = ();
231
232    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
233        let mut guard = self.0.universe.lock();
234        let current_first = guard
235            .active
236            .first()
237            .cloned()
238            .vortex_expect("if we have a future, we must have at least one active sequence");
239        if self.0.id == current_first {
240            guard.wakers.remove(&self.0.id);
241            return Poll::Ready(());
242        }
243
244        guard.wakers.insert(self.0.id.clone(), cx.waker().clone());
245        Poll::Pending
246    }
247}
248
249/// If the future itself is dropped, we don't want to orphan the waker
250impl Drop for WaitSequenceFuture<'_> {
251    fn drop(&mut self) {
252        self.0.universe.lock().wakers.remove(&self.0.id);
253    }
254}
255
256pub trait SequentialStream: Stream<Item = VortexResult<(SequenceId, ArrayRef)>> {
257    fn dtype(&self) -> &DType;
258}
259
260pub type SendableSequentialStream = Pin<Box<dyn SequentialStream + Send>>;
261
262impl SequentialStream for SendableSequentialStream {
263    fn dtype(&self) -> &DType {
264        (**self).dtype()
265    }
266}
267
268pub trait SequentialStreamExt: SequentialStream {
269    // not named boxed to prevent clashing with StreamExt
270    fn sendable(self) -> SendableSequentialStream
271    where
272        Self: Sized + Send + 'static,
273    {
274        Box::pin(self)
275    }
276}
277
278impl<S: SequentialStream> SequentialStreamExt for S {}
279
280pin_project! {
281    pub struct SequentialStreamAdapter<S> {
282        dtype: DType,
283        #[pin]
284        inner: S,
285    }
286}
287
288impl<S> SequentialStreamAdapter<S> {
289    pub fn new(dtype: DType, inner: S) -> Self {
290        Self { dtype, inner }
291    }
292}
293
294impl<S> SequentialStream for SequentialStreamAdapter<S>
295where
296    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
297{
298    fn dtype(&self) -> &DType {
299        &self.dtype
300    }
301}
302
303impl<S> Stream for SequentialStreamAdapter<S>
304where
305    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
306{
307    type Item = VortexResult<(SequenceId, ArrayRef)>;
308
309    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
310        let this = self.project();
311        let array = futures::ready!(this.inner.poll_next(cx));
312        if let Some(Ok((_, array))) = array.as_ref() {
313            assert_eq!(
314                array.dtype(),
315                this.dtype,
316                "Sequential stream of {} got chunk of {}.",
317                array.dtype(),
318                this.dtype
319            );
320        }
321
322        Poll::Ready(array)
323    }
324
325    fn size_hint(&self) -> (usize, Option<usize>) {
326        self.inner.size_hint()
327    }
328}
329
330pub trait SequentialArrayStreamExt: ArrayStream {
331    /// Converts the stream to a [`SendableSequentialStream`].
332    fn sequenced(self, mut pointer: SequencePointer) -> SendableSequentialStream
333    where
334        Self: Sized + Send + 'static,
335    {
336        Box::pin(SequentialStreamAdapter::new(
337            self.dtype().clone(),
338            StreamExt::map(self, move |item| {
339                item.map(|array| (pointer.advance(), array))
340            }),
341        ))
342    }
343}
344
345impl<S: ArrayStream> SequentialArrayStreamExt for S {}