Skip to main content

cloudillo_action/dsl/
operations.rs

1//! DSL operation executor
2//!
3//! Executes all DSL operations including:
4//! - Profile operations (update_profile, get_profile)
5//! - Action operations (create_action, get_action, update_action, delete_action)
6//! - Control flow (if, switch, foreach, return)
7//! - Data operations (set, get, merge)
8//! - Federation operations (broadcast, send)
9//! - Notification operations
10//! - Utility operations (log, abort)
11
12use super::expression::ExpressionEvaluator;
13use super::types::*;
14use cloudillo_core::{scheduler::RetryPolicy, ws_broadcast::BroadcastMessage};
15use cloudillo_types::meta_adapter;
16
17use crate::{delivery::ActionDeliveryTask, hooks::HookContext, prelude::*};
18use serde_json::Value;
19use std::collections::{HashMap, HashSet};
20
21/// Special marker for early return
///
/// `Operation::Return` is reported by `OperationExecutor::execute` as an
/// `Error::ValidationError` whose message is exactly this string.
22pub const EARLY_RETURN_MARKER: &str = "EARLY_RETURN";
23
24/// Parameters for creating an action
///
/// Borrowed view of the fields of an `Operation::CreateAction`, bundled so that
/// `execute_create_action` receives a single argument. The optional expressions are
/// still unevaluated here; `execute_create_action` evaluates them.
25struct CreateActionParams<'a> {
26	action_type: &'a str, // copied verbatim into `CreateAction::typ`
27	subtype: &'a Option<Expression>, // used only when it evaluates to a string
28	audience: &'a Option<Expression>, // used only when it evaluates to a string
29	parent: &'a Option<Expression>, // used only when it evaluates to a string
30	subject: &'a Option<Expression>, // used only when it evaluates to a string
31	content: &'a Option<Expression>, // evaluated value is passed through unchanged
32	attachments: &'a Option<Expression>, // must evaluate to an array; only string items are kept
33}
34
35/// Operation executor
///
/// Runs DSL `Operation`s against a `HookContext`. Every call to `execute`, including
/// the calls made for nested operations, is counted against `max_operations`.
36pub struct OperationExecutor<'a> {
37	evaluator: ExpressionEvaluator, // evaluates `Expression`s against the hook context
38	app: &'a App, // source of the meta adapter, scheduler and broadcast handles used by operations
39	max_operations: usize, // upper bound on `execute` calls; `new` sets it to 100
40	operation_count: usize, // number of `execute` calls made so far
41}
42
43impl<'a> OperationExecutor<'a> {
44	/// Create a new operation executor
45	pub fn new(app: &'a App) -> Self {
46		Self { evaluator: ExpressionEvaluator::new(), app, max_operations: 100, operation_count: 0 }
47	}
48
49	/// Execute an operation
50	pub async fn execute(&mut self, op: &Operation, context: &mut HookContext) -> ClResult<()> {
51		self.operation_count += 1;
52		if self.operation_count > self.max_operations {
53			return Err(Error::ValidationError(format!(
54				"Maximum operations exceeded ({})",
55				self.max_operations
56			)));
57		}
58
59		match op {
60			// Profile operations
61			Operation::UpdateProfile { target, set } => {
62				self.execute_update_profile(target, set, context).await
63			}
64			Operation::GetProfile { target, r#as } => {
65				self.execute_get_profile(target, r#as, context).await
66			}
67
68			// Action operations
69			Operation::CreateAction {
70				r#type,
71				subtype,
72				audience,
73				parent,
74				subject,
75				content,
76				attachments,
77			} => {
78				let params = CreateActionParams {
79					action_type: r#type,
80					subtype,
81					audience,
82					parent,
83					subject,
84					content,
85					attachments,
86				};
87				self.execute_create_action(params, context).await
88			}
89			Operation::GetAction { key, action_id, r#as } => {
90				self.execute_get_action(key, action_id, r#as, context).await
91			}
92			Operation::UpdateAction { target, set } => {
93				self.execute_update_action(target, set, context).await
94			}
95			Operation::DeleteAction { target } => self.execute_delete_action(target, context).await,
96
97			// Control flow operations
98			Operation::If { condition, then, r#else } => {
99				self.execute_if(condition, then, r#else, context).await
100			}
101			Operation::Switch { value, cases, default } => {
102				self.execute_switch(value, cases, default, context).await
103			}
104			Operation::Foreach { array, r#as, r#do } => {
105				self.execute_foreach(array, r#as, r#do, context).await
106			}
107			Operation::Return { value: _ } => {
108				// Early return mechanism - use special marker
109				Err(Error::ValidationError(EARLY_RETURN_MARKER.to_string()))
110			}
111
112			// Data operations
113			Operation::Set { var, value } => self.execute_set(var, value, context).await,
114			Operation::Get { var, from } => self.execute_get(var, from, context).await,
115			Operation::Merge { objects, r#as } => self.execute_merge(objects, r#as, context).await,
116
117			// Federation operations
118			Operation::BroadcastToFollowers { action_id, token } => {
119				self.execute_broadcast_to_followers(action_id, token, context).await
120			}
121			Operation::SendToAudience { action_id, token, audience } => {
122				self.execute_send_to_audience(action_id, token, audience, context).await
123			}
124
125			// Notification operations
126			Operation::CreateNotification { user, r#type, action_id, priority } => {
127				self.execute_create_notification(user, r#type, action_id, priority, context)
128					.await
129			}
130
131			// Utility operations
132			Operation::Log { level, message } => self.execute_log(level, message, context).await,
133			Operation::Abort { error, code } => self.execute_abort(error, code, context).await,
134		}
135	}
136
137	// Profile Operations
138
139	async fn execute_update_profile(
140		&mut self,
141		target_expr: &Expression,
142		updates: &HashMap<String, Expression>,
143		context: &mut HookContext,
144	) -> ClResult<()> {
145		let target = self.evaluator.evaluate(target_expr, context)?;
146		let target_tag = match target {
147			Value::String(s) => s,
148			_ => return Err(Error::ValidationError("target must be a string (idTag)".to_string())),
149		};
150
151		// Evaluate all update expressions
152		let mut profile_updates: HashMap<String, Value> = HashMap::new();
153		for (key, value_expr) in updates {
154			let value = self.evaluator.evaluate(value_expr, context)?;
155			profile_updates.insert(key.clone(), value);
156		}
157
158		tracing::debug!("DSL: update_profile target={} updates={:?}", target_tag, profile_updates);
159
160		// Convert tenant_id from i64 to TnId
161		let tn_id = TnId(context.tenant_id as u32);
162
163		// Extract fields from profile_updates and build UpdateProfileData
164		let name = profile_updates.get("name").and_then(|v| v.as_str()).map(|s| s.to_string());
165
166		let profile_update = cloudillo_types::meta_adapter::UpdateProfileData {
167			name: name
168				.map(|s| cloudillo_types::types::Patch::Value(s.into()))
169				.unwrap_or(cloudillo_types::types::Patch::Undefined),
170			..Default::default()
171		};
172
173		// Update profile via meta adapter
174		self.app
175			.meta_adapter
176			.update_profile(tn_id, &target_tag, &profile_update)
177			.await?;
178
179		tracing::info!(
180			tenant_id = %tn_id.0,
181			target = %target_tag,
182			fields = ?profile_updates.keys().collect::<Vec<_>>(),
183			"DSL: updated profile"
184		);
185
186		Ok(())
187	}
188
189	async fn execute_get_profile(
190		&mut self,
191		target_expr: &Expression,
192		as_var: &Option<String>,
193		context: &mut HookContext,
194	) -> ClResult<()> {
195		let target = self.evaluator.evaluate(target_expr, context)?;
196		let target_tag = match target {
197			Value::String(s) => s,
198			_ => return Err(Error::ValidationError("target must be a string (idTag)".to_string())),
199		};
200
201		tracing::debug!("DSL: get_profile target={}", target_tag);
202
203		// Convert tenant_id from i64 to TnId
204		let tn_id = TnId(context.tenant_id as u32);
205
206		// Get profile via meta adapter
207		let (_etag, profile) = self.app.meta_adapter.read_profile(tn_id, &target_tag).await?;
208
209		// Store result if variable name provided
210		if let Some(var_name) = as_var {
211			let profile_json = serde_json::json!({
212				"id_tag": profile.id_tag,
213				"name": profile.name,
214				"type": match profile.typ {
215					cloudillo_types::meta_adapter::ProfileType::Person => "person",
216					cloudillo_types::meta_adapter::ProfileType::Community => "community",
217				},
218				"profile_pic": profile.profile_pic,
219				"following": profile.following,
220				"connected": profile.connected.is_connected(),
221			});
222			context.vars.insert(var_name.clone(), profile_json);
223
224			tracing::info!(
225				tenant_id = %tn_id.0,
226				target = %target_tag,
227				var = %var_name,
228				"DSL: fetched profile"
229			);
230		}
231
232		Ok(())
233	}
234
235	// Action Operations
236
237	async fn execute_create_action(
238		&mut self,
239		params: CreateActionParams<'_>,
240		context: &mut HookContext,
241	) -> ClResult<()> {
242		// Evaluate all fields
243		let subtype_val = if let Some(expr) = params.subtype {
244			Some(self.evaluator.evaluate(expr, context)?)
245		} else {
246			None
247		};
248
249		let audience_val = if let Some(expr) = params.audience {
250			Some(self.evaluator.evaluate(expr, context)?)
251		} else {
252			None
253		};
254
255		let parent_val = if let Some(expr) = params.parent {
256			Some(self.evaluator.evaluate(expr, context)?)
257		} else {
258			None
259		};
260
261		let subject_val = if let Some(expr) = params.subject {
262			Some(self.evaluator.evaluate(expr, context)?)
263		} else {
264			None
265		};
266
267		let content_val = if let Some(expr) = params.content {
268			Some(self.evaluator.evaluate(expr, context)?)
269		} else {
270			None
271		};
272
273		let attachments_val = if let Some(expr) = params.attachments {
274			Some(self.evaluator.evaluate(expr, context)?)
275		} else {
276			None
277		};
278
279		tracing::debug!(
280			"DSL: create_action type={} subtype={:?} audience={:?}",
281			params.action_type,
282			subtype_val,
283			audience_val
284		);
285
286		// Convert tenant_id from i64 to TnId
287		let tn_id = TnId(context.tenant_id as u32);
288
289		// Build CreateAction struct
290		let create_action = crate::task::CreateAction {
291			typ: params.action_type.to_string().into_boxed_str(),
292			sub_typ: subtype_val.and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
293			audience_tag: audience_val
294				.and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
295			parent_id: parent_val.and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
296			subject: subject_val.and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
297			content: content_val,
298			attachments: attachments_val.and_then(|v| {
299				v.as_array().map(|arr| {
300					arr.iter()
301						.filter_map(|item| item.as_str().map(|s| s.to_string().into_boxed_str()))
302						.collect()
303				})
304			}),
305			expires_at: None,
306			visibility: None,
307			flags: None, // Will use default_flags from action type definition
308			x: None,
309		};
310
311		// Call action creation
312		let action_id =
313			crate::task::create_action(self.app, tn_id, &context.tenant_tag, create_action).await?;
314
315		tracing::info!(
316			tenant_id = %tn_id.0,
317			action_type = %params.action_type,
318			action_id = %action_id,
319			"DSL: created action"
320		);
321
322		Ok(())
323	}
324
325	async fn execute_get_action(
326		&mut self,
327		key: &Option<Expression>,
328		action_id: &Option<Expression>,
329		as_var: &Option<String>,
330		context: &mut HookContext,
331	) -> ClResult<()> {
332		let key_val =
333			if let Some(expr) = key { Some(self.evaluator.evaluate(expr, context)?) } else { None };
334
335		let action_id_val = if let Some(expr) = action_id {
336			Some(self.evaluator.evaluate(expr, context)?)
337		} else {
338			None
339		};
340
341		tracing::debug!("DSL: get_action key={:?} action_id={:?}", key_val, action_id_val);
342
343		// Convert tenant_id from i64 to TnId
344		let tn_id = TnId(context.tenant_id as u32);
345
346		// Retrieve action by key or action_id
347		let action_result = if let Some(key) = key_val {
348			let key_str = key
349				.as_str()
350				.ok_or_else(|| Error::ValidationError("key must be a string".to_string()))?;
351
352			// Use get_action_by_key
353			self.app.meta_adapter.get_action_by_key(tn_id, key_str).await?.map(|action| {
354				serde_json::json!({
355					"action_id": action.action_id,
356					"type": action.typ,
357					"subtype": action.sub_typ,
358					"issuer": action.issuer_tag,
359					"audience": action.audience_tag,
360					"parent": action.parent_id,
361					"root": action.root_id,
362					"subject": action.subject,
363					"content": action.content,
364					"attachments": action.attachments,
365					"created_at": action.created_at.0,
366					"expires_at": action.expires_at.map(|ts| ts.0),
367				})
368			})
369		} else if let Some(action_id) = action_id_val {
370			let action_id_str = action_id
371				.as_str()
372				.ok_or_else(|| Error::ValidationError("action_id must be a string".to_string()))?;
373
374			// Use get_action
375			self.app.meta_adapter.get_action(tn_id, action_id_str).await?.map(|action| {
376				serde_json::json!({
377					"action_id": action.action_id,
378					"type": action.typ,
379					"subtype": action.sub_typ,
380					"issuer": {
381						"id_tag": action.issuer.id_tag,
382						"name": action.issuer.name,
383						"profile_pic": action.issuer.profile_pic,
384					},
385					"audience": action.audience.as_ref().map(|a| serde_json::json!({
386						"id_tag": a.id_tag,
387						"name": a.name,
388						"profile_pic": a.profile_pic,
389					})),
390					"parent": action.parent_id,
391					"root": action.root_id,
392					"subject": action.subject,
393					"content": action.content,
394					"attachments": action.attachments,
395					"created_at": action.created_at.0,
396					"expires_at": action.expires_at.map(|ts| ts.0),
397				})
398			})
399		} else {
400			return Err(Error::ValidationError(
401				"Either key or action_id must be provided".to_string(),
402			));
403		};
404
405		// Store result if variable name provided
406		if let Some(var_name) = as_var {
407			let value = action_result.unwrap_or(Value::Null);
408			context.vars.insert(var_name.clone(), value.clone());
409
410			if !value.is_null() {
411				tracing::info!(
412					tenant_id = %tn_id.0,
413					var = %var_name,
414					"DSL: fetched action"
415				);
416			} else {
417				tracing::debug!(
418					tenant_id = %tn_id.0,
419					var = %var_name,
420					"DSL: action not found"
421				);
422			}
423		}
424
425		Ok(())
426	}
427
428	async fn execute_update_action(
429		&mut self,
430		target_expr: &Expression,
431		updates: &HashMap<String, UpdateValue>,
432		context: &mut HookContext,
433	) -> ClResult<()> {
434		let target = self.evaluator.evaluate(target_expr, context)?;
435		let action_id = match target {
436			Value::String(s) => s,
437			_ => {
438				return Err(Error::ValidationError(
439					"target must be a string (actionId)".to_string(),
440				))
441			}
442		};
443
444		tracing::debug!("DSL: update_action target={}", action_id);
445
446		// Convert tenant_id from i64 to TnId
447		let tn_id = TnId(context.tenant_id as u32);
448
449		// Fetch current action data for increment/decrement operations
450		let action_data = self.app.meta_adapter.get_action_data(tn_id, &action_id).await?;
451
452		// Evaluate all update expressions
453		use cloudillo_types::types::Patch;
454		let mut update_opts = meta_adapter::UpdateActionDataOptions::default();
455
456		for (key, update_value) in updates {
457			match key.as_str() {
458				"status" => {
459					let value = match update_value {
460						UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
461							self.evaluator.evaluate(expr, context)?
462						}
463						_ => {
464							return Err(Error::ValidationError(
465								"status field does not support increment/decrement".to_string(),
466							))
467						}
468					};
469					update_opts.status = if value.is_null() {
470						Patch::Null
471					} else if let Some(s) = value.as_str() {
472						// Map human-readable status names to single-char codes
473						let status_char = match s {
474							"confirmation" => 'C',
475							"notification" => 'N',
476							"active" => 'A',
477							"pending" => 'P',
478							"deleted" => 'D',
479							_ => s.chars().next().unwrap_or('A'),
480						};
481						Patch::Value(status_char)
482					} else {
483						Patch::Undefined
484					};
485				}
486				"subject" => {
487					let value = match update_value {
488						UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
489							self.evaluator.evaluate(expr, context)?
490						}
491						_ => {
492							return Err(Error::ValidationError(
493								"subject field does not support increment/decrement".to_string(),
494							))
495						}
496					};
497					update_opts.subject = if value.is_null() {
498						Patch::Null
499					} else if let Some(s) = value.as_str() {
500						Patch::Value(s.to_string())
501					} else {
502						Patch::Undefined
503					};
504				}
505				"reactions" => {
506					let value = match update_value {
507						UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
508							self.evaluator.evaluate(expr, context)?
509						}
510						UpdateValue::Increment { increment } => {
511							let inc = self.evaluator.evaluate(increment, context)?;
512							let inc_val = inc.as_u64().ok_or_else(|| {
513								Error::ValidationError(
514									"increment value must be a number".to_string(),
515								)
516							})? as u32;
517
518							let current =
519								action_data.as_ref().and_then(|d| d.reactions).unwrap_or(0);
520							Value::from(current + inc_val)
521						}
522						UpdateValue::Decrement { decrement } => {
523							let dec = self.evaluator.evaluate(decrement, context)?;
524							let dec_val = dec.as_u64().ok_or_else(|| {
525								Error::ValidationError(
526									"decrement value must be a number".to_string(),
527								)
528							})? as u32;
529
530							let current =
531								action_data.as_ref().and_then(|d| d.reactions).unwrap_or(0);
532							Value::from(current.saturating_sub(dec_val))
533						}
534					};
535					update_opts.reactions = if value.is_null() {
536						Patch::Null
537					} else if let Some(v) = value.as_u64() {
538						Patch::Value(v as u32)
539					} else {
540						Patch::Undefined
541					};
542				}
543				"comments" => {
544					let value = match update_value {
545						UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
546							self.evaluator.evaluate(expr, context)?
547						}
548						UpdateValue::Increment { increment } => {
549							let inc = self.evaluator.evaluate(increment, context)?;
550							let inc_val = inc.as_u64().ok_or_else(|| {
551								Error::ValidationError(
552									"increment value must be a number".to_string(),
553								)
554							})? as u32;
555
556							let current =
557								action_data.as_ref().and_then(|d| d.comments).unwrap_or(0);
558							Value::from(current + inc_val)
559						}
560						UpdateValue::Decrement { decrement } => {
561							let dec = self.evaluator.evaluate(decrement, context)?;
562							let dec_val = dec.as_u64().ok_or_else(|| {
563								Error::ValidationError(
564									"decrement value must be a number".to_string(),
565								)
566							})? as u32;
567
568							let current =
569								action_data.as_ref().and_then(|d| d.comments).unwrap_or(0);
570							Value::from(current.saturating_sub(dec_val))
571						}
572					};
573					update_opts.comments = if value.is_null() {
574						Patch::Null
575					} else if let Some(v) = value.as_u64() {
576						Patch::Value(v as u32)
577					} else {
578						Patch::Undefined
579					};
580				}
581				_ => {
582					tracing::warn!("DSL: update_action ignoring unknown field '{}'", key);
583				}
584			}
585		}
586
587		// Update action data via meta adapter
588		self.app
589			.meta_adapter
590			.update_action_data(tn_id, &action_id, &update_opts)
591			.await?;
592
593		tracing::info!(
594			tenant_id = %tn_id.0,
595			action_id = %action_id,
596			updates = ?updates.keys().collect::<Vec<_>>(),
597			"DSL: updated action"
598		);
599
600		Ok(())
601	}
602
603	async fn execute_delete_action(
604		&mut self,
605		target_expr: &Expression,
606		context: &mut HookContext,
607	) -> ClResult<()> {
608		let target = self.evaluator.evaluate(target_expr, context)?;
609		let action_id = match target {
610			Value::String(s) => s,
611			_ => {
612				return Err(Error::ValidationError(
613					"target must be a string (actionId)".to_string(),
614				))
615			}
616		};
617
618		tracing::debug!("DSL: delete_action target={}", action_id);
619
620		// Convert tenant_id from i64 to TnId
621		let tn_id = TnId(context.tenant_id as u32);
622
623		// Delete action via meta adapter
624		self.app.meta_adapter.delete_action(tn_id, &action_id).await?;
625
626		tracing::info!(
627			tenant_id = %tn_id.0,
628			action_id = %action_id,
629			"DSL: deleted action"
630		);
631
632		Ok(())
633	}
634
635	// Control Flow Operations
636
637	fn execute_if<'b>(
638		&'b mut self,
639		condition: &'b Expression,
640		then_ops: &'b [Operation],
641		else_ops: &'b Option<Vec<Operation>>,
642		context: &'b mut HookContext,
643	) -> std::pin::Pin<Box<dyn std::future::Future<Output = ClResult<()>> + Send + 'b>> {
644		Box::pin(async move {
645			let condition_value = self.evaluator.evaluate(condition, context)?;
646			let is_truthy = match condition_value {
647				Value::Bool(b) => b,
648				Value::Null => false,
649				Value::Number(n) => n.as_f64().unwrap_or(0.0) != 0.0,
650				Value::String(s) => !s.is_empty(),
651				_ => true,
652			};
653
654			if is_truthy {
655				for op in then_ops {
656					self.execute(op, context).await?;
657				}
658			} else if let Some(else_branch) = else_ops {
659				for op in else_branch {
660					self.execute(op, context).await?;
661				}
662			}
663
664			Ok(())
665		})
666	}
667
668	fn execute_switch<'b>(
669		&'b mut self,
670		value_expr: &'b Expression,
671		cases: &'b HashMap<String, Vec<Operation>>,
672		default: &'b Option<Vec<Operation>>,
673		context: &'b mut HookContext,
674	) -> std::pin::Pin<Box<dyn std::future::Future<Output = ClResult<()>> + Send + 'b>> {
675		Box::pin(async move {
676			let value = self.evaluator.evaluate(value_expr, context)?;
677			let value_str = match value {
678				Value::String(s) => s,
679				Value::Number(n) => n.to_string(),
680				Value::Bool(b) => b.to_string(),
681				Value::Null => "null".to_string(),
682				_ => value.to_string(),
683			};
684
685			if let Some(case_ops) = cases.get(&value_str) {
686				for op in case_ops {
687					self.execute(op, context).await?;
688				}
689			} else if let Some(default_ops) = default {
690				for op in default_ops {
691					self.execute(op, context).await?;
692				}
693			}
694
695			Ok(())
696		})
697	}
698
699	fn execute_foreach<'b>(
700		&'b mut self,
701		array_expr: &'b Expression,
702		as_var: &'b Option<String>,
703		do_ops: &'b [Operation],
704		context: &'b mut HookContext,
705	) -> std::pin::Pin<Box<dyn std::future::Future<Output = ClResult<()>> + Send + 'b>> {
706		Box::pin(async move {
707			let array_value = self.evaluator.evaluate(array_expr, context)?;
708			let array = match array_value {
709				Value::Array(arr) => arr,
710				_ => return Err(Error::ValidationError("foreach requires an array".to_string())),
711			};
712
713			// Limit to 100 iterations
714			if array.len() > 100 {
715				return Err(Error::ValidationError(format!(
716					"foreach limited to 100 items, got {}",
717					array.len()
718				)));
719			}
720
721			for item in array {
722				// Set loop variable if provided
723				if let Some(var_name) = as_var {
724					context.vars.insert(var_name.clone(), item.clone());
725				}
726
727				// Execute loop body
728				for op in do_ops {
729					self.execute(op, context).await?;
730				}
731			}
732
733			Ok(())
734		})
735	}
736
737	// Data Operations
738
739	async fn execute_set(
740		&mut self,
741		var_name: &str,
742		value_expr: &Expression,
743		context: &mut HookContext,
744	) -> ClResult<()> {
745		let value = self.evaluator.evaluate(value_expr, context)?;
746		context.vars.insert(var_name.to_string(), value);
747		Ok(())
748	}
749
750	async fn execute_get(
751		&mut self,
752		var_name: &str,
753		from_expr: &Expression,
754		context: &mut HookContext,
755	) -> ClResult<()> {
756		let value = self.evaluator.evaluate(from_expr, context)?;
757		context.vars.insert(var_name.to_string(), value);
758		Ok(())
759	}
760
761	async fn execute_merge(
762		&mut self,
763		objects: &[Expression],
764		as_var: &str,
765		context: &mut HookContext,
766	) -> ClResult<()> {
767		let mut merged = serde_json::Map::new();
768
769		for obj_expr in objects {
770			let value = self.evaluator.evaluate(obj_expr, context)?;
771			if let Value::Object(obj) = value {
772				for (k, v) in obj {
773					merged.insert(k, v);
774				}
775			}
776		}
777
778		context.vars.insert(as_var.to_string(), Value::Object(merged));
779		Ok(())
780	}
781
782	// Federation Operations
783
784	async fn execute_broadcast_to_followers(
785		&mut self,
786		action_id: &Expression,
787		token: &Expression,
788		context: &mut HookContext,
789	) -> ClResult<()> {
790		let action_id_val = self.evaluator.evaluate(action_id, context)?;
791		let action_id_str = match action_id_val {
792			Value::String(s) => s,
793			_ => return Err(Error::ValidationError("action_id must be a string".to_string())),
794		};
795
796		let token_val = self.evaluator.evaluate(token, context)?;
797		let _token_str = match token_val {
798			Value::String(s) => s,
799			_ => return Err(Error::ValidationError("token must be a string".to_string())),
800		};
801
802		tracing::debug!(
803			"DSL: broadcast_to_followers action_id={} (querying for followers)",
804			action_id_str
805		);
806
807		// Convert tenant_id from i64 to TnId
808		let tn_id = TnId(context.tenant_id as u32);
809
810		// Query for FLLW and CONN actions to find followers
811		let follower_actions = self
812			.app
813			.meta_adapter
814			.list_actions(
815				tn_id,
816				&meta_adapter::ListActionOptions {
817					typ: Some(vec!["FLLW".into(), "CONN".into()]),
818					..Default::default()
819				},
820			)
821			.await?;
822
823		// Extract unique follower id_tags (the issuers of FLLW/CONN actions)
824		// Exclude self (issuer_tag != tenant_tag)
825		let mut follower_set = HashSet::new();
826		for action_view in follower_actions {
827			if action_view.issuer.id_tag.as_ref() != context.tenant_tag.as_str() {
828				follower_set.insert(action_view.issuer.id_tag.clone());
829			}
830		}
831
832		let recipients: Vec<Box<str>> = follower_set.into_iter().collect();
833		tracing::info!(
834			tenant_id = %tn_id.0,
835			action_id = %action_id_str,
836			followers = %recipients.len(),
837			"DSL: broadcasting to followers"
838		);
839
840		// Create delivery task for each recipient
841		for recipient_tag in recipients {
842			tracing::debug!(
843				"DSL: creating delivery task for action {} to {}",
844				action_id_str,
845				recipient_tag
846			);
847
848			let delivery_task = ActionDeliveryTask::new(
849				tn_id,
850				action_id_str.clone().into_boxed_str(),
851				recipient_tag.clone(), // target_instance
852				recipient_tag.clone(), // target_id_tag
853			);
854
855			// Use unique key to prevent duplicate delivery tasks
856			let task_key = format!("delivery:{}:{}", action_id_str, recipient_tag);
857
858			// Create retry policy: exponential backoff from 10 sec to 12 hours, max 50 retries
859			let retry_policy = RetryPolicy::new((10, 43200), 50);
860
861			// Add delivery task to scheduler
862			self.app
863				.scheduler
864				.task(delivery_task)
865				.key(&task_key)
866				.with_retry(retry_policy)
867				.schedule()
868				.await?;
869		}
870
871		Ok(())
872	}
873
874	async fn execute_send_to_audience(
875		&mut self,
876		action_id: &Expression,
877		token: &Expression,
878		audience: &Expression,
879		context: &mut HookContext,
880	) -> ClResult<()> {
881		let action_id_val = self.evaluator.evaluate(action_id, context)?;
882		let action_id_str = match action_id_val {
883			Value::String(s) => s,
884			_ => return Err(Error::ValidationError("action_id must be a string".to_string())),
885		};
886
887		let token_val = self.evaluator.evaluate(token, context)?;
888		let _token_str = match token_val {
889			Value::String(s) => s,
890			_ => return Err(Error::ValidationError("token must be a string".to_string())),
891		};
892
893		let audience_val = self.evaluator.evaluate(audience, context)?;
894		let audience_tag = match audience_val {
895			Value::String(s) => s,
896			_ => {
897				return Err(Error::ValidationError("audience must be a string (idTag)".to_string()))
898			}
899		};
900
901		tracing::debug!(
902			"DSL: send_to_audience action_id={} audience={}",
903			action_id_str,
904			audience_tag
905		);
906
907		// Convert tenant_id from i64 to TnId
908		let tn_id = TnId(context.tenant_id as u32);
909
910		// Don't send to self
911		if audience_tag.as_str() == context.tenant_tag.as_str() {
912			tracing::debug!("DSL: skipping send_to_audience (audience is self): {}", audience_tag);
913			return Ok(());
914		}
915
916		// Create delivery task for the specific audience
917		tracing::debug!(
918			"DSL: creating delivery task for action {} to {}",
919			action_id_str,
920			audience_tag
921		);
922
923		let delivery_task = ActionDeliveryTask::new(
924			tn_id,
925			action_id_str.clone().into_boxed_str(),
926			audience_tag.clone().into_boxed_str(), // target_instance
927			audience_tag.clone().into_boxed_str(), // target_id_tag
928		);
929
930		// Use unique key to prevent duplicate delivery tasks
931		let task_key = format!("delivery:{}:{}", action_id_str, audience_tag);
932
933		// Create retry policy: exponential backoff from 10 sec to 12 hours, max 50 retries
934		let retry_policy = RetryPolicy::new((10, 43200), 50);
935
936		// Add delivery task to scheduler
937		self.app
938			.scheduler
939			.task(delivery_task)
940			.key(&task_key)
941			.with_retry(retry_policy)
942			.schedule()
943			.await?;
944
945		tracing::info!(
946			tenant_id = %tn_id.0,
947			action_id = %action_id_str,
948			audience = %audience_tag,
949			"DSL: sent action to audience"
950		);
951
952		Ok(())
953	}
954
955	// Notification Operations
956
957	async fn execute_create_notification(
958		&mut self,
959		user: &Expression,
960		notification_type: &Expression,
961		action_id: &Expression,
962		priority: &Option<Expression>,
963		context: &mut HookContext,
964	) -> ClResult<()> {
965		let user_val = self.evaluator.evaluate(user, context)?;
966		let type_val = self.evaluator.evaluate(notification_type, context)?;
967		let action_id_val = self.evaluator.evaluate(action_id, context)?;
968
969		let priority_val = if let Some(expr) = priority {
970			Some(self.evaluator.evaluate(expr, context)?)
971		} else {
972			None
973		};
974
975		// Extract string values
976		let user_id = match user_val {
977			Value::String(s) => s,
978			_ => return Err(Error::ValidationError("user must be a string".to_string())),
979		};
980
981		let notification_type = match type_val {
982			Value::String(s) => s,
983			_ => {
984				return Err(Error::ValidationError(
985					"notification_type must be a string".to_string(),
986				))
987			}
988		};
989
990		let action_id_str = match action_id_val {
991			Value::String(s) => s,
992			_ => return Err(Error::ValidationError("action_id must be a string".to_string())),
993		};
994
995		tracing::debug!(
996			"DSL: create_notification user={} type={} action_id={}",
997			user_id,
998			notification_type,
999			action_id_str
1000		);
1001
1002		// Create notification data
1003		let notification_data = serde_json::json!({
1004			"type": notification_type,
1005			"action_id": action_id_str,
1006			"priority": priority_val,
1007			"timestamp": std::time::SystemTime::now()
1008				.duration_since(std::time::UNIX_EPOCH)
1009				.unwrap_or_default()
1010				.as_secs(),
1011		});
1012
1013		// Send notification to user via direct messaging
1014		let tn_id = cloudillo_types::types::TnId(context.tenant_id as u32);
1015		let broadcast_msg =
1016			BroadcastMessage::new("notification", notification_data, context.tenant_tag.clone());
1017
1018		let _ = self.app.broadcast.send_to_user(tn_id, &user_id, broadcast_msg).await;
1019
1020		tracing::info!(
1021			tenant_id = %context.tenant_id,
1022			user = %user_id,
1023			notification_type = %notification_type,
1024			action_id = %action_id_str,
1025			"DSL: sent notification"
1026		);
1027
1028		Ok(())
1029	}
1030
1031	// Utility Operations
1032
1033	async fn execute_log(
1034		&mut self,
1035		level: &Option<String>,
1036		message: &Expression,
1037		context: &mut HookContext,
1038	) -> ClResult<()> {
1039		let message_val = self.evaluator.evaluate(message, context)?;
1040		let message_str = match message_val {
1041			Value::String(s) => s,
1042			v => v.to_string(),
1043		};
1044
1045		match level.as_deref() {
1046			Some("error") => tracing::error!("DSL: {}", message_str),
1047			Some("warn") => tracing::warn!("DSL: {}", message_str),
1048			Some("debug") => tracing::debug!("DSL: {}", message_str),
1049			Some("trace") => tracing::trace!("DSL: {}", message_str),
1050			_ => tracing::info!("DSL: {}", message_str),
1051		}
1052
1053		Ok(())
1054	}
1055
1056	async fn execute_abort(
1057		&mut self,
1058		error: &Expression,
1059		code: &Option<String>,
1060		context: &mut HookContext,
1061	) -> ClResult<()> {
1062		let error_val = self.evaluator.evaluate(error, context)?;
1063		let error_str = match error_val {
1064			Value::String(s) => s,
1065			v => v.to_string(),
1066		};
1067
1068		let full_error = if let Some(code_str) = code {
1069			format!("Operation aborted [{}]: {}", code_str, error_str)
1070		} else {
1071			format!("Operation aborted: {}", error_str)
1072		};
1073
1074		Err(Error::ValidationError(full_error))
1075	}
1076}
1077
1078// vim: ts=4