code0_flow/flow_service/
mod.rs1use crate::flow_definition::Reader;
2use tucana::{
3 aquila::{
4 DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
5 data_type_service_client::DataTypeServiceClient,
6 flow_type_service_client::FlowTypeServiceClient,
7 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
8 },
9 shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition},
10};
11
12pub struct FlowUpdateService {
13 aquila_url: String,
14 data_types: Vec<DataType>,
15 runtime_definitions: Vec<RuntimeFunctionDefinition>,
16 flow_types: Vec<FlowType>,
17}
18
19impl FlowUpdateService {
20 pub fn from_url(aquila_url: String, definition_path: &str) -> Self {
24 let mut data_types = Vec::new();
25 let mut runtime_definitions = Vec::new();
26 let mut flow_types = Vec::new();
27
28 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
29
30 let features = match reader.read_features() {
31 Ok(features) => features,
32 Err(error) => {
33 log::error!("Error occurred while reading definitions: {:?}", error);
34 panic!("Error occurred while reading definitions")
35 }
36 };
37
38 for feature in features {
39 data_types.append(&mut feature.data_types.clone());
40 flow_types.append(&mut feature.flow_types.clone());
41 runtime_definitions.append(&mut feature.functions.clone());
42 }
43
44 Self {
45 aquila_url,
46 data_types,
47 runtime_definitions,
48 flow_types,
49 }
50 }
51
52 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
53 self.flow_types = flow_types;
54 self
55 }
56
57 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
58 self.data_types = data_types;
59 self
60 }
61
62 pub fn with_runtime_definitions(
63 mut self,
64 runtime_definitions: Vec<RuntimeFunctionDefinition>,
65 ) -> Self {
66 self.runtime_definitions = runtime_definitions;
67 self
68 }
69
70 pub async fn send(&self) {
71 self.update_data_types().await;
72 self.update_runtime_definitions().await;
73 self.update_flow_types().await;
74 }
75
76 async fn update_data_types(&self) {
77 if self.data_types.is_empty() {
78 log::info!("No data types to update");
79 return;
80 }
81
82 log::info!("Updating the current DataTypes!");
83 let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
84 Ok(client) => {
85 log::info!("Successfully connected to the DataTypeService");
86 client
87 }
88 Err(err) => {
89 log::error!("Failed to connect to the DataTypeService: {:?}", err);
90 return;
91 }
92 };
93
94 let request = DataTypeUpdateRequest {
95 data_types: self.data_types.clone(),
96 };
97
98 match client.update(request).await {
99 Ok(response) => {
100 log::info!(
101 "Was the update of the DataTypes accepted by Sagittarius? {}",
102 response.into_inner().success
103 );
104 }
105 Err(err) => {
106 log::error!("Failed to update data types: {:?}", err);
107 }
108 }
109 }
110
111 async fn update_runtime_definitions(&self) {
112 if self.runtime_definitions.is_empty() {
113 log::info!("No runtime definitions to update");
114 return;
115 }
116
117 log::info!("Updating the current RuntimeDefinitions!");
118 let mut client =
119 match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
120 Ok(client) => {
121 log::info!("Connected to RuntimeFunctionDefinitionService");
122 client
123 }
124 Err(err) => {
125 log::error!(
126 "Failed to connect to RuntimeFunctionDefinitionService: {:?}",
127 err
128 );
129 return;
130 }
131 };
132
133 let request = RuntimeFunctionDefinitionUpdateRequest {
134 runtime_functions: self.runtime_definitions.clone(),
135 };
136
137 match client.update(request).await {
138 Ok(response) => {
139 log::info!(
140 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
141 response.into_inner().success
142 );
143 }
144 Err(err) => {
145 log::error!("Failed to update runtime function definitions: {:?}", err);
146 }
147 }
148 }
149
150 async fn update_flow_types(&self) {
151 if self.flow_types.is_empty() {
152 log::info!("No FlowTypes to update!");
153 return;
154 }
155
156 log::info!("Updating the current FlowTypes!");
157 let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
158 Ok(client) => {
159 log::info!("Connected to FlowTypeService!");
160 client
161 }
162 Err(err) => {
163 log::error!("Failed to connect to FlowTypeService: {:?}", err);
164 return;
165 }
166 };
167
168 let request = FlowTypeUpdateRequest {
169 flow_types: self.flow_types.clone(),
170 };
171
172 match client.update(request).await {
173 Ok(response) => {
174 log::info!(
175 "Was the update of the FlowTypes accepted by Sagittarius? {}",
176 response.into_inner().success
177 );
178 }
179 Err(err) => {
180 log::error!("Failed to update flow types: {:?}", err);
181 }
182 }
183 }
184}