Skip to main content

spark/
http.rs

1//! HTTP handlers for Spark: the `/_spark/update` round-trip endpoint and the
2//! `/_spark/spark.js` runtime asset.
3
4use std::collections::HashMap;
5use std::time::Duration;
6
7use axum::body::Body;
8use axum::extract::State;
9use axum::http::{header, StatusCode};
10use axum::response::{IntoResponse, Response};
11use axum::Json;
12use once_cell::sync::Lazy;
13use serde_json::json;
14use tower_sessions::Session;
15
16use anvil_core::Container;
17use anvil_core::Error;
18
19use crate::component::Ctx;
20use crate::morph;
21use crate::registry;
22use crate::request::UpdateRequest;
23use crate::response::{ComponentResult, Effects, IslandHtml, UpdateResponse};
24use crate::snapshot::{self, Memo};
25
/// Bytes of the minified Spark browser runtime (`../dist/spark.min.js`,
/// resolved relative to this source file), embedded into the binary at
/// compile time via `include_bytes!`. Served verbatim by [`runtime_js`].
pub const RUNTIME_JS: &[u8] = include_bytes!("../dist/spark.min.js");
27
28/// `GET /_spark/spark.js` — serve the embedded JS runtime.
29pub async fn runtime_js() -> impl IntoResponse {
30    (
31        StatusCode::OK,
32        [
33            (
34                header::CONTENT_TYPE,
35                "application/javascript; charset=utf-8",
36            ),
37            (header::CACHE_CONTROL, "public, max-age=31536000, immutable"),
38        ],
39        RUNTIME_JS,
40    )
41}
42
/// HTTP 419 "Page Expired" — Laravel/Livewire convention for stale-session
/// failures. Not in the IANA registry but recognized by the Spark JS runtime,
/// which reloads the page on receipt.
///
/// Used by [`update`] as the status of the response sent when the submitted
/// CSRF token does not match the session's token. Kept as a raw `u16`
/// because there is no named `StatusCode` constant for it; the handler
/// converts it with `StatusCode::from_u16`.
const STATUS_PAGE_EXPIRED: u16 = 419;
47
/// HTTP 426 "Upgrade Required" — returned when the client's snapshot is in a
/// newer wire-format version than this build of Spark understands. The
/// browser refreshing the page will pick up the new asset.
///
/// Used by [`update`] when snapshot decoding fails with
/// `SnapshotVersionMismatch`; the handler converts it with
/// `StatusCode::from_u16`.
const STATUS_UPGRADE_REQUIRED: u16 = 426;
52
53/// Per-component-instance revision counter for optimistic concurrency control.
54///
55/// Each entry maps `memo.id` → the latest revision the server has issued.
56/// Bounded by capacity (LRU eviction) and TTL so it doesn't grow unbounded
57/// for long-lived processes. A stale snapshot whose `rev` doesn't match the
58/// stored value is rejected with HTTP 409, which prevents two simultaneous
59/// `/update` POSTs from silently producing a last-write-wins outcome.
60///
61/// **Memory note for the architecture doc:** this is the *only* server-side
62/// per-component state Spark holds between requests. It's a `(String, u64)`
63/// pair per active component id, not a component instance — kilobytes per
64/// 100k actives, not megabytes.
65static REVISION_TRACKER: Lazy<moka::sync::Cache<String, u64>> = Lazy::new(|| {
66    moka::sync::Cache::builder()
67        .max_capacity(50_000)
68        // 24h idle TTL — sized for replay protection. Any envelope replayed
69        // within 24h of last interaction fails the rev check (the tracker
70        // expects rev+1 and the replay carries rev). Beyond 24h the entry
71        // evicts; replayed envelopes for long-abandoned components are
72        // accepted as fresh, which is acceptable since the snapshot's
73        // HMAC still has to validate. Memory budget: ~3 MB at 50k entries.
74        .time_to_idle(Duration::from_secs(60 * 60 * 24))
75        .build()
76});
77
78/// `POST /_spark/update` — decode each component's snapshot, apply property writes,
79/// dispatch the requested method, and return refreshed HTML + new snapshots.
80///
81/// CSRF model:
82///   - If the session layer is installed (typical), the `_token` field on the
83///     request body must match the session-bound CSRF token. Cross-origin
84///     POSTs that replay a leaked snapshot get HTTP 419 + a page reload.
85///   - If no session layer is present, the check is skipped — matching the
86///     pass-through behavior of `anvil_core::middleware::builtin::csrf` so
87///     apps that don't enable sessions aren't forced to think about CSRF.
88pub async fn update(
89    State(container): State<Container>,
90    session: Option<Session>,
91    Json(req): Json<UpdateRequest>,
92) -> Result<Response, Error> {
93    if let Some(session) = session.as_ref() {
94        let expected = session
95            .get::<String>(anvil_core::middleware::builtin::CSRF_SESSION_KEY)
96            .await
97            .map_err(|e| Error::Internal(e.to_string()))?;
98        if let Some(expected) = expected {
99            let submitted = req.csrf_token.as_deref().unwrap_or("");
100            if !crate::const_eq(expected.as_bytes(), submitted.as_bytes()) {
101                tracing::debug!("spark /_spark/update: CSRF token mismatch");
102                let mut resp = Response::new(Body::from("CSRF token mismatch"));
103                *resp.status_mut() =
104                    StatusCode::from_u16(STATUS_PAGE_EXPIRED).unwrap_or(StatusCode::FORBIDDEN);
105                return Ok(resp);
106            }
107        }
108        // No token in session yet → first interaction; the page render will
109        // have minted one. Pass through; the snapshot's HMAC still proves
110        // it came from this app's render path.
111    }
112
113    let (_app_key, encrypt) = crate::render::signing();
114    // Build a rotation-aware keyring for HMAC verification. The encoder still
115    // signs under the active key (first entry) inside `render::rerender`;
116    // verification accepts any key the server is currently holding so apps
117    // can rotate `APP_KEY` without forcing every in-flight client to reload.
118    let keyring_owned = crate::render::keyring();
119    let keyring: Vec<(u8, &str)> = keyring_owned
120        .iter()
121        .map(|(k, v)| (*k, v.as_str()))
122        .collect();
123    let mut out = UpdateResponse {
124        components: Vec::with_capacity(req.components.len()),
125    };
126
127    for comp in req.components {
128        // Decode phase — verify HMAC + parse the envelope. Version mismatches
129        // (snapshot from a newer client/asset than this server understands)
130        // get a dedicated 426 response so the browser knows to refresh, not a
131        // generic 4xx that looks like a bug.
132        let decode_started = std::time::Instant::now();
133        let envelope = match snapshot::decode_with_keys(&comp.snapshot, &keyring) {
134            Ok(env) => env,
135            Err(crate::Error::SnapshotVersionMismatch { client_v, server_v }) => {
136                tracing::info!(
137                    client_v,
138                    server_v,
139                    "spark /_spark/update: client snapshot is from a newer build; sending 426"
140                );
141                let mut resp = Response::new(Body::from(format!(
142                    "snapshot v{client_v} is newer than this server understands (v{server_v}) — refresh the page"
143                )));
144                *resp.status_mut() =
145                    StatusCode::from_u16(STATUS_UPGRADE_REQUIRED).unwrap_or(StatusCode::CONFLICT);
146                return Ok(resp);
147            }
148            Err(e) => return Err(Error::from(e)),
149        };
150        let decode_us = decode_started.elapsed().as_micros() as u64;
151
152        // Span covers the whole per-component lifecycle so a request with N
153        // components produces N child spans, each annotated with the component
154        // class, id, revision, and per-phase timings. Production apps tail
155        // `RUST_LOG=spark=info` to see per-interaction latency without adding
156        // any external dependency.
157        let span = tracing::info_span!(
158            "spark.update",
159            component = %envelope.memo.class,
160            id = %envelope.memo.id,
161            rev = envelope.memo.rev,
162            decode_us,
163            // Filled in below as each phase completes.
164            dispatch_us = tracing::field::Empty,
165            render_us = tracing::field::Empty,
166            encode_us = tracing::field::Empty,
167        );
168        let _span_guard = span.enter();
169
170        let entry = registry::resolve(&envelope.memo.class).map_err(Error::from)?;
171        let mut boxed = (entry.load)(&envelope.data).map_err(Error::from)?;
172
173        // Optimistic concurrency check: the snapshot's `rev` must match the
174        // last one this server issued for `memo.id`. A bootstrap miss (no
175        // entry yet) is accepted at rev 0, which lets older snapshots from
176        // before this change deserialize cleanly. The tracker is bumped to
177        // `rev + 1` on success; the new value is written into the next memo
178        // so the client echoes it back on its next interaction.
179        let expected_rev = REVISION_TRACKER.get(&envelope.memo.id).unwrap_or(0);
180        if envelope.memo.rev != expected_rev {
181            tracing::debug!(
182                server_rev = expected_rev,
183                client_rev = envelope.memo.rev,
184                "spark /_spark/update: stale snapshot rejected"
185            );
186            return Err(Error::from(crate::Error::SnapshotStale {
187                server_rev: expected_rev,
188                client_rev: envelope.memo.rev,
189            }));
190        }
191        let next_rev = expected_rev.saturating_add(1);
192        REVISION_TRACKER.insert(envelope.memo.id.clone(), next_rev);
193
194        let mut ctx = Ctx::new(Some(container.clone()));
195        let dispatch_started = std::time::Instant::now();
196
197        if !comp.updates.is_empty() {
198            boxed
199                .state
200                .apply_writes(&comp.updates, &mut ctx)
201                .await
202                .map_err(Error::from)?;
203        }
204
205        let mut requested_island: Option<String> = None;
206        for call in comp.calls {
207            ctx.island = call.island.clone();
208            let method = call.method.clone();
209            match boxed
210                .state
211                .dispatch_call(&method, call.params, &mut ctx)
212                .await
213            {
214                Ok(()) => {}
215                Err(spark_err) => {
216                    // User-shaped errors (validation, missing method, bad args)
217                    // surface as `ctx.errors` entries so the JS runtime can
218                    // render the message inline — same channel form-request
219                    // validation uses — instead of collapsing to HTTP 500.
220                    // System-shaped errors (IO, serde, template) still bail
221                    // so the operator sees them as 5xx and traces fire.
222                    if is_user_facing(&spark_err) {
223                        tracing::debug!(
224                            method,
225                            error = %spark_err,
226                            "spark action returned user-facing error; surfacing via Effects.errors"
227                        );
228                        ctx.errors
229                            .entry(format!("action:{method}"))
230                            .or_default()
231                            .push(spark_err.to_string());
232                    } else {
233                        return Err(Error::from(spark_err));
234                    }
235                }
236            }
237            if let Some(island) = ctx.island.take() {
238                requested_island = Some(island);
239            }
240        }
241        span.record("dispatch_us", dispatch_started.elapsed().as_micros() as u64);
242
243        // Build the next snapshot from the current state.
244        let render_started = std::time::Instant::now();
245        let next_memo = Memo {
246            id: envelope.memo.id.clone(),
247            class: envelope.memo.class.clone(),
248            view: envelope.memo.view.clone(),
249            listeners: (entry.listeners)(),
250            errors: if ctx.errors.is_empty() {
251                None
252            } else {
253                Some(serde_json::to_value(&ctx.errors).unwrap_or(serde_json::Value::Null))
254            },
255            rev: next_rev,
256        };
257        let (html, wire) = crate::render::rerender(&boxed, &next_memo).map_err(Error::from)?;
258        span.record("render_us", render_started.elapsed().as_micros() as u64);
259        let encode_started = std::time::Instant::now();
260        let full_html = crate::render::wrap_rerender(&html, &next_memo, &wire);
261        span.record("encode_us", encode_started.elapsed().as_micros() as u64);
262
263        // Snapshot-size telemetry — warn before the 64 KB hard cap so ops
264        // can spot bloat without waiting for a hard failure in prod.
265        const SNAPSHOT_WARN_BYTES: usize = 32 * 1024;
266        if wire.len() > SNAPSHOT_WARN_BYTES {
267            tracing::warn!(
268                size = wire.len(),
269                limit = 64 * 1024,
270                "spark: snapshot is approaching the 64 KB hard cap — consider trimming \
271                 component state or moving heavy fields to a backing DB row"
272            );
273        }
274
275        let islands = if let Some(island_name) = requested_island.as_deref() {
276            if let Some(island_html) = morph::slice_island(&full_html, island_name) {
277                vec![IslandHtml {
278                    name: island_name.to_string(),
279                    html: island_html,
280                }]
281            } else {
282                Vec::new()
283            }
284        } else {
285            Vec::new()
286        };
287
288        let effects = Effects {
289            dispatched: std::mem::take(&mut ctx.dispatched),
290            emitted: std::mem::take(&mut ctx.emitted),
291            redirect: ctx.redirect.clone(),
292            errors: std::mem::take(&mut ctx.errors)
293                .into_iter()
294                .collect::<HashMap<_, _>>(),
295            islands,
296        };
297
298        out.components.push(ComponentResult {
299            snapshot: wire,
300            html: full_html,
301            effects,
302        });
303    }
304
305    let _ = encrypt; // already applied inside snapshot::encode via `signing()`.
306    Ok(Json(out).into_response())
307}
308
309/// `POST /_spark/auth` — stub auth endpoint for private channels. v1 always
310/// returns 200 with a dummy auth payload. Real authorization lands in v1.1 via
311/// a `SparkAuthorizer` trait.
312pub async fn channel_auth() -> impl IntoResponse {
313    (
314        StatusCode::OK,
315        Json(json!({
316            "auth": "spark:placeholder",
317            "channel_data": null,
318        })),
319    )
320}
321
322/// Classify a Spark dispatch error as "user-facing" (surface inline via
323/// `Effects.errors`, return 200) vs "system-facing" (bubble to HTTP 500
324/// so the operator's pager catches it).
325///
326/// User-facing: bad arguments, unknown methods — things that come from
327/// what the browser sent, not from a server-side defect.
328/// System-facing: IO failures, JSON/serde catastrophes, template errors,
329/// snapshot decode/tamper issues — operator must know.
330fn is_user_facing(err: &crate::Error) -> bool {
331    matches!(
332        err,
333        crate::Error::InvalidArguments { .. } | crate::Error::UnknownMethod { .. }
334    )
335}