#![forbid(unsafe_code)]
pub mod audit;
pub mod kanshou_state;
pub mod praca_store;
#[cfg(any(test, feature = "testing"))]
pub mod testing;
use std::io;
use std::net::{SocketAddr, TcpListener};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use tear_config::LiveConfig;
use tear_core::InProcess;
use tear_types::wire::{read_msg, write_msg, Request, Response, WireError};
use tear_types::MultiplexerControl;
use tracing::{debug, error, info, warn};
use crate::audit::{AuditEvent, AuditLog};
pub struct DaemonHandle {
socket_path: PathBuf,
stop: Arc<AtomicBool>,
accept_thread: Option<thread::JoinHandle<()>>,
_inproc: Arc<InProcess>,
config: Arc<LiveConfig>,
audit: Option<AuditLog>,
praca: PracaStore,
_config_watcher: Option<notify::RecommendedWatcher>,
}
impl DaemonHandle {
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
pub fn inproc(&self) -> &Arc<InProcess> {
&self._inproc
}
pub fn config(&self) -> &Arc<LiveConfig> {
&self.config
}
pub fn praca(&self) -> &PracaStore {
&self.praca
}
pub fn stop(mut self) {
self.signal_and_join();
}
fn signal_and_join(&mut self) {
self.stop.store(true, Ordering::SeqCst);
let _ = UnixStream::connect(&self.socket_path);
if let Some(h) = self.accept_thread.take() {
let _ = h.join();
}
let _ = std::fs::remove_file(&self.socket_path);
}
}
impl Drop for DaemonHandle {
fn drop(&mut self) {
if self.accept_thread.is_some() {
self.signal_and_join();
}
}
}
pub fn start(socket_path: PathBuf, inproc: Arc<InProcess>) -> io::Result<DaemonHandle> {
let live = LiveConfig::default();
start_with_config(socket_path, inproc, Arc::new(live))
}
pub fn start_tcp(addr: SocketAddr, inproc: Arc<InProcess>) -> io::Result<DaemonHandle> {
let live = LiveConfig::default();
start_tcp_with_config(addr, inproc, Arc::new(live))
}
pub fn start_tcp_with_config(
addr: SocketAddr,
inproc: Arc<InProcess>,
live_config: Arc<LiveConfig>,
) -> io::Result<DaemonHandle> {
let listener = TcpListener::bind(addr)?;
let bound = listener.local_addr()?;
info!(addr = %bound, "tear-daemon listening (tcp)");
let watcher = match live_config.spawn_watcher() {
Ok(w) => Some(w),
Err(e) => {
warn!(error = %e, "config file watcher could not start (tcp daemon)");
None
}
};
let audit = open_audit_from_config(&live_config);
let required_token = resolve_required_token(&live_config)?;
let praca = praca_store::PracaStore::open_default();
let stop = Arc::new(AtomicBool::new(false));
let stop_for_accept = stop.clone();
let inproc_for_accept = inproc.clone();
let config_for_accept = live_config.clone();
let audit_for_accept = audit.clone();
let token_for_accept = required_token.clone();
let praca_for_accept = praca.clone();
let accept_thread = thread::Builder::new()
.name("tear-daemon-accept-tcp".into())
.spawn(move || {
accept_loop_tcp(
listener,
stop_for_accept,
inproc_for_accept,
config_for_accept,
audit_for_accept,
token_for_accept,
Some(praca_for_accept),
)
})?;
Ok(DaemonHandle {
socket_path: PathBuf::from(format!("tcp://{bound}")),
stop,
accept_thread: Some(accept_thread),
_inproc: inproc,
config: live_config,
audit,
praca,
_config_watcher: watcher,
})
}
fn open_audit_from_config(live: &LiveConfig) -> Option<AuditLog> {
let path = live.load().audit_log.clone()?;
match AuditLog::open(&path) {
Ok(a) => Some(a),
Err(e) => {
warn!(path, error = %e, "audit: open failed; audit disabled this run");
None
}
}
}
fn resolve_required_token(live: &LiveConfig) -> io::Result<Option<String>> {
let Some(name) = live.load().auth_token_env.clone() else {
return Ok(None);
};
match std::env::var(&name) {
Ok(v) if !v.is_empty() => {
info!(env_var = %name, "auth: requiring token on every client connection");
Ok(Some(v))
}
Ok(_) => Err(io::Error::new(
io::ErrorKind::PermissionDenied,
format!("auth_token_env={name} is set but the env var is empty"),
)),
Err(_) => Err(io::Error::new(
io::ErrorKind::PermissionDenied,
format!(
"auth_token_env={name} is set but the env var is not present; \
export it before starting `tear daemon`"
),
)),
}
}
pub fn start_with_config(
socket_path: PathBuf,
inproc: Arc<InProcess>,
live_config: Arc<LiveConfig>,
) -> io::Result<DaemonHandle> {
if socket_path.exists() {
let _ = std::fs::remove_file(&socket_path);
}
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(&socket_path)?;
info!(path = %socket_path.display(), "tear-daemon listening");
inproc.set_socket_path(socket_path.clone());
{
let kanshou_state = Arc::new(kanshou_state::TearDaemonState::new(inproc.clone()));
std::thread::Builder::new()
.name("tear-kanshou".into())
.spawn(move || {
match tokio::runtime::Builder::new_current_thread()
.enable_all()
.thread_name("tear-kanshou-tokio")
.build()
{
Ok(rt) => rt.block_on(async {
match kanshou_state::spawn_server("tear-daemon", kanshou_state) {
Ok(path) => {
info!(
socket = %path.display(),
"kanshou introspection live"
);
std::future::pending::<()>().await;
}
Err(e) => {
warn!(error = %e, "kanshou bind failed; introspection disabled");
}
}
}),
Err(e) => {
warn!(error = %e, "could not create kanshou tokio runtime");
}
}
})
.expect("spawn tear-kanshou thread");
}
{
let inproc_for_prune = inproc.clone();
thread::Builder::new()
.name("tear-orphan-prune".into())
.spawn(move || {
use tear_types::{MultiplexerControl, SessionSource};
let mut empty_since: std::collections::HashMap<
tear_types::SessionId,
std::time::Instant,
> = std::collections::HashMap::new();
let tick = std::time::Duration::from_secs(2);
let named_grace = std::time::Duration::from_secs(3);
let agent_grace = std::time::Duration::from_secs(15);
loop {
std::thread::sleep(tick);
let sessions = match inproc_for_prune.list_sessions() {
Ok(s) => s,
Err(_) => continue,
};
let now = std::time::Instant::now();
let mut alive_ids = std::collections::HashSet::new();
for s in &sessions {
alive_ids.insert(s.id);
let grace = match &s.source {
SessionSource::Human => {
empty_since.remove(&s.id);
continue;
}
SessionSource::Named(_) => named_grace,
SessionSource::Agent => agent_grace,
};
let any_subs = s.panes.keys().any(|pid| {
inproc_for_prune
.pane_subscriber_count(*pid)
.map(|n| n > 0)
.unwrap_or(false)
});
if any_subs {
empty_since.remove(&s.id);
} else {
let started = empty_since.entry(s.id).or_insert(now);
if now.duration_since(*started) >= grace {
info!(
session = %s.id,
name = %s.name,
source = %s.source.label(),
grace_secs = grace.as_secs(),
"orphan pruner: killing session"
);
let _ = inproc_for_prune.kill_session(s.id);
empty_since.remove(&s.id);
}
}
}
empty_since.retain(|k, _| alive_ids.contains(k));
}
})?;
}
let watcher = match live_config.spawn_watcher() {
Ok(w) => Some(w),
Err(e) => {
warn!(error = %e, "config file watcher could not start — manual ReloadConfig still works");
None
}
};
let audit = open_audit_from_config(&live_config);
let required_token = resolve_required_token(&live_config)?;
let praca = praca_store::PracaStore::open_default();
let stop = Arc::new(AtomicBool::new(false));
let stop_for_accept = stop.clone();
let inproc_for_accept = inproc.clone();
let socket_for_accept = socket_path.clone();
let config_for_accept = live_config.clone();
let audit_for_accept = audit.clone();
let token_for_accept = required_token.clone();
let praca_for_accept = praca.clone();
let accept_thread = thread::Builder::new()
.name("tear-daemon-accept".into())
.spawn(move || {
accept_loop(
listener,
stop_for_accept,
inproc_for_accept,
config_for_accept,
socket_for_accept,
audit_for_accept,
token_for_accept,
Some(praca_for_accept),
)
})?;
Ok(DaemonHandle {
socket_path,
stop,
accept_thread: Some(accept_thread),
_inproc: inproc,
config: live_config,
audit,
praca,
_config_watcher: watcher,
})
}
fn accept_loop_tcp(
listener: TcpListener,
stop: Arc<AtomicBool>,
inproc: Arc<InProcess>,
config: Arc<LiveConfig>,
audit: Option<AuditLog>,
required_token: Option<String>,
praca: Option<PracaStore>,
) {
listener
.set_nonblocking(true)
.expect("set_nonblocking on TcpListener");
loop {
if stop.load(Ordering::SeqCst) {
debug!("accept loop (tcp): stop requested");
return;
}
match listener.accept() {
Ok((stream, peer)) => {
debug!(peer = %peer, "tcp connection accepted");
let inproc_for_conn = inproc.clone();
let config_for_conn = config.clone();
let audit_for_conn = audit.clone();
let token_for_conn = required_token.clone();
let praca_for_conn = praca.clone();
let _ = thread::Builder::new()
.name("tear-daemon-conn-tcp".into())
.spawn(move || {
let _ = stream.set_nonblocking(false);
if let Err(e) = serve_connection_full(
stream,
inproc_for_conn,
config_for_conn,
audit_for_conn,
token_for_conn,
praca_for_conn,
) {
if e.kind() != io::ErrorKind::UnexpectedEof {
warn!(error = %e, "tcp connection ended");
}
}
});
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
std::thread::sleep(std::time::Duration::from_millis(50));
}
Err(e) => {
error!(error = %e, "tcp accept failed");
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
}
}
fn accept_loop(
listener: UnixListener,
stop: Arc<AtomicBool>,
inproc: Arc<InProcess>,
config: Arc<LiveConfig>,
_socket_path: PathBuf,
audit: Option<AuditLog>,
required_token: Option<String>,
praca: Option<PracaStore>,
) {
for incoming in listener.incoming() {
if stop.load(Ordering::SeqCst) {
debug!("accept loop: stop requested");
return;
}
match incoming {
Ok(stream) => {
let inproc_for_conn = inproc.clone();
let config_for_conn = config.clone();
let audit_for_conn = audit.clone();
let token_for_conn = required_token.clone();
let praca_for_conn = praca.clone();
let _ = thread::Builder::new()
.name("tear-daemon-conn".into())
.spawn(move || {
if let Err(e) = serve_connection_full(
stream,
inproc_for_conn,
config_for_conn,
audit_for_conn,
token_for_conn,
praca_for_conn,
) {
if e.kind() != io::ErrorKind::UnexpectedEof {
warn!(error = %e, "connection ended");
}
}
});
}
Err(e) => {
error!(error = %e, "accept failed");
}
}
}
}
pub fn serve_connection<S: io::Read + io::Write>(
stream: S,
inproc: Arc<InProcess>,
config: Arc<LiveConfig>,
audit: Option<AuditLog>,
) -> io::Result<()> {
serve_connection_with_auth(stream, inproc, config, audit, None)
}
pub fn serve_connection_with_auth<S: io::Read + io::Write>(
stream: S,
inproc: Arc<InProcess>,
config: Arc<LiveConfig>,
audit: Option<AuditLog>,
required_token: Option<String>,
) -> io::Result<()> {
serve_connection_full(stream, inproc, config, audit, required_token, None)
}
pub fn serve_connection_full<S: io::Read + io::Write>(
mut stream: S,
inproc: Arc<InProcess>,
config: Arc<LiveConfig>,
audit: Option<AuditLog>,
required_token: Option<String>,
praca: Option<PracaStore>,
) -> io::Result<()> {
let mut authed = required_token.is_none();
let mut client_id: Option<u64> = None;
loop {
let req: Request = match read_msg(&mut stream) {
Ok(r) => r,
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
Err(e) => return Err(e),
};
if let Request::Authenticate(presented) = &req {
let resp = match &required_token {
Some(expected) if ct_eq(expected.as_bytes(), presented.as_bytes()) => {
authed = true;
Response::Ok
}
Some(_) => Response::Err(tear_types::wire::WireError::Rejected(
"auth failed".into(),
)),
None => Response::Ok,
};
write_msg(&mut stream, &resp)?;
continue;
}
if !authed {
let resp = Response::Err(tear_types::wire::WireError::Rejected(
"authentication required: send Authenticate(token) first".into(),
));
write_msg(&mut stream, &resp)?;
continue;
}
if let Request::IdentifyClient(id) = &req {
client_id = Some(*id);
write_msg(&mut stream, &Response::Ok)?;
continue;
}
if let Request::SendKeys { id, .. } = &req {
if let Ok(pane) = inproc.get_pane(*id) {
if let Some(want) = pane.input_policy.leader_id() {
if client_id != Some(want) {
let resp = Response::Err(tear_types::wire::WireError::Rejected(
format!(
"leader policy: pane {id:?} requires client_id={want}, \
connection identified as {client_id:?}"
),
));
write_msg(&mut stream, &resp)?;
continue;
}
}
}
}
if let Request::Subscribe(pane) = req {
return serve_subscription(stream, inproc, pane);
}
if matches!(req, Request::SubscribeConfigChange) {
return serve_config_subscription(stream, config);
}
let resp = dispatch_with_config(&inproc, &config, req, audit.as_ref(), praca.as_ref());
write_msg(&mut stream, &resp)?;
}
}
fn ct_eq(a: &[u8], b: &[u8]) -> bool {
if a.len() != b.len() {
return false;
}
let mut diff: u8 = 0;
for (x, y) in a.iter().zip(b.iter()) {
diff |= x ^ y;
}
diff == 0
}
fn serve_subscription<S: io::Read + io::Write>(
mut stream: S,
inproc: Arc<InProcess>,
pane: tear_types::PaneId,
) -> io::Result<()> {
let rx = match inproc.subscribe_pane_bytes(pane) {
Ok(rx) => rx,
Err(e) => {
write_msg(&mut stream, &Response::Err(WireError::from(e)))?;
return Ok(());
}
};
write_msg(&mut stream, &Response::Ok)?;
match inproc.pane_snapshot(pane) {
Ok(snap) => {
let bytes = snap.to_ansi();
if !bytes.is_empty()
&& write_msg(&mut stream, &Response::PaneBytes(bytes)).is_err()
{
return Ok(());
}
}
Err(e) => {
tracing::debug!(?e, %pane, "pane_snapshot failed at subscribe time — skipping initial replay");
}
}
loop {
match rx.recv() {
Ok(bytes) => {
if write_msg(&mut stream, &Response::PaneBytes(bytes)).is_err() {
return Ok(());
}
}
Err(_) => {
let _ = write_msg(&mut stream, &Response::PaneClosed(pane));
return Ok(());
}
}
}
}
fn serve_config_subscription<S: io::Read + io::Write>(
mut stream: S,
config: Arc<LiveConfig>,
) -> io::Result<()> {
let rx = config.subscribe();
write_msg(&mut stream, &Response::Ok)?;
loop {
match rx.recv() {
Ok(new_cfg) => {
let yaml = match serde_yaml_ng::to_string(&*new_cfg) {
Ok(y) => y,
Err(_) => continue, };
if write_msg(&mut stream, &Response::ConfigChanged(yaml)).is_err() {
return Ok(());
}
}
Err(_) => return Ok(()),
}
}
}
pub fn dispatch(inproc: &InProcess, req: Request) -> Response {
match req {
Request::ListSessions => map_result(inproc.list_sessions(), Response::Sessions),
Request::GetSession(id) => map_result(inproc.get_session(id), Response::Session),
Request::GetWindow(id) => map_result(inproc.get_window(id), |(s, w)| Response::Window {
session: s,
window: w,
}),
Request::GetPane(id) => map_result(inproc.get_pane(id), Response::Pane),
Request::NewSession { name, shell, source, size_cells } => {
let src = source.unwrap_or_default();
let size = size_cells.unwrap_or((80, 24));
map_result(
inproc.new_session_with_source_and_size(&name, &shell, src, size),
Response::SessionId,
)
}
Request::RenameSession { id, new_name } => {
map_unit(inproc.rename_session(id, &new_name))
}
Request::KillSession(id) => map_unit(inproc.kill_session(id)),
Request::NewWindow { session, name, shell } => {
map_result(inproc.new_window(session, &name, &shell), Response::WindowId)
}
Request::KillWindow(id) => map_unit(inproc.kill_window(id)),
Request::SelectWindow(id) => map_unit(inproc.select_window(id)),
Request::SplitPane { origin, direction, shell } => {
map_result(inproc.split_pane(origin, direction, &shell), Response::PaneId)
}
Request::KillPane(id) => map_unit(inproc.kill_pane(id)),
Request::SelectPane(id) => map_unit(inproc.select_pane(id)),
Request::ResizePane { id, direction, delta_cells } => {
map_unit(inproc.resize_pane(id, direction, delta_cells))
}
Request::ApplyLayout { window, kind } => map_unit(inproc.apply_layout(window, kind)),
Request::SendKeys { id, bytes } => map_unit(inproc.send_keys(id, &bytes)),
Request::SetInputPolicy { id, policy } => {
map_unit(inproc.set_input_policy(id, policy))
}
Request::PaneSubscriberCount(id) => {
map_result(inproc.pane_subscriber_count(id), Response::SubscriberCount)
}
Request::StartPaneRecording(id) => map_unit(inproc.enable_pane_recording(id)),
Request::StopPaneRecording(id) => map_unit(inproc.disable_pane_recording(id)),
Request::ExportPaneRecording(id) => {
map_result(inproc.export_pane_recording(id), Response::CastJson)
}
Request::PaneRecordingStatus(id) => {
map_result(inproc.pane_recording_status(id), |(enabled, events)| {
Response::RecordingStatus { enabled, events }
})
}
Request::PaneBlocksList { pane, since_index, limit } => {
map_result(inproc.pane_blocks_list(pane, since_index, limit), Response::Blocks)
}
Request::PaneBlockAt { pane, index } => {
map_result(inproc.pane_block_at(pane, index), Response::Block)
}
Request::PaneBlocksStatus(id) => {
map_result(inproc.pane_blocks_status(id), |(total, in_progress)| {
Response::BlocksStatus { total, in_progress }
})
}
Request::PaneSnapshot(id) => map_result(inproc.pane_snapshot(id), Response::PaneSnapshot),
Request::PaneResizeAbsolute { id, cols, rows } => {
map_unit(inproc.pane_resize_absolute(id, cols, rows))
}
Request::SetSpawnEnv(env) => {
inproc.set_spawn_env(env);
Response::Ok
}
Request::Subscribe(_) => Response::Err(WireError::Rejected(
"Subscribe must be handled by serve_connection (push mode), not dispatch".into(),
)),
Request::GetConfig | Request::ReloadConfig | Request::SetConfig(_) => {
Response::Err(WireError::Rejected(
"config requests must be handled by dispatch_with_config (needs LiveConfig)"
.into(),
))
}
Request::SubscribeConfigChange => Response::Err(WireError::Rejected(
"SubscribeConfigChange must be handled by serve_connection (push mode), not dispatch"
.into(),
)),
Request::Authenticate(_) => Response::Ok,
Request::IdentifyClient(_) => Response::Ok,
}
}
pub fn dispatch_with_config(
inproc: &InProcess,
config: &LiveConfig,
req: Request,
audit: Option<&AuditLog>,
praca: Option<&PracaStore>,
) -> Response {
match req {
Request::GetConfig => {
let cfg = config.load();
match serde_yaml_ng::to_string(&*cfg) {
Ok(yaml) => Response::ConfigYaml(yaml),
Err(e) => Response::Err(WireError::Internal(format!(
"failed to serialise TearConfig as YAML: {e}"
))),
}
}
Request::ReloadConfig => match config.reload() {
Ok(()) => Response::Ok,
Err(e) => Response::Err(WireError::Internal(format!(
"config reload failed: {e}"
))),
},
Request::SetConfig(yaml) => {
match serde_yaml_ng::from_str::<tear_config::TearConfig>(&yaml) {
Ok(cfg) => {
let hash = hash_str(&yaml);
config.replace(cfg);
if let Some(a) = audit {
a.emit(&AuditEvent::SetConfig {
ts_ms: AuditEvent::now_ms(),
config_hash: hash,
});
}
Response::Ok
}
Err(e) => Response::Err(WireError::Rejected(format!(
"SetConfig payload did not parse as TearConfig YAML: {e}"
))),
}
}
Request::KillSession(id) => {
if let Some(dir) = config.load().recording_auto_dir.clone() {
flush_session_recordings(inproc, id, &dir);
}
let resp = map_unit(inproc.kill_session(id));
if matches!(resp, Response::Ok) {
if let Some(a) = audit {
a.emit(&AuditEvent::SessionKill {
ts_ms: AuditEvent::now_ms(),
sid: id.to_string(),
});
}
if let Some(store) = praca {
praca_on_session_kill(store, id);
}
}
resp
}
Request::NewSession { name, shell, source, size_cells } => {
let src = source.unwrap_or_default();
let size = size_cells.unwrap_or((80, 24));
let result = inproc.new_session_with_source_and_size(&name, &shell, src.clone(), size);
if let Ok(sid) = &result {
if let Some(a) = audit {
a.emit(&AuditEvent::SessionCreate {
ts_ms: AuditEvent::now_ms(),
sid: sid.to_string(),
name: name.clone(),
shell: shell.clone(),
source: src.label().to_string(),
});
}
if let Some(store) = praca {
praca_on_session_create(store, inproc, *sid, now_unix());
}
}
map_result(result, Response::SessionId)
}
Request::SetInputPolicy { id, policy } => {
let resp = map_unit(inproc.set_input_policy(id, policy));
if matches!(resp, Response::Ok) {
if let Some(a) = audit {
a.emit(&AuditEvent::SetInputPolicy {
ts_ms: AuditEvent::now_ms(),
pid: id.to_string(),
policy: policy.label().to_string(),
});
}
}
resp
}
Request::StartPaneRecording(id) => {
let resp = map_unit(inproc.enable_pane_recording(id));
if matches!(resp, Response::Ok) {
if let Some(a) = audit {
a.emit(&AuditEvent::StartRecording {
ts_ms: AuditEvent::now_ms(),
pid: id.to_string(),
});
}
}
resp
}
Request::StopPaneRecording(id) => {
let resp = map_unit(inproc.disable_pane_recording(id));
if matches!(resp, Response::Ok) {
if let Some(a) = audit {
a.emit(&AuditEvent::StopRecording {
ts_ms: AuditEvent::now_ms(),
pid: id.to_string(),
});
}
}
resp
}
Request::RenameSession { id, new_name } => {
let resp = map_unit(inproc.rename_session(id, &new_name));
if matches!(resp, Response::Ok) {
if let Some(store) = praca {
praca_on_session_rename(store, id, &new_name);
}
}
resp
}
other => dispatch(inproc, other),
}
}
fn praca_on_session_rename(store: &PracaStore, id: tear_types::SessionId, new_name: &str) {
store.mutate(|p| {
if let Some(rec) = p.index.get_mut(id) {
rec.rename(new_name);
}
});
debug!(session = %id, "praca: renamed session record");
}
use crate::praca_store::PracaStore;
fn praca_on_session_create(
store: &PracaStore,
inproc: &InProcess,
id: tear_types::SessionId,
now: u64,
) {
let Some(cwd) = inproc.spawn_cwd() else {
debug!(session = %id, "praca: no spawn cwd — skipping binding");
return;
};
let project = praca::find_project_root(&cwd);
store.mutate(|p| {
let prior_visits = p.index.get(id).map_or(0, |r| r.visits);
let (mut rec, anchor) = match &project {
Some(root) => (
praca::SessionRecord::for_project(id, root.clone(), p.name_style, now),
root.clone(),
),
None => {
let seed = id.0; let theme = if seed % 2 == 0 {
praca::SessionTheme::Frost
} else {
praca::SessionTheme::Brazil
};
(
praca::SessionRecord::for_adhoc(id, seed, theme, cwd.clone(), p.name_style, now),
cwd.clone(),
)
}
};
rec.visits = rec.visits.saturating_add(prior_visits);
rec.cwd.clone_from(&cwd);
p.index.upsert(rec);
p.binding.bind(anchor, id);
});
debug!(session = %id, ?project, "praca: registered session");
}
fn praca_on_session_kill(store: &PracaStore, id: tear_types::SessionId) {
store.mutate(|p| {
p.index.remove(id);
p.binding.remove_session(id);
});
debug!(session = %id, "praca: unbound killed session");
}
fn now_unix() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.as_secs())
}
fn hash_str(s: &str) -> String {
let hash = blake3::hash(s.as_bytes());
hash.to_hex().to_string()
}
fn flush_session_recordings(inproc: &InProcess, session: tear_types::SessionId, dir: &str) {
use std::time::{SystemTime, UNIX_EPOCH};
let expanded = tear_types::path::expand_tilde(dir);
if std::fs::create_dir_all(&expanded).is_err() {
warn!(dir = %expanded, "auto-flush: mkdir failed; skipping");
return;
}
let panes: Vec<tear_types::PaneId> = match inproc.get_session(session) {
Ok(s) => s.panes.keys().copied().collect(),
Err(_) => return,
};
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
for pane in panes {
match inproc.pane_recording_status(pane) {
Ok((_, events)) if events > 0 => {}
_ => continue,
}
match inproc.export_pane_recording(pane) {
Ok(cast) => {
let path = std::path::Path::new(&expanded)
.join(format!("{session}-{ts}-{pane}.cast"));
if let Err(e) = std::fs::write(&path, cast.as_bytes()) {
warn!(path = %path.display(), error = %e, "auto-flush: write failed");
} else {
info!(path = %path.display(), pane = %pane, "auto-flushed recording");
}
}
Err(_) => continue,
}
}
}
fn map_result<T, F: FnOnce(T) -> Response>(
r: tear_types::ControlResult<T>,
ok: F,
) -> Response {
match r {
Ok(v) => ok(v),
Err(e) => Response::Err(WireError::from(e)),
}
}
fn map_unit(r: tear_types::ControlResult<()>) -> Response {
match r {
Ok(()) => Response::Ok,
Err(e) => Response::Err(WireError::from(e)),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn dispatch_list_sessions_on_fresh_inproc_returns_empty() {
let inproc = InProcess::new();
let resp = dispatch(&inproc, Request::ListSessions);
match resp {
Response::Sessions(v) => assert!(v.is_empty()),
other => panic!("unexpected response: {other:?}"),
}
}
#[test]
fn dispatch_new_session_then_list_sees_it() {
let inproc = InProcess::new();
let resp = dispatch(
&inproc,
Request::NewSession {
name: "work".into(),
shell: "/bin/sh".into(),
source: None,
size_cells: None,
},
);
let session_id = match resp {
Response::SessionId(id) => id,
other => panic!("unexpected response: {other:?}"),
};
let listed = dispatch(&inproc, Request::ListSessions);
match listed {
Response::Sessions(v) => {
assert_eq!(v.len(), 1);
assert_eq!(v[0].id, session_id);
}
other => panic!("unexpected response: {other:?}"),
}
}
#[test]
fn dispatch_get_nonexistent_session_returns_wire_error() {
let inproc = InProcess::new();
let bogus = tear_types::SessionId::from_seed("nope");
let resp = dispatch(&inproc, Request::GetSession(bogus));
match resp {
Response::Err(WireError::NoSuchSession(id)) => assert_eq!(id, bogus),
other => panic!("unexpected response: {other:?}"),
}
}
#[test]
fn round_trip_via_in_memory_buffer() {
let inproc = Arc::new(InProcess::new());
let mut buf = Vec::new();
write_msg(&mut buf, &Request::ListSessions).unwrap();
let mut cur = Cursor::new(buf);
let req: Request = read_msg(&mut cur).unwrap();
let resp = dispatch(&inproc, req);
let mut out = Vec::new();
write_msg(&mut out, &resp).unwrap();
let mut out_cur = Cursor::new(out);
let got: Response = read_msg(&mut out_cur).unwrap();
assert!(matches!(got, Response::Sessions(v) if v.is_empty()));
}
#[test]
fn dispatch_subscribe_in_dispatch_path_returns_rejected() {
let inproc = InProcess::new();
let bogus = tear_types::PaneId::from_seed("nope");
let resp = dispatch(&inproc, Request::Subscribe(bogus));
match resp {
Response::Err(WireError::Rejected(msg)) => {
assert!(msg.contains("serve_connection"));
}
other => panic!("expected Rejected, got {other:?}"),
}
}
#[test]
fn dispatch_pane_resize_absolute_on_nonexistent_pane_returns_nosuch() {
let inproc = InProcess::new();
let pane = tear_types::PaneId::from_seed("nope");
let resp = dispatch(
&inproc,
Request::PaneResizeAbsolute {
id: pane,
cols: 80,
rows: 24,
},
);
match resp {
Response::Err(WireError::NoSuchPane(p)) => assert_eq!(p, pane),
other => panic!("expected NoSuchPane, got {other:?}"),
}
}
#[test]
fn dispatch_pane_snapshot_on_nonexistent_pane_returns_nosuch() {
let inproc = InProcess::new();
let pane = tear_types::PaneId::from_seed("nope");
let resp = dispatch(&inproc, Request::PaneSnapshot(pane));
match resp {
Response::Err(WireError::NoSuchPane(p)) => assert_eq!(p, pane),
other => panic!("expected NoSuchPane, got {other:?}"),
}
}
#[test]
fn dispatch_kill_then_get_session_returns_nosuch() {
let inproc = InProcess::new();
let sid = match dispatch(
&inproc,
Request::NewSession {
name: "x".into(),
shell: "/bin/sh".into(),
source: None,
size_cells: None,
},
) {
Response::SessionId(s) => s,
other => panic!("unexpected: {other:?}"),
};
match dispatch(&inproc, Request::KillSession(sid)) {
Response::Ok => {}
other => panic!("expected Ok, got {other:?}"),
}
match dispatch(&inproc, Request::GetSession(sid)) {
Response::Err(WireError::NoSuchSession(s)) => assert_eq!(s, sid),
other => panic!("expected NoSuchSession, got {other:?}"),
}
}
#[test]
fn kill_session_with_recording_auto_dir_writes_cast() {
let inproc = std::sync::Arc::new(InProcess::new());
let tmp = tempfile_path();
let mut cfg = tear_config::TearConfig::default();
cfg.recording_auto_dir = Some(tmp.to_string_lossy().into());
let live = std::sync::Arc::new(LiveConfig::default());
live.replace(cfg);
let sid = inproc
.new_session("auto-flush-test", "/bin/sh")
.expect("new_session");
let pane_id = *inproc
.get_session(sid)
.unwrap()
.panes
.keys()
.next()
.unwrap();
inproc.enable_pane_recording(pane_id).unwrap();
std::thread::sleep(std::time::Duration::from_millis(200));
inproc
.send_keys(pane_id, b"echo hello\n")
.expect("send_keys");
std::thread::sleep(std::time::Duration::from_millis(200));
match dispatch_with_config(&inproc, &live, Request::KillSession(sid), None, None) {
Response::Ok => {}
other => panic!("expected Ok, got {other:?}"),
}
let entries: Vec<_> = std::fs::read_dir(&tmp)
.expect("read_dir")
.filter_map(|e| e.ok())
.map(|e| e.file_name().to_string_lossy().into_owned())
.collect();
let sid_prefix = format!("{sid}-");
let hits: Vec<&String> = entries
.iter()
.filter(|n| n.starts_with(&sid_prefix) && n.ends_with(".cast"))
.collect();
assert_eq!(
hits.len(),
1,
"expected one auto-flushed cast, found {entries:?}"
);
let _ = std::fs::remove_dir_all(&tmp);
}
fn tempfile_path() -> std::path::PathBuf {
let mut p = std::env::temp_dir();
let pid = std::process::id();
let nonce: u64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
p.push(format!("tear-auto-flush-{pid}-{nonce}"));
std::fs::create_dir_all(&p).unwrap();
p
}
#[test]
fn ct_eq_returns_true_for_matching_bytes() {
assert!(ct_eq(b"abc", b"abc"));
assert!(ct_eq(b"", b""));
}
#[test]
fn ct_eq_returns_false_for_differing_bytes_or_lengths() {
assert!(!ct_eq(b"abc", b"abd"));
assert!(!ct_eq(b"abc", b"abcd"));
assert!(!ct_eq(b"abc", b""));
}
#[test]
fn auth_required_rejects_other_requests_until_authenticate() {
use crate::testing::{drain_responses, DuplexStream};
use std::sync::mpsc::channel;
let mut input = Vec::new();
write_msg(&mut input, &Request::ListSessions).unwrap();
write_msg(&mut input, &Request::Authenticate("wrong".into())).unwrap();
write_msg(&mut input, &Request::Authenticate("secret".into())).unwrap();
write_msg(&mut input, &Request::ListSessions).unwrap();
let (tx, rx) = channel::<u8>();
let stream = DuplexStream::new(input, tx);
let inproc = Arc::new(InProcess::new());
let live = Arc::new(LiveConfig::default());
let _ = serve_connection_with_auth(
stream,
inproc,
live,
None,
Some("secret".into()),
);
let resps = drain_responses(&rx);
assert_eq!(resps.len(), 4, "got: {resps:?}");
assert!(matches!(resps[0], Response::Err(WireError::Rejected(_))));
assert!(matches!(resps[1], Response::Err(WireError::Rejected(_))));
assert!(matches!(resps[2], Response::Ok));
assert!(matches!(resps[3], Response::Sessions(_)));
}
#[test]
fn no_auth_required_accepts_requests_immediately() {
use crate::testing::{drain_responses, DuplexStream};
use std::sync::mpsc::channel;
let mut input = Vec::new();
write_msg(&mut input, &Request::ListSessions).unwrap();
let (tx, rx) = channel::<u8>();
let stream = DuplexStream::new(input, tx);
let inproc = Arc::new(InProcess::new());
let live = Arc::new(LiveConfig::default());
let _ = serve_connection_with_auth(stream, inproc, live, None, None);
let resps = drain_responses(&rx);
assert!(matches!(resps.first(), Some(Response::Sessions(_))), "got: {resps:?}");
}
#[test]
fn re_authenticate_after_success_returns_ok_again() {
use crate::testing::{drain_responses, DuplexStream};
use std::sync::mpsc::channel;
let mut input = Vec::new();
write_msg(&mut input, &Request::Authenticate("k".into())).unwrap();
write_msg(&mut input, &Request::Authenticate("k".into())).unwrap();
let (tx, rx) = channel::<u8>();
let stream = DuplexStream::new(input, tx);
let inproc = Arc::new(InProcess::new());
let live = Arc::new(LiveConfig::default());
let _ = serve_connection_with_auth(stream, inproc, live, None, Some("k".into()));
let resps = drain_responses(&rx);
assert_eq!(resps.len(), 2);
assert!(matches!(resps[0], Response::Ok));
assert!(matches!(resps[1], Response::Ok));
}
#[test]
fn pre_emptive_authenticate_on_no_auth_daemon_is_ok() {
use crate::testing::{drain_responses, DuplexStream};
use std::sync::mpsc::channel;
let mut input = Vec::new();
write_msg(&mut input, &Request::Authenticate("anything".into())).unwrap();
let (tx, rx) = channel::<u8>();
let stream = DuplexStream::new(input, tx);
let inproc = Arc::new(InProcess::new());
let live = Arc::new(LiveConfig::default());
let _ = serve_connection_with_auth(stream, inproc, live, None, None);
let resps = drain_responses(&rx);
assert!(matches!(resps.first(), Some(Response::Ok)), "got: {resps:?}");
}
#[test]
fn identify_client_is_idempotent_and_returns_ok() {
use crate::testing::{drain_responses, DuplexStream};
use std::sync::mpsc::channel;
let mut input = Vec::new();
write_msg(&mut input, &Request::IdentifyClient(1)).unwrap();
write_msg(&mut input, &Request::IdentifyClient(99)).unwrap();
write_msg(&mut input, &Request::ListSessions).unwrap();
let (tx, rx) = channel::<u8>();
let stream = DuplexStream::new(input, tx);
let inproc = Arc::new(InProcess::new());
let live = Arc::new(LiveConfig::default());
let _ = serve_connection_with_auth(stream, inproc, live, None, None);
let resps = drain_responses(&rx);
assert_eq!(resps.len(), 3);
assert!(matches!(resps[0], Response::Ok));
assert!(matches!(resps[1], Response::Ok));
assert!(matches!(resps[2], Response::Sessions(_)));
}
#[test]
fn resolve_required_token_missing_env_errors() {
let live = LiveConfig::default();
let mut cfg = tear_config::TearConfig::default();
cfg.auth_token_env = Some("__TEAR_NO_SUCH_VAR_FOR_TESTS__".into());
live.replace(cfg);
let err = resolve_required_token(&live).expect_err("missing env must error");
assert_eq!(err.kind(), io::ErrorKind::PermissionDenied);
let msg = format!("{err}");
assert!(msg.contains("__TEAR_NO_SUCH_VAR_FOR_TESTS__"), "msg: {msg}");
}
#[test]
fn resolve_required_token_none_when_unset() {
let live = LiveConfig::default();
assert!(resolve_required_token(&live).unwrap().is_none());
}
#[test]
fn dispatch_with_config_set_config_emits_set_config_audit_event() {
let tmp = tempfile_path();
let log_path = tmp.join("audit.jsonl");
let audit = AuditLog::open(log_path.to_str().unwrap()).unwrap();
let inproc = InProcess::new();
let live = LiveConfig::default();
let yaml = serde_yaml_ng::to_string(&tear_config::TearConfig::default()).unwrap();
let resp = dispatch_with_config(
&inproc,
&live,
Request::SetConfig(yaml),
Some(&audit),
None,
);
assert!(matches!(resp, Response::Ok));
drop(audit);
let content = std::fs::read_to_string(&log_path).unwrap();
assert!(content.contains("set_config"), "audit: {content}");
assert!(content.contains("config_hash"), "audit: {content}");
let _ = std::fs::remove_dir_all(&tmp);
}
#[test]
fn dispatch_with_config_kill_session_emits_session_kill_event() {
let tmp = tempfile_path();
let log_path = tmp.join("audit.jsonl");
let audit = AuditLog::open(log_path.to_str().unwrap()).unwrap();
let inproc = InProcess::new();
let live = LiveConfig::default();
let sid = inproc.new_session("audit-kill-test", "/bin/sh").unwrap();
let resp = dispatch_with_config(
&inproc,
&live,
Request::KillSession(sid),
Some(&audit),
None,
);
assert!(matches!(resp, Response::Ok));
drop(audit);
let content = std::fs::read_to_string(&log_path).unwrap();
assert!(content.contains("session_kill"), "audit: {content}");
assert!(content.contains(&sid.to_string()), "audit: {content}");
let _ = std::fs::remove_dir_all(&tmp);
}
fn praca_temp_path(tag: &str) -> std::path::PathBuf {
let mut p = std::env::temp_dir();
let pid = std::process::id();
let nonce: u128 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
p.push(format!("tear-praca-daemon-{tag}-{pid}-{nonce}"));
std::fs::create_dir_all(&p).unwrap();
p.join("praca.json")
}
#[test]
fn new_session_via_dispatch_binds_project_root_to_persisted_store() {
let path = praca_temp_path("new-session-binds");
let store = crate::praca_store::PracaStore::open(path.clone());
let inproc = InProcess::new();
let live = LiveConfig::default();
let cwd = path.parent().unwrap().to_path_buf();
inproc.set_spawn_env(
tear_types::SpawnEnv::none().with_cwd(Some(cwd.to_string_lossy().into())),
);
let project = praca::project_root(&cwd);
let sid = match dispatch_with_config(
&inproc,
&live,
Request::NewSession {
name: "praca-bind".into(),
shell: "/bin/sh".into(),
source: None,
size_cells: None,
},
None,
Some(&store),
) {
Response::SessionId(s) => s,
other => panic!("expected SessionId, got {other:?}"),
};
store.with(|p| {
assert_eq!(
p.binding.lookup(&project),
Some(sid),
"live store bound the project root"
);
});
assert!(path.exists(), "store persisted the binding to disk");
let reloaded = crate::praca_store::PracaStore::open(path.clone());
reloaded.with(|p| {
assert_eq!(
p.binding.lookup(&project),
Some(sid),
"binding survived a daemon restart"
);
let rec = p.index.get(sid).expect("record persisted");
assert_eq!(rec.project_root, project);
assert!(rec.visits >= 1, "frecency visit recorded");
});
let _ = inproc.kill_session(sid);
let _ = std::fs::remove_dir_all(path.parent().unwrap());
}
#[test]
fn kill_session_via_dispatch_unbinds_in_persisted_store() {
let path = praca_temp_path("kill-unbinds");
let store = crate::praca_store::PracaStore::open(path.clone());
let inproc = InProcess::new();
let live = LiveConfig::default();
let cwd = path.parent().unwrap().to_path_buf();
inproc.set_spawn_env(
tear_types::SpawnEnv::none().with_cwd(Some(cwd.to_string_lossy().into())),
);
let project = praca::project_root(&cwd);
let sid = match dispatch_with_config(
&inproc,
&live,
Request::NewSession {
name: "praca-kill".into(),
shell: "/bin/sh".into(),
source: None,
size_cells: None,
},
None,
Some(&store),
) {
Response::SessionId(s) => s,
other => panic!("expected SessionId, got {other:?}"),
};
store.with(|p| assert!(p.binding.lookup(&project).is_some()));
match dispatch_with_config(
&inproc,
&live,
Request::KillSession(sid),
None,
Some(&store),
) {
Response::Ok => {}
other => panic!("expected Ok, got {other:?}"),
}
store.with(|p| {
assert!(
p.binding.lookup(&project).is_none(),
"killed session's binding removed"
);
assert!(p.index.get(sid).is_none(), "killed session's record removed");
});
let reloaded = crate::praca_store::PracaStore::open(path.clone());
reloaded.with(|p| assert!(p.binding.lookup(&project).is_none()));
let _ = std::fs::remove_dir_all(path.parent().unwrap());
}
#[test]
fn new_session_without_spawn_cwd_skips_binding() {
let path = praca_temp_path("no-cwd");
let store = crate::praca_store::PracaStore::open(path.clone());
let inproc = InProcess::new();
let live = LiveConfig::default();
let _sid = match dispatch_with_config(
&inproc,
&live,
Request::NewSession {
name: "no-cwd".into(),
shell: "/bin/sh".into(),
source: None,
size_cells: None,
},
None,
Some(&store),
) {
Response::SessionId(s) => s,
other => panic!("expected SessionId, got {other:?}"),
};
store.with(|p| {
assert!(p.binding.is_empty(), "no spawn cwd → no binding");
assert!(p.index.is_empty(), "no spawn cwd → no record");
});
let _ = std::fs::remove_dir_all(path.parent().unwrap());
}
}