Skip to main content

cloudillo_action/dsl/
engine.rs

1//! DSL Engine for executing action type hooks
2//!
3//! The engine loads action definitions, validates them, and executes lifecycle hooks
4//! (on_create, on_receive, on_accept, on_reject) with proper resource limits and error handling.
5
6use super::operations::{OperationExecutor, EARLY_RETURN_MARKER};
7use super::types::*;
8use super::validator;
9use crate::hooks::{HookContext, HookResult, HookType};
10use crate::prelude::*;
11use std::collections::HashMap;
12use std::path::Path;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::time::timeout;
16
17/// Maximum hook execution time
18const HOOK_TIMEOUT: Duration = Duration::from_secs(5);
19
20/// DSL Engine - loads and executes action type definitions
21#[derive(Default)]
22pub struct DslEngine {
23	definitions: HashMap<String, ActionDefinition>,
24}
25
26impl DslEngine {
27	/// Create a new DSL engine
28	pub fn new() -> Self {
29		Self::default()
30	}
31
32	/// Load action definition from JSON file
33	pub fn load_definition_from_file(&mut self, path: impl AsRef<Path>) -> ClResult<()> {
34		let content = std::fs::read_to_string(path)?;
35		let definition: ActionDefinition = serde_json::from_str(&content).map_err(|e| {
36			tracing::error!("Failed to parse DSL definition: {}", e);
37			Error::Parse
38		})?;
39
40		// Validate definition
41		if let Err(errors) = validator::validate_definition(&definition) {
42			let error_msg = errors
43				.iter()
44				.map(|e| format!("{}: {}", e.path, e.message))
45				.collect::<Vec<_>>()
46				.join(", ");
47			tracing::error!("Invalid DSL definition: {}", error_msg);
48			return Err(Error::ValidationError(format!(
49				"Invalid action definition: {}",
50				error_msg
51			)));
52		}
53
54		let action_type = definition.r#type.clone();
55		self.definitions.insert(action_type.clone(), definition);
56
57		tracing::info!("Loaded DSL definition: {}", action_type);
58		Ok(())
59	}
60
61	/// Load action definition from JSON string
62	pub fn load_definition_from_json(&mut self, json: &str) -> ClResult<()> {
63		let definition: ActionDefinition = serde_json::from_str(json).map_err(|e| {
64			tracing::error!("Failed to parse DSL definition: {}", e);
65			Error::Parse
66		})?;
67
68		// Validate definition
69		if let Err(errors) = validator::validate_definition(&definition) {
70			let error_msg = errors
71				.iter()
72				.map(|e| format!("{}: {}", e.path, e.message))
73				.collect::<Vec<_>>()
74				.join(", ");
75			tracing::error!("Invalid DSL definition: {}", error_msg);
76			return Err(Error::ValidationError(format!(
77				"Invalid action definition: {}",
78				error_msg
79			)));
80		}
81
82		let action_type = definition.r#type.clone();
83		self.definitions.insert(action_type.clone(), definition);
84
85		tracing::info!("Loaded DSL definition: {}", action_type);
86		Ok(())
87	}
88
89	/// Load action definition directly
90	pub fn load_definition(&mut self, definition: ActionDefinition) {
91		let action_type = definition.r#type.clone();
92		self.definitions.insert(action_type.clone(), definition);
93		tracing::info!("Loaded DSL definition: {}", action_type);
94	}
95
96	/// Load all definitions from a directory
97	pub fn load_definitions_from_dir(&mut self, dir: impl AsRef<Path>) -> ClResult<usize> {
98		let dir = dir.as_ref();
99		let mut count = 0;
100
101		for entry in std::fs::read_dir(dir)? {
102			let entry = entry?;
103			let path = entry.path();
104
105			if path.extension().and_then(|s| s.to_str()) == Some("json") {
106				match self.load_definition_from_file(&path) {
107					Ok(()) => count += 1,
108					Err(e) => {
109						tracing::error!("Failed to load definition from {:?}: {}", path, e);
110					}
111				}
112			}
113		}
114
115		tracing::info!("Loaded {} DSL definitions from {:?}", count, dir);
116		Ok(count)
117	}
118
119	/// Get action definition
120	pub fn get_definition(&self, action_type: &str) -> Option<&ActionDefinition> {
121		self.definitions.get(action_type)
122	}
123
124	/// Check if action type has DSL definition
125	pub fn has_definition(&self, action_type: &str) -> bool {
126		self.definitions.contains_key(action_type)
127	}
128
129	/// Resolve action type for hook lookup.
130	/// Tries full type (typ:sub_typ) first, then falls back to base type.
131	pub fn resolve_action_type(&self, typ: &str, sub_typ: Option<&str>) -> Option<String> {
132		if let Some(st) = sub_typ {
133			let full = format!("{}:{}", typ, st);
134			if self.definitions.contains_key(&full) {
135				return Some(full);
136			}
137		}
138		if self.definitions.contains_key(typ) {
139			Some(typ.to_string())
140		} else {
141			None
142		}
143	}
144
145	/// Execute a hook for an action type
146	pub async fn execute_hook(
147		&self,
148		app: &App,
149		action_type: &str,
150		hook_type: HookType,
151		mut context: HookContext,
152	) -> ClResult<()> {
153		use crate::hooks::HookImplementation;
154
155		let definition = self.definitions.get(action_type).ok_or_else(|| {
156			Error::ValidationError(format!("Action definition not found: {}", action_type))
157		})?;
158
159		let implementation = match hook_type {
160			HookType::OnCreate => &definition.hooks.on_create,
161			HookType::OnReceive => &definition.hooks.on_receive,
162			HookType::OnAccept => &definition.hooks.on_accept,
163			HookType::OnReject => &definition.hooks.on_reject,
164		};
165
166		// Execute hook based on implementation type
167		match implementation {
168			HookImplementation::None => {
169				// Check if there's a native hook registered for this action type
170				let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
171				let registry = hook_reg.read().await;
172				if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
173					let hook_fn = hook_fn.clone();
174					drop(registry);
175					match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
176						Ok(Ok(hook_result)) => {
177							if !hook_result.continue_processing {
178								tracing::debug!("Native hook requested to abort processing");
179							}
180							Ok(())
181						}
182						Ok(Err(e)) => Err(e),
183						Err(_) => Err(Error::Timeout),
184					}
185				} else {
186					drop(registry);
187					Ok(())
188				}
189			}
190
191			HookImplementation::Dsl(operations) => {
192				if operations.is_empty() {
193					return Ok(());
194				}
195
196				// Execute DSL operations with timeout
197				let execution = async {
198					let mut executor = OperationExecutor::new(app);
199
200					for operation in operations {
201						match executor.execute(operation, &mut context).await {
202							Ok(()) => {}
203							Err(Error::ValidationError(ref msg)) if msg == EARLY_RETURN_MARKER => {
204								tracing::debug!("DSL hook early return");
205								break;
206							}
207							Err(e) => return Err(e),
208						}
209					}
210
211					Ok(())
212				};
213
214				match timeout(HOOK_TIMEOUT, execution).await {
215					Ok(result) => result,
216					Err(_) => Err(Error::Timeout),
217				}
218			}
219
220			HookImplementation::Native(_) => {
221				// Look up and execute native hook from registry
222				let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
223				let registry = hook_reg.read().await;
224				if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
225					let hook_fn = hook_fn.clone();
226					drop(registry);
227					match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
228						Ok(Ok(hook_result)) => {
229							// Merge variables back into context
230							// (in future, we may want to pass context by reference and update it)
231							if !hook_result.continue_processing {
232								tracing::debug!("Native hook requested to abort processing");
233							}
234							Ok(())
235						}
236						Ok(Err(e)) => Err(e),
237						Err(_) => Err(Error::Timeout),
238					}
239				} else {
240					drop(registry);
241					tracing::warn!(
242						"Native hook not found in registry for {} hook on action type: {}",
243						hook_type.as_str(),
244						action_type
245					);
246					Ok(())
247				}
248			}
249
250			HookImplementation::Hybrid { dsl, .. } => {
251				// Execute DSL operations first
252				if !dsl.is_empty() {
253					let execution = async {
254						let mut executor = OperationExecutor::new(app);
255
256						for operation in dsl {
257							match executor.execute(operation, &mut context).await {
258								Ok(()) => {}
259								Err(Error::ValidationError(ref msg))
260									if msg == EARLY_RETURN_MARKER =>
261								{
262									tracing::debug!("DSL hook early return");
263									break;
264								}
265								Err(e) => return Err(e),
266							}
267						}
268
269						Ok(())
270					};
271
272					match timeout(HOOK_TIMEOUT, execution).await {
273						Ok(result) => result?,
274						Err(_) => return Err(Error::Timeout),
275					}
276				}
277
278				// Then execute native function
279				let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
280				let registry = hook_reg.read().await;
281				if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
282					let hook_fn = hook_fn.clone();
283					drop(registry);
284					match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
285						Ok(Ok(hook_result)) => {
286							if !hook_result.continue_processing {
287								tracing::debug!("Hybrid native hook requested to abort processing");
288							}
289							Ok(())
290						}
291						Ok(Err(e)) => Err(e),
292						Err(_) => Err(Error::Timeout),
293					}
294				} else {
295					drop(registry);
296					Ok(())
297				}
298			}
299		}
300	}
301
302	/// Execute a hook for an action type and return the HookResult
303	/// This is useful for synchronous endpoints that need to return the hook's response
304	pub async fn execute_hook_with_result(
305		&self,
306		app: &App,
307		action_type: &str,
308		hook_type: HookType,
309		mut context: HookContext,
310	) -> ClResult<HookResult> {
311		use crate::hooks::HookImplementation;
312
313		let definition = self.definitions.get(action_type).ok_or_else(|| {
314			Error::ValidationError(format!("Action definition not found: {}", action_type))
315		})?;
316
317		let implementation = match hook_type {
318			HookType::OnCreate => &definition.hooks.on_create,
319			HookType::OnReceive => &definition.hooks.on_receive,
320			HookType::OnAccept => &definition.hooks.on_accept,
321			HookType::OnReject => &definition.hooks.on_reject,
322		};
323
324		// Execute hook based on implementation type
325		match implementation {
326			HookImplementation::None => {
327				// Check if there's a native hook registered for this action type
328				let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
329				let registry = hook_reg.read().await;
330				if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
331					let hook_fn = hook_fn.clone();
332					drop(registry);
333					match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
334						Ok(Ok(hook_result)) => Ok(hook_result),
335						Ok(Err(e)) => Err(e),
336						Err(_) => Err(Error::Timeout),
337					}
338				} else {
339					drop(registry);
340					Ok(HookResult::default())
341				}
342			}
343
344			HookImplementation::Dsl(operations) => {
345				if operations.is_empty() {
346					return Ok(HookResult::default());
347				}
348
349				// Execute DSL operations with timeout
350				let execution = async {
351					let mut executor = OperationExecutor::new(app);
352
353					for operation in operations {
354						match executor.execute(operation, &mut context).await {
355							Ok(()) => {}
356							Err(Error::ValidationError(ref msg)) if msg == EARLY_RETURN_MARKER => {
357								tracing::debug!("DSL hook early return");
358								break;
359							}
360							Err(e) => return Err(e),
361						}
362					}
363
364					Ok(HookResult {
365						vars: context.vars.clone(),
366						continue_processing: true,
367						return_value: None,
368					})
369				};
370
371				match timeout(HOOK_TIMEOUT, execution).await {
372					Ok(result) => result,
373					Err(_) => Err(Error::Timeout),
374				}
375			}
376
377			HookImplementation::Native(_) => {
378				// Look up and execute native hook from registry
379				let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
380				let registry = hook_reg.read().await;
381				if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
382					let hook_fn = hook_fn.clone();
383					drop(registry);
384					match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
385						Ok(Ok(hook_result)) => Ok(hook_result),
386						Ok(Err(e)) => Err(e),
387						Err(_) => Err(Error::Timeout),
388					}
389				} else {
390					drop(registry);
391					tracing::warn!(
392						"Native hook not found in registry for {} hook on action type: {}",
393						hook_type.as_str(),
394						action_type
395					);
396					Ok(HookResult::default())
397				}
398			}
399
400			HookImplementation::Hybrid { dsl, .. } => {
401				// Execute DSL operations first
402				if !dsl.is_empty() {
403					let execution = async {
404						let mut executor = OperationExecutor::new(app);
405
406						for operation in dsl {
407							match executor.execute(operation, &mut context).await {
408								Ok(()) => {}
409								Err(Error::ValidationError(ref msg))
410									if msg == EARLY_RETURN_MARKER =>
411								{
412									tracing::debug!("DSL hook early return");
413									break;
414								}
415								Err(e) => return Err(e),
416							}
417						}
418
419						Ok(())
420					};
421
422					match timeout(HOOK_TIMEOUT, execution).await {
423						Ok(result) => result?,
424						Err(_) => return Err(Error::Timeout),
425					}
426				}
427
428				// Then execute native function
429				let hook_reg = app.ext::<Arc<tokio::sync::RwLock<crate::hooks::HookRegistry>>>()?;
430				let registry = hook_reg.read().await;
431				if let Some(hook_fn) = registry.get_hook(action_type, hook_type) {
432					let hook_fn = hook_fn.clone();
433					drop(registry);
434					match timeout(HOOK_TIMEOUT, hook_fn(app.clone(), context)).await {
435						Ok(Ok(hook_result)) => Ok(hook_result),
436						Ok(Err(e)) => Err(e),
437						Err(_) => Err(Error::Timeout),
438					}
439				} else {
440					drop(registry);
441					Ok(HookResult::default())
442				}
443			}
444		}
445	}
446
447	/// Get behavior flags for an action type
448	pub fn get_behavior(&self, action_type: &str) -> Option<&BehaviorFlags> {
449		self.definitions.get(action_type).map(|d| &d.behavior)
450	}
451
452	/// Get field constraints for an action type
453	pub fn get_field_constraints(&self, action_type: &str) -> Option<&FieldConstraints> {
454		self.definitions.get(action_type).map(|d| &d.fields)
455	}
456
457	/// Get key pattern for an action type
458	pub fn get_key_pattern(&self, action_type: &str) -> Option<&str> {
459		self.definitions.get(action_type).and_then(|d| d.key_pattern.as_deref())
460	}
461
462	/// Validate action content against the schema defined for an action type.
463	///
464	/// Returns Ok(()) if content is valid or no schema is defined.
465	/// Returns Err with validation details if content violates the schema.
466	pub fn validate_content(
467		&self,
468		action_type: &str,
469		content: Option<&serde_json::Value>,
470	) -> ClResult<()> {
471		// Try full type first, then base type (e.g., "REACT:LIKE" -> "REACT")
472		let definition = self
473			.definitions
474			.get(action_type)
475			.or_else(|| action_type.split(':').next().and_then(|base| self.definitions.get(base)))
476			.ok_or_else(|| {
477				Error::ValidationError(format!("Unknown action type: {}", action_type))
478			})?;
479
480		// Check field constraints for content
481		if let Some(FieldConstraint::Required) = definition.fields.content {
482			if content.is_none() || matches!(content, Some(serde_json::Value::Null)) {
483				return Err(Error::ValidationError(format!(
484					"Content is required for action type {}",
485					action_type
486				)));
487			}
488		}
489
490		if let Some(FieldConstraint::Forbidden) = definition.fields.content {
491			if content.is_some() && !matches!(content, Some(serde_json::Value::Null)) {
492				return Err(Error::ValidationError(format!(
493					"Content is forbidden for action type {}",
494					action_type
495				)));
496			}
497		}
498
499		// If no schema defined or no content, validation passes
500		let Some(schema_wrapper) = &definition.schema else {
501			return Ok(());
502		};
503		let Some(schema) = &schema_wrapper.content else {
504			return Ok(());
505		};
506		let Some(content) = content else {
507			return Ok(());
508		};
509
510		// Validate content against schema
511		self.validate_value_against_schema(content, schema, "content")
512	}
513
514	/// Validate a value against a content schema
515	fn validate_value_against_schema(
516		&self,
517		value: &serde_json::Value,
518		schema: &ContentSchema,
519		path: &str,
520	) -> ClResult<()> {
521		match schema.content_type {
522			ContentType::String => {
523				let s = value
524					.as_str()
525					.ok_or_else(|| Error::ValidationError(format!("{}: expected string", path)))?;
526
527				// Check min_length
528				if let Some(min) = schema.min_length {
529					if s.len() < min {
530						return Err(Error::ValidationError(format!(
531							"{}: string too short (min {})",
532							path, min
533						)));
534					}
535				}
536
537				// Check max_length
538				if let Some(max) = schema.max_length {
539					if s.len() > max {
540						return Err(Error::ValidationError(format!(
541							"{}: string too long (max {})",
542							path, max
543						)));
544					}
545				}
546
547				// Check pattern
548				if let Some(ref pattern) = schema.pattern {
549					let re = regex::Regex::new(pattern).map_err(|e| {
550						Error::ValidationError(format!("{}: invalid pattern: {}", path, e))
551					})?;
552					if !re.is_match(s) {
553						return Err(Error::ValidationError(format!(
554							"{}: string does not match pattern",
555							path
556						)));
557					}
558				}
559
560				// Check enum
561				if let Some(ref allowed) = schema.r#enum {
562					let string_val = serde_json::Value::String(s.to_string());
563					if !allowed.contains(&string_val) {
564						return Err(Error::ValidationError(format!(
565							"{}: value not in allowed enum",
566							path
567						)));
568					}
569				}
570			}
571
572			ContentType::Number => {
573				if !value.is_number() {
574					return Err(Error::ValidationError(format!("{}: expected number", path)));
575				}
576
577				// Check enum
578				if let Some(ref allowed) = schema.r#enum {
579					if !allowed.contains(value) {
580						return Err(Error::ValidationError(format!(
581							"{}: value not in allowed enum",
582							path
583						)));
584					}
585				}
586			}
587
588			ContentType::Boolean => {
589				if !value.is_boolean() {
590					return Err(Error::ValidationError(format!("{}: expected boolean", path)));
591				}
592			}
593
594			ContentType::Object => {
595				let obj = value
596					.as_object()
597					.ok_or_else(|| Error::ValidationError(format!("{}: expected object", path)))?;
598
599				// Check required properties
600				if let Some(ref required) = schema.required {
601					for prop in required {
602						if !obj.contains_key(prop) {
603							return Err(Error::ValidationError(format!(
604								"{}: missing required property '{}'",
605								path, prop
606							)));
607						}
608					}
609				}
610
611				// Validate individual properties
612				if let Some(ref properties) = schema.properties {
613					for (prop_name, prop_schema) in properties {
614						if let Some(prop_value) = obj.get(prop_name) {
615							self.validate_field_value(
616								prop_value,
617								prop_schema,
618								&format!("{}.{}", path, prop_name),
619							)?;
620						}
621					}
622				}
623			}
624
625			ContentType::Json => {
626				// Json type accepts any valid JSON - no further validation
627			}
628		}
629
630		Ok(())
631	}
632
633	/// Validate a field value against a schema field definition
634	fn validate_field_value(
635		&self,
636		value: &serde_json::Value,
637		schema: &SchemaField,
638		path: &str,
639	) -> ClResult<()> {
640		match schema.field_type {
641			FieldType::String => {
642				let s = value
643					.as_str()
644					.ok_or_else(|| Error::ValidationError(format!("{}: expected string", path)))?;
645
646				if let Some(min) = schema.min_length {
647					if s.len() < min {
648						return Err(Error::ValidationError(format!(
649							"{}: string too short (min {})",
650							path, min
651						)));
652					}
653				}
654
655				if let Some(max) = schema.max_length {
656					if s.len() > max {
657						return Err(Error::ValidationError(format!(
658							"{}: string too long (max {})",
659							path, max
660						)));
661					}
662				}
663
664				if let Some(ref allowed) = schema.r#enum {
665					let string_val = serde_json::Value::String(s.to_string());
666					if !allowed.contains(&string_val) {
667						return Err(Error::ValidationError(format!(
668							"{}: value '{}' not in allowed enum",
669							path, s
670						)));
671					}
672				}
673			}
674
675			FieldType::Number => {
676				if !value.is_number() {
677					return Err(Error::ValidationError(format!("{}: expected number", path)));
678				}
679			}
680
681			FieldType::Boolean => {
682				if !value.is_boolean() {
683					return Err(Error::ValidationError(format!("{}: expected boolean", path)));
684				}
685			}
686
687			FieldType::Array => {
688				let arr = value
689					.as_array()
690					.ok_or_else(|| Error::ValidationError(format!("{}: expected array", path)))?;
691
692				if let Some(ref item_schema) = schema.items {
693					for (i, item) in arr.iter().enumerate() {
694						self.validate_field_value(item, item_schema, &format!("{}[{}]", path, i))?;
695					}
696				}
697			}
698
699			FieldType::Json => {
700				// Json type accepts any valid JSON
701			}
702		}
703
704		Ok(())
705	}
706
707	/// List all loaded action types
708	pub fn list_action_types(&self) -> Vec<String> {
709		self.definitions.keys().cloned().collect()
710	}
711
712	/// Get statistics about loaded definitions
713	pub fn stats(&self) -> DslEngineStats {
714		let total_definitions = self.definitions.len();
715		let mut hook_counts = HookCounts::default();
716
717		for def in self.definitions.values() {
718			if def.hooks.on_create.is_some() {
719				hook_counts.on_create += 1;
720			}
721			if def.hooks.on_receive.is_some() {
722				hook_counts.on_receive += 1;
723			}
724			if def.hooks.on_accept.is_some() {
725				hook_counts.on_accept += 1;
726			}
727			if def.hooks.on_reject.is_some() {
728				hook_counts.on_reject += 1;
729			}
730		}
731
732		DslEngineStats { total_definitions, hook_counts }
733	}
734}
735
736/// DSL engine statistics
737#[derive(Debug, Clone)]
738pub struct DslEngineStats {
739	pub total_definitions: usize,
740	pub hook_counts: HookCounts,
741}
742
743/// Hook counts
744#[derive(Debug, Clone, Default)]
745pub struct HookCounts {
746	pub on_create: usize,
747	pub on_receive: usize,
748	pub on_accept: usize,
749	pub on_reject: usize,
750}
751
752#[cfg(test)]
753mod tests {
754	#[test]
755	fn test_load_definition_from_json() {
756		let _json = r#"
757		{
758			"type": "TEST",
759			"version": "1.0",
760			"description": "Test action",
761			"fields": {},
762			"behavior": {},
763			"hooks": {}
764		}
765		"#;
766
767		// Note: Can't create App in test without full initialization
768		// This test would need mock/test fixtures
769	}
770}
771
772// vim: ts=4