phlow_engine/
engine.rs

1use crate::repositories::{Repositories, RepositoryFunction};
2use regex::Regex;
3use rhai::serde::from_dynamic;
4use rhai::{Dynamic, Engine};
5use std::sync::Arc;
6use std::sync::OnceLock;
7use tokio::runtime::Runtime;
8use tokio::sync::oneshot;
9use valu3::value::Value;
10
11fn build_engine() -> Engine {
12    let mut engine = Engine::new();
13
14    // Define operadores personalizados
15    match engine.register_custom_operator("starts_with", 80) {
16        Ok(engine) => engine.register_fn("start_withs", |x: String, y: String| x.starts_with(&y)),
17        Err(_) => {
18            panic!("Error on register custom operator starts_with");
19        }
20    };
21
22    match engine.register_custom_operator("ends_with", 81) {
23        Ok(engine) => engine.register_fn("ends_with", |x: String, y: String| x.ends_with(&y)),
24        Err(_) => {
25            panic!("Error on register custom operator ends_with");
26        }
27    };
28
29    match engine.register_custom_operator("search", 82) {
30        Ok(engine) => engine.register_fn("search", |x: String, y: String| match Regex::new(&x) {
31            Ok(re) => re.is_match(&y),
32            Err(_) => false,
33        }),
34        Err(_) => {
35            panic!("Error on register custom operator search");
36        }
37    };
38
39    engine
40}
41
42static RUNTIME: OnceLock<Runtime> = OnceLock::new();
43
44pub fn build_engine_sync(repositories: Option<Repositories>) -> Engine {
45    let mut engine = build_engine();
46    let rt = RUNTIME.get_or_init(|| match Runtime::new() {
47        Ok(rt) => rt,
48        Err(e) => panic!("Error creating runtime: {:?}", e),
49    });
50
51    if let Some(repositories) = repositories {
52        for (key, call) in repositories.repositories {
53            let call: RepositoryFunction = Arc::new(move |value: Value| -> Value {
54                let call_clone = Arc::clone(&call);
55                let (tx, rx) = oneshot::channel();
56
57                rt.spawn(async move {
58                    let result = (call_clone)(value);
59                    let _ = tx.send(result);
60                });
61
62                rx.blocking_recv().unwrap_or(Value::Null)
63            }) as RepositoryFunction;
64
65            engine.register_fn(key.clone(), move |dynamic: Dynamic| {
66                let value: Value = match from_dynamic(&dynamic) {
67                    Ok(value) => value,
68                    Err(_) => Value::Null,
69                };
70                call(value)
71            });
72        }
73    }
74
75    engine
76}
77
78pub fn build_engine_async(repositories: Option<Repositories>) -> Arc<Engine> {
79    let mut engine = build_engine();
80
81    if let Some(repositories) = repositories {
82        for (key, call) in repositories.repositories {
83            let call: RepositoryFunction = Arc::new(move |value: Value| -> Value {
84                let call_clone = Arc::clone(&call);
85                let (tx, rx) = oneshot::channel();
86
87                // Executa a chamada assíncrona corretamente sem criar um novo runtime
88                tokio::task::spawn(async move {
89                    let result = (call_clone)(value);
90                    let _ = tx.send(result);
91                });
92
93                // Usa tokio::runtime::Handle::current() para evitar erro de runtime
94                rx.blocking_recv().unwrap_or(Value::Null) // Aguarda sem criar outro runtime
95            }) as RepositoryFunction;
96
97            engine.register_fn(key.clone(), move |dynamic: Dynamic| {
98                let value: Value = match from_dynamic(&dynamic) {
99                    Ok(value) => value,
100                    Err(_) => Value::Null,
101                };
102                call(value)
103            });
104        }
105    }
106
107    Arc::new(engine)
108}
109
110#[cfg(test)]
111mod tests {
112    use crate::plugin;
113
114    use super::*;
115    use std::collections::HashMap;
116    use std::sync::Arc;
117    use valu3::value::Value;
118
119    #[test]
120    fn test_custom_operators() {
121        let engine = build_engine_async(None);
122
123        let result: bool = engine.eval(r#""hello" starts_with "he""#).unwrap();
124        assert!(result);
125
126        let result: bool = engine.eval(r#""world" ends_with "ld""#).unwrap();
127        assert!(result);
128
129        let result: bool = engine.eval(r#""\\d+" search "123""#).unwrap();
130        assert!(result);
131    }
132
133    #[test]
134    fn test_repository_function() {
135        let mut repositories = HashMap::new();
136
137        let mock_function: Arc<dyn Fn(Value) -> Value + Send + Sync> = plugin!(|value| {
138            if let Value::String(s) = value {
139                Value::from(format!("{}-processed", s))
140            } else {
141                Value::Null
142            }
143        });
144
145        repositories.insert("process".to_string(), mock_function);
146
147        let repos = Repositories { repositories };
148        let engine = build_engine_sync(Some(repos));
149
150        let result: Value = engine.eval(r#"process("data")"#).unwrap();
151
152        assert_eq!(result, Value::from("data-processed"));
153    }
154}