vortex_layout/sequence.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::collections::BTreeSet;
6use std::fmt;
7use std::hash::{Hash, Hasher};
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll, Waker};
11
12use parking_lot::Mutex;
13use vortex_error::VortexExpect;
14use vortex_utils::aliases::hash_map::HashMap;
15
16use crate::segments::SegmentId;
17
18/// A hierarchical sequence identifier that exists within a shared universe.
19///
20/// SequenceIds form a collision-free universe where each ID is represented as a vector
21/// of indices (e.g., `[0, 1, 2]`). The API design prevents collisions by only allowing
22/// new IDs to be created through controlled advancement or descent operations.
23///
24/// # Hierarchy and Ordering
25///
26/// IDs are hierarchical and lexicographically ordered:
27/// - `[0]` < `[0, 0]` < `[0, 1]` < `[1]` < `[1, 0]`
28/// - A parent ID like `[0, 1]` can spawn children `[0, 1, 0]`, `[0, 1, 1]`, etc.
29/// - Sibling IDs are created by advancing: `[0, 0]` → `[0, 1]` → `[0, 2]`
30///
31/// # Drop Ordering
32///
33/// When a SequenceId is dropped, it may wake futures waiting for ordering guarantees.
34/// The `collapse()` method leverages this to provide deterministic ordering of
35/// recursively created sequence IDs.
36pub struct SequenceId {
37 id: Vec<usize>,
38 universe: Arc<Mutex<SequenceUniverse>>,
39}
40
41impl PartialEq for SequenceId {
42 fn eq(&self, other: &Self) -> bool {
43 self.id == other.id
44 }
45}
46
47impl Eq for SequenceId {}
48
49impl PartialOrd for SequenceId {
50 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
51 Some(self.cmp(other))
52 }
53}
54
55impl Ord for SequenceId {
56 fn cmp(&self, other: &Self) -> Ordering {
57 self.id.cmp(&other.id)
58 }
59}
60
61impl Hash for SequenceId {
62 fn hash<H: Hasher>(&self, state: &mut H) {
63 self.id.hash(state);
64 }
65}
66
67impl fmt::Debug for SequenceId {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.debug_struct("SequenceId").field("id", &self.id).finish()
70 }
71}
72
73impl SequenceId {
74 /// Creates a new root sequence universe starting with ID `[0]`.
75 ///
76 /// Each call to `root()` creates an independent universe with no ordering
77 /// guarantees between separate root instances. Within a single universe,
78 /// all IDs are strictly ordered.
79 pub fn root() -> SequencePointer {
80 SequencePointer(SequenceId::new(vec![0], Default::default()))
81 }
82
83 /// Creates a child sequence by descending one level in the hierarchy.
84 ///
85 /// If this SequenceId has ID `[1, 2]`, this method creates the first child
86 /// `[1, 2, 0]` and returns a `SequencePointer` that can generate siblings
87 /// `[1, 2, 1]`, `[1, 2, 2]`, etc.
88 ///
89 /// # Ownership
90 ///
91 /// This method consumes `self`, as the parent ID is no longer needed once
92 /// we've descended to work with its children.
93 pub fn descend(self) -> SequencePointer {
94 let mut id = self.id.clone();
95 id.push(0);
96 SequencePointer(SequenceId::new(id, self.universe.clone()))
97 }
98
99 /// Waits until all SequenceIds with IDs lexicographically smaller than this one are dropped.
100 ///
101 /// This async method provides ordering guarantees by ensuring all "prior" sequences
102 /// in the universe have been dropped before returning. Combined with the collision-free
103 /// API, this guarantees that for this universe no sequences lexicographically smaller than
104 /// this one will ever be created again.
105 ///
106 /// # Ordering Guarantee
107 ///
108 /// Once `collapse()` returns, you can be certain that:
109 /// - All sequences with smaller IDs have been dropped
110 /// - No new sequences with smaller IDs can ever be created (due to collision prevention)
111 /// - The returned `SegmentId` is monotonically increasing within this universe
112 ///
113 /// # Use Cases
114 ///
115 /// This is particularly useful for ordering recursively created work:
116 /// - Recursive algorithms that spawn child tasks
117 /// - Ensuring deterministic processing order across concurrent operations
118 /// - Converting hierarchical sequence identifiers to linear segment identifiers
119 ///
120 /// # Returns
121 ///
122 /// A monotonically increasing `SegmentId` that can be used for ordered storage
123 /// or processing. Each successful collapse within a universe produces a larger
124 /// `SegmentId` than the previous one.
125 pub async fn collapse(self) -> SegmentId {
126 WaitSequenceFuture(self).await
127 }
128
129 /// This is intentionally not pub. [SequencePointer::advance] is the only allowed way to create
130 /// [SequenceId] instances
131 fn new(id: Vec<usize>, universe: Arc<Mutex<SequenceUniverse>>) -> Self {
132 // NOTE: This is the only place we construct a SequenceId, and
133 // we immediately add it to the universe.
134 let res = Self { id, universe };
135 res.universe.lock().add(&res);
136 res
137 }
138}
139
140impl Drop for SequenceId {
141 fn drop(&mut self) {
142 let waker = self.universe.lock().remove(self);
143 if let Some(w) = waker {
144 w.wake();
145 }
146 }
147}
148
149/// A pointer that can advance through sibling sequence IDs.
150///
151/// SequencePointer is the only mechanism for creating new SequenceIds within
152/// a universe.
153pub struct SequencePointer(SequenceId);
154
155impl SequencePointer {
156 /// Advances to the next sibling sequence and returns the current one.
157 ///
158 /// # Ownership
159 ///
160 /// This method requires `&mut self` because it advances the internal state
161 /// to point to the next sibling position.
162 pub fn advance(&mut self) -> SequenceId {
163 let mut next_id = self.0.id.clone();
164
165 // increment x.y.z -> x.y.(z + 1)
166 let last = next_id.last_mut();
167 let last = last.vortex_expect("must have at least one element");
168 *last += 1;
169 let next_sibling = SequenceId::new(next_id, self.0.universe.clone());
170 std::mem::replace(&mut self.0, next_sibling)
171 }
172
173 /// Converts this pointer into its current SequenceId, consuming the pointer.
174 ///
175 /// This method is useful when you want to access the current SequenceId
176 /// without advancing to the next sibling. Once downgraded, you cannot
177 /// create additional siblings from this pointer.
178 pub fn downgrade(self) -> SequenceId {
179 self.0
180 }
181}
182
183#[derive(Default)]
184struct SequenceUniverse {
185 active: BTreeSet<Vec<usize>>,
186 wakers: HashMap<Vec<usize>, Waker>,
187 next_segment_id: SegmentId,
188}
189
190impl SequenceUniverse {
191 fn add(&mut self, sequence_id: &SequenceId) {
192 self.active.insert(sequence_id.id.clone());
193 }
194
195 fn remove(&mut self, sequence_id: &SequenceId) -> Option<Waker> {
196 self.active.remove(&sequence_id.id);
197 let Some(first) = self.active.first() else {
198 // last sequence finished, we must have no pending futures
199 assert!(self.wakers.is_empty(), "all wakers must have been removed");
200 return None;
201 };
202 self.wakers.remove(first)
203 }
204
205 pub fn next_segment_id(&mut self) -> SegmentId {
206 let res = self.next_segment_id;
207 self.next_segment_id = SegmentId::from(*res + 1);
208 res
209 }
210}
211
212struct WaitSequenceFuture(SequenceId);
213
214impl Future for WaitSequenceFuture {
215 type Output = SegmentId;
216
217 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
218 let mut guard = self.0.universe.lock();
219 let current_first = guard
220 .active
221 .first()
222 .cloned()
223 .vortex_expect("if we have a future, we must have at least one active sequence");
224 if self.0.id == current_first {
225 return Poll::Ready(guard.next_segment_id());
226 }
227 guard.wakers.insert(self.0.id.clone(), cx.waker().clone());
228 Poll::Pending
229 }
230}