Skip to main content

solid_pod_rs_activitypub/
outbox.rs

1//! Outbox handler — persists a new activity and queues federated
2//! delivery to followers.
3//!
4//! JSS parity: mirrors `src/ap/routes/outbox.js`. The Rust version
5//! separates "record activity" (synchronous, durable) from "deliver to
6//! follower inboxes" (async via [`crate::delivery`]). JSS uses
7//! `Promise.allSettled` inline; we queue with retry so restarts don't
8//! drop signed deliveries.
9
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12
13use crate::{
14    actor::Actor,
15    error::OutboxError,
16    store::Store,
17};
18
19/// Result of submitting an activity to the outbox.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct OutboundDelivery {
22    pub activity_id: String,
23    /// Number of follower inboxes the activity was queued for.
24    pub queued_inboxes: usize,
25    /// The canonical activity (with `id` filled in if the caller left
26    /// it blank).
27    pub activity: serde_json::Value,
28}
29
30/// Submit an activity to the outbox. The caller already constructed a
31/// full ActivityPub activity document (e.g. `Create`, `Follow`,
32/// `Delete`). This function:
33///
34/// 1. Stamps a UUID `id` if missing.
35/// 2. Persists the activity in the outbox table.
36/// 3. Enqueues a signed delivery per follower inbox.
37pub async fn handle_outbox(
38    store: &Store,
39    actor: &Actor,
40    activity: serde_json::Value,
41) -> Result<OutboundDelivery, OutboxError> {
42    let activity_type = activity
43        .get("type")
44        .and_then(|v| v.as_str())
45        .ok_or_else(|| OutboxError::InvalidActivity("missing type".into()))?
46        .to_string();
47
48    // Ensure id is present; generate one otherwise.
49    let mut activity = activity;
50    if activity
51        .get("id")
52        .and_then(|v| v.as_str())
53        .map(|s| s.is_empty())
54        .unwrap_or(true)
55    {
56        let base = actor.id.trim_end_matches("#me");
57        let fresh_id = format!("{base}/activities/{}", uuid::Uuid::new_v4());
58        activity["id"] = serde_json::Value::String(fresh_id);
59    }
60
61    // Ensure actor field is present and matches.
62    if activity.get("actor").and_then(|v| v.as_str()).is_none() {
63        activity["actor"] = serde_json::Value::String(actor.id.clone());
64    }
65
66    let activity_id = store.record_outbox(&actor.id, &activity).await?;
67
68    // Figure out delivery targets. For `Create` + `Announce` + `Update`
69    // + `Delete` we broadcast to followers; for `Follow` we deliver to
70    // the target's inbox (pulled from activity.object.inbox if
71    // pre-hydrated, else 0 — the caller is expected to hydrate via
72    // their resolver prior to calling).
73    let inboxes: Vec<String> = match activity_type.as_str() {
74        "Follow" => activity
75            .get("targetInbox")
76            .and_then(|v| v.as_str())
77            .map(|s| vec![s.to_string()])
78            .unwrap_or_default(),
79        _ => store
80            .follower_inboxes(&actor.id)
81            .await
82            .map_err(OutboxError::Storage)?,
83    };
84
85    for inbox in &inboxes {
86        store
87            .enqueue_delivery(&activity_id, inbox)
88            .await
89            .map_err(OutboxError::Storage)?;
90    }
91
92    Ok(OutboundDelivery {
93        activity_id,
94        queued_inboxes: inboxes.len(),
95        activity,
96    })
97}
98
99/// Wrap a raw Note (or content-only object) in a `Create` activity.
100///
101/// JSS v0.0.67 accepts both raw Notes and pre-wrapped Create activities
102/// on the outbox POST endpoint. This helper normalises the former into
103/// the latter so downstream processing always sees a proper activity.
104fn wrap_note_in_create(actor: &Actor, note: serde_json::Value) -> serde_json::Value {
105    let base = actor.id.trim_end_matches("#me");
106    let activity_id = format!("{base}/activities/{}", uuid::Uuid::new_v4());
107    let now = Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
108
109    // Ensure the Note itself has an id.
110    let mut note = note;
111    if note.get("id").and_then(|v| v.as_str()).map(|s| s.is_empty()).unwrap_or(true) {
112        let note_id = format!("{base}/posts/{}", uuid::Uuid::new_v4());
113        note["id"] = serde_json::Value::String(note_id);
114    }
115    // Stamp attributedTo on the Note if missing.
116    if note.get("attributedTo").is_none() {
117        note["attributedTo"] = serde_json::Value::String(actor.id.clone());
118    }
119    // Stamp published on the Note if missing.
120    if note.get("published").is_none() {
121        note["published"] = serde_json::Value::String(now.clone());
122    }
123
124    serde_json::json!({
125        "@context": "https://www.w3.org/ns/activitystreams",
126        "type": "Create",
127        "id": activity_id,
128        "actor": actor.id,
129        "published": now,
130        "object": note,
131    })
132}
133
134/// Handle a POST to the outbox endpoint. Accepts either:
135///
136/// 1. A pre-formed Activity (has `type` == `Create`/`Follow`/etc.) — passed
137///    through to [`handle_outbox`].
138/// 2. A raw Note (`type` == `"Note"`, or has `content` but no `type`) —
139///    wrapped in a `Create` activity first, matching JSS v0.0.67 behaviour.
140///
141/// Returns the created/submitted activity via [`OutboundDelivery`].
142pub async fn handle_outbox_post(
143    store: &Store,
144    actor: &Actor,
145    body: serde_json::Value,
146) -> Result<OutboundDelivery, OutboxError> {
147    let activity_type = body.get("type").and_then(|v| v.as_str()).unwrap_or("");
148
149    let activity = match activity_type {
150        // Already a well-formed activity — pass through.
151        "Create" | "Follow" | "Update" | "Delete" | "Announce" | "Like" | "Undo" | "Accept"
152        | "Reject" | "Add" | "Remove" | "Block" => body,
153        // Raw Note — wrap in Create.
154        "Note" => wrap_note_in_create(actor, body),
155        // No type but has content — treat as implicit Note.
156        "" if body.get("content").is_some() => {
157            let mut note = body;
158            note["type"] = serde_json::Value::String("Note".into());
159            wrap_note_in_create(actor, note)
160        }
161        // Unknown type — try wrapping in Create as a best-effort.
162        other => {
163            return Err(OutboxError::InvalidActivity(format!(
164                "unsupported activity type for outbox POST: {other}"
165            )));
166        }
167    };
168
169    handle_outbox(store, actor, activity).await
170}
171
172// ---------------------------------------------------------------------------
173// Tests
174// ---------------------------------------------------------------------------
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179    use crate::actor::render_actor;
180
181    fn sample_actor() -> Actor {
182        render_actor("https://pod.example", "me", "Me", None, "PEM")
183    }
184
185    #[tokio::test]
186    async fn outbox_create_broadcasts_to_followers() {
187        let store = Store::in_memory().await.unwrap();
188        let actor = sample_actor();
189        // Add two followers.
190        store
191            .add_follower(&actor.id, "follower-a", Some("https://a/inbox"))
192            .await
193            .unwrap();
194        store
195            .add_follower(&actor.id, "follower-b", Some("https://b/inbox"))
196            .await
197            .unwrap();
198
199        let note_activity = serde_json::json!({
200            "type": "Create",
201            "object": {"type": "Note", "content": "hello world"}
202        });
203        let delivery = handle_outbox(&store, &actor, note_activity)
204            .await
205            .unwrap();
206        assert_eq!(delivery.queued_inboxes, 2);
207        assert!(delivery.activity.get("id").is_some());
208        assert_eq!(delivery.activity["actor"], actor.id);
209
210        // Confirm two rows exist in the delivery_queue.
211        let (n,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM delivery_queue")
212            .fetch_one(store.pool())
213            .await
214            .unwrap();
215        assert_eq!(n, 2);
216    }
217
218    #[tokio::test]
219    async fn outbox_follow_queues_delivery_to_target() {
220        let store = Store::in_memory().await.unwrap();
221        let actor = sample_actor();
222        let follow = serde_json::json!({
223            "type": "Follow",
224            "object": "https://other/actor",
225            "targetInbox": "https://other/inbox"
226        });
227        let delivery = handle_outbox(&store, &actor, follow).await.unwrap();
228        assert_eq!(delivery.queued_inboxes, 1);
229    }
230
231    #[tokio::test]
232    async fn outbox_rejects_missing_type() {
233        let store = Store::in_memory().await.unwrap();
234        let actor = sample_actor();
235        let err = handle_outbox(&store, &actor, serde_json::json!({})).await.unwrap_err();
236        assert!(matches!(err, OutboxError::InvalidActivity(_)));
237    }
238
239    #[tokio::test]
240    async fn outbox_generates_id_if_missing() {
241        let store = Store::in_memory().await.unwrap();
242        let actor = sample_actor();
243        let act = serde_json::json!({"type": "Create", "object": {"type": "Note"}});
244        let d = handle_outbox(&store, &actor, act).await.unwrap();
245        assert!(d.activity_id.starts_with("https://pod.example/profile/card.jsonld/activities/"));
246    }
247
248    // --- handle_outbox_post tests ---
249
250    #[tokio::test]
251    async fn outbox_post_wraps_raw_note_in_create() {
252        let store = Store::in_memory().await.unwrap();
253        let actor = sample_actor();
254        let note = serde_json::json!({
255            "type": "Note",
256            "content": "Hello from outbox POST"
257        });
258        let delivery = handle_outbox_post(&store, &actor, note).await.unwrap();
259        assert_eq!(delivery.activity["type"], "Create");
260        assert_eq!(delivery.activity["object"]["type"], "Note");
261        assert_eq!(delivery.activity["object"]["content"], "Hello from outbox POST");
262        // The Note should have attributedTo and published stamped.
263        assert_eq!(delivery.activity["object"]["attributedTo"], actor.id);
264        assert!(delivery.activity["object"]["published"].as_str().is_some());
265        // The Create should have an id and published.
266        assert!(delivery.activity["id"].as_str().is_some());
267        assert!(delivery.activity["published"].as_str().is_some());
268    }
269
270    #[tokio::test]
271    async fn outbox_post_passes_through_create_activity() {
272        let store = Store::in_memory().await.unwrap();
273        let actor = sample_actor();
274        let create = serde_json::json!({
275            "type": "Create",
276            "object": {"type": "Note", "content": "pre-wrapped"}
277        });
278        let delivery = handle_outbox_post(&store, &actor, create).await.unwrap();
279        assert_eq!(delivery.activity["type"], "Create");
280        assert_eq!(delivery.activity["object"]["content"], "pre-wrapped");
281    }
282
283    #[tokio::test]
284    async fn outbox_post_wraps_content_only_body_as_note() {
285        let store = Store::in_memory().await.unwrap();
286        let actor = sample_actor();
287        // No type, but has content — should be treated as an implicit Note.
288        let body = serde_json::json!({"content": "implicit note"});
289        let delivery = handle_outbox_post(&store, &actor, body).await.unwrap();
290        assert_eq!(delivery.activity["type"], "Create");
291        assert_eq!(delivery.activity["object"]["type"], "Note");
292        assert_eq!(delivery.activity["object"]["content"], "implicit note");
293    }
294
295    #[tokio::test]
296    async fn outbox_post_rejects_unsupported_type() {
297        let store = Store::in_memory().await.unwrap();
298        let actor = sample_actor();
299        let body = serde_json::json!({"type": "TentacleWiggle"});
300        let err = handle_outbox_post(&store, &actor, body).await.unwrap_err();
301        assert!(matches!(err, OutboxError::InvalidActivity(_)));
302    }
303
304    #[tokio::test]
305    async fn outbox_post_note_delivers_to_followers() {
306        let store = Store::in_memory().await.unwrap();
307        let actor = sample_actor();
308        store.add_follower(&actor.id, "f1", Some("https://f1/inbox")).await.unwrap();
309        store.add_follower(&actor.id, "f2", Some("https://f2/inbox")).await.unwrap();
310
311        let note = serde_json::json!({"type": "Note", "content": "fan-out test"});
312        let delivery = handle_outbox_post(&store, &actor, note).await.unwrap();
313        assert_eq!(delivery.queued_inboxes, 2);
314    }
315}