Skip to main content

fresh/server/
editor_server.rs

//! Editor integration with the session server.
//!
//! This module bridges the `Editor` with the server infrastructure:
//! - Creates the `Editor` with a `CaptureBackend` for rendering
//! - Processes input events from clients
//! - Broadcasts rendered output to all clients
7
8use std::io;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::mpsc;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crossterm::event::{Event, KeyEventKind};
16use ratatui::Terminal;
17
18use crate::app::Editor;
19use crate::config::Config;
20use crate::config_io::DirectoryContext;
21use crate::model::filesystem::{FileSystem, StdFileSystem};
22use crate::server::capture_backend::{
23    terminal_setup_sequences, terminal_teardown_sequences, CaptureBackend,
24};
25use crate::server::input_parser::InputParser;
26use crate::server::ipc::{ServerConnection, ServerListener, SocketPaths, StreamWrapper};
27use crate::server::protocol::{
28    ClientControl, ServerControl, ServerHello, TermSize, VersionMismatch, PROTOCOL_VERSION,
29};
30use crate::view::color_support::ColorCapability;
31
32/// Configuration for the editor server
33#[derive(Debug, Clone)]
34pub struct EditorServerConfig {
35    /// Working directory for this session
36    pub working_dir: PathBuf,
37    /// Optional session name
38    pub session_name: Option<String>,
39    /// Idle timeout before auto-shutdown
40    pub idle_timeout: Option<Duration>,
41    /// Editor configuration
42    pub editor_config: Config,
43    /// Directory context for config/data paths
44    pub dir_context: DirectoryContext,
45    /// Whether plugins are enabled
46    pub plugins_enabled: bool,
47}
48
49/// Editor server that manages editor state and client connections
50pub struct EditorServer {
51    config: EditorServerConfig,
52    listener: ServerListener,
53    clients: Vec<ConnectedClient>,
54    editor: Option<Editor>,
55    terminal: Option<Terminal<CaptureBackend>>,
56    last_client_activity: Instant,
57    shutdown: Arc<AtomicBool>,
58    /// Effective terminal size (from the primary/first client)
59    term_size: TermSize,
60    /// Index of the client that most recently provided input (for per-client detach)
61    last_input_client: Option<usize>,
62    /// Next wait ID for --wait tracking
63    next_wait_id: u64,
64    /// Maps wait_id → client_id for clients waiting on file events
65    waiting_clients: std::collections::HashMap<u64, u64>,
66}
67
/// Non-blocking output path from the server loop to one client.
///
/// A background thread drains a bounded channel and writes each buffer to
/// the client's data pipe. When the channel is full (the client reads too
/// slowly) new frames are dropped instead of stalling the server. When a
/// write or flush fails, `pipe_broken` is raised so the main loop can
/// disconnect the client.
struct ClientDataWriter {
    /// Producer side of the bounded frame queue.
    sender: mpsc::SyncSender<Vec<u8>>,
    /// Set by the writer thread once the pipe has failed.
    pipe_broken: Arc<AtomicBool>,
}
78
79impl ClientDataWriter {
80    /// Create a new writer that spawns a background thread to write to the data stream.
81    fn new(data: StreamWrapper, client_id: u64) -> Self {
82        // 16 frames of buffer (~270ms at 60fps before dropping frames)
83        let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(16);
84        let pipe_broken = Arc::new(AtomicBool::new(false));
85        let pipe_broken_clone = pipe_broken.clone();
86
87        std::thread::Builder::new()
88            .name(format!("client-{}-writer", client_id))
89            .spawn(move || {
90                while let Ok(buf) = rx.recv() {
91                    if let Err(e) = data.write_all(&buf) {
92                        tracing::debug!("Client {} writer pipe error: {}", client_id, e);
93                        pipe_broken_clone.store(true, Ordering::Relaxed);
94                        break;
95                    }
96                    if let Err(e) = data.flush() {
97                        tracing::debug!("Client {} writer flush error: {}", client_id, e);
98                        pipe_broken_clone.store(true, Ordering::Relaxed);
99                        break;
100                    }
101                }
102                tracing::debug!("Client {} writer thread exiting", client_id);
103            })
104            .expect("Failed to spawn client writer thread");
105
106        Self {
107            sender: tx,
108            pipe_broken,
109        }
110    }
111
112    /// Try to send data without blocking. Returns false if the channel is full
113    /// (client too slow) or the writer thread has exited.
114    fn try_write(&self, data: &[u8]) -> bool {
115        self.sender.try_send(data.to_vec()).is_ok()
116    }
117
118    /// Check if the writer thread detected a broken pipe.
119    fn is_broken(&self) -> bool {
120        self.pipe_broken.load(Ordering::Relaxed)
121    }
122}
123
124/// A connected client with its own input parser
125struct ConnectedClient {
126    conn: ServerConnection,
127    /// Background writer for non-blocking data output
128    data_writer: ClientDataWriter,
129    term_size: TermSize,
130    env: std::collections::HashMap<String, Option<String>>,
131    id: u64,
132    input_parser: InputParser,
133    /// Whether this client needs a full screen render on next frame
134    needs_full_render: bool,
135    /// If set, this client is waiting for a --wait completion signal
136    wait_id: Option<u64>,
137}
138
139impl EditorServer {
140    /// Create a new editor server
141    pub fn new(config: EditorServerConfig) -> io::Result<Self> {
142        let socket_paths = if let Some(ref name) = config.session_name {
143            SocketPaths::for_session_name(name)?
144        } else {
145            SocketPaths::for_working_dir(&config.working_dir)?
146        };
147
148        let listener = ServerListener::bind(socket_paths)?;
149
150        // Write PID file so clients can detect stale sessions
151        let pid = std::process::id();
152        if let Err(e) = listener.paths().write_pid(pid) {
153            tracing::warn!("Failed to write PID file: {}", e);
154        }
155
156        Ok(Self {
157            config,
158            listener,
159            clients: Vec::new(),
160            editor: None,
161            terminal: None,
162            last_client_activity: Instant::now(),
163            shutdown: Arc::new(AtomicBool::new(false)),
164            term_size: TermSize::new(80, 24), // Default until first client connects
165            last_input_client: None,
166            next_wait_id: 1,
167            waiting_clients: std::collections::HashMap::new(),
168        })
169    }
170
171    /// Get a handle to request shutdown
172    pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
173        self.shutdown.clone()
174    }
175
176    /// Get the socket paths
177    pub fn socket_paths(&self) -> &SocketPaths {
178        self.listener.paths()
179    }
180
181    /// Access the editor instance (available after initialize_editor).
182    pub fn editor(&self) -> Option<&Editor> {
183        self.editor.as_ref()
184    }
185
186    /// Mutable access to the editor instance (available after initialize_editor).
187    pub fn editor_mut(&mut self) -> Option<&mut Editor> {
188        self.editor.as_mut()
189    }
190
191    /// Run the editor server main loop
192    pub fn run(&mut self) -> io::Result<()> {
193        tracing::info!("Editor server starting for {:?}", self.config.working_dir);
194
195        let mut next_client_id = 1u64;
196        let mut needs_render = true;
197        let mut last_render = Instant::now();
198        const FRAME_DURATION: Duration = Duration::from_millis(16); // 60fps
199
200        loop {
201            // Check for shutdown
202            if self.shutdown.load(Ordering::SeqCst) {
203                tracing::info!("Shutdown requested");
204                break;
205            }
206
207            // Check idle timeout
208            if let Some(timeout) = self.config.idle_timeout {
209                if self.clients.is_empty() && self.last_client_activity.elapsed() > timeout {
210                    tracing::info!("Idle timeout reached, shutting down");
211                    break;
212                }
213            }
214
215            // Accept new connections
216            tracing::debug!("[server] main loop: calling accept()");
217            match self.listener.accept() {
218                Ok(Some(conn)) => {
219                    // Get current cursor style from editor if it exists, otherwise from config
220                    let cursor_style = self
221                        .editor
222                        .as_ref()
223                        .map(|e| e.config().editor.cursor_style)
224                        .unwrap_or(self.config.editor_config.editor.cursor_style);
225                    match self.handle_new_connection(conn, next_client_id, cursor_style) {
226                        Ok(client) => {
227                            tracing::info!("Client {} connected", client.id);
228
229                            // Initialize editor on first-ever client, or update size if reconnecting
230                            if self.editor.is_none() {
231                                // First time - initialize editor
232                                self.term_size = client.term_size;
233                                self.initialize_editor()?;
234                            } else if self.clients.is_empty() {
235                                // Reconnecting after all clients disconnected - update terminal size
236                                if self.term_size != client.term_size {
237                                    self.term_size = client.term_size;
238                                    self.update_terminal_size()?;
239                                }
240                            }
241                            // Note: full redraw is handled via client.needs_full_render flag
242
243                            self.clients.push(client);
244                            self.last_client_activity = Instant::now();
245                            next_client_id += 1;
246                            needs_render = true;
247                        }
248                        Err(e) => {
249                            tracing::warn!("Failed to complete handshake: {}", e);
250                        }
251                    }
252                }
253                Ok(None) => {}
254                Err(e) => {
255                    tracing::error!("Accept error: {}", e);
256                }
257            }
258
259            // Process client messages and get input events
260            tracing::debug!("[server] main loop: calling process_clients");
261            let (input_events, resize_occurred, input_source) = self.process_clients()?;
262            if let Some(idx) = input_source {
263                self.last_input_client = Some(idx);
264            }
265            if !input_events.is_empty() {
266                tracing::debug!(
267                    "[server] process_clients returned {} events",
268                    input_events.len()
269                );
270            }
271
272            // Check if editor should quit
273            if let Some(ref editor) = self.editor {
274                if editor.should_quit() {
275                    tracing::info!("Editor requested quit");
276                    self.shutdown.store(true, Ordering::SeqCst);
277                    continue;
278                }
279            }
280
281            // Check if client should detach (keep server running)
282            let detach_requested = self
283                .editor
284                .as_ref()
285                .map(|e| e.should_detach())
286                .unwrap_or(false);
287            if detach_requested {
288                // Detach only the client that triggered it (via last input)
289                if let Some(idx) = self.last_input_client.take() {
290                    if idx < self.clients.len() {
291                        tracing::info!("Client {} requested detach", self.clients[idx].id);
292                        let client = self.clients.remove(idx);
293                        let teardown = terminal_teardown_sequences();
294                        // Best-effort: client may already be disconnected
295                        #[allow(clippy::let_underscore_must_use)]
296                        let _ = client.data_writer.try_write(&teardown);
297                        let quit_msg = serde_json::to_string(&ServerControl::Quit {
298                            reason: "Detached".to_string(),
299                        })
300                        .unwrap_or_default();
301                        // Best-effort: client may already be disconnected
302                        #[allow(clippy::let_underscore_must_use)]
303                        let _ = client.conn.write_control(&quit_msg);
304                    }
305                } else {
306                    // Fallback: if we can't determine which client, detach all
307                    tracing::info!("Detach requested but no input source, detaching all");
308                    self.disconnect_all_clients("Detached")?;
309                }
310                // Reset the detach flag
311                if let Some(ref mut editor) = self.editor {
312                    editor.clear_detach();
313                }
314                continue;
315            }
316
317            // Handle resize
318            if resize_occurred {
319                self.update_terminal_size()?;
320                needs_render = true;
321            }
322
323            // Process input events
324            if !input_events.is_empty() {
325                self.last_client_activity = Instant::now();
326                for event in input_events {
327                    if self.handle_event(event)? {
328                        needs_render = true;
329                    }
330                }
331            }
332
333            // Process async messages from editor
334            if let Some(ref mut editor) = self.editor {
335                if editor.process_async_messages() {
336                    needs_render = true;
337                }
338                if editor.process_pending_file_opens() {
339                    needs_render = true;
340                }
341
342                // Process completed --wait operations
343                for wait_id in editor.take_completed_waits() {
344                    if let Some(client_id) = self.waiting_clients.remove(&wait_id) {
345                        // Find the client and send WaitComplete
346                        if let Some(client) = self.clients.iter_mut().find(|c| c.id == client_id) {
347                            let msg = serde_json::to_string(&ServerControl::WaitComplete)
348                                .unwrap_or_default();
349                            #[allow(clippy::let_underscore_must_use)]
350                            let _ = client.conn.write_control(&msg);
351                            client.wait_id = None;
352                        }
353                    }
354                }
355
356                // Send pending clipboard data to clients via control message
357                if let Some(cb) = editor.take_pending_clipboard() {
358                    let msg = serde_json::to_string(&ServerControl::SetClipboard {
359                        text: cb.text,
360                        use_osc52: cb.use_osc52,
361                        use_system_clipboard: cb.use_system_clipboard,
362                    })
363                    .unwrap_or_default();
364                    for client in &mut self.clients {
365                        #[allow(clippy::let_underscore_must_use)]
366                        let _ = client.conn.write_control(&msg);
367                    }
368                }
369
370                if editor.check_mouse_hover_timer() {
371                    needs_render = true;
372                }
373            }
374
375            // Render and broadcast if needed
376            if needs_render && last_render.elapsed() >= FRAME_DURATION {
377                self.render_and_broadcast()?;
378                last_render = Instant::now();
379                needs_render = false;
380            }
381
382            // Brief sleep to avoid busy-waiting
383            std::thread::sleep(Duration::from_millis(5));
384        }
385
386        // Perform the same shutdown sequence as the normal (non-session) exit path
387        // in run_event_loop_common: auto-save, end recovery session, save workspace.
388        if let Some(ref mut editor) = self.editor {
389            // Auto-save file-backed buffers to disk before exiting
390            if editor.config().editor.auto_save_enabled {
391                match editor.save_all_on_exit() {
392                    Ok(count) if count > 0 => {
393                        tracing::info!("Auto-saved {} buffer(s) on exit", count);
394                    }
395                    Ok(_) => {}
396                    Err(e) => {
397                        tracing::warn!("Failed to auto-save on exit: {}", e);
398                    }
399                }
400            }
401
402            // End recovery session first (flushes dirty buffers + assigns recovery IDs),
403            // then save workspace (captures those IDs for next session restore).
404            if let Err(e) = editor.end_recovery_session() {
405                tracing::warn!("Failed to end recovery session: {}", e);
406            }
407            if let Err(e) = editor.save_workspace() {
408                tracing::warn!("Failed to save workspace: {}", e);
409            } else {
410                tracing::debug!("Workspace saved successfully");
411            }
412        }
413
414        // Clean shutdown
415        self.disconnect_all_clients("Server shutting down")?;
416
417        Ok(())
418    }
419
420    /// Initialize the editor with the current terminal size
421    /// Initialize the editor with the current terminal size.
422    ///
423    /// Performs the full startup sequence: create editor, set session name,
424    /// restore workspace, recover buffers from hot exit, start recovery session.
425    /// Called on first client connection.
426    pub fn initialize_editor(&mut self) -> io::Result<()> {
427        let backend = CaptureBackend::new(self.term_size.cols, self.term_size.rows);
428        let terminal = Terminal::new(backend)
429            .map_err(|e| io::Error::other(format!("Failed to create terminal: {}", e)))?;
430
431        let filesystem: Arc<dyn FileSystem + Send + Sync> = Arc::new(StdFileSystem);
432        let color_capability = ColorCapability::TrueColor; // Assume truecolor for now
433
434        let mut editor = Editor::with_working_dir(
435            self.config.editor_config.clone(),
436            self.term_size.cols,
437            self.term_size.rows,
438            Some(self.config.working_dir.clone()),
439            self.config.dir_context.clone(),
440            self.config.plugins_enabled,
441            color_capability,
442            filesystem,
443        )
444        .map_err(|e| io::Error::other(format!("Failed to create editor: {}", e)))?;
445
446        // Enable session mode - use hardware cursor only, no REVERSED software cursor
447        editor.set_session_mode(true);
448
449        // Set session name for status bar display
450        let session_display_name = self.config.session_name.clone().unwrap_or_else(|| {
451            // Use the directory name as a short display name for unnamed sessions
452            self.config
453                .working_dir
454                .file_name()
455                .and_then(|n| n.to_str())
456                .map(|s| s.to_string())
457                .unwrap_or_else(|| "session".to_string())
458        });
459        editor.set_session_name(Some(session_display_name));
460
461        // Restore workspace and recovery data (mirrors the standalone startup
462        // path in handle_first_run_setup in main.rs).
463        match editor.try_restore_workspace() {
464            Ok(true) => {
465                tracing::info!("Session workspace restored successfully");
466            }
467            Ok(false) => {
468                tracing::debug!("No previous session workspace found");
469            }
470            Err(e) => {
471                tracing::warn!("Failed to restore session workspace: {}", e);
472            }
473        }
474
475        // Recover buffers from hot exit recovery files
476        if editor.has_recovery_files().unwrap_or(false) {
477            tracing::info!("Recovery files found for session, recovering...");
478            match editor.recover_all_buffers() {
479                Ok(count) if count > 0 => {
480                    tracing::info!("Recovered {} buffer(s) for session", count);
481                }
482                Ok(_) => {
483                    tracing::info!("No buffers to recover for session");
484                }
485                Err(e) => {
486                    tracing::warn!("Failed to recover session buffers: {}", e);
487                }
488            }
489        }
490
491        // Start the recovery session (periodic saves of dirty buffers)
492        if let Err(e) = editor.start_recovery_session() {
493            tracing::warn!("Failed to start recovery session: {}", e);
494        }
495
496        self.terminal = Some(terminal);
497        self.editor = Some(editor);
498
499        tracing::info!(
500            "Editor initialized with size {}x{}",
501            self.term_size.cols,
502            self.term_size.rows
503        );
504
505        Ok(())
506    }
507
508    /// Handle a new client connection
509    fn handle_new_connection(
510        &self,
511        conn: ServerConnection,
512        client_id: u64,
513        cursor_style: crate::config::CursorStyle,
514    ) -> io::Result<ConnectedClient> {
515        // Read client hello
516        // On Windows, don't toggle blocking mode - named pipes don't support mode switching
517        // after connection. The read_control() method handles this internally.
518        #[cfg(not(windows))]
519        conn.control.set_nonblocking(false)?;
520        let hello_json = conn
521            .read_control()?
522            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "No hello received"))?;
523
524        let client_msg: ClientControl = serde_json::from_str(&hello_json)
525            .map_err(|e| io::Error::other(format!("Invalid hello: {}", e)))?;
526
527        let hello = match client_msg {
528            ClientControl::Hello(h) => h,
529            _ => {
530                return Err(io::Error::other("Expected Hello message"));
531            }
532        };
533
534        // Check protocol version
535        if hello.protocol_version != PROTOCOL_VERSION {
536            let mismatch = VersionMismatch {
537                server_version: env!("CARGO_PKG_VERSION").to_string(),
538                client_version: hello.client_version.clone(),
539                action: if hello.protocol_version > PROTOCOL_VERSION {
540                    "upgrade_server".to_string()
541                } else {
542                    "restart_server".to_string()
543                },
544                message: format!(
545                    "Protocol version mismatch: server={}, client={}",
546                    PROTOCOL_VERSION, hello.protocol_version
547                ),
548            };
549
550            let response = serde_json::to_string(&ServerControl::VersionMismatch(mismatch))
551                .map_err(|e| io::Error::other(e.to_string()))?;
552            conn.write_control(&response)?;
553
554            return Err(io::Error::other("Version mismatch"));
555        }
556
557        // Send server hello
558        let session_id = self.config.session_name.clone().unwrap_or_else(|| {
559            crate::workspace::encode_path_for_filename(&self.config.working_dir)
560        });
561
562        let server_hello = ServerHello::new(session_id);
563        let response = serde_json::to_string(&ServerControl::Hello(server_hello))
564            .map_err(|e| io::Error::other(e.to_string()))?;
565        conn.write_control(&response)?;
566
567        // Set sockets back to non-blocking
568        // On Windows, don't toggle mode - named pipes don't support mode switching
569        #[cfg(not(windows))]
570        conn.control.set_nonblocking(true)?;
571
572        // Send terminal setup sequences
573        let mouse_hover_enabled = self.config.editor_config.editor.mouse_hover_enabled;
574        let setup = terminal_setup_sequences(mouse_hover_enabled);
575        conn.write_data(&setup)?;
576
577        // Send cursor style (from editor if running, otherwise from config)
578        conn.write_data(cursor_style.to_escape_sequence())?;
579
580        tracing::debug!(
581            "Client {} connected: {}x{}, TERM={:?}",
582            client_id,
583            hello.term_size.cols,
584            hello.term_size.rows,
585            hello.term()
586        );
587
588        // Create background writer for non-blocking render output
589        let data_writer = ClientDataWriter::new(conn.data.clone(), client_id);
590
591        Ok(ConnectedClient {
592            conn,
593            data_writer,
594            term_size: hello.term_size,
595            env: hello.env,
596            id: client_id,
597            input_parser: InputParser::new(),
598            needs_full_render: true,
599            wait_id: None,
600        })
601    }
602
603    /// Process messages from connected clients
604    /// Returns (input_events, resize_occurred, index of client that provided input)
605    fn process_clients(&mut self) -> io::Result<(Vec<Event>, bool, Option<usize>)> {
606        let mut disconnected = Vec::new();
607        let mut input_source_client: Option<usize> = None;
608        let mut input_events = Vec::new();
609        let mut resize_occurred = false;
610        let mut control_messages: Vec<(usize, ClientControl)> = Vec::new();
611
612        for (idx, client) in self.clients.iter_mut().enumerate() {
613            // Read from data socket
614            let mut buf = [0u8; 4096];
615            let mut data_eof = false;
616            tracing::debug!("[server] reading from client {} data socket", client.id);
617            match client.conn.read_data(&mut buf) {
618                Ok(0) => {
619                    tracing::debug!("[server] Client {} data stream closed (EOF)", client.id);
620                    // Don't disconnect waiting clients on data EOF - they're not sending data
621                    if client.wait_id.is_none() {
622                        disconnected.push(idx);
623                    }
624                    data_eof = true;
625                    // Don't continue - still need to check control socket for pending messages
626                }
627                Ok(n) => {
628                    tracing::debug!(
629                        "[server] Client {} read {} bytes from data socket",
630                        client.id,
631                        n
632                    );
633                    let events = client.input_parser.parse(&buf[..n]);
634                    tracing::debug!(
635                        "[server] Client {} parsed {} events",
636                        client.id,
637                        events.len()
638                    );
639                    if !events.is_empty() {
640                        input_source_client = Some(idx);
641                    }
642                    input_events.extend(events);
643                }
644                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
645                    // No data available
646                }
647                Err(e) => {
648                    tracing::warn!("[server] Client {} data read error: {}", client.id, e);
649                    disconnected.push(idx);
650                    data_eof = true;
651                    // Don't continue - still need to check control socket for pending messages
652                }
653            }
654            let _ = data_eof; // Suppress unused warning
655
656            // Check control socket
657            // On Windows, don't toggle nonblocking mode - it fails on named pipes
658            // Best-effort: nonblocking mode for control socket polling
659            #[cfg(not(windows))]
660            #[allow(clippy::let_underscore_must_use)]
661            let _ = client.conn.control.set_nonblocking(true);
662
663            // On Windows, use try_read pattern instead of blocking read_line
664            #[cfg(windows)]
665            {
666                let mut buf = [0u8; 1024];
667                match client.conn.control.try_read(&mut buf) {
668                    Ok(0) => {
669                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
670                        disconnected.push(idx);
671                    }
672                    Ok(n) => {
673                        // Try to parse as control message
674                        if let Ok(s) = std::str::from_utf8(&buf[..n]) {
675                            for line in s.lines() {
676                                if !line.trim().is_empty() {
677                                    if let Ok(msg) = serde_json::from_str::<ClientControl>(line) {
678                                        control_messages.push((idx, msg));
679                                    }
680                                }
681                            }
682                        }
683                    }
684                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
685                    Err(e) => {
686                        tracing::warn!("Client {} control read error: {}", client.id, e);
687                    }
688                }
689            }
690
691            #[cfg(not(windows))]
692            {
693                let mut reader = std::io::BufReader::new(&client.conn.control);
694                let mut line = String::new();
695                match std::io::BufRead::read_line(&mut reader, &mut line) {
696                    Ok(0) => {
697                        tracing::debug!("Client {} control stream closed (EOF)", client.id);
698                        disconnected.push(idx);
699                    }
700                    Ok(_) if !line.trim().is_empty() => {
701                        if let Ok(msg) = serde_json::from_str::<ClientControl>(&line) {
702                            control_messages.push((idx, msg));
703                        }
704                    }
705                    Ok(_) => {}
706                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
707                    Err(e) => {
708                        tracing::warn!("Client {} control read error: {}", client.id, e);
709                    }
710                }
711            }
712        }
713
714        // Process control messages
715        eprintln!(
716            "[server] Processing {} control messages",
717            control_messages.len()
718        );
719        for (idx, msg) in control_messages {
720            eprintln!("[server] Control message from client {}: {:?}", idx, msg);
721            // Always process Quit, even from disconnected clients
722            if let ClientControl::Quit = msg {
723                tracing::info!("Client requested quit, shutting down");
724                self.shutdown.store(true, Ordering::SeqCst);
725                continue;
726            }
727
728            // Always process OpenFiles - it's a one-shot command from clients that disconnect immediately
729            if let ClientControl::OpenFiles { .. } = msg {
730                // Fall through to process it
731            } else if disconnected.contains(&idx) {
732                // Skip other messages from disconnected clients
733                continue;
734            }
735
736            match msg {
737                ClientControl::Hello(_) => {
738                    tracing::warn!("Unexpected Hello from client");
739                }
740                ClientControl::Resize { cols, rows } => {
741                    if let Some(client) = self.clients.get_mut(idx) {
742                        client.term_size = TermSize::new(cols, rows);
743                        // Update server size to match first client
744                        if idx == 0 {
745                            self.term_size = TermSize::new(cols, rows);
746                            resize_occurred = true;
747                        }
748                    }
749                }
750                ClientControl::Ping => {
751                    if let Some(client) = self.clients.get_mut(idx) {
752                        let pong = serde_json::to_string(&ServerControl::Pong).unwrap_or_default();
753                        // Best-effort pong reply
754                        #[allow(clippy::let_underscore_must_use)]
755                        let _ = client.conn.write_control(&pong);
756                    }
757                }
758                ClientControl::Detach => {
759                    tracing::info!("Client {} detached", idx);
760                    disconnected.push(idx);
761                }
762                ClientControl::OpenFiles { files, wait } => {
763                    if let Some(ref mut editor) = self.editor {
764                        // Assign a wait_id if --wait was requested
765                        let wait_id = if wait {
766                            let id = self.next_wait_id;
767                            self.next_wait_id += 1;
768                            Some(id)
769                        } else {
770                            None
771                        };
772
773                        let file_count = files.len();
774                        for (i, file_req) in files.iter().enumerate() {
775                            let path = std::path::PathBuf::from(&file_req.path);
776                            tracing::debug!(
777                                "Queuing file open: {:?} line={:?} col={:?} end_line={:?} end_col={:?} message={:?}",
778                                path,
779                                file_req.line,
780                                file_req.column,
781                                file_req.end_line,
782                                file_req.end_column,
783                                file_req.message,
784                            );
785                            // Only the last file gets the wait_id (it's the one that will be active)
786                            let file_wait_id = if i == file_count - 1 { wait_id } else { None };
787                            editor.queue_file_open(
788                                path,
789                                file_req.line,
790                                file_req.column,
791                                file_req.end_line,
792                                file_req.end_column,
793                                file_req.message.clone(),
794                                file_wait_id,
795                            );
796                        }
797
798                        // Track the waiting client
799                        if let Some(wait_id) = wait_id {
800                            if let Some(client) = self.clients.get_mut(idx) {
801                                self.waiting_clients.insert(wait_id, client.id);
802                                client.wait_id = Some(wait_id);
803                            }
804                        }
805
806                        resize_occurred = true; // Force re-render
807                    }
808                }
809                ClientControl::Quit => unreachable!(), // Handled above
810            }
811        }
812
813        // Check for clients with broken write pipes
814        for (idx, client) in self.clients.iter().enumerate() {
815            if client.data_writer.is_broken() && !disconnected.contains(&idx) {
816                tracing::info!("Client {} write pipe broken, disconnecting", client.id);
817                disconnected.push(idx);
818            }
819        }
820
821        // Deduplicate and sort for safe reverse removal
822        disconnected.sort_unstable();
823        disconnected.dedup();
824
825        // Remove disconnected clients
826        for idx in disconnected.into_iter().rev() {
827            let client = self.clients.remove(idx);
828            // Clean up --wait tracking if this client was waiting
829            if let Some(wait_id) = client.wait_id {
830                self.waiting_clients.remove(&wait_id);
831                // Also clean up editor wait_tracking for this wait_id
832                if let Some(ref mut editor) = self.editor {
833                    editor.remove_wait_tracking(wait_id);
834                }
835            }
836            // Best-effort teardown via the non-blocking writer
837            let teardown = terminal_teardown_sequences();
838            let _ = client.data_writer.try_write(&teardown);
839            tracing::info!("Client {} disconnected", client.id);
840            // Invalidate input source if that client disconnected
841            if input_source_client == Some(idx) {
842                input_source_client = None;
843            }
844        }
845
846        Ok((input_events, resize_occurred, input_source_client))
847    }
848
849    /// Update terminal size after resize
850    fn update_terminal_size(&mut self) -> io::Result<()> {
851        if let Some(ref mut terminal) = self.terminal {
852            let backend = terminal.backend_mut();
853            backend.resize(self.term_size.cols, self.term_size.rows);
854        }
855
856        if let Some(ref mut editor) = self.editor {
857            editor.resize(self.term_size.cols, self.term_size.rows);
858        }
859
860        Ok(())
861    }
862
863    /// Handle an input event
864    fn handle_event(&mut self, event: Event) -> io::Result<bool> {
865        let Some(ref mut editor) = self.editor else {
866            return Ok(false);
867        };
868
869        match event {
870            Event::Key(key_event) => {
871                if key_event.kind == KeyEventKind::Press {
872                    editor
873                        .handle_key(key_event.code, key_event.modifiers)
874                        .map_err(|e| io::Error::other(e.to_string()))?;
875                    Ok(true)
876                } else {
877                    Ok(false)
878                }
879            }
880            Event::Mouse(mouse_event) => editor
881                .handle_mouse(mouse_event)
882                .map_err(|e| io::Error::other(e.to_string())),
883            Event::Resize(w, h) => {
884                editor.resize(w, h);
885                Ok(true)
886            }
887            Event::Paste(text) => {
888                editor.paste_text(text);
889                Ok(true)
890            }
891            _ => Ok(false),
892        }
893    }
894
895    /// Render the editor and broadcast output to all clients
896    fn render_and_broadcast(&mut self) -> io::Result<()> {
897        let Some(ref mut editor) = self.editor else {
898            return Ok(());
899        };
900
901        let Some(ref mut terminal) = self.terminal else {
902            return Ok(());
903        };
904
905        // Check if any client needs a full render (e.g., newly connected)
906        let any_needs_full = self.clients.iter().any(|c| c.needs_full_render);
907        if any_needs_full {
908            tracing::info!(
909                "Full render requested for {} client(s)",
910                self.clients.iter().filter(|c| c.needs_full_render).count()
911            );
912            // Force full redraw by invalidating terminal state
913            terminal.backend_mut().reset_style_state();
914            // Best-effort terminal clear for full redraw
915            #[allow(clippy::let_underscore_must_use)]
916            let _ = terminal.clear();
917        }
918
919        // Take any pending escape sequences (e.g., cursor style changes)
920        let pending_sequences = editor.take_pending_escape_sequences();
921
922        // Render to capture backend
923        terminal
924            .draw(|frame| editor.render(frame))
925            .map_err(|e| io::Error::other(e.to_string()))?;
926
927        // Get the captured output
928        let output = terminal.backend_mut().take_buffer();
929
930        if output.is_empty() && pending_sequences.is_empty() {
931            return Ok(());
932        }
933
934        // Broadcast to all clients via non-blocking writer threads (skip waiting clients)
935        for client in &mut self.clients {
936            if client.wait_id.is_some() {
937                continue;
938            }
939            // Combine pending sequences and output into a single frame
940            let frame = if !pending_sequences.is_empty() && !output.is_empty() {
941                let mut combined = Vec::with_capacity(pending_sequences.len() + output.len());
942                combined.extend_from_slice(&pending_sequences);
943                combined.extend_from_slice(&output);
944                combined
945            } else if !pending_sequences.is_empty() {
946                pending_sequences.clone()
947            } else {
948                output.clone()
949            };
950
951            if !frame.is_empty() && !client.data_writer.try_write(&frame) {
952                tracing::warn!("Client {} output buffer full, dropping frame", client.id);
953            }
954            // Clear full render flag after sending
955            client.needs_full_render = false;
956        }
957
958        Ok(())
959    }
960
961    /// Disconnect all clients
962    fn disconnect_all_clients(&mut self, reason: &str) -> io::Result<()> {
963        let teardown = terminal_teardown_sequences();
964        for client in &mut self.clients {
965            // Best-effort: client may already be disconnected
966            #[allow(clippy::let_underscore_must_use)]
967            let _ = client.data_writer.try_write(&teardown);
968            let quit_msg = serde_json::to_string(&ServerControl::Quit {
969                reason: reason.to_string(),
970            })
971            .unwrap_or_default();
972            // Best-effort: client may already be disconnected
973            #[allow(clippy::let_underscore_must_use)]
974            let _ = client.conn.write_control(&quit_msg);
975        }
976        self.clients.clear();
977        Ok(())
978    }
979}
980
981impl ConnectedClient {
982    /// Get the client's TERM environment variable
983    #[allow(dead_code)]
984    pub fn term(&self) -> Option<&str> {
985        self.env.get("TERM").and_then(|v| v.as_deref())
986    }
987
988    /// Check if the client supports truecolor
989    #[allow(dead_code)]
990    pub fn supports_truecolor(&self) -> bool {
991        self.env
992            .get("COLORTERM")
993            .and_then(|v| v.as_deref())
994            .map(|v| v == "truecolor" || v == "24bit")
995            .unwrap_or(false)
996    }
997}