venus_server/
session.rs

1//! Notebook session management.
2//!
3//! Manages the state of an active notebook session including
4//! compilation, execution, and output caching.
5
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::{Duration, Instant};
11
12use tokio::sync::{RwLock, broadcast};
13use venus::widgets::{WidgetDef, WidgetValue};
14use venus_core::compile::{
15    CellCompiler, CompilationResult, CompilerConfig, ToolchainManager, UniverseBuilder,
16};
17use venus_core::execute::{ExecutorKillHandle, ProcessExecutor};
18use venus_core::graph::{CellId, CellInfo, CellParser, GraphEngine, MarkdownCell, MoveDirection, SourceEditor};
19use venus_core::paths::NotebookDirs;
20
21use crate::error::{ServerError, ServerResult};
22use crate::protocol::{CellOutput, CellState, CellStatus, DependencyEdge, ServerMessage};
23use crate::undo::{UndoManager, UndoableOperation};
24use venus_core::state::BoxedOutput;
25
/// Shared interrupt flag that can be checked without locks.
///
/// The flag is an `Arc<AtomicBool>`: every clone refers to the same boolean,
/// which is read and written with atomic operations.
pub type InterruptFlag = Arc<AtomicBool>;
28
/// Capacity of the broadcast channel created in `NotebookSession::new`.
///
/// 256 messages should be sufficient for normal notebook operation.
/// If clients fall behind, older messages will be dropped.
const MESSAGE_CHANNEL_CAPACITY: usize = 256;
33
/// A notebook session.
///
/// Holds everything the server tracks for one open notebook: the parsed
/// cells, their dependency graph, the per-cell state sent to clients, the
/// compiler/executor handles, and the cached outputs, widget values and
/// output history produced by running cells.
pub struct NotebookSession {
    /// Path to the notebook file (canonicalized in `new`).
    path: PathBuf,

    /// Parsed code cells, with the IDs assigned by the graph during `reload`.
    cells: Vec<CellInfo>,

    /// Parsed markdown cells; their IDs are assigned after the code cell IDs.
    markdown_cells: Vec<MarkdownCell>,

    /// Dependency graph over the code cells.
    graph: GraphEngine,

    /// Cell states for clients (both code and markdown).
    cell_states: HashMap<CellId, CellState>,

    /// Toolchain manager.
    toolchain: ToolchainManager,

    /// Compiler configuration.
    config: CompilerConfig,

    /// Universe path (compiled dependencies); `None` until the first `reload`.
    universe_path: Option<PathBuf>,

    /// Dependencies hash for cache invalidation; passed to the cell compiler.
    deps_hash: u64,

    /// Broadcast channel for server messages.
    tx: broadcast::Sender<ServerMessage>,

    /// Whether an execution is in progress.
    /// Set by `execute_cell` while it runs and cleared when it finishes or
    /// when `abort` is called.
    executing: bool,

    /// Cached cell outputs for dependency passing.
    /// Maps cell ID to its serialized output.
    cell_outputs: HashMap<CellId, Arc<BoxedOutput>>,

    /// Process-based executor for isolated cell execution.
    /// Uses worker processes that can be killed for true interruption.
    executor: ProcessExecutor,

    /// Optional execution timeout for execute_all.
    /// `execute_all` compares the elapsed time against this value before
    /// starting each cell and aborts the run once it has been exceeded.
    execution_timeout: Option<Duration>,

    /// Shared flag indicating if current execution was interrupted by user.
    /// When true, errors should be reported as "interrupted" not as failures.
    /// This is shared with AppState so interrupt handler can set it.
    interrupted: InterruptFlag,

    /// Widget values per cell.
    /// Maps cell ID -> widget ID -> current value.
    widget_values: HashMap<CellId, HashMap<String, WidgetValue>>,

    /// Widget definitions per cell (from last execution).
    /// Used to send widget state to newly connected clients.
    widget_defs: HashMap<CellId, Vec<WidgetDef>>,

    /// Execution history per cell, oldest entry first.
    /// Stores both serialized output (for dependent cells) and display output.
    cell_output_history: HashMap<CellId, Vec<OutputHistoryEntry>>,

    /// Current history index per cell: the position in `cell_output_history`
    /// of the entry whose output is currently selected.
    cell_history_index: HashMap<CellId, usize>,

    /// Undo/redo manager for cell operations.
    undo_manager: UndoManager,
}
104
/// Maximum number of history entries per cell.
///
/// When a new entry pushes a cell's history past this length, the oldest
/// entries are discarded.
const MAX_HISTORY_PER_CELL: usize = 10;
107
/// A single history entry for a cell's execution.
///
/// One entry is recorded per successful execution of a cell.
#[derive(Clone)]
pub struct OutputHistoryEntry {
    /// Serialized output for passing to dependent cells.
    pub serialized: Arc<BoxedOutput>,
    /// Display output for the frontend.
    pub display: CellOutput,
    /// Timestamp when this execution completed, in milliseconds since the
    /// Unix epoch.
    pub timestamp: u64,
}
118
/// Thread-safe session handle.
///
/// A reference-counted `NotebookSession` behind a tokio `RwLock`.
pub type SessionHandle = Arc<RwLock<NotebookSession>>;
121
122impl NotebookSession {
123    /// Create a new notebook session.
124    ///
125    /// Uses process isolation for cell execution, allowing true interruption
126    /// by killing worker processes.
127    ///
128    /// The `interrupted` flag is shared with AppState so the interrupt handler
129    /// can set it without needing the session lock.
130    pub fn new(
131        path: impl AsRef<Path>,
132        interrupted: InterruptFlag,
133    ) -> ServerResult<(Self, broadcast::Receiver<ServerMessage>)> {
134        let path = path.as_ref().canonicalize().map_err(|e| ServerError::Io {
135            path: path.as_ref().to_path_buf(),
136            message: e.to_string(),
137        })?;
138
139        // Set up directories using shared abstraction
140        let dirs = NotebookDirs::from_notebook_path(&path)?;
141
142        let toolchain = ToolchainManager::new()?;
143        let config = CompilerConfig::for_notebook(&dirs);
144
145        let (tx, rx) = broadcast::channel(MESSAGE_CHANNEL_CAPACITY);
146
147        // Create process executor with warm worker pool
148        let executor = ProcessExecutor::new(&dirs.state_dir)?;
149
150        let mut session = Self {
151            path,
152            cells: Vec::new(),
153            markdown_cells: Vec::new(),
154            graph: GraphEngine::new(),
155            cell_states: HashMap::new(),
156            toolchain,
157            config,
158            universe_path: None,
159            deps_hash: 0,
160            tx,
161            executing: false,
162            cell_outputs: HashMap::new(),
163            executor,
164            execution_timeout: None,
165            interrupted,
166            widget_values: HashMap::new(),
167            widget_defs: HashMap::new(),
168            cell_output_history: HashMap::new(),
169            cell_history_index: HashMap::new(),
170            undo_manager: UndoManager::new(),
171        };
172
173        session.reload()?;
174
175        Ok((session, rx))
176    }
177
178    /// Get the notebook path.
179    pub fn path(&self) -> &Path {
180        &self.path
181    }
182
183    /// Subscribe to server messages.
184    pub fn subscribe(&self) -> broadcast::Receiver<ServerMessage> {
185        self.tx.subscribe()
186    }
187
188    /// Get a cell by ID.
189    fn get_cell(&self, cell_id: CellId) -> Option<&CellInfo> {
190        self.cells.iter().find(|c| c.id == cell_id)
191    }
192
193    /// Set the status of a code cell.
194    fn set_cell_status(&mut self, cell_id: CellId, status: CellStatus) {
195        if let Some(CellState::Code { status: cell_status, .. }) = self.cell_states.get_mut(&cell_id) {
196            *cell_status = status;
197        }
198    }
199
200    /// Broadcast a server message, ignoring send failures.
201    pub fn broadcast(&self, msg: ServerMessage) {
202        let _ = self.tx.send(msg);
203    }
204
205    /// Reload the notebook from disk.
206    pub fn reload(&mut self) -> ServerResult<()> {
207        let source = std::fs::read_to_string(&self.path)?;
208
209        // Parse cells (both code and markdown)
210        let mut parser = CellParser::new();
211        let parse_result = parser.parse_file(&self.path)?;
212        self.cells = parse_result.code_cells;
213        self.markdown_cells = parse_result.markdown_cells;
214
215        // Build graph and update code cells with real IDs (parser returns placeholder IDs)
216        self.graph = GraphEngine::new();
217        for cell in &mut self.cells {
218            let real_id = self.graph.add_cell(cell.clone());
219            cell.id = real_id;
220        }
221        self.graph.resolve_dependencies()?;
222
223        // Assign unique IDs to markdown cells (they don't participate in the dependency graph)
224        let mut next_id = if let Some(max_code_id) = self.cells.iter().map(|c| c.id.as_usize()).max() {
225            max_code_id + 1
226        } else {
227            0
228        };
229        for md_cell in &mut self.markdown_cells {
230            md_cell.id = CellId::new(next_id);
231            next_id += 1;
232        }
233
234        // Build universe (always needed for bincode/serde runtime)
235        let mut universe_builder =
236            UniverseBuilder::new(self.config.clone(), self.toolchain.clone());
237        universe_builder.parse_dependencies(&source)?;
238
239        self.universe_path = Some(universe_builder.build()?);
240        self.deps_hash = universe_builder.deps_hash();
241
242        // Update cell states
243        self.update_cell_states();
244
245        // Broadcast full state update (includes all cells, not just graph edges)
246        let state = self.get_state();
247        self.broadcast(state);
248
249        Ok(())
250    }
251
252    /// Strip the first heading from a doc comment (since it's used as display name).
253    ///
254    /// If the doc comment starts with `# Heading`, removes that line and returns
255    /// the rest of the content. Otherwise returns the original content.
256    fn strip_display_name_from_description(doc_comment: &Option<String>) -> Option<String> {
257        doc_comment.as_ref().map(|doc| {
258            let lines: Vec<&str> = doc.lines().collect();
259
260            // Find the first heading line
261            if let Some(first_line) = lines.first() {
262                let trimmed = first_line.trim();
263                if trimmed.starts_with('#') {
264                    // Skip the heading line and return the rest
265                    let remaining: Vec<&str> = lines.iter().skip(1)
266                        .map(|s| *s)
267                        .collect();
268
269                    // Trim leading empty lines
270                    let trimmed_lines: Vec<&str> = remaining.iter()
271                        .skip_while(|line| line.trim().is_empty())
272                        .map(|s| *s)
273                        .collect();
274
275                    if trimmed_lines.is_empty() {
276                        return None;
277                    }
278
279                    return Some(trimmed_lines.join("\n"));
280                }
281            }
282
283            // No heading found, return original
284            Some(doc.clone())
285        }).flatten()
286    }
287
288    /// Update cell states from parsed cells.
289    fn update_cell_states(&mut self) {
290        let mut new_states = HashMap::new();
291
292        // Add code cells
293        for cell in &self.cells {
294            let existing = self.cell_states.get(&cell.id);
295
296            // Extract status, output, dirty from existing state if it's a code cell
297            let (status, output, dirty) = if let Some(CellState::Code { status, output, dirty, .. }) = existing {
298                (*status, output.clone(), *dirty)
299            } else {
300                (CellStatus::default(), None, true)
301            };
302
303            let state = CellState::Code {
304                id: cell.id,
305                name: cell.name.clone(),
306                display_name: cell.display_name.clone(),
307                source: cell.source_code.clone(),
308                description: Self::strip_display_name_from_description(&cell.doc_comment),
309                return_type: cell.return_type.clone(),
310                dependencies: cell
311                    .dependencies
312                    .iter()
313                    .map(|d| d.param_name.clone())
314                    .collect(),
315                status,
316                output,
317                dirty,
318            };
319            new_states.insert(cell.id, state);
320        }
321
322        // Add markdown cells
323        for md_cell in &self.markdown_cells {
324            let state = CellState::Markdown {
325                id: md_cell.id,
326                content: md_cell.content.clone(),
327            };
328            new_states.insert(md_cell.id, state);
329        }
330
331        self.cell_states = new_states;
332    }
333
334    /// Broadcast current graph state.
335    fn broadcast_graph_update(&self) {
336        let edges: Vec<DependencyEdge> = self
337            .cells
338            .iter()
339            .flat_map(|cell| {
340                cell.dependencies.iter().filter_map(|dep| {
341                    self.cells
342                        .iter()
343                        .find(|c| c.name == dep.param_name)
344                        .map(|producer| DependencyEdge {
345                            from: producer.id,
346                            to: cell.id,
347                            param_name: dep.param_name.clone(),
348                        })
349                })
350            })
351            .collect();
352
353        let order = match self.graph.topological_order() {
354            Ok(order) => order,
355            Err(e) => {
356                tracing::error!("Failed to compute topological order: {}", e);
357                Vec::new()
358            }
359        };
360        let levels = self.graph.topological_levels(&order);
361
362        self.broadcast(ServerMessage::GraphUpdated { edges, levels });
363    }
364
365    /// Get the full notebook state.
366    pub fn get_state(&self) -> ServerMessage {
367        // Source order: all cells (code + markdown) in the order they appear in the .rs file
368        let mut all_cells: Vec<(CellId, usize)> = Vec::new();
369
370        // Add code cells with their line numbers
371        for cell in &self.cells {
372            all_cells.push((cell.id, cell.span.start_line));
373        }
374
375        // Add markdown cells with their line numbers
376        for md_cell in &self.markdown_cells {
377            all_cells.push((md_cell.id, md_cell.span.start_line));
378        }
379
380        // Sort by line number
381        all_cells.sort_by_key(|(_, line)| *line);
382
383        let source_order: Vec<CellId> = all_cells.into_iter().map(|(id, _)| id).collect();
384
385        // Execution order: topologically sorted for dependency resolution (code cells only)
386        let execution_order = match self.graph.topological_order() {
387            Ok(order) => order,
388            Err(e) => {
389                tracing::error!("Failed to compute execution order: {}", e);
390                Vec::new()
391            }
392        };
393
394        ServerMessage::NotebookState {
395            path: self.path.display().to_string(),
396            cells: self.cell_states.values().cloned().collect(),
397            source_order,
398            execution_order,
399        }
400    }
401
402    /// Execute a specific cell.
403    ///
404    /// Uses process isolation - the cell runs in a worker process that can
405    /// be killed immediately for interruption.
406    pub async fn execute_cell(&mut self, cell_id: CellId) -> ServerResult<()> {
407        if self.executing {
408            return Err(ServerError::ExecutionInProgress);
409        }
410
411        let cell = self
412            .get_cell(cell_id)
413            .ok_or(ServerError::CellNotFound(cell_id))?
414            .clone();
415
416        self.executing = true;
417
418        // Check if all dependencies have outputs available
419        let missing_deps: Vec<&str> = cell
420            .dependencies
421            .iter()
422            .filter(|dep| {
423                let producer = self.cells.iter().find(|c| c.name == dep.param_name);
424                match producer {
425                    Some(c) => !self.cell_outputs.contains_key(&c.id),
426                    None => true,
427                }
428            })
429            .map(|d| d.param_name.as_str())
430            .collect();
431
432        if !missing_deps.is_empty() {
433            self.set_cell_status(cell_id, CellStatus::Error);
434            self.broadcast(ServerMessage::CellError {
435                cell_id,
436                error: format!(
437                    "Missing dependencies: {}. Run dependent cells first.",
438                    missing_deps.join(", ")
439                ),
440                location: None,
441            });
442            self.executing = false;
443            return Ok(());
444        }
445
446        // Compile
447        self.set_cell_status(cell_id, CellStatus::Compiling);
448
449        let mut compiler = CellCompiler::new(self.config.clone(), self.toolchain.clone());
450        if let Some(ref up) = self.universe_path {
451            compiler = compiler.with_universe(up.clone());
452        }
453
454        let result = compiler.compile(&cell, self.deps_hash);
455
456        match result {
457            CompilationResult::Success(compiled) | CompilationResult::Cached(compiled) => {
458                // Execute
459                self.set_cell_status(cell_id, CellStatus::Running);
460                self.broadcast(ServerMessage::CellStarted { cell_id });
461
462                let start = Instant::now();
463
464                // Register the compiled cell with the executor
465                self.executor.register_cell(compiled, cell.dependencies.len());
466
467                // Gather dependency outputs in the order the cell expects them
468                let inputs: Vec<Arc<BoxedOutput>> = cell
469                    .dependencies
470                    .iter()
471                    .filter_map(|dep| {
472                        self.cells
473                            .iter()
474                            .find(|c| c.name == dep.param_name)
475                            .and_then(|c| self.cell_outputs.get(&c.id).cloned())
476                    })
477                    .collect();
478
479                // Get ALL widget values from all cells (widgets can be in any cell)
480                let widget_values = self.get_all_widget_values();
481                let widget_values_json = if widget_values.is_empty() {
482                    Vec::new()
483                } else {
484                    serde_json::to_vec(&widget_values).unwrap_or_default()
485                };
486
487                // Execute the cell in an isolated worker process with widget values
488                let exec_result = self.executor.execute_cell_with_widgets(
489                    cell_id,
490                    &inputs,
491                    widget_values_json,
492                );
493
494                let duration = start.elapsed();
495
496                match exec_result {
497                    Ok((output, widgets_json)) => {
498                        // Store output for dependent cells
499                        let output_arc = Arc::new(output);
500                        self.cell_outputs.insert(cell_id, output_arc.clone());
501
502                        // Also store in executor state for consistency
503                        self.executor.state_mut().store_output(cell_id, (*output_arc).clone());
504
505                        // Parse and store widget definitions
506                        let widgets: Vec<WidgetDef> = if widgets_json.is_empty() {
507                            Vec::new()
508                        } else {
509                            serde_json::from_slice(&widgets_json).unwrap_or_default()
510                        };
511                        self.store_widget_defs(cell_id, widgets.clone());
512
513                        let cell_output = CellOutput {
514                            text: output_arc.display_text().map(|s| s.to_string()),
515                            html: None,
516                            image: None,
517                            json: None,
518                            widgets,
519                        };
520
521                        // Add to history
522                        self.add_to_history(cell_id, output_arc.clone(), cell_output.clone());
523
524                        if let Some(state) = self.cell_states.get_mut(&cell_id) {
525                            state.set_status(CellStatus::Success);
526                            state.set_output(Some(cell_output.clone()));
527                            state.set_dirty(false);
528                        }
529
530                        self.broadcast(ServerMessage::CellCompleted {
531                            cell_id,
532                            duration_ms: duration.as_millis() as u64,
533                            output: Some(cell_output),
534                        });
535                    }
536                    Err(e) => {
537                        // Check if this was an abort or user-initiated interrupt
538                        let was_interrupted = self.interrupted.swap(false, Ordering::SeqCst);
539                        if matches!(e, venus_core::Error::Aborted) || was_interrupted {
540                            // Send friendly "interrupted" message instead of error
541                            self.set_cell_status(cell_id, CellStatus::Idle);
542                            self.broadcast(ServerMessage::ExecutionAborted { cell_id: Some(cell_id) });
543                        } else {
544                            self.set_cell_status(cell_id, CellStatus::Error);
545                            self.broadcast(ServerMessage::CellError {
546                                cell_id,
547                                error: e.to_string(),
548                                location: None,
549                            });
550                        }
551                    }
552                }
553            }
554            CompilationResult::Failed { errors, .. } => {
555                self.set_cell_status(cell_id, CellStatus::Error);
556
557                let compile_errors = errors
558                    .iter()
559                    .map(|e| crate::protocol::CompileErrorInfo {
560                        message: e.message.clone(),
561                        code: e.code.clone(),
562                        location: e.spans.first().map(|s| crate::protocol::SourceLocation {
563                            line: s.location.line as u32,
564                            column: s.location.column as u32,
565                            end_line: s.end_location.as_ref().map(|l| l.line as u32),
566                            end_column: s.end_location.as_ref().map(|l| l.column as u32),
567                        }),
568                        rendered: e.rendered.clone(),
569                    })
570                    .collect();
571
572                self.broadcast(ServerMessage::CompileError {
573                    cell_id,
574                    errors: compile_errors,
575                });
576            }
577        }
578
579        self.executing = false;
580        Ok(())
581    }
582
583    /// Execute all cells in order.
584    ///
585    /// If `execution_timeout` is set, kills the worker process after that duration.
586    /// Unlike cooperative cancellation, this immediately terminates the cell.
587    pub async fn execute_all(&mut self) -> ServerResult<()> {
588        let order = self.graph.topological_order()?;
589        let start = Instant::now();
590        let timeout = self.execution_timeout;
591
592        for cell_id in order {
593            // Check timeout before each cell
594            if timeout.is_some_and(|max_duration| start.elapsed() > max_duration) {
595                self.executor.abort();
596                self.broadcast(ServerMessage::ExecutionAborted { cell_id: Some(cell_id) });
597                return Err(ServerError::ExecutionTimeout);
598            }
599
600            self.execute_cell(cell_id).await?;
601        }
602        Ok(())
603    }
604
605    /// Mark a cell as dirty (needs re-execution).
606    pub fn mark_dirty(&mut self, cell_id: CellId) {
607        if let Some(state) = self.cell_states.get_mut(&cell_id) {
608            state.set_dirty(true);
609        }
610
611        // Also mark dependents as dirty
612        let dependents = self.graph.invalidated_cells(cell_id);
613        for dep_id in dependents {
614            if let Some(state) = self.cell_states.get_mut(&dep_id) {
615                state.set_dirty(true);
616            }
617        }
618    }
619
620    /// Check if execution is in progress.
621    pub fn is_executing(&self) -> bool {
622        self.executing
623    }
624
625    /// Abort the current execution immediately.
626    ///
627    /// Unlike cooperative cancellation, this **kills the worker process**,
628    /// providing true interruption even for long-running computations.
629    /// Returns `true` if there was an execution in progress to abort.
630    pub fn abort(&mut self) -> bool {
631        if self.executing {
632            // Kill the worker process - this is immediate
633            self.executor.abort();
634            self.broadcast(ServerMessage::ExecutionAborted { cell_id: None });
635            self.executing = false;
636            true
637        } else {
638            false
639        }
640    }
641
642    /// Set the execution timeout for execute_all.
643    ///
644    /// When set, execute_all will kill the worker process after this duration,
645    /// providing immediate interruption of even long-running cells.
646    pub fn set_execution_timeout(&mut self, timeout: Option<Duration>) {
647        self.execution_timeout = timeout;
648    }
649
650    /// Get the current execution timeout.
651    pub fn execution_timeout(&self) -> Option<Duration> {
652        self.execution_timeout
653    }
654
655    /// Set the interrupted flag.
656    ///
657    /// When true, execution errors will be reported as "interrupted"
658    /// rather than as failures, showing a friendly message to users.
659    pub fn set_interrupted(&mut self, value: bool) {
660        self.interrupted.store(value, Ordering::SeqCst);
661    }
662
663    /// Get a kill handle for the executor.
664    ///
665    /// This handle can be used from another task to kill the current execution
666    /// without needing to acquire the session lock.
667    pub fn get_kill_handle(&self) -> Option<ExecutorKillHandle> {
668        self.executor.get_kill_handle()
669    }
670
671    /// Restart the kernel: kill WorkerPool, spin up new one, clear memory state, preserve source.
672    ///
673    /// This clears all execution state including:
674    /// - Cell outputs and output history
675    /// - Widget values
676    /// - Cached serialized outputs
677    /// - Cell execution status (all cells reset to Idle)
678    ///
679    /// Source code and cell definitions are preserved.
680    pub fn restart_kernel(&mut self) -> ServerResult<()> {
681        // Abort any running execution first
682        if self.executing {
683            self.abort();
684        }
685
686        // Reload notebook from disk (picks up any file changes)
687        self.reload()?;
688
689        // Shutdown old executor and worker pool
690        self.executor.shutdown();
691
692        // Reconstruct state directory path
693        let dirs = NotebookDirs::from_notebook_path(&self.path)?;
694
695        // Create new ProcessExecutor with warm worker pool
696        self.executor = ProcessExecutor::new(&dirs.state_dir)?;
697
698        // Clear all execution state
699        self.cell_outputs.clear();
700        self.widget_values.clear();
701        self.widget_defs.clear();
702        self.cell_output_history.clear();
703        self.cell_history_index.clear();
704
705        // Reset all cell states to Idle and clear outputs
706        for state in self.cell_states.values_mut() {
707            state.set_status(CellStatus::Idle);
708            state.clear_output();
709            state.set_dirty(false);
710        }
711
712        // Broadcast kernel restarted message
713        self.broadcast(ServerMessage::KernelRestarted { error: None });
714
715        // Send updated state to all clients
716        let state_msg = self.get_state();
717        self.broadcast(state_msg);
718
719        Ok(())
720    }
721
722    /// Clear all cell outputs without restarting the kernel.
723    ///
724    /// This clears the display outputs but preserves:
725    /// - Worker pool and execution state
726    /// - Widget values
727    /// - Cell source code
728    ///
729    /// All cells are marked as needing re-execution (dirty).
730    pub fn clear_outputs(&mut self) {
731        // Clear outputs from cell states
732        for state in self.cell_states.values_mut() {
733            state.clear_output();
734            state.set_dirty(true); // Mark as needing re-execution
735        }
736
737        // Clear output history
738        self.cell_output_history.clear();
739        self.cell_history_index.clear();
740
741        // Broadcast outputs cleared message
742        self.broadcast(ServerMessage::OutputsCleared { error: None });
743
744        // Send updated state to all clients
745        let state_msg = self.get_state();
746        self.broadcast(state_msg);
747    }
748
749    /// Get IDs of all dirty cells in topological order.
750    pub fn get_dirty_cell_ids(&self) -> Vec<CellId> {
751        let order = match self.graph.topological_order() {
752            Ok(order) => order,
753            Err(e) => {
754                tracing::error!("Failed to compute order for dirty cells: {}", e);
755                Vec::new()
756            }
757        };
758        order
759            .into_iter()
760            .filter(|id| self.cell_states.get(id).is_some_and(|state| state.is_dirty()))
761            .collect()
762    }
763
764    /// Update a widget value for a cell.
765    ///
766    /// This stores the new value but does NOT trigger re-execution.
767    /// The user must explicitly run the cell to see the effect.
768    pub fn update_widget_value(&mut self, cell_id: CellId, widget_id: String, value: WidgetValue) {
769        self.widget_values
770            .entry(cell_id)
771            .or_default()
772            .insert(widget_id, value);
773    }
774
775    /// Get widget values for a cell.
776    pub fn get_widget_values(&self, cell_id: CellId) -> HashMap<String, WidgetValue> {
777        self.widget_values
778            .get(&cell_id)
779            .cloned()
780            .unwrap_or_default()
781    }
782
783    /// Get ALL widget values from all cells, flattened into a single map.
784    /// Widget IDs should be unique across the notebook.
785    pub fn get_all_widget_values(&self) -> HashMap<String, WidgetValue> {
786        let mut all_values = HashMap::new();
787        for cell_widgets in self.widget_values.values() {
788            for (widget_id, value) in cell_widgets {
789                all_values.insert(widget_id.clone(), value.clone());
790            }
791        }
792        all_values
793    }
794
795    /// Get widget definitions for a cell.
796    pub fn get_widget_defs(&self, cell_id: CellId) -> Vec<WidgetDef> {
797        self.widget_defs
798            .get(&cell_id)
799            .cloned()
800            .unwrap_or_default()
801    }
802
803    /// Store widget definitions from cell execution.
804    fn store_widget_defs(&mut self, cell_id: CellId, widgets: Vec<WidgetDef>) {
805        if widgets.is_empty() {
806            self.widget_defs.remove(&cell_id);
807        } else {
808            self.widget_defs.insert(cell_id, widgets);
809        }
810    }
811
812    /// Add an execution result to history.
813    fn add_to_history(&mut self, cell_id: CellId, serialized: Arc<BoxedOutput>, display: CellOutput) {
814        use std::time::{SystemTime, UNIX_EPOCH};
815
816        let timestamp = SystemTime::now()
817            .duration_since(UNIX_EPOCH)
818            .unwrap_or_default()
819            .as_millis() as u64;
820
821        let entry = OutputHistoryEntry {
822            serialized,
823            display,
824            timestamp,
825        };
826
827        let history = self.cell_output_history.entry(cell_id).or_default();
828        history.push(entry);
829
830        // Trim if too long
831        while history.len() > MAX_HISTORY_PER_CELL {
832            history.remove(0);
833        }
834
835        // Set current index to the latest entry
836        self.cell_history_index.insert(cell_id, history.len() - 1);
837    }
838
839    /// Select a history entry for a cell, making it the current output.
840    /// Returns the display output if successful.
841    pub fn select_history_entry(&mut self, cell_id: CellId, index: usize) -> Option<CellOutput> {
842        // Clone what we need before doing any mutations (to avoid borrow conflicts)
843        let (serialized, display) = {
844            let history = self.cell_output_history.get(&cell_id)?;
845            let entry = history.get(index)?;
846            (entry.serialized.clone(), entry.display.clone())
847        };
848
849        // Update the current output for dependent cells
850        self.cell_outputs.insert(cell_id, serialized.clone());
851        self.executor.state_mut().store_output(cell_id, (*serialized).clone());
852
853        // Update the cell state
854        if let Some(state) = self.cell_states.get_mut(&cell_id) {
855            state.set_output(Some(display.clone()));
856        }
857
858        // Update history index
859        self.cell_history_index.insert(cell_id, index);
860
861        // Mark dependent cells as dirty
862        self.mark_dependents_dirty(cell_id);
863
864        Some(display)
865    }
866
867    /// Mark all cells that depend on the given cell as dirty.
868    fn mark_dependents_dirty(&mut self, cell_id: CellId) {
869        // Use the graph's invalidated_cells which returns all dependents
870        let dependents = self.graph.invalidated_cells(cell_id);
871
872        // Skip the first one (the changed cell itself) and mark the rest as dirty
873        for dep_id in dependents.into_iter().skip(1) {
874            if let Some(state) = self.cell_states.get_mut(&dep_id) {
875                state.set_dirty(true);
876            }
877        }
878    }
879
880    /// Get history count for a cell.
881    pub fn get_history_count(&self, cell_id: CellId) -> usize {
882        self.cell_output_history.get(&cell_id).map(|h| h.len()).unwrap_or(0)
883    }
884
885    /// Get current history index for a cell.
886    pub fn get_history_index(&self, cell_id: CellId) -> usize {
887        self.cell_history_index.get(&cell_id).copied().unwrap_or(0)
888    }
889
    /// Borrow the map of per-cell states, keyed by cell id.
    ///
    /// The returned reference is read-only; nothing is cloned.
    pub fn cell_states(&self) -> &HashMap<CellId, CellState> {
        &self.cell_states
    }
894
895    /// Insert a new cell after the specified cell.
896    ///
897    /// Modifies the source file and triggers a reload.
898    /// Returns the name of the newly created cell.
899    pub fn insert_cell(&mut self, after_cell_id: Option<CellId>) -> ServerResult<String> {
900        // Convert CellId to cell name if provided
901        let after_name = after_cell_id.and_then(|id| {
902            self.cells.iter().find(|c| c.id == id).map(|c| c.name.clone())
903        });
904
905        // Load and edit the source file
906        let mut editor = SourceEditor::load(&self.path)?;
907        let new_name = editor.insert_cell(after_name.as_deref())?;
908        editor.save()?;
909
910        // Record for undo (with position for redo)
911        self.undo_manager.record(UndoableOperation::InsertCell {
912            cell_name: new_name.clone(),
913            after_cell_name: after_name,
914        });
915
916        // File watcher will trigger reload, but we can also reload now
917        // to ensure immediate consistency
918        self.reload()?;
919
920        Ok(new_name)
921    }
922
923    /// Delete a cell from the notebook.
924    ///
925    /// Modifies the .rs source file and reloads the notebook.
926    pub fn delete_cell(&mut self, cell_id: CellId) -> ServerResult<()> {
927        // Find the cell name
928        let cell_name = self.cells
929            .iter()
930            .find(|c| c.id == cell_id)
931            .map(|c| c.name.clone())
932            .ok_or_else(|| ServerError::CellNotFound(cell_id))?;
933
934        // Check if any other cells depend on this cell
935        let dependents: Vec<String> = self.cells
936            .iter()
937            .filter(|c| c.id != cell_id) // Don't check self
938            .filter(|c| {
939                c.dependencies
940                    .iter()
941                    .any(|dep| dep.param_name == cell_name)
942            })
943            .map(|c| c.name.clone())
944            .collect();
945
946        if !dependents.is_empty() {
947            return Err(ServerError::InvalidOperation(format!(
948                "Cannot delete cell '{}' because it is used by: {}",
949                cell_name,
950                dependents.join(", ")
951            )));
952        }
953
954        // Load and edit the source file
955        let mut editor = SourceEditor::load(&self.path)?;
956
957        // Capture source and position before deletion (for undo)
958        let source = editor.get_cell_source(&cell_name)?;
959        let after_cell_name = editor.get_previous_cell_name(&cell_name)?;
960
961        editor.delete_cell(&cell_name)?;
962        editor.save()?;
963
964        // Record for undo
965        self.undo_manager.record(UndoableOperation::DeleteCell {
966            cell_name: cell_name.clone(),
967            source,
968            after_cell_name,
969        });
970
971        // Reload to update in-memory state
972        self.reload()?;
973
974        Ok(())
975    }
976
977    /// Duplicate a cell in the notebook.
978    ///
979    /// Creates a copy of the cell with a unique name.
980    /// Returns the name of the new cell.
981    pub fn duplicate_cell(&mut self, cell_id: CellId) -> ServerResult<String> {
982        // Find the cell name
983        let cell_name = self.cells
984            .iter()
985            .find(|c| c.id == cell_id)
986            .map(|c| c.name.clone())
987            .ok_or_else(|| ServerError::CellNotFound(cell_id))?;
988
989        // Load and edit the source file
990        let mut editor = SourceEditor::load(&self.path)?;
991        let new_name = editor.duplicate_cell(&cell_name)?;
992        editor.save()?;
993
994        // Record for undo
995        self.undo_manager.record(UndoableOperation::DuplicateCell {
996            original_cell_name: cell_name,
997            new_cell_name: new_name.clone(),
998        });
999
1000        // Reload to update in-memory state
1001        self.reload()?;
1002
1003        Ok(new_name)
1004    }
1005
1006    /// Move a cell up or down in the notebook.
1007    ///
1008    /// Modifies the .rs source file and reloads the notebook.
1009    pub fn move_cell(&mut self, cell_id: CellId, direction: MoveDirection) -> ServerResult<()> {
1010        // Find the cell name
1011        let cell_name = self.cells
1012            .iter()
1013            .find(|c| c.id == cell_id)
1014            .map(|c| c.name.clone())
1015            .ok_or_else(|| ServerError::CellNotFound(cell_id))?;
1016
1017        // Load and edit the source file
1018        let mut editor = SourceEditor::load(&self.path)?;
1019        editor.move_cell(&cell_name, direction)?;
1020        editor.save()?;
1021
1022        // Record for undo
1023        self.undo_manager.record(UndoableOperation::MoveCell {
1024            cell_name,
1025            direction,
1026        });
1027
1028        // Reload to update in-memory state
1029        self.reload()?;
1030
1031        Ok(())
1032    }
1033
1034    /// Rename a cell's display name.
1035    ///
1036    /// Updates the cell's doc comment with the new display name and reloads the notebook.
1037    pub fn rename_cell(&mut self, cell_id: CellId, new_display_name: String) -> ServerResult<()> {
1038        // Find the cell name and current display name
1039        let (cell_name, old_display_name) = self.cells
1040            .iter()
1041            .find(|c| c.id == cell_id)
1042            .map(|c| (c.name.clone(), c.display_name.clone()))
1043            .ok_or_else(|| ServerError::CellNotFound(cell_id))?;
1044
1045        // Load and edit the source file
1046        let mut editor = SourceEditor::load(&self.path)?;
1047        editor.rename_cell(&cell_name, &new_display_name)?;
1048        editor.save()?;
1049
1050        // Record for undo
1051        self.undo_manager.record(UndoableOperation::RenameCell {
1052            cell_name,
1053            old_display_name,
1054            new_display_name,
1055        });
1056
1057        // Reload to update in-memory state
1058        self.reload()?;
1059
1060        Ok(())
1061    }
1062
1063    /// Insert a new markdown cell.
1064    ///
1065    /// Modifies the .rs source file and reloads the notebook.
1066    pub fn insert_markdown_cell(&mut self, content: String, after_cell_id: Option<CellId>) -> ServerResult<()> {
1067        // Convert cell ID to line number if provided
1068        let after_line = after_cell_id.and_then(|id| {
1069            // Try to find in code cells
1070            self.cells.iter().find(|c| c.id == id)
1071                .map(|c| c.span.end_line)
1072                .or_else(|| {
1073                    // Try to find in markdown cells
1074                    self.markdown_cells.iter().find(|m| m.id == id)
1075                        .map(|m| m.span.end_line)
1076                })
1077        });
1078
1079        // Load and edit the source file
1080        let mut editor = SourceEditor::load(&self.path)?;
1081        editor.insert_markdown_cell(&content, after_line)?;
1082
1083        // Get the line range of the newly inserted cell (approximate)
1084        let start_line = after_line.map(|l| l + 1).unwrap_or(0);
1085        let line_count = content.lines().count();
1086        let end_line = start_line + line_count;
1087
1088        editor.save()?;
1089
1090        // Record for undo
1091        self.undo_manager.record(UndoableOperation::InsertMarkdownCell {
1092            start_line,
1093            end_line,
1094            content: content.clone(),
1095        });
1096
1097        // Reload to update in-memory state
1098        self.reload()?;
1099
1100        Ok(())
1101    }
1102
1103    /// Edit a markdown cell's content.
1104    ///
1105    /// Modifies the .rs source file and reloads the notebook.
1106    pub fn edit_markdown_cell(&mut self, cell_id: CellId, new_content: String) -> ServerResult<()> {
1107        // Find the markdown cell
1108        let md_cell = self.markdown_cells
1109            .iter()
1110            .find(|m| m.id == cell_id)
1111            .ok_or_else(|| ServerError::CellNotFound(cell_id))?;
1112
1113        let start_line = md_cell.span.start_line;
1114        let end_line = md_cell.span.end_line;
1115        let old_content = md_cell.content.clone();
1116        let is_module_doc = md_cell.is_module_doc;
1117
1118        // Load and edit the source file
1119        let mut editor = SourceEditor::load(&self.path)?;
1120        editor.edit_markdown_cell(start_line, end_line, &new_content, is_module_doc)?;
1121        editor.save()?;
1122
1123        // Record for undo
1124        self.undo_manager.record(UndoableOperation::EditMarkdownCell {
1125            start_line,
1126            end_line,
1127            old_content,
1128            new_content,
1129            is_module_doc,
1130        });
1131
1132        // Reload to update in-memory state
1133        self.reload()?;
1134
1135        Ok(())
1136    }
1137
1138    /// Delete a markdown cell.
1139    ///
1140    /// Modifies the .rs source file and reloads the notebook.
1141    pub fn delete_markdown_cell(&mut self, cell_id: CellId) -> ServerResult<()> {
1142        // Find the markdown cell
1143        let md_cell = self.markdown_cells
1144            .iter()
1145            .find(|m| m.id == cell_id)
1146            .ok_or_else(|| ServerError::CellNotFound(cell_id))?;
1147
1148        let start_line = md_cell.span.start_line;
1149        let end_line = md_cell.span.end_line;
1150        let content = md_cell.content.clone();
1151
1152        // Load and edit the source file
1153        let mut editor = SourceEditor::load(&self.path)?;
1154        editor.delete_markdown_cell(start_line, end_line)?;
1155        editor.save()?;
1156
1157        // Record for undo
1158        self.undo_manager.record(UndoableOperation::DeleteMarkdownCell {
1159            start_line,
1160            content,
1161        });
1162
1163        // Reload to update in-memory state
1164        self.reload()?;
1165
1166        Ok(())
1167    }
1168
1169    /// Move a markdown cell up or down.
1170    ///
1171    /// Modifies the .rs source file and reloads the notebook.
1172    pub fn move_markdown_cell(&mut self, cell_id: CellId, direction: MoveDirection) -> ServerResult<()> {
1173        // Find the markdown cell
1174        let md_cell = self.markdown_cells
1175            .iter()
1176            .find(|m| m.id == cell_id)
1177            .ok_or_else(|| ServerError::CellNotFound(cell_id))?;
1178
1179        let start_line = md_cell.span.start_line;
1180        let end_line = md_cell.span.end_line;
1181
1182        // Load and edit the source file
1183        let mut editor = SourceEditor::load(&self.path)?;
1184        editor.move_markdown_cell(start_line, end_line, direction)?;
1185        editor.save()?;
1186
1187        // Record for undo
1188        self.undo_manager.record(UndoableOperation::MoveMarkdownCell {
1189            start_line,
1190            end_line,
1191            direction,
1192        });
1193
1194        // Reload to update in-memory state
1195        self.reload()?;
1196
1197        Ok(())
1198    }
1199
1200    /// Undo the last cell management operation.
1201    ///
1202    /// Returns a description of what was undone, or an error if undo failed.
1203    pub fn undo(&mut self) -> ServerResult<String> {
1204        let operation = self.undo_manager.pop_undo()
1205            .ok_or_else(|| ServerError::InvalidOperation("Nothing to undo".to_string()))?;
1206
1207        let description = operation.undo_description();
1208
1209        // Execute the reverse operation
1210        let mut editor = SourceEditor::load(&self.path)?;
1211
1212        match &operation {
1213            UndoableOperation::InsertCell { cell_name, .. } => {
1214                // Undo insert = delete
1215                editor.delete_cell(cell_name)?;
1216            }
1217            UndoableOperation::DeleteCell { source, after_cell_name, .. } => {
1218                // Undo delete = restore
1219                editor.restore_cell(source, after_cell_name.as_deref())?;
1220            }
1221            UndoableOperation::DuplicateCell { new_cell_name, .. } => {
1222                // Undo duplicate = delete the new cell
1223                editor.delete_cell(new_cell_name)?;
1224            }
1225            UndoableOperation::MoveCell { cell_name, direction } => {
1226                // Undo move = move in opposite direction
1227                let reverse_direction = match direction {
1228                    MoveDirection::Up => MoveDirection::Down,
1229                    MoveDirection::Down => MoveDirection::Up,
1230                };
1231                editor.move_cell(cell_name, reverse_direction)?;
1232            }
1233            UndoableOperation::RenameCell { cell_name, old_display_name, .. } => {
1234                // Undo rename = restore old display name
1235                editor.rename_cell(cell_name, old_display_name)?;
1236            }
1237            UndoableOperation::InsertMarkdownCell { start_line, end_line, .. } => {
1238                // Undo insert markdown = delete it
1239                editor.delete_markdown_cell(*start_line, *end_line)?;
1240            }
1241            UndoableOperation::EditMarkdownCell { start_line, end_line, old_content, is_module_doc, .. } => {
1242                // Undo edit markdown = restore old content
1243                editor.edit_markdown_cell(*start_line, *end_line, old_content, *is_module_doc)?;
1244            }
1245            UndoableOperation::DeleteMarkdownCell { start_line, content } => {
1246                // Undo delete markdown = restore it
1247                let after_line = if *start_line > 0 { Some(start_line - 1) } else { None };
1248                editor.insert_markdown_cell(content, after_line)?;
1249            }
1250            UndoableOperation::MoveMarkdownCell { start_line, end_line, direction } => {
1251                // Undo move markdown = move in opposite direction
1252                let reverse_direction = match direction {
1253                    MoveDirection::Up => MoveDirection::Down,
1254                    MoveDirection::Down => MoveDirection::Up,
1255                };
1256                editor.move_markdown_cell(*start_line, *end_line, reverse_direction)?;
1257            }
1258        }
1259
1260        editor.save()?;
1261
1262        // Record for redo
1263        self.undo_manager.record_redo(operation);
1264
1265        // Reload to update in-memory state
1266        self.reload()?;
1267
1268        Ok(description)
1269    }
1270
1271    /// Redo the last undone operation.
1272    ///
1273    /// Returns a description of what was redone, or an error if redo failed.
1274    pub fn redo(&mut self) -> ServerResult<String> {
1275        let operation = self.undo_manager.pop_redo()
1276            .ok_or_else(|| ServerError::InvalidOperation("Nothing to redo".to_string()))?;
1277
1278        let description = operation.description();
1279
1280        // Execute the original operation
1281        let mut editor = SourceEditor::load(&self.path)?;
1282
1283        match &operation {
1284            UndoableOperation::InsertCell { after_cell_name, .. } => {
1285                // Re-insert at the original position
1286                let _ = editor.insert_cell(after_cell_name.as_deref())?;
1287            }
1288            UndoableOperation::DeleteCell { cell_name, .. } => {
1289                // Redo delete = delete again
1290                editor.delete_cell(cell_name)?;
1291            }
1292            UndoableOperation::DuplicateCell { original_cell_name, .. } => {
1293                // Redo duplicate = duplicate again (new name will be generated)
1294                let _ = editor.duplicate_cell(original_cell_name)?;
1295            }
1296            UndoableOperation::MoveCell { cell_name, direction } => {
1297                // Redo move = move in same direction
1298                editor.move_cell(cell_name, *direction)?;
1299            }
1300            UndoableOperation::RenameCell { cell_name, new_display_name, .. } => {
1301                // Redo rename = apply new display name again
1302                editor.rename_cell(cell_name, new_display_name)?;
1303            }
1304            UndoableOperation::InsertMarkdownCell { start_line, content, .. } => {
1305                // Redo insert markdown = insert again at original position
1306                let after_line = if *start_line > 0 { Some(start_line - 1) } else { None };
1307                editor.insert_markdown_cell(content, after_line)?;
1308            }
1309            UndoableOperation::EditMarkdownCell { start_line, end_line, new_content, is_module_doc, .. } => {
1310                // Redo edit markdown = apply new content again
1311                editor.edit_markdown_cell(*start_line, *end_line, new_content, *is_module_doc)?;
1312            }
1313            UndoableOperation::DeleteMarkdownCell { start_line, content } => {
1314                // Redo delete markdown = delete again
1315                // We need to find the end line by counting content lines
1316                let line_count = content.lines().count();
1317                let end_line = start_line + line_count;
1318                editor.delete_markdown_cell(*start_line, end_line)?;
1319            }
1320            UndoableOperation::MoveMarkdownCell { start_line, end_line, direction } => {
1321                // Redo move markdown = move in same direction
1322                editor.move_markdown_cell(*start_line, *end_line, *direction)?;
1323            }
1324        }
1325
1326        editor.save()?;
1327
1328        // Record for undo (so we can undo the redo)
1329        self.undo_manager.record(operation);
1330
1331        // Reload to update in-memory state
1332        self.reload()?;
1333
1334        Ok(description)
1335    }
1336
1337    /// Get the current undo/redo state.
1338    pub fn get_undo_redo_state(&self) -> ServerMessage {
1339        ServerMessage::UndoRedoState {
1340            can_undo: self.undo_manager.can_undo(),
1341            can_redo: self.undo_manager.can_redo(),
1342            undo_description: self.undo_manager.undo_description(),
1343            redo_description: self.undo_manager.redo_description(),
1344        }
1345    }
1346
    /// Clear the undo/redo history by delegating to the undo manager's `clear`.
    ///
    /// Intended to be called when the file is modified externally.
    pub fn clear_undo_history(&mut self) {
        self.undo_manager.clear();
    }
1353}
1354
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a real session needs a notebook file on disk, so this only
    /// checks that a broadcast channel of `ServerMessage` can be created.
    #[test]
    fn test_session_creation() {
        let (sender, _receiver) = broadcast::channel::<ServerMessage>(16);
        drop(sender);
    }
}
1365}