Skip to main content

code0_flow/flow_service/
mod.rs

1use crate::{
2    flow_definition::Reader,
3    flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tokio::time::Duration;
6use tonic::{Extensions, Request, transport::Channel};
7use tucana::{
8    aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient},
9    shared::{Module, ModuleDefinition},
10};
11
12pub mod auth;
13pub mod retry;
14
15pub struct FlowUpdateService {
16    modules: Vec<Module>,
17    channel: Channel,
18    aquila_token: String,
19    definition_source: Option<String>,
20}
21
22pub struct ModuleDefinitionAppendix {
23    pub module_identifier: String,
24    pub definitions: Vec<ModuleDefinition>,
25}
26
27impl FlowUpdateService {
28    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
29    ///
30    /// This reads the definition files from the given path as modules and initializes the
31    /// service with those module definitions.
32    pub async fn from_url(
33        aquila_url: String,
34        definition_path: &str,
35        aquila_token: String,
36        connect_timeout: Duration,
37        request_timeout: Duration,
38    ) -> Self {
39        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
40        let modules = match reader.read_modules() {
41            Ok(modules) => modules,
42            Err(error) => {
43                log::error!("Error occurred while reading definitions: {:?}", error);
44                panic!("Error occurred while reading definitions")
45            }
46        };
47
48        let channel =
49            create_channel_with_retry("Aquila", aquila_url, connect_timeout, request_timeout).await;
50
51        Self {
52            modules,
53            channel,
54            aquila_token,
55            definition_source: None,
56        }
57    }
58
59    pub fn with_appendix(mut self, appendices: Vec<ModuleDefinitionAppendix>) -> Self {
60        append_definitions_to_matching_modules(&mut self.modules, appendices);
61        self
62    }
63
64    pub fn with_definition_source(mut self, source: String) -> Self {
65        self.definition_source = Some(source);
66        self
67    }
68
69    pub async fn send(&mut self) {
70        let _ = self.send_with_status().await;
71    }
72
73    pub async fn send_with_status(&mut self) -> bool {
74        self.update().await
75    }
76
77    async fn update(&mut self) -> bool {
78        if self.modules.is_empty() {
79            log::info!("No Modules are present, aborting update.");
80            return true;
81        }
82
83        let mut modules = self.modules.clone();
84        if let Some(source) = &self.definition_source {
85            modules = modules
86                .into_iter()
87                .map(|module| apply_definition_source_to_module(module, source.clone()))
88                .collect::<Vec<_>>();
89        }
90
91        log::info!("Updating {} Modules.", self.modules.len());
92        let mut client = ModuleServiceClient::new(self.channel.clone());
93        let request = Request::from_parts(
94            get_authorization_metadata(&self.aquila_token),
95            Extensions::new(),
96            ModuleUpdateRequest { modules },
97        );
98
99        match client.update(request).await {
100            Ok(response) => {
101                let res = response.into_inner();
102
103                match res.success {
104                    true => log::info!("Module definition update has been successful"),
105                    false => log::warn!("Module definition update has been unsuccessful"),
106                };
107
108                res.success
109            }
110            Err(err) => {
111                log::error!("Module definition update failed. Reason: {:?}", err);
112                false
113            }
114        }
115    }
116}
117
118fn append_definitions_to_matching_modules(
119    modules: &mut [Module],
120    appendices: Vec<ModuleDefinitionAppendix>,
121) {
122    for appendix in appendices {
123        for module in modules.iter_mut() {
124            if module.identifier == appendix.module_identifier {
125                module.definitions.extend(appendix.definitions.clone());
126            }
127        }
128    }
129}
130
131fn apply_definition_source_to_module(mut module: Module, source: String) -> Module {
132    for data_type in &mut module.definition_data_types {
133        data_type.definition_source = source.clone();
134    }
135    for flow_type in &mut module.flow_types {
136        flow_type.definition_source = Some(source.clone());
137    }
138    for runtime_flow_type in &mut module.runtime_flow_types {
139        runtime_flow_type.definition_source = Some(source.clone());
140    }
141    for function in &mut module.function_definitions {
142        function.definition_source = source.clone();
143    }
144    for runtime_function in &mut module.runtime_function_definitions {
145        runtime_function.definition_source = source.clone();
146    }
147
148    module
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a module that is empty apart from its identifier.
    fn empty_module(identifier: &str) -> Module {
        Module {
            identifier: identifier.to_string(),
            name: Vec::new(),
            description: Vec::new(),
            documentation: String::new(),
            author: String::new(),
            icon: String::new(),
            version: String::new(),
            flow_types: Vec::new(),
            runtime_flow_types: Vec::new(),
            function_definitions: Vec::new(),
            runtime_function_definitions: Vec::new(),
            definition_data_types: Vec::new(),
            configurations: Vec::new(),
            definitions: Vec::new(),
        }
    }

    /// Builds a definition without a value that references a single flow type.
    fn flow_definition(flow_type: &str) -> ModuleDefinition {
        ModuleDefinition {
            flow_type_identifier: vec![flow_type.to_string()],
            value: None,
        }
    }

    /// Builds an appendix for `module_identifier` holding one definition for `flow_type`.
    fn appendix_for(module_identifier: &str, flow_type: &str) -> ModuleDefinitionAppendix {
        ModuleDefinitionAppendix {
            module_identifier: module_identifier.to_string(),
            definitions: vec![flow_definition(flow_type)],
        }
    }

    #[test]
    fn appends_appendix_definitions_to_all_modules_with_same_identifier() {
        let mut modules: Vec<Module> = ["shared", "other", "shared"]
            .iter()
            .map(|identifier| empty_module(identifier))
            .collect();

        append_definitions_to_matching_modules(&mut modules, vec![appendix_for("shared", "flow")]);

        // Both "shared" modules receive the definition, "other" stays empty.
        let definition_counts: Vec<usize> = modules
            .iter()
            .map(|module| module.definitions.len())
            .collect();
        assert_eq!(definition_counts, vec![1, 0, 1]);
    }

    #[test]
    fn appends_multiple_matching_appendices() {
        let mut modules = vec![empty_module("shared")];
        let appendices = vec![
            appendix_for("shared", "flow-a"),
            appendix_for("shared", "flow-b"),
            appendix_for("other", "flow-c"),
        ];

        append_definitions_to_matching_modules(&mut modules, appendices);

        // Only the two "shared" appendices are applied, in the order they were given.
        let appended: Vec<Vec<String>> = modules[0]
            .definitions
            .iter()
            .map(|definition| definition.flow_type_identifier.clone())
            .collect();
        assert_eq!(
            appended,
            vec![vec!["flow-a".to_string()], vec!["flow-b".to_string()]]
        );
    }
}