Skip to main content

kvbm_logical/blocks/
mutable.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! RAII guard for a block in the **Reset** state.
5//!
6//! A [`MutableBlock`] is the entry point of the block lifecycle. It is
7//! obtained from [`BlockManager::allocate_blocks`](crate::manager::BlockManager::allocate_blocks)
8//! or by calling [`CompleteBlock::reset`](super::CompleteBlock::reset), and
9//! can be advanced to a [`CompleteBlock`](super::CompleteBlock) via
10//! [`stage`](MutableBlock::stage) or [`complete`](MutableBlock::complete).
11
12use super::{
13    Block, BlockError, BlockId, BlockMetadata, CompleteBlock, ResetReturnFn, SequenceHash,
14    state::Reset,
15};
16
17use crate::metrics::BlockPoolMetrics;
18use dynamo_tokens::TokenBlock;
19use std::sync::Arc;
20
21/// RAII guard for a block in the **Reset** state.
22///
23/// Wraps an internal `Block<T, Reset>` and guarantees that the block is
24/// returned to the reset pool when the guard is dropped -- whether the
25/// caller explicitly transitions it or simply lets it fall out of scope.
26///
27/// # Obtaining a `MutableBlock`
28///
29/// - [`BlockManager::allocate_blocks`](crate::manager::BlockManager::allocate_blocks)
30///   -- pulls one or more blocks from the reset pool.
31/// - [`CompleteBlock::reset`] -- undoes a staging operation, returning a
32///   block to the Reset state (metrics are *not* carried over on this path).
33///
34/// # State transitions
35///
36/// - [`stage`](Self::stage) -- transitions to [`CompleteBlock`] using a
37///   pre-computed [`SequenceHash`] and a block-size check.
38/// - [`complete`](Self::complete) -- transitions to [`CompleteBlock`] by
39///   extracting the hash from a [`TokenBlock`](dynamo_tokens::TokenBlock).
40///
41/// Both methods consume `self` and return the block inside
42/// `Err(`[`BlockError`]`)` on size mismatch so it is never leaked.
43///
44/// # Drop behaviour
45///
46/// Dropping a `MutableBlock` returns the underlying block to the reset pool
47/// and decrements the `inflight_mutable` metric gauge.
48pub struct MutableBlock<T: BlockMetadata> {
49    block: Option<Block<T, Reset>>,
50    return_fn: ResetReturnFn<T>,
51    metrics: Option<Arc<BlockPoolMetrics>>,
52}
53
54impl<T: BlockMetadata> MutableBlock<T> {
55    /// Create a new MutableBlock in Reset state
56    pub(crate) fn new(
57        block: Block<T, Reset>,
58        return_fn: ResetReturnFn<T>,
59        metrics: Option<Arc<BlockPoolMetrics>>,
60    ) -> Self {
61        if let Some(ref m) = metrics {
62            m.inc_inflight_mutable();
63        }
64        Self {
65            block: Some(block),
66            return_fn,
67            metrics,
68        }
69    }
70
71    /// Returns the [`BlockId`] assigned to this block.
72    pub fn block_id(&self) -> BlockId {
73        self.block_ref().block_id()
74    }
75
76    /// Transitions from **Reset** to **Staged**, producing a [`CompleteBlock`].
77    ///
78    /// The caller supplies a pre-computed [`SequenceHash`] and the expected
79    /// `block_size`. If `block_size` does not match the block's fixed size
80    /// the method returns `Err(`[`BlockError::BlockSizeMismatch`]`)` with the
81    /// `MutableBlock` inside so the caller can recover it.
82    ///
83    /// Increments the `stagings` counter on success.
84    pub fn stage(
85        mut self,
86        seq_hash: SequenceHash,
87        block_size: usize,
88    ) -> Result<CompleteBlock<T>, BlockError<MutableBlock<T>>> {
89        let inner_size = self.block_ref().block_size();
90        if block_size != inner_size {
91            return Err(BlockError::BlockSizeMismatch {
92                expected: inner_size,
93                actual: block_size,
94                block: self,
95            });
96        }
97        if let Some(ref m) = self.metrics {
98            m.inc_stagings();
99        }
100        Ok(CompleteBlock::new(
101            self.take_block().stage(seq_hash),
102            self.return_fn.clone(),
103        ))
104    }
105
106    /// Transitions from **Reset** to **Staged**, producing a [`CompleteBlock`].
107    ///
108    /// The [`SequenceHash`] is derived from the provided
109    /// [`TokenBlock`](dynamo_tokens::TokenBlock). If the token block's size
110    /// does not match the block's fixed size the method returns
111    /// `Err(`[`BlockError::BlockSizeMismatch`]`)` with the `MutableBlock`
112    /// inside so the caller can recover it.
113    ///
114    /// Increments the `stagings` counter on success.
115    pub fn complete(
116        mut self,
117        token_block: &TokenBlock,
118    ) -> Result<CompleteBlock<T>, BlockError<MutableBlock<T>>> {
119        let block = self.take_block();
120        match block.complete(token_block) {
121            Ok(complete_block) => {
122                if let Some(ref m) = self.metrics {
123                    m.inc_stagings();
124                }
125                Ok(CompleteBlock::new(complete_block, self.return_fn.clone()))
126            }
127            Err(block_error) => {
128                // Extract the block from the error and put it back in self
129                match block_error {
130                    BlockError::BlockSizeMismatch {
131                        expected,
132                        actual,
133                        block,
134                    } => {
135                        self.block = Some(block);
136                        Err(BlockError::BlockSizeMismatch {
137                            expected,
138                            actual,
139                            block: self,
140                        })
141                    }
142                }
143            }
144        }
145    }
146
147    #[inline(always)]
148    fn take_block(&mut self) -> Block<T, Reset> {
149        self.block.take().expect("MutableBlock missing block")
150    }
151
152    #[inline(always)]
153    fn block_ref(&self) -> &Block<T, Reset> {
154        self.block.as_ref().expect("MutableBlock missing block")
155    }
156}
157
158impl<T: BlockMetadata> Drop for MutableBlock<T> {
159    #[inline]
160    fn drop(&mut self) {
161        if let Some(block) = self.block.take() {
162            (self.return_fn)(block);
163        }
164        if let Some(ref m) = self.metrics {
165            m.dec_inflight_mutable();
166        }
167    }
168}
169
170impl<T: BlockMetadata> std::fmt::Debug for MutableBlock<T> {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("MutableBlock")
173            .field("block_id", &self.block.as_ref().map(|b| b.block_id()))
174            .finish()
175    }
176}