use crate::protocol::{self, ClientMsg, ServerMsg, FrameReader};
use crate::session::SessionHandles;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tracing::debug;
use super::session_bridge::{lock_mutex, prepend_passthrough, render_and_send, RENDER_THROTTLE};
use super::session_setup::resize_pty;
/// Streams screen updates for a session to one attached client.
///
/// The function runs until one of the following happens:
///
/// * `h.reader_alive` is observed to be `false`: a final screen update and
///   `ServerMsg::SessionEnded` are written, then the function returns `Ok(())`.
/// * `evict_rx` reports a change: the client is sent
///   `ServerMsg::Error("evicted by new client")` and the function returns `Ok(())`.
/// * `evict_rx`'s sender is dropped: the client is sent `ServerMsg::SessionEnded`
///   and the function returns `Ok(())`.
///
/// Notifications on `h.screen_notify` are coalesced: the first notification arms
/// a `RENDER_THROTTLE` timer and one incremental update is sent when it expires,
/// no matter how many further notifications arrived in between. A notification
/// on `refresh_notify` triggers an immediate `render_and_send` call.
///
/// # Parameters
///
/// * `h` - session handles; this function reads `reader_alive`, `screen`,
///   `screen_notify` and `name`.
/// * `render_cache` - render state passed to `take_and_render` / `render_and_send`.
/// * `refresh_notify` - signalled when the client asks for a refresh.
/// * `evict_rx` - watch channel used to signal eviction or session teardown.
/// * `writer` - write half of the client's Unix socket.
///
/// # Errors
///
/// Returns an error if message encoding fails, if the screen mutex cannot be
/// locked via `lock_mutex`, or if a write to the client fails. Write failures
/// while delivering the eviction / session-ended notice are only logged.
pub(super) async fn screen_to_client(
    h: SessionHandles,
    mut render_cache: retach::screen::RenderCache,
    refresh_notify: Arc<tokio::sync::Notify>,
    mut evict_rx: tokio::sync::watch::Receiver<bool>,
    mut writer: tokio::net::unix::OwnedWriteHalf,
) -> anyhow::Result<()> {
    use std::pin::pin;
    use std::time::Duration;
    use tokio::time::Instant;

    // The reader is already gone before we start: send one last render
    // followed by the end-of-session marker and stop.
    // NOTE(review): the `true` flag presumably requests a full redraw —
    // confirm against `render_and_send`.
    if !h.reader_alive.load(Ordering::Acquire) {
        render_and_send(&h.screen, &mut render_cache, &mut writer, true).await?;
        let msg = protocol::encode(&ServerMsg::SessionEnded)?;
        writer.write_all(&msg).await?;
        return Ok(());
    }

    // The timer is only polled while `pending_render` is set, so its initial
    // (already elapsed) deadline is never observed.
    let mut throttle_sleep = pin!(tokio::time::sleep(Duration::ZERO));
    let mut pending_render = false;

    loop {
        tokio::select! {
            _ = h.screen_notify.notified() => {
                if !h.reader_alive.load(Ordering::Acquire) {
                    // Flush whatever is left (sent unconditionally, even if
                    // the update is empty) and tell the client it is over.
                    // The mutex guard is a temporary dropped at the end of
                    // this statement, so it is not held across the awaits.
                    let (render_data, passthrough) = lock_mutex(&h.screen, "screen")?
                        .take_and_render(&mut render_cache);
                    let update = prepend_passthrough(passthrough, render_data);
                    let msg = protocol::encode(&ServerMsg::ScreenUpdate(update))?;
                    writer.write_all(&msg).await?;
                    let msg = protocol::encode(&ServerMsg::SessionEnded)?;
                    writer.write_all(&msg).await?;
                    break;
                }
                // Arm the timer only for the first notification of a batch.
                // Re-arming on every notification would keep pushing the
                // deadline back, so a continuous stream of notifications
                // would starve the client of updates until output paused.
                if !pending_render {
                    pending_render = true;
                    throttle_sleep.as_mut().reset(Instant::now() + RENDER_THROTTLE);
                }
            }
            _ = &mut throttle_sleep, if pending_render => {
                let (render_data, passthrough) = lock_mutex(&h.screen, "screen")?
                    .take_and_render(&mut render_cache);
                let update = prepend_passthrough(passthrough, render_data);
                // Nothing changed since the last render: skip the write.
                if !update.is_empty() {
                    let msg = protocol::encode(&ServerMsg::ScreenUpdate(update))?;
                    writer.write_all(&msg).await?;
                }
                pending_render = false;
            }
            _ = refresh_notify.notified() => {
                render_and_send(&h.screen, &mut render_cache, &mut writer, true).await?;
            }
            result = evict_rx.changed() => {
                match result {
                    Ok(()) => {
                        debug!(session = %h.name, "client evicted by new connection");
                        let msg = protocol::encode(&ServerMsg::Error("evicted by new client".into()))?;
                        // Best effort: the client may already be gone.
                        if let Err(e) = writer.write_all(&msg).await {
                            debug!(session = %h.name, error = %e, "failed to send eviction notice to client");
                        }
                    }
                    // `changed()` fails only once the sending side is dropped.
                    Err(_) => {
                        debug!(session = %h.name, "session killed while client connected");
                        let msg = protocol::encode(&ServerMsg::SessionEnded)?;
                        if let Err(e) = writer.write_all(&msg).await {
                            debug!(session = %h.name, error = %e, "failed to send session-ended to killed client");
                        }
                    }
                }
                break;
            }
        }
    }
    Ok(())
}
pub(super) async fn client_to_pty(
h: SessionHandles,
mut sock_reader: tokio::net::unix::OwnedReadHalf,
refresh_notify: Arc<tokio::sync::Notify>,
leftover: Vec<u8>,
) -> anyhow::Result<()> {
let mut frames = FrameReader::with_leftover(leftover);
loop {
if !frames.fill_from(&mut sock_reader).await? {
debug!(session = %h.name, "client socket closed");
break;
}
while let Some(msg) = frames.decode_next::<ClientMsg>()? {
match msg {
ClientMsg::Input(input) => {
let pw = h.pty_writer.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let mut w = lock_mutex(&pw, "pty_writer")?;
w.write_all(&input)?;
w.flush()?;
Ok(())
}).await??;
}
ClientMsg::Resize { cols, rows } => {
let master_clone = h.master.clone();
let screen_clone = h.screen.clone();
let dims_clone = h.dims.clone();
let name_clone = h.name.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
resize_pty(&master_clone, &screen_clone, cols, rows)?;
match dims_clone.lock() {
Ok(mut d) => *d = retach::screen::sanitize_dimensions(cols, rows),
Err(e) => tracing::warn!(session = %name_clone, error = %e, "dims mutex poisoned during client resize"),
}
Ok(())
}).await??;
}
ClientMsg::RefreshScreen => {
refresh_notify.notify_one();
}
ClientMsg::Detach => {
debug!(session = %h.name, "client detached");
return Ok(());
}
ClientMsg::Connect { .. } | ClientMsg::ListSessions | ClientMsg::KillSession { .. } => {
tracing::debug!("ignoring unexpected client message in session relay");
}
}
}
}
Ok(())
}