pub(crate) use crate::error::ManagerError;
use bytes::Bytes;
use std::{
collections::BTreeMap, num::NonZeroUsize, };
use tokio::sync::oneshot;
pub type ReservationId = u64;
pub type AbsoluteOffset = usize;
pub type GroupId = u64;
pub type SubmitReplyTx = oneshot::Sender<Result<(), ManagerError>>;
pub type ReserveReplyTx =
oneshot::Sender<Result<(ReservationId, AbsoluteOffset, GroupId), ManagerError>>;
pub type FinalizeReplyTx = oneshot::Sender<Option<FinalizeResult>>;
#[derive(Debug, Clone, PartialEq, Eq)] pub struct FailedReservationInfo {
pub id: ReservationId,
pub group_id: GroupId,
pub offset: AbsoluteOffset,
pub size: usize,
}
#[derive(Debug, Clone)] pub struct FailedGroupData {
pub group_id: GroupId,
pub group_chunks: BTreeMap<AbsoluteOffset, Bytes>,
pub failed_reservations: Vec<FailedReservationInfo>,
}
#[derive(Debug, Default)]
pub struct FinalizeResult {
pub failed: Vec<FailedGroupData>,
}
impl FinalizeResult {
pub fn failed_len(&self) -> usize {
self.failed.len()
}
pub fn is_empty(&self) -> bool {
self.failed.is_empty()
}
}
pub type SuccessfulGroupData = (AbsoluteOffset, Box<[u8]>);
pub type FailedGroupDataTransmission = FailedGroupData;
#[derive(Debug, Clone)] pub enum CommitType {
Chunked(Vec<Bytes>),
Single(Bytes),
}
#[derive(Debug)] pub struct ReserveRequest {
pub size: NonZeroUsize,
pub reply_tx: ReserveReplyTx,
}
#[derive(Debug)] pub struct SubmitBytesRequest {
pub reservation_id: ReservationId,
pub absolute_offset: AbsoluteOffset,
pub group_id: GroupId,
pub data: CommitType,
pub reply_tx: SubmitReplyTx,
}
#[derive(Debug, Clone)] pub struct FailedInfoRequest {
pub info: FailedReservationInfo,
}
#[derive(Debug)]
pub enum Request {
Reserve(ReserveRequest),
SubmitBytes(SubmitBytesRequest),
FailedInfo(FailedInfoRequest),
Finalize {
reply_tx: FinalizeReplyTx,
},
}
#[derive(Debug, Clone)] pub struct SubmitParams {
pub(crate) res_id: ReservationId,
pub(crate) group_id: GroupId,
pub(crate) offset: AbsoluteOffset,
}
impl SubmitParams {
pub fn into_single_request(self, bytes: Bytes, tx: SubmitReplyTx) -> Request {
Request::SubmitBytes(SubmitBytesRequest {
reservation_id: self.res_id,
absolute_offset: self.offset,
group_id: self.group_id,
data: CommitType::Single(bytes),
reply_tx: tx,
})
}
pub fn into_chunked_request(self, chunks: Vec<Bytes>, tx: SubmitReplyTx) -> Request {
Request::SubmitBytes(SubmitBytesRequest {
reservation_id: self.res_id,
absolute_offset: self.offset,
group_id: self.group_id,
data: CommitType::Chunked(chunks),
reply_tx: tx,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::num::NonZeroUsize;
#[test]
fn test_failed_reservation_info_creation() {
let info = FailedReservationInfo {
id: 1, group_id: 0, offset: 100, size: 50, };
assert_eq!(info.id, 1);
assert_eq!(info.group_id, 0);
assert_eq!(info.offset, 100);
assert_eq!(info.size, 50);
}
#[test]
fn test_failed_group_data_creation() {
let chunks = BTreeMap::new(); let reservations = Vec::new(); let data = FailedGroupData {
group_id: 10,
group_chunks: chunks.clone(), failed_reservations: reservations.clone(), };
assert_eq!(data.group_id, 10);
assert!(data.group_chunks.is_empty()); assert!(data.failed_reservations.is_empty()); }
#[test]
fn test_finalize_result_default_and_methods() {
let default_result = FinalizeResult::default();
assert!(default_result.is_empty()); assert_eq!(default_result.failed_len(), 0);
let result_with_failure = FinalizeResult {
failed: vec![FailedGroupData { group_id: 1,
group_chunks: BTreeMap::new(),
failed_reservations: Vec::new(),
}],
};
assert!(!result_with_failure.is_empty()); assert_eq!(result_with_failure.failed_len(), 1); }
#[test]
fn test_commit_type_variants() {
let single_data = Bytes::from_static(b"hello");
let commit_single = CommitType::Single(single_data.clone());
if let CommitType::Single(s) = commit_single {
assert_eq!(s, single_data); } else {
panic!("Expected CommitType::Single"); }
let chunk_data = vec![Bytes::from_static(b"world")];
let commit_chunked = CommitType::Chunked(chunk_data.clone());
if let CommitType::Chunked(c) = commit_chunked {
assert_eq!(c, chunk_data); } else {
panic!("Expected CommitType::Chunked"); }
}
#[test]
fn test_reserve_request_creation() {
let (reply_tx, _reply_rx) = oneshot::channel(); let size = NonZeroUsize::new(100).unwrap(); let req = ReserveRequest { size, reply_tx };
assert_eq!(req.size.get(), 100); }
#[test]
fn test_submit_bytes_request_creation() {
let (reply_tx, _reply_rx) = oneshot::channel(); let data = CommitType::Single(Bytes::new()); let req = SubmitBytesRequest {
reservation_id: 1,
absolute_offset: 0,
group_id: 0,
data: data.clone(), reply_tx, };
assert_eq!(req.reservation_id, 1);
assert_eq!(req.absolute_offset, 0);
assert_eq!(req.group_id, 0);
matches!(req.data, CommitType::Single(_));
}
#[test]
fn test_failed_info_request_creation() {
let failed_res_info = FailedReservationInfo {
id: 1,
group_id: 0,
offset: 0,
size: 0,
};
let req = FailedInfoRequest {
info: failed_res_info.clone(), };
assert_eq!(req.info, failed_res_info);
}
#[test]
fn test_request_enum_variants() {
let (reserve_reply_tx, _) = oneshot::channel();
let reserve_req_struct = ReserveRequest {
size: NonZeroUsize::new(1).unwrap(),
reply_tx: reserve_reply_tx,
};
let req_reserve = Request::Reserve(reserve_req_struct);
matches!(req_reserve, Request::Reserve(_));
let (submit_reply_tx, _) = oneshot::channel();
let submit_req_struct = SubmitBytesRequest {
reservation_id: 1,
absolute_offset: 0,
group_id: 0,
data: CommitType::Single(Bytes::new()),
reply_tx: submit_reply_tx,
};
let req_submit = Request::SubmitBytes(submit_req_struct);
matches!(req_submit, Request::SubmitBytes(_));
let failed_info_req_struct = FailedInfoRequest {
info: FailedReservationInfo {
id: 1,
group_id: 0,
offset: 0,
size: 0,
},
};
let req_failed_info = Request::FailedInfo(failed_info_req_struct);
matches!(req_failed_info, Request::FailedInfo(_));
let (finalize_reply_tx, _) = oneshot::channel();
let req_finalize = Request::Finalize { reply_tx: finalize_reply_tx };
matches!(req_finalize, Request::Finalize { .. }); }
}