axess-core 0.2.0

Core implementation for the axess library. Session state machine, multi-factor authentication engine, Cedar Policy evaluation, and pluggable storage backends. Use the `axess` facade crate unless you need direct access to internals.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
//! Cluster-tier (Valkey/Redis) cache decorator for [`RequestEntityProvider`].
//!
//! Wraps any [`RequestEntityProvider`] in a network-shared cache so that
//! repeat authorization checks across pods reuse the same entity build.
//! Typically composed under [`super::MokaEntityCache`] as the L2 tier.

use std::sync::Arc;
use std::time::Duration;

use cedar_policy::{Entities, EntityUid};
use fred::prelude::*;
use tracing::warn;

use crate::authz::error::AuthzError;
use crate::authz::provider::RequestEntityProvider;
use crate::health::{HealthCheck, HealthStatus};
use crate::session::AuthSession;

const DEFAULT_PREFIX: &str = "axess";
const DEFAULT_TTL_SECS: u64 = 60;

/// Cluster-tier cache decorator over a [`RequestEntityProvider`].
///
/// # Tier
///
/// Network-shared cache via [`fred`] (Valkey/Redis). Sub-millisecond
/// lookup, cluster-wide scope. Use as the L2 tier; combine under an
/// in-process [`super::MokaEntityCache`] L1 for sub-microsecond hot-path
/// hits.
///
/// # Construction
///
/// ```rust,ignore
/// use axess_core::authz::cache::{MokaEntityCache, ValkeyEntityCache};
/// use std::sync::Arc;
/// use std::time::Duration;
///
/// let provider = MyEntityProvider::new(db);
/// let provider = ValkeyEntityCache::new(provider, valkey_client)
///     .with_ttl(Duration::from_secs(60));
/// let provider = MokaEntityCache::new(provider).with_ttl(Duration::from_secs(5));
/// let provider: Arc<dyn axess_core::authz::RequestEntityProvider> = Arc::new(provider);
/// ```
///
/// # Wire format
///
/// Cedar `Entities` are serialised as JSON via
/// `Entities::to_json_value` / `Entities::from_json_value` and stored as
/// UTF-8 bytes in Valkey. JSON is the only stable serialization format
/// `cedar_policy::Entities` exposes today; round-trip preserves entity
/// attributes, parents, and types.
///
/// # Cache key
///
/// `{prefix}:authz-entities:{principal}|{resource}|{action}`: the three
/// `EntityUid` `Display` representations joined with `|`. Cedar UIDs are
/// already serialisable text and unique enough that no extra hashing is
/// needed.
///
/// # Cross-pod invalidation
///
/// Apps that need cross-pod invalidation publish on the channel
/// `{prefix}:authz-entities-invalidated:{principal}` and have each pod
/// subscribe at startup. Because the cache key includes the principal
/// UID first, listeners can match by prefix or wildcard. axess does not
/// auto-subscribe; invalidation policy is application-specific (which
/// data mutations require which scope of cache eviction). See the design
/// doc for the recommended pattern.
///
/// # Errors
///
/// Errors from the inner provider are NOT cached. Errors from Valkey
/// (network, serialization) degrade silently to "cache miss + no
/// populate": the inner provider still answers correctly; the next
/// request retries. axess emits `tracing::warn!` on cache I/O failure
/// so operators can detect Valkey outages without authorization breaking.
pub struct ValkeyEntityCache<P>
where
    P: RequestEntityProvider,
{
    inner: P,
    client: Client,
    prefix: Arc<str>,
    ttl: Duration,
}

impl<P> ValkeyEntityCache<P>
where
    P: RequestEntityProvider,
{
    /// Wrap `inner` with a Valkey cache using the default prefix
    /// (`axess`) and 60-second TTL.
    pub fn new(inner: P, client: Client) -> Self {
        Self {
            inner,
            client,
            prefix: DEFAULT_PREFIX.into(),
            ttl: Duration::from_secs(DEFAULT_TTL_SECS),
        }
    }

    /// Construct with explicit prefix and TTL.
    pub fn with_options(
        inner: P,
        client: Client,
        prefix: impl Into<Arc<str>>,
        ttl: Duration,
    ) -> Self {
        Self {
            inner,
            client,
            prefix: prefix.into(),
            ttl,
        }
    }

    /// Fluent: set TTL.
    pub fn with_ttl(mut self, ttl: Duration) -> Self {
        self.ttl = ttl;
        self
    }

    /// Fluent: set key prefix.
    pub fn with_prefix(mut self, prefix: impl Into<Arc<str>>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Build the cache key for `(principal, tenant, resource, action)`.
    /// Tenant from session disambiguates entries for the same user
    /// acting in different tenants (membership model).
    fn cache_key(
        &self,
        principal: &EntityUid,
        tenant: Option<&str>,
        resource: &EntityUid,
        action: &EntityUid,
    ) -> String {
        format!(
            "{}:authz-entities:{principal}|{t}|{resource}|{action}",
            self.prefix,
            t = tenant.unwrap_or(""),
        )
    }

    /// Invalidate the cache entry for `(principal, tenant, resource, action)`.
    ///
    /// Operates on the local Valkey state only; for cross-pod propagation,
    /// publish on `{prefix}:authz-entities-invalidated:{principal}` and
    /// have each pod subscribe.
    pub async fn invalidate(
        &self,
        principal: &EntityUid,
        tenant: Option<&str>,
        resource: &EntityUid,
        action: &EntityUid,
    ) {
        let key = self.cache_key(principal, tenant, resource, action);
        let _: Result<(), _> = self.client.del(&key).await;
    }

    /// Borrow the inner provider for read access.
    pub fn inner(&self) -> &P {
        &self.inner
    }

    /// Glob-pattern prefix matching every cache key this decorator
    /// writes. Used by the scope-invalidation helpers to scope SCAN
    /// to this decorator's keys instead of the whole Valkey instance.
    fn key_prefix_pattern(&self) -> String {
        format!("{}:authz-entities:*", self.prefix)
    }

    /// SCAN + DEL every key matching `pattern`. Iterates with a small
    /// COUNT hint so a populated cache doesn't block the Valkey
    /// connection on a single round-trip.
    ///
    /// Uses cursor-based iteration directly (no `Stream` adapter) to
    /// avoid pulling `futures-util` into axess-core's transitive
    /// surface for one call site.
    async fn scan_and_delete(&self, pattern: &str) -> Result<(), fred::error::Error> {
        let mut cursor: String = "0".to_string();
        loop {
            let (next_cursor, batch): (String, Vec<String>) = self
                .client
                .scan_page(cursor.clone(), pattern, Some(256), None)
                .await?;
            if !batch.is_empty() {
                let _: () = self.client.del(batch).await?;
            }
            if next_cursor == "0" {
                break;
            }
            cursor = next_cursor;
        }
        Ok(())
    }

    /// Drop every entry under this decorator's prefix.
    ///
    /// Runs a SCAN over `{prefix}:authz-entities:*` and DELs the
    /// matching keys in batches. Cheap for small caches, O(N) over
    /// the live key set otherwise.
    pub async fn invalidate_all(&self) -> Result<(), fred::error::Error> {
        let pattern = self.key_prefix_pattern();
        self.scan_and_delete(&pattern).await
    }

    /// Drop every cache entry whose `principal` segment matches.
    ///
    /// Uses Valkey's `SCAN MATCH` with the per-key encoding
    /// `{prefix}:authz-entities:{principal}|{tenant}|{resource}|{action}`,
    /// so the glob pattern `{prefix}:authz-entities:{principal}|*`
    /// touches only matching entries; no whole-keyspace scan.
    pub async fn invalidate_principal(
        &self,
        principal: &EntityUid,
    ) -> Result<(), fred::error::Error> {
        let pattern = format!("{}:authz-entities:{principal}|*", self.prefix);
        self.scan_and_delete(&pattern).await
    }

    /// Drop every cache entry whose `tenant` segment matches.
    ///
    /// Implementation note: the cache key encodes principal **before**
    /// tenant, so a glob like `*|{tenant}|*` would match across the
    /// whole `*` (principal) range, which Valkey supports but
    /// requires care if the principal stringification can contain
    /// `|`. Cedar EntityUid strings do not contain `|` in practice,
    /// so the pattern is safe; if you adopt a custom EntityUid format
    /// that does, prefer a per-tenant key prefix instead.
    pub async fn invalidate_tenant(&self, tenant: &str) -> Result<(), fred::error::Error> {
        let pattern = format!("{}:authz-entities:*|{tenant}|*", self.prefix);
        self.scan_and_delete(&pattern).await
    }
}

impl<P> super::invalidator::CacheInvalidator for ValkeyEntityCache<P>
where
    P: RequestEntityProvider + 'static,
{
    type Error = fred::error::Error;

    async fn invalidate_principal(&self, principal: &EntityUid) -> Result<(), Self::Error> {
        ValkeyEntityCache::invalidate_principal(self, principal).await
    }

    async fn invalidate_tenant(&self, tenant: &str) -> Result<(), Self::Error> {
        ValkeyEntityCache::invalidate_tenant(self, tenant).await
    }

    async fn invalidate_all(&self) -> Result<(), Self::Error> {
        ValkeyEntityCache::invalidate_all(self).await
    }
}

impl<P> RequestEntityProvider for ValkeyEntityCache<P>
where
    P: RequestEntityProvider,
{
    fn entities_for<'a>(
        &'a self,
        session: &'a AuthSession,
        principal: &'a EntityUid,
        resource: &'a EntityUid,
        action: &'a EntityUid,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Entities, AuthzError>> + Send + 'a>,
    > {
        Box::pin(async move {
            let tenant = session.tenant_id().await;
            let tenant_str = tenant.as_ref().map(|t| t.to_string());
            let key = self.cache_key(principal, tenant_str.as_deref(), resource, action);

            // Cache lookup. Failures (network, deser) degrade to miss
            // with a warn; never break authorization for a cache fault.
            match self.client.get::<Option<String>, _>(&key).await {
                Ok(Some(json_str)) => match serde_json::from_str::<serde_json::Value>(&json_str) {
                    Ok(json) => match Entities::from_json_value(json, None) {
                        Ok(entities) => return Ok(entities),
                        Err(e) => {
                            warn!(?e, key = %key, "valkey cache: Entities::from_json_value failed; treating as miss");
                        }
                    },
                    Err(e) => {
                        warn!(?e, key = %key, "valkey cache: serde_json parse failed; treating as miss");
                    }
                },
                Ok(None) => {} // miss; fall through
                Err(e) => {
                    warn!(?e, key = %key, "valkey cache: GET failed; treating as miss");
                }
            }

            // Build via inner provider.
            let entities = self
                .inner
                .entities_for(session, principal, resource, action)
                .await?;

            // Best-effort populate. Failure to write is a warn, not a hard
            // error; the request already has its answer.
            match entities.to_json_value() {
                Ok(json) => match serde_json::to_string(&json) {
                    Ok(s) => {
                        let ttl_secs = self.ttl.as_secs().min(i64::MAX as u64) as i64;
                        let res: Result<(), _> = self
                            .client
                            .set(&key, s, Some(Expiration::EX(ttl_secs)), None, false)
                            .await;
                        if let Err(e) = res {
                            warn!(?e, key = %key, "valkey cache: SET failed");
                        }
                    }
                    Err(e) => warn!(?e, key = %key, "valkey cache: serde_json serialize failed"),
                },
                Err(e) => warn!(?e, key = %key, "valkey cache: Entities::to_json_value failed"),
            }

            Ok(entities)
        })
    }
}

// ── HealthCheck ──────────────────────────────────────────────────────────────

/// Bounded health check: round-trips `PING` to Valkey with a 2-second
/// ceiling. The cache itself is fail-soft (errors degrade to
/// cache-miss + warn-log, not a denied authorization), so an
/// unhealthy result here is operational information rather than a
/// circuit-breaker trigger; adopters running readiness probes can
/// surface "Valkey down → no shared L2 cache, falling back to direct
/// provider" as a degraded-but-serving state.
impl<P> HealthCheck for ValkeyEntityCache<P>
where
    P: RequestEntityProvider + Send + Sync,
{
    fn check(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = HealthStatus> + Send + '_>> {
        Box::pin(async {
            match tokio::time::timeout(
                std::time::Duration::from_secs(2),
                self.client.ping::<()>(None),
            )
            .await
            {
                Ok(Ok(_)) => HealthStatus::Healthy,
                Ok(Err(e)) => HealthStatus::Unhealthy(format!("valkey cache PING failed: {e}")),
                Err(_) => HealthStatus::Unhealthy("valkey cache PING timeout (2s)".into()),
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use cedar_policy::Entities;
    use std::str::FromStr;

    struct StubProvider;
    impl RequestEntityProvider for StubProvider {
        fn entities_for<'a>(
            &'a self,
            _session: &'a AuthSession,
            _principal: &'a EntityUid,
            _resource: &'a EntityUid,
            _action: &'a EntityUid,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<Entities, AuthzError>> + Send + 'a>,
        > {
            Box::pin(async { Ok(Entities::empty()) })
        }
    }

    fn uid(s: &str) -> EntityUid {
        EntityUid::from_str(s).expect("valid EntityUid literal")
    }

    fn cache() -> ValkeyEntityCache<StubProvider> {
        ValkeyEntityCache::new(StubProvider, Client::default())
    }

    #[test]
    fn cache_key_distinguishes_inputs_and_carries_components() {
        let c = cache();
        let p = uid("App::User::\"alice\"");
        let r1 = uid("App::Doc::\"doc-1\"");
        let r2 = uid("App::Doc::\"doc-2\"");
        let a = uid("App::Action::\"View\"");

        let k1 = c.cache_key(&p, Some("t1"), &r1, &a);
        let k2 = c.cache_key(&p, Some("t1"), &r2, &a);
        let k3 = c.cache_key(&p, Some("t2"), &r1, &a);
        let k_none = c.cache_key(&p, None, &r1, &a);

        assert!(
            k1.contains("alice") && k1.contains("doc-1") && k1.contains("View"),
            "key must carry principal/resource/action: {k1}"
        );
        assert!(
            k1.starts_with(&format!("{DEFAULT_PREFIX}:authz-entities:")),
            "key must carry prefix namespace: {k1}"
        );
        assert_ne!(k1, k2, "different resources must yield different keys");
        assert_ne!(k1, k3, "different tenants must yield different keys");
        assert_ne!(
            k1, k_none,
            "Some(tenant) and None must yield different keys"
        );
        assert!(
            !k1.is_empty() && k1 != "xyzzy",
            "kills `String::new()` and `xyzzy` mutations"
        );
    }
}