forge_core_utils/
stream_ext.rs

1use std::io;
2
3use futures::{Stream, StreamExt};
4use tokio::time::{Duration, Instant, sleep_until};
5
6use crate::log_msg::LogMsg;
7
/// Length of one debounce window, in milliseconds.
const WINDOW_MS: u64 = 10;
/// Byte budget for a single flushed output message (100 KiB).
const WINDOW_LIMIT: usize = 100 << 10;
/// Upper bound on bytes held in the accumulation buffer during one window.
///
/// Collecting up to twice the per-message budget keeps enough of both the head and
/// the tail of the window's output for the middle truncation applied at flush time,
/// while still preventing unbounded growth.
const COLLECT_LIMIT: usize = 2 * WINDOW_LIMIT;

/// Marker inserted where bytes were cut out of the middle of a flushed chunk.
const TRUNC_MARKER: &str = " [truncated] ";
16
/// Shrinks `bytes` to at most `limit` bytes by cutting out the middle and inserting `marker`.
///
/// If `bytes` already fits in `limit`, it is returned unchanged (decoded lossily as UTF-8).
/// Otherwise the result is a prefix of `bytes`, then `marker`, then a suffix of `bytes`.
/// The kept bytes plus the marker total at most `limit`.
///
/// Both cut points are moved onto UTF-8 character boundaries. A multi-byte character at a
/// cut is dropped whole instead of being split into U+FFFD replacement characters.
///
/// If `limit` is too small to hold the marker, a prefix of the marker itself is returned,
/// also cut on a character boundary.
fn middle_truncate_bytes(bytes: &[u8], limit: usize, marker: &str) -> String {
    // A UTF-8 continuation byte (0b10xx_xxxx) never begins a character.
    fn is_continuation(b: u8) -> bool {
        b & 0xC0 == 0x80
    }
    // A UTF-8 character has at most three continuation bytes. Walking further would
    // only happen on invalid input, which lossy decoding handles anyway.
    const MAX_STEP: usize = 3;

    if bytes.len() <= limit {
        return String::from_utf8_lossy(bytes).into_owned();
    }
    if limit <= marker.len() {
        // Degenerate case: no room for any content; return a cut marker.
        let mut end = limit;
        while !marker.is_char_boundary(end) {
            end -= 1;
        }
        return marker[..end].to_owned();
    }

    let budget = limit - marker.len();
    let keep_prefix = budget / 2;
    let keep_suffix = budget - keep_prefix;

    // Head cut: if it lands inside a character, back off to that character's start.
    // `keep_prefix < bytes.len()` holds because `keep_prefix <= budget < limit < bytes.len()`.
    let mut head_end = keep_prefix;
    for _ in 0..MAX_STEP {
        if head_end == 0 || !is_continuation(bytes[head_end]) {
            break;
        }
        head_end -= 1;
    }

    // Tail cut: skip the trailing continuation bytes of a character split by the cut.
    let mut tail_start = bytes.len() - keep_suffix;
    for _ in 0..MAX_STEP {
        if tail_start >= bytes.len() || !is_continuation(bytes[tail_start]) {
            break;
        }
        tail_start += 1;
    }

    let mut out = Vec::with_capacity(limit);
    out.extend_from_slice(&bytes[..head_end]);
    out.extend_from_slice(marker.as_bytes());
    out.extend_from_slice(&bytes[tail_start..]);
    String::from_utf8_lossy(&out).into_owned()
}
36
/// Removes a run of bytes from around the centre of `buf` so it ends up `target_len` long.
///
/// A buffer already at or below `target_len` is left untouched. The removed run starts
/// `excess / 2` bytes before `buf.len() / 2`, so the head and tail of the buffer survive.
fn shrink_middle(buf: &mut Vec<u8>, target_len: usize) {
    let len = buf.len();
    let excess = match len.checked_sub(target_len) {
        Some(n) if n > 0 => n,
        _ => return,
    };
    let cut_start = (len / 2).saturating_sub(excess / 2);
    // Slide the tail left over the removed run, then drop the now-duplicated end.
    buf.copy_within(cut_start + excess.., cut_start);
    buf.truncate(len - excess);
}
47
48// Helper that flushes buffer, inserting a middle [truncated] marker when needed
49fn flush_buf(
50    buf: &mut Vec<u8>,
51    kind: Option<bool>,
52    truncated_in_window: &mut bool,
53) -> Option<LogMsg> {
54    if buf.is_empty() && !*truncated_in_window {
55        return None;
56    }
57
58    let needs_marker = *truncated_in_window || buf.len() > WINDOW_LIMIT;
59    let out = if needs_marker {
60        middle_truncate_bytes(buf, WINDOW_LIMIT, TRUNC_MARKER)
61    } else {
62        String::from_utf8_lossy(buf).into_owned()
63    };
64
65    buf.clear();
66    *truncated_in_window = false;
67
68    match kind {
69        Some(true) => Some(LogMsg::Stdout(out)),
70        Some(false) => Some(LogMsg::Stderr(out)),
71        None => None,
72    }
73}
74
75pub fn debounce_logs<S>(input: S) -> impl Stream<Item = Result<LogMsg, io::Error>>
76where
77    S: Stream<Item = Result<LogMsg, io::Error>> + Unpin,
78{
79    async_stream::stream! {
80        // Single accumulation buffer per window; we trim from the middle when exceeding COLLECT_LIMIT
81        let mut buf: Vec<u8> = Vec::with_capacity(WINDOW_LIMIT);
82        let mut current_stream_type: Option<bool> = None; // Some(true)=stdout, Some(false)=stderr
83        let mut timer = Instant::now() + Duration::from_millis(WINDOW_MS);
84
85        // per-window accounting
86        let mut truncated_in_window: bool = false;
87
88        tokio::pin!(input);
89
90        loop {
91            tokio::select! {
92                maybe = input.next() => {
93                    let msg = match maybe {
94                        Some(Ok(v)) => v,
95                        Some(Err(e)) => { yield Err(e); continue; }
96                        None => break,
97                    };
98
99                    match &msg {
100                        LogMsg::Stdout(s) | LogMsg::Stderr(s) => {
101                            let is_stdout = matches!(msg, LogMsg::Stdout(_));
102
103                            // Flush if switching stream kind
104                            if current_stream_type != Some(is_stdout) {
105                                if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) {
106                                    yield Ok(flushed);
107                                }
108                                current_stream_type = Some(is_stdout);
109                                buf.clear();
110                                truncated_in_window = false;
111                            }
112
113                            let bytes = s.as_bytes();
114                            buf.extend_from_slice(bytes);
115                            if buf.len() > COLLECT_LIMIT {
116                                truncated_in_window = true;
117                                shrink_middle(&mut buf, COLLECT_LIMIT);
118                            }
119                        }
120
121                        _ => {
122                            // Flush accumulated stdout/stderr before passing through other messages
123                            if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) {
124                                yield Ok(flushed);
125                            }
126                            current_stream_type = None;
127                            yield Ok(msg);
128                        }
129                    }
130                }
131
132                _ = sleep_until(timer) => {
133                    if let Some(flushed) = {
134                        let kind = current_stream_type;
135                        flush_buf(&mut buf, kind, &mut truncated_in_window)
136                    } {
137                        yield Ok(flushed);
138                    }
139                    // Start a fresh time window
140                    timer = Instant::now() + Duration::from_millis(WINDOW_MS);
141                    buf.clear();
142                    truncated_in_window = false;
143                }
144            }
145        }
146
147        // Final flush on stream end
148        if let Some(flushed) = {
149            let kind = current_stream_type;
150            flush_buf(&mut buf, kind, &mut truncated_in_window)
151        } {
152            yield Ok(flushed);
153        }
154    }
155}