use embers_core::{BufferId, RequestId, init_test_tracing};
use embers_protocol::{
BufferRecordState, BufferRequest, BufferResponse, BuffersResponse, ClientMessage, InputRequest,
ProtocolClient, ServerResponse, SessionRequest, SessionSnapshotResponse, SnapshotResponse,
};
use embers_server::{Server, ServerConfig};
use tempfile::tempdir;
use tokio::time::{Duration, Instant, sleep};
/// Sends `request` as a session message and returns the session snapshot reply.
///
/// # Panics
///
/// Panics if the request itself fails or if the server answers with anything
/// other than `ServerResponse::SessionSnapshot`.
async fn request_session_snapshot(
    client: &mut ProtocolClient,
    request: SessionRequest,
) -> SessionSnapshotResponse {
    let message = ClientMessage::Session(request);
    let reply = client
        .request(&message)
        .await
        .expect("session request succeeds");
    match reply {
        ServerResponse::SessionSnapshot(snapshot) => snapshot,
        unexpected => panic!("expected session snapshot, got {unexpected:?}"),
    }
}
/// Sends `request` as a buffer message and returns the single-buffer reply.
///
/// # Panics
///
/// Panics if the request itself fails or if the server answers with anything
/// other than `ServerResponse::Buffer`.
async fn request_buffer(client: &mut ProtocolClient, request: BufferRequest) -> BufferResponse {
    let message = ClientMessage::Buffer(request);
    let reply = client
        .request(&message)
        .await
        .expect("buffer request succeeds");
    match reply {
        ServerResponse::Buffer(buffer) => buffer,
        unexpected => panic!("expected buffer response, got {unexpected:?}"),
    }
}
/// Sends `request` as a buffer message and returns the multi-buffer reply.
///
/// # Panics
///
/// Panics if the request itself fails or if the server answers with anything
/// other than `ServerResponse::Buffers`.
async fn request_buffers(client: &mut ProtocolClient, request: BufferRequest) -> BuffersResponse {
    let message = ClientMessage::Buffer(request);
    let reply = client
        .request(&message)
        .await
        .expect("buffer list succeeds");
    match reply {
        ServerResponse::Buffers(listing) => listing,
        unexpected => panic!("expected buffers response, got {unexpected:?}"),
    }
}
/// Polls `BufferRequest::Capture` for `buffer_id` until one of the captured
/// lines contains `expected`, then returns that snapshot.
///
/// The capture is retried every 25 ms until a two-second deadline has passed.
/// Request errors and non-snapshot responses do not abort the polling; the
/// most recent outcome is remembered so the timeout panic can report what the
/// server last answered.
///
/// # Panics
///
/// Panics if no capture contains `expected` by the time the deadline passes.
async fn wait_for_snapshot_line(
    client: &mut ProtocolClient,
    request_id: RequestId,
    buffer_id: BufferId,
    expected: &str,
) -> SnapshotResponse {
    let deadline = Instant::now() + Duration::from_secs(2);
    // Debug rendering of the latest non-matching outcome, kept for diagnostics only.
    let mut last_outcome = String::from("<none>");
    loop {
        let outcome = client
            .request(&ClientMessage::Buffer(BufferRequest::Capture {
                request_id,
                buffer_id,
            }))
            .await;
        match outcome {
            Ok(ServerResponse::Snapshot(snapshot))
                if snapshot.lines.iter().any(|line| line.contains(expected)) =>
            {
                return snapshot;
            }
            // Errors, other response variants, and snapshots without the text
            // are all retried; remember the outcome for the timeout message.
            other => last_outcome = format!("{other:?}"),
        }
        if Instant::now() >= deadline {
            break;
        }
        sleep(Duration::from_millis(25)).await;
    }
    panic!(
        "capture for buffer {buffer_id} did not contain expected line '{expected}' before timeout; \
         last capture outcome: {last_outcome}"
    );
}
/// Polls `BufferRequest::Get` for `buffer_id` until the record's state is
/// `BufferRecordState::Running`, then returns that record.
///
/// The lookup is repeated every 25 ms until a two-second deadline has passed.
///
/// # Panics
///
/// Panics if the buffer is still not `Running` once the deadline has passed,
/// or if any individual lookup fails (see `request_buffer`).
async fn wait_for_running_buffer(
    client: &mut ProtocolClient,
    request_id: RequestId,
    buffer_id: BufferId,
) -> embers_protocol::BufferRecord {
    let poll_interval = Duration::from_millis(25);
    let give_up_at = Instant::now() + Duration::from_secs(2);
    loop {
        let lookup = BufferRequest::Get {
            request_id,
            buffer_id,
        };
        let record = request_buffer(client, lookup).await.buffer;
        if record.state == BufferRecordState::Running {
            return record;
        }
        // The deadline is checked only after a lookup, so at least one attempt is always made.
        if Instant::now() >= give_up_at {
            panic!("buffer {buffer_id} did not reach Running before timeout");
        }
        sleep(poll_interval).await;
    }
}
#[tokio::test]
async fn clean_restart_restores_workspace_and_keeps_live_buffers_running() {
init_test_tracing();
let tempdir = tempdir().expect("tempdir");
let socket_path = tempdir.path().join("mux.sock");
let config = ServerConfig::new(socket_path.clone());
let workspace_path = config.workspace_path.clone();
let handle = Server::new(config.clone())
.start()
.await
.expect("start server");
let mut client = ProtocolClient::connect(&socket_path)
.await
.expect("connect client");
let session = request_session_snapshot(
&mut client,
SessionRequest::Create {
request_id: RequestId(1),
name: "main".to_owned(),
},
)
.await;
let session_id = session.snapshot.session.id;
let attached = request_buffer(
&mut client,
BufferRequest::Create {
request_id: RequestId(2),
title: Some("attached".to_owned()),
command: vec!["/bin/sh".to_owned()],
cwd: None,
env: Default::default(),
},
)
.await
.buffer;
let detached = request_buffer(
&mut client,
BufferRequest::Create {
request_id: RequestId(3),
title: Some("detached".to_owned()),
command: vec!["/bin/sh".to_owned()],
cwd: None,
env: Default::default(),
},
)
.await
.buffer;
let attached_id = attached.id;
let detached_id = detached.id;
let restored_layout = request_session_snapshot(
&mut client,
SessionRequest::AddRootTab {
request_id: RequestId(4),
session_id,
title: "shell".to_owned(),
buffer_id: Some(attached_id),
child_node_id: None,
},
)
.await;
assert_eq!(restored_layout.snapshot.session.id, session_id);
let deadline = Instant::now() + Duration::from_secs(2);
let attached_running = loop {
let response = request_buffer(
&mut client,
BufferRequest::Get {
request_id: RequestId(40),
buffer_id: attached_id,
},
)
.await;
if response.buffer.state == BufferRecordState::Running {
break true;
}
if Instant::now() >= deadline {
break false;
}
sleep(Duration::from_millis(25)).await;
};
assert!(
attached_running,
"attached buffer {attached_id} did not reach Running before shutdown"
);
handle.shutdown().await.expect("shutdown server");
assert!(workspace_path.exists());
let handle = Server::new(config).start().await.expect("restart server");
let mut client = ProtocolClient::connect(&socket_path)
.await
.expect("reconnect client");
let session = request_session_snapshot(
&mut client,
SessionRequest::Get {
request_id: RequestId(5),
session_id,
},
)
.await;
assert_eq!(session.snapshot.session.name, "main");
let attached_buffer = session
.snapshot
.buffers
.iter()
.find(|buffer| buffer.id == attached_id)
.expect("attached buffer restored");
assert_eq!(attached_buffer.state, BufferRecordState::Running);
assert!(attached_buffer.attachment_node_id.is_some());
let buffers = request_buffers(
&mut client,
BufferRequest::List {
request_id: RequestId(6),
session_id: None,
attached_only: false,
detached_only: false,
},
)
.await;
let detached_buffer = buffers
.buffers
.iter()
.find(|buffer| buffer.id == detached_id)
.expect("detached buffer restored");
assert_eq!(detached_buffer.state, BufferRecordState::Running);
assert_eq!(detached_buffer.attachment_node_id, None);
match client
.request(&ClientMessage::Input(InputRequest::Send {
request_id: RequestId(7),
buffer_id: attached_id,
bytes: b"printf restarted-attached\\n\r".to_vec(),
}))
.await
.expect("send to attached buffer succeeds")
{
ServerResponse::Ok(_) => {}
other => panic!("expected ok response, got {other:?}"),
}
match client
.request(&ClientMessage::Input(InputRequest::Send {
request_id: RequestId(8),
buffer_id: detached_id,
bytes: b"printf restarted-detached\\n\r".to_vec(),
}))
.await
.expect("send to detached buffer succeeds")
{
ServerResponse::Ok(_) => {}
other => panic!("expected ok response, got {other:?}"),
}
let _attached_capture =
wait_for_snapshot_line(&mut client, RequestId(9), attached_id, "restarted-attached").await;
let _detached_capture = wait_for_snapshot_line(
&mut client,
RequestId(10),
detached_id,
"restarted-detached",
)
.await;
handle.shutdown().await.expect("shutdown restarted server");
}
/// Verifies what happens to a buffer pipe across a clean shutdown and restart.
///
/// A `/bin/sh` buffer is created and a `/bin/cat` pipe is started on it. After
/// shutting the server down and restarting it with the same config, the buffer
/// is expected to be `Running` again while its pipe record is still present
/// but reported as `Stopped` with stop reason `Requested`.
#[tokio::test]
async fn clean_restart_stops_ephemeral_buffer_pipes() {
    init_test_tracing();
    let scratch = tempdir().expect("tempdir");
    let socket_path = scratch.path().join("mux.sock");
    let config = ServerConfig::new(socket_path.clone());

    // First server lifetime: create the buffer and start a pipe on it.
    let first_server = Server::new(config.clone())
        .start()
        .await
        .expect("start server");
    let mut first_client = ProtocolClient::connect(&socket_path)
        .await
        .expect("connect client");

    let create_piped = BufferRequest::Create {
        request_id: RequestId(101),
        title: Some("piped".to_owned()),
        command: vec!["/bin/sh".to_owned()],
        cwd: None,
        env: Default::default(),
    };
    let buffer_id = request_buffer(&mut first_client, create_piped)
        .await
        .buffer
        .id;
    wait_for_running_buffer(&mut first_client, RequestId(102), buffer_id).await;

    let start_pipe = BufferRequest::StartPipe {
        request_id: RequestId(103),
        buffer_id,
        command: vec!["/bin/cat".to_owned()],
        cwd: None,
        env: Default::default(),
    };
    let started = request_buffer(&mut first_client, start_pipe).await.buffer;
    assert!(
        started
            .pipe
            .as_ref()
            .is_some_and(|pipe| pipe.state == embers_protocol::BufferPipeState::Running)
    );

    first_server.shutdown().await.expect("shutdown server");

    // Second server lifetime: the first client is intentionally left alive
    // (not dropped) while a new connection inspects the restored buffer.
    let second_server = Server::new(config).start().await.expect("restart server");
    let mut second_client = ProtocolClient::connect(&socket_path)
        .await
        .expect("reconnect client");

    let lookup = BufferRequest::Get {
        request_id: RequestId(104),
        buffer_id,
    };
    let restored = request_buffer(&mut second_client, lookup).await.buffer;
    assert_eq!(restored.state, BufferRecordState::Running);

    let restored_pipe = restored
        .pipe
        .expect("restored buffer keeps stopped pipe metadata");
    let expected_state = embers_protocol::BufferPipeState::Stopped;
    let expected_reason = Some(embers_protocol::BufferPipeStopReason::Requested);
    assert_eq!(restored_pipe.state, expected_state);
    assert_eq!(restored_pipe.stop_reason, expected_reason);

    second_server
        .shutdown()
        .await
        .expect("shutdown restarted server");
}