#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
#![warn(clippy::all, clippy::pedantic, clippy::nursery, clippy::cargo)]
#![allow(clippy::multiple_crate_versions)]
#![allow(clippy::cargo_common_metadata)]
mod persistence;
use anyhow::{Context, Result};
use bmux_config::{BmuxConfig, ConfigPaths};
use bmux_ipc::transport::{IpcTransportError, LocalIpcListener, LocalIpcStream};
use bmux_ipc::{
AttachGrant, CURRENT_PROTOCOL_VERSION, ClientSummary, Envelope, EnvelopeKind, ErrorCode,
ErrorResponse, Event, IpcEndpoint, ProtocolVersion, Request, Response, ResponsePayload,
ServerSnapshotStatus, SessionPermissionSummary, SessionRole, SessionSelector, SessionSummary,
WindowSelector, WindowSummary, decode, encode,
};
use bmux_session::{ClientId, Session, SessionId, SessionManager, WindowId};
use persistence::{
ClientSelectedSessionSnapshotV1, FollowEdgeSnapshotV1, RoleAssignmentSnapshotV1,
SessionSnapshotV1, SnapshotManager, SnapshotV1, WindowSnapshotV1,
};
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex as AsyncMutex, mpsc, oneshot, watch};
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use uuid::Uuid;
const DEFAULT_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5);
const ATTACH_TOKEN_TTL: Duration = Duration::from_secs(10);
const MAX_WINDOW_OUTPUT_BUFFER_BYTES: usize = 1_048_576;
const SNAPSHOT_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(300);
#[derive(Clone)]
pub struct BmuxServer {
endpoint: IpcEndpoint,
state: Arc<ServerState>,
shutdown_tx: watch::Sender<bool>,
}
struct ServerState {
session_manager: Mutex<SessionManager>,
session_runtimes: Mutex<SessionRuntimeManager>,
attach_tokens: Mutex<AttachTokenManager>,
follow_state: Mutex<FollowState>,
permission_state: Mutex<PermissionState>,
snapshot_runtime: Mutex<SnapshotRuntime>,
operation_lock: AsyncMutex<()>,
event_hub: Mutex<EventHub>,
handshake_timeout: Duration,
}
#[derive(Debug)]
struct SnapshotRuntime {
manager: Option<SnapshotManager>,
dirty: bool,
last_marked_at: Option<Instant>,
debounce_interval: Duration,
last_write_epoch_ms: Option<u64>,
last_restore_epoch_ms: Option<u64>,
last_restore_error: Option<String>,
}
#[derive(Debug, Clone, Copy, Default)]
struct RestoreSummary {
sessions: usize,
windows: usize,
roles: usize,
follows: usize,
selected_sessions: usize,
}
impl SnapshotRuntime {
fn disabled() -> Self {
Self {
manager: None,
dirty: false,
last_marked_at: None,
debounce_interval: SNAPSHOT_DEBOUNCE_INTERVAL,
last_write_epoch_ms: None,
last_restore_epoch_ms: None,
last_restore_error: None,
}
}
fn with_manager(manager: SnapshotManager) -> Self {
Self {
manager: Some(manager),
dirty: false,
last_marked_at: None,
debounce_interval: SNAPSHOT_DEBOUNCE_INTERVAL,
last_write_epoch_ms: None,
last_restore_epoch_ms: None,
last_restore_error: None,
}
}
}
#[derive(Debug)]
struct EventHub {
events: Vec<EventRecord>,
subscribers: BTreeMap<ClientId, usize>,
max_events: usize,
}
#[derive(Debug, Clone)]
struct EventRecord {
event: Event,
}
impl EventHub {
fn new(max_events: usize) -> Self {
Self {
events: Vec::new(),
subscribers: BTreeMap::new(),
max_events,
}
}
fn emit(&mut self, event: Event) {
self.events.push(EventRecord { event });
if self.events.len() > self.max_events {
let dropped = self.events.len() - self.max_events;
self.events.drain(..dropped);
for cursor in self.subscribers.values_mut() {
*cursor = cursor.saturating_sub(dropped);
}
}
}
fn subscribe(&mut self, client_id: ClientId) {
let start = self.events.len().saturating_sub(32);
self.subscribers.insert(client_id, start);
}
fn unsubscribe(&mut self, client_id: ClientId) {
self.subscribers.remove(&client_id);
}
fn poll(&mut self, client_id: ClientId, max_events: usize) -> Option<Vec<Event>> {
let cursor = self.subscribers.get_mut(&client_id)?;
let start = *cursor;
let count = max_events.max(1);
let events = self
.events
.iter()
.skip(start)
.take(count)
.map(|record| record.event.clone())
.collect::<Vec<_>>();
*cursor = start + events.len();
Some(events)
}
}
#[derive(Debug)]
struct AttachTokenManager {
ttl: Duration,
tokens: BTreeMap<Uuid, AttachTokenEntry>,
}
#[derive(Debug, Clone, Copy)]
struct AttachTokenEntry {
session_id: SessionId,
expires_at: Instant,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttachTokenValidationError {
NotFound,
Expired,
SessionMismatch,
}
impl AttachTokenManager {
fn new(ttl: Duration) -> Self {
Self {
ttl,
tokens: BTreeMap::new(),
}
}
fn issue(&mut self, session_id: SessionId) -> AttachGrant {
self.prune_expired();
let attach_token = Uuid::new_v4();
let expires_at = Instant::now() + self.ttl;
let expires_at_epoch_ms = epoch_millis_now().saturating_add(self.ttl.as_millis() as u64);
self.tokens.insert(
attach_token,
AttachTokenEntry {
session_id,
expires_at,
},
);
AttachGrant {
session_id: session_id.0,
attach_token,
expires_at_epoch_ms,
}
}
fn consume(
&mut self,
session_id: SessionId,
attach_token: Uuid,
) -> std::result::Result<(), AttachTokenValidationError> {
let Some(entry) = self.tokens.get(&attach_token).copied() else {
return Err(AttachTokenValidationError::NotFound);
};
if entry.expires_at <= Instant::now() {
self.tokens.remove(&attach_token);
return Err(AttachTokenValidationError::Expired);
}
if entry.session_id != session_id {
return Err(AttachTokenValidationError::SessionMismatch);
}
self.tokens.remove(&attach_token);
Ok(())
}
fn remove_for_session(&mut self, session_id: SessionId) {
self.tokens
.retain(|_, entry| entry.session_id != session_id);
}
fn clear(&mut self) {
self.tokens.clear();
}
fn prune_expired(&mut self) {
let now = Instant::now();
self.tokens.retain(|_, entry| entry.expires_at > now);
}
}
fn epoch_millis_now() -> u64 {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
now.as_millis() as u64
}
#[derive(Debug, Clone, Copy)]
struct FollowEntry {
leader_client_id: ClientId,
global: bool,
}
#[derive(Debug, Clone, Copy)]
struct FollowTargetUpdate {
follower_client_id: ClientId,
leader_client_id: ClientId,
session_id: SessionId,
}
#[derive(Debug, Default)]
struct FollowState {
connected_clients: std::collections::BTreeSet<ClientId>,
selected_sessions: BTreeMap<ClientId, Option<SessionId>>,
follows: BTreeMap<ClientId, FollowEntry>,
}
#[derive(Debug, Default)]
struct PermissionState {
roles: BTreeMap<SessionId, BTreeMap<ClientId, SessionRole>>,
}
impl PermissionState {
fn ensure_owner(&mut self, session_id: SessionId, owner_client_id: ClientId) {
let session_roles = self.roles.entry(session_id).or_default();
session_roles.insert(owner_client_id, SessionRole::Owner);
}
fn role_for(&self, session_id: SessionId, client_id: ClientId) -> SessionRole {
self.roles
.get(&session_id)
.and_then(|session_roles| session_roles.get(&client_id).copied())
.unwrap_or(SessionRole::Observer)
}
fn set_role(&mut self, session_id: SessionId, client_id: ClientId, role: SessionRole) {
let session_roles = self.roles.entry(session_id).or_default();
session_roles.insert(client_id, role);
}
fn clear_to_observer(&mut self, session_id: SessionId, client_id: ClientId) {
if let Some(session_roles) = self.roles.get_mut(&session_id) {
session_roles.remove(&client_id);
}
}
fn remove_session(&mut self, session_id: SessionId) {
self.roles.remove(&session_id);
}
fn is_owner(&self, session_id: SessionId, client_id: ClientId) -> bool {
self.role_for(session_id, client_id) == SessionRole::Owner
}
fn list_permissions(&self, session_id: SessionId) -> Vec<SessionPermissionSummary> {
self.roles
.get(&session_id)
.map(|session_roles| {
session_roles
.iter()
.map(|(client_id, role)| SessionPermissionSummary {
client_id: client_id.0,
role: *role,
})
.collect::<Vec<_>>()
})
.unwrap_or_default()
}
fn rebalance_after_disconnect(
&mut self,
disconnected_client_id: ClientId,
connected_clients: &BTreeSet<ClientId>,
session_clients: &BTreeMap<SessionId, BTreeSet<ClientId>>,
) -> Vec<Event> {
let mut events = Vec::new();
for (session_id, roles) in &mut self.roles {
let disconnected_role = roles.get(&disconnected_client_id).copied();
if disconnected_role == Some(SessionRole::Owner) {
let writer_candidate = roles
.iter()
.filter_map(|(client_id, role)| {
(*client_id != disconnected_client_id
&& *role == SessionRole::Writer
&& connected_clients.contains(client_id))
.then_some(*client_id)
})
.next();
let observer_candidate = roles
.iter()
.filter_map(|(client_id, role)| {
(*client_id != disconnected_client_id
&& *role == SessionRole::Observer
&& connected_clients.contains(client_id))
.then_some(*client_id)
})
.next();
let implicit_observer_candidate =
session_clients.get(session_id).and_then(|clients| {
clients
.iter()
.find(|client_id| {
**client_id != disconnected_client_id
&& connected_clients.contains(client_id)
})
.copied()
});
if let Some(next_owner) = writer_candidate
.or(observer_candidate)
.or(implicit_observer_candidate)
{
roles.insert(next_owner, SessionRole::Owner);
roles.remove(&disconnected_client_id);
events.push(Event::RoleChanged {
session_id: session_id.0,
client_id: next_owner.0,
role: SessionRole::Owner,
by_client_id: disconnected_client_id.0,
});
}
} else {
roles.remove(&disconnected_client_id);
}
}
events
}
}
impl FollowState {
fn connect_client(&mut self, client_id: ClientId) {
self.connected_clients.insert(client_id);
self.selected_sessions.entry(client_id).or_insert(None);
}
fn disconnect_client(&mut self, client_id: ClientId) -> Vec<Event> {
self.connected_clients.remove(&client_id);
self.selected_sessions.remove(&client_id);
self.follows.remove(&client_id);
let impacted_followers = self
.follows
.iter()
.filter_map(|(follower_id, entry)| {
(entry.leader_client_id == client_id).then_some(*follower_id)
})
.collect::<Vec<_>>();
impacted_followers
.into_iter()
.filter_map(|follower_id| {
self.follows
.remove(&follower_id)
.map(|entry| Event::FollowTargetGone {
follower_client_id: follower_id.0,
former_leader_client_id: entry.leader_client_id.0,
})
})
.collect()
}
fn set_selected_session(&mut self, client_id: ClientId, session_id: Option<SessionId>) {
if self.connected_clients.contains(&client_id) {
self.selected_sessions.insert(client_id, session_id);
}
}
fn selected_session(&self, client_id: ClientId) -> Option<Option<SessionId>> {
self.selected_sessions.get(&client_id).copied()
}
fn start_follow(
&mut self,
follower_client_id: ClientId,
leader_client_id: ClientId,
global: bool,
) -> std::result::Result<Option<SessionId>, &'static str> {
if follower_client_id == leader_client_id {
return Err("cannot follow self");
}
if !self.connected_clients.contains(&leader_client_id) {
return Err("target client not connected");
}
if !self.connected_clients.contains(&follower_client_id) {
return Err("follower client not connected");
}
self.follows.insert(
follower_client_id,
FollowEntry {
leader_client_id,
global,
},
);
if global
&& let Some(leader_selected) = self.selected_sessions.get(&leader_client_id).copied()
{
self.selected_sessions
.insert(follower_client_id, leader_selected);
return Ok(leader_selected);
}
Ok(None)
}
fn stop_follow(&mut self, follower_client_id: ClientId) -> bool {
self.follows.remove(&follower_client_id).is_some()
}
fn sync_followers_from_leader(
&mut self,
leader_client_id: ClientId,
selected_session: Option<SessionId>,
) -> Vec<FollowTargetUpdate> {
let followers = self
.follows
.iter()
.filter_map(|(follower_id, entry)| {
(entry.leader_client_id == leader_client_id && entry.global).then_some(*follower_id)
})
.collect::<Vec<_>>();
let mut updates = Vec::new();
for follower_id in followers {
if self.connected_clients.contains(&follower_id) {
let previous = self.selected_sessions.get(&follower_id).copied().flatten();
self.selected_sessions.insert(follower_id, selected_session);
if let Some(session_id) = selected_session
&& previous != Some(session_id)
{
updates.push(FollowTargetUpdate {
follower_client_id: follower_id,
leader_client_id,
session_id,
});
}
}
}
updates
}
fn list_clients(&self) -> Vec<ClientSummary> {
self.connected_clients
.iter()
.map(|client_id| {
let selected_session_id = self
.selected_sessions
.get(client_id)
.and_then(|selected| selected.map(|session_id| session_id.0));
let (following_client_id, following_global) =
self.follows.get(client_id).map_or((None, false), |entry| {
(Some(entry.leader_client_id.0), entry.global)
});
ClientSummary {
id: client_id.0,
selected_session_id,
following_client_id,
following_global,
session_role: None,
}
})
.collect()
}
}
fn resolve_server_shell(config: &BmuxConfig) -> String {
if let Some(shell) = config.general.default_shell.as_ref()
&& !shell.trim().is_empty()
{
return shell.clone();
}
if let Ok(shell) = std::env::var("SHELL")
&& !shell.trim().is_empty()
{
return shell;
}
if cfg!(windows) {
"cmd.exe".to_string()
} else {
"/bin/sh".to_string()
}
}
async fn shutdown_runtime_handle(removed: RemovedRuntime) {
for window in removed.handle.windows.into_values() {
shutdown_window_handle(window).await;
}
}
async fn shutdown_window_handle(mut window: WindowRuntimeHandle) {
if let Some(stop_tx) = window.stop_tx.take() {
let _ = stop_tx.send(());
}
match tokio::time::timeout(Duration::from_millis(250), &mut window.task).await {
Ok(_) => {}
Err(_) => {
window.task.abort();
let _ = window.task.await;
}
}
}
struct SessionRuntimeManager {
runtimes: BTreeMap<SessionId, SessionRuntimeHandle>,
shell: String,
}
struct SessionRuntimeHandle {
windows: BTreeMap<bmux_session::WindowId, WindowRuntimeHandle>,
active_window: bmux_session::WindowId,
next_auto_window_number: u32,
attached_clients: BTreeSet<ClientId>,
}
struct WindowRuntimeHandle {
id: bmux_session::WindowId,
name: Option<String>,
stop_tx: Option<oneshot::Sender<()>>,
task: JoinHandle<()>,
input_tx: mpsc::UnboundedSender<Vec<u8>>,
output_buffer: Arc<std::sync::Mutex<OutputFanoutBuffer>>,
}
struct OutputFanoutBuffer {
max_bytes: usize,
start_offset: u64,
data: VecDeque<u8>,
cursors: BTreeMap<ClientId, u64>,
}
impl OutputFanoutBuffer {
fn new(max_bytes: usize) -> Self {
Self {
max_bytes: max_bytes.max(1),
start_offset: 0,
data: VecDeque::new(),
cursors: BTreeMap::new(),
}
}
fn end_offset(&self) -> u64 {
self.start_offset + self.data.len() as u64
}
fn register_client_at_tail(&mut self, client_id: ClientId) {
self.cursors.insert(client_id, self.end_offset());
}
fn unregister_client(&mut self, client_id: ClientId) {
self.cursors.remove(&client_id);
}
fn push_chunk(&mut self, chunk: &[u8]) {
self.data.extend(chunk.iter().copied());
while self.data.len() > self.max_bytes {
let _ = self.data.pop_front();
self.start_offset = self.start_offset.saturating_add(1);
}
for cursor in self.cursors.values_mut() {
if *cursor < self.start_offset {
*cursor = self.start_offset;
}
}
}
fn read_for_client(&mut self, client_id: ClientId, max_bytes: usize) -> Vec<u8> {
let limit = max_bytes.max(1);
let end = self.end_offset();
let cursor = self.cursors.entry(client_id).or_insert(end);
if *cursor < self.start_offset {
*cursor = self.start_offset;
}
let available = end.saturating_sub(*cursor) as usize;
if available == 0 {
return Vec::new();
}
let to_read = available.min(limit);
let start_index = (*cursor - self.start_offset) as usize;
let output = self
.data
.iter()
.skip(start_index)
.take(to_read)
.copied()
.collect::<Vec<_>>();
*cursor = cursor.saturating_add(output.len() as u64);
output
}
}
struct RemovedRuntime {
session_id: SessionId,
had_attached_clients: bool,
handle: SessionRuntimeHandle,
}
struct RemovedWindowRuntime {
session_id: SessionId,
window_id: bmux_session::WindowId,
handle: WindowRuntimeHandle,
session_removed: Option<RemovedRuntime>,
}
struct WindowRuntimeSummary {
id: bmux_session::WindowId,
name: Option<String>,
active: bool,
}
#[derive(Debug, Clone)]
enum WindowSelection {
Active,
Id(bmux_session::WindowId),
Name(String),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SessionRuntimeError {
NotFound,
NotAttached,
Closed,
}
impl SessionRuntimeManager {
fn new(shell: String) -> Self {
Self {
runtimes: BTreeMap::new(),
shell,
}
}
fn start_runtime(&mut self, session_id: SessionId) -> Result<()> {
if self.runtimes.contains_key(&session_id) {
anyhow::bail!("runtime already exists for session {}", session_id.0);
}
let first_window = self.spawn_window_runtime(None, Some("window-1".to_string()))?;
let first_window_id = first_window.id;
let mut windows = BTreeMap::new();
windows.insert(first_window.id, first_window);
self.runtimes.insert(
session_id,
SessionRuntimeHandle {
windows,
active_window: first_window_id,
next_auto_window_number: 2,
attached_clients: BTreeSet::new(),
},
);
Ok(())
}
fn restore_runtime(
&mut self,
session_id: SessionId,
windows: Vec<(bmux_session::WindowId, Option<String>)>,
active_window: bmux_session::WindowId,
) -> Result<()> {
if self.runtimes.contains_key(&session_id) {
anyhow::bail!("runtime already exists for session {}", session_id.0);
}
let mut runtime_windows = BTreeMap::new();
for (window_id, window_name) in windows {
let handle = self.spawn_window_runtime(Some(window_id), window_name)?;
runtime_windows.insert(window_id, handle);
}
if !runtime_windows.contains_key(&active_window) {
anyhow::bail!("active window missing from restore runtime");
}
self.runtimes.insert(
session_id,
SessionRuntimeHandle {
windows: runtime_windows,
active_window,
next_auto_window_number: 2,
attached_clients: BTreeSet::new(),
},
);
Ok(())
}
fn spawn_window_runtime(
&self,
id: Option<bmux_session::WindowId>,
name: Option<String>,
) -> Result<WindowRuntimeHandle> {
let (stop_tx, mut stop_rx) = oneshot::channel();
let (input_tx, mut input_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let output_buffer = Arc::new(std::sync::Mutex::new(OutputFanoutBuffer::new(
MAX_WINDOW_OUTPUT_BUFFER_BYTES,
)));
let shell = self.shell.clone();
let window_id = id.unwrap_or_else(bmux_session::WindowId::new);
let output_buffer_for_reader = Arc::clone(&output_buffer);
let task = tokio::spawn(async move {
let pty_system = native_pty_system();
let pty_pair = match pty_system.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
}) {
Ok(pair) => pair,
Err(_) => return,
};
let mut command = CommandBuilder::new(&shell);
command.env("TERM", "xterm-256color");
let mut child = match pty_pair.slave.spawn_command(command) {
Ok(child) => child,
Err(_) => return,
};
drop(pty_pair.slave);
let mut reader = match pty_pair.master.try_clone_reader() {
Ok(reader) => reader,
Err(_) => {
let _ = child.kill();
return;
}
};
let mut writer = match pty_pair.master.take_writer() {
Ok(writer) => writer,
Err(_) => {
let _ = child.kill();
return;
}
};
let reader_output = Arc::clone(&output_buffer_for_reader);
let reader_thread = std::thread::Builder::new()
.name(format!("bmux-server-window-{}", window_id.0))
.spawn(move || {
let mut buffer = [0_u8; 8192];
loop {
match reader.read(&mut buffer) {
Ok(0) => break,
Ok(bytes_read) => {
if let Ok(mut output) = reader_output.lock() {
output.push_chunk(&buffer[..bytes_read]);
} else {
break;
}
}
Err(_) => break,
}
}
})
.ok();
loop {
tokio::select! {
_ = &mut stop_rx => {
break;
}
input = input_rx.recv() => {
match input {
Some(bytes) => {
if writer.write_all(&bytes).is_err() {
break;
}
let _ = writer.flush();
}
None => break,
}
}
}
}
let _ = child.kill();
let _ = child.wait();
if let Some(thread) = reader_thread {
let _ = thread.join();
}
});
Ok(WindowRuntimeHandle {
id: window_id,
name,
stop_tx: Some(stop_tx),
task,
input_tx,
output_buffer,
})
}
fn new_window(
&mut self,
session_id: SessionId,
name: Option<String>,
) -> Result<(bmux_session::WindowId, Option<String>)> {
let resolved_name = if let Some(name) = name {
Some(name)
} else {
let session = self
.runtimes
.get(&session_id)
.ok_or_else(|| anyhow::anyhow!("runtime not found for session {}", session_id.0))?;
Some(format!("window-{}", session.next_auto_window_number))
};
let window = self.spawn_window_runtime(None, resolved_name.clone())?;
let window_id = window.id;
let session = self
.runtimes
.get_mut(&session_id)
.ok_or_else(|| anyhow::anyhow!("runtime not found for session {}", session_id.0))?;
session.windows.insert(window_id, window);
session.next_auto_window_number = session.next_auto_window_number.saturating_add(1);
Ok((window_id, resolved_name))
}
fn list_windows(&self, session_id: SessionId) -> Result<Vec<WindowRuntimeSummary>> {
let session = self
.runtimes
.get(&session_id)
.ok_or_else(|| anyhow::anyhow!("runtime not found for session {}", session_id.0))?;
Ok(session
.windows
.values()
.map(|window| WindowRuntimeSummary {
id: window.id,
name: window.name.clone(),
active: window.id == session.active_window,
})
.collect())
}
fn switch_window(
&mut self,
session_id: SessionId,
selector: WindowSelection,
) -> Result<bmux_session::WindowId> {
let session = self
.runtimes
.get_mut(&session_id)
.ok_or_else(|| anyhow::anyhow!("runtime not found for session {}", session_id.0))?;
let lookup_selector = selector.clone();
let window_id = resolve_window_id_from_selector(session, selector).ok_or_else(|| {
anyhow::anyhow!(window_not_found_in_session_message(
&lookup_selector,
session_id
))
})?;
session.active_window = window_id;
Ok(window_id)
}
fn kill_window(
&mut self,
session_id: SessionId,
selector: WindowSelection,
) -> Result<RemovedWindowRuntime> {
let (window_id, is_last_window) = {
let session = self
.runtimes
.get_mut(&session_id)
.ok_or_else(|| anyhow::anyhow!("runtime not found for session {}", session_id.0))?;
let lookup_selector = selector.clone();
let Some(window_id) = resolve_window_id_from_selector(session, selector) else {
anyhow::bail!(
"{}",
window_not_found_in_session_message(&lookup_selector, session_id)
);
};
let is_last_window = session.windows.len() == 1;
if !is_last_window && session.active_window == window_id {
let next_active = session
.windows
.keys()
.copied()
.find(|id| *id != window_id)
.ok_or_else(|| anyhow::anyhow!("failed selecting next active window"))?;
session.active_window = next_active;
}
(window_id, is_last_window)
};
if is_last_window {
let mut removed_session = self.remove_runtime(session_id)?;
let window = removed_session
.handle
.windows
.remove(&window_id)
.ok_or_else(|| anyhow::anyhow!("window missing during session removal"))?;
return Ok(RemovedWindowRuntime {
session_id,
window_id,
handle: window,
session_removed: Some(removed_session),
});
}
let session = self
.runtimes
.get_mut(&session_id)
.ok_or_else(|| anyhow::anyhow!("runtime not found for session {}", session_id.0))?;
let window = session
.windows
.remove(&window_id)
.ok_or_else(|| anyhow::anyhow!("window not found in session {}", session_id.0))?;
Ok(RemovedWindowRuntime {
session_id,
window_id,
handle: window,
session_removed: None,
})
}
fn remove_runtime(&mut self, session_id: SessionId) -> Result<RemovedRuntime> {
let runtime = self
.runtimes
.remove(&session_id)
.ok_or_else(|| anyhow::anyhow!("runtime not found for session {}", session_id.0))?;
Ok(RemovedRuntime {
session_id,
had_attached_clients: !runtime.attached_clients.is_empty(),
handle: runtime,
})
}
fn remove_all_runtimes(&mut self) -> Vec<RemovedRuntime> {
std::mem::take(&mut self.runtimes)
.into_iter()
.map(|(session_id, runtime)| RemovedRuntime {
session_id,
had_attached_clients: !runtime.attached_clients.is_empty(),
handle: runtime,
})
.collect()
}
fn begin_attach(
&mut self,
session_id: SessionId,
client_id: ClientId,
) -> Result<(), SessionRuntimeError> {
let runtime = self
.runtimes
.get_mut(&session_id)
.ok_or(SessionRuntimeError::NotFound)?;
runtime.attached_clients.insert(client_id);
for window in runtime.windows.values_mut() {
let mut output = window
.output_buffer
.lock()
.map_err(|_| SessionRuntimeError::Closed)?;
output.register_client_at_tail(client_id);
}
Ok(())
}
fn end_attach(&mut self, session_id: SessionId, client_id: ClientId) {
if let Some(runtime) = self.runtimes.get_mut(&session_id) {
let removed = runtime.attached_clients.remove(&client_id);
if removed {
for window in runtime.windows.values_mut() {
if let Ok(mut output) = window.output_buffer.lock() {
output.unregister_client(client_id);
}
}
}
}
}
fn write_input(
&mut self,
session_id: SessionId,
client_id: ClientId,
data: Vec<u8>,
) -> Result<usize, SessionRuntimeError> {
let runtime = self
.runtimes
.get_mut(&session_id)
.ok_or(SessionRuntimeError::NotFound)?;
if !runtime.attached_clients.contains(&client_id) {
return Err(SessionRuntimeError::NotAttached);
}
let window = runtime
.windows
.get_mut(&runtime.active_window)
.ok_or(SessionRuntimeError::NotFound)?;
let bytes = data.len();
window
.input_tx
.send(data)
.map_err(|_| SessionRuntimeError::Closed)?;
Ok(bytes)
}
fn read_output(
&mut self,
session_id: SessionId,
client_id: ClientId,
max_bytes: usize,
) -> Result<Vec<u8>, SessionRuntimeError> {
let runtime = self
.runtimes
.get_mut(&session_id)
.ok_or(SessionRuntimeError::NotFound)?;
if !runtime.attached_clients.contains(&client_id) {
return Err(SessionRuntimeError::NotAttached);
}
let window = runtime
.windows
.get_mut(&runtime.active_window)
.ok_or(SessionRuntimeError::NotFound)?;
let mut output = window
.output_buffer
.lock()
.map_err(|_| SessionRuntimeError::Closed)?;
Ok(output.read_for_client(client_id, max_bytes))
}
#[cfg(test)]
fn runtime_count(&self) -> usize {
self.runtimes.len()
}
#[cfg(test)]
fn has_runtime(&self, session_id: SessionId) -> bool {
self.runtimes.contains_key(&session_id)
}
#[cfg(test)]
fn window_count(&self, session_id: SessionId) -> usize {
self.runtimes
.get(&session_id)
.map(|runtime| runtime.windows.len())
.unwrap_or(0)
}
}
fn resolve_window_id_from_selector(
session: &SessionRuntimeHandle,
selector: WindowSelection,
) -> Option<bmux_session::WindowId> {
match selector {
WindowSelection::Active => Some(session.active_window),
WindowSelection::Id(id) => session.windows.contains_key(&id).then_some(id),
WindowSelection::Name(value) => {
if let Some(window) = session
.windows
.values()
.find(|window| window.name.as_deref() == Some(value.as_str()))
{
return Some(window.id);
}
if let Some(window) = session
.windows
.values()
.find(|window| window.id.to_string().eq_ignore_ascii_case(&value))
{
return Some(window.id);
}
let value_lower = value.to_ascii_lowercase();
session
.windows
.values()
.find(|window| {
window
.id
.to_string()
.to_ascii_lowercase()
.starts_with(&value_lower)
})
.map(|window| window.id)
}
}
}
fn window_not_found_in_session_message(
selector: &WindowSelection,
session_id: SessionId,
) -> String {
match selector {
WindowSelection::Name(_) => format!(
"window not found in session {} for selector {selector:?} (lookup order: exact name -> exact UUID -> UUID prefix)",
session_id.0
),
_ => format!(
"window not found in session {} for selector {selector:?}",
session_id.0
),
}
}
impl BmuxServer {
fn new_with_snapshot(endpoint: IpcEndpoint, snapshot_manager: Option<SnapshotManager>) -> Self {
let snapshot_runtime = match snapshot_manager {
Some(manager) => SnapshotRuntime::with_manager(manager),
None => SnapshotRuntime::disabled(),
};
let config = BmuxConfig::load().unwrap_or_default();
let shell = resolve_server_shell(&config);
let (shutdown_tx, _) = watch::channel(false);
Self {
endpoint,
state: Arc::new(ServerState {
session_manager: Mutex::new(SessionManager::new()),
session_runtimes: Mutex::new(SessionRuntimeManager::new(shell)),
attach_tokens: Mutex::new(AttachTokenManager::new(ATTACH_TOKEN_TTL)),
follow_state: Mutex::new(FollowState::default()),
permission_state: Mutex::new(PermissionState::default()),
snapshot_runtime: Mutex::new(snapshot_runtime),
operation_lock: AsyncMutex::new(()),
event_hub: Mutex::new(EventHub::new(1024)),
handshake_timeout: DEFAULT_HANDSHAKE_TIMEOUT,
}),
shutdown_tx,
}
}
#[must_use]
pub fn new(endpoint: IpcEndpoint) -> Self {
Self::new_with_snapshot(endpoint, None)
}
#[must_use]
pub fn from_config_paths(paths: &ConfigPaths) -> Self {
#[cfg(unix)]
let endpoint = IpcEndpoint::unix_socket(paths.server_socket());
#[cfg(windows)]
let endpoint = IpcEndpoint::windows_named_pipe(paths.server_named_pipe());
let snapshot_manager = SnapshotManager::from_paths(paths);
Self::new_with_snapshot(endpoint, Some(snapshot_manager))
}
#[must_use]
pub fn from_default_paths() -> Self {
Self::from_config_paths(&ConfigPaths::default())
}
#[must_use]
pub const fn endpoint(&self) -> &IpcEndpoint {
&self.endpoint
}
pub fn request_shutdown(&self) {
let _ = self.shutdown_tx.send(true);
}
pub async fn run(&self) -> Result<()> {
self.run_impl(None).await
}
async fn run_impl(
&self,
mut ready_tx: Option<oneshot::Sender<std::result::Result<(), String>>>,
) -> Result<()> {
let listener = match LocalIpcListener::bind(&self.endpoint)
.await
.with_context(|| format!("failed binding server endpoint {:?}", self.endpoint))
{
Ok(listener) => listener,
Err(error) => {
if let Some(tx) = ready_tx.take() {
let _ = tx.send(Err(format!("{error:#}")));
}
return Err(error);
}
};
if let Err(error) = restore_snapshot_if_present(&self.state) {
if let Some(tx) = ready_tx.take() {
let _ = tx.send(Err(format!("{error:#}")));
}
return Err(error);
}
info!("bmux server listening on {:?}", self.endpoint);
emit_event(&self.state, Event::ServerStarted)?;
if let Some(tx) = ready_tx.take() {
let _ = tx.send(Ok(()));
}
let mut shutdown_rx = self.shutdown_tx.subscribe();
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
if changed.is_ok() && *shutdown_rx.borrow() {
info!("bmux server shutdown requested");
break;
}
if changed.is_err() {
break;
}
}
accepted = listener.accept() => {
match accepted {
Ok(stream) => {
let state = Arc::clone(&self.state);
let shutdown_tx = self.shutdown_tx.clone();
tokio::spawn(async move {
if let Err(error) = handle_connection(state, shutdown_tx, stream).await {
warn!("connection handler failed: {error:#}");
}
});
}
Err(error) => {
return Err(error).context("accept loop failed");
}
}
}
}
}
let _ = maybe_flush_snapshot(&self.state, true);
let removed_runtimes = if let Ok(mut runtime_manager) = self.state.session_runtimes.lock() {
runtime_manager.remove_all_runtimes()
} else {
Vec::new()
};
for removed_runtime in removed_runtimes {
if removed_runtime.had_attached_clients {
let _ = emit_event(
&self.state,
Event::ClientDetached {
id: removed_runtime.session_id.0,
},
);
}
shutdown_runtime_handle(removed_runtime).await;
}
if let Ok(mut session_manager) = self.state.session_manager.lock() {
*session_manager = SessionManager::new();
}
if let Ok(mut permission_state) = self.state.permission_state.lock() {
*permission_state = PermissionState::default();
}
let _ = emit_event(&self.state, Event::ServerStopping);
if let Ok(mut attach_tokens) = self.state.attach_tokens.lock() {
attach_tokens.clear();
}
Ok(())
}
#[cfg(test)]
async fn run_with_ready(
&self,
ready_tx: oneshot::Sender<std::result::Result<(), String>>,
) -> Result<()> {
self.run_impl(Some(ready_tx)).await
}
}
async fn handle_connection(
state: Arc<ServerState>,
shutdown_tx: watch::Sender<bool>,
mut stream: LocalIpcStream,
) -> Result<()> {
let client_id = ClientId::new();
let mut selected_session: Option<SessionId> = None;
let mut attached_stream_session: Option<SessionId> = None;
let first_envelope = tokio::time::timeout(state.handshake_timeout, stream.recv_envelope())
.await
.context("handshake timed out")??;
let handshake = parse_request(&first_envelope)?;
match handshake {
Request::Hello {
protocol_version,
client_name,
} => {
if protocol_version != ProtocolVersion::current() {
send_error(
&mut stream,
first_envelope.request_id,
ErrorCode::VersionMismatch,
format!(
"unsupported protocol version {}; expected {}",
protocol_version.0, CURRENT_PROTOCOL_VERSION
),
)
.await?;
return Ok(());
}
debug!("accepted client handshake: {client_name}");
let snapshot = snapshot_status(&state)?;
send_ok(
&mut stream,
first_envelope.request_id,
ResponsePayload::ServerStatus {
running: true,
snapshot,
},
)
.await?;
}
_ => {
send_error(
&mut stream,
first_envelope.request_id,
ErrorCode::InvalidRequest,
"first request must be hello".to_string(),
)
.await?;
return Ok(());
}
}
{
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
follow_state.connect_client(client_id);
}
loop {
let envelope = match stream.recv_envelope().await {
Ok(envelope) => envelope,
Err(IpcTransportError::Io(error))
if error.kind() == std::io::ErrorKind::UnexpectedEof =>
{
break;
}
Err(error) => return Err(error).context("failed receiving request envelope"),
};
let request = match parse_request(&envelope) {
Ok(request) => request,
Err(error) => {
send_error(
&mut stream,
envelope.request_id,
ErrorCode::InvalidRequest,
format!("failed parsing request: {error:#}"),
)
.await?;
continue;
}
};
let response = handle_request(
&state,
&shutdown_tx,
client_id,
&mut selected_session,
&mut attached_stream_session,
request,
)
.await?;
send_response(&mut stream, envelope.request_id, response).await?;
}
detach_client_state_on_disconnect(
&state,
client_id,
&mut selected_session,
&mut attached_stream_session,
)?;
disconnect_follow_state(&state, client_id)?;
rebalance_owner_roles_after_disconnect(&state, client_id)?;
mark_snapshot_dirty(&state)?;
maybe_flush_snapshot(&state, false)?;
unsubscribe_events(&state, client_id)?;
Ok(())
}
fn emit_event(state: &Arc<ServerState>, event: Event) -> Result<()> {
let mut hub = state
.event_hub
.lock()
.map_err(|_| anyhow::anyhow!("event hub lock poisoned"))?;
hub.emit(event);
Ok(())
}
fn unsubscribe_events(state: &Arc<ServerState>, client_id: ClientId) -> Result<()> {
let mut hub = state
.event_hub
.lock()
.map_err(|_| anyhow::anyhow!("event hub lock poisoned"))?;
hub.unsubscribe(client_id);
Ok(())
}
fn sync_selected_session_from_follow_state(
state: &Arc<ServerState>,
client_id: ClientId,
selected_session: &mut Option<SessionId>,
) -> Result<()> {
let follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
if let Some(follow_selected_session) = follow_state.selected_session(client_id) {
*selected_session = follow_selected_session;
}
Ok(())
}
fn reconcile_selected_session_membership(
state: &Arc<ServerState>,
client_id: ClientId,
previous: Option<SessionId>,
next: Option<SessionId>,
) -> Result<()> {
if previous == next {
return Ok(());
}
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
if let Some(previous_session) = previous
&& let Some(session) = manager.get_session_mut(&previous_session)
{
session.remove_client(&client_id);
}
if let Some(next_session) = next
&& let Some(session) = manager.get_session_mut(&next_session)
{
session.add_client(client_id);
}
Ok(())
}
fn persist_selected_session(
state: &Arc<ServerState>,
client_id: ClientId,
selected_session: Option<SessionId>,
) -> Result<()> {
let updates = {
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
follow_state.set_selected_session(client_id, selected_session);
follow_state.sync_followers_from_leader(client_id, selected_session)
};
for update in updates {
emit_event(
state,
Event::FollowTargetChanged {
follower_client_id: update.follower_client_id.0,
leader_client_id: update.leader_client_id.0,
session_id: update.session_id.0,
},
)?;
}
Ok(())
}
fn disconnect_follow_state(state: &Arc<ServerState>, client_id: ClientId) -> Result<()> {
let events = {
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
follow_state.disconnect_client(client_id)
};
for event in events {
emit_event(state, event)?;
}
Ok(())
}
fn clear_selected_session_for_all(
state: &Arc<ServerState>,
removed_session_id: SessionId,
) -> Result<()> {
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
let affected_clients = follow_state
.selected_sessions
.iter()
.filter_map(|(client_id, selected)| {
(*selected == Some(removed_session_id)).then_some(*client_id)
})
.collect::<Vec<_>>();
for client_id in &affected_clients {
follow_state.selected_sessions.insert(*client_id, None);
}
for client_id in affected_clients {
let _ = follow_state.sync_followers_from_leader(client_id, None);
}
Ok(())
}
fn rebalance_owner_roles_after_disconnect(
state: &Arc<ServerState>,
disconnected_client_id: ClientId,
) -> Result<()> {
let connected_clients = {
let follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
follow_state.connected_clients.clone()
};
let session_clients = {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
manager
.list_sessions()
.into_iter()
.filter_map(|summary| {
manager
.get_session(&summary.id)
.map(|session| (summary.id, session.clients.clone()))
})
.collect::<BTreeMap<SessionId, BTreeSet<ClientId>>>()
};
let role_change_events = {
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state.rebalance_after_disconnect(
disconnected_client_id,
&connected_clients,
&session_clients,
)
};
for event in role_change_events {
emit_event(state, event)?;
}
Ok(())
}
fn mark_snapshot_dirty(state: &Arc<ServerState>) -> Result<()> {
let mut runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
if runtime.manager.is_some() {
runtime.dirty = true;
runtime.last_marked_at = Some(Instant::now());
}
Ok(())
}
fn maybe_flush_snapshot(state: &Arc<ServerState>, force: bool) -> Result<()> {
let manager = {
let mut runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
let should_flush = if force {
runtime.dirty
} else {
runtime.dirty
&& runtime
.last_marked_at
.is_some_and(|last| last.elapsed() >= runtime.debounce_interval)
};
if !should_flush {
return Ok(());
}
runtime.dirty = false;
runtime.last_marked_at = None;
runtime.manager.clone()
};
let Some(manager) = manager else {
return Ok(());
};
let snapshot = build_snapshot(state)?;
if let Err(error) = manager.write_snapshot(&snapshot) {
warn!("failed writing server snapshot: {error}");
let mut runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
runtime.dirty = true;
runtime.last_marked_at = Some(Instant::now());
runtime.last_restore_error = Some(format!("snapshot write failed: {error}"));
} else {
let mut runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
runtime.last_write_epoch_ms = Some(epoch_millis_now());
runtime.last_restore_error = None;
}
Ok(())
}
fn snapshot_status(state: &Arc<ServerState>) -> Result<ServerSnapshotStatus> {
let runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
let path = runtime
.manager
.as_ref()
.map(|manager| manager.path().to_string_lossy().to_string());
let snapshot_exists = runtime
.manager
.as_ref()
.is_some_and(|manager| manager.path().exists());
Ok(ServerSnapshotStatus {
enabled: runtime.manager.is_some(),
path,
snapshot_exists,
last_write_epoch_ms: runtime.last_write_epoch_ms,
last_restore_epoch_ms: runtime.last_restore_epoch_ms,
last_restore_error: runtime.last_restore_error.clone(),
})
}
fn build_snapshot(state: &Arc<ServerState>) -> Result<SnapshotV1> {
let sessions = {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
manager.list_sessions()
};
let session_snapshots = {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
sessions
.iter()
.map(|session_info| {
let windows = runtime_manager
.list_windows(session_info.id)
.unwrap_or_default();
let active_window_id = windows.iter().find(|w| w.active).map(|w| w.id.0);
let window_snapshots = windows
.into_iter()
.map(|window| WindowSnapshotV1 {
id: window.id.0,
name: window.name,
})
.collect::<Vec<_>>();
let name = manager
.get_session(&session_info.id)
.and_then(|session| session.name.clone());
SessionSnapshotV1 {
id: session_info.id.0,
name,
windows: window_snapshots,
active_window_id,
}
})
.collect::<Vec<_>>()
};
let roles = {
let permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state
.roles
.iter()
.flat_map(|(session_id, assignments)| {
assignments
.iter()
.map(|(client_id, role)| RoleAssignmentSnapshotV1 {
session_id: session_id.0,
client_id: client_id.0,
role: *role,
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
};
let (follows, selected_sessions) = {
let follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
let follows = follow_state
.follows
.iter()
.map(|(follower_id, entry)| FollowEdgeSnapshotV1 {
follower_client_id: follower_id.0,
leader_client_id: entry.leader_client_id.0,
global: entry.global,
})
.collect::<Vec<_>>();
let selected_sessions = follow_state
.selected_sessions
.iter()
.map(|(client_id, selected)| ClientSelectedSessionSnapshotV1 {
client_id: client_id.0,
session_id: selected.map(|session| session.0),
})
.collect::<Vec<_>>();
(follows, selected_sessions)
};
Ok(SnapshotV1 {
sessions: session_snapshots,
roles,
follows,
selected_sessions,
})
}
fn restore_snapshot_if_present(state: &Arc<ServerState>) -> Result<()> {
let manager = {
let runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
runtime.manager.clone()
};
let Some(snapshot_manager) = manager else {
return Ok(());
};
if let Err(error) = snapshot_manager.cleanup_temp_file() {
warn!("failed cleaning stale snapshot temp file: {error}");
}
if !snapshot_manager.path().exists() {
return Ok(());
}
let snapshot = match snapshot_manager.read_snapshot() {
Ok(snapshot) => snapshot,
Err(error) => {
warn!("failed reading snapshot; starting clean: {error}");
let mut runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
runtime.last_restore_error = Some(format!("{error}"));
return Ok(());
}
};
let _ = apply_snapshot_state(state, &snapshot)?;
{
let mut runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
runtime.dirty = false;
runtime.last_marked_at = None;
runtime.last_restore_epoch_ms = Some(epoch_millis_now());
runtime.last_restore_error = None;
}
Ok(())
}
fn apply_snapshot_state(state: &Arc<ServerState>, snapshot: &SnapshotV1) -> Result<RestoreSummary> {
let mut summary = RestoreSummary::default();
{
let mut session_manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
for session_snapshot in &snapshot.sessions {
if session_snapshot.windows.is_empty() {
warn!(
"skipping snapshot session {}: no windows to restore",
session_snapshot.id
);
continue;
}
let session_id = SessionId(session_snapshot.id);
let mut session = Session::new(session_snapshot.name.clone());
session.id = session_id;
for window_snapshot in &session_snapshot.windows {
session.add_window(WindowId(window_snapshot.id));
}
if let Some(active_window_id) = session_snapshot.active_window_id {
let _ = session.set_active_window(WindowId(active_window_id));
}
if let Err(error) = session_manager.insert_session(session) {
warn!(
"skipping snapshot session {} insertion failure: {error}",
session_snapshot.id
);
continue;
}
let active_window = session_snapshot
.active_window_id
.or_else(|| session_snapshot.windows.first().map(|window| window.id))
.map(WindowId)
.expect("windows non-empty implies active fallback");
let runtime_windows = session_snapshot
.windows
.iter()
.map(|window| (WindowId(window.id), window.name.clone()))
.collect::<Vec<_>>();
if let Err(error) =
runtime_manager.restore_runtime(session_id, runtime_windows, active_window)
{
warn!(
"failed restoring runtime for session {}: {error}",
session_snapshot.id
);
let _ = session_manager.remove_session(&session_id);
continue;
}
summary.sessions += 1;
summary.windows += session_snapshot.windows.len();
}
}
{
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state.roles.clear();
let session_manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
for role in &snapshot.roles {
let session_id = SessionId(role.session_id);
if session_manager.get_session(&session_id).is_some() {
permission_state.set_role(session_id, ClientId(role.client_id), role.role);
summary.roles += 1;
}
}
}
{
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
follow_state.follows.clear();
follow_state.selected_sessions.clear();
let session_manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
for selected in &snapshot.selected_sessions {
let selected_session = selected.session_id.map(SessionId);
if selected_session
.is_none_or(|session_id| session_manager.get_session(&session_id).is_some())
{
follow_state
.selected_sessions
.insert(ClientId(selected.client_id), selected_session);
summary.selected_sessions += 1;
}
}
for follow in &snapshot.follows {
follow_state.follows.insert(
ClientId(follow.follower_client_id),
FollowEntry {
leader_client_id: ClientId(follow.leader_client_id),
global: follow.global,
},
);
summary.follows += 1;
}
}
Ok(summary)
}
async fn restore_snapshot_replace(
state: &Arc<ServerState>,
snapshot: SnapshotV1,
) -> Result<RestoreSummary> {
let removed_runtimes = {
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
runtime_manager.remove_all_runtimes()
};
for removed_runtime in removed_runtimes {
shutdown_runtime_handle(removed_runtime).await;
}
{
let mut session_manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
*session_manager = SessionManager::new();
}
{
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
*permission_state = PermissionState::default();
}
{
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
follow_state.follows.clear();
follow_state.selected_sessions.clear();
}
{
let mut attach_tokens = state
.attach_tokens
.lock()
.map_err(|_| anyhow::anyhow!("attach token manager lock poisoned"))?;
attach_tokens.clear();
}
apply_snapshot_state(state, &snapshot)
}
async fn handle_request(
state: &Arc<ServerState>,
shutdown_tx: &watch::Sender<bool>,
client_id: ClientId,
selected_session: &mut Option<SessionId>,
attached_stream_session: &mut Option<SessionId>,
request: Request,
) -> Result<Response> {
let _operation_guard = if request_requires_exclusive(&request) {
Some(state.operation_lock.lock().await)
} else {
None
};
let previous_selected_session = *selected_session;
sync_selected_session_from_follow_state(state, client_id, selected_session)?;
reconcile_selected_session_membership(
state,
client_id,
previous_selected_session,
*selected_session,
)?;
let response = match request {
Request::Hello { .. } => Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "hello request is only valid during handshake".to_string(),
}),
Request::Ping => Response::Ok(ResponsePayload::Pong),
Request::WhoAmI => Response::Ok(ResponsePayload::ClientIdentity { id: client_id.0 }),
Request::ServerStatus => {
let snapshot = snapshot_status(state)?;
Response::Ok(ResponsePayload::ServerStatus {
running: true,
snapshot,
})
}
Request::ServerSave => {
mark_snapshot_dirty(state)?;
maybe_flush_snapshot(state, true)?;
let status = snapshot_status(state)?;
Response::Ok(ResponsePayload::ServerSnapshotSaved { path: status.path })
}
Request::ServerRestoreDryRun => {
let snapshot_runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
let Some(manager) = snapshot_runtime.manager.clone() else {
return Ok(Response::Ok(ResponsePayload::ServerSnapshotRestoreDryRun {
ok: false,
message: "snapshot persistence is disabled".to_string(),
}));
};
drop(snapshot_runtime);
match manager.read_snapshot() {
Ok(snapshot) => Response::Ok(ResponsePayload::ServerSnapshotRestoreDryRun {
ok: true,
message: format!(
"snapshot is valid (sessions={}, roles={}, follows={}, selected={})",
snapshot.sessions.len(),
snapshot.roles.len(),
snapshot.follows.len(),
snapshot.selected_sessions.len()
),
}),
Err(error) => Response::Ok(ResponsePayload::ServerSnapshotRestoreDryRun {
ok: false,
message: format!("snapshot dry-run failed: {error}"),
}),
}
}
Request::ServerRestoreApply => {
let manager = {
let snapshot_runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
snapshot_runtime.manager.clone()
};
let Some(manager) = manager else {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "snapshot persistence is disabled".to_string(),
}));
};
let snapshot = match manager.read_snapshot() {
Ok(snapshot) => snapshot,
Err(error) => {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: format!("snapshot restore failed: {error}"),
}));
}
};
let summary = restore_snapshot_replace(state, snapshot).await?;
{
let mut runtime = state
.snapshot_runtime
.lock()
.map_err(|_| anyhow::anyhow!("snapshot runtime lock poisoned"))?;
runtime.last_restore_epoch_ms = Some(epoch_millis_now());
runtime.last_restore_error = None;
runtime.dirty = false;
runtime.last_marked_at = None;
}
Response::Ok(ResponsePayload::ServerSnapshotRestored {
sessions: summary.sessions,
windows: summary.windows,
roles: summary.roles,
follows: summary.follows,
selected_sessions: summary.selected_sessions,
})
}
Request::ServerStop => {
let _ = shutdown_tx.send(true);
Response::Ok(ResponsePayload::ServerStopping)
}
Request::NewSession { name } => {
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
if let Some(requested_name) = name.as_deref()
&& manager
.list_sessions()
.iter()
.any(|session| session.name.as_deref() == Some(requested_name))
{
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::AlreadyExists,
message: format!("session already exists with name '{requested_name}'"),
}));
}
match manager.create_session(name.clone()) {
Ok(session_id) => {
drop(manager);
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
if let Err(error) = runtime_manager.start_runtime(session_id) {
drop(runtime_manager);
let mut rollback_manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let _ = rollback_manager.remove_session(&session_id);
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: format!("failed creating session runtime: {error:#}"),
}));
}
let initial_window_ids = runtime_manager
.list_windows(session_id)
.map(|windows| windows.into_iter().map(|w| w.id).collect::<Vec<_>>())
.unwrap_or_default();
drop(runtime_manager);
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
if let Some(session_model) = manager.get_session_mut(&session_id) {
for window_id in initial_window_ids {
session_model.add_window(window_id);
}
}
drop(manager);
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state.ensure_owner(session_id, client_id);
Response::Ok(ResponsePayload::SessionCreated {
id: session_id.0,
name,
})
}
Err(error) => Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: format!("failed creating session: {error:#}"),
}),
}
}
Request::NewWindow { session, name } => {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let session_id =
match resolve_window_request_session_id(&manager, &session, selected_session) {
Ok(session_id) => session_id,
Err(response) => return Ok(Response::Err(response)),
};
drop(manager);
if let Err(response) = ensure_owner_for_session(state, session_id, client_id) {
return Ok(Response::Err(response));
}
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
let (window_id, resolved_name) = match runtime_manager.new_window(session_id, name) {
Ok(created) => created,
Err(error) => {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: format!("failed creating window runtime: {error:#}"),
}));
}
};
drop(runtime_manager);
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
if let Some(session_model) = manager.get_session_mut(&session_id) {
session_model.add_window(window_id);
}
Response::Ok(ResponsePayload::WindowCreated {
id: window_id.0,
session_id: session_id.0,
name: resolved_name,
})
}
Request::ListWindows { session } => {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let session_id =
match resolve_window_request_session_id(&manager, &session, selected_session) {
Ok(session_id) => session_id,
Err(response) => return Ok(Response::Err(response)),
};
drop(manager);
let runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
let windows = match runtime_manager.list_windows(session_id) {
Ok(windows) => windows,
Err(error) => {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("failed listing windows: {error:#}"),
}));
}
};
Response::Ok(ResponsePayload::WindowList {
windows: windows
.into_iter()
.map(|window| WindowSummary {
id: window.id.0,
session_id: session_id.0,
name: window.name,
active: window.active,
})
.collect(),
})
}
Request::KillWindow { session, target } => {
let session_id = {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
match resolve_window_request_session_id(&manager, &session, selected_session) {
Ok(session_id) => session_id,
Err(response) => return Ok(Response::Err(response)),
}
};
if let Err(response) = ensure_owner_for_session(state, session_id, client_id) {
return Ok(Response::Err(response));
}
let selection = window_selection_from_selector(target);
let removed_window = {
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
match runtime_manager.kill_window(session_id, selection) {
Ok(removed) => removed,
Err(error) => {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("failed killing window: {error:#}"),
}));
}
}
};
let removed_window_session_id = removed_window.session_id;
let removed_window_id = removed_window.window_id;
let removed_window_handle = removed_window.handle;
let session_removed = removed_window.session_removed;
shutdown_window_handle(removed_window_handle).await;
{
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
if let Some(session_model) = manager.get_session_mut(&removed_window_session_id) {
session_model.remove_window(&removed_window_id);
}
}
emit_event(
state,
Event::WindowRemoved {
id: removed_window_id.0,
session_id: removed_window_session_id.0,
},
)?;
if let Some(removed_session) = session_removed {
if removed_session.had_attached_clients {
emit_event(
state,
Event::ClientDetached {
id: removed_window_session_id.0,
},
)?;
}
shutdown_runtime_handle(removed_session).await;
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let _ = manager.remove_session(&removed_window_session_id);
if *selected_session == Some(removed_window_session_id) {
*selected_session = None;
persist_selected_session(state, client_id, None)?;
}
if *attached_stream_session == Some(removed_window_session_id) {
*attached_stream_session = None;
}
let mut attach_tokens = state
.attach_tokens
.lock()
.map_err(|_| anyhow::anyhow!("attach token manager lock poisoned"))?;
attach_tokens.remove_for_session(removed_window_session_id);
drop(attach_tokens);
clear_selected_session_for_all(state, removed_window_session_id)?;
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state.remove_session(removed_window_session_id);
drop(permission_state);
emit_event(
state,
Event::SessionRemoved {
id: removed_window_session_id.0,
},
)?;
}
Response::Ok(ResponsePayload::WindowKilled {
id: removed_window_id.0,
session_id: removed_window_session_id.0,
})
}
Request::SwitchWindow { session, target } => {
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let session_id =
match resolve_window_request_session_id(&manager, &session, selected_session) {
Ok(session_id) => session_id,
Err(response) => return Ok(Response::Err(response)),
};
if let Err(response) = ensure_owner_for_session(state, session_id, client_id) {
return Ok(Response::Err(response));
}
let selection = window_selection_from_selector(target);
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
let switched_id = match runtime_manager.switch_window(session_id, selection) {
Ok(window_id) => window_id,
Err(error) => {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("failed switching window: {error:#}"),
}));
}
};
drop(runtime_manager);
if let Some(session_model) = manager.get_session_mut(&session_id) {
let _ = session_model.set_active_window(switched_id);
}
emit_event(
state,
Event::WindowSwitched {
id: switched_id.0,
session_id: session_id.0,
by_client_id: client_id.0,
},
)?;
Response::Ok(ResponsePayload::WindowSwitched {
id: switched_id.0,
session_id: session_id.0,
})
}
Request::ListSessions => {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
let sessions = manager
.list_sessions()
.into_iter()
.map(|session| SessionSummary {
id: session.id.0,
name: session.name,
window_count: runtime_manager
.list_windows(session.id)
.map(|windows| windows.len())
.unwrap_or(session.window_count),
client_count: session.client_count,
})
.collect::<Vec<_>>();
Response::Ok(ResponsePayload::SessionList { sessions })
}
Request::ListClients => {
let follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
let permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
let clients = follow_state
.list_clients()
.into_iter()
.map(|mut client| {
client.session_role = client.selected_session_id.map(|session_id| {
permission_state.role_for(SessionId(session_id), ClientId(client.id))
});
client
})
.collect::<Vec<_>>();
Response::Ok(ResponsePayload::ClientList { clients })
}
Request::ListPermissions { session } => {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let Some(session_id) = resolve_session_id(&manager, &session) else {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: session_not_found_message(&session),
}));
};
drop(manager);
let permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
let permissions = permission_state.list_permissions(session_id);
Response::Ok(ResponsePayload::PermissionsList {
session_id: session_id.0,
permissions,
})
}
Request::GrantRole {
session,
client_id: target_client_id,
role,
} => {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let Some(session_id) = resolve_session_id(&manager, &session) else {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: session_not_found_message(&session),
}));
};
drop(manager);
if let Err(response) = ensure_owner_for_session(state, session_id, client_id) {
return Ok(Response::Err(response));
}
let target_client_id = ClientId(target_client_id);
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state.set_role(session_id, target_client_id, role);
if role == SessionRole::Owner && target_client_id != client_id {
permission_state.clear_to_observer(session_id, client_id);
}
drop(permission_state);
emit_event(
state,
Event::RoleChanged {
session_id: session_id.0,
client_id: target_client_id.0,
role,
by_client_id: client_id.0,
},
)?;
if role == SessionRole::Owner && target_client_id != client_id {
emit_event(
state,
Event::RoleChanged {
session_id: session_id.0,
client_id: client_id.0,
role: SessionRole::Observer,
by_client_id: client_id.0,
},
)?;
}
Response::Ok(ResponsePayload::RoleGranted {
session_id: session_id.0,
client_id: target_client_id.0,
role,
})
}
Request::RevokeRole {
session,
client_id: target_client_id,
} => {
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let Some(session_id) = resolve_session_id(&manager, &session) else {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: session_not_found_message(&session),
}));
};
drop(manager);
if let Err(response) = ensure_owner_for_session(state, session_id, client_id) {
return Ok(Response::Err(response));
}
let target_client_id = ClientId(target_client_id);
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
if permission_state.role_for(session_id, target_client_id) == SessionRole::Owner {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "cannot revoke the current owner role".to_string(),
}));
}
permission_state.clear_to_observer(session_id, target_client_id);
drop(permission_state);
emit_event(
state,
Event::RoleChanged {
session_id: session_id.0,
client_id: target_client_id.0,
role: SessionRole::Observer,
by_client_id: client_id.0,
},
)?;
Response::Ok(ResponsePayload::RoleRevoked {
session_id: session_id.0,
client_id: target_client_id.0,
role: SessionRole::Observer,
})
}
Request::KillSession { selector } => {
let (session_id, removed_runtime) = {
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let Some(session_id) = resolve_session_id(&manager, &selector) else {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: session_not_found_message(&selector),
}));
};
if let Err(response) = ensure_owner_for_session(state, session_id, client_id) {
return Ok(Response::Err(response));
}
if manager.remove_session(&session_id).is_err() {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: format!("failed removing session {}", session_id.0),
}));
}
if *selected_session == Some(session_id) {
*selected_session = None;
persist_selected_session(state, client_id, None)?;
}
if *attached_stream_session == Some(session_id) {
*attached_stream_session = None;
}
drop(manager);
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
let removed_runtime = match runtime_manager.remove_runtime(session_id) {
Ok(removed) => removed,
Err(error) => {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: format!("failed stopping session runtime: {error:#}"),
}));
}
};
(session_id, removed_runtime)
};
if removed_runtime.had_attached_clients {
emit_event(state, Event::ClientDetached { id: session_id.0 })?;
}
shutdown_runtime_handle(removed_runtime).await;
let mut attach_tokens = state
.attach_tokens
.lock()
.map_err(|_| anyhow::anyhow!("attach token manager lock poisoned"))?;
attach_tokens.remove_for_session(session_id);
drop(attach_tokens);
clear_selected_session_for_all(state, session_id)?;
let mut permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state.remove_session(session_id);
drop(permission_state);
emit_event(state, Event::SessionRemoved { id: session_id.0 })?;
Response::Ok(ResponsePayload::SessionKilled { id: session_id.0 })
}
Request::FollowClient {
target_client_id,
global,
} => {
let leader_client_id = ClientId(target_client_id);
let initial_target_session = {
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
match follow_state.start_follow(client_id, leader_client_id, global) {
Ok(initial) => initial,
Err(reason) => {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: reason.to_string(),
}));
}
}
};
if global {
let previous_selected = *selected_session;
*selected_session = initial_target_session;
reconcile_selected_session_membership(
state,
client_id,
previous_selected,
*selected_session,
)?;
}
emit_event(
state,
Event::FollowStarted {
follower_client_id: client_id.0,
leader_client_id: leader_client_id.0,
global,
},
)?;
if let Some(session_id) = initial_target_session {
emit_event(
state,
Event::FollowTargetChanged {
follower_client_id: client_id.0,
leader_client_id: leader_client_id.0,
session_id: session_id.0,
},
)?;
}
Response::Ok(ResponsePayload::FollowStarted {
follower_client_id: client_id.0,
leader_client_id: leader_client_id.0,
global,
})
}
Request::Unfollow => {
let removed = {
let mut follow_state = state
.follow_state
.lock()
.map_err(|_| anyhow::anyhow!("follow state lock poisoned"))?;
follow_state.stop_follow(client_id)
};
if removed {
emit_event(
state,
Event::FollowStopped {
follower_client_id: client_id.0,
},
)?;
}
Response::Ok(ResponsePayload::FollowStopped {
follower_client_id: client_id.0,
})
}
Request::Attach { selector } => {
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let Some(next_session_id) = resolve_session_id(&manager, &selector) else {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: session_not_found_message(&selector),
}));
};
if let Some(previous_session_id) = selected_session.take()
&& previous_session_id != next_session_id
&& let Some(previous) = manager.get_session_mut(&previous_session_id)
{
previous.remove_client(&client_id);
}
match manager.get_session_mut(&next_session_id) {
Some(session) => {
session.add_client(client_id);
*selected_session = Some(next_session_id);
persist_selected_session(state, client_id, *selected_session)?;
drop(manager);
let mut attach_tokens = state
.attach_tokens
.lock()
.map_err(|_| anyhow::anyhow!("attach token manager lock poisoned"))?;
let grant = attach_tokens.issue(next_session_id);
Response::Ok(ResponsePayload::Attached { grant })
}
None => Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("session not found: {}", next_session_id.0),
}),
}
}
Request::AttachOpen {
session_id,
attach_token,
} => {
let session_id = SessionId(session_id);
let manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
if manager.get_session(&session_id).is_none() {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("session not found: {}", session_id.0),
}));
}
drop(manager);
let mut attach_tokens = state
.attach_tokens
.lock()
.map_err(|_| anyhow::anyhow!("attach token manager lock poisoned"))?;
match attach_tokens.consume(session_id, attach_token) {
Ok(()) => {
drop(attach_tokens);
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
if let Some(previous_stream_session) = *attached_stream_session
&& previous_stream_session != session_id
{
runtime_manager.end_attach(previous_stream_session, client_id);
emit_event(
state,
Event::ClientDetached {
id: previous_stream_session.0,
},
)?;
}
let can_write = {
let permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
matches!(
permission_state.role_for(session_id, client_id),
SessionRole::Owner | SessionRole::Writer
)
};
match runtime_manager.begin_attach(session_id, client_id) {
Ok(()) => {
*attached_stream_session = Some(session_id);
Response::Ok(ResponsePayload::AttachReady {
session_id: session_id.0,
can_write,
})
}
Err(SessionRuntimeError::NotFound) => Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("session runtime not found: {}", session_id.0),
}),
Err(SessionRuntimeError::NotAttached | SessionRuntimeError::Closed) => {
Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: "failed opening attach stream".to_string(),
})
}
}
}
Err(AttachTokenValidationError::NotFound) => Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: "attach token not found".to_string(),
}),
Err(AttachTokenValidationError::Expired) => Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "attach token expired".to_string(),
}),
Err(AttachTokenValidationError::SessionMismatch) => Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "attach token does not match requested session".to_string(),
}),
}
}
Request::AttachInput { session_id, data } => {
let session_id = SessionId(session_id);
let role = {
let permission_state = state
.permission_state
.lock()
.map_err(|_| anyhow::anyhow!("permission state lock poisoned"))?;
permission_state.role_for(session_id, client_id)
};
if role == SessionRole::Observer {
return Ok(Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "attach input denied: observer role is read-only".to_string(),
}));
}
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
match runtime_manager.write_input(session_id, client_id, data) {
Ok(bytes) => Response::Ok(ResponsePayload::AttachInputAccepted { bytes }),
Err(SessionRuntimeError::NotFound) => Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("session runtime not found: {}", session_id.0),
}),
Err(SessionRuntimeError::NotAttached) => Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "client is not attached to session runtime".to_string(),
}),
Err(SessionRuntimeError::Closed) => Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: "failed writing attach input".to_string(),
}),
}
}
Request::AttachOutput {
session_id,
max_bytes,
} => {
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
match runtime_manager.read_output(SessionId(session_id), client_id, max_bytes) {
Ok(data) => Response::Ok(ResponsePayload::AttachOutput { data }),
Err(SessionRuntimeError::NotFound) => Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message: format!("session runtime not found: {session_id}"),
}),
Err(SessionRuntimeError::NotAttached) => Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "client is not attached to session runtime".to_string(),
}),
Err(SessionRuntimeError::Closed) => Response::Err(ErrorResponse {
code: ErrorCode::Internal,
message: "failed reading attach output".to_string(),
}),
}
}
Request::Detach => {
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
let previous_selected_session = selected_session.take();
if let Some(current_selected_session) = previous_selected_session
&& let Some(session) = manager.get_session_mut(¤t_selected_session)
{
session.remove_client(&client_id);
}
drop(manager);
if let Some(current_stream_session) = attached_stream_session.take() {
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
runtime_manager.end_attach(current_stream_session, client_id);
emit_event(
state,
Event::ClientDetached {
id: current_stream_session.0,
},
)?;
}
persist_selected_session(state, client_id, None)?;
Response::Ok(ResponsePayload::Detached)
}
Request::SubscribeEvents => {
let mut hub = state
.event_hub
.lock()
.map_err(|_| anyhow::anyhow!("event hub lock poisoned"))?;
hub.subscribe(client_id);
Response::Ok(ResponsePayload::EventsSubscribed)
}
Request::PollEvents { max_events } => {
let mut hub = state
.event_hub
.lock()
.map_err(|_| anyhow::anyhow!("event hub lock poisoned"))?;
match hub.poll(client_id, max_events) {
Some(events) => Response::Ok(ResponsePayload::EventBatch { events }),
None => Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "event subscription not found for client".to_string(),
}),
}
}
};
if let Response::Ok(ResponsePayload::SessionCreated { id, name }) = &response {
emit_event(
state,
Event::SessionCreated {
id: *id,
name: name.clone(),
},
)?;
}
if let Response::Ok(ResponsePayload::WindowCreated {
id,
session_id,
name,
}) = &response
{
emit_event(
state,
Event::WindowCreated {
id: *id,
session_id: *session_id,
name: name.clone(),
},
)?;
}
if let Response::Ok(ResponsePayload::AttachReady { session_id, .. }) = &response {
emit_event(state, Event::ClientAttached { id: *session_id })?;
}
if response_requires_snapshot(&response) {
mark_snapshot_dirty(state)?;
maybe_flush_snapshot(state, false)?;
}
Ok(response)
}
fn request_requires_exclusive(request: &Request) -> bool {
matches!(
request,
Request::ServerSave
| Request::ServerStop
| Request::ServerRestoreApply
| Request::NewSession { .. }
| Request::NewWindow { .. }
| Request::GrantRole { .. }
| Request::RevokeRole { .. }
| Request::KillSession { .. }
| Request::KillWindow { .. }
| Request::SwitchWindow { .. }
| Request::FollowClient { .. }
| Request::Unfollow
| Request::Attach { .. }
| Request::AttachOpen { .. }
| Request::AttachInput { .. }
| Request::Detach
)
}
fn response_requires_snapshot(response: &Response) -> bool {
matches!(
response,
Response::Ok(
ResponsePayload::SessionCreated { .. }
| ResponsePayload::WindowCreated { .. }
| ResponsePayload::WindowKilled { .. }
| ResponsePayload::WindowSwitched { .. }
| ResponsePayload::SessionKilled { .. }
| ResponsePayload::RoleGranted { .. }
| ResponsePayload::RoleRevoked { .. }
| ResponsePayload::FollowStarted { .. }
| ResponsePayload::FollowStopped { .. }
| ResponsePayload::Attached { .. }
| ResponsePayload::Detached
)
)
}
fn detach_client_state_on_disconnect(
state: &Arc<ServerState>,
client_id: ClientId,
selected_session: &mut Option<SessionId>,
attached_stream_session: &mut Option<SessionId>,
) -> Result<()> {
let previous_selected = selected_session.take();
let previous_stream = attached_stream_session.take();
if previous_selected.is_none() && previous_stream.is_none() {
return Ok(());
}
let mut manager = state
.session_manager
.lock()
.map_err(|_| anyhow::anyhow!("session manager lock poisoned"))?;
if let Some(session_id) = previous_selected
&& let Some(session) = manager.get_session_mut(&session_id)
{
session.remove_client(&client_id);
}
drop(manager);
if let Some(stream_session_id) = previous_stream {
let mut runtime_manager = state
.session_runtimes
.lock()
.map_err(|_| anyhow::anyhow!("session runtime manager lock poisoned"))?;
runtime_manager.end_attach(stream_session_id, client_id);
drop(runtime_manager);
emit_event(
state,
Event::ClientDetached {
id: stream_session_id.0,
},
)?;
}
Ok(())
}
fn resolve_session_id(manager: &SessionManager, selector: &SessionSelector) -> Option<SessionId> {
match selector {
SessionSelector::ById(raw_id) => {
let session_id = SessionId(*raw_id);
manager.get_session(&session_id).map(|_| session_id)
}
SessionSelector::ByName(value) => {
let sessions = manager.list_sessions();
if let Some(session) = sessions
.iter()
.find(|session| session.name.as_deref() == Some(value.as_str()))
{
return Some(session.id);
}
if let Some(session) = sessions
.iter()
.find(|session| session.id.to_string().eq_ignore_ascii_case(value))
{
return Some(session.id);
}
let value_lower = value.to_ascii_lowercase();
sessions
.iter()
.find(|session| {
session
.id
.to_string()
.to_ascii_lowercase()
.starts_with(&value_lower)
})
.map(|session| session.id)
}
}
}
fn session_not_found_message(selector: &SessionSelector) -> String {
format!(
"session not found for selector {selector:?} (lookup order: exact name -> exact UUID -> UUID prefix)"
)
}
fn resolve_window_request_session_id(
manager: &SessionManager,
selector: &Option<SessionSelector>,
selected_session: &Option<SessionId>,
) -> std::result::Result<SessionId, ErrorResponse> {
if let Some(selector) = selector {
return resolve_session_id(manager, selector).ok_or_else(|| ErrorResponse {
code: ErrorCode::NotFound,
message: session_not_found_message(selector),
});
}
if let Some(selected) = selected_session {
return Ok(*selected);
}
let sessions = manager.list_sessions();
if sessions.len() == 1 {
return Ok(sessions[0].id);
}
Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "session selector is required when no attached session is active".to_string(),
})
}
fn ensure_owner_for_session(
state: &Arc<ServerState>,
session_id: SessionId,
client_id: ClientId,
) -> std::result::Result<(), ErrorResponse> {
let permission_state = state.permission_state.lock().map_err(|_| ErrorResponse {
code: ErrorCode::Internal,
message: "permission state lock poisoned".to_string(),
})?;
if permission_state.is_owner(session_id, client_id) {
Ok(())
} else {
Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
message: "owner role required for this operation".to_string(),
})
}
}
fn window_selection_from_selector(selector: WindowSelector) -> WindowSelection {
match selector {
WindowSelector::ById(id) => WindowSelection::Id(WindowId(id)),
WindowSelector::ByName(name) => WindowSelection::Name(name),
WindowSelector::Active => WindowSelection::Active,
}
}
fn parse_request(envelope: &Envelope) -> Result<Request> {
if envelope.kind != EnvelopeKind::Request {
anyhow::bail!("expected request envelope kind")
}
decode(&envelope.payload).context("failed to decode request payload")
}
async fn send_ok(
stream: &mut LocalIpcStream,
request_id: u64,
payload: ResponsePayload,
) -> Result<()> {
let response = Response::Ok(payload);
send_response(stream, request_id, response).await
}
async fn send_error(
stream: &mut LocalIpcStream,
request_id: u64,
code: ErrorCode,
message: String,
) -> Result<()> {
let response = Response::Err(ErrorResponse { code, message });
send_response(stream, request_id, response).await
}
async fn send_response(
stream: &mut LocalIpcStream,
request_id: u64,
response: Response,
) -> Result<()> {
let payload = encode(&response).context("failed encoding response payload")?;
let envelope = Envelope::new(request_id, EnvelopeKind::Response, payload);
stream
.send_envelope(&envelope)
.await
.context("failed sending response envelope")
}
#[cfg(test)]
mod tests {
use super::{BmuxServer, resolve_session_id};
use bmux_config::ConfigPaths;
use bmux_ipc::transport::LocalIpcStream;
use bmux_ipc::{
Envelope, EnvelopeKind, ErrorCode, ErrorResponse, Event, IpcEndpoint, ProtocolVersion,
Request, Response, ResponsePayload, SessionRole, SessionSelector, WindowSelector, decode,
encode,
};
use bmux_session::{SessionId, SessionManager};
use std::path::Path;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::sleep;
use uuid::Uuid;
const TEST_STARTUP_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(unix)]
async fn spawn_server_with_ready(
server: BmuxServer,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
let (ready_tx, ready_rx) = oneshot::channel();
let server_clone = server.clone();
let server_task = tokio::spawn(async move { server_clone.run_with_ready(ready_tx).await });
match tokio::time::timeout(TEST_STARTUP_TIMEOUT, ready_rx).await {
Ok(Ok(Ok(()))) => {}
Ok(Ok(Err(error))) => panic!("server failed to start: {error}"),
Ok(Err(_)) => panic!("server ready channel dropped before startup"),
Err(_) => panic!("timed out waiting for server startup"),
}
server_task
}
#[test]
fn resolve_session_id_prefers_exact_name_before_uuid_fallbacks() {
let mut manager = SessionManager::new();
let prefix_source = manager
.create_session(None)
.expect("session should be created");
let selector_value = prefix_source.to_string()[..2].to_string();
let named = manager
.create_session(Some(selector_value.clone()))
.expect("named session should be created");
let resolved = resolve_session_id(&manager, &SessionSelector::ByName(selector_value));
assert_eq!(resolved, Some(named));
}
#[test]
fn resolve_session_id_matches_exact_uuid_string() {
let mut manager = SessionManager::new();
let session_id = manager
.create_session(None)
.expect("session should be created");
let resolved =
resolve_session_id(&manager, &SessionSelector::ByName(session_id.to_string()));
assert_eq!(resolved, Some(session_id));
}
#[test]
fn resolve_session_id_allows_short_unique_prefixes_without_minimum() {
let mut manager = SessionManager::new();
let session_id = manager
.create_session(None)
.expect("session should be created");
let prefix = session_id.to_string()[..1].to_string();
let resolved = resolve_session_id(&manager, &SessionSelector::ByName(prefix));
assert_eq!(resolved, Some(session_id));
}
#[test]
fn resolve_session_id_picks_first_match_for_ambiguous_prefix() {
let mut manager = SessionManager::new();
let mut selector = None;
for _ in 0..512 {
let _ = manager
.create_session(None)
.expect("session should be created");
let sessions = manager.list_sessions();
for nibble in "0123456789abcdef".chars() {
let matches = sessions
.iter()
.filter(|session| session.id.to_string().starts_with(nibble))
.collect::<Vec<_>>();
if matches.len() >= 2 {
selector = Some(nibble.to_string());
break;
}
}
if selector.is_some() {
break;
}
}
let selector = selector.expect("expected to find an ambiguous prefix");
let expected_first = manager
.list_sessions()
.into_iter()
.find(|session| session.id.to_string().starts_with(&selector))
.map(|session| session.id)
.expect("expected at least one matching session");
let resolved = resolve_session_id(&manager, &SessionSelector::ByName(selector));
assert_eq!(resolved, Some(expected_first));
}
#[cfg(unix)]
#[tokio::test]
async fn handshake_accepts_current_protocol_version() {
let socket_path = std::env::temp_dir().join(format!("bmux-server-{}.sock", Uuid::new_v4()));
let endpoint = IpcEndpoint::unix_socket(&socket_path);
let server = BmuxServer::new(endpoint.clone());
let server_task = spawn_server_with_ready(server.clone()).await;
let mut client = LocalIpcStream::connect(&endpoint)
.await
.expect("client should connect");
let hello_payload = encode(&Request::Hello {
protocol_version: ProtocolVersion::current(),
client_name: "test-client".to_string(),
})
.expect("hello should encode");
let hello = Envelope::new(1, EnvelopeKind::Request, hello_payload);
client
.send_envelope(&hello)
.await
.expect("hello send should succeed");
let reply = client
.recv_envelope()
.await
.expect("hello reply should be received");
let response: Response = decode(&reply.payload).expect("response should decode");
assert_eq!(reply.request_id, 1);
assert!(matches!(
response,
Response::Ok(ResponsePayload::ServerStatus { running: true, .. })
));
server.request_shutdown();
server_task
.await
.expect("server task should join")
.expect("server should shut down cleanly");
if socket_path.exists() {
std::fs::remove_file(&socket_path).expect("socket cleanup should succeed");
}
}
#[cfg(unix)]
#[tokio::test]
async fn handshake_rejects_version_mismatch() {
let socket_path = std::env::temp_dir().join(format!("bmux-server-{}.sock", Uuid::new_v4()));
let endpoint = IpcEndpoint::unix_socket(&socket_path);
let server = BmuxServer::new(endpoint.clone());
let server_task = spawn_server_with_ready(server.clone()).await;
let mut client = LocalIpcStream::connect(&endpoint)
.await
.expect("client should connect");
let hello_payload = encode(&Request::Hello {
protocol_version: ProtocolVersion(99),
client_name: "test-client".to_string(),
})
.expect("hello should encode");
let hello = Envelope::new(77, EnvelopeKind::Request, hello_payload);
client
.send_envelope(&hello)
.await
.expect("hello send should succeed");
let reply = client
.recv_envelope()
.await
.expect("hello reply should be received");
let response: Response = decode(&reply.payload).expect("response should decode");
assert_eq!(reply.request_id, 77);
assert!(matches!(
response,
Response::Err(ErrorResponse {
code: ErrorCode::VersionMismatch,
..
})
));
server.request_shutdown();
server_task
.await
.expect("server task should join")
.expect("server should shut down cleanly");
if socket_path.exists() {
std::fs::remove_file(&socket_path).expect("socket cleanup should succeed");
}
}
#[cfg(unix)]
#[tokio::test]
async fn supports_new_session_and_list_sessions() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
10,
Request::NewSession {
name: Some("dev".to_string()),
},
)
.await;
let created_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, name }) => {
assert_eq!(name.as_deref(), Some("dev"));
id
}
other => panic!("unexpected new-session response: {other:?}"),
};
{
let runtime_manager = server
.state
.session_runtimes
.lock()
.expect("runtime manager lock should succeed");
assert_eq!(runtime_manager.runtime_count(), 1);
assert!(runtime_manager.has_runtime(SessionId(created_id)));
}
let listed = send_request(&mut client, 11, Request::ListSessions).await;
match listed {
Response::Ok(ResponsePayload::SessionList { sessions }) => {
assert_eq!(sessions.len(), 1);
assert_eq!(sessions[0].id, created_id);
assert_eq!(sessions[0].name.as_deref(), Some("dev"));
}
other => panic!("unexpected list response: {other:?}"),
}
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn list_clients_reports_connected_clients() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client_a = connect_and_handshake(&endpoint).await;
let mut client_b = connect_and_handshake(&endpoint).await;
let listed = send_request(&mut client_a, 110, Request::ListClients).await;
let clients = match listed {
Response::Ok(ResponsePayload::ClientList { clients }) => clients,
other => panic!("unexpected list clients response: {other:?}"),
};
assert_eq!(clients.len(), 2);
assert!(
clients
.iter()
.all(|client| client.following_client_id.is_none())
);
assert!(clients.iter().all(|client| !client.following_global));
let listed_from_b = send_request(&mut client_b, 111, Request::ListClients).await;
let clients_from_b = match listed_from_b {
Response::Ok(ResponsePayload::ClientList { clients }) => clients,
other => panic!("unexpected list clients response: {other:?}"),
};
assert_eq!(clients_from_b.len(), 2);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn list_clients_reports_follow_relationships() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut leader = connect_and_handshake(&endpoint).await;
let mut follower = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut leader,
120,
Request::NewSession {
name: Some("clients-follow".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let _ = send_request(
&mut leader,
121,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await;
let leader_client_id = match send_request(&mut leader, 122, Request::ListClients).await {
Response::Ok(ResponsePayload::ClientList { clients }) => clients
.into_iter()
.find(|client| client.selected_session_id == Some(session_id))
.map(|client| client.id)
.expect("leader client should be listed"),
other => panic!("unexpected list clients response: {other:?}"),
};
let followed = send_request(
&mut follower,
123,
Request::FollowClient {
target_client_id: leader_client_id,
global: true,
},
)
.await;
assert!(matches!(
followed,
Response::Ok(ResponsePayload::FollowStarted { global: true, .. })
));
let listed = send_request(&mut follower, 124, Request::ListClients).await;
let clients = match listed {
Response::Ok(ResponsePayload::ClientList { clients }) => clients,
other => panic!("unexpected list clients response: {other:?}"),
};
let follower_entry = clients
.iter()
.find(|client| client.id != leader_client_id)
.expect("follower client should be listed");
assert_eq!(follower_entry.following_client_id, Some(leader_client_id));
assert!(follower_entry.following_global);
assert_eq!(follower_entry.selected_session_id, Some(session_id));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn non_owner_cannot_mutate_session_or_windows() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut owner = connect_and_handshake(&endpoint).await;
let mut observer = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut owner,
130,
Request::NewSession {
name: Some("owner-only".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let switch_attempt = send_request(
&mut observer,
131,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::Active,
},
)
.await;
assert!(matches!(
switch_attempt,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
let kill_attempt = send_request(
&mut observer,
132,
Request::KillSession {
selector: SessionSelector::ById(session_id),
},
)
.await;
assert!(matches!(
kill_attempt,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn grant_writer_role_allows_attach_input() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut owner = connect_and_handshake(&endpoint).await;
let mut writer = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut owner,
140,
Request::NewSession {
name: Some("writer-role".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let writer_client_id = match send_request(&mut writer, 141, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected whoami response: {other:?}"),
};
let granted = send_request(
&mut owner,
142,
Request::GrantRole {
session: SessionSelector::ById(session_id),
client_id: writer_client_id,
role: SessionRole::Writer,
},
)
.await;
assert!(matches!(
granted,
Response::Ok(ResponsePayload::RoleGranted {
role: SessionRole::Writer,
..
})
));
let writer_grant = match send_request(
&mut writer,
143,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
let writer_open = send_request(
&mut writer,
144,
Request::AttachOpen {
session_id,
attach_token: writer_grant.attach_token,
},
)
.await;
assert!(matches!(
writer_open,
Response::Ok(ResponsePayload::AttachReady {
session_id: opened_session,
can_write: true,
}) if opened_session == session_id
));
let writer_input = send_request(
&mut writer,
145,
Request::AttachInput {
session_id,
data: b"printf 'writer-role-ok\\n'\n".to_vec(),
},
)
.await;
assert!(matches!(
writer_input,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let listed_permissions = send_request(
&mut owner,
146,
Request::ListPermissions {
session: SessionSelector::ById(session_id),
},
)
.await;
match listed_permissions {
Response::Ok(ResponsePayload::PermissionsList { permissions, .. }) => {
assert!(permissions.iter().any(|entry| {
entry.client_id == writer_client_id && entry.role == SessionRole::Writer
}));
}
other => panic!("unexpected permissions list response: {other:?}"),
}
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn owner_transfer_allows_new_owner_mutations() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut owner = connect_and_handshake(&endpoint).await;
let mut successor = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut owner,
170,
Request::NewSession {
name: Some("owner-transfer".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let successor_id = match send_request(&mut successor, 171, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected whoami response: {other:?}"),
};
let transferred = send_request(
&mut owner,
172,
Request::GrantRole {
session: SessionSelector::ById(session_id),
client_id: successor_id,
role: SessionRole::Owner,
},
)
.await;
assert!(matches!(
transferred,
Response::Ok(ResponsePayload::RoleGranted {
role: SessionRole::Owner,
..
})
));
let old_owner_mutation = send_request(
&mut owner,
173,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("should-fail".to_string()),
},
)
.await;
assert!(matches!(
old_owner_mutation,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
let new_owner_mutation = send_request(
&mut successor,
174,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("allowed".to_string()),
},
)
.await;
assert!(matches!(
new_owner_mutation,
Response::Ok(ResponsePayload::WindowCreated {
session_id: created_session,
..
}) if created_session == session_id
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn owner_disconnect_promotes_writer() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut owner = connect_and_handshake(&endpoint).await;
let mut writer = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut owner,
180,
Request::NewSession {
name: Some("owner-disconnect".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let writer_id = match send_request(&mut writer, 181, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected whoami response: {other:?}"),
};
let granted = send_request(
&mut owner,
182,
Request::GrantRole {
session: SessionSelector::ById(session_id),
client_id: writer_id,
role: SessionRole::Writer,
},
)
.await;
assert!(matches!(
granted,
Response::Ok(ResponsePayload::RoleGranted {
role: SessionRole::Writer,
..
})
));
drop(owner);
sleep(Duration::from_millis(50)).await;
let listed = send_request(
&mut writer,
183,
Request::ListPermissions {
session: SessionSelector::ById(session_id),
},
)
.await;
match listed {
Response::Ok(ResponsePayload::PermissionsList { permissions, .. }) => {
assert!(permissions.iter().any(|entry| {
entry.client_id == writer_id && entry.role == SessionRole::Owner
}));
}
other => panic!("unexpected permissions list response: {other:?}"),
}
let mutation = send_request(
&mut writer,
184,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("post-promotion".to_string()),
},
)
.await;
assert!(matches!(
mutation,
Response::Ok(ResponsePayload::WindowCreated {
session_id: created_session,
..
}) if created_session == session_id
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn rapid_grant_revoke_toggles_attach_input_permissions() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut owner = connect_and_handshake(&endpoint).await;
let mut member = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut owner,
190,
Request::NewSession {
name: Some("rapid-role-toggle".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let member_id = match send_request(&mut member, 191, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected whoami response: {other:?}"),
};
let grant = match send_request(
&mut member,
192,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
let opened = send_request(
&mut member,
193,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert!(matches!(
opened,
Response::Ok(ResponsePayload::AttachReady {
session_id: opened_session,
can_write: false,
}) if opened_session == session_id
));
for idx in 0..3u64 {
let grant_writer = send_request(
&mut owner,
194 + idx * 3,
Request::GrantRole {
session: SessionSelector::ById(session_id),
client_id: member_id,
role: SessionRole::Writer,
},
)
.await;
assert!(matches!(
grant_writer,
Response::Ok(ResponsePayload::RoleGranted {
role: SessionRole::Writer,
..
})
));
let writer_input = send_request(
&mut member,
195 + idx * 3,
Request::AttachInput {
session_id,
data: b"printf 'writer-allowed\\n'\n".to_vec(),
},
)
.await;
assert!(matches!(
writer_input,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let revoke = send_request(
&mut owner,
196 + idx * 3,
Request::RevokeRole {
session: SessionSelector::ById(session_id),
client_id: member_id,
},
)
.await;
assert!(matches!(
revoke,
Response::Ok(ResponsePayload::RoleRevoked {
role: SessionRole::Observer,
..
})
));
let denied_input = send_request(
&mut member,
197 + idx * 3,
Request::AttachInput {
session_id,
data: b"printf 'writer-denied\\n'\n".to_vec(),
},
)
.await;
assert!(matches!(
denied_input,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
}
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn owner_transfer_during_follow_attach_preserves_control_rules() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut owner = connect_and_handshake(&endpoint).await;
let mut successor = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut owner,
230,
Request::NewSession {
name: Some("follow-transfer".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let successor_id = match send_request(&mut successor, 231, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected whoami response: {other:?}"),
};
let follow_started = send_request(
&mut successor,
232,
Request::FollowClient {
target_client_id: match send_request(&mut owner, 233, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected owner whoami response: {other:?}"),
},
global: true,
},
)
.await;
assert!(matches!(
follow_started,
Response::Ok(ResponsePayload::FollowStarted { global: true, .. })
));
let owner_attach = send_request(
&mut owner,
234,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await;
assert!(matches!(
owner_attach,
Response::Ok(ResponsePayload::Attached { .. })
));
let successor_grant = match send_request(
&mut successor,
235,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected successor attach response: {other:?}"),
};
let successor_open = send_request(
&mut successor,
236,
Request::AttachOpen {
session_id,
attach_token: successor_grant.attach_token,
},
)
.await;
assert!(matches!(
successor_open,
Response::Ok(ResponsePayload::AttachReady {
session_id: opened_session,
can_write: false,
}) if opened_session == session_id
));
let transfer = send_request(
&mut owner,
237,
Request::GrantRole {
session: SessionSelector::ById(session_id),
client_id: successor_id,
role: SessionRole::Owner,
},
)
.await;
assert!(matches!(
transfer,
Response::Ok(ResponsePayload::RoleGranted {
role: SessionRole::Owner,
..
})
));
let old_owner_mutation = send_request(
&mut owner,
238,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("old-owner-blocked".to_string()),
},
)
.await;
assert!(matches!(
old_owner_mutation,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
let new_owner_mutation = send_request(
&mut successor,
239,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("new-owner-allowed".to_string()),
},
)
.await;
assert!(matches!(
new_owner_mutation,
Response::Ok(ResponsePayload::WindowCreated {
session_id: created_session,
..
}) if created_session == session_id
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn observer_remains_read_only_across_follow_target_changes() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut leader = connect_and_handshake(&endpoint).await;
let mut observer = connect_and_handshake(&endpoint).await;
let alpha = match send_request(
&mut leader,
260,
Request::NewSession {
name: Some("observer-alpha".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected alpha create response: {other:?}"),
};
let beta = match send_request(
&mut leader,
261,
Request::NewSession {
name: Some("observer-beta".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected beta create response: {other:?}"),
};
let leader_id = match send_request(&mut leader, 262, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected leader whoami response: {other:?}"),
};
let leader_attach_alpha = send_request(
&mut leader,
263,
Request::Attach {
selector: SessionSelector::ById(alpha),
},
)
.await;
assert!(matches!(
leader_attach_alpha,
Response::Ok(ResponsePayload::Attached { .. })
));
let observer_follow = send_request(
&mut observer,
264,
Request::FollowClient {
target_client_id: leader_id,
global: true,
},
)
.await;
assert!(matches!(
observer_follow,
Response::Ok(ResponsePayload::FollowStarted { global: true, .. })
));
let observer_alpha_grant = match send_request(
&mut observer,
265,
Request::Attach {
selector: SessionSelector::ById(alpha),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected observer attach response: {other:?}"),
};
let observer_alpha_open = send_request(
&mut observer,
266,
Request::AttachOpen {
session_id: alpha,
attach_token: observer_alpha_grant.attach_token,
},
)
.await;
assert!(matches!(
observer_alpha_open,
Response::Ok(ResponsePayload::AttachReady {
session_id: opened_session,
can_write: false,
}) if opened_session == alpha
));
let observer_alpha_input = send_request(
&mut observer,
267,
Request::AttachInput {
session_id: alpha,
data: b"printf 'observer-alpha'\n".to_vec(),
},
)
.await;
assert!(matches!(
observer_alpha_input,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
let leader_attach_beta = send_request(
&mut leader,
268,
Request::Attach {
selector: SessionSelector::ById(beta),
},
)
.await;
assert!(matches!(
leader_attach_beta,
Response::Ok(ResponsePayload::Attached { .. })
));
let observer_beta_grant = match send_request(
&mut observer,
269,
Request::Attach {
selector: SessionSelector::ById(beta),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected observer beta attach response: {other:?}"),
};
let observer_beta_open = send_request(
&mut observer,
270,
Request::AttachOpen {
session_id: beta,
attach_token: observer_beta_grant.attach_token,
},
)
.await;
assert!(matches!(
observer_beta_open,
Response::Ok(ResponsePayload::AttachReady {
session_id: opened_session,
can_write: false,
}) if opened_session == beta
));
let observer_beta_input = send_request(
&mut observer,
271,
Request::AttachInput {
session_id: beta,
data: b"printf 'observer-beta'\n".to_vec(),
},
)
.await;
assert!(matches!(
observer_beta_input,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn window_lifecycle_supports_create_list_switch_and_kill() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
12,
Request::NewSession {
name: Some("windows".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let created_window = send_request(
&mut client,
13,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("logs".to_string()),
},
)
.await;
let logs_window_id = match created_window {
Response::Ok(ResponsePayload::WindowCreated {
id,
session_id: sid,
..
}) => {
assert_eq!(sid, session_id);
id
}
other => panic!("unexpected create window response: {other:?}"),
};
let listed = send_request(
&mut client,
14,
Request::ListWindows {
session: Some(SessionSelector::ById(session_id)),
},
)
.await;
let windows = match listed {
Response::Ok(ResponsePayload::WindowList { windows }) => windows,
other => panic!("unexpected list windows response: {other:?}"),
};
assert_eq!(windows.len(), 2);
let switched = send_request(
&mut client,
15,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(logs_window_id),
},
)
.await;
assert_eq!(
switched,
Response::Ok(ResponsePayload::WindowSwitched {
id: logs_window_id,
session_id,
})
);
let killed = send_request(
&mut client,
16,
Request::KillWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(logs_window_id),
},
)
.await;
assert_eq!(
killed,
Response::Ok(ResponsePayload::WindowKilled {
id: logs_window_id,
session_id,
})
);
{
let runtime_manager = server
.state
.session_runtimes
.lock()
.expect("runtime manager lock should succeed");
assert_eq!(runtime_manager.window_count(SessionId(session_id)), 1);
}
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn switch_window_accepts_uuid_prefix_via_name_selector() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
50,
Request::NewSession {
name: Some("window-prefix".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let created_window = send_request(
&mut client,
51,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("logs".to_string()),
},
)
.await;
let logs_window_id = match created_window {
Response::Ok(ResponsePayload::WindowCreated { id, .. }) => id,
other => panic!("unexpected create window response: {other:?}"),
};
let prefix = logs_window_id.to_string()[..2].to_string();
let switched = send_request(
&mut client,
52,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ByName(prefix),
},
)
.await;
assert_eq!(
switched,
Response::Ok(ResponsePayload::WindowSwitched {
id: logs_window_id,
session_id,
})
);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn switch_window_name_selector_prefers_exact_name() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
53,
Request::NewSession {
name: Some("window-name-priority".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let source_window = send_request(
&mut client,
54,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("source".to_string()),
},
)
.await;
let source_window_id = match source_window {
Response::Ok(ResponsePayload::WindowCreated { id, .. }) => id,
other => panic!("unexpected source window response: {other:?}"),
};
let selector_value = source_window_id.to_string()[..2].to_string();
let named_window = send_request(
&mut client,
55,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some(selector_value.clone()),
},
)
.await;
let named_window_id = match named_window {
Response::Ok(ResponsePayload::WindowCreated { id, .. }) => id,
other => panic!("unexpected named window response: {other:?}"),
};
let switched = send_request(
&mut client,
56,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ByName(selector_value),
},
)
.await;
assert_eq!(
switched,
Response::Ok(ResponsePayload::WindowSwitched {
id: named_window_id,
session_id,
})
);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn switch_window_empty_prefix_picks_first_match_deterministically() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
57,
Request::NewSession {
name: Some("window-prefix-order".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let _ = send_request(
&mut client,
58,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("alpha".to_string()),
},
)
.await;
let _ = send_request(
&mut client,
59,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("beta".to_string()),
},
)
.await;
let listed = send_request(
&mut client,
60,
Request::ListWindows {
session: Some(SessionSelector::ById(session_id)),
},
)
.await;
let expected_first = match listed {
Response::Ok(ResponsePayload::WindowList { windows }) => windows
.first()
.map(|window| window.id)
.expect("expected at least one window"),
other => panic!("unexpected list windows response: {other:?}"),
};
let switched = send_request(
&mut client,
61,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ByName(String::new()),
},
)
.await;
assert_eq!(
switched,
Response::Ok(ResponsePayload::WindowSwitched {
id: expected_first,
session_id,
})
);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn switch_window_name_not_found_includes_lookup_chain() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
62,
Request::NewSession {
name: Some("window-not-found-chain".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create session response: {other:?}"),
};
let switched = send_request(
&mut client,
63,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ByName("does-not-exist".to_string()),
},
)
.await;
match switched {
Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
message,
}) => {
assert!(message.contains("lookup order"));
assert!(message.contains("exact name"));
assert!(message.contains("exact UUID"));
assert!(message.contains("UUID prefix"));
}
other => panic!("unexpected switch response: {other:?}"),
}
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn killing_last_window_removes_session() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
17,
Request::NewSession {
name: Some("single-window".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let listed = send_request(
&mut client,
18,
Request::ListWindows {
session: Some(SessionSelector::ById(session_id)),
},
)
.await;
let window_id = match listed {
Response::Ok(ResponsePayload::WindowList { windows }) if windows.len() == 1 => {
windows[0].id
}
other => panic!("unexpected initial window list: {other:?}"),
};
let killed = send_request(
&mut client,
19,
Request::KillWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(window_id),
},
)
.await;
assert!(matches!(
killed,
Response::Ok(ResponsePayload::WindowKilled { .. })
));
let listed_sessions = send_request(&mut client, 20, Request::ListSessions).await;
assert_eq!(
listed_sessions,
Response::Ok(ResponsePayload::SessionList {
sessions: Vec::new(),
})
);
{
let runtime_manager = server
.state
.session_runtimes
.lock()
.expect("runtime manager lock should succeed");
assert_eq!(runtime_manager.runtime_count(), 0);
}
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn supports_attach_detach_and_kill() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
20,
Request::NewSession {
name: Some("ops".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected new-session response: {other:?}"),
};
let attached = send_request(
&mut client,
21,
Request::Attach {
selector: SessionSelector::ByName("ops".to_string()),
},
)
.await;
let grant = match attached {
Response::Ok(ResponsePayload::Attached { grant }) => {
assert_eq!(grant.session_id, session_id);
grant
}
other => panic!("unexpected attach response: {other:?}"),
};
let attach_open = send_request(
&mut client,
211,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert_eq!(
attach_open,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let detached = send_request(&mut client, 22, Request::Detach).await;
assert_eq!(detached, Response::Ok(ResponsePayload::Detached));
let killed = send_request(
&mut client,
23,
Request::KillSession {
selector: SessionSelector::ById(session_id),
},
)
.await;
assert_eq!(
killed,
Response::Ok(ResponsePayload::SessionKilled { id: session_id })
);
let listed = send_request(&mut client, 24, Request::ListSessions).await;
assert_eq!(
listed,
Response::Ok(ResponsePayload::SessionList {
sessions: Vec::new(),
})
);
{
let runtime_manager = server
.state
.session_runtimes
.lock()
.expect("runtime manager lock should succeed");
assert_eq!(runtime_manager.runtime_count(), 0);
assert!(!runtime_manager.has_runtime(SessionId(session_id)));
}
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn allows_second_attach_for_same_session_with_read_only_input() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client_a = connect_and_handshake(&endpoint).await;
let mut client_b = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client_a,
60,
Request::NewSession {
name: Some("single-attach".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let grant_a = match send_request(
&mut client_a,
61,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response for client a: {other:?}"),
};
let open_a = send_request(
&mut client_a,
62,
Request::AttachOpen {
session_id,
attach_token: grant_a.attach_token,
},
)
.await;
assert_eq!(
open_a,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let grant_b = match send_request(
&mut client_b,
63,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response for client b: {other:?}"),
};
let open_b = send_request(
&mut client_b,
64,
Request::AttachOpen {
session_id,
attach_token: grant_b.attach_token,
},
)
.await;
assert_eq!(
open_b,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: false,
})
);
let owner_write = send_request(
&mut client_a,
65,
Request::AttachInput {
session_id,
data: b"printf 'owner-ok\\n'\n".to_vec(),
},
)
.await;
assert!(matches!(
owner_write,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let output_a = collect_attach_output_until(&mut client_a, session_id, "owner-ok", 20).await;
assert!(output_a.contains("owner-ok"));
let output_b = collect_attach_output_until(&mut client_b, session_id, "owner-ok", 20).await;
assert!(output_b.contains("owner-ok"));
let follower_write = send_request(
&mut client_b,
66,
Request::AttachInput {
session_id,
data: b"printf 'follower-write'\n".to_vec(),
},
)
.await;
assert!(matches!(
follower_write,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn detach_keeps_runtime_alive_and_allows_reattach() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
70,
Request::NewSession {
name: Some("reattach".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let grant = match send_request(
&mut client,
71,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
let open = send_request(
&mut client,
72,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert_eq!(
open,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let detached = send_request(&mut client, 73, Request::Detach).await;
assert_eq!(detached, Response::Ok(ResponsePayload::Detached));
{
let runtime_manager = server
.state
.session_runtimes
.lock()
.expect("runtime manager lock should succeed");
assert_eq!(runtime_manager.runtime_count(), 1);
assert!(runtime_manager.has_runtime(SessionId(session_id)));
}
let regrant = match send_request(
&mut client,
74,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected reattach grant response: {other:?}"),
};
let reopen = send_request(
&mut client,
75,
Request::AttachOpen {
session_id,
attach_token: regrant.attach_token,
},
)
.await;
assert_eq!(
reopen,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn attach_io_routes_through_active_window() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
300,
Request::NewSession {
name: Some("window-routing".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected session create response: {other:?}"),
};
let listed = send_request(
&mut client,
301,
Request::ListWindows {
session: Some(SessionSelector::ById(session_id)),
},
)
.await;
let primary_window = match listed {
Response::Ok(ResponsePayload::WindowList { windows }) => windows
.iter()
.find(|window| window.active)
.map(|window| window.id)
.expect("expected active initial window"),
other => panic!("unexpected window list response: {other:?}"),
};
let secondary_window = match send_request(
&mut client,
302,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("secondary".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::WindowCreated { id, .. }) => id,
other => panic!("unexpected new window response: {other:?}"),
};
let grant = match send_request(
&mut client,
303,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
let opened = send_request(
&mut client,
304,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert_eq!(
opened,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let switched_primary = send_request(
&mut client,
305,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(primary_window),
},
)
.await;
assert_eq!(
switched_primary,
Response::Ok(ResponsePayload::WindowSwitched {
id: primary_window,
session_id,
})
);
let export_primary = send_request(
&mut client,
306,
Request::AttachInput {
session_id,
data: b"export BMUX_WINDOW_ROUTE=one\n".to_vec(),
},
)
.await;
assert!(matches!(
export_primary,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let _ = collect_attach_output_until(&mut client, session_id, "one", 5).await;
let switched_secondary = send_request(
&mut client,
307,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(secondary_window),
},
)
.await;
assert_eq!(
switched_secondary,
Response::Ok(ResponsePayload::WindowSwitched {
id: secondary_window,
session_id,
})
);
let print_secondary = send_request(
&mut client,
308,
Request::AttachInput {
session_id,
data: b"printf 'W2=[%s]\\n' \"$BMUX_WINDOW_ROUTE\"\n".to_vec(),
},
)
.await;
assert!(matches!(
print_secondary,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let second_output = collect_attach_output_until(&mut client, session_id, "W2=[]", 20).await;
assert!(second_output.contains("W2=[]"));
let switched_back = send_request(
&mut client,
309,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(primary_window),
},
)
.await;
assert_eq!(
switched_back,
Response::Ok(ResponsePayload::WindowSwitched {
id: primary_window,
session_id,
})
);
let print_primary = send_request(
&mut client,
310,
Request::AttachInput {
session_id,
data: b"printf 'W1=[%s]\\n' \"$BMUX_WINDOW_ROUTE\"\n".to_vec(),
},
)
.await;
assert!(matches!(
print_primary,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let first_output =
collect_attach_output_until(&mut client, session_id, "W1=[one]", 20).await;
assert!(first_output.contains("W1=[one]"));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn reattach_uses_current_active_window() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
320,
Request::NewSession {
name: Some("reattach-window".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected session create response: {other:?}"),
};
let listed = send_request(
&mut client,
321,
Request::ListWindows {
session: Some(SessionSelector::ById(session_id)),
},
)
.await;
let primary_window = match listed {
Response::Ok(ResponsePayload::WindowList { windows }) => windows
.iter()
.find(|window| window.active)
.map(|window| window.id)
.expect("expected active initial window"),
other => panic!("unexpected window list response: {other:?}"),
};
let secondary_window = match send_request(
&mut client,
322,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("secondary".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::WindowCreated { id, .. }) => id,
other => panic!("unexpected new window response: {other:?}"),
};
let grant = match send_request(
&mut client,
323,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
let opened = send_request(
&mut client,
324,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert_eq!(
opened,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let switched_primary = send_request(
&mut client,
325,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(primary_window),
},
)
.await;
assert_eq!(
switched_primary,
Response::Ok(ResponsePayload::WindowSwitched {
id: primary_window,
session_id,
})
);
let export_primary = send_request(
&mut client,
326,
Request::AttachInput {
session_id,
data: b"export BMUX_REATTACH=one\n".to_vec(),
},
)
.await;
assert!(matches!(
export_primary,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let _ = collect_attach_output_until(&mut client, session_id, "one", 5).await;
let switched_secondary = send_request(
&mut client,
327,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::ById(secondary_window),
},
)
.await;
assert_eq!(
switched_secondary,
Response::Ok(ResponsePayload::WindowSwitched {
id: secondary_window,
session_id,
})
);
let export_secondary = send_request(
&mut client,
328,
Request::AttachInput {
session_id,
data: b"export BMUX_REATTACH=two\n".to_vec(),
},
)
.await;
assert!(matches!(
export_secondary,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let _ = collect_attach_output_until(&mut client, session_id, "two", 5).await;
let detached = send_request(&mut client, 329, Request::Detach).await;
assert_eq!(detached, Response::Ok(ResponsePayload::Detached));
let regrant = match send_request(
&mut client,
330,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected reattach grant response: {other:?}"),
};
let reopened = send_request(
&mut client,
331,
Request::AttachOpen {
session_id,
attach_token: regrant.attach_token,
},
)
.await;
assert_eq!(
reopened,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let print_output = send_request(
&mut client,
332,
Request::AttachInput {
session_id,
data: b"printf 'RA=[%s]\\n' \"$BMUX_REATTACH\"\n".to_vec(),
},
)
.await;
assert!(matches!(
print_output,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let output = collect_attach_output_until(&mut client, session_id, "RA=[two]", 20).await;
assert!(output.contains("RA=[two]"));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn event_subscription_reports_lifecycle_order() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let subscribed = send_request(&mut client, 80, Request::SubscribeEvents).await;
assert_eq!(subscribed, Response::Ok(ResponsePayload::EventsSubscribed));
let created = send_request(
&mut client,
81,
Request::NewSession {
name: Some("events".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let grant = match send_request(
&mut client,
82,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
let opened = send_request(
&mut client,
83,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert_eq!(
opened,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let detached = send_request(&mut client, 84, Request::Detach).await;
assert_eq!(detached, Response::Ok(ResponsePayload::Detached));
let killed = send_request(
&mut client,
85,
Request::KillSession {
selector: SessionSelector::ById(session_id),
},
)
.await;
assert_eq!(
killed,
Response::Ok(ResponsePayload::SessionKilled { id: session_id })
);
let events = send_request(&mut client, 86, Request::PollEvents { max_events: 10 }).await;
let events = match events {
Response::Ok(ResponsePayload::EventBatch { events }) => events,
other => panic!("unexpected events response: {other:?}"),
};
let created_idx = events
.iter()
.position(
|event| matches!(event, Event::SessionCreated { id, .. } if *id == session_id),
)
.expect("session_created event should exist");
let attached_idx = events
.iter()
.position(|event| matches!(event, Event::ClientAttached { id } if *id == session_id))
.expect("client_attached event should exist");
let detached_idx = events
.iter()
.position(|event| matches!(event, Event::ClientDetached { id } if *id == session_id))
.expect("client_detached event should exist");
let removed_idx = events
.iter()
.position(|event| matches!(event, Event::SessionRemoved { id } if *id == session_id))
.expect("session_removed event should exist");
assert!(created_idx < attached_idx);
assert!(attached_idx < detached_idx);
assert!(detached_idx < removed_idx);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn global_follow_updates_control_plane_without_rebinding_attach_stream() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut leader = connect_and_handshake(&endpoint).await;
let mut follower = connect_and_handshake(&endpoint).await;
let subscribed = send_request(&mut follower, 500, Request::SubscribeEvents).await;
assert_eq!(subscribed, Response::Ok(ResponsePayload::EventsSubscribed));
let alpha_session = match send_request(
&mut leader,
501,
Request::NewSession {
name: Some("leader-alpha".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected alpha session response: {other:?}"),
};
let beta_session = match send_request(
&mut leader,
502,
Request::NewSession {
name: Some("leader-beta".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected beta session response: {other:?}"),
};
let leader_attached_alpha = send_request(
&mut leader,
503,
Request::Attach {
selector: SessionSelector::ById(alpha_session),
},
)
.await;
assert!(matches!(
leader_attached_alpha,
Response::Ok(ResponsePayload::Attached { .. })
));
let leader_client_id =
discover_client_id_from_window_switch(&mut follower, &mut leader, alpha_session, 1500)
.await;
let follow_started = send_request(
&mut follower,
504,
Request::FollowClient {
target_client_id: leader_client_id,
global: true,
},
)
.await;
assert!(matches!(
follow_started,
Response::Ok(ResponsePayload::FollowStarted { global: true, .. })
));
let follower_windows_alpha =
send_request(&mut follower, 505, Request::ListWindows { session: None }).await;
match follower_windows_alpha {
Response::Ok(ResponsePayload::WindowList { windows }) => {
assert_eq!(windows.len(), 1);
assert_eq!(windows[0].session_id, alpha_session);
}
other => panic!("unexpected follower windows(alpha) response: {other:?}"),
}
let follower_client_id = match send_request(&mut follower, 5051, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected follower whoami response: {other:?}"),
};
let grant_writer = send_request(
&mut leader,
5052,
Request::GrantRole {
session: SessionSelector::ById(alpha_session),
client_id: follower_client_id,
role: SessionRole::Writer,
},
)
.await;
assert!(matches!(
grant_writer,
Response::Ok(ResponsePayload::RoleGranted {
role: SessionRole::Writer,
..
})
));
let grant = match send_request(
&mut follower,
506,
Request::Attach {
selector: SessionSelector::ById(alpha_session),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected follower attach response: {other:?}"),
};
let opened = send_request(
&mut follower,
507,
Request::AttachOpen {
session_id: alpha_session,
attach_token: grant.attach_token,
},
)
.await;
assert_eq!(
opened,
Response::Ok(ResponsePayload::AttachReady {
session_id: alpha_session,
can_write: true,
})
);
let set_marker = send_request(
&mut follower,
508,
Request::AttachInput {
session_id: alpha_session,
data: b"export BMUX_FOLLOW_STREAM=ok\n".to_vec(),
},
)
.await;
assert!(matches!(
set_marker,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let leader_attached_beta = send_request(
&mut leader,
509,
Request::Attach {
selector: SessionSelector::ById(beta_session),
},
)
.await;
assert!(matches!(
leader_attached_beta,
Response::Ok(ResponsePayload::Attached { .. })
));
let follower_windows_beta =
send_request(&mut follower, 510, Request::ListWindows { session: None }).await;
match follower_windows_beta {
Response::Ok(ResponsePayload::WindowList { windows }) => {
assert_eq!(windows.len(), 1);
assert_eq!(windows[0].session_id, beta_session);
}
other => panic!("unexpected follower windows(beta) response: {other:?}"),
}
let print_stream = send_request(
&mut follower,
511,
Request::AttachInput {
session_id: alpha_session,
data: b"printf 'FS=[%s]\\n' \"$BMUX_FOLLOW_STREAM\"\n".to_vec(),
},
)
.await;
assert!(matches!(
print_stream,
Response::Ok(ResponsePayload::AttachInputAccepted { bytes }) if bytes > 0
));
let output = collect_attach_output_until(&mut follower, alpha_session, "FS=[ok]", 20).await;
assert!(output.contains("FS=[ok]"));
let unfollowed = send_request(&mut follower, 512, Request::Unfollow).await;
assert!(matches!(
unfollowed,
Response::Ok(ResponsePayload::FollowStopped { .. })
));
let events = poll_events_collect(&mut follower, 513, 10, 4).await;
assert!(
events
.iter()
.any(|event| matches!(event, Event::FollowStarted { global: true, .. }))
);
assert!(events.iter().any(|event| {
matches!(
event,
Event::FollowTargetChanged {
session_id,
..
} if *session_id == beta_session
)
}));
assert!(
events
.iter()
.any(|event| matches!(event, Event::FollowStopped { .. }))
);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn leader_disconnect_emits_follow_target_gone() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut leader = connect_and_handshake(&endpoint).await;
let mut follower = connect_and_handshake(&endpoint).await;
let subscribed = send_request(&mut follower, 540, Request::SubscribeEvents).await;
assert_eq!(subscribed, Response::Ok(ResponsePayload::EventsSubscribed));
let session_id = match send_request(
&mut leader,
541,
Request::NewSession {
name: Some("follow-disconnect".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected session create response: {other:?}"),
};
let leader_client_id =
discover_client_id_from_window_switch(&mut follower, &mut leader, session_id, 1600)
.await;
let follow_started = send_request(
&mut follower,
542,
Request::FollowClient {
target_client_id: leader_client_id,
global: false,
},
)
.await;
assert!(matches!(
follow_started,
Response::Ok(ResponsePayload::FollowStarted { global: false, .. })
));
drop(leader);
let events = poll_events_collect(&mut follower, 543, 10, 8).await;
assert!(
events
.iter()
.any(|event| matches!(event, Event::FollowTargetGone { .. }))
);
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn rapid_leader_session_switches_emit_multiple_follow_target_changes() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut leader = connect_and_handshake(&endpoint).await;
let mut follower = connect_and_handshake(&endpoint).await;
let subscribed = send_request(&mut follower, 580, Request::SubscribeEvents).await;
assert_eq!(subscribed, Response::Ok(ResponsePayload::EventsSubscribed));
let alpha = match send_request(
&mut leader,
581,
Request::NewSession {
name: Some("rapid-alpha".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected alpha create response: {other:?}"),
};
let beta = match send_request(
&mut leader,
582,
Request::NewSession {
name: Some("rapid-beta".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected beta create response: {other:?}"),
};
let attached_alpha = send_request(
&mut leader,
583,
Request::Attach {
selector: SessionSelector::ById(alpha),
},
)
.await;
assert!(matches!(
attached_alpha,
Response::Ok(ResponsePayload::Attached { .. })
));
let leader_client_id = match send_request(&mut leader, 584, Request::ListClients).await {
Response::Ok(ResponsePayload::ClientList { clients }) => clients
.into_iter()
.find(|client| client.selected_session_id == Some(alpha))
.map(|client| client.id)
.expect("leader client should be listed"),
other => panic!("unexpected client list response: {other:?}"),
};
let follow_started = send_request(
&mut follower,
585,
Request::FollowClient {
target_client_id: leader_client_id,
global: true,
},
)
.await;
assert!(matches!(
follow_started,
Response::Ok(ResponsePayload::FollowStarted { global: true, .. })
));
let attached_beta = send_request(
&mut leader,
586,
Request::Attach {
selector: SessionSelector::ById(beta),
},
)
.await;
assert!(matches!(
attached_beta,
Response::Ok(ResponsePayload::Attached { .. })
));
let attached_alpha_again = send_request(
&mut leader,
587,
Request::Attach {
selector: SessionSelector::ById(alpha),
},
)
.await;
assert!(matches!(
attached_alpha_again,
Response::Ok(ResponsePayload::Attached { .. })
));
let events = poll_events_collect(&mut follower, 588, 32, 10).await;
let target_change_sessions = events
.iter()
.filter_map(|event| match event {
Event::FollowTargetChanged {
leader_client_id: event_leader,
session_id,
..
} if *event_leader == leader_client_id => Some(*session_id),
_ => None,
})
.collect::<Vec<_>>();
assert!(target_change_sessions.contains(&alpha));
assert!(target_change_sessions.contains(&beta));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn detach_uses_active_stream_session_when_follow_updates_selected_session() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut leader = connect_and_handshake(&endpoint).await;
let mut follower = connect_and_handshake(&endpoint).await;
let alpha = match send_request(
&mut leader,
620,
Request::NewSession {
name: Some("detach-alpha".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected alpha create response: {other:?}"),
};
let beta = match send_request(
&mut leader,
621,
Request::NewSession {
name: Some("detach-beta".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected beta create response: {other:?}"),
};
let _ = send_request(
&mut leader,
622,
Request::Attach {
selector: SessionSelector::ById(alpha),
},
)
.await;
let leader_client_id = match send_request(&mut leader, 623, Request::ListClients).await {
Response::Ok(ResponsePayload::ClientList { clients }) => clients
.into_iter()
.find(|client| client.selected_session_id == Some(alpha))
.map(|client| client.id)
.expect("leader client should be listed"),
other => panic!("unexpected client list response: {other:?}"),
};
let _ = send_request(
&mut follower,
624,
Request::FollowClient {
target_client_id: leader_client_id,
global: true,
},
)
.await;
let follower_grant_alpha = match send_request(
&mut follower,
625,
Request::Attach {
selector: SessionSelector::ById(alpha),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected follower attach response: {other:?}"),
};
let follower_open_alpha = send_request(
&mut follower,
626,
Request::AttachOpen {
session_id: alpha,
attach_token: follower_grant_alpha.attach_token,
},
)
.await;
assert!(matches!(
follower_open_alpha,
Response::Ok(ResponsePayload::AttachReady {
session_id,
..
}) if session_id == alpha
));
let _ = send_request(
&mut leader,
627,
Request::Attach {
selector: SessionSelector::ById(beta),
},
)
.await;
let detached = send_request(&mut follower, 628, Request::Detach).await;
assert_eq!(detached, Response::Ok(ResponsePayload::Detached));
let output_after_detach = send_request(
&mut follower,
629,
Request::AttachOutput {
session_id: alpha,
max_bytes: 1024,
},
)
.await;
assert!(matches!(
output_after_detach,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn server_stop_while_attached_cleans_runtime_state() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
90,
Request::NewSession {
name: Some("stop-attached".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let grant = match send_request(
&mut client,
91,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await
{
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
let opened = send_request(
&mut client,
92,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert_eq!(
opened,
Response::Ok(ResponsePayload::AttachReady {
session_id,
can_write: true,
})
);
let stopper = connect_and_handshake(&endpoint).await;
let mut stopper = stopper;
let stopped = send_request(&mut stopper, 93, Request::ServerStop).await;
assert_eq!(stopped, Response::Ok(ResponsePayload::ServerStopping));
server_task
.await
.expect("server task should join")
.expect("server should stop cleanly");
{
let runtime_manager = server
.state
.session_runtimes
.lock()
.expect("runtime manager lock should succeed");
assert_eq!(runtime_manager.runtime_count(), 0);
}
{
let session_manager = server
.state
.session_manager
.lock()
.expect("session manager lock should succeed");
assert_eq!(session_manager.session_count(), 0);
}
if socket_path.exists() {
std::fs::remove_file(&socket_path).expect("socket cleanup should succeed");
}
}
#[cfg(unix)]
#[tokio::test]
async fn attach_open_rejects_invalid_token() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
40,
Request::NewSession {
name: Some("bad-token".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let response = send_request(
&mut client,
41,
Request::AttachOpen {
session_id,
attach_token: Uuid::new_v4(),
},
)
.await;
assert!(matches!(
response,
Response::Err(ErrorResponse {
code: ErrorCode::NotFound,
..
})
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn attach_open_rejects_expired_token() {
let (server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let created = send_request(
&mut client,
50,
Request::NewSession {
name: Some("exp-token".to_string()),
},
)
.await;
let session_id = match created {
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected create response: {other:?}"),
};
let attached = send_request(
&mut client,
51,
Request::Attach {
selector: SessionSelector::ById(session_id),
},
)
.await;
let grant = match attached {
Response::Ok(ResponsePayload::Attached { grant }) => grant,
other => panic!("unexpected attach response: {other:?}"),
};
{
let mut token_manager = server
.state
.attach_tokens
.lock()
.expect("attach token manager lock should succeed");
force_expire_attach_token(&mut token_manager, grant.attach_token);
}
let response = send_request(
&mut client,
52,
Request::AttachOpen {
session_id,
attach_token: grant.attach_token,
},
)
.await;
assert!(matches!(
response,
Response::Err(ErrorResponse {
code: ErrorCode::InvalidRequest,
..
})
));
stop_server(server, server_task, &socket_path).await;
}
#[cfg(unix)]
#[tokio::test]
async fn server_stop_request_triggers_shutdown() {
let (_server, endpoint, socket_path, server_task) = start_server().await;
let mut client = connect_and_handshake(&endpoint).await;
let response = send_request(&mut client, 30, Request::ServerStop).await;
assert_eq!(response, Response::Ok(ResponsePayload::ServerStopping));
server_task
.await
.expect("server task should join")
.expect("server should stop gracefully");
if Path::new(&socket_path).exists() {
std::fs::remove_file(&socket_path).expect("socket cleanup should succeed");
}
}
#[cfg(unix)]
#[tokio::test]
async fn restores_sessions_windows_and_roles_from_snapshot() {
let suffix = Uuid::new_v4().to_string();
let root = std::path::PathBuf::from(format!("/tmp/bmxr-{}", &suffix[..8]));
let paths = ConfigPaths::new(root.join("config"), root.join("runtime"), root.join("data"));
paths.ensure_dirs().expect("paths should be created");
let endpoint = IpcEndpoint::unix_socket(paths.server_socket());
let (_server, server_task) = start_server_from_paths(&paths).await;
let mut owner = connect_and_handshake(&endpoint).await;
let mut member = connect_and_handshake(&endpoint).await;
let session_id = match send_request(
&mut owner,
300,
Request::NewSession {
name: Some("persisted".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected session create response: {other:?}"),
};
let window_id = match send_request(
&mut owner,
301,
Request::NewWindow {
session: Some(SessionSelector::ById(session_id)),
name: Some("extra".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::WindowCreated { id, .. }) => id,
other => panic!("unexpected window create response: {other:?}"),
};
let member_id = match send_request(&mut member, 302, Request::WhoAmI).await {
Response::Ok(ResponsePayload::ClientIdentity { id }) => id,
other => panic!("unexpected whoami response: {other:?}"),
};
let granted = send_request(
&mut owner,
303,
Request::GrantRole {
session: SessionSelector::ById(session_id),
client_id: member_id,
role: SessionRole::Writer,
},
)
.await;
assert!(matches!(
granted,
Response::Ok(ResponsePayload::RoleGranted { .. })
));
let stopped = send_request(&mut owner, 304, Request::ServerStop).await;
assert_eq!(stopped, Response::Ok(ResponsePayload::ServerStopping));
server_task
.await
.expect("server task should join")
.expect("server should stop cleanly");
if Path::new(paths.server_socket().as_path()).exists() {
std::fs::remove_file(paths.server_socket()).expect("socket cleanup should succeed");
}
let (_restored_server, restored_task) = start_server_from_paths(&paths).await;
let mut restored_client = connect_and_handshake(&endpoint).await;
let sessions = send_request(&mut restored_client, 305, Request::ListSessions).await;
let restored = match sessions {
Response::Ok(ResponsePayload::SessionList { sessions }) => sessions,
other => panic!("unexpected list sessions response: {other:?}"),
};
assert!(restored.iter().any(|s| s.id == session_id));
let windows = send_request(
&mut restored_client,
306,
Request::ListWindows {
session: Some(SessionSelector::ById(session_id)),
},
)
.await;
match windows {
Response::Ok(ResponsePayload::WindowList { windows }) => {
assert!(windows.iter().any(|window| window.id == window_id));
}
other => panic!("unexpected list windows response: {other:?}"),
}
let permissions = send_request(
&mut restored_client,
307,
Request::ListPermissions {
session: SessionSelector::ById(session_id),
},
)
.await;
match permissions {
Response::Ok(ResponsePayload::PermissionsList { permissions, .. }) => {
assert!(permissions.iter().any(|entry| {
entry.client_id == member_id && entry.role == SessionRole::Writer
}));
}
other => panic!("unexpected list permissions response: {other:?}"),
}
let stop_restored = send_request(&mut restored_client, 308, Request::ServerStop).await;
assert_eq!(stop_restored, Response::Ok(ResponsePayload::ServerStopping));
restored_task
.await
.expect("restored server task should join")
.expect("restored server should stop cleanly");
}
#[cfg(unix)]
#[tokio::test]
async fn restore_apply_replaces_current_state() {
let suffix = Uuid::new_v4().to_string();
let root = std::path::PathBuf::from(format!("/tmp/bmxr-{}", &suffix[..8]));
let paths = ConfigPaths::new(root.join("config"), root.join("runtime"), root.join("data"));
paths.ensure_dirs().expect("paths should be created");
let endpoint = IpcEndpoint::unix_socket(paths.server_socket());
let (_server, server_task) = start_server_from_paths(&paths).await;
let mut client = connect_and_handshake(&endpoint).await;
let baseline_id = match send_request(
&mut client,
340,
Request::NewSession {
name: Some("baseline".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected baseline create response: {other:?}"),
};
let saved = send_request(&mut client, 341, Request::ServerSave).await;
assert!(matches!(
saved,
Response::Ok(ResponsePayload::ServerSnapshotSaved { .. })
));
let transient_id = match send_request(
&mut client,
342,
Request::NewSession {
name: Some("transient".to_string()),
},
)
.await
{
Response::Ok(ResponsePayload::SessionCreated { id, .. }) => id,
other => panic!("unexpected transient create response: {other:?}"),
};
let restored = send_request(&mut client, 343, Request::ServerRestoreApply).await;
assert!(matches!(
restored,
Response::Ok(ResponsePayload::ServerSnapshotRestored { sessions, .. }) if sessions >= 1
));
let sessions = send_request(&mut client, 344, Request::ListSessions).await;
let sessions = match sessions {
Response::Ok(ResponsePayload::SessionList { sessions }) => sessions,
other => panic!("unexpected session list response: {other:?}"),
};
assert!(sessions.iter().any(|session| session.id == baseline_id));
assert!(!sessions.iter().any(|session| session.id == transient_id));
let stopped = send_request(&mut client, 345, Request::ServerStop).await;
assert_eq!(stopped, Response::Ok(ResponsePayload::ServerStopping));
server_task
.await
.expect("server task should join")
.expect("server should stop cleanly");
}
#[cfg(unix)]
#[tokio::test]
async fn restore_dry_run_reports_checksum_failure() {
let suffix = Uuid::new_v4().to_string();
let root = std::path::PathBuf::from(format!("/tmp/bmxr-{}", &suffix[..8]));
let paths = ConfigPaths::new(root.join("config"), root.join("runtime"), root.join("data"));
paths.ensure_dirs().expect("paths should be created");
let endpoint = IpcEndpoint::unix_socket(paths.server_socket());
let (_server, server_task) = start_server_from_paths(&paths).await;
let mut client = connect_and_handshake(&endpoint).await;
let _ = send_request(
&mut client,
350,
Request::NewSession {
name: Some("checksum".to_string()),
},
)
.await;
let saved = send_request(&mut client, 351, Request::ServerSave).await;
let snapshot_path = match saved {
Response::Ok(ResponsePayload::ServerSnapshotSaved { path: Some(path) }) => path,
other => panic!("unexpected save response: {other:?}"),
};
let mut payload: serde_json::Value = serde_json::from_slice(
&std::fs::read(&snapshot_path).expect("snapshot file should exist"),
)
.expect("snapshot json should decode");
let checksum = payload["checksum"]
.as_u64()
.expect("checksum field should be u64");
payload["checksum"] = serde_json::json!(checksum.wrapping_add(1));
std::fs::write(
&snapshot_path,
serde_json::to_vec_pretty(&payload).expect("snapshot json should encode"),
)
.expect("tampered snapshot should write");
let dry_run = send_request(&mut client, 352, Request::ServerRestoreDryRun).await;
assert!(matches!(
dry_run,
Response::Ok(ResponsePayload::ServerSnapshotRestoreDryRun { ok: false, message })
if message.contains("checksum")
));
let stopped = send_request(&mut client, 353, Request::ServerStop).await;
assert_eq!(stopped, Response::Ok(ResponsePayload::ServerStopping));
server_task
.await
.expect("server task should join")
.expect("server should stop cleanly");
}
#[cfg(unix)]
async fn start_server() -> (
BmuxServer,
IpcEndpoint,
std::path::PathBuf,
tokio::task::JoinHandle<anyhow::Result<()>>,
) {
let socket_path = std::env::temp_dir().join(format!("bmux-server-{}.sock", Uuid::new_v4()));
let endpoint = IpcEndpoint::unix_socket(&socket_path);
let server = BmuxServer::new(endpoint.clone());
let server_task = spawn_server_with_ready(server.clone()).await;
(server, endpoint, socket_path, server_task)
}
#[cfg(unix)]
async fn start_server_from_paths(
paths: &ConfigPaths,
) -> (BmuxServer, tokio::task::JoinHandle<anyhow::Result<()>>) {
let server = BmuxServer::from_config_paths(paths);
let server_task = spawn_server_with_ready(server.clone()).await;
(server, server_task)
}
#[cfg(unix)]
async fn connect_and_handshake(endpoint: &IpcEndpoint) -> LocalIpcStream {
let mut client = LocalIpcStream::connect(endpoint)
.await
.expect("client should connect");
let hello_payload = encode(&Request::Hello {
protocol_version: ProtocolVersion::current(),
client_name: "test-client".to_string(),
})
.expect("hello should encode");
let hello = Envelope::new(1, EnvelopeKind::Request, hello_payload);
client
.send_envelope(&hello)
.await
.expect("hello send should succeed");
let reply = client
.recv_envelope()
.await
.expect("hello reply should be received");
let response: Response = decode(&reply.payload).expect("response should decode");
assert!(matches!(
response,
Response::Ok(ResponsePayload::ServerStatus { running: true, .. })
));
client
}
#[cfg(unix)]
async fn send_request(
client: &mut LocalIpcStream,
request_id: u64,
request: Request,
) -> Response {
let payload = encode(&request).expect("request should encode");
let envelope = Envelope::new(request_id, EnvelopeKind::Request, payload);
client
.send_envelope(&envelope)
.await
.expect("request send should succeed");
let reply = client
.recv_envelope()
.await
.expect("request reply should be received");
assert_eq!(reply.request_id, request_id);
decode(&reply.payload).expect("response decode should succeed")
}
#[cfg(unix)]
async fn poll_events_collect(
client: &mut LocalIpcStream,
request_id_base: u64,
max_events: usize,
attempts: usize,
) -> Vec<Event> {
let mut all_events = Vec::new();
for idx in 0..attempts.max(1) {
let response = send_request(
client,
request_id_base + idx as u64,
Request::PollEvents { max_events },
)
.await;
if let Response::Ok(ResponsePayload::EventBatch { events }) = response
&& !events.is_empty()
{
all_events.extend(events);
}
sleep(Duration::from_millis(25)).await;
}
all_events
}
#[cfg(unix)]
async fn discover_client_id_from_window_switch(
observer: &mut LocalIpcStream,
actor: &mut LocalIpcStream,
session_id: Uuid,
request_id_base: u64,
) -> Uuid {
let switched = send_request(
actor,
request_id_base,
Request::SwitchWindow {
session: Some(SessionSelector::ById(session_id)),
target: WindowSelector::Active,
},
)
.await;
assert!(matches!(
switched,
Response::Ok(ResponsePayload::WindowSwitched {
session_id: switched_session,
..
}) if switched_session == session_id
));
let events = poll_events_collect(observer, request_id_base + 1, 10, 6).await;
events
.iter()
.find_map(|event| match event {
Event::WindowSwitched {
session_id: switched_session,
by_client_id,
..
} if *switched_session == session_id => Some(*by_client_id),
_ => None,
})
.expect("window switched event with client id should exist")
}
#[cfg(unix)]
async fn collect_attach_output_until(
client: &mut LocalIpcStream,
session_id: uuid::Uuid,
needle: &str,
attempts: usize,
) -> String {
let mut collected = String::new();
let mut idx = 0usize;
let max_attempts = attempts.max(1).saturating_mul(10);
while idx < max_attempts {
let response = send_request(
client,
4000 + idx as u64,
Request::AttachOutput {
session_id,
max_bytes: 8192,
},
)
.await;
if let Response::Ok(ResponsePayload::AttachOutput { data }) = response
&& !data.is_empty()
{
collected.push_str(&String::from_utf8_lossy(&data));
if collected.contains(needle) {
break;
}
}
idx += 1;
sleep(Duration::from_millis(25)).await;
}
collected
}
#[cfg(unix)]
async fn stop_server(
server: BmuxServer,
server_task: tokio::task::JoinHandle<anyhow::Result<()>>,
socket_path: &std::path::Path,
) {
server.request_shutdown();
server_task
.await
.expect("server task should join")
.expect("server should shut down cleanly");
if socket_path.exists() {
std::fs::remove_file(socket_path).expect("socket cleanup should succeed");
}
}
fn force_expire_attach_token(token_manager: &mut super::AttachTokenManager, token: Uuid) {
if let Some(entry) = token_manager.tokens.get_mut(&token) {
entry.expires_at = std::time::Instant::now() - Duration::from_millis(1);
}
}
}