1use std::sync::Arc;
17
18use crate::executor::ToolExecutor;
19use crate::{ToolCall, ToolError, ToolOutput};
20
21use super::OutputCompressor;
22
23#[derive(Debug)]
41pub struct CompressedExecutor<E: ToolExecutor> {
42 inner: E,
43 compressor: Arc<dyn OutputCompressor>,
44 min_lines_to_compress: usize,
45}
46
47impl<E: ToolExecutor> CompressedExecutor<E> {
48 #[must_use]
52 pub fn new(inner: E, compressor: Arc<dyn OutputCompressor>, min_lines: usize) -> Self {
53 Self {
54 inner,
55 compressor,
56 min_lines_to_compress: min_lines,
57 }
58 }
59
60 async fn maybe_compress(&self, output: ToolOutput) -> ToolOutput {
62 let line_count = output.summary.lines().count();
63 if line_count < self.min_lines_to_compress {
64 return output;
65 }
66
67 match self
68 .compressor
69 .compress(&output.tool_name, &output.summary)
70 .await
71 {
72 Ok(Some(compressed)) => {
73 tracing::debug!(
74 compressor = self.compressor.name(),
75 tool = %output.tool_name.as_str(),
76 original_len = output.summary.len(),
77 compressed_len = compressed.len(),
78 "CompressedExecutor: output compressed"
79 );
80 ToolOutput {
81 summary: compressed,
82 ..output
83 }
84 }
85 Ok(None) => output,
86 Err(e) => {
87 tracing::warn!(
88 compressor = self.compressor.name(),
89 error = %e,
90 "CompressedExecutor: compression error, using raw output"
91 );
92 output
93 }
94 }
95 }
96}
97
98impl<E: ToolExecutor> ToolExecutor for CompressedExecutor<E> {
99 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
100 let result = self.inner.execute(response).await?;
101 match result {
102 Some(out) => Ok(Some(self.maybe_compress(out).await)),
103 None => Ok(None),
104 }
105 }
106
107 async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
108 let result = self.inner.execute_confirmed(response).await?;
109 match result {
110 Some(out) => Ok(Some(self.maybe_compress(out).await)),
111 None => Ok(None),
112 }
113 }
114
115 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
116 self.inner.tool_definitions()
117 }
118
119 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
120 let result = self.inner.execute_tool_call(call).await?;
121 match result {
122 Some(out) => Ok(Some(self.maybe_compress(out).await)),
123 None => Ok(None),
124 }
125 }
126
127 async fn execute_tool_call_confirmed(
128 &self,
129 call: &ToolCall,
130 ) -> Result<Option<ToolOutput>, ToolError> {
131 let result = self.inner.execute_tool_call_confirmed(call).await?;
132 match result {
133 Some(out) => Ok(Some(self.maybe_compress(out).await)),
134 None => Ok(None),
135 }
136 }
137
138 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
139 self.inner.set_skill_env(env);
140 }
141
142 fn set_effective_trust(&self, level: crate::SkillTrustLevel) {
143 self.inner.set_effective_trust(level);
144 }
145
146 fn is_tool_retryable(&self, tool_id: &str) -> bool {
147 self.inner.is_tool_retryable(tool_id)
148 }
149
150 fn is_tool_speculatable(&self, tool_id: &str) -> bool {
151 self.inner.is_tool_speculatable(tool_id)
152 }
153}
154
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    use zeph_common::ToolName;

    use super::*;
    use crate::compression::{CompressionError, OutputCompressor};
    use crate::{SkillTrustLevel, ToolCall, ToolError, ToolOutput, registry::ToolDef};

    /// Inner executor that always returns `raw_output` and records the summary
    /// it produced for structured tool calls.
    struct SpyExecutor {
        /// Summary produced by the most recent `execute_tool_call`, if any.
        received_summary: Arc<Mutex<Option<String>>>,
        /// Text returned as the summary of every output.
        raw_output: String,
    }

    impl SpyExecutor {
        /// Returns the spy together with a shared handle to its recorded summary.
        fn new(raw: impl Into<String>) -> (Self, Arc<Mutex<Option<String>>>) {
            let slot = Arc::new(Mutex::new(None));
            let executor = Self {
                received_summary: Arc::clone(&slot),
                raw_output: raw.into(),
            };
            (executor, slot)
        }
    }

    /// Builds a `ToolOutput` carrying `tool_name` and `summary`, with every
    /// other field set to its empty value.
    fn make_output(tool_name: ToolName, summary: String) -> ToolOutput {
        ToolOutput {
            tool_name,
            summary,
            blocks_executed: 0,
            filter_stats: None,
            diff: None,
            streamed: false,
            terminal_id: None,
            locations: None,
            raw_response: None,
            claim_source: None,
        }
    }

    /// Builds the `ToolCall` shared by all tests: tool id `"spy"`, no params.
    fn spy_call() -> ToolCall {
        ToolCall {
            tool_id: ToolName::new("spy"),
            params: serde_json::Map::new(),
            caller_id: None,
            context: None,
        }
    }

    impl ToolExecutor for SpyExecutor {
        async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
            let output = make_output(ToolName::new("spy"), self.raw_output.clone());
            Ok(Some(output))
        }

        async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
            self.execute(response).await
        }

        fn tool_definitions(&self) -> Vec<ToolDef> {
            Vec::new()
        }

        async fn execute_tool_call(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            // Record the summary before handing the output back to the caller.
            let summary = self.raw_output.clone();
            *self.received_summary.lock().unwrap() = Some(summary.clone());
            Ok(Some(make_output(call.tool_id.clone(), summary)))
        }

        async fn execute_tool_call_confirmed(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            self.execute_tool_call(call).await
        }

        fn set_skill_env(&self, _env: Option<HashMap<String, String>>) {}

        fn set_effective_trust(&self, _level: SkillTrustLevel) {}

        fn is_tool_retryable(&self, _tool_id: &str) -> bool {
            false
        }

        fn is_tool_speculatable(&self, _tool_id: &str) -> bool {
            false
        }
    }

    /// Compressor that replaces any input with the literal `"COMPRESSED"`.
    #[derive(Debug)]
    struct StubCompressor;

    impl OutputCompressor for StubCompressor {
        fn compress<'a>(
            &'a self,
            _tool_name: &'a ToolName,
            _output: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CompressionError>> + Send + 'a>>
        {
            Box::pin(async move { Ok(Some("COMPRESSED".to_owned())) })
        }

        fn name(&self) -> &'static str {
            "stub"
        }
    }

    /// The inner executor records the raw summary while the wrapper's caller
    /// receives the compressed one.
    #[tokio::test]
    async fn t4_audit_sees_raw_llm_sees_compressed() {
        let raw = "line\n".repeat(300);
        let (spy, received) = SpyExecutor::new(raw.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 10);

        let call = spy_call();
        let out = executor.execute_tool_call(&call).await.unwrap().unwrap();

        assert_eq!(received.lock().unwrap().as_deref(), Some(raw.as_str()));
        assert_eq!(out.summary, "COMPRESSED");
    }

    /// A 5-line summary stays untouched when the threshold is 100 lines.
    #[tokio::test]
    async fn maybe_compress_skips_when_below_threshold() {
        let short = "line\n".repeat(5);
        let (spy, _received) = SpyExecutor::new(short.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 100);

        let call = spy_call();
        let out = executor.execute_tool_call(&call).await.unwrap().unwrap();

        assert_eq!(out.summary, short);
    }

    /// A failing compressor must leave the raw summary in place.
    #[tokio::test]
    async fn compression_error_falls_back_to_raw() {
        /// Compressor that always fails with `CompileTimeout`.
        #[derive(Debug)]
        struct ErrorCompressor;

        impl OutputCompressor for ErrorCompressor {
            fn compress<'a>(
                &'a self,
                _tool_name: &'a ToolName,
                _output: &'a str,
            ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CompressionError>> + Send + 'a>>
            {
                Box::pin(async move { Err(CompressionError::CompileTimeout) })
            }

            fn name(&self) -> &'static str {
                "error"
            }
        }

        let raw = "line\n".repeat(300);
        let (spy, _) = SpyExecutor::new(raw.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(ErrorCompressor), 10);

        let call = spy_call();
        let out = executor.execute_tool_call(&call).await.unwrap().unwrap();

        assert_eq!(out.summary, raw);
    }
}