Skip to main content

fresh/server/
editor_server.rs

1//! Editor integration with the session server
2//!
3//! This module bridges the Editor with the server infrastructure:
4//! - Creates Editor with CaptureBackend for rendering
5//! - Processes input events from clients
6//! - Broadcasts rendered output to all clients
7
8use std::io;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::mpsc;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crossterm::event::{Event, KeyEventKind};
16use ratatui::Terminal;
17
18use crate::app::Editor;
19use crate::config::Config;
20use crate::config_io::DirectoryContext;
21// Filesystem is now owned by `self.current_authority`; the server no
22// longer constructs a `StdFileSystem` directly.
23use crate::server::capture_backend::{
24    terminal_setup_sequences, terminal_teardown_sequences, CaptureBackend,
25};
26use crate::server::input_parser::InputParser;
27use crate::server::ipc::{ServerConnection, ServerListener, SocketPaths, StreamWrapper};
28use crate::server::protocol::{
29    ClientControl, ServerControl, ServerHello, TermSize, VersionMismatch, PROTOCOL_VERSION,
30};
31use crate::view::color_support::ColorCapability;
32
/// Configuration for the editor server.
///
/// Passed by value to `EditorServer::new`, which moves `startup_authority`
/// and `session_keepalive` out of it and keeps the remainder for the
/// server's lifetime.
pub struct EditorServerConfig {
    /// Working directory for this session.  Used to derive the socket
    /// paths when `session_name` is `None`, and handed to the editor as
    /// its working directory.
    pub working_dir: PathBuf,
    /// Optional session name.  When set it is used to derive the socket
    /// paths and is shown as the session's display name; when unset the
    /// final component of `working_dir` is displayed instead.
    pub session_name: Option<String>,
    /// Idle timeout before auto-shutdown.  The main loop exits once no
    /// clients are attached and this much time has elapsed since the last
    /// client activity.  `None` disables the check.
    pub idle_timeout: Option<Duration>,
    /// Editor configuration (cloned into every editor instance the server
    /// builds).
    pub editor_config: Config,
    /// Directory context for config/data paths
    pub dir_context: DirectoryContext,
    /// Whether plugins are enabled
    pub plugins_enabled: bool,
    /// Whether to auto-load ~/.config/fresh/init.ts (requires `plugins_enabled`).
    pub init_enabled: bool,
    /// Authority to install at boot.  `None` means `Authority::local()`,
    /// which is the standard session-mode default (principle 6 of
    /// `AUTHORITY_DESIGN.md`).  The CLI `ssh://` / `user@host:path`
    /// forms construct an `Authority::ssh(...)` and pass it here so
    /// the daemon boots already attached to the remote host.  Plugins
    /// can still replace this post-boot via `setAuthority`.
    pub startup_authority: Option<crate::services::authority::Authority>,
    /// Opaque handle kept alive for the server's lifetime alongside
    /// `startup_authority`.  SSH authorities back this with the Tokio
    /// runtime, the `SshConnection`, and the reconnect task — dropping
    /// any of those tears down the remote session — so the caller
    /// bundles them here and the server just holds on until shutdown.
    /// Local authorities leave this `None`.
    pub session_keepalive: Option<Box<dyn std::any::Any + Send>>,
}
64
/// Editor server that manages editor state and client connections.
///
/// A single `Editor` (rendering into a `CaptureBackend` terminal) is shared
/// by every attached client.  The editor is created lazily when the first
/// client connects and may be rebuilt in place on authority or
/// working-directory changes while clients stay attached.
pub struct EditorServer {
    /// Server configuration.  `startup_authority` and `session_keepalive`
    /// have already been moved out of it by `new`.
    config: EditorServerConfig,
    /// Listener that new client connections are accepted from.
    listener: ServerListener,
    /// Currently attached clients.
    clients: Vec<ConnectedClient>,
    /// The editor instance; `None` until the first client connects.
    editor: Option<Editor>,
    /// Capture terminal paired with `editor`; set and cleared together
    /// with it.
    terminal: Option<Terminal<CaptureBackend>>,
    /// Timestamp of the most recent client connection or input batch;
    /// compared against `config.idle_timeout`.
    last_client_activity: Instant,
    /// Shutdown flag checked at the top of every main-loop iteration.
    /// Shared with callers through `shutdown_handle`.
    shutdown: Arc<AtomicBool>,
    /// Effective terminal size (from the primary/first client)
    term_size: TermSize,
    /// Index of the client that most recently provided input (for per-client detach)
    last_input_client: Option<usize>,
    /// Next wait ID for --wait tracking
    next_wait_id: u64,
    /// Maps wait_id → client_id for clients waiting on file events
    waiting_clients: std::collections::HashMap<u64, u64>,
    /// Current authority. Carried across editor rebuilds so plugin-
    /// installed authorities (e.g. a devcontainer attach) survive the
    /// restart-based transition: the old editor is dropped, a new one
    /// is built with this authority in effect, and clients stay
    /// connected the whole time. Starts as
    /// `config.startup_authority.unwrap_or_else(Authority::local)`.
    current_authority: crate::services::authority::Authority,
    /// Keepalive bundle paired with the startup authority — held for
    /// the server's lifetime so SSH runtimes, reconnect tasks, and
    /// similar resources outlive the editor rebuilds that happen on
    /// authority transitions.  Never inspected; dropped only when the
    /// server is dropped.
    // The field is intentionally never read: it exists purely to own the
    // keepalive bundle, hence the `dead_code` allowance.
    #[allow(dead_code)]
    session_keepalive: Option<Box<dyn std::any::Any + Send>>,
}
97
/// Buffered writer for sending data to a client without blocking the server loop.
///
/// Spawns a background thread that receives data via a bounded channel and
/// writes it to the client's data pipe. If the channel fills up (client is
/// too slow to read), frames are dropped. If the pipe breaks, the `pipe_broken`
/// flag is set so the main loop can disconnect the client.
struct ClientDataWriter {
    /// Producer side of the bounded frame queue; the writer thread owns the
    /// matching receiver and exits once this sender is dropped.
    sender: mpsc::SyncSender<Vec<u8>>,
    /// Set to `true` by the writer thread after a failed write or flush.
    pipe_broken: Arc<AtomicBool>,
}
108
109impl ClientDataWriter {
110    /// Create a new writer that spawns a background thread to write to the data stream.
111    fn new(data: StreamWrapper, client_id: u64) -> Self {
112        // 16 frames of buffer (~270ms at 60fps before dropping frames)
113        let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(16);
114        let pipe_broken = Arc::new(AtomicBool::new(false));
115        let pipe_broken_clone = pipe_broken.clone();
116
117        std::thread::Builder::new()
118            .name(format!("client-{}-writer", client_id))
119            .spawn(move || {
120                while let Ok(buf) = rx.recv() {
121                    if let Err(e) = data.write_all(&buf) {
122                        tracing::debug!("Client {} writer pipe error: {}", client_id, e);
123                        pipe_broken_clone.store(true, Ordering::Relaxed);
124                        break;
125                    }
126                    if let Err(e) = data.flush() {
127                        tracing::debug!("Client {} writer flush error: {}", client_id, e);
128                        pipe_broken_clone.store(true, Ordering::Relaxed);
129                        break;
130                    }
131                }
132                tracing::debug!("Client {} writer thread exiting", client_id);
133            })
134            .expect("Failed to spawn client writer thread");
135
136        Self {
137            sender: tx,
138            pipe_broken,
139        }
140    }
141
142    /// Try to send data without blocking. Returns false if the channel is full
143    /// (client too slow) or the writer thread has exited.
144    fn try_write(&self, data: &[u8]) -> bool {
145        self.sender.try_send(data.to_vec()).is_ok()
146    }
147
148    /// Check if the writer thread detected a broken pipe.
149    fn is_broken(&self) -> bool {
150        self.pipe_broken.load(Ordering::Relaxed)
151    }
152}
153
/// A connected client with its own input parser
struct ConnectedClient {
    /// Connection to the client; control messages (quit, suspend, wait
    /// completion, clipboard) are written through it.
    conn: ServerConnection,
    /// Background writer for non-blocking data output
    data_writer: ClientDataWriter,
    /// Terminal size associated with this client.
    term_size: TermSize,
    /// NOTE(review): presumably environment variables reported by the
    /// client (name → optional value) — confirm against the handshake code.
    env: std::collections::HashMap<String, Option<String>>,
    /// Identifier for this client; used in log messages and to route
    /// `--wait` completions back to the right client.
    id: u64,
    /// Per-client input parser.
    input_parser: InputParser,
    /// Whether this client needs a full screen render on next frame
    needs_full_render: bool,
    /// If set, this client is waiting for a --wait completion signal
    wait_id: Option<u64>,
}
168
169impl EditorServer {
170    /// Create a new editor server
171    pub fn new(mut config: EditorServerConfig) -> io::Result<Self> {
172        let socket_paths = if let Some(ref name) = config.session_name {
173            SocketPaths::for_session_name(name)?
174        } else {
175            SocketPaths::for_working_dir(&config.working_dir)?
176        };
177
178        let listener = ServerListener::bind(socket_paths)?;
179
180        // Write PID file so clients can detect stale sessions
181        let pid = std::process::id();
182        if let Err(e) = listener.paths().write_pid(pid) {
183            tracing::warn!("Failed to write PID file: {}", e);
184        }
185
186        // Move the startup authority + its keepalive off the config —
187        // they are consumed once and belong to the server from here on.
188        let current_authority = config
189            .startup_authority
190            .take()
191            .unwrap_or_else(crate::services::authority::Authority::local);
192        let session_keepalive = config.session_keepalive.take();
193
194        Ok(Self {
195            config,
196            listener,
197            clients: Vec::new(),
198            editor: None,
199            terminal: None,
200            last_client_activity: Instant::now(),
201            shutdown: Arc::new(AtomicBool::new(false)),
202            term_size: TermSize::new(80, 24), // Default until first client connects
203            last_input_client: None,
204            next_wait_id: 1,
205            waiting_clients: std::collections::HashMap::new(),
206            current_authority,
207            session_keepalive,
208        })
209    }
210
211    /// Get a handle to request shutdown
212    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
213        self.shutdown.clone()
214    }
215
216    /// Get the socket paths
217    pub fn socket_paths(&self) -> &SocketPaths {
218        self.listener.paths()
219    }
220
221    /// Access the editor instance (available after initialize_editor).
222    pub fn editor(&self) -> Option<&Editor> {
223        self.editor.as_ref()
224    }
225
226    /// Mutable access to the editor instance (available after initialize_editor).
227    pub fn editor_mut(&mut self) -> Option<&mut Editor> {
228        self.editor.as_mut()
229    }
230
    /// Run the editor server main loop.
    ///
    /// Each iteration, in order:
    /// 1. exits if the shutdown flag is set or the idle timeout has expired;
    /// 2. accepts at most one pending connection (creating the editor on the
    ///    first-ever client);
    /// 3. collects client input via `process_clients`;
    /// 4. services editor-raised requests: quit / in-place rebuild, detach,
    ///    suspend (each of these restarts the loop with `continue`);
    /// 5. applies resizes and input events;
    /// 6. drains the editor's async work (messages, pending file opens,
    ///    `--wait` completions, clipboard, hover timer);
    /// 7. renders and broadcasts, at most once per `FRAME_DURATION`;
    /// 8. sleeps 5 ms.
    ///
    /// After the loop ends, the editor (if any) is auto-saved when enabled,
    /// its recovery session is ended and its workspace saved, and all
    /// clients are disconnected.
    ///
    /// # Errors
    ///
    /// Propagates errors from `initialize_editor`, `update_terminal_size`,
    /// `process_clients`, `handle_event`, `render_and_broadcast` and
    /// `disconnect_all_clients`.  Accept and handshake failures are logged
    /// and do not stop the loop.  Note that an error propagated out of the
    /// loop returns immediately and skips the shutdown sequence at the end.
    pub fn run(&mut self) -> io::Result<()> {
        tracing::info!("Editor server starting for {:?}", self.config.working_dir);

        let mut next_client_id = 1u64;
        // Start with a render pending so the first frame is produced as soon
        // as the throttle allows.
        let mut needs_render = true;
        let mut last_render = Instant::now();
        const FRAME_DURATION: Duration = Duration::from_millis(16); // roughly 60fps

        loop {
            // Check for shutdown
            if self.shutdown.load(Ordering::SeqCst) {
                tracing::info!("Shutdown requested");
                break;
            }

            // Check idle timeout (only applies while no client is attached)
            if let Some(timeout) = self.config.idle_timeout {
                if self.clients.is_empty() && self.last_client_activity.elapsed() > timeout {
                    tracing::info!("Idle timeout reached, shutting down");
                    break;
                }
            }

            // Accept new connections (at most one per iteration)
            tracing::debug!("[server] main loop: calling accept()");
            match self.listener.accept() {
                Ok(Some(conn)) => {
                    // Get current cursor style from editor if it exists, otherwise from config
                    let cursor_style = self
                        .editor
                        .as_ref()
                        .map(|e| e.config().editor.cursor_style)
                        .unwrap_or(self.config.editor_config.editor.cursor_style);
                    match self.handle_new_connection(conn, next_client_id, cursor_style) {
                        Ok(client) => {
                            tracing::info!("Client {} connected", client.id);

                            // Initialize editor on first-ever client, or update size if reconnecting
                            if self.editor.is_none() {
                                // First time - initialize editor
                                self.term_size = client.term_size;
                                self.initialize_editor()?;
                            } else if self.clients.is_empty() {
                                // Reconnecting after all clients disconnected - update terminal size
                                if self.term_size != client.term_size {
                                    self.term_size = client.term_size;
                                    self.update_terminal_size()?;
                                }
                            }
                            // Note: full redraw is handled via client.needs_full_render flag

                            self.clients.push(client);
                            self.last_client_activity = Instant::now();
                            next_client_id += 1;
                            needs_render = true;
                        }
                        Err(e) => {
                            // A failed handshake only affects that connection;
                            // the client id is not consumed.
                            tracing::warn!("Failed to complete handshake: {}", e);
                        }
                    }
                }
                // No connection pending.
                Ok(None) => {}
                Err(e) => {
                    tracing::error!("Accept error: {}", e);
                }
            }

            // Process client messages and get input events
            tracing::debug!("[server] main loop: calling process_clients");
            let (input_events, resize_occurred, input_source) = self.process_clients()?;
            // Remember which client produced input; detach/suspend below are
            // targeted at this index.
            if let Some(idx) = input_source {
                self.last_input_client = Some(idx);
            }
            if !input_events.is_empty() {
                tracing::debug!(
                    "[server] process_clients returned {} events",
                    input_events.len()
                );
            }

            // NOTE(review): every `continue` in the quit / detach / suspend
            // handling below skips the rest of this iteration, so the
            // `input_events` and `resize_occurred` values just returned by
            // `process_clients` are discarded on those paths — confirm this
            // is intended.

            // Check if editor should quit. `should_quit` is set both
            // by a genuine user quit and by `request_restart`
            // (triggered by `change_working_dir` and by
            // `install_authority`).  Distinguish the two by peeking at
            // the editor's pending-restart fields: if either carries a
            // value, rebuild the editor in place and keep clients
            // attached; otherwise this is a real shutdown.
            if let Some(ref mut editor) = self.editor {
                if editor.should_quit() {
                    let pending_authority = editor.take_pending_authority();
                    let restart_dir = editor.take_restart_dir();
                    if pending_authority.is_some() || restart_dir.is_some() {
                        tracing::info!(
                            "Session rebuild requested (authority={}, dir={})",
                            pending_authority.is_some(),
                            restart_dir.is_some()
                        );
                        if let Err(e) = self.rebuild_editor(restart_dir, pending_authority) {
                            // The shutdown flag is honored at the top of the
                            // next iteration.
                            tracing::error!("Session rebuild failed, shutting down: {}", e);
                            self.shutdown.store(true, Ordering::SeqCst);
                            continue;
                        }
                        needs_render = true;
                        continue;
                    }
                    tracing::info!("Editor requested quit");
                    self.shutdown.store(true, Ordering::SeqCst);
                    continue;
                }
            }

            // Check if client should detach (keep server running)
            let detach_requested = self
                .editor
                .as_ref()
                .map(|e| e.should_detach())
                .unwrap_or(false);
            if detach_requested {
                // Detach only the client that triggered it (via last input)
                // NOTE(review): `last_input_client` may have been overwritten
                // by this iteration's `process_clients` result before the
                // detach flag is examined here — verify the index still
                // refers to the client that asked to detach.
                if let Some(idx) = self.last_input_client.take() {
                    // An out-of-range index is silently ignored (no client is
                    // detached), but the detach flag is still cleared below.
                    if idx < self.clients.len() {
                        tracing::info!("Client {} requested detach", self.clients[idx].id);
                        let client = self.clients.remove(idx);
                        let teardown = terminal_teardown_sequences();
                        // Best-effort: client may already be disconnected
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = client.data_writer.try_write(&teardown);
                        let quit_msg = serde_json::to_string(&ServerControl::Quit {
                            reason: "Detached".to_string(),
                        })
                        .unwrap_or_default();
                        // Best-effort: client may already be disconnected
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = client.conn.write_control(&quit_msg);
                    }
                } else {
                    // Fallback: if we can't determine which client, detach all
                    tracing::info!("Detach requested but no input source, detaching all");
                    self.disconnect_all_clients("Detached")?;
                }
                // Reset the detach flag
                if let Some(ref mut editor) = self.editor {
                    editor.clear_detach();
                }
                continue;
            }

            // Check if the client should suspend itself (SIGTSTP). The server
            // keeps running — only the client drops back to the shell. On
            // resume, the client sends a Resize to nudge a full redraw; we
            // pre-mark `needs_full_render` so the next rendered frame after
            // resume re-emits setup sequences and a complete paint.
            let suspend_requested = self
                .editor
                .as_mut()
                .map(|e| e.take_suspend_request())
                .unwrap_or(false);
            if suspend_requested {
                // Unlike detach, the index is read without `take()`: the
                // client stays attached.
                if let Some(idx) = self.last_input_client {
                    if idx < self.clients.len() {
                        let client_id = self.clients[idx].id;
                        tracing::info!("Client {} requested suspend", client_id);
                        let suspend_msg = serde_json::to_string(&ServerControl::SuspendClient)
                            .unwrap_or_default();
                        // Best-effort: client may already be disconnected
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = self.clients[idx].conn.write_control(&suspend_msg);
                        // Mark so the next render re-emits setup sequences and a
                        // full paint once the client resumes and reconnects its
                        // terminal.
                        self.clients[idx].needs_full_render = true;
                    }
                } else {
                    tracing::warn!("Suspend requested but no input source; ignoring");
                }
                continue;
            }

            // Handle resize
            if resize_occurred {
                self.update_terminal_size()?;
                needs_render = true;
            }

            // Process input events
            if !input_events.is_empty() {
                // Any input counts as activity for the idle timeout.
                self.last_client_activity = Instant::now();
                for event in input_events {
                    // `handle_event` reports whether the event requires a redraw.
                    if self.handle_event(event)? {
                        needs_render = true;
                    }
                }
            }

            // Process async messages from editor
            if let Some(ref mut editor) = self.editor {
                if editor.process_async_messages() {
                    needs_render = true;
                }
                if editor.process_pending_file_opens() {
                    needs_render = true;
                }

                // Process completed --wait operations
                for wait_id in editor.take_completed_waits() {
                    if let Some(client_id) = self.waiting_clients.remove(&wait_id) {
                        // Find the client and send WaitComplete; if the
                        // client is no longer attached the completion is
                        // dropped.
                        if let Some(client) = self.clients.iter_mut().find(|c| c.id == client_id) {
                            let msg = serde_json::to_string(&ServerControl::WaitComplete)
                                .unwrap_or_default();
                            // Best-effort: a write failure is ignored.
                            #[allow(clippy::let_underscore_must_use)]
                            let _ = client.conn.write_control(&msg);
                            client.wait_id = None;
                        }
                    }
                }

                // Send pending clipboard data to clients via control message
                // (broadcast to every attached client)
                if let Some(cb) = editor.take_pending_clipboard() {
                    let msg = serde_json::to_string(&ServerControl::SetClipboard {
                        text: cb.text,
                        use_osc52: cb.use_osc52,
                        use_system_clipboard: cb.use_system_clipboard,
                    })
                    .unwrap_or_default();
                    for client in &mut self.clients {
                        // Best-effort: a write failure is ignored.
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = client.conn.write_control(&msg);
                    }
                }

                if editor.check_mouse_hover_timer() {
                    needs_render = true;
                }
            }

            // Render and broadcast if needed.  When the throttle blocks a
            // render, `needs_render` stays set and is retried next iteration.
            if needs_render && last_render.elapsed() >= FRAME_DURATION {
                self.render_and_broadcast()?;
                last_render = Instant::now();
                needs_render = false;
            }

            // Brief sleep to avoid busy-waiting
            std::thread::sleep(Duration::from_millis(5));
        }

        // Perform the same shutdown sequence as the normal (non-session) exit path
        // in run_event_loop_common: auto-save, end recovery session, save workspace.
        // Every failure here is logged and does not abort the shutdown.
        if let Some(ref mut editor) = self.editor {
            // Auto-save file-backed buffers to disk before exiting
            if editor.config().editor.auto_save_enabled {
                match editor.save_all_on_exit() {
                    Ok(count) if count > 0 => {
                        tracing::info!("Auto-saved {} buffer(s) on exit", count);
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!("Failed to auto-save on exit: {}", e);
                    }
                }
            }

            // End recovery session first (flushes dirty buffers + assigns recovery IDs),
            // then save workspace (captures those IDs for next session restore).
            if let Err(e) = editor.end_recovery_session() {
                tracing::warn!("Failed to end recovery session: {}", e);
            }
            if let Err(e) = editor.save_workspace() {
                tracing::warn!("Failed to save workspace: {}", e);
            } else {
                tracing::debug!("Workspace saved successfully");
            }
        }

        // Clean shutdown
        self.disconnect_all_clients("Server shutting down")?;

        Ok(())
    }
512
513    /// Build a fresh `Editor` instance using the current configuration
514    /// and stored authority.  Shared between first-boot initialization
515    /// and post-restart rebuild.
516    fn build_editor_instance(&self) -> io::Result<(Editor, Terminal<CaptureBackend>)> {
517        let backend = CaptureBackend::new(self.term_size.cols, self.term_size.rows);
518        let terminal = Terminal::new(backend)
519            .map_err(|e| io::Error::other(format!("Failed to create terminal: {}", e)))?;
520
521        // The Editor constructor still takes a filesystem; the real
522        // authority is installed via `set_boot_authority` right after
523        // construction so plugins and init.ts load against the correct
524        // backend from the first tick.
525        let filesystem = self.current_authority.filesystem.clone();
526        let color_capability = ColorCapability::TrueColor; // Assume truecolor for now
527
528        let mut editor = Editor::with_working_dir(
529            self.config.editor_config.clone(),
530            self.term_size.cols,
531            self.term_size.rows,
532            Some(self.config.working_dir.clone()),
533            self.config.dir_context.clone(),
534            self.config.plugins_enabled,
535            color_capability,
536            filesystem,
537        )
538        .map_err(|e| io::Error::other(format!("Failed to create editor: {}", e)))?;
539
540        editor.set_boot_authority(self.current_authority.clone());
541
542        // Auto-load init.ts via the same pipeline as the non-server entry point.
543        editor.load_init_script(self.config.init_enabled);
544
545        // Enable session mode - use hardware cursor only, no REVERSED software cursor
546        editor.set_session_mode(true);
547
548        // Set session name for status bar display
549        let session_display_name = self.config.session_name.clone().unwrap_or_else(|| {
550            // Use the directory name as a short display name for unnamed sessions
551            self.config
552                .working_dir
553                .file_name()
554                .and_then(|n| n.to_str())
555                .map(|s| s.to_string())
556                .unwrap_or_else(|| "session".to_string())
557        });
558        editor.set_session_name(Some(session_display_name));
559
560        Ok((editor, terminal))
561    }
562
563    /// Initialize the editor on first client connection.
564    ///
565    /// Performs the full first-boot sequence: build editor, restore
566    /// workspace, recover buffers from hot exit, start recovery
567    /// session.  Subsequent rebuilds (on authority/working-dir change)
568    /// go through [`rebuild_editor`].
569    pub fn initialize_editor(&mut self) -> io::Result<()> {
570        let (mut editor, terminal) = self.build_editor_instance()?;
571
572        // Restore workspace and recovery data (mirrors the standalone startup
573        // path in handle_first_run_setup in main.rs).
574        match editor.try_restore_workspace() {
575            Ok(true) => {
576                tracing::info!("Session workspace restored successfully");
577            }
578            Ok(false) => {
579                tracing::debug!("No previous session workspace found");
580            }
581            Err(e) => {
582                tracing::warn!("Failed to restore session workspace: {}", e);
583            }
584        }
585
586        // Recover buffers from hot exit recovery files
587        if editor.has_recovery_files().unwrap_or(false) {
588            tracing::info!("Recovery files found for session, recovering...");
589            match editor.recover_all_buffers() {
590                Ok(count) if count > 0 => {
591                    tracing::info!("Recovered {} buffer(s) for session", count);
592                }
593                Ok(_) => {
594                    tracing::info!("No buffers to recover for session");
595                }
596                Err(e) => {
597                    tracing::warn!("Failed to recover session buffers: {}", e);
598                }
599            }
600        }
601
602        // Start the recovery session (periodic saves of dirty buffers)
603        if let Err(e) = editor.start_recovery_session() {
604            tracing::warn!("Failed to start recovery session: {}", e);
605        }
606
607        self.terminal = Some(terminal);
608        self.editor = Some(editor);
609
610        tracing::info!(
611            "Editor initialized with size {}x{}",
612            self.term_size.cols,
613            self.term_size.rows
614        );
615
616        Ok(())
617    }
618
619    /// Rebuild the editor in place after an authority transition or a
620    /// working-directory change.
621    ///
622    /// Mirrors the restart loop in `main.rs`: save the workspace so
623    /// open buffers come back, drop the old editor (which cascades
624    /// into shutting down terminals, LSP servers, and plugin state),
625    /// swap in any new authority / working-dir, build a fresh editor,
626    /// and restore the workspace under the new backend.  The TCP
627    /// clients stay connected throughout; each is flagged for a full
628    /// redraw on the next frame so they see the new editor from a
629    /// clean state rather than a mid-transition frame.
630    pub(crate) fn rebuild_editor(
631        &mut self,
632        new_working_dir: Option<PathBuf>,
633        new_authority: Option<crate::services::authority::Authority>,
634    ) -> io::Result<()> {
635        // Flush buffer saves + workspace before dropping the old editor,
636        // mirroring the standalone exit path.  On failure we log and
637        // continue — rebuild should still succeed.
638        if let Some(ref mut editor) = self.editor {
639            if editor.config().editor.auto_save_enabled {
640                if let Err(e) = editor.save_all_on_exit() {
641                    tracing::warn!("Rebuild: failed to auto-save on exit: {}", e);
642                }
643            }
644            if let Err(e) = editor.end_recovery_session() {
645                tracing::warn!("Rebuild: failed to end recovery session: {}", e);
646            }
647            if let Err(e) = editor.save_workspace() {
648                tracing::warn!("Rebuild: failed to save workspace: {}", e);
649            }
650        }
651
652        // Drop old editor + terminal.  Drop impls shut down PTYs, LSP
653        // servers, and plugin threads.
654        self.editor = None;
655        self.terminal = None;
656
657        // Apply the pending changes before building the next editor.
658        if let Some(dir) = new_working_dir {
659            tracing::info!("Rebuild: switching working dir to {}", dir.display());
660            self.config.working_dir = dir;
661        }
662        if let Some(auth) = new_authority {
663            tracing::info!(
664                "Rebuild: installing authority with label {:?}",
665                auth.display_label
666            );
667            self.current_authority = auth;
668        }
669
670        let (mut editor, terminal) = self.build_editor_instance()?;
671
672        // Bring buffers back under the new backend.  `try_restore_workspace`
673        // reads the workspace file we wrote above and re-opens the
674        // same splits/buffers.
675        match editor.try_restore_workspace() {
676            Ok(true) => tracing::info!("Rebuild: workspace restored"),
677            Ok(false) => tracing::debug!("Rebuild: no workspace to restore"),
678            Err(e) => tracing::warn!("Rebuild: failed to restore workspace: {}", e),
679        }
680
681        if let Err(e) = editor.start_recovery_session() {
682            tracing::warn!("Rebuild: failed to start recovery session: {}", e);
683        }
684
685        self.terminal = Some(terminal);
686        self.editor = Some(editor);
687
688        // Force every attached client to repaint from scratch — the
689        // previous frame described the old editor's screen.
690        for client in &mut self.clients {
691            client.needs_full_render = true;
692        }
693
694        tracing::info!(
695            "Rebuild: complete, {} clients kept attached",
696            self.clients.len()
697        );
698
699        Ok(())
700    }
701
702    /// Handle a new client connection
703    fn handle_new_connection(
704        &self,
705        conn: ServerConnection,
706        client_id: u64,
707        cursor_style: crate::config::CursorStyle,
708    ) -> io::Result<ConnectedClient> {
709        // Read client hello
710        // On Windows, don't toggle blocking mode - named pipes don't support mode switching
711        // after connection. The read_control() method handles this internally.
712        #[cfg(not(windows))]
713        conn.control.set_nonblocking(false)?;
714        let hello_json = conn
715            .read_control()?
716            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "No hello received"))?;
717
718        let client_msg: ClientControl = serde_json::from_str(&hello_json)
719            .map_err(|e| io::Error::other(format!("Invalid hello: {}", e)))?;
720
721        let hello = match client_msg {
722            ClientControl::Hello(h) => h,
723            _ => {
724                return Err(io::Error::other("Expected Hello message"));
725            }
726        };
727
728        // Check protocol version
729        if hello.protocol_version != PROTOCOL_VERSION {
730            let mismatch = VersionMismatch {
731                server_version: env!("CARGO_PKG_VERSION").to_string(),
732                client_version: hello.client_version.clone(),
733                action: if hello.protocol_version > PROTOCOL_VERSION {
734                    "upgrade_server".to_string()
735                } else {
736                    "restart_server".to_string()
737                },
738                message: format!(
739                    "Protocol version mismatch: server={}, client={}",
740                    PROTOCOL_VERSION, hello.protocol_version
741                ),
742            };
743
744            let response = serde_json::to_string(&ServerControl::VersionMismatch(mismatch))
745                .map_err(|e| io::Error::other(e.to_string()))?;
746            conn.write_control(&response)?;
747
748            return Err(io::Error::other("Version mismatch"));
749        }
750
751        // Send server hello
752        let session_id = self.config.session_name.clone().unwrap_or_else(|| {
753            crate::workspace::encode_path_for_filename(&self.config.working_dir)
754        });
755
756        let server_hello = ServerHello::new(session_id);
757        let response = serde_json::to_string(&ServerControl::Hello(server_hello))
758            .map_err(|e| io::Error::other(e.to_string()))?;
759        conn.write_control(&response)?;
760
761        // Set sockets back to non-blocking
762        // On Windows, don't toggle mode - named pipes don't support mode switching
763        #[cfg(not(windows))]
764        conn.control.set_nonblocking(true)?;
765
766        // Send terminal setup sequences
767        let mouse_hover_enabled = self.config.editor_config.editor.mouse_hover_enabled;
768        let setup = terminal_setup_sequences(mouse_hover_enabled);
769        conn.write_data(&setup)?;
770
771        // Send cursor style (from editor if running, otherwise from config)
772        conn.write_data(cursor_style.to_escape_sequence())?;
773
774        tracing::debug!(
775            "Client {} connected: {}x{}, TERM={:?}",
776            client_id,
777            hello.term_size.cols,
778            hello.term_size.rows,
779            hello.term()
780        );
781
782        // Create background writer for non-blocking render output
783        let data_writer = ClientDataWriter::new(conn.data.clone(), client_id);
784
785        Ok(ConnectedClient {
786            conn,
787            data_writer,
788            term_size: hello.term_size,
789            env: hello.env,
790            id: client_id,
791            input_parser: InputParser::new(),
792            needs_full_render: true,
793            wait_id: None,
794        })
795    }
796
797    /// Process messages from connected clients
798    /// Returns (input_events, resize_occurred, index of client that provided input)
799    fn process_clients(&mut self) -> io::Result<(Vec<Event>, bool, Option<usize>)> {
800        let mut disconnected = Vec::new();
801        let mut input_source_client: Option<usize> = None;
802        let mut input_events = Vec::new();
803        let mut resize_occurred = false;
804        let mut control_messages: Vec<(usize, ClientControl)> = Vec::new();
805
806        for (idx, client) in self.clients.iter_mut().enumerate() {
807            // Read from data socket
808            let mut buf = [0u8; 4096];
809            let mut data_eof = false;
810            tracing::debug!("[server] reading from client {} data socket", client.id);
811            match client.conn.read_data(&mut buf) {
812                Ok(0) => {
813                    tracing::debug!("[server] Client {} data stream closed (EOF)", client.id);
814                    // Don't disconnect waiting clients on data EOF - they're not sending data
815                    if client.wait_id.is_none() {
816                        disconnected.push(idx);
817                    }
818                    data_eof = true;
819                    // Don't continue - still need to check control socket for pending messages
820                }
821                Ok(n) => {
822                    tracing::debug!(
823                        "[server] Client {} read {} bytes from data socket",
824                        client.id,
825                        n
826                    );
827                    let events = client.input_parser.parse(&buf[..n]);
828                    tracing::debug!(
829                        "[server] Client {} parsed {} events",
830                        client.id,
831                        events.len()
832                    );
833                    if !events.is_empty() {
834                        input_source_client = Some(idx);
835                    }
836                    input_events.extend(events);
837                }
838                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
839                    // No data available
840                }
841                Err(e) => {
842                    tracing::warn!("[server] Client {} data read error: {}", client.id, e);
843                    disconnected.push(idx);
844                    data_eof = true;
845                    // Don't continue - still need to check control socket for pending messages
846                }
847            }
848            let _ = data_eof; // Suppress unused warning
849
850            // Check control socket
851            // On Windows, don't toggle nonblocking mode - it fails on named pipes
852            // Best-effort: nonblocking mode for control socket polling
853            #[cfg(not(windows))]
854            #[allow(clippy::let_underscore_must_use)]
855            let _ = client.conn.control.set_nonblocking(true);
856
857            // On Windows, use try_read pattern instead of blocking read_line
858            #[cfg(windows)]
859            {
860                let mut buf = [0u8; 1024];
861                match client.conn.control.try_read(&mut buf) {
862                    Ok(0) => {
863                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
864                        disconnected.push(idx);
865                    }
866                    Ok(n) => {
867                        // Try to parse as control message
868                        if let Ok(s) = std::str::from_utf8(&buf[..n]) {
869                            for line in s.lines() {
870                                if !line.trim().is_empty() {
871                                    if let Ok(msg) = serde_json::from_str::<ClientControl>(line) {
872                                        control_messages.push((idx, msg));
873                                    }
874                                }
875                            }
876                        }
877                    }
878                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
879                    Err(e) => {
880                        tracing::warn!("Client {} control read error: {}", client.id, e);
881                    }
882                }
883            }
884
885            #[cfg(not(windows))]
886            {
887                let mut reader = std::io::BufReader::new(&client.conn.control);
888                let mut line = String::new();
889                match std::io::BufRead::read_line(&mut reader, &mut line) {
890                    Ok(0) => {
891                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
892                        disconnected.push(idx);
893                    }
894                    Ok(_) if !line.trim().is_empty() => {
895                        if let Ok(msg) = serde_json::from_str::<ClientControl>(&line) {
896                            control_messages.push((idx, msg));
897                        }
898                    }
899                    Ok(_) => {}
900                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
901                    Err(e) => {
902                        tracing::warn!("Client {} control read error: {}", client.id, e);
903                    }
904                }
905            }
906        }
907
908        // Process control messages
909        eprintln!(
910            "[server] Processing {} control messages",
911            control_messages.len()
912        );
913        for (idx, msg) in control_messages {
914            eprintln!("[server] Control message from client {}: {:?}", idx, msg);
915            // Always process Quit, even from disconnected clients
916            if let ClientControl::Quit = msg {
917                tracing::info!("Client requested quit, shutting down");
918                self.shutdown.store(true, Ordering::SeqCst);
919                continue;
920            }
921
922            // Always process OpenFiles - it's a one-shot command from clients that disconnect immediately
923            if let ClientControl::OpenFiles { .. } = msg {
924                // Fall through to process it
925            } else if disconnected.contains(&idx) {
926                // Skip other messages from disconnected clients
927                continue;
928            }
929
930            match msg {
931                ClientControl::Hello(_) => {
932                    tracing::warn!("Unexpected Hello from client");
933                }
934                ClientControl::Resize { cols, rows } => {
935                    if let Some(client) = self.clients.get_mut(idx) {
936                        client.term_size = TermSize::new(cols, rows);
937                        // Update server size to match first client
938                        if idx == 0 {
939                            self.term_size = TermSize::new(cols, rows);
940                            resize_occurred = true;
941                        }
942                    }
943                }
944                ClientControl::Ping => {
945                    if let Some(client) = self.clients.get_mut(idx) {
946                        let pong = serde_json::to_string(&ServerControl::Pong).unwrap_or_default();
947                        // Best-effort pong reply
948                        #[allow(clippy::let_underscore_must_use)]
949                        let _ = client.conn.write_control(&pong);
950                    }
951                }
952                ClientControl::Detach => {
953                    tracing::info!("Client {} detached", idx);
954                    disconnected.push(idx);
955                }
956                ClientControl::OpenFiles { files, wait } => {
957                    if let Some(ref mut editor) = self.editor {
958                        // Assign a wait_id if --wait was requested
959                        let wait_id = if wait {
960                            let id = self.next_wait_id;
961                            self.next_wait_id += 1;
962                            Some(id)
963                        } else {
964                            None
965                        };
966
967                        let file_count = files.len();
968                        for (i, file_req) in files.iter().enumerate() {
969                            let path = std::path::PathBuf::from(&file_req.path);
970                            tracing::debug!(
971                                "Queuing file open: {:?} line={:?} col={:?} end_line={:?} end_col={:?} message={:?}",
972                                path,
973                                file_req.line,
974                                file_req.column,
975                                file_req.end_line,
976                                file_req.end_column,
977                                file_req.message,
978                            );
979                            // Only the last file gets the wait_id (it's the one that will be active)
980                            let file_wait_id = if i == file_count - 1 { wait_id } else { None };
981                            editor.queue_file_open(
982                                path,
983                                file_req.line,
984                                file_req.column,
985                                file_req.end_line,
986                                file_req.end_column,
987                                file_req.message.clone(),
988                                file_wait_id,
989                            );
990                        }
991
992                        // Track the waiting client
993                        if let Some(wait_id) = wait_id {
994                            if let Some(client) = self.clients.get_mut(idx) {
995                                self.waiting_clients.insert(wait_id, client.id);
996                                client.wait_id = Some(wait_id);
997                            }
998                        }
999
1000                        resize_occurred = true; // Force re-render
1001                    }
1002                }
1003                ClientControl::Quit => unreachable!(), // Handled above
1004            }
1005        }
1006
1007        // Check for clients with broken write pipes
1008        for (idx, client) in self.clients.iter().enumerate() {
1009            if client.data_writer.is_broken() && !disconnected.contains(&idx) {
1010                tracing::info!("Client {} write pipe broken, disconnecting", client.id);
1011                disconnected.push(idx);
1012            }
1013        }
1014
1015        // Deduplicate and sort for safe reverse removal
1016        disconnected.sort_unstable();
1017        disconnected.dedup();
1018
1019        // Remove disconnected clients
1020        for idx in disconnected.into_iter().rev() {
1021            let client = self.clients.remove(idx);
1022            // Clean up --wait tracking if this client was waiting
1023            if let Some(wait_id) = client.wait_id {
1024                self.waiting_clients.remove(&wait_id);
1025                // Also clean up editor wait_tracking for this wait_id
1026                if let Some(ref mut editor) = self.editor {
1027                    editor.remove_wait_tracking(wait_id);
1028                }
1029            }
1030            // Best-effort teardown via the non-blocking writer
1031            let teardown = terminal_teardown_sequences();
1032            let _ = client.data_writer.try_write(&teardown);
1033            tracing::info!("Client {} disconnected", client.id);
1034            // Invalidate input source if that client disconnected
1035            if input_source_client == Some(idx) {
1036                input_source_client = None;
1037            }
1038        }
1039
1040        Ok((input_events, resize_occurred, input_source_client))
1041    }
1042
1043    /// Update terminal size after resize
1044    fn update_terminal_size(&mut self) -> io::Result<()> {
1045        if let Some(ref mut terminal) = self.terminal {
1046            let backend = terminal.backend_mut();
1047            backend.resize(self.term_size.cols, self.term_size.rows);
1048        }
1049
1050        if let Some(ref mut editor) = self.editor {
1051            editor.resize(self.term_size.cols, self.term_size.rows);
1052        }
1053
1054        Ok(())
1055    }
1056
1057    /// Handle an input event
1058    fn handle_event(&mut self, event: Event) -> io::Result<bool> {
1059        let Some(ref mut editor) = self.editor else {
1060            return Ok(false);
1061        };
1062
1063        match event {
1064            Event::Key(key_event) => {
1065                if key_event.kind == KeyEventKind::Press {
1066                    editor
1067                        .handle_key(key_event.code, key_event.modifiers)
1068                        .map_err(|e| io::Error::other(e.to_string()))?;
1069                    Ok(true)
1070                } else {
1071                    Ok(false)
1072                }
1073            }
1074            Event::Mouse(mouse_event) => editor
1075                .handle_mouse(mouse_event)
1076                .map_err(|e| io::Error::other(e.to_string())),
1077            Event::Resize(w, h) => {
1078                editor.resize(w, h);
1079                Ok(true)
1080            }
1081            Event::Paste(text) => {
1082                editor.paste_text(text);
1083                Ok(true)
1084            }
1085            _ => Ok(false),
1086        }
1087    }
1088
1089    /// Render the editor and broadcast output to all clients
1090    fn render_and_broadcast(&mut self) -> io::Result<()> {
1091        let Some(ref mut editor) = self.editor else {
1092            return Ok(());
1093        };
1094
1095        let Some(ref mut terminal) = self.terminal else {
1096            return Ok(());
1097        };
1098
1099        // Check if any client needs a full render (e.g., newly connected)
1100        let any_needs_full = self.clients.iter().any(|c| c.needs_full_render);
1101        if any_needs_full {
1102            tracing::info!(
1103                "Full render requested for {} client(s)",
1104                self.clients.iter().filter(|c| c.needs_full_render).count()
1105            );
1106            // Force full redraw by invalidating terminal state
1107            terminal.backend_mut().reset_style_state();
1108            // Best-effort terminal clear for full redraw
1109            #[allow(clippy::let_underscore_must_use)]
1110            let _ = terminal.clear();
1111        }
1112
1113        // Take any pending escape sequences (e.g., cursor style changes)
1114        let pending_sequences = editor.take_pending_escape_sequences();
1115
1116        // Render to capture backend
1117        terminal
1118            .draw(|frame| editor.render(frame))
1119            .map_err(|e| io::Error::other(e.to_string()))?;
1120
1121        // Get the captured output
1122        let output = terminal.backend_mut().take_buffer();
1123
1124        if output.is_empty() && pending_sequences.is_empty() {
1125            return Ok(());
1126        }
1127
1128        // Broadcast to all clients via non-blocking writer threads (skip waiting clients)
1129        for client in &mut self.clients {
1130            if client.wait_id.is_some() {
1131                continue;
1132            }
1133            // Combine pending sequences and output into a single frame
1134            let frame = if !pending_sequences.is_empty() && !output.is_empty() {
1135                let mut combined = Vec::with_capacity(pending_sequences.len() + output.len());
1136                combined.extend_from_slice(&pending_sequences);
1137                combined.extend_from_slice(&output);
1138                combined
1139            } else if !pending_sequences.is_empty() {
1140                pending_sequences.clone()
1141            } else {
1142                output.clone()
1143            };
1144
1145            if !frame.is_empty() && !client.data_writer.try_write(&frame) {
1146                tracing::warn!("Client {} output buffer full, dropping frame", client.id);
1147            }
1148            // Clear full render flag after sending
1149            client.needs_full_render = false;
1150        }
1151
1152        Ok(())
1153    }
1154
1155    /// Disconnect all clients
1156    fn disconnect_all_clients(&mut self, reason: &str) -> io::Result<()> {
1157        let teardown = terminal_teardown_sequences();
1158        for client in &mut self.clients {
1159            // Best-effort: client may already be disconnected
1160            #[allow(clippy::let_underscore_must_use)]
1161            let _ = client.data_writer.try_write(&teardown);
1162            let quit_msg = serde_json::to_string(&ServerControl::Quit {
1163                reason: reason.to_string(),
1164            })
1165            .unwrap_or_default();
1166            // Best-effort: client may already be disconnected
1167            #[allow(clippy::let_underscore_must_use)]
1168            let _ = client.conn.write_control(&quit_msg);
1169        }
1170        self.clients.clear();
1171        Ok(())
1172    }
1173}
1174
1175impl ConnectedClient {
1176    /// Get the client's TERM environment variable
1177    #[allow(dead_code)]
1178    pub fn term(&self) -> Option<&str> {
1179        self.env.get("TERM").and_then(|v| v.as_deref())
1180    }
1181
1182    /// Check if the client supports truecolor
1183    #[allow(dead_code)]
1184    pub fn supports_truecolor(&self) -> bool {
1185        self.env
1186            .get("COLORTERM")
1187            .and_then(|v| v.as_deref())
1188            .map(|v| v == "truecolor" || v == "24bit")
1189            .unwrap_or(false)
1190    }
1191}