1use std::collections::BTreeMap;
39use std::io::{Read, Write};
40use std::path::PathBuf;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use std::sync::{Arc, LazyLock, Mutex, OnceLock};
43use std::time::Duration;
44
45use harn_vm::VmValue;
46
47use crate::error::HostlibError;
48use crate::process::{self as process_handle, ProcessHandle, ProcessKiller, SpawnSpec};
49use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
50
/// Process-wide counter that supplies the numeric suffix of each new handle
/// id; it starts at 1 and is advanced once per spawned command.
static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);
53
/// Cancellation flags shared between a handle's store entry, its waiter
/// thread, and its optional progress thread.
struct CancelState {
    /// Set when a cancel path claims the handle. The waiter then reports the
    /// process as killed (or timed out) and skips the inbox push; the progress
    /// thread stops emitting updates.
    cancelled: AtomicBool,
    /// Recorded at cancel time from the caller's options. The waiter only
    /// honours it while `cancelled` is also set.
    timed_out: AtomicBool,
}
64
/// Bytes captured so far from the child's output streams. Drain threads append
/// to it; the waiter and progress threads read snapshots of it.
#[derive(Default)]
struct OutputState {
    /// Everything read from the child's stdout stream.
    stdout: Vec<u8>,
    /// Everything read from the child's stderr stream.
    stderr: Vec<u8>,
}
70
/// Bookkeeping for one live long-running process, stored in `HANDLE_STORE`
/// under its handle id.
struct HandleEntry {
    /// The spawned process. `Some` when the entry is inserted; the waiter
    /// thread takes it (leaving `None`) as soon as it starts.
    handle: Option<Box<dyn ProcessHandle>>,
    /// Kill switch obtained from the handle at spawn time. Cancel paths clone
    /// it so the process can be killed without owning `handle`.
    killer: Arc<dyn ProcessKiller>,
    /// Session that spawned the process; `cancel_session_handles` matches on it.
    session_id: String,
    /// Cancellation flags shared with the waiter thread.
    cancel_state: Arc<CancelState>,
    /// Installed by `register_completion_notifier`; the waiter sends `()` on
    /// it after publishing the exit result.
    completion_tx: Option<std::sync::mpsc::SyncSender<()>>,
    /// Installed by `cancel_handle_with_options` when the caller asked to wait
    /// for the final result; the waiter sends the exit payload on it.
    result_tx: Option<std::sync::mpsc::SyncSender<VmValue>>,
}
88
/// Registry of live long-running handles.
#[derive(Default)]
struct HandleStore {
    /// Handle id -> entry. Entries are inserted at spawn time and removed by
    /// the waiter thread once the process has exited.
    entries: BTreeMap<String, HandleEntry>,
}
93
/// Process-wide handle registry, created lazily on first use. Every access in
/// this file panics with "long-running handle store poisoned" if the mutex was
/// poisoned by a panicking holder.
static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
    LazyLock::new(|| Mutex::new(HandleStore::default()));
96
/// Identity of a freshly spawned long-running command, returned to the caller
/// while the process keeps running in the background.
///
/// Derives `Debug` so that `Result<LongRunningHandleInfo, _>` supports
/// `unwrap_err`/`expect_err`, and `Clone`/`PartialEq`/`Eq` so values can be
/// copied and compared in callers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LongRunningHandleInfo {
    /// Command identifier allocated for this invocation.
    pub command_id: String,
    /// Key under which the process is registered; pass it to `cancel_handle`
    /// or `register_completion_notifier`.
    pub handle_id: String,
    /// Timestamp string captured when the process was spawned.
    pub started_at: String,
    /// OS process id of the child.
    pub pid: u32,
    /// Process group id of the child, when one is available.
    pub process_group_id: Option<u32>,
    /// Human-readable command line for display purposes.
    pub command_display: String,
}
113
/// Tunables for `spawn_long_running_with_options`.
pub(crate) struct LongRunningSpawnOptions {
    /// Environment handling mode, forwarded to `proc::apply_toolchain_path`
    /// and to the spawn spec.
    pub(crate) env_mode: EnvMode,
    /// Capture settings used when inlining output into the final result.
    pub(crate) capture: CaptureConfig,
    /// Session that owns the process; results and progress are pushed to this
    /// session's inbox and the handle is cancelled with the session.
    pub(crate) session_id: String,
    /// Period between progress updates. `None` or a zero duration disables
    /// the progress thread.
    pub(crate) progress_interval: Option<Duration>,
    /// `max_inline_bytes` applied to the output inlined into progress updates.
    pub(crate) progress_max_inline_bytes: usize,
}
121
/// Owned data handed to the waiter thread so it can report on the process
/// without borrowing from the spawning call.
struct WaiterContext {
    /// Command identifier; also selects the artifact paths.
    command_id: String,
    /// Key of this process in `HANDLE_STORE`.
    handle_id: String,
    /// Inbox the final result is pushed to.
    session_id: String,
    /// Spawn timestamp, echoed in the result payload.
    started_at: String,
    /// Process group id, included in the payload when present.
    process_group_id: Option<u32>,
    /// Display form of the command line, echoed in the payload.
    command_display: String,
    /// Period between progress updates; `None` or zero disables them.
    progress_interval: Option<Duration>,
    /// Inline-output cap used for progress updates.
    progress_max_inline_bytes: usize,
}
132
/// Everything the progress thread needs to build and publish periodic
/// "still running" updates.
struct ProgressThreadContext {
    /// Command identifier echoed in each update.
    command_id: String,
    /// Handle id echoed in each update.
    handle_id: String,
    /// Inbox the updates are pushed to.
    session_id: String,
    /// Spawn timestamp echoed in each update.
    started_at: String,
    /// Display form of the command line.
    command_display: String,
    /// Process group id, if any.
    process_group_id: Option<u32>,
    /// Path of the combined-output artifact.
    output_path: PathBuf,
    /// Path of the stdout artifact.
    stdout_path: PathBuf,
    /// Path of the stderr artifact.
    stderr_path: PathBuf,
    /// Shared capture buffers; the thread clones a snapshot per update.
    output_state: Arc<Mutex<OutputState>>,
    /// Shared cancellation flags; a set `cancelled` flag stops the thread.
    cancel_state: Arc<CancelState>,
    /// Set by the waiter once the process has exited; stops the thread.
    done: Arc<AtomicBool>,
    /// Reference instant for the reported `duration_ms`.
    started: std::time::Instant,
    /// Sleep period between updates.
    interval: Duration,
    /// Cap on the stdout/stderr bytes inlined into each update.
    max_inline_bytes: usize,
}
150
151impl LongRunningHandleInfo {
152 pub fn into_handle_response(self) -> VmValue {
154 proc::running_response(
155 self.command_id,
156 self.handle_id,
157 self.pid,
158 self.process_group_id,
159 self.started_at,
160 self.command_display,
161 )
162 }
163}
164
165pub fn spawn_long_running(
171 builtin: &'static str,
172 program: String,
173 args: Vec<String>,
174 cwd: Option<PathBuf>,
175 env: BTreeMap<String, String>,
176 session_id: String,
177) -> Result<LongRunningHandleInfo, HostlibError> {
178 spawn_long_running_with_options(
179 builtin,
180 program,
181 args,
182 cwd,
183 env,
184 LongRunningSpawnOptions {
185 env_mode: EnvMode::InheritClean,
186 capture: CaptureConfig::default(),
187 session_id,
188 progress_interval: None,
189 progress_max_inline_bytes: CaptureConfig::default().max_inline_bytes,
190 },
191 )
192}
193
194pub(crate) fn spawn_long_running_with_options(
195 builtin: &'static str,
196 program: String,
197 args: Vec<String>,
198 cwd: Option<PathBuf>,
199 env: BTreeMap<String, String>,
200 options: LongRunningSpawnOptions,
201) -> Result<LongRunningHandleInfo, HostlibError> {
202 let mut env = env;
203 proc::apply_toolchain_path(cwd.as_deref(), &mut env, options.env_mode);
204 let spec = SpawnSpec {
205 builtin,
206 program: program.clone(),
207 args: args.clone(),
208 cwd,
209 env,
210 env_mode: options.env_mode,
211 use_stdin: false,
212 configure_process_group: true,
213 };
214 let handle = process_handle::spawn_process(spec)
215 .map_err(|e| proc::process_error_to_hostlib(builtin, e))?;
216
217 let pid = handle.pid().unwrap_or(0);
218 let process_group_id = handle.process_group_id();
219 let killer = handle.killer();
220 let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
221 let handle_id = format!("hto-{:x}-{id}", std::process::id());
222 let command_id = proc::next_command_id();
223 let started_at = proc::now_rfc3339();
224
225 let mut all_argv = vec![program];
226 all_argv.extend(args.iter().cloned());
227 let command_display = all_argv.join(" ");
228
229 let cancel_state = Arc::new(CancelState {
230 cancelled: AtomicBool::new(false),
231 timed_out: AtomicBool::new(false),
232 });
233
234 {
235 let mut store = HANDLE_STORE
236 .lock()
237 .expect("long-running handle store poisoned");
238 store.entries.insert(
239 handle_id.clone(),
240 HandleEntry {
241 handle: Some(handle),
242 killer,
243 session_id: options.session_id.clone(),
244 cancel_state: cancel_state.clone(),
245 completion_tx: None,
246 result_tx: None,
247 },
248 );
249 }
250
251 let waiter_context = WaiterContext {
252 command_id: command_id.clone(),
253 handle_id: handle_id.clone(),
254 session_id: options.session_id,
255 started_at: started_at.clone(),
256 process_group_id,
257 command_display: command_display.clone(),
258 progress_interval: options.progress_interval,
259 progress_max_inline_bytes: options.progress_max_inline_bytes,
260 };
261 let waiter_thread_name = waiter_context.handle_id.clone();
262 let capture = options.capture;
263 std::thread::Builder::new()
264 .name(format!("hto-waiter-{waiter_thread_name}"))
265 .spawn(move || {
266 waiter_thread(waiter_context, cancel_state, capture);
267 })
268 .map_err(|e| HostlibError::Backend {
269 builtin,
270 message: format!("failed to spawn waiter thread: {e}"),
271 })?;
272
273 Ok(LongRunningHandleInfo {
274 command_id,
275 handle_id,
276 started_at,
277 pid,
278 process_group_id,
279 command_display,
280 })
281}
282
283fn waiter_thread(context: WaiterContext, cancel_state: Arc<CancelState>, capture: CaptureConfig) {
285 let waiter_start = std::time::Instant::now();
286
287 let mut handle = {
290 let mut store = HANDLE_STORE
291 .lock()
292 .expect("long-running handle store poisoned");
293 match store.entries.get_mut(&context.handle_id) {
294 Some(entry) => match entry.handle.take() {
295 Some(h) => h,
296 None => return, },
298 None => return, }
300 };
301
302 let output_state = Arc::new(Mutex::new(OutputState::default()));
303 let done = Arc::new(AtomicBool::new(false));
304 let planned = proc::planned_artifact_paths(&context.command_id);
305 if let Some(parent) = planned.output_path.parent() {
306 let _ = std::fs::create_dir_all(parent);
307 }
308 let _ = std::fs::File::create(&planned.stdout_path);
309 let _ = std::fs::File::create(&planned.stderr_path);
310 let combined_file = std::fs::File::create(&planned.output_path)
311 .ok()
312 .map(|file| Arc::new(Mutex::new(file)));
313
314 let stdout_thread = handle.take_stdout().map(|out| {
315 spawn_output_drain(
316 out,
317 output_state.clone(),
318 planned.stdout_path.clone(),
319 combined_file.clone(),
320 true,
321 )
322 });
323 let stderr_thread = handle.take_stderr().map(|err| {
324 spawn_output_drain(
325 err,
326 output_state.clone(),
327 planned.stderr_path.clone(),
328 combined_file.clone(),
329 false,
330 )
331 });
332
333 let progress_thread = context
334 .progress_interval
335 .filter(|interval| !interval.is_zero())
336 .map(|interval| {
337 spawn_progress_thread(ProgressThreadContext {
338 command_id: context.command_id.clone(),
339 handle_id: context.handle_id.clone(),
340 session_id: context.session_id.clone(),
341 started_at: context.started_at.clone(),
342 command_display: context.command_display.clone(),
343 process_group_id: context.process_group_id,
344 output_path: planned.output_path.clone(),
345 stdout_path: planned.stdout_path.clone(),
346 stderr_path: planned.stderr_path.clone(),
347 output_state: output_state.clone(),
348 cancel_state: cancel_state.clone(),
349 done: done.clone(),
350 started: waiter_start,
351 interval,
352 max_inline_bytes: context.progress_max_inline_bytes,
353 })
354 });
355
356 let status = handle.wait().ok();
357
358 if let Some(thread) = stdout_thread {
359 let _ = thread.join();
360 }
361 if let Some(thread) = stderr_thread {
362 let _ = thread.join();
363 }
364 done.store(true, Ordering::Release);
365 drop(progress_thread);
366 let (stdout, stderr) = {
367 let state = output_state
368 .lock()
369 .unwrap_or_else(|poison| poison.into_inner());
370 (state.stdout.clone(), state.stderr.clone())
371 };
372
373 let (completion_tx, result_tx) = {
376 let mut store = HANDLE_STORE
377 .lock()
378 .expect("long-running handle store poisoned");
379 let entry = store
380 .entries
381 .remove(&context.handle_id)
382 .map(|mut e| (e.completion_tx.take(), e.result_tx.take()));
383 entry.unwrap_or((None, None))
384 };
385
386 let signal_done = move || {
387 if let Some(tx) = completion_tx {
388 let _ = tx.try_send(());
389 }
390 };
391
392 let cancelled = cancel_state.cancelled.load(Ordering::Acquire);
393 let timed_out = cancelled && cancel_state.timed_out.load(Ordering::Acquire);
394
395 let (exit_code, signal_name) = match status {
396 Some(s) => decode_exit_status(s),
397 None => (-1, Some("SIGKILL".to_string())),
399 };
400 let command_status = if timed_out {
401 CommandStatus::TimedOut
402 } else if cancelled {
403 CommandStatus::Killed
404 } else {
405 CommandStatus::Completed
406 };
407 let duration = waiter_start.elapsed();
408 let duration_ms = duration.as_millis() as i64;
409 let artifacts = match proc::persist_artifacts(
410 &context.command_id,
411 &stdout,
412 &stderr,
413 Some(&context.handle_id),
414 ) {
415 Ok(artifacts) => artifacts,
416 Err(_) => return,
417 };
418 let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
419
420 let mut payload = serde_json::Map::new();
421 payload.insert(
422 "command_id".into(),
423 serde_json::Value::String(context.command_id.clone()),
424 );
425 payload.insert(
426 "status".into(),
427 serde_json::Value::String(command_status.as_str().to_string()),
428 );
429 payload.insert(
430 "handle_id".into(),
431 serde_json::Value::String(context.handle_id),
432 );
433 payload.insert(
434 "command_or_op_descriptor".into(),
435 serde_json::Value::String(context.command_display),
436 );
437 payload.insert(
438 "started_at".into(),
439 serde_json::Value::String(context.started_at),
440 );
441 payload.insert(
442 "ended_at".into(),
443 serde_json::Value::String(proc::now_rfc3339()),
444 );
445 payload.insert(
446 "duration_ms".into(),
447 serde_json::Value::Number(duration_ms.into()),
448 );
449 payload.insert(
450 "exit_code".into(),
451 serde_json::Value::Number(exit_code.into()),
452 );
453 payload.insert("timed_out".into(), serde_json::Value::Bool(timed_out));
454 payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
455 payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
456 payload.insert(
457 "output_path".into(),
458 serde_json::Value::String(artifacts.output_path.display().to_string()),
459 );
460 payload.insert(
461 "stdout_path".into(),
462 serde_json::Value::String(artifacts.stdout_path.display().to_string()),
463 );
464 payload.insert(
465 "stderr_path".into(),
466 serde_json::Value::String(artifacts.stderr_path.display().to_string()),
467 );
468 payload.insert(
469 "line_count".into(),
470 serde_json::Value::Number(artifacts.line_count.into()),
471 );
472 payload.insert(
473 "byte_count".into(),
474 serde_json::Value::Number(artifacts.byte_count.into()),
475 );
476 payload.insert(
477 "output_sha256".into(),
478 serde_json::Value::String(artifacts.output_sha256),
479 );
480 if let Some(pgid) = context.process_group_id {
481 payload.insert(
482 "process_group_id".into(),
483 serde_json::Value::Number((pgid as u64).into()),
484 );
485 }
486 if let Some(sig) = signal_name {
487 payload.insert("signal".into(), serde_json::Value::String(sig));
488 } else {
489 payload.insert("signal".into(), serde_json::Value::Null);
490 }
491
492 if let Some(tx) = result_tx {
493 let value = serde_json::Value::Object(payload.clone());
494 let _ = tx.try_send(harn_vm::json_to_vm_value(&value));
495 }
496 if !cancelled {
497 let content = serde_json::to_string(&payload).unwrap_or_default();
498 harn_vm::orchestration::agent_inbox::push(
499 &context.session_id,
500 "tool_result",
501 &content,
502 "hostlib.long_running.exit",
503 );
504 }
505 signal_done();
506}
507
508fn spawn_output_drain(
509 mut reader: Box<dyn Read + Send>,
510 state: Arc<Mutex<OutputState>>,
511 path: std::path::PathBuf,
512 combined_file: Option<Arc<Mutex<std::fs::File>>>,
513 stdout: bool,
514) -> std::thread::JoinHandle<()> {
515 std::thread::spawn(move || {
516 let mut file = std::fs::File::create(path).ok();
517 let mut buf = [0_u8; 8192];
518 loop {
519 let read = match reader.read(&mut buf) {
520 Ok(0) => break,
521 Ok(read) => read,
522 Err(_) => break,
523 };
524 let chunk = &buf[..read];
525 if let Some(file) = file.as_mut() {
526 let _ = file.write_all(chunk);
527 }
528 if let Some(combined) = combined_file.as_ref() {
529 if let Ok(mut combined) = combined.lock() {
530 let _ = combined.write_all(chunk);
531 }
532 }
533 if let Ok(mut state) = state.lock() {
534 if stdout {
535 state.stdout.extend_from_slice(chunk);
536 } else {
537 state.stderr.extend_from_slice(chunk);
538 }
539 }
540 }
541 })
542}
543
544fn spawn_progress_thread(context: ProgressThreadContext) -> std::thread::JoinHandle<()> {
545 std::thread::spawn(move || {
546 while !context.done.load(Ordering::Acquire)
547 && !context.cancel_state.cancelled.load(Ordering::Acquire)
548 {
549 std::thread::sleep(context.interval);
550 if context.done.load(Ordering::Acquire)
551 || context.cancel_state.cancelled.load(Ordering::Acquire)
552 {
553 break;
554 }
555 let (stdout, stderr) = {
556 let state = context
557 .output_state
558 .lock()
559 .unwrap_or_else(|poison| poison.into_inner());
560 (state.stdout.clone(), state.stderr.clone())
561 };
562 let capture = CaptureConfig {
563 max_inline_bytes: context.max_inline_bytes,
564 ..CaptureConfig::default()
565 };
566 let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
567 let byte_count = stdout.len().saturating_add(stderr.len());
568 let payload = serde_json::json!({
569 "command_id": &context.command_id,
570 "handle_id": &context.handle_id,
571 "status": CommandStatus::Running.as_str(),
572 "command_or_op_descriptor": &context.command_display,
573 "started_at": &context.started_at,
574 "ended_at": null,
575 "duration_ms": context.started.elapsed().as_millis() as i64,
576 "exit_code": null,
577 "signal": null,
578 "stdout": inline_stdout,
579 "stderr": inline_stderr,
580 "output_path": context.output_path.display().to_string(),
581 "stdout_path": context.stdout_path.display().to_string(),
582 "stderr_path": context.stderr_path.display().to_string(),
583 "byte_count": byte_count as i64,
584 "line_count": stdout.iter().chain(stderr.iter()).filter(|byte| **byte == b'\n').count() as i64,
585 "process_group_id": context.process_group_id,
586 });
587 harn_vm::orchestration::agent_inbox::push(
588 &context.session_id,
589 "tool_progress",
590 &payload.to_string(),
591 "hostlib.long_running.progress",
592 );
593 }
594 })
595}
596
/// How a cancellation should be recorded and whether the caller wants the
/// final result back.
///
/// `CancelOptions::default()` is the plain cancel: not a timeout, and no
/// waiting for the result.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct CancelOptions {
    /// Record the cancellation as a timeout, so the final status is reported
    /// as timed out instead of killed.
    pub(crate) timed_out: bool,
    /// When set, block up to this long for the waiter thread to deliver the
    /// final result payload.
    pub(crate) wait_result: Option<Duration>,
}
601
/// Result of `cancel_handle_with_options`.
pub(crate) struct CancelOutcome {
    /// `true` when this call claimed the cancellation and issued the kill;
    /// `false` when the handle is unknown or was already cancelled.
    pub(crate) cancelled: bool,
    /// Final result payload from the waiter thread, present only when the
    /// caller asked to wait and the payload arrived within the timeout.
    pub(crate) result: Option<VmValue>,
}
606
607pub fn cancel_handle(handle_id: &str) -> bool {
611 cancel_handle_with_options(
612 handle_id,
613 CancelOptions {
614 timed_out: false,
615 wait_result: None,
616 },
617 )
618 .cancelled
619}
620
621pub(crate) fn cancel_handle_with_options(handle_id: &str, options: CancelOptions) -> CancelOutcome {
622 let (killer, cancel_state, result_rx) = {
623 let mut store = HANDLE_STORE
624 .lock()
625 .expect("long-running handle store poisoned");
626 let Some(entry) = store.entries.get_mut(handle_id) else {
627 return CancelOutcome {
628 cancelled: false,
629 result: None,
630 };
631 };
632 if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
633 return CancelOutcome {
634 cancelled: false,
635 result: None,
636 };
637 }
638 entry
639 .cancel_state
640 .timed_out
641 .store(options.timed_out, Ordering::Release);
642 let result_rx = options.wait_result.map(|_| {
643 let (tx, rx) = std::sync::mpsc::sync_channel::<VmValue>(1);
644 entry.result_tx = Some(tx);
645 rx
646 });
647 (entry.killer.clone(), entry.cancel_state.clone(), result_rx)
648 };
649 do_kill(killer, cancel_state);
650 let result = match (options.wait_result, result_rx) {
651 (Some(timeout), Some(rx)) => rx.recv_timeout(timeout).ok(),
652 _ => None,
653 };
654 CancelOutcome {
655 cancelled: true,
656 result,
657 }
658}
659
/// Killer and cancellation flags of one handle, collected under the store lock
/// so the kill itself can run after the lock is released.
type SessionKillEntry = (Arc<dyn ProcessKiller>, Arc<CancelState>);
664
665pub fn cancel_session_handles(session_id: &str) {
668 let to_kill: Vec<SessionKillEntry> = {
669 let store = HANDLE_STORE
670 .lock()
671 .expect("long-running handle store poisoned");
672 let matching: Vec<String> = store
673 .entries
674 .iter()
675 .filter(|(_, e)| e.session_id == session_id)
676 .map(|(id, _)| id.clone())
677 .collect();
678 matching
679 .into_iter()
680 .filter_map(|id| {
681 let entry = store.entries.get(&id)?;
682 if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
683 return None;
684 }
685 entry.cancel_state.timed_out.store(false, Ordering::Release);
686 Some((entry.killer.clone(), entry.cancel_state.clone()))
687 })
688 .collect()
689 };
690 for (killer, cancel_state) in to_kill {
691 do_kill(killer, cancel_state);
692 }
693}
694
/// Kills the process through `killer`, then marks `cancel_state` as cancelled.
///
/// Both cancel paths in this file set the `cancelled` flag before calling
/// this function, so for them the store below is a repeat of the same value.
fn do_kill(killer: Arc<dyn ProcessKiller>, cancel_state: Arc<CancelState>) {
    killer.kill();
    cancel_state.cancelled.store(true, Ordering::Release);
}
703
704pub(crate) fn register_cleanup_hook() {
708 static REGISTERED: OnceLock<()> = OnceLock::new();
709 REGISTERED.get_or_init(|| {
710 let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
711 cancel_session_handles(session_id);
712 });
713 harn_vm::register_session_end_hook(hook);
714 });
715}
716
717fn decode_exit_status(status: process_handle::ExitStatus) -> (i32, Option<String>) {
718 if let Some(code) = status.code {
719 return (code, None);
720 }
721 if let Some(sig) = status.signal {
722 return (-1, Some(format!("SIG{sig}")));
723 }
724 (-1, None)
725}
726
727pub fn register_completion_notifier(handle_id: &str) -> Option<std::sync::mpsc::Receiver<()>> {
733 let (tx, rx) = std::sync::mpsc::sync_channel::<()>(1);
734 let mut store = HANDLE_STORE
735 .lock()
736 .expect("long-running handle store poisoned");
737 let entry = store.entries.get_mut(handle_id)?;
738 entry.completion_tx = Some(tx);
739 Some(rx)
740}