1use std::collections::HashMap;
49use std::io;
50use std::path::{Path, PathBuf};
51use std::sync::Arc;
52use std::time::{Duration, SystemTime, UNIX_EPOCH};
53
54use serde::{Deserialize, Serialize};
55use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
56
57use crate::secrets::{BrokerError, RedactedString, ResolveFuture, SecretBundle, TokenBroker};
58
/// Refresh window a [`FileTokenBroker`] starts with: five minutes (300 seconds).
///
/// A cached token whose remaining lifetime is at most this long is reported
/// as near expiry.
pub const DEFAULT_REFRESH_WINDOW: Duration = Duration::from_secs(300);
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub struct FileTokenRecord {
69 pub access_token: String,
70 pub refresh_token: String,
71 pub expires_at_unix: u64,
75}
76
77impl FileTokenRecord {
78 pub fn expires_at(&self) -> SystemTime {
79 UNIX_EPOCH + Duration::from_secs(self.expires_at_unix)
80 }
81
82 pub fn from_expires_at(
83 access_token: impl Into<String>,
84 refresh_token: impl Into<String>,
85 expires_at: SystemTime,
86 ) -> Self {
87 let expires_at_unix = expires_at
88 .duration_since(UNIX_EPOCH)
89 .map(|d| d.as_secs())
90 .unwrap_or(0);
91 Self {
92 access_token: access_token.into(),
93 refresh_token: refresh_token.into(),
94 expires_at_unix,
95 }
96 }
97}
98
99#[derive(Clone)]
100struct CachedEntry {
101 bundle: Arc<SecretBundle>,
102 expires_at: SystemTime,
103}
104
105pub struct FileTokenBroker {
106 root: PathBuf,
107 refresh_window: Duration,
108 cache: RwLock<HashMap<String, CachedEntry>>,
109 refresh_locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
110}
111
112impl FileTokenBroker {
113 pub fn new(root: impl Into<PathBuf>) -> Self {
114 Self {
115 root: root.into(),
116 refresh_window: DEFAULT_REFRESH_WINDOW,
117 cache: RwLock::new(HashMap::new()),
118 refresh_locks: Mutex::new(HashMap::new()),
119 }
120 }
121
122 pub fn with_refresh_window(mut self, window: Duration) -> Self {
123 self.refresh_window = window;
124 self
125 }
126
127 pub fn refresh_window(&self) -> Duration {
128 self.refresh_window
129 }
130
131 pub fn root(&self) -> &Path {
132 &self.root
133 }
134
135 fn bearer_dir(&self, bearer_id: &str) -> PathBuf {
136 self.root.join(bearer_id)
137 }
138
139 pub async fn put(&self, bearer_id: &str, rec: FileTokenRecord) -> io::Result<()> {
141 let dir = self.bearer_dir(bearer_id);
142 tokio::fs::create_dir_all(&dir).await?;
143 #[cfg(unix)]
144 set_unix_mode(&dir, 0o700).await?;
145
146 write_secret_file(&dir.join("access_token.json"), &rec.access_token).await?;
147 write_secret_file(&dir.join("refresh_token.json"), &rec.refresh_token).await?;
148 write_secret_file(
149 &dir.join("expires_at.json"),
150 &rec.expires_at_unix.to_string(),
151 )
152 .await?;
153
154 let bundle = bundle_from_record(&rec);
155 self.cache.write().await.insert(
156 bearer_id.to_string(),
157 CachedEntry {
158 bundle: Arc::new(bundle),
159 expires_at: rec.expires_at(),
160 },
161 );
162 Ok(())
163 }
164
165 pub async fn read_record(&self, bearer_id: &str) -> io::Result<Option<FileTokenRecord>> {
169 let dir = self.bearer_dir(bearer_id);
170 if !tokio::fs::try_exists(&dir).await? {
171 return Ok(None);
172 }
173 let access_token = read_secret_file(&dir.join("access_token.json")).await?;
174 let refresh_token = read_secret_file(&dir.join("refresh_token.json")).await?;
175 let expires_at_unix = read_secret_file(&dir.join("expires_at.json"))
176 .await?
177 .parse::<u64>()
178 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
179 Ok(Some(FileTokenRecord {
180 access_token,
181 refresh_token,
182 expires_at_unix,
183 }))
184 }
185
186 pub async fn is_near_expiry(&self, bearer_id: &str) -> bool {
191 let now = SystemTime::now();
192 match self.cache.read().await.get(bearer_id) {
193 Some(entry) => entry
194 .expires_at
195 .duration_since(now)
196 .map(|remaining| remaining <= self.refresh_window)
197 .unwrap_or(true),
198 None => false,
199 }
200 }
201
202 pub async fn lock_refresh(&self, bearer_id: &str) -> OwnedMutexGuard<()> {
208 let arc = {
209 let mut map = self.refresh_locks.lock().await;
210 map.entry(bearer_id.to_string())
211 .or_insert_with(|| Arc::new(Mutex::new(())))
212 .clone()
213 };
214 arc.lock_owned().await
215 }
216}
217
218impl TokenBroker for FileTokenBroker {
219 fn resolve<'a>(&'a self, caller_id: Option<&'a str>) -> ResolveFuture<'a> {
220 Box::pin(async move {
221 let Some(id) = caller_id else {
222 return Ok(None);
223 };
224 if let Some(entry) = self.cache.read().await.get(id).cloned() {
225 return Ok(Some(entry.bundle));
226 }
227 match self.read_record(id).await {
228 Ok(Some(rec)) => {
229 let bundle = Arc::new(bundle_from_record(&rec));
230 self.cache.write().await.insert(
231 id.to_string(),
232 CachedEntry {
233 bundle: bundle.clone(),
234 expires_at: rec.expires_at(),
235 },
236 );
237 Ok(Some(bundle))
238 }
239 Ok(None) => Ok(None),
240 Err(e) => Err(BrokerError::Lookup(format!(
241 "file broker read failed for {id}: {e}"
242 ))),
243 }
244 })
245 }
246
247 fn accepted_token_formats(&self) -> &'static [&'static str] {
248 &["opaque"]
249 }
250}
251
252fn bundle_from_record(rec: &FileTokenRecord) -> SecretBundle {
253 let mut b = SecretBundle::new();
254 b.insert(
255 "access_token".to_string(),
256 RedactedString::new(rec.access_token.clone()),
257 );
258 b.insert(
259 "refresh_token".to_string(),
260 RedactedString::new(rec.refresh_token.clone()),
261 );
262 b
263}
264
265async fn write_secret_file(path: &Path, contents: &str) -> io::Result<()> {
266 let json = serde_json::to_string(contents).map_err(io::Error::other)?;
267 tokio::fs::write(path, json.as_bytes()).await?;
268 #[cfg(unix)]
269 set_unix_mode(path, 0o600).await?;
270 Ok(())
271}
272
273async fn read_secret_file(path: &Path) -> io::Result<String> {
274 let raw = tokio::fs::read_to_string(path).await?;
275 serde_json::from_str::<String>(&raw).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
276}
277
278#[cfg(unix)]
279async fn set_unix_mode(path: &Path, mode: u32) -> io::Result<()> {
280 use std::os::unix::fs::PermissionsExt;
281 let perms = std::fs::Permissions::from_mode(mode);
282 tokio::fs::set_permissions(path, perms).await
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Wall-clock instant `secs` seconds from now.
    fn now_plus(secs: u64) -> SystemTime {
        SystemTime::now() + Duration::from_secs(secs)
    }

    /// Creates a temporary directory and a broker rooted in it. Both are
    /// returned so the test keeps the directory handle in scope, exactly as
    /// the inline setup would.
    fn scratch_broker() -> (tempfile::TempDir, FileTokenBroker) {
        let tmp = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(tmp.path());
        (tmp, broker)
    }

    /// Record with the given tokens that expires `ttl_secs` seconds from now.
    fn record_in(access: &str, refresh: &str, ttl_secs: u64) -> FileTokenRecord {
        FileTokenRecord::from_expires_at(access, refresh, now_plus(ttl_secs))
    }

    #[tokio::test]
    async fn put_then_resolve_returns_bundle() {
        let (_tmp, broker) = scratch_broker();
        broker
            .put("agent-A", record_in("acc-A", "ref-A", 3600))
            .await
            .unwrap();

        let resolved = broker.resolve(Some("agent-A")).await.unwrap();
        let bundle = resolved.expect("bundle present");
        assert_eq!(bundle.get("access_token").unwrap().expose(), "acc-A");
        assert_eq!(bundle.get("refresh_token").unwrap().expose(), "ref-A");
    }

    #[tokio::test]
    async fn resolve_unknown_bearer_is_none() {
        let (_tmp, broker) = scratch_broker();
        let resolved = broker.resolve(Some("does-not-exist")).await.unwrap();
        assert!(resolved.is_none());
    }

    #[tokio::test]
    async fn resolve_anonymous_caller_is_none() {
        let (_tmp, broker) = scratch_broker();
        broker
            .put("agent-A", record_in("a", "r", 3600))
            .await
            .unwrap();

        // A stored record must not leak to a caller without an id.
        let resolved = broker.resolve(None).await.unwrap();
        assert!(resolved.is_none());
    }

    #[tokio::test]
    async fn read_record_round_trips() {
        let (_tmp, broker) = scratch_broker();
        let written = record_in("aaa", "rrr", 7200);
        broker.put("agent-A", written.clone()).await.unwrap();

        let read_back = broker.read_record("agent-A").await.unwrap().unwrap();
        assert_eq!(read_back, written);
    }

    #[tokio::test]
    async fn persistence_survives_broker_restart() {
        let tmp = tempfile::tempdir().unwrap();
        {
            // First broker instance writes the record and is then dropped.
            let first = FileTokenBroker::new(tmp.path());
            first
                .put("agent-A", record_in("acc1", "ref1", 3600))
                .await
                .unwrap();
        }

        // Second instance has an empty cache and must load from disk.
        let second = FileTokenBroker::new(tmp.path());
        let bundle = second.resolve(Some("agent-A")).await.unwrap().unwrap();
        assert_eq!(bundle.get("access_token").unwrap().expose(), "acc1");
    }

    #[tokio::test]
    async fn cache_isolated_between_bearers() {
        let (_tmp, broker) = scratch_broker();
        broker
            .put("agent-A", record_in("acc-A", "ref-A", 3600))
            .await
            .unwrap();
        broker
            .put("agent-B", record_in("acc-B", "ref-B", 3600))
            .await
            .unwrap();

        let bundle_a = broker.resolve(Some("agent-A")).await.unwrap().unwrap();
        let bundle_b = broker.resolve(Some("agent-B")).await.unwrap().unwrap();
        assert_eq!(bundle_a.get("access_token").unwrap().expose(), "acc-A");
        assert_eq!(bundle_b.get("access_token").unwrap().expose(), "acc-B");
    }

    #[tokio::test]
    async fn is_near_expiry_true_inside_window() {
        let (_tmp, broker) = scratch_broker();
        let broker = broker.with_refresh_window(Duration::from_secs(300));
        broker
            .put("agent-A", record_in("a", "r", 120))
            .await
            .unwrap();

        assert!(broker.is_near_expiry("agent-A").await);
    }

    #[tokio::test]
    async fn is_near_expiry_false_outside_window() {
        let (_tmp, broker) = scratch_broker();
        let broker = broker.with_refresh_window(Duration::from_secs(300));
        broker
            .put("agent-A", record_in("a", "r", 3600))
            .await
            .unwrap();

        assert!(!broker.is_near_expiry("agent-A").await);
    }

    #[tokio::test]
    async fn is_near_expiry_true_when_already_expired() {
        let (_tmp, broker) = scratch_broker();
        // Expiry at the epoch itself is in the past.
        let expired = FileTokenRecord {
            access_token: "a".into(),
            refresh_token: "r".into(),
            expires_at_unix: 0,
        };
        broker.put("agent-A", expired).await.unwrap();

        assert!(broker.is_near_expiry("agent-A").await);
    }

    #[tokio::test]
    async fn is_near_expiry_unknown_bearer_false() {
        let (_tmp, broker) = scratch_broker();
        assert!(!broker.is_near_expiry("never-seen").await);
    }

    #[tokio::test]
    async fn refresh_lock_serialises_same_bearer() {
        use tokio::sync::Notify;

        let (_tmp, broker) = scratch_broker();
        let broker = Arc::new(broker);

        let holder_started = Arc::new(Notify::new());
        let holder_release = Arc::new(Notify::new());

        // Task A takes the lock, signals that it holds it, then waits to be released.
        let holder = tokio::spawn({
            let broker = Arc::clone(&broker);
            let started = Arc::clone(&holder_started);
            let release = Arc::clone(&holder_release);
            async move {
                let _guard = broker.lock_refresh("agent-A").await;
                started.notify_one();
                release.notified().await;
            }
        });

        holder_started.notified().await;

        // Task B contends for the same bearer's lock.
        let contender = tokio::spawn({
            let broker = Arc::clone(&broker);
            async move {
                let _guard = broker.lock_refresh("agent-A").await;
                "got-it"
            }
        });

        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(
            !contender.is_finished(),
            "task B acquired before A released"
        );

        holder_release.notify_one();
        holder.await.unwrap();
        assert_eq!(contender.await.unwrap(), "got-it");
    }

    #[tokio::test]
    async fn refresh_lock_independent_across_bearers() {
        let (_tmp, broker) = scratch_broker();
        let broker = Arc::new(broker);

        let _guard_a = broker.lock_refresh("agent-A").await;
        let before_b = std::time::Instant::now();
        let _guard_b = broker.lock_refresh("agent-B").await;
        assert!(
            before_b.elapsed() < Duration::from_millis(50),
            "lock_refresh blocked across bearers"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn unix_file_permissions_are_0600() {
        use std::os::unix::fs::PermissionsExt;

        /// Permission bits (lowest nine) of the entry at `path`.
        fn mode_bits(path: &Path) -> u32 {
            std::fs::metadata(path).unwrap().permissions().mode() & 0o777
        }

        let (tmp, broker) = scratch_broker();
        broker
            .put("agent-A", record_in("a", "r", 3600))
            .await
            .unwrap();

        let bearer_dir = tmp.path().join("agent-A");
        let dir_mode = mode_bits(&bearer_dir);
        assert_eq!(
            dir_mode, 0o700,
            "bearer dir should be 0700, got {dir_mode:o}"
        );

        for name in ["access_token.json", "refresh_token.json", "expires_at.json"] {
            let mode = mode_bits(&bearer_dir.join(name));
            assert_eq!(mode, 0o600, "{name} should be 0600, got {mode:o}");
        }
    }
}