1use std::io;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::mpsc;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crossterm::event::{Event, KeyEventKind};
16use ratatui::Terminal;
17
18use crate::app::Editor;
19use crate::config::Config;
20use crate::config_io::DirectoryContext;
21use crate::server::capture_backend::{
24 terminal_setup_sequences, terminal_teardown_sequences, CaptureBackend,
25};
26use crate::server::input_parser::InputParser;
27use crate::server::ipc::{ServerConnection, ServerListener, SocketPaths, StreamWrapper};
28use crate::server::protocol::{
29 ClientControl, ServerControl, ServerHello, TermSize, VersionMismatch, PROTOCOL_VERSION,
30};
31use crate::view::color_support::ColorCapability;
32
/// Startup configuration consumed by [`EditorServer::new`].
pub struct EditorServerConfig {
    /// Working directory handed to the editor. Also selects the socket paths
    /// when `session_name` is `None`.
    pub working_dir: PathBuf,
    /// Optional explicit session name. When set it selects the socket paths
    /// and is used as the session id / display name.
    pub session_name: Option<String>,
    /// When set, the server shuts down once no clients are connected and more
    /// than this much time has passed since the last client activity.
    pub idle_timeout: Option<Duration>,
    /// Editor configuration cloned into every editor instance the server builds.
    pub editor_config: Config,
    /// Directory context passed to the editor; also used to locate the
    /// per-project state directory when the working directory changes.
    pub dir_context: DirectoryContext,
    /// Forwarded to the editor constructor as its plugins flag.
    pub plugins_enabled: bool,
    /// Forwarded to `Editor::load_init_script`.
    pub init_enabled: bool,
    /// Authority to start with. When `None`, `EditorServer::new` builds a
    /// local authority from `workspace_trust` and `env_provider`.
    pub startup_authority: Option<crate::services::authority::Authority>,
    /// Shared workspace-trust state; the server keeps its own `Arc` to it.
    pub workspace_trust: Arc<crate::services::workspace_trust::WorkspaceTrust>,
    /// Shared environment provider; the server keeps its own `Arc` to it.
    pub env_provider: Arc<crate::services::env_provider::EnvProvider>,
    /// Opaque value moved into the server and held there; the server never
    /// inspects it. NOTE(review): presumably keeps some external resource
    /// alive for the session's lifetime — confirm with the caller.
    pub session_keepalive: Option<Box<dyn std::any::Any + Send>>,
}
73
/// Server that owns one editor instance and multiplexes it over any number
/// of connected clients.
pub struct EditorServer {
    /// Startup configuration (its `startup_authority` and `session_keepalive`
    /// are moved out in `new`).
    config: EditorServerConfig,
    /// Listener that accepts new client connections.
    listener: ServerListener,
    /// Currently attached clients; several fields below store indices into it.
    clients: Vec<ConnectedClient>,
    /// The editor; `None` until `initialize_editor` runs, and briefly during
    /// `rebuild_editor`.
    editor: Option<Editor>,
    /// Terminal over a `CaptureBackend` that the editor renders into.
    terminal: Option<Terminal<CaptureBackend>>,
    /// Time of the last connection or input; drives the idle timeout.
    last_client_activity: Instant,
    /// Shared flag; setting it makes `run` leave its loop.
    shutdown: Arc<AtomicBool>,
    /// Size the editor and capture backend are rendered at (80x24 until a
    /// client supplies one).
    term_size: TermSize,
    /// Index into `clients` of the client that most recently produced input.
    last_input_client: Option<usize>,
    /// Next id handed out for a "wait until file is done" request.
    next_wait_id: u64,
    /// Maps wait id -> id of the client waiting on it.
    waiting_clients: std::collections::HashMap<u64, u64>,
    /// Authority passed to each editor instance that gets built.
    current_authority: crate::services::authority::Authority,
    /// Shared workspace-trust state, re-rooted when the working dir changes.
    workspace_trust: Arc<crate::services::workspace_trust::WorkspaceTrust>,
    /// Shared environment provider, cleared when the working dir changes.
    env_provider: Arc<crate::services::env_provider::EnvProvider>,
    /// Opaque keepalive value; only stored (never read), hence the allow.
    #[allow(dead_code)]
    session_keepalive: Option<Box<dyn std::any::Any + Send>>,
}
114
/// Handle to a per-client background writer thread.
///
/// Frames are queued through a bounded channel so the main loop never blocks
/// on a slow client's data stream.
struct ClientDataWriter {
    /// Bounded queue of byte buffers consumed by the writer thread.
    sender: mpsc::SyncSender<Vec<u8>>,
    /// Set by the writer thread once a write or flush has failed.
    pipe_broken: Arc<AtomicBool>,
}
125
126impl ClientDataWriter {
127 fn new(data: StreamWrapper, client_id: u64) -> Self {
129 let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(16);
131 let pipe_broken = Arc::new(AtomicBool::new(false));
132 let pipe_broken_clone = pipe_broken.clone();
133
134 std::thread::Builder::new()
135 .name(format!("client-{}-writer", client_id))
136 .spawn(move || {
137 while let Ok(buf) = rx.recv() {
138 if let Err(e) = data.write_all(&buf) {
139 tracing::debug!("Client {} writer pipe error: {}", client_id, e);
140 pipe_broken_clone.store(true, Ordering::Relaxed);
141 break;
142 }
143 if let Err(e) = data.flush() {
144 tracing::debug!("Client {} writer flush error: {}", client_id, e);
145 pipe_broken_clone.store(true, Ordering::Relaxed);
146 break;
147 }
148 }
149 tracing::debug!("Client {} writer thread exiting", client_id);
150 })
151 .expect("Failed to spawn client writer thread");
152
153 Self {
154 sender: tx,
155 pipe_broken,
156 }
157 }
158
159 fn try_write(&self, data: &[u8]) -> bool {
162 self.sender.try_send(data.to_vec()).is_ok()
163 }
164
165 fn is_broken(&self) -> bool {
167 self.pipe_broken.load(Ordering::Relaxed)
168 }
169}
170
/// Per-client state kept by the server for one attached client.
struct ConnectedClient {
    /// Control + data connection to the client.
    conn: ServerConnection,
    /// Background writer used for rendered output on the data stream.
    data_writer: ClientDataWriter,
    /// Terminal size most recently reported by this client.
    term_size: TermSize,
    /// Environment reported in the client's hello (`None` value = variable
    /// present in the map without a value).
    env: std::collections::HashMap<String, Option<String>>,
    /// Server-assigned client id (unique per server run).
    id: u64,
    /// Parser turning raw data-stream bytes into input events.
    input_parser: InputParser,
    /// When `true`, the next render is forced to be a full repaint.
    needs_full_render: bool,
    /// Id of the wait request this client is blocked on, if any; such
    /// clients receive no rendered frames.
    wait_id: Option<u64>,
}
185
186impl EditorServer {
187 pub fn new(mut config: EditorServerConfig) -> io::Result<Self> {
189 let socket_paths = if let Some(ref name) = config.session_name {
190 SocketPaths::for_session_name(name)?
191 } else {
192 SocketPaths::for_working_dir(&config.working_dir)?
193 };
194
195 let listener = ServerListener::bind(socket_paths)?;
196
197 let pid = std::process::id();
199 if let Err(e) = listener.paths().write_pid(pid) {
200 tracing::warn!("Failed to write PID file: {}", e);
201 }
202
203 let workspace_trust = Arc::clone(&config.workspace_trust);
209 let env_provider = Arc::clone(&config.env_provider);
210
211 let current_authority = config.startup_authority.take().unwrap_or_else(|| {
216 crate::services::authority::Authority::local(
217 Arc::clone(&workspace_trust),
218 Arc::clone(&env_provider),
219 )
220 });
221 let session_keepalive = config.session_keepalive.take();
222
223 Ok(Self {
224 config,
225 listener,
226 clients: Vec::new(),
227 editor: None,
228 terminal: None,
229 last_client_activity: Instant::now(),
230 shutdown: Arc::new(AtomicBool::new(false)),
231 term_size: TermSize::new(80, 24), last_input_client: None,
233 next_wait_id: 1,
234 waiting_clients: std::collections::HashMap::new(),
235 current_authority,
236 workspace_trust,
237 env_provider,
238 session_keepalive,
239 })
240 }
241
242 pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
244 self.shutdown.clone()
245 }
246
    /// Returns the socket paths the listener was bound with.
    pub fn socket_paths(&self) -> &SocketPaths {
        self.listener.paths()
    }
251
    /// Returns the editor, or `None` if it has not been initialized yet.
    pub fn editor(&self) -> Option<&Editor> {
        self.editor.as_ref()
    }
256
    /// Returns the editor mutably, or `None` if it has not been initialized yet.
    pub fn editor_mut(&mut self) -> Option<&mut Editor> {
        self.editor.as_mut()
    }
261
    /// Runs the server main loop until shutdown, then saves state and
    /// disconnects every client.
    ///
    /// Each iteration: checks shutdown / idle timeout, accepts at most one new
    /// connection, polls all clients, reacts to editor quit / detach / suspend
    /// requests, feeds input to the editor, pumps editor background work, and
    /// renders (at most once per `FRAME_DURATION`). The loop sleeps 5 ms per
    /// iteration.
    ///
    /// # Errors
    /// Propagates errors from editor initialization, client processing, event
    /// handling, rendering and the final disconnect.
    pub fn run(&mut self) -> io::Result<()> {
        tracing::info!("Editor server starting for {:?}", self.config.working_dir);

        let mut next_client_id = 1u64;
        let mut needs_render = true;
        let mut last_render = Instant::now();
        // Minimum spacing between two renders.
        const FRAME_DURATION: Duration = Duration::from_millis(16);
        loop {
            if self.shutdown.load(Ordering::SeqCst) {
                tracing::info!("Shutdown requested");
                break;
            }

            // Idle timeout only applies while nobody is attached.
            if let Some(timeout) = self.config.idle_timeout {
                if self.clients.is_empty() && self.last_client_activity.elapsed() > timeout {
                    tracing::info!("Idle timeout reached, shutting down");
                    break;
                }
            }

            tracing::debug!("[server] main loop: calling accept()");
            match self.listener.accept() {
                Ok(Some(conn)) => {
                    // Prefer the live editor's cursor style; fall back to the
                    // startup config before the editor exists.
                    let cursor_style = self
                        .editor
                        .as_ref()
                        .map(|e| e.config().editor.cursor_style)
                        .unwrap_or(self.config.editor_config.editor.cursor_style);
                    match self.handle_new_connection(conn, next_client_id, cursor_style) {
                        Ok(client) => {
                            tracing::info!("Client {} connected", client.id);

                            if self.editor.is_none() {
                                // First client ever: its size seeds the editor.
                                self.term_size = client.term_size;
                                self.initialize_editor()?;
                            } else if self.clients.is_empty() {
                                // Re-attach to an existing editor with no other
                                // clients: adopt the new client's size.
                                if self.term_size != client.term_size {
                                    self.term_size = client.term_size;
                                    self.update_terminal_size()?;
                                }
                            }
                            self.clients.push(client);
                            self.last_client_activity = Instant::now();
                            next_client_id += 1;
                            needs_render = true;
                        }
                        Err(e) => {
                            tracing::warn!("Failed to complete handshake: {}", e);
                        }
                    }
                }
                Ok(None) => {}
                Err(e) => {
                    tracing::error!("Accept error: {}", e);
                }
            }

            tracing::debug!("[server] main loop: calling process_clients");
            let (input_events, resize_occurred, input_source) = self.process_clients()?;
            // Remember who typed last; detach/suspend below target that client.
            if let Some(idx) = input_source {
                self.last_input_client = Some(idx);
            }
            if !input_events.is_empty() {
                tracing::debug!(
                    "[server] process_clients returned {} events",
                    input_events.len()
                );
            }

            if let Some(ref mut editor) = self.editor {
                if editor.should_quit() {
                    // A quit with a pending authority or restart dir is a
                    // request to rebuild the editor, not to stop the server.
                    let pending_authority = editor.take_pending_authority();
                    let pending_keepalive = editor.take_pending_keepalive();
                    let restart_dir = editor.take_restart_dir();
                    if pending_authority.is_some() || restart_dir.is_some() {
                        tracing::info!(
                            "Session rebuild requested (authority={}, dir={})",
                            pending_authority.is_some(),
                            restart_dir.is_some()
                        );
                        if let Err(e) =
                            self.rebuild_editor(restart_dir, pending_authority, pending_keepalive)
                        {
                            tracing::error!("Session rebuild failed, shutting down: {}", e);
                            self.shutdown.store(true, Ordering::SeqCst);
                            continue;
                        }
                        needs_render = true;
                        continue;
                    }
                    tracing::info!("Editor requested quit");
                    self.shutdown.store(true, Ordering::SeqCst);
                    continue;
                }
            }

            let detach_requested = self
                .editor
                .as_ref()
                .map(|e| e.should_detach())
                .unwrap_or(false);
            if detach_requested {
                if let Some(idx) = self.last_input_client.take() {
                    // Detach only the client that issued the request.
                    if idx < self.clients.len() {
                        tracing::info!("Client {} requested detach", self.clients[idx].id);
                        let client = self.clients.remove(idx);
                        let teardown = terminal_teardown_sequences();
                        // Best effort: the client is going away either way.
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = client.data_writer.try_write(&teardown);
                        let quit_msg = serde_json::to_string(&ServerControl::Quit {
                            reason: "Detached".to_string(),
                        })
                        .unwrap_or_default();
                        // Best effort, same reason as above.
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = client.conn.write_control(&quit_msg);
                    }
                } else {
                    tracing::info!("Detach requested but no input source, detaching all");
                    self.disconnect_all_clients("Detached")?;
                }
                if let Some(ref mut editor) = self.editor {
                    editor.clear_detach();
                }
                // Note: input read earlier in this iteration is discarded here.
                continue;
            }

            let suspend_requested = self
                .editor
                .as_mut()
                .map(|e| e.take_suspend_request())
                .unwrap_or(false);
            if suspend_requested {
                if let Some(idx) = self.last_input_client {
                    if idx < self.clients.len() {
                        let client_id = self.clients[idx].id;
                        tracing::info!("Client {} requested suspend", client_id);
                        let suspend_msg = serde_json::to_string(&ServerControl::SuspendClient)
                            .unwrap_or_default();
                        // Best effort: a failed control write is ignored.
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = self.clients[idx].conn.write_control(&suspend_msg);
                        // Force a full repaint on the next render for this client.
                        self.clients[idx].needs_full_render = true;
                    }
                } else {
                    tracing::warn!("Suspend requested but no input source; ignoring");
                }
                continue;
            }

            if resize_occurred {
                self.update_terminal_size()?;
                needs_render = true;
            }

            if !input_events.is_empty() {
                self.last_client_activity = Instant::now();
                for event in input_events {
                    if self.handle_event(event)? {
                        needs_render = true;
                    }
                }
            }

            if let Some(ref mut editor) = self.editor {
                if editor.process_async_messages() {
                    needs_render = true;
                }
                if editor.process_pending_file_opens() {
                    needs_render = true;
                }

                // Notify clients whose wait request has completed.
                for wait_id in editor.take_completed_waits() {
                    if let Some(client_id) = self.waiting_clients.remove(&wait_id) {
                        if let Some(client) = self.clients.iter_mut().find(|c| c.id == client_id) {
                            let msg = serde_json::to_string(&ServerControl::WaitComplete)
                                .unwrap_or_default();
                            #[allow(clippy::let_underscore_must_use)]
                            let _ = client.conn.write_control(&msg);
                            client.wait_id = None;
                        }
                    }
                }

                // Clipboard updates are broadcast to every client.
                if let Some(cb) = editor.take_pending_clipboard() {
                    let msg = serde_json::to_string(&ServerControl::SetClipboard {
                        text: cb.text,
                        use_osc52: cb.use_osc52,
                        use_system_clipboard: cb.use_system_clipboard,
                    })
                    .unwrap_or_default();
                    for client in &mut self.clients {
                        #[allow(clippy::let_underscore_must_use)]
                        let _ = client.conn.write_control(&msg);
                    }
                }

                if editor.check_mouse_hover_timer() {
                    needs_render = true;
                }

                // Keep rendering while an animation is running.
                if editor.active_window().animations.is_active() {
                    needs_render = true;
                }
            }

            // Frame limiter: a pending render waits until FRAME_DURATION has
            // elapsed since the previous one.
            if needs_render && last_render.elapsed() >= FRAME_DURATION {
                self.render_and_broadcast()?;
                last_render = Instant::now();
                needs_render = false;
            }

            std::thread::sleep(Duration::from_millis(5));
        }

        // Shutdown path: persist editor state; every failure is only logged.
        if let Some(ref mut editor) = self.editor {
            if editor.config().editor.auto_save_enabled {
                match editor.save_all_on_exit() {
                    Ok(count) if count > 0 => {
                        tracing::info!("Auto-saved {} buffer(s) on exit", count);
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!("Failed to auto-save on exit: {}", e);
                    }
                }
            }

            if let Err(e) = editor.end_recovery_session() {
                tracing::warn!("Failed to end recovery session: {}", e);
            }
            if let Err(e) = editor.save_all_windows_workspaces() {
                tracing::warn!("Failed to save workspaces: {}", e);
            } else {
                tracing::debug!("Workspaces saved successfully");
            }
        }

        self.disconnect_all_clients("Server shutting down")?;

        Ok(())
    }
557
558 fn build_editor_instance(&self) -> io::Result<(Editor, Terminal<CaptureBackend>)> {
562 let backend = CaptureBackend::new(self.term_size.cols, self.term_size.rows);
563 let terminal = Terminal::new(backend)
564 .map_err(|e| io::Error::other(format!("Failed to create terminal: {}", e)))?;
565
566 let color_capability = ColorCapability::TrueColor; let mut editor = Editor::with_working_dir_opts(
573 self.config.editor_config.clone(),
574 self.term_size.cols,
575 self.term_size.rows,
576 Some(self.config.working_dir.clone()),
577 self.config.dir_context.clone(),
578 self.config.plugins_enabled,
579 color_capability,
580 self.current_authority.clone(),
581 false,
582 )
583 .map_err(|e| io::Error::other(format!("Failed to create editor: {}", e)))?;
584
585 editor.load_init_script(self.config.init_enabled);
587
588 editor.set_session_mode(true);
590
591 let session_display_name = self.config.session_name.clone().unwrap_or_else(|| {
593 self.config
595 .working_dir
596 .file_name()
597 .and_then(|n| n.to_str())
598 .map(|s| s.to_string())
599 .unwrap_or_else(|| "session".to_string())
600 });
601 editor.set_session_name(Some(session_display_name));
602
603 Ok((editor, terminal))
604 }
605
606 pub fn initialize_editor(&mut self) -> io::Result<()> {
613 let (mut editor, terminal) = self.build_editor_instance()?;
614
615 match editor.try_restore_workspace() {
618 Ok(true) => {
619 tracing::info!("Session workspace restored successfully");
620 }
621 Ok(false) => {
622 tracing::debug!("No previous session workspace found");
623 }
624 Err(e) => {
625 tracing::warn!("Failed to restore session workspace: {}", e);
626 }
627 }
628
629 if editor.has_recovery_files().unwrap_or(false) {
631 tracing::info!("Recovery files found for session, recovering...");
632 match editor.recover_all_buffers() {
633 Ok(count) if count > 0 => {
634 tracing::info!("Recovered {} buffer(s) for session", count);
635 }
636 Ok(_) => {
637 tracing::info!("No buffers to recover for session");
638 }
639 Err(e) => {
640 tracing::warn!("Failed to recover session buffers: {}", e);
641 }
642 }
643 }
644
645 if let Err(e) = editor.start_recovery_session() {
647 tracing::warn!("Failed to start recovery session: {}", e);
648 }
649
650 self.terminal = Some(terminal);
651 self.editor = Some(editor);
652
653 self.maybe_prompt_workspace_trust();
654
655 tracing::info!(
656 "Editor initialized with size {}x{}",
657 self.term_size.cols,
658 self.term_size.rows
659 );
660
661 Ok(())
662 }
663
664 fn maybe_prompt_workspace_trust(&mut self) {
668 if let Some(editor) = self.editor.as_mut() {
669 editor.maybe_prompt_workspace_trust();
670 }
671 }
672
673 pub(crate) fn rebuild_editor(
685 &mut self,
686 new_working_dir: Option<PathBuf>,
687 new_authority: Option<crate::services::authority::Authority>,
688 new_keepalive: Option<Box<dyn std::any::Any + Send>>,
689 ) -> io::Result<()> {
690 if let Some(ref mut editor) = self.editor {
694 if editor.config().editor.auto_save_enabled {
695 if let Err(e) = editor.save_all_on_exit() {
696 tracing::warn!("Rebuild: failed to auto-save on exit: {}", e);
697 }
698 }
699 if let Err(e) = editor.end_recovery_session() {
700 tracing::warn!("Rebuild: failed to end recovery session: {}", e);
701 }
702 if let Err(e) = editor.save_all_windows_workspaces() {
703 tracing::warn!("Rebuild: failed to save workspaces: {}", e);
704 }
705 }
706
707 self.editor = None;
710 self.terminal = None;
711
712 if let Some(dir) = new_working_dir {
714 tracing::info!("Rebuild: switching working dir to {}", dir.display());
715 self.config.working_dir = dir;
716 self.workspace_trust
720 .set_root(Some(self.config.working_dir.clone()));
721 self.workspace_trust.set_store(Some(
722 crate::services::workspace_trust::TrustStore::for_project_dir(
723 &self
724 .config
725 .dir_context
726 .project_state_dir(&self.config.working_dir),
727 ),
728 ));
729 self.env_provider.clear();
732 }
733 if let Some(auth) = new_authority {
734 tracing::info!(
735 "Rebuild: installing authority with label {:?}",
736 auth.display_label
737 );
738 self.current_authority = auth;
739 }
740 if let Some(keepalive) = new_keepalive {
746 self.session_keepalive = Some(keepalive);
747 }
748
749 let (mut editor, terminal) = self.build_editor_instance()?;
750
751 match editor.try_restore_workspace() {
755 Ok(true) => tracing::info!("Rebuild: workspace restored"),
756 Ok(false) => tracing::debug!("Rebuild: no workspace to restore"),
757 Err(e) => tracing::warn!("Rebuild: failed to restore workspace: {}", e),
758 }
759
760 if let Err(e) = editor.start_recovery_session() {
761 tracing::warn!("Rebuild: failed to start recovery session: {}", e);
762 }
763
764 self.terminal = Some(terminal);
765 self.editor = Some(editor);
766
767 self.maybe_prompt_workspace_trust();
771
772 for client in &mut self.clients {
775 client.needs_full_render = true;
776 }
777
778 tracing::info!(
779 "Rebuild: complete, {} clients kept attached",
780 self.clients.len()
781 );
782
783 Ok(())
784 }
785
786 fn handle_new_connection(
788 &self,
789 conn: ServerConnection,
790 client_id: u64,
791 cursor_style: crate::config::CursorStyle,
792 ) -> io::Result<ConnectedClient> {
793 #[cfg(not(windows))]
797 conn.control.set_nonblocking(false)?;
798 let hello_json = conn
799 .read_control()?
800 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "No hello received"))?;
801
802 let client_msg: ClientControl = serde_json::from_str(&hello_json)
803 .map_err(|e| io::Error::other(format!("Invalid hello: {}", e)))?;
804
805 let hello = match client_msg {
806 ClientControl::Hello(h) => h,
807 _ => {
808 return Err(io::Error::other("Expected Hello message"));
809 }
810 };
811
812 if hello.protocol_version != PROTOCOL_VERSION {
814 let mismatch = VersionMismatch {
815 server_version: env!("CARGO_PKG_VERSION").to_string(),
816 client_version: hello.client_version.clone(),
817 action: if hello.protocol_version > PROTOCOL_VERSION {
818 "upgrade_server".to_string()
819 } else {
820 "restart_server".to_string()
821 },
822 message: format!(
823 "Protocol version mismatch: server={}, client={}",
824 PROTOCOL_VERSION, hello.protocol_version
825 ),
826 };
827
828 let response = serde_json::to_string(&ServerControl::VersionMismatch(mismatch))
829 .map_err(|e| io::Error::other(e.to_string()))?;
830 conn.write_control(&response)?;
831
832 return Err(io::Error::other("Version mismatch"));
833 }
834
835 let session_id = self.config.session_name.clone().unwrap_or_else(|| {
837 crate::workspace::encode_path_for_filename(&self.config.working_dir)
838 });
839
840 let server_hello = ServerHello::new(session_id);
841 let response = serde_json::to_string(&ServerControl::Hello(server_hello))
842 .map_err(|e| io::Error::other(e.to_string()))?;
843 conn.write_control(&response)?;
844
845 #[cfg(not(windows))]
848 conn.control.set_nonblocking(true)?;
849
850 let mouse_hover_enabled = self.config.editor_config.editor.mouse_hover_enabled;
852 let setup = terminal_setup_sequences(mouse_hover_enabled);
853 conn.write_data(&setup)?;
854
855 conn.write_data(cursor_style.to_escape_sequence())?;
857
858 tracing::debug!(
859 "Client {} connected: {}x{}, TERM={:?}",
860 client_id,
861 hello.term_size.cols,
862 hello.term_size.rows,
863 hello.term()
864 );
865
866 let data_writer = ClientDataWriter::new(conn.data.clone(), client_id);
868
869 Ok(ConnectedClient {
870 conn,
871 data_writer,
872 term_size: hello.term_size,
873 env: hello.env,
874 id: client_id,
875 input_parser: InputParser::new(),
876 needs_full_render: true,
877 wait_id: None,
878 })
879 }
880
881 fn process_clients(&mut self) -> io::Result<(Vec<Event>, bool, Option<usize>)> {
884 let mut disconnected = Vec::new();
885 let mut input_source_client: Option<usize> = None;
886 let mut input_events = Vec::new();
887 let mut resize_occurred = false;
888 let mut control_messages: Vec<(usize, ClientControl)> = Vec::new();
889
890 for (idx, client) in self.clients.iter_mut().enumerate() {
891 let mut buf = [0u8; 4096];
893 let mut data_eof = false;
894 tracing::debug!("[server] reading from client {} data socket", client.id);
895 match client.conn.read_data(&mut buf) {
896 Ok(0) => {
897 tracing::debug!("[server] Client {} data stream closed (EOF)", client.id);
898 if client.wait_id.is_none() {
900 disconnected.push(idx);
901 }
902 data_eof = true;
903 }
905 Ok(n) => {
906 tracing::debug!(
907 "[server] Client {} read {} bytes from data socket",
908 client.id,
909 n
910 );
911 let events = client.input_parser.parse(&buf[..n]);
912 tracing::debug!(
913 "[server] Client {} parsed {} events",
914 client.id,
915 events.len()
916 );
917 if !events.is_empty() {
918 input_source_client = Some(idx);
919 }
920 input_events.extend(events);
921 }
922 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
923 }
925 Err(e) => {
926 tracing::warn!("[server] Client {} data read error: {}", client.id, e);
927 disconnected.push(idx);
928 data_eof = true;
929 }
931 }
932 let _ = data_eof; #[cfg(not(windows))]
938 #[allow(clippy::let_underscore_must_use)]
939 let _ = client.conn.control.set_nonblocking(true);
940
941 #[cfg(windows)]
943 {
944 let mut buf = [0u8; 1024];
945 match client.conn.control.try_read(&mut buf) {
946 Ok(0) => {
947 tracing::debug!("Client {} control stream closed (EOF)", client.id);
948 disconnected.push(idx);
949 }
950 Ok(n) => {
951 if let Ok(s) = std::str::from_utf8(&buf[..n]) {
953 for line in s.lines() {
954 if !line.trim().is_empty() {
955 if let Ok(msg) = serde_json::from_str::<ClientControl>(line) {
956 control_messages.push((idx, msg));
957 }
958 }
959 }
960 }
961 }
962 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
963 Err(e) => {
964 tracing::warn!("Client {} control read error: {}", client.id, e);
965 }
966 }
967 }
968
969 #[cfg(not(windows))]
970 {
971 let mut reader = std::io::BufReader::new(&client.conn.control);
972 let mut line = String::new();
973 match std::io::BufRead::read_line(&mut reader, &mut line) {
974 Ok(0) => {
975 tracing::debug!("Client {} control stream closed (EOF)", client.id);
976 disconnected.push(idx);
977 }
978 Ok(_) if !line.trim().is_empty() => {
979 if let Ok(msg) = serde_json::from_str::<ClientControl>(&line) {
980 control_messages.push((idx, msg));
981 }
982 }
983 Ok(_) => {}
984 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
985 Err(e) => {
986 tracing::warn!("Client {} control read error: {}", client.id, e);
987 }
988 }
989 }
990 }
991
992 if !control_messages.is_empty() {
994 tracing::debug!(
995 "[server] Processing {} control messages",
996 control_messages.len()
997 );
998 }
999 for (idx, msg) in control_messages {
1000 tracing::debug!("[server] Control message from client {}: {:?}", idx, msg);
1001 if let ClientControl::Quit = msg {
1003 tracing::info!("Client requested quit, shutting down");
1004 self.shutdown.store(true, Ordering::SeqCst);
1005 continue;
1006 }
1007
1008 if let ClientControl::OpenFiles { .. } | ClientControl::OpenWindow { .. } = msg {
1011 } else if disconnected.contains(&idx) {
1013 continue;
1015 }
1016
1017 match msg {
1018 ClientControl::Hello(_) => {
1019 tracing::warn!("Unexpected Hello from client");
1020 }
1021 ClientControl::Resize { cols, rows } => {
1022 if let Some(client) = self.clients.get_mut(idx) {
1023 client.term_size = TermSize::new(cols, rows);
1024 if idx == 0 {
1026 self.term_size = TermSize::new(cols, rows);
1027 resize_occurred = true;
1028 }
1029 }
1030 }
1031 ClientControl::Ping => {
1032 if let Some(client) = self.clients.get_mut(idx) {
1033 let pong = serde_json::to_string(&ServerControl::Pong).unwrap_or_default();
1034 #[allow(clippy::let_underscore_must_use)]
1036 let _ = client.conn.write_control(&pong);
1037 }
1038 }
1039 ClientControl::Detach => {
1040 tracing::info!("Client {} detached", idx);
1041 disconnected.push(idx);
1042 }
1043 ClientControl::OpenFiles { files, wait } => {
1044 if let Some(ref mut editor) = self.editor {
1045 let wait_id = if wait {
1047 let id = self.next_wait_id;
1048 self.next_wait_id += 1;
1049 Some(id)
1050 } else {
1051 None
1052 };
1053
1054 let file_count = files.len();
1055 for (i, file_req) in files.iter().enumerate() {
1056 let path = std::path::PathBuf::from(&file_req.path);
1057 tracing::debug!(
1058 "Queuing file open: {:?} line={:?} col={:?} end_line={:?} end_col={:?} message={:?}",
1059 path,
1060 file_req.line,
1061 file_req.column,
1062 file_req.end_line,
1063 file_req.end_column,
1064 file_req.message,
1065 );
1066 let file_wait_id = if i == file_count - 1 { wait_id } else { None };
1068 editor.queue_file_open(
1069 path,
1070 file_req.line,
1071 file_req.column,
1072 file_req.end_line,
1073 file_req.end_column,
1074 file_req.message.clone(),
1075 file_wait_id,
1076 );
1077 }
1078
1079 if let Some(wait_id) = wait_id {
1081 if let Some(client) = self.clients.get_mut(idx) {
1082 self.waiting_clients.insert(wait_id, client.id);
1083 client.wait_id = Some(wait_id);
1084 }
1085 }
1086
1087 resize_occurred = true; }
1089 }
1090 ClientControl::OpenWindow { path } => {
1091 if let Some(ref mut editor) = self.editor {
1092 let path = std::path::PathBuf::from(path);
1093 if path.is_absolute() {
1094 let label = path
1095 .file_name()
1096 .map(|s| s.to_string_lossy().into_owned())
1097 .unwrap_or_else(|| path.to_string_lossy().into_owned());
1098 let id = editor.create_window_at(path, label);
1099 editor.set_active_window(id);
1100 resize_occurred = true; } else {
1102 tracing::warn!(
1103 "OpenWindow rejected: path must be absolute: {:?}",
1104 path
1105 );
1106 }
1107 }
1108 }
1109 ClientControl::Quit => unreachable!(), }
1111 }
1112
1113 for (idx, client) in self.clients.iter().enumerate() {
1115 if client.data_writer.is_broken() && !disconnected.contains(&idx) {
1116 tracing::info!("Client {} write pipe broken, disconnecting", client.id);
1117 disconnected.push(idx);
1118 }
1119 }
1120
1121 disconnected.sort_unstable();
1123 disconnected.dedup();
1124
1125 for idx in disconnected.into_iter().rev() {
1127 let client = self.clients.remove(idx);
1128 if let Some(wait_id) = client.wait_id {
1130 self.waiting_clients.remove(&wait_id);
1131 if let Some(ref mut editor) = self.editor {
1133 editor.remove_wait_tracking(wait_id);
1134 }
1135 }
1136 let teardown = terminal_teardown_sequences();
1138 let _ = client.data_writer.try_write(&teardown);
1139 tracing::info!("Client {} disconnected", client.id);
1140 if input_source_client == Some(idx) {
1142 input_source_client = None;
1143 }
1144 }
1145
1146 Ok((input_events, resize_occurred, input_source_client))
1147 }
1148
1149 fn update_terminal_size(&mut self) -> io::Result<()> {
1151 if let Some(ref mut terminal) = self.terminal {
1152 let backend = terminal.backend_mut();
1153 backend.resize(self.term_size.cols, self.term_size.rows);
1154 }
1155
1156 if let Some(ref mut editor) = self.editor {
1157 editor.resize(self.term_size.cols, self.term_size.rows);
1158 }
1159
1160 Ok(())
1161 }
1162
1163 fn handle_event(&mut self, event: Event) -> io::Result<bool> {
1165 let Some(ref mut editor) = self.editor else {
1166 return Ok(false);
1167 };
1168
1169 match event {
1170 Event::Key(key_event) => {
1171 if key_event.kind == KeyEventKind::Press {
1172 editor
1173 .handle_key(key_event.code, key_event.modifiers)
1174 .map_err(|e| io::Error::other(e.to_string()))?;
1175 Ok(true)
1176 } else {
1177 Ok(false)
1178 }
1179 }
1180 Event::Mouse(mouse_event) => editor
1181 .handle_mouse(mouse_event)
1182 .map_err(|e| io::Error::other(e.to_string())),
1183 Event::Resize(w, h) => {
1184 editor.resize(w, h);
1185 Ok(true)
1186 }
1187 Event::Paste(text) => {
1188 editor.paste_text(text);
1189 Ok(true)
1190 }
1191 _ => Ok(false),
1192 }
1193 }
1194
1195 fn render_and_broadcast(&mut self) -> io::Result<()> {
1197 let Some(ref mut editor) = self.editor else {
1198 return Ok(());
1199 };
1200
1201 let Some(ref mut terminal) = self.terminal else {
1202 return Ok(());
1203 };
1204
1205 let any_needs_full = self.clients.iter().any(|c| c.needs_full_render);
1207 if any_needs_full {
1208 tracing::info!(
1209 "Full render requested for {} client(s)",
1210 self.clients.iter().filter(|c| c.needs_full_render).count()
1211 );
1212 terminal.backend_mut().reset_style_state();
1214 #[allow(clippy::let_underscore_must_use)]
1216 let _ = terminal.clear();
1217 }
1218
1219 let pending_sequences = editor.take_pending_escape_sequences();
1221
1222 terminal
1224 .draw(|frame| editor.render(frame))
1225 .map_err(|e| io::Error::other(e.to_string()))?;
1226
1227 let output = terminal.backend_mut().take_buffer();
1229
1230 if output.is_empty() && pending_sequences.is_empty() {
1231 return Ok(());
1232 }
1233
1234 for client in &mut self.clients {
1236 if client.wait_id.is_some() {
1237 continue;
1238 }
1239 let frame = if !pending_sequences.is_empty() && !output.is_empty() {
1241 let mut combined = Vec::with_capacity(pending_sequences.len() + output.len());
1242 combined.extend_from_slice(&pending_sequences);
1243 combined.extend_from_slice(&output);
1244 combined
1245 } else if !pending_sequences.is_empty() {
1246 pending_sequences.clone()
1247 } else {
1248 output.clone()
1249 };
1250
1251 if !frame.is_empty() && !client.data_writer.try_write(&frame) {
1252 tracing::warn!("Client {} output buffer full, dropping frame", client.id);
1253 }
1254 client.needs_full_render = false;
1256 }
1257
1258 Ok(())
1259 }
1260
1261 fn disconnect_all_clients(&mut self, reason: &str) -> io::Result<()> {
1263 let teardown = terminal_teardown_sequences();
1264 for client in &mut self.clients {
1265 #[allow(clippy::let_underscore_must_use)]
1267 let _ = client.data_writer.try_write(&teardown);
1268 let quit_msg = serde_json::to_string(&ServerControl::Quit {
1269 reason: reason.to_string(),
1270 })
1271 .unwrap_or_default();
1272 #[allow(clippy::let_underscore_must_use)]
1274 let _ = client.conn.write_control(&quit_msg);
1275 }
1276 self.clients.clear();
1277 Ok(())
1278 }
1279}
1280
1281impl ConnectedClient {
1282 #[allow(dead_code)]
1284 pub fn term(&self) -> Option<&str> {
1285 self.env.get("TERM").and_then(|v| v.as_deref())
1286 }
1287
1288 #[allow(dead_code)]
1290 pub fn supports_truecolor(&self) -> bool {
1291 self.env
1292 .get("COLORTERM")
1293 .and_then(|v| v.as_deref())
1294 .map(|v| v == "truecolor" || v == "24bit")
1295 .unwrap_or(false)
1296 }
1297}