use std::collections::{BTreeMap, BTreeSet};
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use futures::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use kanade_shared::kv::{BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, parse_agent_config_group_key};
use kanade_shared::wire::AgentGroups;
use super::AppState;
#[derive(Serialize)]
pub struct GroupSummary {
pub name: String,
pub members: Vec<String>,
pub has_config: bool,
}
#[derive(Serialize)]
pub struct GroupsOverview {
pub groups: Vec<GroupSummary>,
}
pub async fn list_all_groups(
State(state): State<AppState>,
) -> Result<Json<GroupsOverview>, (StatusCode, String)> {
let kv = open_bucket(&state).await?;
let mut pc_ids: Vec<String> = Vec::new();
match kv.keys().await {
Ok(mut keys) => {
while let Some(k) = keys.next().await {
match k {
Ok(k) => pc_ids.push(k),
Err(e) => warn!(error = %e, "agent_groups keys()"),
}
}
}
Err(e) => {
warn!(error = %e, "agent_groups keys() for overview");
}
}
const READ_CONCURRENCY: usize = 16;
let rows: Vec<(String, AgentGroups)> = futures::stream::iter(pc_ids)
.map(|pc_id| {
let kv = kv.clone();
async move {
let g = read_or_default(&kv, &pc_id).await?;
Ok::<_, (StatusCode, String)>((pc_id, g))
}
})
.buffer_unordered(READ_CONCURRENCY)
.try_collect()
.await?;
let mut by_group: BTreeMap<String, Vec<String>> = BTreeMap::new();
for (pc_id, g) in rows {
for name in g.groups {
by_group.entry(name).or_default().push(pc_id.clone());
}
}
let mut with_config: BTreeSet<String> = BTreeSet::new();
if let Ok(cfg_kv) = state.jetstream.get_key_value(BUCKET_AGENT_CONFIG).await
&& let Ok(mut cfg_keys) = cfg_kv.keys().await
{
while let Some(k) = cfg_keys.next().await {
if let Ok(key) = k
&& let Some(name) = parse_agent_config_group_key(&key)
{
with_config.insert(name.to_string());
}
}
}
let mut all_names: BTreeSet<String> = by_group.keys().cloned().collect();
all_names.extend(with_config.iter().cloned());
let groups = all_names
.into_iter()
.map(|name| {
let mut members = by_group.remove(&name).unwrap_or_default();
members.sort();
GroupSummary {
has_config: with_config.contains(&name),
name,
members,
}
})
.collect();
Ok(Json(GroupsOverview { groups }))
}
pub async fn list_groups(
State(state): State<AppState>,
Path(pc_id): Path<String>,
) -> Result<Json<AgentGroups>, (StatusCode, String)> {
let kv = open_bucket(&state).await?;
Ok(Json(read_or_default(&kv, &pc_id).await?))
}
pub async fn set_groups(
State(state): State<AppState>,
Path(pc_id): Path<String>,
Json(payload): Json<AgentGroups>,
) -> Result<Json<AgentGroups>, (StatusCode, String)> {
let normalised = AgentGroups::new(payload.groups);
let kv = open_bucket(&state).await?;
write(&kv, &pc_id, &normalised).await?;
info!(pc_id = %pc_id, groups = ?normalised.groups, "agent_groups replaced");
Ok(Json(normalised))
}
#[derive(Deserialize)]
pub struct AddGroupBody {
pub group: String,
}
pub async fn add_group(
State(state): State<AppState>,
Path(pc_id): Path<String>,
Json(body): Json<AddGroupBody>,
) -> Result<Json<AgentGroups>, (StatusCode, String)> {
if body.group.trim().is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"group name must not be empty".into(),
));
}
let kv = open_bucket(&state).await?;
let mut current = read_or_default(&kv, &pc_id).await?;
if current.insert(&body.group) {
write(&kv, &pc_id, ¤t).await?;
info!(pc_id = %pc_id, group = %body.group, "agent_group added");
}
Ok(Json(current))
}
pub async fn remove_group(
State(state): State<AppState>,
Path((pc_id, group)): Path<(String, String)>,
) -> Result<Json<AgentGroups>, (StatusCode, String)> {
let kv = open_bucket(&state).await?;
let mut current = read_or_default(&kv, &pc_id).await?;
if current.remove(&group) {
write(&kv, &pc_id, ¤t).await?;
info!(pc_id = %pc_id, group = %group, "agent_group removed");
}
Ok(Json(current))
}
async fn open_bucket(
state: &AppState,
) -> Result<async_nats::jetstream::kv::Store, (StatusCode, String)> {
state
.jetstream
.get_key_value(BUCKET_AGENT_GROUPS)
.await
.map_err(|e| {
warn!(error = %e, bucket = BUCKET_AGENT_GROUPS, "open agent_groups KV bucket");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("agent_groups KV bucket unavailable: {e}"),
)
})
}
async fn read_or_default(
kv: &async_nats::jetstream::kv::Store,
pc_id: &str,
) -> Result<AgentGroups, (StatusCode, String)> {
match kv.get(pc_id).await {
Ok(Some(bytes)) => serde_json::from_slice(&bytes).map_err(|e| {
warn!(error = %e, pc_id, "decode agent_groups");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("decode agent_groups for {pc_id}: {e}"),
)
}),
Ok(None) => Ok(AgentGroups::default()),
Err(e) => {
warn!(error = %e, pc_id, "read agent_groups");
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("read agent_groups for {pc_id}: {e}"),
))
}
}
}
async fn write(
kv: &async_nats::jetstream::kv::Store,
pc_id: &str,
groups: &AgentGroups,
) -> Result<(), (StatusCode, String)> {
let bytes = serde_json::to_vec(groups).map_err(|e| {
warn!(error = %e, pc_id, "encode agent_groups");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("encode agent_groups for {pc_id}: {e}"),
)
})?;
kv.put(pc_id, bytes.into()).await.map_err(|e| {
warn!(error = %e, pc_id, "write agent_groups");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("write agent_groups for {pc_id}: {e}"),
)
})?;
Ok(())
}