use crate::manager::components::finalization_handler::FinalizationHandler;
use crate::manager::components::{ComponentsBuilder, ManagerComponents, SpawnInfoProvider};
use crate::types::Request;
use crate::{FailedGroupDataTransmission, FinalizeResult, ManagerError, SuccessfulGroupData, ZeroCopyHandle};
use std::num::NonZeroUsize;
use tokio::sync::mpsc;
use tracing::{error, info, trace, warn};
/// Actor that owns the manager state and serves [`Request`]s sequentially.
///
/// An instance is created and spawned onto the Tokio runtime by `spawn`;
/// callers talk to it through the returned `ZeroCopyHandle` and observe its
/// output through the two data receivers returned alongside the handle.
///
/// The concrete component types are selected by the `C: ManagerComponents`
/// type parameter.
pub struct ManagerActor<C: ManagerComponents> {
/// Receiving end of the request channel; drained by the event loop in `run`.
request_rx: mpsc::Receiver<Request>,
/// Sender for successfully completed group data. The matching receiver is
/// handed to the caller of `spawn`.
pub(crate) completed_data_tx: mpsc::Sender<SuccessfulGroupData>,
/// Sender for group data whose transmission failed. The matching receiver is
/// handed to the caller of `spawn`.
pub(crate) failed_data_tx: mpsc::Sender<FailedGroupDataTransmission>,
/// Reservation allocator component (`C::RA`), produced by the components
/// builder. NOTE(review): presumably used by the request handlers defined
/// outside this file — confirm.
pub(crate) reservation_allocator: C::RA,
/// Group lifecycle manager component (`C::GLM`); borrowed mutably by the
/// finalization handler during finalize.
pub(crate) group_lifecycle_manager: C::GLM,
/// Group data processor component (`C::GDP`); borrowed immutably by the
/// finalization handler during finalize.
pub(crate) group_data_processor: C::GDP,
/// Finalization handler component (`C::FH`) whose `finalize_all_groups` is
/// invoked by `finalize_internal`.
finalization_handler: C::FH,
/// Set to `true` once `finalize_internal` has started. While set,
/// `handle_request` rejects or ignores every incoming request.
pub(crate) is_finalizing: bool,
}
impl<C: ManagerComponents> ManagerActor<C> {
pub fn spawn(
channel_buffer_size: NonZeroUsize,
config: &<<C as ManagerComponents>::CB as ComponentsBuilder<C>>::BuilderConfig,
) -> (
ZeroCopyHandle,
mpsc::Receiver<SuccessfulGroupData>,
mpsc::Receiver<FailedGroupDataTransmission>,
) {
let chan_size = usize::from(channel_buffer_size);
let (request_tx, request_rx) = mpsc::channel(chan_size);
let (completed_data_tx, completed_data_rx) = mpsc::channel(chan_size);
let (failed_data_tx, failed_data_rx) = mpsc::channel(chan_size);
let components = C::CB::build(config);
let manager: ManagerActor<C> = ManagerActor {
request_rx, completed_data_tx, failed_data_tx, reservation_allocator: components.0, group_lifecycle_manager: components.1, group_data_processor: components.2, finalization_handler: components.3, is_finalizing: false, };
let handle = ZeroCopyHandle::new(request_tx);
tokio::spawn(manager.run());
C::SIP::info(channel_buffer_size, config);
(handle, completed_data_rx, failed_data_rx)
}
async fn run(mut self) {
info!("(Manager) 事件循环开始。");
loop {
tokio::select! {
maybe_request = self.request_rx.recv() => {
match maybe_request {
Some(request) => {
trace!("(Manager) 收到请求: {:?}", request);
if !self.handle_request(request).await {
info!("(Manager) handle_request 指示停止事件循环 (通常在 Finalize 后)。");
break; }
}
None => {
info!("(Manager) 请求通道已关闭 (所有 Handle 已 Drop),自动开始 Finalize...");
let _finalize_report = self.finalize_internal().await; info!("(Manager) Finalize (因通道关闭) 完成。");
break; }
}
}
}
}
info!("(Manager) 事件循环结束。正在关闭数据通道...");
info!("(Manager) 数据通道已关闭。Manager Actor 任务正常退出。");
}
async fn handle_request(&mut self, request: Request) -> bool {
if self.is_finalizing {
trace!("(Manager) 当前处于 Finalizing 状态,正在处理请求: {:?}", request);
match request {
Request::Finalize { reply_tx } => {
warn!("(Manager) Finalizing 状态下收到重复的 Finalize 请求,将忽略并回复 None");
let _ = reply_tx.send(None).map_err(|_e| {
error!("(Manager) 回复重复 Finalize 请求失败,接收端可能已放弃等待")
});
}
Request::Reserve(req) => {
warn!("(Manager) Finalizing 状态,拒绝 Reserve 请求");
let _ = req
.reply_tx
.send(Err(ManagerError::ManagerFinalizing))
.map_err(|_e| {
error!("(Manager) 回复 Reserve 拒绝失败,Agent 可能已放弃等待")
});
}
Request::SubmitBytes(req) => {
warn!(
"(Manager) Finalizing 状态,拒绝 SubmitBytes 请求 for Res {}",
req.reservation_id
);
let _ = req
.reply_tx
.send(Err(ManagerError::ManagerFinalizing))
.map_err(|_e| {
error!("(Manager) 回复 SubmitBytes 拒绝失败,Agent 可能已放弃等待")
});
}
Request::FailedInfo(req) => {
warn!(
"(Manager) Finalizing 状态,忽略 FailedInfo 请求 for Res {}",
req.info.id
);
}
}
return true;
}
match request {
Request::Reserve(req) => {
self.handle_reserve(req);
}
Request::SubmitBytes(req) => {
self.handle_submit_bytes(req).await;
}
Request::FailedInfo(req) => {
self.handle_failed_info(req).await;
}
Request::Finalize { reply_tx } => {
info!("(Manager) 收到明确的 Finalize 请求,开始执行...");
let finalize_result: Option<FinalizeResult> = self.finalize_internal().await;
if let Some(ref report) = finalize_result {
info!(
"(Manager) Finalize 执行完成,生成报告(含 {} 个需报告的失败组信息)。",
report.failed_len()
);
} else {
info!("(Manager) Finalize 执行完成,但未生成新报告 (Finalize 已在进行)。");
}
if reply_tx.send(finalize_result).is_err() {
error!("(Manager) 发送 Finalize 结果给调用者失败 (Handle 可能已 Drop)");
}
return false;
}
}
true
}
async fn finalize_internal(&mut self) -> Option<FinalizeResult> {
if self.is_finalizing {
warn!("(Manager) Finalize 已在进行中,忽略重复的 finalize_internal 调用");
return None; }
info!("(Manager) 开始执行内部 Finalize 逻辑...");
self.is_finalizing = true;
let result = self
.finalization_handler .finalize_all_groups( &mut self.group_lifecycle_manager, &self.group_data_processor, &self.completed_data_tx, &self.failed_data_tx, )
.await;
info!(
"(Manager) 内部 Finalize 调用 FinalizationHandler 完成,生成报告 (含 {} 个失败组)",
result.failed_len()
);
Some(result) }
}