Skip to main content

zeph_tools/compression/
decorator.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! `CompressedExecutor<E>` — decorator that post-processes tool output through a compressor.
5//!
//! Wraps the root executor (any `ToolExecutor` implementation). The compressor is applied
//! only to the `summary` of successful outputs. Errors from the wrapped executor are
//! propagated unchanged, and if compression declines or fails, the raw summary is returned.
9//!
10//! # Invariant (T4)
11//!
12//! Audit logging is performed by the wrapped tool implementations, not here. Because
13//! `CompressedExecutor` wraps *outside* the tool boundary, audit JSONL always records
14//! the raw pre-compression payload.
15
16use std::sync::Arc;
17
18use crate::executor::ToolExecutor;
19use crate::{ToolCall, ToolError, ToolOutput};
20
21use super::OutputCompressor;
22
23/// Decorator that runs a compressor on each successful tool output.
24///
25/// The `inner` executor is called first; its output is then passed to `compressor.compress`.
26/// If compression returns `Ok(None)` or `Err(...)`, the original `summary` is kept intact.
27/// Compression errors are logged as warnings but never propagate to the caller.
28///
29/// # Type parameters
30///
31/// - `E` — the wrapped [`ToolExecutor`]. Often `CompositeExecutor` or `DynExecutor`.
32///
33/// # Examples
34///
35/// ```rust,no_run
36/// use std::sync::Arc;
37/// use zeph_tools::compression::{CompressedExecutor, IdentityCompressor};
38/// // let executor = CompressedExecutor::new(inner_executor, Arc::new(IdentityCompressor), 200);
39/// ```
40#[derive(Debug)]
41pub struct CompressedExecutor<E: ToolExecutor> {
42    inner: E,
43    compressor: Arc<dyn OutputCompressor>,
44    min_lines_to_compress: usize,
45}
46
47impl<E: ToolExecutor> CompressedExecutor<E> {
48    /// Wrap `inner` with `compressor`.
49    ///
50    /// Outputs with fewer than `min_lines` lines skip the compressor entirely.
51    #[must_use]
52    pub fn new(inner: E, compressor: Arc<dyn OutputCompressor>, min_lines: usize) -> Self {
53        Self {
54            inner,
55            compressor,
56            min_lines_to_compress: min_lines,
57        }
58    }
59
60    /// Apply compression to `output`, logging on error and returning the original on failure.
61    async fn maybe_compress(&self, output: ToolOutput) -> ToolOutput {
62        let line_count = output.summary.lines().count();
63        if line_count < self.min_lines_to_compress {
64            return output;
65        }
66
67        match self
68            .compressor
69            .compress(&output.tool_name, &output.summary)
70            .await
71        {
72            Ok(Some(compressed)) => {
73                tracing::debug!(
74                    compressor = self.compressor.name(),
75                    tool = %output.tool_name.as_str(),
76                    original_len = output.summary.len(),
77                    compressed_len = compressed.len(),
78                    "CompressedExecutor: output compressed"
79                );
80                ToolOutput {
81                    summary: compressed,
82                    ..output
83                }
84            }
85            Ok(None) => output,
86            Err(e) => {
87                tracing::warn!(
88                    compressor = self.compressor.name(),
89                    error = %e,
90                    "CompressedExecutor: compression error, using raw output"
91                );
92                output
93            }
94        }
95    }
96}
97
98impl<E: ToolExecutor> ToolExecutor for CompressedExecutor<E> {
99    async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
100        let result = self.inner.execute(response).await?;
101        match result {
102            Some(out) => Ok(Some(self.maybe_compress(out).await)),
103            None => Ok(None),
104        }
105    }
106
107    async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
108        let result = self.inner.execute_confirmed(response).await?;
109        match result {
110            Some(out) => Ok(Some(self.maybe_compress(out).await)),
111            None => Ok(None),
112        }
113    }
114
115    fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
116        self.inner.tool_definitions()
117    }
118
119    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
120        let result = self.inner.execute_tool_call(call).await?;
121        match result {
122            Some(out) => Ok(Some(self.maybe_compress(out).await)),
123            None => Ok(None),
124        }
125    }
126
127    async fn execute_tool_call_confirmed(
128        &self,
129        call: &ToolCall,
130    ) -> Result<Option<ToolOutput>, ToolError> {
131        let result = self.inner.execute_tool_call_confirmed(call).await?;
132        match result {
133            Some(out) => Ok(Some(self.maybe_compress(out).await)),
134            None => Ok(None),
135        }
136    }
137
138    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
139        self.inner.set_skill_env(env);
140    }
141
142    fn set_effective_trust(&self, level: crate::SkillTrustLevel) {
143        self.inner.set_effective_trust(level);
144    }
145
146    fn is_tool_retryable(&self, tool_id: &str) -> bool {
147        self.inner.is_tool_retryable(tool_id)
148    }
149
150    fn is_tool_speculatable(&self, tool_id: &str) -> bool {
151        self.inner.is_tool_speculatable(tool_id)
152    }
153}
154
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    use zeph_common::ToolName;

    use super::*;
    use crate::compression::{CompressionError, OutputCompressor};
    use crate::{SkillTrustLevel, ToolCall, ToolError, ToolOutput, registry::ToolDef};

    /// Inner executor that returns a fixed payload and records the summary it emitted.
    ///
    /// Tests use the record to compare what the inner layer produced with what
    /// `CompressedExecutor` returns.
    struct SpyExecutor {
        received_summary: Arc<Mutex<Option<String>>>,
        raw_output: String,
    }

    impl SpyExecutor {
        /// Returns the spy plus a handle to the summary it records on every call.
        fn new(raw: impl Into<String>) -> (Self, Arc<Mutex<Option<String>>>) {
            let spy = Arc::new(Mutex::new(None));
            (
                Self {
                    received_summary: Arc::clone(&spy),
                    raw_output: raw.into(),
                },
                spy,
            )
        }

        /// Builds the raw output for `tool_name` and records its summary.
        ///
        /// Every execution entry point goes through here, so the recorded value is available
        /// regardless of which `ToolExecutor` method a test exercises.
        fn emit(&self, tool_name: ToolName) -> ToolOutput {
            let out = make_output(tool_name, self.raw_output.clone());
            *self.received_summary.lock().unwrap() = Some(out.summary.clone());
            out
        }
    }

    fn make_output(tool_name: ToolName, summary: String) -> ToolOutput {
        ToolOutput {
            tool_name,
            summary,
            blocks_executed: 0,
            filter_stats: None,
            diff: None,
            streamed: false,
            terminal_id: None,
            locations: None,
            raw_response: None,
            claim_source: None,
        }
    }

    /// A parameterless `ToolCall` addressed to the spy executor.
    fn spy_call() -> ToolCall {
        ToolCall {
            tool_id: ToolName::new("spy"),
            params: serde_json::Map::new(),
            caller_id: None,
            context: None,
        }
    }

    impl ToolExecutor for SpyExecutor {
        async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
            Ok(Some(self.emit(ToolName::new("spy"))))
        }

        async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
            self.execute(response).await
        }

        fn tool_definitions(&self) -> Vec<ToolDef> {
            vec![]
        }

        async fn execute_tool_call(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            Ok(Some(self.emit(call.tool_id.clone())))
        }

        async fn execute_tool_call_confirmed(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            self.execute_tool_call(call).await
        }

        fn set_skill_env(&self, _env: Option<HashMap<String, String>>) {}
        fn set_effective_trust(&self, _level: SkillTrustLevel) {}
        fn is_tool_retryable(&self, _tool_id: &str) -> bool {
            false
        }
        fn is_tool_speculatable(&self, _tool_id: &str) -> bool {
            false
        }
    }

    /// Always replaces output with a fixed "compressed" string.
    #[derive(Debug)]
    struct StubCompressor;

    impl OutputCompressor for StubCompressor {
        fn compress<'a>(
            &'a self,
            _tool_name: &'a ToolName,
            _output: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CompressionError>> + Send + 'a>>
        {
            Box::pin(async move { Ok(Some("COMPRESSED".to_owned())) })
        }

        fn name(&self) -> &'static str {
            "stub"
        }
    }

    /// T4 invariant: audit (inner) receives raw output; LLM context receives compressed output.
    ///
    /// `SpyExecutor` records what it emits before `CompressedExecutor` applies the compressor.
    /// The assertions confirm that the inner layer saw the full raw string, while the outer
    /// `CompressedExecutor` returns the shortened version.
    #[tokio::test]
    async fn t4_audit_sees_raw_llm_sees_compressed() {
        let raw = "line\n".repeat(300);
        let (spy, received) = SpyExecutor::new(raw.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 10);

        let out = executor.execute_tool_call(&spy_call()).await.unwrap().unwrap();

        // Inner executor (audit layer) received the raw payload.
        assert_eq!(received.lock().unwrap().as_deref(), Some(raw.as_str()));
        // Outer executor (LLM context layer) received the compressed payload.
        assert_eq!(out.summary, "COMPRESSED");
    }

    /// The text-based `execute` path is compressed too, and the inner layer still sees raw.
    #[tokio::test]
    async fn execute_path_is_compressed() {
        let raw = "line\n".repeat(300);
        let (spy, received) = SpyExecutor::new(raw.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 10);

        let out = executor.execute("ignored").await.unwrap().unwrap();

        assert_eq!(received.lock().unwrap().as_deref(), Some(raw.as_str()));
        assert_eq!(out.summary, "COMPRESSED");
    }

    /// Output below the line-count threshold passes through without compression.
    #[tokio::test]
    async fn maybe_compress_skips_when_below_threshold() {
        let short = "line\n".repeat(5);
        let (spy, _received) = SpyExecutor::new(short.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 100);

        let out = executor.execute_tool_call(&spy_call()).await.unwrap().unwrap();
        // StubCompressor would return "COMPRESSED", but the threshold is not met, so the
        // raw output passes through.
        assert_eq!(out.summary, short);
    }

    /// Exactly `min_lines` lines is enough to trigger compression (the threshold is inclusive).
    #[tokio::test]
    async fn maybe_compress_runs_at_exact_threshold() {
        let exact = "line\n".repeat(10);
        let (spy, _received) = SpyExecutor::new(exact);
        let executor = CompressedExecutor::new(spy, Arc::new(StubCompressor), 10);

        let out = executor.execute_tool_call(&spy_call()).await.unwrap().unwrap();
        assert_eq!(out.summary, "COMPRESSED");
    }

    /// Compressor error falls back to the raw (uncompressed) output.
    #[tokio::test]
    async fn compression_error_falls_back_to_raw() {
        #[derive(Debug)]
        struct ErrorCompressor;
        impl OutputCompressor for ErrorCompressor {
            fn compress<'a>(
                &'a self,
                _tool_name: &'a ToolName,
                _output: &'a str,
            ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CompressionError>> + Send + 'a>>
            {
                Box::pin(async move { Err(CompressionError::CompileTimeout) })
            }
            fn name(&self) -> &'static str {
                "error"
            }
        }

        let raw = "line\n".repeat(300);
        let (spy, _) = SpyExecutor::new(raw.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(ErrorCompressor), 10);

        let out = executor.execute_tool_call(&spy_call()).await.unwrap().unwrap();
        // Error compressor → raw output preserved (T4 safety invariant).
        assert_eq!(out.summary, raw);
    }

    /// A compressor that declines (`Ok(None)`) leaves the raw output in place.
    #[tokio::test]
    async fn compressor_declining_keeps_raw() {
        #[derive(Debug)]
        struct DecliningCompressor;
        impl OutputCompressor for DecliningCompressor {
            fn compress<'a>(
                &'a self,
                _tool_name: &'a ToolName,
                _output: &'a str,
            ) -> Pin<Box<dyn Future<Output = Result<Option<String>, CompressionError>> + Send + 'a>>
            {
                Box::pin(async move { Ok::<Option<String>, CompressionError>(None) })
            }
            fn name(&self) -> &'static str {
                "declining"
            }
        }

        let raw = "line\n".repeat(300);
        let (spy, _) = SpyExecutor::new(raw.clone());
        let executor = CompressedExecutor::new(spy, Arc::new(DecliningCompressor), 10);

        let out = executor.execute_tool_call(&spy_call()).await.unwrap().unwrap();
        assert_eq!(out.summary, raw);
    }
}