use chrono::Utc;
use serde::{Deserialize, Serialize};
use crate::{
actor::Actor,
error::OutboxError,
store::Store,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundDelivery {
pub activity_id: String,
pub queued_inboxes: usize,
pub activity: serde_json::Value,
}
pub async fn handle_outbox(
store: &Store,
actor: &Actor,
activity: serde_json::Value,
) -> Result<OutboundDelivery, OutboxError> {
let activity_type = activity
.get("type")
.and_then(|v| v.as_str())
.ok_or_else(|| OutboxError::InvalidActivity("missing type".into()))?
.to_string();
let mut activity = activity;
if activity
.get("id")
.and_then(|v| v.as_str())
.map(|s| s.is_empty())
.unwrap_or(true)
{
let base = actor.id.trim_end_matches("#me");
let fresh_id = format!("{base}/activities/{}", uuid::Uuid::new_v4());
activity["id"] = serde_json::Value::String(fresh_id);
}
if activity.get("actor").and_then(|v| v.as_str()).is_none() {
activity["actor"] = serde_json::Value::String(actor.id.clone());
}
let activity_id = store.record_outbox(&actor.id, &activity).await?;
let inboxes: Vec<String> = match activity_type.as_str() {
"Follow" => activity
.get("targetInbox")
.and_then(|v| v.as_str())
.map(|s| vec![s.to_string()])
.unwrap_or_default(),
_ => store
.follower_inboxes(&actor.id)
.await
.map_err(OutboxError::Storage)?,
};
for inbox in &inboxes {
store
.enqueue_delivery(&activity_id, inbox)
.await
.map_err(OutboxError::Storage)?;
}
Ok(OutboundDelivery {
activity_id,
queued_inboxes: inboxes.len(),
activity,
})
}
fn wrap_note_in_create(actor: &Actor, note: serde_json::Value) -> serde_json::Value {
let base = actor.id.trim_end_matches("#me");
let activity_id = format!("{base}/activities/{}", uuid::Uuid::new_v4());
let now = Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let mut note = note;
if note.get("id").and_then(|v| v.as_str()).map(|s| s.is_empty()).unwrap_or(true) {
let note_id = format!("{base}/posts/{}", uuid::Uuid::new_v4());
note["id"] = serde_json::Value::String(note_id);
}
if note.get("attributedTo").is_none() {
note["attributedTo"] = serde_json::Value::String(actor.id.clone());
}
if note.get("published").is_none() {
note["published"] = serde_json::Value::String(now.clone());
}
serde_json::json!({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Create",
"id": activity_id,
"actor": actor.id,
"published": now,
"object": note,
})
}
pub async fn handle_outbox_post(
store: &Store,
actor: &Actor,
body: serde_json::Value,
) -> Result<OutboundDelivery, OutboxError> {
let activity_type = body.get("type").and_then(|v| v.as_str()).unwrap_or("");
let activity = match activity_type {
"Create" | "Follow" | "Update" | "Delete" | "Announce" | "Like" | "Undo" | "Accept"
| "Reject" | "Add" | "Remove" | "Block" => body,
"Note" => wrap_note_in_create(actor, body),
"" if body.get("content").is_some() => {
let mut note = body;
note["type"] = serde_json::Value::String("Note".into());
wrap_note_in_create(actor, note)
}
other => {
return Err(OutboxError::InvalidActivity(format!(
"unsupported activity type for outbox POST: {other}"
)));
}
};
handle_outbox(store, actor, activity).await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::actor::render_actor;
fn sample_actor() -> Actor {
render_actor("https://pod.example", "me", "Me", None, "PEM")
}
#[tokio::test]
async fn outbox_create_broadcasts_to_followers() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
store
.add_follower(&actor.id, "follower-a", Some("https://a/inbox"))
.await
.unwrap();
store
.add_follower(&actor.id, "follower-b", Some("https://b/inbox"))
.await
.unwrap();
let note_activity = serde_json::json!({
"type": "Create",
"object": {"type": "Note", "content": "hello world"}
});
let delivery = handle_outbox(&store, &actor, note_activity)
.await
.unwrap();
assert_eq!(delivery.queued_inboxes, 2);
assert!(delivery.activity.get("id").is_some());
assert_eq!(delivery.activity["actor"], actor.id);
let (n,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM delivery_queue")
.fetch_one(store.pool())
.await
.unwrap();
assert_eq!(n, 2);
}
#[tokio::test]
async fn outbox_follow_queues_delivery_to_target() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
let follow = serde_json::json!({
"type": "Follow",
"object": "https://other/actor",
"targetInbox": "https://other/inbox"
});
let delivery = handle_outbox(&store, &actor, follow).await.unwrap();
assert_eq!(delivery.queued_inboxes, 1);
}
#[tokio::test]
async fn outbox_rejects_missing_type() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
let err = handle_outbox(&store, &actor, serde_json::json!({})).await.unwrap_err();
assert!(matches!(err, OutboxError::InvalidActivity(_)));
}
#[tokio::test]
async fn outbox_generates_id_if_missing() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
let act = serde_json::json!({"type": "Create", "object": {"type": "Note"}});
let d = handle_outbox(&store, &actor, act).await.unwrap();
assert!(d.activity_id.starts_with("https://pod.example/profile/card.jsonld/activities/"));
}
#[tokio::test]
async fn outbox_post_wraps_raw_note_in_create() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
let note = serde_json::json!({
"type": "Note",
"content": "Hello from outbox POST"
});
let delivery = handle_outbox_post(&store, &actor, note).await.unwrap();
assert_eq!(delivery.activity["type"], "Create");
assert_eq!(delivery.activity["object"]["type"], "Note");
assert_eq!(delivery.activity["object"]["content"], "Hello from outbox POST");
assert_eq!(delivery.activity["object"]["attributedTo"], actor.id);
assert!(delivery.activity["object"]["published"].as_str().is_some());
assert!(delivery.activity["id"].as_str().is_some());
assert!(delivery.activity["published"].as_str().is_some());
}
#[tokio::test]
async fn outbox_post_passes_through_create_activity() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
let create = serde_json::json!({
"type": "Create",
"object": {"type": "Note", "content": "pre-wrapped"}
});
let delivery = handle_outbox_post(&store, &actor, create).await.unwrap();
assert_eq!(delivery.activity["type"], "Create");
assert_eq!(delivery.activity["object"]["content"], "pre-wrapped");
}
#[tokio::test]
async fn outbox_post_wraps_content_only_body_as_note() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
let body = serde_json::json!({"content": "implicit note"});
let delivery = handle_outbox_post(&store, &actor, body).await.unwrap();
assert_eq!(delivery.activity["type"], "Create");
assert_eq!(delivery.activity["object"]["type"], "Note");
assert_eq!(delivery.activity["object"]["content"], "implicit note");
}
#[tokio::test]
async fn outbox_post_rejects_unsupported_type() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
let body = serde_json::json!({"type": "TentacleWiggle"});
let err = handle_outbox_post(&store, &actor, body).await.unwrap_err();
assert!(matches!(err, OutboxError::InvalidActivity(_)));
}
#[tokio::test]
async fn outbox_post_note_delivers_to_followers() {
let store = Store::in_memory().await.unwrap();
let actor = sample_actor();
store.add_follower(&actor.id, "f1", Some("https://f1/inbox")).await.unwrap();
store.add_follower(&actor.id, "f2", Some("https://f2/inbox")).await.unwrap();
let note = serde_json::json!({"type": "Note", "content": "fan-out test"});
let delivery = handle_outbox_post(&store, &actor, note).await.unwrap();
assert_eq!(delivery.queued_inboxes, 2);
}
}