use crate::manager::components::group_data_processor::{
GroupDataProcessor, ProcessedGroupOutcome, ProcessingError, ProcessorTaskError,
};
use crate::manager::components::group_lifecycle::GroupLifecycleManager;
use crate::manager::components::ManagerComponents;
use crate::manager::state::GroupState;
use crate::types::{GroupId, ReservationId};
use crate::{FailedReservationInfo, ManagerActor, ManagerError, SuccessfulGroupData};
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
pub(crate) async fn try_process_taken_group_state(
group_id: GroupId,
mut group_state: GroupState, glm: &mut dyn GroupLifecycleManager, processor: &(dyn GroupDataProcessor + Send + Sync), completed_data_tx: &mpsc::Sender<SuccessfulGroupData>,
) -> Result<bool, ManagerError> { trace!("(GroupProcessing) [try_process] 开始处理取出的 Group {} 状态", group_id);
let current_committed_data = std::mem::take(&mut group_state.committed_data);
let current_failed_infos = std::mem::take(&mut group_state.failed_infos);
let current_reservation_metadata = std::mem::take(&mut group_state.reservation_metadata);
debug!(
"(GroupProcessing) [try_process] 处理 Group {} (包含 {} 提交项, {} 失败项, {} 元数据项)",
group_id,
current_committed_data.len(),
current_failed_infos.len(),
current_reservation_metadata.len()
);
if !current_failed_infos.is_empty() {
warn!(
"(GroupProcessing) [try_process] Group {} 包含 {} 个失败预留,判定为处理失败。将状态放回 GLM。",
group_id,
current_failed_infos.len()
);
group_state.committed_data = current_committed_data;
group_state.failed_infos = current_failed_infos;
group_state.reservation_metadata = current_reservation_metadata;
glm.insert_group(group_id, group_state);
trace!("(GroupProcessing) [try_process] 失败的 Group {} 状态已放回 GLM。", group_id);
return Err(ManagerError::GroupContainsFailures(group_id));
}
if current_committed_data.is_empty() {
debug!(
"(GroupProcessing) [try_process] Group {} 无提交数据也无失败信息,视为空分组并消耗。",
group_id
);
return Ok(true); }
trace!("(GroupProcessing) [try_process] Group {} 有提交数据且无失败,调用 GroupDataProcessor...", group_id);
match processor.process_group_data(
group_id,
current_committed_data, ¤t_reservation_metadata, ) {
Ok(ProcessedGroupOutcome::Success(successful_data)) => {
let (start_offset, ref payload) = successful_data;
let data_size = payload.len();
info!(
"(GroupProcessing) [try_process] Group {} 数据由 Processor 成功合并 (Offset: {}, Size: {}),尝试发送...",
group_id, start_offset, data_size
);
if let Err(send_err) = completed_data_tx.send(successful_data).await {
error!(
"(GroupProcessing) [try_process] Group {} 发送成功合并的数据到通道失败: {}。将状态标记为失败并放回 GLM。",
group_id, send_err
);
group_state.failed_infos.push(FailedReservationInfo {
id: ReservationId::MAX, group_id,
offset: start_offset, size: data_size, });
group_state.reservation_metadata = current_reservation_metadata;
glm.insert_group(group_id, group_state);
trace!("(GroupProcessing) [try_process] 发送失败的 Group {} 状态已放回 GLM。", group_id);
Err(ManagerError::SendCompletionFailed(group_id))
} else {
info!(
"(GroupProcessing) [try_process] Group {} 成功合并的数据已发送给消费者。分组处理完成并消耗。",
group_id
);
Ok(true) }
}
Ok(ProcessedGroupOutcome::Empty) => {
debug!(
"(GroupProcessing) [try_process] Group {} 由 Processor 处理后视为空分组并消耗。",
group_id
);
Ok(true) }
Err(processor_task_error) => {
let ProcessorTaskError {
error: processing_err,
committed_data_on_failure,
} = processor_task_error;
warn!(
"(GroupProcessing) [try_process] Group {} 数据由 Processor 处理失败: {:?}。将状态放回 GLM。",
group_id, processing_err
);
group_state.committed_data = committed_data_on_failure;
group_state.reservation_metadata = current_reservation_metadata;
glm.insert_group(group_id, group_state);
trace!("(GroupProcessing) [try_process] Processor 处理失败的 Group {} 状态已放回 GLM。", group_id);
match processing_err {
ProcessingError::NotContiguous(gid, exp, act) => {
Err(ManagerError::GroupNotContiguous(gid, exp, act))
}
ProcessingError::SizeMismatch(gid, exp, act) => {
Err(ManagerError::GroupSizeMismatch(gid, exp, act))
}
ProcessingError::OffsetMismatch(gid, exp, act) => {
Err(ManagerError::GroupOffsetMismatch(gid, exp, act))
}
ProcessingError::MergeFailure(gid, exp, act) => {
Err(ManagerError::MergeSizeMismatch(gid, exp, act))
}
ProcessingError::AttemptToProcessEmptyCommittedData(gid) => {
error!("(GroupProcessing) [try_process] 内部逻辑错误:Processor 报告为 Group {} 处理了空的 committed_data!", gid);
Err(ManagerError::Internal(format!(
"Processor logic error: attempt to process empty committed data for group {}",
gid
)))
}
}
}
}
}
impl<C: ManagerComponents> ManagerActor<C> {
    /// Checks whether `group_id` is complete and, if so, processes it.
    ///
    /// The group is taken out of the lifecycle manager and then either consumed or put
    /// back, depending on the outcome:
    ///
    /// * The group does not exist, is not complete, or cannot be taken: `Ok(false)`.
    /// * The group holds failure records: the state is put back unchanged and
    ///   `ManagerError::GroupContainsFailures` is returned.
    /// * The group holds neither committed data nor failures: consumed, `Ok(true)`.
    /// * The processor reports `Empty`: consumed, `Ok(true)`.
    /// * The processor reports `Success`: the merged data is sent on `completed_data_tx`.
    ///   A successful send yields `Ok(true)`. A failed send pushes a synthetic failure
    ///   record, puts the state back and returns `ManagerError::SendCompletionFailed`.
    /// * The processor fails: the committed data it returned and the metadata are
    ///   restored, the state is put back, and the processing error is mapped to the
    ///   matching `ManagerError`.
    pub(crate) async fn check_and_process_completed_group(
        &mut self,
        group_id: GroupId,
    ) -> Result<bool, ManagerError> {
        // Only a group that exists and reports completion is eligible.
        let ready = self
            .group_lifecycle_manager
            .get_group_ref(group_id)
            .map(|group| group.is_complete())
            .unwrap_or(false);
        if !ready {
            warn!(
                "(Manager) [check_and_process] 尝试处理的分组 {} 不存在或未完成。",
                group_id
            );
            return Ok(false);
        }

        let mut state = if let Some(taken) = self.group_lifecycle_manager.take_group(group_id) {
            taken
        } else {
            // The group passed the pre-check above but is gone now.
            error!("(Manager) [check_and_process] 在尝试从GLM take分组 {} 时发现其已被移除 (预检查后)", group_id);
            return Ok(false);
        };

        // Pull the three collections out of the state; they are either consumed below
        // or written back before the state is re-inserted.
        let failures = std::mem::take(&mut state.failed_infos);
        let committed = std::mem::take(&mut state.committed_data);
        let metadata = std::mem::take(&mut state.reservation_metadata);
        info!(
            "(Manager) [check_and_process] 检查取出的分组 {} 状态 (含 {} 提交项, {} 失败项, {} 元数据项)",
            group_id,
            committed.len(),
            failures.len(),
            metadata.len()
        );

        if !failures.is_empty() {
            warn!(
                "(Manager) [check_and_process] 分组 {} 包含 {} 个失败预留,判定为处理失败。",
                group_id,
                failures.len()
            );
            state.reservation_metadata = metadata;
            state.failed_infos = failures;
            state.committed_data = committed;
            self.group_lifecycle_manager.insert_group(group_id, state);
            debug!(
                "(Manager) [check_and_process] 已将失败分组 {} 的状态放回 GLM",
                group_id
            );
            return Err(ManagerError::GroupContainsFailures(group_id));
        }

        if committed.is_empty() {
            debug!(
                "(Manager) [check_and_process] 分组 {} 既无提交数据也无失败信息,视为空分组处理。",
                group_id
            );
            return Ok(true);
        }

        let outcome = self
            .group_data_processor
            .process_group_data(group_id, committed, &metadata);
        match outcome {
            Ok(ProcessedGroupOutcome::Empty) => {
                debug!(
                    "(Manager) [check_and_process] 分组 {} 由 Processor 处理后视为空。",
                    group_id
                );
                Ok(true)
            }
            Ok(ProcessedGroupOutcome::Success(merged)) => {
                info!("(Manager) [check_and_process] 分组 {} 数据由 Processor 成功处理并合并,准备发送...", group_id);
                // Remember offset and size now: `merged` is moved into the channel and
                // these values describe the range if the send is rejected.
                let offset = merged.0;
                let size = merged.1.len();
                match self.completed_data_tx.send(merged).await {
                    Ok(_) => {
                        info!(
                            "(Manager) [check_and_process] 分组 {} 已成功发送给消费者。",
                            group_id
                        );
                        Ok(true)
                    }
                    Err(send_error) => {
                        error!(
                            "(Manager) [check_and_process] 发送完成的分组 {} 数据失败: {}",
                            group_id, send_error
                        );
                        // The committed data is gone at this point, so the state is put
                        // back carrying a synthetic failure record for the merged range.
                        state.failed_infos.push(FailedReservationInfo {
                            id: ReservationId::MAX,
                            group_id,
                            offset,
                            size,
                        });
                        state.reservation_metadata = metadata;
                        self.group_lifecycle_manager.insert_group(group_id, state);
                        Err(ManagerError::SendCompletionFailed(group_id))
                    }
                }
            }
            Err(ProcessorTaskError {
                error: cause,
                committed_data_on_failure,
            }) => {
                warn!("(Manager) [check_and_process] 分组 {} 数据由 Processor 处理失败: {:?}. 此分组处理失败。", group_id, cause);
                // Restore what the processor handed back, then return the state.
                state.committed_data = committed_data_on_failure;
                state.reservation_metadata = metadata;
                self.group_lifecycle_manager.insert_group(group_id, state);

                // Translate the processor-level error into the manager-level one.
                Err(match cause {
                    ProcessingError::NotContiguous(gid, exp, act) => {
                        ManagerError::GroupNotContiguous(gid, exp, act)
                    }
                    ProcessingError::SizeMismatch(gid, exp, act) => {
                        ManagerError::GroupSizeMismatch(gid, exp, act)
                    }
                    ProcessingError::OffsetMismatch(gid, exp, act) => {
                        ManagerError::GroupOffsetMismatch(gid, exp, act)
                    }
                    ProcessingError::MergeFailure(gid, exp, act) => {
                        ManagerError::MergeSizeMismatch(gid, exp, act)
                    }
                    ProcessingError::AttemptToProcessEmptyCommittedData(gid) => {
                        error!("(Manager) [check_and_process] Processor 接到空 committed data for group {}, 这不应发生。", gid);
                        ManagerError::Internal(format!(
                            "Processor error: attempt to process empty committed data for group {}",
                            gid
                        ))
                    }
                })
            }
        }
    }
}