Skip to main content

kvbm_logical/sequence/assignments/
logical.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Manages physical block guards through three ordered collections,
5//! mirroring the block lifecycle state machine:
6//! `MutableBlock<T>` → `CompleteBlock<T>` → `ImmutableBlock<T>`.
7
8use crate::blocks::{BlockError, BlockMetadata, CompleteBlock, ImmutableBlock, MutableBlock};
9use crate::manager::BlockManager;
10use crate::{BlockId, SequenceHash};
11use dynamo_tokens::TokenBlock;
12
13use super::super::store::BlockStore;
14
15/// Error type for [`LogicalBlockAssignments`] operations.
16#[derive(Debug, thiserror::Error)]
17pub enum LogicalBlockAssignmentError<T: BlockMetadata> {
18    /// A mutable block_id in the input already exists in one of the three collections.
19    #[error("duplicate block_id {block_id} already present")]
20    DuplicateBlockId {
21        /// The first duplicate block_id detected.
22        block_id: BlockId,
23        /// All input blocks returned for recovery (no leaks).
24        blocks: Vec<MutableBlock<T>>,
25    },
26
27    /// An immutable block_id in the input already exists in one of the three collections.
28    #[error("duplicate block_id {block_id} already present")]
29    DuplicateAssignedBlockId {
30        /// The first duplicate block_id detected.
31        block_id: BlockId,
32        /// All input blocks returned for recovery (no leaks).
33        blocks: Vec<ImmutableBlock<T>>,
34    },
35
36    /// A matched block's sequence hash does not match the expected sequence hash.
37    #[error("sequence hash mismatch at position {position}: expected {expected}, got {actual}")]
38    SequenceHashMismatch {
39        /// The position in the sequence where the mismatch was detected.
40        position: usize,
41        /// The expected hash from the token sequence.
42        expected: SequenceHash,
43        /// The actual hash from the matched block.
44        actual: SequenceHash,
45        /// All input blocks returned for recovery (no leaks).
46        blocks: Vec<ImmutableBlock<T>>,
47    },
48}
49
50/// Manages the physical block guards (RAII types) through three ordered collections,
51/// mirroring the block lifecycle state machine:
52/// `MutableBlock<T>` → `CompleteBlock<T>` → `ImmutableBlock<T>`.
53///
54/// Provides the same ordered-collection semantics as
55/// [`ExternalBlockAssignments`](super::ExternalBlockAssignments) but at the guard level rather
56/// than the identity level.
57pub struct LogicalBlockAssignments<T: BlockMetadata> {
58    store: BlockStore<MutableBlock<T>, CompleteBlock<T>, ImmutableBlock<T>>,
59}
60
61impl<T: BlockMetadata> LogicalBlockAssignments<T> {
62    /// Creates an empty `LogicalBlockAssignments`.
63    pub fn new() -> Self {
64        Self {
65            store: BlockStore::new(),
66        }
67    }
68
69    // -- Counts & Queries --------------------------------------------------
70
71    /// Returns the number of assigned (registered/immutable) blocks.
72    pub fn assigned_count(&self) -> usize {
73        self.store.assigned_count()
74    }
75
76    /// Returns the number of staged (complete) blocks.
77    pub fn staged_count(&self) -> usize {
78        self.store.staged_count()
79    }
80
81    /// Returns the number of unassigned (mutable) blocks.
82    pub fn unassigned_count(&self) -> usize {
83        self.store.unassigned_count()
84    }
85
86    /// Returns `true` if all three collections are empty.
87    pub fn is_empty(&self) -> bool {
88        self.store.is_empty()
89    }
90
91    /// Checks whether a `BlockId` is present in any of the three collections.
92    pub fn contains(&self, block_id: &BlockId) -> bool {
93        self.store.contains(block_id)
94    }
95
96    // -- Positional Access -------------------------------------------------
97
98    /// Returns the assigned block at the given index (insertion order).
99    pub fn get_assigned(&self, index: usize) -> Option<(&BlockId, &ImmutableBlock<T>)> {
100        self.store.get_assigned(index)
101    }
102
103    /// Returns the staged block at the given index (staging order).
104    pub fn get_staged(&self, index: usize) -> Option<(&BlockId, &CompleteBlock<T>)> {
105        self.store.get_staged(index)
106    }
107
108    /// Returns the unassigned block at the given index (FIFO order).
109    pub fn get_unassigned(&self, index: usize) -> Option<(&BlockId, &MutableBlock<T>)> {
110        self.store.get_unassigned(index)
111    }
112
113    // -- Iteration ---------------------------------------------------------
114
115    /// Iterates over assigned blocks in positional order.
116    pub fn assigned_iter(&self) -> impl Iterator<Item = (&BlockId, &ImmutableBlock<T>)> {
117        self.store.assigned_iter()
118    }
119
120    /// Iterates over staged blocks in staging order.
121    pub fn staged_iter(&self) -> impl Iterator<Item = (&BlockId, &CompleteBlock<T>)> {
122        self.store.staged_iter()
123    }
124
125    /// Iterates over unassigned blocks in FIFO order.
126    pub fn unassigned_iter(&self) -> impl Iterator<Item = (&BlockId, &MutableBlock<T>)> {
127        self.store.unassigned_iter()
128    }
129
130    /// Iterates over all block IDs across all three collections in lifecycle
131    /// order: assigned → staged → unassigned.
132    pub fn all_block_ids(&self) -> impl Iterator<Item = &BlockId> {
133        self.store.all_block_ids()
134    }
135
136    // -- Mutation -----------------------------------------------------------
137
138    /// Adds mutable blocks to the unassigned queue.
139    ///
140    /// Two-phase atomic: collects all blocks, validates no duplicate BlockIds
141    /// across all three collections, then commits to unassigned. On error,
142    /// all blocks are returned in the error variant (no leaks).
143    pub fn extend_blocks(
144        &mut self,
145        blocks: impl IntoIterator<Item = MutableBlock<T>>,
146    ) -> Result<usize, LogicalBlockAssignmentError<T>> {
147        let blocks: Vec<MutableBlock<T>> = blocks.into_iter().collect();
148
149        if let Err(block_id) = self
150            .store
151            .validate_no_duplicates(blocks.iter().map(|b| b.block_id()), blocks.len())
152        {
153            return Err(LogicalBlockAssignmentError::DuplicateBlockId { block_id, blocks });
154        }
155
156        let count = blocks.len();
157        for block in blocks {
158            let id = block.block_id();
159            self.store.insert_unassigned(id, block);
160        }
161
162        Ok(count)
163    }
164
165    /// Inserts pre-matched immutable blocks directly into the assigned collection.
166    ///
167    /// This is the entry point for blocks retrieved via
168    /// [`BlockManager::match_blocks`] — blocks that already exist in the
169    /// manager's pools and can skip the unassigned → staged → assigned pipeline.
170    ///
171    /// Two-phase atomic: collects all blocks, validates no duplicate BlockIds
172    /// across all three collections, then commits to assigned. On error,
173    /// all blocks are returned in the error variant (no leaks).
174    pub fn extend_assigned(
175        &mut self,
176        blocks: impl IntoIterator<Item = ImmutableBlock<T>>,
177    ) -> Result<usize, LogicalBlockAssignmentError<T>> {
178        let blocks: Vec<ImmutableBlock<T>> = blocks.into_iter().collect();
179
180        if let Err(block_id) = self
181            .store
182            .validate_no_duplicates(blocks.iter().map(|b| b.block_id()), blocks.len())
183        {
184            return Err(LogicalBlockAssignmentError::DuplicateAssignedBlockId { block_id, blocks });
185        }
186
187        let count = blocks.len();
188        for block in blocks {
189            let id = block.block_id();
190            self.store.insert_assigned(id, block);
191        }
192
193        Ok(count)
194    }
195
196    /// FIFO drain from unassigned, completing each block with the corresponding
197    /// token block.
198    ///
199    /// Stages `min(sequence_blocks.len(), unassigned.len())` blocks. On
200    /// [`BlockError`], already-staged blocks remain in staged; the failed block
201    /// is returned in the error.
202    pub fn stage(
203        &mut self,
204        sequence_blocks: &[TokenBlock],
205    ) -> Result<usize, BlockError<MutableBlock<T>>> {
206        let to_stage = sequence_blocks.len().min(self.store.unassigned_count());
207
208        #[allow(clippy::needless_range_loop)]
209        for i in 0..to_stage {
210            let (block_id, mutable) = self.store.shift_unassigned().unwrap();
211            match mutable.complete(&sequence_blocks[i]) {
212                Ok(complete) => {
213                    self.store.insert_staged(block_id, complete);
214                }
215                Err(err) => {
216                    return Err(err);
217                }
218            }
219        }
220
221        Ok(to_stage)
222    }
223
224    /// Takes all staged blocks (FIFO order), registering each with the block
225    /// manager and moving them to assigned.
226    ///
227    /// Returns the number of blocks registered.
228    pub fn register(&mut self, manager: &BlockManager<T>) -> usize {
229        let count = self.store.staged_count();
230
231        while let Some((block_id, complete)) = self.store.shift_staged() {
232            let immutable = manager.register_block(complete);
233            self.store.insert_assigned(block_id, immutable);
234        }
235
236        count
237    }
238
239    /// LIFO-removes the last unassigned block.
240    pub fn pop_last_unassigned(&mut self) -> Option<(BlockId, MutableBlock<T>)> {
241        self.store.pop_unassigned()
242    }
243
244    /// Drops all guards across all three collections (RAII returns blocks to pools).
245    pub fn clear(&mut self) {
246        self.store.clear();
247    }
248
249    /// Takes all assigned blocks, returning them as a `Vec`.
250    pub fn take_assigned(&mut self) -> Vec<(BlockId, ImmutableBlock<T>)> {
251        self.store.take_assigned()
252    }
253
254    /// Takes all staged blocks, returning them as a `Vec`.
255    pub fn take_staged(&mut self) -> Vec<(BlockId, CompleteBlock<T>)> {
256        self.store.take_staged()
257    }
258
259    /// Takes all unassigned blocks, returning them as a `Vec`.
260    pub fn take_unassigned(&mut self) -> Vec<(BlockId, MutableBlock<T>)> {
261        self.store.take_unassigned()
262    }
263}
264
265impl<T: BlockMetadata> Default for LogicalBlockAssignments<T> {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
271impl<T: BlockMetadata> std::fmt::Debug for LogicalBlockAssignments<T> {
272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        f.debug_struct("LogicalBlockAssignments")
274            .field("assigned_count", &self.store.assigned_count())
275            .field("staged_count", &self.store.staged_count())
276            .field("unassigned_count", &self.store.unassigned_count())
277            .finish()
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use super::*;
284    use crate::sequence::BlockSequence;
285    use crate::testing::{TestMeta, create_test_manager};
286
287    const BLOCK_SIZE: u32 = 4;
288
289    fn create_sequence(num_blocks: usize) -> BlockSequence {
290        let total_tokens = num_blocks * BLOCK_SIZE as usize;
291        let tokens: Vec<u32> = (0..total_tokens as u32).collect();
292        BlockSequence::new(tokens, BLOCK_SIZE, None)
293    }
294
295    // =========================================================================
296    // Empty construction
297    // =========================================================================
298
299    #[test]
300    fn test_empty_construction() {
301        let la = LogicalBlockAssignments::<TestMeta>::new();
302        assert!(la.is_empty());
303        assert_eq!(la.assigned_count(), 0);
304        assert_eq!(la.staged_count(), 0);
305        assert_eq!(la.unassigned_count(), 0);
306        assert!(!la.contains(&0));
307    }
308
309    #[test]
310    fn test_default_is_empty() {
311        let la = LogicalBlockAssignments::<TestMeta>::default();
312        assert!(la.is_empty());
313    }
314
315    // =========================================================================
316    // extend_blocks
317    // =========================================================================
318
319    #[test]
320    fn test_extend_blocks_basic() {
321        let manager = create_test_manager::<TestMeta>(10);
322        let blocks = manager.allocate_blocks(5).unwrap();
323        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
324
325        let mut la = LogicalBlockAssignments::new();
326        let count = la.extend_blocks(blocks).unwrap();
327
328        assert_eq!(count, 5);
329        assert_eq!(la.unassigned_count(), 5);
330        assert!(!la.is_empty());
331
332        // Verify FIFO ordering
333        for (i, expected_id) in ids.iter().enumerate() {
334            let (id, _) = la.get_unassigned(i).unwrap();
335            assert_eq!(id, expected_id);
336        }
337    }
338
339    // =========================================================================
340    // extend_assigned
341    // =========================================================================
342
343    /// Helper: allocate, complete, and register blocks through the manager,
344    /// returning `ImmutableBlock`s suitable for `extend_assigned`.
345    fn register_blocks_directly(
346        manager: &BlockManager<TestMeta>,
347        seq: &BlockSequence,
348        count: usize,
349    ) -> Vec<ImmutableBlock<TestMeta>> {
350        let mutables = manager.allocate_blocks(count).unwrap();
351        mutables
352            .into_iter()
353            .zip(seq.blocks().iter())
354            .map(|(m, tb)| manager.register_block(m.complete(tb).unwrap()))
355            .collect()
356    }
357
358    #[test]
359    fn test_extend_assigned_basic() {
360        let manager = create_test_manager::<TestMeta>(10);
361        let seq = create_sequence(3);
362        let immutables = register_blocks_directly(&manager, &seq, 3);
363        let ids: Vec<BlockId> = immutables.iter().map(|b| b.block_id()).collect();
364        let hashes = seq.all_sequence_hashes();
365
366        let mut la = LogicalBlockAssignments::new();
367        let count = la.extend_assigned(immutables).unwrap();
368
369        assert_eq!(count, 3);
370        assert_eq!(la.assigned_count(), 3);
371        assert_eq!(la.staged_count(), 0);
372        assert_eq!(la.unassigned_count(), 0);
373
374        for (i, expected_id) in ids.iter().enumerate() {
375            let (id, imm) = la.get_assigned(i).unwrap();
376            assert_eq!(id, expected_id);
377            assert_eq!(imm.block_id(), *expected_id);
378            assert_eq!(imm.sequence_hash(), hashes[i]);
379        }
380    }
381
382    #[test]
383    fn test_extend_assigned_ordering_preserved() {
384        let manager = create_test_manager::<TestMeta>(10);
385        let seq = create_sequence(5);
386        let immutables = register_blocks_directly(&manager, &seq, 5);
387        let ids: Vec<BlockId> = immutables.iter().map(|b| b.block_id()).collect();
388
389        let mut la = LogicalBlockAssignments::new();
390        la.extend_assigned(immutables).unwrap();
391
392        // Verify insertion order matches input order
393        for (i, expected_id) in ids.iter().enumerate() {
394            let (id, _) = la.get_assigned(i).unwrap();
395            assert_eq!(id, expected_id);
396        }
397    }
398
399    #[test]
400    fn test_extend_assigned_contains() {
401        let manager = create_test_manager::<TestMeta>(10);
402        let seq = create_sequence(3);
403        let immutables = register_blocks_directly(&manager, &seq, 3);
404        let ids: Vec<BlockId> = immutables.iter().map(|b| b.block_id()).collect();
405
406        let mut la = LogicalBlockAssignments::new();
407        la.extend_assigned(immutables).unwrap();
408
409        for id in &ids {
410            assert!(la.contains(id));
411        }
412        assert!(!la.contains(&999));
413    }
414
415    #[test]
416    fn test_extend_assigned_then_extend_stage_register() {
417        let manager = create_test_manager::<TestMeta>(10);
418        let seq = create_sequence(5);
419
420        // Register first 3 as matched prefix
421        let matched = register_blocks_directly(&manager, &seq, 3);
422        let matched_ids: Vec<BlockId> = matched.iter().map(|b| b.block_id()).collect();
423
424        let mut la = LogicalBlockAssignments::new();
425        la.extend_assigned(matched).unwrap();
426        assert_eq!(la.assigned_count(), 3);
427
428        // Allocate new blocks for the remaining 2 positions
429        let new_blocks = manager.allocate_blocks(2).unwrap();
430        let new_ids: Vec<BlockId> = new_blocks.iter().map(|b| b.block_id()).collect();
431        la.extend_blocks(new_blocks).unwrap();
432
433        // Stage and register the new blocks
434        la.stage(&seq.blocks()[3..5]).unwrap();
435        la.register(&manager);
436
437        assert_eq!(la.assigned_count(), 5);
438        assert_eq!(la.staged_count(), 0);
439        assert_eq!(la.unassigned_count(), 0);
440
441        // First 3 are the matched prefix
442        for (i, expected_id) in matched_ids.iter().enumerate() {
443            let (id, _) = la.get_assigned(i).unwrap();
444            assert_eq!(id, expected_id);
445        }
446        // Last 2 are the newly registered blocks
447        for (i, expected_id) in new_ids.iter().enumerate() {
448            let (id, _) = la.get_assigned(3 + i).unwrap();
449            assert_eq!(id, expected_id);
450        }
451    }
452
453    #[test]
454    fn test_extend_assigned_with_match_blocks() {
455        let manager = create_test_manager::<TestMeta>(10);
456        let seq = create_sequence(3);
457        let hashes = seq.all_sequence_hashes();
458
459        // Populate the manager: allocate → complete → register → drop
460        let mutables = manager.allocate_blocks(3).unwrap();
461        let registered: Vec<ImmutableBlock<TestMeta>> = mutables
462            .into_iter()
463            .zip(seq.blocks().iter())
464            .map(|(m, tb)| manager.register_block(m.complete(tb).unwrap()))
465            .collect();
466        drop(registered);
467
468        // match_blocks retrieves them from the manager's pools
469        let matched = manager.match_blocks(&hashes);
470        assert_eq!(matched.len(), 3);
471
472        let mut la = LogicalBlockAssignments::new();
473        la.extend_assigned(matched).unwrap();
474        assert_eq!(la.assigned_count(), 3);
475
476        // Verify hashes match
477        for (i, expected_hash) in hashes.iter().enumerate() {
478            let (_, imm) = la.get_assigned(i).unwrap();
479            assert_eq!(imm.sequence_hash(), *expected_hash);
480        }
481    }
482
483    #[test]
484    fn test_extend_assigned_empty() {
485        let mut la = LogicalBlockAssignments::<TestMeta>::new();
486        let count = la.extend_assigned(Vec::new()).unwrap();
487        assert_eq!(count, 0);
488        assert!(la.is_empty());
489    }
490
491    // =========================================================================
492    // stage
493    // =========================================================================
494
495    #[test]
496    fn test_stage_basic() {
497        let manager = create_test_manager::<TestMeta>(10);
498        let blocks = manager.allocate_blocks(3).unwrap();
499        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
500        let seq = create_sequence(3);
501
502        let mut la = LogicalBlockAssignments::new();
503        la.extend_blocks(blocks).unwrap();
504
505        let staged = la.stage(seq.blocks()).unwrap();
506        assert_eq!(staged, 3);
507        assert_eq!(la.staged_count(), 3);
508        assert_eq!(la.unassigned_count(), 0);
509
510        // Verify FIFO ordering in staged
511        for (i, expected_id) in ids.iter().enumerate() {
512            let (id, _) = la.get_staged(i).unwrap();
513            assert_eq!(id, expected_id);
514        }
515    }
516
517    #[test]
518    fn test_stage_fifo_drain() {
519        let manager = create_test_manager::<TestMeta>(10);
520        let blocks = manager.allocate_blocks(5).unwrap();
521        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
522        let seq = create_sequence(3);
523
524        let mut la = LogicalBlockAssignments::new();
525        la.extend_blocks(blocks).unwrap();
526
527        // Stage 3 out of 5
528        la.stage(seq.blocks()).unwrap();
529
530        // First 3 should have been drained from unassigned (FIFO)
531        assert_eq!(la.unassigned_count(), 2);
532        let (remaining_0, _) = la.get_unassigned(0).unwrap();
533        let (remaining_1, _) = la.get_unassigned(1).unwrap();
534        assert_eq!(*remaining_0, ids[3]);
535        assert_eq!(*remaining_1, ids[4]);
536    }
537
538    #[test]
539    fn test_stage_partial_fewer_blocks_than_unassigned() {
540        let manager = create_test_manager::<TestMeta>(10);
541        let blocks = manager.allocate_blocks(5).unwrap();
542        let seq = create_sequence(3);
543
544        let mut la = LogicalBlockAssignments::new();
545        la.extend_blocks(blocks).unwrap();
546
547        let staged = la.stage(seq.blocks()).unwrap();
548        assert_eq!(staged, 3);
549        assert_eq!(la.staged_count(), 3);
550        assert_eq!(la.unassigned_count(), 2);
551    }
552
553    #[test]
554    fn test_stage_partial_fewer_unassigned_than_blocks() {
555        let manager = create_test_manager::<TestMeta>(10);
556        let blocks = manager.allocate_blocks(2).unwrap();
557        let seq = create_sequence(5);
558
559        let mut la = LogicalBlockAssignments::new();
560        la.extend_blocks(blocks).unwrap();
561
562        let staged = la.stage(seq.blocks()).unwrap();
563        assert_eq!(staged, 2);
564        assert_eq!(la.staged_count(), 2);
565        assert_eq!(la.unassigned_count(), 0);
566    }
567
568    #[test]
569    fn test_stage_block_size_mismatch_recovery() {
570        let manager = create_test_manager::<TestMeta>(10);
571        let blocks = manager.allocate_blocks(3).unwrap();
572
573        // Sequence with block_size=8 (manager uses block_size=4)
574        let tokens: Vec<u32> = (0..24).collect();
575        let bad_seq = BlockSequence::new(tokens, 8, None);
576
577        let mut la = LogicalBlockAssignments::new();
578        la.extend_blocks(blocks).unwrap();
579
580        let result = la.stage(bad_seq.blocks());
581        assert!(result.is_err());
582
583        match result.unwrap_err() {
584            BlockError::BlockSizeMismatch {
585                expected,
586                actual,
587                block,
588            } => {
589                assert_eq!(expected, 4);
590                assert_eq!(actual, 8);
591                // Block recovered — drop to return to pool
592                drop(block);
593            }
594        }
595
596        // First block was removed from unassigned and returned in error
597        assert_eq!(la.staged_count(), 0);
598        assert_eq!(la.unassigned_count(), 2);
599    }
600
601    #[test]
602    fn test_stage_partial_then_mismatch() {
603        let manager = create_test_manager::<TestMeta>(10);
604        let blocks = manager.allocate_blocks(3).unwrap();
605        let good_seq = create_sequence(2);
606        let bad_tokens: Vec<u32> = (0..8).collect();
607        let bad_seq = BlockSequence::new(bad_tokens, 8, None);
608
609        let mut la = LogicalBlockAssignments::new();
610        la.extend_blocks(blocks).unwrap();
611
612        // Stage first 2 successfully
613        let staged = la.stage(good_seq.blocks()).unwrap();
614        assert_eq!(staged, 2);
615        assert_eq!(la.staged_count(), 2);
616        assert_eq!(la.unassigned_count(), 1);
617
618        // Try to stage 1 more with wrong size → error
619        let result = la.stage(bad_seq.blocks());
620        assert!(result.is_err());
621
622        // The 2 already-staged blocks remain
623        assert_eq!(la.staged_count(), 2);
624        // The failed block was removed from unassigned and returned in error
625        assert_eq!(la.unassigned_count(), 0);
626    }
627
628    // =========================================================================
629    // register
630    // =========================================================================
631
632    #[test]
633    fn test_register_basic() {
634        let manager = create_test_manager::<TestMeta>(10);
635        let blocks = manager.allocate_blocks(3).unwrap();
636        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
637        let seq = create_sequence(3);
638
639        let mut la = LogicalBlockAssignments::new();
640        la.extend_blocks(blocks).unwrap();
641        la.stage(seq.blocks()).unwrap();
642
643        let registered = la.register(&manager);
644        assert_eq!(registered, 3);
645        assert_eq!(la.assigned_count(), 3);
646        assert_eq!(la.staged_count(), 0);
647
648        // Verify ordering preserved
649        for (i, expected_id) in ids.iter().enumerate() {
650            let (id, imm) = la.get_assigned(i).unwrap();
651            assert_eq!(id, expected_id);
652            assert_eq!(imm.block_id(), *expected_id);
653        }
654    }
655
656    #[test]
657    fn test_register_empty_staged() {
658        let manager = create_test_manager::<TestMeta>(10);
659        let mut la = LogicalBlockAssignments::<TestMeta>::new();
660
661        let registered = la.register(&manager);
662        assert_eq!(registered, 0);
663    }
664
665    // =========================================================================
666    // Full pipeline
667    // =========================================================================
668
669    #[test]
670    fn test_full_pipeline_extend_stage_register() {
671        let manager = create_test_manager::<TestMeta>(10);
672        let blocks = manager.allocate_blocks(5).unwrap();
673        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
674        let seq = create_sequence(5);
675        let expected_hashes = seq.all_sequence_hashes();
676
677        let mut la = LogicalBlockAssignments::new();
678
679        // extend
680        la.extend_blocks(blocks).unwrap();
681        assert_eq!(la.unassigned_count(), 5);
682
683        // stage
684        la.stage(seq.blocks()).unwrap();
685        assert_eq!(la.staged_count(), 5);
686        assert_eq!(la.unassigned_count(), 0);
687
688        // register
689        la.register(&manager);
690        assert_eq!(la.assigned_count(), 5);
691        assert_eq!(la.staged_count(), 0);
692
693        // Verify correct blocks in correct order with correct hashes
694        for (i, expected_id) in ids.iter().enumerate() {
695            let (id, immutable) = la.get_assigned(i).unwrap();
696            assert_eq!(id, expected_id);
697            assert_eq!(immutable.block_id(), *expected_id);
698            assert_eq!(immutable.sequence_hash(), expected_hashes[i]);
699        }
700    }
701
702    #[test]
703    fn test_full_pipeline_incremental() {
704        let manager = create_test_manager::<TestMeta>(10);
705        let blocks = manager.allocate_blocks(6).unwrap();
706        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
707        let seq = create_sequence(6);
708
709        let mut la = LogicalBlockAssignments::new();
710        la.extend_blocks(blocks).unwrap();
711
712        // Stage first 3, register them
713        la.stage(&seq.blocks()[..3]).unwrap();
714        la.register(&manager);
715        assert_eq!(la.assigned_count(), 3);
716        assert_eq!(la.unassigned_count(), 3);
717
718        // Stage remaining 3, register them
719        la.stage(&seq.blocks()[..3]).unwrap();
720        la.register(&manager);
721        assert_eq!(la.assigned_count(), 6);
722        assert_eq!(la.unassigned_count(), 0);
723
724        // Verify all 6 in order
725        for (i, expected_id) in ids.iter().enumerate() {
726            let (id, _) = la.get_assigned(i).unwrap();
727            assert_eq!(id, expected_id);
728        }
729    }
730
731    // =========================================================================
732    // contains across all three collections
733    // =========================================================================
734
735    #[test]
736    fn test_contains_across_collections() {
737        let manager = create_test_manager::<TestMeta>(10);
738        let blocks = manager.allocate_blocks(6).unwrap();
739        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
740        let seq = create_sequence(4);
741
742        let mut la = LogicalBlockAssignments::new();
743        la.extend_blocks(blocks).unwrap();
744
745        // Stage first 2, then register them
746        la.stage(&seq.blocks()[..2]).unwrap();
747        la.register(&manager);
748
749        // Stage 2 more
750        la.stage(&seq.blocks()[..2]).unwrap();
751
752        // State: 2 assigned, 2 staged, 2 unassigned
753        assert_eq!(la.assigned_count(), 2);
754        assert_eq!(la.staged_count(), 2);
755        assert_eq!(la.unassigned_count(), 2);
756
757        // All 6 should be contained
758        for id in &ids {
759            assert!(la.contains(id), "block_id {id} should be contained");
760        }
761        assert!(!la.contains(&999));
762    }
763
764    // =========================================================================
765    // clear and drain
766    // =========================================================================
767
768    #[test]
769    fn test_clear() {
770        let manager = create_test_manager::<TestMeta>(10);
771        let blocks = manager.allocate_blocks(3).unwrap();
772        let seq = create_sequence(3);
773
774        let mut la = LogicalBlockAssignments::new();
775        la.extend_blocks(blocks).unwrap();
776        la.stage(seq.blocks()).unwrap();
777        la.register(&manager);
778        assert_eq!(la.assigned_count(), 3);
779
780        la.clear();
781        assert!(la.is_empty());
782    }
783
784    #[test]
785    fn test_take_assigned() {
786        let manager = create_test_manager::<TestMeta>(10);
787        let blocks = manager.allocate_blocks(3).unwrap();
788        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
789        let seq = create_sequence(3);
790
791        let mut la = LogicalBlockAssignments::new();
792        la.extend_blocks(blocks).unwrap();
793        la.stage(seq.blocks()).unwrap();
794        la.register(&manager);
795
796        let drained = la.take_assigned();
797        assert_eq!(drained.len(), 3);
798        assert_eq!(la.assigned_count(), 0);
799
800        for (i, (id, _)) in drained.iter().enumerate() {
801            assert_eq!(*id, ids[i]);
802        }
803    }
804
805    #[test]
806    fn test_take_staged() {
807        let manager = create_test_manager::<TestMeta>(10);
808        let blocks = manager.allocate_blocks(3).unwrap();
809        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
810        let seq = create_sequence(3);
811
812        let mut la = LogicalBlockAssignments::new();
813        la.extend_blocks(blocks).unwrap();
814        la.stage(seq.blocks()).unwrap();
815
816        let drained = la.take_staged();
817        assert_eq!(drained.len(), 3);
818        assert_eq!(la.staged_count(), 0);
819
820        for (i, (id, _)) in drained.iter().enumerate() {
821            assert_eq!(*id, ids[i]);
822        }
823    }
824
825    #[test]
826    fn test_take_unassigned() {
827        let manager = create_test_manager::<TestMeta>(10);
828        let blocks = manager.allocate_blocks(3).unwrap();
829        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
830
831        let mut la = LogicalBlockAssignments::new();
832        la.extend_blocks(blocks).unwrap();
833
834        let drained = la.take_unassigned();
835        assert_eq!(drained.len(), 3);
836        assert_eq!(la.unassigned_count(), 0);
837
838        for (i, (id, _)) in drained.iter().enumerate() {
839            assert_eq!(*id, ids[i]);
840        }
841    }
842
843    // =========================================================================
844    // Negative: extend_assigned duplicate detection
845    // =========================================================================
846
847    #[test]
848    fn test_extend_assigned_duplicate_already_in_assigned() {
849        let manager = create_test_manager::<TestMeta>(10);
850        let seq = create_sequence(3);
851        let immutables = register_blocks_directly(&manager, &seq, 3);
852
853        // Clone before consuming — ImmutableBlock is Clone
854        let clones: Vec<ImmutableBlock<TestMeta>> = immutables.to_vec();
855        let dup_id = clones[0].block_id();
856
857        let mut la = LogicalBlockAssignments::new();
858        la.extend_assigned(immutables).unwrap();
859
860        // Second call with same block_ids → duplicate error
861        let result = la.extend_assigned(clones);
862        assert!(result.is_err());
863        match result.unwrap_err() {
864            LogicalBlockAssignmentError::DuplicateAssignedBlockId { block_id, blocks } => {
865                assert_eq!(block_id, dup_id);
866                assert_eq!(blocks.len(), 3);
867            }
868            other => panic!("expected DuplicateAssignedBlockId, got: {other:?}"),
869        }
870
871        // Atomic rollback: assigned unchanged
872        assert_eq!(la.assigned_count(), 3);
873    }
874
875    #[test]
876    fn test_extend_assigned_duplicate_within_input_batch() {
877        let manager = create_test_manager::<TestMeta>(10);
878        let seq = create_sequence(1);
879        let immutables = register_blocks_directly(&manager, &seq, 1);
880        let dup = immutables[0].clone();
881        let dup_id = dup.block_id();
882
883        // Two copies of the same block in one batch
884        let batch = vec![immutables.into_iter().next().unwrap(), dup];
885
886        let mut la = LogicalBlockAssignments::new();
887        let result = la.extend_assigned(batch);
888        assert!(result.is_err());
889        match result.unwrap_err() {
890            LogicalBlockAssignmentError::DuplicateAssignedBlockId { block_id, blocks } => {
891                assert_eq!(block_id, dup_id);
892                assert_eq!(blocks.len(), 2);
893            }
894            other => panic!("expected DuplicateAssignedBlockId, got: {other:?}"),
895        }
896
897        // Nothing committed
898        assert!(la.is_empty());
899    }
900
901    #[test]
902    fn test_extend_assigned_disjoint_from_staged() {
903        // Verifies that extend_assigned succeeds when the new block_ids
904        // are disjoint from those already in staged (no collision).
905        let manager = create_test_manager::<TestMeta>(10);
906        let blocks = manager.allocate_blocks(3).unwrap();
907        let ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
908        let seq = create_sequence(3);
909
910        let mut la = LogicalBlockAssignments::new();
911        la.extend_blocks(blocks).unwrap();
912        la.stage(seq.blocks()).unwrap();
913        // 3 blocks now in staged
914
915        let other_blocks = manager.allocate_blocks(3).unwrap();
916        let immutables: Vec<ImmutableBlock<TestMeta>> = other_blocks
917            .into_iter()
918            .zip(seq.blocks().iter())
919            .map(|(m, tb)| manager.register_block(m.complete(tb).unwrap()))
920            .collect();
921
922        // Different block_ids from the pool — no collision with staged
923        let other_ids: Vec<BlockId> = immutables.iter().map(|b| b.block_id()).collect();
924        assert!(ids.iter().all(|id| !other_ids.contains(id)));
925        la.extend_assigned(immutables).unwrap();
926        assert_eq!(la.assigned_count(), 3);
927        assert_eq!(la.staged_count(), 3);
928    }
929
930    #[test]
931    fn test_extend_assigned_disjoint_from_unassigned() {
932        // Verifies that extend_assigned succeeds when the new block_ids
933        // are disjoint from those already in unassigned (no collision).
934        let manager = create_test_manager::<TestMeta>(10);
935        let seq = create_sequence(3);
936
937        // Put blocks in unassigned
938        let blocks = manager.allocate_blocks(3).unwrap();
939        let unassigned_ids: Vec<BlockId> = blocks.iter().map(|b| b.block_id()).collect();
940        let mut la = LogicalBlockAssignments::new();
941        la.extend_blocks(blocks).unwrap();
942
943        // Register separate blocks to get immutables
944        let other_blocks = manager.allocate_blocks(3).unwrap();
945        let immutables: Vec<ImmutableBlock<TestMeta>> = other_blocks
946            .into_iter()
947            .zip(seq.blocks().iter())
948            .map(|(m, tb)| manager.register_block(m.complete(tb).unwrap()))
949            .collect();
950
951        // Different block_ids from the pool — no collision with unassigned
952        let imm_ids: Vec<BlockId> = immutables.iter().map(|b| b.block_id()).collect();
953        assert!(unassigned_ids.iter().all(|id| !imm_ids.contains(id)));
954        la.extend_assigned(immutables).unwrap();
955        assert_eq!(la.assigned_count(), 3);
956        assert_eq!(la.unassigned_count(), 3);
957    }
958
959    // =========================================================================
960    // Negative: extend_blocks with block_id in staged or assigned
961    // =========================================================================
962
963    #[test]
964    fn test_extend_blocks_id_already_in_assigned() {
965        let manager = create_test_manager::<TestMeta>(10);
966        let seq = create_sequence(3);
967
968        // Register 3 blocks → assigned
969        let blocks = manager.allocate_blocks(3).unwrap();
970        let mut la = LogicalBlockAssignments::new();
971        la.extend_blocks(blocks).unwrap();
972        la.stage(seq.blocks()).unwrap();
973        la.register(&manager);
974        assert_eq!(la.assigned_count(), 3);
975
976        // Those block_ids are now back in the reset pool (RAII from the original
977        // MutableBlocks was consumed by stage/register). Allocating again may
978        // return the same block_ids — but they're still in `assigned`.
979        // With 10 total blocks and 3 in assigned, 7 are free. The 3 assigned
980        // block_ids won't be re-allocated because their RAII guards live in
981        // `la.assigned`. So we can't hit this path with a single manager.
982        //
983        // Verified: extend_blocks can only collide via programming error
984        // (two managers), which is an invalid scenario.
985        assert_eq!(la.assigned_count(), 3);
986    }
987
988    // =========================================================================
989    // Negative: stage edge cases
990    // =========================================================================
991
992    #[test]
993    fn test_stage_empty_unassigned() {
994        let mut la = LogicalBlockAssignments::<TestMeta>::new();
995        let seq = create_sequence(3);
996
997        let staged = la.stage(seq.blocks()).unwrap();
998        assert_eq!(staged, 0);
999        assert_eq!(la.staged_count(), 0);
1000    }
1001
1002    #[test]
1003    fn test_stage_empty_sequence_blocks() {
1004        let manager = create_test_manager::<TestMeta>(10);
1005        let blocks = manager.allocate_blocks(3).unwrap();
1006
1007        let mut la = LogicalBlockAssignments::new();
1008        la.extend_blocks(blocks).unwrap();
1009
1010        let staged = la.stage(&[]).unwrap();
1011        assert_eq!(staged, 0);
1012        assert_eq!(la.staged_count(), 0);
1013        assert_eq!(la.unassigned_count(), 3);
1014    }
1015
1016    #[test]
1017    fn test_stage_both_empty() {
1018        let mut la = LogicalBlockAssignments::<TestMeta>::new();
1019
1020        let staged = la.stage(&[]).unwrap();
1021        assert_eq!(staged, 0);
1022        assert!(la.is_empty());
1023    }
1024
1025    // =========================================================================
1026    // Negative: positional access out of bounds
1027    // =========================================================================
1028
1029    #[test]
1030    fn test_get_assigned_out_of_bounds() {
1031        let la = LogicalBlockAssignments::<TestMeta>::new();
1032        assert!(la.get_assigned(0).is_none());
1033        assert!(la.get_assigned(100).is_none());
1034    }
1035
1036    #[test]
1037    fn test_get_staged_out_of_bounds() {
1038        let la = LogicalBlockAssignments::<TestMeta>::new();
1039        assert!(la.get_staged(0).is_none());
1040        assert!(la.get_staged(100).is_none());
1041    }
1042
1043    #[test]
1044    fn test_get_unassigned_out_of_bounds() {
1045        let la = LogicalBlockAssignments::<TestMeta>::new();
1046        assert!(la.get_unassigned(0).is_none());
1047        assert!(la.get_unassigned(100).is_none());
1048    }
1049
1050    #[test]
1051    fn test_get_out_of_bounds_with_populated_collections() {
1052        let manager = create_test_manager::<TestMeta>(10);
1053        let blocks = manager.allocate_blocks(2).unwrap();
1054        let seq = create_sequence(2);
1055
1056        let mut la = LogicalBlockAssignments::new();
1057        la.extend_blocks(blocks).unwrap();
1058        la.stage(seq.blocks()).unwrap();
1059        la.register(&manager);
1060
1061        assert!(la.get_assigned(0).is_some());
1062        assert!(la.get_assigned(1).is_some());
1063        assert!(la.get_assigned(2).is_none());
1064        assert!(la.get_staged(0).is_none());
1065        assert!(la.get_unassigned(0).is_none());
1066    }
1067
1068    // =========================================================================
1069    // Debug
1070    // =========================================================================
1071
1072    #[test]
1073    fn test_debug_impl() {
1074        let la = LogicalBlockAssignments::<TestMeta>::new();
1075        let debug_str = format!("{la:?}");
1076        assert!(debug_str.contains("LogicalBlockAssignments"));
1077    }
1078}