1use super::expression::ExpressionEvaluator;
13use super::types::*;
14use cloudillo_core::{scheduler::RetryPolicy, ws_broadcast::BroadcastMessage};
15use cloudillo_types::meta_adapter;
16
17use crate::{delivery::ActionDeliveryTask, hooks::HookContext, prelude::*};
18use serde_json::Value;
19use std::collections::{HashMap, HashSet};
20
21pub const EARLY_RETURN_MARKER: &str = "EARLY_RETURN";
23
24struct CreateActionParams<'a> {
26 action_type: &'a str,
27 subtype: &'a Option<Expression>,
28 audience: &'a Option<Expression>,
29 parent: &'a Option<Expression>,
30 subject: &'a Option<Expression>,
31 content: &'a Option<Expression>,
32 attachments: &'a Option<Expression>,
33}
34
35pub struct OperationExecutor<'a> {
37 evaluator: ExpressionEvaluator,
38 app: &'a App,
39 max_operations: usize,
40 operation_count: usize,
41}
42
43impl<'a> OperationExecutor<'a> {
44 pub fn new(app: &'a App) -> Self {
46 Self { evaluator: ExpressionEvaluator::new(), app, max_operations: 100, operation_count: 0 }
47 }
48
49 pub async fn execute(&mut self, op: &Operation, context: &mut HookContext) -> ClResult<()> {
51 self.operation_count += 1;
52 if self.operation_count > self.max_operations {
53 return Err(Error::ValidationError(format!(
54 "Maximum operations exceeded ({})",
55 self.max_operations
56 )));
57 }
58
59 match op {
60 Operation::UpdateProfile { target, set } => {
62 self.execute_update_profile(target, set, context).await
63 }
64 Operation::GetProfile { target, r#as } => {
65 self.execute_get_profile(target, r#as, context).await
66 }
67
68 Operation::CreateAction {
70 r#type,
71 subtype,
72 audience,
73 parent,
74 subject,
75 content,
76 attachments,
77 } => {
78 let params = CreateActionParams {
79 action_type: r#type,
80 subtype,
81 audience,
82 parent,
83 subject,
84 content,
85 attachments,
86 };
87 self.execute_create_action(params, context).await
88 }
89 Operation::GetAction { key, action_id, r#as } => {
90 self.execute_get_action(key, action_id, r#as, context).await
91 }
92 Operation::UpdateAction { target, set } => {
93 self.execute_update_action(target, set, context).await
94 }
95 Operation::DeleteAction { target } => self.execute_delete_action(target, context).await,
96
97 Operation::If { condition, then, r#else } => {
99 self.execute_if(condition, then, r#else, context).await
100 }
101 Operation::Switch { value, cases, default } => {
102 self.execute_switch(value, cases, default, context).await
103 }
104 Operation::Foreach { array, r#as, r#do } => {
105 self.execute_foreach(array, r#as, r#do, context).await
106 }
107 Operation::Return { value: _ } => {
108 Err(Error::ValidationError(EARLY_RETURN_MARKER.to_string()))
110 }
111
112 Operation::Set { var, value } => self.execute_set(var, value, context).await,
114 Operation::Get { var, from } => self.execute_get(var, from, context).await,
115 Operation::Merge { objects, r#as } => self.execute_merge(objects, r#as, context).await,
116
117 Operation::BroadcastToFollowers { action_id, token } => {
119 self.execute_broadcast_to_followers(action_id, token, context).await
120 }
121 Operation::SendToAudience { action_id, token, audience } => {
122 self.execute_send_to_audience(action_id, token, audience, context).await
123 }
124
125 Operation::CreateNotification { user, r#type, action_id, priority } => {
127 self.execute_create_notification(user, r#type, action_id, priority, context)
128 .await
129 }
130
131 Operation::Log { level, message } => self.execute_log(level, message, context).await,
133 Operation::Abort { error, code } => self.execute_abort(error, code, context).await,
134 }
135 }
136
137 async fn execute_update_profile(
140 &mut self,
141 target_expr: &Expression,
142 updates: &HashMap<String, Expression>,
143 context: &mut HookContext,
144 ) -> ClResult<()> {
145 let target = self.evaluator.evaluate(target_expr, context)?;
146 let target_tag = match target {
147 Value::String(s) => s,
148 _ => return Err(Error::ValidationError("target must be a string (idTag)".to_string())),
149 };
150
151 let mut profile_updates: HashMap<String, Value> = HashMap::new();
153 for (key, value_expr) in updates {
154 let value = self.evaluator.evaluate(value_expr, context)?;
155 profile_updates.insert(key.clone(), value);
156 }
157
158 tracing::debug!("DSL: update_profile target={} updates={:?}", target_tag, profile_updates);
159
160 let tn_id = TnId(context.tenant_id as u32);
162
163 let name = profile_updates.get("name").and_then(|v| v.as_str()).map(|s| s.to_string());
165
166 let profile_update = cloudillo_types::meta_adapter::UpdateProfileData {
167 name: name
168 .map(|s| cloudillo_types::types::Patch::Value(s.into()))
169 .unwrap_or(cloudillo_types::types::Patch::Undefined),
170 ..Default::default()
171 };
172
173 self.app
175 .meta_adapter
176 .update_profile(tn_id, &target_tag, &profile_update)
177 .await?;
178
179 tracing::info!(
180 tenant_id = %tn_id.0,
181 target = %target_tag,
182 fields = ?profile_updates.keys().collect::<Vec<_>>(),
183 "DSL: updated profile"
184 );
185
186 Ok(())
187 }
188
189 async fn execute_get_profile(
190 &mut self,
191 target_expr: &Expression,
192 as_var: &Option<String>,
193 context: &mut HookContext,
194 ) -> ClResult<()> {
195 let target = self.evaluator.evaluate(target_expr, context)?;
196 let target_tag = match target {
197 Value::String(s) => s,
198 _ => return Err(Error::ValidationError("target must be a string (idTag)".to_string())),
199 };
200
201 tracing::debug!("DSL: get_profile target={}", target_tag);
202
203 let tn_id = TnId(context.tenant_id as u32);
205
206 let (_etag, profile) = self.app.meta_adapter.read_profile(tn_id, &target_tag).await?;
208
209 if let Some(var_name) = as_var {
211 let profile_json = serde_json::json!({
212 "id_tag": profile.id_tag,
213 "name": profile.name,
214 "type": match profile.typ {
215 cloudillo_types::meta_adapter::ProfileType::Person => "person",
216 cloudillo_types::meta_adapter::ProfileType::Community => "community",
217 },
218 "profile_pic": profile.profile_pic,
219 "following": profile.following,
220 "connected": profile.connected.is_connected(),
221 });
222 context.vars.insert(var_name.clone(), profile_json);
223
224 tracing::info!(
225 tenant_id = %tn_id.0,
226 target = %target_tag,
227 var = %var_name,
228 "DSL: fetched profile"
229 );
230 }
231
232 Ok(())
233 }
234
235 async fn execute_create_action(
238 &mut self,
239 params: CreateActionParams<'_>,
240 context: &mut HookContext,
241 ) -> ClResult<()> {
242 let subtype_val = if let Some(expr) = params.subtype {
244 Some(self.evaluator.evaluate(expr, context)?)
245 } else {
246 None
247 };
248
249 let audience_val = if let Some(expr) = params.audience {
250 Some(self.evaluator.evaluate(expr, context)?)
251 } else {
252 None
253 };
254
255 let parent_val = if let Some(expr) = params.parent {
256 Some(self.evaluator.evaluate(expr, context)?)
257 } else {
258 None
259 };
260
261 let subject_val = if let Some(expr) = params.subject {
262 Some(self.evaluator.evaluate(expr, context)?)
263 } else {
264 None
265 };
266
267 let content_val = if let Some(expr) = params.content {
268 Some(self.evaluator.evaluate(expr, context)?)
269 } else {
270 None
271 };
272
273 let attachments_val = if let Some(expr) = params.attachments {
274 Some(self.evaluator.evaluate(expr, context)?)
275 } else {
276 None
277 };
278
279 tracing::debug!(
280 "DSL: create_action type={} subtype={:?} audience={:?}",
281 params.action_type,
282 subtype_val,
283 audience_val
284 );
285
286 let tn_id = TnId(context.tenant_id as u32);
288
289 let create_action = crate::task::CreateAction {
291 typ: params.action_type.to_string().into_boxed_str(),
292 sub_typ: subtype_val.and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
293 audience_tag: audience_val
294 .and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
295 parent_id: parent_val.and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
296 subject: subject_val.and_then(|v| v.as_str().map(|s| s.to_string().into_boxed_str())),
297 content: content_val,
298 attachments: attachments_val.and_then(|v| {
299 v.as_array().map(|arr| {
300 arr.iter()
301 .filter_map(|item| item.as_str().map(|s| s.to_string().into_boxed_str()))
302 .collect()
303 })
304 }),
305 expires_at: None,
306 visibility: None,
307 flags: None, x: None,
309 };
310
311 let action_id =
313 crate::task::create_action(self.app, tn_id, &context.tenant_tag, create_action).await?;
314
315 tracing::info!(
316 tenant_id = %tn_id.0,
317 action_type = %params.action_type,
318 action_id = %action_id,
319 "DSL: created action"
320 );
321
322 Ok(())
323 }
324
325 async fn execute_get_action(
326 &mut self,
327 key: &Option<Expression>,
328 action_id: &Option<Expression>,
329 as_var: &Option<String>,
330 context: &mut HookContext,
331 ) -> ClResult<()> {
332 let key_val =
333 if let Some(expr) = key { Some(self.evaluator.evaluate(expr, context)?) } else { None };
334
335 let action_id_val = if let Some(expr) = action_id {
336 Some(self.evaluator.evaluate(expr, context)?)
337 } else {
338 None
339 };
340
341 tracing::debug!("DSL: get_action key={:?} action_id={:?}", key_val, action_id_val);
342
343 let tn_id = TnId(context.tenant_id as u32);
345
346 let action_result = if let Some(key) = key_val {
348 let key_str = key
349 .as_str()
350 .ok_or_else(|| Error::ValidationError("key must be a string".to_string()))?;
351
352 self.app.meta_adapter.get_action_by_key(tn_id, key_str).await?.map(|action| {
354 serde_json::json!({
355 "action_id": action.action_id,
356 "type": action.typ,
357 "subtype": action.sub_typ,
358 "issuer": action.issuer_tag,
359 "audience": action.audience_tag,
360 "parent": action.parent_id,
361 "root": action.root_id,
362 "subject": action.subject,
363 "content": action.content,
364 "attachments": action.attachments,
365 "created_at": action.created_at.0,
366 "expires_at": action.expires_at.map(|ts| ts.0),
367 })
368 })
369 } else if let Some(action_id) = action_id_val {
370 let action_id_str = action_id
371 .as_str()
372 .ok_or_else(|| Error::ValidationError("action_id must be a string".to_string()))?;
373
374 self.app.meta_adapter.get_action(tn_id, action_id_str).await?.map(|action| {
376 serde_json::json!({
377 "action_id": action.action_id,
378 "type": action.typ,
379 "subtype": action.sub_typ,
380 "issuer": {
381 "id_tag": action.issuer.id_tag,
382 "name": action.issuer.name,
383 "profile_pic": action.issuer.profile_pic,
384 },
385 "audience": action.audience.as_ref().map(|a| serde_json::json!({
386 "id_tag": a.id_tag,
387 "name": a.name,
388 "profile_pic": a.profile_pic,
389 })),
390 "parent": action.parent_id,
391 "root": action.root_id,
392 "subject": action.subject,
393 "content": action.content,
394 "attachments": action.attachments,
395 "created_at": action.created_at.0,
396 "expires_at": action.expires_at.map(|ts| ts.0),
397 })
398 })
399 } else {
400 return Err(Error::ValidationError(
401 "Either key or action_id must be provided".to_string(),
402 ));
403 };
404
405 if let Some(var_name) = as_var {
407 let value = action_result.unwrap_or(Value::Null);
408 context.vars.insert(var_name.clone(), value.clone());
409
410 if !value.is_null() {
411 tracing::info!(
412 tenant_id = %tn_id.0,
413 var = %var_name,
414 "DSL: fetched action"
415 );
416 } else {
417 tracing::debug!(
418 tenant_id = %tn_id.0,
419 var = %var_name,
420 "DSL: action not found"
421 );
422 }
423 }
424
425 Ok(())
426 }
427
428 async fn execute_update_action(
429 &mut self,
430 target_expr: &Expression,
431 updates: &HashMap<String, UpdateValue>,
432 context: &mut HookContext,
433 ) -> ClResult<()> {
434 let target = self.evaluator.evaluate(target_expr, context)?;
435 let action_id = match target {
436 Value::String(s) => s,
437 _ => {
438 return Err(Error::ValidationError(
439 "target must be a string (actionId)".to_string(),
440 ))
441 }
442 };
443
444 tracing::debug!("DSL: update_action target={}", action_id);
445
446 let tn_id = TnId(context.tenant_id as u32);
448
449 let action_data = self.app.meta_adapter.get_action_data(tn_id, &action_id).await?;
451
452 use cloudillo_types::types::Patch;
454 let mut update_opts = meta_adapter::UpdateActionDataOptions::default();
455
456 for (key, update_value) in updates {
457 match key.as_str() {
458 "status" => {
459 let value = match update_value {
460 UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
461 self.evaluator.evaluate(expr, context)?
462 }
463 _ => {
464 return Err(Error::ValidationError(
465 "status field does not support increment/decrement".to_string(),
466 ))
467 }
468 };
469 update_opts.status = if value.is_null() {
470 Patch::Null
471 } else if let Some(s) = value.as_str() {
472 let status_char = match s {
474 "confirmation" => 'C',
475 "notification" => 'N',
476 "active" => 'A',
477 "pending" => 'P',
478 "deleted" => 'D',
479 _ => s.chars().next().unwrap_or('A'),
480 };
481 Patch::Value(status_char)
482 } else {
483 Patch::Undefined
484 };
485 }
486 "subject" => {
487 let value = match update_value {
488 UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
489 self.evaluator.evaluate(expr, context)?
490 }
491 _ => {
492 return Err(Error::ValidationError(
493 "subject field does not support increment/decrement".to_string(),
494 ))
495 }
496 };
497 update_opts.subject = if value.is_null() {
498 Patch::Null
499 } else if let Some(s) = value.as_str() {
500 Patch::Value(s.to_string())
501 } else {
502 Patch::Undefined
503 };
504 }
505 "reactions" => {
506 let value = match update_value {
507 UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
508 self.evaluator.evaluate(expr, context)?
509 }
510 UpdateValue::Increment { increment } => {
511 let inc = self.evaluator.evaluate(increment, context)?;
512 let inc_val = inc.as_u64().ok_or_else(|| {
513 Error::ValidationError(
514 "increment value must be a number".to_string(),
515 )
516 })? as u32;
517
518 let current =
519 action_data.as_ref().and_then(|d| d.reactions).unwrap_or(0);
520 Value::from(current + inc_val)
521 }
522 UpdateValue::Decrement { decrement } => {
523 let dec = self.evaluator.evaluate(decrement, context)?;
524 let dec_val = dec.as_u64().ok_or_else(|| {
525 Error::ValidationError(
526 "decrement value must be a number".to_string(),
527 )
528 })? as u32;
529
530 let current =
531 action_data.as_ref().and_then(|d| d.reactions).unwrap_or(0);
532 Value::from(current.saturating_sub(dec_val))
533 }
534 };
535 update_opts.reactions = if value.is_null() {
536 Patch::Null
537 } else if let Some(v) = value.as_u64() {
538 Patch::Value(v as u32)
539 } else {
540 Patch::Undefined
541 };
542 }
543 "comments" => {
544 let value = match update_value {
545 UpdateValue::Direct(expr) | UpdateValue::Set { set: expr } => {
546 self.evaluator.evaluate(expr, context)?
547 }
548 UpdateValue::Increment { increment } => {
549 let inc = self.evaluator.evaluate(increment, context)?;
550 let inc_val = inc.as_u64().ok_or_else(|| {
551 Error::ValidationError(
552 "increment value must be a number".to_string(),
553 )
554 })? as u32;
555
556 let current =
557 action_data.as_ref().and_then(|d| d.comments).unwrap_or(0);
558 Value::from(current + inc_val)
559 }
560 UpdateValue::Decrement { decrement } => {
561 let dec = self.evaluator.evaluate(decrement, context)?;
562 let dec_val = dec.as_u64().ok_or_else(|| {
563 Error::ValidationError(
564 "decrement value must be a number".to_string(),
565 )
566 })? as u32;
567
568 let current =
569 action_data.as_ref().and_then(|d| d.comments).unwrap_or(0);
570 Value::from(current.saturating_sub(dec_val))
571 }
572 };
573 update_opts.comments = if value.is_null() {
574 Patch::Null
575 } else if let Some(v) = value.as_u64() {
576 Patch::Value(v as u32)
577 } else {
578 Patch::Undefined
579 };
580 }
581 _ => {
582 tracing::warn!("DSL: update_action ignoring unknown field '{}'", key);
583 }
584 }
585 }
586
587 self.app
589 .meta_adapter
590 .update_action_data(tn_id, &action_id, &update_opts)
591 .await?;
592
593 tracing::info!(
594 tenant_id = %tn_id.0,
595 action_id = %action_id,
596 updates = ?updates.keys().collect::<Vec<_>>(),
597 "DSL: updated action"
598 );
599
600 Ok(())
601 }
602
603 async fn execute_delete_action(
604 &mut self,
605 target_expr: &Expression,
606 context: &mut HookContext,
607 ) -> ClResult<()> {
608 let target = self.evaluator.evaluate(target_expr, context)?;
609 let action_id = match target {
610 Value::String(s) => s,
611 _ => {
612 return Err(Error::ValidationError(
613 "target must be a string (actionId)".to_string(),
614 ))
615 }
616 };
617
618 tracing::debug!("DSL: delete_action target={}", action_id);
619
620 let tn_id = TnId(context.tenant_id as u32);
622
623 self.app.meta_adapter.delete_action(tn_id, &action_id).await?;
625
626 tracing::info!(
627 tenant_id = %tn_id.0,
628 action_id = %action_id,
629 "DSL: deleted action"
630 );
631
632 Ok(())
633 }
634
635 fn execute_if<'b>(
638 &'b mut self,
639 condition: &'b Expression,
640 then_ops: &'b [Operation],
641 else_ops: &'b Option<Vec<Operation>>,
642 context: &'b mut HookContext,
643 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ClResult<()>> + Send + 'b>> {
644 Box::pin(async move {
645 let condition_value = self.evaluator.evaluate(condition, context)?;
646 let is_truthy = match condition_value {
647 Value::Bool(b) => b,
648 Value::Null => false,
649 Value::Number(n) => n.as_f64().unwrap_or(0.0) != 0.0,
650 Value::String(s) => !s.is_empty(),
651 _ => true,
652 };
653
654 if is_truthy {
655 for op in then_ops {
656 self.execute(op, context).await?;
657 }
658 } else if let Some(else_branch) = else_ops {
659 for op in else_branch {
660 self.execute(op, context).await?;
661 }
662 }
663
664 Ok(())
665 })
666 }
667
668 fn execute_switch<'b>(
669 &'b mut self,
670 value_expr: &'b Expression,
671 cases: &'b HashMap<String, Vec<Operation>>,
672 default: &'b Option<Vec<Operation>>,
673 context: &'b mut HookContext,
674 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ClResult<()>> + Send + 'b>> {
675 Box::pin(async move {
676 let value = self.evaluator.evaluate(value_expr, context)?;
677 let value_str = match value {
678 Value::String(s) => s,
679 Value::Number(n) => n.to_string(),
680 Value::Bool(b) => b.to_string(),
681 Value::Null => "null".to_string(),
682 _ => value.to_string(),
683 };
684
685 if let Some(case_ops) = cases.get(&value_str) {
686 for op in case_ops {
687 self.execute(op, context).await?;
688 }
689 } else if let Some(default_ops) = default {
690 for op in default_ops {
691 self.execute(op, context).await?;
692 }
693 }
694
695 Ok(())
696 })
697 }
698
699 fn execute_foreach<'b>(
700 &'b mut self,
701 array_expr: &'b Expression,
702 as_var: &'b Option<String>,
703 do_ops: &'b [Operation],
704 context: &'b mut HookContext,
705 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ClResult<()>> + Send + 'b>> {
706 Box::pin(async move {
707 let array_value = self.evaluator.evaluate(array_expr, context)?;
708 let array = match array_value {
709 Value::Array(arr) => arr,
710 _ => return Err(Error::ValidationError("foreach requires an array".to_string())),
711 };
712
713 if array.len() > 100 {
715 return Err(Error::ValidationError(format!(
716 "foreach limited to 100 items, got {}",
717 array.len()
718 )));
719 }
720
721 for item in array {
722 if let Some(var_name) = as_var {
724 context.vars.insert(var_name.clone(), item.clone());
725 }
726
727 for op in do_ops {
729 self.execute(op, context).await?;
730 }
731 }
732
733 Ok(())
734 })
735 }
736
737 async fn execute_set(
740 &mut self,
741 var_name: &str,
742 value_expr: &Expression,
743 context: &mut HookContext,
744 ) -> ClResult<()> {
745 let value = self.evaluator.evaluate(value_expr, context)?;
746 context.vars.insert(var_name.to_string(), value);
747 Ok(())
748 }
749
750 async fn execute_get(
751 &mut self,
752 var_name: &str,
753 from_expr: &Expression,
754 context: &mut HookContext,
755 ) -> ClResult<()> {
756 let value = self.evaluator.evaluate(from_expr, context)?;
757 context.vars.insert(var_name.to_string(), value);
758 Ok(())
759 }
760
761 async fn execute_merge(
762 &mut self,
763 objects: &[Expression],
764 as_var: &str,
765 context: &mut HookContext,
766 ) -> ClResult<()> {
767 let mut merged = serde_json::Map::new();
768
769 for obj_expr in objects {
770 let value = self.evaluator.evaluate(obj_expr, context)?;
771 if let Value::Object(obj) = value {
772 for (k, v) in obj {
773 merged.insert(k, v);
774 }
775 }
776 }
777
778 context.vars.insert(as_var.to_string(), Value::Object(merged));
779 Ok(())
780 }
781
782 async fn execute_broadcast_to_followers(
785 &mut self,
786 action_id: &Expression,
787 token: &Expression,
788 context: &mut HookContext,
789 ) -> ClResult<()> {
790 let action_id_val = self.evaluator.evaluate(action_id, context)?;
791 let action_id_str = match action_id_val {
792 Value::String(s) => s,
793 _ => return Err(Error::ValidationError("action_id must be a string".to_string())),
794 };
795
796 let token_val = self.evaluator.evaluate(token, context)?;
797 let _token_str = match token_val {
798 Value::String(s) => s,
799 _ => return Err(Error::ValidationError("token must be a string".to_string())),
800 };
801
802 tracing::debug!(
803 "DSL: broadcast_to_followers action_id={} (querying for followers)",
804 action_id_str
805 );
806
807 let tn_id = TnId(context.tenant_id as u32);
809
810 let follower_actions = self
812 .app
813 .meta_adapter
814 .list_actions(
815 tn_id,
816 &meta_adapter::ListActionOptions {
817 typ: Some(vec!["FLLW".into(), "CONN".into()]),
818 ..Default::default()
819 },
820 )
821 .await?;
822
823 let mut follower_set = HashSet::new();
826 for action_view in follower_actions {
827 if action_view.issuer.id_tag.as_ref() != context.tenant_tag.as_str() {
828 follower_set.insert(action_view.issuer.id_tag.clone());
829 }
830 }
831
832 let recipients: Vec<Box<str>> = follower_set.into_iter().collect();
833 tracing::info!(
834 tenant_id = %tn_id.0,
835 action_id = %action_id_str,
836 followers = %recipients.len(),
837 "DSL: broadcasting to followers"
838 );
839
840 for recipient_tag in recipients {
842 tracing::debug!(
843 "DSL: creating delivery task for action {} to {}",
844 action_id_str,
845 recipient_tag
846 );
847
848 let delivery_task = ActionDeliveryTask::new(
849 tn_id,
850 action_id_str.clone().into_boxed_str(),
851 recipient_tag.clone(), recipient_tag.clone(), );
854
855 let task_key = format!("delivery:{}:{}", action_id_str, recipient_tag);
857
858 let retry_policy = RetryPolicy::new((10, 43200), 50);
860
861 self.app
863 .scheduler
864 .task(delivery_task)
865 .key(&task_key)
866 .with_retry(retry_policy)
867 .schedule()
868 .await?;
869 }
870
871 Ok(())
872 }
873
874 async fn execute_send_to_audience(
875 &mut self,
876 action_id: &Expression,
877 token: &Expression,
878 audience: &Expression,
879 context: &mut HookContext,
880 ) -> ClResult<()> {
881 let action_id_val = self.evaluator.evaluate(action_id, context)?;
882 let action_id_str = match action_id_val {
883 Value::String(s) => s,
884 _ => return Err(Error::ValidationError("action_id must be a string".to_string())),
885 };
886
887 let token_val = self.evaluator.evaluate(token, context)?;
888 let _token_str = match token_val {
889 Value::String(s) => s,
890 _ => return Err(Error::ValidationError("token must be a string".to_string())),
891 };
892
893 let audience_val = self.evaluator.evaluate(audience, context)?;
894 let audience_tag = match audience_val {
895 Value::String(s) => s,
896 _ => {
897 return Err(Error::ValidationError("audience must be a string (idTag)".to_string()))
898 }
899 };
900
901 tracing::debug!(
902 "DSL: send_to_audience action_id={} audience={}",
903 action_id_str,
904 audience_tag
905 );
906
907 let tn_id = TnId(context.tenant_id as u32);
909
910 if audience_tag.as_str() == context.tenant_tag.as_str() {
912 tracing::debug!("DSL: skipping send_to_audience (audience is self): {}", audience_tag);
913 return Ok(());
914 }
915
916 tracing::debug!(
918 "DSL: creating delivery task for action {} to {}",
919 action_id_str,
920 audience_tag
921 );
922
923 let delivery_task = ActionDeliveryTask::new(
924 tn_id,
925 action_id_str.clone().into_boxed_str(),
926 audience_tag.clone().into_boxed_str(), audience_tag.clone().into_boxed_str(), );
929
930 let task_key = format!("delivery:{}:{}", action_id_str, audience_tag);
932
933 let retry_policy = RetryPolicy::new((10, 43200), 50);
935
936 self.app
938 .scheduler
939 .task(delivery_task)
940 .key(&task_key)
941 .with_retry(retry_policy)
942 .schedule()
943 .await?;
944
945 tracing::info!(
946 tenant_id = %tn_id.0,
947 action_id = %action_id_str,
948 audience = %audience_tag,
949 "DSL: sent action to audience"
950 );
951
952 Ok(())
953 }
954
955 async fn execute_create_notification(
958 &mut self,
959 user: &Expression,
960 notification_type: &Expression,
961 action_id: &Expression,
962 priority: &Option<Expression>,
963 context: &mut HookContext,
964 ) -> ClResult<()> {
965 let user_val = self.evaluator.evaluate(user, context)?;
966 let type_val = self.evaluator.evaluate(notification_type, context)?;
967 let action_id_val = self.evaluator.evaluate(action_id, context)?;
968
969 let priority_val = if let Some(expr) = priority {
970 Some(self.evaluator.evaluate(expr, context)?)
971 } else {
972 None
973 };
974
975 let user_id = match user_val {
977 Value::String(s) => s,
978 _ => return Err(Error::ValidationError("user must be a string".to_string())),
979 };
980
981 let notification_type = match type_val {
982 Value::String(s) => s,
983 _ => {
984 return Err(Error::ValidationError(
985 "notification_type must be a string".to_string(),
986 ))
987 }
988 };
989
990 let action_id_str = match action_id_val {
991 Value::String(s) => s,
992 _ => return Err(Error::ValidationError("action_id must be a string".to_string())),
993 };
994
995 tracing::debug!(
996 "DSL: create_notification user={} type={} action_id={}",
997 user_id,
998 notification_type,
999 action_id_str
1000 );
1001
1002 let notification_data = serde_json::json!({
1004 "type": notification_type,
1005 "action_id": action_id_str,
1006 "priority": priority_val,
1007 "timestamp": std::time::SystemTime::now()
1008 .duration_since(std::time::UNIX_EPOCH)
1009 .unwrap_or_default()
1010 .as_secs(),
1011 });
1012
1013 let tn_id = cloudillo_types::types::TnId(context.tenant_id as u32);
1015 let broadcast_msg =
1016 BroadcastMessage::new("notification", notification_data, context.tenant_tag.clone());
1017
1018 let _ = self.app.broadcast.send_to_user(tn_id, &user_id, broadcast_msg).await;
1019
1020 tracing::info!(
1021 tenant_id = %context.tenant_id,
1022 user = %user_id,
1023 notification_type = %notification_type,
1024 action_id = %action_id_str,
1025 "DSL: sent notification"
1026 );
1027
1028 Ok(())
1029 }
1030
1031 async fn execute_log(
1034 &mut self,
1035 level: &Option<String>,
1036 message: &Expression,
1037 context: &mut HookContext,
1038 ) -> ClResult<()> {
1039 let message_val = self.evaluator.evaluate(message, context)?;
1040 let message_str = match message_val {
1041 Value::String(s) => s,
1042 v => v.to_string(),
1043 };
1044
1045 match level.as_deref() {
1046 Some("error") => tracing::error!("DSL: {}", message_str),
1047 Some("warn") => tracing::warn!("DSL: {}", message_str),
1048 Some("debug") => tracing::debug!("DSL: {}", message_str),
1049 Some("trace") => tracing::trace!("DSL: {}", message_str),
1050 _ => tracing::info!("DSL: {}", message_str),
1051 }
1052
1053 Ok(())
1054 }
1055
1056 async fn execute_abort(
1057 &mut self,
1058 error: &Expression,
1059 code: &Option<String>,
1060 context: &mut HookContext,
1061 ) -> ClResult<()> {
1062 let error_val = self.evaluator.evaluate(error, context)?;
1063 let error_str = match error_val {
1064 Value::String(s) => s,
1065 v => v.to_string(),
1066 };
1067
1068 let full_error = if let Some(code_str) = code {
1069 format!("Operation aborted [{}]: {}", code_str, error_str)
1070 } else {
1071 format!("Operation aborted: {}", error_str)
1072 };
1073
1074 Err(Error::ValidationError(full_error))
1075 }
1076}
1077
1078