code0_flow/flow_definition/
mod.rs1use tucana::{
2 aquila::{
3 data_type_service_client::DataTypeServiceClient,
4 flow_type_service_client::FlowTypeServiceClient,
5 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
6 DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
7 },
8 shared::{DataType, FlowType, RuntimeFunctionDefinition},
9};
10
11pub struct FlowUpdateService {
12 aquila_url: String,
13 data_types: Vec<DataType>,
14 runtime_definitions: Vec<RuntimeFunctionDefinition>,
15 flow_types: Vec<FlowType>,
16}
17
18impl FlowUpdateService {
19 pub fn from_url(aquila_url: String) -> Self {
20 Self {
21 aquila_url,
22 data_types: Vec::new(),
23 runtime_definitions: Vec::new(),
24 flow_types: Vec::new(),
25 }
26 }
27
28 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
29 self.flow_types = flow_types;
30 self
31 }
32
33 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
34 self.data_types = data_types;
35 self
36 }
37
38 pub fn with_runtime_definitions(
39 mut self,
40 runtime_definitions: Vec<RuntimeFunctionDefinition>,
41 ) -> Self {
42 self.runtime_definitions = runtime_definitions;
43 self
44 }
45
46 pub async fn send(&self) {
47 self.update_data_types().await;
48 self.update_runtime_definitions().await;
49 self.update_flow_types().await;
50 }
51
52 async fn update_data_types(&self) {
53 if self.data_types.is_empty() {
54 return;
55 }
56
57 log::info!("Updating the current DataTypes!");
58 let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
59 Ok(client) => client,
60 Err(err) => {
61 log::error!("Failed to connect to DataTypeService: {}", err);
62 return;
63 }
64 };
65
66 let request = DataTypeUpdateRequest {
67 data_types: self.data_types.clone(),
68 };
69
70 match client.update(request).await {
71 Ok(response) => {
72 log::info!(
73 "Was the update of the DataTypes accepted by Sagittarius? {}",
74 response.into_inner().success
75 );
76 }
77 Err(err) => {
78 log::error!("Failed to update data types: {}", err);
79 }
80 }
81 }
82
83 async fn update_runtime_definitions(&self) {
84 if self.runtime_definitions.is_empty() {
85 return;
86 }
87
88 log::info!("Updating the current RuntimeDefinitions!");
89 let mut client =
90 match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
91 Ok(client) => client,
92 Err(err) => {
93 log::error!(
94 "Failed to connect to RuntimeFunctionDefinitionService: {}",
95 err
96 );
97 return;
98 }
99 };
100
101 let request = RuntimeFunctionDefinitionUpdateRequest {
102 runtime_functions: self.runtime_definitions.clone(),
103 };
104
105 match client.update(request).await {
106 Ok(response) => {
107 log::info!(
108 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
109 response.into_inner().success
110 );
111 }
112 Err(err) => {
113 log::error!("Failed to update runtime function definitions: {}", err);
114 }
115 }
116 }
117
118 async fn update_flow_types(&self) {
119 if self.flow_types.is_empty() {
120 return;
121 }
122
123 log::info!("Updating the current FlowTypes!");
124 let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
125 Ok(client) => client,
126 Err(err) => {
127 log::error!("Failed to connect to FlowTypeService: {}", err);
128 return;
129 }
130 };
131
132 let request = FlowTypeUpdateRequest {
133 flow_types: self.flow_types.clone(),
134 };
135
136 match client.update(request).await {
137 Ok(response) => {
138 log::info!(
139 "Was the update of the FlowTypes accepted by Sagittarius? {}",
140 response.into_inner().success
141 );
142 }
143 Err(err) => {
144 log::error!("Failed to update flow types: {}", err);
145 }
146 }
147 }
148}