autumn-web 0.4.0

An opinionated, convention-over-configuration web framework for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! ISR coordination backends.
//!
//! Controls which replica may regenerate a given route within a revalidation
//! window. The [`LocalIsrCoordinator`] is a true no-op: all local
//! deduplication is already handled by the per-route `AtomicBool` in
//! `StaticFileLayer`, so a single process never needs an additional lock.
//! The [`PostgresIsrCoordinator`] uses `pg_try_advisory_lock` to ensure that
//! at most one replica regenerates a given route per revalidation window
//! across the entire fleet.
//!
//! ## Local vs multi-replica contract
//!
//! | Deployment | Recommended coordinator | Guarantee |
//! |------------|-------------------------|-----------|
//! | Single process / dev | `LocalIsrCoordinator` (default) | At most one in-flight task per route per process |
//! | Multi-replica (shared `dist/`) | `PostgresIsrCoordinator` | At most one regeneration per route per revalidation window across the fleet |
//! | Read-only `dist/` (build-time only) | Disable ISR (`revalidate` = None) | No runtime writes |
//!
//! ## Atomic writes
//!
//! Regardless of coordinator, regeneration always writes to a `.tmp` file
//! then renames atomically so a reader never observes a partially written
//! page.

use std::future::Future;
use std::pin::Pin;

use sha2::{Digest as _, Sha256};

/// Boxed async future returned by coordinator operations.
pub type IsrFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Coordination backend for ISR background regeneration.
///
/// Implementations decide whether *this* replica may regenerate a given route
/// for a given revalidation window. Returning `false` from
/// [`try_acquire`](Self::try_acquire) means another task or replica has
/// already claimed the work; the caller should skip regeneration.
///
/// Implementors must also call [`release`](Self::release) after the
/// regeneration attempt so that subsequent windows can be acquired.
pub trait IsrCoordinator: Send + Sync + 'static {
    /// Short backend identifier used in log messages.
    fn backend(&self) -> &'static str;

    /// Try to acquire the right to regenerate `url_path` for `window_key`.
    ///
    /// `window_key` is derived from [`isr_window_key`] and encodes both the
    /// route and the current revalidation bucket; two replicas that observe
    /// the same stale file within the same window will produce the same key.
    ///
    /// Returns `true` when this caller may proceed; `false` when another
    /// task or replica already holds the lock for this (route, window) pair.
    fn try_acquire<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, bool>;

    /// Release the lock after regeneration completes (success or failure).
    ///
    /// Must be called exactly once for every successful [`try_acquire`](Self::try_acquire).
    fn release<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, ()>;
}

/// In-process ISR coordinator — always grants the lock.
///
/// This is the **default** coordinator and is a true no-op. Local
/// deduplication is entirely handled by the per-route `AtomicBool` in
/// `StaticFileLayer`: only one background task can be spawned per route at
/// a time, so the coordinator will never see a concurrent call for the same
/// (route, window) pair within a single process.
///
/// For multi-replica deployments use [`PostgresIsrCoordinator`] (feature
/// `db`) which enforces fleet-wide deduplication via `pg_try_advisory_lock`.
#[derive(Debug, Default)]
pub struct LocalIsrCoordinator;

impl LocalIsrCoordinator {
    /// Create a new in-process coordinator.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}

impl IsrCoordinator for LocalIsrCoordinator {
    fn backend(&self) -> &'static str {
        "local"
    }

    fn try_acquire<'a>(&'a self, _url_path: &'a str, _window_key: &'a str) -> IsrFuture<'a, bool> {
        Box::pin(async move { true })
    }

    fn release<'a>(&'a self, _url_path: &'a str, _window_key: &'a str) -> IsrFuture<'a, ()> {
        Box::pin(async move {})
    }
}

/// Postgres advisory-lock ISR coordinator.
///
/// Uses `pg_try_advisory_lock` / `pg_advisory_unlock` to prevent duplicate
/// regeneration across replicas. Requires the `db` feature.
///
/// The advisory lock is keyed on [`isr_advisory_lock_key`] which is derived
/// from (route, window) -- two replicas that see the same stale page in the
/// same revalidation window will attempt the same lock key, and only the
/// first will succeed.
///
/// ## Connection management
///
/// Postgres session-level advisory locks are bound to the database connection
/// that acquired them. This coordinator therefore **holds the connection** that
/// won the lock (stored in an internal map keyed by advisory lock key) and
/// retrieves the same connection in [`IsrCoordinator::release`] to call
/// `pg_advisory_unlock` before returning it to the pool.
///
/// Connections held during regeneration are not available to other tasks.
/// Regeneration tasks that fail to acquire the Postgres lock return their
/// pooled connection immediately.
#[cfg(feature = "db")]
pub struct PostgresIsrCoordinator {
    pool: diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
    /// Connections holding active advisory locks, keyed by advisory lock key.
    /// The connection must be reused for the unlock call because session-level
    /// advisory locks are connection-scoped.
    held: std::sync::Mutex<
        std::collections::HashMap<
            i64,
            diesel_async::pooled_connection::deadpool::Object<diesel_async::AsyncPgConnection>,
        >,
    >,
}

#[cfg(feature = "db")]
impl PostgresIsrCoordinator {
    /// Create a Postgres ISR coordinator backed by the given connection pool.
    #[must_use]
    pub fn new(
        pool: diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
    ) -> Self {
        Self {
            pool,
            held: std::sync::Mutex::new(std::collections::HashMap::new()),
        }
    }
}

#[cfg(feature = "db")]
impl IsrCoordinator for PostgresIsrCoordinator {
    fn backend(&self) -> &'static str {
        "postgres"
    }

    fn try_acquire<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, bool> {
        let lock_key = isr_advisory_lock_key(url_path, window_key);
        Box::pin(async move {
            let mut conn = match self.pool.get().await {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!(error = %e, "ISR coordinator: could not get Postgres connection");
                    return false;
                }
            };
            match try_pg_advisory_lock(&mut conn, lock_key).await {
                Ok(true) => {
                    // Hold the connection so we can unlock on the same session.
                    self.held.lock().unwrap().insert(lock_key, conn);
                    true
                }
                Ok(false) => false,
                Err(e) => {
                    tracing::warn!(error = %e, "ISR coordinator: pg_try_advisory_lock failed");
                    false
                }
            }
        })
    }

    fn release<'a>(&'a self, url_path: &'a str, window_key: &'a str) -> IsrFuture<'a, ()> {
        let lock_key = isr_advisory_lock_key(url_path, window_key);
        Box::pin(async move {
            // Retrieve the connection that holds the lock.
            // Extract outside `let...else` so the mutex guard drops immediately.
            let maybe_conn = self.held.lock().unwrap().remove(&lock_key);
            let Some(mut conn) = maybe_conn else {
                tracing::warn!(
                    lock_key,
                    "ISR coordinator: release called without a held connection"
                );
                return;
            };
            match unlock_pg_advisory_lock(&mut conn, lock_key).await {
                Ok(false) => {
                    tracing::warn!(
                        lock_key,
                        "ISR coordinator: pg_advisory_unlock returned false (lock already released)"
                    );
                }
                Ok(true) => {}
                Err(e) => {
                    tracing::warn!(error = %e, "ISR coordinator: pg_advisory_unlock failed");
                }
            }
            // conn is dropped here, returning the connection to the pool.
        })
    }
}

// ---------------------------------------------------------------------------
// Key derivation utilities (public -- useful in application code and tests)
// ---------------------------------------------------------------------------

/// Compute the revalidation window key for a route and the current time.
///
/// All replicas that evaluate a given route as stale within the same
/// revalidation period will produce the same key, making it safe to use
/// as a distributed lock discriminator.
///
/// # Arguments
///
/// * `url_path` -- The URL path of the route (e.g. `"/about"`).
/// * `revalidate_secs` -- The ISR interval in seconds (0 is treated as 1).
/// * `now_unix_secs` -- Current Unix timestamp in seconds.
///
/// # Returns
///
/// A string of the form `"{url_path}:{bucket}"` where `bucket = now / interval`.
///
/// # Examples
///
/// ```rust
/// use autumn_web::static_gen::isr_window_key;
///
/// let url_path = "/about";
/// let revalidate_secs = 60;
/// let now_unix_secs = 1_700_000_000;
/// let key = isr_window_key(url_path, revalidate_secs, now_unix_secs);
/// assert_eq!(key, "/about:28333333");
/// ```
#[must_use]
pub fn isr_window_key(url_path: &str, revalidate_secs: u64, now_unix_secs: u64) -> String {
    let interval = revalidate_secs.max(1);
    let bucket = now_unix_secs / interval;
    format!("{url_path}:{bucket}")
}

/// Derive a stable signed 64-bit advisory lock key for a (route, window) pair.
///
/// Suitable for `pg_try_advisory_lock`. The result is a deterministic hash
/// of the inputs; different routes or different windows produce different keys
/// with overwhelming probability.
///
/// # Examples
///
/// ```rust
/// use autumn_web::static_gen::{isr_window_key, isr_advisory_lock_key};
///
/// let window_key = isr_window_key("/about", 60, 1_700_000_000);
/// let lock_key = isr_advisory_lock_key("/about", &window_key);
/// // The value is a stable 64-bit integer
/// assert_ne!(lock_key, 0);
/// ```
#[must_use]
pub fn isr_advisory_lock_key(url_path: &str, window_key: &str) -> i64 {
    let mut hasher = Sha256::new();
    hasher.update(b"isr\0");
    hasher.update(url_path.as_bytes());
    hasher.update(b"\0");
    hasher.update(window_key.as_bytes());
    let digest = hasher.finalize();
    let mut bytes = [0_u8; 8];
    bytes.copy_from_slice(&digest[..8]);
    i64::from_be_bytes(bytes)
}

// ---------------------------------------------------------------------------
// Postgres helpers (feature = "db")
// ---------------------------------------------------------------------------

#[cfg(feature = "db")]
#[derive(diesel::QueryableByName)]
struct PgAdvisoryLockRow {
    #[diesel(sql_type = diesel::sql_types::Bool)]
    acquired: bool,
}

#[cfg(feature = "db")]
async fn try_pg_advisory_lock(
    conn: &mut diesel_async::pooled_connection::deadpool::Object<diesel_async::AsyncPgConnection>,
    key: i64,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
    use diesel_async::RunQueryDsl as _;

    let row = diesel::sql_query("SELECT pg_try_advisory_lock($1) AS acquired")
        .bind::<diesel::sql_types::BigInt, _>(key)
        .get_result::<PgAdvisoryLockRow>(&mut **conn)
        .await?;
    Ok(row.acquired)
}

#[cfg(feature = "db")]
#[derive(diesel::QueryableByName)]
struct PgAdvisoryUnlockRow {
    #[diesel(sql_type = diesel::sql_types::Bool)]
    released: bool,
}

#[cfg(feature = "db")]
async fn unlock_pg_advisory_lock(
    conn: &mut diesel_async::pooled_connection::deadpool::Object<diesel_async::AsyncPgConnection>,
    key: i64,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
    use diesel_async::RunQueryDsl as _;

    let row = diesel::sql_query("SELECT pg_advisory_unlock($1) AS released")
        .bind::<diesel::sql_types::BigInt, _>(key)
        .get_result::<PgAdvisoryUnlockRow>(&mut **conn)
        .await?;
    Ok(row.released)
}

// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    // --- isr_window_key ---

    #[test]
    fn window_key_stable_within_interval() {
        // Bucket 28_333_333 covers [1_699_999_980, 1_700_000_039].
        let a = isr_window_key("/about", 60, 1_700_000_000);
        let b = isr_window_key("/about", 60, 1_700_000_039);
        assert_eq!(a, b);
    }

    #[test]
    fn window_key_changes_on_boundary() {
        // 1_700_000_039 is bucket 28_333_333; 1_700_000_040 is bucket 28_333_334.
        let a = isr_window_key("/about", 60, 1_700_000_039);
        let b = isr_window_key("/about", 60, 1_700_000_040);
        assert_ne!(a, b);
    }

    #[test]
    fn window_key_route_prefix() {
        let key = isr_window_key("/about", 60, 1_700_000_000);
        assert!(
            key.starts_with("/about:"),
            "key should start with route: {key}"
        );
    }

    #[test]
    fn window_key_zero_revalidate_no_panic() {
        let key = isr_window_key("/edge", 0, 42);
        assert!(!key.is_empty());
    }

    // --- isr_advisory_lock_key ---

    #[test]
    fn advisory_key_deterministic() {
        let a = isr_advisory_lock_key("/about", "/about:28333333");
        let b = isr_advisory_lock_key("/about", "/about:28333333");
        assert_eq!(a, b);
    }

    #[test]
    fn advisory_key_differs_by_route() {
        let a = isr_advisory_lock_key("/", "/about:28333333");
        let b = isr_advisory_lock_key("/about", "/about:28333333");
        assert_ne!(a, b);
    }

    #[test]
    fn advisory_key_differs_by_window() {
        let a = isr_advisory_lock_key("/about", "/about:1");
        let b = isr_advisory_lock_key("/about", "/about:2");
        assert_ne!(a, b);
    }

    // --- LocalIsrCoordinator ---

    #[tokio::test]
    async fn local_coordinator_always_grants() {
        // LocalIsrCoordinator is a no-op: local dedup is handled by the
        // AtomicBool in StaticFileLayer, not by this coordinator.
        let c = LocalIsrCoordinator::new();
        assert!(c.try_acquire("/a", "w1").await);
        // Even a second concurrent call returns true (no-op).
        assert!(c.try_acquire("/a", "w1").await);
    }

    #[tokio::test]
    async fn local_coordinator_release_is_noop() {
        let c = LocalIsrCoordinator::new();
        // release should not panic and the coordinator continues to grant.
        c.release("/a", "w1").await;
        assert!(c.try_acquire("/a", "w1").await);
    }
}