1use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6
7use async_trait::async_trait;
8
9use arcp_core::error::ARCPError;
10use arcp_core::ids::{JobId, SessionId};
11use arcp_core::messages::LeaseRequest;
12
13pub use arcp_core::messages::{CredentialId, CredentialScheme, ProvisionedCredential};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct CredentialJobContext {
20 pub job_id: JobId,
22 pub session_id: SessionId,
24 pub principal: Option<String>,
26 pub parent_job_id: Option<JobId>,
28}
29
30#[async_trait]
32pub trait CredentialProvisioner: Send + Sync {
33 async fn issue(
40 &self,
41 lease: &LeaseRequest,
42 ctx: &CredentialJobContext,
43 ) -> Result<Vec<ProvisionedCredential>, ARCPError>;
44
45 async fn revoke(&self, id: &CredentialId) -> Result<(), ARCPError>;
51}
52
53#[derive(Default)]
55pub struct InMemoryCredentialProvisioner {
56 counter: AtomicU64,
57 issued: Mutex<Vec<ProvisionedCredential>>,
58 revoked: Mutex<Vec<CredentialId>>,
59}
60
61impl std::fmt::Debug for InMemoryCredentialProvisioner {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("InMemoryCredentialProvisioner")
64 .finish_non_exhaustive()
65 }
66}
67
68impl InMemoryCredentialProvisioner {
69 #[must_use]
71 pub fn issued_credentials(&self) -> Vec<ProvisionedCredential> {
72 self.issued
73 .lock()
74 .map_or_else(|_| Vec::new(), |g| g.clone())
75 }
76
77 #[must_use]
79 pub fn revoked_ids(&self) -> Vec<CredentialId> {
80 self.revoked
81 .lock()
82 .map_or_else(|_| Vec::new(), |g| g.clone())
83 }
84
85 pub fn validate_child_constraints(
91 parent: &LeaseRequest,
92 child: &LeaseRequest,
93 ) -> Result<(), ARCPError> {
94 if let Some(violation) = parent.subset_violation(child, &HashMap::new()) {
95 return Err(ARCPError::LeaseSubsetViolation {
96 detail: format!("{violation:?}"),
97 });
98 }
99 Ok(())
100 }
101}
102
103#[async_trait]
104impl CredentialProvisioner for InMemoryCredentialProvisioner {
105 async fn issue(
106 &self,
107 lease: &LeaseRequest,
108 _ctx: &CredentialJobContext,
109 ) -> Result<Vec<ProvisionedCredential>, ARCPError> {
110 let n = self.counter.fetch_add(1, Ordering::AcqRel) + 1;
111 let credential = ProvisionedCredential {
112 id: CredentialId::new(n),
113 scheme: CredentialScheme::Bearer,
114 value: format!("test-token-{n}"),
115 endpoint: "https://example.invalid/llm".into(),
116 profile: Some("test".into()),
117 constraints: Some(lease.clone()),
118 };
119 self.issued
120 .lock()
121 .map_err(|_| ARCPError::Internal {
122 detail: "credential provisioner mutex poisoned".into(),
123 })?
124 .push(credential.clone());
125 Ok(vec![credential])
126 }
127
128 async fn revoke(&self, id: &CredentialId) -> Result<(), ARCPError> {
129 self.revoked
130 .lock()
131 .map_err(|_| ARCPError::Internal {
132 detail: "credential provisioner mutex poisoned".into(),
133 })?
134 .push(id.clone());
135 Ok(())
136 }
137}
138
139#[derive(Clone, Default)]
141pub struct CredentialLedger {
142 inner: Arc<dashmap::DashMap<JobId, Vec<CredentialId>>>,
143}
144
145impl std::fmt::Debug for CredentialLedger {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct("CredentialLedger")
148 .field("jobs", &self.inner.len())
149 .finish()
150 }
151}
152
153impl CredentialLedger {
154 #[must_use]
156 pub fn new() -> Self {
157 Self::default()
158 }
159
160 pub fn record_issued(&self, job_id: &JobId, credentials: &[ProvisionedCredential]) {
162 if credentials.is_empty() {
163 return;
164 }
165 let ids = credentials.iter().map(|c| c.id.clone()).collect::<Vec<_>>();
166 self.inner
167 .entry(job_id.clone())
168 .and_modify(|existing| existing.extend(ids.clone()))
169 .or_insert(ids);
170 }
171
172 #[must_use]
174 pub fn outstanding_for_job(&self, job_id: &JobId) -> Vec<CredentialId> {
175 self.inner
176 .get(job_id)
177 .map_or_else(Vec::new, |entry| entry.value().clone())
178 }
179
180 pub fn mark_revoked(&self, job_id: &JobId, credential_id: &CredentialId) {
182 if let Some(mut ids) = self.inner.get_mut(job_id) {
183 ids.retain(|id| id != credential_id);
184 if ids.is_empty() {
185 drop(ids);
186 self.inner.remove(job_id);
187 }
188 }
189 }
190}
191
192pub async fn revoke_all_for_job(
199 ledger: &CredentialLedger,
200 provisioner: &Arc<dyn CredentialProvisioner>,
201 job_id: &JobId,
202) -> Result<(), ARCPError> {
203 let mut last_error = None;
204 for id in ledger.outstanding_for_job(job_id) {
205 let mut revoked = false;
206 for attempt in 0..3 {
207 match provisioner.revoke(&id).await {
208 Ok(()) => {
209 ledger.mark_revoked(job_id, &id);
210 revoked = true;
211 break;
212 }
213 Err(e) => {
214 last_error = Some(e);
215 let delay = 10_u64.saturating_mul(1 << attempt);
216 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
217 }
218 }
219 }
220 if !revoked {
221 tracing::warn!(credential_id = %id, job_id = %job_id, "credential revocation failed");
222 }
223 }
224 if ledger.outstanding_for_job(job_id).is_empty() {
225 Ok(())
226 } else {
227 Err(last_error.unwrap_or_else(|| ARCPError::Unavailable {
228 detail: "credential revocation failed".into(),
229 }))
230 }
231}
232
// Test code may panic freely; the panic-related lints are relaxed here only.
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc
)]
mod tests {
    use super::*;
    use arcp_core::messages::{CostBudget, CostBudgetAmount, ModelUse};

    /// Builds a lease with a 1 USD budget, a single model pattern and no expiry.
    fn lease(pattern: &str) -> LeaseRequest {
        let one_dollar = CostBudgetAmount {
            currency: "USD".into(),
            amount: 1.0,
        };
        let models = ModelUse {
            patterns: vec![pattern.into()],
        };
        LeaseRequest {
            cost_budget: Some(CostBudget {
                amounts: vec![one_dollar],
            }),
            model_use: Some(models),
            expires_at: None,
            extra: std::collections::BTreeMap::default(),
        }
    }

    /// Issuing yields exactly one numbered token, and revoking it is recorded.
    #[tokio::test]
    async fn in_memory_provisioner_issues_and_revokes_round_trip() {
        let provisioner = InMemoryCredentialProvisioner::default();
        let ctx = CredentialJobContext {
            job_id: JobId::new(),
            session_id: SessionId::new(),
            principal: Some("p".into()),
            parent_job_id: None,
        };
        let requested = lease("tier-fast/*");
        let creds = provisioner.issue(&requested, &ctx).await.expect("issue");
        assert_eq!(creds.len(), 1);
        assert_eq!(creds[0].value, "test-token-1");

        let issued_id = creds[0].id.clone();
        provisioner.revoke(&issued_id).await.expect("revoke");
        assert_eq!(provisioner.revoked_ids(), vec![issued_id]);
    }

    /// A recorded credential stays outstanding until it is marked revoked.
    #[test]
    fn ledger_records_outstanding_until_revoke() {
        let ledger = CredentialLedger::new();
        let job_id = JobId::new();
        let credential = ProvisionedCredential {
            id: CredentialId::new(7),
            scheme: CredentialScheme::Bearer,
            value: "secret".into(),
            endpoint: "https://example.invalid".into(),
            profile: None,
            constraints: None,
        };

        ledger.record_issued(&job_id, std::slice::from_ref(&credential));
        let outstanding = ledger.outstanding_for_job(&job_id);
        assert_eq!(outstanding, vec![credential.id.clone()]);

        ledger.mark_revoked(&job_id, &credential.id);
        assert!(ledger.outstanding_for_job(&job_id).is_empty());
    }

    /// A narrower child lease is accepted; a wider one is rejected.
    #[test]
    fn child_credential_must_be_subset_of_parent() {
        let parent = lease("tier-fast/*");

        let child = lease("tier-fast/small");
        InMemoryCredentialProvisioner::validate_child_constraints(&parent, &child).expect("subset");

        let widened = lease("*");
        let outcome = InMemoryCredentialProvisioner::validate_child_constraints(&parent, &widened);
        assert!(matches!(
            outcome,
            Err(ARCPError::LeaseSubsetViolation { .. })
        ));
    }
}