code0_flow/flow_service/
mod.rs1use crate::{
2 flow_definition::Reader,
3 flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7 aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient},
8 shared::{Module, ModuleDefinition},
9};
10
11pub mod auth;
12pub mod retry;
13
14pub struct FlowUpdateService {
15 modules: Vec<Module>,
16 channel: Channel,
17 aquila_token: String,
18 definition_source: Option<String>,
19}
20
21pub struct ModuleDefinitionAppendix {
22 pub module_identifier: String,
23 pub definitions: Vec<ModuleDefinition>,
24}
25
26impl FlowUpdateService {
27 pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
32 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
33 let modules = match reader.read_modules() {
34 Ok(modules) => modules,
35 Err(error) => {
36 log::error!("Error occurred while reading definitions: {:?}", error);
37 panic!("Error occurred while reading definitions")
38 }
39 };
40
41 let channel = create_channel_with_retry("Aquila", aquila_url).await;
42
43 Self {
44 modules,
45 channel,
46 aquila_token,
47 definition_source: None,
48 }
49 }
50
51 pub fn with_appendix(mut self, appendices: Vec<ModuleDefinitionAppendix>) -> Self {
52 append_definitions_to_matching_modules(&mut self.modules, appendices);
53 self
54 }
55
56 pub fn with_definition_source(mut self, source: String) -> Self {
57 self.definition_source = Some(source);
58 self
59 }
60
61 pub async fn send(&mut self) {
62 let _ = self.send_with_status().await;
63 }
64
65 pub async fn send_with_status(&mut self) -> bool {
66 self.update().await
67 }
68
69 async fn update(&mut self) -> bool {
70 if self.modules.is_empty() {
71 log::info!("No Modules are present, aborting update.");
72 return true;
73 }
74
75 let mut modules = self.modules.clone();
76 if let Some(source) = &self.definition_source {
77 modules = modules
78 .into_iter()
79 .map(|module| apply_definition_source_to_module(module, source.clone()))
80 .collect::<Vec<_>>();
81 }
82
83 log::info!("Updating {} Modules.", self.modules.len());
84 let mut client = ModuleServiceClient::new(self.channel.clone());
85 let request = Request::from_parts(
86 get_authorization_metadata(&self.aquila_token),
87 Extensions::new(),
88 ModuleUpdateRequest { modules },
89 );
90
91 match client.update(request).await {
92 Ok(response) => {
93 let res = response.into_inner();
94
95 match res.success {
96 true => log::info!("Module definition update has been successful"),
97 false => log::warn!("Module definition update has been unsuccessful"),
98 };
99
100 res.success
101 }
102 Err(err) => {
103 log::error!("Module definition update failed. Reason: {:?}", err);
104 false
105 }
106 }
107 }
108}
109
110fn append_definitions_to_matching_modules(
111 modules: &mut [Module],
112 appendices: Vec<ModuleDefinitionAppendix>,
113) {
114 for appendix in appendices {
115 for module in modules.iter_mut() {
116 if module.identifier == appendix.module_identifier {
117 module.definitions.extend(appendix.definitions.clone());
118 }
119 }
120 }
121}
122
123fn apply_definition_source_to_module(mut module: Module, source: String) -> Module {
124 for data_type in &mut module.definition_data_types {
125 data_type.definition_source = source.clone();
126 }
127 for flow_type in &mut module.flow_types {
128 flow_type.definition_source = Some(source.clone());
129 }
130 for runtime_flow_type in &mut module.runtime_flow_types {
131 runtime_flow_type.definition_source = Some(source.clone());
132 }
133 for function in &mut module.function_definitions {
134 function.definition_source = source.clone();
135 }
136 for runtime_function in &mut module.runtime_function_definitions {
137 runtime_function.definition_source = source.clone();
138 }
139
140 module
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146
147 fn module(identifier: &str) -> Module {
148 Module {
149 identifier: identifier.to_string(),
150 name: Vec::new(),
151 description: Vec::new(),
152 documentation: String::new(),
153 author: String::new(),
154 icon: String::new(),
155 version: String::new(),
156 flow_types: Vec::new(),
157 runtime_flow_types: Vec::new(),
158 function_definitions: Vec::new(),
159 runtime_function_definitions: Vec::new(),
160 definition_data_types: Vec::new(),
161 configurations: Vec::new(),
162 definitions: Vec::new(),
163 }
164 }
165
166 fn definition(identifier: &str) -> ModuleDefinition {
167 ModuleDefinition {
168 flow_type_identifier: vec![identifier.to_string()],
169 value: None,
170 }
171 }
172
173 #[test]
174 fn appends_appendix_definitions_to_all_modules_with_same_identifier() {
175 let mut modules = vec![module("shared"), module("other"), module("shared")];
176 let appendices = vec![ModuleDefinitionAppendix {
177 module_identifier: "shared".to_string(),
178 definitions: vec![definition("flow")],
179 }];
180
181 append_definitions_to_matching_modules(&mut modules, appendices);
182
183 assert_eq!(modules[0].definitions.len(), 1);
184 assert_eq!(modules[1].definitions.len(), 0);
185 assert_eq!(modules[2].definitions.len(), 1);
186 }
187
188 #[test]
189 fn appends_multiple_matching_appendices() {
190 let mut modules = vec![module("shared")];
191 let appendices = vec![
192 ModuleDefinitionAppendix {
193 module_identifier: "shared".to_string(),
194 definitions: vec![definition("flow-a")],
195 },
196 ModuleDefinitionAppendix {
197 module_identifier: "shared".to_string(),
198 definitions: vec![definition("flow-b")],
199 },
200 ModuleDefinitionAppendix {
201 module_identifier: "other".to_string(),
202 definitions: vec![definition("flow-c")],
203 },
204 ];
205
206 append_definitions_to_matching_modules(&mut modules, appendices);
207
208 assert_eq!(modules[0].definitions.len(), 2);
209 assert_eq!(
210 modules[0].definitions[0].flow_type_identifier,
211 vec!["flow-a".to_string()]
212 );
213 assert_eq!(
214 modules[0].definitions[1].flow_type_identifier,
215 vec!["flow-b".to_string()]
216 );
217 }
218}