rmux-server 0.1.1

Tokio daemon and request dispatcher for the RMUX terminal multiplexer.
Documentation
use std::collections::HashMap;
use std::io;
use std::process::Stdio;

use rmux_core::events::OutputCursorItem;
use rmux_core::PaneId;
use rmux_proto::{RmuxError, SessionName};
use rmux_pty::PtyMaster;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch;

use crate::pane_io::{PaneOutputReceiver, PaneOutputSender};
use crate::terminal::TerminalProfile;

#[derive(Default)]
pub(crate) struct PanePipeStore {
    sessions: HashMap<SessionName, HashMap<PaneId, ActivePanePipe>>,
}

impl std::fmt::Debug for PanePipeStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PanePipeStore")
            .field("sessions", &self.sessions.keys().collect::<Vec<_>>())
            .finish_non_exhaustive()
    }
}

impl PanePipeStore {
    pub(crate) fn contains(&self, session_name: &SessionName, pane_id: PaneId) -> bool {
        self.sessions
            .get(session_name)
            .is_some_and(|panes| panes.contains_key(&pane_id))
    }

    pub(crate) fn insert(
        &mut self,
        session_name: &SessionName,
        pane_id: PaneId,
        pipe: ActivePanePipe,
    ) -> Option<ActivePanePipe> {
        self.sessions
            .entry(session_name.clone())
            .or_default()
            .insert(pane_id, pipe)
    }

    pub(crate) fn remove(
        &mut self,
        session_name: &SessionName,
        pane_id: PaneId,
    ) -> Option<ActivePanePipe> {
        self.sessions
            .get_mut(session_name)
            .and_then(|panes| panes.remove(&pane_id))
    }

    pub(crate) fn remove_session(
        &mut self,
        session_name: &SessionName,
    ) -> HashMap<PaneId, ActivePanePipe> {
        self.sessions.remove(session_name).unwrap_or_default()
    }

    pub(crate) fn rename_session(
        &mut self,
        session_name: &SessionName,
        new_name: &SessionName,
    ) -> Result<(), RmuxError> {
        if !self.sessions.contains_key(session_name) {
            return Ok(());
        }
        if self.sessions.contains_key(new_name) {
            return Err(RmuxError::Server(format!(
                "pane pipes already exist for session {new_name}"
            )));
        }

        let mut sessions = std::mem::take(&mut self.sessions);
        let panes = sessions
            .remove(session_name)
            .expect("prevalidated pane pipes must exist");
        let replaced = sessions.insert(new_name.clone(), panes);
        debug_assert!(replaced.is_none());
        self.sessions = sessions;
        Ok(())
    }

    pub(crate) fn move_between_sessions(
        &mut self,
        source_session: &SessionName,
        destination_session: &SessionName,
        pane_ids: &[PaneId],
    ) -> Result<(), RmuxError> {
        if source_session == destination_session || pane_ids.is_empty() {
            return Ok(());
        }

        let removed = self.remove_selected(source_session, pane_ids);
        if let Err(error) =
            self.ensure_destination_accepts(destination_session, removed.keys().copied())
        {
            self.sessions
                .entry(source_session.clone())
                .or_default()
                .extend(removed);
            return Err(error);
        }
        self.sessions
            .entry(destination_session.clone())
            .or_default()
            .extend(removed);
        Ok(())
    }

    pub(crate) fn swap_between_sessions(
        &mut self,
        source_session: &SessionName,
        source_pane_ids: &[PaneId],
        destination_session: &SessionName,
        destination_pane_ids: &[PaneId],
    ) -> Result<(), RmuxError> {
        if source_session == destination_session {
            return Ok(());
        }

        let removed_source = self.remove_selected(source_session, source_pane_ids);
        let removed_destination = self.remove_selected(destination_session, destination_pane_ids);

        if let Err(error) =
            self.ensure_destination_accepts(source_session, removed_destination.keys().copied())
        {
            self.sessions
                .entry(source_session.clone())
                .or_default()
                .extend(removed_source);
            self.sessions
                .entry(destination_session.clone())
                .or_default()
                .extend(removed_destination);
            return Err(error);
        }
        if let Err(error) =
            self.ensure_destination_accepts(destination_session, removed_source.keys().copied())
        {
            self.sessions
                .entry(source_session.clone())
                .or_default()
                .extend(removed_source);
            self.sessions
                .entry(destination_session.clone())
                .or_default()
                .extend(removed_destination);
            return Err(error);
        }

        self.sessions
            .entry(source_session.clone())
            .or_default()
            .extend(removed_destination);
        self.sessions
            .entry(destination_session.clone())
            .or_default()
            .extend(removed_source);
        Ok(())
    }

    fn remove_selected(
        &mut self,
        session_name: &SessionName,
        pane_ids: &[PaneId],
    ) -> HashMap<PaneId, ActivePanePipe> {
        let session = self.sessions.entry(session_name.clone()).or_default();
        let mut removed = HashMap::new();
        for pane_id in pane_ids {
            if let Some(pipe) = session.remove(pane_id) {
                removed.insert(*pane_id, pipe);
            }
        }
        removed
    }

    fn ensure_destination_accepts<I>(
        &self,
        session_name: &SessionName,
        pane_ids: I,
    ) -> Result<(), RmuxError>
    where
        I: IntoIterator<Item = PaneId>,
    {
        let session = self.sessions.get(session_name);
        for pane_id in pane_ids {
            if session.is_some_and(|pipes| pipes.contains_key(&pane_id)) {
                return Err(RmuxError::Server(format!(
                    "pane pipe already exists for pane id {} in session {}",
                    pane_id.as_u32(),
                    session_name
                )));
            }
        }
        Ok(())
    }
}

pub(crate) struct ActivePanePipe {
    stop_tx: watch::Sender<bool>,
}

impl std::fmt::Debug for ActivePanePipe {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActivePanePipe").finish_non_exhaustive()
    }
}

impl ActivePanePipe {
    pub(crate) fn spawn(
        profile: &TerminalProfile,
        pane_output: PaneOutputSender,
        pane_master: PtyMaster,
        command: &str,
        read_from_pipe: bool,
        write_to_pipe: bool,
    ) -> Result<Self, RmuxError> {
        let mut child = profile.shell_command(command);
        child.current_dir(profile.cwd());
        child.env_clear();
        child.kill_on_drop(true);
        child.stdin(if write_to_pipe {
            Stdio::piped()
        } else {
            Stdio::null()
        });
        child.stdout(if read_from_pipe {
            Stdio::piped()
        } else {
            Stdio::null()
        });
        child.stderr(if read_from_pipe {
            Stdio::piped()
        } else {
            Stdio::null()
        });
        for (name, value) in profile.environment() {
            child.env(name, value);
        }

        let mut child = child.spawn().map_err(|error| {
            RmuxError::Server(format!("failed to spawn pipe-pane command: {error}"))
        })?;
        let stdin = child.stdin.take();
        let stdout = child.stdout.take();
        let stderr = child.stderr.take();
        let (stop_tx, stop_rx) = watch::channel(false);
        let stderr_master = stderr.as_ref().and_then(|_| pane_master.try_clone().ok());

        tokio::spawn(async move {
            let pane_output_task = stdin.map(|stdin| {
                tokio::spawn(forward_pane_output_to_pipe(
                    stop_rx.clone(),
                    pane_output.subscribe(),
                    stdin,
                ))
            });
            let pipe_stdout_task = stdout.map(|stdout| {
                tokio::spawn(forward_pipe_output_to_pane(
                    stop_rx.clone(),
                    stdout,
                    pane_master,
                ))
            });
            let pipe_stderr_task = stderr.zip(stderr_master).map(|(stderr, pane_master)| {
                tokio::spawn(forward_pipe_output_to_pane(
                    stop_rx.clone(),
                    stderr,
                    pane_master,
                ))
            });
            let mut stop_wait = stop_rx.clone();
            tokio::select! {
                _ = wait_for_pipe_stop(&mut stop_wait) => {
                    let _ = child.start_kill();
                    let _ = child.wait().await;
                }
                _ = child.wait() => {}
            }

            for task in [pane_output_task, pipe_stdout_task, pipe_stderr_task]
                .into_iter()
                .flatten()
            {
                task.abort();
                let _ = task.await;
            }
        });

        Ok(Self { stop_tx })
    }

    pub(crate) fn stop(self) {
        let _ = self.stop_tx.send(true);
    }
}

async fn wait_for_pipe_stop(stop_rx: &mut watch::Receiver<bool>) {
    while !*stop_rx.borrow() {
        if stop_rx.changed().await.is_err() {
            break;
        }
    }
}

async fn forward_pane_output_to_pipe(
    mut stop_rx: watch::Receiver<bool>,
    mut pane_output: PaneOutputReceiver,
    mut stdin: tokio::process::ChildStdin,
) {
    loop {
        tokio::select! {
            _ = wait_for_pipe_stop(&mut stop_rx) => break,
            next = pane_output.recv() => {
                match next {
                    OutputCursorItem::Event(event) => {
                        let bytes = event.into_bytes();
                        if bytes.is_empty() {
                            break;
                        }
                        if stdin.write_all(&bytes).await.is_err() {
                            break;
                        }
                    }
                    OutputCursorItem::Gap(_) => continue,
                }
            }
        }
    }
    let _ = stdin.shutdown().await;
}

async fn forward_pipe_output_to_pane<R>(
    mut stop_rx: watch::Receiver<bool>,
    mut reader: R,
    pane_master: PtyMaster,
) where
    R: AsyncRead + Unpin,
{
    let mut buffer = [0_u8; 8192];
    loop {
        tokio::select! {
            _ = wait_for_pipe_stop(&mut stop_rx) => break,
            read = reader.read(&mut buffer) => {
                match read {
                    Ok(0) | Err(_) => break,
                    Ok(size) => {
                        if write_pipe_output_to_pane(&pane_master, buffer[..size].to_vec())
                            .await
                            .is_err()
                        {
                            break;
                        }
                    }
                }
            }
        }
    }
}

#[cfg(windows)]
async fn write_pipe_output_to_pane(pane_master: &PtyMaster, bytes: Vec<u8>) -> io::Result<()> {
    let pane_master = pane_master.try_clone().map_err(io::Error::other)?;
    tokio::task::spawn_blocking(move || pane_master.write_all(&bytes))
        .await
        .map_err(|error| io::Error::other(format!("pipe-pane write task failed: {error}")))?
}

#[cfg(not(windows))]
async fn write_pipe_output_to_pane(pane_master: &PtyMaster, bytes: Vec<u8>) -> io::Result<()> {
    pane_master.write_all(&bytes)
}