1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex, RwLock};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use rusqlite::Connection;
12use serde::Serialize;
13
14use crate::context::SharedProgressSender;
15use crate::harness::Harness;
16use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
17
18#[cfg(unix)]
19use std::os::unix::process::CommandExt;
20#[cfg(windows)]
21use std::os::windows::process::CommandExt;
22
23use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
24use super::persistence::{
25 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
26 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
27 PersistedTask, TaskPaths,
28};
29use super::process::is_process_alive;
30#[cfg(unix)]
31use super::process::terminate_pgid;
32#[cfg(windows)]
33use super::process::terminate_pid;
34use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied by `spawn` when the caller passes `None` (thirty minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(60 * 30);
/// Age (one day) after which persisted running metadata is considered stale.
/// NOTE(review): presumably consulted by `running_metadata_is_stale` — confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(60 * 60 * 24);
/// Task JSON modified within this window (one day) is skipped by persisted-bundle GC.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(60 * 60 * 24);
/// Grace period (thirty days) for quarantined metadata.
/// NOTE(review): presumably used by `gc_quarantine` — confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(60 * 60 * 24 * 30);

/// Byte budget for completion output previews.
/// NOTE(review): usage is outside this chunk — confirm.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// Per-stream cap (128 KiB) on bytes handed to the tokenizer.
/// NOTE(review): usage is outside this chunk — confirm.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 1024 * 128;
55
/// A completion notice for a background task, held in the registry's completion queue
/// until it is acknowledged via `ack_completions_for_session`.
///
/// `session_id` is used for in-memory filtering only and is never serialized. The
/// optional fields are omitted from serialized output when empty / `false` / `None`.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Id of the task this completion describes.
    pub task_id: String,
    /// Owning session; completions are filtered and acknowledged per session.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Task status recorded with this completion.
    pub status: BgTaskStatus,
    /// Exit code of the task, if one is known.
    pub exit_code: Option<i32>,
    /// Command line the task ran.
    pub command: String,
    /// Preview of the task output.
    /// NOTE(review): presumably bounded by `BG_COMPLETION_PREVIEW_BYTES` — confirm.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Presumably set when `output_preview` was cut short — confirm against the producer.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count of the output before compression (presumably) — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count of the output after compression (presumably) — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Set when token counting was skipped.
    /// NOTE(review): possibly tied to `TOKENIZE_CAP_BYTES_PER_STREAM` — confirm.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
91
/// Serde `skip_serializing_if` predicate: a `bool` field is skipped when it is `false`.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
95
/// Point-in-time view of a background task, returned by `status`, `list`, and the kill
/// methods of `BgTaskRegistry`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; its fields are serialized at the top level (`#[serde(flatten)]`).
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, if known.
    pub exit_code: Option<i32>,
    /// Pid of the spawned child, if recorded.
    pub child_pid: Option<u32>,
    /// Working directory, presumably the one the command was started in — confirm.
    pub workdir: String,
    /// Output preview. For terminal tasks with `compressed` set, `maybe_compress_snapshot`
    /// replaces it with the compressor's output.
    pub output_preview: String,
    /// Presumably set when the preview was cut short — confirm against `BgTask::snapshot`.
    pub output_truncated: bool,
    /// Path of the captured stdout file, when available (presumably) — confirm.
    pub output_path: Option<String>,
    /// Path of the captured stderr file, when available (presumably) — confirm.
    pub stderr_path: Option<String>,
}
108
/// Handle to the background bash task registry.
///
/// Cloning is cheap: every clone shares the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
113
/// Shared state behind a `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Registered tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions waiting to be acknowledged by their session.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Channel for progress/push frames. NOTE(review): usage is outside this chunk.
    pub(crate) progress_sender: SharedProgressSender,
    /// Presumably guards a one-time watchdog start in `start_watchdog` — confirm.
    watchdog_started: AtomicBool,
    /// Set by `detach`.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are sent (defaults to `true`).
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Reminder interval in milliseconds (defaults to 600_000).
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Flipped by the first replay so persisted-bundle GC runs at most once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter of `maybe_gc_persisted` invocations.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional output compressor, called as `(command, output) -> compressed output`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional DB connection used to mirror and replay task metadata.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name that DB rows are scoped to.
    pub(crate) db_harness: RwLock<Option<String>>,
}
134
/// One registered background task.
pub(crate) struct BgTask {
    /// Unique task id (the registry map key).
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task bundle (metadata JSON, capture files, exit marker).
    pub(crate) paths: TaskPaths,
    /// When this in-memory entry was created (`spawn` sets it to `Instant::now()`).
    pub(crate) started: Instant,
    /// Time of the last long-running reminder, if any (presumably) — confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// When the task became terminal; `cleanup_finished` uses it to decide expiry.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state.
    pub(crate) state: Mutex<BgTaskState>,
}
144
/// Mutable part of a `BgTask`, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task metadata.
    pub(crate) metadata: PersistedTask,
    /// Child handle while this process still owns it; cleared once the child is reaped
    /// or the registry detaches.
    pub(crate) child: Option<Child>,
    /// Set when the child handle has been dropped (reaped or detached); liveness is then
    /// judged from the recorded pid.
    pub(crate) detached: bool,
    /// Reader over the stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
151
152impl BgTaskRegistry {
153 pub fn new(progress_sender: SharedProgressSender) -> Self {
154 Self {
155 inner: Arc::new(RegistryInner {
156 tasks: Mutex::new(HashMap::new()),
157 completions: Mutex::new(VecDeque::new()),
158 progress_sender,
159 watchdog_started: AtomicBool::new(false),
160 shutdown: AtomicBool::new(false),
161 long_running_reminder_enabled: AtomicBool::new(true),
162 long_running_reminder_interval_ms: AtomicU64::new(600_000),
163 persisted_gc_started: AtomicBool::new(false),
164 #[cfg(test)]
165 persisted_gc_runs: AtomicU64::new(0),
166 compressor: Mutex::new(None),
167 db_pool: RwLock::new(None),
168 db_harness: RwLock::new(None),
169 }),
170 }
171 }
172
173 pub fn set_harness(&self, harness: Harness) {
174 if let Ok(mut slot) = self.inner.db_harness.write() {
175 *slot = Some(harness.as_str().to_string());
176 }
177 }
178
179 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
180 if let Ok(mut slot) = self.inner.db_pool.write() {
181 *slot = Some(conn);
182 }
183 }
184
185 pub fn clear_db_pool(&self) {
186 if let Ok(mut slot) = self.inner.db_pool.write() {
187 *slot = None;
188 }
189 }
190
191 pub fn set_compressor<F>(&self, compressor: F)
196 where
197 F: Fn(&str, String) -> String + Send + Sync + 'static,
198 {
199 if let Ok(mut slot) = self.inner.compressor.lock() {
200 *slot = Some(Box::new(compressor));
201 }
202 }
203
204 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
207 let Ok(slot) = self.inner.compressor.lock() else {
208 return output;
209 };
210 match slot.as_ref() {
211 Some(compressor) => compressor(command, output),
212 None => output,
213 }
214 }
215
216 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
217 write_task(&paths.json, metadata)?;
218 self.dual_write_task(paths, metadata);
219 Ok(())
220 }
221
222 fn update_task_metadata<F>(
223 &self,
224 paths: &TaskPaths,
225 update: F,
226 ) -> std::io::Result<PersistedTask>
227 where
228 F: FnOnce(&mut PersistedTask),
229 {
230 let metadata = update_task(&paths.json, update)?;
231 self.dual_write_task(paths, &metadata);
232 Ok(metadata)
233 }
234
235 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
236 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
237 let Some(pool) = pool else {
238 return;
239 };
240 let harness = self
241 .inner
242 .db_harness
243 .read()
244 .ok()
245 .and_then(|slot| slot.clone());
246 let Some(harness) = harness else {
247 crate::slog_warn!(
248 "dual-write bash_task to DB skipped for {}: harness not configured",
249 metadata.task_id
250 );
251 return;
252 };
253 let row = match metadata.to_bash_task_row(&harness, paths) {
254 Ok(row) => row,
255 Err(error) => {
256 crate::slog_warn!(
257 "dual-write bash_task to DB failed for {}: {}",
258 metadata.task_id,
259 error
260 );
261 return;
262 }
263 };
264 let conn = match pool.lock() {
265 Ok(conn) => conn,
266 Err(_) => {
267 crate::slog_warn!(
268 "dual-write bash_task to DB failed for {}: db mutex poisoned",
269 metadata.task_id
270 );
271 return;
272 }
273 };
274 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
275 crate::slog_warn!(
276 "dual-write bash_task to DB failed for {}: {}",
277 metadata.task_id,
278 error
279 );
280 }
281 }
282
283 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
284 self.inner
285 .long_running_reminder_enabled
286 .store(enabled, Ordering::SeqCst);
287 self.inner
288 .long_running_reminder_interval_ms
289 .store(interval_ms, Ordering::SeqCst);
290 }
291
292 #[cfg(unix)]
293 #[allow(clippy::too_many_arguments)]
294 pub fn spawn(
295 &self,
296 command: &str,
297 session_id: String,
298 workdir: PathBuf,
299 env: HashMap<String, String>,
300 timeout: Option<Duration>,
301 storage_dir: PathBuf,
302 max_running: usize,
303 notify_on_completion: bool,
304 compressed: bool,
305 project_root: Option<PathBuf>,
306 ) -> Result<String, String> {
307 self.start_watchdog();
308
309 let running = self.running_count();
310 if running >= max_running {
311 return Err(format!(
312 "background bash task limit exceeded: {running} running (max {max_running})"
313 ));
314 }
315
316 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
317 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
318 let task_id = self.generate_unique_task_id()?;
319 let paths = task_paths(&storage_dir, &session_id, &task_id);
320 fs::create_dir_all(&paths.dir)
321 .map_err(|e| format!("failed to create background task dir: {e}"))?;
322
323 let mut metadata = PersistedTask::starting(
324 task_id.clone(),
325 session_id.clone(),
326 command.to_string(),
327 workdir.clone(),
328 project_root,
329 timeout_ms,
330 notify_on_completion,
331 compressed,
332 );
333 self.persist_task(&paths, &metadata)
334 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
335
336 create_capture_file(&paths.stdout)
340 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
341 create_capture_file(&paths.stderr)
342 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
343
344 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
345 Ok(child) => child,
346 Err(error) => {
347 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
348 let _ = delete_task_bundle(&paths);
349 return Err(error);
350 }
351 };
352
353 let child_pid = child.id();
354 metadata.mark_running(child_pid, child_pid as i32);
355 self.persist_task(&paths, &metadata)
356 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
357
358 let task = Arc::new(BgTask {
359 task_id: task_id.clone(),
360 session_id,
361 paths: paths.clone(),
362 started: Instant::now(),
363 last_reminder_at: Mutex::new(None),
364 terminal_at: Mutex::new(None),
365 state: Mutex::new(BgTaskState {
366 metadata,
367 child: Some(child),
368 detached: false,
369 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
370 }),
371 });
372
373 self.inner
374 .tasks
375 .lock()
376 .map_err(|_| "background task registry lock poisoned".to_string())?
377 .insert(task_id.clone(), task);
378
379 Ok(task_id)
380 }
381
382 #[cfg(windows)]
383 #[allow(clippy::too_many_arguments)]
384 pub fn spawn(
385 &self,
386 command: &str,
387 session_id: String,
388 workdir: PathBuf,
389 env: HashMap<String, String>,
390 timeout: Option<Duration>,
391 storage_dir: PathBuf,
392 max_running: usize,
393 notify_on_completion: bool,
394 compressed: bool,
395 project_root: Option<PathBuf>,
396 ) -> Result<String, String> {
397 self.start_watchdog();
398
399 let running = self.running_count();
400 if running >= max_running {
401 return Err(format!(
402 "background bash task limit exceeded: {running} running (max {max_running})"
403 ));
404 }
405
406 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
407 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
408 let task_id = self.generate_unique_task_id()?;
409 let paths = task_paths(&storage_dir, &session_id, &task_id);
410 fs::create_dir_all(&paths.dir)
411 .map_err(|e| format!("failed to create background task dir: {e}"))?;
412
413 let mut metadata = PersistedTask::starting(
414 task_id.clone(),
415 session_id.clone(),
416 command.to_string(),
417 workdir.clone(),
418 project_root,
419 timeout_ms,
420 notify_on_completion,
421 compressed,
422 );
423 self.persist_task(&paths, &metadata)
424 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
425
426 create_capture_file(&paths.stdout)
432 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
433 create_capture_file(&paths.stderr)
434 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
435
436 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
437 Ok(child) => child,
438 Err(error) => {
439 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
440 let _ = delete_task_bundle(&paths);
441 return Err(error);
442 }
443 };
444
445 let child_pid = child.id();
446 metadata.status = BgTaskStatus::Running;
447 metadata.child_pid = Some(child_pid);
448 metadata.pgid = None;
449 self.persist_task(&paths, &metadata)
450 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
451
452 let task = Arc::new(BgTask {
453 task_id: task_id.clone(),
454 session_id,
455 paths: paths.clone(),
456 started: Instant::now(),
457 last_reminder_at: Mutex::new(None),
458 terminal_at: Mutex::new(None),
459 state: Mutex::new(BgTaskState {
460 metadata,
461 child: Some(child),
462 detached: false,
463 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
464 }),
465 });
466
467 self.inner
468 .tasks
469 .lock()
470 .map_err(|_| "background task registry lock poisoned".to_string())?
471 .insert(task_id.clone(), task);
472
473 Ok(task_id)
474 }
475
476 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
477 self.replay_session_inner(storage_dir, session_id, None)
478 }
479
480 pub fn replay_session_for_project(
481 &self,
482 storage_dir: &Path,
483 session_id: &str,
484 project_root: &Path,
485 ) -> Result<(), String> {
486 self.replay_session_inner(storage_dir, session_id, Some(project_root))
487 }
488
    /// Rehydrates persisted tasks of `session_id` into the registry and reconciles their
    /// persisted status with what can be observed now.
    ///
    /// Also starts the watchdog. The first replay on this registry additionally GCs persisted
    /// bundles under `storage_dir`.
    ///
    /// Metadata comes from the DB when it is configured and returns at least one row.
    /// Otherwise it is read from disk: DB unavailable, lookup error, or empty result.
    /// When `project_root` is given, only tasks with a matching canonicalized project root
    /// are replayed.
    ///
    /// # Errors
    /// Propagates errors from the disk fallback and from `insert_rehydrated_task`.
    fn replay_session_inner(
        &self,
        storage_dir: &Path,
        session_id: &str,
        project_root: Option<&Path>,
    ) -> Result<(), String> {
        self.start_watchdog();
        // `swap` makes persisted-bundle GC run at most once per registry.
        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
            }
        }

        let canonical_project = project_root.map(canonicalized_path);
        // Prefer the DB copy; an empty DB result is treated as a miss, not as "no tasks".
        let tasks = match self.replay_session_from_db(session_id) {
            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
            Some(Ok(_)) => {
                crate::slog_info!(
                    "bash task replay DB miss for session {}; falling back to disk",
                    session_id
                );
                self.replay_session_from_disk(storage_dir, session_id)?
            }
            Some(Err(error)) => {
                crate::slog_warn!(
                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
                    session_id,
                    error
                );
                self.replay_session_from_disk(storage_dir, session_id)?
            }
            None => {
                crate::slog_info!(
                    "bash task replay DB unavailable for session {}; falling back to disk",
                    session_id
                );
                self.replay_session_from_disk(storage_dir, session_id)?
            }
        };

        for mut metadata in tasks {
            // Defensive filters: wrong session, or a different project when one was requested.
            if metadata.session_id != session_id {
                continue;
            }
            if let Some(canonical_project) = canonical_project.as_deref() {
                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
                if metadata_project.as_deref() != Some(canonical_project) {
                    continue;
                }
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            match metadata.status {
                // Never reached the point of recording a running child: mark it failed.
                BgTaskStatus::Starting => {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                BgTaskStatus::Running | BgTaskStatus::Killing => {
                    if self.running_metadata_is_stale(&metadata) {
                        // Too old to trust: record it as killed (writing a kill marker if none exists).
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // An exit marker wins over the persisted status, even a `Killing` one.
                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                            "recovered from inconsistent killing state on replay".to_string()
                        });
                        if reason.is_some() {
                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
                                metadata.task_id);
                        }
                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.status == BgTaskStatus::Killing {
                        // Kill was in flight but no marker exists: finish it as killed.
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("recovered from inconsistent killing state on replay".to_string()),
                        );
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        // Recorded pid is gone and left no exit marker.
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else {
                        // Apparently still running: track it again without changing its status.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                // Already terminal: re-queue the completion if it still needs delivering.
                _ if metadata.status.is_terminal() => {
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                // Any other non-terminal status is skipped.
                _ => {}
            }
        }

        Ok(())
    }
617
618 fn replay_session_from_db(
619 &self,
620 session_id: &str,
621 ) -> Option<Result<Vec<PersistedTask>, String>> {
622 let pool = self
623 .inner
624 .db_pool
625 .read()
626 .ok()
627 .and_then(|slot| slot.clone())?;
628 let harness = self
629 .inner
630 .db_harness
631 .read()
632 .ok()
633 .and_then(|slot| slot.clone())?;
634 let conn = match pool.lock() {
635 Ok(conn) => conn,
636 Err(_) => return Some(Err("db mutex poisoned".to_string())),
637 };
638 Some(
639 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
640 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
641 .map_err(|error| error.to_string()),
642 )
643 }
644
645 fn replay_session_from_disk(
646 &self,
647 storage_dir: &Path,
648 session_id: &str,
649 ) -> Result<Vec<PersistedTask>, String> {
650 let dir = session_tasks_dir(storage_dir, session_id);
651 if !dir.exists() {
652 return Ok(Vec::new());
653 }
654
655 let entries = fs::read_dir(&dir)
656 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
657 let mut tasks = Vec::new();
658 for entry in entries.flatten() {
659 let path = entry.path();
660 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
661 continue;
662 }
663 match read_task(&path) {
664 Ok(metadata) => tasks.push(metadata),
665 Err(error) => {
666 crate::slog_warn!(
667 "quarantining invalid background task metadata {} during replay: {error}",
668 path.display()
669 );
670 if let Err(quarantine_error) =
671 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
672 {
673 crate::slog_warn!(
674 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
675 path.display()
676 );
677 }
678 }
679 }
680 }
681 Ok(tasks)
682 }
683
684 pub fn status(
685 &self,
686 task_id: &str,
687 session_id: &str,
688 project_root: Option<&Path>,
689 storage_dir: Option<&Path>,
690 preview_bytes: usize,
691 ) -> Option<BgTaskSnapshot> {
692 let mut task = self.task_for_session(task_id, session_id);
693 if task.is_none() {
694 if let Some(storage_dir) = storage_dir {
695 let _ = self.replay_session(storage_dir, session_id);
696 task = self.task_for_session(task_id, session_id);
697 }
698 }
699 let Some(task) = task else {
700 return self.status_relaxed(
701 task_id,
702 session_id,
703 project_root?,
704 storage_dir?,
705 preview_bytes,
706 );
707 };
708 let _ = self.poll_task(&task);
709 let mut snapshot = task.snapshot(preview_bytes);
710 self.maybe_compress_snapshot(&task, &mut snapshot);
711 Some(snapshot)
712 }
713
714 fn status_relaxed_task(
715 &self,
716 task_id: &str,
717 project_root: &Path,
718 storage_dir: &Path,
719 ) -> Option<Arc<BgTask>> {
720 let canonical_project = canonicalized_path(project_root);
721 match self.lookup_relaxed_task_from_db(task_id, project_root) {
722 Some(Ok(Some(metadata))) => {
723 if let Some(task) = self.task(task_id) {
724 let matches_project = task
725 .state
726 .lock()
727 .map(|state| {
728 state
729 .metadata
730 .project_root
731 .as_deref()
732 .map(canonicalized_path)
733 .as_deref()
734 == Some(canonical_project.as_path())
735 })
736 .unwrap_or(false);
737 return matches_project.then_some(task);
738 }
739 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
740 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
741 return None;
742 }
743 return self.task(task_id);
744 }
745 Some(Ok(None)) => {
746 crate::slog_info!(
747 "bash task relaxed DB miss for {}; falling back to disk",
748 task_id
749 );
750 }
751 Some(Err(error)) => {
752 crate::slog_warn!(
753 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
754 task_id,
755 error
756 );
757 }
758 None => {
759 crate::slog_info!(
760 "bash task relaxed DB unavailable for {}; falling back to disk",
761 task_id
762 );
763 }
764 }
765 let root = storage_dir.join("bash-tasks");
766 let entries = fs::read_dir(&root).ok()?;
767 for entry in entries.flatten() {
768 let dir = entry.path();
769 if !dir.is_dir() {
770 continue;
771 }
772 let path = dir.join(format!("{task_id}.json"));
773 if !path.exists() {
774 continue;
775 }
776 let metadata = match read_task(&path) {
777 Ok(metadata) => metadata,
778 Err(error) => {
779 crate::slog_warn!(
780 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
781 path.display()
782 );
783 if let Err(quarantine_error) =
784 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
785 {
786 crate::slog_warn!(
787 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
788 path.display()
789 );
790 }
791 continue;
792 }
793 };
794 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
795 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
796 continue;
797 }
798 if let Some(task) = self.task(task_id) {
799 let matches_project = task
800 .state
801 .lock()
802 .map(|state| {
803 state
804 .metadata
805 .project_root
806 .as_deref()
807 .map(canonicalized_path)
808 .as_deref()
809 == Some(canonical_project.as_path())
810 })
811 .unwrap_or(false);
812 return matches_project.then_some(task);
813 }
814 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
815 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
816 return None;
817 }
818 return self.task(task_id);
819 }
820 None
821 }
822
823 fn lookup_relaxed_task_from_db(
824 &self,
825 task_id: &str,
826 project_root: &Path,
827 ) -> Option<Result<Option<PersistedTask>, String>> {
828 let pool = self
829 .inner
830 .db_pool
831 .read()
832 .ok()
833 .and_then(|slot| slot.clone())?;
834 let harness = self
835 .inner
836 .db_harness
837 .read()
838 .ok()
839 .and_then(|slot| slot.clone())?;
840 let conn = match pool.lock() {
841 Ok(conn) => conn,
842 Err(_) => return Some(Err("db mutex poisoned".to_string())),
843 };
844 let project_key = crate::search_index::project_cache_key(project_root);
845 Some(
846 crate::db::bash_tasks::find_bash_task_for_project(
847 &conn,
848 &harness,
849 &project_key,
850 task_id,
851 )
852 .map(|row| row.map(PersistedTask::from))
853 .map_err(|error| error.to_string()),
854 )
855 }
856
857 pub(super) fn status_relaxed(
858 &self,
859 task_id: &str,
860 _session_id: &str,
861 project_root: &Path,
862 storage_dir: &Path,
863 preview_bytes: usize,
864 ) -> Option<BgTaskSnapshot> {
865 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
866 let _ = self.poll_task(&task);
867 let mut snapshot = task.snapshot(preview_bytes);
868 self.maybe_compress_snapshot(&task, &mut snapshot);
869 Some(snapshot)
870 }
871
872 pub fn kill_relaxed(
873 &self,
874 task_id: &str,
875 project_root: &Path,
876 storage_dir: &Path,
877 ) -> Result<BgTaskSnapshot, String> {
878 let task = self
879 .status_relaxed_task(task_id, project_root, storage_dir)
880 .ok_or_else(|| format!("background task not found: {task_id}"))?;
881 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
882 }
883
884 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
885 #[cfg(test)]
886 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
887
888 let mut deleted = 0usize;
889
890 let root = storage_dir.join("bash-tasks");
891 if root.exists() {
892 let session_dirs = fs::read_dir(&root).map_err(|e| {
893 format!(
894 "failed to read background task root {}: {e}",
895 root.display()
896 )
897 })?;
898 for session_entry in session_dirs.flatten() {
899 let session_dir = session_entry.path();
900 if !session_dir.is_dir() {
901 continue;
902 }
903 let task_entries = match fs::read_dir(&session_dir) {
904 Ok(entries) => entries,
905 Err(error) => {
906 crate::slog_warn!(
907 "failed to read background task session dir {}: {error}",
908 session_dir.display()
909 );
910 continue;
911 }
912 };
913 for task_entry in task_entries.flatten() {
914 let json_path = task_entry.path();
915 if json_path
916 .extension()
917 .and_then(|extension| extension.to_str())
918 != Some("json")
919 {
920 continue;
921 }
922 if modified_within(&json_path, PERSISTED_GC_GRACE) {
923 continue;
924 }
925 let metadata = match read_task(&json_path) {
926 Ok(metadata) => metadata,
927 Err(error) => {
928 crate::slog_warn!(
929 "quarantining corrupt background task metadata {}: {error}",
930 json_path.display()
931 );
932 quarantine_task_json(
933 storage_dir,
934 &session_dir,
935 &json_path,
936 QuarantineKind::Corrupt,
937 )?;
938 continue;
939 }
940 };
941 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
942 continue;
943 }
944 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
945 match delete_task_bundle(&paths) {
946 Ok(()) => {
947 deleted += 1;
948 log::debug!(
949 "deleted persisted background task bundle {}",
950 metadata.task_id
951 );
952 }
953 Err(error) => {
954 crate::slog_warn!(
955 "failed to delete background task bundle {}: {error}",
956 metadata.task_id
957 );
958 continue;
959 }
960 }
961 }
962 }
963 }
964 gc_quarantine(storage_dir);
965 Ok(deleted)
966 }
967
968 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
969 let tasks = self
970 .inner
971 .tasks
972 .lock()
973 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
974 .unwrap_or_default();
975 tasks
976 .into_iter()
977 .map(|task| {
978 let _ = self.poll_task(&task);
979 let mut snapshot = task.snapshot(preview_bytes);
980 self.maybe_compress_snapshot(&task, &mut snapshot);
981 snapshot
982 })
983 .collect()
984 }
985
986 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
992 if !snapshot.info.status.is_terminal() {
993 return;
994 }
995 let compressed_flag = task
996 .state
997 .lock()
998 .map(|state| state.metadata.compressed)
999 .unwrap_or(true);
1000 if !compressed_flag {
1001 return;
1002 }
1003 let raw = std::mem::take(&mut snapshot.output_preview);
1004 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1005 }
1006
1007 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1008 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1009 }
1010
1011 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1012 let task = self
1013 .task_for_session(task_id, session_id)
1014 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1015 let mut state = task
1016 .state
1017 .lock()
1018 .map_err(|_| "background task lock poisoned".to_string())?;
1019 let updated = self
1020 .update_task_metadata(&task.paths, |metadata| {
1021 metadata.notify_on_completion = true;
1022 metadata.completion_delivered = false;
1023 })
1024 .map_err(|e| format!("failed to promote background task: {e}"))?;
1025 state.metadata = updated;
1026 if state.metadata.status.is_terminal() {
1027 state.buffer.enforce_terminal_cap();
1028 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1029 }
1030 Ok(true)
1031 }
1032
1033 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1034 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1035 .map(|_| ())
1036 }
1037
1038 pub fn cleanup_finished(&self, older_than: Duration) {
1039 let cutoff = Instant::now().checked_sub(older_than);
1040 let removable_paths: Vec<(String, TaskPaths)> =
1041 if let Ok(mut tasks) = self.inner.tasks.lock() {
1042 let removable = tasks
1043 .iter()
1044 .filter_map(|(task_id, task)| {
1045 let delivered_terminal = task
1046 .state
1047 .lock()
1048 .map(|state| {
1049 state.metadata.status.is_terminal()
1050 && state.metadata.completion_delivered
1051 })
1052 .unwrap_or(false);
1053 if !delivered_terminal {
1054 return None;
1055 }
1056
1057 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1058 let expired = match (terminal_at, cutoff) {
1059 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1060 (Some(_), None) => true,
1061 (None, _) => false,
1062 };
1063 expired.then(|| task_id.clone())
1064 })
1065 .collect::<Vec<_>>();
1066
1067 removable
1068 .into_iter()
1069 .filter_map(|task_id| {
1070 tasks
1071 .remove(&task_id)
1072 .map(|task| (task_id, task.paths.clone()))
1073 })
1074 .collect()
1075 } else {
1076 Vec::new()
1077 };
1078
1079 for (task_id, paths) in removable_paths {
1080 match delete_task_bundle(&paths) {
1081 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1082 Err(error) => crate::slog_warn!(
1083 "failed to delete persisted background task bundle {task_id}: {error}"
1084 ),
1085 }
1086 }
1087 }
1088
1089 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1090 self.drain_completions_for_session(None)
1091 }
1092
1093 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1094 let completions = match self.inner.completions.lock() {
1095 Ok(completions) => completions,
1096 Err(_) => return Vec::new(),
1097 };
1098
1099 completions
1100 .iter()
1101 .filter(|completion| {
1102 session_id
1103 .map(|session_id| completion.session_id == session_id)
1104 .unwrap_or(true)
1105 })
1106 .cloned()
1107 .collect()
1108 }
1109
1110 pub fn ack_completions_for_session(
1111 &self,
1112 session_id: Option<&str>,
1113 task_ids: &[String],
1114 ) -> Vec<String> {
1115 if task_ids.is_empty() {
1116 return Vec::new();
1117 }
1118 let task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1119 let mut completions = match self.inner.completions.lock() {
1120 Ok(completions) => completions,
1121 Err(_) => return Vec::new(),
1122 };
1123 let mut acked = Vec::new();
1124 completions.retain(|completion| {
1125 let session_matches = session_id
1126 .map(|session_id| completion.session_id == session_id)
1127 .unwrap_or(true);
1128 if session_matches && task_ids.contains(completion.task_id.as_str()) {
1129 acked.push((completion.task_id.clone(), completion.session_id.clone()));
1130 false
1131 } else {
1132 true
1133 }
1134 });
1135 drop(completions);
1136
1137 let mut delivered = Vec::new();
1138 for (task_id, completion_session_id) in acked {
1139 if let Some(task) = self.task_for_session(&task_id, &completion_session_id) {
1140 if task.set_completion_delivered(true, self).is_ok() {
1141 delivered.push(task_id);
1142 }
1143 }
1144 }
1145
1146 delivered
1147 }
1148
1149 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1150 self.inner
1151 .completions
1152 .lock()
1153 .map(|completions| {
1154 completions
1155 .iter()
1156 .filter(|completion| completion.session_id == session_id)
1157 .cloned()
1158 .collect()
1159 })
1160 .unwrap_or_default()
1161 }
1162
1163 pub fn detach(&self) {
1164 self.inner.shutdown.store(true, Ordering::SeqCst);
1165 if let Ok(mut tasks) = self.inner.tasks.lock() {
1166 for task in tasks.values() {
1167 if let Ok(mut state) = task.state.lock() {
1168 state.child = None;
1169 state.detached = true;
1170 }
1171 }
1172 tasks.clear();
1173 }
1174 }
1175
1176 pub fn shutdown(&self) {
1177 let tasks = self
1178 .inner
1179 .tasks
1180 .lock()
1181 .map(|tasks| {
1182 tasks
1183 .values()
1184 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1185 .collect::<Vec<_>>()
1186 })
1187 .unwrap_or_default();
1188 for (task_id, session_id) in tasks {
1189 let _ = self.kill(&task_id, &session_id);
1190 }
1191 }
1192
1193 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1194 let marker = match read_exit_marker(&task.paths.exit) {
1195 Ok(Some(marker)) => marker,
1196 Ok(None) => return Ok(()),
1197 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1198 };
1199 self.finalize_from_marker(task, marker, None)
1200 }
1201
1202 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1203 let Ok(mut state) = task.state.lock() else {
1204 return;
1205 };
1206 if let Some(child) = state.child.as_mut() {
1207 if matches!(child.try_wait(), Ok(Some(_))) {
1208 state.child = None;
1225 state.detached = true;
1226 self.fail_without_exit_marker_if_needed(task, &mut state);
1227 }
1228 } else if state.detached
1229 && state
1230 .metadata
1231 .child_pid
1232 .is_some_and(|pid| !is_process_alive(pid))
1233 {
1234 self.fail_without_exit_marker_if_needed(task, &mut state);
1235 }
1236 }
1237
1238 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1239 if state.metadata.status.is_terminal() {
1240 return;
1241 }
1242 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1243 return;
1244 }
1245 let updated = self.update_task_metadata(&task.paths, |metadata| {
1246 metadata.mark_terminal(
1247 BgTaskStatus::Failed,
1248 None,
1249 Some("process exited without exit marker".to_string()),
1250 );
1251 });
1252 if let Ok(metadata) = updated {
1253 state.metadata = metadata;
1254 task.mark_terminal_now();
1255 state.buffer.enforce_terminal_cap();
1256 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1257 }
1258 }
1259
1260 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1261 self.inner
1262 .tasks
1263 .lock()
1264 .map(|tasks| {
1265 tasks
1266 .values()
1267 .filter(|task| task.is_running())
1268 .cloned()
1269 .collect()
1270 })
1271 .unwrap_or_default()
1272 }
1273
1274 fn insert_rehydrated_task(
1275 &self,
1276 metadata: PersistedTask,
1277 paths: TaskPaths,
1278 detached: bool,
1279 ) -> Result<(), String> {
1280 let task_id = metadata.task_id.clone();
1281 let session_id = metadata.session_id.clone();
1282 let started = started_instant_from_unix_millis(metadata.started_at);
1283 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1284 let task = Arc::new(BgTask {
1285 task_id: task_id.clone(),
1286 session_id,
1287 paths: paths.clone(),
1288 started,
1289 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1290 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1291 state: Mutex::new(BgTaskState {
1292 metadata,
1293 child: None,
1294 detached,
1295 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
1296 }),
1297 });
1298 self.inner
1299 .tasks
1300 .lock()
1301 .map_err(|_| "background task registry lock poisoned".to_string())?
1302 .insert(task_id, task);
1303 Ok(())
1304 }
1305
1306 fn kill_with_status(
1307 &self,
1308 task_id: &str,
1309 session_id: &str,
1310 terminal_status: BgTaskStatus,
1311 ) -> Result<BgTaskSnapshot, String> {
1312 let task = self
1313 .task_for_session(task_id, session_id)
1314 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1315
1316 {
1317 let mut state = task
1318 .state
1319 .lock()
1320 .map_err(|_| "background task lock poisoned".to_string())?;
1321 if state.metadata.status.is_terminal() {
1322 return Ok(task.snapshot_locked(&state, 5 * 1024));
1323 }
1324
1325 if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1326 state.metadata =
1327 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1328 task.mark_terminal_now();
1329 state.child = None;
1330 state.detached = true;
1331 state.buffer.enforce_terminal_cap();
1332 self.persist_task(&task.paths, &state.metadata)
1333 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1334 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1335 return Ok(task.snapshot_locked(&state, 5 * 1024));
1336 }
1337
1338 state.metadata.status = BgTaskStatus::Killing;
1339 self.persist_task(&task.paths, &state.metadata)
1340 .map_err(|e| format!("failed to persist killing state: {e}"))?;
1341
1342 #[cfg(unix)]
1343 if let Some(pgid) = state.metadata.pgid {
1344 terminate_pgid(pgid, state.child.as_mut());
1345 }
1346 #[cfg(windows)]
1347 if let Some(child) = state.child.as_mut() {
1348 super::process::terminate_process(child);
1349 } else if let Some(pid) = state.metadata.child_pid {
1350 terminate_pid(pid);
1351 }
1352 if let Some(child) = state.child.as_mut() {
1353 let _ = child.wait();
1354 }
1355 state.child = None;
1356 state.detached = true;
1357
1358 if !task.paths.exit.exists() {
1359 write_kill_marker_if_absent(&task.paths.exit)
1360 .map_err(|e| format!("failed to write kill marker: {e}"))?;
1361 }
1362
1363 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1364 Some(124)
1365 } else {
1366 None
1367 };
1368 state
1369 .metadata
1370 .mark_terminal(terminal_status, exit_code, None);
1371 task.mark_terminal_now();
1372 self.persist_task(&task.paths, &state.metadata)
1373 .map_err(|e| format!("failed to persist killed state: {e}"))?;
1374 state.buffer.enforce_terminal_cap();
1375 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1376 }
1377
1378 Ok(task.snapshot(5 * 1024))
1379 }
1380
1381 fn finalize_from_marker(
1382 &self,
1383 task: &Arc<BgTask>,
1384 marker: ExitMarker,
1385 reason: Option<String>,
1386 ) -> Result<(), String> {
1387 let mut state = task
1388 .state
1389 .lock()
1390 .map_err(|_| "background task lock poisoned".to_string())?;
1391 if state.metadata.status.is_terminal() {
1392 return Ok(());
1393 }
1394
1395 let updated = self
1396 .update_task_metadata(&task.paths, |metadata| {
1397 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1398 *metadata = new_metadata;
1399 })
1400 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1401 state.metadata = updated;
1402 task.mark_terminal_now();
1403 state.child = None;
1404 state.detached = true;
1405 state.buffer.enforce_terminal_cap();
1406 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1407 Ok(())
1408 }
1409
1410 fn enqueue_completion_if_needed(
1411 &self,
1412 metadata: &PersistedTask,
1413 paths: Option<&TaskPaths>,
1414 emit_frame: bool,
1415 ) {
1416 if metadata.status.is_terminal() && !metadata.completion_delivered {
1417 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1418 }
1419 }
1420
1421 fn enqueue_completion_locked(
1422 &self,
1423 metadata: &PersistedTask,
1424 buffer: Option<&BgBuffer>,
1425 emit_frame: bool,
1426 ) {
1427 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1428 }
1429
    /// Builds a `BgCompletion` for a terminal task, records its compression
    /// telemetry, and queues it for delivery.
    ///
    /// Output is taken from `buffer` when given. Otherwise it is read from the
    /// capture files in `paths`, or left empty if neither is provided. Non-terminal
    /// metadata is ignored entirely.
    ///
    /// The compression event is recorded *before* the `completion_delivered` check,
    /// so it is attempted even for already-delivered tasks. A completion is pushed
    /// only if no queued entry has the same task id. A `BashCompleted` frame is
    /// emitted only when the push happened and `emit_frame` is set.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
    ) {
        if !metadata.status.is_terminal() {
            return;
        }
        // Preview: the trailing BG_COMPLETION_PREVIEW_BYTES of output, plus a flag
        // saying whether earlier output was cut off.
        let (raw_preview, output_truncated) = match buffer {
            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
            None => paths
                .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
                .unwrap_or_else(|| (String::new(), false)),
        };
        // Compressed tasks surface the compressed rendering of the preview.
        let output_preview = if metadata.compressed {
            self.compress_output(&metadata.command, raw_preview)
        } else {
            raw_preview
        };
        let token_counts = self.completion_token_counts(metadata, buffer, paths);
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // NOTE(review): this runs before the delivered and duplicate checks below,
        // so repeated calls for the same task may record more than one event —
        // confirm that is intended.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        if metadata.completion_delivered {
            return;
        }

        // Deduplicate by task id: a task already waiting in the queue is neither
        // queued nor announced a second time. The queue lock is released before
        // the frame is emitted.
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
1528
1529 fn record_compression_event_if_applicable(
1530 &self,
1531 metadata: &PersistedTask,
1532 token_counts: &CompletionTokenCounts,
1533 ) {
1534 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
1535 token_counts.original_tokens,
1536 token_counts.compressed_tokens,
1537 token_counts.original_bytes,
1538 token_counts.compressed_bytes,
1539 ) {
1540 (
1541 Some(original_tokens),
1542 Some(compressed_tokens),
1543 Some(original_bytes),
1544 Some(compressed_bytes),
1545 ) => (
1546 original_tokens,
1547 compressed_tokens,
1548 original_bytes,
1549 compressed_bytes,
1550 ),
1551 _ => {
1552 crate::slog_warn!(
1553 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
1554 metadata.task_id
1555 );
1556 return;
1557 }
1558 };
1559
1560 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
1561 let Some(pool) = pool else {
1562 crate::slog_warn!(
1563 "compression event skipped for {}: db_pool not initialized — was configure run?",
1564 metadata.task_id
1565 );
1566 return;
1567 };
1568 let harness = self
1569 .inner
1570 .db_harness
1571 .read()
1572 .ok()
1573 .and_then(|slot| slot.clone());
1574 let Some(harness) = harness else {
1575 crate::slog_warn!(
1576 "compression event insert skipped for {}: harness not configured",
1577 metadata.task_id
1578 );
1579 return;
1580 };
1581
1582 let project_root = metadata
1583 .project_root
1584 .as_deref()
1585 .unwrap_or(&metadata.workdir);
1586 let project_key = crate::search_index::project_cache_key(project_root);
1587 let row = crate::db::compression_events::CompressionEventRow {
1588 harness: &harness,
1589 session_id: Some(&metadata.session_id),
1590 project_key: &project_key,
1591 tool: "bash",
1592 task_id: Some(&metadata.task_id),
1593 command: Some(&metadata.command),
1594 compressor: if metadata.compressed {
1595 "registry"
1596 } else {
1597 "none"
1598 },
1599 original_bytes,
1600 compressed_bytes,
1601 original_tokens,
1602 compressed_tokens,
1603 created_at: unix_millis() as i64,
1604 };
1605
1606 let conn = match pool.lock() {
1607 Ok(conn) => conn,
1608 Err(_) => {
1609 crate::slog_warn!(
1610 "compression event insert failed for {}: db mutex poisoned",
1611 metadata.task_id
1612 );
1613 return;
1614 }
1615 };
1616 match crate::db::compression_events::insert_compression_event(&conn, &row) {
1617 Ok(_) => {
1618 crate::slog_info!(
1619 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
1620 metadata.task_id,
1621 project_key,
1622 metadata.session_id,
1623 original_tokens,
1624 compressed_tokens
1625 );
1626 }
1627 Err(error) => {
1628 crate::slog_warn!(
1629 "compression event insert failed for {}: {}",
1630 metadata.task_id,
1631 error
1632 );
1633 }
1634 }
1635 }
1636
1637 fn emit_bash_completed(&self, completion: BgCompletion) {
1638 let Ok(progress_sender) = self
1639 .inner
1640 .progress_sender
1641 .lock()
1642 .map(|sender| sender.clone())
1643 else {
1644 return;
1645 };
1646 let Some(sender) = progress_sender.as_ref() else {
1647 return;
1648 };
1649 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1657 completion.task_id,
1658 completion.session_id,
1659 completion.status,
1660 completion.exit_code,
1661 completion.command,
1662 completion.output_preview,
1663 completion.output_truncated,
1664 completion.original_tokens,
1665 completion.compressed_tokens,
1666 completion.tokens_skipped,
1667 )));
1668 }
1669
1670 fn completion_token_counts(
1671 &self,
1672 metadata: &PersistedTask,
1673 buffer: Option<&BgBuffer>,
1674 paths: Option<&TaskPaths>,
1675 ) -> CompletionTokenCounts {
1676 let raw = match buffer {
1677 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
1678 None => paths
1679 .map(|paths| read_for_token_count_from_disk(paths, TOKENIZE_CAP_BYTES_PER_STREAM))
1680 .unwrap_or(TokenCountInput::Skipped),
1681 };
1682
1683 let TokenCountInput::Text(raw_output) = raw else {
1684 return CompletionTokenCounts::skipped();
1685 };
1686
1687 let original_tokens = token_count_u32(&raw_output);
1688 let original_bytes = raw_output.len() as i64;
1689 let compressed_output = if metadata.compressed {
1690 self.compress_output(&metadata.command, raw_output)
1691 } else {
1692 raw_output
1693 };
1694 let compressed_tokens = token_count_u32(&compressed_output);
1695 let compressed_bytes = compressed_output.len() as i64;
1696 CompletionTokenCounts {
1697 original_tokens: Some(original_tokens),
1698 compressed_tokens: Some(compressed_tokens),
1699 original_bytes: Some(original_bytes),
1700 compressed_bytes: Some(compressed_bytes),
1701 tokens_skipped: false,
1702 }
1703 }
1704
1705 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1706 if !self
1707 .inner
1708 .long_running_reminder_enabled
1709 .load(Ordering::SeqCst)
1710 {
1711 return;
1712 }
1713 let interval_ms = self
1714 .inner
1715 .long_running_reminder_interval_ms
1716 .load(Ordering::SeqCst);
1717 if interval_ms == 0 {
1718 return;
1719 }
1720 let interval = Duration::from_millis(interval_ms);
1721 let now = Instant::now();
1722 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1723 return;
1724 };
1725 let since = last_reminder_at.unwrap_or(task.started);
1726 if now.duration_since(since) < interval {
1727 return;
1728 }
1729 let command = task
1730 .state
1731 .lock()
1732 .map(|state| state.metadata.command.clone())
1733 .unwrap_or_default();
1734 *last_reminder_at = Some(now);
1735 self.emit_bash_long_running(BashLongRunningFrame::new(
1736 task.task_id.clone(),
1737 task.session_id.clone(),
1738 command,
1739 task.started.elapsed().as_millis() as u64,
1740 ));
1741 }
1742
1743 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1744 let Ok(progress_sender) = self
1745 .inner
1746 .progress_sender
1747 .lock()
1748 .map(|sender| sender.clone())
1749 else {
1750 return;
1751 };
1752 if let Some(sender) = progress_sender.as_ref() {
1753 sender(PushFrame::BashLongRunning(frame));
1754 }
1755 }
1756
1757 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1758 self.inner
1759 .tasks
1760 .lock()
1761 .ok()
1762 .and_then(|tasks| tasks.get(task_id).cloned())
1763 }
1764
1765 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1766 self.task(task_id)
1767 .filter(|task| task.session_id == session_id)
1768 }
1769
1770 fn running_count(&self) -> usize {
1771 self.inner
1772 .tasks
1773 .lock()
1774 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1775 .unwrap_or(0)
1776 }
1777
1778 fn start_watchdog(&self) {
1779 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1780 super::watchdog::start(self.clone());
1781 }
1782 }
1783
1784 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1785 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1786 }
1787
1788 #[cfg(test)]
1789 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1790 self.task_for_session(task_id, session_id)
1791 .map(|task| task.paths.json.clone())
1792 }
1793
1794 #[cfg(test)]
1795 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1796 self.task_for_session(task_id, session_id)
1797 .map(|task| task.paths.exit.clone())
1798 }
1799
1800 fn generate_unique_task_id(&self) -> Result<String, String> {
1802 for _ in 0..32 {
1803 let candidate = random_slug();
1804 let tasks = self
1805 .inner
1806 .tasks
1807 .lock()
1808 .map_err(|_| "background task registry lock poisoned".to_string())?;
1809 if tasks.contains_key(&candidate) {
1810 continue;
1811 }
1812 let completions = self
1813 .inner
1814 .completions
1815 .lock()
1816 .map_err(|_| "background completions lock poisoned".to_string())?;
1817 if completions
1818 .iter()
1819 .any(|completion| completion.task_id == candidate)
1820 {
1821 continue;
1822 }
1823 return Ok(candidate);
1824 }
1825 Err("failed to allocate unique background task id after 32 attempts".to_string())
1826 }
1827}
1828
/// Token and byte measurements of a task's output before and after compression.
///
/// When measurement was skipped (no readable output), every count is `None` and
/// `tokens_skipped` is `true`.
struct CompletionTokenCounts {
    /// Tokens in the raw (capped) output.
    original_tokens: Option<u32>,
    /// Tokens after compression; equals `original_tokens` for uncompressed tasks.
    compressed_tokens: Option<u32>,
    /// Byte length of the raw (capped) output.
    original_bytes: Option<i64>,
    /// Byte length after compression.
    compressed_bytes: Option<i64>,
    /// Set when there was no output text to measure.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// The "nothing measured" value: all counts absent and `tokens_skipped` set.
    fn skipped() -> Self {
        let no_tokens: Option<u32> = None;
        let no_bytes: Option<i64> = None;
        Self {
            tokens_skipped: true,
            original_tokens: no_tokens,
            compressed_tokens: no_tokens,
            original_bytes: no_bytes,
            compressed_bytes: no_bytes,
        }
    }
}
1848
1849fn token_count_u32(text: &str) -> u32 {
1850 aft_tokenizer::count_tokens(text)
1851 .try_into()
1852 .unwrap_or(u32::MAX)
1853}
1854
1855impl Default for BgTaskRegistry {
1856 fn default() -> Self {
1857 Self::new(Arc::new(Mutex::new(None)))
1858 }
1859}
1860
/// Reports whether `path` was modified less than `grace` ago.
///
/// Returns `false` when the path's metadata or modification time cannot be read.
///
/// A modification time in the future (clock skew, or a wall clock that stepped
/// backwards) is treated as "just modified". Returning `false` there would make
/// garbage-collection callers delete files that were written moments ago.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        // `modified` is later than "now": the entry cannot be older than `grace`.
        Err(_) => true,
    }
}
1869
/// Resolves `path` to its canonical absolute form.
///
/// Falls back to `path` unchanged whenever canonicalization fails, for example
/// when the path does not exist.
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1873
/// Reconstructs a monotonic `Instant` for a task that started at `started_at`
/// (milliseconds since the Unix epoch).
///
/// The wall-clock age is subtracted from `Instant::now()`. The result falls back
/// to "now" in any of these cases:
/// - the start time is in the future;
/// - the system clock reads before the epoch;
/// - the age is too large to subtract.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let elapsed_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => (since_epoch.as_millis() as u64).saturating_sub(started_at),
        Err(_) => 0,
    };
    let age = Duration::from_millis(elapsed_ms);
    Instant::now().checked_sub(age).unwrap_or_else(Instant::now)
}
1885
1886fn gc_quarantine(storage_dir: &Path) {
1887 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1888 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1889 return;
1890 };
1891 for session_entry in session_dirs.flatten() {
1892 let session_quarantine_dir = session_entry.path();
1893 if !session_quarantine_dir.is_dir() {
1894 continue;
1895 }
1896 let entries = match fs::read_dir(&session_quarantine_dir) {
1897 Ok(entries) => entries,
1898 Err(error) => {
1899 crate::slog_warn!(
1900 "failed to read background task quarantine dir {}: {error}",
1901 session_quarantine_dir.display()
1902 );
1903 continue;
1904 }
1905 };
1906 for entry in entries.flatten() {
1907 let path = entry.path();
1908 if modified_within(&path, QUARANTINE_GC_GRACE) {
1909 continue;
1910 }
1911 let result = if path.is_dir() {
1912 fs::remove_dir_all(&path)
1913 } else {
1914 fs::remove_file(&path)
1915 };
1916 match result {
1917 Ok(()) => log::debug!(
1918 "deleted old background task quarantine entry {}",
1919 path.display()
1920 ),
1921 Err(error) => crate::slog_warn!(
1922 "failed to delete old background task quarantine entry {}: {error}",
1923 path.display()
1924 ),
1925 }
1926 }
1927 let _ = fs::remove_dir(&session_quarantine_dir);
1928 }
1929 let _ = fs::remove_dir(&quarantine_root);
1930}
1931
/// Tags a file being moved into quarantine; selects the renaming scheme that
/// `quarantine_name` applies.
enum QuarantineKind {
    /// Renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<ext>`, or `<file_name>.invalid.<unix_ts>`
    /// when the name has no UTF-8 stem/extension pair.
    Invalid,
}
1936
1937fn quarantine_task_json(
1938 storage_dir: &Path,
1939 session_dir: &Path,
1940 json_path: &Path,
1941 kind: QuarantineKind,
1942) -> Result<(), String> {
1943 let session_hash = session_dir
1944 .file_name()
1945 .and_then(|name| name.to_str())
1946 .ok_or_else(|| {
1947 format!(
1948 "invalid background task session dir: {}",
1949 session_dir.display()
1950 )
1951 })?;
1952 let task_name = json_path
1953 .file_name()
1954 .and_then(|name| name.to_str())
1955 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1956 let unix_ts = SystemTime::now()
1957 .duration_since(UNIX_EPOCH)
1958 .map(|duration| duration.as_secs())
1959 .unwrap_or(0);
1960 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1961 fs::create_dir_all(&quarantine_dir).map_err(|e| {
1962 format!(
1963 "failed to create background task quarantine dir {}: {e}",
1964 quarantine_dir.display()
1965 )
1966 })?;
1967 let target_name = quarantine_name(task_name, unix_ts, &kind);
1968 let target = quarantine_dir.join(target_name);
1969 fs::rename(json_path, &target).map_err(|e| {
1970 format!(
1971 "failed to quarantine background task metadata {} to {}: {e}",
1972 json_path.display(),
1973 target.display()
1974 )
1975 })?;
1976
1977 for sibling in task_sibling_paths(json_path) {
1978 if !sibling.exists() {
1979 continue;
1980 }
1981 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1982 crate::slog_warn!(
1983 "skipping background task sibling with invalid name during quarantine: {}",
1984 sibling.display()
1985 );
1986 continue;
1987 };
1988 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
1989 if let Err(error) = fs::rename(&sibling, &sibling_target) {
1990 crate::slog_warn!(
1991 "failed to quarantine background task sibling {} to {}: {error}",
1992 sibling.display(),
1993 sibling_target.display()
1994 );
1995 }
1996 }
1997
1998 let _ = fs::remove_dir(session_dir);
1999 Ok(())
2000}
2001
2002fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2003 match kind {
2004 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2005 QuarantineKind::Invalid => {
2006 let path = Path::new(file_name);
2007 let stem = path.file_stem().and_then(|stem| stem.to_str());
2008 let extension = path.extension().and_then(|extension| extension.to_str());
2009 match (stem, extension) {
2010 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2011 _ => format!("{file_name}.invalid.{unix_ts}"),
2012 }
2013 }
2014 }
2015}
2016
/// Lists the files that share `json_path`'s stem in the same directory: stdout
/// and stderr captures, the exit marker, and the `ps1`/`bat`/`sh` wrapper scripts.
///
/// Returns an empty list if `json_path` has no parent or no UTF-8 stem. Paths are
/// returned whether or not they exist.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    match (json_path.parent(), stem) {
        (Some(dir), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|ext| dir.join(format!("{stem}.{ext}")))
            .collect(),
        _ => Vec::new(),
    }
}
2029
2030fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
2031 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2032 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2033 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2034 bytes.extend_from_slice(&stdout);
2035 bytes.extend_from_slice(&stderr);
2036 if bytes.len() <= max_bytes {
2037 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2038 }
2039 let start = bytes.len().saturating_sub(max_bytes);
2040 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2041}
2042
2043fn read_for_token_count_from_disk(
2044 paths: &TaskPaths,
2045 max_bytes_per_stream: usize,
2046) -> TokenCountInput {
2047 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2054 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2055 match (stdout, stderr) {
2056 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2057 String::from_utf8_lossy(&stdout).as_ref(),
2058 String::from_utf8_lossy(&stderr).as_ref(),
2059 )),
2060 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2061 String::from_utf8_lossy(&stdout).as_ref(),
2062 "",
2063 )),
2064 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2065 "",
2066 String::from_utf8_lossy(&stderr).as_ref(),
2067 )),
2068 (Err(_), Err(_)) => TokenCountInput::Skipped,
2069 }
2070}
2071
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// Files no larger than `max_bytes` are returned whole, and `max_bytes == 0`
/// yields an empty buffer. The read is hard-capped with `Read::take`, so a file
/// that keeps growing while being read cannot push the result past `max_bytes`.
///
/// Previously, a zero cap skipped the seek and returned the whole file, and
/// growth was unbounded.
///
/// # Errors
/// Propagates I/O errors from opening, stat-ing, seeking, or reading the file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let mut file = fs::File::open(path)?;
    let len = file.metadata()?.len();
    if len > cap {
        // Position `cap` bytes before the current end; `cap < len` so this is in range.
        let back = i64::try_from(cap).unwrap_or(i64::MAX);
        file.seek(SeekFrom::End(-back))?;
    }
    let expected = usize::try_from(len.min(cap)).unwrap_or(0);
    let mut bytes = Vec::with_capacity(expected);
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2088
2089impl BgTask {
2090 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2091 let state = self
2092 .state
2093 .lock()
2094 .unwrap_or_else(|poison| poison.into_inner());
2095 self.snapshot_locked(&state, preview_bytes)
2096 }
2097
2098 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2099 let metadata = &state.metadata;
2100 let duration_ms = metadata.duration_ms.or_else(|| {
2101 metadata
2102 .status
2103 .is_terminal()
2104 .then(|| self.started.elapsed().as_millis() as u64)
2105 });
2106 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
2107 BgTaskSnapshot {
2108 info: BgTaskInfo {
2109 task_id: self.task_id.clone(),
2110 status: metadata.status.clone(),
2111 command: metadata.command.clone(),
2112 started_at: metadata.started_at,
2113 duration_ms,
2114 },
2115 exit_code: metadata.exit_code,
2116 child_pid: metadata.child_pid,
2117 workdir: metadata.workdir.display().to_string(),
2118 output_preview,
2119 output_truncated,
2120 output_path: state
2121 .buffer
2122 .output_path()
2123 .map(|path| path.display().to_string()),
2124 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
2125 }
2126 }
2127
2128 pub(crate) fn is_running(&self) -> bool {
2129 self.state
2130 .lock()
2131 .map(|state| state.metadata.status == BgTaskStatus::Running)
2132 .unwrap_or(false)
2133 }
2134
2135 fn mark_terminal_now(&self) {
2136 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2137 if terminal_at.is_none() {
2138 *terminal_at = Some(Instant::now());
2139 }
2140 }
2141 }
2142
2143 fn set_completion_delivered(
2144 &self,
2145 delivered: bool,
2146 registry: &BgTaskRegistry,
2147 ) -> Result<(), String> {
2148 let mut state = self
2149 .state
2150 .lock()
2151 .map_err(|_| "background task lock poisoned".to_string())?;
2152 let updated = registry
2153 .update_task_metadata(&self.paths, |metadata| {
2154 metadata.completion_delivered = delivered;
2155 })
2156 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2157 state.metadata = updated;
2158 Ok(())
2159 }
2160}
2161
2162fn terminal_metadata_from_marker(
2163 mut metadata: PersistedTask,
2164 marker: ExitMarker,
2165 reason: Option<String>,
2166) -> PersistedTask {
2167 match marker {
2168 ExitMarker::Code(code) => {
2169 let status = if code == 0 {
2170 BgTaskStatus::Completed
2171 } else {
2172 BgTaskStatus::Failed
2173 };
2174 metadata.mark_terminal(status, Some(code), reason);
2175 }
2176 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2177 }
2178 metadata
2179}
2180
/// Builds the Unix `Command` that runs a background task's shell command.
///
/// The resolved shell is invoked with `-c` and an inline wrapper script. Its
/// positional parameters are:
/// - `$0`: the shell path itself;
/// - `$1`: the user command;
/// - `$2`: the exit-marker path.
///
/// The wrapper runs `"$0" -c "$1"` in a nested shell, so an `exit` inside the user
/// command only ends that inner shell. It then writes the captured status to
/// `$2.tmp.$$` and `mv -f`s it over `$2`. Readers therefore should not observe a
/// partially written marker.
///
/// Before exec, the child calls `setsid()` to start a new session.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg(&shell)
        .arg(command)
        .arg(exit_path);
    // SAFETY: `pre_exec` closures run in the forked child before exec, where only
    // async-signal-safe work is allowed. This closure only calls `setsid()`
    // (async-signal-safe per POSIX) and, on failure, reads errno via
    // `io::Error::last_os_error`. It takes no locks.
    unsafe {
        cmd.pre_exec(|| {
            // Become leader of a new session/process group, detached from any
            // controlling terminal of the parent.
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
2200
2201#[cfg(unix)]
2202fn resolve_posix_shell() -> PathBuf {
2203 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2204 POSIX_SHELL
2205 .get_or_init(|| {
2206 std::env::var_os("BASH")
2207 .filter(|value| !value.is_empty())
2208 .map(PathBuf::from)
2209 .filter(|path| path.exists())
2210 .or_else(|| which::which("bash").ok())
2211 .or_else(|| which::which("zsh").ok())
2212 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2213 })
2214 .clone()
2215}
2216
/// Writes a wrapper script for `command` next to the task's files and returns a
/// `Command` that runs it with `shell`.
///
/// The script body comes from `shell.wrapper_script(command, exit_path)`. Its
/// extension is `ps1`, `bat`, or `sh` depending on the shell. Its stem matches the
/// task's JSON file, falling back to `wrapper`.
///
/// How each shell runs the file:
/// - PowerShell variants: `-NoLogo -NoProfile -NonInteractive -ExecutionPolicy Bypass -File`.
/// - `cmd`: `/D /C`.
/// - POSIX shells: just the script path.
///
/// `creation_flags` are applied to the returned command.
///
/// # Errors
/// Returns `Err` if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    let script = shell.wrapper_script(command, exit_path);
    let extension = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", stem, extension));
    fs::write(&wrapper_path, script)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let leading_args: &[&str] = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };
    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args)
        .arg(&wrapper_path)
        .creation_flags(creation_flags);
    Ok(cmd)
}
2299
2300fn spawn_detached_child(
2316 command: &str,
2317 paths: &TaskPaths,
2318 workdir: &Path,
2319 env: &HashMap<String, String>,
2320) -> Result<std::process::Child, String> {
2321 #[cfg(not(windows))]
2322 {
2323 let stdout = create_capture_file(&paths.stdout)
2324 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
2325 let stderr = create_capture_file(&paths.stderr)
2326 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
2327 detached_shell_command(command, &paths.exit)
2328 .current_dir(workdir)
2329 .envs(env)
2330 .stdin(Stdio::null())
2331 .stdout(Stdio::from(stdout))
2332 .stderr(Stdio::from(stderr))
2333 .spawn()
2334 .map_err(|e| format!("failed to spawn background bash command: {e}"))
2335 }
2336 #[cfg(windows)]
2337 {
2338 use crate::windows_shell::shell_candidates;
2339 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
2350 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
2371 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
2372 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
2373 let with_breakaway =
2374 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
2375 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
2376 let mut last_error: Option<String> = None;
2377 for (idx, shell) in candidates.iter().enumerate() {
2378 for &flags in &[with_breakaway, without_breakaway] {
2382 let stdout = create_capture_file(&paths.stdout)
2384 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
2385 let stderr = create_capture_file(&paths.stderr)
2386 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
2387 let mut cmd =
2388 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
2389 cmd.current_dir(workdir)
2390 .envs(env)
2391 .stdin(Stdio::null())
2392 .stdout(Stdio::from(stdout))
2393 .stderr(Stdio::from(stderr));
2394 match cmd.spawn() {
2395 Ok(child) => {
2396 if idx > 0 {
2397 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
2398 the cached PATH probe disagreed with runtime spawn — likely PATH \
2399 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
2400 shell.binary(),
2401 idx);
2402 }
2403 if flags == without_breakaway {
2404 crate::slog_warn!(
2405 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
2406 (likely a restrictive Job Object — CI sandbox or MDM policy). \
2407 Spawned without breakaway; the bg task will be torn down if the \
2408 AFT process group is killed."
2409 );
2410 }
2411 return Ok(child);
2412 }
2413 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
2414 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
2415 shell.binary());
2416 last_error = Some(format!("{}: {e}", shell.binary()));
2417 break;
2420 }
2421 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
2422 crate::slog_warn!(
2424 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
2425 Access Denied — retrying {} without breakaway",
2426 shell.binary()
2427 );
2428 last_error = Some(format!("{}: {e}", shell.binary()));
2429 continue;
2430 }
2431 Err(e) => {
2432 return Err(format!(
2433 "failed to spawn background bash command via {}: {e}",
2434 shell.binary()
2435 ));
2436 }
2437 }
2438 }
2439 }
2440 Err(format!(
2441 "failed to spawn background bash command: no Windows shell could be spawned. \
2442 Last error: {}. PATH-probed candidates: {:?}",
2443 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
2444 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
2445 ))
2446 }
2447}
2448
2449fn random_slug() -> String {
2450 let mut bytes = [0u8; 4];
2451 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
2453 let t = SystemTime::now()
2455 .duration_since(UNIX_EPOCH)
2456 .map(|d| d.subsec_nanos())
2457 .unwrap_or(0);
2458 let p = std::process::id();
2459 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
2460 });
2461 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
2463 format!("bash-{hex}")
2464}
2465
2466#[cfg(test)]
2467mod tests {
2468 use std::collections::HashMap;
2469 #[cfg(windows)]
2470 use std::fs;
2471 use std::sync::{Arc, Mutex};
2472 use std::time::Duration;
2473 #[cfg(windows)]
2474 use std::time::Instant;
2475
2476 use super::*;
2477
2478 #[cfg(unix)]
2479 const QUICK_SUCCESS_COMMAND: &str = "true";
2480 #[cfg(windows)]
2481 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
2482
2483 #[cfg(unix)]
2484 const LONG_RUNNING_COMMAND: &str = "sleep 5";
2485 #[cfg(windows)]
2486 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
2487
2488 fn spawn_dead_child() -> std::process::Child {
2493 #[cfg(unix)]
2494 let mut cmd = std::process::Command::new("true");
2495 #[cfg(windows)]
2496 let mut cmd = {
2497 let mut c = std::process::Command::new("cmd");
2498 c.args(["/c", "exit", "0"]);
2499 c
2500 };
2501 cmd.stdin(std::process::Stdio::null());
2502 cmd.stdout(std::process::Stdio::null());
2503 cmd.stderr(std::process::Stdio::null());
2504 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
2505 let started = Instant::now();
2514 loop {
2515 match child.try_wait() {
2516 Ok(Some(_)) => break,
2517 Ok(None) => {
2518 if started.elapsed() > Duration::from_secs(5) {
2519 panic!("dead-child stand-in did not exit within 5s");
2520 }
2521 std::thread::sleep(Duration::from_millis(10));
2522 }
2523 Err(error) => panic!("dead-child try_wait failed: {error}"),
2524 }
2525 }
2526 child
2527 }
2528
2529 #[test]
2530 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
2531 let registry = BgTaskRegistry::default();
2532 let dir = tempfile::tempdir().unwrap();
2533 let task_id = registry
2534 .spawn(
2535 QUICK_SUCCESS_COMMAND,
2536 "session".to_string(),
2537 dir.path().to_path_buf(),
2538 HashMap::new(),
2539 Some(Duration::from_secs(30)),
2540 dir.path().to_path_buf(),
2541 10,
2542 true,
2543 false,
2544 Some(dir.path().to_path_buf()),
2545 )
2546 .unwrap();
2547 registry
2548 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2549 .unwrap();
2550 let completions = registry.drain_completions_for_session(Some("session"));
2551 assert_eq!(completions.len(), 1);
2552 assert_eq!(
2553 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
2554 vec![task_id.clone()]
2555 );
2556
2557 registry.cleanup_finished(Duration::ZERO);
2558
2559 assert!(registry.inner.tasks.lock().unwrap().is_empty());
2560 }
2561
2562 #[test]
2563 fn cleanup_finished_retains_undelivered_terminals() {
2564 let registry = BgTaskRegistry::default();
2565 let dir = tempfile::tempdir().unwrap();
2566 let task_id = registry
2567 .spawn(
2568 QUICK_SUCCESS_COMMAND,
2569 "session".to_string(),
2570 dir.path().to_path_buf(),
2571 HashMap::new(),
2572 Some(Duration::from_secs(30)),
2573 dir.path().to_path_buf(),
2574 10,
2575 true,
2576 false,
2577 Some(dir.path().to_path_buf()),
2578 )
2579 .unwrap();
2580 registry
2581 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2582 .unwrap();
2583
2584 registry.cleanup_finished(Duration::ZERO);
2585
2586 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2587 }
2588
2589 #[test]
2599 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
2600 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2601 let dir = tempfile::tempdir().unwrap();
2602 let task_id = registry
2603 .spawn(
2604 QUICK_SUCCESS_COMMAND,
2605 "session".to_string(),
2606 dir.path().to_path_buf(),
2607 HashMap::new(),
2608 Some(Duration::from_secs(30)),
2609 dir.path().to_path_buf(),
2610 10,
2611 true,
2612 false,
2613 Some(dir.path().to_path_buf()),
2614 )
2615 .unwrap();
2616
2617 let task = registry.task_for_session(&task_id, "session").unwrap();
2618
2619 let started = Instant::now();
2624 loop {
2625 let exited = {
2626 let mut state = task.state.lock().unwrap();
2627 if let Some(child) = state.child.as_mut() {
2628 matches!(child.try_wait(), Ok(Some(_)))
2629 } else {
2630 true
2631 }
2632 };
2633 if exited {
2634 break;
2635 }
2636 assert!(
2637 started.elapsed() < Duration::from_secs(5),
2638 "child should exit quickly"
2639 );
2640 std::thread::sleep(Duration::from_millis(20));
2641 }
2642
2643 registry
2651 .inner
2652 .shutdown
2653 .store(true, std::sync::atomic::Ordering::SeqCst);
2654 std::thread::sleep(Duration::from_millis(550));
2658
2659 let _ = std::fs::remove_file(&task.paths.exit);
2662
2663 {
2680 let mut state = task.state.lock().unwrap();
2681 state.metadata.status = BgTaskStatus::Running;
2682 state.metadata.status_reason = None;
2683 state.metadata.exit_code = None;
2684 state.metadata.finished_at = None;
2685 state.metadata.duration_ms = None;
2686 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
2689 .expect("persist reset Running metadata for reap_child test");
2690 if state.child.is_none() {
2694 state.child = Some(spawn_dead_child());
2695 }
2696 }
2697 *task.terminal_at.lock().unwrap() = None;
2700
2701 assert!(
2704 task.is_running(),
2705 "precondition: metadata.status == Running"
2706 );
2707 assert!(
2708 !task.paths.exit.exists(),
2709 "precondition: exit marker absent"
2710 );
2711
2712 registry.reap_child(&task);
2716
2717 let state = task.state.lock().unwrap();
2718 assert!(
2719 state.metadata.status.is_terminal(),
2720 "reap_child must transition to terminal when PID dead and no marker. \
2721 Got status={:?}",
2722 state.metadata.status
2723 );
2724 assert_eq!(
2725 state.metadata.status,
2726 BgTaskStatus::Failed,
2727 "must specifically be Failed (not Killed): status={:?}",
2728 state.metadata.status
2729 );
2730 assert_eq!(
2731 state.metadata.status_reason.as_deref(),
2732 Some("process exited without exit marker"),
2733 "reason must match replay path's wording: {:?}",
2734 state.metadata.status_reason
2735 );
2736 assert!(
2737 state.child.is_none(),
2738 "child handle must be released after reap"
2739 );
2740 assert!(state.detached, "task must be marked detached after reap");
2741 }
2742
2743 #[test]
2749 fn reap_child_preserves_running_when_exit_marker_exists() {
2750 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2751 let dir = tempfile::tempdir().unwrap();
2752 let task_id = registry
2753 .spawn(
2754 QUICK_SUCCESS_COMMAND,
2755 "session".to_string(),
2756 dir.path().to_path_buf(),
2757 HashMap::new(),
2758 Some(Duration::from_secs(30)),
2759 dir.path().to_path_buf(),
2760 10,
2761 true,
2762 false,
2763 Some(dir.path().to_path_buf()),
2764 )
2765 .unwrap();
2766
2767 let task = registry.task_for_session(&task_id, "session").unwrap();
2768
2769 let started = Instant::now();
2772 loop {
2773 let exited = {
2774 let mut state = task.state.lock().unwrap();
2775 if let Some(child) = state.child.as_mut() {
2776 matches!(child.try_wait(), Ok(Some(_)))
2777 } else {
2778 true
2779 }
2780 };
2781 if exited && task.paths.exit.exists() {
2782 break;
2783 }
2784 assert!(
2785 started.elapsed() < Duration::from_secs(5),
2786 "child should exit and write marker quickly"
2787 );
2788 std::thread::sleep(Duration::from_millis(20));
2789 }
2790
2791 registry
2797 .inner
2798 .shutdown
2799 .store(true, std::sync::atomic::Ordering::SeqCst);
2800 std::thread::sleep(Duration::from_millis(550));
2801
2802 {
2808 let mut state = task.state.lock().unwrap();
2809 state.metadata.status = BgTaskStatus::Running;
2810 state.metadata.status_reason = None;
2811 if state.child.is_none() {
2812 state.child = Some(spawn_dead_child());
2813 }
2814 }
2815 *task.terminal_at.lock().unwrap() = None;
2816 if !task.paths.exit.exists() {
2819 std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
2820 }
2821
2822 registry.reap_child(&task);
2826
2827 let state = task.state.lock().unwrap();
2828 assert!(
2829 state.child.is_none(),
2830 "child handle still released even when marker exists"
2831 );
2832 assert!(
2833 state.detached,
2834 "task still marked detached even when marker exists"
2835 );
2836 assert_eq!(
2841 state.metadata.status,
2842 BgTaskStatus::Running,
2843 "reap_child must defer to poll_task when marker exists"
2844 );
2845 }
2846
2847 #[test]
2848 fn cleanup_finished_keeps_running_tasks() {
2849 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2850 let dir = tempfile::tempdir().unwrap();
2851 let task_id = registry
2852 .spawn(
2853 LONG_RUNNING_COMMAND,
2854 "session".to_string(),
2855 dir.path().to_path_buf(),
2856 HashMap::new(),
2857 Some(Duration::from_secs(30)),
2858 dir.path().to_path_buf(),
2859 10,
2860 true,
2861 false,
2862 Some(dir.path().to_path_buf()),
2863 )
2864 .unwrap();
2865
2866 registry.cleanup_finished(Duration::ZERO);
2867
2868 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2869 let _ = registry.kill(&task_id, "session");
2870 }
2871
2872 #[cfg(windows)]
2873 fn wait_for_file(path: &Path) -> String {
2874 let started = Instant::now();
2875 loop {
2876 if path.exists() {
2877 return fs::read_to_string(path).expect("read file");
2878 }
2879 assert!(
2880 started.elapsed() < Duration::from_secs(30),
2881 "timed out waiting for {}",
2882 path.display()
2883 );
2884 std::thread::sleep(Duration::from_millis(100));
2885 }
2886 }
2887
2888 #[cfg(windows)]
2889 fn spawn_windows_registry_command(
2890 command: &str,
2891 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
2892 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2893 let dir = tempfile::tempdir().unwrap();
2894 let task_id = registry
2895 .spawn(
2896 command,
2897 "session".to_string(),
2898 dir.path().to_path_buf(),
2899 HashMap::new(),
2900 Some(Duration::from_secs(30)),
2901 dir.path().to_path_buf(),
2902 10,
2903 false,
2904 false,
2905 Some(dir.path().to_path_buf()),
2906 )
2907 .unwrap();
2908 (registry, dir, task_id)
2909 }
2910
2911 #[cfg(windows)]
2912 #[test]
2913 fn windows_spawn_writes_exit_marker_for_zero_exit() {
2914 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
2915 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2916
2917 let content = wait_for_file(&exit_path);
2918
2919 assert_eq!(content.trim(), "0");
2920 }
2921
2922 #[cfg(windows)]
2923 #[test]
2924 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
2925 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
2926 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2927
2928 let content = wait_for_file(&exit_path);
2929
2930 assert_eq!(content.trim(), "42");
2931 }
2932
2933 #[cfg(windows)]
2934 #[test]
2935 fn windows_spawn_captures_stdout_to_disk() {
2936 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
2937 let task = registry.task_for_session(&task_id, "session").unwrap();
2938 let stdout_path = task.paths.stdout.clone();
2939 let exit_path = task.paths.exit.clone();
2940
2941 let _ = wait_for_file(&exit_path);
2942 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
2943
2944 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
2945 }
2946
2947 #[cfg(windows)]
2948 #[test]
2949 fn windows_spawn_uses_pwsh_when_available() {
2950 let candidates = crate::windows_shell::shell_candidates_with(
2954 |binary| match binary {
2955 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
2956 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
2957 _ => None,
2958 },
2959 || None,
2960 );
2961 let shell = candidates.first().expect("at least one candidate").clone();
2962 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
2963 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
2964 }
2965
2966 #[cfg(windows)]
2973 #[test]
2974 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
2975 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
2976 let script =
2977 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
2978
2979 assert!(
2983 script.contains("set CODE=%ERRORLEVEL%"),
2984 "wrapper must capture exit code into CODE: {script}"
2985 );
2986 assert!(
2987 script.contains("echo %CODE% >"),
2988 "wrapper must echo CODE to a temp marker file: {script}"
2989 );
2990 assert!(
2991 script.contains("move /Y"),
2992 "wrapper must use atomic move to write the marker: {script}"
2993 );
2994 assert!(
2997 script.contains("> nul"),
2998 "wrapper must redirect move output to nul: {script}"
2999 );
3000 assert!(
3002 script.contains("exit /B %CODE%"),
3003 "wrapper must propagate the captured exit code: {script}"
3004 );
3005 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
3006 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
3007 }
3008
3009 #[cfg(windows)]
3015 #[test]
3016 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
3017 use crate::windows_shell::WindowsShell;
3018 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
3019 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3020 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3021 assert_eq!(
3022 args_strs,
3023 vec!["/D", "/S", "/C", "echo wrapped"],
3024 "Cmd::bg_command must prepend /D /S /C"
3025 );
3026 }
3027
3028 #[cfg(windows)]
3032 #[test]
3033 fn windows_shell_pwsh_bg_command_uses_standard_args() {
3034 use crate::windows_shell::WindowsShell;
3035 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
3036 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3037 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3038 assert!(
3039 args_strs.contains(&"-Command"),
3040 "Pwsh::bg_command must use -Command: {args_strs:?}"
3041 );
3042 assert!(
3043 args_strs.contains(&"Get-Date"),
3044 "Pwsh::bg_command must include the user command body"
3045 );
3046 }
3047
3048 #[allow(dead_code)]
3079 #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
3081}