1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// ABOUTME: Stream wrapper that owns a child process for proper lifecycle management
// ABOUTME: Prevents zombie processes, drains stderr, and kills child on drop
//
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026 dravr.ai
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::process::Child;
use tokio::task::JoinHandle;
use tokio_stream::Stream;
use tracing::debug;
use crate::types::{RunnerError, StreamChunk};
/// Maximum stderr to buffer during streaming (1 MiB)
pub(crate) const MAX_STREAMING_STDERR_BYTES: usize = 1024 * 1024;
/// Guards a child process for the lifetime of a streaming response.
///
/// When the stream is dropped (after natural completion or early
/// cancellation), the child process is killed and the stderr drain
/// task is aborted. This prevents zombie processes and resource leaks.
///
/// All fields are `Unpin`, so `GuardedStream` is `Unpin` and the
/// `Stream` impl can safely access inner fields through `Pin<&mut Self>`.
pub struct GuardedStream {
inner: Pin<Box<dyn Stream<Item = Result<StreamChunk, RunnerError>> + Send>>,
child: Option<Child>,
stderr_task: Option<JoinHandle<Vec<u8>>>,
}
impl GuardedStream {
/// Create a guarded stream wrapping a child process.
///
/// The `stderr_task` drains the child's stderr in the background
/// to prevent buffer-full deadlocks where the child blocks on
/// write to a full stderr pipe.
pub fn new(
inner: impl Stream<Item = Result<StreamChunk, RunnerError>> + Send + 'static,
child: Child,
stderr_task: JoinHandle<Vec<u8>>,
) -> Self {
debug!(child_pid = child.id().unwrap_or(0), "GuardedStream created");
Self {
inner: Box::pin(inner),
child: Some(child),
stderr_task: Some(stderr_task),
}
}
}
impl Stream for GuardedStream {
type Item = Result<StreamChunk, RunnerError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.as_mut().poll_next(cx)
}
}
impl Drop for GuardedStream {
fn drop(&mut self) {
debug!("GuardedStream dropped, cleaning up child process");
if let Some(mut child) = self.child.take() {
let _ = child.start_kill();
}
if let Some(task) = self.stderr_task.take() {
task.abort();
}
}
}