1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
// src/manager/components/finalization_handler.rs
//! 定义了用于处理 Manager Finalize 期间所有剩余分组的 Trait (`FinalizationHandler`) 及其默认实现。
use crate::manager::components::group_data_processor::GroupDataProcessor;
use crate::manager::components::group_lifecycle::GroupLifecycleManager;
use crate::manager::operations::group_processing::try_process_taken_group_state; // 导入共享的分组处理逻辑
use crate::types::{ // 引入所需的类型
AbsoluteOffset, FailedGroupData, FailedGroupDataTransmission, FailedReservationInfo,
FinalizeResult, GroupId, ReservationId, SuccessfulGroupData,
};
use async_trait::async_trait; // 用于在 Trait 中定义异步方法
use bytes::Bytes; // 引入 Bytes 类型
use std::collections::BTreeMap; // 引入有序映射
use std::mem; // 引入 mem::take 用于获取字段所有权
use tokio::sync::mpsc; // 引入 MPSC 通道
use tracing::{debug, error, info, warn}; // 引入日志宏
/// 处理 Manager Finalize 阶段的 Trait。
///
/// 当 `ManagerActor` 收到 `Finalize` 请求或所有 `ZeroCopyHandle` 被 Drop 时,
/// 会调用此 Trait 的实现来处理所有当前存在于 `GroupLifecycleManager` 中的分组。
/// 目的是确保所有预留都有最终状态(成功或失败),并将结果(成功合并的数据或失败信息)
/// 发送给相应的消费者通道,同时生成一份包含无法发送的失败信息的 `FinalizeResult` 报告。
#[async_trait]
pub trait FinalizationHandler: Send + Sync + 'static { // 要求线程安全且静态生命周期
/// 处理所有剩余的分组,执行清理和报告。
///
/// 该方法会迭代 `GroupLifecycleManager` 中的所有分组 ID,并对每个分组执行以下操作:
/// 1. **强制密封**: 确保分组处于密封状态,即使其大小未达到阈值。
/// 2. **处理未完成预留**: 将分组状态中 `reservations` 集合内仍然存在的预留 ID 视为失败,
/// 查找它们的元数据(偏移量和大小),创建 `FailedReservationInfo` 并添加到分组的 `failed_infos` 列表。
/// 3. **获取分组状态**: 从 `GroupLifecycleManager` 中 `take` 出分组的状态 (`GroupState`)。
/// 4. **尝试处理**: 调用共享的 `try_process_taken_group_state` 函数处理获取到的 `GroupState`。
/// - 如果成功处理(发送了成功数据或确定为空),则记录日志。
/// - 如果处理失败(例如内部包含失败信息),则判定为失败分组。
/// 5. **处理失败分组**: 如果步骤 4 判定为失败,再次 `take` 该分组的状态(因为 `try_process...` 失败时会放回),
/// 提取其已提交的数据块 (`committed_data`) 和失败预留信息 (`failed_infos`),
/// 构建 `FailedGroupData` 结构体。
/// 6. **发送失败数据**: 尝试通过 `failed_data_tx` 通道发送构建好的 `FailedGroupData`。
/// 7. **记录报告**: 如果发送失败(例如通道已关闭或阻塞),则将 `FailedGroupData` 添加到最终的 `FinalizeResult` 报告中。
///
/// # Arguments
/// * `&self` - FinalizationHandler 实例的引用 (通常无状态)。
/// * `glm` - 对 `GroupLifecycleManager` 实现的可变引用,用于获取、修改和移除分组状态。
/// * `processor` - 对 `GroupDataProcessor` 实现的引用,用于在处理成功分组时合并数据。
/// * `completed_data_tx` - 对成功数据发送通道 (`mpsc::Sender<SuccessfulGroupData>`) 的引用。
/// * `failed_data_tx` - 对失败数据发送通道 (`mpsc::Sender<FailedGroupDataTransmission>`) 的引用。
///
/// # Returns
/// * `FinalizeResult` - 包含所有处理失败且 **未能** 通过 `failed_data_tx` 发送出去的失败分组信息 (`FailedGroupData`) 的集合。
async fn finalize_all_groups(
&self, // 通常 FinalizationHandler 本身是无状态的
glm: &mut (dyn GroupLifecycleManager + Send), // 需要可变引用来修改和 take 分组状态
processor: &(dyn GroupDataProcessor + Send + Sync), // 需要 Sync 因为可能被并发访问
completed_data_tx: &mpsc::Sender<SuccessfulGroupData>,
failed_data_tx: &mpsc::Sender<FailedGroupDataTransmission>,
) -> FinalizeResult;
}
/// `FinalizationHandler` Trait 的默认实现。
/// 提供了一个标准的 Finalize 处理逻辑。
pub struct DefaultFinalizationHandler;
impl DefaultFinalizationHandler {
/// 创建 `DefaultFinalizationHandler` 的新实例。
pub fn new() -> Self {
DefaultFinalizationHandler {}
}
}
#[async_trait]
impl FinalizationHandler for DefaultFinalizationHandler {
/// `DefaultFinalizationHandler` 对 `finalize_all_groups` 的具体实现。
async fn finalize_all_groups(
&self,
glm: &mut (dyn GroupLifecycleManager + Send),
processor: &(dyn GroupDataProcessor + Send + Sync),
completed_data_tx: &mpsc::Sender<SuccessfulGroupData>,
failed_data_tx: &mpsc::Sender<FailedGroupDataTransmission>,
) -> FinalizeResult {
info!("(FinalizationHandler) 开始 Finalize 过程,处理所有剩余分组...");
// 用于收集无法通过通道发送的失败分组信息
let mut failed_groups_for_report: Vec<FailedGroupData> = Vec::new();
// 获取当前 GroupLifecycleManager 中所有分组的 ID 列表
// 注意:这里获取的是快照,如果在迭代过程中 GLM 被并发修改,某些 ID 可能失效
let group_ids: Vec<GroupId> = glm.all_group_ids();
info!(
"(FinalizationHandler) Finalize: 发现 {} 个需要处理的分组: {:?}",
group_ids.len(),
group_ids
);
// 迭代处理每个分组 ID
for group_id in group_ids {
debug!("(FinalizationHandler) Finalize: 正在处理 Group {}", group_id);
// 预检查:在尝试任何操作前,再次确认分组是否存在于 GLM 中
// 这可以减少后续操作因分组已被移除而失败的可能性
if glm.get_group_ref(group_id).is_none() {
warn!(
"(FinalizationHandler) Finalize: 在处理开始时 Group ID {} 已不存在于 GLM,跳过。",
group_id
);
continue; // 处理下一个分组 ID
}
// --- 步骤 1: 强制密封分组 ---
// 确保分组进入可以被处理的状态
debug!("(FinalizationHandler) Finalize: 尝试强制密封 Group {}", group_id);
if let Err(e) = glm.force_seal_group(group_id) {
// 如果强制密封失败(例如分组已被移除),记录警告并跳过
warn!(
"(FinalizationHandler) Finalize: 强制密封 Group {} 失败: {:?}。可能已被并发移除,跳过处理。",
group_id, e
);
continue;
}
debug!("(FinalizationHandler) Finalize: Group {} 已强制密封", group_id);
// --- 步骤 2: 处理未完成的预留 (标记为失败) ---
// 获取对分组状态的可变引用,以处理其内部的 `reservations` 和 `failed_infos`
if let Some(group_mut_ref) = glm.get_group_mut(group_id) {
// 使用 drain() 从 `reservations` 集合中移除所有剩余的 ID,并收集它们
let remaining_res_ids: Vec<ReservationId> = group_mut_ref.reservations.drain().collect();
// 如果存在未完成的预留
if !remaining_res_ids.is_empty() {
warn!("(FinalizationHandler) Finalize: Group {} 发现 {} 个未提交/未完成的预留: {:?},将标记为失败", group_id, remaining_res_ids.len(), remaining_res_ids);
// 遍历每个未完成的预留 ID
for res_id_to_fail in remaining_res_ids {
// 尝试从元数据中获取该预留的偏移量和大小
if let Some((offset, size)) = group_mut_ref.get_reservation_metadata(res_id_to_fail) {
// 如果找到元数据,创建 FailedReservationInfo 并添加到 failed_infos
debug!("(FinalizationHandler) Finalize: 为未完成的 Res {} (Offset: {}, Size: {}) 创建失败信息", res_id_to_fail, offset, size);
group_mut_ref.add_failed_info(FailedReservationInfo {
id: res_id_to_fail,
group_id, // 关联的分组 ID
offset, // 预留的偏移量
size, // 预留的大小
});
} else {
// 严重错误:预留存在于 `reservations` 中,但其元数据却丢失了!
// 这通常不应该发生,表明状态管理可能存在问题。
// 记录错误,并使用无效值创建失败信息以尽量保留记录。
error!("(FinalizationHandler) Finalize: 严重错误!未完成的 Res {} 在 Group {} 的元数据中也找不到!", res_id_to_fail, group_id);
group_mut_ref.add_failed_info(FailedReservationInfo {
id: res_id_to_fail,
group_id,
offset: AbsoluteOffset::MAX, // 使用无效偏移量
size: 0, // 使用无效大小
});
}
}
}
} else {
// 如果在强制密封后无法获取可变引用,说明分组可能刚好被其他任务移除了
warn!("(FinalizationHandler) Finalize: 获取 Group {} 以处理未提交预留时未找到。可能已被并发移除,尝试继续 take。", group_id);
// 即使无法标记失败预留,仍然尝试继续执行 take 和处理
}
// --- 步骤 3: 从 GLM 中 `take` 分组状态 ---
// `take_group` 会尝试移除并返回 GroupState 的所有权
debug!("(FinalizationHandler) Finalize: 尝试 take Group {} 的状态", group_id);
let group_state_option = glm.take_group(group_id);
// --- 步骤 4 & 5 & 6 & 7: 处理获取到的分组状态 ---
if let Some(taken_group_state) = group_state_option {
debug!("(FinalizationHandler) Finalize: 成功 take Group {} 状态,调用 try_process_taken_group_state...", group_id);
// 调用位于 manager::operations::group_processing 中的共享处理逻辑
match try_process_taken_group_state(
group_id,
taken_group_state, // 移交 GroupState 的所有权
glm, // 传入 GLM 引用 (处理失败时可能需要放回状态)
processor, // 传入数据处理器引用
completed_data_tx, // 传入成功数据通道引用
).await // 处理逻辑是异步的
{
Ok(processed_and_consumed) => {
// 处理成功
if processed_and_consumed {
// 分组已成功处理并发送(或为空)
info!("(FinalizationHandler) Finalize: Group {} 在 Finalize 期间被成功处理(数据已发送或为空)。", group_id);
} else {
// 理论上不应发生:try_process... 返回 Ok(false) 意味着状态被放回了,
// 但我们已经 take 了状态。这可能指示一个逻辑错误。
error!("(FinalizationHandler) Finalize: Group {} 处理意外返回 Ok(false) 状态!", group_id);
// 尽管意外,但继续下一个分组
}
}
Err(manager_error) => {
// 处理失败,分组状态已被 try_process... 放回 GLM
warn!("(FinalizationHandler) Finalize: Group {} 处理失败 ({:?}),判定为失败分组。将构建并发送失败数据。", group_id, manager_error);
// --- 步骤 5 (续): 再次 take 失败分组的状态以构建 FailedGroupData ---
if let Some(mut failed_group_state_after_processing_failure) =
glm.take_group(group_id)
{
debug!("(FinalizationHandler) Finalize: 成功 take 失败的 Group {} 状态以构建报告。", group_id);
// 从失败的分组状态中提取所需信息
// 使用 mem::take 获取字段所有权,避免克隆大型数据结构
let committed_data_for_failure = mem::take(
&mut failed_group_state_after_processing_failure.committed_data,
);
let failed_reservation_infos_for_failure = mem::take(
&mut failed_group_state_after_processing_failure.failed_infos,
);
// 元数据也取走并丢弃,确保 GroupState 清空
let _metadata_for_failure = mem::take(
&mut failed_group_state_after_processing_failure.reservation_metadata,
);
// 将 committed_data (按预留偏移组织) 转换为 group_chunks (按块绝对偏移组织)
let group_chunks_for_failure: BTreeMap<AbsoluteOffset, Bytes> =
committed_data_for_failure
.into_iter()
.flat_map(|(res_start_offset, (_res_id, _res_size, chunks))| {
// `chunks` 的 Key 是块相对于预留 `res_start_offset` 的偏移 (usize)
// 需要转换为块的绝对偏移 = res_start_offset + chunk_relative_offset
let mut absolute_chunk_map = BTreeMap::new();
for (chunk_relative_offset, bytes) in chunks {
absolute_chunk_map.insert(
res_start_offset + chunk_relative_offset, // 计算绝对偏移
bytes,
);
}
absolute_chunk_map // flat_map 需要返回一个迭代器
})
.collect(); // 收集成 BTreeMap<AbsoluteOffset, Bytes>
// 构建 FailedGroupData 结构体
let failed_data = FailedGroupData {
group_id,
group_chunks: group_chunks_for_failure, // 包含所有成功提交的块
failed_reservations: failed_reservation_infos_for_failure, // 包含所有失败预留的信息
};
debug!("(FinalizationHandler) Finalize: 为失败 Group {} 构建 FailedGroupData ({} 个块, {} 个失败信息)", group_id, failed_data.group_chunks.len(), failed_data.failed_reservations.len());
// --- 步骤 6 & 7: 尝试发送失败数据,否则加入报告 ---
// 检查失败数据通道是否仍然打开
if !failed_data_tx.is_closed() {
// 使用 try_send 进行非阻塞发送尝试
// 在 Finalize 期间,下游消费者可能已经关闭或停止处理,
// 使用阻塞的 send 可能导致 Manager 卡死。
if let Err(send_error) =
failed_data_tx.try_send(failed_data.clone()) // 克隆数据以备加入报告
{
error!("(FinalizationHandler) Finalize: 发送失败 Group {} 的 FailedGroupData 到通道失败 (可能已满或关闭): {}。将加入报告。", group_id, send_error);
// 发送失败,将数据加入最终报告
failed_groups_for_report.push(failed_data);
} else {
info!("(FinalizationHandler) Finalize: 失败 Group {} 的 FailedGroupData 已成功发送到失败通道。", group_id);
}
} else {
warn!("(FinalizationHandler) Finalize: 失败数据通道已关闭,无法发送 Group {} 的数据。将加入报告。", group_id);
// 通道已关闭,将数据加入最终报告
failed_groups_for_report.push(failed_data);
}
} else {
// 严重错误:处理失败后无法再次 take 分组状态!
// 这可能意味着 GLM 的状态不一致,或者 take 逻辑有问题。
error!("(FinalizationHandler) Finalize: 严重错误!Group {} 处理失败后无法再次从 GLM take 其状态!无法生成该分组的失败报告。", group_id);
}
}
}
} else {
// 如果在标记未完成预留为失败后,take_group 仍然失败,
// 说明分组在标记失败和 take 之间被并发移除了。
warn!("(FinalizationHandler) Finalize: 尝试 take Group {} 以最终处理时失败(在标记失败预留后)。可能已被并发移除,跳过。", group_id);
}
}
// Finalize 循环结束
info!("(FinalizationHandler) Finalize 处理完成。最终报告将包含 {} 个未能通过通道发送的失败分组信息。", failed_groups_for_report.len());
// 返回 FinalizeResult,其中包含所有未能成功发送到失败通道的 FailedGroupData
FinalizeResult {
failed: failed_groups_for_report,
}
}
}