Skip to main content

code0_flow/flow_service/
mod.rs

1use crate::{
2    flow_definition::Reader,
3    flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7    aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient},
8    shared::{Module, ModuleDefinition},
9};
10
11pub mod auth;
12pub mod retry;
13
14pub struct FlowUpdateService {
15    modules: Vec<Module>,
16    channel: Channel,
17    aquila_token: String,
18    definition_source: Option<String>,
19}
20
21pub struct ModuleDefinitionAppendix {
22    pub module_identifier: String,
23    pub definitions: Vec<ModuleDefinition>,
24}
25
26impl FlowUpdateService {
27    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
28    ///
29    /// This reads the definition files from the given path as modules and initializes the
30    /// service with those module definitions.
31    pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
32        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
33        let modules = match reader.read_modules() {
34            Ok(modules) => modules,
35            Err(error) => {
36                log::error!("Error occurred while reading definitions: {:?}", error);
37                panic!("Error occurred while reading definitions")
38            }
39        };
40
41        let channel = create_channel_with_retry("Aquila", aquila_url).await;
42
43        Self {
44            modules,
45            channel,
46            aquila_token,
47            definition_source: None,
48        }
49    }
50
51    pub fn with_appendix(mut self, appendices: Vec<ModuleDefinitionAppendix>) -> Self {
52        append_definitions_to_matching_modules(&mut self.modules, appendices);
53        self
54    }
55
56    pub fn with_definition_source(mut self, source: String) -> Self {
57        self.definition_source = Some(source);
58        self
59    }
60
61    pub async fn send(&mut self) {
62        let _ = self.send_with_status().await;
63    }
64
65    pub async fn send_with_status(&mut self) -> bool {
66        self.update().await
67    }
68
69    async fn update(&mut self) -> bool {
70        if self.modules.is_empty() {
71            log::info!("No Modules are present, aborting update.");
72            return true;
73        }
74
75        let mut modules = self.modules.clone();
76        if let Some(source) = &self.definition_source {
77            modules = modules
78                .into_iter()
79                .map(|module| apply_definition_source_to_module(module, source.clone()))
80                .collect::<Vec<_>>();
81        }
82
83        log::info!("Updating {} Modules.", self.modules.len());
84        let mut client = ModuleServiceClient::new(self.channel.clone());
85        let request = Request::from_parts(
86            get_authorization_metadata(&self.aquila_token),
87            Extensions::new(),
88            ModuleUpdateRequest { modules },
89        );
90
91        match client.update(request).await {
92            Ok(response) => {
93                let res = response.into_inner();
94
95                match res.success {
96                    true => log::info!("Module definition update has been successful"),
97                    false => log::warn!("Module definition update has been unsuccessful"),
98                };
99
100                res.success
101            }
102            Err(err) => {
103                log::error!("Module definition update failed. Reason: {:?}", err);
104                false
105            }
106        }
107    }
108}
109
110fn append_definitions_to_matching_modules(
111    modules: &mut [Module],
112    appendices: Vec<ModuleDefinitionAppendix>,
113) {
114    for appendix in appendices {
115        for module in modules.iter_mut() {
116            if module.identifier == appendix.module_identifier {
117                module.definitions.extend(appendix.definitions.clone());
118            }
119        }
120    }
121}
122
123fn apply_definition_source_to_module(mut module: Module, source: String) -> Module {
124    for data_type in &mut module.definition_data_types {
125        data_type.definition_source = source.clone();
126    }
127    for flow_type in &mut module.flow_types {
128        flow_type.definition_source = Some(source.clone());
129    }
130    for runtime_flow_type in &mut module.runtime_flow_types {
131        runtime_flow_type.definition_source = Some(source.clone());
132    }
133    for function in &mut module.function_definitions {
134        function.definition_source = source.clone();
135    }
136    for runtime_function in &mut module.runtime_function_definitions {
137        runtime_function.definition_source = source.clone();
138    }
139
140    module
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146
147    fn module(identifier: &str) -> Module {
148        Module {
149            identifier: identifier.to_string(),
150            name: Vec::new(),
151            description: Vec::new(),
152            documentation: String::new(),
153            author: String::new(),
154            icon: String::new(),
155            version: String::new(),
156            flow_types: Vec::new(),
157            runtime_flow_types: Vec::new(),
158            function_definitions: Vec::new(),
159            runtime_function_definitions: Vec::new(),
160            definition_data_types: Vec::new(),
161            configurations: Vec::new(),
162            definitions: Vec::new(),
163        }
164    }
165
166    fn definition(identifier: &str) -> ModuleDefinition {
167        ModuleDefinition {
168            flow_type_identifier: vec![identifier.to_string()],
169            value: None,
170        }
171    }
172
173    #[test]
174    fn appends_appendix_definitions_to_all_modules_with_same_identifier() {
175        let mut modules = vec![module("shared"), module("other"), module("shared")];
176        let appendices = vec![ModuleDefinitionAppendix {
177            module_identifier: "shared".to_string(),
178            definitions: vec![definition("flow")],
179        }];
180
181        append_definitions_to_matching_modules(&mut modules, appendices);
182
183        assert_eq!(modules[0].definitions.len(), 1);
184        assert_eq!(modules[1].definitions.len(), 0);
185        assert_eq!(modules[2].definitions.len(), 1);
186    }
187
188    #[test]
189    fn appends_multiple_matching_appendices() {
190        let mut modules = vec![module("shared")];
191        let appendices = vec![
192            ModuleDefinitionAppendix {
193                module_identifier: "shared".to_string(),
194                definitions: vec![definition("flow-a")],
195            },
196            ModuleDefinitionAppendix {
197                module_identifier: "shared".to_string(),
198                definitions: vec![definition("flow-b")],
199            },
200            ModuleDefinitionAppendix {
201                module_identifier: "other".to_string(),
202                definitions: vec![definition("flow-c")],
203            },
204        ];
205
206        append_definitions_to_matching_modules(&mut modules, appendices);
207
208        assert_eq!(modules[0].definitions.len(), 2);
209        assert_eq!(
210            modules[0].definitions[0].flow_type_identifier,
211            vec!["flow-a".to_string()]
212        );
213        assert_eq!(
214            modules[0].definitions[1].flow_type_identifier,
215            vec!["flow-b".to_string()]
216        );
217    }
218}