1pub mod context;
2pub mod count;
3pub mod id;
4pub mod modules;
5pub mod otel;
6use context::Context;
7pub use crossbeam;
8use crossbeam::channel;
9use modules::ModulePackage;
10pub use opentelemetry;
11use std::collections::HashMap;
12use std::fmt::{Debug, Formatter};
13pub use tokio;
14use tokio::sync::oneshot;
15pub use tracing;
16pub use tracing_core;
17pub use tracing_opentelemetry;
18pub use tracing_subscriber;
19pub use valu3;
20use valu3::{traits::ToValueBehavior, value::Value};
21
/// Identifier of a module, represented as a plain `usize`.
pub type ModuleId = usize;
/// Crossbeam channel sender that carries [`Package`] values.
///
/// NOTE(review): the name suggests the receiving end is the main runtime — confirm.
pub type MainRuntimeSender = channel::Sender<Package>;
/// One-shot sender that delivers an optional crossbeam sender of [`ModulePackage`].
///
/// NOTE(review): presumably a module answers its setup with `Some(sender)` when it
/// accepts packages and with `None` otherwise — confirm against the runtime.
pub type ModuleSetupSender = oneshot::Sender<Option<channel::Sender<ModulePackage>>>;
25
/// Data handed to a module's entry point.
///
/// The `plugin` function generated by the `plugin!`, `plugin_async!` and
/// `main_plugin_async!` macros receives one value of this type.
#[derive(Debug)]
pub struct ModuleSetup {
    /// Identifier of the module being set up.
    pub id: ModuleId,
    /// One-shot channel carrying an optional [`ModulePackage`] sender.
    // NOTE(review): presumably the module uses it to report its setup result — confirm.
    pub setup_sender: ModuleSetupSender,
    /// Sender of [`Package`] values; its presence is what [`ModuleSetup::is_main`] reports.
    pub main_sender: Option<MainRuntimeSender>,
    /// Arbitrary value supplied to the module.
    // NOTE(review): presumably the module's `with` configuration — confirm.
    pub with: Value,
    /// Tracing dispatcher; `main_plugin_async!` installs it as the default while the
    /// handler runs.
    pub dispatch: tracing::Dispatch,
}
34
35impl ModuleSetup {
36 pub fn is_main(&self) -> bool {
37 self.main_sender.is_some()
38 }
39}
40
/// Message carried by a [`MainRuntimeSender`].
///
/// `Default` yields a package with no responder, no payload, origin `0`, and no
/// tracing context.
#[derive(Default)]
pub struct Package {
    /// One-shot sender for the response; consumed by [`Package::send`].
    pub send: Option<oneshot::Sender<Value>>,
    /// Payload of the request, if any; exposed through [`Package::get_data`].
    pub request_data: Option<Value>,
    /// Module identifier recorded on the package (the `sender!` macros fill it from
    /// their `$id` argument).
    pub origin: ModuleId,
    /// Optional tracing span; set by the five-argument form of `sender!`.
    pub span: Option<tracing::Span>,
    /// Optional tracing dispatcher; set by the five-argument form of `sender!`.
    pub dispatch: Option<tracing::Dispatch>,
}
49
50impl Debug for Package {
52 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53 let map: HashMap<_, _> = vec![
54 ("request_data", self.request_data.to_value()),
55 ("step_position", self.origin.to_value()),
56 ]
57 .into_iter()
58 .collect();
59
60 write!(
61 f,
62 "{}",
63 map.to_value().to_json(valu3::prelude::JsonMode::Inline)
64 )
65 }
66}
67
68impl Package {
69 pub fn get_data(&self) -> Option<&Value> {
70 self.request_data.as_ref()
71 }
72
73 pub fn send(&mut self, response_data: Value) {
74 if let Some(send) = self.send.take() {
75 sender_safe!(send, response_data);
76 }
77 }
78}
79
/// Enters a clone of `$span` for the remainder of the enclosing block.
///
/// Expands to two `let` statements in the caller's scope: the first clones the
/// span, the second calls `enter()` on that clone and keeps the returned guard
/// bound until the surrounding block ends. Both bindings are hygienic, so they
/// cannot be named (or accidentally shadowed) by the calling code.
#[macro_export]
macro_rules! span_enter {
    ($span:expr) => {
        let cloned_span = $span.clone();
        let _entered = cloned_span.enter();
    };
}
87
/// Sends `$data` through `$sender`, logging a failure instead of propagating it.
///
/// `$sender.send($data)` must return a `Result` whose error implements `Debug`.
/// On `Err` the error is reported with `tracing::debug!` and then dropped; on `Ok`
/// nothing else happens.
#[macro_export]
macro_rules! sender_safe {
    ($sender:expr, $data:expr) => {
        if let Err(send_error) = $sender.send($data) {
            $crate::tracing::debug!("Error sending data: {:?}", send_error);
        }
    };
}
96
/// Initialises the plugin tracing subscriber and keeps its guard alive for the rest
/// of the enclosing block.
///
/// Expands to a `let` statement in the caller's scope. If
/// `otel::init_tracing_subscriber_plugin()` fails, the error is logged and the
/// enclosing function returns early, so the macro can only be used inside a
/// function returning `()`.
///
/// The result is bound to a named (hygienic) variable rather than `_`: a `let _ =`
/// pattern does not bind, which would drop the returned guard immediately.
// NOTE(review): assumes the returned value is a drop guard (it is named `guard`
// here) — confirm against `otel::init_tracing_subscriber_plugin`.
#[macro_export]
macro_rules! otlp_start {
    () => {
        // `$crate` keeps the path valid inside this crate and under a renamed dependency.
        let _otlp_guard = match $crate::otel::init_tracing_subscriber_plugin() {
            Ok(guard) => guard,
            Err(e) => {
                $crate::tracing::error!("Error creating tracing subscriber: {:?}", e);
                return;
            }
        };
    };
}
109
/// Generates the `plugin` entry point for a synchronous handler.
///
/// `$handler` must be callable with a `ModuleSetup` and return a `Result` whose
/// error implements `Debug`. The generated `#[no_mangle] extern "C" fn plugin`
/// forwards its argument to the handler and logs an `Err` with `tracing::error!`;
/// the error is not propagated.
///
/// `ModuleSetup` is referenced through `$crate`, so the caller does not need to
/// have the type imported under that exact name.
#[macro_export]
macro_rules! plugin {
    ($handler:ident) => {
        #[no_mangle]
        pub extern "C" fn plugin(setup: $crate::ModuleSetup) {
            if let Err(e) = $handler(setup) {
                $crate::tracing::error!("Error in plugin: {:?}", e);
            }
        }
    };
}
124
/// Generates the `plugin` entry point for an asynchronous handler.
///
/// `$handler` must be callable with a `ModuleSetup` and return a future resolving
/// to a `Result` whose error implements `Debug`. The generated
/// `#[no_mangle] extern "C" fn plugin` creates a Tokio runtime, blocks on the
/// handler's future, and logs an `Err` with `tracing::error!`.
///
/// If the runtime cannot be created, the cause is logged and the function returns
/// without calling the handler.
///
/// Every path goes through `$crate` (which re-exports `tokio` and `tracing`), so
/// the calling crate needs neither a direct `tokio` dependency nor this crate to
/// be named `phlow_sdk`.
#[macro_export]
macro_rules! plugin_async {
    ($handler:ident) => {
        #[no_mangle]
        pub extern "C" fn plugin(setup: $crate::ModuleSetup) {
            match $crate::tokio::runtime::Runtime::new() {
                Ok(rt) => {
                    if let Err(e) = rt.block_on($handler(setup)) {
                        $crate::tracing::error!("Error in plugin: {:?}", e);
                    }
                }
                Err(e) => {
                    // Keep the cause: it is the only hint about why startup failed.
                    $crate::tracing::error!("Error creating runtime: {:?}", e);
                }
            }
        }
    };
}
/// Generates the `plugin` entry point for an asynchronous main-module handler.
///
/// `$handler` must be callable with a `ModuleSetup` and return a future resolving
/// to a `Result` whose error implements `Debug`. The generated
/// `#[no_mangle] extern "C" fn plugin`:
///
/// 1. installs `setup.dispatch` as the default tracing dispatcher for the duration
///    of the call,
/// 2. initialises the tracing subscriber and holds its guard,
/// 3. creates a Tokio runtime and blocks on `$handler(setup)`, logging an `Err`,
/// 4. prints `Plugin loaded` once, after the handler's future has completed.
///
/// If the runtime cannot be created, the cause is logged and nothing is printed.
///
/// The macro calls the handler it was given; it does not assume a function named
/// `start_server` exists at the call site. All paths go through `$crate`.
#[macro_export]
macro_rules! main_plugin_async {
    ($handler:ident) => {
        #[no_mangle]
        pub extern "C" fn plugin(setup: $crate::ModuleSetup) {
            // Cloned first so the closure below can take ownership of `setup`.
            let dispatch = setup.dispatch.clone();
            $crate::tracing::dispatcher::with_default(&dispatch, || {
                let _guard = $crate::otel::init_tracing_subscriber();

                match $crate::tokio::runtime::Runtime::new() {
                    Ok(rt) => {
                        if let Err(e) = rt.block_on($handler(setup)) {
                            $crate::tracing::error!("Error in plugin: {:?}", e);
                        }
                        println!("Plugin loaded");
                    }
                    Err(e) => {
                        $crate::tracing::error!("Error creating runtime: {:?}", e);
                    }
                }
            });
        }
    };
}
167
/// Sends a fire-and-forget [`Package`] through `$sender`.
///
/// * `$id` — module identifier stored in `origin`.
/// * `$sender` — any sender accepted by `sender_safe!`.
/// * `$data` — the `request_data` field (`Option<Value>`).
///
/// The package has no responder and no tracing context (`send`, `span` and
/// `dispatch` are all `None`). A failed send is logged by `sender_safe!` and
/// otherwise ignored. The expansion is a block evaluating to `()`.
#[macro_export]
macro_rules! sender_without_response {
    ($id:expr, $sender:expr, $data:expr) => {{
        // Every field is listed: a struct literal with missing fields does not compile.
        let package = $crate::Package {
            send: None,
            request_data: $data,
            origin: $id,
            span: None,
            dispatch: None,
        };

        $crate::sender_safe!($sender, package);
    }};
}
180
/// Sends a [`Package`] that expects a response and evaluates to the receiving half
/// of the one-shot channel on which that response will arrive.
///
/// Two forms are accepted:
///
/// * `sender!(id, sender, data)` — no tracing context (`span` and `dispatch` are
///   `None`).
/// * `sender!(span, dispatch, id, sender, data)` — attaches the given span and
///   dispatcher to the package.
///
/// In both forms `id` is stored in `origin`, `data` becomes `request_data`
/// (`Option<Value>`), and the package is handed to `sender_safe!`, which logs a
/// failed send instead of returning it. The receiver is returned even when the
/// send failed.
///
/// All paths go through `$crate` (which re-exports `tokio` and `valu3`), so the
/// calling crate needs neither direct dependencies on those crates nor `Package`
/// and `sender_safe!` in scope.
#[macro_export]
macro_rules! sender {
    ($id:expr, $sender:expr, $data:expr) => {{
        let (tx, rx) = $crate::tokio::sync::oneshot::channel::<$crate::valu3::value::Value>();

        let package = $crate::Package {
            send: Some(tx),
            request_data: $data,
            origin: $id,
            span: None,
            dispatch: None,
        };

        $crate::sender_safe!($sender, package);

        rx
    }};
    ($span:expr, $dispatch:expr, $id:expr, $sender:expr, $data:expr) => {{
        let (tx, rx) = $crate::tokio::sync::oneshot::channel::<$crate::valu3::value::Value>();

        let package = $crate::Package {
            send: Some(tx),
            request_data: $data,
            origin: $id,
            span: Some($span),
            dispatch: Some($dispatch),
        };

        $crate::sender_safe!($sender, package);

        rx
    }};
}
214
/// Convenience re-exports for users of this crate.
///
/// Brings in everything public at the crate root together with `valu3`'s `json`
/// item and the contents of `valu3::prelude`.
pub mod prelude {
    // The `plugin!` macro is named explicitly in addition to the glob below.
    pub use crate::plugin;
    pub use crate::*;
    pub use valu3::json;
    pub use valu3::prelude::*;
}