1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex, RwLock};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use rusqlite::Connection;
12use serde::Serialize;
13
14use crate::context::SharedProgressSender;
15use crate::harness::Harness;
16use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
17
18#[cfg(unix)]
19use std::os::unix::process::CommandExt;
20#[cfg(windows)]
21use std::os::windows::process::CommandExt;
22
23use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
24use super::persistence::{
25 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
26 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
27 PersistedTask, TaskPaths,
28};
29use super::process::is_process_alive;
30#[cfg(unix)]
31use super::process::terminate_pgid;
32#[cfg(windows)]
33use super::process::terminate_pid;
34use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied by `spawn` when the caller passes `None` (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// Age after which persisted running metadata counts as stale (24 hours); presumably
/// consulted by `running_metadata_is_stale` — confirm against that helper.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(86_400);
/// `maybe_gc_persisted` skips task JSON modified more recently than this (24 hours).
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(86_400);
/// Retention window for quarantined metadata (30 days); presumably used by
/// `gc_quarantine` — confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(2_592_000);

/// Preview size, in bytes, for completion notices (300 bytes); consumer not visible here.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// Per-stream byte cap for tokenization (128 KiB); consumer not visible here.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 131_072;
55
/// Completion notice for a background bash task.
///
/// Notices are queued in `RegistryInner::completions` and stay there until
/// `ack_completions_for_session` removes them. `session_id` is used only for routing and
/// filtering; it is never serialized.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    pub task_id: String,
    /// Owning session; used to filter and acknowledge notices per session.
    #[serde(skip_serializing)]
    pub session_id: String,
    pub status: BgTaskStatus,
    pub exit_code: Option<i32>,
    pub command: String,
    /// Output excerpt; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Omitted when `false`; presumably set when the preview was cut short — confirm.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Omitted when `None`; presumably the token count of the raw output — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Omitted when `None`; presumably the token count after compression — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Omitted when `false`; presumably marks that token counting was skipped — confirm.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
91
/// Serde `skip_serializing_if` predicate: `true` when the flag is unset, so that
/// `false` booleans are left out of the serialized output.
fn is_false(v: &bool) -> bool {
    !v
}
95
/// Point-in-time view of one background task, returned by `status`, `list` and `kill`.
///
/// The fields of `info` are flattened into the serialized object. For terminal tasks,
/// `output_preview` may have been passed through the registry's compressor
/// (see `maybe_compress_snapshot`).
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    #[serde(flatten)]
    pub info: BgTaskInfo,
    pub exit_code: Option<i32>,
    pub child_pid: Option<u32>,
    pub workdir: String,
    pub output_preview: String,
    pub output_truncated: bool,
    pub output_path: Option<String>,
    pub stderr_path: Option<String>,
}
108
/// Handle to the background bash task registry.
///
/// Cloning is cheap: every clone shares the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
113
/// Shared state behind a `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Live tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completion notices awaiting acknowledgement via `ack_completions_for_session`.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    pub(crate) progress_sender: SharedProgressSender,
    /// Presumably flipped by `start_watchdog` so the watchdog starts once — not visible here.
    watchdog_started: AtomicBool,
    /// Set by `detach`.
    pub(crate) shutdown: AtomicBool,
    /// Reminder settings written by `configure_long_running_reminders`
    /// (defaults: enabled, 600_000 ms).
    pub(crate) long_running_reminder_enabled: AtomicBool,
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Ensures replay runs `maybe_gc_persisted` at most once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only count of `maybe_gc_persisted` invocations.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional `(command, output) -> output` transform applied by `compress_output`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional DB connection that task metadata is mirrored into (`dual_write_task`).
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used to scope mirrored DB rows; mirroring is skipped while unset.
    pub(crate) db_harness: RwLock<Option<String>>,
}
134
/// One registered background task; the immutable identity sits outside the state mutex.
pub(crate) struct BgTask {
    pub(crate) task_id: String,
    pub(crate) session_id: String,
    /// On-disk bundle paths (metadata JSON, stdout/stderr captures, exit marker).
    pub(crate) paths: TaskPaths,
    /// `spawn` sets this to `Instant::now()` after the child is running.
    pub(crate) started: Instant,
    /// Presumably when the last long-running reminder was emitted — confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// When the task became terminal; `cleanup_finished` ages tasks out by this instant.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    pub(crate) state: Mutex<BgTaskState>,
}
144
/// Mutable per-task state, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// Latest persisted metadata for the task.
    pub(crate) metadata: PersistedTask,
    /// Child handle; cleared by `detach` and by `reap_child` once the child has exited.
    pub(crate) child: Option<Child>,
    /// Set by `detach` and `reap_child`; `reap_child` then checks liveness by pid instead.
    pub(crate) detached: bool,
    /// Output buffer over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
151
152impl BgTaskRegistry {
153 pub fn new(progress_sender: SharedProgressSender) -> Self {
154 Self {
155 inner: Arc::new(RegistryInner {
156 tasks: Mutex::new(HashMap::new()),
157 completions: Mutex::new(VecDeque::new()),
158 progress_sender,
159 watchdog_started: AtomicBool::new(false),
160 shutdown: AtomicBool::new(false),
161 long_running_reminder_enabled: AtomicBool::new(true),
162 long_running_reminder_interval_ms: AtomicU64::new(600_000),
163 persisted_gc_started: AtomicBool::new(false),
164 #[cfg(test)]
165 persisted_gc_runs: AtomicU64::new(0),
166 compressor: Mutex::new(None),
167 db_pool: RwLock::new(None),
168 db_harness: RwLock::new(None),
169 }),
170 }
171 }
172
173 pub fn set_harness(&self, harness: Harness) {
174 if let Ok(mut slot) = self.inner.db_harness.write() {
175 *slot = Some(harness.as_str().to_string());
176 }
177 }
178
179 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
180 if let Ok(mut slot) = self.inner.db_pool.write() {
181 *slot = Some(conn);
182 }
183 }
184
185 pub fn clear_db_pool(&self) {
186 if let Ok(mut slot) = self.inner.db_pool.write() {
187 *slot = None;
188 }
189 }
190
191 pub fn set_compressor<F>(&self, compressor: F)
196 where
197 F: Fn(&str, String) -> String + Send + Sync + 'static,
198 {
199 if let Ok(mut slot) = self.inner.compressor.lock() {
200 *slot = Some(Box::new(compressor));
201 }
202 }
203
204 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
207 let Ok(slot) = self.inner.compressor.lock() else {
208 return output;
209 };
210 match slot.as_ref() {
211 Some(compressor) => compressor(command, output),
212 None => output,
213 }
214 }
215
216 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
217 write_task(&paths.json, metadata)?;
218 self.dual_write_task(paths, metadata);
219 Ok(())
220 }
221
222 fn update_task_metadata<F>(
223 &self,
224 paths: &TaskPaths,
225 update: F,
226 ) -> std::io::Result<PersistedTask>
227 where
228 F: FnOnce(&mut PersistedTask),
229 {
230 let metadata = update_task(&paths.json, update)?;
231 self.dual_write_task(paths, &metadata);
232 Ok(metadata)
233 }
234
235 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
236 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
237 let Some(pool) = pool else {
238 return;
239 };
240 let harness = self
241 .inner
242 .db_harness
243 .read()
244 .ok()
245 .and_then(|slot| slot.clone());
246 let Some(harness) = harness else {
247 crate::slog_warn!(
248 "dual-write bash_task to DB skipped for {}: harness not configured",
249 metadata.task_id
250 );
251 return;
252 };
253 let row = match metadata.to_bash_task_row(&harness, paths) {
254 Ok(row) => row,
255 Err(error) => {
256 crate::slog_warn!(
257 "dual-write bash_task to DB failed for {}: {}",
258 metadata.task_id,
259 error
260 );
261 return;
262 }
263 };
264 let conn = match pool.lock() {
265 Ok(conn) => conn,
266 Err(_) => {
267 crate::slog_warn!(
268 "dual-write bash_task to DB failed for {}: db mutex poisoned",
269 metadata.task_id
270 );
271 return;
272 }
273 };
274 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
275 crate::slog_warn!(
276 "dual-write bash_task to DB failed for {}: {}",
277 metadata.task_id,
278 error
279 );
280 }
281 }
282
283 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
284 self.inner
285 .long_running_reminder_enabled
286 .store(enabled, Ordering::SeqCst);
287 self.inner
288 .long_running_reminder_interval_ms
289 .store(interval_ms, Ordering::SeqCst);
290 }
291
292 #[cfg(unix)]
293 #[allow(clippy::too_many_arguments)]
294 pub fn spawn(
295 &self,
296 command: &str,
297 session_id: String,
298 workdir: PathBuf,
299 env: HashMap<String, String>,
300 timeout: Option<Duration>,
301 storage_dir: PathBuf,
302 max_running: usize,
303 notify_on_completion: bool,
304 compressed: bool,
305 project_root: Option<PathBuf>,
306 ) -> Result<String, String> {
307 self.start_watchdog();
308
309 let running = self.running_count();
310 if running >= max_running {
311 return Err(format!(
312 "background bash task limit exceeded: {running} running (max {max_running})"
313 ));
314 }
315
316 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
317 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
318 let task_id = self.generate_unique_task_id()?;
319 let paths = task_paths(&storage_dir, &session_id, &task_id);
320 fs::create_dir_all(&paths.dir)
321 .map_err(|e| format!("failed to create background task dir: {e}"))?;
322
323 let mut metadata = PersistedTask::starting(
324 task_id.clone(),
325 session_id.clone(),
326 command.to_string(),
327 workdir.clone(),
328 project_root,
329 timeout_ms,
330 notify_on_completion,
331 compressed,
332 );
333 self.persist_task(&paths, &metadata)
334 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
335
336 create_capture_file(&paths.stdout)
340 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
341 create_capture_file(&paths.stderr)
342 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
343
344 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
345 Ok(child) => child,
346 Err(error) => {
347 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
348 let _ = delete_task_bundle(&paths);
349 return Err(error);
350 }
351 };
352
353 let child_pid = child.id();
354 metadata.mark_running(child_pid, child_pid as i32);
355 self.persist_task(&paths, &metadata)
356 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
357
358 let task = Arc::new(BgTask {
359 task_id: task_id.clone(),
360 session_id,
361 paths: paths.clone(),
362 started: Instant::now(),
363 last_reminder_at: Mutex::new(None),
364 terminal_at: Mutex::new(None),
365 state: Mutex::new(BgTaskState {
366 metadata,
367 child: Some(child),
368 detached: false,
369 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
370 }),
371 });
372
373 self.inner
374 .tasks
375 .lock()
376 .map_err(|_| "background task registry lock poisoned".to_string())?
377 .insert(task_id.clone(), task);
378
379 Ok(task_id)
380 }
381
382 #[cfg(windows)]
383 #[allow(clippy::too_many_arguments)]
384 pub fn spawn(
385 &self,
386 command: &str,
387 session_id: String,
388 workdir: PathBuf,
389 env: HashMap<String, String>,
390 timeout: Option<Duration>,
391 storage_dir: PathBuf,
392 max_running: usize,
393 notify_on_completion: bool,
394 compressed: bool,
395 project_root: Option<PathBuf>,
396 ) -> Result<String, String> {
397 self.start_watchdog();
398
399 let running = self.running_count();
400 if running >= max_running {
401 return Err(format!(
402 "background bash task limit exceeded: {running} running (max {max_running})"
403 ));
404 }
405
406 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
407 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
408 let task_id = self.generate_unique_task_id()?;
409 let paths = task_paths(&storage_dir, &session_id, &task_id);
410 fs::create_dir_all(&paths.dir)
411 .map_err(|e| format!("failed to create background task dir: {e}"))?;
412
413 let mut metadata = PersistedTask::starting(
414 task_id.clone(),
415 session_id.clone(),
416 command.to_string(),
417 workdir.clone(),
418 project_root,
419 timeout_ms,
420 notify_on_completion,
421 compressed,
422 );
423 self.persist_task(&paths, &metadata)
424 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
425
426 create_capture_file(&paths.stdout)
432 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
433 create_capture_file(&paths.stderr)
434 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
435
436 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
437 Ok(child) => child,
438 Err(error) => {
439 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
440 let _ = delete_task_bundle(&paths);
441 return Err(error);
442 }
443 };
444
445 let child_pid = child.id();
446 metadata.status = BgTaskStatus::Running;
447 metadata.child_pid = Some(child_pid);
448 metadata.pgid = None;
449 self.persist_task(&paths, &metadata)
450 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
451
452 let task = Arc::new(BgTask {
453 task_id: task_id.clone(),
454 session_id,
455 paths: paths.clone(),
456 started: Instant::now(),
457 last_reminder_at: Mutex::new(None),
458 terminal_at: Mutex::new(None),
459 state: Mutex::new(BgTaskState {
460 metadata,
461 child: Some(child),
462 detached: false,
463 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
464 }),
465 });
466
467 self.inner
468 .tasks
469 .lock()
470 .map_err(|_| "background task registry lock poisoned".to_string())?
471 .insert(task_id.clone(), task);
472
473 Ok(task_id)
474 }
475
476 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
477 self.replay_session_inner(storage_dir, session_id, None)
478 }
479
480 pub fn replay_session_for_project(
481 &self,
482 storage_dir: &Path,
483 session_id: &str,
484 project_root: &Path,
485 ) -> Result<(), String> {
486 self.replay_session_inner(storage_dir, session_id, Some(project_root))
487 }
488
    /// Shared implementation of `replay_session` and `replay_session_for_project`.
    ///
    /// Steps:
    /// 1. Start the watchdog.
    /// 2. Run persisted-bundle GC, once per registry (guarded by `persisted_gc_started`).
    /// 3. Load the session's persisted tasks: the DB mirror first, disk as the fallback.
    /// 4. Rehydrate every task belonging to `session_id` into the registry.
    ///
    /// When `project_root` is given, tasks whose canonicalized project root differs are
    /// skipped.
    ///
    /// Non-terminal records are reconciled before insertion:
    /// - `Starting` becomes `Failed` ("spawn aborted").
    /// - `Running`/`Killing` that `running_metadata_is_stale` flags becomes `Killed`, and a
    ///   kill marker is written.
    /// - `Running`/`Killing` with an exit marker takes its terminal state from the marker.
    /// - `Killing` without a marker becomes `Killed`, and a kill marker is written.
    /// - `Running` whose recorded pid is no longer alive becomes `Failed`.
    ///
    /// Reconciled records are re-persisted (write errors ignored), and their completion is
    /// passed to `enqueue_completion_if_needed`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `replay_session_from_disk` and `insert_rehydrated_task`.
    /// GC failures are only logged.
    fn replay_session_inner(
        &self,
        storage_dir: &Path,
        session_id: &str,
        project_root: Option<&Path>,
    ) -> Result<(), String> {
        self.start_watchdog();
        // First replay on this registry also garbage-collects old persisted bundles.
        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
            }
        }

        let canonical_project = project_root.map(canonicalized_path);
        // Prefer the DB mirror. An empty result, a lookup error, or an unconfigured DB falls
        // back to the on-disk bundles.
        // NOTE(review): a non-empty DB result is used as-is, so tasks that exist only on
        // disk are not merged in — confirm this is intended.
        let tasks = match self.replay_session_from_db(session_id) {
            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
            Some(Ok(_)) => {
                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
                if !disk_tasks.is_empty() {
                    crate::slog_info!(
                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
                        session_id,
                        disk_tasks.len()
                    );
                }
                disk_tasks
            }
            Some(Err(error)) => {
                crate::slog_warn!(
                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
                    session_id,
                    error
                );
                self.replay_session_from_disk(storage_dir, session_id)?
            }
            None => {
                self.replay_session_from_disk(storage_dir, session_id)?
            }
        };

        for mut metadata in tasks {
            // Defensive: only rehydrate records that really belong to this session.
            if metadata.session_id != session_id {
                continue;
            }
            // Project-scoped replay: compare canonicalized roots so symlinked or relative
            // spellings of the same directory still match.
            if let Some(canonical_project) = canonical_project.as_deref() {
                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
                if metadata_project.as_deref() != Some(canonical_project) {
                    continue;
                }
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            match metadata.status {
                // The previous process stopped before the child was confirmed running.
                BgTaskStatus::Starting => {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                BgTaskStatus::Running | BgTaskStatus::Killing => {
                    if self.running_metadata_is_stale(&metadata) {
                        // NOTE(review): this only records the task as killed; no signal is
                        // sent to the process in this branch.
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // An exit marker wins over the persisted status.
                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                            "recovered from inconsistent killing state on replay".to_string()
                        });
                        if reason.is_some() {
                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
                                metadata.task_id);
                        }
                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.status == BgTaskStatus::Killing {
                        // A kill was in progress but never recorded an exit: finish it as killed.
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("recovered from inconsistent killing state on replay".to_string()),
                        );
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        // The recorded process is gone and left no exit marker.
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else {
                        // Still running: re-attach as-is.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                // Already terminal: re-attach and let the enqueue helper decide whether a
                // completion notice is still owed.
                _ if metadata.status.is_terminal() => {
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                // Statuses not handled above are skipped.
                _ => {}
            }
        }

        Ok(())
    }
629
630 fn replay_session_from_db(
631 &self,
632 session_id: &str,
633 ) -> Option<Result<Vec<PersistedTask>, String>> {
634 let pool = self
635 .inner
636 .db_pool
637 .read()
638 .ok()
639 .and_then(|slot| slot.clone())?;
640 let harness = self
641 .inner
642 .db_harness
643 .read()
644 .ok()
645 .and_then(|slot| slot.clone())?;
646 let conn = match pool.lock() {
647 Ok(conn) => conn,
648 Err(_) => return Some(Err("db mutex poisoned".to_string())),
649 };
650 Some(
651 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
652 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
653 .map_err(|error| error.to_string()),
654 )
655 }
656
657 fn replay_session_from_disk(
658 &self,
659 storage_dir: &Path,
660 session_id: &str,
661 ) -> Result<Vec<PersistedTask>, String> {
662 let dir = session_tasks_dir(storage_dir, session_id);
663 if !dir.exists() {
664 return Ok(Vec::new());
665 }
666
667 let entries = fs::read_dir(&dir)
668 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
669 let mut tasks = Vec::new();
670 for entry in entries.flatten() {
671 let path = entry.path();
672 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
673 continue;
674 }
675 match read_task(&path) {
676 Ok(metadata) => tasks.push(metadata),
677 Err(error) => {
678 crate::slog_warn!(
679 "quarantining invalid background task metadata {} during replay: {error}",
680 path.display()
681 );
682 if let Err(quarantine_error) =
683 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
684 {
685 crate::slog_warn!(
686 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
687 path.display()
688 );
689 }
690 }
691 }
692 }
693 Ok(tasks)
694 }
695
696 pub fn status(
697 &self,
698 task_id: &str,
699 session_id: &str,
700 project_root: Option<&Path>,
701 storage_dir: Option<&Path>,
702 preview_bytes: usize,
703 ) -> Option<BgTaskSnapshot> {
704 let mut task = self.task_for_session(task_id, session_id);
705 if task.is_none() {
706 if let Some(storage_dir) = storage_dir {
707 let _ = self.replay_session(storage_dir, session_id);
708 task = self.task_for_session(task_id, session_id);
709 }
710 }
711 let Some(task) = task else {
712 return self.status_relaxed(
713 task_id,
714 session_id,
715 project_root?,
716 storage_dir?,
717 preview_bytes,
718 );
719 };
720 let _ = self.poll_task(&task);
721 let mut snapshot = task.snapshot(preview_bytes);
722 self.maybe_compress_snapshot(&task, &mut snapshot);
723 Some(snapshot)
724 }
725
726 fn status_relaxed_task(
727 &self,
728 task_id: &str,
729 project_root: &Path,
730 storage_dir: &Path,
731 ) -> Option<Arc<BgTask>> {
732 let canonical_project = canonicalized_path(project_root);
733 match self.lookup_relaxed_task_from_db(task_id, project_root) {
734 Some(Ok(Some(metadata))) => {
735 if let Some(task) = self.task(task_id) {
736 let matches_project = task
737 .state
738 .lock()
739 .map(|state| {
740 state
741 .metadata
742 .project_root
743 .as_deref()
744 .map(canonicalized_path)
745 .as_deref()
746 == Some(canonical_project.as_path())
747 })
748 .unwrap_or(false);
749 return matches_project.then_some(task);
750 }
751 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
752 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
753 return None;
754 }
755 return self.task(task_id);
756 }
757 Some(Ok(None)) => {
758 crate::slog_info!(
759 "bash task relaxed DB miss for {}; falling back to disk",
760 task_id
761 );
762 }
763 Some(Err(error)) => {
764 crate::slog_warn!(
765 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
766 task_id,
767 error
768 );
769 }
770 None => {
771 crate::slog_info!(
772 "bash task relaxed DB unavailable for {}; falling back to disk",
773 task_id
774 );
775 }
776 }
777 let root = storage_dir.join("bash-tasks");
778 let entries = fs::read_dir(&root).ok()?;
779 for entry in entries.flatten() {
780 let dir = entry.path();
781 if !dir.is_dir() {
782 continue;
783 }
784 let path = dir.join(format!("{task_id}.json"));
785 if !path.exists() {
786 continue;
787 }
788 let metadata = match read_task(&path) {
789 Ok(metadata) => metadata,
790 Err(error) => {
791 crate::slog_warn!(
792 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
793 path.display()
794 );
795 if let Err(quarantine_error) =
796 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
797 {
798 crate::slog_warn!(
799 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
800 path.display()
801 );
802 }
803 continue;
804 }
805 };
806 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
807 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
808 continue;
809 }
810 if let Some(task) = self.task(task_id) {
811 let matches_project = task
812 .state
813 .lock()
814 .map(|state| {
815 state
816 .metadata
817 .project_root
818 .as_deref()
819 .map(canonicalized_path)
820 .as_deref()
821 == Some(canonical_project.as_path())
822 })
823 .unwrap_or(false);
824 return matches_project.then_some(task);
825 }
826 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
827 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
828 return None;
829 }
830 return self.task(task_id);
831 }
832 None
833 }
834
835 fn lookup_relaxed_task_from_db(
836 &self,
837 task_id: &str,
838 project_root: &Path,
839 ) -> Option<Result<Option<PersistedTask>, String>> {
840 let pool = self
841 .inner
842 .db_pool
843 .read()
844 .ok()
845 .and_then(|slot| slot.clone())?;
846 let harness = self
847 .inner
848 .db_harness
849 .read()
850 .ok()
851 .and_then(|slot| slot.clone())?;
852 let conn = match pool.lock() {
853 Ok(conn) => conn,
854 Err(_) => return Some(Err("db mutex poisoned".to_string())),
855 };
856 let project_key = crate::search_index::project_cache_key(project_root);
857 Some(
858 crate::db::bash_tasks::find_bash_task_for_project(
859 &conn,
860 &harness,
861 &project_key,
862 task_id,
863 )
864 .map(|row| row.map(PersistedTask::from))
865 .map_err(|error| error.to_string()),
866 )
867 }
868
869 pub(super) fn status_relaxed(
870 &self,
871 task_id: &str,
872 _session_id: &str,
873 project_root: &Path,
874 storage_dir: &Path,
875 preview_bytes: usize,
876 ) -> Option<BgTaskSnapshot> {
877 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
878 let _ = self.poll_task(&task);
879 let mut snapshot = task.snapshot(preview_bytes);
880 self.maybe_compress_snapshot(&task, &mut snapshot);
881 Some(snapshot)
882 }
883
884 pub fn kill_relaxed(
885 &self,
886 task_id: &str,
887 project_root: &Path,
888 storage_dir: &Path,
889 ) -> Result<BgTaskSnapshot, String> {
890 let task = self
891 .status_relaxed_task(task_id, project_root, storage_dir)
892 .ok_or_else(|| format!("background task not found: {task_id}"))?;
893 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
894 }
895
896 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
897 #[cfg(test)]
898 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
899
900 let mut deleted = 0usize;
901
902 let root = storage_dir.join("bash-tasks");
903 if root.exists() {
904 let session_dirs = fs::read_dir(&root).map_err(|e| {
905 format!(
906 "failed to read background task root {}: {e}",
907 root.display()
908 )
909 })?;
910 for session_entry in session_dirs.flatten() {
911 let session_dir = session_entry.path();
912 if !session_dir.is_dir() {
913 continue;
914 }
915 let task_entries = match fs::read_dir(&session_dir) {
916 Ok(entries) => entries,
917 Err(error) => {
918 crate::slog_warn!(
919 "failed to read background task session dir {}: {error}",
920 session_dir.display()
921 );
922 continue;
923 }
924 };
925 for task_entry in task_entries.flatten() {
926 let json_path = task_entry.path();
927 if json_path
928 .extension()
929 .and_then(|extension| extension.to_str())
930 != Some("json")
931 {
932 continue;
933 }
934 if modified_within(&json_path, PERSISTED_GC_GRACE) {
935 continue;
936 }
937 let metadata = match read_task(&json_path) {
938 Ok(metadata) => metadata,
939 Err(error) => {
940 crate::slog_warn!(
941 "quarantining corrupt background task metadata {}: {error}",
942 json_path.display()
943 );
944 quarantine_task_json(
945 storage_dir,
946 &session_dir,
947 &json_path,
948 QuarantineKind::Corrupt,
949 )?;
950 continue;
951 }
952 };
953 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
954 continue;
955 }
956 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
957 match delete_task_bundle(&paths) {
958 Ok(()) => {
959 deleted += 1;
960 log::debug!(
961 "deleted persisted background task bundle {}",
962 metadata.task_id
963 );
964 }
965 Err(error) => {
966 crate::slog_warn!(
967 "failed to delete background task bundle {}: {error}",
968 metadata.task_id
969 );
970 continue;
971 }
972 }
973 }
974 }
975 }
976 gc_quarantine(storage_dir);
977 Ok(deleted)
978 }
979
980 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
981 let tasks = self
982 .inner
983 .tasks
984 .lock()
985 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
986 .unwrap_or_default();
987 tasks
988 .into_iter()
989 .map(|task| {
990 let _ = self.poll_task(&task);
991 let mut snapshot = task.snapshot(preview_bytes);
992 self.maybe_compress_snapshot(&task, &mut snapshot);
993 snapshot
994 })
995 .collect()
996 }
997
998 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1004 if !snapshot.info.status.is_terminal() {
1005 return;
1006 }
1007 let compressed_flag = task
1008 .state
1009 .lock()
1010 .map(|state| state.metadata.compressed)
1011 .unwrap_or(true);
1012 if !compressed_flag {
1013 return;
1014 }
1015 let raw = std::mem::take(&mut snapshot.output_preview);
1016 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1017 }
1018
1019 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1020 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1021 }
1022
1023 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1024 let task = self
1025 .task_for_session(task_id, session_id)
1026 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1027 let mut state = task
1028 .state
1029 .lock()
1030 .map_err(|_| "background task lock poisoned".to_string())?;
1031 let updated = self
1032 .update_task_metadata(&task.paths, |metadata| {
1033 metadata.notify_on_completion = true;
1034 metadata.completion_delivered = false;
1035 })
1036 .map_err(|e| format!("failed to promote background task: {e}"))?;
1037 state.metadata = updated;
1038 if state.metadata.status.is_terminal() {
1039 state.buffer.enforce_terminal_cap();
1040 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1041 }
1042 Ok(true)
1043 }
1044
1045 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1046 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1047 .map(|_| ())
1048 }
1049
1050 pub fn cleanup_finished(&self, older_than: Duration) {
1051 let cutoff = Instant::now().checked_sub(older_than);
1052 let removable_paths: Vec<(String, TaskPaths)> =
1053 if let Ok(mut tasks) = self.inner.tasks.lock() {
1054 let removable = tasks
1055 .iter()
1056 .filter_map(|(task_id, task)| {
1057 let delivered_terminal = task
1058 .state
1059 .lock()
1060 .map(|state| {
1061 state.metadata.status.is_terminal()
1062 && state.metadata.completion_delivered
1063 })
1064 .unwrap_or(false);
1065 if !delivered_terminal {
1066 return None;
1067 }
1068
1069 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1070 let expired = match (terminal_at, cutoff) {
1071 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1072 (Some(_), None) => true,
1073 (None, _) => false,
1074 };
1075 expired.then(|| task_id.clone())
1076 })
1077 .collect::<Vec<_>>();
1078
1079 removable
1080 .into_iter()
1081 .filter_map(|task_id| {
1082 tasks
1083 .remove(&task_id)
1084 .map(|task| (task_id, task.paths.clone()))
1085 })
1086 .collect()
1087 } else {
1088 Vec::new()
1089 };
1090
1091 for (task_id, paths) in removable_paths {
1092 match delete_task_bundle(&paths) {
1093 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1094 Err(error) => crate::slog_warn!(
1095 "failed to delete persisted background task bundle {task_id}: {error}"
1096 ),
1097 }
1098 }
1099 }
1100
1101 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1102 self.drain_completions_for_session(None)
1103 }
1104
1105 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1106 let completions = match self.inner.completions.lock() {
1107 Ok(completions) => completions,
1108 Err(_) => return Vec::new(),
1109 };
1110
1111 completions
1112 .iter()
1113 .filter(|completion| {
1114 session_id
1115 .map(|session_id| completion.session_id == session_id)
1116 .unwrap_or(true)
1117 })
1118 .cloned()
1119 .collect()
1120 }
1121
1122 pub fn ack_completions_for_session(
1123 &self,
1124 session_id: Option<&str>,
1125 task_ids: &[String],
1126 ) -> Vec<String> {
1127 if task_ids.is_empty() {
1128 return Vec::new();
1129 }
1130 let task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1131 let mut completions = match self.inner.completions.lock() {
1132 Ok(completions) => completions,
1133 Err(_) => return Vec::new(),
1134 };
1135 let mut acked = Vec::new();
1136 completions.retain(|completion| {
1137 let session_matches = session_id
1138 .map(|session_id| completion.session_id == session_id)
1139 .unwrap_or(true);
1140 if session_matches && task_ids.contains(completion.task_id.as_str()) {
1141 acked.push((completion.task_id.clone(), completion.session_id.clone()));
1142 false
1143 } else {
1144 true
1145 }
1146 });
1147 drop(completions);
1148
1149 let mut delivered = Vec::new();
1150 for (task_id, completion_session_id) in acked {
1151 if let Some(task) = self.task_for_session(&task_id, &completion_session_id) {
1152 if task.set_completion_delivered(true, self).is_ok() {
1153 delivered.push(task_id);
1154 }
1155 }
1156 }
1157
1158 delivered
1159 }
1160
1161 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1162 self.inner
1163 .completions
1164 .lock()
1165 .map(|completions| {
1166 completions
1167 .iter()
1168 .filter(|completion| completion.session_id == session_id)
1169 .cloned()
1170 .collect()
1171 })
1172 .unwrap_or_default()
1173 }
1174
1175 pub fn detach(&self) {
1176 self.inner.shutdown.store(true, Ordering::SeqCst);
1177 if let Ok(mut tasks) = self.inner.tasks.lock() {
1178 for task in tasks.values() {
1179 if let Ok(mut state) = task.state.lock() {
1180 state.child = None;
1181 state.detached = true;
1182 }
1183 }
1184 tasks.clear();
1185 }
1186 }
1187
1188 pub fn shutdown(&self) {
1189 let tasks = self
1190 .inner
1191 .tasks
1192 .lock()
1193 .map(|tasks| {
1194 tasks
1195 .values()
1196 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1197 .collect::<Vec<_>>()
1198 })
1199 .unwrap_or_default();
1200 for (task_id, session_id) in tasks {
1201 let _ = self.kill(&task_id, &session_id);
1202 }
1203 }
1204
1205 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1206 let marker = match read_exit_marker(&task.paths.exit) {
1207 Ok(Some(marker)) => marker,
1208 Ok(None) => return Ok(()),
1209 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1210 };
1211 self.finalize_from_marker(task, marker, None)
1212 }
1213
1214 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1215 let Ok(mut state) = task.state.lock() else {
1216 return;
1217 };
1218 if let Some(child) = state.child.as_mut() {
1219 if matches!(child.try_wait(), Ok(Some(_))) {
1220 state.child = None;
1237 state.detached = true;
1238 self.fail_without_exit_marker_if_needed(task, &mut state);
1239 }
1240 } else if state.detached
1241 && state
1242 .metadata
1243 .child_pid
1244 .is_some_and(|pid| !is_process_alive(pid))
1245 {
1246 self.fail_without_exit_marker_if_needed(task, &mut state);
1247 }
1248 }
1249
1250 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1251 if state.metadata.status.is_terminal() {
1252 return;
1253 }
1254 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1255 return;
1256 }
1257 let updated = self.update_task_metadata(&task.paths, |metadata| {
1258 metadata.mark_terminal(
1259 BgTaskStatus::Failed,
1260 None,
1261 Some("process exited without exit marker".to_string()),
1262 );
1263 });
1264 if let Ok(metadata) = updated {
1265 state.metadata = metadata;
1266 task.mark_terminal_now();
1267 state.buffer.enforce_terminal_cap();
1268 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1269 }
1270 }
1271
1272 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1273 self.inner
1274 .tasks
1275 .lock()
1276 .map(|tasks| {
1277 tasks
1278 .values()
1279 .filter(|task| task.is_running())
1280 .cloned()
1281 .collect()
1282 })
1283 .unwrap_or_default()
1284 }
1285
1286 fn insert_rehydrated_task(
1287 &self,
1288 metadata: PersistedTask,
1289 paths: TaskPaths,
1290 detached: bool,
1291 ) -> Result<(), String> {
1292 let task_id = metadata.task_id.clone();
1293 let session_id = metadata.session_id.clone();
1294 let started = started_instant_from_unix_millis(metadata.started_at);
1295 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1296 let task = Arc::new(BgTask {
1297 task_id: task_id.clone(),
1298 session_id,
1299 paths: paths.clone(),
1300 started,
1301 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1302 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1303 state: Mutex::new(BgTaskState {
1304 metadata,
1305 child: None,
1306 detached,
1307 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
1308 }),
1309 });
1310 self.inner
1311 .tasks
1312 .lock()
1313 .map_err(|_| "background task registry lock poisoned".to_string())?
1314 .insert(task_id, task);
1315 Ok(())
1316 }
1317
1318 fn kill_with_status(
1319 &self,
1320 task_id: &str,
1321 session_id: &str,
1322 terminal_status: BgTaskStatus,
1323 ) -> Result<BgTaskSnapshot, String> {
1324 let task = self
1325 .task_for_session(task_id, session_id)
1326 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1327
1328 {
1329 let mut state = task
1330 .state
1331 .lock()
1332 .map_err(|_| "background task lock poisoned".to_string())?;
1333 if state.metadata.status.is_terminal() {
1334 return Ok(task.snapshot_locked(&state, 5 * 1024));
1335 }
1336
1337 if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1338 state.metadata =
1339 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1340 task.mark_terminal_now();
1341 state.child = None;
1342 state.detached = true;
1343 state.buffer.enforce_terminal_cap();
1344 self.persist_task(&task.paths, &state.metadata)
1345 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1346 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1347 return Ok(task.snapshot_locked(&state, 5 * 1024));
1348 }
1349
1350 state.metadata.status = BgTaskStatus::Killing;
1351 self.persist_task(&task.paths, &state.metadata)
1352 .map_err(|e| format!("failed to persist killing state: {e}"))?;
1353
1354 #[cfg(unix)]
1355 if let Some(pgid) = state.metadata.pgid {
1356 terminate_pgid(pgid, state.child.as_mut());
1357 }
1358 #[cfg(windows)]
1359 if let Some(child) = state.child.as_mut() {
1360 super::process::terminate_process(child);
1361 } else if let Some(pid) = state.metadata.child_pid {
1362 terminate_pid(pid);
1363 }
1364 if let Some(child) = state.child.as_mut() {
1365 let _ = child.wait();
1366 }
1367 state.child = None;
1368 state.detached = true;
1369
1370 if !task.paths.exit.exists() {
1371 write_kill_marker_if_absent(&task.paths.exit)
1372 .map_err(|e| format!("failed to write kill marker: {e}"))?;
1373 }
1374
1375 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1376 Some(124)
1377 } else {
1378 None
1379 };
1380 state
1381 .metadata
1382 .mark_terminal(terminal_status, exit_code, None);
1383 task.mark_terminal_now();
1384 self.persist_task(&task.paths, &state.metadata)
1385 .map_err(|e| format!("failed to persist killed state: {e}"))?;
1386 state.buffer.enforce_terminal_cap();
1387 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1388 }
1389
1390 Ok(task.snapshot(5 * 1024))
1391 }
1392
1393 fn finalize_from_marker(
1394 &self,
1395 task: &Arc<BgTask>,
1396 marker: ExitMarker,
1397 reason: Option<String>,
1398 ) -> Result<(), String> {
1399 let mut state = task
1400 .state
1401 .lock()
1402 .map_err(|_| "background task lock poisoned".to_string())?;
1403 if state.metadata.status.is_terminal() {
1404 return Ok(());
1405 }
1406
1407 let updated = self
1408 .update_task_metadata(&task.paths, |metadata| {
1409 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1410 *metadata = new_metadata;
1411 })
1412 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1413 state.metadata = updated;
1414 task.mark_terminal_now();
1415 state.child = None;
1416 state.detached = true;
1417 state.buffer.enforce_terminal_cap();
1418 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1419 Ok(())
1420 }
1421
1422 fn enqueue_completion_if_needed(
1423 &self,
1424 metadata: &PersistedTask,
1425 paths: Option<&TaskPaths>,
1426 emit_frame: bool,
1427 ) {
1428 if metadata.status.is_terminal() && !metadata.completion_delivered {
1429 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1430 }
1431 }
1432
1433 fn enqueue_completion_locked(
1434 &self,
1435 metadata: &PersistedTask,
1436 buffer: Option<&BgBuffer>,
1437 emit_frame: bool,
1438 ) {
1439 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1440 }
1441
    /// Builds a [`BgCompletion`] for a terminal task and queues it for delivery.
    ///
    /// Output is read from `buffer` when given, otherwise from the stdout/stderr files in
    /// `paths`; with neither, the preview is empty. The preview is the last
    /// `BG_COMPLETION_PREVIEW_BYTES` bytes of output, passed through `compress_output`
    /// when the task was started with compression.
    ///
    /// A compression event is recorded before the `completion_delivered` check, so it is
    /// recorded even for tasks whose completion was already delivered. The completion is
    /// pushed only if no queued completion has the same task id. A `BashCompleted` frame
    /// is emitted only when the completion was actually pushed and `emit_frame` is set.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
    ) {
        // Only terminal tasks have a completion to report.
        if !metadata.status.is_terminal() {
            return;
        }
        // Prefer the live buffer; fall back to the capture files on disk.
        let (raw_preview, output_truncated) = match buffer {
            Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
            None => paths
                .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
                .unwrap_or_else(|| (String::new(), false)),
        };
        let output_preview = if metadata.compressed {
            self.compress_output(&metadata.command, raw_preview)
        } else {
            raw_preview
        };
        let token_counts = self.completion_token_counts(metadata, buffer, paths);
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // NOTE(review): this runs before the delivered/duplicate checks below, so calling
        // this again for a task that is already queued (or delivered) records another
        // compression event — confirm that double counting is intended.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        if metadata.completion_delivered {
            return;
        }

        // At most one queued completion per task id; a poisoned queue lock silently drops
        // the completion. The queue lock is released at the end of this statement, before
        // any frame is emitted.
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
1540
1541 fn record_compression_event_if_applicable(
1542 &self,
1543 metadata: &PersistedTask,
1544 token_counts: &CompletionTokenCounts,
1545 ) {
1546 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
1547 token_counts.original_tokens,
1548 token_counts.compressed_tokens,
1549 token_counts.original_bytes,
1550 token_counts.compressed_bytes,
1551 ) {
1552 (
1553 Some(original_tokens),
1554 Some(compressed_tokens),
1555 Some(original_bytes),
1556 Some(compressed_bytes),
1557 ) => (
1558 original_tokens,
1559 compressed_tokens,
1560 original_bytes,
1561 compressed_bytes,
1562 ),
1563 _ => {
1564 crate::slog_warn!(
1565 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
1566 metadata.task_id
1567 );
1568 return;
1569 }
1570 };
1571
1572 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
1573 let Some(pool) = pool else {
1574 crate::slog_warn!(
1575 "compression event skipped for {}: db_pool not initialized — was configure run?",
1576 metadata.task_id
1577 );
1578 return;
1579 };
1580 let harness = self
1581 .inner
1582 .db_harness
1583 .read()
1584 .ok()
1585 .and_then(|slot| slot.clone());
1586 let Some(harness) = harness else {
1587 crate::slog_warn!(
1588 "compression event insert skipped for {}: harness not configured",
1589 metadata.task_id
1590 );
1591 return;
1592 };
1593
1594 let project_root = metadata
1595 .project_root
1596 .as_deref()
1597 .unwrap_or(&metadata.workdir);
1598 let project_key = crate::search_index::project_cache_key(project_root);
1599 let row = crate::db::compression_events::CompressionEventRow {
1600 harness: &harness,
1601 session_id: Some(&metadata.session_id),
1602 project_key: &project_key,
1603 tool: "bash",
1604 task_id: Some(&metadata.task_id),
1605 command: Some(&metadata.command),
1606 compressor: if metadata.compressed {
1607 "registry"
1608 } else {
1609 "none"
1610 },
1611 original_bytes,
1612 compressed_bytes,
1613 original_tokens,
1614 compressed_tokens,
1615 created_at: unix_millis() as i64,
1616 };
1617
1618 let conn = match pool.lock() {
1619 Ok(conn) => conn,
1620 Err(_) => {
1621 crate::slog_warn!(
1622 "compression event insert failed for {}: db mutex poisoned",
1623 metadata.task_id
1624 );
1625 return;
1626 }
1627 };
1628 match crate::db::compression_events::insert_compression_event(&conn, &row) {
1629 Ok(_) => {
1630 crate::slog_debug!(
1634 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
1635 metadata.task_id,
1636 project_key,
1637 metadata.session_id,
1638 original_tokens,
1639 compressed_tokens
1640 );
1641 }
1642 Err(error) => {
1643 crate::slog_warn!(
1644 "compression event insert failed for {}: {}",
1645 metadata.task_id,
1646 error
1647 );
1648 }
1649 }
1650 }
1651
1652 fn emit_bash_completed(&self, completion: BgCompletion) {
1653 let Ok(progress_sender) = self
1654 .inner
1655 .progress_sender
1656 .lock()
1657 .map(|sender| sender.clone())
1658 else {
1659 return;
1660 };
1661 let Some(sender) = progress_sender.as_ref() else {
1662 return;
1663 };
1664 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1672 completion.task_id,
1673 completion.session_id,
1674 completion.status,
1675 completion.exit_code,
1676 completion.command,
1677 completion.output_preview,
1678 completion.output_truncated,
1679 completion.original_tokens,
1680 completion.compressed_tokens,
1681 completion.tokens_skipped,
1682 )));
1683 }
1684
1685 fn completion_token_counts(
1686 &self,
1687 metadata: &PersistedTask,
1688 buffer: Option<&BgBuffer>,
1689 paths: Option<&TaskPaths>,
1690 ) -> CompletionTokenCounts {
1691 let raw = match buffer {
1692 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
1693 None => paths
1694 .map(|paths| read_for_token_count_from_disk(paths, TOKENIZE_CAP_BYTES_PER_STREAM))
1695 .unwrap_or(TokenCountInput::Skipped),
1696 };
1697
1698 let TokenCountInput::Text(raw_output) = raw else {
1699 return CompletionTokenCounts::skipped();
1700 };
1701
1702 let original_tokens = token_count_u32(&raw_output);
1703 let original_bytes = raw_output.len() as i64;
1704 let compressed_output = if metadata.compressed {
1705 self.compress_output(&metadata.command, raw_output)
1706 } else {
1707 raw_output
1708 };
1709 let compressed_tokens = token_count_u32(&compressed_output);
1710 let compressed_bytes = compressed_output.len() as i64;
1711 CompletionTokenCounts {
1712 original_tokens: Some(original_tokens),
1713 compressed_tokens: Some(compressed_tokens),
1714 original_bytes: Some(original_bytes),
1715 compressed_bytes: Some(compressed_bytes),
1716 tokens_skipped: false,
1717 }
1718 }
1719
1720 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1721 if !self
1722 .inner
1723 .long_running_reminder_enabled
1724 .load(Ordering::SeqCst)
1725 {
1726 return;
1727 }
1728 let interval_ms = self
1729 .inner
1730 .long_running_reminder_interval_ms
1731 .load(Ordering::SeqCst);
1732 if interval_ms == 0 {
1733 return;
1734 }
1735 let interval = Duration::from_millis(interval_ms);
1736 let now = Instant::now();
1737 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1738 return;
1739 };
1740 let since = last_reminder_at.unwrap_or(task.started);
1741 if now.duration_since(since) < interval {
1742 return;
1743 }
1744 let command = task
1745 .state
1746 .lock()
1747 .map(|state| state.metadata.command.clone())
1748 .unwrap_or_default();
1749 *last_reminder_at = Some(now);
1750 self.emit_bash_long_running(BashLongRunningFrame::new(
1751 task.task_id.clone(),
1752 task.session_id.clone(),
1753 command,
1754 task.started.elapsed().as_millis() as u64,
1755 ));
1756 }
1757
1758 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1759 let Ok(progress_sender) = self
1760 .inner
1761 .progress_sender
1762 .lock()
1763 .map(|sender| sender.clone())
1764 else {
1765 return;
1766 };
1767 if let Some(sender) = progress_sender.as_ref() {
1768 sender(PushFrame::BashLongRunning(frame));
1769 }
1770 }
1771
1772 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1773 self.inner
1774 .tasks
1775 .lock()
1776 .ok()
1777 .and_then(|tasks| tasks.get(task_id).cloned())
1778 }
1779
1780 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1781 self.task(task_id)
1782 .filter(|task| task.session_id == session_id)
1783 }
1784
1785 fn running_count(&self) -> usize {
1786 self.inner
1787 .tasks
1788 .lock()
1789 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1790 .unwrap_or(0)
1791 }
1792
1793 fn start_watchdog(&self) {
1794 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1795 super::watchdog::start(self.clone());
1796 }
1797 }
1798
1799 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1800 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1801 }
1802
1803 #[cfg(test)]
1804 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1805 self.task_for_session(task_id, session_id)
1806 .map(|task| task.paths.json.clone())
1807 }
1808
1809 #[cfg(test)]
1810 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1811 self.task_for_session(task_id, session_id)
1812 .map(|task| task.paths.exit.clone())
1813 }
1814
1815 fn generate_unique_task_id(&self) -> Result<String, String> {
1817 for _ in 0..32 {
1818 let candidate = random_slug();
1819 let tasks = self
1820 .inner
1821 .tasks
1822 .lock()
1823 .map_err(|_| "background task registry lock poisoned".to_string())?;
1824 if tasks.contains_key(&candidate) {
1825 continue;
1826 }
1827 let completions = self
1828 .inner
1829 .completions
1830 .lock()
1831 .map_err(|_| "background completions lock poisoned".to_string())?;
1832 if completions
1833 .iter()
1834 .any(|completion| completion.task_id == candidate)
1835 {
1836 continue;
1837 }
1838 return Ok(candidate);
1839 }
1840 Err("failed to allocate unique background task id after 32 attempts".to_string())
1841 }
1842}
1843
/// Token and byte counts of a completion's output before and after compression.
///
/// Either every count is `Some` and `tokens_skipped` is `false`, or (via
/// [`CompletionTokenCounts::skipped`]) every count is `None` and `tokens_skipped` is `true`.
struct CompletionTokenCounts {
    original_tokens: Option<u32>,
    compressed_tokens: Option<u32>,
    original_bytes: Option<i64>,
    compressed_bytes: Option<i64>,
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Counts used when no output text was available to tokenize.
    fn skipped() -> Self {
        Self {
            tokens_skipped: true,
            original_tokens: None,
            original_bytes: None,
            compressed_tokens: None,
            compressed_bytes: None,
        }
    }
}
1863
1864fn token_count_u32(text: &str) -> u32 {
1865 aft_tokenizer::count_tokens(text)
1866 .try_into()
1867 .unwrap_or(u32::MAX)
1868}
1869
1870impl Default for BgTaskRegistry {
1871 fn default() -> Self {
1872 Self::new(Arc::new(Mutex::new(None)))
1873 }
1874}
1875
/// Returns `true` if `path` was modified less than `grace` ago.
///
/// Returns `false` when the modification time cannot be read (e.g. the path is missing)
/// or lies in the future relative to the system clock.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|meta| meta.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
1884
/// Resolves `path` to its canonical absolute form, or returns it unchanged when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1888
/// Converts a Unix-epoch start time in milliseconds into an approximate `Instant`.
///
/// The task's age is computed against the current wall clock and subtracted from
/// `Instant::now()`. Future timestamps, a pre-epoch clock, or an age that cannot be
/// represented all yield (approximately) "now".
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    Instant::now().checked_sub(age).unwrap_or_else(Instant::now)
}
1900
1901fn gc_quarantine(storage_dir: &Path) {
1902 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1903 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1904 return;
1905 };
1906 for session_entry in session_dirs.flatten() {
1907 let session_quarantine_dir = session_entry.path();
1908 if !session_quarantine_dir.is_dir() {
1909 continue;
1910 }
1911 let entries = match fs::read_dir(&session_quarantine_dir) {
1912 Ok(entries) => entries,
1913 Err(error) => {
1914 crate::slog_warn!(
1915 "failed to read background task quarantine dir {}: {error}",
1916 session_quarantine_dir.display()
1917 );
1918 continue;
1919 }
1920 };
1921 for entry in entries.flatten() {
1922 let path = entry.path();
1923 if modified_within(&path, QUARANTINE_GC_GRACE) {
1924 continue;
1925 }
1926 let result = if path.is_dir() {
1927 fs::remove_dir_all(&path)
1928 } else {
1929 fs::remove_file(&path)
1930 };
1931 match result {
1932 Ok(()) => log::debug!(
1933 "deleted old background task quarantine entry {}",
1934 path.display()
1935 ),
1936 Err(error) => crate::slog_warn!(
1937 "failed to delete old background task quarantine entry {}: {error}",
1938 path.display()
1939 ),
1940 }
1941 }
1942 let _ = fs::remove_dir(&session_quarantine_dir);
1943 }
1944 let _ = fs::remove_dir(&quarantine_root);
1945}
1946
/// Reason a persisted task file is quarantined; selects the rename scheme used by
/// `quarantine_name`.
enum QuarantineKind {
    /// Renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<ext>` (or `<file_name>.invalid.<unix_ts>`
    /// when the name has no stem/extension split).
    Invalid,
}
1951
1952fn quarantine_task_json(
1953 storage_dir: &Path,
1954 session_dir: &Path,
1955 json_path: &Path,
1956 kind: QuarantineKind,
1957) -> Result<(), String> {
1958 let session_hash = session_dir
1959 .file_name()
1960 .and_then(|name| name.to_str())
1961 .ok_or_else(|| {
1962 format!(
1963 "invalid background task session dir: {}",
1964 session_dir.display()
1965 )
1966 })?;
1967 let task_name = json_path
1968 .file_name()
1969 .and_then(|name| name.to_str())
1970 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1971 let unix_ts = SystemTime::now()
1972 .duration_since(UNIX_EPOCH)
1973 .map(|duration| duration.as_secs())
1974 .unwrap_or(0);
1975 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1976 fs::create_dir_all(&quarantine_dir).map_err(|e| {
1977 format!(
1978 "failed to create background task quarantine dir {}: {e}",
1979 quarantine_dir.display()
1980 )
1981 })?;
1982 let target_name = quarantine_name(task_name, unix_ts, &kind);
1983 let target = quarantine_dir.join(target_name);
1984 fs::rename(json_path, &target).map_err(|e| {
1985 format!(
1986 "failed to quarantine background task metadata {} to {}: {e}",
1987 json_path.display(),
1988 target.display()
1989 )
1990 })?;
1991
1992 for sibling in task_sibling_paths(json_path) {
1993 if !sibling.exists() {
1994 continue;
1995 }
1996 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1997 crate::slog_warn!(
1998 "skipping background task sibling with invalid name during quarantine: {}",
1999 sibling.display()
2000 );
2001 continue;
2002 };
2003 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2004 if let Err(error) = fs::rename(&sibling, &sibling_target) {
2005 crate::slog_warn!(
2006 "failed to quarantine background task sibling {} to {}: {error}",
2007 sibling.display(),
2008 sibling_target.display()
2009 );
2010 }
2011 }
2012
2013 let _ = fs::remove_dir(session_dir);
2014 Ok(())
2015}
2016
2017fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2018 match kind {
2019 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2020 QuarantineKind::Invalid => {
2021 let path = Path::new(file_name);
2022 let stem = path.file_stem().and_then(|stem| stem.to_str());
2023 let extension = path.extension().and_then(|extension| extension.to_str());
2024 match (stem, extension) {
2025 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2026 _ => format!("{file_name}.invalid.{unix_ts}"),
2027 }
2028 }
2029 }
2030}
2031
/// Lists the paths of a task's sibling files in the same directory as `json_path`.
///
/// Siblings share the JSON file's stem and use the extensions `stdout`, `stderr`, `exit`,
/// `ps1`, `bat` and `sh`. The paths are returned whether or not they exist. Returns an
/// empty list if `json_path` has no parent or no UTF-8 stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    match (json_path.parent(), stem) {
        (Some(dir), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|extension| dir.join(format!("{stem}.{extension}")))
            .collect(),
        _ => Vec::new(),
    }
}
2044
2045fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
2046 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2047 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2048 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2049 bytes.extend_from_slice(&stdout);
2050 bytes.extend_from_slice(&stderr);
2051 if bytes.len() <= max_bytes {
2052 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2053 }
2054 let start = bytes.len().saturating_sub(max_bytes);
2055 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2056}
2057
2058fn read_for_token_count_from_disk(
2059 paths: &TaskPaths,
2060 max_bytes_per_stream: usize,
2061) -> TokenCountInput {
2062 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2069 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2070 match (stdout, stderr) {
2071 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2072 String::from_utf8_lossy(&stdout).as_ref(),
2073 String::from_utf8_lossy(&stderr).as_ref(),
2074 )),
2075 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2076 String::from_utf8_lossy(&stdout).as_ref(),
2077 "",
2078 )),
2079 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2080 "",
2081 String::from_utf8_lossy(&stderr).as_ref(),
2082 )),
2083 (Err(_), Err(_)) => TokenCountInput::Skipped,
2084 }
2085}
2086
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// Only the tail is read, so large capture files are never loaded whole. The read is
/// bounded by the length observed at open time: bytes appended concurrently are not
/// included, and `max_bytes == 0` yields an empty buffer.
///
/// # Errors
/// Propagates any I/O error from opening, stat-ing, seeking or reading the file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let read_len = len.min(max_bytes as u64);
    if read_len < len {
        // Skip everything before the tail window.
        file.seek(SeekFrom::Start(len - read_len))?;
    }
    let mut bytes = Vec::with_capacity(read_len as usize);
    // `take` caps the read at the tail window even if the file keeps growing.
    file.take(read_len).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2103
2104impl BgTask {
2105 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2106 let state = self
2107 .state
2108 .lock()
2109 .unwrap_or_else(|poison| poison.into_inner());
2110 self.snapshot_locked(&state, preview_bytes)
2111 }
2112
2113 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2114 let metadata = &state.metadata;
2115 let duration_ms = metadata.duration_ms.or_else(|| {
2116 metadata
2117 .status
2118 .is_terminal()
2119 .then(|| self.started.elapsed().as_millis() as u64)
2120 });
2121 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
2122 BgTaskSnapshot {
2123 info: BgTaskInfo {
2124 task_id: self.task_id.clone(),
2125 status: metadata.status.clone(),
2126 command: metadata.command.clone(),
2127 started_at: metadata.started_at,
2128 duration_ms,
2129 },
2130 exit_code: metadata.exit_code,
2131 child_pid: metadata.child_pid,
2132 workdir: metadata.workdir.display().to_string(),
2133 output_preview,
2134 output_truncated,
2135 output_path: state
2136 .buffer
2137 .output_path()
2138 .map(|path| path.display().to_string()),
2139 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
2140 }
2141 }
2142
2143 pub(crate) fn is_running(&self) -> bool {
2144 self.state
2145 .lock()
2146 .map(|state| state.metadata.status == BgTaskStatus::Running)
2147 .unwrap_or(false)
2148 }
2149
2150 fn mark_terminal_now(&self) {
2151 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2152 if terminal_at.is_none() {
2153 *terminal_at = Some(Instant::now());
2154 }
2155 }
2156 }
2157
2158 fn set_completion_delivered(
2159 &self,
2160 delivered: bool,
2161 registry: &BgTaskRegistry,
2162 ) -> Result<(), String> {
2163 let mut state = self
2164 .state
2165 .lock()
2166 .map_err(|_| "background task lock poisoned".to_string())?;
2167 let updated = registry
2168 .update_task_metadata(&self.paths, |metadata| {
2169 metadata.completion_delivered = delivered;
2170 })
2171 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2172 state.metadata = updated;
2173 Ok(())
2174 }
2175}
2176
2177fn terminal_metadata_from_marker(
2178 mut metadata: PersistedTask,
2179 marker: ExitMarker,
2180 reason: Option<String>,
2181) -> PersistedTask {
2182 match marker {
2183 ExitMarker::Code(code) => {
2184 let status = if code == 0 {
2185 BgTaskStatus::Completed
2186 } else {
2187 BgTaskStatus::Failed
2188 };
2189 metadata.mark_terminal(status, Some(code), reason);
2190 }
2191 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2192 }
2193 metadata
2194}
2195
/// Builds the POSIX command that runs `command` in its own session and records its exit
/// status in `exit_path`.
///
/// The outer shell (`resolve_posix_shell()`) runs a fixed script with positional
/// arguments `$0` = shell path, `$1` = `command`, `$2` = `exit_path`. The script runs
/// `command` through a nested `"$0" -c`, writes its exit code to `$2.tmp.<shell pid>`,
/// then `mv -f`s that over `exit_path`. The write-then-rename is presumably meant to let
/// readers see either no marker or a complete one (TODO confirm rename atomicity on the
/// target filesystems).
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg(&shell)
        .arg(command)
        .arg(exit_path);
    // SAFETY: the `pre_exec` hook runs in the forked child before `exec` and only calls
    // `setsid`, which POSIX lists as async-signal-safe. On failure it reports errno via
    // `last_os_error`. `setsid` makes the child a new session and process-group leader,
    // which is what lets it be signalled as a group later.
    unsafe {
        cmd.pre_exec(|| {
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
2215
2216#[cfg(unix)]
2217fn resolve_posix_shell() -> PathBuf {
2218 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2219 POSIX_SHELL
2220 .get_or_init(|| {
2221 std::env::var_os("BASH")
2222 .filter(|value| !value.is_empty())
2223 .map(PathBuf::from)
2224 .filter(|path| path.exists())
2225 .or_else(|| which::which("bash").ok())
2226 .or_else(|| which::which("zsh").ok())
2227 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2228 })
2229 .clone()
2230}
2231
/// Writes a shell-specific wrapper script for `command` and builds the Windows command
/// that executes it with the given process `creation_flags`.
///
/// The script body comes from `shell.wrapper_script(command, exit_path)`. It is saved in
/// `paths.dir` as `<json stem>.<ext>`: `ps1` for PowerShell, `bat` for `cmd`, `sh` for
/// POSIX shells. If the JSON file name is not valid UTF-8, the stem falls back to
/// `wrapper`. The returned command only runs that script; stdio, working directory and
/// environment are left for the caller to configure.
///
/// # Errors
/// Returns a message if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    let wrapper_body = shell.wrapper_script(command, exit_path);
    // The script extension must match the interpreter selected below.
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    // Reuse the task JSON file's stem so the wrapper sits next to the other task files.
    let wrapper_path = paths.dir.join(format!(
        "{}.{}",
        paths
            .json
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("wrapper"),
        wrapper_ext
    ));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            // Run the script file non-interactively without loading profiles. `Bypass`
            // stops the execution policy from blocking the generated script. `-File` comes
            // last so the script path is the next argument.
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Cmd => {
            // `/D` skips AutoRun commands; `/C` runs the script and then exits.
            cmd.args(["/D", "/C"]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Posix(_) => {
            // The POSIX shell receives the script path as its only argument.
            cmd.arg(&wrapper_path);
        }
    }

    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
2314
2315fn spawn_detached_child(
2331 command: &str,
2332 paths: &TaskPaths,
2333 workdir: &Path,
2334 env: &HashMap<String, String>,
2335) -> Result<std::process::Child, String> {
2336 #[cfg(not(windows))]
2337 {
2338 let stdout = create_capture_file(&paths.stdout)
2339 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
2340 let stderr = create_capture_file(&paths.stderr)
2341 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
2342 detached_shell_command(command, &paths.exit)
2343 .current_dir(workdir)
2344 .envs(env)
2345 .stdin(Stdio::null())
2346 .stdout(Stdio::from(stdout))
2347 .stderr(Stdio::from(stderr))
2348 .spawn()
2349 .map_err(|e| format!("failed to spawn background bash command: {e}"))
2350 }
2351 #[cfg(windows)]
2352 {
2353 use crate::windows_shell::shell_candidates;
2354 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
2365 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
2386 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
2387 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
2388 let with_breakaway =
2389 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
2390 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
2391 let mut last_error: Option<String> = None;
2392 for (idx, shell) in candidates.iter().enumerate() {
2393 for &flags in &[with_breakaway, without_breakaway] {
2397 let stdout = create_capture_file(&paths.stdout)
2399 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
2400 let stderr = create_capture_file(&paths.stderr)
2401 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
2402 let mut cmd =
2403 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
2404 cmd.current_dir(workdir)
2405 .envs(env)
2406 .stdin(Stdio::null())
2407 .stdout(Stdio::from(stdout))
2408 .stderr(Stdio::from(stderr));
2409 match cmd.spawn() {
2410 Ok(child) => {
2411 if idx > 0 {
2412 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
2413 the cached PATH probe disagreed with runtime spawn — likely PATH \
2414 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
2415 shell.binary(),
2416 idx);
2417 }
2418 if flags == without_breakaway {
2419 crate::slog_warn!(
2420 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
2421 (likely a restrictive Job Object — CI sandbox or MDM policy). \
2422 Spawned without breakaway; the bg task will be torn down if the \
2423 AFT process group is killed."
2424 );
2425 }
2426 return Ok(child);
2427 }
2428 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
2429 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
2430 shell.binary());
2431 last_error = Some(format!("{}: {e}", shell.binary()));
2432 break;
2435 }
2436 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
2437 crate::slog_warn!(
2439 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
2440 Access Denied — retrying {} without breakaway",
2441 shell.binary()
2442 );
2443 last_error = Some(format!("{}: {e}", shell.binary()));
2444 continue;
2445 }
2446 Err(e) => {
2447 return Err(format!(
2448 "failed to spawn background bash command via {}: {e}",
2449 shell.binary()
2450 ));
2451 }
2452 }
2453 }
2454 }
2455 Err(format!(
2456 "failed to spawn background bash command: no Windows shell could be spawned. \
2457 Last error: {}. PATH-probed candidates: {:?}",
2458 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
2459 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
2460 ))
2461 }
2462}
2463
2464fn random_slug() -> String {
2465 let mut bytes = [0u8; 4];
2466 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
2468 let t = SystemTime::now()
2470 .duration_since(UNIX_EPOCH)
2471 .map(|d| d.subsec_nanos())
2472 .unwrap_or(0);
2473 let p = std::process::id();
2474 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
2475 });
2476 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
2478 format!("bash-{hex}")
2479}
2480
#[cfg(test)]
mod tests {
    // Tests for `BgTaskRegistry`: cleanup of finished tasks, `reap_child`
    // behavior with and without an exit marker, and (Windows only) the
    // spawn / exit-marker / shell-wrapper plumbing.
    use std::collections::HashMap;
    #[cfg(windows)]
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    #[cfg(windows)]
    use std::time::Instant;

    use super::*;

    // A command that exits with status 0 immediately.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // A command that stays alive for about 5 seconds, so it is still running
    // when a test inspects the registry right after spawning it.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";

    /// Spawns a trivially-succeeding child and blocks until `try_wait`
    /// reports that it has exited (polling every 10ms, for at most 5s).
    ///
    /// The `reap_child` tests install this as a stand-in `Child` when the
    /// task's own handle is no longer present in its state.
    ///
    /// # Panics
    /// Panics if the child cannot be spawned, if `try_wait` errors, or if the
    /// child has not exited within 5 seconds.
    fn spawn_dead_child() -> std::process::Child {
        #[cfg(unix)]
        let mut cmd = std::process::Command::new("true");
        #[cfg(windows)]
        let mut cmd = {
            let mut c = std::process::Command::new("cmd");
            c.args(["/c", "exit", "0"]);
            c
        };
        cmd.stdin(std::process::Stdio::null());
        cmd.stdout(std::process::Stdio::null());
        cmd.stderr(std::process::Stdio::null());
        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
        let started = Instant::now();
        loop {
            match child.try_wait() {
                Ok(Some(_)) => break,
                Ok(None) => {
                    if started.elapsed() > Duration::from_secs(5) {
                        panic!("dead-child stand-in did not exit within 5s");
                    }
                    std::thread::sleep(Duration::from_millis(10));
                }
                Err(error) => panic!("dead-child try_wait failed: {error}"),
            }
        }
        child
    }

    // A killed task whose completion has been drained AND acknowledged is
    // removed by `cleanup_finished` once it is older than the threshold
    // (here zero).
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        // Deliver and acknowledge the completion so the task is eligible for cleanup.
        let completions = registry.drain_completions_for_session(Some("session"));
        assert_eq!(completions.len(), 1);
        assert_eq!(
            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
            vec![task_id.clone()]
        );

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    // Same setup as above, but the completion is never drained or acked:
    // `cleanup_finished` must keep the terminal task so it can still be
    // delivered.
    #[test]
    fn cleanup_finished_retains_undelivered_terminals() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
    }

    // Simulates a child that died without writing its exit marker: after
    // forcing the task back to Running with no marker on disk, `reap_child`
    // must mark it Failed with the same reason wording as the replay path,
    // release the child handle, and mark the task detached.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait (up to 5s) for the real child to exit; a missing handle counts as exited.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // NOTE(review): setting `shutdown` and sleeping 550ms presumably stops
        // the registry's background processing so it cannot race with the
        // manual state reset below; the delay is assumed to exceed its poll
        // interval — TODO confirm.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        // Remove any marker the wrapper wrote so `reap_child` sees "no marker".
        let _ = std::fs::remove_file(&task.paths.exit);

        {
            // Rewind the task to Running (in memory and on disk).
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            // `reap_child` needs a child handle to reap; supply an already-exited one.
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        *task.terminal_at.lock().unwrap() = None;

        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }

    // Counterpart to the test above: when the exit marker IS present,
    // `reap_child` still releases the child handle and marks the task
    // detached, but leaves the status Running so that the marker-driven
    // path decides the final status.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait (up to 5s) until the child has exited and its marker is on disk.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // NOTE(review): same shutdown-and-wait step as the previous test,
        // presumably to quiesce background processing — TODO confirm.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        {
            // Rewind to Running (in memory only) and ensure a child handle exists.
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        *task.terminal_at.lock().unwrap() = None;
        // Guarantee the marker is present even if it was removed in the meantime.
        if !task.paths.exit.exists() {
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }

    // A task that is still running must survive `cleanup_finished` even with
    // a zero age threshold. The task is killed afterwards (best-effort) so it
    // does not outlive the test.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                LONG_RUNNING_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
        let _ = registry.kill(&task_id, "session");
    }

    /// Polls every 100ms until `path` exists, then returns its contents.
    ///
    /// # Panics
    /// Panics if the file does not appear within 30 seconds or cannot be read.
    #[cfg(windows)]
    fn wait_for_file(path: &Path) -> String {
        let started = Instant::now();
        loop {
            if path.exists() {
                return fs::read_to_string(path).expect("read file");
            }
            assert!(
                started.elapsed() < Duration::from_secs(30),
                "timed out waiting for {}",
                path.display()
            );
            std::thread::sleep(Duration::from_millis(100));
        }
    }

    /// Spawns `command` in a fresh registry under session `"session"`, using
    /// a new temp dir for every path argument.
    ///
    /// Returns the registry, the temp dir (keep it alive for the test's
    /// duration), and the new task id.
    #[cfg(windows)]
    fn spawn_windows_registry_command(
        command: &str,
    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                command,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                false,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        (registry, dir, task_id)
    }

    // The exit marker written for a zero-exit command contains "0".
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_zero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "0");
    }

    // The exit marker carries the command's real non-zero exit code.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "42");
    }

    // Once the exit marker exists, the stdout capture file holds the command's output.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_captures_stdout_to_disk() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
        let task = registry.task_for_session(&task_id, "session").unwrap();
        let stdout_path = task.paths.stdout.clone();
        let exit_path = task.paths.exit.clone();

        let _ = wait_for_file(&exit_path);
        let stdout = fs::read_to_string(stdout_path).expect("read stdout");

        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
    }

    // With a stubbed probe that resolves both pwsh.exe and powershell.exe,
    // pwsh is ranked as the first candidate.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_uses_pwsh_when_available() {
        let candidates = crate::windows_shell::shell_candidates_with(
            |binary| match binary {
                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
                _ => None,
            },
            || None,
        );
        let shell = candidates.first().expect("at least one candidate").clone();
        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
    }

    // Inspects the generated cmd wrapper text: it captures %ERRORLEVEL%,
    // writes it to `<exit>.tmp`, moves that onto the marker path, and exits
    // with the captured code.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
        let script =
            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);

        assert!(
            script.contains("set CODE=%ERRORLEVEL%"),
            "wrapper must capture exit code into CODE: {script}"
        );
        assert!(
            script.contains("echo %CODE% >"),
            "wrapper must echo CODE to a temp marker file: {script}"
        );
        assert!(
            script.contains("move /Y"),
            "wrapper must use atomic move to write the marker: {script}"
        );
        assert!(
            script.contains("> nul"),
            "wrapper must redirect move output to nul: {script}"
        );
        assert!(
            script.contains("exit /B %CODE%"),
            "wrapper must propagate the captured exit code: {script}"
        );
        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
    }

    // `WindowsShell::Cmd::bg_command` passes exactly `/D /S /C <command>`.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert_eq!(
            args_strs,
            vec!["/D", "/S", "/C", "echo wrapped"],
            "Cmd::bg_command must prepend /D /S /C"
        );
    }

    // `WindowsShell::Pwsh::bg_command` uses `-Command` and includes the user command body.
    #[cfg(windows)]
    #[test]
    fn windows_shell_pwsh_bg_command_uses_standard_args() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert!(
            args_strs.contains(&"-Command"),
            "Pwsh::bg_command must use -Command: {args_strs:?}"
        );
        assert!(
            args_strs.contains(&"Get-Date"),
            "Pwsh::bg_command must include the user command body"
        );
    }

    // Disabled placeholder: `cfg(any())` is always false, so this function is
    // never compiled.
    #[allow(dead_code)]
    #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
}