cloudillo_action/
delivery.rs1use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
7use cloudillo_core::scheduler::{Task, TaskId};
8
9use crate::prelude::*;
10
11#[derive(Debug, Serialize, Deserialize)]
14pub struct ActionDeliveryTask {
15 pub tn_id: TnId,
16 pub action_id: Box<str>,
17 pub target_instance: Box<str>, pub target_id_tag: Box<str>, #[serde(default, skip_serializing_if = "Option::is_none")]
22 pub related_action_id: Option<Box<str>>,
23}
24
25impl ActionDeliveryTask {
26 pub fn new(
27 tn_id: TnId,
28 action_id: Box<str>,
29 target_instance: Box<str>,
30 target_id_tag: Box<str>,
31 ) -> Arc<Self> {
32 Arc::new(Self { tn_id, action_id, target_instance, target_id_tag, related_action_id: None })
33 }
34
35 pub fn new_with_related(
37 tn_id: TnId,
38 action_id: Box<str>,
39 target_instance: Box<str>,
40 target_id_tag: Box<str>,
41 related_action_id: Option<Box<str>>,
42 ) -> Arc<Self> {
43 Arc::new(Self { tn_id, action_id, target_instance, target_id_tag, related_action_id })
44 }
45}
46
47#[async_trait]
48impl Task<App> for ActionDeliveryTask {
49 fn kind() -> &'static str {
50 "action.delivery"
51 }
52
53 fn kind_of(&self) -> &'static str {
54 Self::kind()
55 }
56
57 fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<App>>> {
58 let task: ActionDeliveryTask = serde_json::from_str(ctx)?;
59 Ok(Arc::new(task))
60 }
61
62 fn serialize(&self) -> String {
63 serde_json::to_string(self).unwrap_or_else(|e| {
66 error!("Failed to serialize ActionDeliveryTask: {}", e);
67 "{}".to_string()
68 })
69 }
70
71 async fn run(&self, app: &App) -> ClResult<()> {
72 debug!("→ DELIVER: {} to {}", self.action_id, self.target_instance);
73
74 let action = app.meta_adapter.get_action(self.tn_id, &self.action_id).await?;
76
77 let _action = match action {
78 Some(a) => a,
79 None => {
80 warn!("Action {} not found for delivery task, marking as complete", self.action_id);
82 return Ok(());
83 }
84 };
85
86 let action_token = app.meta_adapter.get_action_token(self.tn_id, &self.action_id).await?;
88
89 let action_token = match action_token {
90 Some(token) => token,
91 None => {
92 error!("No action token found for action {}", self.action_id);
93 return Err(Error::Internal(format!(
94 "action token not found for action {}",
95 self.action_id
96 )));
97 }
98 };
99
100 let mut payload = serde_json::json!({
102 "token": action_token.clone()
103 });
104
105 if let Some(ref related_id) = self.related_action_id {
107 if let Ok(Some(related_token)) =
108 app.meta_adapter.get_action_token(self.tn_id, related_id).await
109 {
110 payload["related"] = serde_json::json!([related_token]);
111 debug!(
112 "Including related action {} token in delivery to {}",
113 related_id, self.target_instance
114 );
115 } else {
116 warn!("Related action {} token not found, delivering without it", related_id);
117 }
118 }
119
120 match app
122 .request
123 .post::<serde_json::Value>(self.tn_id, &self.target_id_tag, "/inbox", &payload)
124 .await
125 {
126 Ok(_) => {
127 info!("← DELIVERED: {} to {}", self.action_id, self.target_instance);
129 Ok(())
130 }
131 Err(e) => {
132 warn!(
134 "Failed to deliver action {} to {}: {}",
135 self.action_id, self.target_instance, e
136 );
137 Err(e)
138 }
139 }
140 }
141}
142
143impl Clone for ActionDeliveryTask {
144 fn clone(&self) -> Self {
145 Self {
146 tn_id: self.tn_id,
147 action_id: self.action_id.clone(),
148 target_instance: self.target_instance.clone(),
149 target_id_tag: self.target_id_tag.clone(),
150 related_action_id: self.related_action_id.clone(),
151 }
152 }
153}
154
155