1use crossbeam::channel;
2use phs::{wrap_async_fn, Repositories};
3use std::{collections::HashMap, fmt::Display};
4use tokio::sync::oneshot::{self, Receiver};
5use valu3::{prelude::*, value::Value};
6
7pub enum Error {
8 VersionNotFound(String),
9 ModuleLoaderError(String),
10 ModuleNotFound(String),
11}
12
13impl std::fmt::Debug for Error {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 match self {
16 Error::VersionNotFound(module) => write!(f, "Version not found for module: {}", module),
17 Error::ModuleLoaderError(err) => write!(f, "Module loader error: {}", err),
18 Error::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
19 }
20 }
21}
22
23impl Display for Error {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 Error::VersionNotFound(module) => write!(f, "Version not found for module: {}", module),
27 Error::ModuleLoaderError(err) => write!(f, "Module loader error: {}", err),
28 Error::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
29 }
30 }
31}
32
33#[derive(ToValue, FromValue, Clone, Debug)]
34pub struct ModuleData {
35 pub version: String,
36 pub repository: Option<String>,
37 pub repository_path: Option<String>,
38 pub repository_raw_content: Option<String>,
39 pub module: String,
40 pub name: String,
41 pub with: Value,
42 pub input: Value,
43 pub output: Value,
44 pub input_order: Value,
45 pub local_path: Option<String>,
46}
47
48impl ModuleData {
49 pub fn set_info(&mut self, info: Value) {
50 if info.is_null() || info.is_null() {
51 return;
52 }
53
54 let input = match info.get("input") {
55 Some(input) => {
56 if let Value::Object(obj) = input {
57 if let Some(obj_type) = obj.get("type") {
58 if obj_type.to_string() == "object" {
59 obj.get("properties").unwrap_or(&Value::Null).clone()
60 } else {
61 input.clone()
62 }
63 } else {
64 input.clone()
65 }
66 } else {
67 input.clone()
68 }
69 }
70 None => Value::Null,
71 };
72
73 let output = match info.get("output") {
74 Some(output) => output.clone(),
75 None => Value::Null,
76 };
77
78 let input_order = match info.get("input_order") {
79 Some(input_order) => input_order.clone(),
80 None => Value::Null,
81 };
82
83 self.input = input;
84 self.output = output;
85 self.input_order = input_order;
86 }
87}
88
89impl TryFrom<Value> for ModuleData {
90 type Error = Error;
91
92 fn try_from(value: Value) -> Result<Self, Error> {
93 let module = match value.get("module") {
94 Some(module) => module.to_string(),
95 None => return Err(Error::ModuleLoaderError("Module not found".to_string())),
96 };
97 let repository = value.get("repository").map(|v| v.to_string());
98
99 let is_local_path =
101 module.starts_with("./") || module.starts_with("../") || module.starts_with("/");
102 let local_path = if is_local_path {
103 Some(module.clone())
104 } else {
105 None
106 };
107
108 let repository_path = if repository.is_none() && !is_local_path {
109 let mut padded = module.to_string();
110 while padded.len() < 4 {
111 padded.push('_');
112 }
113
114 let prefix = &padded[0..2];
115 let middle = &padded[2..4];
116
117 let repository = format!("{}/{}/{}", prefix, middle, module);
118 Some(repository)
119 } else {
120 None
121 };
122
123 let repository_raw_content = value.get("repository_raw_content").map(|v| v.to_string());
124
125 let version = match value.get("version") {
126 Some(version) => version.to_string(),
127 None => "latest".to_string(),
128 };
129
130 let name = match value.get("name") {
131 Some(name) => name.to_string(),
132 None => {
133 if is_local_path {
134 std::path::Path::new(&module)
136 .file_name()
137 .and_then(|os_str| os_str.to_str())
138 .unwrap_or(&module)
139 .to_string()
140 } else {
141 module.clone()
142 }
143 }
144 };
145
146 let with = match value.get("with") {
147 Some(with) => with.clone(),
148 None => Value::Null,
149 };
150
151 Ok(ModuleData {
152 module,
153 repository,
154 version,
155 name,
156 with,
157 input: Value::Null,
158 output: Value::Null,
159 repository_path,
160 repository_raw_content,
161 input_order: Value::Null,
162 local_path,
163 })
164 }
165}
166
167#[derive(Debug, Clone)]
168pub enum ModulesError {
169 ModuleNotFound(String),
170 ModuleNotLoaded(String),
171 ModuleError(String),
172}
173
174impl Display for ModulesError {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 match self {
177 ModulesError::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
178 ModulesError::ModuleNotLoaded(name) => write!(f, "Module not loaded: {}", name),
179 ModulesError::ModuleError(err) => write!(f, "Module error: {}", err),
180 }
181 }
182}
183
184#[derive(Debug, Clone)]
185pub struct ModuleResponse {
186 pub error: Option<String>,
187 pub data: Value,
188}
189
190impl Into<ModuleResponse> for Value {
191 fn into(self) -> ModuleResponse {
192 ModuleResponse {
193 error: None,
194 data: self,
195 }
196 }
197}
198
199impl ModuleResponse {
200 pub fn from_error(error: String) -> Self {
201 Self {
202 error: Some(error),
203 data: Value::Null,
204 }
205 }
206
207 pub fn from_success(value: Value) -> Self {
208 Self {
209 error: None,
210 data: value,
211 }
212 }
213}
214
215#[derive(Debug)]
216pub struct ModulePackage {
217 pub input: Option<Value>,
218 pub payload: Option<Value>,
219 pub sender: oneshot::Sender<ModuleResponse>,
220}
221
222impl ModulePackage {
223 pub fn input(&self) -> Option<Value> {
224 self.input.clone()
225 }
226
227 pub fn payload(&self) -> Option<Value> {
228 self.payload.clone()
229 }
230}
231
232#[derive(Debug, Default, Clone)]
233pub struct ModuleParams {
234 pub with: Value,
235 pub input: Value,
236 pub output: Value,
237 pub input_order: Vec<String>,
238}
239
240#[derive(Debug, Clone)]
241pub struct Module {
242 pub sender: channel::Sender<ModulePackage>,
243 pub params: ModuleParams,
244}
245
246impl Module {
247 pub fn send(&self, input: Option<Value>, payload: Option<Value>) -> Receiver<ModuleResponse> {
248 let (package_sender, package_receiver) = oneshot::channel();
249 let package = ModulePackage {
250 input,
251 payload,
252 sender: package_sender,
253 };
254
255 let _ = self.sender.send(package);
256
257 package_receiver
258 }
259}
260
261#[derive(Debug, Default, Clone)]
262pub struct Modules {
263 pub modules: HashMap<String, Module>,
264}
265
266impl Modules {
267 pub fn extract(&self) -> Self {
268 Self {
269 modules: self.modules.clone(),
270 }
271 }
272
273 pub fn register(&mut self, module_data: ModuleData, sender: channel::Sender<ModulePackage>) {
274 let input_order = if let Value::Array(arr) = module_data.input_order {
275 arr.into_iter().map(|s| s.to_string()).collect()
276 } else {
277 Vec::new()
278 };
279
280 let module = Module {
281 sender,
282 params: ModuleParams {
283 with: module_data.with,
284 input: module_data.input,
285 output: module_data.output,
286 input_order,
287 },
288 };
289
290 self.modules.insert(module_data.name.to_string(), module);
291 }
292
293 pub async fn execute(
294 &self,
295 name: &str,
296 input: &Option<Value>,
297 payload: &Option<Value>,
298 ) -> Result<ModuleResponse, ModulesError> {
299 if let Some(module) = self.modules.get(name) {
300 let package_receiver = module.send(input.clone(), payload.clone());
301
302 let value = package_receiver.await.unwrap_or(ModuleResponse::from_error(
303 "Module response channel closed".to_string(),
304 ));
305
306 Ok(value)
307 } else {
308 Err(ModulesError::ModuleNotLoaded(name.to_string()))
309 }
310 }
311
312 pub fn extract_repositories(&self) -> Repositories {
313 let mut repositories = HashMap::new();
314
315 for (name, module) in self.modules.clone() {
316 let args = module.params.input_order.clone();
317 let func = move |value: Value| {
318 let package_receiver = module.send(Some(value), None);
319
320 async move {
321 let result = package_receiver.await.unwrap_or(ModuleResponse::from_error(
322 "Module response channel closed".to_string(),
323 ));
324
325 if let Some(error) = result.error {
326 Value::from(format!("Error: {}", error))
327 } else {
328 result.data
329 }
330 }
331 };
332
333 let repository_function = wrap_async_fn(name.clone(), func, args);
334
335 repositories.insert(name, repository_function);
336 }
337
338 Repositories { repositories }
339 }
340}