1use chrono::{DateTime, Utc};
4use secrecy::{ExposeSecret, SecretString};
5use serde_json::Value;
6use sqlx::{PgPool, Postgres, Row, Transaction};
7
8use crate::db::pioneer_codes::{PioneerCodeRegistrationKind, consume_pioneer_code_for_agent_tx};
9use agentics_domain::models::ids::{AgentId, AgentTokenId};
10use agentics_error::{Result, ServiceError};
11
12use super::ids::{agent_id_from_row, agent_token_id_from_row};
13
14#[derive(Debug, Clone)]
16pub struct RegisterAgentInput {
17 pub agent_id: AgentId,
18 pub token_id: AgentTokenId,
19 pub token_hash: String,
20 pub display_name: String,
21 pub agent_description: String,
22 pub model_info: Value,
23}
24
25#[derive(Debug, Clone)]
27pub struct AgentRecord {
28 pub id: AgentId,
29 pub display_name: String,
30 pub agent_description: String,
31 pub model_info: Value,
32 pub status: String,
33 pub created_at: DateTime<Utc>,
34}
35
36#[derive(Debug, Clone)]
38pub struct AuthenticatedAgent {
39 pub agent_id: AgentId,
40 pub token_id: AgentTokenId,
41 pub display_name: String,
42}
43
44pub async fn register_agent(
46 pool: &PgPool,
47 input: &RegisterAgentInput,
48 max_active_agents: i64,
49) -> Result<AgentRecord> {
50 let mut tx = pool.begin().await?;
51 enforce_active_agent_quota_tx(&mut tx, max_active_agents).await?;
52
53 let agent = insert_agent_tx(&mut tx, input).await?;
54 insert_agent_token_tx(&mut tx, input).await?;
55
56 tx.commit().await?;
57
58 Ok(agent)
59}
60
61pub async fn register_agent_with_pioneer_code(
63 pool: &PgPool,
64 input: &RegisterAgentInput,
65 pioneer_code_hash: &str,
66 registration_kind: PioneerCodeRegistrationKind,
67 max_active_agents: i64,
68) -> Result<AgentRecord> {
69 let mut tx = pool.begin().await?;
70 enforce_active_agent_quota_tx(&mut tx, max_active_agents).await?;
71
72 let agent = insert_agent_tx(&mut tx, input).await?;
73 consume_pioneer_code_for_agent_tx(
74 &mut tx,
75 pioneer_code_hash,
76 input.agent_id.as_str(),
77 registration_kind,
78 )
79 .await?;
80 insert_agent_token_tx(&mut tx, input).await?;
81
82 tx.commit().await?;
83
84 Ok(agent)
85}
86
87pub(crate) async fn enforce_active_agent_quota_tx(
89 tx: &mut Transaction<'_, Postgres>,
90 max_active_agents: i64,
91) -> Result<()> {
92 lock_quota_scope(tx, "global:active-agents").await?;
93 let active = count_active_agents_tx(tx).await?;
94 if active >= max_active_agents {
95 return Err(ServiceError::TooManyRequests(format!(
96 "agent registration quota exceeded: {active} of {max_active_agents} active agents are already registered"
97 )));
98 }
99 Ok(())
100}
101
102async fn lock_quota_scope(tx: &mut Transaction<'_, Postgres>, scope: &str) -> Result<()> {
104 sqlx::query(
105 r#"
106 INSERT INTO quota_admission_locks (scope)
107 VALUES ($1)
108 ON CONFLICT (scope) DO NOTHING
109 "#,
110 )
111 .bind(scope)
112 .execute(&mut **tx)
113 .await?;
114
115 sqlx::query(
116 r#"
117 SELECT scope
118 FROM quota_admission_locks
119 WHERE scope = $1
120 FOR UPDATE
121 "#,
122 )
123 .bind(scope)
124 .fetch_one(&mut **tx)
125 .await?;
126
127 Ok(())
128}
129
130async fn count_active_agents_tx(tx: &mut Transaction<'_, Postgres>) -> Result<i64> {
132 let count =
133 sqlx::query_scalar::<_, i64>("SELECT COUNT(*)::BIGINT FROM agents WHERE status = 'active'")
134 .fetch_one(&mut **tx)
135 .await?;
136 Ok(count)
137}
138
139async fn insert_agent_tx(
141 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
142 input: &RegisterAgentInput,
143) -> Result<AgentRecord> {
144 let row = sqlx::query(
145 r#"
146 INSERT INTO agents (id, display_name, agent_description, model_info, status)
147 VALUES ($1::uuid, $2, $3, $4, 'active')
148 RETURNING id::text AS id, display_name, agent_description, model_info, status, created_at
149 "#,
150 )
151 .bind(input.agent_id.as_str())
152 .bind(&input.display_name)
153 .bind(&input.agent_description)
154 .bind(&input.model_info)
155 .fetch_one(&mut **tx)
156 .await?;
157
158 Ok(AgentRecord {
159 id: agent_id_from_row(&row, "id")?,
160 display_name: row.try_get("display_name")?,
161 agent_description: row.try_get("agent_description")?,
162 model_info: row.try_get("model_info")?,
163 status: row.try_get("status")?,
164 created_at: row.try_get("created_at")?,
165 })
166}
167
168async fn insert_agent_token_tx(
170 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
171 input: &RegisterAgentInput,
172) -> Result<()> {
173 sqlx::query(
174 "INSERT INTO agent_tokens (id, agent_id, token_hash) VALUES ($1::uuid, $2::uuid, $3)",
175 )
176 .bind(input.token_id.as_str())
177 .bind(input.agent_id.as_str())
178 .bind(&input.token_hash)
179 .execute(&mut **tx)
180 .await?;
181 Ok(())
182}
183
184pub async fn count_active_agents(pool: &PgPool) -> Result<i64> {
186 let count =
187 sqlx::query_scalar::<_, i64>("SELECT COUNT(*)::BIGINT FROM agents WHERE status = 'active'")
188 .fetch_one(pool)
189 .await?;
190
191 Ok(count)
192}
193
194pub async fn authenticate_agent_token(
196 pool: &PgPool,
197 token: &SecretString,
198) -> Result<Option<AuthenticatedAgent>> {
199 let token_hash = crate::auth::hash_agent_token(token.expose_secret());
200
201 let row = sqlx::query(
202 r#"
203 SELECT a.id::text AS agent_id, t.id::text AS token_id, a.display_name
204 FROM agent_tokens t
205 JOIN agents a ON a.id = t.agent_id
206 WHERE t.token_hash = $1
207 AND t.revoked_at IS NULL
208 AND a.status = 'active'
209 LIMIT 1
210 "#,
211 )
212 .bind(&token_hash)
213 .fetch_optional(pool)
214 .await?;
215
216 let Some(row) = row else {
217 return Ok(None);
218 };
219
220 let token_id = agent_token_id_from_row(&row, "token_id")?;
221 sqlx::query("UPDATE agent_tokens SET last_used_at = NOW() WHERE id = $1::uuid")
222 .bind(token_id.as_str())
223 .execute(pool)
224 .await?;
225
226 Ok(Some(AuthenticatedAgent {
227 agent_id: agent_id_from_row(&row, "agent_id")?,
228 token_id,
229 display_name: row.try_get("display_name")?,
230 }))
231}
232
233pub async fn disable_agent(pool: &PgPool, agent_id: &str) -> Result<()> {
235 let row = sqlx::query("UPDATE agents SET status = 'disabled' WHERE id = $1::uuid RETURNING id")
236 .bind(agent_id)
237 .fetch_optional(pool)
238 .await?;
239
240 if row.is_none() {
241 return Err(ServiceError::NotFound);
242 }
243
244 sqlx::query(
245 "UPDATE agent_tokens SET revoked_at = COALESCE(revoked_at, NOW()) WHERE agent_id = $1::uuid",
246 )
247 .bind(agent_id)
248 .execute(pool)
249 .await?;
250
251 Ok(())
252}