use async_trait::async_trait;
use std::io::Write as _;
use std::process::{Command, Stdio};
use crate::integrations::analyze_client::{
AnalyzeClient, AnalyzeClientError, AnalyzeHealthResponse, ComplexityHotspot, Smell,
};
use super::{DEFAULT_ANALYZE_BIN, ENV_ANALYZE_BIN, SubprocessReviewReport, map_report};
/// Analyze client that shells out to an analyze binary and probes a search
/// service over HTTP.
pub struct SubprocessAnalyzeClient {
/// Program name or path handed to `Command::new` when running a review or
/// the `--version` probe.
pub(super) binary: String,
/// Base URL of the search service; `health` issues a GET to `<search_url>/health`
/// (trailing slashes on the base are trimmed first).
pub(super) search_url: String,
/// HTTP client used for the health probe. When constructed through `new`
/// it carries a 5-second request timeout.
pub(super) probe_http: reqwest::Client,
}
impl SubprocessAnalyzeClient {
    /// Builds a client for the given analyze `binary` and search-service base URL.
    ///
    /// The internal HTTP probe client is configured with a 5-second timeout.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzeClientError::ClientInit`] when the HTTP client cannot be built.
    pub fn new(
        binary: impl Into<String>,
        search_url: impl Into<String>,
    ) -> Result<Self, AnalyzeClientError> {
        let probe_timeout = std::time::Duration::from_secs(5);
        let built = reqwest::Client::builder().timeout(probe_timeout).build();

        match built {
            Ok(probe_http) => Ok(Self {
                binary: binary.into(),
                search_url: search_url.into(),
                probe_http,
            }),
            Err(e) => Err(AnalyzeClientError::ClientInit(e.to_string())),
        }
    }

    /// Builds a client from `config`, taking the binary from the environment.
    ///
    /// The binary is read from the `ENV_ANALYZE_BIN` environment variable; when
    /// that variable is unset, unreadable, or empty, `DEFAULT_ANALYZE_BIN` is
    /// used instead. The search URL is copied from `config.search_url`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`SubprocessAnalyzeClient::new`].
    pub fn from_config(config: &crate::config::ReviewConfig) -> Result<Self, AnalyzeClientError> {
        let binary = match std::env::var(ENV_ANALYZE_BIN) {
            Ok(value) if !value.is_empty() => value,
            _ => DEFAULT_ANALYZE_BIN.to_string(),
        };
        Self::new(binary, config.search_url.clone())
    }

    /// Returns the binary name or path this client invokes.
    pub fn binary(&self) -> &str {
        self.binary.as_str()
    }

    /// Runs the analyze binary's `review` command over `diff_text` for `index_id`.
    ///
    /// The subprocess work is blocking, so it is moved onto tokio's blocking
    /// pool; the inputs are copied into owned strings for that purpose.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzeClientError::Transport`] if the blocking task cannot be
    /// joined, and otherwise whatever `spawn_analyze_review` returns.
    pub async fn analyze_diff(
        &self,
        diff_text: &str,
        index_id: &str,
    ) -> Result<(Vec<ComplexityHotspot>, Vec<Smell>), AnalyzeClientError> {
        let owned_binary = self.binary.clone();
        let owned_index = index_id.to_string();
        let owned_diff = diff_text.to_string();

        let task = tokio::task::spawn_blocking(move || {
            spawn_analyze_review(&owned_binary, &owned_index, &owned_diff)
        });

        match task.await {
            Ok(outcome) => outcome,
            Err(e) => Err(AnalyzeClientError::Transport(format!(
                "spawn_blocking join error: {e}"
            ))),
        }
    }
}
/// Runs `<binary> review --index-id <index_id> --format json -`, feeds `diff`
/// to the child on stdin, and converts the JSON report printed on stdout into
/// complexity hotspots and smells via `map_report`.
///
/// This call blocks until the child exits; async callers should run it on a
/// blocking thread.
///
/// # Errors
///
/// * [`AnalyzeClientError::Unavailable`] — the binary could not be spawned, or
///   it exited with a non-success status (trimmed stderr is included).
/// * [`AnalyzeClientError::Transport`] — writing the diff to stdin failed with
///   anything other than a broken pipe, the stdin writer thread panicked, or
///   waiting for the child failed.
/// * [`AnalyzeClientError::Parse`] — stdout was not UTF-8 or did not
///   deserialize as a `SubprocessReviewReport`.
pub(super) fn spawn_analyze_review(
    binary: &str,
    index_id: &str,
    diff: &str,
) -> Result<(Vec<ComplexityHotspot>, Vec<Smell>), AnalyzeClientError> {
    let mut child = Command::new(binary)
        .args(["review", "--index-id", index_id, "--format", "json", "-"])
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|e| AnalyzeClientError::Unavailable(format!("failed to spawn {binary}: {e}")))?;

    let stdin = child.stdin.take().expect("stdin pipe always present");

    // Feed stdin from a separate thread while this thread drains stdout/stderr.
    // Writing the whole diff first and only then reading can deadlock: once the
    // child fills an output pipe it stops reading stdin, and the parent would be
    // stuck in `write_all`. The child is also always waited on, so an stdin
    // write failure no longer leaves an unreaped process behind.
    let (write_result, output) = std::thread::scope(|scope| {
        let writer = scope.spawn(move || {
            let mut stdin = stdin;
            // `stdin` is dropped when this closure returns, closing the pipe so
            // the child observes end-of-input.
            stdin.write_all(diff.as_bytes())
        });
        let output = child.wait_with_output();
        (writer.join(), output)
    });

    match write_result {
        Ok(Ok(())) => {}
        // The child may exit without consuming all of its input; its exit
        // status (checked below) decides whether that is an error.
        Ok(Err(e)) if e.kind() == std::io::ErrorKind::BrokenPipe => {}
        Ok(Err(e)) => {
            return Err(AnalyzeClientError::Transport(format!(
                "write to stdin: {e}"
            )));
        }
        Err(_) => {
            return Err(AnalyzeClientError::Transport(
                "stdin writer thread panicked".to_string(),
            ));
        }
    }

    let output =
        output.map_err(|e| AnalyzeClientError::Transport(format!("wait_with_output: {e}")))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(AnalyzeClientError::Unavailable(format!(
            "trusty-analyze review exited with {}: {}",
            output.status,
            stderr.trim()
        )));
    }

    let json = std::str::from_utf8(&output.stdout)
        .map_err(|e| AnalyzeClientError::Parse(format!("stdout is not UTF-8: {e}")))?;
    let report: SubprocessReviewReport = serde_json::from_str(json)
        .map_err(|e| AnalyzeClientError::Parse(format!("ReviewReport parse error: {e}")))?;
    Ok(map_report(&report))
}
#[async_trait]
impl AnalyzeClient for SubprocessAnalyzeClient {
    /// Checks both halves of the subprocess setup: the search service's
    /// `/health` endpoint and the ability to launch the analyze binary.
    ///
    /// The HTTP probe runs first. Its JSON body must contain a string `status`
    /// field; that value is returned as `status`, and `search_reachable` is true
    /// only when it equals `"ok"`. Afterwards `<binary> --version` is launched
    /// on the blocking pool with its output discarded; only whether the process
    /// could be started is checked, not its exit code.
    ///
    /// # Errors
    ///
    /// * [`AnalyzeClientError::Unavailable`] — the GET failed, returned a
    ///   non-success status, or the binary could not be launched.
    /// * [`AnalyzeClientError::Transport`] — the response body could not be read.
    /// * [`AnalyzeClientError::Parse`] — the body was not the expected JSON.
    async fn health(&self) -> Result<AnalyzeHealthResponse, AnalyzeClientError> {
        #[derive(serde::Deserialize)]
        struct SearchHealth {
            status: String,
        }

        let base = self.search_url.trim_end_matches('/');
        let url = format!("{base}/health");

        let response = match self.probe_http.get(&url).send().await {
            Ok(response) => response,
            Err(e) => {
                return Err(AnalyzeClientError::Unavailable(format!("GET {url}: {e}")));
            }
        };

        // Capture the status before `text()` consumes the response.
        let status = response.status();
        let body = match response.text().await {
            Ok(text) => text,
            Err(e) => {
                return Err(AnalyzeClientError::Transport(format!(
                    "read body of {url}: {e}"
                )));
            }
        };

        if !status.is_success() {
            return Err(AnalyzeClientError::Unavailable(format!(
                "GET {url} returned {status}: {body}"
            )));
        }

        let parsed: SearchHealth = match serde_json::from_str(&body) {
            Ok(parsed) => parsed,
            Err(e) => {
                return Err(AnalyzeClientError::Parse(format!(
                    "search health parse: {e}"
                )));
            }
        };

        // Launching a process blocks, so do it off the async executor.
        let probe_target = self.binary.clone();
        let probe = tokio::task::spawn_blocking(move || {
            let mut version_cmd = Command::new(&probe_target);
            version_cmd
                .arg("--version")
                .stdout(Stdio::null())
                .stderr(Stdio::null());
            version_cmd.status().is_ok()
        });

        // A join failure is treated the same as a failed launch.
        if !matches!(probe.await, Ok(true)) {
            return Err(AnalyzeClientError::Unavailable(format!(
                "trusty-analyze binary '{}' is not on PATH or not executable",
                self.binary
            )));
        }

        let search_reachable = parsed.status == "ok";
        Ok(AnalyzeHealthResponse {
            status: parsed.status,
            search_reachable,
        })
    }

    /// Reports whether analysis is usable, based solely on [`Self::health`].
    ///
    /// The index id is ignored. A failed health check is logged at debug level
    /// and reported as `false` rather than surfaced as an error.
    async fn has_analysis(&self, _index_id: &str) -> bool {
        self.health()
            .await
            .map(|h| h.is_healthy())
            .unwrap_or_else(|e| {
                tracing::debug!("trusty-analyze subprocess health check failed (optional): {e}");
                false
            })
    }

    /// Always returns an empty list; both arguments are ignored by this client.
    async fn complexity_hotspots(
        &self,
        _index_id: &str,
        _top_k: Option<u32>,
    ) -> Result<Vec<ComplexityHotspot>, AnalyzeClientError> {
        Ok(Vec::new())
    }

    /// Always returns an empty list; the index id is ignored by this client.
    async fn smells(&self, _index_id: &str) -> Result<Vec<Smell>, AnalyzeClientError> {
        Ok(Vec::new())
    }
}