use std::sync::Arc;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use trusty_mpm::core::sm::prune::{PruneAction, decide};
pub(crate) use trusty_mpm::core::sm::prune::PruneError;
const MAX_INFLIGHT_VERDICTS: usize = 12;
pub(crate) const EXIT_SM_UNAVAILABLE: i32 = 75;
#[derive(Debug, Clone)]
pub(crate) struct SessionVerdict {
pub(crate) id: String,
pub(crate) name: String,
pub(crate) verdict: Option<String>,
}
#[derive(Debug, Clone)]
pub(crate) struct PlannedAction {
pub(crate) id: String,
pub(crate) name: String,
pub(crate) verdict: String,
pub(crate) action: PruneAction,
}
#[derive(Debug, Serialize)]
struct JsonRow {
id: String,
name: String,
verdict: String,
action: String,
reason: String,
}
#[derive(Debug, Serialize)]
struct JsonPlan {
dry_run: bool,
actionable: usize,
total: usize,
sessions: Vec<JsonRow>,
sm_available: bool,
}
pub(crate) fn build_plan(rows: &[SessionVerdict]) -> Vec<PlannedAction> {
rows.iter()
.map(|row| PlannedAction {
id: row.id.clone(),
name: row.name.clone(),
verdict: row.verdict.clone().unwrap_or_else(|| "none".to_string()),
action: decide(row.verdict.as_deref()),
})
.collect()
}
pub(crate) fn actionable_count(plan: &[PlannedAction]) -> usize {
plan.iter().filter(|p| p.action.is_actionable()).count()
}
pub(crate) fn render_plan_text(plan: &[PlannedAction], dry_run: bool) -> String {
if plan.is_empty() {
return "no managed sessions to prune\n".to_string();
}
let mut out = String::new();
for p in plan {
let reason = match &p.action {
PruneAction::Skip(why) => format!(": {why}"),
_ => String::new(),
};
out.push_str(&format!(
"{:<13} {} ({}) verdict={}{}\n",
p.action.label(),
p.name,
short_id(&p.id),
p.verdict,
reason,
));
}
let n = actionable_count(plan);
let verb = if dry_run { "would act on" } else { "acted on" };
out.push_str(&format!(
"{verb} {n} of {} session(s){}\n",
plan.len(),
if dry_run { " (dry run)" } else { "" }
));
out
}
pub(crate) fn render_plan_json(plan: &[PlannedAction], dry_run: bool) -> anyhow::Result<String> {
let sessions = plan
.iter()
.map(|p| JsonRow {
id: p.id.clone(),
name: p.name.clone(),
verdict: p.verdict.clone(),
action: p.action.label().to_string(),
reason: match &p.action {
PruneAction::Skip(why) => (*why).to_string(),
_ => String::new(),
},
})
.collect::<Vec<_>>();
let doc = JsonPlan {
dry_run,
actionable: actionable_count(plan),
total: plan.len(),
sessions,
sm_available: true,
};
Ok(serde_json::to_string_pretty(&doc)?)
}
pub(crate) fn render_unavailable_json(dry_run: bool) -> anyhow::Result<String> {
let doc = JsonPlan {
dry_run,
actionable: 0,
total: 0,
sessions: Vec::new(),
sm_available: false,
};
Ok(serde_json::to_string_pretty(&doc)?)
}
fn short_id(id: &str) -> &str {
id.split('-').next().unwrap_or(id)
}
pub(crate) async fn prune_idle(
client: &reqwest::Client,
url: &str,
dry_run: bool,
json: bool,
) -> anyhow::Result<()> {
let sessions = match fetch_sessions(client, url).await {
Ok(sessions) => sessions,
Err(FetchSessionsError::Unreachable) => {
if json {
println!("{}", render_unavailable_json(dry_run)?);
} else {
eprintln!("{}", PruneError::SmUnavailable);
}
return Err(PruneError::SmUnavailable.into());
}
Err(FetchSessionsError::Http(e)) => return Err(e),
};
let rows = fetch_verdicts(client, url, sessions).await;
let plan = build_plan(&rows);
if json {
println!("{}", render_plan_json(&plan, dry_run)?);
} else {
print!("{}", render_plan_text(&plan, dry_run));
}
if dry_run {
return Ok(());
}
for p in &plan {
match p.action {
PruneAction::Stop => {
super::managed::session_stop(client, url, p.id.clone()).await?;
}
PruneAction::Decommission => {
super::managed::session_decommission(client, url, p.id.clone()).await?;
}
PruneAction::Skip(_) => {}
}
}
Ok(())
}
#[derive(Debug, Clone, Deserialize)]
struct SessionRef {
id: String,
name: String,
}
enum FetchSessionsError {
Unreachable,
Http(anyhow::Error),
}
async fn fetch_sessions(
client: &reqwest::Client,
url: &str,
) -> Result<Vec<SessionRef>, FetchSessionsError> {
#[derive(Deserialize)]
struct ListResp {
sessions: Vec<SessionRef>,
}
let resp = client
.get(format!("{url}/api/v1/sessions/managed"))
.send()
.await
.map_err(|_| FetchSessionsError::Unreachable)?;
let status = resp.status();
if !status.is_success() {
return Err(FetchSessionsError::Http(anyhow::anyhow!(
"session manager returned HTTP {status} listing managed sessions"
)));
}
let body: ListResp = resp
.json()
.await
.map_err(|e| FetchSessionsError::Http(e.into()))?;
Ok(body.sessions)
}
async fn fetch_verdicts(
client: &reqwest::Client,
url: &str,
sessions: Vec<SessionRef>,
) -> Vec<SessionVerdict> {
let semaphore = Arc::new(Semaphore::new(MAX_INFLIGHT_VERDICTS));
let mut join_set: JoinSet<(usize, SessionVerdict)> = JoinSet::new();
for (idx, s) in sessions.into_iter().enumerate() {
let client = client.clone();
let url = url.to_string();
let semaphore = Arc::clone(&semaphore);
join_set.spawn(async move {
let _permit = semaphore
.acquire()
.await
.expect("prune verdict semaphore is never closed");
let verdict = fetch_verdict(&client, &url, &s.id).await;
(
idx,
SessionVerdict {
id: s.id,
name: s.name,
verdict,
},
)
});
}
let mut indexed: Vec<(usize, SessionVerdict)> = Vec::new();
while let Some(joined) = join_set.join_next().await {
let row = joined.expect("prune verdict task panicked");
indexed.push(row);
}
reorder_by_index(indexed)
}
fn reorder_by_index(mut indexed: Vec<(usize, SessionVerdict)>) -> Vec<SessionVerdict> {
indexed.sort_by_key(|(idx, _)| *idx);
indexed.into_iter().map(|(_, row)| row).collect()
}
async fn fetch_verdict(client: &reqwest::Client, url: &str, id: &str) -> Option<String> {
#[derive(Deserialize)]
struct ActivityResp {
#[serde(default)]
state: Option<String>,
#[serde(default)]
classification: Option<String>,
}
let resp = client
.get(format!("{url}/api/v1/sessions/managed/{id}/activity"))
.send()
.await
.ok()?;
if !resp.status().is_success() {
return None;
}
let body: ActivityResp = resp.json().await.ok()?;
body.classification.or(body.state)
}
#[cfg(test)]
mod tests {
use super::*;
fn row(id: &str, name: &str, verdict: Option<&str>) -> SessionVerdict {
SessionVerdict {
id: id.to_string(),
name: name.to_string(),
verdict: verdict.map(str::to_string),
}
}
#[test]
fn build_plan_maps_each_verdict() {
let rows = vec![
row("11111111-a", "alpha", Some("idle")),
row("22222222-b", "bravo", Some("done")),
row("33333333-c", "charlie", Some("working")),
row("44444444-d", "delta", None),
];
let plan = build_plan(&rows);
assert_eq!(plan[0].action, PruneAction::Stop);
assert_eq!(plan[1].action, PruneAction::Decommission);
assert!(matches!(plan[2].action, PruneAction::Skip(_)));
assert!(matches!(plan[3].action, PruneAction::Skip(_)));
assert_eq!(plan[3].verdict, "none");
}
#[test]
fn build_plan_preserves_order() {
let rows = vec![
row("aaaa-1", "a", Some("idle")),
row("bbbb-2", "b", Some("idle")),
];
let plan = build_plan(&rows);
assert_eq!(plan[0].id, "aaaa-1");
assert_eq!(plan[1].id, "bbbb-2");
}
#[test]
fn build_plan_dry_run_matches_live_plan() {
let rows = vec![
row("1-a", "a", Some("idle")),
row("2-b", "b", Some("done")),
row("3-c", "c", Some("errored")),
];
let dry = build_plan(&rows);
let live = build_plan(&rows);
let labels = |p: &[PlannedAction]| p.iter().map(|x| x.action.label()).collect::<Vec<_>>();
assert_eq!(labels(&dry), labels(&live));
assert_eq!(actionable_count(&dry), 2);
}
#[test]
fn actionable_count_excludes_skips() {
let rows = vec![
row("1", "a", Some("idle")),
row("2", "b", Some("working")),
row("3", "c", Some("done")),
];
assert_eq!(actionable_count(&build_plan(&rows)), 2);
}
#[test]
fn render_plan_text_lists_actions() {
let rows = vec![
row("11111111-aaaa", "alpha", Some("idle")),
row("22222222-bbbb", "bravo", Some("working")),
];
let out = render_plan_text(&build_plan(&rows), true);
assert!(out.contains("stop"));
assert!(out.contains("alpha"));
assert!(out.contains("11111111"));
assert!(out.contains("skip"));
assert!(out.contains("bravo"));
assert!(out.contains("dry run"));
assert!(out.contains("would act on 1 of 2"));
}
#[test]
fn render_plan_text_empty() {
assert_eq!(
render_plan_text(&[], true),
"no managed sessions to prune\n"
);
}
#[test]
fn render_plan_json_shape() {
let rows = vec![
row("1-a", "alpha", Some("idle")),
row("2-b", "bravo", Some("working")),
];
let json = render_plan_json(&build_plan(&rows), true).expect("json");
let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
assert_eq!(v["dry_run"], true);
assert_eq!(v["actionable"], 1);
assert_eq!(v["total"], 2);
assert_eq!(v["sessions"][0]["action"], "stop");
assert_eq!(v["sessions"][0]["verdict"], "idle");
assert_eq!(v["sessions"][1]["action"], "skip");
assert_eq!(v["sessions"][1]["reason"], "working");
}
#[test]
fn fetch_verdicts_preserves_order() {
let shuffled = vec![
(2, row("c", "charlie", Some("done"))),
(0, row("a", "alpha", Some("idle"))),
(1, row("b", "bravo", Some("working"))),
];
let ordered = reorder_by_index(shuffled);
let ids: Vec<&str> = ordered.iter().map(|r| r.id.as_str()).collect();
assert_eq!(ids, ["a", "b", "c"]);
}
#[test]
fn render_unavailable_json_shape() {
let json = render_unavailable_json(true).expect("json");
let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
assert_eq!(v["dry_run"], true);
assert_eq!(v["actionable"], 0);
assert_eq!(v["total"], 0);
assert!(v["sessions"].is_array());
assert_eq!(v["sessions"].as_array().expect("array").len(), 0);
assert_eq!(v["sm_available"], false);
let avail = render_plan_json(&[], false).expect("json");
let av: serde_json::Value = serde_json::from_str(&avail).expect("parse");
assert_eq!(av["sm_available"], true);
}
#[test]
fn unavailable_exit_code_is_stable() {
assert_eq!(EXIT_SM_UNAVAILABLE, 75);
}
}