solid_pod_rs_activitypub/
delivery.rs1use std::sync::Arc;
17use std::time::Duration;
18
19use crate::{
20 http_sig::{sign_request, OutboundRequest},
21 store::Store,
22};
23
/// Retry delays in seconds, looked up by the number of attempts a queue item has
/// already made: 30 s, 2 min, 10 min, 1 h, 6 h, 24 h.
const BACKOFF_SECONDS: &[i64] = &[30, 2 * 60, 10 * 60, 60 * 60, 6 * 60 * 60, 24 * 60 * 60];

/// Attempt budget for a single queue item; always equal to the number of
/// entries in [`BACKOFF_SECONDS`].
const MAX_ATTEMPTS: i64 = BACKOFF_SECONDS.len() as i64;
30
/// What a single [`DeliveryWorker::drain_once`] pass did with the queue.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DeliveryOutcome {
    /// The remote inbox answered with a success status.
    Delivered,
    /// The queue item was discarded without a successful delivery.
    Dropped,
    /// The attempt failed and the item was handed back to the store for a later retry.
    Rescheduled {
        /// Delay, in seconds, that was passed to the store when rescheduling.
        next_retry_secs: i64,
    },
    /// The store reported no delivery that is currently due.
    Idle,
}
42
/// Signing material the delivery worker applies to every outbound request.
///
/// `Debug` is implemented by hand (not derived) so that the private key is
/// never written to logs or panic messages.
#[derive(Clone)]
pub struct DeliveryConfig {
    /// Private key in PEM form, handed to `sign_request`.
    pub private_key_pem: String,
    /// Key identifier handed to `sign_request` together with the private key.
    pub key_id: String,
}

impl std::fmt::Debug for DeliveryConfig {
    /// Formats the config with the private key replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DeliveryConfig")
            .field("private_key_pem", &"<redacted>")
            .field("key_id", &self.key_id)
            .finish()
    }
}
50
51pub struct DeliveryWorker {
55 store: Store,
56 config: DeliveryConfig,
57 http: reqwest::Client,
58}
59
60impl DeliveryWorker {
61 pub fn new(store: Store, config: DeliveryConfig) -> Self {
62 Self {
63 store,
64 config,
65 http: reqwest::Client::builder()
66 .user_agent("solid-pod-rs-activitypub/0.4.0")
67 .timeout(Duration::from_secs(30))
68 .build()
69 .expect("reqwest client builds"),
70 }
71 }
72
73 pub async fn drain_once(&self) -> Result<DeliveryOutcome, crate::error::OutboxError> {
77 let Some(item) = self.store.next_due_delivery().await? else {
78 return Ok(DeliveryOutcome::Idle);
79 };
80 let Some(activity) = self.store.load_activity(&item.activity_id).await? else {
81 self.store.drop_delivery(item.queue_id).await?;
83 return Ok(DeliveryOutcome::Dropped);
84 };
85
86 let body =
87 serde_json::to_vec(&activity).map_err(|e| crate::error::OutboxError::Delivery(e.to_string()))?;
88 let mut req = OutboundRequest {
89 method: "POST".into(),
90 url: item.inbox_url.clone(),
91 headers: vec![(
92 "Content-Type".into(),
93 "application/activity+json".into(),
94 )],
95 body,
96 };
97 sign_request(&mut req, &self.config.private_key_pem, &self.config.key_id)?;
98
99 let request = self.http.post(&req.url);
100 let request = req
101 .headers
102 .iter()
103 .fold(request, |b, (k, v)| b.header(k, v))
104 .body(req.body.clone());
105
106 match request.send().await {
107 Ok(resp) => {
108 let status = resp.status();
109 if status.is_success() {
110 self.store.drop_delivery(item.queue_id).await?;
111 self.store
112 .mark_outbox_state(&item.activity_id, "delivered")
113 .await?;
114 Ok(DeliveryOutcome::Delivered)
115 } else if status.is_client_error()
116 && status.as_u16() != 408
117 && status.as_u16() != 429
118 {
119 self.store.drop_delivery(item.queue_id).await?;
120 self.store
121 .mark_outbox_state(&item.activity_id, "failed")
122 .await?;
123 Ok(DeliveryOutcome::Dropped)
124 } else {
125 let next_attempt = item.attempts + 1;
126 if next_attempt >= MAX_ATTEMPTS {
127 self.store.drop_delivery(item.queue_id).await?;
128 self.store
129 .mark_outbox_state(&item.activity_id, "failed")
130 .await?;
131 return Ok(DeliveryOutcome::Dropped);
132 }
133 let idx = item.attempts.max(0) as usize;
134 let delay = BACKOFF_SECONDS[idx.min(BACKOFF_SECONDS.len() - 1)];
135 self.store
136 .reschedule_delivery(
137 item.queue_id,
138 delay,
139 &format!("HTTP {}", status.as_u16()),
140 )
141 .await?;
142 Ok(DeliveryOutcome::Rescheduled {
143 next_retry_secs: delay,
144 })
145 }
146 }
147 Err(e) => {
148 let next_attempt = item.attempts + 1;
149 if next_attempt >= MAX_ATTEMPTS {
150 self.store.drop_delivery(item.queue_id).await?;
151 self.store
152 .mark_outbox_state(&item.activity_id, "failed")
153 .await?;
154 return Ok(DeliveryOutcome::Dropped);
155 }
156 let idx = item.attempts.max(0) as usize;
157 let delay = BACKOFF_SECONDS[idx.min(BACKOFF_SECONDS.len() - 1)];
158 self.store
159 .reschedule_delivery(item.queue_id, delay, &e.to_string())
160 .await?;
161 Ok(DeliveryOutcome::Rescheduled {
162 next_retry_secs: delay,
163 })
164 }
165 }
166 }
167
168 pub async fn enqueue_to_inboxes(
174 &self,
175 activity_id: &str,
176 inboxes: &[String],
177 ) -> Result<usize, crate::error::OutboxError> {
178 for inbox in inboxes {
179 self.store
180 .enqueue_delivery(activity_id, inbox)
181 .await
182 .map_err(crate::error::OutboxError::Storage)?;
183 }
184 Ok(inboxes.len())
185 }
186
187 pub async fn run(self: Arc<Self>, tick: Duration) {
190 loop {
191 loop {
192 match self.drain_once().await {
193 Ok(DeliveryOutcome::Idle) => break,
194 Ok(_) => continue,
195 Err(e) => {
196 tracing::warn!(error = %e, "delivery worker tick failed");
197 break;
198 }
199 }
200 }
201 tokio::time::sleep(tick).await;
202 }
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{generate_actor_keypair, render_actor};
    use crate::outbox::handle_outbox;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds an in-memory store and a signing config backed by a fresh keypair.
    async fn scaffold() -> (Store, DeliveryConfig) {
        let store = Store::in_memory().await.unwrap();
        let (private_key_pem, _public_key_pem) = generate_actor_keypair().unwrap();
        let config = DeliveryConfig {
            private_key_pem,
            key_id: "https://pod.example/profile/card.jsonld#main-key".into(),
        };
        (store, config)
    }

    /// Starts a mock inbox that answers exactly one POST with `status`,
    /// registers it as a follower inbox and posts a `Create(Note)` through the
    /// outbox so that one delivery is queued.
    ///
    /// The returned server must be kept alive for the duration of the test.
    async fn queue_note(
        store: &Store,
        status: u16,
        follower_id: &str,
        content: &str,
    ) -> MockServer {
        let actor = render_actor("https://pod.example", "me", "Me", None, "PEM");
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/inbox"))
            .respond_with(ResponseTemplate::new(status))
            .expect(1)
            .mount(&server)
            .await;

        let inbox_url = format!("{}/inbox", server.uri());
        store
            .add_follower(&actor.id, follower_id, Some(&inbox_url))
            .await
            .unwrap();
        handle_outbox(
            store,
            &actor,
            serde_json::json!({
                "type": "Create",
                "object": {"type": "Note", "content": content}
            }),
        )
        .await
        .unwrap();
        server
    }

    #[tokio::test]
    async fn delivery_succeeds_and_drops_queue_item() {
        let (store, config) = scaffold().await;
        let _server = queue_note(&store, 202, "follower-a", "hi").await;

        let worker = DeliveryWorker::new(store.clone(), config);
        let first = worker.drain_once().await.unwrap();
        assert_eq!(first, DeliveryOutcome::Delivered);
        let second = worker.drain_once().await.unwrap();
        assert_eq!(second, DeliveryOutcome::Idle);
    }

    #[tokio::test]
    async fn delivery_retries_on_5xx() {
        let (store, config) = scaffold().await;
        let _server = queue_note(&store, 503, "fa", "x").await;

        let worker = DeliveryWorker::new(store.clone(), config);
        let outcome = worker.drain_once().await.unwrap();
        let DeliveryOutcome::Rescheduled { next_retry_secs } = outcome else {
            panic!("expected Rescheduled, got {outcome:?}");
        };
        assert_eq!(next_retry_secs, BACKOFF_SECONDS[0]);
    }

    #[tokio::test]
    async fn delivery_drops_on_4xx() {
        let (store, config) = scaffold().await;
        let _server = queue_note(&store, 403, "fa", "x").await;

        let worker = DeliveryWorker::new(store.clone(), config);
        let outcome = worker.drain_once().await.unwrap();
        assert_eq!(outcome, DeliveryOutcome::Dropped);
    }

    #[tokio::test]
    async fn delivery_idle_when_queue_empty() {
        let (store, config) = scaffold().await;
        let worker = DeliveryWorker::new(store, config);
        let outcome = worker.drain_once().await.unwrap();
        assert_eq!(outcome, DeliveryOutcome::Idle);
    }
}