Skip to main content

impactsense_parser/
compress.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::time::Duration;
3
4use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9use crate::LanguageId;
10
/// Base URL of the RedCompressor sidecar used when `REDCOMPRESSOR_URL` is unset or blank.
pub const DEFAULT_COMPRESSOR_URL: &str = "http://10.166.1.220:8787";

/// Inputs shorter than this many bytes are never sent to the compressor.
const MIN_SNIPPET_BYTES: usize = 8;

/// Settings for talking to the RedCompressor sidecar.
#[derive(Debug, Clone)]
pub struct CompressorConfig {
    /// Sidecar base URL such as `http://host:port`.
    pub base_url: String,
    /// Feature toggle; nothing in this module reads it — presumably checked by callers
    /// before building a client (confirm).
    pub enabled: bool,
}

impl Default for CompressorConfig {
    /// Builds a disabled config whose `base_url` comes from the `REDCOMPRESSOR_URL`
    /// environment variable, falling back to [`DEFAULT_COMPRESSOR_URL`].
    ///
    /// The variable is trimmed, and a value that is empty after trimming is treated as
    /// unset: otherwise `REDCOMPRESSOR_URL=` would produce an empty base URL and every
    /// request would target a host-less path.
    fn default() -> Self {
        let base_url = std::env::var("REDCOMPRESSOR_URL")
            .ok()
            .map(|raw| raw.trim().to_string())
            .filter(|url| !url.is_empty())
            .unwrap_or_else(|| DEFAULT_COMPRESSOR_URL.to_string());
        Self {
            base_url,
            enabled: false,
        }
    }
}
29
30#[derive(Debug, Error)]
31pub enum CompressError {
32    #[error("HTTP request failed: {0}")]
33    Http(#[from] reqwest::Error),
34    #[error("compress API returned {status}: {message}")]
35    Api { status: u16, message: String },
36    #[error("invalid base64 in compress response: {0}")]
37    BadBase64(#[from] base64::DecodeError),
38}
39
40#[derive(Debug, Serialize)]
41struct CompressRequest<'a> {
42    code: &'a str,
43    language: &'a str,
44}
45
46#[derive(Debug, Deserialize)]
47struct CompressResponse {
48    blob_b64: String,
49}
50
51#[derive(Debug, Deserialize)]
52struct ApiErrorBody {
53    error: Option<String>,
54    message: Option<String>,
55}
56
57/// HTTP client for the production RedCompressor sidecar.
58pub struct CompressorClient {
59    client: Client,
60    base_url: String,
61    logged_network_failure: AtomicBool,
62    logged_dict_missing: AtomicBool,
63}
64
65impl CompressorClient {
66    pub fn new(base_url: impl Into<String>) -> Result<Self, CompressError> {
67        let client = Client::builder()
68            .timeout(Duration::from_secs(30))
69            .build()?;
70        Ok(Self {
71            client,
72            base_url: base_url.into().trim_end_matches('/').to_string(),
73            logged_network_failure: AtomicBool::new(false),
74            logged_dict_missing: AtomicBool::new(false),
75        })
76    }
77
78    pub fn from_config(config: &CompressorConfig) -> Result<Self, CompressError> {
79        Self::new(config.base_url.clone())
80    }
81
82    pub async fn health_check(&self) -> Result<(), CompressError> {
83        let url = format!("{}/healthz", self.base_url);
84        let resp = self.client.get(&url).send().await?;
85        if resp.status().is_success() {
86            Ok(())
87        } else {
88            Err(CompressError::Api {
89                status: resp.status().as_u16(),
90                message: format!("healthz returned {}", resp.status()),
91            })
92        }
93    }
94
95    pub async fn compress_code(
96        &self,
97        code: &str,
98        language: LanguageId,
99    ) -> Option<Vec<u8>> {
100        if code.len() < MIN_SNIPPET_BYTES {
101            return None;
102        }
103        let Some(lang) = compressor_language_name(language) else {
104            return None;
105        };
106
107        match self.compress_code_raw(code, lang).await {
108            Ok(blob) => Some(blob),
109            Err(CompressError::Api { status, message }) => {
110                if status == 422 && message.contains("dict_missing") {
111                    if !self.logged_dict_missing.swap(true, Ordering::Relaxed) {
112                        eprintln!(
113                            "RedCompressor: dict_missing for language `{lang}` — skipping code_bytes (further warnings suppressed)"
114                        );
115                    }
116                } else if status == 400 && message.contains("unknown_language") {
117                    eprintln!("RedCompressor: unknown language `{lang}`");
118                } else {
119                    eprintln!("RedCompressor: API error {status}: {message}");
120                }
121                None
122            }
123            Err(e) => {
124                if !self.logged_network_failure.swap(true, Ordering::Relaxed) {
125                    eprintln!("RedCompressor: request failed ({e}) — skipping code_bytes (further warnings suppressed)");
126                }
127                None
128            }
129        }
130    }
131
132    async fn compress_code_raw(&self, code: &str, language: &str) -> Result<Vec<u8>, CompressError> {
133        let url = format!("{}/v1/compress", self.base_url);
134        let body = CompressRequest { code, language };
135        let resp = self.client.post(&url).json(&body).send().await?;
136
137        let status = resp.status();
138        if status.is_success() {
139            let parsed: CompressResponse = resp.json().await?;
140            return B64.decode(parsed.blob_b64).map_err(CompressError::BadBase64);
141        }
142
143        let text = resp.text().await.unwrap_or_default();
144        let message = serde_json::from_str::<ApiErrorBody>(&text)
145            .ok()
146            .and_then(|e| e.message.or(e.error))
147            .unwrap_or(text);
148        Err(CompressError::Api {
149            status: status.as_u16(),
150            message,
151        })
152    }
153}
154
155/// Map parser language to RedCompressor API short name.
156pub fn compressor_language_name(language: LanguageId) -> Option<&'static str> {
157    match language {
158        LanguageId::Java => Some("java"),
159        LanguageId::JavaScript => Some("javascript"),
160        LanguageId::TypeScript | LanguageId::Tsx => Some("typescript"),
161        LanguageId::Python => Some("python"),
162        LanguageId::Rust => Some("rust"),
163        LanguageId::Go => Some("go"),
164        LanguageId::Erlang => Some("erlang"),
165        LanguageId::CSharp => Some("csharp"),
166    }
167}
168
169/// Slice source by byte range and compress via the HTTP API.
170pub async fn compress_snippet(
171    source: &str,
172    span: Option<(usize, usize)>,
173    language: LanguageId,
174    client: &CompressorClient,
175) -> Option<Vec<u8>> {
176    let (lo, hi) = span?;
177    let lo = lo.min(source.len());
178    let hi = hi.min(source.len());
179    if lo >= hi {
180        return None;
181    }
182    client.compress_code(&source[lo..hi], language).await
183}
184
185/// Compress full source text (e.g. Erlang Module node).
186pub async fn compress_full_source(
187    source: &str,
188    language: LanguageId,
189    client: &CompressorClient,
190) -> Option<Vec<u8>> {
191    if source.is_empty() {
192        return None;
193    }
194    client.compress_code(source, language).await
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Every parser language must map to the name the sidecar expects.
    #[test]
    fn maps_all_parser_languages_including_erlang() {
        let expected = [
            (LanguageId::Java, "java"),
            (LanguageId::JavaScript, "javascript"),
            (LanguageId::TypeScript, "typescript"),
            (LanguageId::Tsx, "typescript"),
            (LanguageId::Python, "python"),
            (LanguageId::Rust, "rust"),
            (LanguageId::Go, "go"),
            (LanguageId::Erlang, "erlang"),
            (LanguageId::CSharp, "csharp"),
        ];
        for (language, name) in expected {
            assert_eq!(
                compressor_language_name(language),
                Some(name),
                "wrong mapping for language expected as `{name}`"
            );
        }
    }

    /// The request body must be compact JSON with `code` then `language`, newlines escaped.
    #[test]
    fn compress_request_json_matches_script_format() {
        let req = CompressRequest {
            code: "def f():\n    return 1\n",
            language: "python",
        };
        let json = serde_json::to_string(&req).unwrap();
        assert_eq!(
            json,
            r#"{"code":"def f():\n    return 1\n","language":"python"}"#
        );
    }

    /// A success body deserializes and its blob decodes with the standard base64 alphabet.
    #[test]
    fn decodes_compress_response_blob() {
        let resp: CompressResponse =
            serde_json::from_str(r#"{"blob_b64":"aGVsbG8gd29ybGQ="}"#).unwrap();
        let decoded = B64.decode(resp.blob_b64).unwrap();
        assert_eq!(decoded, b"hello world".to_vec());
    }

    /// Error bodies may carry only one of `error` / `message`.
    #[test]
    fn parses_partial_api_error_body() {
        let body: ApiErrorBody = serde_json::from_str(r#"{"error":"dict_missing"}"#).unwrap();
        assert_eq!(body.error.as_deref(), Some("dict_missing"));
        assert!(body.message.is_none());
    }
}