code0_flow/flow_definition/
mod.rs

1use tucana::{
2    aquila::{
3        DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
4        data_type_service_client::DataTypeServiceClient,
5        flow_type_service_client::FlowTypeServiceClient,
6        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
7    },
8    shared::{DataType, FlowType, RuntimeFunctionDefinition},
9};
10
11pub struct FlowUpdateService {
12    aquila_url: String,
13    data_types: Vec<DataType>,
14    runtime_definitions: Vec<RuntimeFunctionDefinition>,
15    flow_types: Vec<FlowType>,
16}
17
18impl FlowUpdateService {
19    pub fn from_url(aquila_url: String) -> Self {
20        Self {
21            aquila_url,
22            data_types: Vec::new(),
23            runtime_definitions: Vec::new(),
24            flow_types: Vec::new(),
25        }
26    }
27
28    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
29        self.flow_types = flow_types;
30        self
31    }
32
33    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
34        self.data_types = data_types;
35        self
36    }
37
38    pub fn with_runtime_definitions(
39        mut self,
40        runtime_definitions: Vec<RuntimeFunctionDefinition>,
41    ) -> Self {
42        self.runtime_definitions = runtime_definitions;
43        self
44    }
45
46    pub async fn send(&self) {
47        self.update_data_types().await;
48        self.update_runtime_definitions().await;
49        self.update_flow_types().await;
50    }
51
52    async fn update_data_types(&self) {
53        if self.data_types.is_empty() {
54            log::info!("No data types to update");
55            return;
56        }
57
58        log::info!("Updating the current DataTypes!");
59        let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
60            Ok(client) => {
61                log::info!("Successfully connected to the DataTypeService");
62                client
63            }
64            Err(err) => {
65                log::error!("Failed to connect to the DataTypeService: {:?}", err);
66                return;
67            }
68        };
69
70        let request = DataTypeUpdateRequest {
71            data_types: self.data_types.clone(),
72        };
73
74        match client.update(request).await {
75            Ok(response) => {
76                log::info!(
77                    "Was the update of the DataTypes accepted by Sagittarius? {}",
78                    response.into_inner().success
79                );
80            }
81            Err(err) => {
82                log::error!("Failed to update data types: {:?}", err);
83            }
84        }
85    }
86
87    async fn update_runtime_definitions(&self) {
88        if self.runtime_definitions.is_empty() {
89            log::info!("No runtime definitions to update");
90            return;
91        }
92
93        log::info!("Updating the current RuntimeDefinitions!");
94        let mut client =
95            match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
96                Ok(client) => {
97                    log::info!("Connected to RuntimeFunctionDefinitionService");
98                    client
99                }
100                Err(err) => {
101                    log::error!(
102                        "Failed to connect to RuntimeFunctionDefinitionService: {:?}",
103                        err
104                    );
105                    return;
106                }
107            };
108
109        let request = RuntimeFunctionDefinitionUpdateRequest {
110            runtime_functions: self.runtime_definitions.clone(),
111        };
112
113        match client.update(request).await {
114            Ok(response) => {
115                log::info!(
116                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
117                    response.into_inner().success
118                );
119            }
120            Err(err) => {
121                log::error!("Failed to update runtime function definitions: {:?}", err);
122            }
123        }
124    }
125
126    async fn update_flow_types(&self) {
127        if self.flow_types.is_empty() {
128            log::info!("No FlowTypes to update!");
129            return;
130        }
131
132        log::info!("Updating the current FlowTypes!");
133        let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
134            Ok(client) => {
135                log::info!("Connected to FlowTypeService!");
136                client
137            }
138            Err(err) => {
139                log::error!("Failed to connect to FlowTypeService: {:?}", err);
140                return;
141            }
142        };
143
144        let request = FlowTypeUpdateRequest {
145            flow_types: self.flow_types.clone(),
146        };
147
148        match client.update(request).await {
149            Ok(response) => {
150                log::info!(
151                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
152                    response.into_inner().success
153                );
154            }
155            Err(err) => {
156                log::error!("Failed to update flow types: {:?}", err);
157            }
158        }
159    }
160}