1use super::operations::{OperationExecutor, EARLY_RETURN_MARKER};
7use super::types::*;
8use super::validator;
9use crate::hooks::{HookContext, HookResult, HookType};
10use crate::prelude::*;
11use std::collections::HashMap;
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::time::timeout;
16
17const HOOK_TIMEOUT: Duration = Duration::from_secs(5);
19
20#[derive(Default)]
22pub struct DslEngine {
23 definitions: HashMap<String, ActionDefinition>,
24}
25
26impl DslEngine {
27 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn load_definition_from_file(&mut self, path: impl AsRef<Path>) -> ClResult<()> {
34 let content = std::fs::read_to_string(path)?;
35 let definition: ActionDefinition = serde_json::from_str(&content).map_err(|e| {
36 tracing::error!("Failed to parse DSL definition: {}", e);
37 Error::Parse
38 })?;
39
40 if let Err(errors) = validator::validate_definition(&definition) {
42 let error_msg = errors
43 .iter()
44 .map(|e| format!("{}: {}", e.path, e.message))
45 .collect::<Vec<_>>()
46 .join(", ");
47 tracing::error!("Invalid DSL definition: {}", error_msg);
48 return Err(Error::ValidationError(format!(
49 "Invalid action definition: {}",
50 error_msg
51 )));
52 }
53
54 let action_type = definition.r#type.clone();
55 self.definitions.insert(action_type.clone(), definition);
56
57 tracing::info!("Loaded DSL definition: {}", action_type);
58 Ok(())
59 }
60
61 pub fn load_definition_from_json(&mut self, json: &str) -> ClResult<()> {
63 let definition: ActionDefinition = serde_json::from_str(json).map_err(|e| {
64 tracing::error!("Failed to parse DSL definition: {}", e);
65 Error::Parse
66 })?;
67
68 if let Err(errors) = validator::validate_definition(&definition) {
70 let error_msg = errors
71 .iter()
72 .map(|e| format!("{}: {}", e.path, e.message))
73 .collect::<Vec<_>>()
74 .join(", ");
75 tracing::error!("Invalid DSL definition: {}", error_msg);
76 return Err(Error::ValidationError(format!(
77 "Invalid action definition: {}",
78 error_msg
79 )));
80 }
81
82 let action_type = definition.r#type.clone();
83 self.definitions.insert(action_type.clone(), definition);
84
85 tracing::info!("Loaded DSL definition: {}", action_type);
86 Ok(())
87 }
88
89 pub fn load_definition(&mut self, definition: ActionDefinition) {
91 let action_type = definition.r#type.clone();
92 self.definitions.insert(action_type.clone(), definition);
93 tracing::info!("Loaded DSL definition: {}", action_type);
94 }
95
96 pub fn load_definitions_from_dir(&mut self, dir: impl AsRef<Path>) -> ClResult<usize> {
98 let dir = dir.as_ref();
99 let mut count = 0;
100
101 for entry in std::fs::read_dir(dir)? {
102 let entry = entry?;
103 let path = entry.path();
104
105 if path.extension().and_then(|s| s.to_str()) == Some("json") {
106 match self.load_definition_from_file(&path) {
107 Ok(()) => count += 1,
108 Err(e) => {
109 tracing::error!("Failed to load definition from {:?}: {}", path, e);
110 }
111 }
112 }
113 }
114
115 tracing::info!("Loaded {} DSL definitions from {:?}", count, dir);
116 Ok(count)
117 }
118
119 pub fn get_definition(&self, action_type: &str) -> Option<&ActionDefinition> {
121 self.definitions.get(action_type)
122 }
123
124 pub fn has_definition(&self, action_type: &str) -> bool {
126 self.definitions.contains_key(action_type)
127 }
128
129 pub fn resolve_action_type(&self, typ: &str, sub_typ: Option<&str>) -> Option<String> {
132 if let Some(st) = sub_typ {
133 let full = format!("{}:{}", typ, st);
134 if self.definitions.contains_key(&full) {
135 return Some(full);
136 }
137 }
138 if self.definitions.contains_key(typ) {
139 Some(typ.to_string())
140 } else {
141 None
142 }
143 }
144
145 pub async fn execute_hook(
147 &self,
148 app: &App,
149 action_type: &str,
150 hook_type: HookType,
151 mut context: HookContext,
152 ) -> ClResult<()> {
153 use crate::hooks::HookImplementation;
154
155 let definition = self.definitions.get(action_type).ok_or_else(|| {
156 Error::ValidationError(format!("Action definition not found: {}", action_type))
157 })?;
158
159 let implementation = match hook_type {
160 HookType::OnCreate => &definition.hooks.on_create,
161 HookType::OnReceive => &definition.hooks.on_receive,
162 HookType::OnAccept => &definition.hooks.on_accept,
163 HookType::OnReject => &definition.hooks.on_reject,
164 };
165
166 match implementation {
168 HookImplementation::None => {
169 let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
171 let registry = hook_reg.read().await;
172 if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
173 let hook_fn = hook_fn.clone();
174 drop(registry);
175 match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
176 Ok(Ok(hook_result)) => {
177 if !hook_result.continue_processing {
178 tracing::debug!("Native hook requested to abort processing");
179 }
180 Ok(())
181 }
182 Ok(Err(e)) => Err(e),
183 Err(_) => Err(Error::Timeout),
184 }
185 } else {
186 drop(registry);
187 Ok(())
188 }
189 }
190
191 HookImplementation::Dsl(operations) => {
192 if operations.is_empty() {
193 return Ok(());
194 }
195
196 let execution = async {
198 let mut executor = OperationExecutor::new(app);
199
200 for operation in operations {
201 match executor.execute(operation, &mut context).await {
202 Ok(()) => {}
203 Err(Error::ValidationError(ref msg)) if msg == EARLY_RETURN_MARKER => {
204 tracing::debug!("DSL hook early return");
205 break;
206 }
207 Err(e) => return Err(e),
208 }
209 }
210
211 Ok(())
212 };
213
214 match timeout(HOOK_TIMEOUT, execution).await {
215 Ok(result) => result,
216 Err(_) => Err(Error::Timeout),
217 }
218 }
219
220 HookImplementation::Native(_) => {
221 let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
223 let registry = hook_reg.read().await;
224 if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
225 let hook_fn = hook_fn.clone();
226 drop(registry);
227 match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
228 Ok(Ok(hook_result)) => {
229 if !hook_result.continue_processing {
232 tracing::debug!("Native hook requested to abort processing");
233 }
234 Ok(())
235 }
236 Ok(Err(e)) => Err(e),
237 Err(_) => Err(Error::Timeout),
238 }
239 } else {
240 drop(registry);
241 tracing::warn!(
242 "Native hook not found in registry for {} hook on action type: {}",
243 hook_type.as_str(),
244 action_type
245 );
246 Ok(())
247 }
248 }
249
250 HookImplementation::Hybrid { dsl, .. } => {
251 if !dsl.is_empty() {
253 let execution = async {
254 let mut executor = OperationExecutor::new(app);
255
256 for operation in dsl {
257 match executor.execute(operation, &mut context).await {
258 Ok(()) => {}
259 Err(Error::ValidationError(ref msg))
260 if msg == EARLY_RETURN_MARKER =>
261 {
262 tracing::debug!("DSL hook early return");
263 break;
264 }
265 Err(e) => return Err(e),
266 }
267 }
268
269 Ok(())
270 };
271
272 match timeout(HOOK_TIMEOUT, execution).await {
273 Ok(result) => result?,
274 Err(_) => return Err(Error::Timeout),
275 }
276 }
277
278 let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
280 let registry = hook_reg.read().await;
281 if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
282 let hook_fn = hook_fn.clone();
283 drop(registry);
284 match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
285 Ok(Ok(hook_result)) => {
286 if !hook_result.continue_processing {
287 tracing::debug!("Hybrid native hook requested to abort processing");
288 }
289 Ok(())
290 }
291 Ok(Err(e)) => Err(e),
292 Err(_) => Err(Error::Timeout),
293 }
294 } else {
295 drop(registry);
296 Ok(())
297 }
298 }
299 }
300 }
301
302 pub async fn execute_hook_with_result(
305 &self,
306 app: &App,
307 action_type: &str,
308 hook_type: HookType,
309 mut context: HookContext,
310 ) -> ClResult<HookResult> {
311 use crate::hooks::HookImplementation;
312
313 let definition = self.definitions.get(action_type).ok_or_else(|| {
314 Error::ValidationError(format!("Action definition not found: {}", action_type))
315 })?;
316
317 let implementation = match hook_type {
318 HookType::OnCreate => &definition.hooks.on_create,
319 HookType::OnReceive => &definition.hooks.on_receive,
320 HookType::OnAccept => &definition.hooks.on_accept,
321 HookType::OnReject => &definition.hooks.on_reject,
322 };
323
324 match implementation {
326 HookImplementation::None => {
327 let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
329 let registry = hook_reg.read().await;
330 if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
331 let hook_fn = hook_fn.clone();
332 drop(registry);
333 match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
334 Ok(Ok(hook_result)) => Ok(hook_result),
335 Ok(Err(e)) => Err(e),
336 Err(_) => Err(Error::Timeout),
337 }
338 } else {
339 drop(registry);
340 Ok(HookResult::default())
341 }
342 }
343
344 HookImplementation::Dsl(operations) => {
345 if operations.is_empty() {
346 return Ok(HookResult::default());
347 }
348
349 let execution = async {
351 let mut executor = OperationExecutor::new(app);
352
353 for operation in operations {
354 match executor.execute(operation, &mut context).await {
355 Ok(()) => {}
356 Err(Error::ValidationError(ref msg)) if msg == EARLY_RETURN_MARKER => {
357 tracing::debug!("DSL hook early return");
358 break;
359 }
360 Err(e) => return Err(e),
361 }
362 }
363
364 Ok(HookResult {
365 vars: context.vars.clone(),
366 continue_processing: true,
367 return_value: None,
368 })
369 };
370
371 match timeout(HOOK_TIMEOUT, execution).await {
372 Ok(result) => result,
373 Err(_) => Err(Error::Timeout),
374 }
375 }
376
377 HookImplementation::Native(_) => {
378 let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
380 let registry = hook_reg.read().await;
381 if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
382 let hook_fn = hook_fn.clone();
383 drop(registry);
384 match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
385 Ok(Ok(hook_result)) => Ok(hook_result),
386 Ok(Err(e)) => Err(e),
387 Err(_) => Err(Error::Timeout),
388 }
389 } else {
390 drop(registry);
391 tracing::warn!(
392 "Native hook not found in registry for {} hook on action type: {}",
393 hook_type.as_str(),
394 action_type
395 );
396 Ok(HookResult::default())
397 }
398 }
399
400 HookImplementation::Hybrid { dsl, .. } => {
401 if !dsl.is_empty() {
403 let execution = async {
404 let mut executor = OperationExecutor::new(app);
405
406 for operation in dsl {
407 match executor.execute(operation, &mut context).await {
408 Ok(()) => {}
409 Err(Error::ValidationError(ref msg))
410 if msg == EARLY_RETURN_MARKER =>
411 {
412 tracing::debug!("DSL hook early return");
413 break;
414 }
415 Err(e) => return Err(e),
416 }
417 }
418
419 Ok(())
420 };
421
422 match timeout(HOOK_TIMEOUT, execution).await {
423 Ok(result) => result?,
424 Err(_) => return Err(Error::Timeout),
425 }
426 }
427
428 let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
430 let registry = hook_reg.read().await;
431 if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
432 let hook_fn = hook_fn.clone();
433 drop(registry);
434 match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
435 Ok(Ok(hook_result)) => Ok(hook_result),
436 Ok(Err(e)) => Err(e),
437 Err(_) => Err(Error::Timeout),
438 }
439 } else {
440 drop(registry);
441 Ok(HookResult::default())
442 }
443 }
444 }
445 }
446
447 pub fn get_behavior(&self, action_type: &str) -> Option<&BehaviorFlags> {
449 self.definitions.get(action_type).map(|d| &d.behavior)
450 }
451
452 pub fn get_field_constraints(&self, action_type: &str) -> Option<&FieldConstraints> {
454 self.definitions.get(action_type).map(|d| &d.fields)
455 }
456
457 pub fn get_key_pattern(&self, action_type: &str) -> Option<&str> {
459 self.definitions.get(action_type).and_then(|d| d.key_pattern.as_deref())
460 }
461
462 pub fn validate_content(
467 &self,
468 action_type: &str,
469 content: Option<&serde_json::Value>,
470 ) -> ClResult<()> {
471 let definition = self
473 .definitions
474 .get(action_type)
475 .or_else(|| action_type.split(':').next().and_then(|base| self.definitions.get(base)))
476 .ok_or_else(|| {
477 Error::ValidationError(format!("Unknown action type: {}", action_type))
478 })?;
479
480 if let Some(FieldConstraint::Required) = definition.fields.content {
482 if content.is_none() || matches!(content, Some(serde_json::Value::Null)) {
483 return Err(Error::ValidationError(format!(
484 "Content is required for action type {}",
485 action_type
486 )));
487 }
488 }
489
490 if let Some(FieldConstraint::Forbidden) = definition.fields.content {
491 if content.is_some() && !matches!(content, Some(serde_json::Value::Null)) {
492 return Err(Error::ValidationError(format!(
493 "Content is forbidden for action type {}",
494 action_type
495 )));
496 }
497 }
498
499 let Some(schema_wrapper) = &definition.schema else {
501 return Ok(());
502 };
503 let Some(schema) = &schema_wrapper.content else {
504 return Ok(());
505 };
506 let Some(content) = content else {
507 return Ok(());
508 };
509
510 self.validate_value_against_schema(content, schema, "content")
512 }
513
514 fn validate_value_against_schema(
516 &self,
517 value: &serde_json::Value,
518 schema: &ContentSchema,
519 path: &str,
520 ) -> ClResult<()> {
521 match schema.content_type {
522 ContentType::String => {
523 let s = value
524 .as_str()
525 .ok_or_else(|| Error::ValidationError(format!("{}: expected string", path)))?;
526
527 if let Some(min) = schema.min_length {
529 if s.len() < min {
530 return Err(Error::ValidationError(format!(
531 "{}: string too short (min {})",
532 path, min
533 )));
534 }
535 }
536
537 if let Some(max) = schema.max_length {
539 if s.len() > max {
540 return Err(Error::ValidationError(format!(
541 "{}: string too long (max {})",
542 path, max
543 )));
544 }
545 }
546
547 if let Some(ref pattern) = schema.pattern {
549 let re = regex::Regex::new(pattern).map_err(|e| {
550 Error::ValidationError(format!("{}: invalid pattern: {}", path, e))
551 })?;
552 if !re.is_match(s) {
553 return Err(Error::ValidationError(format!(
554 "{}: string does not match pattern",
555 path
556 )));
557 }
558 }
559
560 if let Some(ref allowed) = schema.r#enum {
562 let string_val = serde_json::Value::String(s.to_string());
563 if !allowed.contains(&string_val) {
564 return Err(Error::ValidationError(format!(
565 "{}: value not in allowed enum",
566 path
567 )));
568 }
569 }
570 }
571
572 ContentType::Number => {
573 if !value.is_number() {
574 return Err(Error::ValidationError(format!("{}: expected number", path)));
575 }
576
577 if let Some(ref allowed) = schema.r#enum {
579 if !allowed.contains(value) {
580 return Err(Error::ValidationError(format!(
581 "{}: value not in allowed enum",
582 path
583 )));
584 }
585 }
586 }
587
588 ContentType::Boolean => {
589 if !value.is_boolean() {
590 return Err(Error::ValidationError(format!("{}: expected boolean", path)));
591 }
592 }
593
594 ContentType::Object => {
595 let obj = value
596 .as_object()
597 .ok_or_else(|| Error::ValidationError(format!("{}: expected object", path)))?;
598
599 if let Some(ref required) = schema.required {
601 for prop in required {
602 if !obj.contains_key(prop) {
603 return Err(Error::ValidationError(format!(
604 "{}: missing required property '{}'",
605 path, prop
606 )));
607 }
608 }
609 }
610
611 if let Some(ref properties) = schema.properties {
613 for (prop_name, prop_schema) in properties {
614 if let Some(prop_value) = obj.get(prop_name) {
615 self.validate_field_value(
616 prop_value,
617 prop_schema,
618 &format!("{}.{}", path, prop_name),
619 )?;
620 }
621 }
622 }
623 }
624
625 ContentType::Json => {
626 }
628 }
629
630 Ok(())
631 }
632
633 fn validate_field_value(
635 &self,
636 value: &serde_json::Value,
637 schema: &SchemaField,
638 path: &str,
639 ) -> ClResult<()> {
640 match schema.field_type {
641 FieldType::String => {
642 let s = value
643 .as_str()
644 .ok_or_else(|| Error::ValidationError(format!("{}: expected string", path)))?;
645
646 if let Some(min) = schema.min_length {
647 if s.len() < min {
648 return Err(Error::ValidationError(format!(
649 "{}: string too short (min {})",
650 path, min
651 )));
652 }
653 }
654
655 if let Some(max) = schema.max_length {
656 if s.len() > max {
657 return Err(Error::ValidationError(format!(
658 "{}: string too long (max {})",
659 path, max
660 )));
661 }
662 }
663
664 if let Some(ref allowed) = schema.r#enum {
665 let string_val = serde_json::Value::String(s.to_string());
666 if !allowed.contains(&string_val) {
667 return Err(Error::ValidationError(format!(
668 "{}: value '{}' not in allowed enum",
669 path, s
670 )));
671 }
672 }
673 }
674
675 FieldType::Number => {
676 if !value.is_number() {
677 return Err(Error::ValidationError(format!("{}: expected number", path)));
678 }
679 }
680
681 FieldType::Boolean => {
682 if !value.is_boolean() {
683 return Err(Error::ValidationError(format!("{}: expected boolean", path)));
684 }
685 }
686
687 FieldType::Array => {
688 let arr = value
689 .as_array()
690 .ok_or_else(|| Error::ValidationError(format!("{}: expected array", path)))?;
691
692 if let Some(ref item_schema) = schema.items {
693 for (i, item) in arr.iter().enumerate() {
694 self.validate_field_value(item, item_schema, &format!("{}[{}]", path, i))?;
695 }
696 }
697 }
698
699 FieldType::Json => {
700 }
702 }
703
704 Ok(())
705 }
706
707 pub fn list_action_types(&self) -> Vec<String> {
709 self.definitions.keys().cloned().collect()
710 }
711
712 pub fn stats(&self) -> DslEngineStats {
714 let total_definitions = self.definitions.len();
715 let mut hook_counts = HookCounts::default();
716
717 for def in self.definitions.values() {
718 if def.hooks.on_create.is_some() {
719 hook_counts.on_create += 1;
720 }
721 if def.hooks.on_receive.is_some() {
722 hook_counts.on_receive += 1;
723 }
724 if def.hooks.on_accept.is_some() {
725 hook_counts.on_accept += 1;
726 }
727 if def.hooks.on_reject.is_some() {
728 hook_counts.on_reject += 1;
729 }
730 }
731
732 DslEngineStats { total_definitions, hook_counts }
733 }
734}
735
736#[derive(Debug, Clone)]
738pub struct DslEngineStats {
739 pub total_definitions: usize,
740 pub hook_counts: HookCounts,
741}
742
743#[derive(Debug, Clone, Default)]
745pub struct HookCounts {
746 pub on_create: usize,
747 pub on_receive: usize,
748 pub on_accept: usize,
749 pub on_reject: usize,
750}
751
752#[cfg(test)]
753mod tests {
754 #[test]
755 fn test_load_definition_from_json() {
756 let _json = r#"
757 {
758 "type": "TEST",
759 "version": "1.0",
760 "description": "Test action",
761 "fields": {},
762 "behavior": {},
763 "hooks": {}
764 }
765 "#;
766
767 }
770}
771
772