code0_flow/flow_service/
mod.rs1use crate::{
2 flow_definition::Reader,
3 flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7 aquila::{
8 DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest,
9 RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient,
10 flow_type_service_client::FlowTypeServiceClient,
11 function_definition_service_client::FunctionDefinitionServiceClient,
12 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
13 },
14 shared::{
15 DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition,
16 },
17};
18
19pub mod auth;
20pub mod retry;
21
22pub struct FlowUpdateService {
23 data_types: Vec<DataType>,
24 runtime_functions: Vec<RuntimeFunctionDefinition>,
25 functions: Vec<FunctionDefinition>,
26 flow_types: Vec<FlowType>,
27 channel: Channel,
28 aquila_token: String,
29}
30
31impl FlowUpdateService {
32 pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
36 let mut data_types = Vec::new();
37 let mut runtime_functions = Vec::new();
38 let mut functions = Vec::new();
39 let mut flow_types = Vec::new();
40
41 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
42
43 let features = match reader.read_features() {
44 Ok(features) => features,
45 Err(error) => {
46 log::error!("Error occurred while reading definitions: {:?}", error);
47 panic!("Error occurred while reading definitions")
48 }
49 };
50
51 for feature in features {
52 data_types.append(&mut feature.data_types.clone());
53 flow_types.append(&mut feature.flow_types.clone());
54 runtime_functions.append(&mut feature.runtime_functions.clone());
55 functions.append(&mut feature.functions.clone());
56 }
57
58 let channel = create_channel_with_retry("Aquila", aquila_url).await;
59
60 Self {
61 data_types,
62 runtime_functions,
63 functions,
64 flow_types,
65 channel,
66 aquila_token,
67 }
68 }
69
70 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
71 self.flow_types = flow_types;
72 self
73 }
74
75 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
76 self.data_types = data_types;
77 self
78 }
79
80 pub fn with_runtime_functions(
81 mut self,
82 runtime_functions: Vec<RuntimeFunctionDefinition>,
83 ) -> Self {
84 self.runtime_functions = runtime_functions;
85 self
86 }
87
88 pub fn with_functions(mut self, functions: Vec<FunctionDefinition>) -> Self {
89 self.functions = functions;
90 self
91 }
92
93 pub async fn send(&self) {
94 let _ = self.send_with_status().await;
95 }
96
97 pub async fn send_with_status(&self) -> bool {
98 let data_types_success = self.update_data_types().await;
99 let runtime_functions_success = self.update_runtime_functions().await;
100 let functions_success = self.update_functions().await;
101 let flow_types_success = self.update_flow_types().await;
102 data_types_success && runtime_functions_success && functions_success && flow_types_success
103 }
104
105 async fn update_data_types(&self) -> bool {
106 if self.data_types.is_empty() {
107 log::info!("No DataTypes present.");
108 return true;
109 }
110
111 log::info!("Updating {} DataTypes.", self.data_types.len());
112 let mut client = DataTypeServiceClient::new(self.channel.clone());
113 let request = Request::from_parts(
114 get_authorization_metadata(&self.aquila_token),
115 Extensions::new(),
116 DataTypeUpdateRequest {
117 data_types: self.data_types.clone(),
118 },
119 );
120
121 match client.update(request).await {
122 Ok(response) => {
123 let res = response.into_inner();
124 log::info!(
125 "Was the update of the DataTypes accepted by Sagittarius? {}",
126 res.success
127 );
128
129 res.success
130 }
131 Err(err) => {
132 log::error!("Failed to update data types: {:?}", err);
133 false
134 }
135 }
136 }
137
138 async fn update_functions(&self) -> bool {
139 if self.functions.is_empty() {
140 log::info!("No FunctionDefinitions present.");
141 return true;
142 }
143
144 log::info!("Updating {} FunctionDefinitions.", self.functions.len());
145 let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
146 let request = Request::from_parts(
147 get_authorization_metadata(&self.aquila_token),
148 Extensions::new(),
149 FunctionDefinitionUpdateRequest {
150 functions: self.functions.clone(),
151 },
152 );
153
154 match client.update(request).await {
155 Ok(response) => {
156 let res = response.into_inner();
157 log::info!(
158 "Was the update of the FunctionDefinitions accepted by Sagittarius? {}",
159 res.success
160 );
161 res.success
162 }
163 Err(err) => {
164 log::error!("Failed to update function definitions: {:?}", err);
165 false
166 }
167 }
168 }
169
170 async fn update_runtime_functions(&self) -> bool {
171 if self.runtime_functions.is_empty() {
172 log::info!("No RuntimeFunctionDefinitions present.");
173 return true;
174 }
175
176 log::info!(
177 "Updating {} RuntimeFunctionDefinitions.",
178 self.runtime_functions.len()
179 );
180 let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
181 let request = Request::from_parts(
182 get_authorization_metadata(&self.aquila_token),
183 Extensions::new(),
184 RuntimeFunctionDefinitionUpdateRequest {
185 runtime_functions: self.runtime_functions.clone(),
186 },
187 );
188
189 match client.update(request).await {
190 Ok(response) => {
191 let res = response.into_inner();
192 log::info!(
193 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
194 res.success
195 );
196 res.success
197 }
198 Err(err) => {
199 log::error!("Failed to update runtime function definitions: {:?}", err);
200 false
201 }
202 }
203 }
204
205 async fn update_flow_types(&self) -> bool {
206 if self.flow_types.is_empty() {
207 log::info!("No FlowTypes present.");
208 return true;
209 }
210
211 log::info!("Updating {} FlowTypes.", self.flow_types.len());
212 let mut client = FlowTypeServiceClient::new(self.channel.clone());
213 let request = Request::from_parts(
214 get_authorization_metadata(&self.aquila_token),
215 Extensions::new(),
216 FlowTypeUpdateRequest {
217 flow_types: self.flow_types.clone(),
218 },
219 );
220
221 match client.update(request).await {
222 Ok(response) => {
223 let res = response.into_inner();
224 log::info!(
225 "Was the update of the FlowTypes accepted by Sagittarius? {}",
226 res.success
227 );
228 res.success
229 }
230 Err(err) => {
231 log::error!("Failed to update flow types: {:?}", err);
232 false
233 }
234 }
235 }
236}