Skip to main content

code0_flow/flow_service/
mod.rs

1use crate::{flow_definition::Reader, flow_service::retry::create_channel_with_retry};
2use tonic::transport::Channel;
3use tucana::{
4    aquila::{
5        DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
6        data_type_service_client::DataTypeServiceClient,
7        flow_type_service_client::FlowTypeServiceClient,
8        runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
9    },
10    shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition},
11};
12
13mod retry;
14
15pub struct FlowUpdateService {
16    data_types: Vec<DataType>,
17    runtime_definitions: Vec<RuntimeFunctionDefinition>,
18    flow_types: Vec<FlowType>,
19    channel: Channel,
20}
21
22impl FlowUpdateService {
23    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
24    ///
25    /// This will read the definition files from the given path and initialize the service with the data types, runtime definitions, and flow types.
26    pub async fn from_url(aquila_url: String, definition_path: &str) -> Self {
27        let mut data_types = Vec::new();
28        let mut runtime_definitions = Vec::new();
29        let mut flow_types = Vec::new();
30
31        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
32
33        let features = match reader.read_features() {
34            Ok(features) => features,
35            Err(error) => {
36                log::error!("Error occurred while reading definitions: {:?}", error);
37                panic!("Error occurred while reading definitions")
38            }
39        };
40
41        for feature in features {
42            data_types.append(&mut feature.data_types.clone());
43            flow_types.append(&mut feature.flow_types.clone());
44            runtime_definitions.append(&mut feature.functions.clone());
45        }
46
47        let channel = create_channel_with_retry("Aquila", aquila_url).await;
48
49        Self {
50            data_types,
51            runtime_definitions,
52            flow_types,
53            channel,
54        }
55    }
56
57    pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
58        self.flow_types = flow_types;
59        self
60    }
61
62    pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
63        self.data_types = data_types;
64        self
65    }
66
67    pub fn with_runtime_definitions(
68        mut self,
69        runtime_definitions: Vec<RuntimeFunctionDefinition>,
70    ) -> Self {
71        self.runtime_definitions = runtime_definitions;
72        self
73    }
74
75    pub async fn send(&self) {
76        self.update_data_types().await;
77        self.update_runtime_definitions().await;
78        self.update_flow_types().await;
79    }
80
81    async fn update_data_types(&self) {
82        if self.data_types.is_empty() {
83            log::info!("No data types to update");
84            return;
85        }
86
87        log::info!("Updating the current DataTypes!");
88        let mut client = DataTypeServiceClient::new(self.channel.clone());
89        let request = DataTypeUpdateRequest {
90            data_types: self.data_types.clone(),
91        };
92
93        match client.update(request).await {
94            Ok(response) => {
95                log::info!(
96                    "Was the update of the DataTypes accepted by Sagittarius? {}",
97                    response.into_inner().success
98                );
99            }
100            Err(err) => {
101                log::error!("Failed to update data types: {:?}", err);
102            }
103        }
104    }
105
106    async fn update_runtime_definitions(&self) {
107        if self.runtime_definitions.is_empty() {
108            log::info!("No runtime definitions to update");
109            return;
110        }
111
112        log::info!("Updating the current RuntimeDefinitions!");
113        let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
114        let request = RuntimeFunctionDefinitionUpdateRequest {
115            runtime_functions: self.runtime_definitions.clone(),
116        };
117
118        match client.update(request).await {
119            Ok(response) => {
120                log::info!(
121                    "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
122                    response.into_inner().success
123                );
124            }
125            Err(err) => {
126                log::error!("Failed to update runtime function definitions: {:?}", err);
127            }
128        }
129    }
130
131    async fn update_flow_types(&self) {
132        if self.flow_types.is_empty() {
133            log::info!("No FlowTypes to update!");
134            return;
135        }
136
137        log::info!("Updating the current FlowTypes!");
138        let mut client = FlowTypeServiceClient::new(self.channel.clone());
139        let request = FlowTypeUpdateRequest {
140            flow_types: self.flow_types.clone(),
141        };
142
143        match client.update(request).await {
144            Ok(response) => {
145                log::info!(
146                    "Was the update of the FlowTypes accepted by Sagittarius? {}",
147                    response.into_inner().success
148                );
149            }
150            Err(err) => {
151                log::error!("Failed to update flow types: {:?}", err);
152            }
153        }
154    }
155}