phlow_sdk/structs/
modules.rs

1use crossbeam::channel;
2use phs::{wrap_async_fn, Repositories};
3use std::{collections::HashMap, fmt::Display};
4use tokio::sync::oneshot::{self, Receiver};
5use valu3::{prelude::*, value::Value};
6
7pub enum Error {
8    VersionNotFound(String),
9    ModuleLoaderError(String),
10    ModuleNotFound(String),
11}
12
13impl std::fmt::Debug for Error {
14    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15        match self {
16            Error::VersionNotFound(module) => write!(f, "Version not found for module: {}", module),
17            Error::ModuleLoaderError(err) => write!(f, "Module loader error: {}", err),
18            Error::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
19        }
20    }
21}
22
23impl Display for Error {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            Error::VersionNotFound(module) => write!(f, "Version not found for module: {}", module),
27            Error::ModuleLoaderError(err) => write!(f, "Module loader error: {}", err),
28            Error::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
29        }
30    }
31}
32
33#[derive(ToValue, FromValue, Clone, Debug)]
34pub struct ModuleData {
35    pub version: String,
36    pub repository: Option<String>,
37    pub repository_path: Option<String>,
38    pub repository_raw_content: Option<String>,
39    pub module: String,
40    pub name: String,
41    pub with: Value,
42    pub input: Value,
43    pub output: Value,
44    pub input_order: Value,
45}
46
47impl ModuleData {
48    pub fn set_info(&mut self, info: Value) {
49        let input = match info.get("input") {
50            Some(input) => {
51                if let Value::Object(obj) = input {
52                    if let Some(obj_type) = obj.get("type") {
53                        if obj_type.to_string() == "object" {
54                            obj.get("properties").unwrap_or(&Value::Null).clone()
55                        } else {
56                            input.clone()
57                        }
58                    } else {
59                        input.clone()
60                    }
61                } else {
62                    input.clone()
63                }
64            }
65            None => Value::Null,
66        };
67
68        let output = match info.get("output") {
69            Some(output) => output.clone(),
70            None => Value::Null,
71        };
72
73        let input_order = match info.get("input_order") {
74            Some(input_order) => input_order.clone(),
75            None => Value::Null,
76        };
77
78        self.input = input;
79        self.output = output;
80        self.input_order = input_order;
81    }
82}
83
84impl TryFrom<Value> for ModuleData {
85    type Error = Error;
86
87    fn try_from(value: Value) -> Result<Self, Error> {
88        let module = match value.get("module") {
89            Some(module) => module.to_string(),
90            None => return Err(Error::ModuleLoaderError("Module not found".to_string())),
91        };
92        let repository = value.get("repository").map(|v| v.to_string());
93
94        let repository_path = if repository.is_none() {
95            let mut padded = module.to_string();
96            while padded.len() < 4 {
97                padded.push('_');
98            }
99
100            let prefix = &padded[0..2];
101            let middle = &padded[2..4];
102
103            let repository = format!("{}/{}/{}", prefix, middle, module);
104            Some(repository)
105        } else {
106            None
107        };
108
109        let repository_raw_content = value.get("repository_raw_content").map(|v| v.to_string());
110
111        let version = match value.get("version") {
112            Some(version) => version.to_string(),
113            None => return Err(Error::VersionNotFound(module.clone())),
114        };
115
116        let name = match value.get("name") {
117            Some(name) => name.to_string(),
118            None => module.clone(),
119        };
120
121        let with = match value.get("with") {
122            Some(with) => with.clone(),
123            None => Value::Null,
124        };
125
126        Ok(ModuleData {
127            module,
128            repository,
129            version,
130            name,
131            with,
132            input: Value::Null,
133            output: Value::Null,
134            repository_path,
135            repository_raw_content,
136            input_order: Value::Null,
137        })
138    }
139}
140
141#[derive(Debug, Clone)]
142pub enum ModulesError {
143    ModuleNotFound(String),
144    ModuleNotLoaded(String),
145    ModuleError(String),
146}
147
148impl Display for ModulesError {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        match self {
151            ModulesError::ModuleNotFound(name) => write!(f, "Module not found: {}", name),
152            ModulesError::ModuleNotLoaded(name) => write!(f, "Module not loaded: {}", name),
153            ModulesError::ModuleError(err) => write!(f, "Module error: {}", err),
154        }
155    }
156}
157
158#[derive(Debug, Clone)]
159pub struct ModuleResponse {
160    pub error: Option<String>,
161    pub data: Value,
162}
163
164impl Into<ModuleResponse> for Value {
165    fn into(self) -> ModuleResponse {
166        ModuleResponse {
167            error: None,
168            data: self,
169        }
170    }
171}
172
173impl ModuleResponse {
174    pub fn from_error(error: String) -> Self {
175        Self {
176            error: Some(error),
177            data: Value::Null,
178        }
179    }
180
181    pub fn from_success(value: Value) -> Self {
182        Self {
183            error: None,
184            data: value,
185        }
186    }
187}
188
189#[derive(Debug)]
190pub struct ModulePackage {
191    pub input: Option<Value>,
192    pub sender: oneshot::Sender<ModuleResponse>,
193}
194
195impl ModulePackage {
196    pub fn input(&self) -> Option<Value> {
197        self.input.clone()
198    }
199}
200
201#[derive(Debug, Default, Clone)]
202pub struct ModuleParams {
203    pub with: Value,
204    pub input: Value,
205    pub output: Value,
206    pub input_order: Vec<String>,
207}
208
209#[derive(Debug, Clone)]
210pub struct Module {
211    pub sender: channel::Sender<ModulePackage>,
212    pub params: ModuleParams,
213}
214
215impl Module {
216    pub fn send(&self, input: Option<Value>) -> Receiver<ModuleResponse> {
217        let (package_sender, package_receiver) = oneshot::channel();
218        let package = ModulePackage {
219            input,
220            sender: package_sender,
221        };
222
223        let _ = self.sender.send(package);
224
225        package_receiver
226    }
227}
228
229#[derive(Debug, Default, Clone)]
230pub struct Modules {
231    pub modules: HashMap<String, Module>,
232}
233
234impl Modules {
235    pub fn extract(&self) -> Self {
236        Self {
237            modules: self.modules.clone(),
238        }
239    }
240
241    pub fn register(&mut self, module_data: ModuleData, sender: channel::Sender<ModulePackage>) {
242        let input_order = if let Value::Array(arr) = module_data.input_order {
243            arr.into_iter().map(|s| s.to_string()).collect()
244        } else {
245            Vec::new()
246        };
247
248        let module = Module {
249            sender,
250            params: ModuleParams {
251                with: module_data.with,
252                input: module_data.input,
253                output: module_data.output,
254                input_order,
255            },
256        };
257
258        self.modules.insert(module_data.name.to_string(), module);
259    }
260
261    pub async fn execute(
262        &self,
263        name: &str,
264        input: &Option<Value>,
265    ) -> Result<ModuleResponse, ModulesError> {
266        if let Some(module) = self.modules.get(name) {
267            let package_receiver = module.send(input.clone());
268
269            let value = package_receiver.await.unwrap_or(ModuleResponse::from_error(
270                "Module response channel closed".to_string(),
271            ));
272
273            Ok(value)
274        } else {
275            Err(ModulesError::ModuleNotLoaded(name.to_string()))
276        }
277    }
278
279    pub fn extract_repositories(&self) -> Repositories {
280        let mut repositories = HashMap::new();
281
282        for (name, module) in self.modules.clone() {
283            let args = module.params.input_order.clone();
284            let func = move |value: Value| {
285                let package_receiver = module.send(Some(value));
286
287                async move {
288                    let result = package_receiver.await.unwrap_or(ModuleResponse::from_error(
289                        "Module response channel closed".to_string(),
290                    ));
291
292                    if let Some(error) = result.error {
293                        Value::from(format!("Error: {}", error))
294                    } else {
295                        result.data
296                    }
297                }
298            };
299
300            let repository_function = wrap_async_fn(name.clone(), func, args);
301
302            repositories.insert(name, repository_function);
303        }
304
305        Repositories { repositories }
306    }
307}