code0_flow/flow_service/
mod.rs1use crate::{flow_definition::Reader, flow_service::retry::create_channel_with_retry};
2use tonic::transport::Channel;
3use tucana::{
4 aquila::{
5 DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
6 data_type_service_client::DataTypeServiceClient,
7 flow_type_service_client::FlowTypeServiceClient,
8 runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
9 },
10 shared::{DefinitionDataType as DataType, FlowType, RuntimeFunctionDefinition},
11};
12
13mod retry;
14
15pub struct FlowUpdateService {
16 data_types: Vec<DataType>,
17 runtime_definitions: Vec<RuntimeFunctionDefinition>,
18 flow_types: Vec<FlowType>,
19 channel: Channel,
20}
21
22impl FlowUpdateService {
23 pub async fn from_url(aquila_url: String, definition_path: &str) -> Self {
27 let mut data_types = Vec::new();
28 let mut runtime_definitions = Vec::new();
29 let mut flow_types = Vec::new();
30
31 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
32
33 let features = match reader.read_features() {
34 Ok(features) => features,
35 Err(error) => {
36 log::error!("Error occurred while reading definitions: {:?}", error);
37 panic!("Error occurred while reading definitions")
38 }
39 };
40
41 for feature in features {
42 data_types.append(&mut feature.data_types.clone());
43 flow_types.append(&mut feature.flow_types.clone());
44 runtime_definitions.append(&mut feature.functions.clone());
45 }
46
47 let channel = create_channel_with_retry("Aquila", aquila_url).await;
48
49 Self {
50 data_types,
51 runtime_definitions,
52 flow_types,
53 channel,
54 }
55 }
56
57 pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
58 self.flow_types = flow_types;
59 self
60 }
61
62 pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
63 self.data_types = data_types;
64 self
65 }
66
67 pub fn with_runtime_definitions(
68 mut self,
69 runtime_definitions: Vec<RuntimeFunctionDefinition>,
70 ) -> Self {
71 self.runtime_definitions = runtime_definitions;
72 self
73 }
74
75 pub async fn send(&self) {
76 self.update_data_types().await;
77 self.update_runtime_definitions().await;
78 self.update_flow_types().await;
79 }
80
81 async fn update_data_types(&self) {
82 if self.data_types.is_empty() {
83 log::info!("No data types to update");
84 return;
85 }
86
87 log::info!("Updating the current DataTypes!");
88 let mut client = DataTypeServiceClient::new(self.channel.clone());
89 let request = DataTypeUpdateRequest {
90 data_types: self.data_types.clone(),
91 };
92
93 match client.update(request).await {
94 Ok(response) => {
95 log::info!(
96 "Was the update of the DataTypes accepted by Sagittarius? {}",
97 response.into_inner().success
98 );
99 }
100 Err(err) => {
101 log::error!("Failed to update data types: {:?}", err);
102 }
103 }
104 }
105
106 async fn update_runtime_definitions(&self) {
107 if self.runtime_definitions.is_empty() {
108 log::info!("No runtime definitions to update");
109 return;
110 }
111
112 log::info!("Updating the current RuntimeDefinitions!");
113 let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
114 let request = RuntimeFunctionDefinitionUpdateRequest {
115 runtime_functions: self.runtime_definitions.clone(),
116 };
117
118 match client.update(request).await {
119 Ok(response) => {
120 log::info!(
121 "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
122 response.into_inner().success
123 );
124 }
125 Err(err) => {
126 log::error!("Failed to update runtime function definitions: {:?}", err);
127 }
128 }
129 }
130
131 async fn update_flow_types(&self) {
132 if self.flow_types.is_empty() {
133 log::info!("No FlowTypes to update!");
134 return;
135 }
136
137 log::info!("Updating the current FlowTypes!");
138 let mut client = FlowTypeServiceClient::new(self.channel.clone());
139 let request = FlowTypeUpdateRequest {
140 flow_types: self.flow_types.clone(),
141 };
142
143 match client.update(request).await {
144 Ok(response) => {
145 log::info!(
146 "Was the update of the FlowTypes accepted by Sagittarius? {}",
147 response.into_inner().success
148 );
149 }
150 Err(err) => {
151 log::error!("Failed to update flow types: {:?}", err);
152 }
153 }
154 }
155}