1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex, RwLock};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use rusqlite::Connection;
12use serde::Serialize;
13
14use crate::context::SharedProgressSender;
15use crate::harness::Harness;
16use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
17
18#[cfg(unix)]
19use std::os::unix::process::CommandExt;
20#[cfg(windows)]
21use std::os::windows::process::CommandExt;
22
23use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
24use super::persistence::{
25 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
26 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
27 PersistedTask, TaskPaths,
28};
29use super::process::is_process_alive;
30#[cfg(unix)]
31use super::process::terminate_pgid;
32#[cfg(windows)]
33use super::process::terminate_pid;
34use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied by `spawn` when the caller passes `None` (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours. Presumably the age after which a persisted task still marked as
/// running is treated as orphaned on replay — TODO confirm against
/// `running_metadata_is_stale`, which is defined outside this section.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// Grace window (24 hours) checked by `maybe_gc_persisted`: task JSON files
/// modified within it are skipped by the sweep.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days. Presumably the retention window for quarantined metadata — its use
/// is not visible in this section; TODO confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// 300 bytes. NOTE(review): by name, the size of the output preview attached to
/// a completion; the use site is outside this section.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// 128 KiB. NOTE(review): by name, a per-stream byte cap applied before token
/// counting; the use site is outside this section.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
55
/// Completion record for a background task, as held in the registry's
/// completion queue.
///
/// Only `Serialize` is derived. Several fields are left out of the serialized
/// form when they hold their empty / `false` / `None` value.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion belongs to.
    pub task_id: String,
    /// Session that owns the task. Used by the registry to filter completions
    /// per session; never serialized.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status of the task recorded in this completion.
    pub status: BgTaskStatus,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// The command line of the task.
    pub command: String,
    /// Preview of the task output; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Serialized only when `true`. Presumably marks that `output_preview` is a
    /// truncated view of the full output — TODO confirm at the producer.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Omitted when `None`. Presumably the token count before compression —
    /// TODO confirm at the producer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Omitted when `None`. Presumably the token count after compression —
    /// TODO confirm at the producer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Serialized only when `true`. Presumably set when token counting was
    /// skipped — TODO confirm at the producer.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
91
/// Predicate for serde's `skip_serializing_if`: `true` exactly when the flag is
/// `false`. Takes `&bool` because serde passes the field by reference.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
95
/// Serializable point-in-time view of a background task, as returned by
/// `status`, `list` and the kill entry points.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; its fields are flattened into the same serialized object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// PID of the child process, when known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the task output. `maybe_compress_snapshot` may replace it with
    /// a compressed form for terminal tasks.
    pub output_preview: String,
    /// Presumably `true` when `output_preview` does not contain the full output
    /// — TODO confirm in `BgTask::snapshot`.
    pub output_truncated: bool,
    /// Presumably the path of the stdout capture file — TODO confirm.
    pub output_path: Option<String>,
    /// Presumably the path of the stderr capture file — TODO confirm.
    pub stderr_path: Option<String>,
}
108
/// Handle to the background task registry.
///
/// Cloning is cheap: every clone points at the same `RegistryInner` through an
/// `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
113
/// State shared by every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Known tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completions; entries are removed by `ack_completions_for_session`.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sender supplied at construction. Presumably used to push progress frames —
    /// the use site is outside this section.
    pub(crate) progress_sender: SharedProgressSender,
    /// Presumably guards `start_watchdog` so the watchdog starts once — TODO confirm.
    watchdog_started: AtomicBool,
    /// Set to `true` by `detach`.
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder switch; `true` by default, changed through
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds; 600_000 by default.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Flipped by the first session replay so the persisted-bundle GC is
    /// triggered from replay only once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter incremented on every `maybe_gc_persisted` call.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional output compressor invoked as `(command, output) -> output`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional shared DB connection used for dual-writes and DB-backed lookups.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name passed along with DB reads and writes; `None` until
    /// `set_harness` is called.
    pub(crate) db_harness: RwLock<Option<String>>,
}
134
/// One background task tracked by the registry.
pub(crate) struct BgTask {
    /// Identifier of the task (also its key in the registry map).
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task bundle (metadata JSON, capture files, exit marker).
    pub(crate) paths: TaskPaths,
    /// Instant at which this in-memory entry was created (`Instant::now()` in `spawn`).
    pub(crate) started: Instant,
    /// Presumably the time of the last long-running reminder — TODO confirm at
    /// the watchdog.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Read by `cleanup_finished` to decide whether a delivered terminal task has
    /// expired; `None` means it is never treated as expired.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state, guarded by its own lock.
    pub(crate) state: Mutex<BgTaskState>,
}
144
/// Lock-protected, mutable part of a `BgTask`.
pub(crate) struct BgTaskState {
    /// Persisted metadata of the task (status, pids, flags, ...).
    pub(crate) metadata: PersistedTask,
    /// Handle of the spawned child; `spawn` stores `Some`, `detach` resets it to
    /// `None`.
    pub(crate) child: Option<Child>,
    /// `false` right after `spawn`; set to `true` by `detach`.
    pub(crate) detached: bool,
    /// `false` right after `spawn`. Presumably set once the child's exit has been
    /// observed via `try_wait` — TODO confirm in `reap_child`.
    pub(crate) child_exit_observed: bool,
    /// Output buffer built over the stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
162
163impl BgTaskRegistry {
164 pub fn new(progress_sender: SharedProgressSender) -> Self {
165 Self {
166 inner: Arc::new(RegistryInner {
167 tasks: Mutex::new(HashMap::new()),
168 completions: Mutex::new(VecDeque::new()),
169 progress_sender,
170 watchdog_started: AtomicBool::new(false),
171 shutdown: AtomicBool::new(false),
172 long_running_reminder_enabled: AtomicBool::new(true),
173 long_running_reminder_interval_ms: AtomicU64::new(600_000),
174 persisted_gc_started: AtomicBool::new(false),
175 #[cfg(test)]
176 persisted_gc_runs: AtomicU64::new(0),
177 compressor: Mutex::new(None),
178 db_pool: RwLock::new(None),
179 db_harness: RwLock::new(None),
180 }),
181 }
182 }
183
184 pub fn set_harness(&self, harness: Harness) {
185 if let Ok(mut slot) = self.inner.db_harness.write() {
186 *slot = Some(harness.as_str().to_string());
187 }
188 }
189
190 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
191 if let Ok(mut slot) = self.inner.db_pool.write() {
192 *slot = Some(conn);
193 }
194 }
195
196 pub fn clear_db_pool(&self) {
197 if let Ok(mut slot) = self.inner.db_pool.write() {
198 *slot = None;
199 }
200 }
201
202 pub fn set_compressor<F>(&self, compressor: F)
207 where
208 F: Fn(&str, String) -> String + Send + Sync + 'static,
209 {
210 if let Ok(mut slot) = self.inner.compressor.lock() {
211 *slot = Some(Box::new(compressor));
212 }
213 }
214
215 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
218 let Ok(slot) = self.inner.compressor.lock() else {
219 return output;
220 };
221 match slot.as_ref() {
222 Some(compressor) => compressor(command, output),
223 None => output,
224 }
225 }
226
227 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
228 write_task(&paths.json, metadata)?;
229 self.dual_write_task(paths, metadata);
230 Ok(())
231 }
232
233 fn update_task_metadata<F>(
234 &self,
235 paths: &TaskPaths,
236 update: F,
237 ) -> std::io::Result<PersistedTask>
238 where
239 F: FnOnce(&mut PersistedTask),
240 {
241 let metadata = update_task(&paths.json, update)?;
242 self.dual_write_task(paths, &metadata);
243 Ok(metadata)
244 }
245
246 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
247 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
248 let Some(pool) = pool else {
249 return;
250 };
251 let harness = self
252 .inner
253 .db_harness
254 .read()
255 .ok()
256 .and_then(|slot| slot.clone());
257 let Some(harness) = harness else {
258 crate::slog_warn!(
259 "dual-write bash_task to DB skipped for {}: harness not configured",
260 metadata.task_id
261 );
262 return;
263 };
264 let row = match metadata.to_bash_task_row(&harness, paths) {
265 Ok(row) => row,
266 Err(error) => {
267 crate::slog_warn!(
268 "dual-write bash_task to DB failed for {}: {}",
269 metadata.task_id,
270 error
271 );
272 return;
273 }
274 };
275 let conn = match pool.lock() {
276 Ok(conn) => conn,
277 Err(_) => {
278 crate::slog_warn!(
279 "dual-write bash_task to DB failed for {}: db mutex poisoned",
280 metadata.task_id
281 );
282 return;
283 }
284 };
285 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
286 crate::slog_warn!(
287 "dual-write bash_task to DB failed for {}: {}",
288 metadata.task_id,
289 error
290 );
291 }
292 }
293
294 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
295 self.inner
296 .long_running_reminder_enabled
297 .store(enabled, Ordering::SeqCst);
298 self.inner
299 .long_running_reminder_interval_ms
300 .store(interval_ms, Ordering::SeqCst);
301 }
302
303 #[cfg(unix)]
304 #[allow(clippy::too_many_arguments)]
305 pub fn spawn(
306 &self,
307 command: &str,
308 session_id: String,
309 workdir: PathBuf,
310 env: HashMap<String, String>,
311 timeout: Option<Duration>,
312 storage_dir: PathBuf,
313 max_running: usize,
314 notify_on_completion: bool,
315 compressed: bool,
316 project_root: Option<PathBuf>,
317 ) -> Result<String, String> {
318 self.start_watchdog();
319
320 let running = self.running_count();
321 if running >= max_running {
322 return Err(format!(
323 "background bash task limit exceeded: {running} running (max {max_running})"
324 ));
325 }
326
327 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
328 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
329 let task_id = self.generate_unique_task_id()?;
330 let paths = task_paths(&storage_dir, &session_id, &task_id);
331 fs::create_dir_all(&paths.dir)
332 .map_err(|e| format!("failed to create background task dir: {e}"))?;
333
334 let mut metadata = PersistedTask::starting(
335 task_id.clone(),
336 session_id.clone(),
337 command.to_string(),
338 workdir.clone(),
339 project_root,
340 timeout_ms,
341 notify_on_completion,
342 compressed,
343 );
344 self.persist_task(&paths, &metadata)
345 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
346
347 create_capture_file(&paths.stdout)
351 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
352 create_capture_file(&paths.stderr)
353 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
354
355 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
356 Ok(child) => child,
357 Err(error) => {
358 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
359 let _ = delete_task_bundle(&paths);
360 return Err(error);
361 }
362 };
363
364 let child_pid = child.id();
365 metadata.mark_running(child_pid, child_pid as i32);
366 self.persist_task(&paths, &metadata)
367 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
368
369 let task = Arc::new(BgTask {
370 task_id: task_id.clone(),
371 session_id,
372 paths: paths.clone(),
373 started: Instant::now(),
374 last_reminder_at: Mutex::new(None),
375 terminal_at: Mutex::new(None),
376 state: Mutex::new(BgTaskState {
377 metadata,
378 child: Some(child),
379 detached: false,
380 child_exit_observed: false,
381 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
382 }),
383 });
384
385 self.inner
386 .tasks
387 .lock()
388 .map_err(|_| "background task registry lock poisoned".to_string())?
389 .insert(task_id.clone(), task);
390
391 Ok(task_id)
392 }
393
394 #[cfg(windows)]
395 #[allow(clippy::too_many_arguments)]
396 pub fn spawn(
397 &self,
398 command: &str,
399 session_id: String,
400 workdir: PathBuf,
401 env: HashMap<String, String>,
402 timeout: Option<Duration>,
403 storage_dir: PathBuf,
404 max_running: usize,
405 notify_on_completion: bool,
406 compressed: bool,
407 project_root: Option<PathBuf>,
408 ) -> Result<String, String> {
409 self.start_watchdog();
410
411 let running = self.running_count();
412 if running >= max_running {
413 return Err(format!(
414 "background bash task limit exceeded: {running} running (max {max_running})"
415 ));
416 }
417
418 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
419 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
420 let task_id = self.generate_unique_task_id()?;
421 let paths = task_paths(&storage_dir, &session_id, &task_id);
422 fs::create_dir_all(&paths.dir)
423 .map_err(|e| format!("failed to create background task dir: {e}"))?;
424
425 let mut metadata = PersistedTask::starting(
426 task_id.clone(),
427 session_id.clone(),
428 command.to_string(),
429 workdir.clone(),
430 project_root,
431 timeout_ms,
432 notify_on_completion,
433 compressed,
434 );
435 self.persist_task(&paths, &metadata)
436 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
437
438 create_capture_file(&paths.stdout)
444 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
445 create_capture_file(&paths.stderr)
446 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
447
448 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
449 Ok(child) => child,
450 Err(error) => {
451 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
452 let _ = delete_task_bundle(&paths);
453 return Err(error);
454 }
455 };
456
457 let child_pid = child.id();
458 metadata.status = BgTaskStatus::Running;
459 metadata.child_pid = Some(child_pid);
460 metadata.pgid = None;
461 self.persist_task(&paths, &metadata)
462 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
463
464 let task = Arc::new(BgTask {
465 task_id: task_id.clone(),
466 session_id,
467 paths: paths.clone(),
468 started: Instant::now(),
469 last_reminder_at: Mutex::new(None),
470 terminal_at: Mutex::new(None),
471 state: Mutex::new(BgTaskState {
472 metadata,
473 child: Some(child),
474 detached: false,
475 child_exit_observed: false,
476 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
477 }),
478 });
479
480 self.inner
481 .tasks
482 .lock()
483 .map_err(|_| "background task registry lock poisoned".to_string())?
484 .insert(task_id.clone(), task);
485
486 Ok(task_id)
487 }
488
489 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
490 self.replay_session_inner(storage_dir, session_id, None)
491 }
492
493 pub fn replay_session_for_project(
494 &self,
495 storage_dir: &Path,
496 session_id: &str,
497 project_root: &Path,
498 ) -> Result<(), String> {
499 self.replay_session_inner(storage_dir, session_id, Some(project_root))
500 }
501
502 fn replay_session_inner(
503 &self,
504 storage_dir: &Path,
505 session_id: &str,
506 project_root: Option<&Path>,
507 ) -> Result<(), String> {
508 self.start_watchdog();
509 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
510 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
511 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
512 }
513 }
514
515 let canonical_project = project_root.map(canonicalized_path);
516 let tasks = match self.replay_session_from_db(session_id) {
528 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
529 Some(Ok(_)) => {
530 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
531 if !disk_tasks.is_empty() {
532 crate::slog_info!(
533 "bash task replay: 0 in DB for session {}, {} from disk fallback",
534 session_id,
535 disk_tasks.len()
536 );
537 }
538 disk_tasks
539 }
540 Some(Err(error)) => {
541 crate::slog_warn!(
542 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
543 session_id,
544 error
545 );
546 self.replay_session_from_disk(storage_dir, session_id)?
547 }
548 None => {
549 self.replay_session_from_disk(storage_dir, session_id)?
551 }
552 };
553
554 for mut metadata in tasks {
555 if metadata.session_id != session_id {
556 continue;
557 }
558 if let Some(canonical_project) = canonical_project.as_deref() {
559 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
560 if metadata_project.as_deref() != Some(canonical_project) {
561 continue;
562 }
563 }
564
565 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
566 match metadata.status {
567 BgTaskStatus::Starting => {
568 metadata.mark_terminal(
569 BgTaskStatus::Failed,
570 None,
571 Some("spawn aborted".to_string()),
572 );
573 let _ = self.persist_task(&paths, &metadata);
574 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
575 self.insert_rehydrated_task(metadata, paths, true)?;
576 }
577 BgTaskStatus::Running | BgTaskStatus::Killing => {
578 if self.running_metadata_is_stale(&metadata) {
579 metadata.mark_terminal(
580 BgTaskStatus::Killed,
581 None,
582 Some("orphaned (>24h)".to_string()),
583 );
584 if !paths.exit.exists() {
585 let _ = write_kill_marker_if_absent(&paths.exit);
586 }
587 let _ = self.persist_task(&paths, &metadata);
588 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
589 self.insert_rehydrated_task(metadata, paths, true)?;
590 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
591 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
592 "recovered from inconsistent killing state on replay".to_string()
593 });
594 if reason.is_some() {
595 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
596 metadata.task_id);
597 }
598 metadata = terminal_metadata_from_marker(metadata, marker, reason);
599 let _ = self.persist_task(&paths, &metadata);
600 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
601 self.insert_rehydrated_task(metadata, paths, true)?;
602 } else if metadata.status == BgTaskStatus::Killing {
603 if !paths.exit.exists() {
604 let _ = write_kill_marker_if_absent(&paths.exit);
605 }
606 metadata.mark_terminal(
607 BgTaskStatus::Killed,
608 None,
609 Some("recovered from inconsistent killing state on replay".to_string()),
610 );
611 let _ = self.persist_task(&paths, &metadata);
612 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
613 self.insert_rehydrated_task(metadata, paths, true)?;
614 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
615 metadata.mark_terminal(
616 BgTaskStatus::Failed,
617 None,
618 Some("process exited without exit marker".to_string()),
619 );
620 let _ = self.persist_task(&paths, &metadata);
621 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
622 self.insert_rehydrated_task(metadata, paths, true)?;
623 } else {
624 self.insert_rehydrated_task(metadata, paths, true)?;
625 }
626 }
627 _ if metadata.status.is_terminal() => {
628 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
634 self.insert_rehydrated_task(metadata, paths, true)?;
635 }
636 _ => {}
637 }
638 }
639
640 Ok(())
641 }
642
643 fn replay_session_from_db(
644 &self,
645 session_id: &str,
646 ) -> Option<Result<Vec<PersistedTask>, String>> {
647 let pool = self
648 .inner
649 .db_pool
650 .read()
651 .ok()
652 .and_then(|slot| slot.clone())?;
653 let harness = self
654 .inner
655 .db_harness
656 .read()
657 .ok()
658 .and_then(|slot| slot.clone())?;
659 let conn = match pool.lock() {
660 Ok(conn) => conn,
661 Err(_) => return Some(Err("db mutex poisoned".to_string())),
662 };
663 Some(
664 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
665 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
666 .map_err(|error| error.to_string()),
667 )
668 }
669
670 fn replay_session_from_disk(
671 &self,
672 storage_dir: &Path,
673 session_id: &str,
674 ) -> Result<Vec<PersistedTask>, String> {
675 let dir = session_tasks_dir(storage_dir, session_id);
676 if !dir.exists() {
677 return Ok(Vec::new());
678 }
679
680 let entries = fs::read_dir(&dir)
681 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
682 let mut tasks = Vec::new();
683 for entry in entries.flatten() {
684 let path = entry.path();
685 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
686 continue;
687 }
688 match read_task(&path) {
689 Ok(metadata) => tasks.push(metadata),
690 Err(error) => {
691 crate::slog_warn!(
692 "quarantining invalid background task metadata {} during replay: {error}",
693 path.display()
694 );
695 if let Err(quarantine_error) =
696 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
697 {
698 crate::slog_warn!(
699 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
700 path.display()
701 );
702 }
703 }
704 }
705 }
706 Ok(tasks)
707 }
708
709 pub fn status(
710 &self,
711 task_id: &str,
712 session_id: &str,
713 project_root: Option<&Path>,
714 storage_dir: Option<&Path>,
715 preview_bytes: usize,
716 ) -> Option<BgTaskSnapshot> {
717 let mut task = self.task_for_session(task_id, session_id);
718 if task.is_none() {
719 if let Some(storage_dir) = storage_dir {
720 let _ = self.replay_session(storage_dir, session_id);
721 task = self.task_for_session(task_id, session_id);
722 }
723 }
724 let Some(task) = task else {
725 return self.status_relaxed(
726 task_id,
727 session_id,
728 project_root?,
729 storage_dir?,
730 preview_bytes,
731 );
732 };
733 let _ = self.poll_task(&task);
734 let mut snapshot = task.snapshot(preview_bytes);
735 self.maybe_compress_snapshot(&task, &mut snapshot);
736 Some(snapshot)
737 }
738
739 fn status_relaxed_task(
740 &self,
741 task_id: &str,
742 project_root: &Path,
743 storage_dir: &Path,
744 ) -> Option<Arc<BgTask>> {
745 let canonical_project = canonicalized_path(project_root);
746 match self.lookup_relaxed_task_from_db(task_id, project_root) {
747 Some(Ok(Some(metadata))) => {
748 if let Some(task) = self.task(task_id) {
749 let matches_project = task
750 .state
751 .lock()
752 .map(|state| {
753 state
754 .metadata
755 .project_root
756 .as_deref()
757 .map(canonicalized_path)
758 .as_deref()
759 == Some(canonical_project.as_path())
760 })
761 .unwrap_or(false);
762 return matches_project.then_some(task);
763 }
764 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
765 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
766 return None;
767 }
768 return self.task(task_id);
769 }
770 Some(Ok(None)) => {
771 crate::slog_info!(
772 "bash task relaxed DB miss for {}; falling back to disk",
773 task_id
774 );
775 }
776 Some(Err(error)) => {
777 crate::slog_warn!(
778 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
779 task_id,
780 error
781 );
782 }
783 None => {
784 crate::slog_info!(
785 "bash task relaxed DB unavailable for {}; falling back to disk",
786 task_id
787 );
788 }
789 }
790 let root = storage_dir.join("bash-tasks");
791 let entries = fs::read_dir(&root).ok()?;
792 for entry in entries.flatten() {
793 let dir = entry.path();
794 if !dir.is_dir() {
795 continue;
796 }
797 let path = dir.join(format!("{task_id}.json"));
798 if !path.exists() {
799 continue;
800 }
801 let metadata = match read_task(&path) {
802 Ok(metadata) => metadata,
803 Err(error) => {
804 crate::slog_warn!(
805 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
806 path.display()
807 );
808 if let Err(quarantine_error) =
809 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
810 {
811 crate::slog_warn!(
812 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
813 path.display()
814 );
815 }
816 continue;
817 }
818 };
819 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
820 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
821 continue;
822 }
823 if let Some(task) = self.task(task_id) {
824 let matches_project = task
825 .state
826 .lock()
827 .map(|state| {
828 state
829 .metadata
830 .project_root
831 .as_deref()
832 .map(canonicalized_path)
833 .as_deref()
834 == Some(canonical_project.as_path())
835 })
836 .unwrap_or(false);
837 return matches_project.then_some(task);
838 }
839 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
840 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
841 return None;
842 }
843 return self.task(task_id);
844 }
845 None
846 }
847
848 fn lookup_relaxed_task_from_db(
849 &self,
850 task_id: &str,
851 project_root: &Path,
852 ) -> Option<Result<Option<PersistedTask>, String>> {
853 let pool = self
854 .inner
855 .db_pool
856 .read()
857 .ok()
858 .and_then(|slot| slot.clone())?;
859 let harness = self
860 .inner
861 .db_harness
862 .read()
863 .ok()
864 .and_then(|slot| slot.clone())?;
865 let conn = match pool.lock() {
866 Ok(conn) => conn,
867 Err(_) => return Some(Err("db mutex poisoned".to_string())),
868 };
869 let project_key = crate::search_index::project_cache_key(project_root);
870 Some(
871 crate::db::bash_tasks::find_bash_task_for_project(
872 &conn,
873 &harness,
874 &project_key,
875 task_id,
876 )
877 .map(|row| row.map(PersistedTask::from))
878 .map_err(|error| error.to_string()),
879 )
880 }
881
882 pub(super) fn status_relaxed(
883 &self,
884 task_id: &str,
885 _session_id: &str,
886 project_root: &Path,
887 storage_dir: &Path,
888 preview_bytes: usize,
889 ) -> Option<BgTaskSnapshot> {
890 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
891 let _ = self.poll_task(&task);
892 let mut snapshot = task.snapshot(preview_bytes);
893 self.maybe_compress_snapshot(&task, &mut snapshot);
894 Some(snapshot)
895 }
896
897 pub fn kill_relaxed(
898 &self,
899 task_id: &str,
900 project_root: &Path,
901 storage_dir: &Path,
902 ) -> Result<BgTaskSnapshot, String> {
903 let task = self
904 .status_relaxed_task(task_id, project_root, storage_dir)
905 .ok_or_else(|| format!("background task not found: {task_id}"))?;
906 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
907 }
908
909 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
910 #[cfg(test)]
911 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
912
913 let mut deleted = 0usize;
914
915 let root = storage_dir.join("bash-tasks");
916 if root.exists() {
917 let session_dirs = fs::read_dir(&root).map_err(|e| {
918 format!(
919 "failed to read background task root {}: {e}",
920 root.display()
921 )
922 })?;
923 for session_entry in session_dirs.flatten() {
924 let session_dir = session_entry.path();
925 if !session_dir.is_dir() {
926 continue;
927 }
928 let task_entries = match fs::read_dir(&session_dir) {
929 Ok(entries) => entries,
930 Err(error) => {
931 crate::slog_warn!(
932 "failed to read background task session dir {}: {error}",
933 session_dir.display()
934 );
935 continue;
936 }
937 };
938 for task_entry in task_entries.flatten() {
939 let json_path = task_entry.path();
940 if json_path
941 .extension()
942 .and_then(|extension| extension.to_str())
943 != Some("json")
944 {
945 continue;
946 }
947 if modified_within(&json_path, PERSISTED_GC_GRACE) {
948 continue;
949 }
950 let metadata = match read_task(&json_path) {
951 Ok(metadata) => metadata,
952 Err(error) => {
953 crate::slog_warn!(
954 "quarantining corrupt background task metadata {}: {error}",
955 json_path.display()
956 );
957 quarantine_task_json(
958 storage_dir,
959 &session_dir,
960 &json_path,
961 QuarantineKind::Corrupt,
962 )?;
963 continue;
964 }
965 };
966 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
967 continue;
968 }
969 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
970 match delete_task_bundle(&paths) {
971 Ok(()) => {
972 deleted += 1;
973 log::debug!(
974 "deleted persisted background task bundle {}",
975 metadata.task_id
976 );
977 }
978 Err(error) => {
979 crate::slog_warn!(
980 "failed to delete background task bundle {}: {error}",
981 metadata.task_id
982 );
983 continue;
984 }
985 }
986 }
987 }
988 }
989 gc_quarantine(storage_dir);
990 Ok(deleted)
991 }
992
993 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
994 let tasks = self
995 .inner
996 .tasks
997 .lock()
998 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
999 .unwrap_or_default();
1000 tasks
1001 .into_iter()
1002 .map(|task| {
1003 let _ = self.poll_task(&task);
1004 let mut snapshot = task.snapshot(preview_bytes);
1005 self.maybe_compress_snapshot(&task, &mut snapshot);
1006 snapshot
1007 })
1008 .collect()
1009 }
1010
1011 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1017 if !snapshot.info.status.is_terminal() {
1018 return;
1019 }
1020 let compressed_flag = task
1021 .state
1022 .lock()
1023 .map(|state| state.metadata.compressed)
1024 .unwrap_or(true);
1025 if !compressed_flag {
1026 return;
1027 }
1028 let raw = std::mem::take(&mut snapshot.output_preview);
1029 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1030 }
1031
1032 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1033 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1034 }
1035
1036 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1037 let task = self
1038 .task_for_session(task_id, session_id)
1039 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1040 let mut state = task
1041 .state
1042 .lock()
1043 .map_err(|_| "background task lock poisoned".to_string())?;
1044 let updated = self
1045 .update_task_metadata(&task.paths, |metadata| {
1046 metadata.notify_on_completion = true;
1047 metadata.completion_delivered = false;
1048 })
1049 .map_err(|e| format!("failed to promote background task: {e}"))?;
1050 state.metadata = updated;
1051 if state.metadata.status.is_terminal() {
1052 state.buffer.enforce_terminal_cap();
1053 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1054 }
1055 Ok(true)
1056 }
1057
1058 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1059 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1060 .map(|_| ())
1061 }
1062
1063 pub fn cleanup_finished(&self, older_than: Duration) {
1064 let cutoff = Instant::now().checked_sub(older_than);
1065 let removable_paths: Vec<(String, TaskPaths)> =
1066 if let Ok(mut tasks) = self.inner.tasks.lock() {
1067 let removable = tasks
1068 .iter()
1069 .filter_map(|(task_id, task)| {
1070 let delivered_terminal = task
1071 .state
1072 .lock()
1073 .map(|state| {
1074 state.metadata.status.is_terminal()
1075 && state.metadata.completion_delivered
1076 })
1077 .unwrap_or(false);
1078 if !delivered_terminal {
1079 return None;
1080 }
1081
1082 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1083 let expired = match (terminal_at, cutoff) {
1084 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1085 (Some(_), None) => true,
1086 (None, _) => false,
1087 };
1088 expired.then(|| task_id.clone())
1089 })
1090 .collect::<Vec<_>>();
1091
1092 removable
1093 .into_iter()
1094 .filter_map(|task_id| {
1095 tasks
1096 .remove(&task_id)
1097 .map(|task| (task_id, task.paths.clone()))
1098 })
1099 .collect()
1100 } else {
1101 Vec::new()
1102 };
1103
1104 for (task_id, paths) in removable_paths {
1105 match delete_task_bundle(&paths) {
1106 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1107 Err(error) => crate::slog_warn!(
1108 "failed to delete persisted background task bundle {task_id}: {error}"
1109 ),
1110 }
1111 }
1112 }
1113
1114 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1115 self.drain_completions_for_session(None)
1116 }
1117
1118 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1119 let completions = match self.inner.completions.lock() {
1120 Ok(completions) => completions,
1121 Err(_) => return Vec::new(),
1122 };
1123
1124 completions
1125 .iter()
1126 .filter(|completion| {
1127 session_id
1128 .map(|session_id| completion.session_id == session_id)
1129 .unwrap_or(true)
1130 })
1131 .cloned()
1132 .collect()
1133 }
1134
1135 pub fn ack_completions_for_session(
1136 &self,
1137 session_id: Option<&str>,
1138 task_ids: &[String],
1139 ) -> Vec<String> {
1140 if task_ids.is_empty() {
1141 return Vec::new();
1142 }
1143 let task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1144 let mut completions = match self.inner.completions.lock() {
1145 Ok(completions) => completions,
1146 Err(_) => return Vec::new(),
1147 };
1148 let mut acked = Vec::new();
1149 completions.retain(|completion| {
1150 let session_matches = session_id
1151 .map(|session_id| completion.session_id == session_id)
1152 .unwrap_or(true);
1153 if session_matches && task_ids.contains(completion.task_id.as_str()) {
1154 acked.push((completion.task_id.clone(), completion.session_id.clone()));
1155 false
1156 } else {
1157 true
1158 }
1159 });
1160 drop(completions);
1161
1162 let mut delivered = Vec::new();
1163 for (task_id, completion_session_id) in acked {
1164 if let Some(task) = self.task_for_session(&task_id, &completion_session_id) {
1165 if task.set_completion_delivered(true, self).is_ok() {
1166 delivered.push(task_id);
1167 }
1168 }
1169 }
1170
1171 delivered
1172 }
1173
1174 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1175 self.inner
1176 .completions
1177 .lock()
1178 .map(|completions| {
1179 completions
1180 .iter()
1181 .filter(|completion| completion.session_id == session_id)
1182 .cloned()
1183 .collect()
1184 })
1185 .unwrap_or_default()
1186 }
1187
1188 pub fn detach(&self) {
1189 self.inner.shutdown.store(true, Ordering::SeqCst);
1190 if let Ok(mut tasks) = self.inner.tasks.lock() {
1191 for task in tasks.values() {
1192 if let Ok(mut state) = task.state.lock() {
1193 state.child = None;
1194 state.detached = true;
1195 }
1196 }
1197 tasks.clear();
1198 }
1199 }
1200
1201 pub fn shutdown(&self) {
1202 let tasks = self
1203 .inner
1204 .tasks
1205 .lock()
1206 .map(|tasks| {
1207 tasks
1208 .values()
1209 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1210 .collect::<Vec<_>>()
1211 })
1212 .unwrap_or_default();
1213 for (task_id, session_id) in tasks {
1214 let _ = self.kill(&task_id, &session_id);
1215 }
1216 }
1217
1218 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1219 let marker = match read_exit_marker(&task.paths.exit) {
1220 Ok(Some(marker)) => marker,
1221 Ok(None) => return Ok(()),
1222 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1223 };
1224 self.finalize_from_marker(task, marker, None)
1225 }
1226
1227 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1228 let Ok(mut state) = task.state.lock() else {
1229 return;
1230 };
1231 if let Some(child) = state.child.as_mut() {
1232 if matches!(child.try_wait(), Ok(Some(_))) {
1233 state.child = None;
1246 state.detached = true;
1247 state.child_exit_observed = true;
1248 }
1249 } else if state.detached {
1250 let child_known_dead = state.child_exit_observed
1263 || state
1264 .metadata
1265 .child_pid
1266 .is_some_and(|pid| !is_process_alive(pid));
1267 if child_known_dead {
1268 self.fail_without_exit_marker_if_needed(task, &mut state);
1269 }
1270 }
1271 }
1272
1273 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1274 if state.metadata.status.is_terminal() {
1275 return;
1276 }
1277 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1278 return;
1279 }
1280 let updated = self.update_task_metadata(&task.paths, |metadata| {
1281 metadata.mark_terminal(
1282 BgTaskStatus::Failed,
1283 None,
1284 Some("process exited without exit marker".to_string()),
1285 );
1286 });
1287 if let Ok(metadata) = updated {
1288 state.metadata = metadata;
1289 task.mark_terminal_now();
1290 state.buffer.enforce_terminal_cap();
1291 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1292 }
1293 }
1294
1295 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1296 self.inner
1297 .tasks
1298 .lock()
1299 .map(|tasks| {
1300 tasks
1301 .values()
1302 .filter(|task| task.is_running())
1303 .cloned()
1304 .collect()
1305 })
1306 .unwrap_or_default()
1307 }
1308
1309 fn insert_rehydrated_task(
1310 &self,
1311 metadata: PersistedTask,
1312 paths: TaskPaths,
1313 detached: bool,
1314 ) -> Result<(), String> {
1315 let task_id = metadata.task_id.clone();
1316 let session_id = metadata.session_id.clone();
1317 let started = started_instant_from_unix_millis(metadata.started_at);
1318 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1319 let task = Arc::new(BgTask {
1320 task_id: task_id.clone(),
1321 session_id,
1322 paths: paths.clone(),
1323 started,
1324 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1325 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1326 state: Mutex::new(BgTaskState {
1327 metadata,
1328 child: None,
1329 detached,
1330 child_exit_observed: false,
1337 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
1338 }),
1339 });
1340 self.inner
1341 .tasks
1342 .lock()
1343 .map_err(|_| "background task registry lock poisoned".to_string())?
1344 .insert(task_id, task);
1345 Ok(())
1346 }
1347
    /// Terminates a session's background task and records `terminal_status` for it.
    ///
    /// Returns a snapshot of the task (with up to `5 * 1024` bytes of output preview).
    /// A task that is already terminal is returned unchanged. If an exit marker is
    /// already on disk, the task is finalized from that marker instead of being killed.
    ///
    /// # Errors
    ///
    /// Fails when the task is not found for the session, when the task lock is
    /// poisoned, when persisting an intermediate or final state fails, or when the
    /// kill marker cannot be written.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        // The state lock lives only inside this block; the final snapshot below
        // re-acquires it.
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            // Nothing to kill: report the existing terminal state.
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // The process already wrote its exit marker, so its real outcome wins
            // over the requested `terminal_status`.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                task.mark_terminal_now();
                state.child = None;
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate `Killing` status before touching the process.
            state.metadata.status = BgTaskStatus::Killing;
            self.persist_task(&task.paths, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            // Unix: terminate via the recorded process group id, when one is known.
            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            // Windows: prefer the live child handle, else fall back to the recorded pid.
            #[cfg(windows)]
            if let Some(child) = state.child.as_mut() {
                super::process::terminate_process(child);
            } else if let Some(pid) = state.metadata.child_pid {
                terminate_pid(pid);
            }
            // Blocks (while holding the state lock) until the child handle reports exit.
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            // Leave a kill marker unless an exit marker showed up in the meantime.
            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            // Timed-out tasks report exit code 124; other kills carry no exit code.
            // NOTE(review): 124 presumably mirrors the `timeout(1)` convention — confirm.
            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            self.persist_task(&task.paths, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        Ok(task.snapshot(5 * 1024))
    }
1422
1423 fn finalize_from_marker(
1424 &self,
1425 task: &Arc<BgTask>,
1426 marker: ExitMarker,
1427 reason: Option<String>,
1428 ) -> Result<(), String> {
1429 let mut state = task
1430 .state
1431 .lock()
1432 .map_err(|_| "background task lock poisoned".to_string())?;
1433 if state.metadata.status.is_terminal() {
1434 return Ok(());
1435 }
1436
1437 let updated = self
1438 .update_task_metadata(&task.paths, |metadata| {
1439 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1440 *metadata = new_metadata;
1441 })
1442 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1443 state.metadata = updated;
1444 task.mark_terminal_now();
1445 state.child = None;
1446 state.detached = true;
1447 state.buffer.enforce_terminal_cap();
1448 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1449 Ok(())
1450 }
1451
1452 fn enqueue_completion_if_needed(
1453 &self,
1454 metadata: &PersistedTask,
1455 paths: Option<&TaskPaths>,
1456 emit_frame: bool,
1457 ) {
1458 if metadata.status.is_terminal() && !metadata.completion_delivered {
1459 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1460 }
1461 }
1462
1463 fn enqueue_completion_locked(
1464 &self,
1465 metadata: &PersistedTask,
1466 buffer: Option<&BgBuffer>,
1467 emit_frame: bool,
1468 ) {
1469 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1470 }
1471
1472 fn enqueue_completion_from_parts(
1473 &self,
1474 metadata: &PersistedTask,
1475 buffer: Option<&BgBuffer>,
1476 paths: Option<&TaskPaths>,
1477 emit_frame: bool,
1478 ) {
1479 if !metadata.status.is_terminal() {
1490 return;
1491 }
1492 let (raw_preview, output_truncated) = match buffer {
1497 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1498 None => paths
1499 .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1500 .unwrap_or_else(|| (String::new(), false)),
1501 };
1502 let output_preview = if metadata.compressed {
1507 self.compress_output(&metadata.command, raw_preview)
1508 } else {
1509 raw_preview
1510 };
1511 let token_counts = self.completion_token_counts(metadata, buffer, paths);
1512 let completion = BgCompletion {
1513 task_id: metadata.task_id.clone(),
1514 session_id: metadata.session_id.clone(),
1515 status: metadata.status.clone(),
1516 exit_code: metadata.exit_code,
1517 command: metadata.command.clone(),
1518 output_preview,
1519 output_truncated,
1520 original_tokens: token_counts.original_tokens,
1521 compressed_tokens: token_counts.compressed_tokens,
1522 tokens_skipped: token_counts.tokens_skipped,
1523 };
1524
1525 self.record_compression_event_if_applicable(metadata, &token_counts);
1536
1537 if metadata.completion_delivered {
1547 return;
1548 }
1549
1550 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
1553 if completions
1554 .iter()
1555 .any(|existing| existing.task_id == metadata.task_id)
1556 {
1557 false
1558 } else {
1559 completions.push_back(completion.clone());
1560 true
1561 }
1562 } else {
1563 false
1564 };
1565
1566 if pushed && emit_frame {
1567 self.emit_bash_completed(completion);
1568 }
1569 }
1570
1571 fn record_compression_event_if_applicable(
1572 &self,
1573 metadata: &PersistedTask,
1574 token_counts: &CompletionTokenCounts,
1575 ) {
1576 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
1577 token_counts.original_tokens,
1578 token_counts.compressed_tokens,
1579 token_counts.original_bytes,
1580 token_counts.compressed_bytes,
1581 ) {
1582 (
1583 Some(original_tokens),
1584 Some(compressed_tokens),
1585 Some(original_bytes),
1586 Some(compressed_bytes),
1587 ) => (
1588 original_tokens,
1589 compressed_tokens,
1590 original_bytes,
1591 compressed_bytes,
1592 ),
1593 _ => {
1594 crate::slog_warn!(
1595 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
1596 metadata.task_id
1597 );
1598 return;
1599 }
1600 };
1601
1602 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
1603 let Some(pool) = pool else {
1604 crate::slog_warn!(
1605 "compression event skipped for {}: db_pool not initialized — was configure run?",
1606 metadata.task_id
1607 );
1608 return;
1609 };
1610 let harness = self
1611 .inner
1612 .db_harness
1613 .read()
1614 .ok()
1615 .and_then(|slot| slot.clone());
1616 let Some(harness) = harness else {
1617 crate::slog_warn!(
1618 "compression event insert skipped for {}: harness not configured",
1619 metadata.task_id
1620 );
1621 return;
1622 };
1623
1624 let project_root = metadata
1625 .project_root
1626 .as_deref()
1627 .unwrap_or(&metadata.workdir);
1628 let project_key = crate::search_index::project_cache_key(project_root);
1629 let row = crate::db::compression_events::CompressionEventRow {
1630 harness: &harness,
1631 session_id: Some(&metadata.session_id),
1632 project_key: &project_key,
1633 tool: "bash",
1634 task_id: Some(&metadata.task_id),
1635 command: Some(&metadata.command),
1636 compressor: if metadata.compressed {
1637 "registry"
1638 } else {
1639 "none"
1640 },
1641 original_bytes,
1642 compressed_bytes,
1643 original_tokens,
1644 compressed_tokens,
1645 created_at: unix_millis() as i64,
1646 };
1647
1648 let conn = match pool.lock() {
1649 Ok(conn) => conn,
1650 Err(_) => {
1651 crate::slog_warn!(
1652 "compression event insert failed for {}: db mutex poisoned",
1653 metadata.task_id
1654 );
1655 return;
1656 }
1657 };
1658 match crate::db::compression_events::insert_compression_event(&conn, &row) {
1659 Ok(_) => {
1660 crate::slog_debug!(
1664 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
1665 metadata.task_id,
1666 project_key,
1667 metadata.session_id,
1668 original_tokens,
1669 compressed_tokens
1670 );
1671 }
1672 Err(error) => {
1673 crate::slog_warn!(
1674 "compression event insert failed for {}: {}",
1675 metadata.task_id,
1676 error
1677 );
1678 }
1679 }
1680 }
1681
1682 fn emit_bash_completed(&self, completion: BgCompletion) {
1683 let Ok(progress_sender) = self
1684 .inner
1685 .progress_sender
1686 .lock()
1687 .map(|sender| sender.clone())
1688 else {
1689 return;
1690 };
1691 let Some(sender) = progress_sender.as_ref() else {
1692 return;
1693 };
1694 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1702 completion.task_id,
1703 completion.session_id,
1704 completion.status,
1705 completion.exit_code,
1706 completion.command,
1707 completion.output_preview,
1708 completion.output_truncated,
1709 completion.original_tokens,
1710 completion.compressed_tokens,
1711 completion.tokens_skipped,
1712 )));
1713 }
1714
1715 fn completion_token_counts(
1716 &self,
1717 metadata: &PersistedTask,
1718 buffer: Option<&BgBuffer>,
1719 paths: Option<&TaskPaths>,
1720 ) -> CompletionTokenCounts {
1721 let raw = match buffer {
1722 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
1723 None => paths
1724 .map(|paths| read_for_token_count_from_disk(paths, TOKENIZE_CAP_BYTES_PER_STREAM))
1725 .unwrap_or(TokenCountInput::Skipped),
1726 };
1727
1728 let TokenCountInput::Text(raw_output) = raw else {
1729 return CompletionTokenCounts::skipped();
1730 };
1731
1732 let original_tokens = token_count_u32(&raw_output);
1733 let original_bytes = raw_output.len() as i64;
1734 let compressed_output = if metadata.compressed {
1735 self.compress_output(&metadata.command, raw_output)
1736 } else {
1737 raw_output
1738 };
1739 let compressed_tokens = token_count_u32(&compressed_output);
1740 let compressed_bytes = compressed_output.len() as i64;
1741 CompletionTokenCounts {
1742 original_tokens: Some(original_tokens),
1743 compressed_tokens: Some(compressed_tokens),
1744 original_bytes: Some(original_bytes),
1745 compressed_bytes: Some(compressed_bytes),
1746 tokens_skipped: false,
1747 }
1748 }
1749
1750 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1751 if !self
1752 .inner
1753 .long_running_reminder_enabled
1754 .load(Ordering::SeqCst)
1755 {
1756 return;
1757 }
1758 let interval_ms = self
1759 .inner
1760 .long_running_reminder_interval_ms
1761 .load(Ordering::SeqCst);
1762 if interval_ms == 0 {
1763 return;
1764 }
1765 let interval = Duration::from_millis(interval_ms);
1766 let now = Instant::now();
1767 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1768 return;
1769 };
1770 let since = last_reminder_at.unwrap_or(task.started);
1771 if now.duration_since(since) < interval {
1772 return;
1773 }
1774 let command = task
1775 .state
1776 .lock()
1777 .map(|state| state.metadata.command.clone())
1778 .unwrap_or_default();
1779 *last_reminder_at = Some(now);
1780 self.emit_bash_long_running(BashLongRunningFrame::new(
1781 task.task_id.clone(),
1782 task.session_id.clone(),
1783 command,
1784 task.started.elapsed().as_millis() as u64,
1785 ));
1786 }
1787
1788 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1789 let Ok(progress_sender) = self
1790 .inner
1791 .progress_sender
1792 .lock()
1793 .map(|sender| sender.clone())
1794 else {
1795 return;
1796 };
1797 if let Some(sender) = progress_sender.as_ref() {
1798 sender(PushFrame::BashLongRunning(frame));
1799 }
1800 }
1801
1802 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1803 self.inner
1804 .tasks
1805 .lock()
1806 .ok()
1807 .and_then(|tasks| tasks.get(task_id).cloned())
1808 }
1809
1810 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1811 self.task(task_id)
1812 .filter(|task| task.session_id == session_id)
1813 }
1814
1815 fn running_count(&self) -> usize {
1816 self.inner
1817 .tasks
1818 .lock()
1819 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1820 .unwrap_or(0)
1821 }
1822
1823 fn start_watchdog(&self) {
1824 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1825 super::watchdog::start(self.clone());
1826 }
1827 }
1828
1829 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1830 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1831 }
1832
1833 #[cfg(test)]
1834 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1835 self.task_for_session(task_id, session_id)
1836 .map(|task| task.paths.json.clone())
1837 }
1838
1839 #[cfg(test)]
1840 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1841 self.task_for_session(task_id, session_id)
1842 .map(|task| task.paths.exit.clone())
1843 }
1844
1845 fn generate_unique_task_id(&self) -> Result<String, String> {
1847 for _ in 0..32 {
1848 let candidate = random_slug();
1849 let tasks = self
1850 .inner
1851 .tasks
1852 .lock()
1853 .map_err(|_| "background task registry lock poisoned".to_string())?;
1854 if tasks.contains_key(&candidate) {
1855 continue;
1856 }
1857 let completions = self
1858 .inner
1859 .completions
1860 .lock()
1861 .map_err(|_| "background completions lock poisoned".to_string())?;
1862 if completions
1863 .iter()
1864 .any(|completion| completion.task_id == candidate)
1865 {
1866 continue;
1867 }
1868 return Ok(candidate);
1869 }
1870 Err("failed to allocate unique background task id after 32 attempts".to_string())
1871 }
1872}
1873
/// Token and byte measurements of a task's output before and after compression.
struct CompletionTokenCounts {
    /// Token count of the raw output, when it could be measured.
    original_tokens: Option<u32>,
    /// Token count of the output after compression, when it could be measured.
    compressed_tokens: Option<u32>,
    /// Byte length of the raw output, when it could be measured.
    original_bytes: Option<i64>,
    /// Byte length of the output after compression, when it could be measured.
    compressed_bytes: Option<i64>,
    /// `true` when no measurement was taken at all.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Builds the "nothing was measured" value: every count is `None` and
    /// `tokens_skipped` is set.
    fn skipped() -> Self {
        let (original_tokens, compressed_tokens) = (None, None);
        let (original_bytes, compressed_bytes) = (None, None);
        Self {
            tokens_skipped: true,
            original_tokens,
            compressed_tokens,
            original_bytes,
            compressed_bytes,
        }
    }
}
1893
1894fn token_count_u32(text: &str) -> u32 {
1895 aft_tokenizer::count_tokens(text)
1896 .try_into()
1897 .unwrap_or(u32::MAX)
1898}
1899
1900impl Default for BgTaskRegistry {
1901 fn default() -> Self {
1902 Self::new(Arc::new(Mutex::new(None)))
1903 }
1904}
1905
/// Tells whether `path` was last modified strictly less than `grace` ago.
///
/// Returns `false` when the metadata or modification time cannot be read, and
/// also when the modification time lies in the future.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(metadata) = fs::metadata(path) else {
        return false;
    };
    let Ok(modified) = metadata.modified() else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
1914
/// Canonicalizes `path`, falling back to an unchanged copy when that fails
/// (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1918
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into an
/// `Instant` lying the same distance in the past.
///
/// A start time in the future, or an unreadable system clock, yields an age of
/// zero. If the age cannot be subtracted from the current instant, the current
/// instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
1930
1931fn gc_quarantine(storage_dir: &Path) {
1932 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1933 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1934 return;
1935 };
1936 for session_entry in session_dirs.flatten() {
1937 let session_quarantine_dir = session_entry.path();
1938 if !session_quarantine_dir.is_dir() {
1939 continue;
1940 }
1941 let entries = match fs::read_dir(&session_quarantine_dir) {
1942 Ok(entries) => entries,
1943 Err(error) => {
1944 crate::slog_warn!(
1945 "failed to read background task quarantine dir {}: {error}",
1946 session_quarantine_dir.display()
1947 );
1948 continue;
1949 }
1950 };
1951 for entry in entries.flatten() {
1952 let path = entry.path();
1953 if modified_within(&path, QUARANTINE_GC_GRACE) {
1954 continue;
1955 }
1956 let result = if path.is_dir() {
1957 fs::remove_dir_all(&path)
1958 } else {
1959 fs::remove_file(&path)
1960 };
1961 match result {
1962 Ok(()) => log::debug!(
1963 "deleted old background task quarantine entry {}",
1964 path.display()
1965 ),
1966 Err(error) => crate::slog_warn!(
1967 "failed to delete old background task quarantine entry {}: {error}",
1968 path.display()
1969 ),
1970 }
1971 }
1972 let _ = fs::remove_dir(&session_quarantine_dir);
1973 }
1974 let _ = fs::remove_dir(&quarantine_root);
1975}
1976
/// Selects the naming scheme `quarantine_name` applies to a quarantined file.
enum QuarantineKind {
    /// File is renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// File is renamed to `<stem>.invalid.<unix_ts>.<extension>`, or to
    /// `<file_name>.invalid.<unix_ts>` when the name has no stem/extension pair.
    Invalid,
}
1981
1982fn quarantine_task_json(
1983 storage_dir: &Path,
1984 session_dir: &Path,
1985 json_path: &Path,
1986 kind: QuarantineKind,
1987) -> Result<(), String> {
1988 let session_hash = session_dir
1989 .file_name()
1990 .and_then(|name| name.to_str())
1991 .ok_or_else(|| {
1992 format!(
1993 "invalid background task session dir: {}",
1994 session_dir.display()
1995 )
1996 })?;
1997 let task_name = json_path
1998 .file_name()
1999 .and_then(|name| name.to_str())
2000 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2001 let unix_ts = SystemTime::now()
2002 .duration_since(UNIX_EPOCH)
2003 .map(|duration| duration.as_secs())
2004 .unwrap_or(0);
2005 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2006 fs::create_dir_all(&quarantine_dir).map_err(|e| {
2007 format!(
2008 "failed to create background task quarantine dir {}: {e}",
2009 quarantine_dir.display()
2010 )
2011 })?;
2012 let target_name = quarantine_name(task_name, unix_ts, &kind);
2013 let target = quarantine_dir.join(target_name);
2014 fs::rename(json_path, &target).map_err(|e| {
2015 format!(
2016 "failed to quarantine background task metadata {} to {}: {e}",
2017 json_path.display(),
2018 target.display()
2019 )
2020 })?;
2021
2022 for sibling in task_sibling_paths(json_path) {
2023 if !sibling.exists() {
2024 continue;
2025 }
2026 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2027 crate::slog_warn!(
2028 "skipping background task sibling with invalid name during quarantine: {}",
2029 sibling.display()
2030 );
2031 continue;
2032 };
2033 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2034 if let Err(error) = fs::rename(&sibling, &sibling_target) {
2035 crate::slog_warn!(
2036 "failed to quarantine background task sibling {} to {}: {error}",
2037 sibling.display(),
2038 sibling_target.display()
2039 );
2040 }
2041 }
2042
2043 let _ = fs::remove_dir(session_dir);
2044 Ok(())
2045}
2046
2047fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2048 match kind {
2049 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2050 QuarantineKind::Invalid => {
2051 let path = Path::new(file_name);
2052 let stem = path.file_stem().and_then(|stem| stem.to_str());
2053 let extension = path.extension().and_then(|extension| extension.to_str());
2054 match (stem, extension) {
2055 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2056 _ => format!("{file_name}.invalid.{unix_ts}"),
2057 }
2058 }
2059 }
2060}
2061
/// Lists the paths of the files that may accompany a task's metadata JSON.
///
/// For `<dir>/<stem>.json` this yields `<dir>/<stem>.<ext>` for each of `stdout`,
/// `stderr`, `exit`, `ps1`, `bat` and `sh`, in that order, whether or not the
/// files exist. Returns an empty vector when the path has no parent or no UTF-8 stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];

    let parent = json_path.parent();
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    let (Some(parent), Some(stem)) = (parent, stem) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
2074
2075fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
2076 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2077 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2078 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2079 bytes.extend_from_slice(&stdout);
2080 bytes.extend_from_slice(&stderr);
2081 if bytes.len() <= max_bytes {
2082 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2083 }
2084 let start = bytes.len().saturating_sub(max_bytes);
2085 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2086}
2087
2088fn read_for_token_count_from_disk(
2089 paths: &TaskPaths,
2090 max_bytes_per_stream: usize,
2091) -> TokenCountInput {
2092 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2099 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2100 match (stdout, stderr) {
2101 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2102 String::from_utf8_lossy(&stdout).as_ref(),
2103 String::from_utf8_lossy(&stderr).as_ref(),
2104 )),
2105 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2106 String::from_utf8_lossy(&stdout).as_ref(),
2107 "",
2108 )),
2109 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2110 "",
2111 String::from_utf8_lossy(&stderr).as_ref(),
2112 )),
2113 (Err(_), Err(_)) => TokenCountInput::Skipped,
2114 }
2115}
2116
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// Files no longer than `max_bytes` are returned whole. A cap of `0` always
/// yields an empty vector. The read is bounded by the length sampled when the
/// file is opened, so a file that grows afterwards never pushes the result past
/// the cap.
///
/// # Errors
///
/// Returns the underlying I/O error when the file cannot be opened, inspected,
/// seeked, or read.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};

    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let read_len = len.min(cap);

    // Skip everything in front of the tail; an absolute offset avoids a signed cast.
    if len > read_len {
        file.seek(SeekFrom::Start(len - read_len))?;
    }

    // `read_len <= max_bytes`, so the conversion cannot actually fail.
    let mut bytes = Vec::with_capacity(usize::try_from(read_len).unwrap_or(max_bytes));
    file.take(read_len).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2133
2134impl BgTask {
2135 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2136 let state = self
2137 .state
2138 .lock()
2139 .unwrap_or_else(|poison| poison.into_inner());
2140 self.snapshot_locked(&state, preview_bytes)
2141 }
2142
2143 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2144 let metadata = &state.metadata;
2145 let duration_ms = metadata.duration_ms.or_else(|| {
2146 metadata
2147 .status
2148 .is_terminal()
2149 .then(|| self.started.elapsed().as_millis() as u64)
2150 });
2151 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
2152 BgTaskSnapshot {
2153 info: BgTaskInfo {
2154 task_id: self.task_id.clone(),
2155 status: metadata.status.clone(),
2156 command: metadata.command.clone(),
2157 started_at: metadata.started_at,
2158 duration_ms,
2159 },
2160 exit_code: metadata.exit_code,
2161 child_pid: metadata.child_pid,
2162 workdir: metadata.workdir.display().to_string(),
2163 output_preview,
2164 output_truncated,
2165 output_path: state
2166 .buffer
2167 .output_path()
2168 .map(|path| path.display().to_string()),
2169 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
2170 }
2171 }
2172
2173 pub(crate) fn is_running(&self) -> bool {
2174 self.state
2175 .lock()
2176 .map(|state| state.metadata.status == BgTaskStatus::Running)
2177 .unwrap_or(false)
2178 }
2179
2180 fn mark_terminal_now(&self) {
2181 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2182 if terminal_at.is_none() {
2183 *terminal_at = Some(Instant::now());
2184 }
2185 }
2186 }
2187
2188 fn set_completion_delivered(
2189 &self,
2190 delivered: bool,
2191 registry: &BgTaskRegistry,
2192 ) -> Result<(), String> {
2193 let mut state = self
2194 .state
2195 .lock()
2196 .map_err(|_| "background task lock poisoned".to_string())?;
2197 let updated = registry
2198 .update_task_metadata(&self.paths, |metadata| {
2199 metadata.completion_delivered = delivered;
2200 })
2201 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2202 state.metadata = updated;
2203 Ok(())
2204 }
2205}
2206
2207fn terminal_metadata_from_marker(
2208 mut metadata: PersistedTask,
2209 marker: ExitMarker,
2210 reason: Option<String>,
2211) -> PersistedTask {
2212 match marker {
2213 ExitMarker::Code(code) => {
2214 let status = if code == 0 {
2215 BgTaskStatus::Completed
2216 } else {
2217 BgTaskStatus::Failed
2218 };
2219 metadata.mark_terminal(status, Some(code), reason);
2220 }
2221 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2222 }
2223 metadata
2224}
2225
2226#[cfg(unix)]
2227fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
2228 let shell = resolve_posix_shell();
2229 let mut cmd = Command::new(&shell);
2230 cmd.arg("-c")
2231 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
2232 .arg(&shell)
2233 .arg(command)
2234 .arg(exit_path);
2235 unsafe {
2236 cmd.pre_exec(|| {
2237 if libc::setsid() == -1 {
2238 return Err(std::io::Error::last_os_error());
2239 }
2240 Ok(())
2241 });
2242 }
2243 cmd
2244}
2245
2246#[cfg(unix)]
2247fn resolve_posix_shell() -> PathBuf {
2248 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2249 POSIX_SHELL
2250 .get_or_init(|| {
2251 std::env::var_os("BASH")
2252 .filter(|value| !value.is_empty())
2253 .map(PathBuf::from)
2254 .filter(|path| path.exists())
2255 .or_else(|| which::which("bash").ok())
2256 .or_else(|| which::which("zsh").ok())
2257 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2258 })
2259 .clone()
2260}
2261
/// Builds the Windows command that runs `command` through a wrapper script.
///
/// The wrapper body comes from `shell.wrapper_script` and is written next to the
/// task files as `<json stem>.<ext>`, where the extension depends on the shell
/// (`ps1`, `bat` or `sh`). The returned command invokes the shell binary on that
/// script with shell-specific leading arguments and the given creation flags.
/// It is not spawned here.
///
/// # Errors
///
/// Fails when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{wrapper_stem}.{wrapper_ext}"));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Arguments placed in front of the wrapper script path, per shell flavour.
    let leading_args: &[&str] = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
2344
2345fn spawn_detached_child(
2361 command: &str,
2362 paths: &TaskPaths,
2363 workdir: &Path,
2364 env: &HashMap<String, String>,
2365) -> Result<std::process::Child, String> {
2366 #[cfg(not(windows))]
2367 {
2368 let stdout = create_capture_file(&paths.stdout)
2369 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
2370 let stderr = create_capture_file(&paths.stderr)
2371 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
2372 detached_shell_command(command, &paths.exit)
2373 .current_dir(workdir)
2374 .envs(env)
2375 .stdin(Stdio::null())
2376 .stdout(Stdio::from(stdout))
2377 .stderr(Stdio::from(stderr))
2378 .spawn()
2379 .map_err(|e| format!("failed to spawn background bash command: {e}"))
2380 }
2381 #[cfg(windows)]
2382 {
2383 use crate::windows_shell::shell_candidates;
2384 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
2395 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
2416 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
2417 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
2418 let with_breakaway =
2419 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
2420 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
2421 let mut last_error: Option<String> = None;
2422 for (idx, shell) in candidates.iter().enumerate() {
2423 for &flags in &[with_breakaway, without_breakaway] {
2427 let stdout = create_capture_file(&paths.stdout)
2429 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
2430 let stderr = create_capture_file(&paths.stderr)
2431 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
2432 let mut cmd =
2433 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
2434 cmd.current_dir(workdir)
2435 .envs(env)
2436 .stdin(Stdio::null())
2437 .stdout(Stdio::from(stdout))
2438 .stderr(Stdio::from(stderr));
2439 match cmd.spawn() {
2440 Ok(child) => {
2441 if idx > 0 {
2442 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
2443 the cached PATH probe disagreed with runtime spawn — likely PATH \
2444 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
2445 shell.binary(),
2446 idx);
2447 }
2448 if flags == without_breakaway {
2449 crate::slog_warn!(
2450 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
2451 (likely a restrictive Job Object — CI sandbox or MDM policy). \
2452 Spawned without breakaway; the bg task will be torn down if the \
2453 AFT process group is killed."
2454 );
2455 }
2456 return Ok(child);
2457 }
2458 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
2459 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
2460 shell.binary());
2461 last_error = Some(format!("{}: {e}", shell.binary()));
2462 break;
2465 }
2466 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
2467 crate::slog_warn!(
2469 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
2470 Access Denied — retrying {} without breakaway",
2471 shell.binary()
2472 );
2473 last_error = Some(format!("{}: {e}", shell.binary()));
2474 continue;
2475 }
2476 Err(e) => {
2477 return Err(format!(
2478 "failed to spawn background bash command via {}: {e}",
2479 shell.binary()
2480 ));
2481 }
2482 }
2483 }
2484 }
2485 Err(format!(
2486 "failed to spawn background bash command: no Windows shell could be spawned. \
2487 Last error: {}. PATH-probed candidates: {:?}",
2488 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
2489 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
2490 ))
2491 }
2492}
2493
2494fn random_slug() -> String {
2495 let mut bytes = [0u8; 4];
2496 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
2498 let t = SystemTime::now()
2500 .duration_since(UNIX_EPOCH)
2501 .map(|d| d.subsec_nanos())
2502 .unwrap_or(0);
2503 let p = std::process::id();
2504 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
2505 });
2506 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
2508 format!("bash-{hex}")
2509}
2510
2511#[cfg(test)]
2512mod tests {
2513 use std::collections::HashMap;
2514 #[cfg(windows)]
2515 use std::fs;
2516 use std::sync::{Arc, Mutex};
2517 use std::time::Duration;
2518 #[cfg(windows)]
2519 use std::time::Instant;
2520
2521 use super::*;
2522
2523 #[cfg(unix)]
2524 const QUICK_SUCCESS_COMMAND: &str = "true";
2525 #[cfg(windows)]
2526 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
2527
2528 #[cfg(unix)]
2529 const LONG_RUNNING_COMMAND: &str = "sleep 5";
2530 #[cfg(windows)]
2531 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
2532
2533 fn spawn_dead_child() -> std::process::Child {
2538 #[cfg(unix)]
2539 let mut cmd = std::process::Command::new("true");
2540 #[cfg(windows)]
2541 let mut cmd = {
2542 let mut c = std::process::Command::new("cmd");
2543 c.args(["/c", "exit", "0"]);
2544 c
2545 };
2546 cmd.stdin(std::process::Stdio::null());
2547 cmd.stdout(std::process::Stdio::null());
2548 cmd.stderr(std::process::Stdio::null());
2549 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
2550 let started = Instant::now();
2559 loop {
2560 match child.try_wait() {
2561 Ok(Some(_)) => break,
2562 Ok(None) => {
2563 if started.elapsed() > Duration::from_secs(5) {
2564 panic!("dead-child stand-in did not exit within 5s");
2565 }
2566 std::thread::sleep(Duration::from_millis(10));
2567 }
2568 Err(error) => panic!("dead-child try_wait failed: {error}"),
2569 }
2570 }
2571 child
2572 }
2573
2574 #[test]
2575 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
2576 let registry = BgTaskRegistry::default();
2577 let dir = tempfile::tempdir().unwrap();
2578 let task_id = registry
2579 .spawn(
2580 QUICK_SUCCESS_COMMAND,
2581 "session".to_string(),
2582 dir.path().to_path_buf(),
2583 HashMap::new(),
2584 Some(Duration::from_secs(30)),
2585 dir.path().to_path_buf(),
2586 10,
2587 true,
2588 false,
2589 Some(dir.path().to_path_buf()),
2590 )
2591 .unwrap();
2592 registry
2593 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2594 .unwrap();
2595 let completions = registry.drain_completions_for_session(Some("session"));
2596 assert_eq!(completions.len(), 1);
2597 assert_eq!(
2598 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
2599 vec![task_id.clone()]
2600 );
2601
2602 registry.cleanup_finished(Duration::ZERO);
2603
2604 assert!(registry.inner.tasks.lock().unwrap().is_empty());
2605 }
2606
2607 #[test]
2608 fn cleanup_finished_retains_undelivered_terminals() {
2609 let registry = BgTaskRegistry::default();
2610 let dir = tempfile::tempdir().unwrap();
2611 let task_id = registry
2612 .spawn(
2613 QUICK_SUCCESS_COMMAND,
2614 "session".to_string(),
2615 dir.path().to_path_buf(),
2616 HashMap::new(),
2617 Some(Duration::from_secs(30)),
2618 dir.path().to_path_buf(),
2619 10,
2620 true,
2621 false,
2622 Some(dir.path().to_path_buf()),
2623 )
2624 .unwrap();
2625 registry
2626 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2627 .unwrap();
2628
2629 registry.cleanup_finished(Duration::ZERO);
2630
2631 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2632 }
2633
2634 #[test]
2642 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
2643 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2644 let dir = tempfile::tempdir().unwrap();
2645 let task_id = registry
2646 .spawn(
2647 QUICK_SUCCESS_COMMAND,
2648 "session".to_string(),
2649 dir.path().to_path_buf(),
2650 HashMap::new(),
2651 Some(Duration::from_secs(30)),
2652 dir.path().to_path_buf(),
2653 10,
2654 true,
2655 false,
2656 Some(dir.path().to_path_buf()),
2657 )
2658 .unwrap();
2659
2660 let task = registry.task_for_session(&task_id, "session").unwrap();
2661
2662 let started = Instant::now();
2667 loop {
2668 let exited = {
2669 let mut state = task.state.lock().unwrap();
2670 if let Some(child) = state.child.as_mut() {
2671 matches!(child.try_wait(), Ok(Some(_)))
2672 } else {
2673 true
2674 }
2675 };
2676 if exited {
2677 break;
2678 }
2679 assert!(
2680 started.elapsed() < Duration::from_secs(5),
2681 "child should exit quickly"
2682 );
2683 std::thread::sleep(Duration::from_millis(20));
2684 }
2685
2686 registry
2694 .inner
2695 .shutdown
2696 .store(true, std::sync::atomic::Ordering::SeqCst);
2697 std::thread::sleep(Duration::from_millis(550));
2701
2702 let _ = std::fs::remove_file(&task.paths.exit);
2705
2706 {
2721 let mut state = task.state.lock().unwrap();
2722 state.metadata.status = BgTaskStatus::Running;
2723 state.metadata.status_reason = None;
2724 state.metadata.exit_code = None;
2725 state.metadata.finished_at = None;
2726 state.metadata.duration_ms = None;
2727 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
2730 .expect("persist reset Running metadata for reap_child test");
2731 if state.child.is_none() {
2735 state.child = Some(spawn_dead_child());
2736 }
2737 }
2738 *task.terminal_at.lock().unwrap() = None;
2741
2742 assert!(
2745 task.is_running(),
2746 "precondition: metadata.status == Running"
2747 );
2748 assert!(
2749 !task.paths.exit.exists(),
2750 "precondition: exit marker absent"
2751 );
2752
2753 registry.reap_child(&task);
2758
2759 {
2760 let state = task.state.lock().unwrap();
2761 assert_eq!(
2762 state.metadata.status,
2763 BgTaskStatus::Running,
2764 "first reap must leave status Running while waiting one pass for marker"
2765 );
2766 assert_eq!(
2767 state.metadata.status_reason, None,
2768 "first reap must not record a failure reason"
2769 );
2770 assert!(
2771 state.child.is_none(),
2772 "child handle must be released after first reap"
2773 );
2774 assert!(
2775 state.detached,
2776 "task must be marked detached after first reap"
2777 );
2778 }
2779
2780 registry.reap_child(&task);
2784
2785 let state = task.state.lock().unwrap();
2786 assert!(
2787 state.metadata.status.is_terminal(),
2788 "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
2789 state.metadata.status
2790 );
2791 assert_eq!(
2792 state.metadata.status,
2793 BgTaskStatus::Failed,
2794 "must specifically be Failed (not Killed): status={:?}",
2795 state.metadata.status
2796 );
2797 assert_eq!(
2798 state.metadata.status_reason.as_deref(),
2799 Some("process exited without exit marker"),
2800 "reason must match replay path's wording: {:?}",
2801 state.metadata.status_reason
2802 );
2803 assert!(
2804 state.child.is_none(),
2805 "child handle must stay released after second reap"
2806 );
2807 assert!(
2808 state.detached,
2809 "task must remain detached after second reap"
2810 );
2811 }
2812
2813 #[test]
2818 fn reap_child_preserves_running_when_exit_marker_exists() {
2819 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2820 let dir = tempfile::tempdir().unwrap();
2821 let task_id = registry
2822 .spawn(
2823 QUICK_SUCCESS_COMMAND,
2824 "session".to_string(),
2825 dir.path().to_path_buf(),
2826 HashMap::new(),
2827 Some(Duration::from_secs(30)),
2828 dir.path().to_path_buf(),
2829 10,
2830 true,
2831 false,
2832 Some(dir.path().to_path_buf()),
2833 )
2834 .unwrap();
2835
2836 let task = registry.task_for_session(&task_id, "session").unwrap();
2837
2838 let started = Instant::now();
2841 loop {
2842 let exited = {
2843 let mut state = task.state.lock().unwrap();
2844 if let Some(child) = state.child.as_mut() {
2845 matches!(child.try_wait(), Ok(Some(_)))
2846 } else {
2847 true
2848 }
2849 };
2850 if exited && task.paths.exit.exists() {
2851 break;
2852 }
2853 assert!(
2854 started.elapsed() < Duration::from_secs(5),
2855 "child should exit and write marker quickly"
2856 );
2857 std::thread::sleep(Duration::from_millis(20));
2858 }
2859
2860 registry
2866 .inner
2867 .shutdown
2868 .store(true, std::sync::atomic::Ordering::SeqCst);
2869 std::thread::sleep(Duration::from_millis(550));
2870
2871 {
2877 let mut state = task.state.lock().unwrap();
2878 state.metadata.status = BgTaskStatus::Running;
2879 state.metadata.status_reason = None;
2880 if state.child.is_none() {
2881 state.child = Some(spawn_dead_child());
2882 }
2883 }
2884 *task.terminal_at.lock().unwrap() = None;
2885 if !task.paths.exit.exists() {
2888 std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
2889 }
2890
2891 registry.reap_child(&task);
2895
2896 let state = task.state.lock().unwrap();
2897 assert!(
2898 state.child.is_none(),
2899 "child handle still released even when marker exists"
2900 );
2901 assert!(
2902 state.detached,
2903 "task still marked detached even when marker exists"
2904 );
2905 assert_eq!(
2910 state.metadata.status,
2911 BgTaskStatus::Running,
2912 "reap_child must defer to poll_task when marker exists"
2913 );
2914 }
2915
2916 #[test]
2917 fn cleanup_finished_keeps_running_tasks() {
2918 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2919 let dir = tempfile::tempdir().unwrap();
2920 let task_id = registry
2921 .spawn(
2922 LONG_RUNNING_COMMAND,
2923 "session".to_string(),
2924 dir.path().to_path_buf(),
2925 HashMap::new(),
2926 Some(Duration::from_secs(30)),
2927 dir.path().to_path_buf(),
2928 10,
2929 true,
2930 false,
2931 Some(dir.path().to_path_buf()),
2932 )
2933 .unwrap();
2934
2935 registry.cleanup_finished(Duration::ZERO);
2936
2937 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2938 let _ = registry.kill(&task_id, "session");
2939 }
2940
2941 #[cfg(windows)]
2942 fn wait_for_file(path: &Path) -> String {
2943 let started = Instant::now();
2944 loop {
2945 if path.exists() {
2946 return fs::read_to_string(path).expect("read file");
2947 }
2948 assert!(
2949 started.elapsed() < Duration::from_secs(30),
2950 "timed out waiting for {}",
2951 path.display()
2952 );
2953 std::thread::sleep(Duration::from_millis(100));
2954 }
2955 }
2956
2957 #[cfg(windows)]
2958 fn spawn_windows_registry_command(
2959 command: &str,
2960 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
2961 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2962 let dir = tempfile::tempdir().unwrap();
2963 let task_id = registry
2964 .spawn(
2965 command,
2966 "session".to_string(),
2967 dir.path().to_path_buf(),
2968 HashMap::new(),
2969 Some(Duration::from_secs(30)),
2970 dir.path().to_path_buf(),
2971 10,
2972 false,
2973 false,
2974 Some(dir.path().to_path_buf()),
2975 )
2976 .unwrap();
2977 (registry, dir, task_id)
2978 }
2979
2980 #[cfg(windows)]
2981 #[test]
2982 fn windows_spawn_writes_exit_marker_for_zero_exit() {
2983 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
2984 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2985
2986 let content = wait_for_file(&exit_path);
2987
2988 assert_eq!(content.trim(), "0");
2989 }
2990
2991 #[cfg(windows)]
2992 #[test]
2993 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
2994 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
2995 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2996
2997 let content = wait_for_file(&exit_path);
2998
2999 assert_eq!(content.trim(), "42");
3000 }
3001
3002 #[cfg(windows)]
3003 #[test]
3004 fn windows_spawn_captures_stdout_to_disk() {
3005 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
3006 let task = registry.task_for_session(&task_id, "session").unwrap();
3007 let stdout_path = task.paths.stdout.clone();
3008 let exit_path = task.paths.exit.clone();
3009
3010 let _ = wait_for_file(&exit_path);
3011 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
3012
3013 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
3014 }
3015
3016 #[cfg(windows)]
3017 #[test]
3018 fn windows_spawn_uses_pwsh_when_available() {
3019 let candidates = crate::windows_shell::shell_candidates_with(
3023 |binary| match binary {
3024 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
3025 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
3026 _ => None,
3027 },
3028 || None,
3029 );
3030 let shell = candidates.first().expect("at least one candidate").clone();
3031 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
3032 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
3033 }
3034
3035 #[cfg(windows)]
3042 #[test]
3043 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
3044 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
3045 let script =
3046 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
3047
3048 assert!(
3052 script.contains("set CODE=%ERRORLEVEL%"),
3053 "wrapper must capture exit code into CODE: {script}"
3054 );
3055 assert!(
3056 script.contains("echo %CODE% >"),
3057 "wrapper must echo CODE to a temp marker file: {script}"
3058 );
3059 assert!(
3060 script.contains("move /Y"),
3061 "wrapper must use atomic move to write the marker: {script}"
3062 );
3063 assert!(
3066 script.contains("> nul"),
3067 "wrapper must redirect move output to nul: {script}"
3068 );
3069 assert!(
3071 script.contains("exit /B %CODE%"),
3072 "wrapper must propagate the captured exit code: {script}"
3073 );
3074 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
3075 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
3076 }
3077
3078 #[cfg(windows)]
3084 #[test]
3085 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
3086 use crate::windows_shell::WindowsShell;
3087 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
3088 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3089 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3090 assert_eq!(
3091 args_strs,
3092 vec!["/D", "/S", "/C", "echo wrapped"],
3093 "Cmd::bg_command must prepend /D /S /C"
3094 );
3095 }
3096
3097 #[cfg(windows)]
3101 #[test]
3102 fn windows_shell_pwsh_bg_command_uses_standard_args() {
3103 use crate::windows_shell::WindowsShell;
3104 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
3105 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3106 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3107 assert!(
3108 args_strs.contains(&"-Command"),
3109 "Pwsh::bg_command must use -Command: {args_strs:?}"
3110 );
3111 assert!(
3112 args_strs.contains(&"Get-Date"),
3113 "Pwsh::bg_command must include the user command body"
3114 );
3115 }
3116
    // Disabled placeholder: `#[cfg(any())]` has an empty predicate list, which
    // always evaluates to false, so this item is never compiled on any target.
    // NOTE(review): the name suggests it stands in for a retired test of the
    // cmd wrapper's exit-code recording — confirm before deleting it.
    #[allow(dead_code)]
    #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
3150}