Skip to main content

code0_flow/flow_service/
mod.rs

1use crate::{
2    flow_definition::Reader,
3    flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7    aquila::{
8        DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest,
9        RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient,
10        flow_type_service_client::FlowTypeServiceClient,
11        function_definition_service_client::FunctionDefinitionServiceClient,
12        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
13    },
14    shared::{
15        DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition,
16    },
17};
18
/// Authorization helpers; provides `get_authorization_metadata`, used to build request metadata.
pub mod auth;
/// Connection helpers; provides `create_channel_with_retry`, used to open the Aquila channel.
pub mod retry;
21
/// Holds a set of definitions together with the channel and token needed to send them.
///
/// The definitions are transmitted by `send` / `send_with_status`, one update request per
/// definition kind.
pub struct FlowUpdateService {
    /// Data type definitions sent in a `DataTypeUpdateRequest`.
    data_types: Vec<DataType>,
    /// Runtime function definitions sent in a `RuntimeFunctionDefinitionUpdateRequest`.
    runtime_functions: Vec<RuntimeFunctionDefinition>,
    /// Function definitions sent in a `FunctionDefinitionUpdateRequest`.
    functions: Vec<FunctionDefinition>,
    /// Flow types sent in a `FlowTypeUpdateRequest`.
    flow_types: Vec<FlowType>,
    /// Channel created by `create_channel_with_retry`; cloned for every service client.
    channel: Channel,
    /// Token handed to `get_authorization_metadata` for every outgoing request.
    aquila_token: String,
}
30
31impl FlowUpdateService {
32    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
33    ///
34    /// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types.
35    pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
36        let mut data_types = Vec::new();
37        let mut runtime_functions = Vec::new();
38        let mut functions = Vec::new();
39        let mut flow_types = Vec::new();
40
41        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
42
43        let features = match reader.read_features() {
44            Ok(features) => features,
45            Err(error) => {
46                log::error!("Error occurred while reading definitions: {:?}", error);
47                panic!("Error occurred while reading definitions")
48            }
49        };
50
51        for feature in features {
52            data_types.append(&mut feature.data_types.clone());
53            flow_types.append(&mut feature.flow_types.clone());
54            runtime_functions.append(&mut feature.runtime_functions.clone());
55            functions.append(&mut feature.functions.clone());
56        }
57
58        let channel = create_channel_with_retry("Aquila", aquila_url).await;
59
60        Self {
61            data_types,
62            runtime_functions,
63            functions,
64            flow_types,
65            channel,
66            aquila_token,
67        }
68    }
69
70    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
71        self.flow_types = flow_types;
72        self
73    }
74
75    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
76        self.data_types = data_types;
77        self
78    }
79
80    pub fn with_runtime_functions(
81        mut self,
82        runtime_functions: Vec<RuntimeFunctionDefinition>,
83    ) -> Self {
84        self.runtime_functions = runtime_functions;
85        self
86    }
87
88    pub fn with_functions(mut self, functions: Vec<FunctionDefinition>) -> Self {
89        self.functions = functions;
90        self
91    }
92
93    pub async fn send(&self) {
94        let _ = self.send_with_status().await;
95    }
96
97    pub async fn send_with_status(&self) -> bool {
98        let data_types_success = self.update_data_types().await;
99        let runtime_functions_success = self.update_runtime_functions().await;
100        let functions_success = self.update_functions().await;
101        let flow_types_success = self.update_flow_types().await;
102        data_types_success && runtime_functions_success && functions_success && flow_types_success
103    }
104
105    async fn update_data_types(&self) -> bool {
106        if self.data_types.is_empty() {
107            log::info!("No DataTypes present.");
108            return true;
109        }
110
111        log::info!("Updating {} DataTypes.", self.data_types.len());
112        let mut client = DataTypeServiceClient::new(self.channel.clone());
113        let request = Request::from_parts(
114            get_authorization_metadata(&self.aquila_token),
115            Extensions::new(),
116            DataTypeUpdateRequest {
117                data_types: self.data_types.clone(),
118            },
119        );
120
121        match client.update(request).await {
122            Ok(response) => {
123                let res = response.into_inner();
124                log::info!(
125                    "Was the update of the DataTypes accepted by Sagittarius? {}",
126                    res.success
127                );
128
129                res.success
130            }
131            Err(err) => {
132                log::error!("Failed to update data types: {:?}", err);
133                false
134            }
135        }
136    }
137
138    async fn update_functions(&self) -> bool {
139        if self.functions.is_empty() {
140            log::info!("No FunctionDefinitions present.");
141            return true;
142        }
143
144        log::info!("Updating {} FunctionDefinitions.", self.functions.len());
145        let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
146        let request = Request::from_parts(
147            get_authorization_metadata(&self.aquila_token),
148            Extensions::new(),
149            FunctionDefinitionUpdateRequest {
150                functions: self.functions.clone(),
151            },
152        );
153
154        match client.update(request).await {
155            Ok(response) => {
156                let res = response.into_inner();
157                log::info!(
158                    "Was the update of the FunctionDefinitions accepted by Sagittarius? {}",
159                    res.success
160                );
161                res.success
162            }
163            Err(err) => {
164                log::error!("Failed to update function definitions: {:?}", err);
165                false
166            }
167        }
168    }
169
170    async fn update_runtime_functions(&self) -> bool {
171        if self.runtime_functions.is_empty() {
172            log::info!("No RuntimeFunctionDefinitions present.");
173            return true;
174        }
175
176        log::info!(
177            "Updating {} RuntimeFunctionDefinitions.",
178            self.runtime_functions.len()
179        );
180        let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
181        let request = Request::from_parts(
182            get_authorization_metadata(&self.aquila_token),
183            Extensions::new(),
184            RuntimeFunctionDefinitionUpdateRequest {
185                runtime_functions: self.runtime_functions.clone(),
186            },
187        );
188
189        match client.update(request).await {
190            Ok(response) => {
191                let res = response.into_inner();
192                log::info!(
193                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
194                    res.success
195                );
196                res.success
197            }
198            Err(err) => {
199                log::error!("Failed to update runtime function definitions: {:?}", err);
200                false
201            }
202        }
203    }
204
205    async fn update_flow_types(&self) -> bool {
206        if self.flow_types.is_empty() {
207            log::info!("No FlowTypes present.");
208            return true;
209        }
210
211        log::info!("Updating {} FlowTypes.", self.flow_types.len());
212        let mut client = FlowTypeServiceClient::new(self.channel.clone());
213        let request = Request::from_parts(
214            get_authorization_metadata(&self.aquila_token),
215            Extensions::new(),
216            FlowTypeUpdateRequest {
217                flow_types: self.flow_types.clone(),
218            },
219        );
220
221        match client.update(request).await {
222            Ok(response) => {
223                let res = response.into_inner();
224                log::info!(
225                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
226                    res.success
227                );
228                res.success
229            }
230            Err(err) => {
231                log::error!("Failed to update flow types: {:?}", err);
232                false
233            }
234        }
235    }
236}