use crate::diagnostics::{Error as DiagnosticError, Result, Span};
use std::sync::{Arc, RwLock, Mutex};
use std::thread::ThreadId;
use std::collections::{HashMap, VecDeque};
use std::time::{SystemTime, Duration};
use std::sync::atomic::{AtomicU64, Ordering};
use crossbeam::channel::{Sender, Receiver, unbounded};
use std::fmt;
/// Tracks per-thread error state and forwards errors between registered threads.
///
/// All state lives behind `Arc`-wrapped locks; the methods take `&self`.
#[derive(Debug)]
pub struct ErrorPropagationCoordinator {
    /// Error state of every registered thread, keyed by thread id.
    thread_error_contexts: Arc<RwLock<HashMap<ThreadId, ThreadErrorContext>>>,
    /// One message channel per registered thread, keyed by thread id.
    error_channels: Arc<RwLock<HashMap<ThreadId, ErrorChannel>>>,
    /// Log of error events, trimmed to `policies.max_error_history_size`.
    error_history: Arc<Mutex<VecDeque<ErrorEvent>>>,
    /// Policies fixed at construction time.
    policies: Arc<ErrorPropagationPolicies>,
    /// Monotonic counter that feeds both error ids and event sequence numbers.
    sequence_counter: AtomicU64,
}
/// Error bookkeeping kept for a single registered thread.
#[derive(Debug, Clone)]
pub struct ThreadErrorContext {
    /// Thread this context belongs to.
    pub thread_id: ThreadId,
    /// Errors reported for this thread, oldest first.
    pub error_stack: Vec<ThreadError>,
    /// Where the thread currently is in the propagation life cycle.
    pub propagation_state: ErrorPropagationState,
    /// Time of the most recent report, or `None` if nothing is outstanding.
    pub last_error_time: Option<SystemTime>,
    /// Incremented on every reported error; clearing the thread's errors does
    /// not reset it.
    pub error_generation: u64,
}
/// Both ends of the message channel created for a registered thread.
#[derive(Debug)]
pub struct ErrorChannel {
    /// Sending half; the coordinator pushes propagation messages through it.
    pub sender: Sender<ErrorPropagationMessage>,
    /// Receiving half. Nothing in this file reads from it.
    #[allow(dead_code)]
    pub receiver: Receiver<ErrorPropagationMessage>,
}
/// A diagnostic error enriched with the thread-level metadata needed to
/// track and propagate it.
#[derive(Debug, Clone)]
pub struct ThreadError {
    /// Identifier drawn from the coordinator's sequence counter.
    pub id: u64,
    /// The underlying diagnostic that was reported.
    pub diagnostic_error: DiagnosticError,
    /// Thread that reported the error.
    pub originating_thread: ThreadId,
    /// Frames captured at report time; empty when capture is disabled.
    pub stack_trace: Vec<StackFrame>,
    /// Threads the error has passed through; starts with the originating thread.
    pub propagation_path: Vec<ThreadId>,
    /// Wall-clock time at which the error was reported.
    pub occurred_at: SystemTime,
    /// Severity derived from the diagnostic message.
    pub severity: ErrorSeverity,
    /// Category derived from the diagnostic message.
    pub category: ErrorCategory,
    /// Free-form key/value context supplied by the reporter.
    pub context: HashMap<String, String>,
}
/// One frame of a captured stack trace. Every location field is optional.
#[derive(Debug, Clone)]
pub struct StackFrame {
    /// Name of the function, when known.
    pub function_name: Option<String>,
    /// Source file description, when known.
    pub file_info: Option<String>,
    /// Line number, when known.
    pub line_number: Option<u32>,
    /// Column number, when known.
    pub column_number: Option<u32>,
    /// Source span associated with the frame, when known.
    pub span: Option<Span>,
    /// Thread the frame was captured for.
    pub thread_id: ThreadId,
    /// Optional free-form note about the frame.
    pub context: Option<String>,
}
/// Life-cycle state of error propagation for a single thread.
#[derive(Debug, Clone, PartialEq)]
pub enum ErrorPropagationState {
    /// No outstanding error.
    Normal,
    /// An error has been reported and not yet propagated or handled.
    ErrorOccurred,
    /// The error is being sent to other threads.
    PropagatingError,
    /// Propagation has finished.
    PropagationCompleted,
    /// The error has been handled.
    ErrorHandled,
    /// Propagation was attempted and failed.
    PropagationFailed,
}
/// How serious an error is.
///
/// The derived ordering follows declaration order, so variants must stay
/// sorted from least to most severe.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ErrorSeverity {
    /// Informational only.
    Info,
    /// Something suspicious that is not an error.
    Warning,
    /// An ordinary error.
    Error,
    /// A critical error.
    Critical,
    /// A fatal error.
    Fatal,
}
/// Broad classification of what kind of failure an error represents.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// Syntax or parse failures.
    Syntax,
    /// Type errors.
    Type,
    /// Runtime failures.
    Runtime,
    /// Input/output failures.
    IO,
    /// Effect-related failures.
    Effect,
    /// Memory or other resource failures.
    Resource,
    /// Threading or concurrency failures.
    Concurrency,
    /// System-level failures.
    System,
    /// User-level errors.
    User,
}
/// One entry in the coordinator's error history.
#[derive(Debug, Clone)]
pub struct ErrorEvent {
    /// Sequence number, drawn from the same counter as error ids.
    pub sequence: u64,
    /// What happened.
    pub event_type: ErrorEventType,
    /// Thread the event is attributed to.
    pub thread_id: ThreadId,
    /// Copy of the error the event is about.
    pub error: ThreadError,
    /// Wall-clock time at which the event was recorded.
    pub timestamp: SystemTime,
    /// Optional human-readable note.
    pub context: Option<String>,
}
/// Kind of occurrence stored in an [`ErrorEvent`].
#[derive(Debug, Clone)]
pub enum ErrorEventType {
    /// An error was reported.
    ErrorOccurred,
    /// An error is being sent to other threads.
    ErrorPropagating,
    /// Propagation finished.
    PropagationCompleted,
    /// An error was handled.
    ErrorHandled,
    /// Propagation failed.
    PropagationFailed,
    /// A shutdown was initiated.
    ThreadShutdown,
}
#[derive(Debug, Clone)]
pub enum ErrorPropagationMessage {
PropagateError {
#[allow(dead_code)] error: ThreadError,
#[allow(dead_code)] target_threads: Vec<ThreadId>,
#[allow(dead_code)] propagation_strategy: PropagationStrategy,
},
#[allow(dead_code)] ErrorAcknowledged {
error_id: u64,
acknowledging_thread: ThreadId,
},
#[allow(dead_code)] ErrorHandled {
error_id: u64,
handling_thread: ThreadId,
result: std::result::Result<(), String>,
},
#[allow(dead_code)] RequestErrorContext {
requesting_thread: ThreadId,
},
#[allow(dead_code)] ErrorContextResponse {
context: ThreadErrorContext,
},
FatalErrorShutdown {
#[allow(dead_code)] error: ThreadError,
#[allow(dead_code)] message: String,
},
}
/// How the set of threads that receive a propagated error is chosen.
#[derive(Debug, Clone)]
pub enum PropagationStrategy {
    /// Every registered thread other than the originating one.
    Broadcast,
    /// A fixed, explicit list of threads.
    #[allow(dead_code)]
    Targeted(Vec<ThreadId>),
    /// The parent of the originating thread.
    Parent,
    /// Targets depend on the error's severity.
    SeverityBased,
    /// A caller-supplied function mapping an error to its target threads.
    #[allow(dead_code)]
    Custom(fn(&ThreadError) -> Vec<ThreadId>),
}
/// Tunable behaviour of an [`ErrorPropagationCoordinator`].
///
/// Several fields are stored but not read anywhere in this file; they carry
/// `#[allow(dead_code)]`.
#[derive(Debug)]
pub struct ErrorPropagationPolicies {
    /// Record events in the error history.
    track_error_history: bool,
    /// Maximum number of history entries kept; the oldest is evicted first.
    max_error_history_size: usize,
    /// Capture a stack trace when an error is reported.
    preserve_stack_traces: bool,
    /// Not read in this file. Presumably the intended frame limit (TODO confirm).
    #[allow(dead_code)]
    max_stack_trace_depth: usize,
    /// Allow reported errors to be forwarded to other threads.
    enable_cross_thread_propagation: bool,
    /// Strategy used to pick propagation targets.
    default_propagation_strategy: PropagationStrategy,
    /// Send a shutdown message to every registered thread on a fatal error.
    fatal_errors_shutdown_all: bool,
    /// Not read in this file.
    #[allow(dead_code)]
    propagation_timeout: Duration,
    /// Not read in this file.
    #[allow(dead_code)]
    enable_error_recovery: bool,
    /// Not read in this file.
    #[allow(dead_code)]
    max_errors_before_shutdown: usize,
}
impl ErrorPropagationCoordinator {
pub fn new() -> Self {
Self {
thread_error_contexts: Arc::new(RwLock::new(HashMap::new())),
error_channels: Arc::new(RwLock::new(HashMap::new())),
error_history: Arc::new(Mutex::new(VecDeque::new())),
policies: Arc::new(ErrorPropagationPolicies::default()),
sequence_counter: AtomicU64::new(0),
}
}
/// Creates a coordinator with no registered threads, an empty history and
/// the supplied `policies`.
pub fn with_policies(policies: ErrorPropagationPolicies) -> Self {
    let thread_error_contexts = Arc::new(RwLock::new(HashMap::new()));
    let error_channels = Arc::new(RwLock::new(HashMap::new()));
    let error_history = Arc::new(Mutex::new(VecDeque::new()));

    Self {
        thread_error_contexts,
        error_channels,
        error_history,
        policies: Arc::new(policies),
        // The first id or sequence number handed out is 1.
        sequence_counter: AtomicU64::new(0),
    }
}
/// Registers `thread_id` with a fresh error context and a new unbounded
/// message channel.
///
/// Registering an id that is already known replaces its context and channel.
///
/// # Panics
///
/// Panics if either registry lock is poisoned.
pub fn register_thread(&self, thread_id: ThreadId) {
    let fresh_context = ThreadErrorContext {
        thread_id,
        error_stack: Vec::new(),
        propagation_state: ErrorPropagationState::Normal,
        last_error_time: None,
        error_generation: 0,
    };
    // Each write guard lives only for its own statement, so the two
    // registries are never locked at the same time.
    self.thread_error_contexts
        .write()
        .unwrap()
        .insert(thread_id, fresh_context);

    let (sender, receiver) = unbounded();
    self.error_channels
        .write()
        .unwrap()
        .insert(thread_id, ErrorChannel { sender, receiver });
}
/// Removes the error context and the channel of `thread_id`.
///
/// Unknown ids are ignored.
///
/// # Panics
///
/// Panics if either registry lock is poisoned.
pub fn unregister_thread(&self, thread_id: ThreadId) {
    // Context first, then channel; each lock is released before the next
    // one is taken.
    self.thread_error_contexts.write().unwrap().remove(&thread_id);
    self.error_channels.write().unwrap().remove(&thread_id);
}
/// Records an error reported by `thread_id` and returns the id assigned to it.
///
/// Steps, in order:
/// 1. draw an id from the sequence counter,
/// 2. capture a stack trace when the policies ask for one,
/// 3. derive severity and category from the diagnostic message,
/// 4. push the error onto the thread's context, if the thread is registered
///    (an unregistered thread is silently skipped),
/// 5. log an `ErrorOccurred` event when history tracking is on,
/// 6. propagate the error when propagation is enabled and the error qualifies,
/// 7. broadcast a shutdown for fatal errors when the policies ask for it.
///
/// # Errors
///
/// Forwards any error from stack-trace capture, propagation or shutdown.
///
/// # Panics
///
/// Panics if a coordinator lock is poisoned.
pub fn report_error(
    &self,
    thread_id: ThreadId,
    diagnostic_error: DiagnosticError,
    context: Option<HashMap<String, String>>,
) -> Result<u64> {
    let policies = &self.policies;
    let error_id = self.sequence_counter.fetch_add(1, Ordering::SeqCst) + 1;

    let mut stack_trace = Vec::new();
    if policies.preserve_stack_traces {
        stack_trace = self.capture_stack_trace(thread_id)?;
    }

    let severity = self.determine_error_severity(&diagnostic_error);
    let category = self.determine_error_category(&diagnostic_error);
    let thread_error = ThreadError {
        id: error_id,
        diagnostic_error,
        originating_thread: thread_id,
        stack_trace,
        propagation_path: vec![thread_id],
        occurred_at: SystemTime::now(),
        severity,
        category,
        context: context.unwrap_or_default(),
    };

    // `entry(..).and_modify(..)` touches only an existing context and never
    // inserts one; the write guard is released at the end of the statement.
    self.thread_error_contexts
        .write()
        .unwrap()
        .entry(thread_id)
        .and_modify(|thread_ctx| {
            thread_ctx.error_stack.push(thread_error.clone());
            thread_ctx.propagation_state = ErrorPropagationState::ErrorOccurred;
            thread_ctx.last_error_time = Some(SystemTime::now());
            thread_ctx.error_generation += 1;
        });

    if policies.track_error_history {
        self.record_error_event(
            ErrorEventType::ErrorOccurred,
            thread_id,
            thread_error.clone(),
            None,
        );
    }

    let wants_propagation =
        policies.enable_cross_thread_propagation && self.should_propagate_error(&thread_error);
    if wants_propagation {
        self.initiate_error_propagation(thread_error.clone())?;
    }

    let is_fatal = matches!(thread_error.severity, ErrorSeverity::Fatal);
    if is_fatal && policies.fatal_errors_shutdown_all {
        self.initiate_fatal_error_shutdown(thread_error)?;
    }

    Ok(error_id)
}
fn initiate_error_propagation(&self, error: ThreadError) -> Result<()> {
let target_threads = self.determine_propagation_targets(&error);
if target_threads.is_empty() {
return Ok(());
}
{
let mut contexts = self.thread_error_contexts.write().unwrap();
if let Some(context) = contexts.get_mut(&error.originating_thread) {
context.propagation_state = ErrorPropagationState::PropagatingError;
}
}
{
let channels = self.error_channels.read().unwrap();
let message = ErrorPropagationMessage::PropagateError {
error: error.clone(),
target_threads: target_threads.clone(),
propagation_strategy: self.policies.default_propagation_strategy.clone(),
};
for &target_thread in &target_threads {
if let Some(channel) = channels.get(&target_thread) {
if channel.sender.try_send(message.clone()).is_err() {
eprintln!("Warning: Could not propagate error to thread {target_thread:?}");
}
}
}
}
if self.policies.track_error_history {
self.record_error_event(
ErrorEventType::ErrorPropagating,
error.originating_thread,
error,
Some(format!("Propagating to {} threads", target_threads.len())),
);
}
Ok(())
}
/// Queues a `FatalErrorShutdown` message on every registered channel,
/// including the originating thread's.
///
/// Send failures are ignored on purpose. A `ThreadShutdown` event is logged
/// when history tracking is on.
///
/// # Panics
///
/// Panics if a coordinator lock is poisoned.
fn initiate_fatal_error_shutdown(&self, error: ThreadError) -> Result<()> {
    let origin = error.originating_thread;
    let description = format!("Fatal error in thread {:?}: {}", origin, error.diagnostic_error);
    let shutdown = ErrorPropagationMessage::FatalErrorShutdown {
        error: error.clone(),
        message: description,
    };

    // The read guard is a temporary and is released after the loop.
    self.error_channels
        .read()
        .unwrap()
        .values()
        .for_each(|channel| {
            let _ = channel.sender.try_send(shutdown.clone());
        });

    if self.policies.track_error_history {
        self.record_error_event(
            ErrorEventType::ThreadShutdown,
            origin,
            error,
            Some(String::from("Fatal error initiated shutdown")),
        );
    }
    Ok(())
}
/// Returns a stack trace for `thread_id`.
///
/// This is a placeholder: it always returns a single synthetic frame with
/// the function name `"unknown"` and no location information.
fn capture_stack_trace(&self, thread_id: ThreadId) -> Result<Vec<StackFrame>> {
    let placeholder = StackFrame {
        thread_id,
        function_name: Some(String::from("unknown")),
        context: Some(String::from("Stack trace capture")),
        file_info: None,
        line_number: None,
        column_number: None,
        span: None,
    };
    Ok(vec![placeholder])
}
fn should_propagate_error(&self, error: &ThreadError) -> bool {
match error.severity {
ErrorSeverity::Critical | ErrorSeverity::Fatal => true,
ErrorSeverity::Error => error.category == ErrorCategory::Concurrency,
_ => false,
}
}
fn determine_propagation_targets(&self, error: &ThreadError) -> Vec<ThreadId> {
match &self.policies.default_propagation_strategy {
PropagationStrategy::Broadcast => {
let contexts = self.thread_error_contexts.read().unwrap();
contexts.keys().filter(|&&tid| tid != error.originating_thread).copied().collect()
}
PropagationStrategy::Targeted(threads) => threads.clone(),
PropagationStrategy::Parent => {
Vec::new()
}
PropagationStrategy::SeverityBased => {
if error.severity >= ErrorSeverity::Critical {
let contexts = self.thread_error_contexts.read().unwrap();
contexts.keys().filter(|&&tid| tid != error.originating_thread).copied().collect()
} else {
Vec::new()
}
}
PropagationStrategy::Custom(_) => {
Vec::new()
}
}
}
/// Guesses a severity from keywords in the lower-cased diagnostic message.
///
/// Rules are tried in order and the first match wins:
/// "fatal"/"panic" → `Fatal`, "critical"/"thread" → `Critical`,
/// "warning" → `Warning`, anything else → `Error`.
fn determine_error_severity(&self, error: &DiagnosticError) -> ErrorSeverity {
    let text = error.to_string().to_lowercase();
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| text.contains(*needle));

    if mentions_any(&["fatal", "panic"]) {
        return ErrorSeverity::Fatal;
    }
    if mentions_any(&["critical", "thread"]) {
        return ErrorSeverity::Critical;
    }
    if mentions_any(&["warning"]) {
        return ErrorSeverity::Warning;
    }
    ErrorSeverity::Error
}
/// Guesses a category from keywords in the lower-cased diagnostic message.
///
/// Rules are tried in order and the first match wins: syntax/parse, type,
/// io/file, effect, memory/resource, thread/concurrency, system. Anything
/// else is `Runtime`.
///
/// "io" is matched only at the start of a word (`io`, `io_error`,
/// `ioerror`) or as the literal `i/o`. A plain substring test would also
/// fire inside "allocation", "violation", "function" or "synchronization"
/// and wrongly label resource, effect and concurrency errors as IO.
fn determine_error_category(&self, error: &DiagnosticError) -> ErrorCategory {
    let message = error.to_string().to_lowercase();
    let mentions = |needle: &str| message.contains(needle);
    // Split on anything that is not a letter or digit, so `io_error` and
    // `io:` both yield the word `io`.
    let mentions_io = mentions("i/o")
        || message
            .split(|c: char| !c.is_alphanumeric())
            .any(|word| word.starts_with("io"));

    if mentions("syntax") || mentions("parse") {
        ErrorCategory::Syntax
    } else if mentions("type") {
        ErrorCategory::Type
    } else if mentions_io || mentions("file") {
        ErrorCategory::IO
    } else if mentions("effect") {
        ErrorCategory::Effect
    } else if mentions("memory") || mentions("resource") {
        ErrorCategory::Resource
    } else if mentions("thread") || mentions("concurrency") {
        ErrorCategory::Concurrency
    } else if mentions("system") {
        ErrorCategory::System
    } else {
        ErrorCategory::Runtime
    }
}
/// Appends an event to the error history and evicts the oldest entry when
/// the history exceeds `max_error_history_size`.
///
/// The event's sequence number comes from the same counter as error ids.
///
/// # Panics
///
/// Panics if the history lock is poisoned.
fn record_error_event(
    &self,
    event_type: ErrorEventType,
    thread_id: ThreadId,
    error: ThreadError,
    context: Option<String>,
) {
    let sequence = self.sequence_counter.fetch_add(1, Ordering::SeqCst) + 1;
    let timestamp = SystemTime::now();
    let capacity = self.policies.max_error_history_size;

    let mut log = self.error_history.lock().unwrap();
    log.push_back(ErrorEvent {
        sequence,
        event_type,
        thread_id,
        error,
        timestamp,
        context,
    });
    // Exactly one entry was added, so removing one restores the bound.
    if log.len() > capacity {
        log.pop_front();
    }
}
pub fn get_error_statistics(&self) -> ErrorStatistics {
let contexts = self.thread_error_contexts.read().unwrap();
let history = self.error_history.lock().unwrap();
let mut stats = ErrorStatistics {
active_threads: contexts.len(),
total_errors: 0,
errors_by_severity: HashMap::new(),
errors_by_category: HashMap::new(),
threads_with_errors: 0,
recent_errors: 0,
propagated_errors: 0,
};
for context in contexts.values() {
if !context.error_stack.is_empty() {
stats.threads_with_errors += 1;
stats.total_errors += context.error_stack.len();
for error in &context.error_stack {
*stats.errors_by_severity.entry(error.severity.clone()).or_insert(0) += 1;
*stats.errors_by_category.entry(error.category.clone()).or_insert(0) += 1;
}
}
}
let one_minute_ago = SystemTime::now() - Duration::from_secs(60);
stats.recent_errors = history
.iter()
.filter(|event| event.timestamp > one_minute_ago)
.count();
stats.propagated_errors = history
.iter()
.filter(|event| matches!(event.event_type, ErrorEventType::ErrorPropagating))
.count();
stats
}
/// Returns a copy of the error history, oldest event first.
///
/// # Panics
///
/// Panics if the history lock is poisoned.
pub fn get_error_history(&self) -> Vec<ErrorEvent> {
    let log = self.error_history.lock().unwrap();
    let mut snapshot = Vec::with_capacity(log.len());
    snapshot.extend(log.iter().cloned());
    snapshot
}
/// Returns a copy of the error context of `thread_id`, or `None` when the
/// thread is not registered.
///
/// # Panics
///
/// Panics if the thread-context lock is poisoned.
pub fn get_thread_error_context(&self, thread_id: ThreadId) -> Option<ThreadErrorContext> {
    let registry = self.thread_error_contexts.read().unwrap();
    let found = registry.get(&thread_id);
    found.cloned()
}
/// Discards every recorded history event.
///
/// # Panics
///
/// Panics if the history lock is poisoned.
pub fn clear_error_history(&self) {
    self.error_history.lock().unwrap().clear();
}
/// Resets the error state of `thread_id`: empties its error stack, returns
/// it to `Normal` and clears the last-error time.
///
/// `error_generation` is left untouched. Unknown thread ids are ignored and
/// the call currently always returns `Ok(())`.
///
/// # Panics
///
/// Panics if the thread-context lock is poisoned.
pub fn clear_thread_errors(&self, thread_id: ThreadId) -> Result<()> {
    // `and_modify` only runs for an existing entry; nothing is inserted.
    self.thread_error_contexts
        .write()
        .unwrap()
        .entry(thread_id)
        .and_modify(|ctx| {
            ctx.error_stack.clear();
            ctx.propagation_state = ErrorPropagationState::Normal;
            ctx.last_error_time = None;
        });
    Ok(())
}
}
/// Point-in-time summary produced by
/// `ErrorPropagationCoordinator::get_error_statistics`.
#[derive(Debug, Clone)]
pub struct ErrorStatistics {
    /// Number of registered threads.
    pub active_threads: usize,
    /// Total number of errors on all thread error stacks.
    pub total_errors: usize,
    /// Error-stack entries per severity.
    pub errors_by_severity: HashMap<ErrorSeverity, usize>,
    /// Error-stack entries per category.
    pub errors_by_category: HashMap<ErrorCategory, usize>,
    /// Number of threads with a non-empty error stack.
    pub threads_with_errors: usize,
    /// History events of any type recorded in the last 60 seconds.
    pub recent_errors: usize,
    /// Number of `ErrorPropagating` events in the history.
    pub propagated_errors: usize,
}
impl Default for ErrorPropagationPolicies {
    /// Policies used by `ErrorPropagationCoordinator::new`: history and
    /// stack traces on, severity-based propagation, shutdown on fatal errors.
    fn default() -> Self {
        Self {
            // History
            track_error_history: true,
            max_error_history_size: 1000,
            // Stack traces
            preserve_stack_traces: true,
            max_stack_trace_depth: 50,
            // Propagation
            enable_cross_thread_propagation: true,
            default_propagation_strategy: PropagationStrategy::SeverityBased,
            propagation_timeout: Duration::from_secs(5),
            // Shutdown and recovery
            fatal_errors_shutdown_all: true,
            max_errors_before_shutdown: 10,
            enable_error_recovery: true,
        }
    }
}
impl ErrorPropagationPolicies {
    /// Lightweight preset: no history, no stack traces, no cross-thread
    /// propagation and no shutdown on fatal errors.
    pub fn minimal() -> Self {
        Self {
            // History
            track_error_history: false,
            max_error_history_size: 100,
            // Stack traces
            preserve_stack_traces: false,
            max_stack_trace_depth: 10,
            // Propagation
            enable_cross_thread_propagation: false,
            default_propagation_strategy: PropagationStrategy::Parent,
            propagation_timeout: Duration::from_millis(100),
            // Shutdown and recovery
            fatal_errors_shutdown_all: false,
            max_errors_before_shutdown: 5,
            enable_error_recovery: false,
        }
    }

    /// Verbose preset: large history, stack traces on, and propagated errors
    /// broadcast to every other registered thread.
    pub fn debug() -> Self {
        Self {
            // History
            track_error_history: true,
            max_error_history_size: 5000,
            // Stack traces
            preserve_stack_traces: true,
            max_stack_trace_depth: 100,
            // Propagation
            enable_cross_thread_propagation: true,
            default_propagation_strategy: PropagationStrategy::Broadcast,
            propagation_timeout: Duration::from_secs(30),
            // Shutdown and recovery
            fatal_errors_shutdown_all: true,
            max_errors_before_shutdown: 100,
            enable_error_recovery: true,
        }
    }
}
impl Clone for ErrorChannel {
fn clone(&self) -> Self {
let (sender, receiver) = unbounded();
Self { sender, receiver }
}
}
impl Default for ErrorPropagationCoordinator {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for ErrorSeverity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ErrorSeverity::Info => write!(f, "INFO"),
ErrorSeverity::Warning => write!(f, "WARNING"),
ErrorSeverity::Error => write!(f, "ERROR"),
ErrorSeverity::Critical => write!(f, "CRITICAL"),
ErrorSeverity::Fatal => write!(f, "FATAL"),
}
}
}
impl fmt::Display for ErrorCategory {
    /// Writes the category under its variant name, e.g. `Concurrency`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            ErrorCategory::Syntax => "Syntax",
            ErrorCategory::Type => "Type",
            ErrorCategory::Runtime => "Runtime",
            ErrorCategory::IO => "IO",
            ErrorCategory::Effect => "Effect",
            ErrorCategory::Resource => "Resource",
            ErrorCategory::Concurrency => "Concurrency",
            ErrorCategory::System => "System",
            ErrorCategory::User => "User",
        };
        f.write_str(label)
    }
}