code0_flow/flow_definition/
mod.rs

1use tucana::{
2    aquila::{
3        data_type_service_client::DataTypeServiceClient,
4        flow_type_service_client::FlowTypeServiceClient,
5        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
6        DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
7    },
8    shared::{DataType, FlowType, RuntimeFunctionDefinition},
9};
10
/// Holds a target URL together with the definition lists that `send`
/// forwards to the corresponding service clients.
///
/// Instances start out with empty lists (see `from_url`) and are filled in
/// through the `with_*` builder methods.
pub struct FlowUpdateService {
    /// URL passed to every service client's `connect` call.
    aquila_url: String,
    /// Data types placed into the `DataTypeUpdateRequest`.
    data_types: Vec<DataType>,
    /// Runtime function definitions placed into the
    /// `RuntimeFunctionDefinitionUpdateRequest`.
    runtime_definitions: Vec<RuntimeFunctionDefinition>,
    /// Flow types placed into the `FlowTypeUpdateRequest`.
    flow_types: Vec<FlowType>,
}
17
18impl FlowUpdateService {
19    pub fn from_url(aquila_url: String) -> Self {
20        Self {
21            aquila_url,
22            data_types: Vec::new(),
23            runtime_definitions: Vec::new(),
24            flow_types: Vec::new(),
25        }
26    }
27
28    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
29        self.flow_types = flow_types;
30        self
31    }
32
33    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
34        self.data_types = data_types;
35        self
36    }
37
38    pub fn with_runtime_definitions(
39        mut self,
40        runtime_definitions: Vec<RuntimeFunctionDefinition>,
41    ) -> Self {
42        self.runtime_definitions = runtime_definitions;
43        self
44    }
45
46    pub async fn send(&self) {
47        self.update_data_types().await;
48        self.update_runtime_definitions().await;
49        self.update_flow_types().await;
50    }
51
52    async fn update_data_types(&self) {
53        if self.data_types.is_empty() {
54            return;
55        }
56
57        log::info!("Updating the current DataTypes!");
58        let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
59            Ok(client) => client,
60            Err(err) => {
61                log::error!("Failed to connect to DataTypeService: {}", err);
62                return;
63            }
64        };
65
66        let request = DataTypeUpdateRequest {
67            data_types: self.data_types.clone(),
68        };
69
70        match client.update(request).await {
71            Ok(response) => {
72                log::info!(
73                    "Was the update of the DataTypes accepted by Sagittarius? {}",
74                    response.into_inner().success
75                );
76            }
77            Err(err) => {
78                log::error!("Failed to update data types: {}", err);
79            }
80        }
81    }
82
83    async fn update_runtime_definitions(&self) {
84        if self.runtime_definitions.is_empty() {
85            return;
86        }
87
88        log::info!("Updating the current RuntimeDefinitions!");
89        let mut client =
90            match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
91                Ok(client) => client,
92                Err(err) => {
93                    log::error!(
94                        "Failed to connect to RuntimeFunctionDefinitionService: {}",
95                        err
96                    );
97                    return;
98                }
99            };
100
101        let request = RuntimeFunctionDefinitionUpdateRequest {
102            runtime_functions: self.runtime_definitions.clone(),
103        };
104
105        match client.update(request).await {
106            Ok(response) => {
107                log::info!(
108                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
109                    response.into_inner().success
110                );
111            }
112            Err(err) => {
113                log::error!("Failed to update runtime function definitions: {}", err);
114            }
115        }
116    }
117
118    async fn update_flow_types(&self) {
119        if self.flow_types.is_empty() {
120            return;
121        }
122
123        log::info!("Updating the current FlowTypes!");
124        let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
125            Ok(client) => client,
126            Err(err) => {
127                log::error!("Failed to connect to FlowTypeService: {}", err);
128                return;
129            }
130        };
131
132        let request = FlowTypeUpdateRequest {
133            flow_types: self.flow_types.clone(),
134        };
135
136        match client.update(request).await {
137            Ok(response) => {
138                log::info!(
139                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
140                    response.into_inner().success
141                );
142            }
143            Err(err) => {
144                log::error!("Failed to update flow types: {}", err);
145            }
146        }
147    }
148}