Skip to main content

systemprompt_users/repository/
federated_identity.rs

1//! Repository for `federated_identities` — the `{issuer, external_sub} ->
2//! users.id` mapping used by RFC 8693 token-exchange first-touch.
3
4use chrono::Utc;
5use sqlx::Acquire;
6use systemprompt_identifiers::UserId;
7use systemprompt_traits::FederatedIdentityClaims;
8
9use crate::error::Result;
10use crate::models::{User, UserRole, UserStatus};
11use crate::repository::UserRepository;
12
13impl UserRepository {
14    /// Look up the local `UserId` for an external `(issuer, external_sub)`
15    /// without side effects. Returns `Ok(None)` if no mapping exists yet.
16    pub async fn find_federated(&self, issuer: &str, external_sub: &str) -> Result<Option<UserId>> {
17        let row = sqlx::query!(
18            "SELECT user_id FROM federated_identities WHERE issuer = $1 AND external_sub = $2",
19            issuer,
20            external_sub
21        )
22        .fetch_optional(&*self.pool)
23        .await?;
24
25        Ok(row.map(|r| UserId::new(r.user_id)))
26    }
27
28    /// Resolve a federated identity to a local `User`, creating both the
29    /// `users` row and the `federated_identities` mapping on first touch.
30    ///
31    /// All writes happen in a single transaction so a race between two
32    /// concurrent first-touch requests for the same `(issuer, external_sub)`
33    /// cannot produce two local users — the second loser observes the
34    /// primary-key conflict and re-reads the mapping.
35    pub async fn find_or_create_federated(
36        &self,
37        issuer: &str,
38        external_sub: &str,
39        claims: &FederatedIdentityClaims,
40    ) -> Result<User> {
41        let mut conn = self.write_pool.acquire().await?;
42        let mut tx = conn.begin().await?;
43
44        if let Some(existing) = sqlx::query!(
45            "UPDATE federated_identities SET last_seen_at = CURRENT_TIMESTAMP WHERE issuer = $1 \
46             AND external_sub = $2 RETURNING user_id",
47            issuer,
48            external_sub
49        )
50        .fetch_optional(&mut *tx)
51        .await?
52        {
53            let user = sqlx::query_as!(
54                User,
55                r#"
56                SELECT id, name, email, full_name, display_name, status,
57                       email_verified, roles, avatar_url, is_bot, is_scanner,
58                       created_at, updated_at
59                FROM users WHERE id = $1
60                "#,
61                existing.user_id
62            )
63            .fetch_one(&mut *tx)
64            .await?;
65            tx.commit().await?;
66            return Ok(user);
67        }
68
69        let now = Utc::now();
70        let id = UserId::new(uuid::Uuid::new_v4().to_string());
71        let name = claims
72            .preferred_username
73            .clone()
74            .or_else(|| claims.name.clone())
75            .unwrap_or_else(|| format!("fed_{}_{}", short_hash(issuer), short_hash(external_sub)));
76        let synthetic_email = || {
77            format!(
78                "{}@{}.federated.local",
79                short_hash(external_sub),
80                short_host(issuer)
81            )
82        };
83        let email = match (claims.email.as_deref(), claims.email_verified) {
84            (Some(addr), true) => addr.to_string(),
85            (Some(addr), false) => {
86                tracing::warn!(
87                    issuer,
88                    external_sub,
89                    upstream_email = addr,
90                    "upstream IdP did not assert email_verified; using synthetic local email to \
91                     prevent account-claim attacks"
92                );
93                synthetic_email()
94            },
95            (None, _) => synthetic_email(),
96        };
97        let display_name = claims.name.clone();
98        let status = UserStatus::Active.as_str();
99        let roles = normalised_roles(&claims.roles);
100
101        let user = sqlx::query_as!(
102            User,
103            r#"
104            INSERT INTO users (
105                id, name, email, full_name, display_name,
106                status, email_verified, roles, is_bot,
107                created_at, updated_at
108            )
109            VALUES ($1, $2, $3, $4, $5, $6, false, $7::TEXT[], false, $8, $8)
110            RETURNING id, name, email, full_name, display_name, status, email_verified,
111                      roles, avatar_url, is_bot, is_scanner, created_at, updated_at
112            "#,
113            id.as_str(),
114            name,
115            email,
116            display_name.as_deref(),
117            display_name.as_deref(),
118            status,
119            &roles,
120            now,
121        )
122        .fetch_one(&mut *tx)
123        .await?;
124
125        sqlx::query!(
126            "INSERT INTO federated_identities (issuer, external_sub, user_id) VALUES ($1, $2, $3)",
127            issuer,
128            external_sub,
129            user.id.as_str()
130        )
131        .execute(&mut *tx)
132        .await?;
133
134        tx.commit().await?;
135        Ok(user)
136    }
137}
138
139fn normalised_roles(claim_roles: &[String]) -> Vec<String> {
140    if claim_roles.is_empty() {
141        vec![UserRole::User.as_str().to_string()]
142    } else {
143        claim_roles.to_vec()
144    }
145}
146
147fn short_hash(s: &str) -> String {
148    use sha2::{Digest, Sha256};
149    let digest = Sha256::digest(s.as_bytes());
150    hex::encode(&digest[..6])
151}
152
/// Turns an issuer URL into a single dash-separated label.
///
/// Strips a leading `https://` / `http://`, keeps everything up to the first
/// `/`, and replaces `.` and `:` with `-` (so `host:port` stays one label).
/// If nothing is left after stripping — e.g. the issuer is empty or is only a
/// scheme — the literal `"issuer"` is returned so callers never receive an
/// empty label.
fn short_host(issuer: &str) -> String {
    let authority = issuer
        .trim_start_matches("https://")
        .trim_start_matches("http://")
        .split('/')
        .next()
        // `split` always yields at least one item, so a missing host shows up
        // as an empty string rather than `None`; treat both the same way.
        .filter(|host| !host.is_empty())
        .unwrap_or("issuer");
    authority.replace(['.', ':'], "-")
}