mod active;
mod inactive;
mod priority_key;
mod state;
use active::ActiveBlockPool;
use derive_builder::Builder;
use derive_getters::Dissolve;
use inactive::InactiveBlockPool;
use priority_key::PriorityKey;
pub use super::block::{ImmutableBlock, MutableBlock};
use super::block::{
nixl::short_type_name, registry::BlockRegistry, Block, BlockError, BlockMetadata,
};
use super::events::{EventManager, NullEventManager};
use super::storage::Storage;
use crate::tokens::{SequenceHash, TokenBlock};
use std::{
collections::{BTreeSet, HashMap, VecDeque},
sync::{Arc, Weak},
};
use tokio_util::sync::CancellationToken;
use dynamo_runtime::Result;
/// Errors returned by [`BlockPool`] operations.
#[derive(Debug, thiserror::Error)]
pub enum BlockPoolError {
/// A block was expected to be complete but is not.
/// NOTE(review): not raised anywhere in this file — confirm the exact trigger in the
/// state/registration code.
#[error("Block is not complete")]
BlockNotComplete,
/// More blocks were requested than are available; the fields are `(requested, available)`,
/// matching the order used in the error message.
#[error("Not enough blocks available, requested: {0}, available: {1}")]
NotEnoughBlocksAvailable(usize, usize),
/// A `MutableBlock` was rejected; the payload is a human-readable reason.
/// NOTE(review): not raised anywhere in this file — confirm the trigger.
#[error("Invalid MutableBlock: {0}")]
InvalidMutableBlock(String),
/// Registering a block failed; the payload is a human-readable reason.
/// NOTE(review): not raised anywhere in this file — confirm the trigger.
#[error("Failed to register block: {0}")]
FailedToRegisterBlock(String),
/// The progress engine is gone: either a request could not be sent to it or the reply
/// channel was closed before a response arrived.
#[error("Progress engine shutdown")]
ProgressEngineShutdown,
/// A lower-level [`BlockError`], forwarded unchanged (`transparent`) and convertible via `From`.
#[error(transparent)]
BlockError(#[from] BlockError),
}
/// Construction arguments for a [`BlockPool`].
///
/// `derive_builder` generates `BlockPoolArgsBuilder` using the owned pattern. The generated
/// build function is private and named `build_internal`; callers use the hand-written
/// `BlockPoolArgsBuilder::build` instead.
///
/// Field order matters: the `dissolve()` output is destructured positionally as
/// `(event_manager, cancel_token, blocks)` elsewhere in this file, so keep the two in sync.
#[derive(Builder, Dissolve)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct BlockPoolArgs<S: Storage, M: BlockMetadata> {
/// Event manager handed to the pool; defaults to `NullEventManager::new()`.
#[builder(default = "NullEventManager::new()")]
event_manager: Arc<dyn EventManager>,
/// Cancellation token handed to the progress engine; defaults to a fresh token.
#[builder(default = "CancellationToken::new()")]
cancel_token: CancellationToken,
/// Blocks the pool is created with; defaults to the type's `Default` (an empty `Vec`).
#[builder(default)]
blocks: Vec<Block<S, M>>,
}
impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> {
    /// Finalizes the builder and constructs a running [`BlockPool`].
    ///
    /// The generated `build_internal` validates the arguments; the resulting
    /// `BlockPoolArgs` is then taken apart and handed to `BlockPool::new`.
    ///
    /// # Errors
    ///
    /// Returns the builder error (converted into `anyhow::Error`) if `build_internal` fails.
    pub fn build(self) -> anyhow::Result<BlockPool<S, M>> {
        let (events, token, initial_blocks) = self.build_internal()?.dissolve();
        tracing::info!("building block pool");
        Ok(BlockPool::new(events, token, initial_blocks))
    }
}
/// Handle to a block pool.
///
/// The handle holds only the sending halves of two unbounded channels. Each operation is
/// packaged as a request message and sent to a `ProgressEngine`, which holds the pool state.
pub struct BlockPool<S: Storage, M: BlockMetadata> {
/// Carries allocate / register / match-sequence-hash requests.
priority_tx: tokio::sync::mpsc::UnboundedSender<PriorityRequest<S, M>>,
/// Carries control requests (currently only `AddBlocks`).
ctrl_tx: tokio::sync::mpsc::UnboundedSender<ControlRequest<S, M>>,
}
// Implemented by hand: `#[derive(Clone)]` would add `S: Clone` and `M: Clone` bounds, which
// are not needed to clone the channel senders.
impl<S: Storage, M: BlockMetadata> Clone for BlockPool<S, M> {
    /// Returns another handle that sends to the same request channels.
    fn clone(&self) -> Self {
        // Destructuring makes the compiler flag this impl if a field is ever added.
        let Self {
            priority_tx,
            ctrl_tx,
        } = self;
        Self {
            priority_tx: priority_tx.clone(),
            ctrl_tx: ctrl_tx.clone(),
        }
    }
}
/// A single request paired with the one-shot channel on which its response is sent back.
///
/// `Dissolve` lets the receiver of the message split it into `(request, response_tx)`;
/// the tuple follows field declaration order.
#[derive(Dissolve)]
struct Unary<Req, Resp> {
/// The request payload.
request: Req,
/// Sending half of the one-shot reply channel.
response_tx: oneshot::Sender<Resp>,
}
impl<Req, Resp> Unary<Req, Resp> {
    /// Wraps `request` together with a fresh one-shot reply channel.
    ///
    /// Returns the envelope to send and the receiver on which the response will arrive.
    fn make_request(request: Req) -> (Self, oneshot::Receiver<Resp>) {
        let (response_tx, response_rx) = oneshot::channel();
        let envelope = Self {
            request,
            response_tx,
        };
        (envelope, response_rx)
    }
}
/// Outcome of submitting a request: the receiver on which a reply of type `T` will arrive,
/// or a [`BlockPoolError`] if the request could not be submitted.
type UnaryResponse<T> = Result<oneshot::Receiver<T>, BlockPoolError>;
/// A list of immutable blocks, or the error that prevented producing it.
type ImmutableBlocksResult<S, M> = Result<Vec<ImmutableBlock<S, M>>, BlockPoolError>;
/// A list of [`MutableBlock`]s.
pub type MutableBlocks<S, M> = Vec<MutableBlock<S, M>>;
/// A list of [`ImmutableBlock`]s.
pub type ImmutableBlocks<S, M> = Vec<ImmutableBlock<S, M>>;
/// Requests carried on the priority channel of a `BlockPool`.
///
/// Each variant wraps a `Unary` envelope: the request payload plus the one-shot sender on
/// which the reply is delivered.
enum PriorityRequest<S: Storage, M: BlockMetadata> {
    /// Payload: number of blocks wanted. Reply: the mutable blocks, or an error.
    AllocateBlocks(Unary<usize, Result<MutableBlocks<S, M>, BlockPoolError>>),
    /// Payload: mutable blocks to register. Reply: the immutable blocks, or an error.
    RegisterBlocks(Unary<MutableBlocks<S, M>, Result<ImmutableBlocks<S, M>, BlockPoolError>>),
    /// Payload: sequence hashes to look up. Reply: the matching immutable blocks.
    MatchSequenceHashes(Unary<Vec<SequenceHash>, ImmutableBlocks<S, M>>),
}
/// Requests carried on the control channel of a `BlockPool`.
enum ControlRequest<S: Storage, M: BlockMetadata> {
/// Payload: blocks to add to the pool. Reply: `()` as an acknowledgement.
AddBlocks(Unary<Vec<Block<S, M>>, ()>),
}
impl<S: Storage, M: BlockMetadata> BlockPool<S, M> {
/// Returns a default-initialized [`BlockPoolArgsBuilder`] for configuring a new pool.
pub fn builder() -> BlockPoolArgsBuilder<S, M> {
    BlockPoolArgsBuilder::<S, M>::default()
}
/// Creates the pool handle and starts its progress engine on a dedicated OS thread.
///
/// The thread is named `block-pool-<short type name of S>`. It builds a current-thread
/// Tokio runtime and calls `step()` on the engine until it returns `false`. The join handle
/// is discarded, so the thread runs detached.
///
/// # Panics
///
/// Panics if the thread cannot be spawned. If the Tokio runtime cannot be built, the
/// spawned thread panics instead.
fn new(
    event_manager: Arc<dyn EventManager>,
    cancel_token: CancellationToken,
    blocks: Vec<Block<S, M>>,
) -> Self {
    let (pool, progress_engine) =
        Self::with_progress_engine(event_manager, cancel_token, blocks);

    // Everything the dedicated thread does: own the engine and drive it to completion.
    let drive_engine = move || {
        let mut engine = progress_engine;
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to build Tokio runtime for block pool progress engine");
        runtime.block_on(async move {
            tracing::debug!("starting progress engine");
            while engine.step().await {
                tracing::trace!("progress engine step");
            }
        });
    };

    std::thread::Builder::new()
        .name(format!("block-pool-{}", short_type_name::<S>()))
        .spawn(drive_engine)
        .expect("Failed to spawn block pool progress engine thread");

    pool
}
/// Builds a pool handle together with the `ProgressEngine` that serves it, without
/// starting the engine.
///
/// Two unbounded channels are created: the handle keeps the senders and the engine
/// receives the matching receivers. The caller is responsible for driving the engine.
fn with_progress_engine(
    event_manager: Arc<dyn EventManager>,
    cancel_token: CancellationToken,
    blocks: Vec<Block<S, M>>,
) -> (Self, ProgressEngine<S, M>) {
    use tokio::sync::mpsc::unbounded_channel;

    let (priority_tx, priority_rx) = unbounded_channel();
    let (ctrl_tx, ctrl_rx) = unbounded_channel();

    let engine =
        ProgressEngine::<S, M>::new(event_manager, priority_rx, ctrl_rx, cancel_token, blocks);
    let handle = Self {
        priority_tx,
        ctrl_tx,
    };

    (handle, engine)
}
/// Sends `blocks` to the progress engine as an `AddBlocks` control request and awaits the
/// acknowledgement.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes before an acknowledgement arrives.
#[expect(dead_code)]
pub(crate) async fn add_blocks(&self, blocks: Vec<Block<S, M>>) -> Result<(), BlockPoolError> {
    let ack = self._add_blocks(blocks)?;
    ack.await
        .map_err(|_| BlockPoolError::ProgressEngineShutdown)
}
/// Synchronous counterpart of `add_blocks`: sends the `AddBlocks` control request and waits
/// for the acknowledgement with a blocking `recv()` instead of `.await`.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes before an acknowledgement arrives.
pub(crate) fn add_blocks_blocking(
    &self,
    blocks: Vec<Block<S, M>>,
) -> Result<(), BlockPoolError> {
    let ack = self._add_blocks(blocks)?;
    ack.recv()
        .map_err(|_| BlockPoolError::ProgressEngineShutdown)
}
/// Queues an `AddBlocks` control request and returns the receiver for its acknowledgement.
///
/// Fails with [`BlockPoolError::ProgressEngineShutdown`] if the control channel is closed.
fn _add_blocks(&self, blocks: Vec<Block<S, M>>) -> UnaryResponse<()> {
    let (request, ack_rx) = Unary::<_, ()>::make_request(blocks);
    match self.ctrl_tx.send(ControlRequest::AddBlocks(request)) {
        Ok(()) => Ok(ack_rx),
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
/// Asks the progress engine for `count` mutable blocks and awaits its reply.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes first; otherwise returns whatever result the engine replied with.
pub async fn allocate_blocks(
    &self,
    count: usize,
) -> Result<Vec<MutableBlock<S, M>>, BlockPoolError> {
    let reply = self._allocate_blocks(count)?;
    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
/// Synchronous counterpart of `allocate_blocks`: waits for the engine's reply with a
/// blocking `recv()` instead of `.await`.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes first; otherwise returns whatever result the engine replied with.
pub fn allocate_blocks_blocking(
    &self,
    count: usize,
) -> Result<Vec<MutableBlock<S, M>>, BlockPoolError> {
    let reply = self._allocate_blocks(count)?;
    match reply.recv() {
        Ok(outcome) => outcome,
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
/// Queues an `AllocateBlocks` request for `count` blocks and returns the receiver on which
/// the engine's result will arrive.
///
/// Fails with [`BlockPoolError::ProgressEngineShutdown`] if the priority channel is closed.
fn _allocate_blocks(
    &self,
    count: usize,
) -> UnaryResponse<Result<Vec<MutableBlock<S, M>>, BlockPoolError>> {
    let (request, reply_rx) =
        Unary::<_, Result<Vec<MutableBlock<S, M>>, BlockPoolError>>::make_request(count);
    match self.priority_tx.send(PriorityRequest::AllocateBlocks(request)) {
        Ok(()) => Ok(reply_rx),
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
/// Hands `blocks` to the progress engine for registration and awaits the resulting
/// immutable blocks.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes first; otherwise returns whatever result the engine replied with.
pub async fn register_blocks(
    &self,
    blocks: Vec<MutableBlock<S, M>>,
) -> ImmutableBlocksResult<S, M> {
    let reply = self._register_blocks(blocks)?;
    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
/// Synchronous counterpart of `register_blocks`: waits for the engine's reply with a
/// blocking `recv()` instead of `.await`.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes first; otherwise returns whatever result the engine replied with.
pub fn register_blocks_blocking(
    &self,
    blocks: Vec<MutableBlock<S, M>>,
) -> ImmutableBlocksResult<S, M> {
    let reply = self._register_blocks(blocks)?;
    match reply.recv() {
        Ok(outcome) => outcome,
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
/// Queues a `RegisterBlocks` request and returns the receiver on which the engine's result
/// will arrive.
///
/// Fails with [`BlockPoolError::ProgressEngineShutdown`] if the priority channel is closed.
fn _register_blocks(
    &self,
    blocks: Vec<MutableBlock<S, M>>,
) -> UnaryResponse<ImmutableBlocksResult<S, M>> {
    let (request, reply_rx) = Unary::<_, ImmutableBlocksResult<S, M>>::make_request(blocks);
    match self.priority_tx.send(PriorityRequest::RegisterBlocks(request)) {
        Ok(()) => Ok(reply_rx),
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
/// Asks the progress engine for the immutable blocks matching `sequence_hashes` and awaits
/// the reply.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes before a response arrives.
pub async fn match_sequence_hashes(
    &self,
    sequence_hashes: &[SequenceHash],
) -> ImmutableBlocksResult<S, M> {
    let reply = self._match_sequence_hashes(sequence_hashes)?;
    reply
        .await
        .map_err(|_| BlockPoolError::ProgressEngineShutdown)
}
/// Synchronous counterpart of `match_sequence_hashes`: waits for the engine's reply with a
/// blocking `recv()` instead of `.await`.
///
/// # Errors
///
/// Returns [`BlockPoolError::ProgressEngineShutdown`] if the request cannot be sent or the
/// reply channel closes before a response arrives.
pub fn match_sequence_hashes_blocking(
    &self,
    sequence_hashes: &[SequenceHash],
) -> ImmutableBlocksResult<S, M> {
    let reply = self._match_sequence_hashes(sequence_hashes)?;
    reply
        .recv()
        .map_err(|_| BlockPoolError::ProgressEngineShutdown)
}
/// Queues a `MatchSequenceHashes` request and returns the receiver on which the matching
/// blocks will arrive.
///
/// The hashes are copied into an owned `Vec` so the request can outlive the borrowed slice.
/// Fails with [`BlockPoolError::ProgressEngineShutdown`] if the priority channel is closed.
fn _match_sequence_hashes(
    &self,
    sequence_hashes: &[SequenceHash],
) -> UnaryResponse<Vec<ImmutableBlock<S, M>>> {
    let owned_hashes = sequence_hashes.to_vec();
    let (request, reply_rx) =
        Unary::<_, Vec<ImmutableBlock<S, M>>>::make_request(owned_hashes);
    match self
        .priority_tx
        .send(PriorityRequest::MatchSequenceHashes(request))
    {
        Ok(()) => Ok(reply_rx),
        Err(_) => Err(BlockPoolError::ProgressEngineShutdown),
    }
}
}
/// Pool state held by the `ProgressEngine`.
///
/// NOTE(review): the methods operating on this state are defined outside this chunk
/// (presumably in the `state` module); the field notes below are inferred from types and
/// from how the tests use them — confirm against that module.
struct State<S: Storage, M: BlockMetadata> {
/// Active block pool — presumably tracks blocks currently in use; TODO confirm.
active: ActiveBlockPool<S, M>,
/// Inactive block pool; the tests read the number of allocatable blocks from it via
/// `available_blocks()`.
inactive: InactiveBlockPool<S, M>,
/// Block registry — presumably consulted when registering and matching blocks; TODO confirm.
registry: BlockRegistry,
/// Sender for `Block`s; presumably paired with `ProgressEngine::return_rx` so dropped
/// blocks can be handed back to the pool — TODO confirm.
return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, M>>,
/// Event manager supplied when the pool was built.
event_manager: Arc<dyn EventManager>,
}
/// Receiving side of a `BlockPool`: holds the request receivers and the pool `State`.
///
/// It is driven by repeatedly calling `step()` (defined outside this chunk), either on the
/// dedicated thread started by `BlockPool::new` or manually, as the tests do.
struct ProgressEngine<S: Storage, M: BlockMetadata> {
/// Receives allocate / register / match-sequence-hash requests.
priority_rx: tokio::sync::mpsc::UnboundedReceiver<PriorityRequest<S, M>>,
/// Receives control requests (currently only `AddBlocks`).
ctrl_rx: tokio::sync::mpsc::UnboundedReceiver<ControlRequest<S, M>>,
/// Cancellation token supplied when the pool was built — presumably stops `step()`; TODO confirm.
cancel_token: CancellationToken,
/// The pool state that requests operate on.
state: State<S, M>,
/// Receives `Block`s — presumably blocks being returned to the pool after their owners
/// drop them; TODO confirm.
return_rx: tokio::sync::mpsc::UnboundedReceiver<Block<S, M>>,
}
#[cfg(test)]
mod tests {
use crate::block_manager::block::BlockExt;
use super::super::block::{BasicMetadata, Blocks};
use super::super::layout::tests::setup_layout;
use super::*;
impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> {
    /// Test-only alternative to `build`: returns the pool handle together with its
    /// un-started `ProgressEngine`, so a test can call `step()` itself instead of relying
    /// on a background thread.
    fn build_with_progress_engine(
        self,
    ) -> anyhow::Result<(BlockPool<S, M>, ProgressEngine<S, M>)> {
        let (events, token, initial_blocks) = self.build_internal()?.dissolve();
        Ok(BlockPool::with_progress_engine(
            events,
            token,
            initial_blocks,
        ))
    }
}
/// Exercises the engine's `State` directly (no background thread): allocating takes a block
/// out of the inactive pool, and a dropped block is counted as available again after a step.
#[tokio::test]
async fn test_block_pool_state() {
let layout = setup_layout(None).unwrap();
let blocks = Blocks::<_, BasicMetadata>::new(layout, 42, 0)
.unwrap()
.into_blocks()
.unwrap();
// Keep the engine in the test's hands so every `step()` is explicit.
let (_pool, mut progress) = BlockPool::builder()
.blocks(blocks)
.build_with_progress_engine()
.unwrap();
// The test layout provides 7 blocks, all initially available.
assert_eq!(progress.state.inactive.available_blocks(), 7);
let blocks = progress.state.allocate_blocks(1).unwrap();
assert_eq!(progress.state.inactive.available_blocks(), 6);
assert_eq!(blocks.len(), 1);
drop(blocks);
// One engine step is run; afterwards the dropped block is available again.
progress.step().await;
assert_eq!(progress.state.inactive.available_blocks(), 7);
let mut blocks = progress.state.allocate_blocks(1).unwrap();
assert_eq!(progress.state.inactive.available_blocks(), 6);
assert_eq!(blocks.len(), 1);
let mut block = blocks.pop().unwrap();
block.init_sequence(1337).unwrap();
// Four tokens are accepted and the fifth is rejected — presumably the test layout's
// block size is 4 tokens; confirm against `setup_layout`.
block.add_token(1).unwrap();
block.add_token(2).unwrap();
block.add_token(3).unwrap();
block.add_token(4).unwrap();
assert!(block.add_token(5).is_err());
}
/// Exercises the async `BlockPool` API with a manually stepped engine: the allocation
/// request is served by one step, and the dropped block is returned by a later step.
#[tokio::test]
async fn test_block_pool() {
let layout = setup_layout(None).unwrap();
let blocks = Blocks::<_, BasicMetadata>::new(layout, 42, 0)
.unwrap()
.into_blocks()
.unwrap();
let (pool, mut progress) = BlockPool::builder()
.blocks(blocks)
.build_with_progress_engine()
.unwrap();
assert_eq!(progress.state.inactive.available_blocks(), 7);
// The request is issued from a separate task because it only completes once the engine
// (driven below by this test) has processed it.
let pool_clone = pool.clone();
let allocate_1_block =
tokio::spawn(async move { pool_clone.allocate_blocks(1).await.unwrap() });
progress.step().await;
let blocks = allocate_1_block.await.unwrap();
assert_eq!(progress.state.inactive.available_blocks(), 6);
assert_eq!(blocks.len(), 1);
drop(blocks);
// Dropping alone does not change the count; the block is only counted as available
// again after the next engine step.
assert_eq!(progress.state.inactive.available_blocks(), 6);
progress.step().await;
assert_eq!(progress.state.inactive.available_blocks(), 7);
}
/// End-to-end check of the blocking API against a pool built with `build()` (engine running
/// on its own thread): allocate, fill, commit, register, then find the block by its
/// sequence hash.
#[test]
fn test_block_pool_blocking() {
// Sequence hash expected for the block built below (sequence initialized with 1337,
// tokens 1..=4); asserted against the block's own `sequence_hash()` further down.
const EXPECTED_SEQUENCE_HASH: u64 = 14643705804678351452;
let layout = setup_layout(None).unwrap();
let blocks = Blocks::<_, BasicMetadata>::new(layout, 42, 0)
.unwrap()
.into_blocks()
.unwrap();
let pool = BlockPool::builder().blocks(blocks).build().unwrap();
// Nothing has been registered yet, so the lookup finds no blocks.
let matched_blocks = pool
.match_sequence_hashes_blocking(&[EXPECTED_SEQUENCE_HASH])
.unwrap();
assert_eq!(matched_blocks.len(), 0);
let mut mutable_blocks = pool.allocate_blocks_blocking(1).unwrap();
assert_eq!(mutable_blocks.len(), 1);
let mut block = mutable_blocks.pop().unwrap();
block.init_sequence(1337).unwrap();
// Four tokens are accepted and the fifth is rejected — presumably the test layout's
// block size is 4 tokens; confirm against `setup_layout`.
block.add_token(1).unwrap();
block.add_token(2).unwrap();
block.add_token(3).unwrap();
block.add_token(4).unwrap();
assert!(block.add_token(5).is_err());
block.commit().unwrap();
assert!(block.state().is_complete());
let sequence_hash = block.sequence_hash().unwrap();
assert_eq!(sequence_hash, EXPECTED_SEQUENCE_HASH);
let mut immutable_blocks = pool.register_blocks_blocking(vec![block]).unwrap();
let block = immutable_blocks.pop().unwrap();
assert!(block.state().is_registered());
assert_eq!(block.sequence_hash().unwrap(), sequence_hash);
// The registered block must still be discoverable by hash after this handle is dropped.
drop(block);
let matched = pool
.match_sequence_hashes_blocking(&[sequence_hash])
.unwrap();
assert_eq!(matched.len(), 1);
assert_eq!(matched[0].sequence_hash().unwrap(), sequence_hash);
}
}