code0_flow/flow_service/
mod.rs1use crate::{
2 flow_definition::Reader,
3 flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tokio::time::Duration;
6use tonic::{Extensions, Request, transport::Channel};
7use tucana::{
8 aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient},
9 shared::{Module, ModuleDefinition},
10};
11
12pub mod auth;
13pub mod retry;
14
15pub struct FlowUpdateService {
16 modules: Vec<Module>,
17 channel: Channel,
18 aquila_token: String,
19 definition_source: Option<String>,
20}
21
22pub struct ModuleDefinitionAppendix {
23 pub module_identifier: String,
24 pub definitions: Vec<ModuleDefinition>,
25}
26
27impl FlowUpdateService {
28 pub async fn from_url(
33 aquila_url: String,
34 definition_path: &str,
35 aquila_token: String,
36 connect_timeout: Duration,
37 request_timeout: Duration,
38 ) -> Self {
39 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
40 let modules = match reader.read_modules() {
41 Ok(modules) => modules,
42 Err(error) => {
43 log::error!("Error occurred while reading definitions: {:?}", error);
44 panic!("Error occurred while reading definitions")
45 }
46 };
47
48 let channel =
49 create_channel_with_retry("Aquila", aquila_url, connect_timeout, request_timeout).await;
50
51 Self {
52 modules,
53 channel,
54 aquila_token,
55 definition_source: None,
56 }
57 }
58
59 pub fn with_appendix(mut self, appendices: Vec<ModuleDefinitionAppendix>) -> Self {
60 append_definitions_to_matching_modules(&mut self.modules, appendices);
61 self
62 }
63
64 pub fn with_definition_source(mut self, source: String) -> Self {
65 self.definition_source = Some(source);
66 self
67 }
68
69 pub async fn send(&mut self) {
70 let _ = self.send_with_status().await;
71 }
72
73 pub async fn send_with_status(&mut self) -> bool {
74 self.update().await
75 }
76
77 async fn update(&mut self) -> bool {
78 if self.modules.is_empty() {
79 log::info!("No Modules are present, aborting update.");
80 return true;
81 }
82
83 let mut modules = self.modules.clone();
84 if let Some(source) = &self.definition_source {
85 modules = modules
86 .into_iter()
87 .map(|module| apply_definition_source_to_module(module, source.clone()))
88 .collect::<Vec<_>>();
89 }
90
91 log::info!("Updating {} Modules.", self.modules.len());
92 let mut client = ModuleServiceClient::new(self.channel.clone());
93 let request = Request::from_parts(
94 get_authorization_metadata(&self.aquila_token),
95 Extensions::new(),
96 ModuleUpdateRequest { modules },
97 );
98
99 match client.update(request).await {
100 Ok(response) => {
101 let res = response.into_inner();
102
103 match res.success {
104 true => log::info!("Module definition update has been successful"),
105 false => log::warn!("Module definition update has been unsuccessful"),
106 };
107
108 res.success
109 }
110 Err(err) => {
111 log::error!("Module definition update failed. Reason: {:?}", err);
112 false
113 }
114 }
115 }
116}
117
118fn append_definitions_to_matching_modules(
119 modules: &mut [Module],
120 appendices: Vec<ModuleDefinitionAppendix>,
121) {
122 for appendix in appendices {
123 for module in modules.iter_mut() {
124 if module.identifier == appendix.module_identifier {
125 module.definitions.extend(appendix.definitions.clone());
126 }
127 }
128 }
129}
130
131fn apply_definition_source_to_module(mut module: Module, source: String) -> Module {
132 for data_type in &mut module.definition_data_types {
133 data_type.definition_source = source.clone();
134 }
135 for flow_type in &mut module.flow_types {
136 flow_type.definition_source = Some(source.clone());
137 }
138 for runtime_flow_type in &mut module.runtime_flow_types {
139 runtime_flow_type.definition_source = Some(source.clone());
140 }
141 for function in &mut module.function_definitions {
142 function.definition_source = source.clone();
143 }
144 for runtime_function in &mut module.runtime_function_definitions {
145 runtime_function.definition_source = source.clone();
146 }
147
148 module
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a module that is empty apart from its identifier.
    fn module(identifier: &str) -> Module {
        Module {
            identifier: identifier.to_string(),
            name: Vec::new(),
            description: Vec::new(),
            documentation: String::new(),
            author: String::new(),
            icon: String::new(),
            version: String::new(),
            flow_types: Vec::new(),
            runtime_flow_types: Vec::new(),
            function_definitions: Vec::new(),
            runtime_function_definitions: Vec::new(),
            definition_data_types: Vec::new(),
            configurations: Vec::new(),
            definitions: Vec::new(),
        }
    }

    /// Builds a definition that references a single flow type and carries no value.
    fn definition(identifier: &str) -> ModuleDefinition {
        ModuleDefinition {
            flow_type_identifier: vec![identifier.to_string()],
            value: None,
        }
    }

    /// Builds an appendix for `module_identifier` holding one definition for `flow`.
    fn appendix(module_identifier: &str, flow: &str) -> ModuleDefinitionAppendix {
        ModuleDefinitionAppendix {
            module_identifier: module_identifier.to_string(),
            definitions: vec![definition(flow)],
        }
    }

    #[test]
    fn appends_appendix_definitions_to_all_modules_with_same_identifier() {
        let mut modules = vec![module("shared"), module("other"), module("shared")];

        append_definitions_to_matching_modules(&mut modules, vec![appendix("shared", "flow")]);

        let definition_counts: Vec<usize> = modules
            .iter()
            .map(|module| module.definitions.len())
            .collect();
        assert_eq!(definition_counts, vec![1, 0, 1]);
    }

    #[test]
    fn appends_multiple_matching_appendices() {
        let mut modules = vec![module("shared")];
        let appendices = vec![
            appendix("shared", "flow-a"),
            appendix("shared", "flow-b"),
            appendix("other", "flow-c"),
        ];

        append_definitions_to_matching_modules(&mut modules, appendices);

        // Only the two "shared" appendices apply, in the order they were given.
        let appended: Vec<Vec<String>> = modules[0]
            .definitions
            .iter()
            .map(|definition| definition.flow_type_identifier.clone())
            .collect();
        assert_eq!(
            appended,
            vec![vec!["flow-a".to_string()], vec!["flow-b".to_string()]]
        );
    }
}