code0_flow/flow_definition/
mod.rs1use tucana::{
2 aquila::{
3 DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
4 data_type_service_client::DataTypeServiceClient,
5 flow_type_service_client::FlowTypeServiceClient,
6 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
7 },
8 shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition},
9};
10
11pub struct FlowUpdateService {
12 aquila_url: String,
13 data_types: Vec<DataType>,
14 runtime_definitions: Vec<RuntimeFunctionDefinition>,
15 flow_types: Vec<FlowType>,
16}
17
18impl FlowUpdateService {
19 pub fn from_url(aquila_url: String, definition_path: &str) -> Self {
23 let mut data_types = Vec::new();
24 let mut runtime_definitions = Vec::new();
25 let mut flow_types = Vec::new();
26
27 let definitions = match code0_definition_reader::parser::Parser::from_path(definition_path)
28 {
29 Some(reader) => reader,
30 None => {
31 log::error!("No definition folder found at path: {}", definition_path);
32 return Self {
33 aquila_url,
34 data_types,
35 runtime_definitions,
36 flow_types,
37 };
38 }
39 };
40
41 for feature in definitions.features {
42 data_types.append(&mut feature.data_types.clone());
43 flow_types.append(&mut feature.flow_types.clone());
44 runtime_definitions.append(&mut feature.runtime_functions.clone());
45 }
46
47 Self {
48 aquila_url,
49 data_types,
50 runtime_definitions,
51 flow_types,
52 }
53 }
54
55 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
56 self.flow_types = flow_types;
57 self
58 }
59
60 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
61 self.data_types = data_types;
62 self
63 }
64
65 pub fn with_runtime_definitions(
66 mut self,
67 runtime_definitions: Vec<RuntimeFunctionDefinition>,
68 ) -> Self {
69 self.runtime_definitions = runtime_definitions;
70 self
71 }
72
73 pub async fn send(&self) {
74 self.update_data_types().await;
75 self.update_runtime_definitions().await;
76 self.update_flow_types().await;
77 }
78
79 async fn update_data_types(&self) {
80 if self.data_types.is_empty() {
81 log::info!("No data types to update");
82 return;
83 }
84
85 log::info!("Updating the current DataTypes!");
86 let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
87 Ok(client) => {
88 log::info!("Successfully connected to the DataTypeService");
89 client
90 }
91 Err(err) => {
92 log::error!("Failed to connect to the DataTypeService: {:?}", err);
93 return;
94 }
95 };
96
97 let request = DataTypeUpdateRequest {
98 data_types: self.data_types.clone(),
99 };
100
101 match client.update(request).await {
102 Ok(response) => {
103 log::info!(
104 "Was the update of the DataTypes accepted by Sagittarius? {}",
105 response.into_inner().success
106 );
107 }
108 Err(err) => {
109 log::error!("Failed to update data types: {:?}", err);
110 }
111 }
112 }
113
114 async fn update_runtime_definitions(&self) {
115 if self.runtime_definitions.is_empty() {
116 log::info!("No runtime definitions to update");
117 return;
118 }
119
120 log::info!("Updating the current RuntimeDefinitions!");
121 let mut client =
122 match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
123 Ok(client) => {
124 log::info!("Connected to RuntimeFunctionDefinitionService");
125 client
126 }
127 Err(err) => {
128 log::error!(
129 "Failed to connect to RuntimeFunctionDefinitionService: {:?}",
130 err
131 );
132 return;
133 }
134 };
135
136 let request = RuntimeFunctionDefinitionUpdateRequest {
137 runtime_functions: self.runtime_definitions.clone(),
138 };
139
140 match client.update(request).await {
141 Ok(response) => {
142 log::info!(
143 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
144 response.into_inner().success
145 );
146 }
147 Err(err) => {
148 log::error!("Failed to update runtime function definitions: {:?}", err);
149 }
150 }
151 }
152
153 async fn update_flow_types(&self) {
154 if self.flow_types.is_empty() {
155 log::info!("No FlowTypes to update!");
156 return;
157 }
158
159 log::info!("Updating the current FlowTypes!");
160 let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
161 Ok(client) => {
162 log::info!("Connected to FlowTypeService!");
163 client
164 }
165 Err(err) => {
166 log::error!("Failed to connect to FlowTypeService: {:?}", err);
167 return;
168 }
169 };
170
171 let request = FlowTypeUpdateRequest {
172 flow_types: self.flow_types.clone(),
173 };
174
175 match client.update(request).await {
176 Ok(response) => {
177 log::info!(
178 "Was the update of the FlowTypes accepted by Sagittarius? {}",
179 response.into_inner().success
180 );
181 }
182 Err(err) => {
183 log::error!("Failed to update flow types: {:?}", err);
184 }
185 }
186 }
187}