Skip to main content

code0_flow/flow_service/
mod.rs

1use crate::{
2    flow_definition::Reader,
3    flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7    aquila::{
8        DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest,
9        RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient,
10        flow_type_service_client::FlowTypeServiceClient,
11        function_definition_service_client::FunctionDefinitionServiceClient,
12        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
13    },
14    shared::{
15        DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition,
16    },
17};
18
19pub mod auth;
20pub mod retry;
21
22pub struct FlowUpdateService {
23    data_types: Vec<DataType>,
24    runtime_functions: Vec<RuntimeFunctionDefinition>,
25    functions: Vec<FunctionDefinition>,
26    flow_types: Vec<FlowType>,
27    channel: Channel,
28    aquila_token: String,
29    definition_source: Option<String>,
30}
31
32impl FlowUpdateService {
33    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
34    ///
35    /// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types.
36    pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
37        let mut data_types = Vec::new();
38        let mut runtime_functions = Vec::new();
39        let mut functions = Vec::new();
40        let mut flow_types = Vec::new();
41
42        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
43
44        let features = match reader.read_features() {
45            Ok(features) => features,
46            Err(error) => {
47                log::error!("Error occurred while reading definitions: {:?}", error);
48                panic!("Error occurred while reading definitions")
49            }
50        };
51
52        for feature in features {
53            data_types.append(&mut feature.data_types.clone());
54            flow_types.append(&mut feature.flow_types.clone());
55            runtime_functions.append(&mut feature.runtime_functions.clone());
56            functions.append(&mut feature.functions.clone());
57        }
58
59        let channel = create_channel_with_retry("Aquila", aquila_url).await;
60
61        Self {
62            data_types,
63            runtime_functions,
64            functions,
65            flow_types,
66            channel,
67            aquila_token,
68            definition_source: None,
69        }
70    }
71
72    pub fn with_definition_source(mut self, source: String) -> Self {
73        self.definition_source = Some(source);
74        self
75    }
76
77    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
78        self.flow_types = flow_types;
79        self
80    }
81
82    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
83        self.data_types = data_types;
84        self
85    }
86
87    pub fn with_runtime_functions(
88        mut self,
89        runtime_functions: Vec<RuntimeFunctionDefinition>,
90    ) -> Self {
91        self.runtime_functions = runtime_functions;
92        self
93    }
94
95    pub fn with_functions(mut self, functions: Vec<FunctionDefinition>) -> Self {
96        self.functions = functions;
97        self
98    }
99
100    pub async fn send(&mut self) {
101        let _ = self.send_with_status().await;
102    }
103
104    pub async fn send_with_status(&mut self) -> bool {
105        let data_types_success = self.update_data_types().await;
106        let runtime_functions_success = self.update_runtime_functions().await;
107        let functions_success = self.update_functions().await;
108        let flow_types_success = self.update_flow_types().await;
109        data_types_success && runtime_functions_success && functions_success && flow_types_success
110    }
111
112    async fn update_data_types(&mut self) -> bool {
113        if self.data_types.is_empty() {
114            log::info!("No DataTypes present.");
115            return true;
116        }
117
118        if let Some(source) = &self.definition_source {
119            for data_type in self.data_types.iter_mut() {
120                data_type.definition_source = source.to_string();
121            }
122        }
123
124        log::info!("Updating {} DataTypes.", self.data_types.len());
125        let mut client = DataTypeServiceClient::new(self.channel.clone());
126        let request = Request::from_parts(
127            get_authorization_metadata(&self.aquila_token),
128            Extensions::new(),
129            DataTypeUpdateRequest {
130                data_types: self.data_types.clone(),
131            },
132        );
133
134        match client.update(request).await {
135            Ok(response) => {
136                let res = response.into_inner();
137                log::info!(
138                    "Was the update of the DataTypes accepted by Sagittarius? {}",
139                    res.success
140                );
141
142                res.success
143            }
144            Err(err) => {
145                log::error!("Failed to update data types: {:?}", err);
146                false
147            }
148        }
149    }
150
151    async fn update_functions(&mut self) -> bool {
152        if self.functions.is_empty() {
153            log::info!("No FunctionDefinitions present.");
154            return true;
155        }
156
157        if let Some(source) = &self.definition_source {
158            for function in self.functions.iter_mut() {
159                function.definition_source = source.to_string();
160            }
161        };
162
163        log::info!("Updating {} FunctionDefinitions.", self.functions.len());
164        let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
165        let request = Request::from_parts(
166            get_authorization_metadata(&self.aquila_token),
167            Extensions::new(),
168            FunctionDefinitionUpdateRequest {
169                functions: self.functions.clone(),
170            },
171        );
172
173        match client.update(request).await {
174            Ok(response) => {
175                let res = response.into_inner();
176                log::info!(
177                    "Was the update of the FunctionDefinitions accepted by Sagittarius? {}",
178                    res.success
179                );
180                res.success
181            }
182            Err(err) => {
183                log::error!("Failed to update function definitions: {:?}", err);
184                false
185            }
186        }
187    }
188
189    async fn update_runtime_functions(&mut self) -> bool {
190        if self.runtime_functions.is_empty() {
191            log::info!("No RuntimeFunctionDefinitions present.");
192            return true;
193        }
194
195        if let Some(source) = &self.definition_source {
196            for runtime_function in self.runtime_functions.iter_mut() {
197                runtime_function.definition_source = source.to_string();
198            }
199        }
200
201        log::info!(
202            "Updating {} RuntimeFunctionDefinitions.",
203            self.runtime_functions.len()
204        );
205        let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
206        let request = Request::from_parts(
207            get_authorization_metadata(&self.aquila_token),
208            Extensions::new(),
209            RuntimeFunctionDefinitionUpdateRequest {
210                runtime_functions: self.runtime_functions.clone(),
211            },
212        );
213
214        match client.update(request).await {
215            Ok(response) => {
216                let res = response.into_inner();
217                log::info!(
218                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
219                    res.success
220                );
221                res.success
222            }
223            Err(err) => {
224                log::error!("Failed to update runtime function definitions: {:?}", err);
225                false
226            }
227        }
228    }
229
230    async fn update_flow_types(&mut self) -> bool {
231        if self.flow_types.is_empty() {
232            log::info!("No FlowTypes present.");
233            return true;
234        }
235
236        if let Some(source) = &self.definition_source {
237            for flow_type in self.flow_types.iter_mut() {
238                flow_type.definition_source = Some(source.to_string());
239            }
240        }
241
242        log::info!("Updating {} FlowTypes.", self.flow_types.len());
243        let mut client = FlowTypeServiceClient::new(self.channel.clone());
244        let request = Request::from_parts(
245            get_authorization_metadata(&self.aquila_token),
246            Extensions::new(),
247            FlowTypeUpdateRequest {
248                flow_types: self.flow_types.clone(),
249            },
250        );
251
252        match client.update(request).await {
253            Ok(response) => {
254                let res = response.into_inner();
255                log::info!(
256                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
257                    res.success
258                );
259                res.success
260            }
261            Err(err) => {
262                log::error!("Failed to update flow types: {:?}", err);
263                false
264            }
265        }
266    }
267}