forge_core_utils/
stream_ext.rs1use std::io;
2
3use futures::{Stream, StreamExt};
4use tokio::time::{Duration, Instant, sleep_until};
5
6use crate::log_msg::LogMsg;
7
8const WINDOW_MS: u64 = 10;
9const WINDOW_LIMIT: usize = 100 * 1024; const COLLECT_LIMIT: usize = WINDOW_LIMIT * 2;
14
15const TRUNC_MARKER: &str = " [truncated] ";
16
17fn middle_truncate_bytes(bytes: &[u8], limit: usize, marker: &str) -> String {
18 if bytes.len() <= limit {
19 return String::from_utf8_lossy(bytes).into_owned();
20 }
21 let m = marker.as_bytes();
22 let mlen = m.len();
23 if limit <= mlen {
24 return String::from_utf8_lossy(&m[..limit]).into_owned();
26 }
27 let keep_prefix = (limit - mlen) / 2;
28 let keep_suffix = limit - mlen - keep_prefix;
29
30 let mut out = Vec::with_capacity(limit);
31 out.extend_from_slice(&bytes[..keep_prefix]);
32 out.extend_from_slice(m);
33 out.extend_from_slice(&bytes[bytes.len() - keep_suffix..]);
34 String::from_utf8_lossy(&out).into_owned()
35}
36
37fn shrink_middle(buf: &mut Vec<u8>, target_len: usize) {
38 if buf.len() <= target_len {
39 return;
40 }
41 let extra = buf.len() - target_len;
42 let mid = buf.len() / 2;
43 let start = mid.saturating_sub(extra / 2);
44 let end = start + extra;
45 buf.drain(start..end);
46}
47
48fn flush_buf(
50 buf: &mut Vec<u8>,
51 kind: Option<bool>,
52 truncated_in_window: &mut bool,
53) -> Option<LogMsg> {
54 if buf.is_empty() && !*truncated_in_window {
55 return None;
56 }
57
58 let needs_marker = *truncated_in_window || buf.len() > WINDOW_LIMIT;
59 let out = if needs_marker {
60 middle_truncate_bytes(buf, WINDOW_LIMIT, TRUNC_MARKER)
61 } else {
62 String::from_utf8_lossy(buf).into_owned()
63 };
64
65 buf.clear();
66 *truncated_in_window = false;
67
68 match kind {
69 Some(true) => Some(LogMsg::Stdout(out)),
70 Some(false) => Some(LogMsg::Stderr(out)),
71 None => None,
72 }
73}
74
75pub fn debounce_logs<S>(input: S) -> impl Stream<Item = Result<LogMsg, io::Error>>
76where
77 S: Stream<Item = Result<LogMsg, io::Error>> + Unpin,
78{
79 async_stream::stream! {
80 let mut buf: Vec<u8> = Vec::with_capacity(WINDOW_LIMIT);
82 let mut current_stream_type: Option<bool> = None; let mut timer = Instant::now() + Duration::from_millis(WINDOW_MS);
84
85 let mut truncated_in_window: bool = false;
87
88 tokio::pin!(input);
89
90 loop {
91 tokio::select! {
92 maybe = input.next() => {
93 let msg = match maybe {
94 Some(Ok(v)) => v,
95 Some(Err(e)) => { yield Err(e); continue; }
96 None => break,
97 };
98
99 match &msg {
100 LogMsg::Stdout(s) | LogMsg::Stderr(s) => {
101 let is_stdout = matches!(msg, LogMsg::Stdout(_));
102
103 if current_stream_type != Some(is_stdout) {
105 if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) {
106 yield Ok(flushed);
107 }
108 current_stream_type = Some(is_stdout);
109 buf.clear();
110 truncated_in_window = false;
111 }
112
113 let bytes = s.as_bytes();
114 buf.extend_from_slice(bytes);
115 if buf.len() > COLLECT_LIMIT {
116 truncated_in_window = true;
117 shrink_middle(&mut buf, COLLECT_LIMIT);
118 }
119 }
120
121 _ => {
122 if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) {
124 yield Ok(flushed);
125 }
126 current_stream_type = None;
127 yield Ok(msg);
128 }
129 }
130 }
131
132 _ = sleep_until(timer) => {
133 if let Some(flushed) = {
134 let kind = current_stream_type;
135 flush_buf(&mut buf, kind, &mut truncated_in_window)
136 } {
137 yield Ok(flushed);
138 }
139 timer = Instant::now() + Duration::from_millis(WINDOW_MS);
141 buf.clear();
142 truncated_in_window = false;
143 }
144 }
145 }
146
147 if let Some(flushed) = {
149 let kind = current_stream_type;
150 flush_buf(&mut buf, kind, &mut truncated_in_window)
151 } {
152 yield Ok(flushed);
153 }
154 }
155}