use crate::{
error::{BufferError, ManagerError}, handle::ZeroCopyHandle, types::{AbsoluteOffset, FailedReservationInfo, GroupId, ReservationId, SubmitParams}, };
use bytes::Bytes;
use tracing::{debug, error, info, trace, warn};
#[derive(Debug)]
pub struct SubmitAgent {
pub(crate) id: ReservationId,
pub(crate) group_id: GroupId,
pub(crate) offset: AbsoluteOffset,
pub(crate) size: usize,
pub(crate) handle: ZeroCopyHandle, pub(crate) consumed: bool,
}
impl SubmitAgent {
pub fn id(&self) -> ReservationId {
self.id
}
pub fn offset(&self) -> AbsoluteOffset {
self.offset
}
pub fn size(&self) -> usize {
self.size
}
pub fn into_single_agent(mut self) -> SingleAgent {
trace!("(Agent {}) SubmitAgent 转换为 SingleAgent", self.id);
self.consumed = true; SingleAgent {
id: self.id,
group_id: self.group_id,
offset: self.offset,
size: self.size,
handle: self.handle.clone(), committed: false, }
}
pub fn into_chunk_agent(mut self) -> ChunkAgent {
trace!("(Agent {}) SubmitAgent 转换为 ChunkAgent", self.id);
self.consumed = true; ChunkAgent {
id: self.id,
group_id: self.group_id,
offset: self.offset,
size: self.size,
handle: self.handle.clone(), chunks: Vec::new(), current_size: 0, committed: false, }
}
}
impl Drop for SubmitAgent {
fn drop(&mut self) {
if !self.consumed {
warn!(
"(Agent {}) Drop: SubmitAgent 未转换为 Single/ChunkAgent,通知 Manager 预留失败 (ID: {}, Offset: {}, Size: {})",
self.id, self.id, self.offset, self.size
);
self.handle.send_failed_info(FailedReservationInfo {
id: self.id,
group_id: self.group_id,
offset: self.offset,
size: self.size,
});
} else {
trace!("(Agent {}) Drop: SubmitAgent 已被消费,不发送通知", self.id);
}
}
}
#[derive(Debug)]
pub struct SingleAgent {
id: ReservationId,
group_id: GroupId,
offset: AbsoluteOffset,
size: usize,
handle: ZeroCopyHandle,
committed: bool,
}
impl SingleAgent {
pub fn id(&self) -> ReservationId {
self.id
}
pub fn offset(&self) -> AbsoluteOffset {
self.offset
}
pub fn size(&self) -> usize {
self.size
}
pub async fn submit_bytes(mut self, bytes: Bytes) -> Result<(), BufferError> {
if self.committed {
error!("(Agent {}) 尝试重复提交 (SingleAgent)", self.id);
return Err(ManagerError::AlreadyCommitted(self.id).into());
}
let actual_size = bytes.len();
if actual_size != self.size {
error!(
"(Agent {}) 提交大小错误 (SingleAgent): 期望 {}, 实际 {}",
self.id, self.size, actual_size
);
return Err(ManagerError::SubmitSizeIncorrect {
reservation_id: self.id,
expected: self.size,
actual: actual_size,
}
.into());
}
debug!(
"(Agent {}) 准备提交单块数据 (Offset: {}, Size: {})",
self.id, self.offset, self.size
);
match self
.handle
.submit_single_bytes_internal(
SubmitParams {
res_id: self.id,
group_id: self.group_id,
offset: self.offset,
},
bytes,
)
.await
{
Ok(_) => {
info!("(Agent {}) 单块数据提交成功", self.id);
self.committed = true; Ok(())
}
Err(e) => {
error!("(Agent {}) 单块数据提交失败: {:?}", self.id, e);
Err(e)
}
}
}
}
impl Drop for SingleAgent {
fn drop(&mut self) {
if !self.committed {
warn!(
"(Agent {}) Drop: SingleAgent 未成功提交,通知 Manager 失败 (ID: {}, Offset: {}, Size: {})",
self.id, self.id, self.offset, self.size
);
self.handle.send_failed_info(FailedReservationInfo {
id: self.id,
group_id: self.group_id,
offset: self.offset,
size: self.size,
});
}
}
}
#[derive(Debug)]
pub struct ChunkAgent {
id: ReservationId,
group_id: GroupId,
offset: AbsoluteOffset, size: usize, handle: ZeroCopyHandle,
chunks: Vec<Bytes>,
current_size: usize,
committed: bool,
}
impl ChunkAgent {
pub fn id(&self) -> ReservationId {
self.id
}
pub fn offset(&self) -> AbsoluteOffset {
self.offset
}
pub fn size(&self) -> usize {
self.size
}
pub fn current_size(&self) -> usize {
self.current_size
}
pub fn submit_chunk(&mut self, chunk: Bytes) -> Result<(), ManagerError> {
if self.committed {
error!("(Agent {}) 尝试在 Commit 后添加块 (ChunkAgent)", self.id);
return Err(ManagerError::AlreadyCommitted(self.id));
}
let chunk_len = chunk.len();
if self.current_size + chunk_len > self.size {
error!(
"(Agent {}) 添加块导致超出预留大小 (ChunkAgent): 当前 {}, 添加 {}, 总共 {}, 预留 {}",
self.id, self.current_size, chunk_len, self.current_size + chunk_len, self.size
);
return Err(ManagerError::SubmitSizeTooLarge {
reservation_id: self.id,
largest: self.size - self.current_size, actual: chunk_len,
});
}
self.current_size += chunk_len;
self.chunks.push(chunk);
trace!(
"(Agent {}) 添加块成功 (ChunkAgent): 大小 {}, 当前总大小 {}",
self.id,
chunk_len,
self.current_size
);
Ok(())
}
pub async fn commit(mut self) -> Result<(), BufferError> {
if self.committed {
error!("(Agent {}) 尝试重复 Commit (ChunkAgent)", self.id);
return Err(ManagerError::AlreadyCommitted(self.id).into());
}
if self.current_size != self.size {
error!(
"(Agent {}) Commit 时总大小不匹配 (ChunkAgent): 期望 {}, 实际 {}",
self.id, self.size, self.current_size
);
return Err(ManagerError::CommitSizeMismatch {
reservation_id: self.id,
expected: self.size,
actual: self.current_size,
}
.into());
}
debug!(
"(Agent {}) 准备 Commit 分块数据 (Offset: {}, Total Size: {}, Chunks: {})",
self.id,
self.offset,
self.current_size,
self.chunks.len()
);
let chunks_to_send = std::mem::take(&mut self.chunks);
match self
.handle
.submit_chunked_bytes_internal(
SubmitParams {
res_id: self.id,
group_id: self.group_id,
offset: self.offset,
},
chunks_to_send,
)
.await
{
Ok(_) => {
info!("(Agent {}) 分块数据 Commit 成功", self.id);
self.committed = true;
Ok(())
}
Err(e) => {
error!("(Agent {}) 分块数据 Commit 失败: {:?}", self.id, e);
Err(e)
}
}
}
}
impl Drop for ChunkAgent {
fn drop(&mut self) {
if !self.committed {
warn!(
"(Agent {}) Drop: ChunkAgent 未成功 Commit (或未调用 Commit),通知 Manager 失败 (ID: {}, Offset: {}, Size: {})",
self.id, self.id, self.offset, self.size
);
self.handle.send_failed_info(FailedReservationInfo {
id: self.id,
group_id: self.group_id,
offset: self.offset,
size: self.size, });
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::CommitType;
use crate::{
handle::ZeroCopyHandle,
types::{AbsoluteOffset, Request, ReservationId},
BufferError, ManagerError,
};
use bytes::Bytes;
use tokio::sync::mpsc;
fn setup_handle_and_receiver() -> (ZeroCopyHandle, mpsc::Receiver<Request>) {
let (request_tx, request_rx) = mpsc::channel::<Request>(10);
let handle = ZeroCopyHandle::new(request_tx);
(handle, request_rx)
}
#[test]
fn test_submit_agent_creation_and_accessors() {
let (handle, _rx) = setup_handle_and_receiver();
let agent = SubmitAgent {
id: 1,
group_id: 0,
offset: 100,
size: 50,
handle,
consumed: false,
};
assert_eq!(agent.id(), 1);
assert_eq!(agent.offset(), 100);
assert_eq!(agent.size(), 50);
assert!(!agent.consumed);
}
#[test]
fn test_submit_agent_into_single_agent() {
let (handle, _rx) = setup_handle_and_receiver();
let submit_agent = SubmitAgent {
id: 1,
group_id: 0,
offset: 100,
size: 50,
handle: handle.clone(),
consumed: false,
};
let single_agent = submit_agent.into_single_agent();
assert_eq!(single_agent.id, 1);
assert_eq!(single_agent.offset, 100);
assert_eq!(single_agent.size, 50);
assert_eq!(single_agent.group_id, 0);
assert!(!single_agent.committed);
}
#[test]
fn test_submit_agent_into_chunk_agent() {
let (handle, _rx) = setup_handle_and_receiver();
let submit_agent = SubmitAgent {
id: 1,
group_id: 0,
offset: 100,
size: 50,
handle: handle.clone(),
consumed: false,
};
let chunk_agent = submit_agent.into_chunk_agent();
assert_eq!(chunk_agent.id, 1);
assert_eq!(chunk_agent.offset, 100);
assert_eq!(chunk_agent.size, 50);
assert_eq!(chunk_agent.group_id, 0);
assert!(chunk_agent.chunks.is_empty());
assert_eq!(chunk_agent.current_size, 0);
assert!(!chunk_agent.committed);
}
#[tokio::test]
async fn test_submit_agent_drop_sends_failed_info() {
let (handle, mut rx) = setup_handle_and_receiver();
let agent_id: ReservationId = 10;
let agent_offset: AbsoluteOffset = 200;
let agent_size: usize = 30;
{
let _submit_agent = SubmitAgent {
id: agent_id,
group_id: 1,
offset: agent_offset,
size: agent_size,
handle: handle.clone(),
consumed: false, };
}
match tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await {
Ok(Some(Request::FailedInfo(req))) => {
assert_eq!(req.info.id, agent_id);
assert_eq!(req.info.offset, agent_offset);
assert_eq!(req.info.size, agent_size);
}
_ => panic!("SubmitAgent drop 时未发送 FailedInfo 或发送了错误类型"),
}
}
#[tokio::test]
async fn test_submit_agent_drop_consumed_does_not_send_failed_info() {
let (handle, mut rx) = setup_handle_and_receiver();
{
let mut submit_agent = SubmitAgent {
id: 10,
group_id: 1,
offset: 200,
size: 30,
handle: handle.clone(),
consumed: false,
};
submit_agent.consumed = true; }
match tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await {
Ok(Some(_)) => panic!("不应发送任何信息当 SubmitAgent 已被消费"),
Ok(None) => {} Err(_) => {} }
}
#[tokio::test]
async fn test_single_agent_submit_bytes_success() {
let (mock_request_tx, mut mock_request_rx) = mpsc::channel::<Request>(1);
let handle = ZeroCopyHandle::new(mock_request_tx);
let agent_id: ReservationId = 2;
let agent_size: usize = 10;
tokio::spawn(async move {
if let Some(Request::SubmitBytes(req)) = mock_request_rx.recv().await {
assert_eq!(req.reservation_id, agent_id);
assert_eq!(
req.data.clone().get_single_bytes().unwrap().len(),
agent_size
);
req.reply_tx.send(Ok(())).unwrap();
}
});
let single_agent = SingleAgent {
id: agent_id,
group_id: 0,
offset: 100,
size: agent_size,
handle,
committed: false,
};
let data = Bytes::from(vec![0u8; agent_size]);
let result = single_agent.submit_bytes(data).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_single_agent_submit_bytes_size_mismatch_at_agent() {
let (handle, _rx) = setup_handle_and_receiver(); let agent = SingleAgent {
id: 2,
group_id: 0,
offset: 100,
size: 10, handle,
committed: false,
};
let data = Bytes::from(vec![0u8; 5]); let result = agent.submit_bytes(data).await;
match result {
Err(BufferError::ManagerError(ManagerError::SubmitSizeIncorrect {
reservation_id,
expected,
actual,
})) => {
assert_eq!(reservation_id, 2);
assert_eq!(expected, 10);
assert_eq!(actual, 5);
}
_ => panic!("期望 SubmitSizeIncorrect 错误, 得到 {:?}", result),
}
}
#[tokio::test]
async fn test_single_agent_submit_bytes_manager_error() {
let (mock_request_tx, mut mock_request_rx) = mpsc::channel::<Request>(1);
let handle = ZeroCopyHandle::new(mock_request_tx);
let agent_id: ReservationId = 3;
tokio::spawn(async move {
if let Some(Request::SubmitBytes(req)) = mock_request_rx.recv().await {
req.reply_tx
.send(Err(ManagerError::ReservationNotFound(agent_id)))
.unwrap();
}
});
let single_agent = SingleAgent {
id: agent_id,
group_id: 0,
offset: 100,
size: 10,
handle,
committed: false,
};
let data = Bytes::from(vec![0u8; 10]);
let result = single_agent.submit_bytes(data).await;
match result {
Err(BufferError::ManagerError(ManagerError::ReservationNotFound(id))) => {
assert_eq!(id, agent_id);
}
_ => panic!("期望 ReservationNotFound 错误, 得到 {:?}", result),
}
}
#[tokio::test]
async fn test_single_agent_drop_not_committed_sends_failed_info() {
let (handle, mut rx) = setup_handle_and_receiver();
let agent_id: ReservationId = 20;
let agent_offset: AbsoluteOffset = 220;
let agent_size: usize = 32;
{
let _single_agent = SingleAgent {
id: agent_id,
group_id: 1,
offset: agent_offset,
size: agent_size,
handle: handle.clone(),
committed: false, };
}
match tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await {
Ok(Some(Request::FailedInfo(req))) => {
assert_eq!(req.info.id, agent_id);
assert_eq!(req.info.offset, agent_offset);
assert_eq!(req.info.size, agent_size);
}
_ => panic!("SingleAgent (未提交) drop 时未发送 FailedInfo"),
}
}
#[tokio::test]
async fn test_single_agent_drop_committed_does_not_send_failed_info() {
let (handle, mut rx) = setup_handle_and_receiver();
{
let mut single_agent = SingleAgent {
id: 20,
group_id: 1,
offset: 220,
size: 32,
handle: handle.clone(),
committed: false,
};
single_agent.committed = true; }
match tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await {
Ok(Some(_)) => panic!("不应发送任何信息当 SingleAgent 已提交"),
Ok(None) => {}
Err(_) => {} }
}
#[test]
fn test_chunk_agent_submit_chunk_success_and_current_size() {
let (handle, _rx) = setup_handle_and_receiver();
let mut agent = ChunkAgent {
id: 3,
group_id: 0,
offset: 200,
size: 100, handle,
chunks: Vec::new(),
current_size: 0,
committed: false,
};
assert_eq!(agent.current_size(), 0);
let chunk1 = Bytes::from(vec![0u8; 30]);
assert!(agent.submit_chunk(chunk1).is_ok());
assert_eq!(agent.current_size(), 30);
assert_eq!(agent.chunks.len(), 1);
let chunk2 = Bytes::from(vec![0u8; 40]);
assert!(agent.submit_chunk(chunk2).is_ok());
assert_eq!(agent.current_size(), 70);
assert_eq!(agent.chunks.len(), 2);
}
#[test]
fn test_chunk_agent_submit_chunk_exceeds_size() {
let (handle, _rx) = setup_handle_and_receiver();
let mut agent = ChunkAgent {
id: 3,
group_id: 0,
offset: 200,
size: 50, handle,
chunks: Vec::new(),
current_size: 0,
committed: false,
};
let chunk1 = Bytes::from(vec![0u8; 30]);
agent.submit_chunk(chunk1).unwrap();
let chunk2 = Bytes::from(vec![0u8; 25]); let result = agent.submit_chunk(chunk2);
match result {
Err(ManagerError::SubmitSizeTooLarge {
reservation_id,
largest,
actual,
}) => {
assert_eq!(reservation_id, 3);
assert_eq!(largest, 20); assert_eq!(actual, 25);
}
_ => panic!("期望 SubmitSizeTooLarge 错误, 得到 {:?}", result),
}
assert_eq!(agent.current_size(), 30); }
#[tokio::test]
async fn test_chunk_agent_commit_success() {
let (mock_request_tx, mut mock_request_rx) = mpsc::channel::<Request>(1);
let handle = ZeroCopyHandle::new(mock_request_tx);
let agent_id: ReservationId = 4;
let agent_size: usize = 50;
tokio::spawn(async move {
if let Some(Request::SubmitBytes(req)) = mock_request_rx.recv().await {
assert_eq!(req.reservation_id, agent_id);
if let CommitType::Chunked(chunks) = req.data {
assert_eq!(chunks.len(), 2);
assert_eq!(chunks[0].len(), 20);
assert_eq!(chunks[1].len(), 30);
} else {
panic!("期望 CommitType::Chunked");
}
req.reply_tx.send(Ok(())).unwrap();
}
});
let mut chunk_agent = ChunkAgent {
id: agent_id,
group_id: 0,
offset: 300,
size: agent_size,
handle,
chunks: Vec::new(),
current_size: 0,
committed: false,
};
chunk_agent
.submit_chunk(Bytes::from(vec![0u8; 20]))
.unwrap();
chunk_agent
.submit_chunk(Bytes::from(vec![0u8; 30]))
.unwrap();
let result = chunk_agent.commit().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_chunk_agent_commit_size_mismatch_at_agent() {
let (handle, _rx) = setup_handle_and_receiver(); let mut agent = ChunkAgent {
id: 4,
group_id: 0,
offset: 300,
size: 50, handle,
chunks: Vec::new(),
current_size: 0,
committed: false,
};
agent.submit_chunk(Bytes::from(vec![0u8; 20])).unwrap();
let result = agent.commit().await; match result {
Err(BufferError::ManagerError(ManagerError::CommitSizeMismatch {
reservation_id,
expected,
actual,
})) => {
assert_eq!(reservation_id, 4);
assert_eq!(expected, 50);
assert_eq!(actual, 20);
}
_ => panic!("期望 CommitSizeMismatch 错误, 得到 {:?}", result),
}
}
#[tokio::test]
async fn test_chunk_agent_drop_not_committed_sends_failed_info() {
let (handle, mut rx) = setup_handle_and_receiver();
let agent_id: ReservationId = 30;
let agent_offset: AbsoluteOffset = 330;
let agent_size: usize = 33;
{
let _chunk_agent = ChunkAgent {
id: agent_id,
group_id: 1,
offset: agent_offset,
size: agent_size,
handle: handle.clone(),
chunks: Vec::new(),
current_size: 0,
committed: false, };
}
match tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await {
Ok(Some(Request::FailedInfo(req))) => {
assert_eq!(req.info.id, agent_id);
assert_eq!(req.info.offset, agent_offset);
assert_eq!(req.info.size, agent_size); }
_ => panic!("ChunkAgent (未提交) drop 时未发送 FailedInfo"),
}
}
impl CommitType {
fn get_single_bytes(self) -> Option<Bytes> {
match self {
CommitType::Single(b) => Some(b),
_ => None,
}
}
}
}