use crate::protocol::{self, ServerMsg};
use retach::screen::{Screen, RenderCache, TerminalEmulator};
use crate::session::{SessionManager, SessionHandles};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use tracing::debug;
use super::session_setup::{setup_session, ConnectRequest};
use super::session_relay::{screen_to_client, client_to_pty};
/// A 16 ms interval (about one frame at 60 Hz).
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// minimum spacing between screen renders enforced by the relay code — confirm.
pub(super) const RENDER_THROTTLE: std::time::Duration = std::time::Duration::from_millis(16);
/// Bytes added to each history line's length when `send_initial_state`
/// estimates how large a `History` chunk will be.
///
/// NOTE(review): presumably an upper bound on the encoder's per-element
/// framing cost — confirm against the codec.
const BINCODE_LINE_OVERHEAD: usize = 16;
/// Joins queued passthrough chunks and a rendered frame into a single buffer.
///
/// The chunks come first, in their original order, followed by `render_data`.
/// When there are no chunks at all, `render_data` is handed back as-is
/// without being copied.
pub(super) fn prepend_passthrough(passthrough: Vec<Vec<u8>>, render_data: Vec<u8>) -> Vec<u8> {
    match passthrough.as_slice() {
        // Nothing to prefix: reuse the caller's allocation.
        [] => render_data,
        chunks => {
            // Size the output once so appending never reallocates.
            let prefix_len: usize = chunks.iter().map(Vec::len).sum();
            let mut merged = Vec::with_capacity(prefix_len + render_data.len());
            chunks
                .iter()
                .for_each(|bytes| merged.extend_from_slice(bytes));
            merged.extend(render_data);
            merged
        }
    }
}
/// Acquires a std mutex, reporting poisoning as an `anyhow` error instead of
/// panicking.
///
/// `label` names the protected resource and is embedded in the error text.
///
/// # Errors
/// Returns an error when the mutex is poisoned.
pub(super) fn lock_mutex<'a, T>(mutex: &'a StdMutex<T>, label: &str) -> anyhow::Result<std::sync::MutexGuard<'a, T>> {
    match mutex.lock() {
        Ok(guard) => Ok(guard),
        Err(poisoned) => Err(anyhow::anyhow!("{} mutex poisoned: {}", label, poisoned)),
    }
}
/// Renders the screen and writes the result to the client as one
/// `ServerMsg::ScreenUpdate` frame.
///
/// `full` and `cache` are forwarded unchanged to `Screen::render`.
///
/// # Errors
/// Fails if the screen mutex is poisoned, if the message cannot be encoded,
/// or if the socket write fails.
pub(super) async fn render_and_send(
    screen: &Arc<StdMutex<Screen>>,
    cache: &mut RenderCache,
    writer: &mut tokio::net::unix::OwnedWriteHalf,
    full: bool,
) -> anyhow::Result<()> {
    // The guard is a temporary of this statement, so the lock is released
    // before the `.await` below.
    let payload = lock_mutex(screen, "screen")?.render(full, cache);
    let frame = protocol::encode(&ServerMsg::ScreenUpdate(payload))?;
    Ok(writer.write_all(&frame).await?)
}
async fn send_initial_state(
handles: &SessionHandles,
is_new_session: bool,
writer: &mut tokio::net::unix::OwnedWriteHalf,
) -> anyhow::Result<RenderCache> {
let connected = protocol::encode(&ServerMsg::Connected { name: handles.name.clone(), new_session: is_new_session })?;
writer.write_all(&connected).await?;
let mut render_cache = RenderCache::new();
let (hist_chunks, screen_msg) = {
let mut screen = lock_mutex(&handles.screen, "screen")?;
let hist = if screen.in_alt_screen() {
Vec::new()
} else {
screen.get_history()
};
let notifications = screen.take_queued_notifications();
let mut render_data = Vec::new();
for notif in notifications {
render_data.extend_from_slice(¬if);
}
if !hist.is_empty() {
use retach::screen::write_u16;
render_data.extend_from_slice(b"\x1b[");
write_u16(&mut render_data, screen.rows());
render_data.extend_from_slice(b";1H");
render_data.extend(std::iter::repeat_n(b'\n', screen.rows().saturating_sub(1) as usize));
}
render_data.extend_from_slice(&screen.render(true, &mut render_cache));
let screen_msg = protocol::encode(&ServerMsg::ScreenUpdate(render_data))?;
(hist, screen_msg)
};
if !hist_chunks.is_empty() {
let mut chunk = Vec::new();
let mut chunk_size = 0;
let size_limit = protocol::codec::MAX_FRAME_SIZE / 2;
for line in hist_chunks {
let line_size = line.len() + BINCODE_LINE_OVERHEAD;
if chunk_size + line_size > size_limit && !chunk.is_empty() {
let msg = protocol::encode(&ServerMsg::History(std::mem::take(&mut chunk)))?;
writer.write_all(&msg).await?;
chunk_size = 0;
}
chunk_size += line_size;
chunk.push(line);
}
if !chunk.is_empty() {
let msg = protocol::encode(&ServerMsg::History(chunk))?;
writer.write_all(&msg).await?;
}
}
writer.write_all(&screen_msg).await?;
{
let mut screen = lock_mutex(&handles.screen, "screen")?;
screen.take_pending_scrollback();
screen.take_passthrough();
}
Ok(render_cache)
}
/// Serves one client connection from session setup until either relay
/// direction finishes.
///
/// Steps:
/// 1. Runs `setup_session` with the parameters from `req`.
/// 2. Splits the stream and sends the initial state to the client.
/// 3. Spawns the `screen_to_client` and `client_to_pty` relay tasks.
/// 4. Waits for whichever task completes first, aborts the other, and
///    propagates the finished task's join error or returned error.
///
/// # Errors
/// Fails if setup or the initial-state transfer fails, or if the relay task
/// that finished first panicked, was cancelled, or returned an error.
pub(super) async fn handle_session(
    mut stream: tokio::net::UnixStream,
    manager: Arc<Mutex<SessionManager>>,
    req: ConnectRequest,
) -> anyhow::Result<()> {
    let session = setup_session(
        &mut stream,
        &manager,
        &req.name,
        req.history,
        req.cols,
        req.rows,
        req.mode,
    )
    .await?;
    // Bound to a named local (not `_`) so the guard lives until this
    // function returns.
    let _client_guard = session.client_guard;

    let (read_half, mut write_half) = stream.into_split();
    let initial_cache =
        send_initial_state(&session.handles, session.is_new_session, &mut write_half).await?;

    let refresh = Arc::new(tokio::sync::Notify::new());
    session.handles.screen_notify.notify_one();

    let mut outbound = tokio::spawn(screen_to_client(
        session.handles.clone(),
        initial_cache,
        Arc::clone(&refresh),
        session.evict_rx,
        write_half,
    ));
    let mut inbound = tokio::spawn(client_to_pty(
        session.handles,
        read_half,
        refresh,
        req.leftover,
    ));

    // The first relay direction to finish ends the session; the other is aborted.
    tokio::select! {
        finished = &mut outbound => {
            debug!("screen_to_client finished: {:?}", finished.as_ref().map(|inner| inner.as_ref().map(|_| "ok")));
            inbound.abort();
            finished??;
        }
        finished = &mut inbound => {
            debug!("client_to_pty finished: {:?}", finished.as_ref().map(|inner| inner.as_ref().map(|_| "ok")));
            outbound.abort();
            finished??;
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use retach::screen::{Screen, RenderCache, TerminalEmulator};

    /// With no passthrough chunks, the render bytes come back unchanged.
    #[test]
    fn prepend_passthrough_empty() {
        let frame = b"render-data".to_vec();
        assert_eq!(prepend_passthrough(Vec::new(), frame.clone()), frame);
    }

    /// A single chunk is placed directly in front of the render bytes.
    #[test]
    fn prepend_passthrough_single() {
        const ED3: &[u8] = b"\x1b[3J";
        const FRAME: &[u8] = b"\x1b[?2026hcontent\x1b[?2026l";
        let merged = prepend_passthrough(vec![ED3.to_vec()], FRAME.to_vec());
        let (head, tail) = merged.split_at(4);
        assert_eq!(head, ED3);
        assert_eq!(tail, FRAME);
    }

    /// Several chunks keep their order ahead of the render bytes.
    #[test]
    fn prepend_passthrough_multiple() {
        let chunks = vec![b"\x07".to_vec(), b"\x1b[3J".to_vec()];
        let merged = prepend_passthrough(chunks, b"screen".to_vec());
        assert_eq!(merged, b"\x07\x1b[3Jscreen");
    }

    /// An ED3 sequence processed by the screen is queued as passthrough and
    /// ends up as the prefix of the combined update.
    #[test]
    fn ed3_included_in_screen_update() {
        let mut term = Screen::new(80, 24, 100);
        term.process(b"hello world");
        term.process(b"\x1b[3J");

        let queued = term.take_passthrough();
        assert_eq!(queued.len(), 1);
        assert_eq!(queued[0], b"\x1b[3J");

        let mut cache = RenderCache::new();
        let frame = term.render(true, &mut cache);
        let merged = prepend_passthrough(queued, frame.clone());
        assert!(merged.starts_with(b"\x1b[3J"), "passthrough should prefix screen data");
        assert_eq!(&merged[4..], &frame[..]);
    }
}