Skip to main content

kvbm_logical/sequence/assignments/
external.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::ops::Range;
5
6use crate::{BlockId, KvbmSequenceHashProvider, SequenceHash};
7use dynamo_tokens::TokenBlock;
8
9use super::super::store::BlockStore;
10use crate::sequence::BlockSequenceError;
11
/// Per-tier block_id tracking with an offset into the sequence.
///
/// Maintains an ordered mapping of `BlockId` → `SequenceHash` for assigned blocks,
/// a staging area for blocks whose hashes have been computed but not yet committed,
/// plus a FIFO queue of block_ids waiting for assignment. Index `i` in the assigned
/// map corresponds to sequence position `offset + i`.
///
/// The three-phase lifecycle is:
/// - **Unassigned** — block_ids queued for assignment (no hash yet).
/// - **Staged** — block_ids paired with their `SequenceHash` but not yet committed.
/// - **Assigned** — committed `BlockId → SequenceHash` pairs in positional order.
///
/// Positions reported by this type come in two flavours: *indices* are relative
/// to the start of a collection (assigned, staged, or unassigned), while
/// *absolute positions* are indices into the full sequence and already include
/// `offset`.
///
/// Multiple `ExternalBlockAssignments` instances can operate on the same `&[TokenBlock]` at
/// different offsets (multi-tier).
pub struct ExternalBlockAssignments {
    /// Backing store for the unassigned, staged, and assigned collections.
    ///
    /// Unassigned entries carry no payload (`()`); staged and assigned entries
    /// carry the block's `SequenceHash`.
    store: BlockStore<(), SequenceHash, SequenceHash>,

    /// Starting position in the sequence. Assignments begin at this position.
    offset: usize,
}
32
33impl std::fmt::Debug for ExternalBlockAssignments {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("ExternalBlockAssignments")
36            .field("assigned_count", &self.store.assigned_count())
37            .field("staged_count", &self.store.staged_count())
38            .field("unassigned_count", &self.store.unassigned_count())
39            .field("offset", &self.offset)
40            .finish()
41    }
42}
43
44impl ExternalBlockAssignments {
45    /// Creates a new `ExternalBlockAssignments` starting at the given offset.
46    pub fn new(offset: usize) -> Self {
47        Self {
48            store: BlockStore::new(),
49            offset,
50        }
51    }
52
53    /// Returns the starting position in the sequence.
54    pub fn offset(&self) -> usize {
55        self.offset
56    }
57
58    /// Checks whether a block_id is known (assigned, staged, or unassigned).
59    pub fn contains(&self, block_id: &BlockId) -> bool {
60        self.store.contains(block_id)
61    }
62
63    /// Positional access: returns `(BlockId, SequenceHash)` at the given index
64    /// (relative to offset) in the assigned collection.
65    pub fn get_assigned(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
66        self.store
67            .get_assigned(index)
68            .map(|(&id, &hash)| (id, hash))
69    }
70
71    /// Returns the number of assigned blocks.
72    pub fn assigned_count(&self) -> usize {
73        self.store.assigned_count()
74    }
75
76    /// Returns the number of staged blocks.
77    pub fn staged_count(&self) -> usize {
78        self.store.staged_count()
79    }
80
81    /// Returns the number of unassigned (pending) block_ids.
82    pub fn unassigned_count(&self) -> usize {
83        self.store.unassigned_count()
84    }
85
86    /// Positional access: returns `(BlockId, SequenceHash)` at the given index
87    /// (relative to the start of staged) in the staged collection.
88    pub fn get_staged(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
89        self.store.get_staged(index).map(|(&id, &hash)| (id, hash))
90    }
91
92    /// Iterates over assigned blocks in positional order, yielding `(BlockId, SequenceHash)`.
93    pub fn assigned_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
94        self.store.assigned_iter().map(|(&id, &hash)| (id, hash))
95    }
96
97    /// Iterates over staged blocks in staging order, yielding `(BlockId, SequenceHash)`.
98    pub fn staged_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
99        self.store.staged_iter().map(|(&id, &hash)| (id, hash))
100    }
101
102    /// Iterates over unassigned block_ids in FIFO order.
103    pub fn unassigned_iter(&self) -> impl Iterator<Item = BlockId> + '_ {
104        self.store.unassigned_iter().map(|(&id, _)| id)
105    }
106
107    /// Clears all assigned, staged, and unassigned blocks, preserving the offset.
108    pub fn clear(&mut self) {
109        self.store.clear();
110    }
111
112    /// Takes all staged blocks, returning them as a `Vec`.
113    pub fn take_staged(&mut self) -> Vec<(BlockId, SequenceHash)> {
114        self.store.take_staged()
115    }
116
117    /// Returns the next sequence position to be assigned:
118    /// `offset + assigned_count + staged_count`.
119    pub fn next_position(&self) -> usize {
120        self.offset + self.store.assigned_count() + self.store.staged_count()
121    }
122
123    /// Absolute position range of assigned blocks: `offset..offset + assigned_count`.
124    pub fn assigned_positions(&self) -> Range<usize> {
125        self.offset..self.offset + self.store.assigned_count()
126    }
127
128    /// Absolute position range of staged blocks:
129    /// `offset + assigned_count .. offset + assigned_count + staged_count`.
130    pub fn staged_positions(&self) -> Range<usize> {
131        let start = self.offset + self.store.assigned_count();
132        start..start + self.store.staged_count()
133    }
134
135    /// Get the assigned `(BlockId, SequenceHash)` at an absolute sequence position.
136    ///
137    /// Returns `None` if `abs_pos` is outside [`assigned_positions()`](Self::assigned_positions).
138    pub fn get_at_position(&self, abs_pos: usize) -> Option<(BlockId, SequenceHash)> {
139        let relative = abs_pos.checked_sub(self.offset)?;
140        self.get_assigned(relative)
141    }
142
143    /// Absolute position range that pending (unassigned) blocks will occupy once
144    /// flushed: `next_position()..next_position() + unassigned_count()`.
145    pub fn pending_positions(&self) -> Range<usize> {
146        let start = self.next_position();
147        start..start + self.store.unassigned_count()
148    }
149
150    /// Get the pending `BlockId` at an absolute sequence position (FIFO order).
151    ///
152    /// Position `next_position()` maps to the first unassigned block,
153    /// `next_position() + 1` to the second, etc.
154    /// Returns `None` if `abs_pos` is outside [`pending_positions()`](Self::pending_positions).
155    pub fn get_pending_at_position(&self, abs_pos: usize) -> Option<BlockId> {
156        let start = self.next_position();
157        let relative = abs_pos.checked_sub(start)?;
158        self.store.get_unassigned(relative).map(|(&id, _)| id)
159    }
160
161    /// Add new block_ids to the unassigned queue.
162    ///
163    /// `block_ids` is the **full, ordered** list of block IDs allocated to this
164    /// assignment set. Known IDs (already in assigned, staged, or unassigned) must
165    /// form a contiguous prefix and are silently skipped. New IDs are appended to
166    /// the unassigned FIFO queue.
167    ///
168    /// This method does **not** assign blocks — call
169    /// [`assign_pending`](Self::assign_pending) to pair unassigned IDs with
170    /// available sequence blocks.
171    ///
172    /// # Block ID rules
173    ///
174    /// The list is partitioned into a **known prefix** and a **new suffix**:
175    ///
176    /// - **Known prefix** — IDs already present in `assigned`, `staged`, or
177    ///   `unassigned`. These are silently skipped. They must appear contiguously
178    ///   at the front of the list; interleaving a known ID after an unknown one
179    ///   is an [`OrderingViolation`](BlockSequenceError::OrderingViolation).
180    /// - **New suffix** — IDs not yet seen. These are appended (in order) to
181    ///   the unassigned FIFO queue.
182    ///
183    /// # Algorithm (two-phase, atomic)
184    ///
185    /// 1. **Validate & collect** — iterate `block_ids`. Known IDs must form a
186    ///    contiguous prefix (skip them). Unknown IDs are collected into a temp
187    ///    buffer. If a known ID appears after an unknown one →
188    ///    `OrderingViolation` error. No state is mutated until validation passes.
189    /// 2. **Commit** — push all new IDs to the unassigned queue.
190    pub fn extend_block_ids(
191        &mut self,
192        block_ids: impl IntoIterator<Item = BlockId>,
193    ) -> Result<(), BlockSequenceError> {
194        // Phase 1: Validate & collect
195        let mut new_ids = Vec::new();
196        let mut new_id_set = indexmap::IndexSet::new();
197        let mut first_new_index: Option<usize> = None;
198
199        for (i, id) in block_ids.into_iter().enumerate() {
200            if self.contains(&id) {
201                // Known ID — must come before any new IDs
202                if let Some(first_new) = first_new_index {
203                    return Err(BlockSequenceError::OrderingViolation {
204                        known_id: id,
205                        new_id: new_ids[0],
206                        known_index: i,
207                        first_new_index: first_new,
208                    });
209                }
210                // Skip — already known
211            } else {
212                // Unknown ID — collect, rejecting internal duplicates
213                if !new_id_set.insert(id) {
214                    return Err(BlockSequenceError::DuplicateBlockId { block_id: id });
215                }
216                if first_new_index.is_none() {
217                    first_new_index = Some(i);
218                }
219                new_ids.push(id);
220            }
221        }
222
223        // Phase 2: Commit — no errors from here on
224        for id in new_ids {
225            self.store.insert_unassigned(id, ());
226        }
227
228        Ok(())
229    }
230
231    /// Inserts pre-matched `(BlockId, SequenceHash)` pairs directly into the
232    /// assigned collection.
233    ///
234    /// This is the entry point for blocks whose hashes are already known (e.g.
235    /// cache hits). Two-phase atomic: collects all items, validates no duplicate
236    /// BlockIds across all three collections, then commits to assigned.
237    pub fn extend_assigned(
238        &mut self,
239        items: impl IntoIterator<Item = (BlockId, SequenceHash)>,
240    ) -> Result<usize, BlockSequenceError> {
241        let items: Vec<(BlockId, SequenceHash)> = items.into_iter().collect();
242
243        if let Err(block_id) = self
244            .store
245            .validate_no_duplicates(items.iter().map(|(id, _)| *id), items.len())
246        {
247            return Err(BlockSequenceError::DuplicateBlockId { block_id });
248        }
249
250        let count = items.len();
251        for (id, hash) in items {
252            self.store.insert_assigned(id, hash);
253        }
254
255        Ok(count)
256    }
257
258    /// FIFO drain from unassigned into staged, pairing each block_id with the
259    /// sequence hash from the corresponding `TokenBlock`.
260    ///
261    /// Staging starts at `sequence_blocks[self.next_position()]` and proceeds
262    /// forward, consuming one unassigned ID per available block. The loop stops
263    /// when either the unassigned queue is empty or there are no more sequence
264    /// blocks.
265    ///
266    /// Returns the range of newly staged indices (relative to the start of the
267    /// staged collection before this call).
268    ///
269    /// Each staged pair is validated: the position embedded in the block's
270    /// `kvbm_sequence_hash()` must equal the expected sequence index.
271    /// A mismatch returns [`BlockSequenceError::PositionMismatch`].
272    pub fn stage_pending(
273        &mut self,
274        sequence_blocks: &[TokenBlock],
275    ) -> Result<Range<usize>, BlockSequenceError> {
276        let staged_start_idx = self.store.staged_count();
277        let start_pos = self.next_position();
278
279        // How many sequence blocks are available starting from our next position?
280        let available_blocks = sequence_blocks.len().saturating_sub(start_pos);
281
282        // How many can we stage? Min of available blocks and unassigned count.
283        let to_stage = available_blocks.min(self.store.unassigned_count());
284
285        // Phase 1: Validate all positions before mutating
286        for i in 0..to_stage {
287            let seq_pos = start_pos + i;
288            let block = &sequence_blocks[seq_pos];
289            let hash = block.kvbm_sequence_hash();
290
291            let actual_pos = hash.position();
292            if actual_pos != seq_pos as u64 {
293                let block_id = self.store.get_unassigned(i).map(|(&id, _)| id).unwrap();
294                return Err(BlockSequenceError::PositionMismatch {
295                    expected: seq_pos,
296                    actual: actual_pos,
297                    block_id,
298                });
299            }
300        }
301
302        // Phase 2: Commit — no errors from here on
303        for i in 0..to_stage {
304            let seq_pos = start_pos + i;
305            let hash = sequence_blocks[seq_pos].kvbm_sequence_hash();
306            let (block_id, _) = self.store.shift_unassigned().unwrap();
307            self.store.insert_staged(block_id, hash);
308        }
309
310        let staged_end_idx = self.store.staged_count();
311        Ok(staged_start_idx..staged_end_idx)
312    }
313
314    /// Moves all staged blocks into assigned (infallible).
315    ///
316    /// Returns the range of newly assigned indices (relative to the start of
317    /// the assigned collection before this call).
318    pub fn commit_staged(&mut self) -> Range<usize> {
319        let start_idx = self.store.assigned_count();
320
321        while let Some((block_id, hash)) = self.store.shift_staged() {
322            self.store.insert_assigned(block_id, hash);
323        }
324
325        let end_idx = self.store.assigned_count();
326        start_idx..end_idx
327    }
328
329    /// Drain the unassigned FIFO queue into assigned, pairing each block_id
330    /// with the next available `TokenBlock` starting at `next_position()`.
331    ///
332    /// This is a convenience method equivalent to calling
333    /// [`stage_pending`](Self::stage_pending) followed by
334    /// [`commit_staged`](Self::commit_staged).
335    ///
336    /// Returns the range of newly assigned indices (relative to offset).
337    /// An empty range means no new assignments were made.
338    pub fn assign_pending(
339        &mut self,
340        sequence_blocks: &[TokenBlock],
341    ) -> Result<Range<usize>, BlockSequenceError> {
342        self.stage_pending(sequence_blocks)?;
343        Ok(self.commit_staged())
344    }
345}
346
347/// Zip two [`ExternalBlockAssignments`] over their overlapping assigned positions.
348///
349/// For each absolute position where **both** `a` and `b` have assigned blocks,
350/// yields `(position, a_block_id, b_block_id)`.
351///
352/// Iteration order: ascending position.
353pub fn zip_assigned(
354    a: &ExternalBlockAssignments,
355    b: &ExternalBlockAssignments,
356) -> Vec<(usize, BlockId, BlockId)> {
357    let a_range = a.assigned_positions();
358    let b_range = b.assigned_positions();
359    let start = a_range.start.max(b_range.start);
360    let end = a_range.end.min(b_range.end);
361
362    let mut result = Vec::new();
363    for pos in start..end {
364        // Both lookups are guaranteed to succeed within the intersection range.
365        let (a_id, _) = a.get_at_position(pos).unwrap();
366        let (b_id, _) = b.get_at_position(pos).unwrap();
367        result.push((pos, a_id, b_id));
368    }
369    result
370}
371
372/// Zip `src` assigned positions with `dst` pending positions.
373///
374/// For each absolute position where `src` has an assigned block and `dst`
375/// has a pending (unassigned) block, yields `(position, src_block_id, dst_block_id)`.
376///
377/// This is the onboard/offload planning primitive: the result tells you
378/// which source blocks to transfer into which destination blocks.
379pub fn zip_assigned_pending(
380    src: &ExternalBlockAssignments,
381    dst: &ExternalBlockAssignments,
382) -> Vec<(usize, BlockId, BlockId)> {
383    let src_range = src.assigned_positions();
384    let dst_range = dst.pending_positions();
385    let start = src_range.start.max(dst_range.start);
386    let end = src_range.end.min(dst_range.end);
387
388    let mut result = Vec::new();
389    for pos in start..end {
390        // Both lookups are guaranteed to succeed within the intersection range.
391        let (src_id, _) = src.get_at_position(pos).unwrap();
392        let dst_id = dst.get_pending_at_position(pos).unwrap();
393        result.push((pos, src_id, dst_id));
394    }
395    result
396}