mod config;
mod state;
pub mod block;
pub mod events;
pub mod layout;
pub mod pool;
pub mod storage;
pub use crate::common::dtype::DType;
pub use block::{
nixl::{
AsBlockDescriptorSet, BlockDescriptorList, IsImmutable, IsMutable, MutabilityKind,
RemoteBlock,
},
transfer::{BlockTransferEngineV1, TransferRequestPut},
BasicMetadata, BlockMetadata, Blocks,
};
pub use config::*;
pub use layout::{nixl::NixlLayout, LayoutConfig, LayoutConfigBuilder, LayoutError, LayoutType};
pub use pool::BlockPool;
pub use storage::{
nixl::NixlRegisterableStorage, DeviceStorage, PinnedStorage, Storage, StorageAllocator,
};
pub use tokio_util::sync::CancellationToken;
use anyhow::{Context, Result};
use block::nixl::{BlockMutability, NixlBlockSet, RemoteBlocks, SerializedNixlBlockSet};
use derive_builder::Builder;
use nixl_sys::Agent as NixlAgent;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use storage::nixl::MemType;
use validator::Validate;
/// Identifier of a worker; this is the type returned by [`KvBlockManager::worker_id`].
pub type WorkerID = u64;
/// A [`KvBlockManager`] specialised with [`BasicMetadata`] as its block metadata type.
pub type ReferenceBlockManager = KvBlockManager<BasicMetadata>;
/// Label for one of the four cache tiers a KV block can be associated with.
///
/// The enum is a plain, field-less tag, so it derives the common value-type
/// traits: it can be debug-printed, copied, compared for equality and used as
/// a `HashMap`/`HashSet` key.
///
/// NOTE(review): this file does not define which storage each tier maps to.
/// Presumably lower numbers are "closer" tiers (e.g. `G1` being device
/// memory) — confirm against the storage/pool modules before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheLevel {
    /// First cache tier.
    G1,
    /// Second cache tier.
    G2,
    /// Third cache tier.
    G3,
    /// Fourth cache tier.
    G4,
}
/// Public handle to the KV block manager.
///
/// The handle itself holds no block data: every public method forwards to the
/// shared internal state. It additionally keeps a cancellation token which is
/// cancelled when the handle is dropped.
pub struct KvBlockManager<Metadata>
where
    Metadata: BlockMetadata,
{
    /// Shared internal state that all public methods delegate to.
    state: Arc<state::KvBlockManagerState<Metadata>>,

    /// Token taken from the configuration at construction time. The internal
    /// state is given a child of this token, so cancelling it also cancels
    /// the state's token.
    cancellation_token: CancellationToken,
}
impl<Metadata> KvBlockManager<Metadata>
where
    Metadata: BlockMetadata,
{
    /// Builds a block manager from `config`.
    ///
    /// The cancellation token found in `config.runtime` is kept by the
    /// returned manager; the internal state is constructed with a *child* of
    /// that token instead of the token itself.
    ///
    /// NOTE(review): because the manager cancels its stored token on drop,
    /// any clone of the configured token still held by the caller is
    /// cancelled as well — confirm this ownership hand-off is what callers
    /// expect.
    ///
    /// # Errors
    ///
    /// Returns whatever error the internal state constructor reports.
    pub fn new(mut config: KvBlockManagerConfig) -> Result<Self> {
        // Keep a handle to the configured token for ourselves ...
        let owned_token = config.runtime.cancellation_token.clone();
        // ... and hand only a child of it down to the internal state.
        config.runtime.cancellation_token = owned_token.child_token();

        Ok(Self {
            state: state::KvBlockManagerState::new(config)?,
            cancellation_token: owned_token,
        })
    }

    /// Exports this manager's local block set in serialized form.
    ///
    /// The result is what a peer manager accepts in
    /// [`import_remote_blockset`](Self::import_remote_blockset).
    ///
    /// # Errors
    ///
    /// Forwards any error produced by the internal state.
    pub fn export_local_blockset(&self) -> Result<SerializedNixlBlockSet> {
        self.state.export_local_blockset()
    }

    /// Imports a serialized block set, e.g. one obtained from another
    /// manager's [`export_local_blockset`](Self::export_local_blockset).
    ///
    /// # Errors
    ///
    /// Forwards any error produced by the internal state.
    pub fn import_remote_blockset(
        &self,
        serialized_blockset: SerializedNixlBlockSet,
    ) -> Result<()> {
        self.state.import_remote_blockset(serialized_blockset)
    }

    /// Resolves the descriptors in `bds` into immutable remote block handles.
    ///
    /// NOTE(review): presumably the described blocks must belong to a block
    /// set that was imported beforehand — confirm in the state module.
    ///
    /// # Errors
    ///
    /// Forwards any error produced by the internal state.
    pub fn get_remote_blocks_immutable(
        &self,
        bds: &BlockDescriptorList,
    ) -> Result<Vec<RemoteBlock<IsImmutable>>> {
        self.state.get_remote_blocks_immutable(bds)
    }

    /// Resolves the descriptors in `bds` into mutable remote block handles.
    ///
    /// NOTE(review): presumably the described blocks must belong to a block
    /// set that was imported beforehand — confirm in the state module.
    ///
    /// # Errors
    ///
    /// Forwards any error produced by the internal state.
    pub fn get_remote_blocks_mutable(
        &self,
        bds: &BlockDescriptorList,
    ) -> Result<Vec<RemoteBlock<IsMutable>>> {
        self.state.get_remote_blocks_mutable(bds)
    }

    /// Returns the block pool backed by [`PinnedStorage`], if the internal
    /// state has one.
    pub fn host(&self) -> Option<&BlockPool<PinnedStorage, Metadata>> {
        self.state.host()
    }

    /// Returns the block pool backed by [`DeviceStorage`], if the internal
    /// state has one.
    pub fn device(&self) -> Option<&BlockPool<DeviceStorage, Metadata>> {
        self.state.device()
    }

    /// Returns the worker id reported by the internal state.
    pub fn worker_id(&self) -> WorkerID {
        self.state.worker_id()
    }
}
/// Dropping the manager cancels the cancellation token it holds.
impl<Metadata> Drop for KvBlockManager<Metadata>
where
    Metadata: BlockMetadata,
{
    fn drop(&mut self) {
        // Only the token is touched here; the remaining fields are released
        // by the compiler-generated field drops that run after this body.
        let Self {
            cancellation_token, ..
        } = self;
        cancellation_token.cancel();
    }
}
#[cfg(all(test, feature = "testing-full"))]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicU64, Ordering};

    // Counter handing out a fresh worker id to every manager built by the
    // helper below, so two managers in one test never share an id.
    static WORKER_ID: AtomicU64 = AtomicU64::new(1337);

    /// Builds a `ReferenceBlockManager` with a small model configuration, a
    /// 16-block host layout and an 8-block device layout on device 0.
    ///
    /// Panics if any builder step or the manager construction fails.
    fn create_reference_block_manager() -> ReferenceBlockManager {
        let worker_id = WORKER_ID.fetch_add(1, Ordering::SeqCst);

        // The sub-configs are built in the same order in which they are
        // attached to the top-level builder.
        let runtime = KvManagerRuntimeConfig::builder()
            .worker_id(worker_id)
            .build()
            .unwrap();

        let model = KvManagerModelConfig::builder()
            .num_layers(3)
            .page_size(4)
            .inner_dim(16)
            .build()
            .unwrap();

        let host_layout = KvManagerLayoutConfig::builder()
            .num_blocks(16)
            .allocator(storage::PinnedAllocator::default())
            .build()
            .unwrap();

        let device_layout = KvManagerLayoutConfig::builder()
            .num_blocks(8)
            .allocator(storage::DeviceAllocator::new(0).unwrap())
            .build()
            .unwrap();

        let config = KvBlockManagerConfig::builder()
            .runtime(runtime)
            .model(model)
            .host_layout(host_layout)
            .device_layout(device_layout)
            .build()
            .unwrap();

        ReferenceBlockManager::new(config).unwrap()
    }

    /// A manager can be constructed from inside a tokio test runtime.
    #[tokio::test]
    async fn test_reference_block_manager_inherited_async_runtime() {
        dynamo_runtime::logging::init();
        let _block_manager = create_reference_block_manager();
    }

    /// A manager can be constructed from a plain, non-async test.
    #[test]
    fn test_reference_block_manager_blocking() {
        dynamo_runtime::logging::init();
        let _block_manager = create_reference_block_manager();
    }

    /// Two managers exchange their block sets, after which one of them
    /// resolves descriptors of blocks allocated by the other into mutable
    /// remote blocks.
    #[tokio::test]
    async fn test_reference_block_managers() {
        dynamo_runtime::logging::init();

        let manager_a = create_reference_block_manager();
        let manager_b = create_reference_block_manager();
        assert_ne!(manager_a.worker_id(), manager_b.worker_id());

        // Each side exports its own block set and imports the peer's.
        let exported_a = manager_a.export_local_blockset().unwrap();
        let exported_b = manager_b.export_local_blockset().unwrap();
        manager_a.import_remote_blockset(exported_b).unwrap();
        manager_b.import_remote_blockset(exported_a).unwrap();

        // Allocate host blocks on A and describe them for B.
        let host_blocks_a = manager_a
            .host()
            .unwrap()
            .allocate_blocks(4)
            .await
            .unwrap();
        let descriptors_a = host_blocks_a.as_block_descriptor_set().unwrap();

        let _host_blocks_b = manager_b
            .host()
            .unwrap()
            .allocate_blocks(4)
            .await
            .unwrap();

        // B looks up A's blocks through the descriptors.
        let mut _remote_blocks_a = manager_b
            .get_remote_blocks_mutable(&descriptors_a)
            .unwrap();
    }
}