kvbm_logical/sequence/assignments/external.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::ops::Range;
5
6use crate::{BlockId, KvbmSequenceHashProvider, SequenceHash};
7use dynamo_tokens::TokenBlock;
8
9use super::super::store::BlockStore;
10use crate::sequence::BlockSequenceError;
11
/// Per-tier block_id tracking with an offset into the sequence.
///
/// Maintains an ordered mapping of `BlockId` → `SequenceHash` for assigned blocks,
/// a staging area for blocks whose hashes have been computed but not yet committed,
/// plus a FIFO queue of block_ids waiting for assignment. Index `i` in the assigned
/// map corresponds to sequence position `offset + i`.
///
/// The three-phase lifecycle is:
/// - **Unassigned** — block_ids queued for assignment (no hash yet).
/// - **Staged** — block_ids paired with their `SequenceHash` but not yet committed.
/// - **Assigned** — committed `BlockId → SequenceHash` pairs in positional order.
///
/// Blocks normally move Unassigned → Staged → Assigned via
/// [`stage_pending`](Self::stage_pending) and
/// [`commit_staged`](Self::commit_staged); pairs whose hash is already known
/// can be placed straight into Assigned with
/// [`extend_assigned`](Self::extend_assigned).
///
/// Multiple `ExternalBlockAssignments` instances can operate on the same `&[TokenBlock]` at
/// different offsets (multi-tier).
pub struct ExternalBlockAssignments {
    /// Backing storage for the three collections. Unassigned entries carry no
    /// payload (`()`); staged and assigned entries carry a `SequenceHash`.
    store: BlockStore<(), SequenceHash, SequenceHash>,

    /// Starting position in the sequence. Assignments begin at this position.
    /// Set once in `new` and never modified afterwards (`clear` preserves it).
    offset: usize,
}
32
33impl std::fmt::Debug for ExternalBlockAssignments {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("ExternalBlockAssignments")
36 .field("assigned_count", &self.store.assigned_count())
37 .field("staged_count", &self.store.staged_count())
38 .field("unassigned_count", &self.store.unassigned_count())
39 .field("offset", &self.offset)
40 .finish()
41 }
42}
43
44impl ExternalBlockAssignments {
45 /// Creates a new `ExternalBlockAssignments` starting at the given offset.
46 pub fn new(offset: usize) -> Self {
47 Self {
48 store: BlockStore::new(),
49 offset,
50 }
51 }
52
53 /// Returns the starting position in the sequence.
54 pub fn offset(&self) -> usize {
55 self.offset
56 }
57
58 /// Checks whether a block_id is known (assigned, staged, or unassigned).
59 pub fn contains(&self, block_id: &BlockId) -> bool {
60 self.store.contains(block_id)
61 }
62
63 /// Positional access: returns `(BlockId, SequenceHash)` at the given index
64 /// (relative to offset) in the assigned collection.
65 pub fn get_assigned(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
66 self.store
67 .get_assigned(index)
68 .map(|(&id, &hash)| (id, hash))
69 }
70
71 /// Returns the number of assigned blocks.
72 pub fn assigned_count(&self) -> usize {
73 self.store.assigned_count()
74 }
75
76 /// Returns the number of staged blocks.
77 pub fn staged_count(&self) -> usize {
78 self.store.staged_count()
79 }
80
81 /// Returns the number of unassigned (pending) block_ids.
82 pub fn unassigned_count(&self) -> usize {
83 self.store.unassigned_count()
84 }
85
86 /// Positional access: returns `(BlockId, SequenceHash)` at the given index
87 /// (relative to the start of staged) in the staged collection.
88 pub fn get_staged(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
89 self.store.get_staged(index).map(|(&id, &hash)| (id, hash))
90 }
91
92 /// Iterates over assigned blocks in positional order, yielding `(BlockId, SequenceHash)`.
93 pub fn assigned_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
94 self.store.assigned_iter().map(|(&id, &hash)| (id, hash))
95 }
96
97 /// Iterates over staged blocks in staging order, yielding `(BlockId, SequenceHash)`.
98 pub fn staged_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
99 self.store.staged_iter().map(|(&id, &hash)| (id, hash))
100 }
101
102 /// Iterates over unassigned block_ids in FIFO order.
103 pub fn unassigned_iter(&self) -> impl Iterator<Item = BlockId> + '_ {
104 self.store.unassigned_iter().map(|(&id, _)| id)
105 }
106
107 /// Clears all assigned, staged, and unassigned blocks, preserving the offset.
108 pub fn clear(&mut self) {
109 self.store.clear();
110 }
111
112 /// Takes all staged blocks, returning them as a `Vec`.
113 pub fn take_staged(&mut self) -> Vec<(BlockId, SequenceHash)> {
114 self.store.take_staged()
115 }
116
117 /// Returns the next sequence position to be assigned:
118 /// `offset + assigned_count + staged_count`.
119 pub fn next_position(&self) -> usize {
120 self.offset + self.store.assigned_count() + self.store.staged_count()
121 }
122
123 /// Absolute position range of assigned blocks: `offset..offset + assigned_count`.
124 pub fn assigned_positions(&self) -> Range<usize> {
125 self.offset..self.offset + self.store.assigned_count()
126 }
127
128 /// Absolute position range of staged blocks:
129 /// `offset + assigned_count .. offset + assigned_count + staged_count`.
130 pub fn staged_positions(&self) -> Range<usize> {
131 let start = self.offset + self.store.assigned_count();
132 start..start + self.store.staged_count()
133 }
134
135 /// Get the assigned `(BlockId, SequenceHash)` at an absolute sequence position.
136 ///
137 /// Returns `None` if `abs_pos` is outside [`assigned_positions()`](Self::assigned_positions).
138 pub fn get_at_position(&self, abs_pos: usize) -> Option<(BlockId, SequenceHash)> {
139 let relative = abs_pos.checked_sub(self.offset)?;
140 self.get_assigned(relative)
141 }
142
143 /// Absolute position range that pending (unassigned) blocks will occupy once
144 /// flushed: `next_position()..next_position() + unassigned_count()`.
145 pub fn pending_positions(&self) -> Range<usize> {
146 let start = self.next_position();
147 start..start + self.store.unassigned_count()
148 }
149
150 /// Get the pending `BlockId` at an absolute sequence position (FIFO order).
151 ///
152 /// Position `next_position()` maps to the first unassigned block,
153 /// `next_position() + 1` to the second, etc.
154 /// Returns `None` if `abs_pos` is outside [`pending_positions()`](Self::pending_positions).
155 pub fn get_pending_at_position(&self, abs_pos: usize) -> Option<BlockId> {
156 let start = self.next_position();
157 let relative = abs_pos.checked_sub(start)?;
158 self.store.get_unassigned(relative).map(|(&id, _)| id)
159 }
160
161 /// Add new block_ids to the unassigned queue.
162 ///
163 /// `block_ids` is the **full, ordered** list of block IDs allocated to this
164 /// assignment set. Known IDs (already in assigned, staged, or unassigned) must
165 /// form a contiguous prefix and are silently skipped. New IDs are appended to
166 /// the unassigned FIFO queue.
167 ///
168 /// This method does **not** assign blocks — call
169 /// [`assign_pending`](Self::assign_pending) to pair unassigned IDs with
170 /// available sequence blocks.
171 ///
172 /// # Block ID rules
173 ///
174 /// The list is partitioned into a **known prefix** and a **new suffix**:
175 ///
176 /// - **Known prefix** — IDs already present in `assigned`, `staged`, or
177 /// `unassigned`. These are silently skipped. They must appear contiguously
178 /// at the front of the list; interleaving a known ID after an unknown one
179 /// is an [`OrderingViolation`](BlockSequenceError::OrderingViolation).
180 /// - **New suffix** — IDs not yet seen. These are appended (in order) to
181 /// the unassigned FIFO queue.
182 ///
183 /// # Algorithm (two-phase, atomic)
184 ///
185 /// 1. **Validate & collect** — iterate `block_ids`. Known IDs must form a
186 /// contiguous prefix (skip them). Unknown IDs are collected into a temp
187 /// buffer. If a known ID appears after an unknown one →
188 /// `OrderingViolation` error. No state is mutated until validation passes.
189 /// 2. **Commit** — push all new IDs to the unassigned queue.
190 pub fn extend_block_ids(
191 &mut self,
192 block_ids: impl IntoIterator<Item = BlockId>,
193 ) -> Result<(), BlockSequenceError> {
194 // Phase 1: Validate & collect
195 let mut new_ids = Vec::new();
196 let mut new_id_set = indexmap::IndexSet::new();
197 let mut first_new_index: Option<usize> = None;
198
199 for (i, id) in block_ids.into_iter().enumerate() {
200 if self.contains(&id) {
201 // Known ID — must come before any new IDs
202 if let Some(first_new) = first_new_index {
203 return Err(BlockSequenceError::OrderingViolation {
204 known_id: id,
205 new_id: new_ids[0],
206 known_index: i,
207 first_new_index: first_new,
208 });
209 }
210 // Skip — already known
211 } else {
212 // Unknown ID — collect, rejecting internal duplicates
213 if !new_id_set.insert(id) {
214 return Err(BlockSequenceError::DuplicateBlockId { block_id: id });
215 }
216 if first_new_index.is_none() {
217 first_new_index = Some(i);
218 }
219 new_ids.push(id);
220 }
221 }
222
223 // Phase 2: Commit — no errors from here on
224 for id in new_ids {
225 self.store.insert_unassigned(id, ());
226 }
227
228 Ok(())
229 }
230
231 /// Inserts pre-matched `(BlockId, SequenceHash)` pairs directly into the
232 /// assigned collection.
233 ///
234 /// This is the entry point for blocks whose hashes are already known (e.g.
235 /// cache hits). Two-phase atomic: collects all items, validates no duplicate
236 /// BlockIds across all three collections, then commits to assigned.
237 pub fn extend_assigned(
238 &mut self,
239 items: impl IntoIterator<Item = (BlockId, SequenceHash)>,
240 ) -> Result<usize, BlockSequenceError> {
241 let items: Vec<(BlockId, SequenceHash)> = items.into_iter().collect();
242
243 if let Err(block_id) = self
244 .store
245 .validate_no_duplicates(items.iter().map(|(id, _)| *id), items.len())
246 {
247 return Err(BlockSequenceError::DuplicateBlockId { block_id });
248 }
249
250 let count = items.len();
251 for (id, hash) in items {
252 self.store.insert_assigned(id, hash);
253 }
254
255 Ok(count)
256 }
257
258 /// FIFO drain from unassigned into staged, pairing each block_id with the
259 /// sequence hash from the corresponding `TokenBlock`.
260 ///
261 /// Staging starts at `sequence_blocks[self.next_position()]` and proceeds
262 /// forward, consuming one unassigned ID per available block. The loop stops
263 /// when either the unassigned queue is empty or there are no more sequence
264 /// blocks.
265 ///
266 /// Returns the range of newly staged indices (relative to the start of the
267 /// staged collection before this call).
268 ///
269 /// Each staged pair is validated: the position embedded in the block's
270 /// `kvbm_sequence_hash()` must equal the expected sequence index.
271 /// A mismatch returns [`BlockSequenceError::PositionMismatch`].
272 pub fn stage_pending(
273 &mut self,
274 sequence_blocks: &[TokenBlock],
275 ) -> Result<Range<usize>, BlockSequenceError> {
276 let staged_start_idx = self.store.staged_count();
277 let start_pos = self.next_position();
278
279 // How many sequence blocks are available starting from our next position?
280 let available_blocks = sequence_blocks.len().saturating_sub(start_pos);
281
282 // How many can we stage? Min of available blocks and unassigned count.
283 let to_stage = available_blocks.min(self.store.unassigned_count());
284
285 // Phase 1: Validate all positions before mutating
286 for i in 0..to_stage {
287 let seq_pos = start_pos + i;
288 let block = &sequence_blocks[seq_pos];
289 let hash = block.kvbm_sequence_hash();
290
291 let actual_pos = hash.position();
292 if actual_pos != seq_pos as u64 {
293 let block_id = self.store.get_unassigned(i).map(|(&id, _)| id).unwrap();
294 return Err(BlockSequenceError::PositionMismatch {
295 expected: seq_pos,
296 actual: actual_pos,
297 block_id,
298 });
299 }
300 }
301
302 // Phase 2: Commit — no errors from here on
303 for i in 0..to_stage {
304 let seq_pos = start_pos + i;
305 let hash = sequence_blocks[seq_pos].kvbm_sequence_hash();
306 let (block_id, _) = self.store.shift_unassigned().unwrap();
307 self.store.insert_staged(block_id, hash);
308 }
309
310 let staged_end_idx = self.store.staged_count();
311 Ok(staged_start_idx..staged_end_idx)
312 }
313
314 /// Moves all staged blocks into assigned (infallible).
315 ///
316 /// Returns the range of newly assigned indices (relative to the start of
317 /// the assigned collection before this call).
318 pub fn commit_staged(&mut self) -> Range<usize> {
319 let start_idx = self.store.assigned_count();
320
321 while let Some((block_id, hash)) = self.store.shift_staged() {
322 self.store.insert_assigned(block_id, hash);
323 }
324
325 let end_idx = self.store.assigned_count();
326 start_idx..end_idx
327 }
328
329 /// Drain the unassigned FIFO queue into assigned, pairing each block_id
330 /// with the next available `TokenBlock` starting at `next_position()`.
331 ///
332 /// This is a convenience method equivalent to calling
333 /// [`stage_pending`](Self::stage_pending) followed by
334 /// [`commit_staged`](Self::commit_staged).
335 ///
336 /// Returns the range of newly assigned indices (relative to offset).
337 /// An empty range means no new assignments were made.
338 pub fn assign_pending(
339 &mut self,
340 sequence_blocks: &[TokenBlock],
341 ) -> Result<Range<usize>, BlockSequenceError> {
342 self.stage_pending(sequence_blocks)?;
343 Ok(self.commit_staged())
344 }
345}
346
347/// Zip two [`ExternalBlockAssignments`] over their overlapping assigned positions.
348///
349/// For each absolute position where **both** `a` and `b` have assigned blocks,
350/// yields `(position, a_block_id, b_block_id)`.
351///
352/// Iteration order: ascending position.
353pub fn zip_assigned(
354 a: &ExternalBlockAssignments,
355 b: &ExternalBlockAssignments,
356) -> Vec<(usize, BlockId, BlockId)> {
357 let a_range = a.assigned_positions();
358 let b_range = b.assigned_positions();
359 let start = a_range.start.max(b_range.start);
360 let end = a_range.end.min(b_range.end);
361
362 let mut result = Vec::new();
363 for pos in start..end {
364 // Both lookups are guaranteed to succeed within the intersection range.
365 let (a_id, _) = a.get_at_position(pos).unwrap();
366 let (b_id, _) = b.get_at_position(pos).unwrap();
367 result.push((pos, a_id, b_id));
368 }
369 result
370}
371
372/// Zip `src` assigned positions with `dst` pending positions.
373///
374/// For each absolute position where `src` has an assigned block and `dst`
375/// has a pending (unassigned) block, yields `(position, src_block_id, dst_block_id)`.
376///
377/// This is the onboard/offload planning primitive: the result tells you
378/// which source blocks to transfer into which destination blocks.
379pub fn zip_assigned_pending(
380 src: &ExternalBlockAssignments,
381 dst: &ExternalBlockAssignments,
382) -> Vec<(usize, BlockId, BlockId)> {
383 let src_range = src.assigned_positions();
384 let dst_range = dst.pending_positions();
385 let start = src_range.start.max(dst_range.start);
386 let end = src_range.end.min(dst_range.end);
387
388 let mut result = Vec::new();
389 for pos in start..end {
390 // Both lookups are guaranteed to succeed within the intersection range.
391 let (src_id, _) = src.get_at_position(pos).unwrap();
392 let dst_id = dst.get_pending_at_position(pos).unwrap();
393 result.push((pos, src_id, dst_id));
394 }
395 result
396}