Skip to main content

cloudillo_action/
delivery.rs

1//! Action delivery task for federated action distribution
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
7use cloudillo_core::scheduler::{Task, TaskId};
8
9use crate::prelude::*;
10
11/// Task for delivering federated actions
12/// Retry logic is handled by the scheduler with RetryPolicy
13#[derive(Debug, Serialize, Deserialize)]
14pub struct ActionDeliveryTask {
15	pub tn_id: TnId,
16	pub action_id: Box<str>,
17	pub target_instance: Box<str>, // Base domain of target instance
18	pub target_id_tag: Box<str>,   // User on target instance to deliver to
19	/// Optional related action ID (e.g., for APRV, this is the subject action being approved)
20	/// When set, the related action's token is included in the `related` field of the inbox payload
21	#[serde(default, skip_serializing_if = "Option::is_none")]
22	pub related_action_id: Option<Box<str>>,
23}
24
25impl ActionDeliveryTask {
26	pub fn new(
27		tn_id: TnId,
28		action_id: Box<str>,
29		target_instance: Box<str>,
30		target_id_tag: Box<str>,
31	) -> Arc<Self> {
32		Arc::new(Self { tn_id, action_id, target_instance, target_id_tag, related_action_id: None })
33	}
34
35	/// Create a delivery task with a related action (used for APRV fan-out to include the approved action)
36	pub fn new_with_related(
37		tn_id: TnId,
38		action_id: Box<str>,
39		target_instance: Box<str>,
40		target_id_tag: Box<str>,
41		related_action_id: Option<Box<str>>,
42	) -> Arc<Self> {
43		Arc::new(Self { tn_id, action_id, target_instance, target_id_tag, related_action_id })
44	}
45}
46
47#[async_trait]
48impl Task<App> for ActionDeliveryTask {
49	fn kind() -> &'static str {
50		"action.delivery"
51	}
52
53	fn kind_of(&self) -> &'static str {
54		Self::kind()
55	}
56
57	fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<App>>> {
58		let task: ActionDeliveryTask = serde_json::from_str(ctx)?;
59		Ok(Arc::new(task))
60	}
61
62	fn serialize(&self) -> String {
63		// Safe: ActionDeliveryTask is a simple struct with all serializable fields
64		// This should never fail unless there's a bug in serde
65		serde_json::to_string(self).unwrap_or_else(|e| {
66			error!("Failed to serialize ActionDeliveryTask: {}", e);
67			"{}".to_string()
68		})
69	}
70
71	async fn run(&self, app: &App) -> ClResult<()> {
72		debug!("→ DELIVER: {} to {}", self.action_id, self.target_instance);
73
74		// Fetch action from database
75		let action = app.meta_adapter.get_action(self.tn_id, &self.action_id).await?;
76
77		let _action = match action {
78			Some(a) => a,
79			None => {
80				// Action was deleted, mark delivery task as complete
81				warn!("Action {} not found for delivery task, marking as complete", self.action_id);
82				return Ok(());
83			}
84		};
85
86		// Get action token
87		let action_token = app.meta_adapter.get_action_token(self.tn_id, &self.action_id).await?;
88
89		let action_token = match action_token {
90			Some(token) => token,
91			None => {
92				error!("No action token found for action {}", self.action_id);
93				return Err(Error::Internal(format!(
94					"action token not found for action {}",
95					self.action_id
96				)));
97			}
98		};
99
100		// Prepare inbox request payload
101		let mut payload = serde_json::json!({
102			"token": action_token.clone()
103		});
104
105		// If there's a related action (e.g., for APRV fan-out), include its token
106		if let Some(ref related_id) = self.related_action_id {
107			if let Ok(Some(related_token)) =
108				app.meta_adapter.get_action_token(self.tn_id, related_id).await
109			{
110				payload["related"] = serde_json::json!([related_token]);
111				debug!(
112					"Including related action {} token in delivery to {}",
113					related_id, self.target_instance
114				);
115			} else {
116				warn!("Related action {} token not found, delivering without it", related_id);
117			}
118		}
119
120		// POST to remote instance inbox
121		match app
122			.request
123			.post::<serde_json::Value>(self.tn_id, &self.target_id_tag, "/inbox", &payload)
124			.await
125		{
126			Ok(_) => {
127				// Success - action delivered
128				info!("← DELIVERED: {} to {}", self.action_id, self.target_instance);
129				Ok(())
130			}
131			Err(e) => {
132				// Delivery failed - scheduler will handle retries with RetryPolicy
133				warn!(
134					"Failed to deliver action {} to {}: {}",
135					self.action_id, self.target_instance, e
136				);
137				Err(e)
138			}
139		}
140	}
141}
142
143impl Clone for ActionDeliveryTask {
144	fn clone(&self) -> Self {
145		Self {
146			tn_id: self.tn_id,
147			action_id: self.action_id.clone(),
148			target_instance: self.target_instance.clone(),
149			target_id_tag: self.target_id_tag.clone(),
150			related_action_id: self.related_action_id.clone(),
151		}
152	}
153}
154
155// vim: ts=4