1use std::io;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::mpsc;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crossterm::event::{Event, KeyEventKind};
16use ratatui::Terminal;
17
18use crate::app::Editor;
19use crate::config::Config;
20use crate::config_io::DirectoryContext;
21use crate::server::capture_backend::{
24 terminal_setup_sequences, terminal_teardown_sequences, CaptureBackend,
25};
26use crate::server::input_parser::InputParser;
27use crate::server::ipc::{ServerConnection, ServerListener, SocketPaths, StreamWrapper};
28use crate::server::protocol::{
29 ClientControl, ServerControl, ServerHello, TermSize, VersionMismatch, PROTOCOL_VERSION,
30};
31use crate::view::color_support::ColorCapability;
32
33pub struct EditorServerConfig {
35 pub working_dir: PathBuf,
37 pub session_name: Option<String>,
39 pub idle_timeout: Option<Duration>,
41 pub editor_config: Config,
43 pub dir_context: DirectoryContext,
45 pub plugins_enabled: bool,
47 pub init_enabled: bool,
49 pub startup_authority: Option<crate::services::authority::Authority>,
56 pub workspace_trust: Arc<crate::services::workspace_trust::WorkspaceTrust>,
61 pub env_provider: Arc<crate::services::env_provider::EnvProvider>,
65 pub session_keepalive: Option<Box<dyn std::any::Any + Send>>,
72}
73
74pub struct EditorServer {
76 config: EditorServerConfig,
77 listener: ServerListener,
78 clients: Vec<ConnectedClient>,
79 editor: Option<Editor>,
80 terminal: Option<Terminal<CaptureBackend>>,
81 last_client_activity: Instant,
82 shutdown: Arc<AtomicBool>,
83 term_size: TermSize,
85 last_input_client: Option<usize>,
87 next_wait_id: u64,
89 waiting_clients: std::collections::HashMap<u64, u64>,
91 current_authority: crate::services::authority::Authority,
98 workspace_trust: Arc<crate::services::workspace_trust::WorkspaceTrust>,
104 env_provider: Arc<crate::services::env_provider::EnvProvider>,
106 #[allow(dead_code)]
112 session_keepalive: Option<Box<dyn std::any::Any + Send>>,
113}
114
115struct ClientDataWriter {
122 sender: mpsc::SyncSender<Vec<u8>>,
123 pipe_broken: Arc<AtomicBool>,
124}
125
126impl ClientDataWriter {
127 fn new(data: StreamWrapper, client_id: u64) -> Self {
129 let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(16);
131 let pipe_broken = Arc::new(AtomicBool::new(false));
132 let pipe_broken_clone = pipe_broken.clone();
133
134 std::thread::Builder::new()
135 .name(format!("client-{}-writer", client_id))
136 .spawn(move || {
137 while let Ok(buf) = rx.recv() {
138 if let Err(e) = data.write_all(&buf) {
139 tracing::debug!("Client {} writer pipe error: {}", client_id, e);
140 pipe_broken_clone.store(true, Ordering::Relaxed);
141 break;
142 }
143 if let Err(e) = data.flush() {
144 tracing::debug!("Client {} writer flush error: {}", client_id, e);
145 pipe_broken_clone.store(true, Ordering::Relaxed);
146 break;
147 }
148 }
149 tracing::debug!("Client {} writer thread exiting", client_id);
150 })
151 .expect("Failed to spawn client writer thread");
152
153 Self {
154 sender: tx,
155 pipe_broken,
156 }
157 }
158
159 fn try_write(&self, data: &[u8]) -> bool {
162 self.sender.try_send(data.to_vec()).is_ok()
163 }
164
165 fn is_broken(&self) -> bool {
167 self.pipe_broken.load(Ordering::Relaxed)
168 }
169}
170
171struct ConnectedClient {
173 conn: ServerConnection,
174 data_writer: ClientDataWriter,
176 term_size: TermSize,
177 env: std::collections::HashMap<String, Option<String>>,
178 id: u64,
179 input_parser: InputParser,
180 needs_full_render: bool,
182 wait_id: Option<u64>,
184}
185
186impl EditorServer {
187 pub fn new(mut config: EditorServerConfig) -> io::Result<Self> {
189 let socket_paths = if let Some(ref name) = config.session_name {
190 SocketPaths::for_session_name(name)?
191 } else {
192 SocketPaths::for_working_dir(&config.working_dir)?
193 };
194
195 let listener = ServerListener::bind(socket_paths)?;
196
197 let pid = std::process::id();
199 if let Err(e) = listener.paths().write_pid(pid) {
200 tracing::warn!("Failed to write PID file: {}", e);
201 }
202
203 let workspace_trust = Arc::clone(&config.workspace_trust);
209 let env_provider = Arc::clone(&config.env_provider);
210
211 let current_authority = config.startup_authority.take().unwrap_or_else(|| {
216 crate::services::authority::Authority::local(
217 Arc::clone(&workspace_trust),
218 Arc::clone(&env_provider),
219 )
220 });
221 let session_keepalive = config.session_keepalive.take();
222
223 Ok(Self {
224 config,
225 listener,
226 clients: Vec::new(),
227 editor: None,
228 terminal: None,
229 last_client_activity: Instant::now(),
230 shutdown: Arc::new(AtomicBool::new(false)),
231 term_size: TermSize::new(80, 24), last_input_client: None,
233 next_wait_id: 1,
234 waiting_clients: std::collections::HashMap::new(),
235 current_authority,
236 workspace_trust,
237 env_provider,
238 session_keepalive,
239 })
240 }
241
242 pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
244 self.shutdown.clone()
245 }
246
247 pub fn socket_paths(&self) -> &SocketPaths {
249 self.listener.paths()
250 }
251
252 pub fn editor(&self) -> Option<&Editor> {
254 self.editor.as_ref()
255 }
256
257 pub fn editor_mut(&mut self) -> Option<&mut Editor> {
259 self.editor.as_mut()
260 }
261
262 pub fn run(&mut self) -> io::Result<()> {
264 tracing::info!("Editor server starting for {:?}", self.config.working_dir);
265
266 let mut next_client_id = 1u64;
267 let mut needs_render = true;
268 let mut last_render = Instant::now();
269 const FRAME_DURATION: Duration = Duration::from_millis(16); loop {
272 if self.shutdown.load(Ordering::SeqCst) {
274 tracing::info!("Shutdown requested");
275 break;
276 }
277
278 if let Some(timeout) = self.config.idle_timeout {
280 if self.clients.is_empty() && self.last_client_activity.elapsed() > timeout {
281 tracing::info!("Idle timeout reached, shutting down");
282 break;
283 }
284 }
285
286 tracing::debug!("[server] main loop: calling accept()");
288 match self.listener.accept() {
289 Ok(Some(conn)) => {
290 let cursor_style = self
292 .editor
293 .as_ref()
294 .map(|e| e.config().editor.cursor_style)
295 .unwrap_or(self.config.editor_config.editor.cursor_style);
296 match self.handle_new_connection(conn, next_client_id, cursor_style) {
297 Ok(client) => {
298 tracing::info!("Client {} connected", client.id);
299
300 if self.editor.is_none() {
302 self.term_size = client.term_size;
304 self.initialize_editor()?;
305 } else if self.clients.is_empty() {
306 if self.term_size != client.term_size {
308 self.term_size = client.term_size;
309 self.update_terminal_size()?;
310 }
311 }
312 self.clients.push(client);
315 self.last_client_activity = Instant::now();
316 next_client_id += 1;
317 needs_render = true;
318 }
319 Err(e) => {
320 tracing::warn!("Failed to complete handshake: {}", e);
321 }
322 }
323 }
324 Ok(None) => {}
325 Err(e) => {
326 tracing::error!("Accept error: {}", e);
327 }
328 }
329
330 tracing::debug!("[server] main loop: calling process_clients");
332 let (input_events, resize_occurred, input_source) = self.process_clients()?;
333 if let Some(idx) = input_source {
334 self.last_input_client = Some(idx);
335 }
336 if !input_events.is_empty() {
337 tracing::debug!(
338 "[server] process_clients returned {} events",
339 input_events.len()
340 );
341 }
342
343 if let Some(ref mut editor) = self.editor {
351 if editor.should_quit() {
352 let pending_authority = editor.take_pending_authority();
353 let restart_dir = editor.take_restart_dir();
354 if pending_authority.is_some() || restart_dir.is_some() {
355 tracing::info!(
356 "Session rebuild requested (authority={}, dir={})",
357 pending_authority.is_some(),
358 restart_dir.is_some()
359 );
360 if let Err(e) = self.rebuild_editor(restart_dir, pending_authority) {
361 tracing::error!("Session rebuild failed, shutting down: {}", e);
362 self.shutdown.store(true, Ordering::SeqCst);
363 continue;
364 }
365 needs_render = true;
366 continue;
367 }
368 tracing::info!("Editor requested quit");
369 self.shutdown.store(true, Ordering::SeqCst);
370 continue;
371 }
372 }
373
374 let detach_requested = self
376 .editor
377 .as_ref()
378 .map(|e| e.should_detach())
379 .unwrap_or(false);
380 if detach_requested {
381 if let Some(idx) = self.last_input_client.take() {
383 if idx < self.clients.len() {
384 tracing::info!("Client {} requested detach", self.clients[idx].id);
385 let client = self.clients.remove(idx);
386 let teardown = terminal_teardown_sequences();
387 #[allow(clippy::let_underscore_must_use)]
389 let _ = client.data_writer.try_write(&teardown);
390 let quit_msg = serde_json::to_string(&ServerControl::Quit {
391 reason: "Detached".to_string(),
392 })
393 .unwrap_or_default();
394 #[allow(clippy::let_underscore_must_use)]
396 let _ = client.conn.write_control(&quit_msg);
397 }
398 } else {
399 tracing::info!("Detach requested but no input source, detaching all");
401 self.disconnect_all_clients("Detached")?;
402 }
403 if let Some(ref mut editor) = self.editor {
405 editor.clear_detach();
406 }
407 continue;
408 }
409
410 let suspend_requested = self
416 .editor
417 .as_mut()
418 .map(|e| e.take_suspend_request())
419 .unwrap_or(false);
420 if suspend_requested {
421 if let Some(idx) = self.last_input_client {
422 if idx < self.clients.len() {
423 let client_id = self.clients[idx].id;
424 tracing::info!("Client {} requested suspend", client_id);
425 let suspend_msg = serde_json::to_string(&ServerControl::SuspendClient)
426 .unwrap_or_default();
427 #[allow(clippy::let_underscore_must_use)]
429 let _ = self.clients[idx].conn.write_control(&suspend_msg);
430 self.clients[idx].needs_full_render = true;
434 }
435 } else {
436 tracing::warn!("Suspend requested but no input source; ignoring");
437 }
438 continue;
439 }
440
441 if resize_occurred {
443 self.update_terminal_size()?;
444 needs_render = true;
445 }
446
447 if !input_events.is_empty() {
449 self.last_client_activity = Instant::now();
450 for event in input_events {
451 if self.handle_event(event)? {
452 needs_render = true;
453 }
454 }
455 }
456
457 if let Some(ref mut editor) = self.editor {
459 if editor.process_async_messages() {
460 needs_render = true;
461 }
462 if editor.process_pending_file_opens() {
463 needs_render = true;
464 }
465
466 for wait_id in editor.take_completed_waits() {
468 if let Some(client_id) = self.waiting_clients.remove(&wait_id) {
469 if let Some(client) = self.clients.iter_mut().find(|c| c.id == client_id) {
471 let msg = serde_json::to_string(&ServerControl::WaitComplete)
472 .unwrap_or_default();
473 #[allow(clippy::let_underscore_must_use)]
474 let _ = client.conn.write_control(&msg);
475 client.wait_id = None;
476 }
477 }
478 }
479
480 if let Some(cb) = editor.take_pending_clipboard() {
482 let msg = serde_json::to_string(&ServerControl::SetClipboard {
483 text: cb.text,
484 use_osc52: cb.use_osc52,
485 use_system_clipboard: cb.use_system_clipboard,
486 })
487 .unwrap_or_default();
488 for client in &mut self.clients {
489 #[allow(clippy::let_underscore_must_use)]
490 let _ = client.conn.write_control(&msg);
491 }
492 }
493
494 if editor.check_mouse_hover_timer() {
495 needs_render = true;
496 }
497
498 if editor.active_window().animations.is_active() {
506 needs_render = true;
507 }
508 }
509
510 if needs_render && last_render.elapsed() >= FRAME_DURATION {
512 self.render_and_broadcast()?;
513 last_render = Instant::now();
514 needs_render = false;
515 }
516
517 std::thread::sleep(Duration::from_millis(5));
519 }
520
521 if let Some(ref mut editor) = self.editor {
524 if editor.config().editor.auto_save_enabled {
526 match editor.save_all_on_exit() {
527 Ok(count) if count > 0 => {
528 tracing::info!("Auto-saved {} buffer(s) on exit", count);
529 }
530 Ok(_) => {}
531 Err(e) => {
532 tracing::warn!("Failed to auto-save on exit: {}", e);
533 }
534 }
535 }
536
537 if let Err(e) = editor.end_recovery_session() {
540 tracing::warn!("Failed to end recovery session: {}", e);
541 }
542 if let Err(e) = editor.save_all_windows_workspaces() {
543 tracing::warn!("Failed to save workspaces: {}", e);
544 } else {
545 tracing::debug!("Workspaces saved successfully");
546 }
547 }
548
549 self.disconnect_all_clients("Server shutting down")?;
551
552 Ok(())
553 }
554
555 fn build_editor_instance(&self) -> io::Result<(Editor, Terminal<CaptureBackend>)> {
559 let backend = CaptureBackend::new(self.term_size.cols, self.term_size.rows);
560 let terminal = Terminal::new(backend)
561 .map_err(|e| io::Error::other(format!("Failed to create terminal: {}", e)))?;
562
563 let filesystem = self.current_authority.filesystem.clone();
568 let color_capability = ColorCapability::TrueColor; let mut editor = Editor::with_working_dir(
571 self.config.editor_config.clone(),
572 self.term_size.cols,
573 self.term_size.rows,
574 Some(self.config.working_dir.clone()),
575 self.config.dir_context.clone(),
576 self.config.plugins_enabled,
577 color_capability,
578 filesystem,
579 )
580 .map_err(|e| io::Error::other(format!("Failed to create editor: {}", e)))?;
581
582 editor.set_boot_authority(self.current_authority.clone());
585
586 editor.load_init_script(self.config.init_enabled);
588
589 editor.set_session_mode(true);
591
592 let session_display_name = self.config.session_name.clone().unwrap_or_else(|| {
594 self.config
596 .working_dir
597 .file_name()
598 .and_then(|n| n.to_str())
599 .map(|s| s.to_string())
600 .unwrap_or_else(|| "session".to_string())
601 });
602 editor.set_session_name(Some(session_display_name));
603
604 Ok((editor, terminal))
605 }
606
607 pub fn initialize_editor(&mut self) -> io::Result<()> {
614 let (mut editor, terminal) = self.build_editor_instance()?;
615
616 match editor.try_restore_workspace() {
619 Ok(true) => {
620 tracing::info!("Session workspace restored successfully");
621 }
622 Ok(false) => {
623 tracing::debug!("No previous session workspace found");
624 }
625 Err(e) => {
626 tracing::warn!("Failed to restore session workspace: {}", e);
627 }
628 }
629
630 if editor.has_recovery_files().unwrap_or(false) {
632 tracing::info!("Recovery files found for session, recovering...");
633 match editor.recover_all_buffers() {
634 Ok(count) if count > 0 => {
635 tracing::info!("Recovered {} buffer(s) for session", count);
636 }
637 Ok(_) => {
638 tracing::info!("No buffers to recover for session");
639 }
640 Err(e) => {
641 tracing::warn!("Failed to recover session buffers: {}", e);
642 }
643 }
644 }
645
646 if let Err(e) = editor.start_recovery_session() {
648 tracing::warn!("Failed to start recovery session: {}", e);
649 }
650
651 self.terminal = Some(terminal);
652 self.editor = Some(editor);
653
654 self.maybe_prompt_workspace_trust();
655
656 tracing::info!(
657 "Editor initialized with size {}x{}",
658 self.term_size.cols,
659 self.term_size.rows
660 );
661
662 Ok(())
663 }
664
665 fn maybe_prompt_workspace_trust(&mut self) {
669 if let Some(editor) = self.editor.as_mut() {
670 editor.maybe_prompt_workspace_trust();
671 }
672 }
673
674 pub(crate) fn rebuild_editor(
686 &mut self,
687 new_working_dir: Option<PathBuf>,
688 new_authority: Option<crate::services::authority::Authority>,
689 ) -> io::Result<()> {
690 if let Some(ref mut editor) = self.editor {
694 if editor.config().editor.auto_save_enabled {
695 if let Err(e) = editor.save_all_on_exit() {
696 tracing::warn!("Rebuild: failed to auto-save on exit: {}", e);
697 }
698 }
699 if let Err(e) = editor.end_recovery_session() {
700 tracing::warn!("Rebuild: failed to end recovery session: {}", e);
701 }
702 if let Err(e) = editor.save_all_windows_workspaces() {
703 tracing::warn!("Rebuild: failed to save workspaces: {}", e);
704 }
705 }
706
707 self.editor = None;
710 self.terminal = None;
711
712 if let Some(dir) = new_working_dir {
714 tracing::info!("Rebuild: switching working dir to {}", dir.display());
715 self.config.working_dir = dir;
716 self.workspace_trust
720 .set_root(Some(self.config.working_dir.clone()));
721 self.workspace_trust.set_store(Some(
722 crate::services::workspace_trust::TrustStore::for_project_dir(
723 &self
724 .config
725 .dir_context
726 .project_state_dir(&self.config.working_dir),
727 ),
728 ));
729 self.env_provider.clear();
732 }
733 if let Some(auth) = new_authority {
734 tracing::info!(
735 "Rebuild: installing authority with label {:?}",
736 auth.display_label
737 );
738 self.current_authority = auth;
739 }
740
741 let (mut editor, terminal) = self.build_editor_instance()?;
742
743 match editor.try_restore_workspace() {
747 Ok(true) => tracing::info!("Rebuild: workspace restored"),
748 Ok(false) => tracing::debug!("Rebuild: no workspace to restore"),
749 Err(e) => tracing::warn!("Rebuild: failed to restore workspace: {}", e),
750 }
751
752 if let Err(e) = editor.start_recovery_session() {
753 tracing::warn!("Rebuild: failed to start recovery session: {}", e);
754 }
755
756 self.terminal = Some(terminal);
757 self.editor = Some(editor);
758
759 self.maybe_prompt_workspace_trust();
763
764 for client in &mut self.clients {
767 client.needs_full_render = true;
768 }
769
770 tracing::info!(
771 "Rebuild: complete, {} clients kept attached",
772 self.clients.len()
773 );
774
775 Ok(())
776 }
777
778 fn handle_new_connection(
780 &self,
781 conn: ServerConnection,
782 client_id: u64,
783 cursor_style: crate::config::CursorStyle,
784 ) -> io::Result<ConnectedClient> {
785 #[cfg(not(windows))]
789 conn.control.set_nonblocking(false)?;
790 let hello_json = conn
791 .read_control()?
792 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "No hello received"))?;
793
794 let client_msg: ClientControl = serde_json::from_str(&hello_json)
795 .map_err(|e| io::Error::other(format!("Invalid hello: {}", e)))?;
796
797 let hello = match client_msg {
798 ClientControl::Hello(h) => h,
799 _ => {
800 return Err(io::Error::other("Expected Hello message"));
801 }
802 };
803
804 if hello.protocol_version != PROTOCOL_VERSION {
806 let mismatch = VersionMismatch {
807 server_version: env!("CARGO_PKG_VERSION").to_string(),
808 client_version: hello.client_version.clone(),
809 action: if hello.protocol_version > PROTOCOL_VERSION {
810 "upgrade_server".to_string()
811 } else {
812 "restart_server".to_string()
813 },
814 message: format!(
815 "Protocol version mismatch: server={}, client={}",
816 PROTOCOL_VERSION, hello.protocol_version
817 ),
818 };
819
820 let response = serde_json::to_string(&ServerControl::VersionMismatch(mismatch))
821 .map_err(|e| io::Error::other(e.to_string()))?;
822 conn.write_control(&response)?;
823
824 return Err(io::Error::other("Version mismatch"));
825 }
826
827 let session_id = self.config.session_name.clone().unwrap_or_else(|| {
829 crate::workspace::encode_path_for_filename(&self.config.working_dir)
830 });
831
832 let server_hello = ServerHello::new(session_id);
833 let response = serde_json::to_string(&ServerControl::Hello(server_hello))
834 .map_err(|e| io::Error::other(e.to_string()))?;
835 conn.write_control(&response)?;
836
837 #[cfg(not(windows))]
840 conn.control.set_nonblocking(true)?;
841
842 let mouse_hover_enabled = self.config.editor_config.editor.mouse_hover_enabled;
844 let setup = terminal_setup_sequences(mouse_hover_enabled);
845 conn.write_data(&setup)?;
846
847 conn.write_data(cursor_style.to_escape_sequence())?;
849
850 tracing::debug!(
851 "Client {} connected: {}x{}, TERM={:?}",
852 client_id,
853 hello.term_size.cols,
854 hello.term_size.rows,
855 hello.term()
856 );
857
858 let data_writer = ClientDataWriter::new(conn.data.clone(), client_id);
860
861 Ok(ConnectedClient {
862 conn,
863 data_writer,
864 term_size: hello.term_size,
865 env: hello.env,
866 id: client_id,
867 input_parser: InputParser::new(),
868 needs_full_render: true,
869 wait_id: None,
870 })
871 }
872
873 fn process_clients(&mut self) -> io::Result<(Vec<Event>, bool, Option<usize>)> {
876 let mut disconnected = Vec::new();
877 let mut input_source_client: Option<usize> = None;
878 let mut input_events = Vec::new();
879 let mut resize_occurred = false;
880 let mut control_messages: Vec<(usize, ClientControl)> = Vec::new();
881
882 for (idx, client) in self.clients.iter_mut().enumerate() {
883 let mut buf = [0u8; 4096];
885 let mut data_eof = false;
886 tracing::debug!("[server] reading from client {} data socket", client.id);
887 match client.conn.read_data(&mut buf) {
888 Ok(0) => {
889 tracing::debug!("[server] Client {} data stream closed (EOF)", client.id);
890 if client.wait_id.is_none() {
892 disconnected.push(idx);
893 }
894 data_eof = true;
895 }
897 Ok(n) => {
898 tracing::debug!(
899 "[server] Client {} read {} bytes from data socket",
900 client.id,
901 n
902 );
903 let events = client.input_parser.parse(&buf[..n]);
904 tracing::debug!(
905 "[server] Client {} parsed {} events",
906 client.id,
907 events.len()
908 );
909 if !events.is_empty() {
910 input_source_client = Some(idx);
911 }
912 input_events.extend(events);
913 }
914 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
915 }
917 Err(e) => {
918 tracing::warn!("[server] Client {} data read error: {}", client.id, e);
919 disconnected.push(idx);
920 data_eof = true;
921 }
923 }
924 let _ = data_eof; #[cfg(not(windows))]
930 #[allow(clippy::let_underscore_must_use)]
931 let _ = client.conn.control.set_nonblocking(true);
932
933 #[cfg(windows)]
935 {
936 let mut buf = [0u8; 1024];
937 match client.conn.control.try_read(&mut buf) {
938 Ok(0) => {
939 tracing::debug!("Client {} control stream closed (EOF)", client.id);
940 disconnected.push(idx);
941 }
942 Ok(n) => {
943 if let Ok(s) = std::str::from_utf8(&buf[..n]) {
945 for line in s.lines() {
946 if !line.trim().is_empty() {
947 if let Ok(msg) = serde_json::from_str::<ClientControl>(line) {
948 control_messages.push((idx, msg));
949 }
950 }
951 }
952 }
953 }
954 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
955 Err(e) => {
956 tracing::warn!("Client {} control read error: {}", client.id, e);
957 }
958 }
959 }
960
961 #[cfg(not(windows))]
962 {
963 let mut reader = std::io::BufReader::new(&client.conn.control);
964 let mut line = String::new();
965 match std::io::BufRead::read_line(&mut reader, &mut line) {
966 Ok(0) => {
967 tracing::debug!("Client {} control stream closed (EOF)", client.id);
968 disconnected.push(idx);
969 }
970 Ok(_) if !line.trim().is_empty() => {
971 if let Ok(msg) = serde_json::from_str::<ClientControl>(&line) {
972 control_messages.push((idx, msg));
973 }
974 }
975 Ok(_) => {}
976 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
977 Err(e) => {
978 tracing::warn!("Client {} control read error: {}", client.id, e);
979 }
980 }
981 }
982 }
983
984 eprintln!(
986 "[server] Processing {} control messages",
987 control_messages.len()
988 );
989 for (idx, msg) in control_messages {
990 eprintln!("[server] Control message from client {}: {:?}", idx, msg);
991 if let ClientControl::Quit = msg {
993 tracing::info!("Client requested quit, shutting down");
994 self.shutdown.store(true, Ordering::SeqCst);
995 continue;
996 }
997
998 if let ClientControl::OpenFiles { .. } = msg {
1000 } else if disconnected.contains(&idx) {
1002 continue;
1004 }
1005
1006 match msg {
1007 ClientControl::Hello(_) => {
1008 tracing::warn!("Unexpected Hello from client");
1009 }
1010 ClientControl::Resize { cols, rows } => {
1011 if let Some(client) = self.clients.get_mut(idx) {
1012 client.term_size = TermSize::new(cols, rows);
1013 if idx == 0 {
1015 self.term_size = TermSize::new(cols, rows);
1016 resize_occurred = true;
1017 }
1018 }
1019 }
1020 ClientControl::Ping => {
1021 if let Some(client) = self.clients.get_mut(idx) {
1022 let pong = serde_json::to_string(&ServerControl::Pong).unwrap_or_default();
1023 #[allow(clippy::let_underscore_must_use)]
1025 let _ = client.conn.write_control(&pong);
1026 }
1027 }
1028 ClientControl::Detach => {
1029 tracing::info!("Client {} detached", idx);
1030 disconnected.push(idx);
1031 }
1032 ClientControl::OpenFiles { files, wait } => {
1033 if let Some(ref mut editor) = self.editor {
1034 let wait_id = if wait {
1036 let id = self.next_wait_id;
1037 self.next_wait_id += 1;
1038 Some(id)
1039 } else {
1040 None
1041 };
1042
1043 let file_count = files.len();
1044 for (i, file_req) in files.iter().enumerate() {
1045 let path = std::path::PathBuf::from(&file_req.path);
1046 tracing::debug!(
1047 "Queuing file open: {:?} line={:?} col={:?} end_line={:?} end_col={:?} message={:?}",
1048 path,
1049 file_req.line,
1050 file_req.column,
1051 file_req.end_line,
1052 file_req.end_column,
1053 file_req.message,
1054 );
1055 let file_wait_id = if i == file_count - 1 { wait_id } else { None };
1057 editor.queue_file_open(
1058 path,
1059 file_req.line,
1060 file_req.column,
1061 file_req.end_line,
1062 file_req.end_column,
1063 file_req.message.clone(),
1064 file_wait_id,
1065 );
1066 }
1067
1068 if let Some(wait_id) = wait_id {
1070 if let Some(client) = self.clients.get_mut(idx) {
1071 self.waiting_clients.insert(wait_id, client.id);
1072 client.wait_id = Some(wait_id);
1073 }
1074 }
1075
1076 resize_occurred = true; }
1078 }
1079 ClientControl::Quit => unreachable!(), }
1081 }
1082
1083 for (idx, client) in self.clients.iter().enumerate() {
1085 if client.data_writer.is_broken() && !disconnected.contains(&idx) {
1086 tracing::info!("Client {} write pipe broken, disconnecting", client.id);
1087 disconnected.push(idx);
1088 }
1089 }
1090
1091 disconnected.sort_unstable();
1093 disconnected.dedup();
1094
1095 for idx in disconnected.into_iter().rev() {
1097 let client = self.clients.remove(idx);
1098 if let Some(wait_id) = client.wait_id {
1100 self.waiting_clients.remove(&wait_id);
1101 if let Some(ref mut editor) = self.editor {
1103 editor.remove_wait_tracking(wait_id);
1104 }
1105 }
1106 let teardown = terminal_teardown_sequences();
1108 let _ = client.data_writer.try_write(&teardown);
1109 tracing::info!("Client {} disconnected", client.id);
1110 if input_source_client == Some(idx) {
1112 input_source_client = None;
1113 }
1114 }
1115
1116 Ok((input_events, resize_occurred, input_source_client))
1117 }
1118
1119 fn update_terminal_size(&mut self) -> io::Result<()> {
1121 if let Some(ref mut terminal) = self.terminal {
1122 let backend = terminal.backend_mut();
1123 backend.resize(self.term_size.cols, self.term_size.rows);
1124 }
1125
1126 if let Some(ref mut editor) = self.editor {
1127 editor.resize(self.term_size.cols, self.term_size.rows);
1128 }
1129
1130 Ok(())
1131 }
1132
1133 fn handle_event(&mut self, event: Event) -> io::Result<bool> {
1135 let Some(ref mut editor) = self.editor else {
1136 return Ok(false);
1137 };
1138
1139 match event {
1140 Event::Key(key_event) => {
1141 if key_event.kind == KeyEventKind::Press {
1142 editor
1143 .handle_key(key_event.code, key_event.modifiers)
1144 .map_err(|e| io::Error::other(e.to_string()))?;
1145 Ok(true)
1146 } else {
1147 Ok(false)
1148 }
1149 }
1150 Event::Mouse(mouse_event) => editor
1151 .handle_mouse(mouse_event)
1152 .map_err(|e| io::Error::other(e.to_string())),
1153 Event::Resize(w, h) => {
1154 editor.resize(w, h);
1155 Ok(true)
1156 }
1157 Event::Paste(text) => {
1158 editor.paste_text(text);
1159 Ok(true)
1160 }
1161 _ => Ok(false),
1162 }
1163 }
1164
1165 fn render_and_broadcast(&mut self) -> io::Result<()> {
1167 let Some(ref mut editor) = self.editor else {
1168 return Ok(());
1169 };
1170
1171 let Some(ref mut terminal) = self.terminal else {
1172 return Ok(());
1173 };
1174
1175 let any_needs_full = self.clients.iter().any(|c| c.needs_full_render);
1177 if any_needs_full {
1178 tracing::info!(
1179 "Full render requested for {} client(s)",
1180 self.clients.iter().filter(|c| c.needs_full_render).count()
1181 );
1182 terminal.backend_mut().reset_style_state();
1184 #[allow(clippy::let_underscore_must_use)]
1186 let _ = terminal.clear();
1187 }
1188
1189 let pending_sequences = editor.take_pending_escape_sequences();
1191
1192 terminal
1194 .draw(|frame| editor.render(frame))
1195 .map_err(|e| io::Error::other(e.to_string()))?;
1196
1197 let output = terminal.backend_mut().take_buffer();
1199
1200 if output.is_empty() && pending_sequences.is_empty() {
1201 return Ok(());
1202 }
1203
1204 for client in &mut self.clients {
1206 if client.wait_id.is_some() {
1207 continue;
1208 }
1209 let frame = if !pending_sequences.is_empty() && !output.is_empty() {
1211 let mut combined = Vec::with_capacity(pending_sequences.len() + output.len());
1212 combined.extend_from_slice(&pending_sequences);
1213 combined.extend_from_slice(&output);
1214 combined
1215 } else if !pending_sequences.is_empty() {
1216 pending_sequences.clone()
1217 } else {
1218 output.clone()
1219 };
1220
1221 if !frame.is_empty() && !client.data_writer.try_write(&frame) {
1222 tracing::warn!("Client {} output buffer full, dropping frame", client.id);
1223 }
1224 client.needs_full_render = false;
1226 }
1227
1228 Ok(())
1229 }
1230
1231 fn disconnect_all_clients(&mut self, reason: &str) -> io::Result<()> {
1233 let teardown = terminal_teardown_sequences();
1234 for client in &mut self.clients {
1235 #[allow(clippy::let_underscore_must_use)]
1237 let _ = client.data_writer.try_write(&teardown);
1238 let quit_msg = serde_json::to_string(&ServerControl::Quit {
1239 reason: reason.to_string(),
1240 })
1241 .unwrap_or_default();
1242 #[allow(clippy::let_underscore_must_use)]
1244 let _ = client.conn.write_control(&quit_msg);
1245 }
1246 self.clients.clear();
1247 Ok(())
1248 }
1249}
1250
1251impl ConnectedClient {
1252 #[allow(dead_code)]
1254 pub fn term(&self) -> Option<&str> {
1255 self.env.get("TERM").and_then(|v| v.as_deref())
1256 }
1257
1258 #[allow(dead_code)]
1260 pub fn supports_truecolor(&self) -> bool {
1261 self.env
1262 .get("COLORTERM")
1263 .and_then(|v| v.as_deref())
1264 .map(|v| v == "truecolor" || v == "24bit")
1265 .unwrap_or(false)
1266 }
1267}