solid_pod_rs_activitypub/
outbox.rs1use chrono::Utc;
11use serde::{Deserialize, Serialize};
12
13use crate::{
14 actor::Actor,
15 error::OutboxError,
16 store::Store,
17};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct OutboundDelivery {
22 pub activity_id: String,
23 pub queued_inboxes: usize,
25 pub activity: serde_json::Value,
28}
29
30pub async fn handle_outbox(
38 store: &Store,
39 actor: &Actor,
40 activity: serde_json::Value,
41) -> Result<OutboundDelivery, OutboxError> {
42 let activity_type = activity
43 .get("type")
44 .and_then(|v| v.as_str())
45 .ok_or_else(|| OutboxError::InvalidActivity("missing type".into()))?
46 .to_string();
47
48 let mut activity = activity;
50 if activity
51 .get("id")
52 .and_then(|v| v.as_str())
53 .map(|s| s.is_empty())
54 .unwrap_or(true)
55 {
56 let base = actor.id.trim_end_matches("#me");
57 let fresh_id = format!("{base}/activities/{}", uuid::Uuid::new_v4());
58 activity["id"] = serde_json::Value::String(fresh_id);
59 }
60
61 if activity.get("actor").and_then(|v| v.as_str()).is_none() {
63 activity["actor"] = serde_json::Value::String(actor.id.clone());
64 }
65
66 let activity_id = store.record_outbox(&actor.id, &activity).await?;
67
68 let inboxes: Vec<String> = match activity_type.as_str() {
74 "Follow" => activity
75 .get("targetInbox")
76 .and_then(|v| v.as_str())
77 .map(|s| vec![s.to_string()])
78 .unwrap_or_default(),
79 _ => store
80 .follower_inboxes(&actor.id)
81 .await
82 .map_err(OutboxError::Storage)?,
83 };
84
85 for inbox in &inboxes {
86 store
87 .enqueue_delivery(&activity_id, inbox)
88 .await
89 .map_err(OutboxError::Storage)?;
90 }
91
92 Ok(OutboundDelivery {
93 activity_id,
94 queued_inboxes: inboxes.len(),
95 activity,
96 })
97}
98
99fn wrap_note_in_create(actor: &Actor, note: serde_json::Value) -> serde_json::Value {
105 let base = actor.id.trim_end_matches("#me");
106 let activity_id = format!("{base}/activities/{}", uuid::Uuid::new_v4());
107 let now = Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
108
109 let mut note = note;
111 if note.get("id").and_then(|v| v.as_str()).map(|s| s.is_empty()).unwrap_or(true) {
112 let note_id = format!("{base}/posts/{}", uuid::Uuid::new_v4());
113 note["id"] = serde_json::Value::String(note_id);
114 }
115 if note.get("attributedTo").is_none() {
117 note["attributedTo"] = serde_json::Value::String(actor.id.clone());
118 }
119 if note.get("published").is_none() {
121 note["published"] = serde_json::Value::String(now.clone());
122 }
123
124 serde_json::json!({
125 "@context": "https://www.w3.org/ns/activitystreams",
126 "type": "Create",
127 "id": activity_id,
128 "actor": actor.id,
129 "published": now,
130 "object": note,
131 })
132}
133
134pub async fn handle_outbox_post(
143 store: &Store,
144 actor: &Actor,
145 body: serde_json::Value,
146) -> Result<OutboundDelivery, OutboxError> {
147 let activity_type = body.get("type").and_then(|v| v.as_str()).unwrap_or("");
148
149 let activity = match activity_type {
150 "Create" | "Follow" | "Update" | "Delete" | "Announce" | "Like" | "Undo" | "Accept"
152 | "Reject" | "Add" | "Remove" | "Block" => body,
153 "Note" => wrap_note_in_create(actor, body),
155 "" if body.get("content").is_some() => {
157 let mut note = body;
158 note["type"] = serde_json::Value::String("Note".into());
159 wrap_note_in_create(actor, note)
160 }
161 other => {
163 return Err(OutboxError::InvalidActivity(format!(
164 "unsupported activity type for outbox POST: {other}"
165 )));
166 }
167 };
168
169 handle_outbox(store, actor, activity).await
170}
171
172#[cfg(test)]
177mod tests {
178 use super::*;
179 use crate::actor::render_actor;
180
181 fn sample_actor() -> Actor {
182 render_actor("https://pod.example", "me", "Me", None, "PEM")
183 }
184
185 #[tokio::test]
186 async fn outbox_create_broadcasts_to_followers() {
187 let store = Store::in_memory().await.unwrap();
188 let actor = sample_actor();
189 store
191 .add_follower(&actor.id, "follower-a", Some("https://a/inbox"))
192 .await
193 .unwrap();
194 store
195 .add_follower(&actor.id, "follower-b", Some("https://b/inbox"))
196 .await
197 .unwrap();
198
199 let note_activity = serde_json::json!({
200 "type": "Create",
201 "object": {"type": "Note", "content": "hello world"}
202 });
203 let delivery = handle_outbox(&store, &actor, note_activity)
204 .await
205 .unwrap();
206 assert_eq!(delivery.queued_inboxes, 2);
207 assert!(delivery.activity.get("id").is_some());
208 assert_eq!(delivery.activity["actor"], actor.id);
209
210 let (n,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM delivery_queue")
212 .fetch_one(store.pool())
213 .await
214 .unwrap();
215 assert_eq!(n, 2);
216 }
217
218 #[tokio::test]
219 async fn outbox_follow_queues_delivery_to_target() {
220 let store = Store::in_memory().await.unwrap();
221 let actor = sample_actor();
222 let follow = serde_json::json!({
223 "type": "Follow",
224 "object": "https://other/actor",
225 "targetInbox": "https://other/inbox"
226 });
227 let delivery = handle_outbox(&store, &actor, follow).await.unwrap();
228 assert_eq!(delivery.queued_inboxes, 1);
229 }
230
231 #[tokio::test]
232 async fn outbox_rejects_missing_type() {
233 let store = Store::in_memory().await.unwrap();
234 let actor = sample_actor();
235 let err = handle_outbox(&store, &actor, serde_json::json!({})).await.unwrap_err();
236 assert!(matches!(err, OutboxError::InvalidActivity(_)));
237 }
238
239 #[tokio::test]
240 async fn outbox_generates_id_if_missing() {
241 let store = Store::in_memory().await.unwrap();
242 let actor = sample_actor();
243 let act = serde_json::json!({"type": "Create", "object": {"type": "Note"}});
244 let d = handle_outbox(&store, &actor, act).await.unwrap();
245 assert!(d.activity_id.starts_with("https://pod.example/profile/card.jsonld/activities/"));
246 }
247
248 #[tokio::test]
251 async fn outbox_post_wraps_raw_note_in_create() {
252 let store = Store::in_memory().await.unwrap();
253 let actor = sample_actor();
254 let note = serde_json::json!({
255 "type": "Note",
256 "content": "Hello from outbox POST"
257 });
258 let delivery = handle_outbox_post(&store, &actor, note).await.unwrap();
259 assert_eq!(delivery.activity["type"], "Create");
260 assert_eq!(delivery.activity["object"]["type"], "Note");
261 assert_eq!(delivery.activity["object"]["content"], "Hello from outbox POST");
262 assert_eq!(delivery.activity["object"]["attributedTo"], actor.id);
264 assert!(delivery.activity["object"]["published"].as_str().is_some());
265 assert!(delivery.activity["id"].as_str().is_some());
267 assert!(delivery.activity["published"].as_str().is_some());
268 }
269
270 #[tokio::test]
271 async fn outbox_post_passes_through_create_activity() {
272 let store = Store::in_memory().await.unwrap();
273 let actor = sample_actor();
274 let create = serde_json::json!({
275 "type": "Create",
276 "object": {"type": "Note", "content": "pre-wrapped"}
277 });
278 let delivery = handle_outbox_post(&store, &actor, create).await.unwrap();
279 assert_eq!(delivery.activity["type"], "Create");
280 assert_eq!(delivery.activity["object"]["content"], "pre-wrapped");
281 }
282
283 #[tokio::test]
284 async fn outbox_post_wraps_content_only_body_as_note() {
285 let store = Store::in_memory().await.unwrap();
286 let actor = sample_actor();
287 let body = serde_json::json!({"content": "implicit note"});
289 let delivery = handle_outbox_post(&store, &actor, body).await.unwrap();
290 assert_eq!(delivery.activity["type"], "Create");
291 assert_eq!(delivery.activity["object"]["type"], "Note");
292 assert_eq!(delivery.activity["object"]["content"], "implicit note");
293 }
294
295 #[tokio::test]
296 async fn outbox_post_rejects_unsupported_type() {
297 let store = Store::in_memory().await.unwrap();
298 let actor = sample_actor();
299 let body = serde_json::json!({"type": "TentacleWiggle"});
300 let err = handle_outbox_post(&store, &actor, body).await.unwrap_err();
301 assert!(matches!(err, OutboxError::InvalidActivity(_)));
302 }
303
304 #[tokio::test]
305 async fn outbox_post_note_delivers_to_followers() {
306 let store = Store::in_memory().await.unwrap();
307 let actor = sample_actor();
308 store.add_follower(&actor.id, "f1", Some("https://f1/inbox")).await.unwrap();
309 store.add_follower(&actor.id, "f2", Some("https://f2/inbox")).await.unwrap();
310
311 let note = serde_json::json!({"type": "Note", "content": "fan-out test"});
312 let delivery = handle_outbox_post(&store, &actor, note).await.unwrap();
313 assert_eq!(delivery.queued_inboxes, 2);
314 }
315}