phlow_sdk/structs/
modules.rs

1use crossbeam::channel;
2use phs::{wrap_async_fn, Repositories};
3use std::{collections::HashMap, fmt::Display};
4use tokio::sync::oneshot::{self, Receiver};
5use valu3::{prelude::*, value::Value};
6
7pub enum Error {
8    VersionNotFound(String),
9    ModuleLoaderError(String),
10    ModuleNotFound(String),
11}
12
13impl std::fmt::Debug for Error {
14    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15        match self {
16            Error::VersionNotFound(module) => write!(f, "Version not found for module: {}", module),
17            Error::ModuleLoaderError(err) => write!(f, "Module loader error: {}", err),
18            Error::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
19        }
20    }
21}
22
23impl Display for Error {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            Error::VersionNotFound(module) => write!(f, "Version not found for module: {}", module),
27            Error::ModuleLoaderError(err) => write!(f, "Module loader error: {}", err),
28            Error::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
29        }
30    }
31}
32
33#[derive(ToValue, FromValue, Clone, Debug)]
34pub struct ModuleData {
35    pub version: String,
36    pub repository: Option<String>,
37    pub repository_path: Option<String>,
38    pub repository_raw_content: Option<String>,
39    pub module: String,
40    pub name: String,
41    pub with: Value,
42    pub input: Value,
43    pub output: Value,
44    pub input_order: Value,
45    pub local_path: Option<String>,
46}
47
48impl ModuleData {
49    pub fn set_info(&mut self, info: Value) {
50        if info.is_null() || info.is_null() {
51            return;
52        }
53
54        let input = match info.get("input") {
55            Some(input) => {
56                if let Value::Object(obj) = input {
57                    if let Some(obj_type) = obj.get("type") {
58                        if obj_type.to_string() == "object" {
59                            obj.get("properties").unwrap_or(&Value::Null).clone()
60                        } else {
61                            input.clone()
62                        }
63                    } else {
64                        input.clone()
65                    }
66                } else {
67                    input.clone()
68                }
69            }
70            None => Value::Null,
71        };
72
73        let output = match info.get("output") {
74            Some(output) => output.clone(),
75            None => Value::Null,
76        };
77
78        let input_order = match info.get("input_order") {
79            Some(input_order) => input_order.clone(),
80            None => Value::Null,
81        };
82
83        self.input = input;
84        self.output = output;
85        self.input_order = input_order;
86    }
87}
88
89impl TryFrom<Value> for ModuleData {
90    type Error = Error;
91
92    fn try_from(value: Value) -> Result<Self, Error> {
93        let module = match value.get("module") {
94            Some(module) => module.to_string(),
95            None => return Err(Error::ModuleLoaderError("Module not found".to_string())),
96        };
97        let repository = value.get("repository").map(|v| v.to_string());
98
99        // Check if module is a local path
100        let is_local_path =
101            module.starts_with("./") || module.starts_with("../") || module.starts_with("/");
102        let local_path = if is_local_path {
103            Some(module.clone())
104        } else {
105            None
106        };
107
108        let repository_path = if repository.is_none() && !is_local_path {
109            let mut padded = module.to_string();
110            while padded.len() < 4 {
111                padded.push('_');
112            }
113
114            let prefix = &padded[0..2];
115            let middle = &padded[2..4];
116
117            let repository = format!("{}/{}/{}", prefix, middle, module);
118            Some(repository)
119        } else {
120            None
121        };
122
123        let repository_raw_content = value.get("repository_raw_content").map(|v| v.to_string());
124
125        let version = match value.get("version") {
126            Some(version) => version.to_string(),
127            None => "latest".to_string(),
128        };
129
130        let name = match value.get("name") {
131            Some(name) => name.to_string(),
132            None => {
133                if is_local_path {
134                    // Extract module name from path
135                    std::path::Path::new(&module)
136                        .file_name()
137                        .and_then(|os_str| os_str.to_str())
138                        .unwrap_or(&module)
139                        .to_string()
140                } else {
141                    module.clone()
142                }
143            }
144        };
145
146        let with = match value.get("with") {
147            Some(with) => with.clone(),
148            None => Value::Null,
149        };
150
151        Ok(ModuleData {
152            module,
153            repository,
154            version,
155            name,
156            with,
157            input: Value::Null,
158            output: Value::Null,
159            repository_path,
160            repository_raw_content,
161            input_order: Value::Null,
162            local_path,
163        })
164    }
165}
166
167#[derive(Debug, Clone)]
168pub enum ModulesError {
169    ModuleNotFound(String),
170    ModuleNotLoaded(String),
171    ModuleError(String),
172}
173
174impl Display for ModulesError {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        match self {
177            ModulesError::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
178            ModulesError::ModuleNotLoaded(name) => write!(f, "Module not loaded: {}", name),
179            ModulesError::ModuleError(err) => write!(f, "Module error: {}", err),
180        }
181    }
182}
183
184#[derive(Debug, Clone)]
185pub struct ModuleResponse {
186    pub error: Option<String>,
187    pub data: Value,
188}
189
190impl Into<ModuleResponse> for Value {
191    fn into(self) -> ModuleResponse {
192        ModuleResponse {
193            error: None,
194            data: self,
195        }
196    }
197}
198
199impl ModuleResponse {
200    pub fn from_error(error: String) -> Self {
201        Self {
202            error: Some(error),
203            data: Value::Null,
204        }
205    }
206
207    pub fn from_success(value: Value) -> Self {
208        Self {
209            error: None,
210            data: value,
211        }
212    }
213}
214
215#[derive(Debug)]
216pub struct ModulePackage {
217    pub input: Option<Value>,
218    pub payload: Option<Value>,
219    pub sender: oneshot::Sender<ModuleResponse>,
220}
221
222impl ModulePackage {
223    pub fn input(&self) -> Option<Value> {
224        self.input.clone()
225    }
226
227    pub fn payload(&self) -> Option<Value> {
228        self.payload.clone()
229    }
230}
231
232#[derive(Debug, Default, Clone)]
233pub struct ModuleParams {
234    pub with: Value,
235    pub input: Value,
236    pub output: Value,
237    pub input_order: Vec<String>,
238}
239
240#[derive(Debug, Clone)]
241pub struct Module {
242    pub sender: channel::Sender<ModulePackage>,
243    pub params: ModuleParams,
244}
245
246impl Module {
247    pub fn send(&self, input: Option<Value>, payload: Option<Value>) -> Receiver<ModuleResponse> {
248        let (package_sender, package_receiver) = oneshot::channel();
249        let package = ModulePackage {
250            input,
251            payload,
252            sender: package_sender,
253        };
254
255        let _ = self.sender.send(package);
256
257        package_receiver
258    }
259}
260
261#[derive(Debug, Default, Clone)]
262pub struct Modules {
263    pub modules: HashMap<String, Module>,
264}
265
266impl Modules {
267    pub fn extract(&self) -> Self {
268        Self {
269            modules: self.modules.clone(),
270        }
271    }
272
273    pub fn register(&mut self, module_data: ModuleData, sender: channel::Sender<ModulePackage>) {
274        let input_order = if let Value::Array(arr) = module_data.input_order {
275            arr.into_iter().map(|s| s.to_string()).collect()
276        } else {
277            Vec::new()
278        };
279
280        let module = Module {
281            sender,
282            params: ModuleParams {
283                with: module_data.with,
284                input: module_data.input,
285                output: module_data.output,
286                input_order,
287            },
288        };
289
290        self.modules.insert(module_data.name.to_string(), module);
291    }
292
293    pub async fn execute(
294        &self,
295        name: &str,
296        input: &Option<Value>,
297        payload: &Option<Value>,
298    ) -> Result<ModuleResponse, ModulesError> {
299        if let Some(module) = self.modules.get(name) {
300            let package_receiver = module.send(input.clone(), payload.clone());
301
302            let value = package_receiver.await.unwrap_or(ModuleResponse::from_error(
303                "Module response channel closed".to_string(),
304            ));
305
306            Ok(value)
307        } else {
308            Err(ModulesError::ModuleNotLoaded(name.to_string()))
309        }
310    }
311
312    pub fn extract_repositories(&self) -> Repositories {
313        let mut repositories = HashMap::new();
314
315        for (name, module) in self.modules.clone() {
316            let args = module.params.input_order.clone();
317            let func = move |value: Value| {
318                let package_receiver = module.send(Some(value), None);
319
320                async move {
321                    let result = package_receiver.await.unwrap_or(ModuleResponse::from_error(
322                        "Module response channel closed".to_string(),
323                    ));
324
325                    if let Some(error) = result.error {
326                        Value::from(format!("Error: {}", error))
327                    } else {
328                        result.data
329                    }
330                }
331            };
332
333            let repository_function = wrap_async_fn(name.clone(), func, args);
334
335            repositories.insert(name, repository_function);
336        }
337
338        Repositories { repositories }
339    }
340}