code0_flow/flow_definition/
mod.rs

1use tucana::{
2    aquila::{
3        DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
4        data_type_service_client::DataTypeServiceClient,
5        flow_type_service_client::FlowTypeServiceClient,
6        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
7    },
8    shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition},
9};
10
11pub struct FlowUpdateService {
12    aquila_url: String,
13    data_types: Vec<DataType>,
14    runtime_definitions: Vec<RuntimeFunctionDefinition>,
15    flow_types: Vec<FlowType>,
16}
17
18impl FlowUpdateService {
19    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
20    ///
21    /// This will read the definition files from the given path and initialize the service with the data types, runtime definitions, and flow types.
22    pub fn from_url(aquila_url: String, definition_path: &str) -> Self {
23        let mut data_types = Vec::new();
24        let mut runtime_definitions = Vec::new();
25        let mut flow_types = Vec::new();
26
27        let definitions = match code0_definition_reader::parser::Parser::from_path(definition_path)
28        {
29            Some(reader) => reader,
30            None => {
31                log::error!("No definition folder found at path: {}", definition_path);
32                return Self {
33                    aquila_url,
34                    data_types,
35                    runtime_definitions,
36                    flow_types,
37                };
38            }
39        };
40
41        for feature in definitions.features {
42            data_types.append(&mut feature.data_types.clone());
43            flow_types.append(&mut feature.flow_types.clone());
44            runtime_definitions.append(&mut feature.runtime_functions.clone());
45        }
46
47        Self {
48            aquila_url,
49            data_types,
50            runtime_definitions,
51            flow_types,
52        }
53    }
54
55    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
56        self.flow_types = flow_types;
57        self
58    }
59
60    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
61        self.data_types = data_types;
62        self
63    }
64
65    pub fn with_runtime_definitions(
66        mut self,
67        runtime_definitions: Vec<RuntimeFunctionDefinition>,
68    ) -> Self {
69        self.runtime_definitions = runtime_definitions;
70        self
71    }
72
73    pub async fn send(&self) {
74        self.update_data_types().await;
75        self.update_runtime_definitions().await;
76        self.update_flow_types().await;
77    }
78
79    async fn update_data_types(&self) {
80        if self.data_types.is_empty() {
81            log::info!("No data types to update");
82            return;
83        }
84
85        log::info!("Updating the current DataTypes!");
86        let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
87            Ok(client) => {
88                log::info!("Successfully connected to the DataTypeService");
89                client
90            }
91            Err(err) => {
92                log::error!("Failed to connect to the DataTypeService: {:?}", err);
93                return;
94            }
95        };
96
97        let request = DataTypeUpdateRequest {
98            data_types: self.data_types.clone(),
99        };
100
101        match client.update(request).await {
102            Ok(response) => {
103                log::info!(
104                    "Was the update of the DataTypes accepted by Sagittarius? {}",
105                    response.into_inner().success
106                );
107            }
108            Err(err) => {
109                log::error!("Failed to update data types: {:?}", err);
110            }
111        }
112    }
113
114    async fn update_runtime_definitions(&self) {
115        if self.runtime_definitions.is_empty() {
116            log::info!("No runtime definitions to update");
117            return;
118        }
119
120        log::info!("Updating the current RuntimeDefinitions!");
121        let mut client =
122            match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
123                Ok(client) => {
124                    log::info!("Connected to RuntimeFunctionDefinitionService");
125                    client
126                }
127                Err(err) => {
128                    log::error!(
129                        "Failed to connect to RuntimeFunctionDefinitionService: {:?}",
130                        err
131                    );
132                    return;
133                }
134            };
135
136        let request = RuntimeFunctionDefinitionUpdateRequest {
137            runtime_functions: self.runtime_definitions.clone(),
138        };
139
140        match client.update(request).await {
141            Ok(response) => {
142                log::info!(
143                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
144                    response.into_inner().success
145                );
146            }
147            Err(err) => {
148                log::error!("Failed to update runtime function definitions: {:?}", err);
149            }
150        }
151    }
152
153    async fn update_flow_types(&self) {
154        if self.flow_types.is_empty() {
155            log::info!("No FlowTypes to update!");
156            return;
157        }
158
159        log::info!("Updating the current FlowTypes!");
160        let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
161            Ok(client) => {
162                log::info!("Connected to FlowTypeService!");
163                client
164            }
165            Err(err) => {
166                log::error!("Failed to connect to FlowTypeService: {:?}", err);
167                return;
168            }
169        };
170
171        let request = FlowTypeUpdateRequest {
172            flow_types: self.flow_types.clone(),
173        };
174
175        match client.update(request).await {
176            Ok(response) => {
177                log::info!(
178                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
179                    response.into_inner().success
180                );
181            }
182            Err(err) => {
183                log::error!("Failed to update flow types: {:?}", err);
184            }
185        }
186    }
187}