use std::any::TypeId;
use std::collections::HashMap;
use std::sync::{Arc, RwLock, OnceLock};
use yrs::TransactionMut;
use mf_transform::step::Step;
use crate::types::StepResult;
use super::error::{ConversionError, ConversionResult};
use super::typed_converter::{ErasedConverter, ConversionContext, ConverterInfo};
/// Registry that maps step types to the converters registered for them.
///
/// Converters are looked up by the `TypeId` of the step type they were
/// registered for. The same converters are additionally kept in a list sorted
/// by priority, and descriptive metadata is stored alongside them.
pub struct StaticConverterRegistry {
/// Converters indexed by the `TypeId` of the step type they were registered for.
converters: HashMap<TypeId, Arc<ErasedConverter>>,
/// The same converters, kept in ascending priority order as they are registered.
ordered_converters: Vec<Arc<ErasedConverter>>,
/// Metadata captured at registration time, keyed like `converters`.
converter_info: HashMap<TypeId, ConverterInfo>,
/// Counters and timings recorded for conversions run through this registry.
performance_stats: PerformanceStats,
}
impl StaticConverterRegistry {
    /// Creates an empty registry with no converters and fresh performance statistics.
    pub fn new() -> Self {
        Self {
            converters: HashMap::new(),
            ordered_converters: Vec::new(),
            converter_info: HashMap::new(),
            performance_stats: PerformanceStats::new(),
        }
    }

    /// Registers converter `C` as the handler for step type `T`.
    ///
    /// If a converter is already registered for `T`, a warning is logged and the
    /// existing registration is left untouched. Otherwise the converter is stored
    /// under `TypeId::of::<T>()`, its metadata is recorded, and it is inserted into
    /// the priority-ordered list.
    ///
    /// Returns `&mut Self` so registrations can be chained.
    pub fn register_converter<T, C>(&mut self) -> &mut Self
    where
        T: Step + 'static,
        C: super::typed_converter::TypedStepConverter<T> + Default + 'static,
    {
        let type_id = TypeId::of::<T>();
        let converter = Arc::new(ErasedConverter::new::<T, C>());

        // First registration wins: a duplicate is reported and ignored.
        if self.converters.contains_key(&type_id) {
            tracing::warn!(
                "转换器重复注册: {} for type {}",
                converter.converter_name(),
                converter.type_name()
            );
            return self;
        }

        let info = ConverterInfo {
            type_name: converter.type_name(),
            converter_name: converter.converter_name(),
            priority: converter.priority(),
            supports_concurrent: converter.supports_concurrent(),
            step_type_id: type_id,
        };
        self.converter_info.insert(type_id, info);
        self.converters.insert(type_id, Arc::clone(&converter));

        // Keep `ordered_converters` sorted by ascending priority. The new converter
        // goes before the first entry with a strictly greater priority, so entries
        // with equal priority stay in registration order.
        let insert_pos = self
            .ordered_converters
            .iter()
            .position(|c| c.priority() > converter.priority())
            .unwrap_or(self.ordered_converters.len());
        self.ordered_converters.insert(insert_pos, converter);

        tracing::info!(
            "✅ 转换器已注册: {} for {} (优先级: {})",
            C::converter_name(),
            std::any::type_name::<T>(),
            C::priority()
        );
        self
    }

    /// Converts a single step using the converter registered for its type.
    ///
    /// The converter is looked up by the step's `TypeId` in the map first; if that
    /// misses, the priority-ordered list is scanned for a converter reporting the
    /// same type id. Every call, successful or not, is recorded in the
    /// performance statistics together with its elapsed time.
    ///
    /// # Errors
    ///
    /// Returns [`ConversionError::UnsupportedStepType`] when no converter matches,
    /// or whatever error the matching converter's `try_convert` returns.
    pub fn convert_step(
        &self,
        step: &dyn Step,
        txn: &mut TransactionMut,
        context: &ConversionContext,
    ) -> ConversionResult<StepResult> {
        let start_time = std::time::Instant::now();
        let step_type_id = step.type_id();

        // Direct map lookup first, then the linear scan as a fallback.
        let converter = self.converters.get(&step_type_id).or_else(|| {
            self.ordered_converters
                .iter()
                .find(|candidate| candidate.type_id() == step_type_id)
        });

        let result = match converter {
            Some(converter) => converter.try_convert(step, txn, context),
            None => Err(ConversionError::UnsupportedStepType {
                step_type: step.name().to_string(),
                type_id: step_type_id,
            }),
        };

        self.performance_stats.record_conversion(
            step_type_id,
            start_time.elapsed(),
            result.is_ok(),
        );
        result
    }

    /// Converts a batch of steps, returning one result per step.
    ///
    /// Steps are applied to `txn` strictly in the order given, and `results[i]`
    /// always corresponds to `steps[i]`. A failing step does not stop the batch;
    /// its error is stored at its position and processing continues.
    ///
    /// Each step goes through [`Self::convert_step`], so batch conversions are
    /// also counted in the performance statistics.
    pub fn convert_steps_batch(
        &self,
        steps: &[&dyn Step],
        txn: &mut TransactionMut,
        context: &ConversionContext,
    ) -> Vec<ConversionResult<StepResult>> {
        // Grouping by type through a HashMap (as this previously did) reorders both
        // the application of steps and the returned results nondeterministically,
        // so the batch is processed sequentially instead.
        let mut results = Vec::with_capacity(steps.len());
        for step in steps {
            results.push(self.convert_step(*step, txn, context));
        }
        results
    }

    /// Checks whether a converter is registered for the step's type.
    ///
    /// The conversion context is currently unused.
    ///
    /// # Errors
    ///
    /// Returns [`ConversionError::UnsupportedStepType`] when no converter is
    /// registered for the step's `TypeId`.
    pub fn validate_step(
        &self,
        step: &dyn Step,
        _context: &ConversionContext,
    ) -> ConversionResult<()> {
        let step_type_id = step.type_id();
        if self.converters.contains_key(&step_type_id) {
            Ok(())
        } else {
            Err(ConversionError::UnsupportedStepType {
                step_type: step.name().to_string(),
                type_id: step_type_id,
            })
        }
    }

    /// Returns `true` if a converter is registered under `type_id`.
    pub fn supports_step_type(
        &self,
        type_id: TypeId,
    ) -> bool {
        self.converters.contains_key(&type_id)
    }

    /// Returns the metadata of every registered converter.
    ///
    /// The metadata lives in a `HashMap`, so the order of the returned entries is
    /// unspecified.
    pub fn get_all_converter_info(&self) -> Vec<&ConverterInfo> {
        self.converter_info.values().collect()
    }

    /// Returns the metadata of the converter registered under `type_id`, if any.
    pub fn get_converter_info(
        &self,
        type_id: TypeId,
    ) -> Option<&ConverterInfo> {
        self.converter_info.get(&type_id)
    }

    /// Returns the performance statistics collected by this registry.
    pub fn get_performance_stats(&self) -> &PerformanceStats {
        &self.performance_stats
    }

    /// Removes every registered converter and replaces the statistics with a
    /// fresh, empty instance.
    pub fn clear(&mut self) {
        self.converters.clear();
        self.ordered_converters.clear();
        self.converter_info.clear();
        self.performance_stats = PerformanceStats::new();
    }

    /// Returns the number of registered converters.
    pub fn converter_count(&self) -> usize {
        self.converters.len()
    }
}
impl Default for StaticConverterRegistry {
fn default() -> Self {
Self::new()
}
}
/// Thread-safe counters and timings for step conversions.
///
/// Global totals are kept in atomics; per-type statistics live in a map guarded
/// by an `RwLock`.
#[derive(Debug)]
pub struct PerformanceStats {
    /// Number of conversions recorded, successful or not.
    total_conversions: std::sync::atomic::AtomicU64,
    /// Number of conversions recorded as successful.
    successful_conversions: std::sync::atomic::AtomicU64,
    /// Per-type statistics keyed by the `TypeId` passed to `record_conversion`.
    type_stats: RwLock<HashMap<TypeId, TypeConversionStats>>,
    /// Moment this statistics object was created; basis for `get_uptime`.
    created_at: std::time::Instant,
}

/// Aggregated statistics for the conversions recorded under one type id.
#[derive(Debug, Clone)]
pub struct TypeConversionStats {
    /// Number of conversions recorded.
    pub total_count: u64,
    /// Number of those conversions that succeeded.
    pub success_count: u64,
    /// Sum of all recorded durations.
    pub total_duration: std::time::Duration,
    /// `total_duration` divided by `total_count`.
    pub avg_duration: std::time::Duration,
    /// Shortest recorded duration.
    pub min_duration: std::time::Duration,
    /// Longest recorded duration.
    pub max_duration: std::time::Duration,
}

impl Default for PerformanceStats {
    fn default() -> Self {
        Self::new()
    }
}

impl PerformanceStats {
    /// Creates empty statistics whose uptime starts now.
    pub fn new() -> Self {
        Self {
            total_conversions: std::sync::atomic::AtomicU64::new(0),
            successful_conversions: std::sync::atomic::AtomicU64::new(0),
            type_stats: RwLock::new(HashMap::new()),
            created_at: std::time::Instant::now(),
        }
    }

    /// Records one conversion of the given type.
    ///
    /// Updates the global counters and the per-type count, total, average,
    /// minimum and maximum duration.
    ///
    /// This never panics because of statistics bookkeeping: a poisoned lock is
    /// recovered (the data is plain counters, so there is no invariant to
    /// protect), and the duration total saturates instead of overflowing.
    pub fn record_conversion(
        &self,
        type_id: TypeId,
        duration: std::time::Duration,
        success: bool,
    ) {
        use std::sync::atomic::Ordering;

        self.total_conversions.fetch_add(1, Ordering::Relaxed);
        if success {
            self.successful_conversions.fetch_add(1, Ordering::Relaxed);
        }

        let mut type_stats = self
            .type_stats
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let stats =
            type_stats.entry(type_id).or_insert_with(|| TypeConversionStats {
                total_count: 0,
                success_count: 0,
                total_duration: std::time::Duration::ZERO,
                avg_duration: std::time::Duration::ZERO,
                // Starts at MAX so the first sample always becomes the minimum.
                min_duration: std::time::Duration::MAX,
                max_duration: std::time::Duration::ZERO,
            });

        stats.total_count += 1;
        if success {
            stats.success_count += 1;
        }
        stats.total_duration = stats.total_duration.saturating_add(duration);
        // Dividing by `total_count as u32` would truncate the count: at 2^32
        // samples the divisor becomes 0 and `Duration / 0` panics.
        stats.avg_duration =
            Self::mean_duration(stats.total_duration, stats.total_count);
        stats.min_duration = stats.min_duration.min(duration);
        stats.max_duration = stats.max_duration.max(duration);
    }

    /// Returns `total / count`, rounded down to whole nanoseconds, or zero when
    /// `count` is zero.
    fn mean_duration(
        total: std::time::Duration,
        count: u64,
    ) -> std::time::Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        if count == 0 {
            return std::time::Duration::ZERO;
        }
        let nanos = total.as_nanos() / u128::from(count);
        // The mean never exceeds `total`, so the seconds part fits in u64 and
        // the remainder is below one second.
        std::time::Duration::new(
            (nanos / NANOS_PER_SEC) as u64,
            (nanos % NANOS_PER_SEC) as u32,
        )
    }

    /// Returns the number of conversions recorded so far.
    pub fn get_total_conversions(&self) -> u64 {
        self.total_conversions.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns the fraction of recorded conversions that succeeded, or `0.0`
    /// when nothing has been recorded yet.
    pub fn get_success_rate(&self) -> f64 {
        let total = self.get_total_conversions();
        if total == 0 {
            0.0
        } else {
            let successful = self
                .successful_conversions
                .load(std::sync::atomic::Ordering::Relaxed);
            successful as f64 / total as f64
        }
    }

    /// Returns a snapshot of the statistics recorded under `type_id`, if any.
    pub fn get_type_stats(
        &self,
        type_id: TypeId,
    ) -> Option<TypeConversionStats> {
        let type_stats = self
            .type_stats
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        type_stats.get(&type_id).cloned()
    }

    /// Returns the time elapsed since this statistics object was created.
    pub fn get_uptime(&self) -> std::time::Duration {
        self.created_at.elapsed()
    }
}
/// Storage for the process-wide registry; initialized on first access.
static GLOBAL_REGISTRY: OnceLock<RwLock<StaticConverterRegistry>> =
    OnceLock::new();

/// Returns the process-wide converter registry, creating an empty one the
/// first time it is requested.
pub fn global_registry() -> &'static RwLock<StaticConverterRegistry> {
    /// Builds the initial, empty registry wrapped in its lock.
    fn empty_registry() -> RwLock<StaticConverterRegistry> {
        RwLock::new(StaticConverterRegistry::new())
    }

    GLOBAL_REGISTRY.get_or_init(empty_registry)
}
/// Registers converter `C` for step type `T` in the global registry.
///
/// If the registry's write lock cannot be acquired, the failure is logged as an
/// error and nothing is registered.
pub fn register_global_converter<T, C>()
where
    T: Step + 'static,
    C: super::typed_converter::TypedStepConverter<T> + Default + 'static,
{
    let mut registry = match global_registry().write() {
        Ok(guard) => guard,
        Err(e) => {
            tracing::error!("无法获取全局注册表写锁以注册转换器: {}", e);
            return;
        },
    };
    registry.register_converter::<T, C>();
}
/// Converts `step` using the global registry.
///
/// The registry's read lock is held for the duration of the conversion.
///
/// # Errors
///
/// Returns [`ConversionError::Custom`] when the read lock cannot be acquired,
/// otherwise whatever the registry's `convert_step` returns.
pub fn convert_step_global(
    step: &dyn Step,
    txn: &mut TransactionMut,
    context: &ConversionContext,
) -> ConversionResult<StepResult> {
    match global_registry().read() {
        Ok(registry) => registry.convert_step(step, txn, context),
        Err(e) => Err(ConversionError::Custom {
            message: format!("无法获取全局注册表读锁: {e}"),
        }),
    }
}
/// Acquires a read guard on the global registry.
///
/// Despite its name, this returns a guard over the whole registry rather than
/// the statistics alone; callers reach the statistics through the guard (for
/// example via `get_performance_stats`). The read lock is held for as long as
/// the returned guard is alive.
///
/// # Panics
///
/// Panics if the read lock cannot be acquired.
pub fn get_global_performance_stats()
-> std::sync::RwLockReadGuard<'static, StaticConverterRegistry> {
    let registry_lock = global_registry();
    let read_attempt = registry_lock.read();
    read_attempt
        .expect("获取全局注册表读锁失败:这是一个严重的内部错误,请报告此问题")
}