Skip to main content

agentics_persistence/db/
agents.rs

//! Agent registration and authentication queries.
2
3use chrono::{DateTime, Utc};
4use secrecy::{ExposeSecret, SecretString};
5use serde_json::Value;
6use sqlx::{PgPool, Postgres, Row, Transaction};
7
8use crate::db::pioneer_codes::{PioneerCodeRegistrationKind, consume_pioneer_code_for_agent_tx};
9use agentics_domain::models::ids::{AgentId, AgentTokenId};
10use agentics_error::{Result, ServiceError};
11
12use super::ids::{agent_id_from_row, agent_token_id_from_row};
13
14/// Input for creating an agent and its initial bearer token in one transaction.
15#[derive(Debug, Clone)]
16pub struct RegisterAgentInput {
17    pub agent_id: AgentId,
18    pub token_id: AgentTokenId,
19    pub token_hash: String,
20    pub display_name: String,
21    pub agent_description: String,
22    pub model_info: Value,
23}
24
25/// Persisted agent row returned after registration.
26#[derive(Debug, Clone)]
27pub struct AgentRecord {
28    pub id: AgentId,
29    pub display_name: String,
30    pub agent_description: String,
31    pub model_info: Value,
32    pub status: String,
33    pub created_at: DateTime<Utc>,
34}
35
36/// Agent identity resolved from a valid, active bearer token.
37#[derive(Debug, Clone)]
38pub struct AuthenticatedAgent {
39    pub agent_id: AgentId,
40    pub token_id: AgentTokenId,
41    pub display_name: String,
42}
43
44/// Register an active agent and insert its first token.
45pub async fn register_agent(
46    pool: &PgPool,
47    input: &RegisterAgentInput,
48    max_active_agents: i64,
49) -> Result<AgentRecord> {
50    let mut tx = pool.begin().await?;
51    enforce_active_agent_quota_tx(&mut tx, max_active_agents).await?;
52
53    let agent = insert_agent_tx(&mut tx, input).await?;
54    insert_agent_token_tx(&mut tx, input).await?;
55
56    tx.commit().await?;
57
58    Ok(agent)
59}
60
61/// Register an active agent while atomically consuming a pioneer code.
62pub async fn register_agent_with_pioneer_code(
63    pool: &PgPool,
64    input: &RegisterAgentInput,
65    pioneer_code_hash: &str,
66    registration_kind: PioneerCodeRegistrationKind,
67    max_active_agents: i64,
68) -> Result<AgentRecord> {
69    let mut tx = pool.begin().await?;
70    enforce_active_agent_quota_tx(&mut tx, max_active_agents).await?;
71
72    let agent = insert_agent_tx(&mut tx, input).await?;
73    consume_pioneer_code_for_agent_tx(
74        &mut tx,
75        pioneer_code_hash,
76        input.agent_id.as_str(),
77        registration_kind,
78    )
79    .await?;
80    insert_agent_token_tx(&mut tx, input).await?;
81
82    tx.commit().await?;
83
84    Ok(agent)
85}
86
87/// Serialize active-agent quota admission within the registration transaction.
88pub(crate) async fn enforce_active_agent_quota_tx(
89    tx: &mut Transaction<'_, Postgres>,
90    max_active_agents: i64,
91) -> Result<()> {
92    lock_quota_scope(tx, "global:active-agents").await?;
93    let active = count_active_agents_tx(tx).await?;
94    if active >= max_active_agents {
95        return Err(ServiceError::TooManyRequests(format!(
96            "agent registration quota exceeded: {active} of {max_active_agents} active agents are already registered"
97        )));
98    }
99    Ok(())
100}
101
102/// Lock one quota-admission scope for the lifetime of the current transaction.
103async fn lock_quota_scope(tx: &mut Transaction<'_, Postgres>, scope: &str) -> Result<()> {
104    sqlx::query(
105        r#"
106        INSERT INTO quota_admission_locks (scope)
107        VALUES ($1)
108        ON CONFLICT (scope) DO NOTHING
109        "#,
110    )
111    .bind(scope)
112    .execute(&mut **tx)
113    .await?;
114
115    sqlx::query(
116        r#"
117        SELECT scope
118        FROM quota_admission_locks
119        WHERE scope = $1
120        FOR UPDATE
121        "#,
122    )
123    .bind(scope)
124    .fetch_one(&mut **tx)
125    .await?;
126
127    Ok(())
128}
129
130/// Count active agents inside a quota-locked registration transaction.
131async fn count_active_agents_tx(tx: &mut Transaction<'_, Postgres>) -> Result<i64> {
132    let count =
133        sqlx::query_scalar::<_, i64>("SELECT COUNT(*)::BIGINT FROM agents WHERE status = 'active'")
134            .fetch_one(&mut **tx)
135            .await?;
136    Ok(count)
137}
138
139/// Insert the agent row used by both public and pioneer-code registration.
140async fn insert_agent_tx(
141    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
142    input: &RegisterAgentInput,
143) -> Result<AgentRecord> {
144    let row = sqlx::query(
145        r#"
146        INSERT INTO agents (id, display_name, agent_description, model_info, status)
147        VALUES ($1::uuid, $2, $3, $4, 'active')
148        RETURNING id::text AS id, display_name, agent_description, model_info, status, created_at
149        "#,
150    )
151    .bind(input.agent_id.as_str())
152    .bind(&input.display_name)
153    .bind(&input.agent_description)
154    .bind(&input.model_info)
155    .fetch_one(&mut **tx)
156    .await?;
157
158    Ok(AgentRecord {
159        id: agent_id_from_row(&row, "id")?,
160        display_name: row.try_get("display_name")?,
161        agent_description: row.try_get("agent_description")?,
162        model_info: row.try_get("model_info")?,
163        status: row.try_get("status")?,
164        created_at: row.try_get("created_at")?,
165    })
166}
167
168/// Insert the first bearer token for a newly registered agent.
169async fn insert_agent_token_tx(
170    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
171    input: &RegisterAgentInput,
172) -> Result<()> {
173    sqlx::query(
174        "INSERT INTO agent_tokens (id, agent_id, token_hash) VALUES ($1::uuid, $2::uuid, $3)",
175    )
176    .bind(input.token_id.as_str())
177    .bind(input.agent_id.as_str())
178    .bind(&input.token_hash)
179    .execute(&mut **tx)
180    .await?;
181    Ok(())
182}
183
184/// Count currently active agents for coarse registration abuse controls.
185pub async fn count_active_agents(pool: &PgPool) -> Result<i64> {
186    let count =
187        sqlx::query_scalar::<_, i64>("SELECT COUNT(*)::BIGINT FROM agents WHERE status = 'active'")
188            .fetch_one(pool)
189            .await?;
190
191    Ok(count)
192}
193
194/// Authenticate a bearer token and refresh its `last_used_at` timestamp.
195pub async fn authenticate_agent_token(
196    pool: &PgPool,
197    token: &SecretString,
198) -> Result<Option<AuthenticatedAgent>> {
199    let token_hash = crate::auth::hash_agent_token(token.expose_secret());
200
201    let row = sqlx::query(
202        r#"
203        SELECT a.id::text AS agent_id, t.id::text AS token_id, a.display_name
204        FROM agent_tokens t
205        JOIN agents a ON a.id = t.agent_id
206        WHERE t.token_hash = $1
207          AND t.revoked_at IS NULL
208          AND a.status = 'active'
209        LIMIT 1
210        "#,
211    )
212    .bind(&token_hash)
213    .fetch_optional(pool)
214    .await?;
215
216    let Some(row) = row else {
217        return Ok(None);
218    };
219
220    let token_id = agent_token_id_from_row(&row, "token_id")?;
221    sqlx::query("UPDATE agent_tokens SET last_used_at = NOW() WHERE id = $1::uuid")
222        .bind(token_id.as_str())
223        .execute(pool)
224        .await?;
225
226    Ok(Some(AuthenticatedAgent {
227        agent_id: agent_id_from_row(&row, "agent_id")?,
228        token_id,
229        display_name: row.try_get("display_name")?,
230    }))
231}
232
233/// Disable an agent and revoke all of its tokens.
234pub async fn disable_agent(pool: &PgPool, agent_id: &str) -> Result<()> {
235    let row = sqlx::query("UPDATE agents SET status = 'disabled' WHERE id = $1::uuid RETURNING id")
236        .bind(agent_id)
237        .fetch_optional(pool)
238        .await?;
239
240    if row.is_none() {
241        return Err(ServiceError::NotFound);
242    }
243
244    sqlx::query(
245        "UPDATE agent_tokens SET revoked_at = COALESCE(revoked_at, NOW()) WHERE agent_id = $1::uuid",
246    )
247    .bind(agent_id)
248    .execute(pool)
249    .await?;
250
251    Ok(())
252}