code0_flow/flow_service/
mod.rs1use crate::{
2 flow_definition::Reader,
3 flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7 aquila::{
8 DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest,
9 RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient,
10 flow_type_service_client::FlowTypeServiceClient,
11 function_definition_service_client::FunctionDefinitionServiceClient,
12 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
13 },
14 shared::{
15 DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition,
16 },
17};
18
19pub mod auth;
20pub mod retry;
21
22pub struct FlowUpdateService {
23 data_types: Vec<DataType>,
24 runtime_functions: Vec<RuntimeFunctionDefinition>,
25 functions: Vec<FunctionDefinition>,
26 flow_types: Vec<FlowType>,
27 channel: Channel,
28 aquila_token: String,
29 definition_source: Option<String>,
30}
31
32impl FlowUpdateService {
33 pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
37 let mut data_types = Vec::new();
38 let mut runtime_functions = Vec::new();
39 let mut functions = Vec::new();
40 let mut flow_types = Vec::new();
41
42 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
43
44 let features = match reader.read_features() {
45 Ok(features) => features,
46 Err(error) => {
47 log::error!("Error occurred while reading definitions: {:?}", error);
48 panic!("Error occurred while reading definitions")
49 }
50 };
51
52 for feature in features {
53 data_types.append(&mut feature.data_types.clone());
54 flow_types.append(&mut feature.flow_types.clone());
55 runtime_functions.append(&mut feature.runtime_functions.clone());
56 functions.append(&mut feature.functions.clone());
57 }
58
59 let channel = create_channel_with_retry("Aquila", aquila_url).await;
60
61 Self {
62 data_types,
63 runtime_functions,
64 functions,
65 flow_types,
66 channel,
67 aquila_token,
68 definition_source: None,
69 }
70 }
71
72 pub fn with_definition_source(mut self, source: String) -> Self {
73 self.definition_source = Some(source);
74 self
75 }
76
77 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
78 self.flow_types = flow_types;
79 self
80 }
81
82 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
83 self.data_types = data_types;
84 self
85 }
86
87 pub fn with_runtime_functions(
88 mut self,
89 runtime_functions: Vec<RuntimeFunctionDefinition>,
90 ) -> Self {
91 self.runtime_functions = runtime_functions;
92 self
93 }
94
95 pub fn with_functions(mut self, functions: Vec<FunctionDefinition>) -> Self {
96 self.functions = functions;
97 self
98 }
99
100 pub async fn send(&mut self) {
101 let _ = self.send_with_status().await;
102 }
103
104 pub async fn send_with_status(&mut self) -> bool {
105 let data_types_success = self.update_data_types().await;
106 let runtime_functions_success = self.update_runtime_functions().await;
107 let functions_success = self.update_functions().await;
108 let flow_types_success = self.update_flow_types().await;
109 data_types_success && runtime_functions_success && functions_success && flow_types_success
110 }
111
112 async fn update_data_types(&mut self) -> bool {
113 if self.data_types.is_empty() {
114 log::info!("No DataTypes present.");
115 return true;
116 }
117
118 if let Some(source) = &self.definition_source {
119 for data_type in self.data_types.iter_mut() {
120 data_type.definition_source = source.to_string();
121 }
122 }
123
124 log::info!("Updating {} DataTypes.", self.data_types.len());
125 let mut client = DataTypeServiceClient::new(self.channel.clone());
126 let request = Request::from_parts(
127 get_authorization_metadata(&self.aquila_token),
128 Extensions::new(),
129 DataTypeUpdateRequest {
130 data_types: self.data_types.clone(),
131 },
132 );
133
134 match client.update(request).await {
135 Ok(response) => {
136 let res = response.into_inner();
137 log::info!(
138 "Was the update of the DataTypes accepted by Sagittarius? {}",
139 res.success
140 );
141
142 res.success
143 }
144 Err(err) => {
145 log::error!("Failed to update data types: {:?}", err);
146 false
147 }
148 }
149 }
150
151 async fn update_functions(&mut self) -> bool {
152 if self.functions.is_empty() {
153 log::info!("No FunctionDefinitions present.");
154 return true;
155 }
156
157 if let Some(source) = &self.definition_source {
158 for function in self.functions.iter_mut() {
159 function.definition_source = source.to_string();
160 }
161 };
162
163 log::info!("Updating {} FunctionDefinitions.", self.functions.len());
164 let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
165 let request = Request::from_parts(
166 get_authorization_metadata(&self.aquila_token),
167 Extensions::new(),
168 FunctionDefinitionUpdateRequest {
169 functions: self.functions.clone(),
170 },
171 );
172
173 match client.update(request).await {
174 Ok(response) => {
175 let res = response.into_inner();
176 log::info!(
177 "Was the update of the FunctionDefinitions accepted by Sagittarius? {}",
178 res.success
179 );
180 res.success
181 }
182 Err(err) => {
183 log::error!("Failed to update function definitions: {:?}", err);
184 false
185 }
186 }
187 }
188
189 async fn update_runtime_functions(&mut self) -> bool {
190 if self.runtime_functions.is_empty() {
191 log::info!("No RuntimeFunctionDefinitions present.");
192 return true;
193 }
194
195 if let Some(source) = &self.definition_source {
196 for runtime_function in self.runtime_functions.iter_mut() {
197 runtime_function.definition_source = source.to_string();
198 }
199 }
200
201 log::info!(
202 "Updating {} RuntimeFunctionDefinitions.",
203 self.runtime_functions.len()
204 );
205 let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
206 let request = Request::from_parts(
207 get_authorization_metadata(&self.aquila_token),
208 Extensions::new(),
209 RuntimeFunctionDefinitionUpdateRequest {
210 runtime_functions: self.runtime_functions.clone(),
211 },
212 );
213
214 match client.update(request).await {
215 Ok(response) => {
216 let res = response.into_inner();
217 log::info!(
218 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
219 res.success
220 );
221 res.success
222 }
223 Err(err) => {
224 log::error!("Failed to update runtime function definitions: {:?}", err);
225 false
226 }
227 }
228 }
229
230 async fn update_flow_types(&mut self) -> bool {
231 if self.flow_types.is_empty() {
232 log::info!("No FlowTypes present.");
233 return true;
234 }
235
236 if let Some(source) = &self.definition_source {
237 for flow_type in self.flow_types.iter_mut() {
238 flow_type.definition_source = Some(source.to_string());
239 }
240 }
241
242 log::info!("Updating {} FlowTypes.", self.flow_types.len());
243 let mut client = FlowTypeServiceClient::new(self.channel.clone());
244 let request = Request::from_parts(
245 get_authorization_metadata(&self.aquila_token),
246 Extensions::new(),
247 FlowTypeUpdateRequest {
248 flow_types: self.flow_types.clone(),
249 },
250 );
251
252 match client.update(request).await {
253 Ok(response) => {
254 let res = response.into_inner();
255 log::info!(
256 "Was the update of the FlowTypes accepted by Sagittarius? {}",
257 res.success
258 );
259 res.success
260 }
261 Err(err) => {
262 log::error!("Failed to update flow types: {:?}", err);
263 false
264 }
265 }
266 }
267}