vortex_layout/
sequence.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::collections::BTreeSet;
6use std::fmt;
7use std::hash::{Hash, Hasher};
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll, Waker};
11
12use parking_lot::Mutex;
13use vortex_error::VortexExpect;
14use vortex_utils::aliases::hash_map::HashMap;
15
16use crate::segments::SegmentId;
17
18/// A hierarchical sequence identifier that exists within a shared universe.
19///
20/// SequenceIds form a collision-free universe where each ID is represented as a vector
21/// of indices (e.g., `[0, 1, 2]`). The API design prevents collisions by only allowing
22/// new IDs to be created through controlled advancement or descent operations.
23///
24/// # Hierarchy and Ordering
25///
26/// IDs are hierarchical and lexicographically ordered:
27/// - `[0]` < `[0, 0]` < `[0, 1]` < `[1]` < `[1, 0]`
28/// - A parent ID like `[0, 1]` can spawn children `[0, 1, 0]`, `[0, 1, 1]`, etc.
29/// - Sibling IDs are created by advancing: `[0, 0]` → `[0, 1]` → `[0, 2]`
30///
31/// # Drop Ordering
32///
33/// When a SequenceId is dropped, it may wake futures waiting for ordering guarantees.
34/// The `collapse()` method leverages this to provide deterministic ordering of
35/// recursively created sequence IDs.
36pub struct SequenceId {
37    id: Vec<usize>,
38    universe: Arc<Mutex<SequenceUniverse>>,
39}
40
41impl PartialEq for SequenceId {
42    fn eq(&self, other: &Self) -> bool {
43        self.id == other.id
44    }
45}
46
47impl Eq for SequenceId {}
48
49impl PartialOrd for SequenceId {
50    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
51        Some(self.cmp(other))
52    }
53}
54
55impl Ord for SequenceId {
56    fn cmp(&self, other: &Self) -> Ordering {
57        self.id.cmp(&other.id)
58    }
59}
60
61impl Hash for SequenceId {
62    fn hash<H: Hasher>(&self, state: &mut H) {
63        self.id.hash(state);
64    }
65}
66
67impl fmt::Debug for SequenceId {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_struct("SequenceId").field("id", &self.id).finish()
70    }
71}
72
73impl SequenceId {
74    /// Creates a new root sequence universe starting with ID `[0]`.
75    ///
76    /// Each call to `root()` creates an independent universe with no ordering
77    /// guarantees between separate root instances. Within a single universe,
78    /// all IDs are strictly ordered.
79    pub fn root() -> SequencePointer {
80        SequencePointer(SequenceId::new(vec![0], Default::default()))
81    }
82
83    /// Creates a child sequence by descending one level in the hierarchy.
84    ///
85    /// If this SequenceId has ID `[1, 2]`, this method creates the first child
86    /// `[1, 2, 0]` and returns a `SequencePointer` that can generate siblings
87    /// `[1, 2, 1]`, `[1, 2, 2]`, etc.
88    ///
89    /// # Ownership
90    ///
91    /// This method consumes `self`, as the parent ID is no longer needed once
92    /// we've descended to work with its children.
93    pub fn descend(self) -> SequencePointer {
94        let mut id = self.id.clone();
95        id.push(0);
96        SequencePointer(SequenceId::new(id, self.universe.clone()))
97    }
98
99    /// Waits until all SequenceIds with IDs lexicographically smaller than this one are dropped.
100    ///
101    /// This async method provides ordering guarantees by ensuring all "prior" sequences
102    /// in the universe have been dropped before returning. Combined with the collision-free
103    /// API, this guarantees that for this universe no sequences lexicographically smaller than
104    /// this one will ever be created again.
105    ///
106    /// # Ordering Guarantee
107    ///
108    /// Once `collapse()` returns, you can be certain that:
109    /// - All sequences with smaller IDs have been dropped
110    /// - No new sequences with smaller IDs can ever be created (due to collision prevention)
111    /// - The returned `SegmentId` is monotonically increasing within this universe
112    ///
113    /// # Use Cases
114    ///
115    /// This is particularly useful for ordering recursively created work:
116    /// - Recursive algorithms that spawn child tasks
117    /// - Ensuring deterministic processing order across concurrent operations  
118    /// - Converting hierarchical sequence identifiers to linear segment identifiers
119    ///
120    /// # Returns
121    ///
122    /// A monotonically increasing `SegmentId` that can be used for ordered storage
123    /// or processing. Each successful collapse within a universe produces a larger
124    /// `SegmentId` than the previous one.
125    pub async fn collapse(self) -> SegmentId {
126        WaitSequenceFuture(self).await
127    }
128
129    /// This is intentionally not pub. [SequencePointer::advance] is the only allowed way to create
130    /// [SequenceId] instances
131    fn new(id: Vec<usize>, universe: Arc<Mutex<SequenceUniverse>>) -> Self {
132        // NOTE: This is the only place we construct a SequenceId, and
133        // we immediately add it to the universe.
134        let res = Self { id, universe };
135        res.universe.lock().add(&res);
136        res
137    }
138}
139
140impl Drop for SequenceId {
141    fn drop(&mut self) {
142        let waker = self.universe.lock().remove(self);
143        if let Some(w) = waker {
144            w.wake();
145        }
146    }
147}
148
149/// A pointer that can advance through sibling sequence IDs.
150///
151/// SequencePointer is the only mechanism for creating new SequenceIds within
152/// a universe.
153pub struct SequencePointer(SequenceId);
154
155impl SequencePointer {
156    /// Advances to the next sibling sequence and returns the current one.
157    ///
158    /// # Ownership
159    ///
160    /// This method requires `&mut self` because it advances the internal state
161    /// to point to the next sibling position.
162    pub fn advance(&mut self) -> SequenceId {
163        let mut next_id = self.0.id.clone();
164
165        // increment x.y.z -> x.y.(z + 1)
166        let last = next_id.last_mut();
167        let last = last.vortex_expect("must have at least one element");
168        *last += 1;
169        let next_sibling = SequenceId::new(next_id, self.0.universe.clone());
170        std::mem::replace(&mut self.0, next_sibling)
171    }
172
173    /// Converts this pointer into its current SequenceId, consuming the pointer.
174    ///
175    /// This method is useful when you want to access the current SequenceId
176    /// without advancing to the next sibling. Once downgraded, you cannot
177    /// create additional siblings from this pointer.
178    pub fn downgrade(self) -> SequenceId {
179        self.0
180    }
181}
182
183#[derive(Default)]
184struct SequenceUniverse {
185    active: BTreeSet<Vec<usize>>,
186    wakers: HashMap<Vec<usize>, Waker>,
187    next_segment_id: SegmentId,
188}
189
190impl SequenceUniverse {
191    fn add(&mut self, sequence_id: &SequenceId) {
192        self.active.insert(sequence_id.id.clone());
193    }
194
195    fn remove(&mut self, sequence_id: &SequenceId) -> Option<Waker> {
196        self.active.remove(&sequence_id.id);
197        let Some(first) = self.active.first() else {
198            // last sequence finished, we must have no pending futures
199            assert!(self.wakers.is_empty(), "all wakers must have been removed");
200            return None;
201        };
202        self.wakers.remove(first)
203    }
204
205    pub fn next_segment_id(&mut self) -> SegmentId {
206        let res = self.next_segment_id;
207        self.next_segment_id = SegmentId::from(*res + 1);
208        res
209    }
210}
211
212struct WaitSequenceFuture(SequenceId);
213
214impl Future for WaitSequenceFuture {
215    type Output = SegmentId;
216
217    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
218        let mut guard = self.0.universe.lock();
219        let current_first = guard
220            .active
221            .first()
222            .cloned()
223            .vortex_expect("if we have a future, we must have at least one active sequence");
224        if self.0.id == current_first {
225            return Poll::Ready(guard.next_segment_id());
226        }
227        guard.wakers.insert(self.0.id.clone(), cx.waker().clone());
228        Poll::Pending
229    }
230}