1use std::sync::Arc;
17
18use crate::executor::ToolExecutor;
19use crate::{ToolCall, ToolError, ToolOutput};
20
21use super::OutputCompressor;
22
/// A wrapper that pairs an inner [`ToolExecutor`] with an [`OutputCompressor`].
///
/// `E` is the type of the wrapped executor. All fields are private; instances
/// are built through `CompressedExecutor::new`.
#[derive(Debug)]
pub struct CompressedExecutor<E: ToolExecutor> {
    /// The wrapped executor.
    inner: E,
    /// Compressor shared behind an `Arc` as a trait object.
    compressor: Arc<dyn OutputCompressor>,
    /// Line-count threshold: summaries with fewer lines than this are not compressed.
    min_lines_to_compress: usize,
}
46
47impl<E: ToolExecutor> CompressedExecutor<E> {
48 #[must_use]
52 pub fn new(inner: E, compressor: Arc<dyn OutputCompressor>, min_lines: usize) -> Self {
53 Self {
54 inner,
55 compressor,
56 min_lines_to_compress: min_lines,
57 }
58 }
59
60 async fn maybe_compress(&self, output: ToolOutput) -> ToolOutput {
62 let line_count = output.summary.lines().count();
63 if line_count < self.min_lines_to_compress {
64 return output;
65 }
66
67 match self
68 .compressor
69 .compress(&output.tool_name, &output.summary)
70 .await
71 {
72 Ok(Some(compressed)) => {
73 tracing::debug!(
74 compressor = self.compressor.name(),
75 tool = %output.tool_name.as_str(),
76 original_len = output.summary.len(),
77 compressed_len = compressed.len(),
78 "CompressedExecutor: output compressed"
79 );
80 ToolOutput {
81 summary: compressed,
82 ..output
83 }
84 }
85 Ok(None) => output,
86 Err(e) => {
87 tracing::warn!(
88 compressor = self.compressor.name(),
89 error = %e,
90 "CompressedExecutor: compression error, using raw output"
91 );
92 output
93 }
94 }
95 }
96}
97
98impl<E: ToolExecutor> ToolExecutor for CompressedExecutor<E> {
99 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
100 let result = self.inner.execute(response).await?;
101 match result {
102 Some(out) => Ok(Some(self.maybe_compress(out).await)),
103 None => Ok(None),
104 }
105 }
106
107 async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
108 let result = self.inner.execute_confirmed(response).await?;
109 match result {
110 Some(out) => Ok(Some(self.maybe_compress(out).await)),
111 None => Ok(None),
112 }
113 }
114
115 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
116 self.inner.tool_definitions()
117 }
118
119 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
120 let result = self.inner.execute_tool_call(call).await?;
121 match result {
122 Some(out) => Ok(Some(self.maybe_compress(out).await)),
123 None => Ok(None),
124 }
125 }
126
127 async fn execute_tool_call_confirmed(
128 &self,
129 call: &ToolCall,
130 ) -> Result<Option<ToolOutput>, ToolError> {
131 let result = self.inner.execute_tool_call_confirmed(call).await?;
132 match result {
133 Some(out) => Ok(Some(self.maybe_compress(out).await)),
134 None => Ok(None),
135 }
136 }
137
138 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
139 self.inner.set_skill_env(env);
140 }
141
142 fn set_effective_trust(&self, level: crate::SkillTrustLevel) {
143 self.inner.set_effective_trust(level);
144 }
145
146 fn is_tool_retryable(&self, tool_id: &str) -> bool {
147 self.inner.is_tool_retryable(tool_id)
148 }
149
150 fn is_tool_speculatable(&self, tool_id: &str) -> bool {
151 self.inner.is_tool_speculatable(tool_id)
152 }
153}
154
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    use zeph_common::ToolName;

    use super::*;
    use crate::compression::{CompressionError, OutputCompressor};
    use crate::{SkillTrustLevel, ToolCall, ToolError, ToolOutput, registry::ToolDef};

    /// Boxed future type returned by the `compress` implementations of the
    /// compressor doubles in this module.
    type CompressFuture<'a> =
        Pin<Box<dyn Future<Output = Result<Option<String>, CompressionError>> + Send + 'a>>;

    /// Executor double that always answers with `raw_output` and, for tool
    /// calls, records the summary it produced in `received_summary`.
    struct SpyExecutor {
        received_summary: Arc<Mutex<Option<String>>>,
        raw_output: String,
    }

    impl SpyExecutor {
        /// Builds the spy together with a shared handle onto its recorded summary.
        fn new(raw: impl Into<String>) -> (Self, Arc<Mutex<Option<String>>>) {
            let handle = Arc::new(Mutex::new(None));
            let executor = Self {
                received_summary: Arc::clone(&handle),
                raw_output: raw.into(),
            };
            (executor, handle)
        }
    }

    /// Builds a `ToolOutput` carrying only a tool name and a summary; every
    /// other field is set to its empty/`None`/`false`/zero value.
    fn make_output(tool_name: ToolName, summary: String) -> ToolOutput {
        ToolOutput {
            tool_name,
            summary,
            blocks_executed: 0,
            filter_stats: None,
            diff: None,
            streamed: false,
            terminal_id: None,
            locations: None,
            raw_response: None,
            claim_source: None,
        }
    }

    /// Builds the parameterless call to the `"spy"` tool used by every test.
    fn spy_call() -> ToolCall {
        ToolCall {
            tool_id: ToolName::new("spy"),
            params: serde_json::Map::new(),
            caller_id: None,
            context: None,
            tool_call_id: String::new(),
        }
    }

    impl ToolExecutor for SpyExecutor {
        async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
            let out = make_output(ToolName::new("spy"), self.raw_output.clone());
            Ok(Some(out))
        }

        async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
            self.execute(response).await
        }

        fn tool_definitions(&self) -> Vec<ToolDef> {
            Vec::new()
        }

        async fn execute_tool_call(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            let produced = make_output(call.tool_id.clone(), self.raw_output.clone());
            // Record what this (inner) executor produced before any wrapper touches it.
            *self.received_summary.lock().unwrap() = Some(produced.summary.clone());
            Ok(Some(produced))
        }

        async fn execute_tool_call_confirmed(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            self.execute_tool_call(call).await
        }

        fn set_skill_env(&self, _env: Option<HashMap<String, String>>) {}

        fn set_effective_trust(&self, _level: SkillTrustLevel) {}

        fn is_tool_retryable(&self, _tool_id: &str) -> bool {
            false
        }

        fn is_tool_speculatable(&self, _tool_id: &str) -> bool {
            false
        }
    }

    /// Compressor double that replaces any input with the literal `"COMPRESSED"`.
    #[derive(Debug)]
    struct StubCompressor;

    impl OutputCompressor for StubCompressor {
        fn compress<'a>(
            &'a self,
            _tool_name: &'a ToolName,
            _output: &'a str,
        ) -> CompressFuture<'a> {
            Box::pin(async move { Ok(Some("COMPRESSED".to_owned())) })
        }

        fn name(&self) -> &'static str {
            "stub"
        }
    }

    /// The inner executor records the raw summary while the caller of the
    /// wrapper receives the compressed one.
    #[tokio::test]
    async fn t4_audit_sees_raw_llm_sees_compressed() {
        let raw = "line\n".repeat(300);
        let (spy, received) = SpyExecutor::new(raw.as_str());
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 10);

        let call = spy_call();
        let out = executor.execute_tool_call(&call).await.unwrap().unwrap();

        let recorded = received.lock().unwrap().clone();
        assert_eq!(recorded.as_deref(), Some(raw.as_str()));
        assert_eq!(out.summary, "COMPRESSED");
    }

    /// A 5-line summary stays untouched when the threshold is 100 lines.
    #[tokio::test]
    async fn maybe_compress_skips_when_below_threshold() {
        let short = "line\n".repeat(5);
        let (spy, _received) = SpyExecutor::new(short.as_str());
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 100);

        let call = spy_call();
        let out = executor.execute_tool_call(&call).await.unwrap().unwrap();

        assert_eq!(out.summary, short);
    }

    /// When the compressor returns an error the raw summary is passed through.
    #[tokio::test]
    async fn compression_error_falls_back_to_raw() {
        /// Compressor double that always fails with `CompileTimeout`.
        #[derive(Debug)]
        struct ErrorCompressor;

        impl OutputCompressor for ErrorCompressor {
            fn compress<'a>(
                &'a self,
                _tool_name: &'a ToolName,
                _output: &'a str,
            ) -> CompressFuture<'a> {
                Box::pin(async move { Err(CompressionError::CompileTimeout) })
            }

            fn name(&self) -> &'static str {
                "error"
            }
        }

        let raw = "line\n".repeat(300);
        let (spy, _) = SpyExecutor::new(raw.as_str());
        let executor = CompressedExecutor::new(spy, Arc::new(ErrorCompressor), 10);

        let call = spy_call();
        let out = executor.execute_tool_call(&call).await.unwrap().unwrap();

        assert_eq!(out.summary, raw);
    }
}