Skip to main content

arcp_runtime/runtime/
credentials.rs

1//! Provisioned credential support for lease-bound jobs (ARCP v1.1 §9.8).
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6
7use async_trait::async_trait;
8
9use arcp_core::error::ARCPError;
10use arcp_core::ids::{JobId, SessionId};
11use arcp_core::messages::LeaseRequest;
12
13// Wire types now live in `arcp_core::messages`; re-export them at the runtime
14// `credentials` path for backwards compatibility with v1.x users.
15pub use arcp_core::messages::{CredentialId, CredentialScheme, ProvisionedCredential};
16
17/// Job metadata supplied to a [`CredentialProvisioner`].
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct CredentialJobContext {
20    /// Job receiving the credential.
21    pub job_id: JobId,
22    /// Owning session.
23    pub session_id: SessionId,
24    /// Authenticated principal, if any.
25    pub principal: Option<String>,
26    /// Parent job for delegated jobs.
27    pub parent_job_id: Option<JobId>,
28}
29
30/// Vendor-neutral async provisioner interface.
31#[async_trait]
32pub trait CredentialProvisioner: Send + Sync {
33    /// Issue credentials constrained by `lease` for `ctx`.
34    ///
35    /// # Errors
36    ///
37    /// Returns [`ARCPError`] when the upstream cannot issue the requested
38    /// credential.
39    async fn issue(
40        &self,
41        lease: &LeaseRequest,
42        ctx: &CredentialJobContext,
43    ) -> Result<Vec<ProvisionedCredential>, ARCPError>;
44
45    /// Revoke one credential by id.
46    ///
47    /// # Errors
48    ///
49    /// Returns [`ARCPError`] when revocation failed.
50    async fn revoke(&self, id: &CredentialId) -> Result<(), ARCPError>;
51}
52
53/// Reference provisioner for tests and examples.
54#[derive(Default)]
55pub struct InMemoryCredentialProvisioner {
56    counter: AtomicU64,
57    issued: Mutex<Vec<ProvisionedCredential>>,
58    revoked: Mutex<Vec<CredentialId>>,
59}
60
61impl std::fmt::Debug for InMemoryCredentialProvisioner {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("InMemoryCredentialProvisioner")
64            .finish_non_exhaustive()
65    }
66}
67
68impl InMemoryCredentialProvisioner {
69    /// Snapshot issued credentials. Secret values are present for test assertions.
70    #[must_use]
71    pub fn issued_credentials(&self) -> Vec<ProvisionedCredential> {
72        self.issued
73            .lock()
74            .map_or_else(|_| Vec::new(), |g| g.clone())
75    }
76
77    /// Snapshot revoked credential ids.
78    #[must_use]
79    pub fn revoked_ids(&self) -> Vec<CredentialId> {
80        self.revoked
81            .lock()
82            .map_or_else(|_| Vec::new(), |g| g.clone())
83    }
84
85    /// Validate a child credential lease against a parent lease.
86    ///
87    /// # Errors
88    ///
89    /// Returns [`ARCPError::LeaseSubsetViolation`] when `child` widens `parent`.
90    pub fn validate_child_constraints(
91        parent: &LeaseRequest,
92        child: &LeaseRequest,
93    ) -> Result<(), ARCPError> {
94        if let Some(violation) = parent.subset_violation(child, &HashMap::new()) {
95            return Err(ARCPError::LeaseSubsetViolation {
96                detail: format!("{violation:?}"),
97            });
98        }
99        Ok(())
100    }
101}
102
103#[async_trait]
104impl CredentialProvisioner for InMemoryCredentialProvisioner {
105    async fn issue(
106        &self,
107        lease: &LeaseRequest,
108        _ctx: &CredentialJobContext,
109    ) -> Result<Vec<ProvisionedCredential>, ARCPError> {
110        let n = self.counter.fetch_add(1, Ordering::AcqRel) + 1;
111        let credential = ProvisionedCredential {
112            id: CredentialId::new(n),
113            scheme: CredentialScheme::Bearer,
114            value: format!("test-token-{n}"),
115            endpoint: "https://example.invalid/llm".into(),
116            profile: Some("test".into()),
117            constraints: Some(lease.clone()),
118        };
119        self.issued
120            .lock()
121            .map_err(|_| ARCPError::Internal {
122                detail: "credential provisioner mutex poisoned".into(),
123            })?
124            .push(credential.clone());
125        Ok(vec![credential])
126    }
127
128    async fn revoke(&self, id: &CredentialId) -> Result<(), ARCPError> {
129        self.revoked
130            .lock()
131            .map_err(|_| ARCPError::Internal {
132                detail: "credential provisioner mutex poisoned".into(),
133            })?
134            .push(id.clone());
135        Ok(())
136    }
137}
138
139/// In-memory ledger of outstanding credential ids by job.
140#[derive(Clone, Default)]
141pub struct CredentialLedger {
142    inner: Arc<dashmap::DashMap<JobId, Vec<CredentialId>>>,
143}
144
145impl std::fmt::Debug for CredentialLedger {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct("CredentialLedger")
148            .field("jobs", &self.inner.len())
149            .finish()
150    }
151}
152
153impl CredentialLedger {
154    /// Construct an empty ledger.
155    #[must_use]
156    pub fn new() -> Self {
157        Self::default()
158    }
159
160    /// Record credentials issued for `job_id`.
161    pub fn record_issued(&self, job_id: &JobId, credentials: &[ProvisionedCredential]) {
162        if credentials.is_empty() {
163            return;
164        }
165        let ids = credentials.iter().map(|c| c.id.clone()).collect::<Vec<_>>();
166        self.inner
167            .entry(job_id.clone())
168            .and_modify(|existing| existing.extend(ids.clone()))
169            .or_insert(ids);
170    }
171
172    /// Outstanding ids for `job_id`.
173    #[must_use]
174    pub fn outstanding_for_job(&self, job_id: &JobId) -> Vec<CredentialId> {
175        self.inner
176            .get(job_id)
177            .map_or_else(Vec::new, |entry| entry.value().clone())
178    }
179
180    /// Mark a credential as revoked.
181    pub fn mark_revoked(&self, job_id: &JobId, credential_id: &CredentialId) {
182        if let Some(mut ids) = self.inner.get_mut(job_id) {
183            ids.retain(|id| id != credential_id);
184            if ids.is_empty() {
185                drop(ids);
186                self.inner.remove(job_id);
187            }
188        }
189    }
190}
191
192/// Revoke every outstanding credential for a job.
193///
194/// # Errors
195///
196/// Returns the last revocation error if all retry attempts fail for any
197/// credential.
198pub async fn revoke_all_for_job(
199    ledger: &CredentialLedger,
200    provisioner: &Arc<dyn CredentialProvisioner>,
201    job_id: &JobId,
202) -> Result<(), ARCPError> {
203    let mut last_error = None;
204    for id in ledger.outstanding_for_job(job_id) {
205        let mut revoked = false;
206        for attempt in 0..3 {
207            match provisioner.revoke(&id).await {
208                Ok(()) => {
209                    ledger.mark_revoked(job_id, &id);
210                    revoked = true;
211                    break;
212                }
213                Err(e) => {
214                    last_error = Some(e);
215                    let delay = 10_u64.saturating_mul(1 << attempt);
216                    tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
217                }
218            }
219        }
220        if !revoked {
221            tracing::warn!(credential_id = %id, job_id = %job_id, "credential revocation failed");
222        }
223    }
224    if ledger.outstanding_for_job(job_id).is_empty() {
225        Ok(())
226    } else {
227        Err(last_error.unwrap_or_else(|| ARCPError::Unavailable {
228            detail: "credential revocation failed".into(),
229        }))
230    }
231}
232
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc
)]
mod tests {
    use super::*;
    use arcp_core::messages::{CostBudget, CostBudgetAmount, ModelUse};

    /// Build a lease with a 1.0 USD budget and `pattern` as its only
    /// model-use pattern.
    fn lease(pattern: &str) -> LeaseRequest {
        let budget = CostBudget {
            amounts: vec![CostBudgetAmount {
                currency: "USD".into(),
                amount: 1.0,
            }],
        };
        let models = ModelUse {
            patterns: vec![pattern.into()],
        };
        LeaseRequest {
            cost_budget: Some(budget),
            model_use: Some(models),
            expires_at: None,
            extra: std::collections::BTreeMap::default(),
        }
    }

    /// The first issued credential is `test-token-1`, and revoking it makes
    /// its id the only entry in `revoked_ids`.
    #[tokio::test]
    async fn in_memory_provisioner_issues_and_revokes_round_trip() {
        let provisioner = InMemoryCredentialProvisioner::default();
        let ctx = CredentialJobContext {
            job_id: JobId::new(),
            session_id: SessionId::new(),
            principal: Some("p".into()),
            parent_job_id: None,
        };
        let request = lease("tier-fast/*");

        let issued = provisioner.issue(&request, &ctx).await.expect("issue");
        assert_eq!(issued.len(), 1);
        let first = &issued[0];
        assert_eq!(first.value, "test-token-1");

        provisioner.revoke(&first.id).await.expect("revoke");
        assert_eq!(provisioner.revoked_ids(), vec![first.id.clone()]);
    }

    /// A recorded id stays outstanding until `mark_revoked` is called for it.
    #[test]
    fn ledger_records_outstanding_until_revoke() {
        let ledger = CredentialLedger::new();
        let job_id = JobId::new();
        let credential = ProvisionedCredential {
            id: CredentialId::new(7),
            scheme: CredentialScheme::Bearer,
            value: "secret".into(),
            endpoint: "https://example.invalid".into(),
            profile: None,
            constraints: None,
        };
        let credential_id = credential.id.clone();

        ledger.record_issued(&job_id, &[credential]);
        let outstanding = ledger.outstanding_for_job(&job_id);
        assert_eq!(outstanding, vec![credential_id.clone()]);

        ledger.mark_revoked(&job_id, &credential_id);
        assert!(ledger.outstanding_for_job(&job_id).is_empty());
    }

    /// A narrower child pattern is accepted; a wider one is rejected with
    /// `LeaseSubsetViolation`.
    #[test]
    fn child_credential_must_be_subset_of_parent() {
        let parent = lease("tier-fast/*");

        let narrower = lease("tier-fast/small");
        InMemoryCredentialProvisioner::validate_child_constraints(&parent, &narrower)
            .expect("subset");

        let widened = lease("*");
        let outcome = InMemoryCredentialProvisioner::validate_child_constraints(&parent, &widened);
        assert!(matches!(
            outcome,
            Err(ARCPError::LeaseSubsetViolation { .. })
        ));
    }
}
311}