use std::sync::Arc;
use crate::executor::ToolExecutor;
use crate::{ToolCall, ToolError, ToolOutput};
use super::OutputCompressor;
/// Executor wrapper that offers the inner executor's output to an [`OutputCompressor`].
///
/// Every [`ToolExecutor`] call is forwarded to `inner`; when the inner executor
/// produces an output whose summary has at least `min_lines_to_compress` lines,
/// the summary is handed to `compressor` and replaced if the compressor returns
/// a new text.
#[derive(Debug)]
pub struct CompressedExecutor<E: ToolExecutor> {
/// The wrapped executor that all trait calls are delegated to.
inner: E,
/// Compressor consulted for summaries that reach the line threshold.
compressor: Arc<dyn OutputCompressor>,
/// Line-count threshold: summaries with fewer lines are returned unchanged.
min_lines_to_compress: usize,
}
impl<E: ToolExecutor> CompressedExecutor<E> {
    /// Builds a wrapper around `inner` that consults `compressor` for large outputs.
    ///
    /// `min_lines` is the line-count threshold: a summary with fewer lines than
    /// this is passed through without calling the compressor.
    #[must_use]
    pub fn new(inner: E, compressor: Arc<dyn OutputCompressor>, min_lines: usize) -> Self {
        let min_lines_to_compress = min_lines;
        Self {
            inner,
            compressor,
            min_lines_to_compress,
        }
    }

    /// Returns `output` with its summary replaced by the compressor's result, if any.
    ///
    /// The output is handed back unchanged when:
    /// - the summary has fewer than `min_lines_to_compress` lines,
    /// - the compressor returns `Ok(None)`, or
    /// - the compressor returns an error (a warning is logged and the raw
    ///   output is used).
    ///
    /// Only `summary` is replaced; every other field of the output is kept.
    async fn maybe_compress(&self, output: ToolOutput) -> ToolOutput {
        // Below the threshold: return the output as-is without calling the compressor.
        if output.summary.lines().count() < self.min_lines_to_compress {
            return output;
        }

        let outcome = self
            .compressor
            .compress(&output.tool_name, &output.summary)
            .await;

        let shrunk = match outcome {
            Ok(Some(text)) => text,
            Ok(None) => return output,
            Err(err) => {
                tracing::warn!(
                    compressor = self.compressor.name(),
                    error = %err,
                    "CompressedExecutor: compression error, using raw output"
                );
                return output;
            }
        };

        tracing::debug!(
            compressor = self.compressor.name(),
            tool = %output.tool_name.as_str(),
            original_len = output.summary.len(),
            compressed_len = shrunk.len(),
            "CompressedExecutor: output compressed"
        );
        ToolOutput {
            summary: shrunk,
            ..output
        }
    }
}
impl<E: ToolExecutor> ToolExecutor for CompressedExecutor<E> {
    /// Runs `inner.execute` and passes any produced output through `maybe_compress`.
    ///
    /// Errors from the inner executor are propagated unchanged; `Ok(None)` stays
    /// `Ok(None)`.
    async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
        let Some(raw) = self.inner.execute(response).await? else {
            return Ok(None);
        };
        Ok(Some(self.maybe_compress(raw).await))
    }

    /// Same as [`Self::execute`], but delegates to `inner.execute_confirmed`.
    async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
        let Some(raw) = self.inner.execute_confirmed(response).await? else {
            return Ok(None);
        };
        Ok(Some(self.maybe_compress(raw).await))
    }

    /// Forwards to the inner executor; the wrapper adds no tool definitions of its own.
    fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
        self.inner.tool_definitions()
    }

    /// Runs `inner.execute_tool_call` and passes any produced output through
    /// `maybe_compress`.
    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
        let Some(raw) = self.inner.execute_tool_call(call).await? else {
            return Ok(None);
        };
        Ok(Some(self.maybe_compress(raw).await))
    }

    /// Same as [`Self::execute_tool_call`], but delegates to
    /// `inner.execute_tool_call_confirmed`.
    async fn execute_tool_call_confirmed(
        &self,
        call: &ToolCall,
    ) -> Result<Option<ToolOutput>, ToolError> {
        let Some(raw) = self.inner.execute_tool_call_confirmed(call).await? else {
            return Ok(None);
        };
        Ok(Some(self.maybe_compress(raw).await))
    }

    /// Forwards the skill environment to the inner executor.
    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
        self.inner.set_skill_env(env);
    }

    /// Forwards the effective trust level to the inner executor.
    fn set_effective_trust(&self, level: crate::SkillTrustLevel) {
        self.inner.set_effective_trust(level);
    }

    /// Reports whatever the inner executor reports for `tool_id`.
    fn is_tool_retryable(&self, tool_id: &str) -> bool {
        self.inner.is_tool_retryable(tool_id)
    }

    /// Reports whatever the inner executor reports for `tool_id`.
    fn is_tool_speculatable(&self, tool_id: &str) -> bool {
        self.inner.is_tool_speculatable(tool_id)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    use zeph_common::ToolName;

    use super::*;
    use crate::compression::{CompressionError, OutputCompressor};
    use crate::{SkillTrustLevel, ToolCall, ToolError, ToolOutput, registry::ToolDef};

    /// Boxed future returned by the test compressors' `compress` methods.
    type CompressFuture<'a> =
        Pin<Box<dyn Future<Output = Result<Option<String>, CompressionError>> + Send + 'a>>;

    /// Inner executor double that always emits `raw_output` and, for tool calls,
    /// records the summary it produced so tests can inspect it afterwards.
    struct SpyExecutor {
        received_summary: Arc<Mutex<Option<String>>>,
        raw_output: String,
    }

    impl SpyExecutor {
        /// Returns the spy together with a shared handle to its recording slot.
        fn new(raw: impl Into<String>) -> (Self, Arc<Mutex<Option<String>>>) {
            let observed = Arc::new(Mutex::new(None));
            let executor = Self {
                received_summary: Arc::clone(&observed),
                raw_output: raw.into(),
            };
            (executor, observed)
        }
    }

    /// Builds a `ToolOutput` carrying only a tool name and summary; all other
    /// fields are left at their empty values.
    fn make_output(tool_name: ToolName, summary: String) -> ToolOutput {
        ToolOutput {
            tool_name,
            summary,
            blocks_executed: 0,
            filter_stats: None,
            diff: None,
            streamed: false,
            terminal_id: None,
            locations: None,
            raw_response: None,
            claim_source: None,
        }
    }

    /// Builds the parameterless call to the `"spy"` tool used by every test.
    fn spy_call() -> ToolCall {
        ToolCall {
            tool_id: ToolName::new("spy"),
            params: serde_json::Map::new(),
            caller_id: None,
            context: None,
        }
    }

    impl ToolExecutor for SpyExecutor {
        async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
            let produced = make_output(ToolName::new("spy"), self.raw_output.clone());
            Ok(Some(produced))
        }

        async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
            self.execute(response).await
        }

        fn tool_definitions(&self) -> Vec<ToolDef> {
            Vec::new()
        }

        async fn execute_tool_call(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            let produced = make_output(call.tool_id.clone(), self.raw_output.clone());
            // Record what this (inner) executor produced before any wrapper touches it.
            *self.received_summary.lock().unwrap() = Some(produced.summary.clone());
            Ok(Some(produced))
        }

        async fn execute_tool_call_confirmed(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            self.execute_tool_call(call).await
        }

        fn set_skill_env(&self, _env: Option<HashMap<String, String>>) {}

        fn set_effective_trust(&self, _level: SkillTrustLevel) {}

        fn is_tool_retryable(&self, _tool_id: &str) -> bool {
            false
        }

        fn is_tool_speculatable(&self, _tool_id: &str) -> bool {
            false
        }
    }

    /// Compressor double that replaces any input with the fixed text `"COMPRESSED"`.
    #[derive(Debug)]
    struct StubCompressor;

    impl OutputCompressor for StubCompressor {
        fn compress<'a>(
            &'a self,
            _tool_name: &'a ToolName,
            _output: &'a str,
        ) -> CompressFuture<'a> {
            Box::pin(async move { Ok(Some(String::from("COMPRESSED"))) })
        }

        fn name(&self) -> &'static str {
            "stub"
        }
    }

    /// The inner executor recorded the raw text while the caller received the
    /// compressor's replacement.
    #[tokio::test]
    async fn t4_audit_sees_raw_llm_sees_compressed() {
        let raw = "line\n".repeat(300);
        let (spy, observed) = SpyExecutor::new(raw.clone());
        let wrapped = CompressedExecutor::new(spy, Arc::new(StubCompressor), 10);

        let result = wrapped
            .execute_tool_call(&spy_call())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(observed.lock().unwrap().as_deref(), Some(raw.as_str()));
        assert_eq!(result.summary, "COMPRESSED");
    }

    /// A 5-line summary with a threshold of 100 lines comes back unmodified.
    #[tokio::test]
    async fn maybe_compress_skips_when_below_threshold() {
        let short = "line\n".repeat(5);
        let (spy, _observed) = SpyExecutor::new(short.clone());
        let wrapped = CompressedExecutor::new(spy, Arc::new(StubCompressor), 100);

        let result = wrapped
            .execute_tool_call(&spy_call())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(result.summary, short);
    }

    /// When the compressor fails, the caller still gets the raw summary.
    #[tokio::test]
    async fn compression_error_falls_back_to_raw() {
        /// Compressor double that always fails with `CompileTimeout`.
        #[derive(Debug)]
        struct ErrorCompressor;

        impl OutputCompressor for ErrorCompressor {
            fn compress<'a>(
                &'a self,
                _tool_name: &'a ToolName,
                _output: &'a str,
            ) -> CompressFuture<'a> {
                Box::pin(async move { Err(CompressionError::CompileTimeout) })
            }

            fn name(&self) -> &'static str {
                "error"
            }
        }

        let raw = "line\n".repeat(300);
        let (spy, _) = SpyExecutor::new(raw.clone());
        let wrapped = CompressedExecutor::new(spy, Arc::new(ErrorCompressor), 10);

        let result = wrapped
            .execute_tool_call(&spy_call())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(result.summary, raw);
    }
}