use crate::utils::{GcIntegration, GcIntegrationConfig};
use crate::utils::gc::{ObjectId, gc_alloc, GcObject, GenerationId};
use crate::eval::{
Value, ThreadSafeEnvironment, Continuation, Frame, StackTrace, StackFrame, FrameType
};
use crate::ast::{Expr};
use crate::diagnostics::{Result, Error, Span, Spanned};
use std::sync::{Arc, RwLock, Mutex, atomic::{AtomicBool, AtomicU64, AtomicU32, Ordering}};
use std::collections::HashMap;
use std::time::Instant;
#[derive(Debug)]
pub struct GcContinuationManager {
gc_integration: Arc<GcIntegration>,
continuation_registry: RwLock<HashMap<u64, ContinuationEntry>>,
stack_trace_manager: Arc<StackTraceManager>,
config: GcContinuationConfig,
next_continuation_id: AtomicU64,
}
#[derive(Debug, Clone)]
pub struct GcContinuationConfig {
pub track_continuations: bool,
pub preserve_stack_traces: bool,
pub optimize_continuations: bool,
pub max_stack_depth: usize,
pub enable_debugging: bool,
}
#[derive(Debug)]
pub struct ContinuationEntry {
pub continuation: Arc<Continuation>,
pub gc_object_id: Option<ObjectId>,
pub created_at: Instant,
pub creation_stack_trace: Option<StackTrace>,
pub environment_info: EnvironmentCaptureInfo,
pub active: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct EnvironmentCaptureInfo {
pub main_environment: Arc<ThreadSafeEnvironment>,
pub environment_chain: Vec<Arc<ThreadSafeEnvironment>>,
pub binding_snapshot: HashMap<String, Value>,
pub captured_at: Instant,
}
#[derive(Debug)]
pub struct StackTraceManager {
preserved_traces: RwLock<HashMap<u64, PreservedStackTrace>>,
frame_pool: RwLock<Vec<StackFrame>>,
config: StackTraceConfig,
}
#[derive(Debug, Clone)]
pub struct StackTraceConfig {
pub max_preserved_traces: usize,
pub max_frames_per_trace: usize,
pub compress_old_traces: bool,
}
#[derive(Debug, Clone)]
pub struct PreservedStackTrace {
pub trace: StackTrace,
pub preserved_at: Instant,
pub compressed: bool,
pub gc_object_id: Option<ObjectId>,
}
#[derive(Debug)]
pub struct ContinuationGcWrapper {
continuation_data: ContinuationData,
generation: AtomicU32,
marked: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct ContinuationData {
pub id: u64,
pub frames: Vec<Frame>,
pub environment: Arc<ThreadSafeEnvironment>,
pub current_expr: Option<Spanned<Expr>>,
pub capture_info: CaptureMetadata,
}
#[derive(Debug, Clone)]
pub struct CaptureMetadata {
pub captured_at: Instant,
pub stack_depth: usize,
pub bindings_captured: usize,
pub estimated_memory: usize,
}
impl GcContinuationManager {
pub fn new(
gc_integration: Arc<GcIntegration>,
config: GcContinuationConfig,
) -> Self {
let stack_trace_manager = Arc::new(StackTraceManager::new(
StackTraceConfig::default()
));
Self {
gc_integration,
continuation_registry: RwLock::new(HashMap::new()),
stack_trace_manager,
config,
next_continuation_id: AtomicU64::new(1),
}
}
pub fn with_default_config(gc_integration: Arc<GcIntegration>) -> Self {
Self::new(gc_integration, GcContinuationConfig::default())
}
pub fn capture_continuation(
&self,
frames: Vec<Frame>,
environment: Arc<ThreadSafeEnvironment>,
current_expr: Option<Spanned<Expr>>,
current_stack_trace: Option<StackTrace>,
) -> Result<Arc<Continuation>> {
let continuation_id = self.next_continuation_id.fetch_add(1, Ordering::SeqCst);
let environment_info = self.capture_environment_info(&environment)?;
let capture_metadata = CaptureMetadata {
captured_at: Instant::now(),
stack_depth: frames.len(),
bindings_captured: environment_info.binding_snapshot.len(),
estimated_memory: self.estimate_continuation_memory(&frames, &environment),
};
let continuation_data = ContinuationData {
id: continuation_id,
frames: frames.clone(),
environment: environment.clone(),
current_expr: current_expr.clone(),
capture_info: capture_metadata,
};
let continuation = Arc::new(Continuation::new(
frames,
environment,
continuation_id,
current_expr,
));
let gc_object_id = if self.config.track_continuations {
let wrapper = ContinuationGcWrapper::new(continuation_data);
let gc_ptr = gc_alloc(wrapper);
let object_id = gc_ptr.id();
self.gc_integration.register_continuation_root(object_id);
Some(object_id)
} else {
None
};
if self.config.preserve_stack_traces {
if let Some(ref trace) = current_stack_trace {
self.stack_trace_manager.preserve_trace(continuation_id, trace.clone())?;
}
}
let entry = ContinuationEntry {
continuation: continuation.clone(),
gc_object_id,
created_at: Instant::now(),
creation_stack_trace: current_stack_trace,
environment_info,
active: AtomicBool::new(true),
};
if let Ok(mut registry) = self.continuation_registry.write() {
registry.insert(continuation_id, entry);
}
Ok(continuation)
}
pub fn invoke_continuation(
&self,
continuation: &Arc<Continuation>,
value: Value,
) -> Result<Value> {
let continuation_id = continuation.id;
if let Ok(registry) = self.continuation_registry.read() {
if let Some(entry) = registry.get(&continuation_id) {
if !entry.active.load(Ordering::SeqCst) {
return Err(Box::new(Error::runtime_error(
"Attempt to invoke inactive continuation".to_string(),
None,
)));
}
}
}
if continuation.mark_invoked() {
return Err(Box::new(Error::runtime_error(
"Continuation can only be invoked once".to_string(),
None,
)));
}
if self.config.preserve_stack_traces {
if let Some(preserved_trace) = self.stack_trace_manager.get_preserved_trace(continuation_id) {
}
}
if let Ok(registry) = self.continuation_registry.read() {
if let Some(entry) = registry.get(&continuation_id) {
entry.active.store(false, Ordering::SeqCst);
}
}
Ok(value)
}
pub fn cleanup_continuation(&self, continuation_id: u64) -> Result<()> {
if let Ok(mut registry) = self.continuation_registry.write() {
if let Some(entry) = registry.remove(&continuation_id) {
if let Some(gc_object_id) = entry.gc_object_id {
self.gc_integration.unregister_continuation_root(gc_object_id);
}
if self.config.preserve_stack_traces {
self.stack_trace_manager.remove_preserved_trace(continuation_id);
}
}
}
Ok(())
}
fn capture_environment_info(
&self,
env: &Arc<ThreadSafeEnvironment>,
) -> Result<EnvironmentCaptureInfo> {
let mut environment_chain = Vec::new();
let mut binding_snapshot = HashMap::new();
let mut current_env = Some(env.clone());
while let Some(env) = current_env {
environment_chain.push(env.clone());
let var_names = env.all_variable_names();
for var_name in var_names {
if let Some(value) = env.lookup(&var_name) {
binding_snapshot.insert(var_name, value);
}
}
current_env = env.parent().cloned();
}
Ok(EnvironmentCaptureInfo {
main_environment: env.clone(),
environment_chain,
binding_snapshot,
captured_at: Instant::now(),
})
}
fn estimate_continuation_memory(
&self,
frames: &[Frame],
environment: &Arc<ThreadSafeEnvironment>,
) -> usize {
let frame_size = frames.len() * 256; let env_size = environment.all_variable_names().len() * 64; frame_size + env_size + 512 }
pub fn get_continuation_statistics(&self) -> ContinuationStatistics {
let active_count = if let Ok(registry) = self.continuation_registry.read() {
registry.values()
.filter(|entry| entry.active.load(Ordering::SeqCst))
.count()
} else {
0
};
let total_count = if let Ok(registry) = self.continuation_registry.read() {
registry.len()
} else {
0
};
let preserved_traces = self.stack_trace_manager.preserved_trace_count();
ContinuationStatistics {
active_continuations: active_count,
total_continuations: total_count,
preserved_stack_traces: preserved_traces,
gc_roots_tracked: self.count_gc_tracked_continuations(),
}
}
fn count_gc_tracked_continuations(&self) -> usize {
if let Ok(registry) = self.continuation_registry.read() {
registry.values()
.filter(|entry| entry.gc_object_id.is_some())
.count()
} else {
0
}
}
pub fn config(&self) -> &GcContinuationConfig {
&self.config
}
}
impl StackTraceManager {
pub fn new(config: StackTraceConfig) -> Self {
Self {
preserved_traces: RwLock::new(HashMap::new()),
frame_pool: RwLock::new(Vec::new()),
config,
}
}
pub fn preserve_trace(&self, continuation_id: u64, trace: StackTrace) -> Result<()> {
let preserved_trace = PreservedStackTrace {
trace,
preserved_at: Instant::now(),
compressed: false,
gc_object_id: None,
};
if let Ok(mut traces) = self.preserved_traces.write() {
traces.insert(continuation_id, preserved_trace);
if traces.len() > self.config.max_preserved_traces {
self.cleanup_old_traces(&mut traces);
}
}
Ok(())
}
pub fn get_preserved_trace(&self, continuation_id: u64) -> Option<PreservedStackTrace> {
if let Ok(traces) = self.preserved_traces.read() {
traces.get(&continuation_id).cloned()
} else {
None
}
}
pub fn remove_preserved_trace(&self, continuation_id: u64) {
if let Ok(mut traces) = self.preserved_traces.write() {
traces.remove(&continuation_id);
}
}
pub fn preserved_trace_count(&self) -> usize {
if let Ok(traces) = self.preserved_traces.read() {
traces.len()
} else {
0
}
}
fn cleanup_old_traces(&self, traces: &mut HashMap<u64, PreservedStackTrace>) {
if traces.len() <= self.config.max_preserved_traces {
return;
}
let mut entries: Vec<_> = traces.iter().map(|(k, v)| (*k, v.preserved_at)).collect();
entries.sort_by_key(|(_, time)| *time);
let to_remove = traces.len() - self.config.max_preserved_traces;
let ids_to_remove: Vec<_> = entries.iter().take(to_remove).map(|(id, _)| *id).collect();
for id in ids_to_remove {
traces.remove(&id);
}
}
}
impl ContinuationGcWrapper {
pub fn new(data: ContinuationData) -> Self {
Self {
continuation_data: data,
generation: AtomicU32::new(0),
marked: AtomicBool::new(false),
}
}
pub fn data(&self) -> &ContinuationData {
&self.continuation_data
}
}
impl GcObject for ContinuationGcWrapper {
fn generation(&self) -> GenerationId {
self.generation.load(Ordering::Relaxed)
}
fn set_generation(&mut self, generation: GenerationId) {
self.generation.store(generation, Ordering::Relaxed);
}
fn references(&self) -> Vec<crate::utils::gc::GcPtr> {
Vec::new()
}
fn mark(&self) {
self.marked.store(true, Ordering::Relaxed);
}
fn is_marked(&self) -> bool {
self.marked.load(Ordering::Relaxed)
}
fn clear_mark(&self) {
self.marked.store(false, Ordering::Relaxed);
}
fn size_hint(&self) -> usize {
let base_size = std::mem::size_of::<ContinuationData>();
let frame_size = self.continuation_data.frames.len() * 256;
let env_size = 512; base_size + frame_size + env_size
}
}
#[derive(Debug, Clone)]
pub struct ContinuationStatistics {
pub active_continuations: usize,
pub total_continuations: usize,
pub preserved_stack_traces: usize,
pub gc_roots_tracked: usize,
}
impl Default for GcContinuationConfig {
fn default() -> Self {
Self {
track_continuations: true,
preserve_stack_traces: true,
optimize_continuations: true,
max_stack_depth: 1000,
enable_debugging: false,
}
}
}
impl Default for StackTraceConfig {
fn default() -> Self {
Self {
max_preserved_traces: 100,
max_frames_per_trace: 50,
compress_old_traces: true,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::GcIntegration;
use crate::eval::value::{ThreadSafeEnvironment, Value, Frame};
use crate::diagnostics::Span;
#[test]
fn test_continuation_manager_creation() {
let gc_integration = Arc::new(GcIntegration::with_default_config());
let manager = GcContinuationManager::with_default_config(gc_integration);
let stats = manager.get_continuation_statistics();
assert_eq!(stats.active_continuations, 0);
assert_eq!(stats.total_continuations, 0);
}
#[test]
fn test_continuation_capture() {
let gc_integration = Arc::new(GcIntegration::with_default_config());
let manager = GcContinuationManager::with_default_config(gc_integration);
let env = Arc::new(ThreadSafeEnvironment::new(None, 0));
let frames = Vec::new();
let result = manager.capture_continuation(frames, env, None, None);
assert!(result.is_ok());
let stats = manager.get_continuation_statistics();
assert_eq!(stats.active_continuations, 1);
assert_eq!(stats.total_continuations, 1);
}
#[test]
fn test_stack_trace_preservation() {
let gc_integration = Arc::new(GcIntegration::with_default_config());
let manager = GcContinuationManager::with_default_config(gc_integration);
let mut stack_trace = StackTrace::new();
stack_trace.push(StackFrame::top_level(None));
let env = Arc::new(ThreadSafeEnvironment::new(None, 0));
let frames = Vec::new();
let result = manager.capture_continuation(frames, env, None, Some(stack_trace));
assert!(result.is_ok());
let stats = manager.get_continuation_statistics();
assert_eq!(stats.preserved_stack_traces, 1);
}
#[test]
fn test_continuation_cleanup() {
let gc_integration = Arc::new(GcIntegration::with_default_config());
let manager = GcContinuationManager::with_default_config(gc_integration);
let env = Arc::new(ThreadSafeEnvironment::new(None, 0));
let frames = Vec::new();
let continuation = manager.capture_continuation(frames, env, None, None).unwrap();
let continuation_id = continuation.id;
let cleanup_result = manager.cleanup_continuation(continuation_id);
assert!(cleanup_result.is_ok());
let stats = manager.get_continuation_statistics();
assert_eq!(stats.active_continuations, 0);
}
}