// code0_flow/flow_service/mod.rs

1use crate::{
2    flow_definition::Reader,
3    flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7    aquila::{
8        DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest,
9        RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient,
10        flow_type_service_client::FlowTypeServiceClient,
11        function_definition_service_client::FunctionDefinitionServiceClient,
12        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
13    },
14    shared::{
15        DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition,
16    },
17};
18
19pub mod auth;
20pub mod retry;
21
22pub struct FlowUpdateService {
23    data_types: Vec<DataType>,
24    runtime_functions: Vec<RuntimeFunctionDefinition>,
25    functions: Vec<FunctionDefinition>,
26    flow_types: Vec<FlowType>,
27    channel: Channel,
28    aquila_token: String,
29}
30
31impl FlowUpdateService {
32    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
33    ///
34    /// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types.
35    pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
36        let mut data_types = Vec::new();
37        let mut runtime_functions = Vec::new();
38        let mut functions = Vec::new();
39        let mut flow_types = Vec::new();
40
41        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
42
43        let features = match reader.read_features() {
44            Ok(features) => features,
45            Err(error) => {
46                log::error!("Error occurred while reading definitions: {:?}", error);
47                panic!("Error occurred while reading definitions")
48            }
49        };
50
51        for feature in features {
52            data_types.append(&mut feature.data_types.clone());
53            flow_types.append(&mut feature.flow_types.clone());
54            runtime_functions.append(&mut feature.runtime_functions.clone());
55            functions.append(&mut feature.functions.clone());
56        }
57
58        let channel = create_channel_with_retry("Aquila", aquila_url).await;
59
60        Self {
61            data_types,
62            runtime_functions,
63            functions,
64            flow_types,
65            channel,
66            aquila_token,
67        }
68    }
69
70    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
71        self.flow_types = flow_types;
72        self
73    }
74
75    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
76        self.data_types = data_types;
77        self
78    }
79
80    pub fn with_runtime_functions(
81        mut self,
82        runtime_functions: Vec<RuntimeFunctionDefinition>,
83    ) -> Self {
84        self.runtime_functions = runtime_functions;
85        self
86    }
87
88    pub fn with_functions(mut self, functions: Vec<FunctionDefinition>) -> Self {
89        self.functions = functions;
90        self
91    }
92
93    pub async fn send(&self) {
94        self.update_data_types().await;
95        self.update_runtime_functions().await;
96        self.update_functions().await;
97        self.update_flow_types().await;
98    }
99
100    async fn update_data_types(&self) {
101        if self.data_types.is_empty() {
102            log::info!("No DataTypes present.");
103            return;
104        }
105
106        log::info!("Updating {} DataTypes.", self.data_types.len());
107        let mut client = DataTypeServiceClient::new(self.channel.clone());
108        let request = Request::from_parts(
109            get_authorization_metadata(&self.aquila_token),
110            Extensions::new(),
111            DataTypeUpdateRequest {
112                data_types: self.data_types.clone(),
113            },
114        );
115
116        match client.update(request).await {
117            Ok(response) => {
118                log::info!(
119                    "Was the update of the DataTypes accepted by Sagittarius? {}",
120                    response.into_inner().success
121                );
122            }
123            Err(err) => {
124                log::error!("Failed to update data types: {:?}", err);
125            }
126        }
127    }
128
129    async fn update_functions(&self) {
130        if self.functions.is_empty() {
131            log::info!("No FunctionDefinitions present.");
132            return;
133        }
134
135        log::info!("Updating {} FunctionDefinitions.", self.functions.len());
136        let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
137        let request = Request::from_parts(
138            get_authorization_metadata(&self.aquila_token),
139            Extensions::new(),
140            FunctionDefinitionUpdateRequest {
141                functions: self.functions.clone(),
142            },
143        );
144
145        match client.update(request).await {
146            Ok(response) => {
147                log::info!(
148                    "Was the update of the FunctionDefinitions accepted by Sagittarius? {}",
149                    response.into_inner().success
150                );
151            }
152            Err(err) => {
153                log::error!("Failed to update function definitions: {:?}", err);
154            }
155        }
156    }
157
158    async fn update_runtime_functions(&self) {
159        if self.runtime_functions.is_empty() {
160            log::info!("No RuntimeFunctionDefinitions present.");
161            return;
162        }
163
164        log::info!(
165            "Updating {} RuntimeFunctionDefinitions.",
166            self.runtime_functions.len()
167        );
168        let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
169        let request = Request::from_parts(
170            get_authorization_metadata(&self.aquila_token),
171            Extensions::new(),
172            RuntimeFunctionDefinitionUpdateRequest {
173                runtime_functions: self.runtime_functions.clone(),
174            },
175        );
176
177        match client.update(request).await {
178            Ok(response) => {
179                log::info!(
180                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
181                    response.into_inner().success
182                );
183            }
184            Err(err) => {
185                log::error!("Failed to update runtime function definitions: {:?}", err);
186            }
187        }
188    }
189
190    async fn update_flow_types(&self) {
191        if self.flow_types.is_empty() {
192            log::info!("No FlowTypes present.");
193            return;
194        }
195
196        log::info!("Updating {} FlowTypes.", self.flow_types.len());
197        let mut client = FlowTypeServiceClient::new(self.channel.clone());
198        let request = Request::from_parts(
199            get_authorization_metadata(&self.aquila_token),
200            Extensions::new(),
201            FlowTypeUpdateRequest {
202                flow_types: self.flow_types.clone(),
203            },
204        );
205
206        match client.update(request).await {
207            Ok(response) => {
208                log::info!(
209                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
210                    response.into_inner().success
211                );
212            }
213            Err(err) => {
214                log::error!("Failed to update flow types: {:?}", err);
215            }
216        }
217    }
218}