code0_flow/flow_service/
mod.rs1use crate::{
2 flow_definition::Reader,
3 flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7 aquila::{
8 DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest,
9 RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient,
10 flow_type_service_client::FlowTypeServiceClient,
11 function_definition_service_client::FunctionDefinitionServiceClient,
12 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
13 },
14 shared::{
15 DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition,
16 },
17};
18
19pub mod auth;
20pub mod retry;
21
22pub struct FlowUpdateService {
23 data_types: Vec<DataType>,
24 runtime_functions: Vec<RuntimeFunctionDefinition>,
25 functions: Vec<FunctionDefinition>,
26 flow_types: Vec<FlowType>,
27 channel: Channel,
28 aquila_token: String,
29}
30
31impl FlowUpdateService {
32 pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
36 let mut data_types = Vec::new();
37 let mut runtime_functions = Vec::new();
38 let mut functions = Vec::new();
39 let mut flow_types = Vec::new();
40
41 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
42
43 let features = match reader.read_features() {
44 Ok(features) => features,
45 Err(error) => {
46 log::error!("Error occurred while reading definitions: {:?}", error);
47 panic!("Error occurred while reading definitions")
48 }
49 };
50
51 for feature in features {
52 data_types.append(&mut feature.data_types.clone());
53 flow_types.append(&mut feature.flow_types.clone());
54 runtime_functions.append(&mut feature.runtime_functions.clone());
55 functions.append(&mut feature.functions.clone());
56 }
57
58 let channel = create_channel_with_retry("Aquila", aquila_url).await;
59
60 Self {
61 data_types,
62 runtime_functions,
63 functions,
64 flow_types,
65 channel,
66 aquila_token,
67 }
68 }
69
70 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
71 self.flow_types = flow_types;
72 self
73 }
74
75 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
76 self.data_types = data_types;
77 self
78 }
79
80 pub fn with_runtime_functions(
81 mut self,
82 runtime_functions: Vec<RuntimeFunctionDefinition>,
83 ) -> Self {
84 self.runtime_functions = runtime_functions;
85 self
86 }
87
88 pub fn with_functions(mut self, functions: Vec<FunctionDefinition>) -> Self {
89 self.functions = functions;
90 self
91 }
92
93 pub async fn send(&self) {
94 self.update_data_types().await;
95 self.update_runtime_functions().await;
96 self.update_functions().await;
97 self.update_flow_types().await;
98 }
99
100 async fn update_data_types(&self) {
101 if self.data_types.is_empty() {
102 log::info!("No DataTypes present.");
103 return;
104 }
105
106 log::info!("Updating {} DataTypes.", self.data_types.len());
107 let mut client = DataTypeServiceClient::new(self.channel.clone());
108 let request = Request::from_parts(
109 get_authorization_metadata(&self.aquila_token),
110 Extensions::new(),
111 DataTypeUpdateRequest {
112 data_types: self.data_types.clone(),
113 },
114 );
115
116 match client.update(request).await {
117 Ok(response) => {
118 log::info!(
119 "Was the update of the DataTypes accepted by Sagittarius? {}",
120 response.into_inner().success
121 );
122 }
123 Err(err) => {
124 log::error!("Failed to update data types: {:?}", err);
125 }
126 }
127 }
128
129 async fn update_functions(&self) {
130 if self.functions.is_empty() {
131 log::info!("No FunctionDefinitions present.");
132 return;
133 }
134
135 log::info!("Updating {} FunctionDefinitions.", self.functions.len());
136 let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
137 let request = Request::from_parts(
138 get_authorization_metadata(&self.aquila_token),
139 Extensions::new(),
140 FunctionDefinitionUpdateRequest {
141 functions: self.functions.clone(),
142 },
143 );
144
145 match client.update(request).await {
146 Ok(response) => {
147 log::info!(
148 "Was the update of the FunctionDefinitions accepted by Sagittarius? {}",
149 response.into_inner().success
150 );
151 }
152 Err(err) => {
153 log::error!("Failed to update function definitions: {:?}", err);
154 }
155 }
156 }
157
158 async fn update_runtime_functions(&self) {
159 if self.runtime_functions.is_empty() {
160 log::info!("No RuntimeFunctionDefinitions present.");
161 return;
162 }
163
164 log::info!(
165 "Updating {} RuntimeFunctionDefinitions.",
166 self.runtime_functions.len()
167 );
168 let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
169 let request = Request::from_parts(
170 get_authorization_metadata(&self.aquila_token),
171 Extensions::new(),
172 RuntimeFunctionDefinitionUpdateRequest {
173 runtime_functions: self.runtime_functions.clone(),
174 },
175 );
176
177 match client.update(request).await {
178 Ok(response) => {
179 log::info!(
180 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
181 response.into_inner().success
182 );
183 }
184 Err(err) => {
185 log::error!("Failed to update runtime function definitions: {:?}", err);
186 }
187 }
188 }
189
190 async fn update_flow_types(&self) {
191 if self.flow_types.is_empty() {
192 log::info!("No FlowTypes present.");
193 return;
194 }
195
196 log::info!("Updating {} FlowTypes.", self.flow_types.len());
197 let mut client = FlowTypeServiceClient::new(self.channel.clone());
198 let request = Request::from_parts(
199 get_authorization_metadata(&self.aquila_token),
200 Extensions::new(),
201 FlowTypeUpdateRequest {
202 flow_types: self.flow_types.clone(),
203 },
204 );
205
206 match client.update(request).await {
207 Ok(response) => {
208 log::info!(
209 "Was the update of the FlowTypes accepted by Sagittarius? {}",
210 response.into_inner().success
211 );
212 }
213 Err(err) => {
214 log::error!("Failed to update flow types: {:?}", err);
215 }
216 }
217 }
218}