Skip to main content

cloudillo_action/
task.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::collections::HashSet;
4use std::sync::Arc;
5
6// Re-export from cloudillo-types for backward compatibility
7pub use cloudillo_types::action_types::{CreateAction, ACCESS_TOKEN_EXPIRY};
8
9use cloudillo_core::scheduler::{RetryPolicy, Task, TaskId};
10use cloudillo_file::descriptor;
11use cloudillo_file::management::upgrade_file_visibility;
12use cloudillo_types::hasher;
13use cloudillo_types::meta_adapter;
14
15use crate::{
16	delivery::ActionDeliveryTask,
17	dsl::DslEngine,
18	helpers,
19	post_store::{self, ProcessingContext},
20	prelude::*,
21	process,
22};
23
24pub async fn create_action(
25	app: &App,
26	tn_id: TnId,
27	id_tag: &str,
28	action: CreateAction,
29) -> ClResult<Box<str>> {
30	let dsl = app.ext::<Arc<DslEngine>>()?;
31
32	// Check if this is an ephemeral action type
33	let is_ephemeral = dsl
34		.get_behavior(action.typ.as_ref())
35		.map(|b| b.ephemeral.unwrap_or(false))
36		.unwrap_or(false);
37
38	if is_ephemeral {
39		return create_ephemeral_action(app, tn_id, id_tag, action).await;
40	}
41
42	// Get behavior flags for validation
43	let behavior = dsl.get_behavior(action.typ.as_ref());
44
45	// Outbound validation: allow_unknown
46	// If false, we can only send to recipients we have a relationship with
47	let allow_unknown = behavior.as_ref().and_then(|b| b.allow_unknown).unwrap_or(false);
48	if !allow_unknown {
49		if let Some(ref audience_tag) = action.audience_tag {
50			// Skip validation if audience is ourselves
51			if audience_tag.as_ref() != id_tag {
52				let has_relationship = app
53					.meta_adapter
54					.read_profile(tn_id, audience_tag)
55					.await
56					.ok()
57					.map(|(_, p)| p.following || p.connected.is_connected())
58					.unwrap_or(false);
59
60				if !has_relationship {
61					return Err(Error::ValidationError(format!(
62						"Cannot send {} to unknown recipient {}",
63						action.typ, audience_tag
64					)));
65				}
66			}
67		}
68	}
69
70	// Outbound validation: requires_subscription
71	// If true, we must have an active subscription to the target action (or be the creator)
72	let requires_subscription =
73		behavior.as_ref().and_then(|b| b.requires_subscription).unwrap_or(false);
74	if requires_subscription {
75		let target_id = action.subject.as_deref().or(action.parent_id.as_deref());
76		if let Some(target_id) = target_id {
77			// Skip validation for @temp_id references (will be resolved later)
78			if !target_id.starts_with('@') {
79				// Get the target action to check if we're the creator
80				let target_action = app.meta_adapter.get_action(tn_id, target_id).await?;
81				if let Some(target) = target_action {
82					// If we're the target action's creator, we always have permission
83					if target.issuer.id_tag.as_ref() != id_tag {
84						// Check for active subscription
85						let subs_key = format!("SUBS:{}:{}", target_id, id_tag);
86						let subscription =
87							app.meta_adapter.get_action_by_key(tn_id, &subs_key).await?;
88
89						if subscription.is_none() {
90							// Also check root action subscription if target has a root
91							let root_sub = if let Some(root_id) = &target.root_id {
92								let root_subs_key = format!("SUBS:{}:{}", root_id, id_tag);
93								app.meta_adapter.get_action_by_key(tn_id, &root_subs_key).await?
94							} else {
95								None
96							};
97
98							if root_sub.is_none() {
99								return Err(Error::ValidationError(format!(
100									"Cannot send {} without subscription to {}",
101									action.typ, target_id
102								)));
103							}
104						}
105					}
106				}
107			}
108		}
109	}
110
111	// Outbound validation: generic flag gating from BehaviorFlags
112	{
113		let (_action_type, sub_type) = helpers::extract_type_and_subtype(&action.typ);
114		let is_delete = sub_type.as_deref() == Some("DEL");
115
116		if !is_delete {
117			if let Some(flag) = behavior.as_ref().and_then(|b| b.gated_by_parent_flag) {
118				if let Some(ref parent_id) = action.parent_id {
119					if !parent_id.starts_with('@') {
120						if let Ok(Some(parent)) =
121							app.meta_adapter.get_action(tn_id, parent_id).await
122						{
123							if !helpers::is_capability_enabled(parent.flags.as_deref(), flag) {
124								return Err(Error::ValidationError(format!(
125									"{} is disabled on the parent action",
126									action.typ
127								)));
128							}
129						}
130					}
131				}
132			}
133			if let Some(flag) = behavior.as_ref().and_then(|b| b.gated_by_subject_flag) {
134				if let Some(ref subject_id) = action.subject {
135					if !subject_id.starts_with('@') {
136						if let Ok(Some(subject)) =
137							app.meta_adapter.get_action(tn_id, subject_id).await
138						{
139							if !helpers::is_capability_enabled(subject.flags.as_deref(), flag) {
140								return Err(Error::ValidationError(format!(
141									"{} is disabled on the subject action",
142									action.typ
143								)));
144							}
145						}
146					}
147				}
148			}
149		}
150	}
151
152	// Serialize content Value to string for storage (always JSON-encode)
153	let content_str = helpers::serialize_content(action.content.as_ref());
154
155	// Resolve visibility: explicit > parent inheritance > user default > 'F'
156	let visibility = helpers::inherit_visibility(
157		app.meta_adapter.as_ref(),
158		tn_id,
159		action.visibility,
160		action.parent_id.as_deref(),
161	)
162	.await;
163
164	// If no visibility from explicit or parent, use user's default setting
165	let visibility = if visibility.is_some() {
166		visibility
167	} else {
168		// Get user's default visibility setting
169		match app.settings.get_string(tn_id, "privacy.default_visibility").await {
170			Ok(default_vis) => default_vis.chars().next(),
171			Err(_) => Some('F'), // Fallback to Followers
172		}
173	};
174
175	// Open actions (uppercase 'O' flag) should be Connected visibility for discoverability
176	let visibility = if helpers::is_open(action.flags.as_deref()) {
177		Some('C') // Connected visibility for open groups
178	} else {
179		visibility
180	};
181
182	// Resolve root_id from parent chain (auto-populated, not client-specified)
183	let root_id =
184		helpers::resolve_root_id(app.meta_adapter.as_ref(), tn_id, action.parent_id.as_deref())
185			.await;
186
187	// Create pending action in database with a_id (action_id is NULL at this point)
188	let pending_action = meta_adapter::Action {
189		action_id: "", // Empty action_id for pending actions
190		issuer_tag: id_tag,
191		typ: action.typ.as_ref(),
192		sub_typ: action.sub_typ.as_deref(),
193		parent_id: action.parent_id.as_deref(),
194		root_id: root_id.as_deref(),
195		audience_tag: action.audience_tag.as_deref(),
196		content: content_str.as_deref(),
197		attachments: action.attachments.as_ref().map(|v| v.iter().map(|a| a.as_ref()).collect()),
198		subject: action.subject.as_deref(),
199		expires_at: action.expires_at,
200		created_at: Timestamp::now(),
201		visibility,
202		flags: action.flags.as_deref(),
203		x: action.x.clone(),
204	};
205
206	// Generate key from key_pattern for deduplication (e.g., REACT uses {type}:{parent}:{issuer})
207	let key = dsl.get_key_pattern(action.typ.as_ref()).map(|pattern| {
208		helpers::apply_key_pattern(
209			pattern,
210			action.typ.as_ref(),
211			id_tag,
212			action.audience_tag.as_deref(),
213			action.parent_id.as_deref(),
214			action.subject.as_deref(),
215		)
216	});
217	let action_result =
218		app.meta_adapter.create_action(tn_id, &pending_action, key.as_deref()).await?;
219
220	let a_id = match action_result {
221		meta_adapter::ActionId::AId(a_id) => a_id,
222		meta_adapter::ActionId::ActionId(_) => {
223			// This shouldn't happen for new actions
224			return Err(Error::Internal("Unexpected ActionId result".into()));
225		}
226	};
227
228	// Collect file attachment dependencies
229	let attachments_to_wait = if let Some(attachments) = &action.attachments {
230		attachments
231			.iter()
232			.filter(|a| a.starts_with("@"))
233			.map(|a| format!("{},{}", tn_id, &a[1..]).into_boxed_str())
234			.collect::<Vec<_>>()
235	} else {
236		Vec::new()
237	};
238
239	// Collect subject dependency if it references a pending action
240	let subject_key = action.subject.as_ref().and_then(|s| {
241		s.strip_prefix('@')
242			.map(|a_id_str| format!("{},{}", tn_id, a_id_str).into_boxed_str())
243	});
244
245	debug!(
246		"Dependencies for a_id={}: attachments={:?}, subject={:?}",
247		a_id, attachments_to_wait, subject_key
248	);
249
250	// Get file task dependencies
251	let file_deps = app
252		.meta_adapter
253		.list_task_ids(
254			descriptor::FileIdGeneratorTask::kind(),
255			&attachments_to_wait.into_boxed_slice(),
256		)
257		.await?;
258
259	// Get subject action task dependencies
260	let subject_deps = if let Some(ref key) = subject_key {
261		let keys = vec![key.clone()];
262		app.meta_adapter.list_task_ids(ActionCreatorTask::kind(), &keys).await?
263	} else {
264		Vec::new()
265	};
266
267	// Combine all dependencies
268	let mut deps = file_deps;
269	deps.extend(subject_deps);
270	debug!("Task dependencies: {:?}", deps);
271
272	// Create ActionCreatorTask to finalize the action
273	let task = ActionCreatorTask::new(tn_id, Box::from(id_tag), a_id, action);
274	app.scheduler
275		.task(task)
276		.key(format!("{},{}", tn_id, a_id))
277		.depend_on(deps)
278		.schedule()
279		.await?;
280
281	// Return @{a_id} placeholder
282	Ok(format!("@{}", a_id).into_boxed_str())
283}
284
285/// Create an ephemeral action (forward only, no persistence)
286/// Used for PRES (presence), CSIG (call signaling), and other transient actions
287async fn create_ephemeral_action(
288	app: &App,
289	tn_id: TnId,
290	id_tag: &str,
291	action: CreateAction,
292) -> ClResult<Box<str>> {
293	use crate::forward::{self, ForwardActionParams};
294
295	debug!(
296		action_type = %action.typ,
297		issuer = %id_tag,
298		"Creating ephemeral action (no persistence)"
299	);
300
301	let dsl = app.ext::<Arc<DslEngine>>()?;
302
303	// Resolve flags: explicit > default_flags from action type definition
304	let flags = action.flags.clone().or_else(|| {
305		dsl.get_behavior(action.typ.as_ref())
306			.and_then(|b| b.default_flags.as_ref())
307			.map(|f: &String| Box::from(f.as_str()))
308	});
309
310	let action_for_token = CreateAction {
311		typ: action.typ.clone(),
312		sub_typ: action.sub_typ.clone(),
313		parent_id: action.parent_id.clone(),
314		audience_tag: action.audience_tag.clone(),
315		content: action.content.clone(),
316		attachments: None, // Ephemeral actions shouldn't have attachments
317		subject: action.subject.clone(),
318		expires_at: action.expires_at,
319		visibility: action.visibility,
320		flags,
321		x: None, // Ephemeral actions don't use x metadata
322	};
323
324	// Generate action token
325	let action_token = app.auth_adapter.create_action_token(tn_id, action_for_token).await?;
326	let action_id = hasher::hash("a", action_token.as_bytes());
327
328	// Forward to connected WebSocket clients
329	let params = ForwardActionParams {
330		action_id: &action_id,
331		temp_id: None,
332		issuer_tag: id_tag,
333		audience_tag: action.audience_tag.as_deref(),
334		action_type: action.typ.as_ref(),
335		sub_type: action.sub_typ.as_deref(),
336		content: action.content.as_ref(),
337		attachments: None,
338		status: None,
339	};
340	let _result = forward::forward_outbound_action(app, tn_id, &params).await;
341
342	// Schedule delivery to remote recipients (if any)
343	schedule_delivery(app, tn_id, id_tag, &action_id, &action).await?;
344
345	info!(action_id = %action_id, "Ephemeral action created and forwarded");
346
347	// Return the action_id directly (not a placeholder)
348	Ok(action_id)
349}
350
351/// Action creator Task - finalizes pending actions
352#[derive(Debug, Serialize, Deserialize)]
353pub struct ActionCreatorTask {
354	tn_id: TnId,
355	id_tag: Box<str>,
356	a_id: u64,
357	action: CreateAction,
358}
359
360impl ActionCreatorTask {
361	pub fn new(tn_id: TnId, id_tag: Box<str>, a_id: u64, action: CreateAction) -> Arc<Self> {
362		Arc::new(Self { tn_id, id_tag, a_id, action })
363	}
364}
365
366#[async_trait]
367impl Task<App> for ActionCreatorTask {
368	fn kind() -> &'static str {
369		"action.create"
370	}
371	fn kind_of(&self) -> &'static str {
372		Self::kind()
373	}
374
375	fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<App>>> {
376		let task: ActionCreatorTask = serde_json::from_str(ctx)?;
377		Ok(Arc::new(task))
378	}
379
380	fn serialize(&self) -> String {
381		serde_json::to_string(self).unwrap_or_else(|e| {
382			error!("Failed to serialize ActionCreatorTask: {}", e);
383			"{}".to_string()
384		})
385	}
386
387	async fn run(&self, app: &App) -> ClResult<()> {
388		info!(
389			"→ ACTION.CREATE: a_id={} type={} audience={}",
390			self.a_id,
391			self.action.typ,
392			self.action.audience_tag.as_deref().unwrap_or("-")
393		);
394
395		// 1. Resolve file attachments
396		let attachments = resolve_attachments(app, self.tn_id, &self.action.attachments).await?;
397
398		// 1b. Upgrade attachment visibility to match action visibility
399		if let Some(ref attachment_ids) = attachments {
400			for file_id in attachment_ids {
401				if let Err(e) =
402					upgrade_file_visibility(app, self.tn_id, file_id, self.action.visibility).await
403				{
404					warn!(
405						"Failed to upgrade visibility for file {}: {} - continuing anyway",
406						file_id, e
407					);
408					// Continue - don't fail action creation due to visibility upgrade
409				}
410			}
411		}
412
413		// 1c. Resolve subject reference (@a_id → action_id)
414		let subject = resolve_subject(app, self.tn_id, &self.action.subject).await?;
415
416		// 1d. Resolve audience from parent action if not explicitly set
417		// This enables federation for conversation messages (MSG in CONV) and similar hierarchical actions
418		let resolved_audience = if self.action.audience_tag.is_none() {
419			helpers::resolve_parent_audience(
420				app.meta_adapter.as_ref(),
421				self.tn_id,
422				self.action.parent_id.as_deref(),
423			)
424			.await
425		} else {
426			None
427		};
428		let effective_audience = self.action.audience_tag.clone().or(resolved_audience);
429
430		let dsl = app.ext::<Arc<DslEngine>>()?;
431
432		// 1e. Regenerate key if subject was resolved (changed from @xxx to a1~xxx)
433		let resolved_key = if subject.is_some()
434			&& self.action.subject.as_ref().is_some_and(|s| s.starts_with('@'))
435		{
436			// Subject was a reference that got resolved - regenerate the key
437			dsl.get_key_pattern(self.action.typ.as_ref()).map(|pattern| {
438				helpers::apply_key_pattern(
439					pattern,
440					self.action.typ.as_ref(),
441					&self.id_tag,
442					effective_audience.as_deref(),
443					self.action.parent_id.as_deref(),
444					subject.as_deref(),
445				)
446			})
447		} else {
448			None
449		};
450
451		// Create a modified action with resolved audience and subject for token generation and delivery
452		let action_with_resolved = CreateAction {
453			audience_tag: effective_audience.clone(),
454			subject: subject.clone(),
455			..self.action.clone()
456		};
457
458		// 2. Generate action token and get action_id
459		let (action_id, action_token) =
460			generate_action_token(app, self.tn_id, &action_with_resolved, &attachments, &subject)
461				.await?;
462
463		// 3. Finalize action in database (including resolved audience)
464		let attachments_refs: Option<Vec<&str>> =
465			attachments.as_ref().map(|v| v.iter().map(|s| s.as_ref()).collect());
466		finalize_action(
467			app,
468			self.tn_id,
469			self.a_id,
470			&action_id,
471			&action_token,
472			meta_adapter::FinalizeActionOptions {
473				attachments: attachments_refs.as_deref(),
474				subject: subject.as_deref(),
475				audience_tag: effective_audience.as_deref(),
476				key: resolved_key.as_deref(),
477			},
478		)
479		.await?;
480
481		// 4. Process after storage (unified: hooks, WebSocket, fanout, delivery)
482		let temp_id = format!("@{}", self.a_id);
483
484		// Build Action struct for unified processing
485		let action_for_processing = meta_adapter::Action {
486			action_id: action_id.clone(),
487			typ: action_with_resolved.typ.clone(),
488			sub_typ: action_with_resolved.sub_typ.clone(),
489			issuer_tag: self.id_tag.clone(),
490			parent_id: action_with_resolved.parent_id.clone(),
491			root_id: helpers::resolve_root_id(
492				app.meta_adapter.as_ref(),
493				self.tn_id,
494				action_with_resolved.parent_id.as_deref(),
495			)
496			.await,
497			audience_tag: action_with_resolved.audience_tag.clone(),
498			content: helpers::serialize_content(action_with_resolved.content.as_ref())
499				.map(|s| s.into_boxed_str()),
500			attachments: attachments.clone(),
501			subject: subject.clone(),
502			created_at: Timestamp::now(),
503			expires_at: action_with_resolved.expires_at,
504			visibility: action_with_resolved.visibility,
505			flags: action_with_resolved.flags.clone(),
506			x: action_with_resolved.x.clone(),
507		};
508
509		// Fetch attachment views (with dimensions) for WebSocket forwarding
510		let attachment_views = if attachments.is_some() {
511			app.meta_adapter
512				.get_action(self.tn_id, &action_id)
513				.await
514				.ok()
515				.flatten()
516				.and_then(|a| a.attachments)
517		} else {
518			None
519		};
520
521		post_store::process_after_store(
522			app,
523			self.tn_id,
524			&action_for_processing,
525			attachment_views.as_deref(),
526			ProcessingContext::Outbound { temp_id: Some(temp_id.into()) },
527		)
528		.await?;
529
530		info!("← ACTION.CREATED: {} type={}", action_id, self.action.typ);
531		Ok(())
532	}
533}
534
535/// Resolve file attachment references (@f_id → file_id)
536async fn resolve_attachments(
537	app: &App,
538	tn_id: TnId,
539	attachments: &Option<Vec<Box<str>>>,
540) -> ClResult<Option<Vec<Box<str>>>> {
541	let Some(attachments) = attachments else {
542		return Ok(None);
543	};
544
545	let mut resolved = Vec::with_capacity(attachments.len());
546	for a in attachments {
547		if let Some(f_id) = a.strip_prefix('@') {
548			let file_id = app.meta_adapter.get_file_id(tn_id, f_id.parse()?).await?;
549			resolved.push(file_id.clone());
550		} else {
551			resolved.push(a.clone());
552		}
553	}
554	Ok(Some(resolved))
555}
556
557/// Resolve subject reference (@a_id → action_id)
558async fn resolve_subject(
559	app: &App,
560	tn_id: TnId,
561	subject: &Option<Box<str>>,
562) -> ClResult<Option<Box<str>>> {
563	let Some(subject) = subject else {
564		return Ok(None);
565	};
566
567	if let Some(a_id_str) = subject.strip_prefix('@') {
568		let a_id: u64 = a_id_str.parse()?;
569		let action_id = app.meta_adapter.get_action_id(tn_id, a_id).await?;
570		Ok(Some(action_id))
571	} else {
572		// Already resolved or external action ID
573		Ok(Some(subject.clone()))
574	}
575}
576
577/// Generate action token and compute action_id
578async fn generate_action_token(
579	app: &App,
580	tn_id: TnId,
581	action: &CreateAction,
582	attachments: &Option<Vec<Box<str>>>,
583	subject: &Option<Box<str>>,
584) -> ClResult<(Box<str>, Box<str>)> {
585	let dsl = app.ext::<Arc<DslEngine>>()?;
586
587	// Resolve flags: explicit > default_flags from action type definition
588	let flags = action.flags.clone().or_else(|| {
589		dsl.get_behavior(action.typ.as_ref())
590			.and_then(|b| b.default_flags.as_ref())
591			.map(|f: &String| Box::from(f.as_str()))
592	});
593
594	let action_for_token = CreateAction {
595		typ: action.typ.clone(),
596		sub_typ: action.sub_typ.clone(),
597		parent_id: action.parent_id.clone(),
598		audience_tag: action.audience_tag.clone(),
599		content: action.content.clone(),
600		attachments: attachments.clone(),
601		subject: subject.clone(), // Use resolved subject
602		expires_at: action.expires_at,
603		visibility: action.visibility,
604		flags,
605		x: None, // x is stored in DB but not in JWT token
606	};
607
608	// Try to create action token, if it fails due to missing key, create one and retry
609	let action_token =
610		match app.auth_adapter.create_action_token(tn_id, action_for_token.clone()).await {
611			Ok(token) => token,
612			Err(Error::DbError) => {
613				// Key might be missing - create one and retry
614				info!("No signing key found for tenant {}, creating one automatically", tn_id.0);
615				app.auth_adapter.create_profile_key(tn_id, None).await?;
616				app.auth_adapter.create_action_token(tn_id, action_for_token).await?
617			}
618			Err(e) => return Err(e),
619		};
620	let action_id = hasher::hash("a", action_token.as_bytes());
621
622	Ok((action_id, action_token))
623}
624
625/// Finalize action in database and store token
626async fn finalize_action(
627	app: &App,
628	tn_id: TnId,
629	a_id: u64,
630	action_id: &str,
631	action_token: &str,
632	options: meta_adapter::FinalizeActionOptions<'_>,
633) -> ClResult<()> {
634	app.meta_adapter.finalize_action(tn_id, a_id, action_id, options).await?;
635	app.meta_adapter.store_action_token(tn_id, action_id, action_token).await?;
636
637	Ok(())
638}
639
640/// Determine delivery recipients for federation (direct messaging only)
641async fn determine_recipients(
642	_app: &App,
643	_tn_id: TnId,
644	id_tag: &str,
645	_action_id: &str,
646	action: &CreateAction,
647) -> ClResult<Vec<Box<str>>> {
648	// Only deliver to specific audience (no broadcast to followers)
649	if let Some(audience_tag) = &action.audience_tag {
650		// Don't send to self
651		if audience_tag.as_ref() != id_tag {
652			Ok(vec![audience_tag.clone()])
653		} else {
654			Ok(Vec::new())
655		}
656	} else {
657		// No audience - nothing to deliver
658		Ok(Vec::new())
659	}
660}
661
662/// Schedule delivery tasks for all recipients
663async fn schedule_delivery(
664	app: &App,
665	tn_id: TnId,
666	id_tag: &str,
667	action_id: &str,
668	action: &CreateAction,
669) -> ClResult<()> {
670	let dsl = app.ext::<Arc<DslEngine>>()?;
671
672	// For APRV actions: check if the subject action should be broadcast to followers
673	if action.typ.as_ref() == "APRV" {
674		if let Some(ref subject_id) = action.subject {
675			// Get the subject action to check its broadcast behavior
676			if let Ok(Some(subject_action)) = app.meta_adapter.get_action(tn_id, subject_id).await {
677				let subject_broadcast = dsl
678					.get_behavior(&subject_action.typ)
679					.and_then(|b| b.broadcast)
680					.unwrap_or(false);
681
682				if subject_broadcast {
683					debug!(
684						"APRV {} subject {} has broadcast=true, fanning out to followers",
685						action_id, subject_id
686					);
687					// Fan-out APRV to followers with related action (the approved POST)
688					// Also send to author (audience) who may not be a follower
689					return schedule_broadcast_delivery(
690						app,
691						tn_id,
692						id_tag,
693						action_id,
694						Some(subject_id.as_ref()),
695						action.audience_tag.as_deref(),
696					)
697					.await;
698				}
699			}
700		}
701	}
702
703	// Standard delivery: send to specific audience only
704	let mut recipients = determine_recipients(app, tn_id, id_tag, action_id, action).await?;
705
706	// Get behavior flags
707	let behavior = dsl.get_behavior(action.typ.as_ref());
708
709	// Check if this action type should also deliver to subject's owner
710	// This is used by INVT to deliver to both invitee AND the CONV home
711	let deliver_to_subject_owner =
712		behavior.as_ref().and_then(|b| b.deliver_to_subject_owner).unwrap_or(false);
713
714	if deliver_to_subject_owner {
715		if let Some(ref subject_id) = action.subject {
716			// Look up the subject action to find its owner
717			if let Ok(Some(subject_action)) = app.meta_adapter.get_action(tn_id, subject_id).await {
718				let subject_owner = &subject_action.issuer.id_tag;
719				// Add subject owner if not already in recipients and not self
720				if subject_owner.as_ref() != id_tag
721					&& !recipients.iter().any(|r| r.as_ref() == subject_owner.as_ref())
722				{
723					info!(
724						"→ DUAL DELIVERY: Adding subject owner {} for {} (deliver_to_subject_owner)",
725						subject_owner, action_id
726					);
727					recipients.push(subject_owner.clone());
728				}
729			}
730		}
731	}
732
733	// Fan-out to subscribers of subscribable parent chain (e.g., MSG in CONV)
734	// This schedules its own delivery tasks and returns the list for logging
735	let fanout_recipients = schedule_subscriber_fanout(
736		app,
737		tn_id,
738		action_id,
739		action.parent_id.as_deref(),
740		id_tag, // issuer = ourselves for outbound
741	)
742	.await?;
743
744	// Add fanout recipients to the list for logging purposes
745	// (delivery tasks were already scheduled by schedule_subscriber_fanout)
746	for r in fanout_recipients {
747		if !recipients.iter().any(|existing| existing.as_ref() == r.as_ref()) {
748			recipients.push(r);
749		}
750	}
751
752	if !recipients.is_empty() {
753		// Log summary with up to 3 recipient names
754		let recipient_preview: Vec<&str> = recipients.iter().take(3).map(|s| s.as_ref()).collect();
755		if recipients.len() <= 3 {
756			info!("→ DELIVERY: {} → [{}]", action_id, recipient_preview.join(", "));
757		} else {
758			info!(
759				"→ DELIVERY: {} → {} recipients [{}...]",
760				action_id,
761				recipients.len(),
762				recipient_preview.join(", ")
763			);
764		}
765	}
766
767	// Check if this action type should deliver its subject along with it
768	let deliver_subject = behavior.as_ref().and_then(|b| b.deliver_subject).unwrap_or(false);
769
770	let related_action_id =
771		if deliver_subject { action.subject.as_deref().map(|s| s.into()) } else { None };
772
773	for recipient_tag in recipients {
774		debug!("Creating delivery task for action {} to {}", action_id, recipient_tag);
775
776		let delivery_task = ActionDeliveryTask::new_with_related(
777			tn_id,
778			action_id.into(),
779			recipient_tag.clone(),
780			recipient_tag.clone(),
781			related_action_id.clone(),
782		);
783
784		let task_key = format!("delivery:{}:{}", action_id, recipient_tag);
785		let retry_policy = RetryPolicy::new((10, 43200), 50);
786
787		app.scheduler
788			.task(delivery_task)
789			.key(&task_key)
790			.with_retry(retry_policy)
791			.schedule()
792			.await?;
793	}
794
795	Ok(())
796}
797
798/// Schedule broadcast delivery to followers (used for APRV fan-out)
799///
800/// This delivers an action to all followers (entities that have FLLW or CONN actions pointing to us),
801/// with an optional related action token included (for APRV, this is the approved action).
802///
803/// Also sends a direct delivery to the author if specified (they may not be a follower).
804async fn schedule_broadcast_delivery(
805	app: &App,
806	tn_id: TnId,
807	id_tag: &str,
808	action_id: &str,
809	related_action_id: Option<&str>,
810	author_id_tag: Option<&str>,
811) -> ClResult<()> {
812	// Query for followers (entities that issued FLLW or CONN actions to us)
813	let follower_actions = app
814		.meta_adapter
815		.list_actions(
816			tn_id,
817			&meta_adapter::ListActionOptions {
818				typ: Some(vec!["FLLW".into(), "CONN".into()]),
819				// Don't filter by status - we'll exclude deleted ('D') in the loop
820				..Default::default()
821			},
822		)
823		.await?;
824
825	// Extract unique follower id_tags (excluding self and deleted connections)
826	// Anyone who sent us a CONN/FLLW request is a follower (unless deleted)
827	let mut recipients: HashSet<Box<str>> = HashSet::new();
828	for action_view in follower_actions {
829		// Skip deleted connections
830		if action_view.status.as_deref() == Some("D") {
831			continue;
832		}
833		if action_view.issuer.id_tag.as_ref() != id_tag {
834			recipients.insert(action_view.issuer.id_tag.clone());
835		}
836	}
837
838	// Always send to author (they need to know their action was approved, even if not a follower)
839	if let Some(author) = author_id_tag {
840		if author != id_tag {
841			recipients.insert(author.into());
842		}
843	}
844
845	// Log summary with up to 3 recipient names
846	let recipients_vec: Vec<&str> = recipients.iter().map(|s| s.as_ref()).collect();
847	let recipient_preview: Vec<&str> = recipients_vec.iter().take(3).copied().collect();
848	if !recipients.is_empty() {
849		if recipients.len() <= 3 {
850			info!("→ BROADCAST: {} → [{}]", action_id, recipient_preview.join(", "));
851		} else {
852			info!(
853				"→ BROADCAST: {} → {} recipients [{}...]",
854				action_id,
855				recipients.len(),
856				recipient_preview.join(", ")
857			);
858		}
859	}
860
861	let retry_policy = RetryPolicy::new((10, 43200), 50);
862	let related_box: Option<Box<str>> = related_action_id.map(|s| s.into());
863
864	for recipient_tag in recipients {
865		debug!("Creating broadcast delivery task for action {} to {}", action_id, recipient_tag);
866
867		let delivery_task = ActionDeliveryTask::new_with_related(
868			tn_id,
869			action_id.into(),
870			recipient_tag.clone(),
871			recipient_tag.clone(),
872			related_box.clone(),
873		);
874
875		let task_key = format!("delivery:{}:{}", action_id, recipient_tag);
876
877		app.scheduler
878			.task(delivery_task)
879			.key(&task_key)
880			.with_retry(retry_policy.clone())
881			.schedule()
882			.await?;
883	}
884
885	Ok(())
886}
887
888/// Schedule fan-out delivery to subscribers of a subscribable parent chain.
889///
890/// Used by both:
891/// - Outbound: `schedule_delivery()` for locally created actions
892/// - Inbound: `process.rs` for received actions that need re-delivery
893///
894/// Walks up the parent chain until finding a subscribable action (e.g., CONV).
895/// If that action is "local" (we own it), fans out to all subscribers.
896///
897/// # Arguments
898/// * `app` - Application state
899/// * `tn_id` - Tenant ID
900/// * `action_id` - The action being delivered
901/// * `parent_id` - Starting point for parent chain walk (may be None)
902/// * `issuer` - Action issuer to exclude from delivery (they already have it)
903///
904/// # Returns
905/// List of recipients that delivery tasks were scheduled for
906pub async fn schedule_subscriber_fanout(
907	app: &App,
908	tn_id: TnId,
909	action_id: &str,
910	parent_id: Option<&str>,
911	issuer: &str,
912) -> ClResult<Vec<Box<str>>> {
913	let Some(starting_parent) = parent_id else {
914		return Ok(Vec::new());
915	};
916
917	// Get our id_tag to check for local ownership
918	let our_id_tag: Box<str> = app.auth_adapter.read_id_tag(tn_id).await?;
919
920	// Walk parent chain to find subscribable root
921	// Use owned String to avoid borrow checker issues across loop iterations
922	let mut current_parent_id: Option<String> = Some(starting_parent.to_string());
923	let mut recipients = Vec::new();
924
925	while let Some(p_id) = current_parent_id.take() {
926		let Some(parent_action) = app.meta_adapter.get_action(tn_id, &p_id).await? else {
927			break; // Parent not found locally
928		};
929
930		let subscribable = app
931			.ext::<Arc<DslEngine>>()?
932			.get_behavior(&parent_action.typ)
933			.and_then(|b| b.subscribable)
934			.unwrap_or(false);
935
936		if subscribable {
937			// Check if this subscribable parent is local:
938			// (audience=null & issuer=us) | audience=us
939			let is_local = match &parent_action.audience {
940				None => parent_action.issuer.id_tag.as_ref() == our_id_tag.as_ref(),
941				Some(aud) => aud.id_tag.as_ref() == our_id_tag.as_ref(),
942			};
943
944			if is_local {
945				// Get all subscribers, excluding ourselves and the issuer
946				let subs = app
947					.meta_adapter
948					.list_actions(
949						tn_id,
950						&meta_adapter::ListActionOptions {
951							typ: Some(vec!["SUBS".into()]),
952							subject: Some(p_id.clone()),
953							status: Some(vec!["A".into()]),
954							..Default::default()
955						},
956					)
957					.await?;
958
959				for sub in subs {
960					let sub_tag = sub.issuer.id_tag.as_ref();
961					// Exclude ourselves and the issuer (they already have it)
962					if sub_tag != our_id_tag.as_ref() && sub_tag != issuer {
963						recipients.push(sub.issuer.id_tag.clone());
964					}
965				}
966
967				// Schedule delivery tasks
968				if !recipients.is_empty() {
969					info!(
970						"→ SUBSCRIBER FAN-OUT: {} → {} recipients (root: {})",
971						action_id,
972						recipients.len(),
973						p_id
974					);
975
976					let retry_policy = RetryPolicy::new((10, 43200), 50);
977					for recipient_tag in &recipients {
978						let delivery_task = ActionDeliveryTask::new(
979							tn_id,
980							action_id.into(),
981							recipient_tag.clone(),
982							recipient_tag.clone(),
983						);
984						let task_key = format!("fanout:{}:{}", action_id, recipient_tag);
985						app.scheduler
986							.task(delivery_task)
987							.key(&task_key)
988							.with_retry(retry_policy.clone())
989							.schedule()
990							.await?;
991					}
992				}
993			}
994			break; // Found subscribable root, done walking
995		}
996
997		// Continue up the chain
998		current_parent_id = parent_action.parent_id.map(|p| p.to_string());
999	}
1000
1001	Ok(recipients)
1002}
1003
1004/// Action verifier generator Task
1005#[derive(Debug, Serialize, Deserialize)]
1006pub struct ActionVerifierTask {
1007	tn_id: TnId,
1008	token: Box<str>,
1009	/// Optional client IP address for rate limiting (stored as string for serialization)
1010	client_address: Option<Box<str>>,
1011}
1012
1013impl ActionVerifierTask {
1014	pub fn new(tn_id: TnId, token: Box<str>, client_address: Option<Box<str>>) -> Arc<Self> {
1015		Arc::new(Self { tn_id, token, client_address })
1016	}
1017}
1018
1019#[async_trait]
1020impl Task<App> for ActionVerifierTask {
1021	fn kind() -> &'static str {
1022		"action.verify"
1023	}
1024	fn kind_of(&self) -> &'static str {
1025		Self::kind()
1026	}
1027
1028	fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<App>>> {
1029		// Format: "tn_id,token" or "tn_id,token,client_address"
1030		let parts: Vec<&str> = ctx.splitn(3, ',').collect();
1031		if parts.len() < 2 {
1032			return Err(Error::Internal("invalid ActionVerifier context format".into()));
1033		}
1034		let tn_id = TnId(parts[0].parse()?);
1035		let token = parts[1].into();
1036		let client_address = parts.get(2).map(|&s| s.into());
1037		let task = ActionVerifierTask::new(tn_id, token, client_address);
1038		Ok(task)
1039	}
1040
1041	fn serialize(&self) -> String {
1042		match &self.client_address {
1043			Some(addr) => format!("{},{},{}", self.tn_id.0, self.token, addr),
1044			None => format!("{},{}", self.tn_id.0, self.token),
1045		}
1046	}
1047
1048	async fn run(&self, app: &App) -> ClResult<()> {
1049		let action_id = hasher::hash("a", self.token.as_bytes());
1050		debug!("Running task action.verify {}", action_id);
1051
1052		process::process_inbound_action_token(
1053			app,
1054			self.tn_id,
1055			&action_id,
1056			&self.token,
1057			false,
1058			self.client_address.as_ref().map(|s| s.to_string()),
1059		)
1060		.await?;
1061
1062		debug!("Finished task action.verify {}", action_id);
1063		Ok(())
1064	}
1065}
1066
1067#[cfg(test)]
1068mod tests {
1069	use super::*;
1070
1071	#[test]
1072	fn test_create_action_struct() {
1073		let action = CreateAction {
1074			typ: "MSG".into(),
1075			audience_tag: Some("bob.example.com".into()),
1076			content: Some(serde_json::Value::String("Hello".to_string())),
1077			..Default::default()
1078		};
1079
1080		assert_eq!(action.typ.as_ref(), "MSG");
1081		assert_eq!(action.content, Some(serde_json::Value::String("Hello".to_string())));
1082		assert_eq!(action.audience_tag.as_deref(), Some("bob.example.com"));
1083	}
1084
1085	#[test]
1086	fn test_create_action_without_audience() {
1087		let action = CreateAction {
1088			typ: "POST".into(),
1089			sub_typ: Some("TEXT".into()),
1090			content: Some(serde_json::Value::String("Hello world".to_string())),
1091			flags: Some("RC".into()), // Reactions and comments allowed
1092			..Default::default()
1093		};
1094
1095		assert_eq!(action.typ.as_ref(), "POST");
1096		assert!(action.audience_tag.is_none());
1097		assert_eq!(action.flags.as_deref(), Some("RC"));
1098	}
1099}
1100
1101// vim: ts=4