code0_flow/flow_definition/
mod.rs1use tucana::{
2 aquila::{
3 DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
4 data_type_service_client::DataTypeServiceClient,
5 flow_type_service_client::FlowTypeServiceClient,
6 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
7 },
8 shared::{DataType, FlowType, RuntimeFunctionDefinition},
9};
10
11pub struct FlowUpdateService {
12 aquila_url: String,
13 data_types: Vec<DataType>,
14 runtime_definitions: Vec<RuntimeFunctionDefinition>,
15 flow_types: Vec<FlowType>,
16}
17
18impl FlowUpdateService {
19 pub fn from_url(aquila_url: String) -> Self {
20 Self {
21 aquila_url,
22 data_types: Vec::new(),
23 runtime_definitions: Vec::new(),
24 flow_types: Vec::new(),
25 }
26 }
27
28 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
29 self.flow_types = flow_types;
30 self
31 }
32
33 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
34 self.data_types = data_types;
35 self
36 }
37
38 pub fn with_runtime_definitions(
39 mut self,
40 runtime_definitions: Vec<RuntimeFunctionDefinition>,
41 ) -> Self {
42 self.runtime_definitions = runtime_definitions;
43 self
44 }
45
46 pub async fn send(&self) {
47 self.update_data_types().await;
48 self.update_runtime_definitions().await;
49 self.update_flow_types().await;
50 }
51
52 async fn update_data_types(&self) {
53 if self.data_types.is_empty() {
54 log::info!("No data types to update");
55 return;
56 }
57
58 log::info!("Updating the current DataTypes!");
59 let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
60 Ok(client) => {
61 log::info!("Successfully connected to the DataTypeService");
62 client
63 }
64 Err(err) => {
65 log::error!("Failed to connect to the DataTypeService: {:?}", err);
66 return;
67 }
68 };
69
70 let request = DataTypeUpdateRequest {
71 data_types: self.data_types.clone(),
72 };
73
74 match client.update(request).await {
75 Ok(response) => {
76 log::info!(
77 "Was the update of the DataTypes accepted by Sagittarius? {}",
78 response.into_inner().success
79 );
80 }
81 Err(err) => {
82 log::error!("Failed to update data types: {:?}", err);
83 }
84 }
85 }
86
87 async fn update_runtime_definitions(&self) {
88 if self.runtime_definitions.is_empty() {
89 log::info!("No runtime definitions to update");
90 return;
91 }
92
93 log::info!("Updating the current RuntimeDefinitions!");
94 let mut client =
95 match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
96 Ok(client) => {
97 log::info!("Connected to RuntimeFunctionDefinitionService");
98 client
99 }
100 Err(err) => {
101 log::error!(
102 "Failed to connect to RuntimeFunctionDefinitionService: {:?}",
103 err
104 );
105 return;
106 }
107 };
108
109 let request = RuntimeFunctionDefinitionUpdateRequest {
110 runtime_functions: self.runtime_definitions.clone(),
111 };
112
113 match client.update(request).await {
114 Ok(response) => {
115 log::info!(
116 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
117 response.into_inner().success
118 );
119 }
120 Err(err) => {
121 log::error!("Failed to update runtime function definitions: {:?}", err);
122 }
123 }
124 }
125
126 async fn update_flow_types(&self) {
127 if self.flow_types.is_empty() {
128 log::info!("No FlowTypes to update!");
129 return;
130 }
131
132 log::info!("Updating the current FlowTypes!");
133 let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
134 Ok(client) => {
135 log::info!("Connected to FlowTypeService!");
136 client
137 }
138 Err(err) => {
139 log::error!("Failed to connect to FlowTypeService: {:?}", err);
140 return;
141 }
142 };
143
144 let request = FlowTypeUpdateRequest {
145 flow_types: self.flow_types.clone(),
146 };
147
148 match client.update(request).await {
149 Ok(response) => {
150 log::info!(
151 "Was the update of the FlowTypes accepted by Sagittarius? {}",
152 response.into_inner().success
153 );
154 }
155 Err(err) => {
156 log::error!("Failed to update flow types: {:?}", err);
157 }
158 }
159 }
160}