code0_flow/flow_service/
mod.rs

1use crate::flow_definition::Reader;
2use tucana::{
3    aquila::{
4        DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
5        data_type_service_client::DataTypeServiceClient,
6        flow_type_service_client::FlowTypeServiceClient,
7        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
8    },
9    shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition},
10};
11
12pub struct FlowUpdateService {
13    aquila_url: String,
14    data_types: Vec<DataType>,
15    runtime_definitions: Vec<RuntimeFunctionDefinition>,
16    flow_types: Vec<FlowType>,
17}
18
19impl FlowUpdateService {
20    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
21    ///
22    /// This will read the definition files from the given path and initialize the service with the data types, runtime definitions, and flow types.
23    pub fn from_url(aquila_url: String, definition_path: &str) -> Self {
24        let mut data_types = Vec::new();
25        let mut runtime_definitions = Vec::new();
26        let mut flow_types = Vec::new();
27
28        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
29
30        let features = match reader.read_features() {
31            Ok(features) => features,
32            Err(error) => {
33                log::error!("Error occurred while reading definitions: {:?}", error);
34                panic!("Error occurred while reading definitions")
35            }
36        };
37
38        for feature in features {
39            data_types.append(&mut feature.data_types.clone());
40            flow_types.append(&mut feature.flow_types.clone());
41            runtime_definitions.append(&mut feature.functions.clone());
42        }
43
44        Self {
45            aquila_url,
46            data_types,
47            runtime_definitions,
48            flow_types,
49        }
50    }
51
52    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
53        self.flow_types = flow_types;
54        self
55    }
56
57    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
58        self.data_types = data_types;
59        self
60    }
61
62    pub fn with_runtime_definitions(
63        mut self,
64        runtime_definitions: Vec<RuntimeFunctionDefinition>,
65    ) -> Self {
66        self.runtime_definitions = runtime_definitions;
67        self
68    }
69
70    pub async fn send(&self) {
71        self.update_data_types().await;
72        self.update_runtime_definitions().await;
73        self.update_flow_types().await;
74    }
75
76    async fn update_data_types(&self) {
77        if self.data_types.is_empty() {
78            log::info!("No data types to update");
79            return;
80        }
81
82        log::info!("Updating the current DataTypes!");
83        let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
84            Ok(client) => {
85                log::info!("Successfully connected to the DataTypeService");
86                client
87            }
88            Err(err) => {
89                log::error!("Failed to connect to the DataTypeService: {:?}", err);
90                return;
91            }
92        };
93
94        let request = DataTypeUpdateRequest {
95            data_types: self.data_types.clone(),
96        };
97
98        match client.update(request).await {
99            Ok(response) => {
100                log::info!(
101                    "Was the update of the DataTypes accepted by Sagittarius? {}",
102                    response.into_inner().success
103                );
104            }
105            Err(err) => {
106                log::error!("Failed to update data types: {:?}", err);
107            }
108        }
109    }
110
111    async fn update_runtime_definitions(&self) {
112        if self.runtime_definitions.is_empty() {
113            log::info!("No runtime definitions to update");
114            return;
115        }
116
117        log::info!("Updating the current RuntimeDefinitions!");
118        let mut client =
119            match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
120                Ok(client) => {
121                    log::info!("Connected to RuntimeFunctionDefinitionService");
122                    client
123                }
124                Err(err) => {
125                    log::error!(
126                        "Failed to connect to RuntimeFunctionDefinitionService: {:?}",
127                        err
128                    );
129                    return;
130                }
131            };
132
133        let request = RuntimeFunctionDefinitionUpdateRequest {
134            runtime_functions: self.runtime_definitions.clone(),
135        };
136
137        match client.update(request).await {
138            Ok(response) => {
139                log::info!(
140                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
141                    response.into_inner().success
142                );
143            }
144            Err(err) => {
145                log::error!("Failed to update runtime function definitions: {:?}", err);
146            }
147        }
148    }
149
150    async fn update_flow_types(&self) {
151        if self.flow_types.is_empty() {
152            log::info!("No FlowTypes to update!");
153            return;
154        }
155
156        log::info!("Updating the current FlowTypes!");
157        let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
158            Ok(client) => {
159                log::info!("Connected to FlowTypeService!");
160                client
161            }
162            Err(err) => {
163                log::error!("Failed to connect to FlowTypeService: {:?}", err);
164                return;
165            }
166        };
167
168        let request = FlowTypeUpdateRequest {
169            flow_types: self.flow_types.clone(),
170        };
171
172        match client.update(request).await {
173            Ok(response) => {
174                log::info!(
175                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
176                    response.into_inner().success
177                );
178            }
179            Err(err) => {
180                log::error!("Failed to update flow types: {:?}", err);
181            }
182        }
183    }
184}