1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::collections::HashSet;
4use std::sync::Arc;
5
6pub use cloudillo_types::action_types::{CreateAction, ACCESS_TOKEN_EXPIRY};
8
9use cloudillo_core::scheduler::{RetryPolicy, Task, TaskId};
10use cloudillo_file::descriptor;
11use cloudillo_file::management::upgrade_file_visibility;
12use cloudillo_types::hasher;
13use cloudillo_types::meta_adapter;
14
15use crate::{
16 delivery::ActionDeliveryTask,
17 dsl::DslEngine,
18 helpers,
19 post_store::{self, ProcessingContext},
20 prelude::*,
21 process,
22};
23
24pub async fn create_action(
25 app: &App,
26 tn_id: TnId,
27 id_tag: &str,
28 action: CreateAction,
29) -> ClResult<Box<str>> {
30 let dsl = app.ext::<Arc<DslEngine>>()?;
31
32 let is_ephemeral = dsl
34 .get_behavior(action.typ.as_ref())
35 .map(|b| b.ephemeral.unwrap_or(false))
36 .unwrap_or(false);
37
38 if is_ephemeral {
39 return create_ephemeral_action(app, tn_id, id_tag, action).await;
40 }
41
42 let behavior = dsl.get_behavior(action.typ.as_ref());
44
45 let allow_unknown = behavior.as_ref().and_then(|b| b.allow_unknown).unwrap_or(false);
48 if !allow_unknown {
49 if let Some(ref audience_tag) = action.audience_tag {
50 if audience_tag.as_ref() != id_tag {
52 let has_relationship = app
53 .meta_adapter
54 .read_profile(tn_id, audience_tag)
55 .await
56 .ok()
57 .map(|(_, p)| p.following || p.connected.is_connected())
58 .unwrap_or(false);
59
60 if !has_relationship {
61 return Err(Error::ValidationError(format!(
62 "Cannot send {} to unknown recipient {}",
63 action.typ, audience_tag
64 )));
65 }
66 }
67 }
68 }
69
70 let requires_subscription =
73 behavior.as_ref().and_then(|b| b.requires_subscription).unwrap_or(false);
74 if requires_subscription {
75 let target_id = action.subject.as_deref().or(action.parent_id.as_deref());
76 if let Some(target_id) = target_id {
77 if !target_id.starts_with('@') {
79 let target_action = app.meta_adapter.get_action(tn_id, target_id).await?;
81 if let Some(target) = target_action {
82 if target.issuer.id_tag.as_ref() != id_tag {
84 let subs_key = format!("SUBS:{}:{}", target_id, id_tag);
86 let subscription =
87 app.meta_adapter.get_action_by_key(tn_id, &subs_key).await?;
88
89 if subscription.is_none() {
90 let root_sub = if let Some(root_id) = &target.root_id {
92 let root_subs_key = format!("SUBS:{}:{}", root_id, id_tag);
93 app.meta_adapter.get_action_by_key(tn_id, &root_subs_key).await?
94 } else {
95 None
96 };
97
98 if root_sub.is_none() {
99 return Err(Error::ValidationError(format!(
100 "Cannot send {} without subscription to {}",
101 action.typ, target_id
102 )));
103 }
104 }
105 }
106 }
107 }
108 }
109 }
110
111 {
113 let (_action_type, sub_type) = helpers::extract_type_and_subtype(&action.typ);
114 let is_delete = sub_type.as_deref() == Some("DEL");
115
116 if !is_delete {
117 if let Some(flag) = behavior.as_ref().and_then(|b| b.gated_by_parent_flag) {
118 if let Some(ref parent_id) = action.parent_id {
119 if !parent_id.starts_with('@') {
120 if let Ok(Some(parent)) =
121 app.meta_adapter.get_action(tn_id, parent_id).await
122 {
123 if !helpers::is_capability_enabled(parent.flags.as_deref(), flag) {
124 return Err(Error::ValidationError(format!(
125 "{} is disabled on the parent action",
126 action.typ
127 )));
128 }
129 }
130 }
131 }
132 }
133 if let Some(flag) = behavior.as_ref().and_then(|b| b.gated_by_subject_flag) {
134 if let Some(ref subject_id) = action.subject {
135 if !subject_id.starts_with('@') {
136 if let Ok(Some(subject)) =
137 app.meta_adapter.get_action(tn_id, subject_id).await
138 {
139 if !helpers::is_capability_enabled(subject.flags.as_deref(), flag) {
140 return Err(Error::ValidationError(format!(
141 "{} is disabled on the subject action",
142 action.typ
143 )));
144 }
145 }
146 }
147 }
148 }
149 }
150 }
151
152 let content_str = helpers::serialize_content(action.content.as_ref());
154
155 let visibility = helpers::inherit_visibility(
157 app.meta_adapter.as_ref(),
158 tn_id,
159 action.visibility,
160 action.parent_id.as_deref(),
161 )
162 .await;
163
164 let visibility = if visibility.is_some() {
166 visibility
167 } else {
168 match app.settings.get_string(tn_id, "privacy.default_visibility").await {
170 Ok(default_vis) => default_vis.chars().next(),
171 Err(_) => Some('F'), }
173 };
174
175 let visibility = if helpers::is_open(action.flags.as_deref()) {
177 Some('C') } else {
179 visibility
180 };
181
182 let root_id =
184 helpers::resolve_root_id(app.meta_adapter.as_ref(), tn_id, action.parent_id.as_deref())
185 .await;
186
187 let pending_action = meta_adapter::Action {
189 action_id: "", issuer_tag: id_tag,
191 typ: action.typ.as_ref(),
192 sub_typ: action.sub_typ.as_deref(),
193 parent_id: action.parent_id.as_deref(),
194 root_id: root_id.as_deref(),
195 audience_tag: action.audience_tag.as_deref(),
196 content: content_str.as_deref(),
197 attachments: action.attachments.as_ref().map(|v| v.iter().map(|a| a.as_ref()).collect()),
198 subject: action.subject.as_deref(),
199 expires_at: action.expires_at,
200 created_at: Timestamp::now(),
201 visibility,
202 flags: action.flags.as_deref(),
203 x: action.x.clone(),
204 };
205
206 let key = dsl.get_key_pattern(action.typ.as_ref()).map(|pattern| {
208 helpers::apply_key_pattern(
209 pattern,
210 action.typ.as_ref(),
211 id_tag,
212 action.audience_tag.as_deref(),
213 action.parent_id.as_deref(),
214 action.subject.as_deref(),
215 )
216 });
217 let action_result =
218 app.meta_adapter.create_action(tn_id, &pending_action, key.as_deref()).await?;
219
220 let a_id = match action_result {
221 meta_adapter::ActionId::AId(a_id) => a_id,
222 meta_adapter::ActionId::ActionId(_) => {
223 return Err(Error::Internal("Unexpected ActionId result".into()));
225 }
226 };
227
228 let attachments_to_wait = if let Some(attachments) = &action.attachments {
230 attachments
231 .iter()
232 .filter(|a| a.starts_with("@"))
233 .map(|a| format!("{},{}", tn_id, &a[1..]).into_boxed_str())
234 .collect::<Vec<_>>()
235 } else {
236 Vec::new()
237 };
238
239 let subject_key = action.subject.as_ref().and_then(|s| {
241 s.strip_prefix('@')
242 .map(|a_id_str| format!("{},{}", tn_id, a_id_str).into_boxed_str())
243 });
244
245 debug!(
246 "Dependencies for a_id={}: attachments={:?}, subject={:?}",
247 a_id, attachments_to_wait, subject_key
248 );
249
250 let file_deps = app
252 .meta_adapter
253 .list_task_ids(
254 descriptor::FileIdGeneratorTask::kind(),
255 &attachments_to_wait.into_boxed_slice(),
256 )
257 .await?;
258
259 let subject_deps = if let Some(ref key) = subject_key {
261 let keys = vec![key.clone()];
262 app.meta_adapter.list_task_ids(ActionCreatorTask::kind(), &keys).await?
263 } else {
264 Vec::new()
265 };
266
267 let mut deps = file_deps;
269 deps.extend(subject_deps);
270 debug!("Task dependencies: {:?}", deps);
271
272 let task = ActionCreatorTask::new(tn_id, Box::from(id_tag), a_id, action);
274 app.scheduler
275 .task(task)
276 .key(format!("{},{}", tn_id, a_id))
277 .depend_on(deps)
278 .schedule()
279 .await?;
280
281 Ok(format!("@{}", a_id).into_boxed_str())
283}
284
285async fn create_ephemeral_action(
288 app: &App,
289 tn_id: TnId,
290 id_tag: &str,
291 action: CreateAction,
292) -> ClResult<Box<str>> {
293 use crate::forward::{self, ForwardActionParams};
294
295 debug!(
296 action_type = %action.typ,
297 issuer = %id_tag,
298 "Creating ephemeral action (no persistence)"
299 );
300
301 let dsl = app.ext::<Arc<DslEngine>>()?;
302
303 let flags = action.flags.clone().or_else(|| {
305 dsl.get_behavior(action.typ.as_ref())
306 .and_then(|b| b.default_flags.as_ref())
307 .map(|f: &String| Box::from(f.as_str()))
308 });
309
310 let action_for_token = CreateAction {
311 typ: action.typ.clone(),
312 sub_typ: action.sub_typ.clone(),
313 parent_id: action.parent_id.clone(),
314 audience_tag: action.audience_tag.clone(),
315 content: action.content.clone(),
316 attachments: None, subject: action.subject.clone(),
318 expires_at: action.expires_at,
319 visibility: action.visibility,
320 flags,
321 x: None, };
323
324 let action_token = app.auth_adapter.create_action_token(tn_id, action_for_token).await?;
326 let action_id = hasher::hash("a", action_token.as_bytes());
327
328 let params = ForwardActionParams {
330 action_id: &action_id,
331 temp_id: None,
332 issuer_tag: id_tag,
333 audience_tag: action.audience_tag.as_deref(),
334 action_type: action.typ.as_ref(),
335 sub_type: action.sub_typ.as_deref(),
336 content: action.content.as_ref(),
337 attachments: None,
338 status: None,
339 };
340 let _result = forward::forward_outbound_action(app, tn_id, ¶ms).await;
341
342 schedule_delivery(app, tn_id, id_tag, &action_id, &action).await?;
344
345 info!(action_id = %action_id, "Ephemeral action created and forwarded");
346
347 Ok(action_id)
349}
350
351#[derive(Debug, Serialize, Deserialize)]
353pub struct ActionCreatorTask {
354 tn_id: TnId,
355 id_tag: Box<str>,
356 a_id: u64,
357 action: CreateAction,
358}
359
360impl ActionCreatorTask {
361 pub fn new(tn_id: TnId, id_tag: Box<str>, a_id: u64, action: CreateAction) -> Arc<Self> {
362 Arc::new(Self { tn_id, id_tag, a_id, action })
363 }
364}
365
366#[async_trait]
367impl Task<App> for ActionCreatorTask {
368 fn kind() -> &'static str {
369 "action.create"
370 }
371 fn kind_of(&self) -> &'static str {
372 Self::kind()
373 }
374
375 fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<App>>> {
376 let task: ActionCreatorTask = serde_json::from_str(ctx)?;
377 Ok(Arc::new(task))
378 }
379
380 fn serialize(&self) -> String {
381 serde_json::to_string(self).unwrap_or_else(|e| {
382 error!("Failed to serialize ActionCreatorTask: {}", e);
383 "{}".to_string()
384 })
385 }
386
387 async fn run(&self, app: &App) -> ClResult<()> {
388 info!(
389 "→ ACTION.CREATE: a_id={} type={} audience={}",
390 self.a_id,
391 self.action.typ,
392 self.action.audience_tag.as_deref().unwrap_or("-")
393 );
394
395 let attachments = resolve_attachments(app, self.tn_id, &self.action.attachments).await?;
397
398 if let Some(ref attachment_ids) = attachments {
400 for file_id in attachment_ids {
401 if let Err(e) =
402 upgrade_file_visibility(app, self.tn_id, file_id, self.action.visibility).await
403 {
404 warn!(
405 "Failed to upgrade visibility for file {}: {} - continuing anyway",
406 file_id, e
407 );
408 }
410 }
411 }
412
413 let subject = resolve_subject(app, self.tn_id, &self.action.subject).await?;
415
416 let resolved_audience = if self.action.audience_tag.is_none() {
419 helpers::resolve_parent_audience(
420 app.meta_adapter.as_ref(),
421 self.tn_id,
422 self.action.parent_id.as_deref(),
423 )
424 .await
425 } else {
426 None
427 };
428 let effective_audience = self.action.audience_tag.clone().or(resolved_audience);
429
430 let dsl = app.ext::<Arc<DslEngine>>()?;
431
432 let resolved_key = if subject.is_some()
434 && self.action.subject.as_ref().is_some_and(|s| s.starts_with('@'))
435 {
436 dsl.get_key_pattern(self.action.typ.as_ref()).map(|pattern| {
438 helpers::apply_key_pattern(
439 pattern,
440 self.action.typ.as_ref(),
441 &self.id_tag,
442 effective_audience.as_deref(),
443 self.action.parent_id.as_deref(),
444 subject.as_deref(),
445 )
446 })
447 } else {
448 None
449 };
450
451 let action_with_resolved = CreateAction {
453 audience_tag: effective_audience.clone(),
454 subject: subject.clone(),
455 ..self.action.clone()
456 };
457
458 let (action_id, action_token) =
460 generate_action_token(app, self.tn_id, &action_with_resolved, &attachments, &subject)
461 .await?;
462
463 let attachments_refs: Option<Vec<&str>> =
465 attachments.as_ref().map(|v| v.iter().map(|s| s.as_ref()).collect());
466 finalize_action(
467 app,
468 self.tn_id,
469 self.a_id,
470 &action_id,
471 &action_token,
472 meta_adapter::FinalizeActionOptions {
473 attachments: attachments_refs.as_deref(),
474 subject: subject.as_deref(),
475 audience_tag: effective_audience.as_deref(),
476 key: resolved_key.as_deref(),
477 },
478 )
479 .await?;
480
481 let temp_id = format!("@{}", self.a_id);
483
484 let action_for_processing = meta_adapter::Action {
486 action_id: action_id.clone(),
487 typ: action_with_resolved.typ.clone(),
488 sub_typ: action_with_resolved.sub_typ.clone(),
489 issuer_tag: self.id_tag.clone(),
490 parent_id: action_with_resolved.parent_id.clone(),
491 root_id: helpers::resolve_root_id(
492 app.meta_adapter.as_ref(),
493 self.tn_id,
494 action_with_resolved.parent_id.as_deref(),
495 )
496 .await,
497 audience_tag: action_with_resolved.audience_tag.clone(),
498 content: helpers::serialize_content(action_with_resolved.content.as_ref())
499 .map(|s| s.into_boxed_str()),
500 attachments: attachments.clone(),
501 subject: subject.clone(),
502 created_at: Timestamp::now(),
503 expires_at: action_with_resolved.expires_at,
504 visibility: action_with_resolved.visibility,
505 flags: action_with_resolved.flags.clone(),
506 x: action_with_resolved.x.clone(),
507 };
508
509 let attachment_views = if attachments.is_some() {
511 app.meta_adapter
512 .get_action(self.tn_id, &action_id)
513 .await
514 .ok()
515 .flatten()
516 .and_then(|a| a.attachments)
517 } else {
518 None
519 };
520
521 post_store::process_after_store(
522 app,
523 self.tn_id,
524 &action_for_processing,
525 attachment_views.as_deref(),
526 ProcessingContext::Outbound { temp_id: Some(temp_id.into()) },
527 )
528 .await?;
529
530 info!("← ACTION.CREATED: {} type={}", action_id, self.action.typ);
531 Ok(())
532 }
533}
534
535async fn resolve_attachments(
537 app: &App,
538 tn_id: TnId,
539 attachments: &Option<Vec<Box<str>>>,
540) -> ClResult<Option<Vec<Box<str>>>> {
541 let Some(attachments) = attachments else {
542 return Ok(None);
543 };
544
545 let mut resolved = Vec::with_capacity(attachments.len());
546 for a in attachments {
547 if let Some(f_id) = a.strip_prefix('@') {
548 let file_id = app.meta_adapter.get_file_id(tn_id, f_id.parse()?).await?;
549 resolved.push(file_id.clone());
550 } else {
551 resolved.push(a.clone());
552 }
553 }
554 Ok(Some(resolved))
555}
556
557async fn resolve_subject(
559 app: &App,
560 tn_id: TnId,
561 subject: &Option<Box<str>>,
562) -> ClResult<Option<Box<str>>> {
563 let Some(subject) = subject else {
564 return Ok(None);
565 };
566
567 if let Some(a_id_str) = subject.strip_prefix('@') {
568 let a_id: u64 = a_id_str.parse()?;
569 let action_id = app.meta_adapter.get_action_id(tn_id, a_id).await?;
570 Ok(Some(action_id))
571 } else {
572 Ok(Some(subject.clone()))
574 }
575}
576
577async fn generate_action_token(
579 app: &App,
580 tn_id: TnId,
581 action: &CreateAction,
582 attachments: &Option<Vec<Box<str>>>,
583 subject: &Option<Box<str>>,
584) -> ClResult<(Box<str>, Box<str>)> {
585 let dsl = app.ext::<Arc<DslEngine>>()?;
586
587 let flags = action.flags.clone().or_else(|| {
589 dsl.get_behavior(action.typ.as_ref())
590 .and_then(|b| b.default_flags.as_ref())
591 .map(|f: &String| Box::from(f.as_str()))
592 });
593
594 let action_for_token = CreateAction {
595 typ: action.typ.clone(),
596 sub_typ: action.sub_typ.clone(),
597 parent_id: action.parent_id.clone(),
598 audience_tag: action.audience_tag.clone(),
599 content: action.content.clone(),
600 attachments: attachments.clone(),
601 subject: subject.clone(), expires_at: action.expires_at,
603 visibility: action.visibility,
604 flags,
605 x: None, };
607
608 let action_token =
610 match app.auth_adapter.create_action_token(tn_id, action_for_token.clone()).await {
611 Ok(token) => token,
612 Err(Error::DbError) => {
613 info!("No signing key found for tenant {}, creating one automatically", tn_id.0);
615 app.auth_adapter.create_profile_key(tn_id, None).await?;
616 app.auth_adapter.create_action_token(tn_id, action_for_token).await?
617 }
618 Err(e) => return Err(e),
619 };
620 let action_id = hasher::hash("a", action_token.as_bytes());
621
622 Ok((action_id, action_token))
623}
624
625async fn finalize_action(
627 app: &App,
628 tn_id: TnId,
629 a_id: u64,
630 action_id: &str,
631 action_token: &str,
632 options: meta_adapter::FinalizeActionOptions<'_>,
633) -> ClResult<()> {
634 app.meta_adapter.finalize_action(tn_id, a_id, action_id, options).await?;
635 app.meta_adapter.store_action_token(tn_id, action_id, action_token).await?;
636
637 Ok(())
638}
639
640async fn determine_recipients(
642 _app: &App,
643 _tn_id: TnId,
644 id_tag: &str,
645 _action_id: &str,
646 action: &CreateAction,
647) -> ClResult<Vec<Box<str>>> {
648 if let Some(audience_tag) = &action.audience_tag {
650 if audience_tag.as_ref() != id_tag {
652 Ok(vec![audience_tag.clone()])
653 } else {
654 Ok(Vec::new())
655 }
656 } else {
657 Ok(Vec::new())
659 }
660}
661
662async fn schedule_delivery(
664 app: &App,
665 tn_id: TnId,
666 id_tag: &str,
667 action_id: &str,
668 action: &CreateAction,
669) -> ClResult<()> {
670 let dsl = app.ext::<Arc<DslEngine>>()?;
671
672 if action.typ.as_ref() == "APRV" {
674 if let Some(ref subject_id) = action.subject {
675 if let Ok(Some(subject_action)) = app.meta_adapter.get_action(tn_id, subject_id).await {
677 let subject_broadcast = dsl
678 .get_behavior(&subject_action.typ)
679 .and_then(|b| b.broadcast)
680 .unwrap_or(false);
681
682 if subject_broadcast {
683 debug!(
684 "APRV {} subject {} has broadcast=true, fanning out to followers",
685 action_id, subject_id
686 );
687 return schedule_broadcast_delivery(
690 app,
691 tn_id,
692 id_tag,
693 action_id,
694 Some(subject_id.as_ref()),
695 action.audience_tag.as_deref(),
696 )
697 .await;
698 }
699 }
700 }
701 }
702
703 let mut recipients = determine_recipients(app, tn_id, id_tag, action_id, action).await?;
705
706 let behavior = dsl.get_behavior(action.typ.as_ref());
708
709 let deliver_to_subject_owner =
712 behavior.as_ref().and_then(|b| b.deliver_to_subject_owner).unwrap_or(false);
713
714 if deliver_to_subject_owner {
715 if let Some(ref subject_id) = action.subject {
716 if let Ok(Some(subject_action)) = app.meta_adapter.get_action(tn_id, subject_id).await {
718 let subject_owner = &subject_action.issuer.id_tag;
719 if subject_owner.as_ref() != id_tag
721 && !recipients.iter().any(|r| r.as_ref() == subject_owner.as_ref())
722 {
723 info!(
724 "→ DUAL DELIVERY: Adding subject owner {} for {} (deliver_to_subject_owner)",
725 subject_owner, action_id
726 );
727 recipients.push(subject_owner.clone());
728 }
729 }
730 }
731 }
732
733 let fanout_recipients = schedule_subscriber_fanout(
736 app,
737 tn_id,
738 action_id,
739 action.parent_id.as_deref(),
740 id_tag, )
742 .await?;
743
744 for r in fanout_recipients {
747 if !recipients.iter().any(|existing| existing.as_ref() == r.as_ref()) {
748 recipients.push(r);
749 }
750 }
751
752 if !recipients.is_empty() {
753 let recipient_preview: Vec<&str> = recipients.iter().take(3).map(|s| s.as_ref()).collect();
755 if recipients.len() <= 3 {
756 info!("→ DELIVERY: {} → [{}]", action_id, recipient_preview.join(", "));
757 } else {
758 info!(
759 "→ DELIVERY: {} → {} recipients [{}...]",
760 action_id,
761 recipients.len(),
762 recipient_preview.join(", ")
763 );
764 }
765 }
766
767 let deliver_subject = behavior.as_ref().and_then(|b| b.deliver_subject).unwrap_or(false);
769
770 let related_action_id =
771 if deliver_subject { action.subject.as_deref().map(|s| s.into()) } else { None };
772
773 for recipient_tag in recipients {
774 debug!("Creating delivery task for action {} to {}", action_id, recipient_tag);
775
776 let delivery_task = ActionDeliveryTask::new_with_related(
777 tn_id,
778 action_id.into(),
779 recipient_tag.clone(),
780 recipient_tag.clone(),
781 related_action_id.clone(),
782 );
783
784 let task_key = format!("delivery:{}:{}", action_id, recipient_tag);
785 let retry_policy = RetryPolicy::new((10, 43200), 50);
786
787 app.scheduler
788 .task(delivery_task)
789 .key(&task_key)
790 .with_retry(retry_policy)
791 .schedule()
792 .await?;
793 }
794
795 Ok(())
796}
797
798async fn schedule_broadcast_delivery(
805 app: &App,
806 tn_id: TnId,
807 id_tag: &str,
808 action_id: &str,
809 related_action_id: Option<&str>,
810 author_id_tag: Option<&str>,
811) -> ClResult<()> {
812 let follower_actions = app
814 .meta_adapter
815 .list_actions(
816 tn_id,
817 &meta_adapter::ListActionOptions {
818 typ: Some(vec!["FLLW".into(), "CONN".into()]),
819 ..Default::default()
821 },
822 )
823 .await?;
824
825 let mut recipients: HashSet<Box<str>> = HashSet::new();
828 for action_view in follower_actions {
829 if action_view.status.as_deref() == Some("D") {
831 continue;
832 }
833 if action_view.issuer.id_tag.as_ref() != id_tag {
834 recipients.insert(action_view.issuer.id_tag.clone());
835 }
836 }
837
838 if let Some(author) = author_id_tag {
840 if author != id_tag {
841 recipients.insert(author.into());
842 }
843 }
844
845 let recipients_vec: Vec<&str> = recipients.iter().map(|s| s.as_ref()).collect();
847 let recipient_preview: Vec<&str> = recipients_vec.iter().take(3).copied().collect();
848 if !recipients.is_empty() {
849 if recipients.len() <= 3 {
850 info!("→ BROADCAST: {} → [{}]", action_id, recipient_preview.join(", "));
851 } else {
852 info!(
853 "→ BROADCAST: {} → {} recipients [{}...]",
854 action_id,
855 recipients.len(),
856 recipient_preview.join(", ")
857 );
858 }
859 }
860
861 let retry_policy = RetryPolicy::new((10, 43200), 50);
862 let related_box: Option<Box<str>> = related_action_id.map(|s| s.into());
863
864 for recipient_tag in recipients {
865 debug!("Creating broadcast delivery task for action {} to {}", action_id, recipient_tag);
866
867 let delivery_task = ActionDeliveryTask::new_with_related(
868 tn_id,
869 action_id.into(),
870 recipient_tag.clone(),
871 recipient_tag.clone(),
872 related_box.clone(),
873 );
874
875 let task_key = format!("delivery:{}:{}", action_id, recipient_tag);
876
877 app.scheduler
878 .task(delivery_task)
879 .key(&task_key)
880 .with_retry(retry_policy.clone())
881 .schedule()
882 .await?;
883 }
884
885 Ok(())
886}
887
888pub async fn schedule_subscriber_fanout(
907 app: &App,
908 tn_id: TnId,
909 action_id: &str,
910 parent_id: Option<&str>,
911 issuer: &str,
912) -> ClResult<Vec<Box<str>>> {
913 let Some(starting_parent) = parent_id else {
914 return Ok(Vec::new());
915 };
916
917 let our_id_tag: Box<str> = app.auth_adapter.read_id_tag(tn_id).await?;
919
920 let mut current_parent_id: Option<String> = Some(starting_parent.to_string());
923 let mut recipients = Vec::new();
924
925 while let Some(p_id) = current_parent_id.take() {
926 let Some(parent_action) = app.meta_adapter.get_action(tn_id, &p_id).await? else {
927 break; };
929
930 let subscribable = app
931 .ext::<Arc<DslEngine>>()?
932 .get_behavior(&parent_action.typ)
933 .and_then(|b| b.subscribable)
934 .unwrap_or(false);
935
936 if subscribable {
937 let is_local = match &parent_action.audience {
940 None => parent_action.issuer.id_tag.as_ref() == our_id_tag.as_ref(),
941 Some(aud) => aud.id_tag.as_ref() == our_id_tag.as_ref(),
942 };
943
944 if is_local {
945 let subs = app
947 .meta_adapter
948 .list_actions(
949 tn_id,
950 &meta_adapter::ListActionOptions {
951 typ: Some(vec!["SUBS".into()]),
952 subject: Some(p_id.clone()),
953 status: Some(vec!["A".into()]),
954 ..Default::default()
955 },
956 )
957 .await?;
958
959 for sub in subs {
960 let sub_tag = sub.issuer.id_tag.as_ref();
961 if sub_tag != our_id_tag.as_ref() && sub_tag != issuer {
963 recipients.push(sub.issuer.id_tag.clone());
964 }
965 }
966
967 if !recipients.is_empty() {
969 info!(
970 "→ SUBSCRIBER FAN-OUT: {} → {} recipients (root: {})",
971 action_id,
972 recipients.len(),
973 p_id
974 );
975
976 let retry_policy = RetryPolicy::new((10, 43200), 50);
977 for recipient_tag in &recipients {
978 let delivery_task = ActionDeliveryTask::new(
979 tn_id,
980 action_id.into(),
981 recipient_tag.clone(),
982 recipient_tag.clone(),
983 );
984 let task_key = format!("fanout:{}:{}", action_id, recipient_tag);
985 app.scheduler
986 .task(delivery_task)
987 .key(&task_key)
988 .with_retry(retry_policy.clone())
989 .schedule()
990 .await?;
991 }
992 }
993 }
994 break; }
996
997 current_parent_id = parent_action.parent_id.map(|p| p.to_string());
999 }
1000
1001 Ok(recipients)
1002}
1003
1004#[derive(Debug, Serialize, Deserialize)]
1006pub struct ActionVerifierTask {
1007 tn_id: TnId,
1008 token: Box<str>,
1009 client_address: Option<Box<str>>,
1011}
1012
1013impl ActionVerifierTask {
1014 pub fn new(tn_id: TnId, token: Box<str>, client_address: Option<Box<str>>) -> Arc<Self> {
1015 Arc::new(Self { tn_id, token, client_address })
1016 }
1017}
1018
1019#[async_trait]
1020impl Task<App> for ActionVerifierTask {
1021 fn kind() -> &'static str {
1022 "action.verify"
1023 }
1024 fn kind_of(&self) -> &'static str {
1025 Self::kind()
1026 }
1027
1028 fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<App>>> {
1029 let parts: Vec<&str> = ctx.splitn(3, ',').collect();
1031 if parts.len() < 2 {
1032 return Err(Error::Internal("invalid ActionVerifier context format".into()));
1033 }
1034 let tn_id = TnId(parts[0].parse()?);
1035 let token = parts[1].into();
1036 let client_address = parts.get(2).map(|&s| s.into());
1037 let task = ActionVerifierTask::new(tn_id, token, client_address);
1038 Ok(task)
1039 }
1040
1041 fn serialize(&self) -> String {
1042 match &self.client_address {
1043 Some(addr) => format!("{},{},{}", self.tn_id.0, self.token, addr),
1044 None => format!("{},{}", self.tn_id.0, self.token),
1045 }
1046 }
1047
1048 async fn run(&self, app: &App) -> ClResult<()> {
1049 let action_id = hasher::hash("a", self.token.as_bytes());
1050 debug!("Running task action.verify {}", action_id);
1051
1052 process::process_inbound_action_token(
1053 app,
1054 self.tn_id,
1055 &action_id,
1056 &self.token,
1057 false,
1058 self.client_address.as_ref().map(|s| s.to_string()),
1059 )
1060 .await?;
1061
1062 debug!("Finished task action.verify {}", action_id);
1063 Ok(())
1064 }
1065}
1066
#[cfg(test)]
mod tests {
    use super::*;

    /// A message built with an audience keeps its type, content and audience.
    #[test]
    fn test_create_action_struct() {
        let greeting = serde_json::Value::String("Hello".to_string());
        let action = CreateAction {
            typ: "MSG".into(),
            audience_tag: Some("bob.example.com".into()),
            content: Some(greeting.clone()),
            ..Default::default()
        };

        assert_eq!(action.typ.as_ref(), "MSG");
        assert_eq!(action.content, Some(greeting));
        assert_eq!(action.audience_tag.as_deref(), Some("bob.example.com"));
    }

    /// A post built without an audience leaves `audience_tag` unset and keeps its flags.
    #[test]
    fn test_create_action_without_audience() {
        let body = serde_json::Value::String("Hello world".to_string());
        let action = CreateAction {
            typ: "POST".into(),
            sub_typ: Some("TEXT".into()),
            content: Some(body),
            flags: Some("RC".into()),
            ..Default::default()
        };

        assert_eq!(action.typ.as_ref(), "POST");
        assert!(action.audience_tag.is_none());
        assert_eq!(action.flags.as_deref(), Some("RC"));
    }
}
1100
1101