Skip to main content

solid_pod_rs_activitypub/
delivery.rs

1//! Background federated-delivery worker.
2//!
3//! Pulls items from `delivery_queue`, signs each POST per
4//! draft-cavage HTTP Signatures (see [`crate::http_sig::sign_request`])
5//! and ships it to the target inbox over HTTPS.
6//!
7//! Retry policy (mirrors `solid_pod_rs::notifications::WebhookChannelManager`):
8//!   * 2xx → drop from queue, mark outbox delivered.
9//!   * 4xx (except 408/429) → permanent failure — drop, log.
10//!   * 5xx/408/429/network → exponential backoff (30s, 2m, 10m, 1h, 6h, 24h).
11//!
12//! The worker is cooperative: it polls the queue on a tick. Consumers
13//! can also trigger a one-shot tick via [`DeliveryWorker::drain_once`]
14//! which is how the test-suite exercises the retry logic.
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::{
20    http_sig::{sign_request, OutboundRequest},
21    store::Store,
22};
23
/// Delay, in seconds, applied before each successive retry. The entry is
/// selected by the number of attempts already recorded for the queue item;
/// lookups past the end reuse the last entry.
const BACKOFF_SECONDS: &[i64] = &[
    30,           // 30 seconds
    2 * 60,       // 2 minutes
    10 * 60,      // 10 minutes
    60 * 60,      // 1 hour
    6 * 60 * 60,  // 6 hours
    24 * 60 * 60, // 24 hours
];

/// Attempt budget for one queue entry; once it is used up the entry is
/// dropped instead of rescheduled. Tied to the length of the backoff table.
const MAX_ATTEMPTS: i64 = BACKOFF_SECONDS.len() as i64;
30
/// What happened during one delivery attempt. Public so that tests and
/// metrics can observe the state transition.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DeliveryOutcome {
    /// The remote inbox answered with a success status.
    Delivered,
    /// The queue entry was removed without success and will not be retried
    /// (permanent 4xx, retry budget exhausted, or orphaned queue row).
    Dropped,
    /// The failure was transient; the entry was put back with a delay of
    /// `next_retry_secs` seconds.
    Rescheduled { next_retry_secs: i64 },
    /// The queue held no entry that was due.
    Idle,
}
42
/// Fixed signing parameters used for every outbound delivery.
#[derive(Clone)]
pub struct DeliveryConfig {
    /// PEM-encoded private key handed to the HTTP-signature signer.
    pub private_key_pem: String,
    /// Key identifier sent with the signature; this is the Actor `key_id`
    /// published in the Actor document.
    pub key_id: String,
}
50
51/// Background worker. Hold an `Arc<DeliveryWorker>` to share between
52/// the HTTP task and any admin endpoints that manually flush the
53/// queue.
54pub struct DeliveryWorker {
55    store: Store,
56    config: DeliveryConfig,
57    http: reqwest::Client,
58}
59
60impl DeliveryWorker {
61    pub fn new(store: Store, config: DeliveryConfig) -> Self {
62        Self {
63            store,
64            config,
65            http: reqwest::Client::builder()
66                .user_agent("solid-pod-rs-activitypub/0.4.0")
67                .timeout(Duration::from_secs(30))
68                .build()
69                .expect("reqwest client builds"),
70        }
71    }
72
73    /// Pop the next due delivery (if any) and attempt it exactly once.
74    /// Returns the outcome so tests and observability surfaces can
75    /// assert on the transition.
76    pub async fn drain_once(&self) -> Result<DeliveryOutcome, crate::error::OutboxError> {
77        let Some(item) = self.store.next_due_delivery().await? else {
78            return Ok(DeliveryOutcome::Idle);
79        };
80        let Some(activity) = self.store.load_activity(&item.activity_id).await? else {
81            // Orphaned queue row — the activity is gone. Drop.
82            self.store.drop_delivery(item.queue_id).await?;
83            return Ok(DeliveryOutcome::Dropped);
84        };
85
86        let body =
87            serde_json::to_vec(&activity).map_err(|e| crate::error::OutboxError::Delivery(e.to_string()))?;
88        let mut req = OutboundRequest {
89            method: "POST".into(),
90            url: item.inbox_url.clone(),
91            headers: vec![(
92                "Content-Type".into(),
93                "application/activity+json".into(),
94            )],
95            body,
96        };
97        sign_request(&mut req, &self.config.private_key_pem, &self.config.key_id)?;
98
99        let request = self.http.post(&req.url);
100        let request = req
101            .headers
102            .iter()
103            .fold(request, |b, (k, v)| b.header(k, v))
104            .body(req.body.clone());
105
106        match request.send().await {
107            Ok(resp) => {
108                let status = resp.status();
109                if status.is_success() {
110                    self.store.drop_delivery(item.queue_id).await?;
111                    self.store
112                        .mark_outbox_state(&item.activity_id, "delivered")
113                        .await?;
114                    Ok(DeliveryOutcome::Delivered)
115                } else if status.is_client_error()
116                    && status.as_u16() != 408
117                    && status.as_u16() != 429
118                {
119                    self.store.drop_delivery(item.queue_id).await?;
120                    self.store
121                        .mark_outbox_state(&item.activity_id, "failed")
122                        .await?;
123                    Ok(DeliveryOutcome::Dropped)
124                } else {
125                    let next_attempt = item.attempts + 1;
126                    if next_attempt >= MAX_ATTEMPTS {
127                        self.store.drop_delivery(item.queue_id).await?;
128                        self.store
129                            .mark_outbox_state(&item.activity_id, "failed")
130                            .await?;
131                        return Ok(DeliveryOutcome::Dropped);
132                    }
133                    let idx = item.attempts.max(0) as usize;
134                    let delay = BACKOFF_SECONDS[idx.min(BACKOFF_SECONDS.len() - 1)];
135                    self.store
136                        .reschedule_delivery(
137                            item.queue_id,
138                            delay,
139                            &format!("HTTP {}", status.as_u16()),
140                        )
141                        .await?;
142                    Ok(DeliveryOutcome::Rescheduled {
143                        next_retry_secs: delay,
144                    })
145                }
146            }
147            Err(e) => {
148                let next_attempt = item.attempts + 1;
149                if next_attempt >= MAX_ATTEMPTS {
150                    self.store.drop_delivery(item.queue_id).await?;
151                    self.store
152                        .mark_outbox_state(&item.activity_id, "failed")
153                        .await?;
154                    return Ok(DeliveryOutcome::Dropped);
155                }
156                let idx = item.attempts.max(0) as usize;
157                let delay = BACKOFF_SECONDS[idx.min(BACKOFF_SECONDS.len() - 1)];
158                self.store
159                    .reschedule_delivery(item.queue_id, delay, &e.to_string())
160                    .await?;
161                Ok(DeliveryOutcome::Rescheduled {
162                    next_retry_secs: delay,
163                })
164            }
165        }
166    }
167
168    /// Enqueue delivery of `activity_id` to an explicit list of inbox
169    /// URLs. This is the fan-out entry point used by outbox POST and
170    /// matches the JSS v0.0.67 `deliverToFollowers` pattern.
171    ///
172    /// Returns the number of inboxes enqueued.
173    pub async fn enqueue_to_inboxes(
174        &self,
175        activity_id: &str,
176        inboxes: &[String],
177    ) -> Result<usize, crate::error::OutboxError> {
178        for inbox in inboxes {
179            self.store
180                .enqueue_delivery(activity_id, inbox)
181                .await
182                .map_err(crate::error::OutboxError::Storage)?;
183        }
184        Ok(inboxes.len())
185    }
186
187    /// Long-running poller. Ticks every `tick` and calls
188    /// [`Self::drain_once`] until the queue is empty, then sleeps.
189    pub async fn run(self: Arc<Self>, tick: Duration) {
190        loop {
191            loop {
192                match self.drain_once().await {
193                    Ok(DeliveryOutcome::Idle) => break,
194                    Ok(_) => continue,
195                    Err(e) => {
196                        tracing::warn!(error = %e, "delivery worker tick failed");
197                        break;
198                    }
199                }
200            }
201            tokio::time::sleep(tick).await;
202        }
203    }
204}
205
206// ---------------------------------------------------------------------------
207// Tests
208// ---------------------------------------------------------------------------
209
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{generate_actor_keypair, render_actor};
    use crate::outbox::handle_outbox;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// In-memory store plus a signing config backed by a freshly generated
    /// keypair.
    async fn scaffold() -> (Store, DeliveryConfig) {
        let store = Store::in_memory().await.unwrap();
        let (private_key_pem, _public_key_pem) = generate_actor_keypair().unwrap();
        let config = DeliveryConfig {
            private_key_pem,
            key_id: "https://pod.example/profile/card.jsonld#main-key".into(),
        };
        (store, config)
    }

    /// Start a mock server whose `POST /inbox` answers with `status` and
    /// which expects exactly one such request.
    async fn mock_inbox(status: u16) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/inbox"))
            .respond_with(ResponseTemplate::new(status))
            .expect(1)
            .mount(&server)
            .await;
        server
    }

    /// Register `follower` with the mock server's inbox and post a `Create`
    /// of a `Note` with the given `content` through the outbox handler.
    async fn publish_note(store: &Store, server: &MockServer, follower: &str, content: &str) {
        let actor = render_actor("https://pod.example", "me", "Me", None, "PEM");
        let inbox_url = format!("{}/inbox", server.uri());
        store
            .add_follower(&actor.id, follower, Some(&inbox_url))
            .await
            .unwrap();
        handle_outbox(
            store,
            &actor,
            serde_json::json!({
                "type": "Create",
                "object": {"type": "Note", "content": content}
            }),
        )
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn delivery_succeeds_and_drops_queue_item() {
        let (store, config) = scaffold().await;
        let server = mock_inbox(202).await;
        publish_note(&store, &server, "follower-a", "hi").await;

        let worker = DeliveryWorker::new(store.clone(), config);
        let first = worker.drain_once().await.unwrap();
        assert_eq!(first, DeliveryOutcome::Delivered);
        // The delivered row is gone, so a second pass finds nothing due.
        let second = worker.drain_once().await.unwrap();
        assert_eq!(second, DeliveryOutcome::Idle);
    }

    #[tokio::test]
    async fn delivery_retries_on_5xx() {
        let (store, config) = scaffold().await;
        let server = mock_inbox(503).await;
        publish_note(&store, &server, "fa", "x").await;

        let worker = DeliveryWorker::new(store.clone(), config);
        let outcome = worker.drain_once().await.unwrap();
        let DeliveryOutcome::Rescheduled { next_retry_secs } = outcome else {
            panic!("expected Rescheduled, got {outcome:?}");
        };
        assert_eq!(next_retry_secs, BACKOFF_SECONDS[0]);
    }

    #[tokio::test]
    async fn delivery_drops_on_4xx() {
        let (store, config) = scaffold().await;
        let server = mock_inbox(403).await;
        publish_note(&store, &server, "fa", "x").await;

        let worker = DeliveryWorker::new(store.clone(), config);
        let outcome = worker.drain_once().await.unwrap();
        assert_eq!(outcome, DeliveryOutcome::Dropped);
    }

    #[tokio::test]
    async fn delivery_idle_when_queue_empty() {
        let (store, config) = scaffold().await;
        let worker = DeliveryWorker::new(store, config);
        let outcome = worker.drain_once().await.unwrap();
        assert_eq!(outcome, DeliveryOutcome::Idle);
    }
}