Skip to main content

actpub_axum/
inbox.rs

1//! axum router that funnels inbox POSTs into an
2//! [`InboxPipeline`](actpub_federation::InboxPipeline).
3//!
4//! Designed as a pre-built drop-in: the user constructs an
5//! [`InboxPipeline`] (with their fetcher, handler, and federation
6//! config) and hands it to [`inbox_router`]; the result is a fully
7//! wired axum [`Router`] mountable at any prefix.
8//!
9//! # Wire contract
10//!
11//! - `POST <prefix>/inbox` accepts the activity body up to
12//!   [`InboxState::max_body_bytes`] (default 1 MiB) and feeds it,
13//!   together with the request headers, into
14//!   [`InboxPipeline::process`].
15//! - `202 Accepted` is returned on successful verification or on a
16//!   detected duplicate (the latter still 2xx so the sender does not
17//!   retry, matching what every Mastodon-compatible peer expects).
18//! - `400 Bad Request` for body parse / size / signature issues;
19//!   `502 Bad Gateway` when actor resolution fails;
20//!   `500 Internal Server Error` for unexpected server-side errors
21//!   (e.g. a panicking handler).
22
23use std::sync::Arc;
24
25use actpub_federation::{ActivityHandler, Error, Fetcher, InboxPipeline};
26use axum::extract::{Request, State};
27use axum::http::StatusCode;
28use axum::response::IntoResponse;
29use axum::routing::post;
30use axum::{Router, body};
31
32/// Default inbox body cap (1 MiB) — large enough for any realistic
33/// activity, small enough to bound memory under hostile load.
34pub const DEFAULT_MAX_INBOX_BYTES: usize = 1 << 20;
35
36/// Shared state handed to the inbox handler.
37///
38/// Cheap to clone (the pipeline is itself `Arc`-shared internally).
39pub struct InboxState<F: Fetcher, H: ActivityHandler> {
40    pipeline: Arc<InboxPipeline<F, H>>,
41    max_body_bytes: usize,
42}
43
44impl<F, H> InboxState<F, H>
45where
46    F: Fetcher,
47    H: ActivityHandler,
48{
49    /// Wraps `pipeline` in a state object using the
50    /// [`DEFAULT_MAX_INBOX_BYTES`] body cap.
51    #[must_use]
52    pub fn new(pipeline: InboxPipeline<F, H>) -> Self {
53        Self {
54            pipeline: Arc::new(pipeline),
55            max_body_bytes: DEFAULT_MAX_INBOX_BYTES,
56        }
57    }
58
59    /// Overrides the maximum inbox body size.
60    #[must_use]
61    pub const fn with_max_body_bytes(mut self, bytes: usize) -> Self {
62        self.max_body_bytes = bytes;
63        self
64    }
65}
66
67impl<F, H> Clone for InboxState<F, H>
68where
69    F: Fetcher,
70    H: ActivityHandler,
71{
72    fn clone(&self) -> Self {
73        Self {
74            pipeline: Arc::clone(&self.pipeline),
75            max_body_bytes: self.max_body_bytes,
76        }
77    }
78}
79
80impl<F, H> std::fmt::Debug for InboxState<F, H>
81where
82    F: Fetcher,
83    H: ActivityHandler,
84{
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        // `pipeline` does not implement Debug across arbitrary F/H,
87        // so the formatter elides it and surfaces the configurable
88        // body cap which is the only useful operator-facing field.
89        f.debug_struct("InboxState")
90            .field("max_body_bytes", &self.max_body_bytes)
91            .finish_non_exhaustive()
92    }
93}
94
95/// Builds an axum [`Router`] mounted at `/inbox` that POSTs into
96/// the supplied state's pipeline.
97///
98/// Wire example:
99///
100/// ```ignore
101/// use actpub_axum::{inbox_router, InboxState};
102///
103/// let app = axum::Router::new().nest(
104///     "/users/:name",
105///     inbox_router(InboxState::new(my_pipeline)),
106/// );
107/// ```
108pub fn inbox_router<F, H>(state: InboxState<F, H>) -> Router
109where
110    F: Fetcher + 'static,
111    H: ActivityHandler + 'static,
112{
113    Router::new()
114        .route("/inbox", post(handle::<F, H>))
115        .with_state(state)
116}
117
118async fn handle<F, H>(State(state): State<InboxState<F, H>>, request: Request) -> impl IntoResponse
119where
120    F: Fetcher,
121    H: ActivityHandler,
122{
123    let (parts, body) = request.into_parts();
124    let bytes = match body::to_bytes(body, state.max_body_bytes).await {
125        Ok(b) => b,
126        Err(err) => {
127            tracing::warn!(target: "actpub::axum::inbox", %err, "inbox body read failed");
128            return (StatusCode::PAYLOAD_TOO_LARGE, "request body too large").into_response();
129        }
130    };
131
132    match state.pipeline.process(&parts, bytes).await {
133        Ok(_) => StatusCode::ACCEPTED.into_response(),
134        Err(err) => {
135            let status = status_for(&err);
136            tracing::warn!(
137                target: "actpub::axum::inbox",
138                %err,
139                status = status.as_u16(),
140                "inbox processing failed",
141            );
142            (status, err.to_string()).into_response()
143        }
144    }
145}
146
147/// Maps a federation [`Error`] to the wire-appropriate HTTP status
148/// code.
149///
150/// The mapping follows the Mastodon inbox contract as closely as
151/// the federation error type permits:
152///
153/// - **400 Bad Request** for protocol-level defects in the inbound
154///   document that the sender can fix (malformed JSON, bogus
155///   content-type, body over size).
156/// - **401 Unauthorized** when the HTTP signature verifies against
157///   the wrong key or an actor impersonation is detected
158///   ([`Error::SignerKeyMismatch`]). The sender has a key problem
159///   and must re-sign with correct identity binding.
160/// - **403 Forbidden** when [`UrlPolicy`](actpub_federation::UrlPolicy)
161///   rejects a URL (a federation-level refusal, not a sender bug).
162/// - **422 Unprocessable Entity** when the signing actor is missing
163///   the key our verifier needs.
164/// - **502 Bad Gateway** for upstream fetch failures that are not
165///   the sender's fault.
166const fn status_for(err: &Error) -> StatusCode {
167    match err {
168        Error::HttpSig(_)
169        | Error::Json(_)
170        | Error::Cryptosuite(_)
171        | Error::ResponseTooLarge { .. }
172        | Error::UnexpectedContentType { .. } => StatusCode::BAD_REQUEST,
173        Error::SignerKeyMismatch(_) => StatusCode::UNAUTHORIZED,
174        Error::PolicyViolation { .. } => StatusCode::FORBIDDEN,
175        Error::ActorWithoutKey(_) | Error::ActorWithoutInbox(_) => StatusCode::UNPROCESSABLE_ENTITY,
176        Error::Status { .. } | Error::Http(_) | Error::Timeout { .. } => StatusCode::BAD_GATEWAY,
177        // `Error` is `#[non_exhaustive]`; HandlerFailed / InvalidUrl
178        // and any future variants default to 500.
179        _ => StatusCode::INTERNAL_SERVER_ERROR,
180    }
181}
182
183#[cfg(test)]
184mod tests {
185    use std::sync::Arc;
186    use std::sync::atomic::{AtomicUsize, Ordering};
187
188    use actpub_federation::{
189        ActivityHandler, Error, FederationConfig, Fetcher, InboxPipeline, UrlPolicy,
190    };
191    use actpub_httpsig::{
192        CavageSigner, SigningKey, content_digest_header_with, sha256_digest_header,
193    };
194    use axum::body::Body;
195    use axum::http::{Method, Request, StatusCode};
196    use serde_json::{Value, json};
197    use tower::ServiceExt;
198
199    use super::*;
200
201    struct FakeFetcher(Value);
202
203    impl Fetcher for FakeFetcher {
204        #[allow(
205            unknown_lints,
206            clippy::unused_async_trait_impl,
207            reason = "trait definition requires async but mock implementation has no await"
208        )]
209        async fn fetch_raw(
210            &self,
211            _url: &url::Url,
212            _ctx: &actpub_federation::FetchContext,
213        ) -> Result<Value, Error> {
214            Ok(self.0.clone())
215        }
216    }
217
218    #[derive(Default)]
219    struct CountHandler(AtomicUsize);
220
221    impl ActivityHandler for CountHandler {
222        type Error = std::convert::Infallible;
223        #[allow(
224            unknown_lints,
225            clippy::unused_async_trait_impl,
226            reason = "trait definition requires async but mock implementation has no await"
227        )]
228        async fn handle(
229            &self,
230            _activity: Value,
231            _actor: Value,
232            _ctx: actpub_federation::FetchContext,
233        ) -> Result<(), Self::Error> {
234            self.0.fetch_add(1, Ordering::SeqCst);
235            Ok(())
236        }
237    }
238
239    fn test_config() -> Arc<FederationConfig> {
240        FederationConfig::builder()
241            .signing_key(SigningKey::generate_ed25519())
242            .key_id("https://test/sender#key".parse().unwrap())
243            .url_policy(UrlPolicy::permissive_for_tests())
244            .cache_capacity(64)
245            .build()
246            .shared()
247    }
248
249    /// Build a signed inbox POST against the wiremock-style server URI
250    /// `recv_uri` plus `path`.
251    fn signed_inbox_post(
252        activity: &Value,
253        recv_uri: &str,
254        path: &str,
255    ) -> (Request<Body>, actpub_httpsig::VerifyingKey) {
256        let body = serde_json::to_vec(activity).unwrap();
257        let key = SigningKey::generate_ed25519();
258        let public = key.verifying_key();
259        let url = format!("{recv_uri}{path}");
260        let mut req = Request::builder()
261            .method(Method::POST)
262            .uri(&url)
263            .header(
264                "host",
265                url::Url::parse(&url).unwrap().host_str().unwrap_or(""),
266            )
267            .header(
268                "date",
269                httpdate::fmt_http_date(std::time::SystemTime::now()),
270            )
271            .header("content-type", "application/activity+json")
272            .header("digest", sha256_digest_header(&body))
273            .header(
274                "content-digest",
275                content_digest_header_with(&body, &[actpub_httpsig::DigestAlgorithm::Sha256]),
276            )
277            .body(body.clone())
278            .unwrap();
279        CavageSigner::new(&key, "https://send.example.com/users/alice#key")
280            .sign(&mut req)
281            .unwrap();
282        let (parts, body_vec) = req.into_parts();
283        let mut axum_req = Request::from_parts(parts, Body::from(body_vec));
284        // Body is already set; ensure Content-Length matches.
285        axum_req
286            .headers_mut()
287            .insert("content-length", body.len().to_string().parse().unwrap());
288        (axum_req, public)
289    }
290
291    #[tokio::test]
292    async fn router_returns_202_for_a_valid_signed_post() {
293        let activity = json!({
294            "id": "https://send.example.com/activities/01",
295            "type": "Create",
296            "actor": "https://send.example.com/users/alice"
297        });
298        let (req, public) = signed_inbox_post(&activity, "https://recv.example.com", "/inbox");
299        let multibase = match &public {
300            actpub_httpsig::VerifyingKey::Ed25519(k) => actpub_httpsig::Multikey::encode_ed25519(k),
301            other => unreachable!("test signs Ed25519, got {other:?}"),
302        };
303        let actor = json!({
304            "id": "https://send.example.com/users/alice",
305            "type": "Person",
306            "assertionMethod": [{
307                "id": "https://send.example.com/users/alice#key",
308                "type": "Multikey",
309                "controller": "https://send.example.com/users/alice",
310                "publicKeyMultibase": multibase,
311            }]
312        });
313        let pipeline =
314            InboxPipeline::new(FakeFetcher(actor), CountHandler::default(), test_config());
315        let app = inbox_router(InboxState::new(pipeline));
316
317        let resp = app.oneshot(req).await.unwrap();
318        assert_eq!(resp.status(), StatusCode::ACCEPTED);
319    }
320
321    #[tokio::test]
322    async fn router_returns_400_for_a_missing_signature() {
323        // Signed request, then strip the Signature header.
324        let activity = json!({ "id": "https://send.example.com/a/2", "type": "Create" });
325        let (mut req, _public) = signed_inbox_post(&activity, "https://recv.example.com", "/inbox");
326        req.headers_mut().remove("signature");
327        let pipeline = InboxPipeline::new(
328            FakeFetcher(json!({"id": "https://send.example.com/users/alice", "type": "Person"})),
329            CountHandler::default(),
330            test_config(),
331        );
332        let app = inbox_router(InboxState::new(pipeline));
333
334        let resp = app.oneshot(req).await.unwrap();
335        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
336    }
337
338    #[tokio::test]
339    async fn router_returns_413_when_body_exceeds_cap() {
340        // 16 KiB body, 1 KiB cap — the body parse must fail before the
341        // pipeline ever runs.
342        let activity_str = format!(
343            "{{\"id\":\"x\",\"type\":\"Note\",\"content\":\"{}\"}}",
344            "x".repeat(16_000),
345        );
346        let req = Request::builder()
347            .method(Method::POST)
348            .uri("https://recv.example.com/inbox")
349            .body(Body::from(activity_str))
350            .unwrap();
351        let pipeline = InboxPipeline::new(
352            FakeFetcher(json!({"id": "x", "type": "Person"})),
353            CountHandler::default(),
354            test_config(),
355        );
356        let app = inbox_router(InboxState::new(pipeline).with_max_body_bytes(1024));
357
358        let resp = app.oneshot(req).await.unwrap();
359        assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE);
360    }
361
362    #[tokio::test]
363    async fn router_returns_405_for_get_on_inbox() {
364        let pipeline = InboxPipeline::new(
365            FakeFetcher(json!({"id": "x", "type": "Person"})),
366            CountHandler::default(),
367            test_config(),
368        );
369        let app = inbox_router(InboxState::new(pipeline));
370        let req = Request::builder()
371            .method(Method::GET)
372            .uri("https://recv.example.com/inbox")
373            .body(Body::empty())
374            .unwrap();
375        let resp = app.oneshot(req).await.unwrap();
376        // axum 0.8 returns 405 with a list of allowed methods.
377        assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
378    }
379}