use super::handle::LayoutHandle;
use crate::block_manager::v2::physical::layout::LayoutDescriptor;
use anyhow::Result;
use bincode::{Decode, Encode};
use bytes::Bytes;
/// Address of a worker: a numeric worker id paired with a NIXL agent name.
///
/// Derives bincode `Encode`/`Decode` and is embedded in
/// [`RdmaLayoutDescriptors`], so it travels inside a packed
/// [`SerializedLayout`].
// NOTE(review): the derived bincode encoding presumably follows field
// declaration order, so reordering fields would change the wire format — confirm
// before touching the layout of this struct.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub struct WorkerAddress {
/// Numeric identifier of the worker.
pub worker_id: u64,
/// Name of the NIXL agent associated with this worker.
pub nixl_agent_name: String,
}
impl WorkerAddress {
pub fn new(worker_id: u64, nixl_agent_name: String) -> Self {
Self {
worker_id,
nixl_agent_name,
}
}
}
/// A [`LayoutDescriptor`] paired with the [`LayoutHandle`] that identifies it.
///
/// Instances are collected in [`RdmaLayoutDescriptors::layouts`] and encoded
/// with bincode as part of a packed [`SerializedLayout`].
#[derive(Debug, Clone, Encode, Decode)]
pub struct LocalLayoutDescriptor {
/// Handle identifying this layout.
pub handle: LayoutHandle,
/// The layout description itself. The `with_serde` attribute makes bincode
/// encode/decode this field through its serde implementation instead of
/// bincode's own `Encode`/`Decode` traits.
#[bincode(with_serde)]
pub layout: LayoutDescriptor,
}
impl LocalLayoutDescriptor {
pub fn new(handle: LayoutHandle, layout: LayoutDescriptor) -> Self {
Self { handle, layout }
}
}
/// Payload that [`SerializedLayout::pack`] encodes and
/// [`SerializedLayout::unpack`] decodes.
///
/// Bundles a worker's address, a blob of NIXL metadata, and the worker's
/// layout descriptors.
// NOTE(review): the derived bincode encoding presumably follows field
// declaration order, so reordering fields would change the wire format — confirm
// before touching the layout of this struct.
#[derive(Debug, Encode, Decode)]
pub struct RdmaLayoutDescriptors {
/// Address of the worker these descriptors belong to.
pub worker_address: WorkerAddress,
/// Raw NIXL metadata bytes; this module stores and transports them verbatim
/// without interpreting them.
pub nixl_metadata: Vec<u8>,
/// Layout descriptors together with their handles.
pub layouts: Vec<LocalLayoutDescriptor>,
}
/// Newtype around a `Bytes` buffer holding serialized layout metadata.
///
/// [`SerializedLayout::pack`] produces the buffer by bincode-encoding an
/// [`RdmaLayoutDescriptors`]; [`SerializedLayout::unpack`] decodes it again.
/// [`SerializedLayout::from_bytes`] wraps any buffer without validating it, so
/// a value of this type is not guaranteed to decode successfully.
pub struct SerializedLayout(Bytes);
impl SerializedLayout {
pub fn pack(
worker_address: WorkerAddress,
nixl_metadata: Vec<u8>,
layouts: Vec<LocalLayoutDescriptor>,
) -> Result<Self> {
let inner = RdmaLayoutDescriptors {
worker_address,
nixl_metadata,
layouts,
};
let bytes = bincode::encode_to_vec(&inner, bincode::config::standard())
.map_err(|e| anyhow::anyhow!("failed to encode managed memory metadata: {}", e))?;
Ok(Self(Bytes::from(bytes)))
}
pub fn unpack(&self) -> Result<RdmaLayoutDescriptors> {
let (inner, _) = bincode::decode_from_slice(&self.0, bincode::config::standard())
.map_err(|e| anyhow::anyhow!("failed to decode managed memory metadata: {}", e))?;
Ok(inner)
}
pub fn as_bytes(&self) -> &Bytes {
&self.0
}
pub fn from_bytes(bytes: Bytes) -> Self {
Self(bytes)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl std::fmt::Debug for SerializedLayout {
    /// Prints only the buffer size (`size_bytes`) rather than the raw bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let size_bytes = self.len();
        let mut builder = f.debug_struct("SerializedLayout");
        builder.field("size_bytes", &size_bytes);
        builder.finish()
    }
}
#[cfg(all(test, feature = "testing-nixl"))]
mod tests {
    use super::*;
    use crate::block_manager::v2::memory::{MemoryDescriptor, StorageKind};
    use crate::block_manager::v2::physical::layout::{
        BlockFormat, FullyContiguousDetails, LayoutConfig, LayoutDescriptor, LayoutTypeDetails,
        NixlMetadata,
    };

    /// Builds a small, fully contiguous [`LayoutDescriptor`] used as a fixture
    /// by the tests below.
    fn make_test_layout_descriptor() -> LayoutDescriptor {
        let config = LayoutConfig::builder()
            .num_blocks(2)
            .num_layers(2)
            .outer_dim(2)
            .page_size(4)
            .inner_dim(8)
            .dtype_width_bytes(2)
            .build()
            .unwrap();
        LayoutDescriptor {
            version: 1,
            layout_config: config,
            location: StorageKind::System,
            nixl_metadata: NixlMetadata::new("test".to_string(), nixl_sys::MemType::Dram, 0),
            memory_descriptors: vec![MemoryDescriptor::new(0x1000, 4096)],
            layout_type_details: LayoutTypeDetails::FullyContiguous(FullyContiguousDetails {
                block_format: BlockFormat::Operational,
            }),
        }
    }

    /// The constructor stores both fields unchanged.
    #[test]
    fn test_worker_address() {
        let addr = WorkerAddress::new(42, "test_agent".to_string());
        assert_eq!(addr.worker_id, 42);
        assert_eq!(addr.nixl_agent_name, "test_agent");
    }

    /// A `LocalLayoutDescriptor` keeps the handle it was built with.
    #[test]
    fn test_serialized_layout_with_handle() {
        let handle = LayoutHandle::new(1, 2);
        let layout = make_test_layout_descriptor();
        let with_handle = LocalLayoutDescriptor::new(handle, layout);
        assert_eq!(with_handle.handle, handle);
    }

    /// Packing then unpacking preserves the address, metadata and handles.
    #[test]
    fn test_metadata_pack_unpack() {
        let worker_address = WorkerAddress::new(100, "worker_100".to_string());
        let nixl_metadata = vec![1, 2, 3, 4, 5];
        let layouts = vec![LocalLayoutDescriptor::new(
            LayoutHandle::new(100, 1),
            make_test_layout_descriptor(),
        )];
        let packed =
            SerializedLayout::pack(worker_address.clone(), nixl_metadata.clone(), layouts).unwrap();
        // The original asserted `!is_empty()` twice; check it once and verify
        // that `len()` agrees with the underlying buffer instead.
        assert!(!packed.is_empty());
        assert_eq!(packed.len(), packed.as_bytes().len());
        let unpacked = packed.unpack().unwrap();
        assert_eq!(unpacked.worker_address, worker_address);
        assert_eq!(unpacked.nixl_metadata, nixl_metadata);
        assert_eq!(unpacked.layouts.len(), 1);
        assert_eq!(unpacked.layouts[0].handle.worker_id(), 100);
        assert_eq!(unpacked.layouts[0].handle.layout_id(), 1);
    }

    /// Multiple layouts survive a round trip in their original order.
    #[test]
    fn test_metadata_multiple_layouts() {
        let worker_address = WorkerAddress::new(200, "worker_200".to_string());
        let nixl_metadata = vec![10, 20, 30];
        let layouts = vec![
            LocalLayoutDescriptor::new(LayoutHandle::new(200, 1), make_test_layout_descriptor()),
            LocalLayoutDescriptor::new(LayoutHandle::new(200, 2), make_test_layout_descriptor()),
            LocalLayoutDescriptor::new(LayoutHandle::new(200, 3), make_test_layout_descriptor()),
        ];
        // `layouts` is not used afterwards, so it is moved rather than cloned.
        let packed = SerializedLayout::pack(worker_address, nixl_metadata, layouts).unwrap();
        let unpacked = packed.unpack().unwrap();
        assert_eq!(unpacked.layouts.len(), 3);
        // Pair each layout with its expected id as a typed `u16` counter,
        // avoiding a truncating `as` cast from the enumeration index.
        for (layout, expected_id) in unpacked.layouts.iter().zip(1u16..) {
            assert_eq!(layout.handle.worker_id(), 200);
            assert_eq!(layout.handle.layout_id(), expected_id);
        }
    }

    /// A buffer re-wrapped with `from_bytes` decodes like the original.
    #[test]
    fn test_metadata_from_bytes() {
        let worker_address = WorkerAddress::new(42, "test".to_string());
        let nixl_metadata = vec![1, 2, 3];
        let layouts = Vec::new();
        let packed = SerializedLayout::pack(worker_address, nixl_metadata, layouts).unwrap();
        let bytes = packed.as_bytes().clone();
        let restored = SerializedLayout::from_bytes(bytes);
        let unpacked = restored.unpack().unwrap();
        assert_eq!(unpacked.worker_address.worker_id, 42);
    }
}