kvbm_logical/blocks/mutable.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! RAII guard for a block in the **Reset** state.
5//!
6//! A [`MutableBlock`] is the entry point of the block lifecycle. It is
7//! obtained from [`BlockManager::allocate_blocks`](crate::manager::BlockManager::allocate_blocks)
8//! or by calling [`CompleteBlock::reset`](super::CompleteBlock::reset), and
9//! can be advanced to a [`CompleteBlock`](super::CompleteBlock) via
10//! [`stage`](MutableBlock::stage) or [`complete`](MutableBlock::complete).
11
12use super::{
13 Block, BlockError, BlockId, BlockMetadata, CompleteBlock, ResetReturnFn, SequenceHash,
14 state::Reset,
15};
16
17use crate::metrics::BlockPoolMetrics;
18use dynamo_tokens::TokenBlock;
19use std::sync::Arc;
20
21/// RAII guard for a block in the **Reset** state.
22///
23/// Wraps an internal `Block<T, Reset>` and guarantees that the block is
24/// returned to the reset pool when the guard is dropped -- whether the
25/// caller explicitly transitions it or simply lets it fall out of scope.
26///
27/// # Obtaining a `MutableBlock`
28///
29/// - [`BlockManager::allocate_blocks`](crate::manager::BlockManager::allocate_blocks)
30/// -- pulls one or more blocks from the reset pool.
31/// - [`CompleteBlock::reset`] -- undoes a staging operation, returning a
32/// block to the Reset state (metrics are *not* carried over on this path).
33///
34/// # State transitions
35///
36/// - [`stage`](Self::stage) -- transitions to [`CompleteBlock`] using a
37/// pre-computed [`SequenceHash`] and a block-size check.
38/// - [`complete`](Self::complete) -- transitions to [`CompleteBlock`] by
39/// extracting the hash from a [`TokenBlock`](dynamo_tokens::TokenBlock).
40///
41/// Both methods consume `self` and return the block inside
42/// `Err(`[`BlockError`]`)` on size mismatch so it is never leaked.
43///
44/// # Drop behaviour
45///
46/// Dropping a `MutableBlock` returns the underlying block to the reset pool
47/// and decrements the `inflight_mutable` metric gauge.
48pub struct MutableBlock<T: BlockMetadata> {
49 block: Option<Block<T, Reset>>,
50 return_fn: ResetReturnFn<T>,
51 metrics: Option<Arc<BlockPoolMetrics>>,
52}
53
54impl<T: BlockMetadata> MutableBlock<T> {
55 /// Create a new MutableBlock in Reset state
56 pub(crate) fn new(
57 block: Block<T, Reset>,
58 return_fn: ResetReturnFn<T>,
59 metrics: Option<Arc<BlockPoolMetrics>>,
60 ) -> Self {
61 if let Some(ref m) = metrics {
62 m.inc_inflight_mutable();
63 }
64 Self {
65 block: Some(block),
66 return_fn,
67 metrics,
68 }
69 }
70
71 /// Returns the [`BlockId`] assigned to this block.
72 pub fn block_id(&self) -> BlockId {
73 self.block_ref().block_id()
74 }
75
76 /// Transitions from **Reset** to **Staged**, producing a [`CompleteBlock`].
77 ///
78 /// The caller supplies a pre-computed [`SequenceHash`] and the expected
79 /// `block_size`. If `block_size` does not match the block's fixed size
80 /// the method returns `Err(`[`BlockError::BlockSizeMismatch`]`)` with the
81 /// `MutableBlock` inside so the caller can recover it.
82 ///
83 /// Increments the `stagings` counter on success.
84 pub fn stage(
85 mut self,
86 seq_hash: SequenceHash,
87 block_size: usize,
88 ) -> Result<CompleteBlock<T>, BlockError<MutableBlock<T>>> {
89 let inner_size = self.block_ref().block_size();
90 if block_size != inner_size {
91 return Err(BlockError::BlockSizeMismatch {
92 expected: inner_size,
93 actual: block_size,
94 block: self,
95 });
96 }
97 if let Some(ref m) = self.metrics {
98 m.inc_stagings();
99 }
100 Ok(CompleteBlock::new(
101 self.take_block().stage(seq_hash),
102 self.return_fn.clone(),
103 ))
104 }
105
106 /// Transitions from **Reset** to **Staged**, producing a [`CompleteBlock`].
107 ///
108 /// The [`SequenceHash`] is derived from the provided
109 /// [`TokenBlock`](dynamo_tokens::TokenBlock). If the token block's size
110 /// does not match the block's fixed size the method returns
111 /// `Err(`[`BlockError::BlockSizeMismatch`]`)` with the `MutableBlock`
112 /// inside so the caller can recover it.
113 ///
114 /// Increments the `stagings` counter on success.
115 pub fn complete(
116 mut self,
117 token_block: &TokenBlock,
118 ) -> Result<CompleteBlock<T>, BlockError<MutableBlock<T>>> {
119 let block = self.take_block();
120 match block.complete(token_block) {
121 Ok(complete_block) => {
122 if let Some(ref m) = self.metrics {
123 m.inc_stagings();
124 }
125 Ok(CompleteBlock::new(complete_block, self.return_fn.clone()))
126 }
127 Err(block_error) => {
128 // Extract the block from the error and put it back in self
129 match block_error {
130 BlockError::BlockSizeMismatch {
131 expected,
132 actual,
133 block,
134 } => {
135 self.block = Some(block);
136 Err(BlockError::BlockSizeMismatch {
137 expected,
138 actual,
139 block: self,
140 })
141 }
142 }
143 }
144 }
145 }
146
147 #[inline(always)]
148 fn take_block(&mut self) -> Block<T, Reset> {
149 self.block.take().expect("MutableBlock missing block")
150 }
151
152 #[inline(always)]
153 fn block_ref(&self) -> &Block<T, Reset> {
154 self.block.as_ref().expect("MutableBlock missing block")
155 }
156}
157
158impl<T: BlockMetadata> Drop for MutableBlock<T> {
159 #[inline]
160 fn drop(&mut self) {
161 if let Some(block) = self.block.take() {
162 (self.return_fn)(block);
163 }
164 if let Some(ref m) = self.metrics {
165 m.dec_inflight_mutable();
166 }
167 }
168}
169
170impl<T: BlockMetadata> std::fmt::Debug for MutableBlock<T> {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("MutableBlock")
173 .field("block_id", &self.block.as_ref().map(|b| b.block_id()))
174 .finish()
175 }
176}