use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use anyhow::Result;
use pixtuoid_core::source::antigravity::AntigravitySource;
use pixtuoid_core::source::claude_code::ClaudeCodeSource;
use pixtuoid_core::source::codex::CodexSource;
use pixtuoid_core::source::copilot::CopilotSource;
use pixtuoid_core::source::daemon::{self, PresenceMsg};
use pixtuoid_core::source::hook::HookRouter;
use pixtuoid_core::source::jsonl::ChildEndUnclaims;
use pixtuoid_core::source::manager::SourceManager;
use pixtuoid_core::source::registry;
use pixtuoid_core::source::DynSource;
use pixtuoid_core::state::MAX_FLOORS;
use pixtuoid_core::{AgentEvent, Reducer, SceneState, TaggedReceiver, Transport};
use tokio::sync::{mpsc, watch};
use super::{
boot_capacities_for, cap_boot_capacities, summarize, ConnectedSources, RunConfig, SceneRx,
FALLBACK_DESKS,
};
pub fn run(cfg: RunConfig) -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(async move { run_async(cfg).await })
}
async fn run_async(cfg: RunConfig) -> Result<()> {
let RunConfig {
socket,
projects_root,
codex_sessions_root,
pack_dir,
desk_cap,
headless,
config_path,
theme,
pets,
connected,
log_path,
} = cfg;
let connected = ConnectedSources::new(connected);
let socket_path = socket.unwrap_or_else(ClaudeCodeSource::default_socket_path);
let (presence_tx, presence_rx) = tokio::sync::mpsc::unbounded_channel::<PresenceMsg>();
let presence_exit_watch = daemon::spawn_presence_exit_watch(presence_tx.clone());
let sources = build_source_set(
socket_path.clone(),
projects_root,
codex_sessions_root,
Some(presence_tx),
);
let (tx, rx) = mpsc::channel::<(Transport, AgentEvent)>(256);
let boot_caps: [usize; MAX_FLOORS] = match (desk_cap, headless) {
(Some(cap), true) => [cap; MAX_FLOORS],
(None, true) => [FALLBACK_DESKS; MAX_FLOORS],
(cap, false) => cap_boot_capacities(compute_boot_capacities(), cap),
};
let (scene_tx, scene_rx) = watch::channel(Arc::new(SceneState::new(boot_caps)));
let floor_caps: Arc<[AtomicUsize; MAX_FLOORS]> =
Arc::new(std::array::from_fn(|i| AtomicUsize::new(boot_caps[i])));
tokio::spawn(reducer_task(
rx,
scene_tx,
Arc::clone(&floor_caps),
connected.clone(),
presence_rx,
presence_exit_watch,
));
let (health_tx, health_rx) = tokio::sync::watch::channel(Vec::new());
let mut manager = SourceManager::new();
for src in sources {
manager = manager.with_source(src);
}
let _source_handles = manager.spawn_with_health(tx, health_tx);
if headless {
headless_loop(scene_rx, health_rx).await
} else {
crate::tui::run_tui(
scene_rx,
pack_dir,
floor_caps,
theme,
config_path,
desk_cap,
pets,
health_rx,
socket_path,
connected,
log_path,
)
.await
}
}
fn build_source_set(
socket_path: PathBuf,
projects_root: Option<PathBuf>,
codex_sessions_root: Option<PathBuf>,
presence_tx: Option<daemon::PresenceSender>,
) -> Vec<Box<dyn DynSource>> {
let mut cc_src = ClaudeCodeSource::default_paths();
if let Some(p) = projects_root {
cc_src.projects_root = p;
}
let ag_src = AntigravitySource::default_paths();
let copilot_src = CopilotSource::default_paths();
let mut codex_src = CodexSource::default_paths();
if let Some(p) = codex_sessions_root {
codex_src.sessions_root = p;
}
let child_end_unclaims = ChildEndUnclaims::new();
cc_src.child_end_unclaims = Some(child_end_unclaims.clone());
codex_src.child_end_unclaims = Some(child_end_unclaims.clone());
let hook_router = HookRouter::new(socket_path)
.with_child_end_unclaims(Some(child_end_unclaims))
.with_presence_tx(presence_tx);
vec![
Box::new(hook_router) as Box<dyn DynSource>,
Box::new(cc_src),
Box::new(ag_src),
Box::new(codex_src),
Box::new(copilot_src),
]
}
fn event_source<'a>(scene: &'a SceneState, ev: &'a AgentEvent) -> Option<&'a str> {
match ev {
AgentEvent::SessionStart { source, .. } | AgentEvent::Identity { source, .. }
if !source.is_empty() =>
{
Some(source)
}
_ => scene.agents.get(&ev.agent_id()).map(|s| s.source.as_ref()),
}
}
async fn reducer_task(
mut rx: TaggedReceiver,
scene_tx: watch::Sender<Arc<SceneState>>,
floor_caps: Arc<[AtomicUsize; MAX_FLOORS]>,
connected: ConnectedSources,
mut presence_rx: tokio::sync::mpsc::UnboundedReceiver<PresenceMsg>,
presence_exit_watch: Option<daemon::PresenceExitWatch>,
) {
let mut reducer = Reducer::new();
let mut presence_open = true;
let initial_caps: [usize; MAX_FLOORS] =
std::array::from_fn(|i| floor_caps[i].load(Ordering::Relaxed));
let mut scene = SceneState::new(initial_caps);
let mut sweep_interval = tokio::time::interval(Duration::from_secs(1));
sweep_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
for (i, a) in floor_caps.iter().enumerate() {
scene.floor_capacities[i] = a.load(Ordering::Relaxed);
}
tokio::select! {
event = rx.recv() => {
let Some((transport, ev)) = event else { break };
let now = SystemTime::now();
if event_source(&scene, &ev).is_some_and(|src| !connected.is_connected(src)) {
continue;
}
tracing::debug!(?transport, ?ev, "event");
reducer.apply(&mut scene, ev, now, transport);
if scene_tx.send(Arc::new(scene.clone())).is_err() {
tracing::warn!("scene channel closed — renderer dropped");
break;
}
}
update = presence_rx.recv(), if presence_open => {
match update {
Some(PresenceMsg {
source,
delta: update,
}) => {
let now = SystemTime::now();
if connected.is_connected(&source) {
if let Some(ew) = presence_exit_watch.as_ref() {
if let Some(pid) = update.armable_pid() {
ew.watch(&source, pid);
}
}
daemon::apply_presence(&mut scene, &source, update, now);
if scene_tx.send(Arc::new(scene.clone())).is_err() {
tracing::warn!("scene channel closed — renderer dropped");
break;
}
}
}
None => presence_open = false,
}
}
_ = sweep_interval.tick() => {
let now = SystemTime::now();
let cur = connected.snapshot();
reducer.reconcile_connected(&mut scene, &cur, now);
reducer.tick(&mut scene, now);
for (source, ttl) in registry::daemon_sources() {
if !connected.is_connected(source) {
daemon::mark_presence_down(&mut scene, source, now);
}
daemon::sweep_presence_ttl(&mut scene, source, ttl, now);
}
if scene_tx.send(Arc::new(scene.clone())).is_err() {
tracing::warn!("scene channel closed — renderer dropped");
break;
}
}
}
}
}
async fn headless_loop(
scene_rx: SceneRx,
health_rx: tokio::sync::watch::Receiver<Vec<pixtuoid_core::source::manager::SourceDeath>>,
) -> Result<()> {
headless_loop_with_signal(scene_rx, health_rx, Box::pin(tokio::signal::ctrl_c())).await
}
async fn headless_loop_with_signal(
mut scene_rx: SceneRx,
mut health_rx: tokio::sync::watch::Receiver<Vec<pixtuoid_core::source::manager::SourceDeath>>,
mut ctrl_c: std::pin::Pin<Box<dyn std::future::Future<Output = std::io::Result<()>> + Send>>,
) -> Result<()> {
tracing::info!("pixtuoid headless mode — Ctrl-C to quit");
let mut prev_summary = String::new();
let mut deaths_seen = 0usize;
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(200)) => {
let snapshot = scene_rx.borrow_and_update().clone();
let summary = summarize(&snapshot);
if summary != prev_summary {
println!("{summary}");
prev_summary = summary;
}
}
Ok(()) = health_rx.changed() => {
let deaths = health_rx.borrow_and_update().clone();
for d in deaths.iter().skip(deaths_seen) {
println!("{}", super::format_source_death(d));
}
deaths_seen = deaths.len();
}
res = &mut ctrl_c => match res {
Ok(()) => {
tracing::info!("shutting down");
return Ok(());
}
Err(e) => {
tracing::error!(
%e,
"Ctrl-C handler registration failed — headless loop \
continues; SIGINT falls back to the default disposition"
);
ctrl_c = Box::pin(std::future::pending());
}
}
}
}
}
fn compute_boot_capacities() -> [usize; MAX_FLOORS] {
match crossterm::terminal::size().ok() {
Some((cols, rows)) => boot_capacities_for(cols, rows),
None => [FALLBACK_DESKS; MAX_FLOORS],
}
}
#[cfg(test)]
mod tests {
use super::*;
use pixtuoid_core::source::manager::SourceDeath;
type HealthPair = (
watch::Sender<Vec<SourceDeath>>,
watch::Receiver<Vec<SourceDeath>>,
);
fn channels() -> (watch::Sender<Arc<SceneState>>, SceneRx, HealthPair) {
let (scene_tx, scene_rx) =
watch::channel(Arc::new(SceneState::new([FALLBACK_DESKS; MAX_FLOORS])));
(scene_tx, scene_rx, watch::channel(Vec::new()))
}
#[test]
fn build_source_set_wires_every_transcript_bearing_source_plus_the_hook_router() {
use pixtuoid_core::source::{registry::descriptor_for, REGISTERED_SOURCES};
use std::collections::BTreeSet;
let sources = build_source_set(PathBuf::from("/tmp/pixtuoid-test.sock"), None, None, None);
let built: BTreeSet<&str> = sources.iter().map(|s| s.name()).collect();
assert!(
built.contains("hook-router"),
"the shared-socket HookRouter must be spawned (else hook signals never decode)"
);
let transcript_built: BTreeSet<&str> = built
.iter()
.copied()
.filter(|&n| n != "hook-router")
.collect();
let expected: BTreeSet<&str> = REGISTERED_SOURCES
.iter()
.copied()
.filter(|&name| descriptor_for(name).is_some_and(|d| d.line_decoder().is_some()))
.collect();
assert_eq!(
transcript_built, expected,
"run_async's transcript-source wiring diverged from the registry: a \
transcript-bearing source is registered but not built (it would never \
spawn), or a built source isn't registered"
);
}
#[test]
fn event_source_extracts_source_for_the_connection_gate() {
use pixtuoid_core::AgentId;
let now = SystemTime::now();
let mut scene = SceneState::new([FALLBACK_DESKS; MAX_FLOORS]);
let mut reducer = Reducer::new();
let id = AgentId::from_transcript_path("/p/a.jsonl");
let ss = AgentEvent::SessionStart {
agent_id: id,
source: "claude-code".into(),
session_id: "s".into(),
cwd: PathBuf::from("/repo"),
parent_id: None,
};
assert_eq!(event_source(&scene, &ss), Some("claude-code"));
let idy = AgentEvent::Identity {
agent_id: id,
source: "codex".into(),
session_id: "s".into(),
cwd: None,
};
assert_eq!(event_source(&scene, &idy), Some("codex"));
let act = AgentEvent::ActivityStart {
agent_id: id,
tool_use_id: None,
detail: None,
};
assert_eq!(event_source(&scene, &act), None);
reducer.apply(&mut scene, ss, now, Transport::Jsonl);
assert_eq!(event_source(&scene, &act), Some("claude-code"));
let empty = AgentEvent::Identity {
agent_id: id,
source: String::new(),
session_id: "s".into(),
cwd: None,
};
assert_eq!(event_source(&scene, &empty), Some("claude-code"));
}
#[tokio::test(start_paused = true)]
async fn headless_loop_shuts_down_on_a_delivered_signal() {
let (_scene_tx, scene_rx, (_health_tx, health_rx)) = channels();
headless_loop_with_signal(scene_rx, health_rx, Box::pin(async { Ok(()) }))
.await
.expect("a delivered Ctrl-C is a clean shutdown");
}
#[tokio::test(start_paused = true)]
async fn headless_loop_keeps_serving_after_a_failed_signal_registration() {
let (_scene_tx, scene_rx, (_health_tx, health_rx)) = channels();
let res = tokio::time::timeout(
Duration::from_secs(5),
headless_loop_with_signal(
scene_rx,
health_rx,
Box::pin(async { Err(std::io::Error::other("sigaction denied")) }),
),
)
.await;
assert!(
res.is_err(),
"the loop must still be running after a failed signal registration, got {res:?}"
);
}
}