1use std::sync::Arc;
24
25use actpub_federation::{ActivityHandler, Error, Fetcher, InboxPipeline};
26use axum::extract::{Request, State};
27use axum::http::StatusCode;
28use axum::response::IntoResponse;
29use axum::routing::post;
30use axum::{Router, body};
31
32pub const DEFAULT_MAX_INBOX_BYTES: usize = 1 << 20;
35
36pub struct InboxState<F: Fetcher, H: ActivityHandler> {
40 pipeline: Arc<InboxPipeline<F, H>>,
41 max_body_bytes: usize,
42}
43
44impl<F, H> InboxState<F, H>
45where
46 F: Fetcher,
47 H: ActivityHandler,
48{
49 #[must_use]
52 pub fn new(pipeline: InboxPipeline<F, H>) -> Self {
53 Self {
54 pipeline: Arc::new(pipeline),
55 max_body_bytes: DEFAULT_MAX_INBOX_BYTES,
56 }
57 }
58
59 #[must_use]
61 pub const fn with_max_body_bytes(mut self, bytes: usize) -> Self {
62 self.max_body_bytes = bytes;
63 self
64 }
65}
66
67impl<F, H> Clone for InboxState<F, H>
68where
69 F: Fetcher,
70 H: ActivityHandler,
71{
72 fn clone(&self) -> Self {
73 Self {
74 pipeline: Arc::clone(&self.pipeline),
75 max_body_bytes: self.max_body_bytes,
76 }
77 }
78}
79
80impl<F, H> std::fmt::Debug for InboxState<F, H>
81where
82 F: Fetcher,
83 H: ActivityHandler,
84{
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("InboxState")
90 .field("max_body_bytes", &self.max_body_bytes)
91 .finish_non_exhaustive()
92 }
93}
94
95pub fn inbox_router<F, H>(state: InboxState<F, H>) -> Router
109where
110 F: Fetcher + 'static,
111 H: ActivityHandler + 'static,
112{
113 Router::new()
114 .route("/inbox", post(handle::<F, H>))
115 .with_state(state)
116}
117
118async fn handle<F, H>(State(state): State<InboxState<F, H>>, request: Request) -> impl IntoResponse
119where
120 F: Fetcher,
121 H: ActivityHandler,
122{
123 let (parts, body) = request.into_parts();
124 let bytes = match body::to_bytes(body, state.max_body_bytes).await {
125 Ok(b) => b,
126 Err(err) => {
127 tracing::warn!(target: "actpub::axum::inbox", %err, "inbox body read failed");
128 return (StatusCode::PAYLOAD_TOO_LARGE, "request body too large").into_response();
129 }
130 };
131
132 match state.pipeline.process(&parts, bytes).await {
133 Ok(_) => StatusCode::ACCEPTED.into_response(),
134 Err(err) => {
135 let status = status_for(&err);
136 tracing::warn!(
137 target: "actpub::axum::inbox",
138 %err,
139 status = status.as_u16(),
140 "inbox processing failed",
141 );
142 (status, err.to_string()).into_response()
143 }
144 }
145}
146
147const fn status_for(err: &Error) -> StatusCode {
167 match err {
168 Error::HttpSig(_)
169 | Error::Json(_)
170 | Error::Cryptosuite(_)
171 | Error::ResponseTooLarge { .. }
172 | Error::UnexpectedContentType { .. } => StatusCode::BAD_REQUEST,
173 Error::SignerKeyMismatch(_) => StatusCode::UNAUTHORIZED,
174 Error::PolicyViolation { .. } => StatusCode::FORBIDDEN,
175 Error::ActorWithoutKey(_) | Error::ActorWithoutInbox(_) => StatusCode::UNPROCESSABLE_ENTITY,
176 Error::Status { .. } | Error::Http(_) | Error::Timeout { .. } => StatusCode::BAD_GATEWAY,
177 _ => StatusCode::INTERNAL_SERVER_ERROR,
180 }
181}
182
183#[cfg(test)]
184mod tests {
185 use std::sync::Arc;
186 use std::sync::atomic::{AtomicUsize, Ordering};
187
188 use actpub_federation::{
189 ActivityHandler, Error, FederationConfig, Fetcher, InboxPipeline, UrlPolicy,
190 };
191 use actpub_httpsig::{
192 CavageSigner, SigningKey, content_digest_header_with, sha256_digest_header,
193 };
194 use axum::body::Body;
195 use axum::http::{Method, Request, StatusCode};
196 use serde_json::{Value, json};
197 use tower::ServiceExt;
198
199 use super::*;
200
201 struct FakeFetcher(Value);
202
203 impl Fetcher for FakeFetcher {
204 #[allow(
205 unknown_lints,
206 clippy::unused_async_trait_impl,
207 reason = "trait definition requires async but mock implementation has no await"
208 )]
209 async fn fetch_raw(
210 &self,
211 _url: &url::Url,
212 _ctx: &actpub_federation::FetchContext,
213 ) -> Result<Value, Error> {
214 Ok(self.0.clone())
215 }
216 }
217
218 #[derive(Default)]
219 struct CountHandler(AtomicUsize);
220
221 impl ActivityHandler for CountHandler {
222 type Error = std::convert::Infallible;
223 #[allow(
224 unknown_lints,
225 clippy::unused_async_trait_impl,
226 reason = "trait definition requires async but mock implementation has no await"
227 )]
228 async fn handle(
229 &self,
230 _activity: Value,
231 _actor: Value,
232 _ctx: actpub_federation::FetchContext,
233 ) -> Result<(), Self::Error> {
234 self.0.fetch_add(1, Ordering::SeqCst);
235 Ok(())
236 }
237 }
238
239 fn test_config() -> Arc<FederationConfig> {
240 FederationConfig::builder()
241 .signing_key(SigningKey::generate_ed25519())
242 .key_id("https://test/sender#key".parse().unwrap())
243 .url_policy(UrlPolicy::permissive_for_tests())
244 .cache_capacity(64)
245 .build()
246 .shared()
247 }
248
249 fn signed_inbox_post(
252 activity: &Value,
253 recv_uri: &str,
254 path: &str,
255 ) -> (Request<Body>, actpub_httpsig::VerifyingKey) {
256 let body = serde_json::to_vec(activity).unwrap();
257 let key = SigningKey::generate_ed25519();
258 let public = key.verifying_key();
259 let url = format!("{recv_uri}{path}");
260 let mut req = Request::builder()
261 .method(Method::POST)
262 .uri(&url)
263 .header(
264 "host",
265 url::Url::parse(&url).unwrap().host_str().unwrap_or(""),
266 )
267 .header(
268 "date",
269 httpdate::fmt_http_date(std::time::SystemTime::now()),
270 )
271 .header("content-type", "application/activity+json")
272 .header("digest", sha256_digest_header(&body))
273 .header(
274 "content-digest",
275 content_digest_header_with(&body, &[actpub_httpsig::DigestAlgorithm::Sha256]),
276 )
277 .body(body.clone())
278 .unwrap();
279 CavageSigner::new(&key, "https://send.example.com/users/alice#key")
280 .sign(&mut req)
281 .unwrap();
282 let (parts, body_vec) = req.into_parts();
283 let mut axum_req = Request::from_parts(parts, Body::from(body_vec));
284 axum_req
286 .headers_mut()
287 .insert("content-length", body.len().to_string().parse().unwrap());
288 (axum_req, public)
289 }
290
291 #[tokio::test]
292 async fn router_returns_202_for_a_valid_signed_post() {
293 let activity = json!({
294 "id": "https://send.example.com/activities/01",
295 "type": "Create",
296 "actor": "https://send.example.com/users/alice"
297 });
298 let (req, public) = signed_inbox_post(&activity, "https://recv.example.com", "/inbox");
299 let multibase = match &public {
300 actpub_httpsig::VerifyingKey::Ed25519(k) => actpub_httpsig::Multikey::encode_ed25519(k),
301 other => unreachable!("test signs Ed25519, got {other:?}"),
302 };
303 let actor = json!({
304 "id": "https://send.example.com/users/alice",
305 "type": "Person",
306 "assertionMethod": [{
307 "id": "https://send.example.com/users/alice#key",
308 "type": "Multikey",
309 "controller": "https://send.example.com/users/alice",
310 "publicKeyMultibase": multibase,
311 }]
312 });
313 let pipeline =
314 InboxPipeline::new(FakeFetcher(actor), CountHandler::default(), test_config());
315 let app = inbox_router(InboxState::new(pipeline));
316
317 let resp = app.oneshot(req).await.unwrap();
318 assert_eq!(resp.status(), StatusCode::ACCEPTED);
319 }
320
321 #[tokio::test]
322 async fn router_returns_400_for_a_missing_signature() {
323 let activity = json!({ "id": "https://send.example.com/a/2", "type": "Create" });
325 let (mut req, _public) = signed_inbox_post(&activity, "https://recv.example.com", "/inbox");
326 req.headers_mut().remove("signature");
327 let pipeline = InboxPipeline::new(
328 FakeFetcher(json!({"id": "https://send.example.com/users/alice", "type": "Person"})),
329 CountHandler::default(),
330 test_config(),
331 );
332 let app = inbox_router(InboxState::new(pipeline));
333
334 let resp = app.oneshot(req).await.unwrap();
335 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
336 }
337
338 #[tokio::test]
339 async fn router_returns_413_when_body_exceeds_cap() {
340 let activity_str = format!(
343 "{{\"id\":\"x\",\"type\":\"Note\",\"content\":\"{}\"}}",
344 "x".repeat(16_000),
345 );
346 let req = Request::builder()
347 .method(Method::POST)
348 .uri("https://recv.example.com/inbox")
349 .body(Body::from(activity_str))
350 .unwrap();
351 let pipeline = InboxPipeline::new(
352 FakeFetcher(json!({"id": "x", "type": "Person"})),
353 CountHandler::default(),
354 test_config(),
355 );
356 let app = inbox_router(InboxState::new(pipeline).with_max_body_bytes(1024));
357
358 let resp = app.oneshot(req).await.unwrap();
359 assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE);
360 }
361
362 #[tokio::test]
363 async fn router_returns_405_for_get_on_inbox() {
364 let pipeline = InboxPipeline::new(
365 FakeFetcher(json!({"id": "x", "type": "Person"})),
366 CountHandler::default(),
367 test_config(),
368 );
369 let app = inbox_router(InboxState::new(pipeline));
370 let req = Request::builder()
371 .method(Method::GET)
372 .uri("https://recv.example.com/inbox")
373 .body(Body::empty())
374 .unwrap();
375 let resp = app.oneshot(req).await.unwrap();
376 assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
378 }
379}