use std::sync::Arc;
use std::time::Duration;
use crate::{
http_sig::{sign_request, OutboundRequest},
store::Store,
};
/// Retry delays in seconds. `drain_once` picks the entry at the index given by
/// the queue item's `attempts` value (negative values use the first entry,
/// larger values are clamped to the last entry).
const BACKOFF_SECONDS: &[i64] = &[30, 120, 600, 3_600, 21_600, 86_400];
/// Retry budget: once a queue item's `attempts + 1` reaches this value,
/// `drain_once` drops the delivery instead of rescheduling it.
const MAX_ATTEMPTS: i64 = BACKOFF_SECONDS.len() as i64;
/// Result of a single [`DeliveryWorker::drain_once`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeliveryOutcome {
/// The inbox answered with a success status; the queue item was removed and
/// the activity marked `"delivered"`.
Delivered,
/// The queue item was removed without a successful delivery: the activity
/// could not be loaded, the inbox answered with a non-retryable 4xx status,
/// or the retry budget was exhausted.
Dropped,
/// The attempt failed with a retryable error; the item was rescheduled to be
/// retried after `next_retry_secs` seconds.
Rescheduled { next_retry_secs: i64 },
/// The store reported no delivery that is currently due.
Idle,
}
/// Signing material used by [`DeliveryWorker`] for outbound requests.
#[derive(Clone)]
pub struct DeliveryConfig {
/// Private key handed to `sign_request`; presumably PEM-encoded, as the
/// field name suggests.
pub private_key_pem: String,
/// Key identifier handed to `sign_request` together with the private key.
pub key_id: String,
}
/// Worker that takes due deliveries from the store, signs them and POSTs them
/// to their target inbox.
pub struct DeliveryWorker {
/// Source of the delivery queue and of the activities to send.
store: Store,
/// Key material used to sign every outbound request.
config: DeliveryConfig,
/// HTTP client built once in `new` and reused for every delivery.
http: reqwest::Client,
}
impl DeliveryWorker {
pub fn new(store: Store, config: DeliveryConfig) -> Self {
Self {
store,
config,
http: reqwest::Client::builder()
.user_agent("solid-pod-rs-activitypub/0.4.0")
.timeout(Duration::from_secs(30))
.build()
.expect("reqwest client builds"),
}
}
pub async fn drain_once(&self) -> Result<DeliveryOutcome, crate::error::OutboxError> {
let Some(item) = self.store.next_due_delivery().await? else {
return Ok(DeliveryOutcome::Idle);
};
let Some(activity) = self.store.load_activity(&item.activity_id).await? else {
self.store.drop_delivery(item.queue_id).await?;
return Ok(DeliveryOutcome::Dropped);
};
let body =
serde_json::to_vec(&activity).map_err(|e| crate::error::OutboxError::Delivery(e.to_string()))?;
let mut req = OutboundRequest {
method: "POST".into(),
url: item.inbox_url.clone(),
headers: vec![(
"Content-Type".into(),
"application/activity+json".into(),
)],
body,
};
sign_request(&mut req, &self.config.private_key_pem, &self.config.key_id)?;
let request = self.http.post(&req.url);
let request = req
.headers
.iter()
.fold(request, |b, (k, v)| b.header(k, v))
.body(req.body.clone());
match request.send().await {
Ok(resp) => {
let status = resp.status();
if status.is_success() {
self.store.drop_delivery(item.queue_id).await?;
self.store
.mark_outbox_state(&item.activity_id, "delivered")
.await?;
Ok(DeliveryOutcome::Delivered)
} else if status.is_client_error()
&& status.as_u16() != 408
&& status.as_u16() != 429
{
self.store.drop_delivery(item.queue_id).await?;
self.store
.mark_outbox_state(&item.activity_id, "failed")
.await?;
Ok(DeliveryOutcome::Dropped)
} else {
let next_attempt = item.attempts + 1;
if next_attempt >= MAX_ATTEMPTS {
self.store.drop_delivery(item.queue_id).await?;
self.store
.mark_outbox_state(&item.activity_id, "failed")
.await?;
return Ok(DeliveryOutcome::Dropped);
}
let idx = item.attempts.max(0) as usize;
let delay = BACKOFF_SECONDS[idx.min(BACKOFF_SECONDS.len() - 1)];
self.store
.reschedule_delivery(
item.queue_id,
delay,
&format!("HTTP {}", status.as_u16()),
)
.await?;
Ok(DeliveryOutcome::Rescheduled {
next_retry_secs: delay,
})
}
}
Err(e) => {
let next_attempt = item.attempts + 1;
if next_attempt >= MAX_ATTEMPTS {
self.store.drop_delivery(item.queue_id).await?;
self.store
.mark_outbox_state(&item.activity_id, "failed")
.await?;
return Ok(DeliveryOutcome::Dropped);
}
let idx = item.attempts.max(0) as usize;
let delay = BACKOFF_SECONDS[idx.min(BACKOFF_SECONDS.len() - 1)];
self.store
.reschedule_delivery(item.queue_id, delay, &e.to_string())
.await?;
Ok(DeliveryOutcome::Rescheduled {
next_retry_secs: delay,
})
}
}
}
pub async fn enqueue_to_inboxes(
&self,
activity_id: &str,
inboxes: &[String],
) -> Result<usize, crate::error::OutboxError> {
for inbox in inboxes {
self.store
.enqueue_delivery(activity_id, inbox)
.await
.map_err(crate::error::OutboxError::Storage)?;
}
Ok(inboxes.len())
}
pub async fn run(self: Arc<Self>, tick: Duration) {
loop {
loop {
match self.drain_once().await {
Ok(DeliveryOutcome::Idle) => break,
Ok(_) => continue,
Err(e) => {
tracing::warn!(error = %e, "delivery worker tick failed");
break;
}
}
}
tokio::time::sleep(tick).await;
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{generate_actor_keypair, render_actor};
    use crate::outbox::handle_outbox;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds an in-memory store plus a signing config backed by a freshly
    /// generated key pair.
    async fn scaffold() -> (Store, DeliveryConfig) {
        let (private_key_pem, _public_key_pem) = generate_actor_keypair().unwrap();
        let store = Store::in_memory().await.unwrap();
        (
            store,
            DeliveryConfig {
                private_key_pem,
                key_id: "https://pod.example/profile/card.jsonld#main-key".into(),
            },
        )
    }

    /// Starts a mock inbox that answers every `POST /inbox` with `status`
    /// (expected exactly once), registers `follower` with that inbox, posts a
    /// `Create(Note)` carrying `content` to the outbox and returns a worker
    /// over the resulting queue.
    ///
    /// The mock server is returned as well so that it stays alive for the
    /// duration of the calling test.
    async fn worker_with_queued_note(
        status: u16,
        follower: &str,
        content: &str,
    ) -> (DeliveryWorker, MockServer) {
        let (store, config) = scaffold().await;
        let actor = render_actor("https://pod.example", "me", "Me", None, "PEM");

        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/inbox"))
            .respond_with(ResponseTemplate::new(status))
            .expect(1)
            .mount(&server)
            .await;

        let inbox_url = format!("{}/inbox", server.uri());
        store
            .add_follower(&actor.id, follower, Some(&inbox_url))
            .await
            .unwrap();

        let activity = serde_json::json!({
            "type": "Create",
            "object": {"type": "Note", "content": content}
        });
        handle_outbox(&store, &actor, activity).await.unwrap();

        (DeliveryWorker::new(store, config), server)
    }

    #[tokio::test]
    async fn delivery_succeeds_and_drops_queue_item() {
        let (worker, _server) = worker_with_queued_note(202, "follower-a", "hi").await;

        let first = worker.drain_once().await.unwrap();
        assert_eq!(first, DeliveryOutcome::Delivered);

        // The queue item is gone after a successful delivery.
        let second = worker.drain_once().await.unwrap();
        assert_eq!(second, DeliveryOutcome::Idle);
    }

    #[tokio::test]
    async fn delivery_retries_on_5xx() {
        let (worker, _server) = worker_with_queued_note(503, "fa", "x").await;

        let next_retry_secs = match worker.drain_once().await.unwrap() {
            DeliveryOutcome::Rescheduled { next_retry_secs } => next_retry_secs,
            other => panic!("expected Rescheduled, got {other:?}"),
        };
        assert_eq!(next_retry_secs, BACKOFF_SECONDS[0]);
    }

    #[tokio::test]
    async fn delivery_drops_on_4xx() {
        let (worker, _server) = worker_with_queued_note(403, "fa", "x").await;

        let outcome = worker.drain_once().await.unwrap();
        assert_eq!(outcome, DeliveryOutcome::Dropped);
    }

    #[tokio::test]
    async fn delivery_idle_when_queue_empty() {
        let (store, config) = scaffold().await;
        let worker = DeliveryWorker::new(store, config);

        let outcome = worker.drain_once().await.unwrap();
        assert_eq!(outcome, DeliveryOutcome::Idle);
    }
}