1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::context::SharedProgressSender;
16use crate::harness::Harness;
17use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
18
19#[cfg(unix)]
20use std::os::unix::process::CommandExt;
21#[cfg(windows)]
22use std::os::windows::process::CommandExt;
23
24use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
25use super::persistence::{
26 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
27 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
28 ExitMarker, PersistedTask, TaskPaths,
29};
30use super::process::is_process_alive;
31#[cfg(unix)]
32use super::process::terminate_pgid;
33#[cfg(windows)]
34use super::process::terminate_pid;
35use super::pty_process::spawn_pty_for_command;
36use super::pty_runtime::PtyRuntime;
37use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
38use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied when a spawn request does not supply one (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// 24 hours; the replay path labels over-age running tasks "orphaned (>24h)".
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(86_400);
/// Grace period of 24 hours for persisted task GC.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(86_400);
/// Grace period of 30 days for quarantined task GC.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 86_400);

/// Preview size used for completion snapshots (bytes).
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// Per-stream byte cap for tokenization (128 KiB).
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 << 10;
59
/// Completion record for a background task, held in the registry's
/// `completions` queue until it is delivered.
///
/// Field order is also the serialized JSON field order.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    pub task_id: String,
    /// Owning session; kept for routing and never serialized.
    #[serde(skip_serializing)]
    pub session_id: String,
    pub status: BgTaskStatus,
    /// Process exit code, when one is known.
    pub exit_code: Option<i32>,
    pub command: String,
    /// Output preview text; omitted from the JSON when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Whether the preview was cut short; omitted from the JSON when false.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count of the output, presumably before compression — TODO confirm.
    /// Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count of the output, presumably after compression — TODO confirm.
    /// Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Presumably set when token counting was skipped (e.g. output over the
    /// tokenize cap) — TODO confirm. Omitted from the JSON when false.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
95
/// `skip_serializing_if` predicate: true when the flag is unset, so `false`
/// booleans are left out of the serialized output.
fn is_false(flag: &bool) -> bool {
    !flag
}
99
/// Point-in-time view of a background task, as returned by `status`.
///
/// Field order is also the serialized JSON field order.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Base task info; its fields are flattened into the serialized object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    pub exit_code: Option<i32>,
    pub child_pid: Option<u32>,
    pub workdir: String,
    pub output_preview: String,
    pub output_truncated: bool,
    pub output_path: Option<String>,
    pub stderr_path: Option<String>,
    /// PTY row count; omitted from the JSON when `None` (presumably non-PTY tasks).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY column count; omitted from the JSON when `None` (presumably non-PTY tasks).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
116
/// Handle to the background bash task registry.
///
/// Cloning is cheap: every clone shares the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
121
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Tasks registered by spawn or replay, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completion records waiting to be delivered.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    pub(crate) progress_sender: SharedProgressSender,
    /// Presumably flipped once by `start_watchdog` so the watchdog starts only once — TODO confirm.
    watchdog_started: AtomicBool,
    /// Presumably tells background workers to stop — TODO confirm.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are enabled (defaults to true in `new`).
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Reminder interval in milliseconds (defaults to 600 000 in `new`).
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Set by the first replay so that the persisted-task GC runs at most once.
    persisted_gc_started: AtomicBool,
    /// Test-only counter, presumably of persisted GC runs.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional output compressor, called as `(command, output) -> output`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional SQLite connection used to dual-write task metadata.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name written into DB rows; DB writes are skipped while unset.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Capacity-1 wake channel. The sender is cloned into each PTY runtime;
    /// presumably a background loop waits on the receiver — TODO confirm.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Output watch patterns plus the per-capture-file scan cursors.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
145
/// A single background task tracked by the registry.
pub(crate) struct BgTask {
    pub(crate) task_id: String,
    pub(crate) session_id: String,
    /// Bundle locations on disk: metadata JSON, capture files, exit marker.
    pub(crate) paths: TaskPaths,
    /// When this in-memory entry was created (spawn time for freshly spawned tasks).
    pub(crate) started: Instant,
    /// Presumably when the last long-running reminder was sent — TODO confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Presumably when the task was first seen in a terminal state — TODO confirm.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state (metadata, process handle, output buffer).
    pub(crate) state: Mutex<BgTaskState>,
}
155
/// Process handle backing a task, depending on how it was spawned.
pub(crate) enum TaskRuntime {
    /// Child spawned with stdout/stderr capture files. `None` presumably means
    /// the handle has been released — TODO confirm.
    Piped(Option<Child>),
    /// PTY-backed process; `write_pty` treats `None` as an exited task.
    Pty(Option<PtyRuntime>),
}
160
/// Mutable per-task state, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// Metadata mirrored to the task JSON (and to the DB when configured).
    pub(crate) metadata: PersistedTask,
    pub(crate) runtime: TaskRuntime,
    /// False for tasks spawned by this registry; presumably true for tasks
    /// rehydrated from disk/DB without a live handle — TODO confirm.
    pub(crate) detached: bool,
    /// Presumably set once the child's exit has been observed — TODO confirm.
    pub(crate) child_exit_observed: bool,
    /// Output buffer over the capture file(s): stdout/stderr for piped tasks,
    /// the PTY capture file for PTY tasks.
    pub(crate) buffer: BgBuffer,
    /// Presumably a terminal status (e.g. from a kill) to apply instead of the
    /// observed exit — TODO confirm.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
180
181impl BgTaskRegistry {
182 pub fn new(progress_sender: SharedProgressSender) -> Self {
183 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
184 Self {
185 inner: Arc::new(RegistryInner {
186 tasks: Mutex::new(HashMap::new()),
187 completions: Mutex::new(VecDeque::new()),
188 progress_sender,
189 watchdog_started: AtomicBool::new(false),
190 shutdown: AtomicBool::new(false),
191 long_running_reminder_enabled: AtomicBool::new(true),
192 long_running_reminder_interval_ms: AtomicU64::new(600_000),
193 persisted_gc_started: AtomicBool::new(false),
194 #[cfg(test)]
195 persisted_gc_runs: AtomicU64::new(0),
196 compressor: Mutex::new(None),
197 db_pool: RwLock::new(None),
198 db_harness: RwLock::new(None),
199 wake_tx,
200 wake_rx,
201 watch_registry: Mutex::new(WatchRegistry::default()),
202 }),
203 }
204 }
205
206 pub fn set_harness(&self, harness: Harness) {
207 if let Ok(mut slot) = self.inner.db_harness.write() {
208 *slot = Some(harness.as_str().to_string());
209 }
210 }
211
212 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
213 if let Ok(mut slot) = self.inner.db_pool.write() {
214 *slot = Some(conn);
215 }
216 }
217
218 pub fn clear_db_pool(&self) {
219 if let Ok(mut slot) = self.inner.db_pool.write() {
220 *slot = None;
221 }
222 }
223
224 pub fn set_compressor<F>(&self, compressor: F)
229 where
230 F: Fn(&str, String) -> String + Send + Sync + 'static,
231 {
232 if let Ok(mut slot) = self.inner.compressor.lock() {
233 *slot = Some(Box::new(compressor));
234 }
235 }
236
237 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
240 let Ok(slot) = self.inner.compressor.lock() else {
241 return output;
242 };
243 match slot.as_ref() {
244 Some(compressor) => compressor(command, output),
245 None => output,
246 }
247 }
248
249 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
250 write_task(&paths.json, metadata)?;
251 self.dual_write_task(paths, metadata);
252 Ok(())
253 }
254
255 fn update_task_metadata<F>(
256 &self,
257 paths: &TaskPaths,
258 update: F,
259 ) -> std::io::Result<PersistedTask>
260 where
261 F: FnOnce(&mut PersistedTask),
262 {
263 let metadata = update_task(&paths.json, update)?;
264 self.dual_write_task(paths, &metadata);
265 Ok(metadata)
266 }
267
268 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
269 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
270 let Some(pool) = pool else {
271 return;
272 };
273 let harness = self
274 .inner
275 .db_harness
276 .read()
277 .ok()
278 .and_then(|slot| slot.clone());
279 let Some(harness) = harness else {
280 crate::slog_warn!(
281 "dual-write bash_task to DB skipped for {}: harness not configured",
282 metadata.task_id
283 );
284 return;
285 };
286 let row = match metadata.to_bash_task_row(&harness, paths) {
287 Ok(row) => row,
288 Err(error) => {
289 crate::slog_warn!(
290 "dual-write bash_task to DB failed for {}: {}",
291 metadata.task_id,
292 error
293 );
294 return;
295 }
296 };
297 let conn = match pool.lock() {
298 Ok(conn) => conn,
299 Err(_) => {
300 crate::slog_warn!(
301 "dual-write bash_task to DB failed for {}: db mutex poisoned",
302 metadata.task_id
303 );
304 return;
305 }
306 };
307 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
308 crate::slog_warn!(
309 "dual-write bash_task to DB failed for {}: {}",
310 metadata.task_id,
311 error
312 );
313 }
314 }
315
316 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
317 self.inner
318 .long_running_reminder_enabled
319 .store(enabled, Ordering::SeqCst);
320 self.inner
321 .long_running_reminder_interval_ms
322 .store(interval_ms, Ordering::SeqCst);
323 }
324
325 #[cfg(unix)]
326 #[allow(clippy::too_many_arguments)]
327 pub fn spawn(
328 &self,
329 command: &str,
330 session_id: String,
331 workdir: PathBuf,
332 env: HashMap<String, String>,
333 timeout: Option<Duration>,
334 storage_dir: PathBuf,
335 max_running: usize,
336 notify_on_completion: bool,
337 compressed: bool,
338 project_root: Option<PathBuf>,
339 ) -> Result<String, String> {
340 self.start_watchdog();
341
342 let running = self.running_count();
343 if running >= max_running {
344 return Err(format!(
345 "background bash task limit exceeded: {running} running (max {max_running})"
346 ));
347 }
348
349 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
350 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
351 let task_id = self.generate_unique_task_id()?;
352 let paths = task_paths(&storage_dir, &session_id, &task_id);
353 fs::create_dir_all(&paths.dir)
354 .map_err(|e| format!("failed to create background task dir: {e}"))?;
355
356 let mut metadata = PersistedTask::starting(
357 task_id.clone(),
358 session_id.clone(),
359 command.to_string(),
360 workdir.clone(),
361 project_root,
362 timeout_ms,
363 notify_on_completion,
364 compressed,
365 );
366 self.persist_task(&paths, &metadata)
367 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
368
369 create_capture_file(&paths.stdout)
373 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
374 create_capture_file(&paths.stderr)
375 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
376
377 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
378 Ok(child) => child,
379 Err(error) => {
380 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
381 let _ = delete_task_bundle(&paths);
382 return Err(error);
383 }
384 };
385
386 let child_pid = child.id();
387 metadata.mark_running(child_pid, child_pid as i32);
388 self.persist_task(&paths, &metadata)
389 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
390
391 let task = Arc::new(BgTask {
392 task_id: task_id.clone(),
393 session_id,
394 paths: paths.clone(),
395 started: Instant::now(),
396 last_reminder_at: Mutex::new(None),
397 terminal_at: Mutex::new(None),
398 state: Mutex::new(BgTaskState {
399 metadata,
400 runtime: TaskRuntime::Piped(Some(child)),
401 detached: false,
402 child_exit_observed: false,
403 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
404 pending_terminal_override: None,
405 }),
406 });
407
408 self.inner
409 .tasks
410 .lock()
411 .map_err(|_| "background task registry lock poisoned".to_string())?
412 .insert(task_id.clone(), task);
413
414 Ok(task_id)
415 }
416
417 #[allow(clippy::too_many_arguments)]
418 pub fn spawn_pty(
419 &self,
420 command: &str,
421 session_id: String,
422 workdir: PathBuf,
423 env: HashMap<String, String>,
424 timeout: Option<Duration>,
425 storage_dir: PathBuf,
426 max_running: usize,
427 notify_on_completion: bool,
428 compressed: bool,
429 project_root: Option<PathBuf>,
430 rows: u16,
431 cols: u16,
432 ) -> Result<String, String> {
433 self.start_watchdog();
434
435 let running = self.running_count();
436 if running >= max_running {
437 return Err(format!(
438 "background bash task limit exceeded: {running} running (max {max_running})"
439 ));
440 }
441
442 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
443 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
444 let task_id = self.generate_unique_task_id()?;
445 let paths = task_paths(&storage_dir, &session_id, &task_id);
446 fs::create_dir_all(&paths.dir)
447 .map_err(|e| format!("failed to create background task dir: {e}"))?;
448
449 let mut metadata = PersistedTask::starting(
450 task_id.clone(),
451 session_id.clone(),
452 command.to_string(),
453 workdir.clone(),
454 project_root,
455 timeout_ms,
456 notify_on_completion,
457 compressed,
458 );
459 metadata.mode = BgMode::Pty;
460 metadata.pty_rows = Some(rows);
461 metadata.pty_cols = Some(cols);
462 self.persist_task(&paths, &metadata)
463 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
464 create_capture_file(&paths.pty)
465 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
466
467 let runtime = match spawn_pty_for_command(
468 &task_id,
469 &session_id,
470 command,
471 &paths,
472 &workdir,
473 &env,
474 rows,
475 cols,
476 self.inner.wake_tx.clone(),
477 ) {
478 Ok(runtime) => runtime,
479 Err(error) => {
480 crate::slog_warn!(
481 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
482 );
483 let _ = delete_task_bundle(&paths);
484 return Err(error);
485 }
486 };
487
488 if let Some(child_pid) = runtime.child_pid {
489 metadata.mark_running(child_pid, child_pid as i32);
490 } else {
491 metadata.status = BgTaskStatus::Running;
492 metadata.pgid = None;
493 }
494 self.persist_task(&paths, &metadata)
495 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
496
497 let task = Arc::new(BgTask {
498 task_id: task_id.clone(),
499 session_id,
500 paths: paths.clone(),
501 started: Instant::now(),
502 last_reminder_at: Mutex::new(None),
503 terminal_at: Mutex::new(None),
504 state: Mutex::new(BgTaskState {
505 metadata,
506 runtime: TaskRuntime::Pty(Some(runtime)),
507 detached: false,
508 child_exit_observed: false,
509 buffer: BgBuffer::pty(paths.pty.clone()),
510 pending_terminal_override: None,
511 }),
512 });
513
514 self.inner
515 .tasks
516 .lock()
517 .map_err(|_| "background task registry lock poisoned".to_string())?
518 .insert(task_id.clone(), task);
519
520 Ok(task_id)
521 }
522
523 #[cfg(windows)]
524 #[allow(clippy::too_many_arguments)]
525 pub fn spawn(
526 &self,
527 command: &str,
528 session_id: String,
529 workdir: PathBuf,
530 env: HashMap<String, String>,
531 timeout: Option<Duration>,
532 storage_dir: PathBuf,
533 max_running: usize,
534 notify_on_completion: bool,
535 compressed: bool,
536 project_root: Option<PathBuf>,
537 ) -> Result<String, String> {
538 self.start_watchdog();
539
540 let running = self.running_count();
541 if running >= max_running {
542 return Err(format!(
543 "background bash task limit exceeded: {running} running (max {max_running})"
544 ));
545 }
546
547 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
548 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
549 let task_id = self.generate_unique_task_id()?;
550 let paths = task_paths(&storage_dir, &session_id, &task_id);
551 fs::create_dir_all(&paths.dir)
552 .map_err(|e| format!("failed to create background task dir: {e}"))?;
553
554 let mut metadata = PersistedTask::starting(
555 task_id.clone(),
556 session_id.clone(),
557 command.to_string(),
558 workdir.clone(),
559 project_root,
560 timeout_ms,
561 notify_on_completion,
562 compressed,
563 );
564 self.persist_task(&paths, &metadata)
565 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
566
567 create_capture_file(&paths.stdout)
573 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
574 create_capture_file(&paths.stderr)
575 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
576
577 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
578 Ok(child) => child,
579 Err(error) => {
580 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
581 let _ = delete_task_bundle(&paths);
582 return Err(error);
583 }
584 };
585
586 let child_pid = child.id();
587 metadata.status = BgTaskStatus::Running;
588 metadata.child_pid = Some(child_pid);
589 metadata.pgid = None;
590 self.persist_task(&paths, &metadata)
591 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
592
593 let task = Arc::new(BgTask {
594 task_id: task_id.clone(),
595 session_id,
596 paths: paths.clone(),
597 started: Instant::now(),
598 last_reminder_at: Mutex::new(None),
599 terminal_at: Mutex::new(None),
600 state: Mutex::new(BgTaskState {
601 metadata,
602 runtime: TaskRuntime::Piped(Some(child)),
603 detached: false,
604 child_exit_observed: false,
605 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
606 pending_terminal_override: None,
607 }),
608 });
609
610 self.inner
611 .tasks
612 .lock()
613 .map_err(|_| "background task registry lock poisoned".to_string())?
614 .insert(task_id.clone(), task);
615
616 Ok(task_id)
617 }
618
619 pub fn write_pty(
620 &self,
621 task_id: &str,
622 session_id: &str,
623 input: &[u8],
624 ) -> Result<usize, String> {
625 let task = self
626 .task_for_session(task_id, session_id)
627 .ok_or_else(|| "task_not_found".to_string())?;
628
629 let writer = {
630 let state = task
631 .state
632 .lock()
633 .map_err(|_| "background task lock poisoned".to_string())?;
634 if state.metadata.mode != BgMode::Pty {
635 return Err("task_not_pty".to_string());
636 }
637 if state.metadata.status.is_terminal() {
638 return Err("task_exited".to_string());
639 }
640 match &state.runtime {
641 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
642 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
643 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
644 }
645 };
646
647 let mut writer = writer
648 .lock()
649 .map_err(|_| "PTY writer lock poisoned".to_string())?;
650 writer
651 .write_all(input)
652 .map_err(|error| format!("failed to write to PTY: {error}"))?;
653 writer
654 .flush()
655 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
656 Ok(input.len())
657 }
658
659 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
660 self.replay_session_inner(storage_dir, session_id, None)
661 }
662
663 pub fn replay_session_for_project(
664 &self,
665 storage_dir: &Path,
666 session_id: &str,
667 project_root: &Path,
668 ) -> Result<(), String> {
669 self.replay_session_inner(storage_dir, session_id, Some(project_root))
670 }
671
672 fn replay_session_inner(
673 &self,
674 storage_dir: &Path,
675 session_id: &str,
676 project_root: Option<&Path>,
677 ) -> Result<(), String> {
678 self.start_watchdog();
679 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
680 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
681 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
682 }
683 }
684
685 let canonical_project = project_root.map(canonicalized_path);
686 let tasks = match self.replay_session_from_db(session_id) {
698 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
699 Some(Ok(_)) => {
700 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
701 if !disk_tasks.is_empty() {
702 crate::slog_info!(
703 "bash task replay: 0 in DB for session {}, {} from disk fallback",
704 session_id,
705 disk_tasks.len()
706 );
707 }
708 disk_tasks
709 }
710 Some(Err(error)) => {
711 crate::slog_warn!(
712 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
713 session_id,
714 error
715 );
716 self.replay_session_from_disk(storage_dir, session_id)?
717 }
718 None => {
719 self.replay_session_from_disk(storage_dir, session_id)?
721 }
722 };
723
724 for mut metadata in tasks {
725 if metadata.session_id != session_id {
726 continue;
727 }
728 if let Some(canonical_project) = canonical_project.as_deref() {
729 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
730 if metadata_project.as_deref() != Some(canonical_project) {
731 continue;
732 }
733 }
734
735 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
736 match metadata.status {
737 BgTaskStatus::Starting => {
738 let completion_was_delivered = metadata.completion_delivered;
739 metadata.mark_terminal(
740 BgTaskStatus::Failed,
741 None,
742 Some("spawn aborted".to_string()),
743 );
744 metadata.completion_delivered |= completion_was_delivered;
745 let _ = self.persist_task(&paths, &metadata);
746 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
747 self.insert_rehydrated_task(metadata, paths, true)?;
748 }
749 BgTaskStatus::Running | BgTaskStatus::Killing => {
750 if metadata.mode == BgMode::Pty {
751 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
752 let completion_was_delivered = metadata.completion_delivered;
753 metadata = terminal_metadata_from_marker(metadata, marker, None);
754 metadata.completion_delivered |= completion_was_delivered;
755 let _ = self.persist_task(&paths, &metadata);
756 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
757 self.insert_rehydrated_task(metadata, paths, true)?;
758 } else if metadata.status.is_terminal() {
759 self.insert_rehydrated_task(metadata, paths, true)?;
760 } else {
761 let completion_was_delivered = metadata.completion_delivered;
762 metadata.mark_terminal(
763 BgTaskStatus::Killed,
764 None,
765 Some("pty_lost_on_bridge_restart".to_string()),
766 );
767 metadata.completion_delivered |= completion_was_delivered;
768 let _ = self.persist_task(&paths, &metadata);
769 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
770 self.insert_rehydrated_task(metadata, paths, true)?;
771 }
772 } else if self.running_metadata_is_stale(&metadata) {
773 let completion_was_delivered = metadata.completion_delivered;
774 metadata.mark_terminal(
775 BgTaskStatus::Killed,
776 None,
777 Some("orphaned (>24h)".to_string()),
778 );
779 metadata.completion_delivered |= completion_was_delivered;
780 if !paths.exit.exists() {
781 let _ = write_kill_marker_if_absent(&paths.exit);
782 }
783 let _ = self.persist_task(&paths, &metadata);
784 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
785 self.insert_rehydrated_task(metadata, paths, true)?;
786 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
787 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
788 "recovered from inconsistent killing state on replay".to_string()
789 });
790 if reason.is_some() {
791 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
792 metadata.task_id);
793 }
794 let completion_was_delivered = metadata.completion_delivered;
795 metadata = terminal_metadata_from_marker(metadata, marker, reason);
796 metadata.completion_delivered |= completion_was_delivered;
797 let _ = self.persist_task(&paths, &metadata);
798 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
799 self.insert_rehydrated_task(metadata, paths, true)?;
800 } else if metadata.status == BgTaskStatus::Killing {
801 if !paths.exit.exists() {
802 let _ = write_kill_marker_if_absent(&paths.exit);
803 }
804 let completion_was_delivered = metadata.completion_delivered;
805 metadata.mark_terminal(
806 BgTaskStatus::Killed,
807 None,
808 Some("recovered from inconsistent killing state on replay".to_string()),
809 );
810 metadata.completion_delivered |= completion_was_delivered;
811 let _ = self.persist_task(&paths, &metadata);
812 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
813 self.insert_rehydrated_task(metadata, paths, true)?;
814 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
815 let completion_was_delivered = metadata.completion_delivered;
816 metadata.mark_terminal(
817 BgTaskStatus::Failed,
818 None,
819 Some("process exited without exit marker".to_string()),
820 );
821 metadata.completion_delivered |= completion_was_delivered;
822 let _ = self.persist_task(&paths, &metadata);
823 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
824 self.insert_rehydrated_task(metadata, paths, true)?;
825 } else {
826 self.insert_rehydrated_task(metadata, paths, true)?;
827 }
828 }
829 _ if metadata.status.is_terminal() => {
830 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
836 self.insert_rehydrated_task(metadata, paths, true)?;
837 }
838 _ => {}
839 }
840 }
841
842 Ok(())
843 }
844
845 fn replay_session_from_db(
846 &self,
847 session_id: &str,
848 ) -> Option<Result<Vec<PersistedTask>, String>> {
849 let pool = self
850 .inner
851 .db_pool
852 .read()
853 .ok()
854 .and_then(|slot| slot.clone())?;
855 let harness = self
856 .inner
857 .db_harness
858 .read()
859 .ok()
860 .and_then(|slot| slot.clone())?;
861 let conn = match pool.lock() {
862 Ok(conn) => conn,
863 Err(_) => return Some(Err("db mutex poisoned".to_string())),
864 };
865 Some(
866 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
867 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
868 .map_err(|error| error.to_string()),
869 )
870 }
871
872 fn replay_session_from_disk(
873 &self,
874 storage_dir: &Path,
875 session_id: &str,
876 ) -> Result<Vec<PersistedTask>, String> {
877 let dir = session_tasks_dir(storage_dir, session_id);
878 if !dir.exists() {
879 return Ok(Vec::new());
880 }
881
882 let entries = fs::read_dir(&dir)
883 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
884 let mut tasks = Vec::new();
885 for entry in entries.flatten() {
886 let path = entry.path();
887 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
888 continue;
889 }
890 match read_task(&path) {
891 Ok(metadata) => tasks.push(metadata),
892 Err(error) => {
893 crate::slog_warn!(
894 "quarantining invalid background task metadata {} during replay: {error}",
895 path.display()
896 );
897 if let Err(quarantine_error) =
898 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
899 {
900 crate::slog_warn!(
901 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
902 path.display()
903 );
904 }
905 }
906 }
907 }
908 Ok(tasks)
909 }
910
911 pub fn register_watch(
912 &self,
913 task_id: String,
914 pattern: WatchPattern,
915 once: bool,
916 ) -> Result<String, &'static str> {
917 let task = self.task(&task_id);
918 let task_paths = task.as_ref().and_then(|task| {
919 task.state.lock().ok().map(|state| {
920 (
921 state.metadata.mode.clone(),
922 state.metadata.status.is_terminal(),
923 task.paths.stdout.clone(),
924 task.paths.stderr.clone(),
925 task.paths.pty.clone(),
926 )
927 })
928 });
929
930 let mut terminal_matches = Vec::new();
931 let watch_id = {
932 let mut registry = self
933 .inner
934 .watch_registry
935 .lock()
936 .map_err(|_| "watch_registry_poisoned")?;
937 let watch_id = registry.register(task_id.clone(), pattern, once)?;
938 if let Some((mode, terminal, stdout, stderr, pty)) = task_paths {
939 match mode {
940 BgMode::Pipes => {
941 let stdout_key = format!("{task_id}:stdout");
942 let stderr_key = format!("{task_id}:stderr");
943 if terminal {
944 registry.set_file_cursor(&stdout_key, 0);
945 registry.set_file_cursor(&stderr_key, 0);
946 terminal_matches.extend(registry.scan_file_new_bytes(
947 &stdout_key,
948 &task_id,
949 &stdout,
950 ));
951 terminal_matches.extend(registry.scan_file_new_bytes(
952 &stderr_key,
953 &task_id,
954 &stderr,
955 ));
956 } else {
957 registry.prime_file_cursor(&stdout_key, &stdout);
958 registry.prime_file_cursor(&stderr_key, &stderr);
959 }
960 }
961 BgMode::Pty => {
962 let pty_key = format!("{task_id}:pty");
963 if terminal {
964 registry.set_file_cursor(&pty_key, 0);
965 terminal_matches
966 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
967 } else {
968 registry.prime_file_cursor(&pty_key, &pty);
969 }
970 }
971 }
972 }
973 watch_id
974 };
975
976 if let Some(task) = task.as_ref() {
977 if task.is_terminal() {
978 let completion = self.remove_pending_completion(&task_id).or_else(|| {
979 self.completion_snapshot_for_task(task, BG_COMPLETION_PREVIEW_BYTES)
980 });
981 if terminal_matches.is_empty() {
982 if let Some(completion) = completion.as_ref() {
983 self.emit_bash_watch_exit(completion);
984 }
985 } else {
986 for pattern_match in terminal_matches {
987 self.emit_bash_pattern_match(&task.session_id, pattern_match);
988 }
989 }
990 let _ = task.set_completion_delivered(true, self);
991 self.clear_task_watch_state(&task_id);
992 }
993 }
994
995 Ok(watch_id)
996 }
997
998 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
999 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1000 registry.unregister(task_id, watch_id);
1001 }
1002 }
1003
1004 pub fn active_watch_count(&self, task_id: &str) -> usize {
1005 self.inner
1006 .watch_registry
1007 .lock()
1008 .map(|registry| registry.active_count(task_id))
1009 .unwrap_or(0)
1010 }
1011
1012 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1013 self.inner
1014 .watch_registry
1015 .lock()
1016 .map(|registry| {
1017 (
1018 registry.has_controlled_task(task_id),
1019 registry.has_matched_task(task_id),
1020 )
1021 })
1022 .unwrap_or((false, false))
1023 }
1024
1025 fn task_has_watch_control(&self, task_id: &str) -> bool {
1026 self.inner
1027 .watch_registry
1028 .lock()
1029 .map(|registry| registry.has_controlled_task(task_id))
1030 .unwrap_or(false)
1031 }
1032
1033 fn clear_task_watch_state(&self, task_id: &str) {
1034 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1035 registry.clear_task(task_id);
1036 }
1037 }
1038
1039 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1040 let (mode, stdout, stderr, pty) = match task.state.lock() {
1041 Ok(state) => (
1042 state.metadata.mode.clone(),
1043 task.paths.stdout.clone(),
1044 task.paths.stderr.clone(),
1045 task.paths.pty.clone(),
1046 ),
1047 Err(_) => return,
1048 };
1049 let mut matches = Vec::new();
1050 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1051 match mode {
1052 BgMode::Pipes => {
1053 let stdout_key = format!("{}:stdout", task.task_id);
1054 let stderr_key = format!("{}:stderr", task.task_id);
1055 matches.extend(registry.scan_file_new_bytes(
1056 &stdout_key,
1057 &task.task_id,
1058 &stdout,
1059 ));
1060 matches.extend(registry.scan_file_new_bytes(
1061 &stderr_key,
1062 &task.task_id,
1063 &stderr,
1064 ));
1065 }
1066 BgMode::Pty => {
1067 let pty_key = format!("{}:pty", task.task_id);
1068 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1069 }
1070 }
1071 }
1072 for pattern_match in matches {
1073 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1074 }
1075 }
1076
1077 pub fn status(
1078 &self,
1079 task_id: &str,
1080 session_id: &str,
1081 project_root: Option<&Path>,
1082 storage_dir: Option<&Path>,
1083 preview_bytes: usize,
1084 ) -> Option<BgTaskSnapshot> {
1085 let mut task = self.task_for_session(task_id, session_id);
1086 if task.is_none() {
1087 if let Some(storage_dir) = storage_dir {
1088 let _ = self.replay_session(storage_dir, session_id);
1089 task = self.task_for_session(task_id, session_id);
1090 }
1091 }
1092 let Some(task) = task else {
1093 return self.status_relaxed(
1094 task_id,
1095 session_id,
1096 project_root?,
1097 storage_dir?,
1098 preview_bytes,
1099 );
1100 };
1101 let _ = self.poll_task(&task);
1102 let mut snapshot = task.snapshot(preview_bytes);
1103 self.maybe_compress_snapshot(&task, &mut snapshot);
1104 Some(snapshot)
1105 }
1106
1107 fn status_relaxed_task(
1108 &self,
1109 task_id: &str,
1110 project_root: &Path,
1111 storage_dir: &Path,
1112 ) -> Option<Arc<BgTask>> {
1113 let canonical_project = canonicalized_path(project_root);
1114 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1115 Some(Ok(Some(metadata))) => {
1116 if let Some(task) = self.task(task_id) {
1117 let matches_project = task
1118 .state
1119 .lock()
1120 .map(|state| {
1121 state
1122 .metadata
1123 .project_root
1124 .as_deref()
1125 .map(canonicalized_path)
1126 .as_deref()
1127 == Some(canonical_project.as_path())
1128 })
1129 .unwrap_or(false);
1130 return matches_project.then_some(task);
1131 }
1132 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1133 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1134 return None;
1135 }
1136 return self.task(task_id);
1137 }
1138 Some(Ok(None)) => {
1139 crate::slog_info!(
1140 "bash task relaxed DB miss for {}; falling back to disk",
1141 task_id
1142 );
1143 }
1144 Some(Err(error)) => {
1145 crate::slog_warn!(
1146 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1147 task_id,
1148 error
1149 );
1150 }
1151 None => {
1152 crate::slog_info!(
1153 "bash task relaxed DB unavailable for {}; falling back to disk",
1154 task_id
1155 );
1156 }
1157 }
1158 let root = storage_dir.join("bash-tasks");
1159 let entries = fs::read_dir(&root).ok()?;
1160 for entry in entries.flatten() {
1161 let dir = entry.path();
1162 if !dir.is_dir() {
1163 continue;
1164 }
1165 let path = dir.join(format!("{task_id}.json"));
1166 if !path.exists() {
1167 continue;
1168 }
1169 let metadata = match read_task(&path) {
1170 Ok(metadata) => metadata,
1171 Err(error) => {
1172 crate::slog_warn!(
1173 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1174 path.display()
1175 );
1176 if let Err(quarantine_error) =
1177 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1178 {
1179 crate::slog_warn!(
1180 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1181 path.display()
1182 );
1183 }
1184 continue;
1185 }
1186 };
1187 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1188 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1189 continue;
1190 }
1191 if let Some(task) = self.task(task_id) {
1192 let matches_project = task
1193 .state
1194 .lock()
1195 .map(|state| {
1196 state
1197 .metadata
1198 .project_root
1199 .as_deref()
1200 .map(canonicalized_path)
1201 .as_deref()
1202 == Some(canonical_project.as_path())
1203 })
1204 .unwrap_or(false);
1205 return matches_project.then_some(task);
1206 }
1207 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1208 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1209 return None;
1210 }
1211 return self.task(task_id);
1212 }
1213 None
1214 }
1215
1216 fn lookup_relaxed_task_from_db(
1217 &self,
1218 task_id: &str,
1219 project_root: &Path,
1220 ) -> Option<Result<Option<PersistedTask>, String>> {
1221 let pool = self
1222 .inner
1223 .db_pool
1224 .read()
1225 .ok()
1226 .and_then(|slot| slot.clone())?;
1227 let harness = self
1228 .inner
1229 .db_harness
1230 .read()
1231 .ok()
1232 .and_then(|slot| slot.clone())?;
1233 let conn = match pool.lock() {
1234 Ok(conn) => conn,
1235 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1236 };
1237 let project_key = crate::search_index::project_cache_key(project_root);
1238 Some(
1239 crate::db::bash_tasks::find_bash_task_for_project(
1240 &conn,
1241 &harness,
1242 &project_key,
1243 task_id,
1244 )
1245 .map(|row| row.map(PersistedTask::from))
1246 .map_err(|error| error.to_string()),
1247 )
1248 }
1249
1250 pub(super) fn status_relaxed(
1251 &self,
1252 task_id: &str,
1253 _session_id: &str,
1254 project_root: &Path,
1255 storage_dir: &Path,
1256 preview_bytes: usize,
1257 ) -> Option<BgTaskSnapshot> {
1258 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1259 let _ = self.poll_task(&task);
1260 let mut snapshot = task.snapshot(preview_bytes);
1261 self.maybe_compress_snapshot(&task, &mut snapshot);
1262 Some(snapshot)
1263 }
1264
1265 pub fn kill_relaxed(
1266 &self,
1267 task_id: &str,
1268 project_root: &Path,
1269 storage_dir: &Path,
1270 ) -> Result<BgTaskSnapshot, String> {
1271 let task = self
1272 .status_relaxed_task(task_id, project_root, storage_dir)
1273 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1274 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1275 }
1276
1277 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1278 #[cfg(test)]
1279 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1280
1281 let mut deleted = 0usize;
1282
1283 let root = storage_dir.join("bash-tasks");
1284 if root.exists() {
1285 let session_dirs = fs::read_dir(&root).map_err(|e| {
1286 format!(
1287 "failed to read background task root {}: {e}",
1288 root.display()
1289 )
1290 })?;
1291 for session_entry in session_dirs.flatten() {
1292 let session_dir = session_entry.path();
1293 if !session_dir.is_dir() {
1294 continue;
1295 }
1296 let task_entries = match fs::read_dir(&session_dir) {
1297 Ok(entries) => entries,
1298 Err(error) => {
1299 crate::slog_warn!(
1300 "failed to read background task session dir {}: {error}",
1301 session_dir.display()
1302 );
1303 continue;
1304 }
1305 };
1306 for task_entry in task_entries.flatten() {
1307 let json_path = task_entry.path();
1308 if json_path
1309 .extension()
1310 .and_then(|extension| extension.to_str())
1311 != Some("json")
1312 {
1313 continue;
1314 }
1315 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1316 continue;
1317 }
1318 let metadata = match read_task(&json_path) {
1319 Ok(metadata) => metadata,
1320 Err(error) => {
1321 crate::slog_warn!(
1322 "quarantining corrupt background task metadata {}: {error}",
1323 json_path.display()
1324 );
1325 quarantine_task_json(
1326 storage_dir,
1327 &session_dir,
1328 &json_path,
1329 QuarantineKind::Corrupt,
1330 )?;
1331 continue;
1332 }
1333 };
1334 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1335 continue;
1336 }
1337 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1338 match delete_task_bundle(&paths) {
1339 Ok(()) => {
1340 deleted += 1;
1341 log::debug!(
1342 "deleted persisted background task bundle {}",
1343 metadata.task_id
1344 );
1345 }
1346 Err(error) => {
1347 crate::slog_warn!(
1348 "failed to delete background task bundle {}: {error}",
1349 metadata.task_id
1350 );
1351 continue;
1352 }
1353 }
1354 }
1355 }
1356 }
1357 gc_quarantine(storage_dir);
1358 Ok(deleted)
1359 }
1360
1361 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1362 let tasks = self
1363 .inner
1364 .tasks
1365 .lock()
1366 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1367 .unwrap_or_default();
1368 tasks
1369 .into_iter()
1370 .map(|task| {
1371 let _ = self.poll_task(&task);
1372 let mut snapshot = task.snapshot(preview_bytes);
1373 self.maybe_compress_snapshot(&task, &mut snapshot);
1374 snapshot
1375 })
1376 .collect()
1377 }
1378
1379 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1385 if !snapshot.info.status.is_terminal() {
1386 return;
1387 }
1388 let (compressed_flag, mode) = task
1389 .state
1390 .lock()
1391 .map(|state| (state.metadata.compressed, state.metadata.mode.clone()))
1392 .unwrap_or((true, BgMode::Pipes));
1393 if mode == BgMode::Pty {
1394 return;
1395 }
1396 if !compressed_flag {
1397 return;
1398 }
1399 let raw = std::mem::take(&mut snapshot.output_preview);
1400 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1401 }
1402
1403 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1404 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1405 }
1406
1407 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1408 let task = self
1409 .task_for_session(task_id, session_id)
1410 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1411 let mut state = task
1412 .state
1413 .lock()
1414 .map_err(|_| "background task lock poisoned".to_string())?;
1415 let updated = self
1416 .update_task_metadata(&task.paths, |metadata| {
1417 metadata.notify_on_completion = true;
1418 metadata.completion_delivered = false;
1419 })
1420 .map_err(|e| format!("failed to promote background task: {e}"))?;
1421 state.metadata = updated;
1422 if state.metadata.status.is_terminal() {
1423 state.buffer.enforce_terminal_cap();
1424 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1425 }
1426 Ok(true)
1427 }
1428
1429 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1430 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1431 .map(|_| ())
1432 }
1433
1434 pub fn cleanup_finished(&self, older_than: Duration) {
1435 let cutoff = Instant::now().checked_sub(older_than);
1436 let removable_paths: Vec<(String, TaskPaths)> =
1437 if let Ok(mut tasks) = self.inner.tasks.lock() {
1438 let removable = tasks
1439 .iter()
1440 .filter_map(|(task_id, task)| {
1441 let delivered_terminal = task
1442 .state
1443 .lock()
1444 .map(|state| {
1445 state.metadata.status.is_terminal()
1446 && state.metadata.completion_delivered
1447 })
1448 .unwrap_or(false);
1449 if !delivered_terminal {
1450 return None;
1451 }
1452
1453 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1454 let expired = match (terminal_at, cutoff) {
1455 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1456 (Some(_), None) => true,
1457 (None, _) => false,
1458 };
1459 expired.then(|| task_id.clone())
1460 })
1461 .collect::<Vec<_>>();
1462
1463 removable
1464 .into_iter()
1465 .filter_map(|task_id| {
1466 tasks
1467 .remove(&task_id)
1468 .map(|task| (task_id, task.paths.clone()))
1469 })
1470 .collect()
1471 } else {
1472 Vec::new()
1473 };
1474
1475 for (task_id, paths) in removable_paths {
1476 match delete_task_bundle(&paths) {
1477 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1478 Err(error) => crate::slog_warn!(
1479 "failed to delete persisted background task bundle {task_id}: {error}"
1480 ),
1481 }
1482 }
1483 }
1484
1485 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1486 self.drain_completions_for_session(None)
1487 }
1488
1489 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1490 let completions = match self.inner.completions.lock() {
1491 Ok(completions) => completions,
1492 Err(_) => return Vec::new(),
1493 };
1494
1495 completions
1496 .iter()
1497 .filter(|completion| {
1498 session_id
1499 .map(|session_id| completion.session_id == session_id)
1500 .unwrap_or(true)
1501 })
1502 .cloned()
1503 .collect()
1504 }
1505
1506 pub fn ack_completions_for_session(
1507 &self,
1508 session_id: Option<&str>,
1509 task_ids: &[String],
1510 ) -> Vec<String> {
1511 if task_ids.is_empty() {
1512 return Vec::new();
1513 }
1514 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1515 let mut completion_sessions = HashMap::new();
1516 if let Ok(mut completions) = self.inner.completions.lock() {
1517 completions.retain(|completion| {
1518 let session_matches = session_id
1519 .map(|session_id| completion.session_id == session_id)
1520 .unwrap_or(true);
1521 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1522 completion_sessions
1523 .insert(completion.task_id.clone(), completion.session_id.clone());
1524 false
1525 } else {
1526 true
1527 }
1528 });
1529 }
1530
1531 let mut delivered = Vec::new();
1532 for task_id in task_ids {
1533 let task = if let Some(session_id) = session_id {
1534 self.task_for_session(task_id, session_id)
1535 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1536 self.task_for_session(task_id, completion_session_id)
1537 } else {
1538 self.task(task_id)
1539 };
1540 if let Some(task) = task {
1541 if task.set_completion_delivered(true, self).is_ok() {
1542 delivered.push(task_id.clone());
1543 }
1544 }
1545 }
1546
1547 delivered
1548 }
1549
1550 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1551 self.inner
1552 .completions
1553 .lock()
1554 .map(|completions| {
1555 completions
1556 .iter()
1557 .filter(|completion| completion.session_id == session_id)
1558 .cloned()
1559 .collect()
1560 })
1561 .unwrap_or_default()
1562 }
1563
1564 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1565 let mut completions = self.inner.completions.lock().ok()?;
1566 let idx = completions
1567 .iter()
1568 .position(|completion| completion.task_id == task_id)?;
1569 completions.remove(idx)
1570 }
1571
1572 fn completion_snapshot_for_task(
1573 &self,
1574 task: &Arc<BgTask>,
1575 preview_bytes: usize,
1576 ) -> Option<BgCompletion> {
1577 let snapshot = task.snapshot(preview_bytes);
1578 if !snapshot.info.status.is_terminal() {
1579 return None;
1580 }
1581 let output_preview = if snapshot.info.mode == BgMode::Pty {
1582 String::new()
1583 } else {
1584 let compressed = task
1585 .state
1586 .lock()
1587 .map(|state| state.metadata.compressed)
1588 .unwrap_or(true);
1589 if compressed {
1590 self.compress_output(&snapshot.info.command, snapshot.output_preview)
1591 } else {
1592 snapshot.output_preview
1593 }
1594 };
1595 Some(BgCompletion {
1596 task_id: snapshot.info.task_id,
1597 session_id: task.session_id.clone(),
1598 status: snapshot.info.status,
1599 exit_code: snapshot.exit_code,
1600 command: snapshot.info.command,
1601 output_preview,
1602 output_truncated: snapshot.output_truncated,
1603 original_tokens: None,
1604 compressed_tokens: None,
1605 tokens_skipped: false,
1606 })
1607 }
1608
1609 pub fn detach(&self) {
1610 self.inner.shutdown.store(true, Ordering::SeqCst);
1611 if let Ok(mut tasks) = self.inner.tasks.lock() {
1612 for task in tasks.values() {
1613 if let Ok(mut state) = task.state.lock() {
1614 match &mut state.runtime {
1615 TaskRuntime::Piped(child) => *child = None,
1616 TaskRuntime::Pty(runtime) => *runtime = None,
1617 }
1618 state.detached = true;
1619 }
1620 }
1621 tasks.clear();
1622 }
1623 }
1624
1625 pub fn shutdown(&self) {
1626 let tasks = self
1627 .inner
1628 .tasks
1629 .lock()
1630 .map(|tasks| {
1631 tasks
1632 .values()
1633 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1634 .collect::<Vec<_>>()
1635 })
1636 .unwrap_or_default();
1637 for (task_id, session_id) in tasks {
1638 let _ = self.kill(&task_id, &session_id);
1639 }
1640 }
1641
1642 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1643 if let Ok(state) = task.state.lock() {
1644 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1645 let complete = pty.reader_done.load(Ordering::SeqCst)
1646 && pty.exit_observed.load(Ordering::SeqCst);
1647 if !complete {
1648 return Ok(());
1649 }
1650 }
1651 }
1652 let marker = match read_exit_marker(&task.paths.exit) {
1653 Ok(Some(marker)) => marker,
1654 Ok(None) => return Ok(()),
1655 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1656 };
1657 self.finalize_from_marker(task, marker, None)
1658 }
1659
    /// Opportunistically reaps a finished task without blocking.
    ///
    /// Piped tasks:
    /// - If an owned `Child` reports an exit via `try_wait`, the handle is dropped and
    ///   the task is flagged `detached` and `child_exit_observed`.
    /// - If there is no owned child, the task is already detached, and the child is
    ///   known to be dead (exit observed earlier, or the recorded `child_pid` is no
    ///   longer alive), `fail_without_exit_marker_if_needed` runs. That call marks
    ///   the task `Failed` unless an exit marker exists.
    ///
    /// PTY tasks: once both the reader and the exit observer are done, `poll_task`
    /// finalizes the task from its exit marker.
    ///
    /// A poisoned state lock makes this a no-op.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        match &mut state.runtime {
            TaskRuntime::Piped(child_slot) => {
                if let Some(child) = child_slot.as_mut() {
                    // Non-blocking check; the failure path runs on a later call once
                    // the task is detached.
                    if matches!(child.try_wait(), Ok(Some(_))) {
                        *child_slot = None;
                        state.detached = true;
                        state.child_exit_observed = true;
                    }
                } else if state.detached {
                    let child_known_dead = state.child_exit_observed
                        || state
                            .metadata
                            .child_pid
                            .is_some_and(|pid| !is_process_alive(pid));
                    if child_known_dead {
                        self.fail_without_exit_marker_if_needed(task, &mut state);
                    }
                }
            }
            TaskRuntime::Pty(Some(pty)) => {
                let complete = pty.reader_done.load(Ordering::SeqCst)
                    && pty.exit_observed.load(Ordering::SeqCst);
                if complete {
                    // poll_task locks task.state itself, so release our guard first.
                    drop(state);
                    let _ = self.poll_task(task);
                }
            }
            TaskRuntime::Pty(None) => {}
        }
    }
1694
1695 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1696 if state.metadata.status.is_terminal() {
1697 return;
1698 }
1699 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1700 return;
1701 }
1702 let watch_controlled = self.task_has_watch_control(&task.task_id);
1703 let updated = self.update_task_metadata(&task.paths, |metadata| {
1704 metadata.mark_terminal(
1705 BgTaskStatus::Failed,
1706 None,
1707 Some("process exited without exit marker".to_string()),
1708 );
1709 if watch_controlled {
1710 metadata.completion_delivered = true;
1711 }
1712 });
1713 if let Ok(metadata) = updated {
1714 state.pending_terminal_override = None;
1715 state.metadata = metadata;
1716 task.mark_terminal_now();
1717 state.buffer.enforce_terminal_cap();
1718 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1719 }
1720 }
1721
1722 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1723 self.inner
1724 .tasks
1725 .lock()
1726 .map(|tasks| {
1727 tasks
1728 .values()
1729 .filter(|task| task.is_running())
1730 .cloned()
1731 .collect()
1732 })
1733 .unwrap_or_default()
1734 }
1735
    /// Registers a task rebuilt from persisted `metadata`, with no live process handle.
    ///
    /// The runtime slot is empty (`Pty(None)` or `Piped(None)`, chosen by
    /// `metadata.mode`). The output buffer reads the on-disk capture files in `paths`.
    /// `detached` is stored as given. The task is inserted into the registry,
    /// replacing any existing entry with the same id.
    ///
    /// # Errors
    /// The registry lock is poisoned.
    fn insert_rehydrated_task(
        &self,
        metadata: PersistedTask,
        paths: TaskPaths,
        detached: bool,
    ) -> Result<(), String> {
        // Read everything needed from `metadata` before it is moved into the state.
        let task_id = metadata.task_id.clone();
        let session_id = metadata.session_id.clone();
        // NOTE(review): presumably maps the wall-clock start time onto an Instant; confirm.
        let started = started_instant_from_unix_millis(metadata.started_at);
        // A task replayed as Running gets `last_reminder_at = now`. This restarts the
        // long-running reminder interval from the moment of rehydration.
        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
        let mode = metadata.mode.clone();
        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started,
            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
            // Terminal tasks are stamped with the rehydration time, so cleanup_finished
            // ages them from now rather than from when they actually finished.
            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
            state: Mutex::new(BgTaskState {
                metadata,
                runtime: if mode == BgMode::Pty {
                    TaskRuntime::Pty(None)
                } else {
                    TaskRuntime::Piped(None)
                },
                detached,
                child_exit_observed: false,
                buffer: if mode == BgMode::Pty {
                    BgBuffer::pty(paths.pty.clone())
                } else {
                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
                },
                pending_terminal_override: None,
            }),
        });
        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id, task);
        Ok(())
    }
1784
    /// Terminates a task and records `terminal_status` (`Killed` or `TimedOut`).
    ///
    /// Handled under the task's state lock, in this order:
    /// 1. Already terminal: clear any pending override and return its snapshot.
    /// 2. An exit marker already exists (the process finished on its own): finalize
    ///    from the marker instead of overriding the real result.
    /// 3. Otherwise persist `Killing` (once), then terminate:
    ///    - Piped: signal the process group (unix) or the process (windows), reap the
    ///      child, write a kill marker if none exists, and persist the terminal
    ///      status. `TimedOut` gets exit code 124. The completion is then enqueued.
    ///    - PTY: flag `was_killed`, kill the child, and drop the master. The
    ///      task is not finalized here. A `TimedOut` request is stashed in
    ///      `pending_terminal_override`, which `finalize_from_marker` consumes when
    ///      it later sees a `Killed` marker.
    ///
    /// Snapshots use a 5 KiB preview.
    ///
    /// # Errors
    /// Unknown task, poisoned lock, or failure to write the kill marker or persist
    /// metadata.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            // Nothing left to kill.
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // The process already exited and recorded its own outcome; honor it.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                match &mut state.runtime {
                    TaskRuntime::Piped(child) => *child = None,
                    TaskRuntime::Pty(runtime) => *runtime = None,
                }
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate Killing state only on the first kill request.
            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
            if !was_already_killing {
                state.metadata.status = BgTaskStatus::Killing;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
            }

            let pgid = state.metadata.pgid;
            #[cfg(windows)]
            let child_pid = state.metadata.child_pid;
            // PTY tasks are finalized later from the exit marker. Remember that this
            // was a timeout so finalize_from_marker reports TimedOut, not Killed.
            if !was_already_killing
                && state.metadata.mode == BgMode::Pty
                && terminal_status == BgTaskStatus::TimedOut
            {
                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
            }

            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    // NOTE(review): on unix, when `pgid` is None no signal is sent, yet an
                    // owned child is still `wait()`ed below while this state lock is held.
                    // That could block until the child exits on its own. Confirm that
                    // pgid is always recorded for piped tasks.
                    #[cfg(unix)]
                    if let Some(pgid) = pgid {
                        terminate_pgid(pgid, child_slot.as_mut());
                    }
                    #[cfg(windows)]
                    if let Some(child) = child_slot.as_mut() {
                        super::process::terminate_process(child);
                    } else if let Some(pid) = child_pid {
                        terminate_pid(pid);
                    }
                    if let Some(child) = child_slot.as_mut() {
                        let _ = child.wait();
                    }
                    *child_slot = None;
                    state.detached = true;

                    // Leave a marker so later polls/replays see the task as killed.
                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    // 124 mirrors the exit status conventionally used for timeouts.
                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    state
                        .metadata
                        .mark_terminal(terminal_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
                TaskRuntime::Pty(Some(pty)) => {
                    pty.was_killed.store(true, Ordering::SeqCst);
                    if let Err(error) = pty.killer.kill() {
                        crate::slog_warn!("[pty-kill] {task_id} ChildKiller::kill failed: {error}");
                    }
                    // The PTY child pid is also used as its process-group id on unix.
                    if let Some(pid) = pty.child_pid {
                        #[cfg(unix)]
                        terminate_pgid(pid as i32, None);
                        #[cfg(windows)]
                        terminate_pid(pid);
                    }
                    drop(pty.master.take());
                }
                TaskRuntime::Pty(None) => {}
            }
        }

        Ok(task.snapshot(5 * 1024))
    }
1902
    /// Moves a task to its terminal state based on `marker`, then enqueues its
    /// completion.
    ///
    /// Phase 1 runs under the state lock. It does nothing if the task is already
    /// terminal. Otherwise it computes and persists the terminal metadata:
    /// - a PTY task with a `Killed` marker takes its status from
    ///   `pending_terminal_override` (default `Killed`), with exit code 124 for
    ///   `TimedOut`;
    /// - all other cases use `terminal_metadata_from_marker`.
    ///
    /// Watch-controlled tasks are flagged as delivered. The phase also stamps the
    /// terminal time, drops the runtime handle, and marks the task detached.
    ///
    /// The lock is then released while watch output is scanned one last time.
    /// Phase 2 re-locks, caps the buffer, and enqueues the completion.
    ///
    /// # Errors
    /// A poisoned lock, or failure to persist the terminal metadata.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        // Resolved before taking the state lock.
        let watch_controlled = self.task_has_watch_control(&task.task_id);
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(());
            }

            // Consumed here whether or not it applies (it only matters for PTY kills).
            let pending_override = state.pending_terminal_override.take();
            let is_pty = state.metadata.mode == BgMode::Pty;
            let updated = self
                .update_task_metadata(&task.paths, |metadata| {
                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                        let mut metadata = metadata.clone();
                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                        let exit_code = if target_status == BgTaskStatus::TimedOut {
                            Some(124)
                        } else {
                            None
                        };
                        metadata.mark_terminal(target_status, exit_code, reason);
                        metadata
                    } else {
                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
                    };
                    if watch_controlled {
                        new_metadata.completion_delivered = true;
                    }
                    *metadata = new_metadata;
                })
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            state.metadata = updated;
            task.mark_terminal_now();
            match &mut state.runtime {
                TaskRuntime::Piped(child) => *child = None,
                TaskRuntime::Pty(runtime) => *runtime = None,
            }
            state.detached = true;
        }

        // Runs with the state lock released. NOTE(review): presumably because the scan
        // needs that lock or the watch registry lock; confirm.
        self.scan_task_watch_output(task);

        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        Ok(())
    }
1964
1965 fn enqueue_completion_if_needed(
1966 &self,
1967 metadata: &PersistedTask,
1968 paths: Option<&TaskPaths>,
1969 emit_frame: bool,
1970 ) {
1971 if metadata.status.is_terminal() && !metadata.completion_delivered {
1972 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1973 }
1974 }
1975
1976 fn enqueue_completion_locked(
1977 &self,
1978 metadata: &PersistedTask,
1979 buffer: Option<&BgBuffer>,
1980 emit_frame: bool,
1981 ) {
1982 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1983 }
1984
    /// Builds a `BgCompletion` for a terminal task and routes it.
    ///
    /// The preview is the last `BG_COMPLETION_PREVIEW_BYTES` of output, read from
    /// `buffer` if given, else from `paths`. PTY tasks get an empty preview. The
    /// preview is compressed when `metadata.compressed` is set. Token counts are
    /// computed, and a compression event is recorded.
    ///
    /// Routing:
    /// - Watch-controlled tasks never enter the completion queue. A watch-exit frame
    ///   is emitted when `emit_frame` is set and no watch pattern matched, and the
    ///   watch state is then cleared.
    /// - Tasks already marked delivered stop here.
    /// - Otherwise the completion is pushed unless one for the same task id is already
    ///   queued. A `BashCompleted` frame is emitted only for a fresh push with
    ///   `emit_frame` set.
    ///
    /// Non-terminal metadata is ignored.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
    ) {
        if !metadata.status.is_terminal() {
            return;
        }
        let (raw_preview, output_truncated) = if metadata.mode == BgMode::Pty {
            (String::new(), false)
        } else {
            match buffer {
                Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
                None => paths
                    .map(|paths| read_tail_from_disk(metadata, paths, BG_COMPLETION_PREVIEW_BYTES))
                    .unwrap_or_else(|| (String::new(), false)),
            }
        };
        let output_preview = if metadata.compressed {
            self.compress_output(&metadata.command, raw_preview)
        } else {
            raw_preview
        };
        let token_counts = self.completion_token_counts(metadata, buffer, paths);
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // NOTE(review): this runs on every call, before the delivered and duplicate
        // checks below. A re-enqueue (e.g. `promote` on an already-terminal task)
        // therefore records another compression event for the same task. Confirm
        // whether that double counting is intended.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
        if watch_controlled {
            if emit_frame && !watch_matched {
                self.emit_bash_watch_exit(&completion);
            }
            self.clear_task_watch_state(&metadata.task_id);
            return;
        }

        if metadata.completion_delivered {
            return;
        }

        // De-duplicate by task id; a poisoned queue counts as "not pushed".
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
2096
2097 fn record_compression_event_if_applicable(
2098 &self,
2099 metadata: &PersistedTask,
2100 token_counts: &CompletionTokenCounts,
2101 ) {
2102 if metadata.mode == BgMode::Pty {
2103 return;
2104 }
2105
2106 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2107 token_counts.original_tokens,
2108 token_counts.compressed_tokens,
2109 token_counts.original_bytes,
2110 token_counts.compressed_bytes,
2111 ) {
2112 (
2113 Some(original_tokens),
2114 Some(compressed_tokens),
2115 Some(original_bytes),
2116 Some(compressed_bytes),
2117 ) => (
2118 original_tokens,
2119 compressed_tokens,
2120 original_bytes,
2121 compressed_bytes,
2122 ),
2123 _ => {
2124 crate::slog_warn!(
2125 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2126 metadata.task_id
2127 );
2128 return;
2129 }
2130 };
2131
2132 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2133 let Some(pool) = pool else {
2134 crate::slog_warn!(
2135 "compression event skipped for {}: db_pool not initialized — was configure run?",
2136 metadata.task_id
2137 );
2138 return;
2139 };
2140 let harness = self
2141 .inner
2142 .db_harness
2143 .read()
2144 .ok()
2145 .and_then(|slot| slot.clone());
2146 let Some(harness) = harness else {
2147 crate::slog_warn!(
2148 "compression event insert skipped for {}: harness not configured",
2149 metadata.task_id
2150 );
2151 return;
2152 };
2153
2154 let project_root = metadata
2155 .project_root
2156 .as_deref()
2157 .unwrap_or(&metadata.workdir);
2158 let project_key = crate::search_index::project_cache_key(project_root);
2159 let row = crate::db::compression_events::CompressionEventRow {
2160 harness: &harness,
2161 session_id: Some(&metadata.session_id),
2162 project_key: &project_key,
2163 tool: "bash",
2164 task_id: Some(&metadata.task_id),
2165 command: Some(&metadata.command),
2166 compressor: if metadata.compressed {
2167 "registry"
2168 } else {
2169 "none"
2170 },
2171 original_bytes,
2172 compressed_bytes,
2173 original_tokens,
2174 compressed_tokens,
2175 created_at: unix_millis() as i64,
2176 };
2177
2178 let conn = match pool.lock() {
2179 Ok(conn) => conn,
2180 Err(_) => {
2181 crate::slog_warn!(
2182 "compression event insert failed for {}: db mutex poisoned",
2183 metadata.task_id
2184 );
2185 return;
2186 }
2187 };
2188 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2189 Ok(_) => {
2190 crate::slog_debug!(
2194 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2195 metadata.task_id,
2196 project_key,
2197 metadata.session_id,
2198 original_tokens,
2199 compressed_tokens
2200 );
2201 }
2202 Err(error) => {
2203 crate::slog_warn!(
2204 "compression event insert failed for {}: {}",
2205 metadata.task_id,
2206 error
2207 );
2208 }
2209 }
2210 }
2211
2212 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2213 let Ok(progress_sender) = self
2214 .inner
2215 .progress_sender
2216 .lock()
2217 .map(|sender| sender.clone())
2218 else {
2219 return;
2220 };
2221 if let Some(sender) = progress_sender.as_ref() {
2222 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2223 pattern_match.task_id,
2224 session_id.to_string(),
2225 pattern_match.watch_id,
2226 pattern_match.match_text,
2227 pattern_match.match_offset,
2228 pattern_match.context,
2229 pattern_match.once,
2230 )));
2231 }
2232 }
2233
2234 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2235 let Ok(progress_sender) = self
2236 .inner
2237 .progress_sender
2238 .lock()
2239 .map(|sender| sender.clone())
2240 else {
2241 return;
2242 };
2243 let Some(sender) = progress_sender.as_ref() else {
2244 return;
2245 };
2246 let status = completion_status_text(&completion.status, completion.exit_code);
2247 let preview = completion.output_preview.trim_end();
2248 let context = if preview.is_empty() {
2249 format!("task {} exited ({status})", completion.task_id)
2250 } else {
2251 format!(
2252 "task {} exited ({status})
2253{preview}",
2254 completion.task_id
2255 )
2256 };
2257 sender(PushFrame::BashPatternMatch(
2258 BashPatternMatchFrame::task_exit(
2259 completion.task_id.clone(),
2260 completion.session_id.clone(),
2261 format!("exited ({status})"),
2262 context,
2263 ),
2264 ));
2265 }
2266
2267 fn emit_bash_completed(&self, completion: BgCompletion) {
2268 let Ok(progress_sender) = self
2269 .inner
2270 .progress_sender
2271 .lock()
2272 .map(|sender| sender.clone())
2273 else {
2274 return;
2275 };
2276 let Some(sender) = progress_sender.as_ref() else {
2277 return;
2278 };
2279 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2287 completion.task_id,
2288 completion.session_id,
2289 completion.status,
2290 completion.exit_code,
2291 completion.command,
2292 completion.output_preview,
2293 completion.output_truncated,
2294 completion.original_tokens,
2295 completion.compressed_tokens,
2296 completion.tokens_skipped,
2297 )));
2298 }
2299
2300 fn completion_token_counts(
2301 &self,
2302 metadata: &PersistedTask,
2303 buffer: Option<&BgBuffer>,
2304 paths: Option<&TaskPaths>,
2305 ) -> CompletionTokenCounts {
2306 if metadata.mode == BgMode::Pty {
2307 return CompletionTokenCounts::skipped();
2308 }
2309
2310 let raw = match buffer {
2311 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2312 None => paths
2313 .map(|paths| {
2314 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2315 })
2316 .unwrap_or(TokenCountInput::Skipped),
2317 };
2318
2319 let TokenCountInput::Text(raw_output) = raw else {
2320 return CompletionTokenCounts::skipped();
2321 };
2322
2323 let original_tokens = token_count_u32(&raw_output);
2324 let original_bytes = raw_output.len() as i64;
2325 let compressed_output = if metadata.compressed {
2326 self.compress_output(&metadata.command, raw_output)
2327 } else {
2328 raw_output
2329 };
2330 let compressed_tokens = token_count_u32(&compressed_output);
2331 let compressed_bytes = compressed_output.len() as i64;
2332 CompletionTokenCounts {
2333 original_tokens: Some(original_tokens),
2334 compressed_tokens: Some(compressed_tokens),
2335 original_bytes: Some(original_bytes),
2336 compressed_bytes: Some(compressed_bytes),
2337 tokens_skipped: false,
2338 }
2339 }
2340
2341 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2342 if !self
2343 .inner
2344 .long_running_reminder_enabled
2345 .load(Ordering::SeqCst)
2346 {
2347 return;
2348 }
2349 let interval_ms = self
2350 .inner
2351 .long_running_reminder_interval_ms
2352 .load(Ordering::SeqCst);
2353 if interval_ms == 0 {
2354 return;
2355 }
2356 let interval = Duration::from_millis(interval_ms);
2357 let now = Instant::now();
2358 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2359 return;
2360 };
2361 let since = last_reminder_at.unwrap_or(task.started);
2362 if now.duration_since(since) < interval {
2363 return;
2364 }
2365 let command = task
2366 .state
2367 .lock()
2368 .map(|state| state.metadata.command.clone())
2369 .unwrap_or_default();
2370 *last_reminder_at = Some(now);
2371 self.emit_bash_long_running(BashLongRunningFrame::new(
2372 task.task_id.clone(),
2373 task.session_id.clone(),
2374 command,
2375 task.started.elapsed().as_millis() as u64,
2376 ));
2377 }
2378
2379 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2380 let Ok(progress_sender) = self
2381 .inner
2382 .progress_sender
2383 .lock()
2384 .map(|sender| sender.clone())
2385 else {
2386 return;
2387 };
2388 if let Some(sender) = progress_sender.as_ref() {
2389 sender(PushFrame::BashLongRunning(frame));
2390 }
2391 }
2392
2393 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2394 self.inner
2395 .tasks
2396 .lock()
2397 .ok()
2398 .and_then(|tasks| tasks.get(task_id).cloned())
2399 }
2400
2401 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2402 self.task(task_id)
2403 .filter(|task| task.session_id == session_id)
2404 }
2405
2406 fn running_count(&self) -> usize {
2407 self.inner
2408 .tasks
2409 .lock()
2410 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2411 .unwrap_or(0)
2412 }
2413
2414 fn start_watchdog(&self) {
2415 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2416 super::watchdog::start(self.clone());
2417 }
2418 }
2419
2420 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2421 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2422 }
2423
2424 #[cfg(test)]
2425 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2426 self.task_for_session(task_id, session_id)
2427 .map(|task| task.paths.json.clone())
2428 }
2429
2430 #[cfg(test)]
2431 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2432 self.task_for_session(task_id, session_id)
2433 .map(|task| task.paths.exit.clone())
2434 }
2435
2436 fn generate_unique_task_id(&self) -> Result<String, String> {
2438 for _ in 0..32 {
2439 let candidate = random_slug();
2440 let tasks = self
2441 .inner
2442 .tasks
2443 .lock()
2444 .map_err(|_| "background task registry lock poisoned".to_string())?;
2445 if tasks.contains_key(&candidate) {
2446 continue;
2447 }
2448 let completions = self
2449 .inner
2450 .completions
2451 .lock()
2452 .map_err(|_| "background completions lock poisoned".to_string())?;
2453 if completions
2454 .iter()
2455 .any(|completion| completion.task_id == candidate)
2456 {
2457 continue;
2458 }
2459 return Ok(candidate);
2460 }
2461 Err("failed to allocate unique background task id after 32 attempts".to_string())
2462 }
2463}
2464
/// Token and byte counts computed for a finished task's captured output.
///
/// When counting was not performed, every count is `None` and `tokens_skipped` is `true`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CompletionTokenCounts {
    /// Token count of the captured output before any compression.
    original_tokens: Option<u32>,
    /// Token count of the output after compression (same input when compression is off).
    compressed_tokens: Option<u32>,
    /// Byte length of the captured output before any compression.
    original_bytes: Option<i64>,
    /// Byte length of the output after compression.
    compressed_bytes: Option<i64>,
    /// `true` when no counts were computed.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Counts for a completion whose output was not tokenized.
    fn skipped() -> Self {
        Self {
            original_tokens: None,
            compressed_tokens: None,
            original_bytes: None,
            compressed_bytes: None,
            tokens_skipped: true,
        }
    }
}
2484
2485fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
2486 match status {
2487 BgTaskStatus::TimedOut => "timed out".to_string(),
2488 BgTaskStatus::Killed => "killed".to_string(),
2489 _ => exit_code
2490 .map(|code| format!("exit {code}"))
2491 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
2492 }
2493}
2494
2495fn token_count_u32(text: &str) -> u32 {
2496 aft_tokenizer::count_tokens(text)
2497 .try_into()
2498 .unwrap_or(u32::MAX)
2499}
2500
2501impl Default for BgTaskRegistry {
2502 fn default() -> Self {
2503 Self::new(Arc::new(Mutex::new(None)))
2504 }
2505}
2506
/// Returns `true` when `path` was last modified less than `grace` ago.
///
/// A modification time in the future (clock skew, or a clock that stepped backwards) is
/// treated as age zero rather than as "infinitely old". Callers that skip recent entries
/// during GC therefore never delete a file just because its mtime is ahead of the
/// current time. Returns `false` when the file's metadata or mtime cannot be read.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    // `duration_since` errs when `modified` is later than now; count that as just modified.
    let age = SystemTime::now()
        .duration_since(modified)
        .unwrap_or(Duration::ZERO);
    age < grace
}
2515
/// Canonical form of `path`, or `path` unchanged when it cannot be canonicalized (for
/// example because it does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
2519
/// Converts a wall-clock start time (Unix millis) into a monotonic `Instant`.
///
/// The instant is "now minus the wall-clock age". Start times in the future, or an
/// unreadable system clock, give age zero. If the subtraction underflows the platform's
/// `Instant` range, the current instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(start) => start,
        None => Instant::now(),
    }
}
2531
2532fn gc_quarantine(storage_dir: &Path) {
2533 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
2534 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
2535 return;
2536 };
2537 for session_entry in session_dirs.flatten() {
2538 let session_quarantine_dir = session_entry.path();
2539 if !session_quarantine_dir.is_dir() {
2540 continue;
2541 }
2542 let entries = match fs::read_dir(&session_quarantine_dir) {
2543 Ok(entries) => entries,
2544 Err(error) => {
2545 crate::slog_warn!(
2546 "failed to read background task quarantine dir {}: {error}",
2547 session_quarantine_dir.display()
2548 );
2549 continue;
2550 }
2551 };
2552 for entry in entries.flatten() {
2553 let path = entry.path();
2554 if modified_within(&path, QUARANTINE_GC_GRACE) {
2555 continue;
2556 }
2557 let result = if path.is_dir() {
2558 fs::remove_dir_all(&path)
2559 } else {
2560 fs::remove_file(&path)
2561 };
2562 match result {
2563 Ok(()) => log::debug!(
2564 "deleted old background task quarantine entry {}",
2565 path.display()
2566 ),
2567 Err(error) => crate::slog_warn!(
2568 "failed to delete old background task quarantine entry {}: {error}",
2569 path.display()
2570 ),
2571 }
2572 }
2573 let _ = fs::remove_dir(&session_quarantine_dir);
2574 }
2575 let _ = fs::remove_dir(&quarantine_root);
2576}
2577
/// Why a task's metadata is being moved into quarantine; selects the renamed file's suffix.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QuarantineKind {
    /// Renamed to `<file>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<ext>`.
    Invalid,
}
2582
2583fn quarantine_task_json(
2584 storage_dir: &Path,
2585 session_dir: &Path,
2586 json_path: &Path,
2587 kind: QuarantineKind,
2588) -> Result<(), String> {
2589 let session_hash = session_dir
2590 .file_name()
2591 .and_then(|name| name.to_str())
2592 .ok_or_else(|| {
2593 format!(
2594 "invalid background task session dir: {}",
2595 session_dir.display()
2596 )
2597 })?;
2598 let task_name = json_path
2599 .file_name()
2600 .and_then(|name| name.to_str())
2601 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2602 let unix_ts = SystemTime::now()
2603 .duration_since(UNIX_EPOCH)
2604 .map(|duration| duration.as_secs())
2605 .unwrap_or(0);
2606 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2607 fs::create_dir_all(&quarantine_dir).map_err(|e| {
2608 format!(
2609 "failed to create background task quarantine dir {}: {e}",
2610 quarantine_dir.display()
2611 )
2612 })?;
2613 let target_name = quarantine_name(task_name, unix_ts, &kind);
2614 let target = quarantine_dir.join(target_name);
2615 fs::rename(json_path, &target).map_err(|e| {
2616 format!(
2617 "failed to quarantine background task metadata {} to {}: {e}",
2618 json_path.display(),
2619 target.display()
2620 )
2621 })?;
2622
2623 for sibling in task_sibling_paths(json_path) {
2624 if !sibling.exists() {
2625 continue;
2626 }
2627 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2628 crate::slog_warn!(
2629 "skipping background task sibling with invalid name during quarantine: {}",
2630 sibling.display()
2631 );
2632 continue;
2633 };
2634 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2635 if let Err(error) = fs::rename(&sibling, &sibling_target) {
2636 crate::slog_warn!(
2637 "failed to quarantine background task sibling {} to {}: {error}",
2638 sibling.display(),
2639 sibling_target.display()
2640 );
2641 }
2642 }
2643
2644 let _ = fs::remove_dir(session_dir);
2645 Ok(())
2646}
2647
2648fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2649 match kind {
2650 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2651 QuarantineKind::Invalid => {
2652 let path = Path::new(file_name);
2653 let stem = path.file_stem().and_then(|stem| stem.to_str());
2654 let extension = path.extension().and_then(|extension| extension.to_str());
2655 match (stem, extension) {
2656 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2657 _ => format!("{file_name}.invalid.{unix_ts}"),
2658 }
2659 }
2660 }
2661}
2662
/// Paths of the files that share `json_path`'s directory and stem: capture streams, the
/// exit marker, the PTY log, and per-shell wrapper scripts. The files may or may not
/// exist. Returns an empty list when `json_path` has no parent or no UTF-8 stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];
    let parent = json_path.parent();
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    match (parent, stem) {
        (Some(dir), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|extension| dir.join(format!("{stem}.{extension}")))
            .collect(),
        _ => Vec::new(),
    }
}
2675
2676fn read_tail_from_disk(
2677 metadata: &PersistedTask,
2678 paths: &TaskPaths,
2679 max_bytes: usize,
2680) -> (String, bool) {
2681 if metadata.mode == BgMode::Pty {
2682 return read_file_tail_capped(&paths.pty, max_bytes)
2683 .map(|bytes| {
2684 let truncated = fs::metadata(&paths.pty)
2685 .map(|metadata| metadata.len() > max_bytes as u64)
2686 .unwrap_or(false);
2687 (String::from_utf8_lossy(&bytes).into_owned(), truncated)
2688 })
2689 .unwrap_or_else(|_| (String::new(), false));
2690 }
2691 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2692 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2693 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2694 bytes.extend_from_slice(&stdout);
2695 bytes.extend_from_slice(&stderr);
2696 if bytes.len() <= max_bytes {
2697 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2698 }
2699 let start = bytes.len().saturating_sub(max_bytes);
2700 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2701}
2702
2703fn read_for_token_count_from_disk(
2704 metadata: &PersistedTask,
2705 paths: &TaskPaths,
2706 max_bytes_per_stream: usize,
2707) -> TokenCountInput {
2708 if metadata.mode == BgMode::Pty {
2709 return TokenCountInput::Skipped;
2710 }
2711 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2718 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2719 match (stdout, stderr) {
2720 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2721 String::from_utf8_lossy(&stdout).as_ref(),
2722 String::from_utf8_lossy(&stderr).as_ref(),
2723 )),
2724 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2725 String::from_utf8_lossy(&stdout).as_ref(),
2726 "",
2727 )),
2728 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2729 "",
2730 String::from_utf8_lossy(&stderr).as_ref(),
2731 )),
2732 (Err(_), Err(_)) => TokenCountInput::Skipped,
2733 }
2734}
2735
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// Only the tail is read from disk. A `max_bytes` of zero yields an empty buffer. The read
/// itself is bounded by `max_bytes`, so data appended between the size probe and the read
/// cannot push the result past the cap.
///
/// # Errors
/// Propagates I/O errors from opening, stat-ing, seeking, or reading the file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let read_len = len.min(cap);
    if len > cap {
        // Checked conversion instead of a wrapping `as` cast for the negative seek offset.
        let back = i64::try_from(read_len).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "tail length does not fit in a seek offset",
            )
        })?;
        file.seek(SeekFrom::End(-back))?;
    }
    // `read_len <= max_bytes`, so it always fits in `usize`.
    let mut bytes = Vec::with_capacity(read_len as usize);
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2752
2753impl BgTask {
2754 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2755 let state = self
2756 .state
2757 .lock()
2758 .unwrap_or_else(|poison| poison.into_inner());
2759 self.snapshot_locked(&state, preview_bytes)
2760 }
2761
2762 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2763 let metadata = &state.metadata;
2764 let duration_ms = metadata.duration_ms.or_else(|| {
2765 metadata
2766 .status
2767 .is_terminal()
2768 .then(|| self.started.elapsed().as_millis() as u64)
2769 });
2770 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2771 (String::new(), false)
2772 } else {
2773 state.buffer.read_tail(preview_bytes)
2774 };
2775 BgTaskSnapshot {
2776 info: BgTaskInfo {
2777 task_id: self.task_id.clone(),
2778 status: metadata.status.clone(),
2779 command: metadata.command.clone(),
2780 mode: metadata.mode.clone(),
2781 started_at: metadata.started_at,
2782 duration_ms,
2783 },
2784 exit_code: metadata.exit_code,
2785 child_pid: metadata.child_pid,
2786 workdir: metadata.workdir.display().to_string(),
2787 output_preview,
2788 output_truncated,
2789 output_path: state
2790 .buffer
2791 .output_path()
2792 .map(|path| path.display().to_string()),
2793 stderr_path: state
2794 .buffer
2795 .stderr_path()
2796 .map(|path| path.display().to_string()),
2797 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
2798 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
2799 }
2800 }
2801
2802 pub(crate) fn is_running(&self) -> bool {
2803 self.state
2804 .lock()
2805 .map(|state| {
2806 state.metadata.status == BgTaskStatus::Running
2807 || (state.metadata.mode == BgMode::Pty
2808 && state.metadata.status == BgTaskStatus::Killing)
2809 })
2810 .unwrap_or(false)
2811 }
2812
2813 fn is_terminal(&self) -> bool {
2814 self.state
2815 .lock()
2816 .map(|state| state.metadata.status.is_terminal())
2817 .unwrap_or(false)
2818 }
2819
2820 fn mark_terminal_now(&self) {
2821 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2822 if terminal_at.is_none() {
2823 *terminal_at = Some(Instant::now());
2824 }
2825 }
2826 }
2827
2828 fn set_completion_delivered(
2829 &self,
2830 delivered: bool,
2831 registry: &BgTaskRegistry,
2832 ) -> Result<(), String> {
2833 let mut state = self
2834 .state
2835 .lock()
2836 .map_err(|_| "background task lock poisoned".to_string())?;
2837 let updated = registry
2838 .update_task_metadata(&self.paths, |metadata| {
2839 metadata.completion_delivered = delivered;
2840 })
2841 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2842 state.metadata = updated;
2843 Ok(())
2844 }
2845}
2846
2847fn terminal_metadata_from_marker(
2848 mut metadata: PersistedTask,
2849 marker: ExitMarker,
2850 reason: Option<String>,
2851) -> PersistedTask {
2852 match marker {
2853 ExitMarker::Code(code) => {
2854 let status = if code == 0 {
2855 BgTaskStatus::Completed
2856 } else {
2857 BgTaskStatus::Failed
2858 };
2859 metadata.mark_terminal(status, Some(code), reason);
2860 }
2861 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2862 }
2863 metadata
2864}
2865
2866#[cfg(unix)]
2867fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
2868 let shell = resolve_posix_shell();
2869 let mut cmd = Command::new(&shell);
2870 cmd.arg("-c")
2871 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
2872 .arg(&shell)
2873 .arg(command)
2874 .arg(exit_path);
2875 unsafe {
2876 cmd.pre_exec(|| {
2877 if libc::setsid() == -1 {
2878 return Err(std::io::Error::last_os_error());
2879 }
2880 Ok(())
2881 });
2882 }
2883 cmd
2884}
2885
2886#[cfg(unix)]
2887fn resolve_posix_shell() -> PathBuf {
2888 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2889 POSIX_SHELL
2890 .get_or_init(|| {
2891 std::env::var_os("BASH")
2892 .filter(|value| !value.is_empty())
2893 .map(PathBuf::from)
2894 .filter(|path| path.exists())
2895 .or_else(|| which::which("bash").ok())
2896 .or_else(|| which::which("zsh").ok())
2897 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2898 })
2899 .clone()
2900}
2901
#[cfg(windows)]
/// Writes a wrapper script for `shell` next to the task's metadata and returns a command
/// that runs it with `creation_flags`.
///
/// The script is `<json stem>.<ps1|bat|sh>` in `paths.dir` (stem defaults to `wrapper`).
/// PowerShell runs it via `-File` with profile and execution policy disabled, cmd.exe via
/// `/D /C`, and POSIX shells receive the script path directly.
///
/// # Errors
/// Returns a message if the wrapper script cannot be written.
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let script = shell.wrapper_script(command, exit_path);
    let extension = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{stem}.{extension}"));
    if let Err(e) = fs::write(&wrapper_path, script) {
        return Err(format!("failed to write background bash wrapper script: {e}"));
    }

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
        }
        WindowsShell::Cmd => {
            cmd.args(["/D", "/C"]);
        }
        WindowsShell::Posix(_) => {}
    }
    // Every shell takes the wrapper script as its final argument.
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
2984
/// Spawns `command` as a detached background child in `workdir` with `env` added.
///
/// stdin is null, and stdout/stderr are redirected into the task's capture files opened
/// via `create_capture_file`. `paths.exit` is handed to the shell wrapper; on Unix that
/// wrapper writes the exit code there.
///
/// On non-Windows targets the command is built by `detached_shell_command`. On Windows each
/// shell from `shell_candidates()` is tried in order, first with `CREATE_BREAKAWAY_FROM_JOB`
/// and then without it:
/// - a `NotFound` spawn error moves on to the next shell candidate;
/// - Access Denied (OS error 5) with breakaway retries the same shell without breakaway;
/// - any other spawn error is returned immediately.
///
/// # Errors
/// Returns a message when a capture file cannot be opened, the wrapper command cannot be
/// built, the spawn fails, or (on Windows) no candidate shell could be spawned.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process-creation flags (values as defined for `CreateProcess`).
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent recoverable failure, reported if every candidate is exhausted.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // The capture files are moved into `cmd` via `Stdio::from`, so every attempt
                // opens them afresh.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // The shell binary is missing at runtime: skip its remaining flag
                    // variants and move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 (Access Denied) while requesting breakaway: retry the same
                    // shell with the `without_breakaway` flags.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure is not retried.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
3133
3134fn random_slug() -> String {
3135 let mut bytes = [0u8; 4];
3136 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3138 let t = SystemTime::now()
3140 .duration_since(UNIX_EPOCH)
3141 .map(|d| d.subsec_nanos())
3142 .unwrap_or(0);
3143 let p = std::process::id();
3144 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3145 });
3146 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3148 format!("bash-{hex}")
3149}
3150
3151#[cfg(test)]
3152mod tests {
3153 use std::collections::HashMap;
3154 #[cfg(windows)]
3155 use std::fs;
3156 use std::sync::{Arc, Mutex};
3157 use std::time::Duration;
3158 #[cfg(windows)]
3159 use std::time::Instant;
3160
3161 use super::*;
3162
3163 #[cfg(unix)]
3164 const QUICK_SUCCESS_COMMAND: &str = "true";
3165 #[cfg(windows)]
3166 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3167
3168 #[cfg(unix)]
3169 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3170 #[cfg(windows)]
3171 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3172
3173 #[test]
3174 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
3175 let registry = BgTaskRegistry::default();
3176 let dir = tempfile::tempdir().unwrap();
3177 let task_id = registry
3178 .spawn_pty(
3179 QUICK_SUCCESS_COMMAND,
3180 "session".to_string(),
3181 dir.path().to_path_buf(),
3182 HashMap::new(),
3183 Some(Duration::from_secs(30)),
3184 dir.path().to_path_buf(),
3185 10,
3186 true,
3187 false,
3188 Some(dir.path().to_path_buf()),
3189 50,
3190 120,
3191 )
3192 .unwrap();
3193
3194 let paths = task_paths(dir.path(), "session", &task_id);
3195 let metadata = read_task(&paths.json).unwrap();
3196 assert_eq!(
3197 metadata.schema_version,
3198 crate::bash_background::persistence::SCHEMA_VERSION
3199 );
3200 assert_eq!(metadata.mode, BgMode::Pty);
3201 assert_eq!(metadata.pty_rows, Some(50));
3202 assert_eq!(metadata.pty_cols, Some(120));
3203
3204 let snapshot = registry
3205 .status(&task_id, "session", None, Some(dir.path()), 1024)
3206 .unwrap();
3207 assert_eq!(snapshot.pty_rows, Some(50));
3208 assert_eq!(snapshot.pty_cols, Some(120));
3209 }
3210
3211 fn spawn_dead_child() -> std::process::Child {
3216 #[cfg(unix)]
3217 let mut cmd = std::process::Command::new("true");
3218 #[cfg(windows)]
3219 let mut cmd = {
3220 let mut c = std::process::Command::new("cmd");
3221 c.args(["/c", "exit", "0"]);
3222 c
3223 };
3224 cmd.stdin(std::process::Stdio::null());
3225 cmd.stdout(std::process::Stdio::null());
3226 cmd.stderr(std::process::Stdio::null());
3227 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
3228 let started = Instant::now();
3237 loop {
3238 match child.try_wait() {
3239 Ok(Some(_)) => break,
3240 Ok(None) => {
3241 if started.elapsed() > Duration::from_secs(5) {
3242 panic!("dead-child stand-in did not exit within 5s");
3243 }
3244 std::thread::sleep(Duration::from_millis(10));
3245 }
3246 Err(error) => panic!("dead-child try_wait failed: {error}"),
3247 }
3248 }
3249 child
3250 }
3251
3252 #[test]
3253 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
3254 let registry = BgTaskRegistry::default();
3255 let dir = tempfile::tempdir().unwrap();
3256 let task_id = registry
3257 .spawn(
3258 LONG_RUNNING_COMMAND,
3259 "session".to_string(),
3260 dir.path().to_path_buf(),
3261 HashMap::new(),
3262 Some(Duration::from_secs(30)),
3263 dir.path().to_path_buf(),
3264 10,
3265 true,
3266 false,
3267 Some(dir.path().to_path_buf()),
3268 )
3269 .unwrap();
3270 registry
3271 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3272 .unwrap();
3273 assert_eq!(
3274 registry
3275 .drain_completions_for_session(Some("session"))
3276 .len(),
3277 1
3278 );
3279
3280 registry.inner.completions.lock().unwrap().clear();
3283
3284 assert_eq!(
3285 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3286 vec![task_id.clone()]
3287 );
3288 assert!(registry
3289 .drain_completions_for_session(Some("session"))
3290 .is_empty());
3291
3292 let paths = task_paths(dir.path(), "session", &task_id);
3293 let metadata = read_task(&paths.json).unwrap();
3294 assert!(metadata.completion_delivered);
3295
3296 let replayed = BgTaskRegistry::default();
3297 replayed
3298 .replay_session_inner(dir.path(), "session", None)
3299 .unwrap();
3300 assert!(replayed
3301 .drain_completions_for_session(Some("session"))
3302 .is_empty());
3303 }
3304
3305 #[test]
3306 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
3307 let registry = BgTaskRegistry::default();
3308 let dir = tempfile::tempdir().unwrap();
3309 let task_id = registry
3310 .spawn(
3311 QUICK_SUCCESS_COMMAND,
3312 "session".to_string(),
3313 dir.path().to_path_buf(),
3314 HashMap::new(),
3315 Some(Duration::from_secs(30)),
3316 dir.path().to_path_buf(),
3317 10,
3318 true,
3319 false,
3320 Some(dir.path().to_path_buf()),
3321 )
3322 .unwrap();
3323 registry
3324 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3325 .unwrap();
3326 let completions = registry.drain_completions_for_session(Some("session"));
3327 assert_eq!(completions.len(), 1);
3328 assert_eq!(
3329 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3330 vec![task_id.clone()]
3331 );
3332
3333 registry.cleanup_finished(Duration::ZERO);
3334
3335 assert!(registry.inner.tasks.lock().unwrap().is_empty());
3336 }
3337
3338 #[test]
3339 fn cleanup_finished_retains_undelivered_terminals() {
3340 let registry = BgTaskRegistry::default();
3341 let dir = tempfile::tempdir().unwrap();
3342 let task_id = registry
3343 .spawn(
3344 QUICK_SUCCESS_COMMAND,
3345 "session".to_string(),
3346 dir.path().to_path_buf(),
3347 HashMap::new(),
3348 Some(Duration::from_secs(30)),
3349 dir.path().to_path_buf(),
3350 10,
3351 true,
3352 false,
3353 Some(dir.path().to_path_buf()),
3354 )
3355 .unwrap();
3356 registry
3357 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3358 .unwrap();
3359
3360 registry.cleanup_finished(Duration::ZERO);
3361
3362 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
3363 }
3364
3365 #[test]
3373 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
3374 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3375 let dir = tempfile::tempdir().unwrap();
3376 let task_id = registry
3377 .spawn(
3378 QUICK_SUCCESS_COMMAND,
3379 "session".to_string(),
3380 dir.path().to_path_buf(),
3381 HashMap::new(),
3382 Some(Duration::from_secs(30)),
3383 dir.path().to_path_buf(),
3384 10,
3385 true,
3386 false,
3387 Some(dir.path().to_path_buf()),
3388 )
3389 .unwrap();
3390
3391 let task = registry.task_for_session(&task_id, "session").unwrap();
3392
3393 let started = Instant::now();
3398 loop {
3399 let exited = {
3400 let mut state = task.state.lock().unwrap();
3401 match &mut state.runtime {
3402 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
3403 _ => true,
3404 }
3405 };
3406 if exited {
3407 break;
3408 }
3409 assert!(
3410 started.elapsed() < Duration::from_secs(5),
3411 "child should exit quickly"
3412 );
3413 std::thread::sleep(Duration::from_millis(20));
3414 }
3415
3416 registry
3424 .inner
3425 .shutdown
3426 .store(true, std::sync::atomic::Ordering::SeqCst);
3427 std::thread::sleep(Duration::from_millis(550));
3431
3432 let _ = std::fs::remove_file(&task.paths.exit);
3435
3436 {
3451 let mut state = task.state.lock().unwrap();
3452 state.metadata.status = BgTaskStatus::Running;
3453 state.metadata.status_reason = None;
3454 state.metadata.exit_code = None;
3455 state.metadata.finished_at = None;
3456 state.metadata.duration_ms = None;
3457 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
3460 .expect("persist reset Running metadata for reap_child test");
3461 if matches!(state.runtime, TaskRuntime::Piped(None)) {
3465 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
3466 }
3467 }
3468 *task.terminal_at.lock().unwrap() = None;
3471
3472 assert!(
3475 task.is_running(),
3476 "precondition: metadata.status == Running"
3477 );
3478 assert!(
3479 !task.paths.exit.exists(),
3480 "precondition: exit marker absent"
3481 );
3482
3483 registry.reap_child(&task);
3488
3489 {
3490 let state = task.state.lock().unwrap();
3491 assert_eq!(
3492 state.metadata.status,
3493 BgTaskStatus::Running,
3494 "first reap must leave status Running while waiting one pass for marker"
3495 );
3496 assert_eq!(
3497 state.metadata.status_reason, None,
3498 "first reap must not record a failure reason"
3499 );
3500 assert!(
3501 matches!(state.runtime, TaskRuntime::Piped(None)),
3502 "child handle must be released after first reap"
3503 );
3504 assert!(
3505 state.detached,
3506 "task must be marked detached after first reap"
3507 );
3508 }
3509
3510 registry.reap_child(&task);
3514
3515 let state = task.state.lock().unwrap();
3516 assert!(
3517 state.metadata.status.is_terminal(),
3518 "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
3519 state.metadata.status
3520 );
3521 assert_eq!(
3522 state.metadata.status,
3523 BgTaskStatus::Failed,
3524 "must specifically be Failed (not Killed): status={:?}",
3525 state.metadata.status
3526 );
3527 assert_eq!(
3528 state.metadata.status_reason.as_deref(),
3529 Some("process exited without exit marker"),
3530 "reason must match replay path's wording: {:?}",
3531 state.metadata.status_reason
3532 );
3533 assert!(
3534 matches!(state.runtime, TaskRuntime::Piped(None)),
3535 "child handle must stay released after second reap"
3536 );
3537 assert!(
3538 state.detached,
3539 "task must remain detached after second reap"
3540 );
3541 }
3542
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    // When an exit marker is already on disk, `reap_child` must still release
    // the child handle and mark the task detached, but it must leave the
    // status `Running` so that `poll_task` performs the terminal transition
    // from the marker.
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            true,
            false,
            Some(dir.path().to_path_buf()),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Wait until the real child has exited AND its exit marker exists, so the
    // manual reset below starts from a settled state. A runtime that no
    // longer holds a piped child handle counts as already exited.
    let started = Instant::now();
    loop {
        let exited = {
            let mut state = task.state.lock().unwrap();
            match &mut state.runtime {
                TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                _ => true,
            }
        };
        if exited && task.paths.exit.exists() {
            break;
        }
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Signal shutdown so the registry's background work (presumably the
    // watcher thread) stops touching this task. The 550 ms sleep is assumed
    // to exceed that worker's polling interval — TODO confirm.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    {
        // Put the task back into a genuinely Running state. Clear every
        // terminal field, not just the status, in the same way as the
        // no-marker reap test. Otherwise leftover completion data from the
        // real run would make this a "Running but finished" hybrid.
        let mut state = task.state.lock().unwrap();
        state.metadata.status = BgTaskStatus::Running;
        state.metadata.status_reason = None;
        state.metadata.exit_code = None;
        state.metadata.finished_at = None;
        state.metadata.duration_ms = None;
        // Give `reap_child` an (already exited) handle to release if the real
        // one was reaped before shutdown.
        if matches!(state.runtime, TaskRuntime::Piped(None)) {
            state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    *task.terminal_at.lock().unwrap() = None;
    if !task.paths.exit.exists() {
        std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
    }

    // Preconditions that make the outcome below meaningful.
    assert!(
        task.is_running(),
        "precondition: metadata.status == Running"
    );
    assert!(
        task.paths.exit.exists(),
        "precondition: exit marker present"
    );

    registry.reap_child(&task);

    let state = task.state.lock().unwrap();
    assert!(
        matches!(state.runtime, TaskRuntime::Piped(None)),
        "child handle still released even when marker exists"
    );
    assert!(
        state.detached,
        "task still marked detached even when marker exists"
    );
    assert_eq!(
        state.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
3644
#[test]
fn cleanup_finished_keeps_running_tasks() {
    // Even with a zero grace period, `cleanup_finished` must not drop a task
    // that is still running.
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            true,
            false,
            Some(dir.path().to_path_buf()),
        )
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    // Sample the result first, then kill the long-running child, and only
    // then assert. Asserting first would skip the kill on failure and could
    // leave the process running after the test. The mutex guard is a
    // temporary, so it is released before `kill` runs.
    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    let _ = registry.kill(&task_id, "session");
    assert!(
        still_tracked,
        "cleanup_finished must not evict a running task"
    );
}
3669
/// Blocks until `path` exists and then returns its contents.
///
/// Polls every 100 ms. Panics if the file has not appeared within 30 s, or if
/// it cannot be read once it exists.
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
    let deadline = Duration::from_secs(30);
    let begun = Instant::now();
    while !path.exists() {
        assert!(
            begun.elapsed() < deadline,
            "timed out waiting for {}",
            path.display()
        );
        std::thread::sleep(Duration::from_millis(100));
    }
    fs::read_to_string(path).expect("read file")
}
3685
/// Spawns `command` as a background task in a fresh registry, rooted in a new
/// temporary directory.
///
/// Returns the registry, the `TempDir` guard (callers must keep it alive for
/// as long as the task needs its directory), and the new task id.
#[cfg(windows)]
fn spawn_windows_registry_command(
    command: &str,
) -> (BgTaskRegistry, tempfile::TempDir, String) {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let workdir = tempfile::tempdir().unwrap();
    // The temp dir is used for all three path arguments.
    let root = || workdir.path().to_path_buf();
    let task_id = registry
        .spawn(
            command,
            "session".to_string(),
            root(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root(),
            10,
            false,
            false,
            Some(root()),
        )
        .unwrap();
    (registry, workdir, task_id)
}
3708
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
    // `_dir` stays bound (not `_`) so the temp directory outlives the task.
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
    let marker = wait_for_file(&registry.task_exit_path(&task_id, "session").unwrap());
    // The marker may carry trailing whitespace, so compare trimmed.
    assert_eq!(marker.trim(), "0");
}
3719
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
    // `_dir` stays bound (not `_`) so the temp directory outlives the task.
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
    let marker = wait_for_file(&registry.task_exit_path(&task_id, "session").unwrap());
    // The recorded code must be the command's real exit status, not 0.
    assert_eq!(marker.trim(), "42");
}
3730
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
    let task = registry.task_for_session(&task_id, "session").unwrap();

    // The wrapper writes the exit marker only after the command has run, so
    // once it appears the stdout capture file should hold the output.
    let _ = wait_for_file(&task.paths.exit);
    let captured = fs::read_to_string(&task.paths.stdout).expect("read stdout");

    assert!(captured.contains("hello"), "stdout was {captured:?}");
}
3744
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
    // Pretend both PowerShell flavours are on PATH. PowerShell 7 (`pwsh`)
    // must then be ranked first.
    let candidates = crate::windows_shell::shell_candidates_with(
        |binary| match binary {
            "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
            "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
            _ => None,
        },
        || None,
    );
    let preferred = candidates.first().cloned().expect("at least one candidate");
    assert_eq!(preferred, crate::windows_shell::WindowsShell::Pwsh);
    assert_eq!(preferred.binary().as_ref(), "pwsh.exe");
}
3763
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
    let marker = Path::new(r"C:\Temp\bash-test.exit");
    let script =
        crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", marker);

    // Each fragment the generated cmd script must contain, paired with the
    // reason it is required.
    let required = [
        ("set CODE=%ERRORLEVEL%", "wrapper must capture exit code into CODE"),
        ("echo %CODE% >", "wrapper must echo CODE to a temp marker file"),
        ("move /Y", "wrapper must use atomic move to write the marker"),
        ("> nul", "wrapper must redirect move output to nul"),
        ("exit /B %CODE%", "wrapper must propagate the captured exit code"),
    ];
    for (fragment, reason) in required {
        assert!(script.contains(fragment), "{reason}: {script}");
    }

    // Both the temp marker and its final destination appear quoted.
    assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
    assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
}
3806
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
    use crate::windows_shell::WindowsShell;
    let command = WindowsShell::Cmd.bg_command("echo wrapped");
    // Non-UTF-8 arguments are dropped here; the exact-match assertion below
    // would still fail if any were present.
    let argv: Vec<&str> = command
        .get_args()
        .filter_map(std::ffi::OsStr::to_str)
        .collect();
    assert_eq!(
        argv,
        vec!["/D", "/S", "/C", "echo wrapped"],
        "Cmd::bg_command must prepend /D /S /C"
    );
}
3825
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
    use crate::windows_shell::WindowsShell;
    let command = WindowsShell::Pwsh.bg_command("Get-Date");
    let argv: Vec<&str> = command
        .get_args()
        .filter_map(std::ffi::OsStr::to_str)
        .collect();

    // Only membership is checked, so the exact position of `-Command`
    // relative to other flags is left to the implementation.
    assert!(
        argv.contains(&"-Command"),
        "Pwsh::bg_command must use -Command: {argv:?}"
    );
    assert!(
        argv.contains(&"Get-Date"),
        "Pwsh::bg_command must include the user command body"
    );
}
3845
// Permanently compiled out: `cfg(any())` has no predicates and never
// evaluates to true, so this item is stripped during configuration before
// any lints run. That also makes the `allow(dead_code)` redundant.
// NOTE(review): this looks like a placeholder for a disabled Windows
// cmd-wrapper exit-code test. Either restore it as an `#[ignore]`d test or
// delete it.
#[allow(dead_code)]
#[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
3879}