use std::collections::HashMap;
use objectiveai_sdk::cli::command::agents::instances::list::ResponseItem;
use sqlx::Row as _;
use super::{Error, Pool};
async fn aggregate(
pool: &Pool,
exact: Option<&str>,
prefix: Option<&str>,
prefix_deep: Option<&str>,
) -> Result<Vec<ResponseItem>, Error> {
let rows = sqlx::query(
"WITH log_agg AS ( \
SELECT agent_instance_hierarchy AS aih, \
MIN(\"timestamp\") AS spawned, \
MAX(\"timestamp\") AS active, \
COUNT(*) AS logged \
FROM logs.messages \
WHERE ( \
($1::text IS NOT NULL AND agent_instance_hierarchy = $1) \
OR ($2::text IS NOT NULL \
AND agent_instance_hierarchy LIKE $2 \
AND agent_instance_hierarchy NOT LIKE $3) \
) \
GROUP BY agent_instance_hierarchy \
), \
queue_agg AS ( \
SELECT eff_aih AS aih, COUNT(*) AS queued \
FROM ( \
SELECT COALESCE( \
mq.agent_instance_hierarchy, \
t.agent_instance_hierarchy \
) AS eff_aih \
FROM message_queue mq \
LEFT JOIN tags t \
ON mq.agent_tag IS NOT NULL AND t.name = mq.agent_tag \
WHERE mq.active = TRUE \
) x \
WHERE ( \
($1::text IS NOT NULL AND x.eff_aih = $1) \
OR ($2::text IS NOT NULL \
AND x.eff_aih LIKE $2 \
AND x.eff_aih NOT LIKE $3) \
) \
GROUP BY eff_aih \
) \
SELECT COALESCE(l.aih, q.aih) AS aih, \
l.spawned AS spawned, \
l.active AS active, \
COALESCE(l.logged, 0)::bigint AS logged, \
COALESCE(q.queued, 0)::bigint AS queued \
FROM log_agg l \
FULL OUTER JOIN queue_agg q ON q.aih = l.aih \
ORDER BY aih ASC",
)
.bind(exact)
.bind(prefix)
.bind(prefix_deep)
.fetch_all(&**pool)
.await?;
let tag_rows = sqlx::query(
"SELECT agent_instance_hierarchy, name FROM tags \
WHERE ( \
($1::text IS NOT NULL AND agent_instance_hierarchy = $1) \
OR ($2::text IS NOT NULL \
AND agent_instance_hierarchy LIKE $2 \
AND agent_instance_hierarchy NOT LIKE $3) \
) \
ORDER BY updated_at DESC",
)
.bind(exact)
.bind(prefix)
.bind(prefix_deep)
.fetch_all(&**pool)
.await?;
let mut tags_by_aih: HashMap<String, Vec<String>> = HashMap::new();
for row in tag_rows {
let aih: String = row.try_get("agent_instance_hierarchy")?;
let name: String = row.try_get("name")?;
tags_by_aih.entry(aih).or_default().push(name);
}
let mut out = Vec::with_capacity(rows.len());
for row in rows {
let aih: String = row.try_get("aih")?;
let spawned: Option<i64> = row.try_get("spawned")?;
let active: Option<i64> = row.try_get("active")?;
let logged: i64 = row.try_get("logged")?;
let queued: i64 = row.try_get("queued")?;
let tags = tags_by_aih.remove(&aih).unwrap_or_default();
out.push(ResponseItem {
agent_instance_hierarchy: aih,
tags,
queued: queued as u64,
timestamp_spawned: spawned,
timestamp_active: active,
logged: logged as u64,
});
}
Ok(out)
}
/// Lists the aggregated items for the direct children of `parent`.
///
/// A direct child is a hierarchy of the form `parent/<segment>`: it must match
/// `parent/%` and must not match `parent/%/%` (which would be a grandchild or
/// deeper descendant).
///
/// `parent` is treated as literal text. Any LIKE metacharacter it contains
/// (`%`, `_`, or the escape character `\`) is escaped before the patterns are
/// built, so a parent such as `agent_1` cannot accidentally match
/// `agentX1/...`.
///
/// # Errors
///
/// Propagates any error returned by `aggregate`.
pub async fn list_under_parent(
    pool: &Pool,
    parent: &str,
) -> Result<Vec<ResponseItem>, Error> {
    // Backslash is the default LIKE escape character in PostgreSQL, so no
    // explicit ESCAPE clause is required in the queries.
    let mut literal_parent = String::with_capacity(parent.len());
    for ch in parent.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            literal_parent.push('\\');
        }
        literal_parent.push(ch);
    }

    let prefix = format!("{literal_parent}/%");
    let prefix_deep = format!("{literal_parent}/%/%");
    aggregate(pool, None, Some(&prefix), Some(&prefix_deep)).await
}
/// Returns the aggregated item for exactly the hierarchy `aih`.
///
/// When `aggregate` yields no item for `aih`, an item with zero counts and no
/// timestamps is returned instead, carrying whatever
/// `super::tags::tags_for_hierarchy` reports for that hierarchy.
///
/// # Errors
///
/// Propagates any error from `aggregate` or from the tag lookup.
pub async fn get_exact(pool: &Pool, aih: &str) -> Result<ResponseItem, Error> {
    let found = aggregate(pool, Some(aih), None, None).await?.pop();
    match found {
        Some(item) => Ok(item),
        None => {
            // Nothing aggregated for this hierarchy: answer with an empty
            // record that still carries its tags.
            let tags = super::tags::tags_for_hierarchy(pool, aih).await?;
            Ok(ResponseItem {
                agent_instance_hierarchy: aih.to_string(),
                tags,
                queued: 0,
                timestamp_spawned: None,
                timestamp_active: None,
                logged: 0,
            })
        }
    }
}