use super::{NotebookCell, CellId, ScopeManager, NotebookError, NotebookResult};
use crate::api::{Yufmath, ComputeProgress, ProgressCallback};
use crate::core::Expression;
use crate::engine::ComputeError;
use crate::formatter::{FormatType};
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant, SystemTime};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::fs;
use std::path::Path;
use serde::{Serialize, Deserialize};
use uuid::Uuid;
/// Outcome of attempting to execute one notebook cell.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExecutionResult {
/// The cell was evaluated and produced a value.
Success {
/// Textual form of the computed value.
value: String,
/// Output format the value was produced for.
format: FormatType,
/// Wall-clock time spent on the execution.
execution_time: Duration,
},
/// Parsing or evaluation failed.
Error {
/// Human-readable error message.
error: String,
/// Short error category tag (the engine uses "ParseError" and "ComputeError").
error_type: String,
/// Wall-clock time spent before the failure was reported.
execution_time: Duration,
},
/// The cell was not executed (for example because it is not executable).
Skipped,
/// Execution was abandoned because cancellation was requested.
Cancelled,
}
impl ExecutionResult {
pub fn is_success(&self) -> bool {
matches!(self, ExecutionResult::Success { .. })
}
pub fn is_error(&self) -> bool {
matches!(self, ExecutionResult::Error { .. })
}
pub fn execution_time(&self) -> Option<Duration> {
match self {
ExecutionResult::Success { execution_time, .. } => Some(*execution_time),
ExecutionResult::Error { execution_time, .. } => Some(*execution_time),
_ => None,
}
}
pub fn value(&self) -> Option<&str> {
match self {
ExecutionResult::Success { value, .. } => Some(value),
ExecutionResult::Error { error, .. } => Some(error),
_ => None,
}
}
}
/// Per-execution settings and timing information for a single cell run.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
/// Cell this context belongs to.
pub cell_id: CellId,
/// Instant the context was created; basis for `elapsed_time` and `is_timeout`.
pub start_time: Instant,
/// Cancellation switch. NOTE(review): not read anywhere in the visible code — confirm its consumers.
pub allow_cancellation: bool,
/// Format requested for the produced output.
pub output_format: FormatType,
/// Timing display switch. NOTE(review): not read anywhere in the visible code — confirm its consumers.
pub show_timing: bool,
/// Upper bound on execution time; `None` disables the timeout check.
pub max_execution_time: Option<Duration>,
}
impl ExecutionContext {
    /// Creates a context for `cell_id` that starts timing immediately.
    ///
    /// Defaults: cancellation allowed, `FormatType::Standard` output,
    /// timing shown, and a 60 second execution limit.
    pub fn new(cell_id: CellId) -> Self {
        let started = Instant::now();
        let limit = Duration::from_secs(60);
        Self {
            cell_id,
            start_time: started,
            allow_cancellation: true,
            output_format: FormatType::Standard,
            show_timing: true,
            max_execution_time: Some(limit),
        }
    }

    /// Time elapsed since the context was created.
    pub fn elapsed_time(&self) -> Duration {
        self.start_time.elapsed()
    }

    /// Whether the elapsed time exceeds `max_execution_time`.
    /// Always `false` when no limit is configured.
    pub fn is_timeout(&self) -> bool {
        match self.max_execution_time {
            Some(limit) => self.elapsed_time() > limit,
            None => false,
        }
    }
}
/// A cell waiting in the execution queue.
#[derive(Debug, Clone)]
pub struct ExecutionQueueItem {
/// Cell to execute.
pub cell_id: CellId,
/// Ordering key used by `ExecutionQueue::enqueue`: items with a smaller value are placed earlier.
pub priority: u32,
/// Cells that must have finished (completed or failed) before this one is dequeued.
pub dependencies: Vec<CellId>,
/// Wall-clock time at which the item was queued.
pub queued_at: SystemTime,
/// `true` when the item was queued by incremental execution.
pub is_incremental: bool,
/// Optional estimate of how long the execution will take.
pub estimated_duration: Option<Duration>,
}
/// Lifecycle state of an `ExecutionTask`.
#[derive(Debug, Clone, PartialEq)]
pub enum TaskStatus {
/// Created (or reset by a retry) but not started.
Pending,
/// Started and not yet finished.
Running,
/// Finished with a result.
Completed,
/// Finished with an error; the only state from which a retry is allowed.
Failed,
/// Abandoned on request.
Cancelled,
/// Deliberately not executed.
Skipped,
}
/// Bookkeeping record for one execution attempt of a cell.
#[derive(Debug, Clone)]
pub struct ExecutionTask {
/// Unique identifier of this task (random v4 UUID).
pub id: Uuid,
/// Cell the task executes.
pub cell_id: CellId,
/// Current lifecycle state.
pub status: TaskStatus,
/// Set when the task is started.
pub started_at: Option<Instant>,
/// Set when the task completes, fails, is cancelled or is skipped.
pub completed_at: Option<Instant>,
/// Result stored by `complete`.
pub result: Option<ExecutionResult>,
/// Error message stored by `fail`.
pub error: Option<String>,
/// Number of retries performed so far.
pub retry_count: u32,
/// Maximum number of retries allowed.
pub max_retries: u32,
}
impl ExecutionTask {
pub fn new(cell_id: CellId) -> Self {
Self {
id: Uuid::new_v4(),
cell_id,
status: TaskStatus::Pending,
started_at: None,
completed_at: None,
result: None,
error: None,
retry_count: 0,
max_retries: 3,
}
}
pub fn start(&mut self) {
self.status = TaskStatus::Running;
self.started_at = Some(Instant::now());
}
pub fn complete(&mut self, result: ExecutionResult) {
self.status = TaskStatus::Completed;
self.completed_at = Some(Instant::now());
self.result = Some(result);
}
pub fn fail(&mut self, error: String) {
self.status = TaskStatus::Failed;
self.completed_at = Some(Instant::now());
self.error = Some(error);
}
pub fn cancel(&mut self) {
self.status = TaskStatus::Cancelled;
self.completed_at = Some(Instant::now());
}
pub fn skip(&mut self) {
self.status = TaskStatus::Skipped;
self.completed_at = Some(Instant::now());
}
pub fn can_retry(&self) -> bool {
self.retry_count < self.max_retries && matches!(self.status, TaskStatus::Failed)
}
pub fn retry(&mut self) {
if self.can_retry() {
self.retry_count += 1;
self.status = TaskStatus::Pending;
self.started_at = None;
self.completed_at = None;
self.result = None;
self.error = None;
}
}
pub fn execution_time(&self) -> Option<Duration> {
match (self.started_at, self.completed_at) {
(Some(start), Some(end)) => Some(end.duration_since(start)),
_ => None,
}
}
}
/// One cell in the dependency graph together with its direct edges.
#[derive(Debug, Clone)]
pub struct DependencyNode {
/// Cell represented by this node.
pub cell_id: CellId,
/// Cells this cell depends on.
pub dependencies: HashSet<CellId>,
/// Cells that depend on this cell.
pub dependents: HashSet<CellId>,
/// Time of creation or of the last `mark_modified` call.
pub last_modified: SystemTime,
/// Whether the cell is flagged for (re-)execution.
pub needs_execution: bool,
}
impl DependencyNode {
    /// Creates an isolated node that is flagged as needing execution.
    pub fn new(cell_id: CellId) -> Self {
        let created = SystemTime::now();
        Self {
            cell_id,
            dependencies: HashSet::new(),
            dependents: HashSet::new(),
            last_modified: created,
            needs_execution: true,
        }
    }

    /// Records that this cell depends on `dep_id`.
    pub fn add_dependency(&mut self, dep_id: CellId) {
        let _newly_added = self.dependencies.insert(dep_id);
    }

    /// Forgets the dependency on `dep_id`, if present.
    pub fn remove_dependency(&mut self, dep_id: &CellId) {
        let _was_present = self.dependencies.remove(dep_id);
    }

    /// Records that `dep_id` depends on this cell.
    pub fn add_dependent(&mut self, dep_id: CellId) {
        let _newly_added = self.dependents.insert(dep_id);
    }

    /// Forgets that `dep_id` depends on this cell, if recorded.
    pub fn remove_dependent(&mut self, dep_id: &CellId) {
        let _was_present = self.dependents.remove(dep_id);
    }

    /// Flags the node for execution and refreshes its modification time.
    pub fn mark_modified(&mut self) {
        self.needs_execution = true;
        self.last_modified = SystemTime::now();
    }

    /// Clears the needs-execution flag.
    pub fn mark_executed(&mut self) {
        self.needs_execution = false;
    }
}
/// Directed dependency graph between notebook cells with a cached topological order.
pub struct DependencyGraph {
// All nodes keyed by cell id.
nodes: HashMap<CellId, DependencyNode>,
// Last computed topological order; cleared whenever the graph changes.
topo_cache: Option<Vec<CellId>>,
// Whether `topo_cache` reflects the current graph.
cache_valid: bool,
}
impl DependencyGraph {
pub fn new() -> Self {
Self {
nodes: HashMap::new(),
topo_cache: None,
cache_valid: false,
}
}
pub fn add_node(&mut self, cell_id: CellId) {
if !self.nodes.contains_key(&cell_id) {
self.nodes.insert(cell_id, DependencyNode::new(cell_id));
self.invalidate_cache();
}
}
pub fn remove_node(&mut self, cell_id: &CellId) {
if let Some(node) = self.nodes.remove(cell_id) {
for dep_id in &node.dependencies {
if let Some(dep_node) = self.nodes.get_mut(dep_id) {
dep_node.remove_dependent(cell_id);
}
}
for dep_id in &node.dependents {
if let Some(dep_node) = self.nodes.get_mut(dep_id) {
dep_node.remove_dependency(cell_id);
}
}
self.invalidate_cache();
}
}
pub fn add_dependency(&mut self, cell_id: CellId, dep_id: CellId) {
self.add_node(cell_id);
self.add_node(dep_id);
if self.would_create_cycle(cell_id, dep_id) {
return; }
if let Some(node) = self.nodes.get_mut(&cell_id) {
node.add_dependency(dep_id);
}
if let Some(dep_node) = self.nodes.get_mut(&dep_id) {
dep_node.add_dependent(cell_id);
}
self.invalidate_cache();
}
pub fn remove_dependency(&mut self, cell_id: &CellId, dep_id: &CellId) {
if let Some(node) = self.nodes.get_mut(cell_id) {
node.remove_dependency(dep_id);
}
if let Some(dep_node) = self.nodes.get_mut(dep_id) {
dep_node.remove_dependent(cell_id);
}
self.invalidate_cache();
}
fn would_create_cycle(&self, from: CellId, to: CellId) -> bool {
if from == to {
return true;
}
let mut visited = HashSet::new();
let mut stack = vec![to];
while let Some(current) = stack.pop() {
if current == from {
return true;
}
if visited.contains(¤t) {
continue;
}
visited.insert(current);
if let Some(node) = self.nodes.get(¤t) {
for dep in &node.dependencies {
stack.push(*dep);
}
}
}
false
}
pub fn topological_sort(&mut self) -> Vec<CellId> {
if self.cache_valid && self.topo_cache.is_some() {
return self.topo_cache.as_ref().unwrap().clone();
}
let mut result = Vec::new();
let mut in_degree: HashMap<CellId, usize> = HashMap::new();
let mut queue = VecDeque::new();
for &cell_id in self.nodes.keys() {
in_degree.insert(cell_id, 0);
}
for node in self.nodes.values() {
for &dep_id in &node.dependencies {
*in_degree.entry(node.cell_id).or_insert(0) += 1;
}
}
for (&cell_id, °ree) in &in_degree {
if degree == 0 {
queue.push_back(cell_id);
}
}
while let Some(cell_id) = queue.pop_front() {
result.push(cell_id);
if let Some(node) = self.nodes.get(&cell_id) {
for &dependent_id in &node.dependents {
if let Some(degree) = in_degree.get_mut(&dependent_id) {
*degree -= 1;
if *degree == 0 {
queue.push_back(dependent_id);
}
}
}
}
}
self.topo_cache = Some(result.clone());
self.cache_valid = true;
result
}
fn dfs_topo_sort(
&self,
cell_id: CellId,
visited: &mut HashSet<CellId>,
temp_visited: &mut HashSet<CellId>,
result: &mut Vec<CellId>,
) {
if temp_visited.contains(&cell_id) {
return;
}
if visited.contains(&cell_id) {
return;
}
temp_visited.insert(cell_id);
if let Some(node) = self.nodes.get(&cell_id) {
for &dep_id in &node.dependencies {
self.dfs_topo_sort(dep_id, visited, temp_visited, result);
}
}
temp_visited.remove(&cell_id);
visited.insert(cell_id);
result.push(cell_id);
}
pub fn get_cells_to_execute(&mut self, modified_cells: &HashSet<CellId>) -> Vec<CellId> {
let mut to_execute = HashSet::new();
for &cell_id in modified_cells {
if let Some(node) = self.nodes.get_mut(&cell_id) {
node.mark_modified();
to_execute.insert(cell_id);
self.mark_dependents_for_execution(cell_id, &mut to_execute);
}
}
let topo_order = self.topological_sort();
topo_order.into_iter()
.filter(|id| to_execute.contains(id))
.collect()
}
fn mark_dependents_for_execution(&mut self, cell_id: CellId, to_execute: &mut HashSet<CellId>) {
if let Some(node) = self.nodes.get(&cell_id) {
let dependents = node.dependents.clone();
for dependent_id in dependents {
if !to_execute.contains(&dependent_id) {
to_execute.insert(dependent_id);
if let Some(dep_node) = self.nodes.get_mut(&dependent_id) {
dep_node.needs_execution = true;
}
self.mark_dependents_for_execution(dependent_id, to_execute);
}
}
}
}
pub fn mark_executed(&mut self, cell_id: &CellId) {
if let Some(node) = self.nodes.get_mut(cell_id) {
node.mark_executed();
}
}
pub fn get_dependencies(&self, cell_id: &CellId) -> Vec<CellId> {
self.nodes.get(cell_id)
.map(|node| node.dependencies.iter().cloned().collect())
.unwrap_or_default()
}
pub fn get_dependents(&self, cell_id: &CellId) -> Vec<CellId> {
self.nodes.get(cell_id)
.map(|node| node.dependents.iter().cloned().collect())
.unwrap_or_default()
}
pub fn clear(&mut self) {
self.nodes.clear();
self.invalidate_cache();
}
fn invalidate_cache(&mut self) {
self.cache_valid = false;
self.topo_cache = None;
}
pub fn statistics(&self) -> DependencyGraphStats {
let total_nodes = self.nodes.len();
let total_edges: usize = self.nodes.values()
.map(|node| node.dependencies.len())
.sum();
let nodes_needing_execution = self.nodes.values()
.filter(|node| node.needs_execution)
.count();
DependencyGraphStats {
total_nodes,
total_edges,
nodes_needing_execution,
has_cycles: self.has_cycles(),
}
}
fn has_cycles(&self) -> bool {
let mut visited = HashSet::new();
let mut rec_stack = HashSet::new();
for &cell_id in self.nodes.keys() {
if !visited.contains(&cell_id) {
if self.has_cycle_util(cell_id, &mut visited, &mut rec_stack) {
return true;
}
}
}
false
}
fn has_cycle_util(
&self,
cell_id: CellId,
visited: &mut HashSet<CellId>,
rec_stack: &mut HashSet<CellId>,
) -> bool {
visited.insert(cell_id);
rec_stack.insert(cell_id);
if let Some(node) = self.nodes.get(&cell_id) {
for &dep_id in &node.dependencies {
if !visited.contains(&dep_id) {
if self.has_cycle_util(dep_id, visited, rec_stack) {
return true;
}
} else if rec_stack.contains(&dep_id) {
return true;
}
}
}
rec_stack.remove(&cell_id);
false
}
}
impl Default for DependencyGraph {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of dependency-graph metrics produced by `DependencyGraph::statistics`.
#[derive(Debug, Clone)]
pub struct DependencyGraphStats {
/// Number of nodes in the graph.
pub total_nodes: usize,
/// Number of dependency edges (sum of each node's dependency count).
pub total_edges: usize,
/// Number of nodes currently flagged as needing execution.
pub nodes_needing_execution: usize,
/// Whether a cycle was detected along dependency edges.
pub has_cycles: bool,
}
/// Queue of cells awaiting execution, with dependency-aware dequeuing,
/// a concurrency limit, and retry bookkeeping.
pub struct ExecutionQueue {
    // Items waiting to be dequeued, in dequeue-preference order.
    queue: VecDeque<ExecutionQueueItem>,
    // Tasks handed out by `dequeue` and not yet reported back.
    executing: HashMap<CellId, ExecutionTask>,
    // Cells whose latest run completed successfully.
    completed: HashSet<CellId>,
    // Cells whose latest run failed (after retries were exhausted, if any).
    failed: HashSet<CellId>,
    dependency_graph: DependencyGraph,
    // Maximum number of simultaneously executing tasks (at least 1).
    max_concurrent: usize,
    // Retries already consumed per cell. Needed because `dequeue` creates a
    // fresh `ExecutionTask` for every attempt, which would otherwise reset
    // the retry counter and allow unlimited retries.
    retry_counts: HashMap<CellId, u32>,
    // Sum and count of finished run durations, used for time estimates.
    observed_time: Duration,
    observed_runs: u32,
}

impl ExecutionQueue {
    /// Creates an empty queue that allows four concurrent tasks.
    pub fn new() -> Self {
        Self {
            queue: VecDeque::new(),
            executing: HashMap::new(),
            completed: HashSet::new(),
            failed: HashSet::new(),
            dependency_graph: DependencyGraph::new(),
            max_concurrent: 4,
            retry_counts: HashMap::new(),
            observed_time: Duration::ZERO,
            observed_runs: 0,
        }
    }

    /// Sets the concurrency limit; values below 1 are raised to 1.
    pub fn set_max_concurrent(&mut self, max: usize) {
        self.max_concurrent = max.max(1);
    }

    /// Registers the item's dependencies in the graph and inserts the item
    /// before the first queued item with a strictly larger `priority` value
    /// (or at the back if there is none).
    pub fn enqueue(&mut self, item: ExecutionQueueItem) {
        self.dependency_graph.add_node(item.cell_id);
        for dep_id in &item.dependencies {
            self.dependency_graph.add_dependency(item.cell_id, *dep_id);
        }
        let slot = self
            .queue
            .iter()
            .position(|existing| item.priority < existing.priority);
        match slot {
            Some(index) => self.queue.insert(index, item),
            None => self.queue.push_back(item),
        }
    }

    /// Queues every cell affected by `modified_cells` (the cells themselves
    /// and their transitive dependents) and reorders the queue so that
    /// dependencies come first.
    pub fn enqueue_incremental(&mut self, modified_cells: &HashSet<CellId>) {
        let cells_to_execute = self.dependency_graph.get_cells_to_execute(modified_cells);
        for cell_id in cells_to_execute {
            let dependencies = self.dependency_graph.get_dependencies(&cell_id);
            self.queue.push_back(ExecutionQueueItem {
                cell_id,
                priority: 0,
                dependencies,
                queued_at: SystemTime::now(),
                is_incremental: true,
                estimated_duration: None,
            });
        }
        self.reorder_by_dependencies();
    }

    /// Rebuilds the queue in topological order. Items whose cell is not
    /// picked by that pass keep their relative order at the end.
    fn reorder_by_dependencies(&mut self) {
        let topo_order = self.dependency_graph.topological_sort();
        let mut ordered_queue = VecDeque::with_capacity(self.queue.len());
        for cell_id in topo_order {
            let found = self.queue.iter().position(|item| item.cell_id == cell_id);
            if let Some(item) = found.and_then(|pos| self.queue.remove(pos)) {
                ordered_queue.push_back(item);
            }
        }
        ordered_queue.extend(self.queue.drain(..));
        self.queue = ordered_queue;
    }

    /// Hands out the first queued item that is not already executing and
    /// whose dependencies have all finished (completed or failed).
    ///
    /// Returns `None` when the concurrency limit is reached or no item is
    /// ready. The returned cell is tracked as executing until it is reported
    /// through `mark_completed`, `mark_failed_with_retry` or `cancel_cell`.
    pub fn dequeue(&mut self) -> Option<ExecutionQueueItem> {
        if self.executing.len() >= self.max_concurrent {
            return None;
        }
        let ready = self.queue.iter().position(|item| {
            !self.executing.contains_key(&item.cell_id)
                && item
                    .dependencies
                    .iter()
                    .all(|dep| self.completed.contains(dep) || self.failed.contains(dep))
        })?;
        let item = self.queue.remove(ready)?;

        let mut task = ExecutionTask::new(item.cell_id);
        // Carry over retries consumed by earlier attempts so that
        // `max_retries` is actually enforced.
        task.retry_count = self.retry_counts.get(&item.cell_id).cloned().unwrap_or(0);
        task.start();
        self.executing.insert(item.cell_id, task);
        Some(item)
    }

    /// Dequeues up to `max_count` ready items, bounded by the free
    /// concurrency slots.
    pub fn dequeue_batch(&mut self, max_count: usize) -> Vec<ExecutionQueueItem> {
        let available_slots = self.max_concurrent.saturating_sub(self.executing.len());
        let count = max_count.min(available_slots);
        let mut items = Vec::with_capacity(count);
        for _ in 0..count {
            match self.dequeue() {
                Some(item) => items.push(item),
                None => break,
            }
        }
        items
    }

    /// Reports the end of an executing cell. Unknown cells are ignored.
    ///
    /// On success the cell is recorded as completed (a missing `result` is
    /// treated as `Skipped`) and marked executed in the graph; otherwise it
    /// is recorded as failed.
    pub fn mark_completed(&mut self, cell_id: CellId, success: bool, result: Option<ExecutionResult>) {
        let mut task = match self.executing.remove(&cell_id) {
            Some(task) => task,
            None => return,
        };
        if success {
            task.complete(result.unwrap_or(ExecutionResult::Skipped));
            self.completed.insert(cell_id);
            // The latest run decides the state; drop any older failure.
            self.failed.remove(&cell_id);
            self.dependency_graph.mark_executed(&cell_id);
        } else {
            let error = task.error.take().unwrap_or_else(|| "未知错误".to_string());
            task.fail(error);
            self.failed.insert(cell_id);
            self.completed.remove(&cell_id);
        }
        self.retry_counts.remove(&cell_id);
        self.record_run(&task);
    }

    /// Reports a failure of an executing cell and re-queues it while retries
    /// remain.
    ///
    /// Returns `true` when the cell was queued again, `false` when retries
    /// are exhausted (the cell is then recorded as failed) or the cell was
    /// not executing.
    pub fn mark_failed_with_retry(&mut self, cell_id: CellId, error: String) -> bool {
        let mut task = match self.executing.remove(&cell_id) {
            Some(task) => task,
            None => return false,
        };
        task.fail(error);
        // Read the duration before `retry()` clears the timestamps.
        let last_duration = task.execution_time();
        self.record_run(&task);

        if !task.can_retry() {
            self.retry_counts.remove(&cell_id);
            self.failed.insert(cell_id);
            return false;
        }

        task.retry();
        self.retry_counts.insert(cell_id, task.retry_count);
        let dependencies = self.dependency_graph.get_dependencies(&cell_id);
        self.queue.push_back(ExecutionQueueItem {
            cell_id,
            priority: 1000,
            dependencies,
            queued_at: SystemTime::now(),
            is_incremental: false,
            estimated_duration: last_duration,
        });
        true
    }

    /// Cancels a cell: drops its executing task, its queued items and its
    /// retry bookkeeping.
    pub fn cancel_cell(&mut self, cell_id: &CellId) {
        if let Some(mut task) = self.executing.remove(cell_id) {
            task.cancel();
        }
        self.retry_counts.remove(cell_id);
        self.queue.retain(|item| item.cell_id != *cell_id);
    }

    /// `true` when nothing is queued and nothing is executing.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty() && self.executing.is_empty()
    }

    /// Number of queued (not yet dequeued) items.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Number of tasks currently executing.
    pub fn executing_count(&self) -> usize {
        self.executing.len()
    }

    /// All tasks currently executing, in unspecified order.
    pub fn get_executing_tasks(&self) -> Vec<&ExecutionTask> {
        self.executing.values().collect()
    }

    /// The executing task for `cell_id`, if any.
    pub fn get_task(&self, cell_id: &CellId) -> Option<&ExecutionTask> {
        self.executing.get(cell_id)
    }

    /// Resets the queue, all bookkeeping sets and the dependency graph.
    pub fn clear(&mut self) {
        self.queue.clear();
        self.executing.clear();
        self.completed.clear();
        self.failed.clear();
        self.dependency_graph.clear();
        self.retry_counts.clear();
        self.observed_time = Duration::ZERO;
        self.observed_runs = 0;
    }

    /// Snapshot of queue sizes, the concurrency limit and graph metrics.
    pub fn statistics(&self) -> QueueStatistics {
        QueueStatistics {
            queued: self.queue.len(),
            executing: self.executing.len(),
            completed: self.completed.len(),
            failed: self.failed.len(),
            max_concurrent: self.max_concurrent,
            dependency_stats: self.dependency_graph.statistics(),
        }
    }

    /// Mutable access to the underlying dependency graph.
    pub fn get_dependency_graph(&mut self) -> &mut DependencyGraph {
        &mut self.dependency_graph
    }

    /// Rough estimate of the time needed for the queued items: the average
    /// duration of runs finished so far (10 seconds when none has finished)
    /// multiplied by the number of queued items. `None` when nothing is queued.
    pub fn estimate_remaining_time(&self) -> Option<Duration> {
        if self.queue.is_empty() {
            return None;
        }
        // `checked_div` yields `None` when no run has been observed yet.
        let average = self
            .observed_time
            .checked_div(self.observed_runs)
            .unwrap_or(Duration::from_secs(10));
        // Clamp instead of truncating if the queue is absurdly long.
        let pending = self.queue.len().min(u32::MAX as usize) as u32;
        Some(average.saturating_mul(pending))
    }

    /// Adds a finished task's duration to the running average, when known.
    fn record_run(&mut self, task: &ExecutionTask) {
        if let Some(elapsed) = task.execution_time() {
            self.observed_time = self.observed_time.saturating_add(elapsed);
            self.observed_runs = self.observed_runs.saturating_add(1);
        }
    }
}
impl Default for ExecutionQueue {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of execution-queue counters produced by `ExecutionQueue::statistics`.
#[derive(Debug, Clone)]
pub struct QueueStatistics {
/// Items waiting in the queue.
pub queued: usize,
/// Tasks currently executing.
pub executing: usize,
/// Cells recorded as completed.
pub completed: usize,
/// Cells recorded as failed.
pub failed: usize,
/// Configured concurrency limit.
pub max_concurrent: usize,
/// Metrics of the queue's dependency graph.
pub dependency_stats: DependencyGraphStats,
}
/// Per-cell cache of execution results, optionally persisted to a JSON file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionCache {
// Cached result per cell; an entry is valid only for the content hash it was stored with.
results: HashMap<CellId, CachedResult>,
// Path of the persistence file, if persistence is enabled.
cache_file: Option<String>,
// Maximum number of entries kept; the least recently accessed are evicted first.
max_size: usize,
}
/// A cached execution result together with validity and usage metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedResult {
// Hash of the cell content the result was computed from.
content_hash: u64,
// The cached result itself.
result: ExecutionResult,
// When the entry was created.
cached_at: SystemTime,
// Number of times the entry has been served.
access_count: u64,
// When the entry was created or last served.
last_accessed: SystemTime,
}
impl CachedResult {
    /// Wraps `result` for content with the given hash; both timestamps are
    /// set to the current time and the access counter starts at zero.
    pub fn new(content_hash: u64, result: ExecutionResult) -> Self {
        let created = SystemTime::now();
        Self {
            content_hash,
            result,
            cached_at: created,
            access_count: 0,
            last_accessed: created,
        }
    }

    /// Records one more access and refreshes the last-access time.
    pub fn mark_accessed(&mut self) {
        self.last_accessed = SystemTime::now();
        self.access_count += 1;
    }

    /// Whether the entry is older than `max_age`. If the age cannot be
    /// determined (`elapsed` fails) the entry is treated as brand new.
    pub fn is_expired(&self, max_age: Duration) -> bool {
        let age = match self.cached_at.elapsed() {
            Ok(age) => age,
            Err(_) => Duration::ZERO,
        };
        age > max_age
    }
}
impl ExecutionCache {
    /// Creates an in-memory cache holding at most 1000 entries.
    pub fn new() -> Self {
        Self {
            results: HashMap::new(),
            cache_file: None,
            max_size: 1000,
        }
    }

    /// Creates a cache backed by `cache_file` and loads any entries already
    /// stored there. A missing or unreadable file yields an empty cache.
    pub fn with_persistence<P: AsRef<Path>>(cache_file: P) -> Self {
        let mut cache = Self::new();
        cache.cache_file = Some(cache_file.as_ref().to_string_lossy().to_string());
        cache.load_from_disk();
        cache
    }

    /// Changes the capacity and evicts entries that no longer fit.
    pub fn set_max_size(&mut self, max_size: usize) {
        self.max_size = max_size;
        if self.evict_if_needed() {
            self.save_to_disk();
        }
    }

    /// Hashes cell content with the standard library's default hasher.
    fn compute_hash(content: &str) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        content.hash(&mut hasher);
        hasher.finish()
    }

    /// Returns the cached result for `cell_id` if it was stored for exactly
    /// this `content`. An entry stored for different content is discarded.
    pub fn get(&mut self, cell_id: &CellId, content: &str) -> Option<ExecutionResult> {
        let content_hash = Self::compute_hash(content);
        let is_current = self.results.get(cell_id)?.content_hash == content_hash;
        if !is_current {
            self.results.remove(cell_id);
            // Keep the file in step so the stale entry does not come back.
            self.save_to_disk();
            return None;
        }
        let cached = self.results.get_mut(cell_id)?;
        cached.mark_accessed();
        Some(cached.result.clone())
    }

    /// Stores `result` for `cell_id` and `content`, evicts if over capacity,
    /// and persists when a cache file is configured.
    pub fn put(&mut self, cell_id: CellId, content: &str, result: ExecutionResult) {
        let content_hash = Self::compute_hash(content);
        self.results
            .insert(cell_id, CachedResult::new(content_hash, result));
        self.evict_if_needed();
        self.save_to_disk();
    }

    /// Removes the entry for `cell_id`, persisting the change if an entry
    /// was actually removed.
    pub fn remove(&mut self, cell_id: &CellId) {
        if self.results.remove(cell_id).is_some() {
            self.save_to_disk();
        }
    }

    /// Removes all entries and persists the empty cache.
    pub fn clear(&mut self) {
        self.results.clear();
        self.save_to_disk();
    }

    /// Drops entries older than `max_age`, persisting if anything was dropped.
    pub fn cleanup_expired(&mut self, max_age: Duration) {
        let before = self.results.len();
        self.results.retain(|_, cached| !cached.is_expired(max_age));
        if self.results.len() != before {
            self.save_to_disk();
        }
    }

    /// Evicts the least recently accessed entries until the cache fits
    /// `max_size`. Returns `true` when at least one entry was evicted.
    fn evict_if_needed(&mut self) -> bool {
        if self.results.len() <= self.max_size {
            return false;
        }
        let mut by_age: Vec<_> = self
            .results
            .iter()
            .map(|(cell_id, cached)| (*cell_id, cached.last_accessed))
            .collect();
        by_age.sort_by_key(|(_, last_accessed)| *last_accessed);
        let surplus = by_age.len() - self.max_size;
        for (cell_id, _) in by_age.into_iter().take(surplus) {
            self.results.remove(&cell_id);
        }
        true
    }

    /// Best-effort load: read or parse failures leave the cache unchanged.
    fn load_from_disk(&mut self) {
        let loaded = self
            .cache_file
            .as_ref()
            .and_then(|path| fs::read_to_string(path).ok())
            .and_then(|data| serde_json::from_str(&data).ok());
        if let Some(results) = loaded {
            self.results = results;
            // The file may hold more entries than the current capacity.
            self.evict_if_needed();
        }
    }

    /// Best-effort save: does nothing without a cache file, and
    /// serialization or write failures are deliberately ignored.
    fn save_to_disk(&self) {
        if let Some(ref cache_file) = self.cache_file {
            if let Ok(data) = serde_json::to_string(&self.results) {
                let _ = fs::write(cache_file, data);
            }
        }
    }

    /// Entry and access counters. `hit_rate` is always `0.0` because the
    /// cache itself does not count misses.
    pub fn statistics(&self) -> CacheStatistics {
        let total_entries = self.results.len();
        let total_access_count: u64 = self
            .results
            .values()
            .map(|cached| cached.access_count)
            .sum();
        let avg_access_count = if total_entries == 0 {
            0.0
        } else {
            total_access_count as f64 / total_entries as f64
        };
        CacheStatistics {
            total_entries,
            max_size: self.max_size,
            total_access_count,
            avg_access_count,
            hit_rate: 0.0,
        }
    }
}
impl Default for ExecutionCache {
fn default() -> Self {
Self::new()
}
}
/// Snapshot of cache counters produced by `ExecutionCache::statistics`.
#[derive(Debug, Clone)]
pub struct CacheStatistics {
/// Number of entries currently cached.
pub total_entries: usize,
/// Configured capacity.
pub max_size: usize,
/// Sum of the access counters of all entries.
pub total_access_count: u64,
/// Mean access count per entry (0.0 for an empty cache).
pub avg_access_count: f64,
/// Hit rate; `ExecutionCache::statistics` always reports 0.0 here.
pub hit_rate: f64,
}
/// Point-in-time status report of the execution engine.
#[derive(Debug, Clone)]
pub struct ExecutionEngineStatus {
/// Value of the engine's running flag.
pub is_running: bool,
/// Value of the engine's cancellation flag.
pub is_cancelled: bool,
/// Queue counters.
pub queue_statistics: QueueStatistics,
/// Cache counters.
pub cache_statistics: CacheStatistics,
/// Copy of the engine's execution statistics.
pub execution_statistics: ExecutionStatistics,
/// Cache hits divided by hits plus misses, as counted by the engine.
pub cache_hit_rate: f64,
}
/// Serializable bundle of engine statistics, cache counters and configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionEngineState {
/// Execution statistics.
pub statistics: ExecutionStatistics,
/// Number of cache hits.
pub cache_hits: u64,
/// Number of cache misses.
pub cache_misses: u64,
/// Engine configuration.
pub config: ExecutionEngineConfig,
}
/// Tunable settings of the execution engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionEngineConfig {
/// Maximum number of concurrently executing cells.
pub max_concurrent: usize,
/// Whether execution results are cached.
pub enable_cache: bool,
/// File used to persist the cache; `None` keeps it in memory only.
pub cache_file: Option<String>,
/// Maximum number of cache entries.
pub cache_max_size: usize,
/// Age beyond which cache entries are dropped during cleanup.
pub cache_max_age: Duration,
/// Progress reporting switch. NOTE(review): not read in the visible code — confirm its consumers.
pub enable_progress: bool,
/// Progress reporting interval. NOTE(review): not read in the visible code — confirm its consumers.
pub progress_interval: Duration,
/// Execution timeout. NOTE(review): not read in the visible code — confirm its consumers.
pub execution_timeout: Option<Duration>,
/// Retry limit. NOTE(review): not read in the visible code — confirm its consumers.
pub max_retries: u32,
}
impl Default for ExecutionEngineConfig {
    /// Defaults: 4 concurrent cells, in-memory cache of 1000 entries kept
    /// for one hour, progress every 100 ms, 300 s timeout, 3 retries.
    fn default() -> Self {
        let one_hour = Duration::from_secs(3600);
        let five_minutes = Duration::from_secs(300);
        let tick = Duration::from_millis(100);
        Self {
            max_concurrent: 4,
            enable_cache: true,
            cache_file: None,
            cache_max_size: 1000,
            cache_max_age: one_hour,
            enable_progress: true,
            progress_interval: tick,
            execution_timeout: Some(five_minutes),
            max_retries: 3,
        }
    }
}
/// Executes notebook cells: parsing, evaluation, caching, queueing and progress reporting.
pub struct ExecutionEngine {
// Math engine used to parse and evaluate cell content.
yufmath: Yufmath,
// Variable scopes; the current scope is switched to the cell being executed.
scope_manager: ScopeManager,
// Dependency-aware queue of cells to execute.
execution_queue: ExecutionQueue,
// Result cache keyed by cell id and content hash.
cache: ExecutionCache,
// Active configuration.
config: ExecutionEngineConfig,
// Set while a queue/async run is in progress.
is_running: Arc<RwLock<bool>>,
// Set to request cancellation of ongoing work.
cancel_flag: Arc<Mutex<bool>>,
// Optional progress listener; returning `false` from it cancels the execution.
progress_callback: Option<ProgressCallback>,
// Accumulated execution statistics.
statistics: ExecutionStatistics,
// Number of executions served from the cache.
cache_hits: u64,
// Number of executions that missed the cache.
cache_misses: u64,
}
impl ExecutionEngine {
/// Creates an engine configured with `ExecutionEngineConfig::default()`.
pub fn new() -> Self {
    let config = ExecutionEngineConfig::default();
    Self::with_config(config)
}
/// Creates an engine from `config`.
///
/// The queue's concurrency limit and the cache capacity are taken from the
/// configuration. The cache is file-backed only when caching is enabled and
/// a cache file is configured; otherwise it lives in memory.
pub fn with_config(config: ExecutionEngineConfig) -> Self {
    let mut execution_queue = ExecutionQueue::new();
    execution_queue.set_max_concurrent(config.max_concurrent);

    let mut cache = match (config.enable_cache, config.cache_file.as_ref()) {
        (true, Some(cache_file)) => ExecutionCache::with_persistence(cache_file),
        _ => ExecutionCache::new(),
    };
    // Apply the configured capacity here as well, so construction agrees
    // with `update_config` instead of silently keeping the cache default.
    cache.set_max_size(config.cache_max_size);

    Self {
        yufmath: Yufmath::new(),
        scope_manager: ScopeManager::new(),
        execution_queue,
        cache,
        config,
        is_running: Arc::new(RwLock::new(false)),
        cancel_flag: Arc::new(Mutex::new(false)),
        progress_callback: None,
        statistics: ExecutionStatistics::default(),
        cache_hits: 0,
        cache_misses: 0,
    }
}
/// Installs `callback` as the progress listener, replacing any previous one.
pub fn set_progress_callback(&mut self, callback: ProgressCallback) {
    let listener = Some(callback);
    self.progress_callback = listener;
}
/// Replaces the configuration and applies the new concurrency limit and
/// cache capacity to the queue and the cache.
pub fn update_config(&mut self, config: ExecutionEngineConfig) {
    let (concurrency, cache_capacity) = (config.max_concurrent, config.cache_max_size);
    self.execution_queue.set_max_concurrent(concurrency);
    self.cache.set_max_size(cache_capacity);
    self.config = config;
}
/// Executes `cell` with a freshly created default `ExecutionContext`.
pub fn execute_cell(&mut self, cell: &mut NotebookCell) -> NotebookResult<ExecutionResult> {
    let cell_id = cell.id;
    let context = ExecutionContext::new(cell_id);
    self.execute_cell_with_context(cell, context)
}
/// Async variant of `execute_cell`: builds a default context and awaits
/// `execute_cell_with_context_async`.
pub async fn execute_cell_async(&mut self, cell: &mut NotebookCell) -> NotebookResult<ExecutionResult> {
    let cell_id = cell.id;
    let context = ExecutionContext::new(cell_id);
    let outcome = self.execute_cell_with_context_async(cell, context).await;
    outcome
}
pub fn execute_cell_with_context(
&mut self,
cell: &mut NotebookCell,
context: ExecutionContext
) -> NotebookResult<ExecutionResult> {
let start_time = Instant::now();
if !cell.is_executable() {
return Ok(ExecutionResult::Skipped);
}
if *self.cancel_flag.lock().unwrap() {
return Ok(ExecutionResult::Cancelled);
}
if self.config.enable_cache {
let content = cell.get_text();
if let Some(cached_result) = self.cache.get(&cell.id, &content) {
self.cache_hits += 1;
return Ok(cached_result);
} else {
self.cache_misses += 1;
}
}
self.scope_manager.set_current_scope(Some(cell.id));
if let Some(ref callback) = self.progress_callback {
let progress = ComputeProgress {
current_step: "解析表达式".to_string(),
progress: 0.1,
estimated_remaining: None,
expression_size: cell.get_text().len(),
completed_subtasks: 0,
total_subtasks: 3,
phase: crate::api::ComputePhase::Parsing,
details: None,
memory_usage: 0,
cache_hit_rate: 0.0,
};
if !callback(&progress) {
return Ok(ExecutionResult::Cancelled);
}
}
let expression = match self.parse_cell_content(cell) {
Ok(expr) => expr,
Err(e) => {
let execution_time = start_time.elapsed();
self.statistics.record_execution(false, execution_time);
let result = ExecutionResult::Error {
error: format!("解析错误: {}", e),
error_type: "ParseError".to_string(),
execution_time,
};
if self.config.enable_cache {
self.cache.put(cell.id, &cell.get_text(), result.clone());
}
return Ok(result);
}
};
if let Some(ref callback) = self.progress_callback {
let progress = ComputeProgress {
current_step: "执行计算".to_string(),
progress: 0.5,
estimated_remaining: Some(Duration::from_secs(5)),
expression_size: cell.get_text().len(),
completed_subtasks: 1,
total_subtasks: 3,
phase: crate::api::ComputePhase::Computation,
details: None,
memory_usage: 0,
cache_hit_rate: 0.0,
};
if !callback(&progress) {
return Ok(ExecutionResult::Cancelled);
}
}
let result = match self.compute_expression(&expression, &context) {
Ok(value) => {
let execution_time = start_time.elapsed();
self.statistics.record_execution(true, execution_time);
let output_cell = NotebookCell::new_output(
value.clone(),
context.output_format.clone(),
Some(execution_time)
);
cell.set_output(output_cell);
ExecutionResult::Success {
value,
format: context.output_format,
execution_time,
}
}
Err(e) => {
let execution_time = start_time.elapsed();
self.statistics.record_execution(false, execution_time);
ExecutionResult::Error {
error: format!("计算错误: {}", e),
error_type: "ComputeError".to_string(),
execution_time,
}
}
};
if self.config.enable_cache {
self.cache.put(cell.id, &cell.get_text(), result.clone());
}
if let Some(ref callback) = self.progress_callback {
let progress = ComputeProgress {
current_step: "执行完成".to_string(),
progress: 1.0,
estimated_remaining: Some(Duration::ZERO),
expression_size: cell.get_text().len(),
completed_subtasks: 3,
total_subtasks: 3,
phase: crate::api::ComputePhase::Completed,
details: None,
memory_usage: 0,
cache_hit_rate: 0.0,
};
callback(&progress);
}
Ok(result)
}
/// Async wrapper around `execute_cell_with_context`.
///
/// The cell is executed synchronously first; the function then awaits a
/// 10 ms sleep before handing back the result.
pub async fn execute_cell_with_context_async(
    &mut self,
    cell: &mut NotebookCell,
    context: ExecutionContext
) -> NotebookResult<ExecutionResult> {
    let outcome = self.execute_cell_with_context(cell, context);
    let pause = Duration::from_millis(10);
    tokio::time::sleep(pause).await;
    outcome
}
/// Parses the cell text into an `Expression`.
///
/// Parser failures are wrapped in
/// `NotebookError::Execution(ComputeError::UnsupportedOperation)`.
fn parse_cell_content(&mut self, cell: &NotebookCell) -> NotebookResult<Expression> {
    let content = cell.get_text();
    self.yufmath.parse(&content).map_err(|e| {
        NotebookError::Execution(ComputeError::UnsupportedOperation {
            operation: format!("解析失败: {}", e)
        })
    })
}
/// Evaluates `expr` against the variables exported by the scope manager and
/// returns the `Debug` rendering of the result.
///
/// Fails up front when the context has timed out or cancellation was
/// requested. Every failure is reported as
/// `NotebookError::Execution(ComputeError::UnsupportedOperation)`.
fn compute_expression(&mut self, expr: &Expression, context: &ExecutionContext) -> NotebookResult<String> {
    // All error paths share the same wrapper; only the message differs.
    let refuse = |reason: String| {
        NotebookError::Execution(ComputeError::UnsupportedOperation { operation: reason })
    };

    if context.is_timeout() {
        return Err(refuse("执行超时".to_string()));
    }
    if *self.cancel_flag.lock().unwrap() {
        return Err(refuse("执行被取消".to_string()));
    }

    let variables = self.scope_manager.export_for_computation();
    self.yufmath
        .evaluate(expr, &variables)
        .map(|result| format!("{:?}", result))
        .map_err(|e| refuse(format!("计算失败: {}", e)))
}
/// Executes the cells in slice order and collects one result per executed
/// cell. Stops after the current cell once the cancellation flag is set, so
/// the returned vector may be shorter than `cells`.
pub fn execute_cells(&mut self, cells: &mut [NotebookCell]) -> Vec<NotebookResult<ExecutionResult>> {
    let mut results = Vec::new();
    for cell in cells.iter_mut() {
        results.push(self.execute_cell(cell));
        let cancelled = *self.cancel_flag.lock().unwrap();
        if cancelled {
            break;
        }
    }
    results
}
/// Re-executes only the cells that need it: every cell reporting
/// `needs_execution()` plus all cells that transitively depend on one of
/// them, in dependency order.
///
/// Returns the results keyed by cell id; the map is empty when no cell
/// needs execution. Execution stops after the current cell once the
/// cancellation flag is set.
pub fn execute_cells_incremental(
    &mut self,
    cells: &mut HashMap<CellId, NotebookCell>
) -> HashMap<CellId, NotebookResult<ExecutionResult>> {
    let mut results = HashMap::new();

    let modified_cells: HashSet<CellId> = cells.values()
        .filter(|cell| cell.needs_execution())
        .map(|cell| cell.id)
        .collect();
    if modified_cells.is_empty() {
        return results;
    }

    let dependencies = self.analyze_dependencies_from_cells(cells.values().collect());
    let cells_to_execute = {
        let graph = self.execution_queue.get_dependency_graph();
        for (cell_id, deps) in &dependencies {
            graph.add_node(*cell_id);
            for dep_id in deps {
                graph.add_dependency(*cell_id, *dep_id);
            }
        }
        graph.get_cells_to_execute(&modified_cells)
    };

    for cell_id in cells_to_execute {
        let cell = match cells.get_mut(&cell_id) {
            Some(cell) => cell,
            None => continue,
        };
        // The result cache is keyed by cell text only. A dependent whose
        // text is unchanged would otherwise be answered from the cache with
        // the value computed from the old upstream variables.
        self.cache.remove(&cell_id);
        let result = self.execute_cell(cell);
        results.insert(cell_id, result);
        if *self.cancel_flag.lock().unwrap() {
            break;
        }
    }
    results
}
pub async fn execute_cells_async(
&mut self,
cells: &mut HashMap<CellId, NotebookCell>
) -> HashMap<CellId, NotebookResult<ExecutionResult>> {
let mut results = HashMap::new();
*self.is_running.write().unwrap() = true;
let dependencies = self.analyze_dependencies_from_cells(cells.values().collect());
self.execution_queue.clear();
for (cell_id, deps) in dependencies {
let item = ExecutionQueueItem {
cell_id,
priority: 0,
dependencies: deps,
queued_at: SystemTime::now(),
is_incremental: false,
estimated_duration: None,
};
self.execution_queue.enqueue(item);
}
while !self.execution_queue.is_empty() {
if *self.cancel_flag.lock().unwrap() {
break;
}
let batch = self.execution_queue.dequeue_batch(self.config.max_concurrent);
if batch.is_empty() {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
let mut handles = Vec::new();
for item in batch {
if let Some(cell) = cells.get_mut(&item.cell_id) {
let cell_clone = cell.clone();
let context = ExecutionContext::new(item.cell_id);
let handle = tokio::spawn(async move {
(item.cell_id, Ok(ExecutionResult::Skipped))
});
handles.push(handle);
}
}
for handle in handles {
if let Ok((cell_id, result)) = handle.await {
let success = result.as_ref().map(|r| r.is_success()).unwrap_or(false);
let result_for_queue = result.as_ref().ok().cloned();
self.execution_queue.mark_completed(cell_id, success, result_for_queue);
results.insert(cell_id, result);
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
*self.is_running.write().unwrap() = false;
results
}
pub fn execute_queue(&mut self) -> NotebookResult<()> {
*self.is_running.write().unwrap() = true;
*self.cancel_flag.lock().unwrap() = false;
while !self.execution_queue.is_empty() {
if *self.cancel_flag.lock().unwrap() {
break;
}
if let Some(queue_item) = self.execution_queue.dequeue() {
self.execution_queue.mark_completed(queue_item.cell_id, true, None);
} else {
thread::sleep(Duration::from_millis(100));
}
}
*self.is_running.write().unwrap() = false;
Ok(())
}
/// Queues `cell_id` with priority 0 as a regular (non-incremental) item
/// that waits for `dependencies`.
pub fn queue_cell(&mut self, cell_id: CellId, dependencies: Vec<CellId>) {
    let queued_at = SystemTime::now();
    self.execution_queue.enqueue(ExecutionQueueItem {
        cell_id,
        priority: 0,
        dependencies,
        queued_at,
        is_incremental: false,
        estimated_duration: None,
    });
}
/// Raises the cancellation flag checked by the execution methods.
pub fn cancel_execution(&mut self) {
    let mut cancelled = self.cancel_flag.lock().unwrap();
    *cancelled = true;
}
/// Current value of the running flag.
pub fn is_running(&self) -> bool {
    let running = self.is_running.read().unwrap();
    *running
}
/// Clears the execution queue via `ExecutionQueue::clear`, which also resets
/// its completed/failed bookkeeping and its dependency graph.
pub fn clear_queue(&mut self) {
self.execution_queue.clear();
}
/// Mutable access to the engine's scope manager.
pub fn get_scope_manager(&mut self) -> &mut ScopeManager {
&mut self.scope_manager
}
/// Read-only access to the accumulated execution statistics.
pub fn get_statistics(&self) -> &ExecutionStatistics {
&self.statistics
}
/// Replaces the accumulated statistics with a default (empty) instance.
pub fn reset_statistics(&mut self) {
    let fresh = ExecutionStatistics::default();
    self.statistics = fresh;
}
/// Computes, for every executable cell in `cells`, the cells it depends on.
/// Thin wrapper over `analyze_dependencies_from_cells`.
pub fn analyze_dependencies(&self, cells: &[NotebookCell]) -> HashMap<CellId, Vec<CellId>> {
    let cell_refs: Vec<&NotebookCell> = cells.iter().collect();
    self.analyze_dependencies_from_cells(cell_refs)
}
/// Derives cell dependencies from the cell text with a simple heuristic.
///
/// A cell whose text contains `=` but no `==` is treated as defining the
/// name to the left of the first `=`. When several cells define the same
/// name, the last one in `cells` wins. A cell depends on the defining cell
/// of every such name that occurs in its text as a whole word.
///
/// Returns one entry per executable cell; the order of the dependency list
/// is unspecified.
fn analyze_dependencies_from_cells(&self, cells: Vec<&NotebookCell>) -> HashMap<CellId, Vec<CellId>> {
    /// Characters that can be part of an identifier-like word.
    fn is_word_char(c: char) -> bool {
        c.is_alphanumeric() || c == '_'
    }

    /// Whether `name` occurs in `text` without being glued to a longer
    /// word, so that `a` does not match inside `tan`. A boundary is only
    /// required on a side where `name` itself starts or ends with a word
    /// character.
    fn mentions(text: &str, name: &str) -> bool {
        let guard_front = name.chars().next().map_or(false, is_word_char);
        let guard_back = name.chars().next_back().map_or(false, is_word_char);
        text.match_indices(name).any(|(start, hit)| {
            let end = start + hit.len();
            let glued_front = guard_front
                && text[..start].chars().next_back().map_or(false, is_word_char);
            let glued_back = guard_back
                && text[end..].chars().next().map_or(false, is_word_char);
            !glued_front && !glued_back
        })
    }

    // Pass 1: which cell defines which name.
    let mut variable_definitions: HashMap<String, CellId> = HashMap::new();
    for cell in cells.iter().filter(|cell| cell.is_executable()) {
        let content = cell.get_text();
        if !content.contains('=') || content.contains("==") {
            continue;
        }
        let target = content.split('=').next().unwrap_or("").trim();
        // An empty name would match every cell (`contains("")` is always
        // true); a trailing `<`, `>` or `!` means the `=` belonged to a
        // comparison operator rather than an assignment.
        let is_comparison = target.ends_with(|c: char| c == '<' || c == '>' || c == '!');
        if target.is_empty() || is_comparison {
            continue;
        }
        variable_definitions.insert(target.to_string(), cell.id);
    }

    // Pass 2: which defined names each cell mentions.
    let mut dependencies = HashMap::new();
    for cell in cells.iter().filter(|cell| cell.is_executable()) {
        let content = cell.get_text();
        let mut cell_deps = Vec::new();
        for (var_name, def_cell_id) in &variable_definitions {
            if *def_cell_id != cell.id && mentions(&content, var_name) {
                cell_deps.push(*def_cell_id);
            }
        }
        dependencies.insert(cell.id, cell_deps);
    }
    dependencies
}
pub fn handle_execution_error(
&mut self,
cell_id: CellId,
error: &NotebookError
) -> NotebookResult<bool> {
match error {
NotebookError::Execution(compute_error) => {
match compute_error {
ComputeError::DivisionByZero => {
self.try_symbolic_recovery(cell_id)
}
ComputeError::UndefinedVariable { name } => {
self.try_variable_recovery(cell_id, name)
}
ComputeError::Overflow => {
self.try_precision_recovery(cell_id)
}
_ => Ok(false), }
}
_ => Ok(false), }
}
/// Recovery hook for division-by-zero errors.
///
/// Currently a stub: it ignores the cell and always reports `Ok(false)`.
fn try_symbolic_recovery(&mut self, _cell_id: CellId) -> NotebookResult<bool> {
    let recovered = false;
    Ok(recovered)
}
/// Recovery hook for undefined-variable errors.
///
/// Reports `Ok(true)` exactly when the scope manager knows a variable called
/// `var_name`; the cell id is not used.
fn try_variable_recovery(&mut self, _cell_id: CellId, var_name: &str) -> NotebookResult<bool> {
    let now_defined = self.scope_manager.has_variable(var_name);
    Ok(now_defined)
}
/// Recovery hook for overflow errors.
///
/// Currently a stub: it ignores the cell and always reports `Ok(false)`.
fn try_precision_recovery(&mut self, _cell_id: CellId) -> NotebookResult<bool> {
    let recovered = false;
    Ok(recovered)
}
/// Returns the engine to an idle state.
///
/// Asks the cache to drop entries older than the configured `cache_max_age`,
/// clears the execution queue, and resets both the cancel flag and the
/// running flag to `false`.
///
/// # Panics
///
/// Panics if the cancel-flag or running-flag lock is poisoned.
pub fn cleanup_execution_environment(&mut self) {
    let max_age = self.config.cache_max_age;
    self.cache.cleanup_expired(max_age);

    self.execution_queue.clear();

    let mut cancel_requested = self.cancel_flag.lock().unwrap();
    *cancel_requested = false;
    drop(cancel_requested);

    let mut running = self.is_running.write().unwrap();
    *running = false;
}
/// Takes a snapshot of the engine's current state.
///
/// The snapshot combines the running and cancel flags, the queue and cache
/// statistics, a clone of the execution statistics and the cache hit rate
/// (`hits / (hits + misses)`, or `0.0` when no lookup has been counted).
///
/// # Panics
///
/// Panics if the running-flag or cancel-flag lock is poisoned.
pub fn get_engine_status(&self) -> ExecutionEngineStatus {
    let total_lookups = self.cache_hits + self.cache_misses;
    let cache_hit_rate = if total_lookups > 0 {
        self.cache_hits as f64 / total_lookups as f64
    } else {
        0.0
    };

    let queue_statistics = self.execution_queue.statistics();
    let cache_statistics = self.cache.statistics();
    let running_now = *self.is_running.read().unwrap();
    let cancel_requested = *self.cancel_flag.lock().unwrap();

    ExecutionEngineStatus {
        is_running: running_now,
        is_cancelled: cancel_requested,
        queue_statistics,
        cache_statistics,
        execution_statistics: self.statistics.clone(),
        cache_hit_rate,
    }
}
pub fn save_state<P: AsRef<Path>>(&self, path: P) -> NotebookResult<()> {
let state = ExecutionEngineState {
statistics: self.statistics.clone(),
cache_hits: self.cache_hits,
cache_misses: self.cache_misses,
config: self.config.clone(),
};
let data = serde_json::to_string_pretty(&state)
.map_err(|e| NotebookError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!("序列化失败: {}", e)
)))?;
fs::write(path, data)
.map_err(|e| NotebookError::Io(e))?;
Ok(())
}
pub fn load_state<P: AsRef<Path>>(&mut self, path: P) -> NotebookResult<()> {
let data = fs::read_to_string(path)
.map_err(|e| NotebookError::Io(e))?;
let state: ExecutionEngineState = serde_json::from_str(&data)
.map_err(|e| NotebookError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!("反序列化失败: {}", e)
)))?;
self.statistics = state.statistics;
self.cache_hits = state.cache_hits;
self.cache_misses = state.cache_misses;
self.update_config(state.config);
Ok(())
}
}
impl Default for ExecutionEngine {
fn default() -> Self {
Self::new()
}
}
/// Running totals describing the executions recorded through
/// `record_execution`.
///
/// All fields start at zero (derived `Default`). The struct is serialisable so
/// it can be persisted together with the rest of the engine state.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ExecutionStatistics {
/// Number of executions recorded, successful or not.
pub total_executions: u64,
/// Number of recorded executions flagged as successful.
pub successful_executions: u64,
/// Number of recorded executions flagged as failed.
pub failed_executions: u64,
/// Sum of the durations of all recorded executions.
pub total_execution_time: Duration,
/// `total_execution_time` divided by `total_executions`, refreshed on every record.
pub average_execution_time: Duration,
/// Longest single execution duration recorded.
pub max_execution_time: Duration,
/// Minimum execution duration tracked by `record_execution`; zero until something is recorded.
pub min_execution_time: Duration,
}
impl ExecutionStatistics {
    /// Records one finished execution.
    ///
    /// Increments the total and the success or failure counter, adds
    /// `execution_time` to the running total, and refreshes the average,
    /// maximum and minimum durations.
    ///
    /// The minimum is initialised from the first recorded sample (detected via
    /// `total_executions == 1`) rather than by treating `Duration::ZERO` as
    /// "unset", so a genuine zero-length execution remains the minimum. The
    /// average is computed in nanoseconds with 128-bit arithmetic, so it stays
    /// correct (and cannot divide by zero) once the count exceeds `u32::MAX`.
    pub fn record_execution(&mut self, success: bool, execution_time: Duration) {
        self.total_executions += 1;
        if success {
            self.successful_executions += 1;
        } else {
            self.failed_executions += 1;
        }

        self.total_execution_time += execution_time;

        // Floor division of the total, identical to `Duration / u32` for counts
        // that fit in `u32`, but without truncating the 64-bit counter.
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        let average_nanos =
            self.total_execution_time.as_nanos() / u128::from(self.total_executions);
        // The average never exceeds the total, so the seconds part fits in `u64`
        // and the remainder is below one second, so it fits in `u32`.
        self.average_execution_time = Duration::new(
            (average_nanos / NANOS_PER_SEC) as u64,
            (average_nanos % NANOS_PER_SEC) as u32,
        );

        if execution_time > self.max_execution_time {
            self.max_execution_time = execution_time;
        }

        let is_first_sample = self.total_executions == 1;
        if is_first_sample || execution_time < self.min_execution_time {
            self.min_execution_time = execution_time;
        }
    }

    /// Fraction of recorded executions that succeeded, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when nothing has been recorded yet.
    pub fn success_rate(&self) -> f64 {
        if self.total_executions == 0 {
            0.0
        } else {
            self.successful_executions as f64 / self.total_executions as f64
        }
    }

    /// Complement of [`success_rate`](Self::success_rate).
    ///
    /// Note that this yields `1.0` when nothing has been recorded yet, because
    /// the success rate is `0.0` in that case.
    pub fn failure_rate(&self) -> f64 {
        1.0 - self.success_rate()
    }
}
// Unit tests for the execution engine and its supporting types.
#[cfg(test)]
mod tests {
use super::*;
use crate::notebook::NotebookCell;
use std::collections::HashSet;
// `ExecutionResult` accessors: success/error predicates, `value()` and
// `execution_time()` for both the `Success` and the `Error` variant.
#[test]
fn test_execution_result() {
let success = ExecutionResult::Success {
value: "42".to_string(),
format: FormatType::Standard,
execution_time: Duration::from_millis(100),
};
assert!(success.is_success());
assert!(!success.is_error());
assert_eq!(success.value(), Some("42"));
assert_eq!(success.execution_time(), Some(Duration::from_millis(100)));
let error = ExecutionResult::Error {
error: "除零错误".to_string(),
error_type: "DivisionByZero".to_string(),
execution_time: Duration::from_millis(50),
};
assert!(!error.is_success());
assert!(error.is_error());
// For the `Error` variant `value()` returns the error message.
assert_eq!(error.value(), Some("除零错误"));
}
// `ExecutionContext::new` defaults and timeout detection.
#[test]
fn test_execution_context() {
let cell_id = uuid::Uuid::new_v4();
let context = ExecutionContext::new(cell_id);
assert_eq!(context.cell_id, cell_id);
assert!(context.allow_cancellation);
assert!(context.show_timing);
// A 1 ns budget is certainly exceeded after sleeping for 1 ms.
let mut timeout_context = context.clone();
timeout_context.max_execution_time = Some(Duration::from_nanos(1));
thread::sleep(Duration::from_millis(1));
assert!(timeout_context.is_timeout());
}
// `ExecutionTask` life cycle: pending -> running -> completed, plus the
// fail -> retry path.
#[test]
fn test_execution_task() {
let cell_id = uuid::Uuid::new_v4();
let mut task = ExecutionTask::new(cell_id);
assert_eq!(task.cell_id, cell_id);
assert_eq!(task.status, TaskStatus::Pending);
assert_eq!(task.retry_count, 0);
task.start();
assert_eq!(task.status, TaskStatus::Running);
assert!(task.started_at.is_some());
let result = ExecutionResult::Success {
value: "42".to_string(),
format: FormatType::Standard,
execution_time: Duration::from_millis(100),
};
task.complete(result);
assert_eq!(task.status, TaskStatus::Completed);
assert!(task.completed_at.is_some());
// A freshly failed task is expected to be retryable; retrying puts it
// back to `Pending` and bumps the retry counter.
let mut failed_task = ExecutionTask::new(cell_id);
failed_task.fail("测试错误".to_string());
assert!(failed_task.can_retry());
failed_task.retry();
assert_eq!(failed_task.status, TaskStatus::Pending);
assert_eq!(failed_task.retry_count, 1);
}
// `DependencyGraph`: dependency lookup, topological order, propagation of
// modifications and graph statistics on the chain cell1 <- cell2 <- cell3.
#[test]
fn test_dependency_graph() {
let mut graph = DependencyGraph::new();
let cell1 = uuid::Uuid::new_v4();
let cell2 = uuid::Uuid::new_v4();
let cell3 = uuid::Uuid::new_v4();
graph.add_node(cell1);
graph.add_node(cell2);
graph.add_node(cell3);
// `add_dependency(a, b)` is used here as "a depends on b" (see the
// assertions on `get_dependencies` below).
graph.add_dependency(cell2, cell1); graph.add_dependency(cell3, cell2);
let deps_cell2 = graph.get_dependencies(&cell2);
let deps_cell3 = graph.get_dependencies(&cell3);
println!("cell2 的依赖: {:?}", deps_cell2);
println!("cell3 的依赖: {:?}", deps_cell3);
assert!(deps_cell2.contains(&cell1), "cell2 应该依赖 cell1");
assert!(deps_cell3.contains(&cell2), "cell3 应该依赖 cell2");
// Dependencies must come before their dependents in the sorted order.
let topo_order = graph.topological_sort();
println!("拓扑排序结果: {:?}", topo_order);
let cell1_pos = topo_order.iter().position(|&id| id == cell1).unwrap();
let cell2_pos = topo_order.iter().position(|&id| id == cell2).unwrap();
let cell3_pos = topo_order.iter().position(|&id| id == cell3).unwrap();
println!("位置: cell1={}, cell2={}, cell3={}", cell1_pos, cell2_pos, cell3_pos);
assert!(cell1_pos < cell2_pos, "cell1 应该在 cell2 之前,但实际位置: cell1={}, cell2={}", cell1_pos, cell2_pos);
assert!(cell2_pos < cell3_pos, "cell2 应该在 cell3 之前,但实际位置: cell2={}, cell3={}", cell2_pos, cell3_pos);
// Modifying the root of the chain must schedule the whole chain.
let mut modified = HashSet::new();
modified.insert(cell1);
let to_execute = graph.get_cells_to_execute(&modified);
assert!(to_execute.contains(&cell1));
assert!(to_execute.contains(&cell2));
assert!(to_execute.contains(&cell3));
let stats = graph.statistics();
assert_eq!(stats.total_nodes, 3);
assert_eq!(stats.total_edges, 2);
assert!(!stats.has_cycles);
}
// `ExecutionQueue`: dependency-aware dequeue order and batch dequeue.
#[test]
fn test_execution_queue_enhanced() {
let mut queue = ExecutionQueue::new();
let cell_id1 = uuid::Uuid::new_v4();
let cell_id2 = uuid::Uuid::new_v4();
let item1 = ExecutionQueueItem {
cell_id: cell_id1,
priority: 1,
dependencies: vec![],
queued_at: SystemTime::now(),
is_incremental: false,
estimated_duration: None,
};
// item2 depends on item1, so it must not be handed out first.
let item2 = ExecutionQueueItem {
cell_id: cell_id2,
priority: 0, dependencies: vec![cell_id1], queued_at: SystemTime::now(),
is_incremental: true,
estimated_duration: Some(Duration::from_secs(5)),
};
queue.enqueue(item1);
queue.enqueue(item2);
let next = queue.dequeue().unwrap();
assert_eq!(next.cell_id, cell_id1);
// Once cell 1 is marked completed, cell 2 becomes available.
queue.mark_completed(cell_id1, true, Some(ExecutionResult::Success {
value: "42".to_string(),
format: FormatType::Standard,
execution_time: Duration::from_millis(100),
}));
let next = queue.dequeue().unwrap();
assert_eq!(next.cell_id, cell_id2);
// Batch dequeue: five independent items, ask for three.
queue.clear();
for i in 0..5 {
let item = ExecutionQueueItem {
cell_id: uuid::Uuid::new_v4(),
priority: i,
dependencies: vec![],
queued_at: SystemTime::now(),
is_incremental: false,
estimated_duration: None,
};
queue.enqueue(item);
}
let batch = queue.dequeue_batch(3);
assert_eq!(batch.len(), 3);
}
// `ExecutionCache`: miss before insert, hit after insert, miss for different
// content under the same cell id, and miss again after `clear`.
#[test]
fn test_execution_cache() {
let mut cache = ExecutionCache::new();
let cell_id = uuid::Uuid::new_v4();
let content = "2 + 3";
assert!(cache.get(&cell_id, content).is_none());
let result = ExecutionResult::Success {
value: "5".to_string(),
format: FormatType::Standard,
execution_time: Duration::from_millis(10),
};
cache.put(cell_id, content, result.clone());
let cached = cache.get(&cell_id, content).unwrap();
assert_eq!(cached.value(), Some("5"));
let new_content = "3 + 4";
assert!(cache.get(&cell_id, new_content).is_none());
cache.clear();
assert!(cache.get(&cell_id, content).is_none());
}
// Engine built from an explicit configuration: executing the same code cell
// twice must register a cache hit, and text cells are skipped.
#[test]
fn test_execution_engine_enhanced() {
let config = ExecutionEngineConfig {
max_concurrent: 2,
enable_cache: true,
cache_file: None,
cache_max_size: 100,
cache_max_age: Duration::from_secs(3600),
enable_progress: false, progress_interval: Duration::from_millis(100),
execution_timeout: Some(Duration::from_secs(10)),
max_retries: 2,
};
let mut engine = ExecutionEngine::with_config(config);
let mut cell = NotebookCell::new_code("2 + 3".to_string());
let result1 = engine.execute_cell(&mut cell).unwrap();
assert!(result1.is_success());
let result2 = engine.execute_cell(&mut cell).unwrap();
assert!(result2.is_success());
let status = engine.get_engine_status();
assert!(status.cache_hit_rate > 0.0);
let mut text_cell = NotebookCell::new_text("这是文本".to_string());
let result = engine.execute_cell(&mut text_cell).unwrap();
assert!(matches!(result, ExecutionResult::Skipped));
}
// Incremental execution over three dirty, chained cells, followed by a
// second run after editing one cell.
#[test]
fn test_incremental_execution() {
let mut engine = ExecutionEngine::new();
let mut cells = HashMap::new();
let mut cell1 = NotebookCell::new_code("x = 10".to_string());
let mut cell2 = NotebookCell::new_code("y = x + 5".to_string());
let mut cell3 = NotebookCell::new_code("z = y * 2".to_string());
cell1.metadata.mark_dirty();
cell2.metadata.mark_dirty();
cell3.metadata.mark_dirty();
cells.insert(cell1.id, cell1);
cells.insert(cell2.id, cell2);
cells.insert(cell3.id, cell3);
let results = engine.execute_cells_incremental(&mut cells);
assert_eq!(results.len(), 3);
// NOTE(review): `HashMap` iteration order is unspecified, so this edits an
// arbitrary cell, not necessarily the one that originally held `x = 10`.
if let Some(cell1) = cells.values_mut().next() {
cell1.set_text("x = 20".to_string());
}
let results = engine.execute_cells_incremental(&mut cells);
assert!(results.len() > 0);
}
// `handle_execution_error` reports "not recoverable" for a division by zero
// and for a variable that is not defined in the scope manager.
#[test]
fn test_error_handling_and_recovery() {
let mut engine = ExecutionEngine::new();
let error = NotebookError::Execution(ComputeError::DivisionByZero);
let cell_id = uuid::Uuid::new_v4();
let can_recover = engine.handle_execution_error(cell_id, &error).unwrap();
assert!(!can_recover);
let error = NotebookError::Execution(ComputeError::UndefinedVariable {
name: "undefined_var".to_string()
});
let can_recover = engine.handle_execution_error(cell_id, &error).unwrap();
assert!(!can_recover);
}
// `ExecutionStatistics` counters, success rate and min/max tracking after
// three recorded executions.
#[test]
fn test_execution_statistics() {
let mut stats = ExecutionStatistics::default();
stats.record_execution(true, Duration::from_millis(100));
stats.record_execution(false, Duration::from_millis(50));
stats.record_execution(true, Duration::from_millis(200));
assert_eq!(stats.total_executions, 3);
assert_eq!(stats.successful_executions, 2);
assert_eq!(stats.failed_executions, 1);
assert_eq!(stats.success_rate(), 2.0 / 3.0);
assert_eq!(stats.max_execution_time, Duration::from_millis(200));
assert_eq!(stats.min_execution_time, Duration::from_millis(50));
}
// Text-based dependency analysis on the chain x -> y -> z.
#[test]
fn test_dependency_analysis() {
let engine = ExecutionEngine::new();
let cell1 = NotebookCell::new_code("x = 10".to_string());
let cell2 = NotebookCell::new_code("y = x + 5".to_string());
let cell3 = NotebookCell::new_code("z = y * 2".to_string());
let cells = vec![cell1.clone(), cell2.clone(), cell3.clone()];
let deps = engine.analyze_dependencies(&cells);
assert!(deps.get(&cell2.id).unwrap().contains(&cell1.id));
assert!(deps.get(&cell3.id).unwrap().contains(&cell2.id));
}
// Asynchronous execution of a single code cell succeeds.
#[tokio::test]
async fn test_async_execution() {
let mut engine = ExecutionEngine::new();
let mut cell = NotebookCell::new_code("2 + 3".to_string());
let result = engine.execute_cell_async(&mut cell).await.unwrap();
assert!(result.is_success());
}
}