use crate::eval::{Value, ThreadSafeEnvironment, Generation};
use crate::diagnostics::Result;
use std::sync::{Arc, RwLock};
use std::thread::ThreadId;
use std::collections::HashMap;
use std::time::{SystemTime, Duration};
use std::sync::atomic::{AtomicU64, Ordering};
/// Central coordinator for the root environment, per-thread environments,
/// global definitions, the generation counter, transactions and snapshots.
///
/// Every field is held behind an `Arc`, so a cloned manager refers to the
/// same underlying state as the original.
#[derive(Debug)]
pub struct GlobalEnvironmentManager {
    /// Root environment that thread-local environments are extended from.
    root_environment: Arc<ThreadSafeEnvironment>,
    /// Environment registered for each thread, keyed by thread id.
    thread_local_envs: Arc<RwLock<HashMap<ThreadId, Arc<ThreadSafeEnvironment>>>>,
    /// Global variable bindings, keyed by name.
    global_definitions: Arc<RwLock<HashMap<String, Value>>>,
    /// Current generation number; bumped by `next_generation` and overwritten
    /// by a snapshot rollback.
    global_generation: Arc<AtomicU64>,
    /// Bookkeeping for transactions and the changes recorded in them.
    transaction_manager: Arc<TransactionManager>,
    /// Storage and policy for environment snapshots.
    snapshot_manager: Arc<StateSnapshotManager>,
}
/// Allocates transaction ids and stores the transactions created with them.
#[derive(Debug)]
pub struct TransactionManager {
    /// Transactions keyed by id. Committed and aborted entries remain here
    /// until `cleanup_completed_transactions` removes them.
    active_transactions: Arc<RwLock<HashMap<u64, StateTransaction>>>,
    /// Counter that hands out transaction ids.
    transaction_sequence: AtomicU64,
    /// Timeout copied into every newly started transaction.
    default_timeout: Duration,
}
/// Stores environment snapshots and decides when new ones should be taken.
#[derive(Debug)]
pub struct StateSnapshotManager {
    /// Stored snapshots, keyed by the generation they were taken at.
    environment_snapshots: Arc<RwLock<HashMap<Generation, EnvironmentSnapshot>>>,
    /// Upper bound on retained snapshots; `store_snapshot` evicts the lowest
    /// generations once it is exceeded.
    max_snapshots: usize,
    /// Rule that decides when a snapshot is created automatically.
    snapshot_policy: SnapshotPolicy,
}
/// A group of recorded global-state changes that can be committed or aborted
/// together.
#[derive(Debug, Clone)]
pub struct StateTransaction {
    /// Identifier; also the key under which the transaction is stored.
    #[allow(dead_code)]
    pub id: u64,
    /// Thread that started the transaction. `get_or_start_transaction` uses
    /// it to find a thread's active transaction.
    pub initiator_thread: ThreadId,
    /// Additional threads supplied when the transaction was started.
    #[allow(dead_code)]
    pub participating_threads: Vec<ThreadId>,
    /// Current lifecycle state.
    pub state: TransactionState,
    /// Changes recorded so far, in the order they were added.
    pub changes: Vec<StateChange>,
    /// Starts at 0 and is set to the generation of the first recorded change.
    pub snapshot_generation: Generation,
    /// Wall-clock time at which the transaction was created.
    #[allow(dead_code)]
    pub created_at: SystemTime,
    /// Timeout assigned at creation from the manager's default.
    #[allow(dead_code)]
    pub timeout: Duration,
}
/// Lifecycle states of a [`StateTransaction`].
///
/// Only `Active`, `Committed` and `Aborted` are assigned by the code in this
/// file; the remaining variants are declared but not constructed here.
#[derive(Debug, Clone, PartialEq)]
pub enum TransactionState {
    /// Declared for a preparation phase.
    #[allow(dead_code)]
    Preparing,
    /// State given to a transaction when it is started.
    Active,
    /// Declared for an in-progress commit.
    #[allow(dead_code)]
    Committing,
    /// State set when the transaction is committed.
    Committed,
    /// Declared for an in-progress abort.
    #[allow(dead_code)]
    Aborting,
    /// State set when the transaction is aborted.
    Aborted,
    /// Declared for a completed rollback.
    #[allow(dead_code)]
    RolledBack,
}
/// One recorded modification of a global variable, with enough information
/// to undo it.
#[derive(Debug, Clone)]
pub struct StateChange {
    /// Kind of modification that was applied.
    pub change_type: ChangeType,
    /// Name of the affected global variable.
    pub variable_name: String,
    /// Value bound to the name before the change, if there was one.
    pub old_value: Option<Value>,
    /// Value bound to the name by the change, if any.
    #[allow(dead_code)]
    pub new_value: Option<Value>,
    /// Global generation read when the change was recorded.
    pub generation: Generation,
    /// Thread on whose behalf the change was made.
    #[allow(dead_code)]
    pub thread_id: ThreadId,
}
/// Kind of modification described by a [`StateChange`].
#[derive(Debug, Clone, PartialEq)]
pub enum ChangeType {
    /// The name had no previous global binding.
    Define,
    /// The name was already bound and its value was replaced.
    Update,
    /// The binding was removed. Not produced by the code in this file, but
    /// handled when a change is rolled back.
    #[allow(dead_code)]
    Delete,
}
/// Point-in-time copy of the global state, tagged with its generation.
#[derive(Debug, Clone)]
pub struct EnvironmentSnapshot {
    /// Generation that was current when the snapshot was taken.
    pub generation: Generation,
    /// Copy of every global definition at that moment.
    pub global_definitions: HashMap<String, Value>,
    /// One entry per registered thread. The snapshot routine in this file
    /// stores an empty map for each thread rather than its bindings.
    #[allow(dead_code)]
    pub thread_local_envs: HashMap<ThreadId, HashMap<String, Value>>,
    /// Wall-clock time at which the snapshot was created.
    #[allow(dead_code)]
    pub created_at: SystemTime,
}
/// Rule that decides when environment snapshots are taken automatically.
#[derive(Debug, Clone)]
pub enum SnapshotPolicy {
    /// Snapshot on every new generation.
    #[allow(dead_code)]
    EveryGeneration,
    /// Snapshot when the new generation is a multiple of the given interval.
    EveryNGenerations(u64),
    /// Never snapshot automatically.
    #[allow(dead_code)]
    OnDemand,
    /// Snapshot before a transactional global write rather than on
    /// generation changes.
    #[allow(dead_code)]
    BeforeTransaction,
}
impl GlobalEnvironmentManager {
    /// Creates a manager with a populated root environment, no thread-local
    /// environments, no global definitions and generation 0.
    pub fn new() -> Self {
        Self {
            root_environment: Self::create_root_environment(),
            thread_local_envs: Arc::new(RwLock::new(HashMap::new())),
            global_definitions: Arc::new(RwLock::new(HashMap::new())),
            global_generation: Arc::new(AtomicU64::new(0)),
            transaction_manager: Arc::new(TransactionManager::new()),
            snapshot_manager: Arc::new(StateSnapshotManager::new()),
        }
    }

    /// Builds the root environment: defines `true`, `false` and `null`, then
    /// lets the standard library populate it.
    fn create_root_environment() -> Arc<ThreadSafeEnvironment> {
        let env = Arc::new(ThreadSafeEnvironment::new(None, 0));
        env.define("true".to_string(), Value::t());
        env.define("false".to_string(), Value::f());
        env.define("null".to_string(), Value::Nil);
        let stdlib = crate::stdlib::StandardLibrary::new();
        stdlib.populate_environment(&env);
        env
    }

    /// Creates an environment extending the root for `thread_id`, registers
    /// it (replacing any previous entry for that thread) and returns it.
    ///
    /// Advances the global generation as a side effect.
    ///
    /// # Panics
    ///
    /// Panics if the thread-environment lock is poisoned.
    pub fn create_thread_local_env(&self, thread_id: ThreadId) -> Arc<ThreadSafeEnvironment> {
        let generation = self.next_generation();
        let local_env = self.root_environment.extend(generation);
        self.thread_local_envs
            .write()
            .unwrap()
            .insert(thread_id, local_env.clone());
        local_env
    }

    /// Returns the environment registered for `thread_id`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the thread-environment lock is poisoned.
    pub fn get_thread_local_env(&self, thread_id: ThreadId) -> Option<Arc<ThreadSafeEnvironment>> {
        let thread_envs = self.thread_local_envs.read().unwrap();
        thread_envs.get(&thread_id).cloned()
    }

    /// Defines or updates a global on behalf of the calling thread.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::define_global_transactional`].
    pub fn define_global(&self, name: String, value: Value) -> Result<()> {
        let thread_id = std::thread::current().id();
        self.define_global_transactional(name, value, thread_id)
    }

    /// Binds `name` to `value` in the global definitions and records the
    /// change in `thread_id`'s active transaction, starting one if needed.
    ///
    /// When the snapshot policy asks for it, a snapshot is taken before the
    /// write.
    ///
    /// # Errors
    ///
    /// Returns an error if the transaction cannot be obtained, the snapshot
    /// fails, or the change cannot be recorded. In the last case the global
    /// has already been written.
    ///
    /// # Panics
    ///
    /// Panics if a lock used along the way is poisoned.
    pub fn define_global_transactional(
        &self,
        name: String,
        value: Value,
        thread_id: ThreadId,
    ) -> Result<()> {
        let transaction_id = self.transaction_manager.get_or_start_transaction(thread_id)?;
        if self.snapshot_manager.should_create_snapshot() {
            self.create_environment_snapshot()?;
        }

        // Swap the value under a single write lock so that the recorded
        // `old_value` is exactly what this write replaced, even when other
        // threads write the same name concurrently. The guard is a temporary
        // and is released at the end of the statement.
        let old_value = self
            .global_definitions
            .write()
            .unwrap()
            .insert(name.clone(), value.clone());

        let change_type = if old_value.is_some() {
            ChangeType::Update
        } else {
            ChangeType::Define
        };
        let change = StateChange {
            change_type,
            variable_name: name,
            old_value,
            new_value: Some(value),
            generation: self.current_generation(),
            thread_id,
        };
        self.transaction_manager.add_change(transaction_id, change)
    }

    /// Looks `name` up in the global definitions first and falls back to the
    /// root environment.
    ///
    /// # Panics
    ///
    /// Panics if the global-definitions lock is poisoned.
    pub fn lookup_global(&self, name: &str) -> Option<Value> {
        // The read guard is dropped at the end of this statement, before the
        // root environment is consulted.
        let global = self.global_definitions.read().unwrap().get(name).cloned();
        global.or_else(|| self.root_environment.lookup(name))
    }

    /// Returns a shared handle to the root environment.
    pub fn root_environment(&self) -> Arc<ThreadSafeEnvironment> {
        self.root_environment.clone()
    }

    /// Returns the current global generation.
    pub fn current_generation(&self) -> Generation {
        self.global_generation.load(Ordering::SeqCst)
    }

    /// Increments the global generation and returns the new value.
    ///
    /// If the snapshot policy selects the new generation, a snapshot is
    /// attempted; its result is deliberately ignored.
    pub fn next_generation(&self) -> Generation {
        let new_gen = self.global_generation.fetch_add(1, Ordering::SeqCst) + 1;
        if self.snapshot_manager.should_create_snapshot_for_generation(new_gen) {
            let _ = self.create_environment_snapshot();
        }
        new_gen
    }

    /// Unregisters the environment of `thread_id`, if one is registered.
    ///
    /// # Panics
    ///
    /// Panics if the thread-environment lock is poisoned.
    pub fn remove_thread_local_env(&self, thread_id: ThreadId) {
        self.thread_local_envs.write().unwrap().remove(&thread_id);
    }

    /// Returns the number of threads with a registered environment.
    ///
    /// # Panics
    ///
    /// Panics if the thread-environment lock is poisoned.
    pub fn active_thread_count(&self) -> usize {
        self.thread_local_envs.read().unwrap().len()
    }

    /// Returns the names of all global definitions, in unspecified order.
    ///
    /// # Panics
    ///
    /// Panics if the global-definitions lock is poisoned.
    pub fn global_variable_names(&self) -> Vec<String> {
        let globals = self.global_definitions.read().unwrap();
        globals.keys().cloned().collect()
    }

    /// Returns a copy of all global definitions.
    ///
    /// # Panics
    ///
    /// Panics if the global-definitions lock is poisoned.
    pub fn global_variables_snapshot(&self) -> HashMap<String, Value> {
        let globals = self.global_definitions.read().unwrap();
        globals.clone()
    }

    /// Removes every global definition, then attempts a snapshot of the
    /// cleared state. The snapshot result is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the global-definitions lock is poisoned.
    pub fn clear_global_definitions(&self) {
        // The write guard must be released before snapshotting:
        // `create_environment_snapshot` takes a read lock on the same
        // `RwLock`, and acquiring it while this thread still holds the write
        // guard would deadlock or panic.
        self.global_definitions.write().unwrap().clear();
        let _ = self.create_environment_snapshot();
    }

    /// Starts a new transaction initiated by `thread_id` with no additional
    /// participants and returns its id.
    ///
    /// # Errors
    ///
    /// Propagates any error from the transaction manager.
    pub fn start_transaction(&self, thread_id: ThreadId) -> Result<u64> {
        self.transaction_manager.start_transaction(thread_id, Vec::new())
    }

    /// Marks the transaction as committed.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction with this id is stored.
    pub fn commit_transaction(&self, transaction_id: u64) -> Result<()> {
        self.transaction_manager.commit_transaction(transaction_id)
    }

    /// Undoes the transaction's recorded changes in reverse order, then
    /// marks it as aborted.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction with this id is stored or if a
    /// rollback step fails.
    pub fn abort_transaction(&self, transaction_id: u64) -> Result<()> {
        let transaction = self.transaction_manager.get_transaction(transaction_id)?;
        for change in transaction.changes.iter().rev() {
            self.rollback_change(change)?;
        }
        self.transaction_manager.abort_transaction(transaction_id)
    }

    /// Restores global state from the newest snapshot taken at or before
    /// `target_generation`.
    ///
    /// # Errors
    ///
    /// Returns an error if no such snapshot exists.
    pub fn rollback_to_generation(&self, target_generation: Generation) -> Result<()> {
        self.snapshot_manager.rollback_to_generation(target_generation, self)
    }

    /// Stores a snapshot of the global definitions under the current
    /// generation and returns that generation.
    ///
    /// Thread-local environments are recorded only as an empty map per
    /// registered thread; their bindings are not copied.
    ///
    /// # Errors
    ///
    /// Always returns `Ok` as written.
    ///
    /// # Panics
    ///
    /// Panics if a lock used along the way is poisoned.
    pub fn create_environment_snapshot(&self) -> Result<Generation> {
        let generation = self.current_generation();
        // Each lock is held only for the duration of its own statement.
        let global_definitions = self.global_definitions.read().unwrap().clone();
        let thread_local_envs: HashMap<ThreadId, HashMap<String, Value>> = self
            .thread_local_envs
            .read()
            .unwrap()
            .keys()
            .map(|thread_id| (*thread_id, HashMap::new()))
            .collect();

        self.snapshot_manager.store_snapshot(EnvironmentSnapshot {
            generation,
            global_definitions,
            thread_local_envs,
            created_at: SystemTime::now(),
        });
        Ok(generation)
    }

    /// Reverts a single recorded change in the global definitions.
    ///
    /// * `Define`: the binding is removed.
    /// * `Update`: the previous value is restored, or the binding is removed
    ///   when no previous value was recorded.
    /// * `Delete`: the previous value is restored when one was recorded.
    fn rollback_change(&self, change: &StateChange) -> Result<()> {
        let mut globals = self.global_definitions.write().unwrap();
        match (&change.change_type, &change.old_value) {
            (ChangeType::Define, _) | (ChangeType::Update, None) => {
                globals.remove(&change.variable_name);
            }
            (ChangeType::Update, Some(previous)) | (ChangeType::Delete, Some(previous)) => {
                globals.insert(change.variable_name.clone(), previous.clone());
            }
            // Nothing was recorded to restore.
            (ChangeType::Delete, None) => {}
        }
        Ok(())
    }
}
impl Default for GlobalEnvironmentManager {
fn default() -> Self {
Self::new()
}
}
impl Clone for GlobalEnvironmentManager {
fn clone(&self) -> Self {
Self {
root_environment: self.root_environment.clone(),
thread_local_envs: self.thread_local_envs.clone(),
global_definitions: self.global_definitions.clone(),
global_generation: self.global_generation.clone(),
transaction_manager: self.transaction_manager.clone(),
snapshot_manager: self.snapshot_manager.clone(),
}
}
}
impl TransactionManager {
pub fn new() -> Self {
Self {
active_transactions: Arc::new(RwLock::new(HashMap::new())),
transaction_sequence: AtomicU64::new(0),
default_timeout: Duration::from_secs(30),
}
}
pub fn start_transaction(
&self,
initiator: ThreadId,
participants: Vec<ThreadId>,
) -> Result<u64> {
let id = self.transaction_sequence.fetch_add(1, Ordering::SeqCst);
let transaction = StateTransaction {
id,
initiator_thread: initiator,
participating_threads: participants,
state: TransactionState::Active,
changes: Vec::new(),
snapshot_generation: 0, created_at: SystemTime::now(),
timeout: self.default_timeout,
};
let mut transactions = self.active_transactions.write().unwrap();
transactions.insert(id, transaction);
Ok(id)
}
pub fn get_or_start_transaction(&self, thread_id: ThreadId) -> Result<u64> {
{
let transactions = self.active_transactions.read().unwrap();
for (id, transaction) in transactions.iter() {
if transaction.initiator_thread == thread_id
&& matches!(transaction.state, TransactionState::Active) {
return Ok(*id);
}
}
}
self.start_transaction(thread_id, Vec::new())
}
pub fn get_transaction(&self, transaction_id: u64) -> Result<StateTransaction> {
let transactions = self.active_transactions.read().unwrap();
transactions.get(&transaction_id)
.cloned()
.ok_or_else(|| crate::diagnostics::Error::runtime_error(
format!("Transaction {transaction_id} not found"),
None
).boxed())
}
pub fn add_change(&self, transaction_id: u64, change: StateChange) -> Result<()> {
let mut transactions = self.active_transactions.write().unwrap();
if let Some(transaction) = transactions.get_mut(&transaction_id) {
if transaction.changes.is_empty() {
transaction.snapshot_generation = change.generation;
}
transaction.changes.push(change);
Ok(())
} else {
Err(crate::diagnostics::Error::runtime_error(
format!("Transaction {transaction_id} not found"),
None
).boxed())
}
}
pub fn commit_transaction(&self, transaction_id: u64) -> Result<()> {
let mut transactions = self.active_transactions.write().unwrap();
if let Some(transaction) = transactions.get_mut(&transaction_id) {
transaction.state = TransactionState::Committed;
Ok(())
} else {
Err(crate::diagnostics::Error::runtime_error(
format!("Transaction {transaction_id} not found"),
None
).boxed())
}
}
pub fn abort_transaction(&self, transaction_id: u64) -> Result<()> {
let mut transactions = self.active_transactions.write().unwrap();
if let Some(transaction) = transactions.get_mut(&transaction_id) {
transaction.state = TransactionState::Aborted;
Ok(())
} else {
Err(crate::diagnostics::Error::runtime_error(
format!("Transaction {transaction_id} not found"),
None
).boxed())
}
}
#[allow(dead_code)] pub fn cleanup_completed_transactions(&self) {
let mut transactions = self.active_transactions.write().unwrap();
transactions.retain(|_, transaction| {
!matches!(transaction.state, TransactionState::Committed | TransactionState::Aborted)
});
}
}
impl StateSnapshotManager {
    /// Creates a manager that keeps at most 100 snapshots and snapshots
    /// every 10th generation.
    pub fn new() -> Self {
        Self {
            environment_snapshots: Arc::new(RwLock::new(HashMap::new())),
            max_snapshots: 100,
            snapshot_policy: SnapshotPolicy::EveryNGenerations(10),
        }
    }

    /// Creates a manager with the given policy and retention limit.
    #[allow(dead_code)]
    pub fn with_policy(policy: SnapshotPolicy, max_snapshots: usize) -> Self {
        Self {
            environment_snapshots: Arc::new(RwLock::new(HashMap::new())),
            max_snapshots,
            snapshot_policy: policy,
        }
    }

    /// Returns `true` when the policy is `BeforeTransaction`, i.e. a
    /// snapshot should be taken before a transactional write.
    pub fn should_create_snapshot(&self) -> bool {
        matches!(self.snapshot_policy, SnapshotPolicy::BeforeTransaction)
    }

    /// Returns whether reaching `generation` should trigger a snapshot.
    ///
    /// `EveryNGenerations(0)` never triggers; the zero interval is guarded
    /// explicitly because the remainder operation would otherwise panic.
    pub fn should_create_snapshot_for_generation(&self, generation: Generation) -> bool {
        match self.snapshot_policy {
            SnapshotPolicy::EveryGeneration => true,
            SnapshotPolicy::EveryNGenerations(n) => n != 0 && generation % n == 0,
            SnapshotPolicy::OnDemand => false,
            SnapshotPolicy::BeforeTransaction => false,
        }
    }

    /// Stores `snapshot` under its generation, replacing any snapshot
    /// already stored for that generation, then evicts the lowest
    /// generations until at most `max_snapshots` remain.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot-table lock is poisoned.
    pub fn store_snapshot(&self, snapshot: EnvironmentSnapshot) {
        let mut snapshots = self.environment_snapshots.write().unwrap();
        snapshots.insert(snapshot.generation, snapshot);

        let excess = snapshots.len().saturating_sub(self.max_snapshots);
        if excess > 0 {
            let mut generations: Vec<_> = snapshots.keys().copied().collect();
            // Keys are unique, so an unstable sort yields the same order.
            generations.sort_unstable();
            for generation in generations.into_iter().take(excess) {
                snapshots.remove(&generation);
            }
        }
    }

    /// Returns a copy of the snapshot stored for exactly `generation`.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot-table lock is poisoned.
    #[allow(dead_code)]
    pub fn get_snapshot(&self, generation: Generation) -> Option<EnvironmentSnapshot> {
        let snapshots = self.environment_snapshots.read().unwrap();
        snapshots.get(&generation).cloned()
    }

    /// Returns a copy of the snapshot with the highest generation that is
    /// less than or equal to `generation`, if any. Despite the name, a
    /// snapshot taken exactly at `generation` qualifies.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot-table lock is poisoned.
    pub fn find_snapshot_before(&self, generation: Generation) -> Option<EnvironmentSnapshot> {
        let snapshots = self.environment_snapshots.read().unwrap();
        let best_generation = snapshots
            .keys()
            .copied()
            .filter(|&candidate| candidate <= generation)
            .max();
        best_generation.and_then(|found| snapshots.get(&found).cloned())
    }

    /// Replaces the manager's global definitions with those of the newest
    /// snapshot at or before `target_generation` and sets the global
    /// generation to `target_generation`.
    ///
    /// Thread-local environments are not touched.
    ///
    /// # Errors
    ///
    /// Returns an error if no snapshot at or before `target_generation` is
    /// stored.
    ///
    /// # Panics
    ///
    /// Panics if a lock used along the way is poisoned.
    pub fn rollback_to_generation(
        &self,
        target_generation: Generation,
        env_manager: &GlobalEnvironmentManager,
    ) -> Result<()> {
        let snapshot = self.find_snapshot_before(target_generation)
            .ok_or_else(|| crate::diagnostics::Error::runtime_error(
                format!("No snapshot found for generation {target_generation}"),
                None
            ).boxed())?;
        {
            let mut globals = env_manager.global_definitions.write().unwrap();
            // `snapshot` is already an owned copy, so its map can be moved
            // in wholesale instead of clearing and re-inserting each entry.
            *globals = snapshot.global_definitions;
        }
        env_manager.global_generation.store(target_generation, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the generations of all stored snapshots in ascending order.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot-table lock is poisoned.
    #[allow(dead_code)]
    pub fn list_snapshots(&self) -> Vec<Generation> {
        let snapshots = self.environment_snapshots.read().unwrap();
        let mut generations: Vec<_> = snapshots.keys().copied().collect();
        generations.sort_unstable();
        generations
    }

    /// Removes every stored snapshot.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot-table lock is poisoned.
    #[allow(dead_code)]
    pub fn clear_snapshots(&self) {
        self.environment_snapshots.write().unwrap().clear();
    }
}