nikau/
rotation.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::time::{Duration, Instant};
4
5use anyhow::{anyhow, bail, Context, Result};
6use quinn::SendStream;
7use serde::Serialize;
8use tokio::sync::{broadcast, mpsc, oneshot, watch as watchchan};
9use tokio::task;
10use tracing::{debug, error, info, trace, warn};
11
12use crate::device;
13use crate::msgs::{bulk, event};
14use crate::x11clipboard::{
15    reader::{ClipboardReader, ClipboardTypeWatcher},
16    writer::{ClipboardFetch, ClipboardWriter},
17    ClipboardData,
18};
19
/// If the selected client reconnects within 10 seconds of being removed, then reselect it automatically.
/// This is intended to help with fast recovery following networking flakes.
///
/// `DefunctClientInfo::expired` compares the time elapsed since removal against this value;
/// an elapsed time of exactly this duration still counts as recoverable.
const REMOVED_CLIENT_RECOVERY_DEADLINE: Duration = Duration::from_secs(10);
23
/// Channels for communicating with a connected client.
///
/// One entry is inserted into `Rotation::clients` by `add_client`, which keeps the list
/// ordered by `endpoint`.
#[derive(Debug)]
struct ClientInfo {
    /// The primary identifier for a client. We can have multiple clients with the same fingerprint:
    /// - When the user is sharing certificates between clients (they are free to do so)
    /// - When a client has reconnected without the old connection timing out yet
    endpoint: SocketAddr,
    /// Cert fingerprint used to select clients via --shortcut-goto keyboard shortcuts.
    /// `set_client` matches a requested fingerprint against the start of this string.
    fingerprint: String,
    /// Send stream to this client.
    /// NOTE(review): presumably carries `event::ServerEvent` messages — confirm against `send_event`.
    events_send: SendStream,
    /// Send stream to this client.
    /// NOTE(review): presumably carries `bulk::ServerBulk` messages — confirm against `send_bulk`.
    bulk_send: SendStream,
}
36
/// Keeps track of the most recently disconnected client,
/// used for automatically reactivating clients if they reconnect quickly.
#[derive(Debug)]
struct DefunctClientInfo {
    /// Use the endpoint, not the fingerprint, to identify recently disconnected clients.
    /// This reduces the likelihood of weird behavior if e.g. clients are sharing certificates.
    /// In practice we only address clients by certificate with certain keyboard shortcuts.
    /// Only the IP portion is compared when checking for recovery (see `recoverable`).
    endpoint: SocketAddr,
    /// When the client was removed; `expired` measures the recovery deadline from this instant.
    removed_at: Instant,
}
47
48impl DefunctClientInfo {
49    /// Returns whether the specified endpoint should be reenabled as the selected client.
50    /// true is returned if the IPs match and if the defunct client was disconnected <= N seconds ago.
51    fn recoverable(&self, endpoint: SocketAddr, now: &Instant) -> bool {
52        // Only check IP, port is expected to change
53        endpoint.ip() == self.endpoint.ip() && !self.expired(now)
54    }
55
56    /// Returns whether this defunct client info has expired, in which case it can be cleared.
57    fn expired(&self, now: &Instant) -> bool {
58        now.duration_since(self.removed_at) > REMOVED_CLIENT_RECOVERY_DEADLINE
59    }
60}
61
/// Tracks the location and type of the current clipboard
#[derive(Debug)]
struct ClipboardTarget {
    /// None if the clipboard is at the server
    source: Option<SocketAddr>,
    /// Clipboard types currently on offer. A content request is rejected unless its requested
    /// type appears in this list. Joined with spaces when announced to a newly added client.
    types: Vec<String>,
    /// Size limit forwarded alongside `types` when the clipboard is announced to a client.
    max_size_bytes: u64,
}
70
/// Wrapper around server-local clipboard storage, if available.
/// Clipboard contents can still be transferred by the server among clients if this is unavailable.
pub struct LocalClipboard {
    /// Used to read the server's own clipboard content when a remote client requests it.
    reader: ClipboardReader,
    /// Started in `LocalClipboard::start` with the sender half of the local fetch-request channel.
    /// NOTE(review): presumably serves remote clipboard content to local applications — confirm
    /// against `x11clipboard::writer`.
    writer: ClipboardWriter,

    /// Pending fetch request for the local server clipboard.
    /// Set when a local request is routed to the client owning the clipboard, and taken
    /// again when that client's data arrives.
    waiting_clipboard_tx: Option<oneshot::Sender<ClipboardData>>,
}
80
81impl LocalClipboard {
82    pub async fn start(
83        config_dir: PathBuf,
84        rotation_tx: mpsc::Sender<RotationEvent>,
85        max_clipboard_size_bytes: u64,
86        max_uncompressed_size_bytes: u64,
87    ) -> Result<Self> {
88        let (clipboard_fetch_tx, mut clipboard_fetch_rx) = mpsc::channel::<ClipboardFetch>(32);
89        let (local_types_tx, mut local_types_rx) = watchchan::channel(vec![]);
90        ClipboardTypeWatcher::start(local_types_tx).await?;
91
92        task::spawn(async move {
93            loop {
94                tokio::select! {
95                    // Listen to local host requests to get the clipboard
96                    fetch_request = clipboard_fetch_rx.recv() => {
97                        if let Some(fetch_request) = fetch_request {
98                            // Got clipboard paste request from the local machine.
99                            // Pass the request through to the main rotation event handler.
100                            let event = RotationEvent::ClipboardRequestContent(ClipboardRequestContentArgs {
101                                request_source: ClipboardRequestSource::Local(fetch_request.fetch_result_tx),
102                                requested_type: fetch_request.requested_type,
103                                max_size_bytes: max_clipboard_size_bytes,
104                            });
105                            if let Err(e) = rotation_tx.send(event).await {
106                                error!("Failed to queue local clipboard request event: {:?}", e);
107                                break;
108                            }
109                        } else {
110                            error!("Clipboard fetch request queue has closed, exiting clipboard loop");
111                            break;
112                        }
113                    },
114                    // Listen to local host updates to the clipboard types
115                    types_notify = local_types_rx.changed() => {
116                        if let Err(e) = types_notify {
117                            error!("local_types_rx has closed: {}", e);
118                            break;
119                        }
120                        // Another application on the server machine has a clipboard entry.
121                        let event = RotationEvent::ClipboardUpdateSource(ClipboardUpdateSourceArgs {
122                            source: None,
123                            types: local_types_rx.borrow().clone(),
124                            max_size_bytes: max_clipboard_size_bytes,
125                        });
126                        if let Err(e) = rotation_tx.send(event).await {
127                            error!("Failed to queue update source event: {:?}", e);
128                            break;
129                        }
130                    }
131                }
132            }
133        });
134
135        Ok(Self {
136            reader: ClipboardReader::new().await?,
137            writer: ClipboardWriter::start(
138                config_dir,
139                max_uncompressed_size_bytes,
140                clipboard_fetch_tx,
141            )
142            .await?,
143            waiting_clipboard_tx: None,
144        })
145    }
146}
147
/// Events processed one at a time by `Rotation::accept`.
pub enum RotationEvent {
    /// Request to add a client to the rotation
    AddClient(AddClientArgs),
    /// Request to remove a disconnected client from the rotation
    /// If the client currently owns the clipboard, that status is cleared
    RemoveClient(SocketAddr),
    /// Request to update the current clipboard location and info
    ClipboardUpdateSource(ClipboardUpdateSourceArgs),
    /// Request to fetch a current clipboard's content
    ClipboardRequestContent(ClipboardRequestContentArgs),
    /// Request to send a current clipboard's content in response to a prior request
    ClipboardSendContent(ClipboardSendContentArgs),
}
161
/// Payload of `RotationEvent::AddClient`; the fields are stored as-is in the rotation's
/// per-client entry.
pub struct AddClientArgs {
    /// Address of the connecting client; used as its identity within the rotation.
    pub endpoint: SocketAddr,
    /// Certificate fingerprint of the client, used when selecting a client by fingerprint.
    pub fingerprint: String,
    /// Send stream to the client (see `ClientInfo::events_send`).
    pub events_send: SendStream,
    /// Send stream to the client (see `ClientInfo::bulk_send`).
    pub bulk_send: SendStream,
}
168
/// Payload of `RotationEvent::ClipboardUpdateSource`.
pub struct ClipboardUpdateSourceArgs {
    /// Where the clipboard now lives: a client's address, or `None` for the server itself.
    pub source: Option<SocketAddr>,
    /// Clipboard types offered by the new source.
    pub types: Vec<String>,
    // min of source_client_max (if any), and server_max:
    pub max_size_bytes: u64,
}
175
/// Payload of `RotationEvent::ClipboardRequestContent`.
pub struct ClipboardRequestContentArgs {
    /// Who is asking for the content, and therefore where the result must be delivered.
    pub request_source: ClipboardRequestSource,
    /// The clipboard type being requested; must be one of the currently announced types.
    pub requested_type: String,
    /// Size limit forwarded with the request to whoever owns the clipboard.
    pub max_size_bytes: u64,
}
181
/// Pointer to where clipboard data should be sent once it's been fetched
pub enum ClipboardRequestSource {
    /// The clipboard is being requested from the local (server) machine.
    /// The oneshot can be used for sending back the clipboard result.
    /// It is parked in `LocalClipboard::waiting_clipboard_tx` while a client is asked for the data.
    Local(oneshot::Sender<ClipboardData>),

    /// The clipboard is being requested from a remote client.
    /// The data should be sent to the client's address.
    Remote(SocketAddr),
}
192
193impl<'a> std::fmt::Display for ClipboardRequestSource {
194    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
195        match self {
196            ClipboardRequestSource::Local(_) => f.write_str("Local"),
197            ClipboardRequestSource::Remote(addr) => {
198                f.write_str(format!("Remote({})", addr).as_str())
199            }
200        }
201    }
202}
203
/// Payload of `RotationEvent::ClipboardSendContent`: clipboard data returned by a client
/// in response to an earlier content request.
pub struct ClipboardSendContentArgs {
    /// The client sending the clipboard data
    pub data_source: SocketAddr,
    /// Copied from the ServerClipboardRequest, indicates where the clipboard data should be sent.
    /// `None` means the request originated at the server, so the data is delivered to the
    /// server's pending local fetch (if any).
    pub request_client: Option<SocketAddr>,
    /// The clipboard content along with its requested type and (optional) data type.
    pub data: ClipboardData,
}
211
/// Tracks the connected clients, which of them (if any) is currently selected, and who
/// currently owns the clipboard.
pub struct Rotation<O: device::output::OutputHandler> {
    /// Broadcast channel for `device::GrabEvent`s.
    /// NOTE(review): presumably used to grab/release local input devices when switching — confirm.
    grab_tx: broadcast::Sender<device::GrabEvent>,
    /// Handler supplied by the caller of `new`; its use is not visible in this part of the file.
    output_handler: O,
    /// Connected clients, kept sorted by endpoint (see `add_client`).
    clients: Vec<ClientInfo>,
    /// Use the endpoint, not the fingerprint, to uniquely identify clients.
    /// This allows situations like a client reconnecting before the old socket has closed.
    /// `None` means the local (server) machine is selected.
    current_client: Option<SocketAddr>,
    /// The selected client that was most recently removed, kept so that a quick reconnect
    /// from the same IP can be reselected automatically. Cleared on any client switch.
    removed_current_client: Option<DefunctClientInfo>,
    /// Scratch buffer, created as 1024 zero bytes in `new`.
    /// NOTE(review): presumably used for serializing outgoing messages — confirm.
    buf: Vec<u8>,
    /// Access to the local system clipboard on the server.
    local_clipboard: Option<LocalClipboard>,
    /// Tracking the current clipboard owner, whether it's at the server or a client.
    clipboard_target: Option<ClipboardTarget>,
}
226
227impl<O: device::output::OutputHandler> Rotation<O> {
228    pub async fn new(
229        grab_tx: broadcast::Sender<device::GrabEvent>,
230        output_handler: O,
231        local_clipboard: Option<LocalClipboard>,
232    ) -> Result<Self> {
233        // Init required for space to be usable
234        let buf = vec![0; 1024];
235        Ok(Rotation {
236            grab_tx,
237            output_handler,
238            clients: Vec::new(),
239            current_client: None,
240            removed_current_client: None,
241            buf,
242            local_clipboard,
243            clipboard_target: None,
244        })
245    }
246
247    pub async fn accept(&mut self, event: RotationEvent) {
248        match event {
249            RotationEvent::AddClient(args) => {
250                self.add_client(
251                    args.endpoint,
252                    args.fingerprint,
253                    args.events_send,
254                    args.bulk_send,
255                )
256                .await
257            }
258            RotationEvent::RemoveClient(endpoint) => {
259                self.remove_client_and_clear_clipboard(endpoint).await
260            }
261            RotationEvent::ClipboardUpdateSource(args) => {
262                if let Err(e) = self
263                    .clipboard_update_source(args.source, args.types, args.max_size_bytes)
264                    .await
265                {
266                    warn!("Failed to update clipboard source to server: {:?}", e);
267                }
268            }
269            RotationEvent::ClipboardRequestContent(args) => {
270                if let Err(e) = self
271                    .clipboard_request_content(
272                        args.request_source,
273                        &args.requested_type,
274                        args.max_size_bytes,
275                    )
276                    .await
277                {
278                    warn!("Failed to retrieve clipboard content for server: {:?}", e);
279                }
280            }
281            RotationEvent::ClipboardSendContent(args) => {
282                if let Err(e) = self
283                    .clipboard_send_content_from_client(
284                        args.data_source,
285                        args.request_client,
286                        args.data,
287                    )
288                    .await
289                {
290                    warn!("Failed to send clipboard content to client: {:?}", e);
291                }
292            }
293        }
294    }
295
296    async fn add_client(
297        &mut self,
298        endpoint: SocketAddr,
299        fingerprint: String,
300        events_send: SendStream,
301        bulk_send: SendStream,
302    ) {
303        // Sort clients by their endpoints as an arbitrary consistent order across sessions
304        let idx = match self.clients.binary_search_by(|c| c.endpoint.cmp(&endpoint)) {
305            Ok(idx) => idx,
306            Err(idx) => idx,
307        };
308        self.clients.insert(
309            idx,
310            ClientInfo {
311                endpoint,
312                fingerprint: fingerprint.clone(),
313                events_send,
314                bulk_send,
315            },
316        );
317
318        info!(
319            "Added client {} @ {} to rotation: {}",
320            fingerprint,
321            endpoint,
322            self.clients
323                .iter()
324                .map(|c| c.endpoint.to_string())
325                .collect::<Vec<String>>()
326                .join(", ")
327        );
328
329        // If the new client has the same IP as the currently enabled client, it's probably a fast retry
330        // where we haven't removed the prior session yet. Mark the new client as enabled/current.
331        // If two clients were connected from the same IP then this will result in spurious switches,
332        // but that shouldn't be the case in practice.
333        if let Some(current_client) = &self.current_client {
334            // Only check IP: port is expected to change between sessions
335            if current_client.ip() == endpoint.ip() {
336                self.update_current_client(Some(endpoint)).await;
337            }
338        }
339
340        // If the new client has the same IP as a recently disconnected client that was enabled,
341        // it's probably a slow reconnect. Mark the new client as enabled/current.
342        if let Some(removed_current_client) = &self.removed_current_client {
343            // Only check IP: port is expected to change between sessions
344            let now = Instant::now();
345            if removed_current_client.recoverable(endpoint, &now) {
346                // Enable this client automatically since it was recently disconnected
347                // This automatically unsets self.removed_current_client
348                self.update_current_client(Some(endpoint)).await;
349            } else if removed_current_client.expired(&now) {
350                // Clean up expired client info
351                self.removed_current_client = None;
352            }
353        }
354
355        // Announce clipboard to client, if its IP doesn't match the clipboard owner's IP.
356        // Matching IP would indicate that the client is reconnecting but we haven't disconnected the old one yet.
357        if let Some(clipboard_target) = &self.clipboard_target {
358            if match clipboard_target.source {
359                // Client has clipboard. Make sure it's not the same client IP.
360                Some(clipboard_source) => clipboard_source.ip() != endpoint.ip(),
361                // Server has clipboard.
362                None => true,
363            } {
364                // Tell the new client about the current clipboard status.
365                let types_str = clipboard_target.types.join(" ");
366                let types_msg = event::ServerEvent::ClipboardTypes(event::ClipboardTypes {
367                    types: &types_str,
368                    max_size_bytes: clipboard_target.max_size_bytes,
369                });
370                if let Err(e) = self.send_event(&endpoint, types_msg).await {
371                    // This shouldn't happen in practice, given we just added the client...
372                    warn!("Newly added client already failed and was removed: {:?}", e);
373                }
374            }
375        }
376    }
377
378    async fn remove_client_and_clear_clipboard(&mut self, endpoint: SocketAddr) {
379        if self.handle_client_removal(&endpoint).await {
380            self.clipboard_clear().await;
381        }
382    }
383
384    /// Switches to the previous client (or to the server) in the arbitrary rotation.
385    pub async fn prev_client(&mut self) {
386        if let Some(current_client) = &self.current_client {
387            // Currently on remote machine, find its entry in the list and go to the prev one
388            let idx = match self
389                .clients
390                .binary_search_by(|c| c.endpoint.cmp(current_client))
391            {
392                Ok(idx) => idx,
393                Err(idx) => idx,
394            };
395            if idx == 0 {
396                // At start of vec or vec is empty - switch to local machine
397                self.update_current_client(None).await;
398            } else {
399                // Go to prev entry in vec
400                self.update_current_client(self.clients.get(idx - 1).map(|c| c.endpoint))
401                    .await;
402            }
403        } else {
404            // Currently on local machine, go to last entry on vec (if any)
405            self.update_current_client(self.clients.last().map(|c| c.endpoint))
406                .await;
407        }
408    }
409
410    /// Switches to the next client (or to the server) in the arbitrary rotation.
411    pub async fn next_client(&mut self) {
412        if let Some(current_client) = &self.current_client {
413            // Currently on remote machine, find its entry in the list and go to the next one
414            let idx = match self
415                .clients
416                .binary_search_by(|c| c.endpoint.cmp(current_client))
417            {
418                Ok(idx) => idx,
419                Err(idx) => idx,
420            };
421            // Go to next entry in vec, or fall back to local machine if vec is empty or we're off the end
422            self.update_current_client(self.clients.get(idx + 1).map(|c| c.endpoint))
423                .await;
424        } else {
425            // Currently on local machine, go to first entry on vec (if any)
426            self.update_current_client(self.clients.first().map(|c| c.endpoint))
427                .await;
428        }
429    }
430
431    /// Switches to the specified client by fingerprint, or to the server if the fingerprint is empty.
432    /// If a matching client isn't connected, does nothing.
433    pub async fn set_client(&mut self, fingerprint: String) {
434        if fingerprint.is_empty() {
435            // Empty fingerprint means "go to server"
436            self.update_current_client(None).await;
437        } else {
438            // Find the matching client, if any. Allow "abcd123" to match client with "abcd12345[...]"
439            let matching_clients: Vec<&ClientInfo> = self
440                .clients
441                .iter()
442                .filter(|c| c.fingerprint.starts_with(&fingerprint))
443                .collect();
444            match matching_clients.len() {
445                0 => {
446                    warn!(
447                        "Missing client with fingerprint {}, doing nothing",
448                        fingerprint
449                    );
450                }
451                1 => {
452                    let endpoint = matching_clients
453                        .first()
454                        .expect("matching_clients has len=1")
455                        .endpoint;
456                    self.update_current_client(Some(endpoint)).await;
457                }
458                _ => {
459                    warn!(
460                        "Multiple clients match fingerprint {}, doing nothing: {:?}",
461                        fingerprint, matching_clients
462                    );
463                }
464            }
465        }
466    }
467
468    /// Updates the tracked location for the current clipboard,
469    /// whether on the server host or on a remote client.
470    async fn clipboard_update_source(
471        &mut self,
472        source: Option<SocketAddr>,
473        types: Vec<String>,
474        // min of source_client_max (if any), and server_max:
475        max_size_bytes: u64,
476    ) -> Result<()> {
477        debug!("Announcing new clipboard source: source={:?} current={:?} with max_size_bytes={} has types={:?}", source, self.current_client, max_size_bytes, types);
478        // Save the clipboard types/source for future retrievals and client switches
479        self.clipboard_target = Some(ClipboardTarget {
480            source,
481            types,
482            max_size_bytes,
483        });
484
485        // Notify the active client (or server) about the clipboard info we just received.
486        // In practice we should be getting this shortly after a client switch.
487        self.update_current_client_clipboard().await?;
488
489        Ok(())
490    }
491
492    /// Routes a request for clipboard content to a remote client or a local application
493    async fn clipboard_request_content(
494        &mut self,
495        request_source: ClipboardRequestSource,
496        requested_type: &str,
497        max_size_bytes: u64,
498    ) -> Result<()> {
499        debug!("Handling clipboard content request from source={} with max_size_bytes={} for requested type {}: have {:?}", request_source, max_size_bytes, requested_type, self.clipboard_target);
500
501        let target = match &self.clipboard_target {
502            Some(c) => c,
503            None => {
504                bail!(
505                    "No clipboard types available: request from {} for requested type {}",
506                    request_source,
507                    requested_type
508                );
509            }
510        };
511        // Sanity check: Is the requested type among the list of supported types?
512        if !target.types.contains(&requested_type.to_string()) {
513            bail!(
514                "Requested clipboard type {} from source {} isn't among available types: {:?}",
515                requested_type,
516                request_source,
517                target.types
518            );
519        }
520
521        // Figure out where the requested clipboard can be found
522        if let Some(clipboard_source) = &target.source.clone() {
523            // A client has the clipboard: route request to them
524            let msg = bulk::ServerBulk::ClipboardRequest(bulk::ServerClipboardRequest {
525                requested_type,
526                max_size_bytes,
527                request_client: if let ClipboardRequestSource::Remote(client) = &request_source {
528                    Some(*client)
529                } else {
530                    None
531                },
532            });
533            debug!(
534                "Requesting clipboard data with type {} from {}{}",
535                requested_type,
536                clipboard_source,
537                if let ClipboardRequestSource::Remote(client) = &request_source {
538                    format!(" on behalf of {}", client)
539                } else {
540                    "".to_string()
541                }
542            );
543
544            // If the request is coming from the server itself, keep the oneshot for handling the reply.
545            match request_source {
546                ClipboardRequestSource::Local(waiting_clipboard_tx) => {
547                    // Clipboard request is from the server itself.
548                    // Keep the oneshot for replying later.
549                    if let Some(local_clipboard) = &mut self.local_clipboard {
550                        local_clipboard.waiting_clipboard_tx = Some(waiting_clipboard_tx);
551                    } else {
552                        bail!(
553                            "Got request for clipboard from server, but server clipboard is disabled"
554                        );
555                    }
556                    if !(self.send_bulk(clipboard_source, msg, None).await?) {
557                        warn!(
558                            "Unable to send request for clipboard to {}: not connected (clients: {:?})",
559                            clipboard_source,
560                            self.clients,
561                        );
562                    }
563                }
564                ClipboardRequestSource::Remote(client) => {
565                    // Clipboard request is from a client.
566                    // Route the request to the clipboard owner.
567                    if !(self.send_bulk(clipboard_source, msg, None).await?) {
568                        warn!(
569                            "Unable to send request for clipboard to {} on behalf of {}: not connected (clients: {:?})",
570                            clipboard_source,
571                            client,
572                            self.clients,
573                        );
574                    }
575                }
576            }
577            Ok(())
578        } else {
579            // The server has the clipboard: serve via X11 from local app
580            let request_client = if let ClipboardRequestSource::Remote(c) = &request_source {
581                c
582            } else {
583                // The nikau server process is getting asked for a clipboard from itself.
584                // The server should only locally serve clipboards from remote clients, but there isn't one.
585                // This may mean that the serving client disconnected, but we should have cleared the status.
586                bail!(
587                    "Server got local clipboard request against itself? current_clipboard={:?}",
588                    target
589                );
590            };
591            let local_clipboard = match &mut self.local_clipboard {
592                Some(c) => c,
593                None => bail!("Fetch for local server clipboard but server clipboard is disabled"),
594            };
595            // Read and send the clipboard content
596            let (content, data_type) = local_clipboard
597                .reader
598                .read(requested_type, max_size_bytes, &Some(*request_client))
599                .await?;
600            let msg = bulk::ServerBulk::ClipboardHeader(bulk::ServerClipboardHeader {
601                requested_type,
602                data_type: data_type.as_ref().map(|t| t.as_str()),
603                content_len_bytes: content.len() as u64,
604            });
605            if let Some(data_type) = &data_type {
606                debug!(
607                    "Sending clipboard data for requested type {} (data type {}) from server to {}",
608                    requested_type, data_type, request_client
609                );
610            } else {
611                debug!(
612                    "Sending clipboard data for requested type {} from server to {}",
613                    requested_type, request_client
614                );
615            }
616            if !(self.send_bulk(request_client, msg, Some(content)).await?) {
617                warn!(
618                    "Unable to send server clipboard data to {}: not connected (clients: {:?})",
619                    request_client, self.clients
620                );
621            }
622            Ok(())
623        }
624    }
625
626    /// Sends clipboard content in response to a prior request via clipboard_request_content.
627    async fn clipboard_send_content_from_client(
628        &mut self,
629        // The client sending the clipboard data
630        data_source: SocketAddr,
631        // Copied from the ServerClipboardRequest, indicates where the clipboard data should be sent
632        request_client: Option<SocketAddr>,
633        data: ClipboardData,
634    ) -> Result<()> {
635        debug!(
636            "Sending clipboard content of requested_type={} data_type={:?} with len={} from source={} to dest={:?}",
637            data.requested_type,
638            data.data_type,
639            data.bytes.len(),
640            data_source,
641            request_client
642        );
643        if let Some(request_client) = request_client {
644            // Send to specified remote client (assuming it's still available etc...)
645            let msg = bulk::ServerBulk::ClipboardHeader(bulk::ServerClipboardHeader {
646                requested_type: &data.requested_type,
647                data_type: data.data_type.as_ref().map(|t| t.as_str()),
648                content_len_bytes: data.bytes.len() as u64,
649            });
650            // If send_bulk returns Ok(false), the client wasn't found. In that case just ignore the request,
651            // don't try to reset state since the client should already be removed.
652            if !(self
653                .send_bulk(&request_client, msg, Some(data.bytes))
654                .await?)
655            {
656                warn!("Unable to send clipboard data received from {} to {}: not connected (clients: {:?})",
657                      data_source, request_client, self.clients);
658            }
659            Ok(())
660        } else if let Some(local_clipboard) = &mut self.local_clipboard {
661            // Send to local X11 clipboard, using response oneshot that we'd gotten with the request.
662            if let Some(waiting_clipboard_tx) = local_clipboard.waiting_clipboard_tx.take() {
663                if let Err(_d_again) = waiting_clipboard_tx.send(data) {
664                    warn!("Discarding clipboard data from client: no pending clipboard request (previous request timed out?)");
665                }
666                Ok(())
667            } else {
668                warn!(
669                    "Ignoring unexpected clipboard data from client: no clipboard fetch is pending"
670                );
671                Ok(())
672            }
673        } else {
674            warn!(
675                "Ignoring unexpected clipboard data from client: clipboard is disabled at server"
676            );
677            Ok(())
678        }
679    }
680
681    /// Updates internal state to route future events to the new client (or to the server).
682    /// Goes through the steps of notifying the new client that it's active (if new_client is Some),
683    /// then notifying any old client that it's inactive (if old_client is Some).
684    async fn update_current_client(&mut self, new_client: Option<SocketAddr>) {
685        // Either we automatically reenabled a client, or the user manually did.
686        // In either case, clear up any history of previously enabled disconnected clients.
687        self.removed_current_client = None;
688
689        // Check if the client is already assigned, treat as a no-op if so
690        match (&new_client, &self.current_client) {
691            (Some(new_client), Some(current_client)) => {
692                if new_client == current_client {
693                    debug!("Already switched to client: {}", current_client);
694                    return;
695                }
696            }
697            (None, None) => {
698                debug!("Already switched to local machine");
699                return;
700            }
701            (_, _) => {}
702        }
703
704        // Save the old client for sending enabled=false below
705        let old_client = self.current_client;
706
707        self.set_and_grab_current_client(new_client).await;
708
709        if let Some(new_client) = new_client {
710            // Try to send switch{true} to the newly assigned current_client.
711            // If it fails then current_client is cleaned up.
712            if let Ok(()) = self
713                .send_event_to_remote_client(event::ServerEvent::Switch(event::SwitchEvent {
714                    enabled: true,
715                }))
716                .await
717            {
718                info!(
719                    "Switched to client: {} (clients: {})",
720                    new_client,
721                    self.clients
722                        .iter()
723                        .map(|c| c.endpoint.to_string())
724                        .collect::<Vec<String>>()
725                        .join(", ")
726                );
727            }
728        } else {
729            info!(
730                "Switched to local machine (clients: {})",
731                self.clients
732                    .iter()
733                    .map(|c| c.endpoint.to_string())
734                    .collect::<Vec<String>>()
735                    .join(", ")
736            );
737        }
738
739        // Notify the new client (or server) about any current clipboard info, or a noop if it fails.
740        // This may be overridden if the old client sends a clipboard update following the switch,
741        // or it won't, if the old client doesn't have a clipboard update to send.
742        // log_info=false: Avoid duplicate info-level logging of clipboard types, between the server
743        // switch and then (potentially) an update from the client that's being deactivated.
744        if let Err(e) = self.update_current_client_clipboard().await {
745            warn!(
746                "Failed to send clipboard update to active client/server: {:?}",
747                e
748            );
749        }
750
751        // AFTER setting up the new client, lets send enabled=false to the old client.
752        // This avoids a potential race between the above clipboard update for current data
753        // vs the old client sending a new clipboard update when it's marked inactive.
754        if let Some(old_client) = old_client {
755            // Try to send switch{false} to last current_client.
756            // If it fails then the client is cleaned up.
757            let _ = self
758                .send_event(
759                    &old_client,
760                    event::ServerEvent::Switch(event::SwitchEvent { enabled: false }),
761                )
762                .await;
763        }
764    }
765
766    /// Updates and announces the current clipboard source for handling any future paste requests.
767    /// In practice this occurs when a client broadcasts its clipboard shortly after being told its no longer active.
768    async fn update_current_client_clipboard(&mut self) -> Result<()> {
769        let c = match &self.clipboard_target {
770            Some(c) => c,
771            // No clipboard to announce
772            None => return Ok(()),
773        };
774
775        if let Some(clipboard_source) = &c.source {
776            // The clipboard is from a client.
777            if let Some(current_client) = self.current_client {
778                // A remote client is active. Tell it about the clipboard, if it isn't the source of the clipboard.
779                if current_client != *clipboard_source {
780                    let types_str = c.types.join(" ");
781                    let types_msg = event::ServerEvent::ClipboardTypes(event::ClipboardTypes {
782                        types: &types_str,
783                        max_size_bytes: c.max_size_bytes,
784                    });
785                    debug!(
786                        "Sending clipboard types for {} to {}: {}",
787                        clipboard_source, current_client, types_str
788                    );
789                    self.send_event_to_remote_client(types_msg).await?;
790                }
791            } else if let Some(local_clipboard) = &mut self.local_clipboard {
792                // The server is active and its clipboard support is enabled.
793                // Tell it about the client clipbard.
794                debug!(
795                    "Storing clipboard types for {} on server: {}",
796                    clipboard_source,
797                    c.types.join(" ")
798                );
799                local_clipboard.writer.store_types(c.types.clone())?;
800            } else {
801                debug!("Ignoring clipboard types sent by client: Server clipboard is disabled");
802            }
803        } else {
804            // The clipboard is from the server.
805            if let Some(current_client) = self.current_client {
806                // A remote client is active. Tell it about the clipboard.
807                let types_str = c.types.join(" ");
808                let types_msg = event::ServerEvent::ClipboardTypes(event::ClipboardTypes {
809                    types: &types_str,
810                    max_size_bytes: c.max_size_bytes,
811                });
812                debug!(
813                    "Sending clipboard types for server to {}: {}",
814                    current_client, types_str
815                );
816                self.send_event_to_remote_client(types_msg).await?;
817            }
818        }
819        Ok(())
820    }
821
822    /// Sends an event to all connected clients, removing any where sending fails.
823    /// If this returns true, then clipboard_clear() should also be called.
824    async fn send_event_all<F>(&mut self, msg: event::ServerEvent<'_>, test_fn: F) -> Result<bool>
825    where
826        F: Fn(&ClientInfo) -> bool,
827    {
828        let mut clients_to_remove = vec![];
829        for client in self.clients.iter_mut() {
830            if test_fn(client) {
831                if let Err(e) =
832                    send_message_to_client(&mut client.events_send, &msg, &mut self.buf).await
833                {
834                    clients_to_remove.push(client.endpoint);
835                    return Err(e);
836                }
837            }
838        }
839        // Reverse: Avoid issues with idx moving as entries are removed
840        clients_to_remove.reverse();
841        let mut should_clear_clipboard = false;
842        for endpoint in clients_to_remove {
843            if self.handle_client_removal(&endpoint).await {
844                should_clear_clipboard = true;
845            }
846        }
847        Ok(should_clear_clipboard)
848    }
849
850    /// Sends an event to the currently active client, removing it if sending fails.
851    /// If no client is active, this does nothing.
852    async fn send_event_to_remote_client(&mut self, msg: event::ServerEvent<'_>) -> Result<()> {
853        let current_client = match self.current_client {
854            Some(client) => client,
855            None => {
856                // On local machine, nothing to do
857                return Ok(());
858            }
859        };
860        if !(self.send_event(&current_client, msg).await?) {
861            // Active client not found?
862            // Shouldn't happen, but recover by switching to local machine and ungrabbing.
863            // Otherwise we're leaving the server stuck in a grabbed state.
864            self.set_and_grab_current_client(None).await;
865        }
866        Ok(())
867    }
868
869    /// Handles an input event collected from the server.
870    pub async fn send_input_events(&mut self, batch: device::InputBatch) -> Result<()> {
871        if let Some(_) = self.current_client {
872            // Remote client is active, send all input to client and not to local machine.
873            self.send_event_to_remote_client(event::ServerEvent::Input(batch.events))
874                .await
875        } else if batch.is_grabbed {
876            // Local machine is active and device is grabbed, write input to local virtual devices.
877            // For example, we grab keyboards so that we can skip sending switch combos to the local system.
878            self.output_handler.write(batch.events).await
879        } else {
880            // Local machine is active and device isn't grabbed (passthrough), drop input event.
881            // For example, we don't grab mice/touchpads since they aren't relevant to switch combos.
882            // If we send their input to the handler, the input is duplicated between the passthrough
883            // and the virtual device.
884            Ok(())
885        }
886    }
887
888    /// Sends an event to the specified client, removing it if sending fails.
889    /// If the client isn't found, returns Ok(false)
890    /// If sending the message fails, removes the client and returns Err
891    async fn send_event(
892        &mut self,
893        endpoint: &SocketAddr,
894        msg: event::ServerEvent<'_>,
895    ) -> Result<bool> {
896        match self.clients.binary_search_by(|c| c.endpoint.cmp(endpoint)) {
897            Ok(idx) => {
898                let events_send = &mut self
899                    .clients
900                    .get_mut(idx)
901                    .expect("missing current_client")
902                    .events_send;
903                if let Err(e) = send_message_to_client(events_send, &msg, &mut self.buf).await {
904                    if self.handle_client_removal(endpoint).await {
905                        self.clipboard_clear().await;
906                    }
907                    Err(e)
908                } else {
909                    Ok(true)
910                }
911            }
912            Err(_idx) => {
913                warn!(
914                    "Event client {} not found in clients map: {:?}",
915                    endpoint, self.clients
916                );
917                Ok(false)
918            }
919        }
920    }
921
922    async fn send_bulk(
923        &mut self,
924        endpoint: &SocketAddr,
925        msg: bulk::ServerBulk<'_>,
926        payload: Option<Vec<u8>>,
927    ) -> Result<bool> {
928        match self.clients.binary_search_by(|c| c.endpoint.cmp(endpoint)) {
929            Ok(idx) => {
930                let bulk_send = &mut self
931                    .clients
932                    .get_mut(idx)
933                    .expect("missing current_client")
934                    .bulk_send;
935                // Try sending the message, then the payload. Stop on the first failure, to handle below.
936                if let Err(e) = send_message_to_client(bulk_send, &msg, &mut self.buf).await {
937                    if self.handle_client_removal(endpoint).await {
938                        self.clipboard_clear().await;
939                    }
940                    return Err(e);
941                }
942                if let Some(payload) = payload {
943                    trace!("Sending {} byte payload", payload.len());
944                    if let Err(e) = bulk_send.write_all(&payload).await {
945                        if self.handle_client_removal(endpoint).await {
946                            self.clipboard_clear().await;
947                        }
948                        return Err(e.into());
949                    }
950                }
951                Ok(true)
952            }
953            Err(_idx) => {
954                warn!(
955                    "Bulk client {} not found in clients map: {:?}",
956                    endpoint, self.clients
957                );
958                Ok(false)
959            }
960        }
961    }
962
963    /// Removes the client and switches to the server if it was the active client.
964    /// If this returns true, then clipboard_clear() should also be called.
965    async fn handle_client_removal(&mut self, endpoint: &SocketAddr) -> bool {
966        // Always refetch the idx to avoid issues if there was an await in which the client was
967        // removed behind our back.
968        match self.clients.binary_search_by(|c| c.endpoint.cmp(&endpoint)) {
969            Ok(idx) => {
970                self.clients.remove(idx);
971            }
972            Err(_e) => {
973                // Noop. Can happen if we're cleaning up for a client that wasn't added yet.
974                debug!("Client to remove not found in rotation: {}", endpoint);
975                return false;
976            }
977        }
978        let client_list = self
979            .clients
980            .iter()
981            .map(|c| c.endpoint.to_string())
982            .collect::<Vec<String>>()
983            .join(", ");
984
985        let mut should_clear_clipboard = false;
986        if let Some(clipboard_info) = &self.clipboard_target {
987            if let Some(clipboard_source) = &clipboard_info.source {
988                if clipboard_source == endpoint {
989                    // The removed client owned the clipboard. Remove the clipboard.
990                    should_clear_clipboard = true;
991                }
992            }
993        }
994
995        if let Some(current_client) = self.current_client {
996            if current_client == *endpoint {
997                // This is the active client. Remove it AND switch to local machine.
998                info!(
999                    "Removing client {} from rotation and switching to local machine (clients: {})",
1000                    endpoint,
1001                    if client_list.is_empty() {
1002                        "none".to_string()
1003                    } else {
1004                        client_list
1005                    }
1006                );
1007
1008                // Current client is being removed. If it comes back soon, we can mark it current again.
1009                self.removed_current_client = Some(DefunctClientInfo {
1010                    endpoint: current_client,
1011                    removed_at: Instant::now(),
1012                });
1013
1014                self.set_and_grab_current_client(None).await;
1015                return should_clear_clipboard;
1016            }
1017        }
1018
1019        info!(
1020            "Removing client {} from client rotation: {}",
1021            endpoint,
1022            if client_list.is_empty() {
1023                "empty".to_string()
1024            } else {
1025                client_list
1026            }
1027        );
1028        should_clear_clipboard
1029    }
1030
1031    async fn set_and_grab_current_client(&mut self, client: Option<SocketAddr>) {
1032        self.current_client = client;
1033        let grab = if client.is_some() {
1034            device::GrabEvent::Grab
1035        } else {
1036            device::GrabEvent::Ungrab
1037        };
1038        if let Err(e) = self.grab_tx.send(grab) {
1039            // Avoid leaving devices in a bad grabbed state
1040            panic!(
1041                "Failed to update device grab, exiting server to avoid bad grab state: {}",
1042                e
1043            );
1044        }
1045    }
1046
1047    /// Ensures that all clients and the server have their clipboard state cleared.
1048    /// To be called when handle_client_removal() returns true, when a client holding the clipboard has disconnected.
1049    /// Broken into a separate function to avoid recursive async calls.
1050    async fn clipboard_clear(&mut self) {
1051        debug!("Clearing clipboard on server and all clients");
1052        self.clipboard_target = None;
1053
1054        // Clear the server's host clipboard status
1055        if let Some(c) = &mut self.local_clipboard {
1056            if let Err(e) = c.writer.store_types(vec![]) {
1057                // Keep going with the clients...
1058                warn!("Failed to clear server clipboard: {}", e);
1059            }
1060        }
1061
1062        // Clear all clients' host clipboard statuses (the client was already removed)
1063        let types_msg = event::ServerEvent::ClipboardTypes(event::ClipboardTypes {
1064            types: "",
1065            // Size shouldn't matter for clearing clipboard...
1066            max_size_bytes: 0,
1067        });
1068        // Treat this as best-effort to tidy up the clients, they should reset locally when disconnected.
1069        if let Err(e) = self
1070            .send_event_all(types_msg, |_client: &ClientInfo| true)
1071            .await
1072        {
1073            warn!("Failed to clear clipboard on all clients: {}", e);
1074        }
1075    }
1076}
1077
1078async fn send_message_to_client<T>(
1079    send: &mut quinn::SendStream,
1080    msg: &T,
1081    buf: &mut Vec<u8>,
1082) -> Result<()>
1083where
1084    T: Serialize + ?Sized,
1085{
1086    // Serialize message data: postcard with cobs encoding for event framing
1087    let buf_len = buf.len();
1088    let serializedmsg = postcard::to_slice_cobs(&msg, buf).map_err(|e| {
1089        anyhow!(
1090            "Failed to serialize message into buf.len={}: {:?}",
1091            buf_len,
1092            e
1093        )
1094    })?;
1095    trace!(
1096        "Sending {} byte serialized message: {:X?}",
1097        serializedmsg.len(),
1098        &serializedmsg
1099    );
1100    send.write_all(serializedmsg)
1101        .await
1102        .context("Failed to send serialized message")
1103}