Skip to main content

fresh/server/
editor_server.rs

1//! Editor integration with the session server
2//!
3//! This module bridges the Editor with the server infrastructure:
4//! - Creates Editor with CaptureBackend for rendering
5//! - Processes input events from clients
6//! - Broadcasts rendered output to all clients
7
8use std::io;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::mpsc;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crossterm::event::{Event, KeyEventKind};
16use ratatui::Terminal;
17
18use crate::app::Editor;
19use crate::config::Config;
20use crate::config_io::DirectoryContext;
21// Filesystem is now owned by `self.current_authority`; the server no
22// longer constructs a `StdFileSystem` directly.
23use crate::server::capture_backend::{
24    terminal_setup_sequences, terminal_teardown_sequences, CaptureBackend,
25};
26use crate::server::input_parser::InputParser;
27use crate::server::ipc::{ServerConnection, ServerListener, SocketPaths, StreamWrapper};
28use crate::server::protocol::{
29    ClientControl, ServerControl, ServerHello, TermSize, VersionMismatch, PROTOCOL_VERSION,
30};
31use crate::view::color_support::ColorCapability;
32
33/// Configuration for the editor server
34pub struct EditorServerConfig {
35    /// Working directory for this session
36    pub working_dir: PathBuf,
37    /// Optional session name
38    pub session_name: Option<String>,
39    /// Idle timeout before auto-shutdown
40    pub idle_timeout: Option<Duration>,
41    /// Editor configuration
42    pub editor_config: Config,
43    /// Directory context for config/data paths
44    pub dir_context: DirectoryContext,
45    /// Whether plugins are enabled
46    pub plugins_enabled: bool,
47    /// Whether to auto-load ~/.config/fresh/init.ts (requires `plugins_enabled`).
48    pub init_enabled: bool,
49    /// Authority to install at boot.  `None` means `Authority::local()`,
50    /// which is the standard session-mode default (principle 6 of
51    /// `AUTHORITY_DESIGN.md`).  The CLI `ssh://` / `user@host:path`
52    /// forms construct an `Authority::ssh(...)` and pass it here so
53    /// the daemon boots already attached to the remote host.  Plugins
54    /// can still replace this post-boot via `setAuthority`.
55    pub startup_authority: Option<crate::services::authority::Authority>,
56    /// Workspace Trust handle, created by the caller (`main.rs`) before the
57    /// startup authority so the same `Arc` backs both the authority's spawners
58    /// and the server. Mandatory: every spawner holds it, so there's no
59    /// ungated path. Tests pass `Arc::new(WorkspaceTrust::permissive())`.
60    pub workspace_trust: Arc<crate::services::workspace_trust::WorkspaceTrust>,
61    /// Live environment provider, created by the caller (`main.rs`) before the
62    /// startup authority so the same `Arc` backs the authority's spawners and
63    /// the server. Tests pass `Arc::new(EnvProvider::inactive())`.
64    pub env_provider: Arc<crate::services::env_provider::EnvProvider>,
65    /// Opaque handle kept alive for the server's lifetime alongside
66    /// `startup_authority`.  SSH authorities back this with the Tokio
67    /// runtime, the `SshConnection`, and the reconnect task — dropping
68    /// any of those tears down the remote session — so the caller
69    /// bundles them here and the server just holds on until shutdown.
70    /// Local authorities leave this `None`.
71    pub session_keepalive: Option<Box<dyn std::any::Any + Send>>,
72}
73
74/// Editor server that manages editor state and client connections
75pub struct EditorServer {
76    config: EditorServerConfig,
77    listener: ServerListener,
78    clients: Vec<ConnectedClient>,
79    editor: Option<Editor>,
80    terminal: Option<Terminal<CaptureBackend>>,
81    last_client_activity: Instant,
82    shutdown: Arc<AtomicBool>,
83    /// Effective terminal size (from the primary/first client)
84    term_size: TermSize,
85    /// Index of the client that most recently provided input (for per-client detach)
86    last_input_client: Option<usize>,
87    /// Next wait ID for --wait tracking
88    next_wait_id: u64,
89    /// Maps wait_id → client_id for clients waiting on file events
90    waiting_clients: std::collections::HashMap<u64, u64>,
91    /// Current authority. Carried across editor rebuilds so plugin-
92    /// installed authorities (e.g. a devcontainer attach) survive the
93    /// restart-based transition: the old editor is dropped, a new one
94    /// is built with this authority in effect, and clients stay
95    /// connected the whole time. Starts as
96    /// `config.startup_authority.unwrap_or_else(Authority::local)`.
97    current_authority: crate::services::authority::Authority,
98    /// Workspace Trust state, gating process execution. Held here (not on
99    /// the `Editor`) so the chosen level survives editor rebuilds. Shared
100    /// by `Arc` into the authority's guarding spawners on every build via
101    /// `Authority::with_trust`; its root is updated when the working
102    /// directory changes.
103    workspace_trust: Arc<crate::services::workspace_trust::WorkspaceTrust>,
104    /// Live env provider, shared into the authority's spawners across rebuilds.
105    env_provider: Arc<crate::services::env_provider::EnvProvider>,
106    /// Keepalive bundle paired with the startup authority — held for
107    /// the server's lifetime so SSH runtimes, reconnect tasks, and
108    /// similar resources outlive the editor rebuilds that happen on
109    /// authority transitions.  Never inspected; dropped only when the
110    /// server is dropped.
111    #[allow(dead_code)]
112    session_keepalive: Option<Box<dyn std::any::Any + Send>>,
113}
114
115/// Buffered writer for sending data to a client without blocking the server loop.
116///
117/// Spawns a background thread that receives data via a bounded channel and
118/// writes it to the client's data pipe. If the channel fills up (client is
119/// too slow to read), frames are dropped. If the pipe breaks, the `pipe_broken`
120/// flag is set so the main loop can disconnect the client.
121struct ClientDataWriter {
122    sender: mpsc::SyncSender<Vec<u8>>,
123    pipe_broken: Arc<AtomicBool>,
124}
125
126impl ClientDataWriter {
127    /// Create a new writer that spawns a background thread to write to the data stream.
128    fn new(data: StreamWrapper, client_id: u64) -> Self {
129        // 16 frames of buffer (~270ms at 60fps before dropping frames)
130        let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(16);
131        let pipe_broken = Arc::new(AtomicBool::new(false));
132        let pipe_broken_clone = pipe_broken.clone();
133
134        std::thread::Builder::new()
135            .name(format!("client-{}-writer", client_id))
136            .spawn(move || {
137                while let Ok(buf) = rx.recv() {
138                    if let Err(e) = data.write_all(&buf) {
139                        tracing::debug!("Client {} writer pipe error: {}", client_id, e);
140                        pipe_broken_clone.store(true, Ordering::Relaxed);
141                        break;
142                    }
143                    if let Err(e) = data.flush() {
144                        tracing::debug!("Client {} writer flush error: {}", client_id, e);
145                        pipe_broken_clone.store(true, Ordering::Relaxed);
146                        break;
147                    }
148                }
149                tracing::debug!("Client {} writer thread exiting", client_id);
150            })
151            .expect("Failed to spawn client writer thread");
152
153        Self {
154            sender: tx,
155            pipe_broken,
156        }
157    }
158
159    /// Try to send data without blocking. Returns false if the channel is full
160    /// (client too slow) or the writer thread has exited.
161    fn try_write(&self, data: &[u8]) -> bool {
162        self.sender.try_send(data.to_vec()).is_ok()
163    }
164
165    /// Check if the writer thread detected a broken pipe.
166    fn is_broken(&self) -> bool {
167        self.pipe_broken.load(Ordering::Relaxed)
168    }
169}
170
171/// A connected client with its own input parser
172struct ConnectedClient {
173    conn: ServerConnection,
174    /// Background writer for non-blocking data output
175    data_writer: ClientDataWriter,
176    term_size: TermSize,
177    env: std::collections::HashMap<String, Option<String>>,
178    id: u64,
179    input_parser: InputParser,
180    /// Whether this client needs a full screen render on next frame
181    needs_full_render: bool,
182    /// If set, this client is waiting for a --wait completion signal
183    wait_id: Option<u64>,
184}
185
186impl EditorServer {
187    /// Create a new editor server
188    pub fn new(mut config: EditorServerConfig) -> io::Result<Self> {
189        let socket_paths = if let Some(ref name) = config.session_name {
190            SocketPaths::for_session_name(name)?
191        } else {
192            SocketPaths::for_working_dir(&config.working_dir)?
193        };
194
195        let listener = ServerListener::bind(socket_paths)?;
196
197        // Write PID file so clients can detect stale sessions
198        let pid = std::process::id();
199        if let Err(e) = listener.paths().write_pid(pid) {
200            tracing::warn!("Failed to write PID file: {}", e);
201        }
202
203        // Workspace Trust is born in `main.rs` (already rooted at the working
204        // dir, store attached, persisted level loaded) and threaded in via the
205        // config — the same `Arc` the startup authority's spawners hold. The
206        // server just keeps a handle so rebuilds can re-anchor it on a
207        // working-dir change and the prompt can read it.
208        let workspace_trust = Arc::clone(&config.workspace_trust);
209        let env_provider = Arc::clone(&config.env_provider);
210
211        // Move the startup authority + its keepalive off the config —
212        // they are consumed once and belong to the server from here on.
213        // A missing startup authority defaults to a local one carrying the
214        // same trust + env handles.
215        let current_authority = config.startup_authority.take().unwrap_or_else(|| {
216            crate::services::authority::Authority::local(
217                Arc::clone(&workspace_trust),
218                Arc::clone(&env_provider),
219            )
220        });
221        let session_keepalive = config.session_keepalive.take();
222
223        Ok(Self {
224            config,
225            listener,
226            clients: Vec::new(),
227            editor: None,
228            terminal: None,
229            last_client_activity: Instant::now(),
230            shutdown: Arc::new(AtomicBool::new(false)),
231            term_size: TermSize::new(80, 24), // Default until first client connects
232            last_input_client: None,
233            next_wait_id: 1,
234            waiting_clients: std::collections::HashMap::new(),
235            current_authority,
236            workspace_trust,
237            env_provider,
238            session_keepalive,
239        })
240    }
241
242    /// Get a handle to request shutdown
243    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
244        self.shutdown.clone()
245    }
246
247    /// Get the socket paths
248    pub fn socket_paths(&self) -> &SocketPaths {
249        self.listener.paths()
250    }
251
252    /// Access the editor instance (available after initialize_editor).
253    pub fn editor(&self) -> Option<&Editor> {
254        self.editor.as_ref()
255    }
256
257    /// Mutable access to the editor instance (available after initialize_editor).
258    pub fn editor_mut(&mut self) -> Option<&mut Editor> {
259        self.editor.as_mut()
260    }
261
262    /// Run the editor server main loop
263    pub fn run(&mut self) -> io::Result<()> {
264        tracing::info!("Editor server starting for {:?}", self.config.working_dir);
265
266        let mut next_client_id = 1u64;
267        let mut needs_render = true;
268        let mut last_render = Instant::now();
269        const FRAME_DURATION: Duration = Duration::from_millis(16); // 60fps
270
271        loop {
272            // Check for shutdown
273            if self.shutdown.load(Ordering::SeqCst) {
274                tracing::info!("Shutdown requested");
275                break;
276            }
277
278            // Check idle timeout
279            if let Some(timeout) = self.config.idle_timeout {
280                if self.clients.is_empty() && self.last_client_activity.elapsed() > timeout {
281                    tracing::info!("Idle timeout reached, shutting down");
282                    break;
283                }
284            }
285
286            // Accept new connections
287            tracing::debug!("[server] main loop: calling accept()");
288            match self.listener.accept() {
289                Ok(Some(conn)) => {
290                    // Get current cursor style from editor if it exists, otherwise from config
291                    let cursor_style = self
292                        .editor
293                        .as_ref()
294                        .map(|e| e.config().editor.cursor_style)
295                        .unwrap_or(self.config.editor_config.editor.cursor_style);
296                    match self.handle_new_connection(conn, next_client_id, cursor_style) {
297                        Ok(client) => {
298                            tracing::info!("Client {} connected", client.id);
299
300                            // Initialize editor on first-ever client, or update size if reconnecting
301                            if self.editor.is_none() {
302                                // First time - initialize editor
303                                self.term_size = client.term_size;
304                                self.initialize_editor()?;
305                            } else if self.clients.is_empty() {
306                                // Reconnecting after all clients disconnected - update terminal size
307                                if self.term_size != client.term_size {
308                                    self.term_size = client.term_size;
309                                    self.update_terminal_size()?;
310                                }
311                            }
312                            // Note: full redraw is handled via client.needs_full_render flag
313
314                            self.clients.push(client);
315                            self.last_client_activity = Instant::now();
316                            next_client_id += 1;
317                            needs_render = true;
318                        }
319                        Err(e) => {
320                            tracing::warn!("Failed to complete handshake: {}", e);
321                        }
322                    }
323                }
324                Ok(None) => {}
325                Err(e) => {
326                    tracing::error!("Accept error: {}", e);
327                }
328            }
329
330            // Process client messages and get input events
331            tracing::debug!("[server] main loop: calling process_clients");
332            let (input_events, resize_occurred, input_source) = self.process_clients()?;
333            if let Some(idx) = input_source {
334                self.last_input_client = Some(idx);
335            }
336            if !input_events.is_empty() {
337                tracing::debug!(
338                    "[server] process_clients returned {} events",
339                    input_events.len()
340                );
341            }
342
343            // Check if editor should quit. `should_quit` is set both
344            // by a genuine user quit and by `request_restart`
345            // (triggered by `change_working_dir` and by
346            // `install_authority`).  Distinguish the two by peeking at
347            // the editor's pending-restart fields: if either carries a
348            // value, rebuild the editor in place and keep clients
349            // attached; otherwise this is a real shutdown.
350            if let Some(ref mut editor) = self.editor {
351                if editor.should_quit() {
352                    let pending_authority = editor.take_pending_authority();
353                    let pending_keepalive = editor.take_pending_keepalive();
354                    let restart_dir = editor.take_restart_dir();
355                    if pending_authority.is_some() || restart_dir.is_some() {
356                        tracing::info!(
357                            "Session rebuild requested (authority={}, dir={})",
358                            pending_authority.is_some(),
359                            restart_dir.is_some()
360                        );
361                        if let Err(e) =
362                            self.rebuild_editor(restart_dir, pending_authority, pending_keepalive)
363                        {
364                            tracing::error!("Session rebuild failed, shutting down: {}", e);
365                            self.shutdown.store(true, Ordering::SeqCst);
366                            continue;
367                        }
368                        needs_render = true;
369                        continue;
370                    }
371                    tracing::info!("Editor requested quit");
372                    self.shutdown.store(true, Ordering::SeqCst);
373                    continue;
374                }
375            }
376
377            // Check if client should detach (keep server running)
378            let detach_requested = self
379                .editor
380                .as_ref()
381                .map(|e| e.should_detach())
382                .unwrap_or(false);
383            if detach_requested {
384                // Detach only the client that triggered it (via last input)
385                if let Some(idx) = self.last_input_client.take() {
386                    if idx < self.clients.len() {
387                        tracing::info!("Client {} requested detach", self.clients[idx].id);
388                        let client = self.clients.remove(idx);
389                        let teardown = terminal_teardown_sequences();
390                        // Best-effort: client may already be disconnected
391                        #[allow(clippy::let_underscore_must_use)]
392                        let _ = client.data_writer.try_write(&teardown);
393                        let quit_msg = serde_json::to_string(&ServerControl::Quit {
394                            reason: "Detached".to_string(),
395                        })
396                        .unwrap_or_default();
397                        // Best-effort: client may already be disconnected
398                        #[allow(clippy::let_underscore_must_use)]
399                        let _ = client.conn.write_control(&quit_msg);
400                    }
401                } else {
402                    // Fallback: if we can't determine which client, detach all
403                    tracing::info!("Detach requested but no input source, detaching all");
404                    self.disconnect_all_clients("Detached")?;
405                }
406                // Reset the detach flag
407                if let Some(ref mut editor) = self.editor {
408                    editor.clear_detach();
409                }
410                continue;
411            }
412
413            // Check if the client should suspend itself (SIGTSTP). The server
414            // keeps running — only the client drops back to the shell. On
415            // resume, the client sends a Resize to nudge a full redraw; we
416            // pre-mark `needs_full_render` so the next rendered frame after
417            // resume re-emits setup sequences and a complete paint.
418            let suspend_requested = self
419                .editor
420                .as_mut()
421                .map(|e| e.take_suspend_request())
422                .unwrap_or(false);
423            if suspend_requested {
424                if let Some(idx) = self.last_input_client {
425                    if idx < self.clients.len() {
426                        let client_id = self.clients[idx].id;
427                        tracing::info!("Client {} requested suspend", client_id);
428                        let suspend_msg = serde_json::to_string(&ServerControl::SuspendClient)
429                            .unwrap_or_default();
430                        // Best-effort: client may already be disconnected
431                        #[allow(clippy::let_underscore_must_use)]
432                        let _ = self.clients[idx].conn.write_control(&suspend_msg);
433                        // Mark so the next render re-emits setup sequences and a
434                        // full paint once the client resumes and reconnects its
435                        // terminal.
436                        self.clients[idx].needs_full_render = true;
437                    }
438                } else {
439                    tracing::warn!("Suspend requested but no input source; ignoring");
440                }
441                continue;
442            }
443
444            // Handle resize
445            if resize_occurred {
446                self.update_terminal_size()?;
447                needs_render = true;
448            }
449
450            // Process input events
451            if !input_events.is_empty() {
452                self.last_client_activity = Instant::now();
453                for event in input_events {
454                    if self.handle_event(event)? {
455                        needs_render = true;
456                    }
457                }
458            }
459
460            // Process async messages from editor
461            if let Some(ref mut editor) = self.editor {
462                if editor.process_async_messages() {
463                    needs_render = true;
464                }
465                if editor.process_pending_file_opens() {
466                    needs_render = true;
467                }
468
469                // Process completed --wait operations
470                for wait_id in editor.take_completed_waits() {
471                    if let Some(client_id) = self.waiting_clients.remove(&wait_id) {
472                        // Find the client and send WaitComplete
473                        if let Some(client) = self.clients.iter_mut().find(|c| c.id == client_id) {
474                            let msg = serde_json::to_string(&ServerControl::WaitComplete)
475                                .unwrap_or_default();
476                            #[allow(clippy::let_underscore_must_use)]
477                            let _ = client.conn.write_control(&msg);
478                            client.wait_id = None;
479                        }
480                    }
481                }
482
483                // Send pending clipboard data to clients via control message
484                if let Some(cb) = editor.take_pending_clipboard() {
485                    let msg = serde_json::to_string(&ServerControl::SetClipboard {
486                        text: cb.text,
487                        use_osc52: cb.use_osc52,
488                        use_system_clipboard: cb.use_system_clipboard,
489                    })
490                    .unwrap_or_default();
491                    for client in &mut self.clients {
492                        #[allow(clippy::let_underscore_must_use)]
493                        let _ = client.conn.write_control(&msg);
494                    }
495                }
496
497                if editor.check_mouse_hover_timer() {
498                    needs_render = true;
499                }
500
501                // Active animations force a render every FRAME_DURATION so
502                // the slide settles on its own. Without this the loop only
503                // ticks when an external event (input, resize, async
504                // message) flips `needs_render`, so under tmux a buffer
505                // switch paints its first frame and then freezes mid-slide
506                // until the user nudges the terminal. Mirrors the direct
507                // (non-server) loop in `main.rs`.
508                if editor.active_window().animations.is_active() {
509                    needs_render = true;
510                }
511            }
512
513            // Render and broadcast if needed
514            if needs_render && last_render.elapsed() >= FRAME_DURATION {
515                self.render_and_broadcast()?;
516                last_render = Instant::now();
517                needs_render = false;
518            }
519
520            // Brief sleep to avoid busy-waiting
521            std::thread::sleep(Duration::from_millis(5));
522        }
523
524        // Perform the same shutdown sequence as the normal (non-session) exit path
525        // in run_event_loop_common: auto-save, end recovery session, save workspace.
526        if let Some(ref mut editor) = self.editor {
527            // Auto-save file-backed buffers to disk before exiting
528            if editor.config().editor.auto_save_enabled {
529                match editor.save_all_on_exit() {
530                    Ok(count) if count > 0 => {
531                        tracing::info!("Auto-saved {} buffer(s) on exit", count);
532                    }
533                    Ok(_) => {}
534                    Err(e) => {
535                        tracing::warn!("Failed to auto-save on exit: {}", e);
536                    }
537                }
538            }
539
540            // End recovery session first (flushes dirty buffers + assigns recovery IDs),
541            // then save workspace (captures those IDs for next session restore).
542            if let Err(e) = editor.end_recovery_session() {
543                tracing::warn!("Failed to end recovery session: {}", e);
544            }
545            if let Err(e) = editor.save_all_windows_workspaces() {
546                tracing::warn!("Failed to save workspaces: {}", e);
547            } else {
548                tracing::debug!("Workspaces saved successfully");
549            }
550        }
551
552        // Clean shutdown
553        self.disconnect_all_clients("Server shutting down")?;
554
555        Ok(())
556    }
557
558    /// Build a fresh `Editor` instance using the current configuration
559    /// and stored authority.  Shared between first-boot initialization
560    /// and post-restart rebuild.
561    fn build_editor_instance(&mut self) -> io::Result<(Editor, Terminal<CaptureBackend>)> {
562        let backend = CaptureBackend::new(self.term_size.cols, self.term_size.rows);
563        let terminal = Terminal::new(backend)
564            .map_err(|e| io::Error::other(format!("Failed to create terminal: {}", e)))?;
565
566        // The editor is constructed with the real authority it runs under, so
567        // plugins and init.ts load against the correct backend from the first
568        // tick — no post-construction swap. The authority already carries the
569        // shared trust handle (every spawner holds it).
570        let color_capability = ColorCapability::TrueColor; // Assume truecolor for now
571
572        let mut editor = Editor::with_working_dir_opts(
573            self.config.editor_config.clone(),
574            self.term_size.cols,
575            self.term_size.rows,
576            Some(self.config.working_dir.clone()),
577            self.config.dir_context.clone(),
578            self.config.plugins_enabled,
579            color_capability,
580            // `Authority` is single-owner (non-`Clone`): move the current one
581            // into the rebuilt editor, leaving a local placeholder behind. A
582            // real authority transition overwrites it just below; otherwise
583            // each window's own backend spec drives restore/reconnect, so the
584            // placeholder only governs the active window until it reconnects.
585            std::mem::replace(
586                &mut self.current_authority,
587                crate::services::authority::Authority::local(
588                    std::sync::Arc::clone(&self.workspace_trust),
589                    std::sync::Arc::clone(&self.env_provider),
590                ),
591            ),
592            false,
593        )
594        .map_err(|e| io::Error::other(format!("Failed to create editor: {}", e)))?;
595
596        // Auto-load init.ts via the same pipeline as the non-server entry point.
597        editor.load_init_script(self.config.init_enabled);
598
599        // Enable session mode - use hardware cursor only, no REVERSED software cursor
600        editor.set_session_mode(true);
601
602        // Set session name for status bar display
603        let session_display_name = self.config.session_name.clone().unwrap_or_else(|| {
604            // Use the directory name as a short display name for unnamed sessions
605            self.config
606                .working_dir
607                .file_name()
608                .and_then(|n| n.to_str())
609                .map(|s| s.to_string())
610                .unwrap_or_else(|| "session".to_string())
611        });
612        editor.set_session_name(Some(session_display_name));
613
614        Ok((editor, terminal))
615    }
616
617    /// Initialize the editor on first client connection.
618    ///
619    /// Performs the full first-boot sequence: build editor, restore
620    /// workspace, recover buffers from hot exit, start recovery
621    /// session.  Subsequent rebuilds (on authority/working-dir change)
622    /// go through [`rebuild_editor`].
623    pub fn initialize_editor(&mut self) -> io::Result<()> {
624        let (mut editor, terminal) = self.build_editor_instance()?;
625
626        // Restore workspace and recovery data (mirrors the standalone startup
627        // path in handle_first_run_setup in main.rs).
628        match editor.try_restore_workspace() {
629            Ok(true) => {
630                tracing::info!("Session workspace restored successfully");
631            }
632            Ok(false) => {
633                tracing::debug!("No previous session workspace found");
634            }
635            Err(e) => {
636                tracing::warn!("Failed to restore session workspace: {}", e);
637            }
638        }
639
640        // Recover buffers from hot exit recovery files
641        if editor.has_recovery_files().unwrap_or(false) {
642            tracing::info!("Recovery files found for session, recovering...");
643            match editor.recover_all_buffers() {
644                Ok(count) if count > 0 => {
645                    tracing::info!("Recovered {} buffer(s) for session", count);
646                }
647                Ok(_) => {
648                    tracing::info!("No buffers to recover for session");
649                }
650                Err(e) => {
651                    tracing::warn!("Failed to recover session buffers: {}", e);
652                }
653            }
654        }
655
656        // Start the recovery session (periodic saves of dirty buffers)
657        if let Err(e) = editor.start_recovery_session() {
658            tracing::warn!("Failed to start recovery session: {}", e);
659        }
660
661        self.terminal = Some(terminal);
662        self.editor = Some(editor);
663
664        self.maybe_prompt_workspace_trust();
665
666        tracing::info!(
667            "Editor initialized with size {}x{}",
668            self.term_size.cols,
669            self.term_size.rows
670        );
671
672        Ok(())
673    }
674
675    /// Surface the workspace-trust prompt after the editor is built. Delegates
676    /// to `Editor::maybe_prompt_workspace_trust` (single source of truth shared
677    /// with the in-process run path).
678    fn maybe_prompt_workspace_trust(&mut self) {
679        if let Some(editor) = self.editor.as_mut() {
680            editor.maybe_prompt_workspace_trust();
681        }
682    }
683
684    /// Rebuild the editor in place after an authority transition or a
685    /// working-directory change.
686    ///
687    /// Mirrors the restart loop in `main.rs`: save the workspace so
688    /// open buffers come back, drop the old editor (which cascades
689    /// into shutting down terminals, LSP servers, and plugin state),
690    /// swap in any new authority / working-dir, build a fresh editor,
691    /// and restore the workspace under the new backend.  The TCP
692    /// clients stay connected throughout; each is flagged for a full
693    /// redraw on the next frame so they see the new editor from a
694    /// clean state rather than a mid-transition frame.
695    pub(crate) fn rebuild_editor(
696        &mut self,
697        new_working_dir: Option<PathBuf>,
698        new_authority: Option<crate::services::authority::Authority>,
699        new_keepalive: Option<Box<dyn std::any::Any + Send>>,
700    ) -> io::Result<()> {
701        // Flush buffer saves + workspace before dropping the old editor,
702        // mirroring the standalone exit path.  On failure we log and
703        // continue — rebuild should still succeed.
704        if let Some(ref mut editor) = self.editor {
705            if editor.config().editor.auto_save_enabled {
706                if let Err(e) = editor.save_all_on_exit() {
707                    tracing::warn!("Rebuild: failed to auto-save on exit: {}", e);
708                }
709            }
710            if let Err(e) = editor.end_recovery_session() {
711                tracing::warn!("Rebuild: failed to end recovery session: {}", e);
712            }
713            if let Err(e) = editor.save_all_windows_workspaces() {
714                tracing::warn!("Rebuild: failed to save workspaces: {}", e);
715            }
716        }
717
718        // Non-transition rebuild (working-dir change, config reload): carry the
719        // active session's own backend forward by moving it out of the old
720        // editor, so a remote session isn't dropped to the local placeholder
721        // `build_editor_instance` leaves behind. A real authority transition
722        // (`new_authority`) overwrites `current_authority` just below.
723        if new_authority.is_none() {
724            if let Some(ref mut editor) = self.editor {
725                self.current_authority = editor.take_active_authority();
726            }
727        }
728
729        // Drop old editor + terminal.  Drop impls shut down PTYs, LSP
730        // servers, and plugin threads.
731        self.editor = None;
732        self.terminal = None;
733
734        // Apply the pending changes before building the next editor.
735        if let Some(dir) = new_working_dir {
736            tracing::info!("Rebuild: switching working dir to {}", dir.display());
737            self.config.working_dir = dir;
738            // Re-anchor trust to the new workspace: move the containment
739            // root and repoint persistence at the new project's trust file,
740            // adopting that project's stored decision.
741            self.workspace_trust
742                .set_root(Some(self.config.working_dir.clone()));
743            self.workspace_trust.set_store(Some(
744                crate::services::workspace_trust::TrustStore::for_project_dir(
745                    &self
746                        .config
747                        .dir_context
748                        .project_state_dir(&self.config.working_dir),
749                ),
750            ));
751            // New project ⇒ the old env recipe no longer applies; deactivate
752            // and let the env-manager plugin re-detect for the new workspace.
753            self.env_provider.clear();
754        }
755        if let Some(auth) = new_authority {
756            tracing::info!(
757                "Rebuild: installing authority with label {:?}",
758                auth.display_label
759            );
760            self.current_authority = auth;
761        }
762        // Adopt the keepalive that rode with a connection-backed authority
763        // (remote agent / K8s) so its carrier + reconnect/heartbeat tasks
764        // survive the rebuild; the previous keepalive drops, tearing down
765        // any prior remote session. A local/docker transition carries
766        // none, leaving the current session untouched.
767        if let Some(keepalive) = new_keepalive {
768            self.session_keepalive = Some(keepalive);
769        }
770
771        let (mut editor, terminal) = self.build_editor_instance()?;
772
773        // Bring buffers back under the new backend.  `try_restore_workspace`
774        // reads the workspace file we wrote above and re-opens the
775        // same splits/buffers.
776        match editor.try_restore_workspace() {
777            Ok(true) => tracing::info!("Rebuild: workspace restored"),
778            Ok(false) => tracing::debug!("Rebuild: no workspace to restore"),
779            Err(e) => tracing::warn!("Rebuild: failed to restore workspace: {}", e),
780        }
781
782        if let Err(e) = editor.start_recovery_session() {
783            tracing::warn!("Rebuild: failed to start recovery session: {}", e);
784        }
785
786        self.terminal = Some(terminal);
787        self.editor = Some(editor);
788
789        // A working-dir change lands us in a possibly-undecided project;
790        // re-evaluate the trust prompt. (A rebuild triggered by a trust
791        // decision just recorded one, so this is a no-op there.)
792        self.maybe_prompt_workspace_trust();
793
794        // Force every attached client to repaint from scratch — the
795        // previous frame described the old editor's screen.
796        for client in &mut self.clients {
797            client.needs_full_render = true;
798        }
799
800        tracing::info!(
801            "Rebuild: complete, {} clients kept attached",
802            self.clients.len()
803        );
804
805        Ok(())
806    }
807
808    /// Handle a new client connection
809    fn handle_new_connection(
810        &self,
811        conn: ServerConnection,
812        client_id: u64,
813        cursor_style: crate::config::CursorStyle,
814    ) -> io::Result<ConnectedClient> {
815        // Read client hello
816        // On Windows, don't toggle blocking mode - named pipes don't support mode switching
817        // after connection. The read_control() method handles this internally.
818        #[cfg(not(windows))]
819        conn.control.set_nonblocking(false)?;
820        let hello_json = conn
821            .read_control()?
822            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "No hello received"))?;
823
824        let client_msg: ClientControl = serde_json::from_str(&hello_json)
825            .map_err(|e| io::Error::other(format!("Invalid hello: {}", e)))?;
826
827        let hello = match client_msg {
828            ClientControl::Hello(h) => h,
829            _ => {
830                return Err(io::Error::other("Expected Hello message"));
831            }
832        };
833
834        // Check protocol version
835        if hello.protocol_version != PROTOCOL_VERSION {
836            let mismatch = VersionMismatch {
837                server_version: env!("CARGO_PKG_VERSION").to_string(),
838                client_version: hello.client_version.clone(),
839                action: if hello.protocol_version > PROTOCOL_VERSION {
840                    "upgrade_server".to_string()
841                } else {
842                    "restart_server".to_string()
843                },
844                message: format!(
845                    "Protocol version mismatch: server={}, client={}",
846                    PROTOCOL_VERSION, hello.protocol_version
847                ),
848            };
849
850            let response = serde_json::to_string(&ServerControl::VersionMismatch(mismatch))
851                .map_err(|e| io::Error::other(e.to_string()))?;
852            conn.write_control(&response)?;
853
854            return Err(io::Error::other("Version mismatch"));
855        }
856
857        // Send server hello
858        let session_id = self.config.session_name.clone().unwrap_or_else(|| {
859            crate::workspace::encode_path_for_filename(&self.config.working_dir)
860        });
861
862        let server_hello = ServerHello::new(session_id);
863        let response = serde_json::to_string(&ServerControl::Hello(server_hello))
864            .map_err(|e| io::Error::other(e.to_string()))?;
865        conn.write_control(&response)?;
866
867        // Set sockets back to non-blocking
868        // On Windows, don't toggle mode - named pipes don't support mode switching
869        #[cfg(not(windows))]
870        conn.control.set_nonblocking(true)?;
871
872        // Send terminal setup sequences
873        let mouse_hover_enabled = self.config.editor_config.editor.mouse_hover_enabled;
874        let setup = terminal_setup_sequences(mouse_hover_enabled);
875        conn.write_data(&setup)?;
876
877        // Send cursor style (from editor if running, otherwise from config)
878        conn.write_data(cursor_style.to_escape_sequence())?;
879
880        tracing::debug!(
881            "Client {} connected: {}x{}, TERM={:?}",
882            client_id,
883            hello.term_size.cols,
884            hello.term_size.rows,
885            hello.term()
886        );
887
888        // Create background writer for non-blocking render output
889        let data_writer = ClientDataWriter::new(conn.data.clone(), client_id);
890
891        Ok(ConnectedClient {
892            conn,
893            data_writer,
894            term_size: hello.term_size,
895            env: hello.env,
896            id: client_id,
897            input_parser: InputParser::new(),
898            needs_full_render: true,
899            wait_id: None,
900        })
901    }
902
903    /// Process messages from connected clients
904    /// Returns (input_events, resize_occurred, index of client that provided input)
905    fn process_clients(&mut self) -> io::Result<(Vec<Event>, bool, Option<usize>)> {
906        let mut disconnected = Vec::new();
907        let mut input_source_client: Option<usize> = None;
908        let mut input_events = Vec::new();
909        let mut resize_occurred = false;
910        let mut control_messages: Vec<(usize, ClientControl)> = Vec::new();
911
912        for (idx, client) in self.clients.iter_mut().enumerate() {
913            // Read from data socket
914            let mut buf = [0u8; 4096];
915            let mut data_eof = false;
916            tracing::debug!("[server] reading from client {} data socket", client.id);
917            match client.conn.read_data(&mut buf) {
918                Ok(0) => {
919                    tracing::debug!("[server] Client {} data stream closed (EOF)", client.id);
920                    // Don't disconnect waiting clients on data EOF - they're not sending data
921                    if client.wait_id.is_none() {
922                        disconnected.push(idx);
923                    }
924                    data_eof = true;
925                    // Don't continue - still need to check control socket for pending messages
926                }
927                Ok(n) => {
928                    tracing::debug!(
929                        "[server] Client {} read {} bytes from data socket",
930                        client.id,
931                        n
932                    );
933                    let events = client.input_parser.parse(&buf[..n]);
934                    tracing::debug!(
935                        "[server] Client {} parsed {} events",
936                        client.id,
937                        events.len()
938                    );
939                    if !events.is_empty() {
940                        input_source_client = Some(idx);
941                    }
942                    input_events.extend(events);
943                }
944                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
945                    // No data available
946                }
947                Err(e) => {
948                    tracing::warn!("[server] Client {} data read error: {}", client.id, e);
949                    disconnected.push(idx);
950                    data_eof = true;
951                    // Don't continue - still need to check control socket for pending messages
952                }
953            }
954            let _ = data_eof; // Suppress unused warning
955
956            // Check control socket
957            // On Windows, don't toggle nonblocking mode - it fails on named pipes
958            // Best-effort: nonblocking mode for control socket polling
959            #[cfg(not(windows))]
960            #[allow(clippy::let_underscore_must_use)]
961            let _ = client.conn.control.set_nonblocking(true);
962
963            // On Windows, use try_read pattern instead of blocking read_line
964            #[cfg(windows)]
965            {
966                let mut buf = [0u8; 1024];
967                match client.conn.control.try_read(&mut buf) {
968                    Ok(0) => {
969                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
970                        disconnected.push(idx);
971                    }
972                    Ok(n) => {
973                        // Try to parse as control message
974                        if let Ok(s) = std::str::from_utf8(&buf[..n]) {
975                            for line in s.lines() {
976                                if !line.trim().is_empty() {
977                                    if let Ok(msg) = serde_json::from_str::<ClientControl>(line) {
978                                        control_messages.push((idx, msg));
979                                    }
980                                }
981                            }
982                        }
983                    }
984                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
985                    Err(e) => {
986                        tracing::warn!("Client {} control read error: {}", client.id, e);
987                    }
988                }
989            }
990
991            #[cfg(not(windows))]
992            {
993                let mut reader = std::io::BufReader::new(&client.conn.control);
994                let mut line = String::new();
995                match std::io::BufRead::read_line(&mut reader, &mut line) {
996                    Ok(0) => {
997                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
998                        disconnected.push(idx);
999                    }
1000                    Ok(_) if !line.trim().is_empty() => {
1001                        if let Ok(msg) = serde_json::from_str::<ClientControl>(&line) {
1002                            control_messages.push((idx, msg));
1003                        }
1004                    }
1005                    Ok(_) => {}
1006                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
1007                    Err(e) => {
1008                        tracing::warn!("Client {} control read error: {}", client.id, e);
1009                    }
1010                }
1011            }
1012        }
1013
1014        // Process control messages
1015        if !control_messages.is_empty() {
1016            tracing::debug!(
1017                "[server] Processing {} control messages",
1018                control_messages.len()
1019            );
1020        }
1021        for (idx, msg) in control_messages {
1022            tracing::debug!("[server] Control message from client {}: {:?}", idx, msg);
1023            // Always process Quit, even from disconnected clients
1024            if let ClientControl::Quit = msg {
1025                tracing::info!("Client requested quit, shutting down");
1026                self.shutdown.store(true, Ordering::SeqCst);
1027                continue;
1028            }
1029
1030            // Always process OpenFiles / OpenWindow - they're one-shot
1031            // commands from clients that disconnect immediately
1032            if let ClientControl::OpenFiles { .. } | ClientControl::OpenWindow { .. } = msg {
1033                // Fall through to process it
1034            } else if disconnected.contains(&idx) {
1035                // Skip other messages from disconnected clients
1036                continue;
1037            }
1038
1039            match msg {
1040                ClientControl::Hello(_) => {
1041                    tracing::warn!("Unexpected Hello from client");
1042                }
1043                ClientControl::Resize { cols, rows } => {
1044                    if let Some(client) = self.clients.get_mut(idx) {
1045                        client.term_size = TermSize::new(cols, rows);
1046                        // Update server size to match first client
1047                        if idx == 0 {
1048                            self.term_size = TermSize::new(cols, rows);
1049                            resize_occurred = true;
1050                        }
1051                    }
1052                }
1053                ClientControl::Ping => {
1054                    if let Some(client) = self.clients.get_mut(idx) {
1055                        let pong = serde_json::to_string(&ServerControl::Pong).unwrap_or_default();
1056                        // Best-effort pong reply
1057                        #[allow(clippy::let_underscore_must_use)]
1058                        let _ = client.conn.write_control(&pong);
1059                    }
1060                }
1061                ClientControl::Detach => {
1062                    tracing::info!("Client {} detached", idx);
1063                    disconnected.push(idx);
1064                }
1065                ClientControl::OpenFiles { files, wait } => {
1066                    if let Some(ref mut editor) = self.editor {
1067                        // Assign a wait_id if --wait was requested
1068                        let wait_id = if wait {
1069                            let id = self.next_wait_id;
1070                            self.next_wait_id += 1;
1071                            Some(id)
1072                        } else {
1073                            None
1074                        };
1075
1076                        let file_count = files.len();
1077                        for (i, file_req) in files.iter().enumerate() {
1078                            let path = std::path::PathBuf::from(&file_req.path);
1079                            tracing::debug!(
1080                                "Queuing file open: {:?} line={:?} col={:?} end_line={:?} end_col={:?} message={:?}",
1081                                path,
1082                                file_req.line,
1083                                file_req.column,
1084                                file_req.end_line,
1085                                file_req.end_column,
1086                                file_req.message,
1087                            );
1088                            // Only the last file gets the wait_id (it's the one that will be active)
1089                            let file_wait_id = if i == file_count - 1 { wait_id } else { None };
1090                            editor.queue_file_open(
1091                                path,
1092                                file_req.line,
1093                                file_req.column,
1094                                file_req.end_line,
1095                                file_req.end_column,
1096                                file_req.message.clone(),
1097                                file_wait_id,
1098                            );
1099                        }
1100
1101                        // Track the waiting client
1102                        if let Some(wait_id) = wait_id {
1103                            if let Some(client) = self.clients.get_mut(idx) {
1104                                self.waiting_clients.insert(wait_id, client.id);
1105                                client.wait_id = Some(wait_id);
1106                            }
1107                        }
1108
1109                        resize_occurred = true; // Force re-render
1110                    }
1111                }
1112                ClientControl::OpenWindow { path } => {
1113                    if let Some(ref mut editor) = self.editor {
1114                        let path = std::path::PathBuf::from(path);
1115                        if path.is_absolute() {
1116                            let label = path
1117                                .file_name()
1118                                .map(|s| s.to_string_lossy().into_owned())
1119                                .unwrap_or_else(|| path.to_string_lossy().into_owned());
1120                            let id = editor.create_window_at(path, label);
1121                            editor.set_active_window(id);
1122                            resize_occurred = true; // Force re-render
1123                        } else {
1124                            tracing::warn!(
1125                                "OpenWindow rejected: path must be absolute: {:?}",
1126                                path
1127                            );
1128                        }
1129                    }
1130                }
1131                ClientControl::Quit => unreachable!(), // Handled above
1132            }
1133        }
1134
1135        // Check for clients with broken write pipes
1136        for (idx, client) in self.clients.iter().enumerate() {
1137            if client.data_writer.is_broken() && !disconnected.contains(&idx) {
1138                tracing::info!("Client {} write pipe broken, disconnecting", client.id);
1139                disconnected.push(idx);
1140            }
1141        }
1142
1143        // Deduplicate and sort for safe reverse removal
1144        disconnected.sort_unstable();
1145        disconnected.dedup();
1146
1147        // Remove disconnected clients
1148        for idx in disconnected.into_iter().rev() {
1149            let client = self.clients.remove(idx);
1150            // Clean up --wait tracking if this client was waiting
1151            if let Some(wait_id) = client.wait_id {
1152                self.waiting_clients.remove(&wait_id);
1153                // Also clean up editor wait_tracking for this wait_id
1154                if let Some(ref mut editor) = self.editor {
1155                    editor.remove_wait_tracking(wait_id);
1156                }
1157            }
1158            // Best-effort teardown via the non-blocking writer
1159            let teardown = terminal_teardown_sequences();
1160            let _ = client.data_writer.try_write(&teardown);
1161            tracing::info!("Client {} disconnected", client.id);
1162            // Invalidate input source if that client disconnected
1163            if input_source_client == Some(idx) {
1164                input_source_client = None;
1165            }
1166        }
1167
1168        Ok((input_events, resize_occurred, input_source_client))
1169    }
1170
1171    /// Update terminal size after resize
1172    fn update_terminal_size(&mut self) -> io::Result<()> {
1173        if let Some(ref mut terminal) = self.terminal {
1174            let backend = terminal.backend_mut();
1175            backend.resize(self.term_size.cols, self.term_size.rows);
1176        }
1177
1178        if let Some(ref mut editor) = self.editor {
1179            editor.resize(self.term_size.cols, self.term_size.rows);
1180        }
1181
1182        Ok(())
1183    }
1184
1185    /// Handle an input event
1186    fn handle_event(&mut self, event: Event) -> io::Result<bool> {
1187        let Some(ref mut editor) = self.editor else {
1188            return Ok(false);
1189        };
1190
1191        match event {
1192            Event::Key(key_event) => {
1193                if key_event.kind == KeyEventKind::Press {
1194                    editor
1195                        .handle_key(key_event.code, key_event.modifiers)
1196                        .map_err(|e| io::Error::other(e.to_string()))?;
1197                    Ok(true)
1198                } else {
1199                    Ok(false)
1200                }
1201            }
1202            Event::Mouse(mouse_event) => editor
1203                .handle_mouse(mouse_event)
1204                .map_err(|e| io::Error::other(e.to_string())),
1205            Event::Resize(w, h) => {
1206                editor.resize(w, h);
1207                Ok(true)
1208            }
1209            Event::Paste(text) => {
1210                editor.paste_text(text);
1211                Ok(true)
1212            }
1213            _ => Ok(false),
1214        }
1215    }
1216
1217    /// Render the editor and broadcast output to all clients
1218    fn render_and_broadcast(&mut self) -> io::Result<()> {
1219        let Some(ref mut editor) = self.editor else {
1220            return Ok(());
1221        };
1222
1223        let Some(ref mut terminal) = self.terminal else {
1224            return Ok(());
1225        };
1226
1227        // Check if any client needs a full render (e.g., newly connected)
1228        let any_needs_full = self.clients.iter().any(|c| c.needs_full_render);
1229        if any_needs_full {
1230            tracing::info!(
1231                "Full render requested for {} client(s)",
1232                self.clients.iter().filter(|c| c.needs_full_render).count()
1233            );
1234            // Force full redraw by invalidating terminal state
1235            terminal.backend_mut().reset_style_state();
1236            // Best-effort terminal clear for full redraw
1237            #[allow(clippy::let_underscore_must_use)]
1238            let _ = terminal.clear();
1239        }
1240
1241        // Take any pending escape sequences (e.g., cursor style changes)
1242        let pending_sequences = editor.take_pending_escape_sequences();
1243
1244        // Render to capture backend
1245        terminal
1246            .draw(|frame| editor.render(frame))
1247            .map_err(|e| io::Error::other(e.to_string()))?;
1248
1249        // Get the captured output
1250        let output = terminal.backend_mut().take_buffer();
1251
1252        if output.is_empty() && pending_sequences.is_empty() {
1253            return Ok(());
1254        }
1255
1256        // Broadcast to all clients via non-blocking writer threads (skip waiting clients)
1257        for client in &mut self.clients {
1258            if client.wait_id.is_some() {
1259                continue;
1260            }
1261            // Combine pending sequences and output into a single frame
1262            let frame = if !pending_sequences.is_empty() && !output.is_empty() {
1263                let mut combined = Vec::with_capacity(pending_sequences.len() + output.len());
1264                combined.extend_from_slice(&pending_sequences);
1265                combined.extend_from_slice(&output);
1266                combined
1267            } else if !pending_sequences.is_empty() {
1268                pending_sequences.clone()
1269            } else {
1270                output.clone()
1271            };
1272
1273            if !frame.is_empty() && !client.data_writer.try_write(&frame) {
1274                tracing::warn!("Client {} output buffer full, dropping frame", client.id);
1275            }
1276            // Clear full render flag after sending
1277            client.needs_full_render = false;
1278        }
1279
1280        Ok(())
1281    }
1282
1283    /// Disconnect all clients
1284    fn disconnect_all_clients(&mut self, reason: &str) -> io::Result<()> {
1285        let teardown = terminal_teardown_sequences();
1286        for client in &mut self.clients {
1287            // Best-effort: client may already be disconnected
1288            #[allow(clippy::let_underscore_must_use)]
1289            let _ = client.data_writer.try_write(&teardown);
1290            let quit_msg = serde_json::to_string(&ServerControl::Quit {
1291                reason: reason.to_string(),
1292            })
1293            .unwrap_or_default();
1294            // Best-effort: client may already be disconnected
1295            #[allow(clippy::let_underscore_must_use)]
1296            let _ = client.conn.write_control(&quit_msg);
1297        }
1298        self.clients.clear();
1299        Ok(())
1300    }
1301}
1302
1303impl ConnectedClient {
1304    /// Get the client's TERM environment variable
1305    #[allow(dead_code)]
1306    pub fn term(&self) -> Option<&str> {
1307        self.env.get("TERM").and_then(|v| v.as_deref())
1308    }
1309
1310    /// Check if the client supports truecolor
1311    #[allow(dead_code)]
1312    pub fn supports_truecolor(&self) -> bool {
1313        self.env
1314            .get("COLORTERM")
1315            .and_then(|v| v.as_deref())
1316            .map(|v| v == "truecolor" || v == "24bit")
1317            .unwrap_or(false)
1318    }
1319}