// fresh/server/editor_server.rs

1//! Editor integration with the session server
2//!
3//! This module bridges the Editor with the server infrastructure:
4//! - Creates Editor with CaptureBackend for rendering
5//! - Processes input events from clients
6//! - Broadcasts rendered output to all clients
7
8use std::io;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::mpsc;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crossterm::event::{Event, KeyEventKind};
16use ratatui::Terminal;
17
18use crate::app::Editor;
19use crate::config::Config;
20use crate::config_io::DirectoryContext;
21// Filesystem is now owned by `self.current_authority`; the server no
22// longer constructs a `StdFileSystem` directly.
23use crate::server::capture_backend::{
24    terminal_setup_sequences, terminal_teardown_sequences, CaptureBackend,
25};
26use crate::server::input_parser::InputParser;
27use crate::server::ipc::{ServerConnection, ServerListener, SocketPaths, StreamWrapper};
28use crate::server::protocol::{
29    ClientControl, ServerControl, ServerHello, TermSize, VersionMismatch, PROTOCOL_VERSION,
30};
31use crate::view::color_support::ColorCapability;
32
/// Configuration for the editor server.
///
/// Passed by value to `EditorServer::new`, which moves `startup_authority`
/// and `session_keepalive` out of it and keeps the rest for the lifetime of
/// the server.
pub struct EditorServerConfig {
    /// Working directory for this session.  Used to derive the socket paths
    /// when no `session_name` is given, and handed to every `Editor` the
    /// server builds.  Replaced in place when the editor requests a
    /// working-directory change.
    pub working_dir: PathBuf,
    /// Optional session name.  When set it selects the socket paths and is
    /// shown as the session's display name; otherwise the last component of
    /// `working_dir` is used for display.
    pub session_name: Option<String>,
    /// Idle timeout before auto-shutdown.  The server exits once no clients
    /// are attached and no client activity has been seen for this long.
    /// `None` disables the idle shutdown.
    pub idle_timeout: Option<Duration>,
    /// Editor configuration, cloned into each `Editor` instance.
    pub editor_config: Config,
    /// Directory context for config/data paths
    pub dir_context: DirectoryContext,
    /// Whether plugins are enabled
    pub plugins_enabled: bool,
    /// Whether to auto-load ~/.config/fresh/init.ts (requires `plugins_enabled`).
    pub init_enabled: bool,
    /// Authority to install at boot.  `None` means `Authority::local()`,
    /// which is the standard session-mode default (principle 6 of
    /// `AUTHORITY_DESIGN.md`).  The CLI `ssh://` / `user@host:path`
    /// forms construct an `Authority::ssh(...)` and pass it here so
    /// the daemon boots already attached to the remote host.  Plugins
    /// can still replace this post-boot via `setAuthority`.
    pub startup_authority: Option<crate::services::authority::Authority>,
    /// Opaque handle kept alive for the server's lifetime alongside
    /// `startup_authority`.  SSH authorities back this with the Tokio
    /// runtime, the `SshConnection`, and the reconnect task — dropping
    /// any of those tears down the remote session — so the caller
    /// bundles them here and the server just holds on until shutdown.
    /// Local authorities leave this `None`.
    pub session_keepalive: Option<Box<dyn std::any::Any + Send>>,
}
64
/// Editor server that manages editor state and client connections.
///
/// Owns a single `Editor` (created lazily when the first client connects)
/// plus the list of attached clients, and drives both from the `run` loop.
pub struct EditorServer {
    /// Server configuration.  `working_dir` may be replaced when the editor
    /// is rebuilt for a working-directory change.
    config: EditorServerConfig,
    /// Listener used to accept incoming client connections; also exposes
    /// the session's socket paths.
    listener: ServerListener,
    /// Currently attached clients, in connection order.
    clients: Vec<ConnectedClient>,
    /// The editor instance.  `None` until the first client connects, and
    /// momentarily `None` while the editor is being rebuilt.
    editor: Option<Editor>,
    /// Terminal over a `CaptureBackend`, created together with `editor`.
    terminal: Option<Terminal<CaptureBackend>>,
    /// Time of the most recent client connection or input event; compared
    /// against `config.idle_timeout` while no clients are attached.
    last_client_activity: Instant,
    /// Shared shutdown flag.  The main loop exits when it is set; clones are
    /// handed out by `shutdown_handle`.
    shutdown: Arc<AtomicBool>,
    /// Effective terminal size (from the primary/first client)
    term_size: TermSize,
    /// Index of the client that most recently provided input (for per-client detach)
    last_input_client: Option<usize>,
    /// Next wait ID for --wait tracking
    next_wait_id: u64,
    /// Maps wait_id → client_id for clients waiting on file events
    waiting_clients: std::collections::HashMap<u64, u64>,
    /// Current authority. Carried across editor rebuilds so plugin-
    /// installed authorities (e.g. a devcontainer attach) survive the
    /// restart-based transition: the old editor is dropped, a new one
    /// is built with this authority in effect, and clients stay
    /// connected the whole time. Starts as
    /// `config.startup_authority.unwrap_or_else(Authority::local)`.
    current_authority: crate::services::authority::Authority,
    /// Keepalive bundle paired with the startup authority — held for
    /// the server's lifetime so SSH runtimes, reconnect tasks, and
    /// similar resources outlive the editor rebuilds that happen on
    /// authority transitions.  Never inspected; dropped only when the
    /// server is dropped.
    #[allow(dead_code)]
    session_keepalive: Option<Box<dyn std::any::Any + Send>>,
}
97
/// Buffered writer for sending data to a client without blocking the server loop.
///
/// Spawns a background thread that receives data via a bounded channel and
/// writes it to the client's data pipe. If the channel fills up (client is
/// too slow to read), frames are dropped. If the pipe breaks, the `pipe_broken`
/// flag is set so the main loop can disconnect the client.
struct ClientDataWriter {
    /// Producer side of the bounded frame queue drained by the writer thread.
    sender: mpsc::SyncSender<Vec<u8>>,
    /// Set by the writer thread when a write or flush on the data stream
    /// fails; read by the server through `is_broken`.
    pipe_broken: Arc<AtomicBool>,
}
108
109impl ClientDataWriter {
110    /// Create a new writer that spawns a background thread to write to the data stream.
111    fn new(data: StreamWrapper, client_id: u64) -> Self {
112        // 16 frames of buffer (~270ms at 60fps before dropping frames)
113        let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(16);
114        let pipe_broken = Arc::new(AtomicBool::new(false));
115        let pipe_broken_clone = pipe_broken.clone();
116
117        std::thread::Builder::new()
118            .name(format!("client-{}-writer", client_id))
119            .spawn(move || {
120                while let Ok(buf) = rx.recv() {
121                    if let Err(e) = data.write_all(&buf) {
122                        tracing::debug!("Client {} writer pipe error: {}", client_id, e);
123                        pipe_broken_clone.store(true, Ordering::Relaxed);
124                        break;
125                    }
126                    if let Err(e) = data.flush() {
127                        tracing::debug!("Client {} writer flush error: {}", client_id, e);
128                        pipe_broken_clone.store(true, Ordering::Relaxed);
129                        break;
130                    }
131                }
132                tracing::debug!("Client {} writer thread exiting", client_id);
133            })
134            .expect("Failed to spawn client writer thread");
135
136        Self {
137            sender: tx,
138            pipe_broken,
139        }
140    }
141
142    /// Try to send data without blocking. Returns false if the channel is full
143    /// (client too slow) or the writer thread has exited.
144    fn try_write(&self, data: &[u8]) -> bool {
145        self.sender.try_send(data.to_vec()).is_ok()
146    }
147
148    /// Check if the writer thread detected a broken pipe.
149    fn is_broken(&self) -> bool {
150        self.pipe_broken.load(Ordering::Relaxed)
151    }
152}
153
/// A connected client with its own input parser
struct ConnectedClient {
    /// Connection to the client; control messages are written through
    /// `conn.write_control`.
    conn: ServerConnection,
    /// Background writer for non-blocking data output
    data_writer: ClientDataWriter,
    /// Terminal size reported by this client.
    term_size: TermSize,
    /// Environment variables associated with this client; a `None` value
    /// presumably means "unset on the client" — TODO confirm against the
    /// handshake code.
    env: std::collections::HashMap<String, Option<String>>,
    /// Server-assigned client identifier (starts at 1 and increases with
    /// each accepted connection).
    id: u64,
    /// Per-client parser that turns this client's raw input into events.
    input_parser: InputParser,
    /// Whether this client needs a full screen render on next frame
    needs_full_render: bool,
    /// If set, this client is waiting for a --wait completion signal
    wait_id: Option<u64>,
}
168
169impl EditorServer {
170    /// Create a new editor server
171    pub fn new(mut config: EditorServerConfig) -> io::Result<Self> {
172        let socket_paths = if let Some(ref name) = config.session_name {
173            SocketPaths::for_session_name(name)?
174        } else {
175            SocketPaths::for_working_dir(&config.working_dir)?
176        };
177
178        let listener = ServerListener::bind(socket_paths)?;
179
180        // Write PID file so clients can detect stale sessions
181        let pid = std::process::id();
182        if let Err(e) = listener.paths().write_pid(pid) {
183            tracing::warn!("Failed to write PID file: {}", e);
184        }
185
186        // Move the startup authority + its keepalive off the config —
187        // they are consumed once and belong to the server from here on.
188        let current_authority = config
189            .startup_authority
190            .take()
191            .unwrap_or_else(crate::services::authority::Authority::local);
192        let session_keepalive = config.session_keepalive.take();
193
194        Ok(Self {
195            config,
196            listener,
197            clients: Vec::new(),
198            editor: None,
199            terminal: None,
200            last_client_activity: Instant::now(),
201            shutdown: Arc::new(AtomicBool::new(false)),
202            term_size: TermSize::new(80, 24), // Default until first client connects
203            last_input_client: None,
204            next_wait_id: 1,
205            waiting_clients: std::collections::HashMap::new(),
206            current_authority,
207            session_keepalive,
208        })
209    }
210
211    /// Get a handle to request shutdown
212    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
213        self.shutdown.clone()
214    }
215
216    /// Get the socket paths
217    pub fn socket_paths(&self) -> &SocketPaths {
218        self.listener.paths()
219    }
220
221    /// Access the editor instance (available after initialize_editor).
222    pub fn editor(&self) -> Option<&Editor> {
223        self.editor.as_ref()
224    }
225
226    /// Mutable access to the editor instance (available after initialize_editor).
227    pub fn editor_mut(&mut self) -> Option<&mut Editor> {
228        self.editor.as_mut()
229    }
230
231    /// Run the editor server main loop
232    pub fn run(&mut self) -> io::Result<()> {
233        tracing::info!("Editor server starting for {:?}", self.config.working_dir);
234
235        let mut next_client_id = 1u64;
236        let mut needs_render = true;
237        let mut last_render = Instant::now();
238        const FRAME_DURATION: Duration = Duration::from_millis(16); // 60fps
239
240        loop {
241            // Check for shutdown
242            if self.shutdown.load(Ordering::SeqCst) {
243                tracing::info!("Shutdown requested");
244                break;
245            }
246
247            // Check idle timeout
248            if let Some(timeout) = self.config.idle_timeout {
249                if self.clients.is_empty() && self.last_client_activity.elapsed() > timeout {
250                    tracing::info!("Idle timeout reached, shutting down");
251                    break;
252                }
253            }
254
255            // Accept new connections
256            tracing::debug!("[server] main loop: calling accept()");
257            match self.listener.accept() {
258                Ok(Some(conn)) => {
259                    // Get current cursor style from editor if it exists, otherwise from config
260                    let cursor_style = self
261                        .editor
262                        .as_ref()
263                        .map(|e| e.config().editor.cursor_style)
264                        .unwrap_or(self.config.editor_config.editor.cursor_style);
265                    match self.handle_new_connection(conn, next_client_id, cursor_style) {
266                        Ok(client) => {
267                            tracing::info!("Client {} connected", client.id);
268
269                            // Initialize editor on first-ever client, or update size if reconnecting
270                            if self.editor.is_none() {
271                                // First time - initialize editor
272                                self.term_size = client.term_size;
273                                self.initialize_editor()?;
274                            } else if self.clients.is_empty() {
275                                // Reconnecting after all clients disconnected - update terminal size
276                                if self.term_size != client.term_size {
277                                    self.term_size = client.term_size;
278                                    self.update_terminal_size()?;
279                                }
280                            }
281                            // Note: full redraw is handled via client.needs_full_render flag
282
283                            self.clients.push(client);
284                            self.last_client_activity = Instant::now();
285                            next_client_id += 1;
286                            needs_render = true;
287                        }
288                        Err(e) => {
289                            tracing::warn!("Failed to complete handshake: {}", e);
290                        }
291                    }
292                }
293                Ok(None) => {}
294                Err(e) => {
295                    tracing::error!("Accept error: {}", e);
296                }
297            }
298
299            // Process client messages and get input events
300            tracing::debug!("[server] main loop: calling process_clients");
301            let (input_events, resize_occurred, input_source) = self.process_clients()?;
302            if let Some(idx) = input_source {
303                self.last_input_client = Some(idx);
304            }
305            if !input_events.is_empty() {
306                tracing::debug!(
307                    "[server] process_clients returned {} events",
308                    input_events.len()
309                );
310            }
311
312            // Check if editor should quit. `should_quit` is set both
313            // by a genuine user quit and by `request_restart`
314            // (triggered by `change_working_dir` and by
315            // `install_authority`).  Distinguish the two by peeking at
316            // the editor's pending-restart fields: if either carries a
317            // value, rebuild the editor in place and keep clients
318            // attached; otherwise this is a real shutdown.
319            if let Some(ref mut editor) = self.editor {
320                if editor.should_quit() {
321                    let pending_authority = editor.take_pending_authority();
322                    let restart_dir = editor.take_restart_dir();
323                    if pending_authority.is_some() || restart_dir.is_some() {
324                        tracing::info!(
325                            "Session rebuild requested (authority={}, dir={})",
326                            pending_authority.is_some(),
327                            restart_dir.is_some()
328                        );
329                        if let Err(e) = self.rebuild_editor(restart_dir, pending_authority) {
330                            tracing::error!("Session rebuild failed, shutting down: {}", e);
331                            self.shutdown.store(true, Ordering::SeqCst);
332                            continue;
333                        }
334                        needs_render = true;
335                        continue;
336                    }
337                    tracing::info!("Editor requested quit");
338                    self.shutdown.store(true, Ordering::SeqCst);
339                    continue;
340                }
341            }
342
343            // Check if client should detach (keep server running)
344            let detach_requested = self
345                .editor
346                .as_ref()
347                .map(|e| e.should_detach())
348                .unwrap_or(false);
349            if detach_requested {
350                // Detach only the client that triggered it (via last input)
351                if let Some(idx) = self.last_input_client.take() {
352                    if idx < self.clients.len() {
353                        tracing::info!("Client {} requested detach", self.clients[idx].id);
354                        let client = self.clients.remove(idx);
355                        let teardown = terminal_teardown_sequences();
356                        // Best-effort: client may already be disconnected
357                        #[allow(clippy::let_underscore_must_use)]
358                        let _ = client.data_writer.try_write(&teardown);
359                        let quit_msg = serde_json::to_string(&ServerControl::Quit {
360                            reason: "Detached".to_string(),
361                        })
362                        .unwrap_or_default();
363                        // Best-effort: client may already be disconnected
364                        #[allow(clippy::let_underscore_must_use)]
365                        let _ = client.conn.write_control(&quit_msg);
366                    }
367                } else {
368                    // Fallback: if we can't determine which client, detach all
369                    tracing::info!("Detach requested but no input source, detaching all");
370                    self.disconnect_all_clients("Detached")?;
371                }
372                // Reset the detach flag
373                if let Some(ref mut editor) = self.editor {
374                    editor.clear_detach();
375                }
376                continue;
377            }
378
379            // Check if the client should suspend itself (SIGTSTP). The server
380            // keeps running — only the client drops back to the shell. On
381            // resume, the client sends a Resize to nudge a full redraw; we
382            // pre-mark `needs_full_render` so the next rendered frame after
383            // resume re-emits setup sequences and a complete paint.
384            let suspend_requested = self
385                .editor
386                .as_mut()
387                .map(|e| e.take_suspend_request())
388                .unwrap_or(false);
389            if suspend_requested {
390                if let Some(idx) = self.last_input_client {
391                    if idx < self.clients.len() {
392                        let client_id = self.clients[idx].id;
393                        tracing::info!("Client {} requested suspend", client_id);
394                        let suspend_msg = serde_json::to_string(&ServerControl::SuspendClient)
395                            .unwrap_or_default();
396                        // Best-effort: client may already be disconnected
397                        #[allow(clippy::let_underscore_must_use)]
398                        let _ = self.clients[idx].conn.write_control(&suspend_msg);
399                        // Mark so the next render re-emits setup sequences and a
400                        // full paint once the client resumes and reconnects its
401                        // terminal.
402                        self.clients[idx].needs_full_render = true;
403                    }
404                } else {
405                    tracing::warn!("Suspend requested but no input source; ignoring");
406                }
407                continue;
408            }
409
410            // Handle resize
411            if resize_occurred {
412                self.update_terminal_size()?;
413                needs_render = true;
414            }
415
416            // Process input events
417            if !input_events.is_empty() {
418                self.last_client_activity = Instant::now();
419                for event in input_events {
420                    if self.handle_event(event)? {
421                        needs_render = true;
422                    }
423                }
424            }
425
426            // Process async messages from editor
427            if let Some(ref mut editor) = self.editor {
428                if editor.process_async_messages() {
429                    needs_render = true;
430                }
431                if editor.process_pending_file_opens() {
432                    needs_render = true;
433                }
434
435                // Process completed --wait operations
436                for wait_id in editor.take_completed_waits() {
437                    if let Some(client_id) = self.waiting_clients.remove(&wait_id) {
438                        // Find the client and send WaitComplete
439                        if let Some(client) = self.clients.iter_mut().find(|c| c.id == client_id) {
440                            let msg = serde_json::to_string(&ServerControl::WaitComplete)
441                                .unwrap_or_default();
442                            #[allow(clippy::let_underscore_must_use)]
443                            let _ = client.conn.write_control(&msg);
444                            client.wait_id = None;
445                        }
446                    }
447                }
448
449                // Send pending clipboard data to clients via control message
450                if let Some(cb) = editor.take_pending_clipboard() {
451                    let msg = serde_json::to_string(&ServerControl::SetClipboard {
452                        text: cb.text,
453                        use_osc52: cb.use_osc52,
454                        use_system_clipboard: cb.use_system_clipboard,
455                    })
456                    .unwrap_or_default();
457                    for client in &mut self.clients {
458                        #[allow(clippy::let_underscore_must_use)]
459                        let _ = client.conn.write_control(&msg);
460                    }
461                }
462
463                if editor.check_mouse_hover_timer() {
464                    needs_render = true;
465                }
466
467                // Active animations force a render every FRAME_DURATION so
468                // the slide settles on its own. Without this the loop only
469                // ticks when an external event (input, resize, async
470                // message) flips `needs_render`, so under tmux a buffer
471                // switch paints its first frame and then freezes mid-slide
472                // until the user nudges the terminal. Mirrors the direct
473                // (non-server) loop in `main.rs`.
474                if editor.active_window().animations.is_active() {
475                    needs_render = true;
476                }
477            }
478
479            // Render and broadcast if needed
480            if needs_render && last_render.elapsed() >= FRAME_DURATION {
481                self.render_and_broadcast()?;
482                last_render = Instant::now();
483                needs_render = false;
484            }
485
486            // Brief sleep to avoid busy-waiting
487            std::thread::sleep(Duration::from_millis(5));
488        }
489
490        // Perform the same shutdown sequence as the normal (non-session) exit path
491        // in run_event_loop_common: auto-save, end recovery session, save workspace.
492        if let Some(ref mut editor) = self.editor {
493            // Auto-save file-backed buffers to disk before exiting
494            if editor.config().editor.auto_save_enabled {
495                match editor.save_all_on_exit() {
496                    Ok(count) if count > 0 => {
497                        tracing::info!("Auto-saved {} buffer(s) on exit", count);
498                    }
499                    Ok(_) => {}
500                    Err(e) => {
501                        tracing::warn!("Failed to auto-save on exit: {}", e);
502                    }
503                }
504            }
505
506            // End recovery session first (flushes dirty buffers + assigns recovery IDs),
507            // then save workspace (captures those IDs for next session restore).
508            if let Err(e) = editor.end_recovery_session() {
509                tracing::warn!("Failed to end recovery session: {}", e);
510            }
511            if let Err(e) = editor.save_all_windows_workspaces() {
512                tracing::warn!("Failed to save workspaces: {}", e);
513            } else {
514                tracing::debug!("Workspaces saved successfully");
515            }
516        }
517
518        // Clean shutdown
519        self.disconnect_all_clients("Server shutting down")?;
520
521        Ok(())
522    }
523
524    /// Build a fresh `Editor` instance using the current configuration
525    /// and stored authority.  Shared between first-boot initialization
526    /// and post-restart rebuild.
527    fn build_editor_instance(&self) -> io::Result<(Editor, Terminal<CaptureBackend>)> {
528        let backend = CaptureBackend::new(self.term_size.cols, self.term_size.rows);
529        let terminal = Terminal::new(backend)
530            .map_err(|e| io::Error::other(format!("Failed to create terminal: {}", e)))?;
531
532        // The Editor constructor still takes a filesystem; the real
533        // authority is installed via `set_boot_authority` right after
534        // construction so plugins and init.ts load against the correct
535        // backend from the first tick.
536        let filesystem = self.current_authority.filesystem.clone();
537        let color_capability = ColorCapability::TrueColor; // Assume truecolor for now
538
539        let mut editor = Editor::with_working_dir(
540            self.config.editor_config.clone(),
541            self.term_size.cols,
542            self.term_size.rows,
543            Some(self.config.working_dir.clone()),
544            self.config.dir_context.clone(),
545            self.config.plugins_enabled,
546            color_capability,
547            filesystem,
548        )
549        .map_err(|e| io::Error::other(format!("Failed to create editor: {}", e)))?;
550
551        editor.set_boot_authority(self.current_authority.clone());
552
553        // Auto-load init.ts via the same pipeline as the non-server entry point.
554        editor.load_init_script(self.config.init_enabled);
555
556        // Enable session mode - use hardware cursor only, no REVERSED software cursor
557        editor.set_session_mode(true);
558
559        // Set session name for status bar display
560        let session_display_name = self.config.session_name.clone().unwrap_or_else(|| {
561            // Use the directory name as a short display name for unnamed sessions
562            self.config
563                .working_dir
564                .file_name()
565                .and_then(|n| n.to_str())
566                .map(|s| s.to_string())
567                .unwrap_or_else(|| "session".to_string())
568        });
569        editor.set_session_name(Some(session_display_name));
570
571        Ok((editor, terminal))
572    }
573
574    /// Initialize the editor on first client connection.
575    ///
576    /// Performs the full first-boot sequence: build editor, restore
577    /// workspace, recover buffers from hot exit, start recovery
578    /// session.  Subsequent rebuilds (on authority/working-dir change)
579    /// go through [`rebuild_editor`].
580    pub fn initialize_editor(&mut self) -> io::Result<()> {
581        let (mut editor, terminal) = self.build_editor_instance()?;
582
583        // Restore workspace and recovery data (mirrors the standalone startup
584        // path in handle_first_run_setup in main.rs).
585        match editor.try_restore_workspace() {
586            Ok(true) => {
587                tracing::info!("Session workspace restored successfully");
588            }
589            Ok(false) => {
590                tracing::debug!("No previous session workspace found");
591            }
592            Err(e) => {
593                tracing::warn!("Failed to restore session workspace: {}", e);
594            }
595        }
596
597        // Recover buffers from hot exit recovery files
598        if editor.has_recovery_files().unwrap_or(false) {
599            tracing::info!("Recovery files found for session, recovering...");
600            match editor.recover_all_buffers() {
601                Ok(count) if count > 0 => {
602                    tracing::info!("Recovered {} buffer(s) for session", count);
603                }
604                Ok(_) => {
605                    tracing::info!("No buffers to recover for session");
606                }
607                Err(e) => {
608                    tracing::warn!("Failed to recover session buffers: {}", e);
609                }
610            }
611        }
612
613        // Start the recovery session (periodic saves of dirty buffers)
614        if let Err(e) = editor.start_recovery_session() {
615            tracing::warn!("Failed to start recovery session: {}", e);
616        }
617
618        self.terminal = Some(terminal);
619        self.editor = Some(editor);
620
621        tracing::info!(
622            "Editor initialized with size {}x{}",
623            self.term_size.cols,
624            self.term_size.rows
625        );
626
627        Ok(())
628    }
629
630    /// Rebuild the editor in place after an authority transition or a
631    /// working-directory change.
632    ///
633    /// Mirrors the restart loop in `main.rs`: save the workspace so
634    /// open buffers come back, drop the old editor (which cascades
635    /// into shutting down terminals, LSP servers, and plugin state),
636    /// swap in any new authority / working-dir, build a fresh editor,
637    /// and restore the workspace under the new backend.  The TCP
638    /// clients stay connected throughout; each is flagged for a full
639    /// redraw on the next frame so they see the new editor from a
640    /// clean state rather than a mid-transition frame.
641    pub(crate) fn rebuild_editor(
642        &mut self,
643        new_working_dir: Option<PathBuf>,
644        new_authority: Option<crate::services::authority::Authority>,
645    ) -> io::Result<()> {
646        // Flush buffer saves + workspace before dropping the old editor,
647        // mirroring the standalone exit path.  On failure we log and
648        // continue — rebuild should still succeed.
649        if let Some(ref mut editor) = self.editor {
650            if editor.config().editor.auto_save_enabled {
651                if let Err(e) = editor.save_all_on_exit() {
652                    tracing::warn!("Rebuild: failed to auto-save on exit: {}", e);
653                }
654            }
655            if let Err(e) = editor.end_recovery_session() {
656                tracing::warn!("Rebuild: failed to end recovery session: {}", e);
657            }
658            if let Err(e) = editor.save_all_windows_workspaces() {
659                tracing::warn!("Rebuild: failed to save workspaces: {}", e);
660            }
661        }
662
663        // Drop old editor + terminal.  Drop impls shut down PTYs, LSP
664        // servers, and plugin threads.
665        self.editor = None;
666        self.terminal = None;
667
668        // Apply the pending changes before building the next editor.
669        if let Some(dir) = new_working_dir {
670            tracing::info!("Rebuild: switching working dir to {}", dir.display());
671            self.config.working_dir = dir;
672        }
673        if let Some(auth) = new_authority {
674            tracing::info!(
675                "Rebuild: installing authority with label {:?}",
676                auth.display_label
677            );
678            self.current_authority = auth;
679        }
680
681        let (mut editor, terminal) = self.build_editor_instance()?;
682
683        // Bring buffers back under the new backend.  `try_restore_workspace`
684        // reads the workspace file we wrote above and re-opens the
685        // same splits/buffers.
686        match editor.try_restore_workspace() {
687            Ok(true) => tracing::info!("Rebuild: workspace restored"),
688            Ok(false) => tracing::debug!("Rebuild: no workspace to restore"),
689            Err(e) => tracing::warn!("Rebuild: failed to restore workspace: {}", e),
690        }
691
692        if let Err(e) = editor.start_recovery_session() {
693            tracing::warn!("Rebuild: failed to start recovery session: {}", e);
694        }
695
696        self.terminal = Some(terminal);
697        self.editor = Some(editor);
698
699        // Force every attached client to repaint from scratch — the
700        // previous frame described the old editor's screen.
701        for client in &mut self.clients {
702            client.needs_full_render = true;
703        }
704
705        tracing::info!(
706            "Rebuild: complete, {} clients kept attached",
707            self.clients.len()
708        );
709
710        Ok(())
711    }
712
713    /// Handle a new client connection
714    fn handle_new_connection(
715        &self,
716        conn: ServerConnection,
717        client_id: u64,
718        cursor_style: crate::config::CursorStyle,
719    ) -> io::Result<ConnectedClient> {
720        // Read client hello
721        // On Windows, don't toggle blocking mode - named pipes don't support mode switching
722        // after connection. The read_control() method handles this internally.
723        #[cfg(not(windows))]
724        conn.control.set_nonblocking(false)?;
725        let hello_json = conn
726            .read_control()?
727            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "No hello received"))?;
728
729        let client_msg: ClientControl = serde_json::from_str(&hello_json)
730            .map_err(|e| io::Error::other(format!("Invalid hello: {}", e)))?;
731
732        let hello = match client_msg {
733            ClientControl::Hello(h) => h,
734            _ => {
735                return Err(io::Error::other("Expected Hello message"));
736            }
737        };
738
739        // Check protocol version
740        if hello.protocol_version != PROTOCOL_VERSION {
741            let mismatch = VersionMismatch {
742                server_version: env!("CARGO_PKG_VERSION").to_string(),
743                client_version: hello.client_version.clone(),
744                action: if hello.protocol_version > PROTOCOL_VERSION {
745                    "upgrade_server".to_string()
746                } else {
747                    "restart_server".to_string()
748                },
749                message: format!(
750                    "Protocol version mismatch: server={}, client={}",
751                    PROTOCOL_VERSION, hello.protocol_version
752                ),
753            };
754
755            let response = serde_json::to_string(&ServerControl::VersionMismatch(mismatch))
756                .map_err(|e| io::Error::other(e.to_string()))?;
757            conn.write_control(&response)?;
758
759            return Err(io::Error::other("Version mismatch"));
760        }
761
762        // Send server hello
763        let session_id = self.config.session_name.clone().unwrap_or_else(|| {
764            crate::workspace::encode_path_for_filename(&self.config.working_dir)
765        });
766
767        let server_hello = ServerHello::new(session_id);
768        let response = serde_json::to_string(&ServerControl::Hello(server_hello))
769            .map_err(|e| io::Error::other(e.to_string()))?;
770        conn.write_control(&response)?;
771
772        // Set sockets back to non-blocking
773        // On Windows, don't toggle mode - named pipes don't support mode switching
774        #[cfg(not(windows))]
775        conn.control.set_nonblocking(true)?;
776
777        // Send terminal setup sequences
778        let mouse_hover_enabled = self.config.editor_config.editor.mouse_hover_enabled;
779        let setup = terminal_setup_sequences(mouse_hover_enabled);
780        conn.write_data(&setup)?;
781
782        // Send cursor style (from editor if running, otherwise from config)
783        conn.write_data(cursor_style.to_escape_sequence())?;
784
785        tracing::debug!(
786            "Client {} connected: {}x{}, TERM={:?}",
787            client_id,
788            hello.term_size.cols,
789            hello.term_size.rows,
790            hello.term()
791        );
792
793        // Create background writer for non-blocking render output
794        let data_writer = ClientDataWriter::new(conn.data.clone(), client_id);
795
796        Ok(ConnectedClient {
797            conn,
798            data_writer,
799            term_size: hello.term_size,
800            env: hello.env,
801            id: client_id,
802            input_parser: InputParser::new(),
803            needs_full_render: true,
804            wait_id: None,
805        })
806    }
807
808    /// Process messages from connected clients
809    /// Returns (input_events, resize_occurred, index of client that provided input)
810    fn process_clients(&mut self) -> io::Result<(Vec<Event>, bool, Option<usize>)> {
811        let mut disconnected = Vec::new();
812        let mut input_source_client: Option<usize> = None;
813        let mut input_events = Vec::new();
814        let mut resize_occurred = false;
815        let mut control_messages: Vec<(usize, ClientControl)> = Vec::new();
816
817        for (idx, client) in self.clients.iter_mut().enumerate() {
818            // Read from data socket
819            let mut buf = [0u8; 4096];
820            let mut data_eof = false;
821            tracing::debug!("[server] reading from client {} data socket", client.id);
822            match client.conn.read_data(&mut buf) {
823                Ok(0) => {
824                    tracing::debug!("[server] Client {} data stream closed (EOF)", client.id);
825                    // Don't disconnect waiting clients on data EOF - they're not sending data
826                    if client.wait_id.is_none() {
827                        disconnected.push(idx);
828                    }
829                    data_eof = true;
830                    // Don't continue - still need to check control socket for pending messages
831                }
832                Ok(n) => {
833                    tracing::debug!(
834                        "[server] Client {} read {} bytes from data socket",
835                        client.id,
836                        n
837                    );
838                    let events = client.input_parser.parse(&buf[..n]);
839                    tracing::debug!(
840                        "[server] Client {} parsed {} events",
841                        client.id,
842                        events.len()
843                    );
844                    if !events.is_empty() {
845                        input_source_client = Some(idx);
846                    }
847                    input_events.extend(events);
848                }
849                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
850                    // No data available
851                }
852                Err(e) => {
853                    tracing::warn!("[server] Client {} data read error: {}", client.id, e);
854                    disconnected.push(idx);
855                    data_eof = true;
856                    // Don't continue - still need to check control socket for pending messages
857                }
858            }
859            let _ = data_eof; // Suppress unused warning
860
861            // Check control socket
862            // On Windows, don't toggle nonblocking mode - it fails on named pipes
863            // Best-effort: nonblocking mode for control socket polling
864            #[cfg(not(windows))]
865            #[allow(clippy::let_underscore_must_use)]
866            let _ = client.conn.control.set_nonblocking(true);
867
868            // On Windows, use try_read pattern instead of blocking read_line
869            #[cfg(windows)]
870            {
871                let mut buf = [0u8; 1024];
872                match client.conn.control.try_read(&mut buf) {
873                    Ok(0) => {
874                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
875                        disconnected.push(idx);
876                    }
877                    Ok(n) => {
878                        // Try to parse as control message
879                        if let Ok(s) = std::str::from_utf8(&buf[..n]) {
880                            for line in s.lines() {
881                                if !line.trim().is_empty() {
882                                    if let Ok(msg) = serde_json::from_str::<ClientControl>(line) {
883                                        control_messages.push((idx, msg));
884                                    }
885                                }
886                            }
887                        }
888                    }
889                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
890                    Err(e) => {
891                        tracing::warn!("Client {} control read error: {}", client.id, e);
892                    }
893                }
894            }
895
896            #[cfg(not(windows))]
897            {
898                let mut reader = std::io::BufReader::new(&client.conn.control);
899                let mut line = String::new();
900                match std::io::BufRead::read_line(&mut reader, &mut line) {
901                    Ok(0) => {
902                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
903                        disconnected.push(idx);
904                    }
905                    Ok(_) if !line.trim().is_empty() => {
906                        if let Ok(msg) = serde_json::from_str::<ClientControl>(&line) {
907                            control_messages.push((idx, msg));
908                        }
909                    }
910                    Ok(_) => {}
911                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
912                    Err(e) => {
913                        tracing::warn!("Client {} control read error: {}", client.id, e);
914                    }
915                }
916            }
917        }
918
919        // Process control messages
920        eprintln!(
921            "[server] Processing {} control messages",
922            control_messages.len()
923        );
924        for (idx, msg) in control_messages {
925            eprintln!("[server] Control message from client {}: {:?}", idx, msg);
926            // Always process Quit, even from disconnected clients
927            if let ClientControl::Quit = msg {
928                tracing::info!("Client requested quit, shutting down");
929                self.shutdown.store(true, Ordering::SeqCst);
930                continue;
931            }
932
933            // Always process OpenFiles - it's a one-shot command from clients that disconnect immediately
934            if let ClientControl::OpenFiles { .. } = msg {
935                // Fall through to process it
936            } else if disconnected.contains(&idx) {
937                // Skip other messages from disconnected clients
938                continue;
939            }
940
941            match msg {
942                ClientControl::Hello(_) => {
943                    tracing::warn!("Unexpected Hello from client");
944                }
945                ClientControl::Resize { cols, rows } => {
946                    if let Some(client) = self.clients.get_mut(idx) {
947                        client.term_size = TermSize::new(cols, rows);
948                        // Update server size to match first client
949                        if idx == 0 {
950                            self.term_size = TermSize::new(cols, rows);
951                            resize_occurred = true;
952                        }
953                    }
954                }
955                ClientControl::Ping => {
956                    if let Some(client) = self.clients.get_mut(idx) {
957                        let pong = serde_json::to_string(&ServerControl::Pong).unwrap_or_default();
958                        // Best-effort pong reply
959                        #[allow(clippy::let_underscore_must_use)]
960                        let _ = client.conn.write_control(&pong);
961                    }
962                }
963                ClientControl::Detach => {
964                    tracing::info!("Client {} detached", idx);
965                    disconnected.push(idx);
966                }
967                ClientControl::OpenFiles { files, wait } => {
968                    if let Some(ref mut editor) = self.editor {
969                        // Assign a wait_id if --wait was requested
970                        let wait_id = if wait {
971                            let id = self.next_wait_id;
972                            self.next_wait_id += 1;
973                            Some(id)
974                        } else {
975                            None
976                        };
977
978                        let file_count = files.len();
979                        for (i, file_req) in files.iter().enumerate() {
980                            let path = std::path::PathBuf::from(&file_req.path);
981                            tracing::debug!(
982                                "Queuing file open: {:?} line={:?} col={:?} end_line={:?} end_col={:?} message={:?}",
983                                path,
984                                file_req.line,
985                                file_req.column,
986                                file_req.end_line,
987                                file_req.end_column,
988                                file_req.message,
989                            );
990                            // Only the last file gets the wait_id (it's the one that will be active)
991                            let file_wait_id = if i == file_count - 1 { wait_id } else { None };
992                            editor.queue_file_open(
993                                path,
994                                file_req.line,
995                                file_req.column,
996                                file_req.end_line,
997                                file_req.end_column,
998                                file_req.message.clone(),
999                                file_wait_id,
1000                            );
1001                        }
1002
1003                        // Track the waiting client
1004                        if let Some(wait_id) = wait_id {
1005                            if let Some(client) = self.clients.get_mut(idx) {
1006                                self.waiting_clients.insert(wait_id, client.id);
1007                                client.wait_id = Some(wait_id);
1008                            }
1009                        }
1010
1011                        resize_occurred = true; // Force re-render
1012                    }
1013                }
1014                ClientControl::Quit => unreachable!(), // Handled above
1015            }
1016        }
1017
1018        // Check for clients with broken write pipes
1019        for (idx, client) in self.clients.iter().enumerate() {
1020            if client.data_writer.is_broken() && !disconnected.contains(&idx) {
1021                tracing::info!("Client {} write pipe broken, disconnecting", client.id);
1022                disconnected.push(idx);
1023            }
1024        }
1025
1026        // Deduplicate and sort for safe reverse removal
1027        disconnected.sort_unstable();
1028        disconnected.dedup();
1029
1030        // Remove disconnected clients
1031        for idx in disconnected.into_iter().rev() {
1032            let client = self.clients.remove(idx);
1033            // Clean up --wait tracking if this client was waiting
1034            if let Some(wait_id) = client.wait_id {
1035                self.waiting_clients.remove(&wait_id);
1036                // Also clean up editor wait_tracking for this wait_id
1037                if let Some(ref mut editor) = self.editor {
1038                    editor.remove_wait_tracking(wait_id);
1039                }
1040            }
1041            // Best-effort teardown via the non-blocking writer
1042            let teardown = terminal_teardown_sequences();
1043            let _ = client.data_writer.try_write(&teardown);
1044            tracing::info!("Client {} disconnected", client.id);
1045            // Invalidate input source if that client disconnected
1046            if input_source_client == Some(idx) {
1047                input_source_client = None;
1048            }
1049        }
1050
1051        Ok((input_events, resize_occurred, input_source_client))
1052    }
1053
1054    /// Update terminal size after resize
1055    fn update_terminal_size(&mut self) -> io::Result<()> {
1056        if let Some(ref mut terminal) = self.terminal {
1057            let backend = terminal.backend_mut();
1058            backend.resize(self.term_size.cols, self.term_size.rows);
1059        }
1060
1061        if let Some(ref mut editor) = self.editor {
1062            editor.resize(self.term_size.cols, self.term_size.rows);
1063        }
1064
1065        Ok(())
1066    }
1067
1068    /// Handle an input event
1069    fn handle_event(&mut self, event: Event) -> io::Result<bool> {
1070        let Some(ref mut editor) = self.editor else {
1071            return Ok(false);
1072        };
1073
1074        match event {
1075            Event::Key(key_event) => {
1076                if key_event.kind == KeyEventKind::Press {
1077                    editor
1078                        .handle_key(key_event.code, key_event.modifiers)
1079                        .map_err(|e| io::Error::other(e.to_string()))?;
1080                    Ok(true)
1081                } else {
1082                    Ok(false)
1083                }
1084            }
1085            Event::Mouse(mouse_event) => editor
1086                .handle_mouse(mouse_event)
1087                .map_err(|e| io::Error::other(e.to_string())),
1088            Event::Resize(w, h) => {
1089                editor.resize(w, h);
1090                Ok(true)
1091            }
1092            Event::Paste(text) => {
1093                editor.paste_text(text);
1094                Ok(true)
1095            }
1096            _ => Ok(false),
1097        }
1098    }
1099
1100    /// Render the editor and broadcast output to all clients
1101    fn render_and_broadcast(&mut self) -> io::Result<()> {
1102        let Some(ref mut editor) = self.editor else {
1103            return Ok(());
1104        };
1105
1106        let Some(ref mut terminal) = self.terminal else {
1107            return Ok(());
1108        };
1109
1110        // Check if any client needs a full render (e.g., newly connected)
1111        let any_needs_full = self.clients.iter().any(|c| c.needs_full_render);
1112        if any_needs_full {
1113            tracing::info!(
1114                "Full render requested for {} client(s)",
1115                self.clients.iter().filter(|c| c.needs_full_render).count()
1116            );
1117            // Force full redraw by invalidating terminal state
1118            terminal.backend_mut().reset_style_state();
1119            // Best-effort terminal clear for full redraw
1120            #[allow(clippy::let_underscore_must_use)]
1121            let _ = terminal.clear();
1122        }
1123
1124        // Take any pending escape sequences (e.g., cursor style changes)
1125        let pending_sequences = editor.take_pending_escape_sequences();
1126
1127        // Render to capture backend
1128        terminal
1129            .draw(|frame| editor.render(frame))
1130            .map_err(|e| io::Error::other(e.to_string()))?;
1131
1132        // Get the captured output
1133        let output = terminal.backend_mut().take_buffer();
1134
1135        if output.is_empty() && pending_sequences.is_empty() {
1136            return Ok(());
1137        }
1138
1139        // Broadcast to all clients via non-blocking writer threads (skip waiting clients)
1140        for client in &mut self.clients {
1141            if client.wait_id.is_some() {
1142                continue;
1143            }
1144            // Combine pending sequences and output into a single frame
1145            let frame = if !pending_sequences.is_empty() && !output.is_empty() {
1146                let mut combined = Vec::with_capacity(pending_sequences.len() + output.len());
1147                combined.extend_from_slice(&pending_sequences);
1148                combined.extend_from_slice(&output);
1149                combined
1150            } else if !pending_sequences.is_empty() {
1151                pending_sequences.clone()
1152            } else {
1153                output.clone()
1154            };
1155
1156            if !frame.is_empty() && !client.data_writer.try_write(&frame) {
1157                tracing::warn!("Client {} output buffer full, dropping frame", client.id);
1158            }
1159            // Clear full render flag after sending
1160            client.needs_full_render = false;
1161        }
1162
1163        Ok(())
1164    }
1165
1166    /// Disconnect all clients
1167    fn disconnect_all_clients(&mut self, reason: &str) -> io::Result<()> {
1168        let teardown = terminal_teardown_sequences();
1169        for client in &mut self.clients {
1170            // Best-effort: client may already be disconnected
1171            #[allow(clippy::let_underscore_must_use)]
1172            let _ = client.data_writer.try_write(&teardown);
1173            let quit_msg = serde_json::to_string(&ServerControl::Quit {
1174                reason: reason.to_string(),
1175            })
1176            .unwrap_or_default();
1177            // Best-effort: client may already be disconnected
1178            #[allow(clippy::let_underscore_must_use)]
1179            let _ = client.conn.write_control(&quit_msg);
1180        }
1181        self.clients.clear();
1182        Ok(())
1183    }
1184}
1185
1186impl ConnectedClient {
1187    /// Get the client's TERM environment variable
1188    #[allow(dead_code)]
1189    pub fn term(&self) -> Option<&str> {
1190        self.env.get("TERM").and_then(|v| v.as_deref())
1191    }
1192
1193    /// Check if the client supports truecolor
1194    #[allow(dead_code)]
1195    pub fn supports_truecolor(&self) -> bool {
1196        self.env
1197            .get("COLORTERM")
1198            .and_then(|v| v.as_deref())
1199            .map(|v| v == "truecolor" || v == "24bit")
1200            .unwrap_or(false)
1201    }
1202}