phlow_sdk/
macros.rs

/// Iterates over `$rx` and dispatches every received package to `$resolve`
/// on its own Tokio task.
///
/// * `$rx` - anything usable in a `for` loop; it is consumed by the loop.
/// * `$resolve` - a callable taking one package and returning a future.
///
/// The future returned by `$resolve` is awaited inside the spawned task and
/// its output is discarded. The join handle returned by `spawn` is dropped,
/// so the tasks are detached. The macro only finishes once iteration over
/// `$rx` ends.
#[macro_export]
macro_rules! listen {
    ($rx:expr, $resolve:expr) => {{
        for incoming in $rx {
            // Each package is moved into its own task so that a slow
            // resolver does not hold up the receive loop.
            let task = async move {
                $resolve(incoming).await;
            };
            $crate::tokio::spawn(task);
        }
    }};
}
11
/// Initialises `env_logger` for the calling crate.
///
/// The log filter is read from the `PHLOW_LOG` environment variable and
/// falls back to `"info"` when that variable is not set.
///
/// Initialisation goes through `try_init` and its result is deliberately
/// ignored, so invoking the macro more than once does not panic.
///
/// Note: `env_logger` is referenced by its bare crate name, so it has to be
/// resolvable from the crate that invokes this macro.
#[macro_export]
macro_rules! use_log {
    () => {{
        // A single `filter_or` is enough: it sets both the variable name and
        // the default. A preceding `default_filter_or` would be overwritten
        // by it and only give the false impression that `RUST_LOG` is read.
        let _ = env_logger::Builder::from_env(
            env_logger::Env::new().filter_or("PHLOW_LOG", "info"),
        )
        .try_init();
    }};
}
23
/// Enters a clone of `$span` and keeps it entered until the end of the
/// scope in which the macro is invoked.
///
/// The macro intentionally expands to plain `let` statements (no
/// surrounding block): the guard returned by `enter()` therefore lives in
/// the caller's scope and is dropped when that scope ends. Both bindings are
/// hygienic and cannot be named by the caller.
///
/// * `$span` - a value providing `clone()` whose clone provides `enter()`.
#[macro_export]
macro_rules! span_enter {
    ($span:expr) => {
        // The clone is stored in a local so that it outlives the guard
        // obtained from it.
        let owned_span = $span.clone();
        let _span_guard = owned_span.enter();
    };
}
31
/// Sends `$data` through `$sender` without propagating a send failure.
///
/// `$sender.send($data)` is called once. When it returns `Err`, the error
/// is reported at `debug` level and otherwise dropped; an `Ok` result is
/// ignored.
///
/// * `$sender` - a value with a `send` method returning a `Result`.
/// * `$data` - the value handed to `send`.
#[macro_export]
macro_rules! sender_safe {
    ($sender:expr, $data:expr) => {
        match $sender.send($data) {
            Ok(_) => {}
            Err(send_err) => {
                $crate::tracing::debug!("Error sending data: {:?}", send_err);
            }
        }
    };
}
40
/// Builds a `Package` carrying `$data`, sends it through `$sender` and
/// evaluates to the receiving half of a fresh oneshot channel.
///
/// The sending half of the oneshot channel is stored in the package's
/// `response` field. The package is sent with [`sender_safe!`], so a failed
/// send is only logged; the receiver is returned either way.
///
/// Two forms are accepted:
///
/// * `sender_package!(id, sender, data)` - `span` and `dispatch` are `None`.
/// * `sender_package!(span, dispatch, id, sender, data)` - `span` and
///   `dispatch` are wrapped in `Some`.
///
/// In both forms `$id` becomes the package's `origin` and `$data` its
/// `request_data`.
#[macro_export]
macro_rules! sender_package {
    ($id:expr, $sender:expr, $data:expr) => {{
        let (tx, rx) = $crate::tokio::sync::oneshot::channel::<$crate::valu3::value::Value>();

        let package = $crate::structs::Package {
            response: Some(tx),
            request_data: $data,
            origin: $id,
            span: None,
            dispatch: None,
        };

        // Path-qualified so the expansion does not depend on the caller
        // having imported `sender_safe!` as well.
        $crate::sender_safe!($sender, package);

        rx
    }};
    ($span:expr, $dispatch:expr, $id:expr, $sender:expr, $data:expr) => {{
        let (tx, rx) = $crate::tokio::sync::oneshot::channel::<$crate::valu3::value::Value>();

        let package = $crate::structs::Package {
            response: Some(tx),
            request_data: $data,
            origin: $id,
            span: Some($span),
            dispatch: Some($dispatch),
        };

        // Path-qualified so the expansion does not depend on the caller
        // having imported `sender_safe!` as well.
        $crate::sender_safe!($sender, package);

        rx
    }};
}
74
/// Creates an unbounded crossbeam channel of `ModulePackage`, hands the
/// sending half to `$setup.setup_sender` (wrapped in `Some`) and evaluates
/// to the receiving half.
///
/// The hand-over goes through [`sender_safe!`], so a failure to deliver the
/// sender is only logged and the receiver is returned regardless.
///
/// Note: `ModulePackage` is referenced by its bare name and therefore has
/// to be in scope where the macro is invoked.
///
/// * `$setup` - a value with a `setup_sender` field providing `send`.
#[macro_export]
macro_rules! module_channel {
    ($setup:expr) => {{
        let (tx, rx) = $crate::crossbeam::channel::unbounded::<ModulePackage>();

        // Path-qualified so the expansion does not depend on the caller
        // having imported `sender_safe!` as well.
        $crate::sender_safe!($setup.setup_sender, Some(tx));

        rx
    }};
}
85
/// Generates the exported `plugin` entry point of a step module.
///
/// The generated function is `pub extern "C" fn plugin(setup: ModuleSetup)`
/// exported under the symbol name `plugin`. It initialises logging, creates
/// a Tokio runtime and drives `$handler` on it. Errors returned by the
/// handler, and a failure to create the runtime, are logged rather than
/// propagated.
///
/// Two forms are accepted; the literal token in parentheses selects what the
/// handler receives:
///
/// * `create_step!(handler(setup))` - the handler is called with the
///   `ModuleSetup` itself. This form does not install `setup.dispatch` as
///   the tracing dispatcher.
/// * `create_step!(handler(rx))` - a module channel is created through
///   [`module_channel!`] and the handler is called with its receiver. The
///   body runs with `setup.dispatch` installed as the default tracing
///   dispatcher. When `setup.is_test_mode` is set, the handler runs on a
///   detached thread and `plugin` returns after a short pause; otherwise
///   `plugin` blocks until the handler finishes.
#[macro_export]
macro_rules! create_step {
    ($handler:ident(setup)) => {
        #[unsafe(export_name = "plugin")]
        pub extern "C" fn plugin(setup: $crate::structs::ModuleSetup) {
            $crate::use_log!();

            match $crate::tokio::runtime::Runtime::new() {
                Ok(rt) => {
                    if let Err(e) = rt.block_on($handler(setup)) {
                        $crate::tracing::error!("Error in plugin: {:?}", e);
                    }
                }
                Err(e) => {
                    // Keep the cause instead of discarding it.
                    $crate::tracing::error!("Error creating runtime: {:?}", e);
                }
            }
        }
    };

    ($handler:ident(rx)) => {
        #[unsafe(export_name = "plugin")]
        pub extern "C" fn plugin(setup: $crate::structs::ModuleSetup) {
            let dispatch = setup.dispatch.clone();
            $crate::tracing::dispatcher::with_default(&dispatch, || {
                $crate::use_log!();

                let rt = match $crate::tokio::runtime::Runtime::new() {
                    Ok(rt) => rt,
                    Err(e) => {
                        // Keep the cause instead of discarding it.
                        $crate::tracing::error!("Error creating runtime: {:?}", e);
                        return;
                    }
                };

                let rx = $crate::module_channel!(setup);

                if setup.is_test_mode {
                    // `with_default` only affects the calling thread, so the
                    // detached thread installs the dispatcher itself;
                    // otherwise events emitted by the handler would not
                    // reach `setup.dispatch`.
                    let thread_dispatch = dispatch.clone();

                    // During tests, run handler in a detached thread
                    ::std::thread::spawn(move || {
                        $crate::tracing::dispatcher::with_default(&thread_dispatch, || {
                            if let Err(e) = rt.block_on($handler(rx)) {
                                $crate::tracing::error!("Error in plugin during test: {:?}", e);
                            }
                        });
                    });

                    // Give the thread a moment to start
                    ::std::thread::sleep(::std::time::Duration::from_millis(50));
                } else {
                    // In normal mode, block on the handler
                    if let Err(e) = rt.block_on($handler(rx)) {
                        $crate::tracing::error!("Error in plugin: {:?}", e);
                    }
                }
            });
        }
    };
}
138
/// Generates the exported `plugin` entry point of a main module.
///
/// The generated function is `pub extern "C" fn plugin(setup: ModuleSetup)`
/// exported under the symbol name `plugin`. With `setup.dispatch` installed
/// as the default tracing dispatcher it:
///
/// 1. calls `otel::init_tracing_subscriber` with a clone of
///    `setup.app_data` and keeps the returned value alive until the end of
///    the call,
/// 2. initialises logging through [`use_log!`],
/// 3. creates a Tokio runtime and blocks on `$handler(setup)`.
///
/// Errors returned by the handler, and a failure to create the runtime, are
/// logged rather than propagated.
///
/// Only the form `create_main!(handler(setup))` is accepted.
#[macro_export]
macro_rules! create_main {
    ($handler:ident(setup)) => {
        #[unsafe(export_name = "plugin")]
        pub extern "C" fn plugin(setup: $crate::structs::ModuleSetup) {
            let dispatch = setup.dispatch.clone();
            $crate::tracing::dispatcher::with_default(&dispatch, || {
                // Bound to a named local so it is dropped only when this
                // closure returns, i.e. after the handler has finished.
                let _guard = $crate::otel::init_tracing_subscriber(setup.app_data.clone());
                $crate::use_log!();

                match $crate::tokio::runtime::Runtime::new() {
                    Ok(rt) => {
                        if let Err(e) = rt.block_on($handler(setup)) {
                            $crate::tracing::error!("Error in plugin: {:?}", e);
                        }
                    }
                    Err(e) => {
                        // Keep the cause instead of discarding it.
                        $crate::tracing::error!("Error creating runtime: {:?}", e);
                    }
                }
            });
        }
    };
}