Skip to main content

atd_runtime/
file_token_broker.rs

1//! Disk-backed [`TokenBroker`] for adopters that need cross-process
2//! persistence (Phase L.0 deliverable, `docs/issues/`
3//! 2026-04-24-security-capability-tokens-deferred path, and the
4//! `atd#6` GitHub issue).
5//!
6//! ## Layout
7//!
8//! ```text
9//! ${root}/
10//!   ${bearer_id}/                  (mode 0700)
11//!     access_token.json            (mode 0600)
12//!     refresh_token.json           (mode 0600)
13//!     expires_at.json              (mode 0600)
14//! ```
15//!
16//! Each JSON file holds a single string. The split-file shape matches
17//! the per-tenant on-disk layout the adopters already use in
18//! `healthkit_cli` (single-tenant since v1.2.0), so the disk migration
19//! when adopters switch from `InMemoryTokenBroker` to this is purely
20//! additive — drop the existing three files into a `${bearer_id}/`
21//! subdir.
22//!
23//! ## Refresh coordination
24//!
25//! [`FileTokenBroker::lock_refresh`] hands out a per-bearer mutex so
26//! parallel OAuth-refresh attempts for the same bearer can't both
27//! round-trip to the upstream provider. Adopters acquire the guard,
28//! re-check whether refresh is still needed (some other task may have
29//! finished while we were waiting), call their provider, then
30//! `put()` the new record. Other concurrent `resolve()` calls for
31//! the same bearer continue to serve the pre-refresh secrets — the
32//! lock guards the refresh *flow*, not the read path. Drop the guard
33//! to release.
34//!
//! [`FileTokenBroker::is_near_expiry`] is the predicate adopters call
//! to decide whether to take the refresh path at all. It's a no-IO
//! check against the in-memory cache, which is populated by `put` and
//! by the first successful `resolve` of a bearer found on disk.
38//!
39//! ## What this is NOT
40//!
41//! - Not an OAuth client. Vendor-specific refresh flows live in adopter
42//!   code; the broker is purely the persistent token store + refresh
43//!   serialization point.
44//! - Not encrypted at rest. File permissions (0600) are the access
45//!   control. Adopters running on a multi-user host should layer an
46//!   FS-encryption story (`fscrypt`, LUKS, etc.) underneath.
47
48use std::collections::HashMap;
49use std::io;
50use std::path::{Path, PathBuf};
51use std::sync::Arc;
52use std::time::{Duration, SystemTime, UNIX_EPOCH};
53
54use serde::{Deserialize, Serialize};
55use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
56
57use crate::secrets::{BrokerError, RedactedString, ResolveFuture, SecretBundle, TokenBroker};
58
/// Refresh window applied when a broker is built with
/// [`FileTokenBroker::new`]: five minutes.
///
/// [`FileTokenBroker::is_near_expiry`] reports a cached bearer as
/// needing refresh once its remaining lifetime is at most this long.
/// The value matches `healthkit_cli`'s single-tenant setting
/// (`auth::REFRESH_GUARD`).
pub const DEFAULT_REFRESH_WINDOW: Duration = Duration::from_secs(300);
63
64/// On-disk shape persisted to `${root}/${bearer_id}/...`. Public so
65/// adopters can construct records for `put()` and compare what they
66/// read back from `read_record()`.
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub struct FileTokenRecord {
69    pub access_token: String,
70    pub refresh_token: String,
71    /// Unix epoch seconds at which `access_token` ceases to be valid.
72    /// Stored as `u64` (no JSON Number precision loss for any
73    /// realistic OAuth expiry).
74    pub expires_at_unix: u64,
75}
76
77impl FileTokenRecord {
78    pub fn expires_at(&self) -> SystemTime {
79        UNIX_EPOCH + Duration::from_secs(self.expires_at_unix)
80    }
81
82    pub fn from_expires_at(
83        access_token: impl Into<String>,
84        refresh_token: impl Into<String>,
85        expires_at: SystemTime,
86    ) -> Self {
87        let expires_at_unix = expires_at
88            .duration_since(UNIX_EPOCH)
89            .map(|d| d.as_secs())
90            .unwrap_or(0);
91        Self {
92            access_token: access_token.into(),
93            refresh_token: refresh_token.into(),
94            expires_at_unix,
95        }
96    }
97}
98
/// In-memory cache slot for one bearer: the secret bundle served by
/// `resolve()` plus the expiry that `is_near_expiry()` compares
/// against the current time.
#[derive(Clone)]
struct CachedEntry {
    /// Shared bundle handed out to `resolve()` callers.
    bundle: Arc<SecretBundle>,
    /// Access-token expiry, taken from `FileTokenRecord::expires_at()`
    /// when the entry is inserted.
    expires_at: SystemTime,
}
104
/// Disk-backed token store with an in-memory read cache and a
/// per-bearer refresh mutex.
pub struct FileTokenBroker {
    /// Directory under which each bearer gets its own subdirectory.
    root: PathBuf,
    /// Threshold used by `is_near_expiry()`.
    refresh_window: Duration,
    /// Bearer id -> cached bundle and expiry; filled by `put()` and by
    /// `resolve()` on a disk hit.
    cache: RwLock<HashMap<String, CachedEntry>>,
    /// Bearer id -> refresh mutex, created on the first
    /// `lock_refresh()` call for that id.
    refresh_locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}
111
112impl FileTokenBroker {
113    pub fn new(root: impl Into<PathBuf>) -> Self {
114        Self {
115            root: root.into(),
116            refresh_window: DEFAULT_REFRESH_WINDOW,
117            cache: RwLock::new(HashMap::new()),
118            refresh_locks: Mutex::new(HashMap::new()),
119        }
120    }
121
122    pub fn with_refresh_window(mut self, window: Duration) -> Self {
123        self.refresh_window = window;
124        self
125    }
126
127    pub fn refresh_window(&self) -> Duration {
128        self.refresh_window
129    }
130
131    pub fn root(&self) -> &Path {
132        &self.root
133    }
134
135    fn bearer_dir(&self, bearer_id: &str) -> PathBuf {
136        self.root.join(bearer_id)
137    }
138
139    /// Write (or overwrite) the three-file record for `bearer_id`.
140    pub async fn put(&self, bearer_id: &str, rec: FileTokenRecord) -> io::Result<()> {
141        let dir = self.bearer_dir(bearer_id);
142        tokio::fs::create_dir_all(&dir).await?;
143        #[cfg(unix)]
144        set_unix_mode(&dir, 0o700).await?;
145
146        write_secret_file(&dir.join("access_token.json"), &rec.access_token).await?;
147        write_secret_file(&dir.join("refresh_token.json"), &rec.refresh_token).await?;
148        write_secret_file(
149            &dir.join("expires_at.json"),
150            &rec.expires_at_unix.to_string(),
151        )
152        .await?;
153
154        let bundle = bundle_from_record(&rec);
155        self.cache.write().await.insert(
156            bearer_id.to_string(),
157            CachedEntry {
158                bundle: Arc::new(bundle),
159                expires_at: rec.expires_at(),
160            },
161        );
162        Ok(())
163    }
164
165    /// Read the three-file record for `bearer_id` from disk. Does NOT
166    /// touch the in-memory cache; useful for verifying persisted state
167    /// in tests or for adopters that want to force a re-read.
168    pub async fn read_record(&self, bearer_id: &str) -> io::Result<Option<FileTokenRecord>> {
169        let dir = self.bearer_dir(bearer_id);
170        if !tokio::fs::try_exists(&dir).await? {
171            return Ok(None);
172        }
173        let access_token = read_secret_file(&dir.join("access_token.json")).await?;
174        let refresh_token = read_secret_file(&dir.join("refresh_token.json")).await?;
175        let expires_at_unix = read_secret_file(&dir.join("expires_at.json"))
176            .await?
177            .parse::<u64>()
178            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
179        Ok(Some(FileTokenRecord {
180            access_token,
181            refresh_token,
182            expires_at_unix,
183        }))
184    }
185
186    /// Cache-only predicate. Returns true when the cached `expires_at`
187    /// for this bearer is `<= now + refresh_window`. Returns false if
188    /// the bearer is not in cache (the caller should `resolve()` first
189    /// to populate it, or treat unknown as "not near expiry").
190    pub async fn is_near_expiry(&self, bearer_id: &str) -> bool {
191        let now = SystemTime::now();
192        match self.cache.read().await.get(bearer_id) {
193            Some(entry) => entry
194                .expires_at
195                .duration_since(now)
196                .map(|remaining| remaining <= self.refresh_window)
197                .unwrap_or(true),
198            None => false,
199        }
200    }
201
202    /// Acquire the per-bearer refresh mutex. Other concurrent acquirers
203    /// for the same `bearer_id` block until this guard drops; acquirers
204    /// for *different* bearers are unaffected. `resolve()` is NOT
205    /// gated on this lock — readers continue to serve pre-refresh
206    /// secrets.
207    pub async fn lock_refresh(&self, bearer_id: &str) -> OwnedMutexGuard<()> {
208        let arc = {
209            let mut map = self.refresh_locks.lock().await;
210            map.entry(bearer_id.to_string())
211                .or_insert_with(|| Arc::new(Mutex::new(())))
212                .clone()
213        };
214        arc.lock_owned().await
215    }
216}
217
218impl TokenBroker for FileTokenBroker {
219    fn resolve<'a>(&'a self, caller_id: Option<&'a str>) -> ResolveFuture<'a> {
220        Box::pin(async move {
221            let Some(id) = caller_id else {
222                return Ok(None);
223            };
224            if let Some(entry) = self.cache.read().await.get(id).cloned() {
225                return Ok(Some(entry.bundle));
226            }
227            match self.read_record(id).await {
228                Ok(Some(rec)) => {
229                    let bundle = Arc::new(bundle_from_record(&rec));
230                    self.cache.write().await.insert(
231                        id.to_string(),
232                        CachedEntry {
233                            bundle: bundle.clone(),
234                            expires_at: rec.expires_at(),
235                        },
236                    );
237                    Ok(Some(bundle))
238                }
239                Ok(None) => Ok(None),
240                Err(e) => Err(BrokerError::Lookup(format!(
241                    "file broker read failed for {id}: {e}"
242                ))),
243            }
244        })
245    }
246
247    fn accepted_token_formats(&self) -> &'static [&'static str] {
248        &["opaque"]
249    }
250}
251
252fn bundle_from_record(rec: &FileTokenRecord) -> SecretBundle {
253    let mut b = SecretBundle::new();
254    b.insert(
255        "access_token".to_string(),
256        RedactedString::new(rec.access_token.clone()),
257    );
258    b.insert(
259        "refresh_token".to_string(),
260        RedactedString::new(rec.refresh_token.clone()),
261    );
262    b
263}
264
265async fn write_secret_file(path: &Path, contents: &str) -> io::Result<()> {
266    let json = serde_json::to_string(contents).map_err(io::Error::other)?;
267    tokio::fs::write(path, json.as_bytes()).await?;
268    #[cfg(unix)]
269    set_unix_mode(path, 0o600).await?;
270    Ok(())
271}
272
273async fn read_secret_file(path: &Path) -> io::Result<String> {
274    let raw = tokio::fs::read_to_string(path).await?;
275    serde_json::from_str::<String>(&raw).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
276}
277
278#[cfg(unix)]
279async fn set_unix_mode(path: &Path, mode: u32) -> io::Result<()> {
280    use std::os::unix::fs::PermissionsExt;
281    let perms = std::fs::Permissions::from_mode(mode);
282    tokio::fs::set_permissions(path, perms).await
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Wall-clock time `secs` seconds from now.
    fn now_plus(secs: u64) -> SystemTime {
        SystemTime::now() + Duration::from_secs(secs)
    }

    /// A record stored with `put` is served back by `resolve`.
    #[tokio::test]
    async fn put_then_resolve_returns_bundle() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        broker
            .put(
                "agent-A",
                FileTokenRecord::from_expires_at("acc-A", "ref-A", now_plus(3600)),
            )
            .await
            .unwrap();
        let bundle = broker
            .resolve(Some("agent-A"))
            .await
            .unwrap()
            .expect("bundle present");
        assert_eq!(bundle.get("access_token").unwrap().expose(), "acc-A");
        assert_eq!(bundle.get("refresh_token").unwrap().expose(), "ref-A");
    }

    /// A bearer with nothing on disk resolves to `None`, not an error.
    #[tokio::test]
    async fn resolve_unknown_bearer_is_none() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        assert!(
            broker
                .resolve(Some("does-not-exist"))
                .await
                .unwrap()
                .is_none()
        );
    }

    /// A caller without an id never gets a bundle, even when records exist.
    #[tokio::test]
    async fn resolve_anonymous_caller_is_none() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        broker
            .put(
                "agent-A",
                FileTokenRecord::from_expires_at("a", "r", now_plus(3600)),
            )
            .await
            .unwrap();
        assert!(broker.resolve(None).await.unwrap().is_none());
    }

    /// `read_record` returns exactly what `put` persisted.
    #[tokio::test]
    async fn read_record_round_trips() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        let rec = FileTokenRecord::from_expires_at("aaa", "rrr", now_plus(7200));
        broker.put("agent-A", rec.clone()).await.unwrap();
        let back = broker.read_record("agent-A").await.unwrap().unwrap();
        assert_eq!(back, rec);
    }

    /// A fresh broker on the same root serves records written by an earlier one.
    #[tokio::test]
    async fn persistence_survives_broker_restart() {
        let dir = tempfile::tempdir().unwrap();
        {
            let b1 = FileTokenBroker::new(dir.path());
            b1.put(
                "agent-A",
                FileTokenRecord::from_expires_at("acc1", "ref1", now_plus(3600)),
            )
            .await
            .unwrap();
        }
        let b2 = FileTokenBroker::new(dir.path());
        let bundle = b2.resolve(Some("agent-A")).await.unwrap().unwrap();
        assert_eq!(bundle.get("access_token").unwrap().expose(), "acc1");
    }

    /// Two bearers never see each other's secrets.
    #[tokio::test]
    async fn cache_isolated_between_bearers() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        broker
            .put(
                "agent-A",
                FileTokenRecord::from_expires_at("acc-A", "ref-A", now_plus(3600)),
            )
            .await
            .unwrap();
        broker
            .put(
                "agent-B",
                FileTokenRecord::from_expires_at("acc-B", "ref-B", now_plus(3600)),
            )
            .await
            .unwrap();
        let a = broker.resolve(Some("agent-A")).await.unwrap().unwrap();
        let b = broker.resolve(Some("agent-B")).await.unwrap().unwrap();
        assert_eq!(a.get("access_token").unwrap().expose(), "acc-A");
        assert_eq!(b.get("access_token").unwrap().expose(), "acc-B");
    }

    #[tokio::test]
    async fn is_near_expiry_true_inside_window() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path()).with_refresh_window(Duration::from_secs(300));
        broker
            .put(
                "agent-A",
                FileTokenRecord::from_expires_at("a", "r", now_plus(120)),
            )
            .await
            .unwrap();
        assert!(broker.is_near_expiry("agent-A").await);
    }

    #[tokio::test]
    async fn is_near_expiry_false_outside_window() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path()).with_refresh_window(Duration::from_secs(300));
        broker
            .put(
                "agent-A",
                FileTokenRecord::from_expires_at("a", "r", now_plus(3600)),
            )
            .await
            .unwrap();
        assert!(!broker.is_near_expiry("agent-A").await);
    }

    #[tokio::test]
    async fn is_near_expiry_true_when_already_expired() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        // expires_at = UNIX_EPOCH; firmly in the past.
        broker
            .put(
                "agent-A",
                FileTokenRecord {
                    access_token: "a".into(),
                    refresh_token: "r".into(),
                    expires_at_unix: 0,
                },
            )
            .await
            .unwrap();
        assert!(broker.is_near_expiry("agent-A").await);
    }

    #[tokio::test]
    async fn is_near_expiry_unknown_bearer_false() {
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        assert!(!broker.is_near_expiry("never-seen").await);
    }

    #[tokio::test]
    async fn refresh_lock_serialises_same_bearer() {
        use tokio::sync::Notify;

        let dir = tempfile::tempdir().unwrap();
        let broker = Arc::new(FileTokenBroker::new(dir.path()));

        let started = Arc::new(Notify::new());
        let release = Arc::new(Notify::new());
        let broker_a = broker.clone();
        let started_a = started.clone();
        let release_a = release.clone();

        // Task A grabs the lock and waits to be released.
        let task_a = tokio::spawn(async move {
            let _guard = broker_a.lock_refresh("agent-A").await;
            started_a.notify_one();
            release_a.notified().await;
        });

        started.notified().await;

        // Task B must NOT acquire while A holds.
        let broker_b = broker.clone();
        let task_b = tokio::spawn(async move {
            let _guard = broker_b.lock_refresh("agent-A").await;
            "got-it"
        });

        // Give B a real shot at acquiring; it shouldn't.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(!task_b.is_finished(), "task B acquired before A released");

        release.notify_one();
        task_a.await.unwrap();
        assert_eq!(task_b.await.unwrap(), "got-it");
    }

    #[tokio::test]
    async fn refresh_lock_independent_across_bearers() {
        let dir = tempfile::tempdir().unwrap();
        let broker = Arc::new(FileTokenBroker::new(dir.path()));

        let _g_a = broker.lock_refresh("agent-A").await;
        // Holding A's lock must not block B. A timeout turns a
        // regression into a test failure instead of a hung test, and
        // avoids a tight wall-clock bound that a loaded machine could
        // miss even though nothing blocked.
        let _g_b = tokio::time::timeout(Duration::from_secs(5), broker.lock_refresh("agent-B"))
            .await
            .expect("lock_refresh blocked across bearers");
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn unix_file_permissions_are_0600() {
        use std::os::unix::fs::PermissionsExt;
        let dir = tempfile::tempdir().unwrap();
        let broker = FileTokenBroker::new(dir.path());
        broker
            .put(
                "agent-A",
                FileTokenRecord::from_expires_at("a", "r", now_plus(3600)),
            )
            .await
            .unwrap();
        let bearer_dir = dir.path().join("agent-A");
        let dir_mode = std::fs::metadata(&bearer_dir).unwrap().permissions().mode() & 0o777;
        assert_eq!(
            dir_mode, 0o700,
            "bearer dir should be 0700, got {dir_mode:o}"
        );
        for name in ["access_token.json", "refresh_token.json", "expires_at.json"] {
            let mode = std::fs::metadata(bearer_dir.join(name))
                .unwrap()
                .permissions()
                .mode()
                & 0o777;
            assert_eq!(mode, 0o600, "{name} should be 0600, got {mode:o}");
        }
    }
}