code0_flow/flow_service/
mod.rs1use crate::{
2 flow_definition::Reader,
3 flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7 aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient},
8 shared::Module,
9};
10
11pub mod auth;
12pub mod retry;
13
14pub struct FlowUpdateService {
15 modules: Vec<Module>,
16 channel: Channel,
17 aquila_token: String,
18 definition_source: Option<String>,
19}
20
21impl FlowUpdateService {
22 pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
27 let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
28 let modules = match reader.read_modules() {
29 Ok(modules) => modules,
30 Err(error) => {
31 log::error!("Error occurred while reading definitions: {:?}", error);
32 panic!("Error occurred while reading definitions")
33 }
34 };
35
36 let channel = create_channel_with_retry("Aquila", aquila_url).await;
37
38 Self {
39 modules,
40 channel,
41 aquila_token,
42 definition_source: None,
43 }
44 }
45
46 pub fn with_definition_source(mut self, source: String) -> Self {
47 self.definition_source = Some(source);
48 self
49 }
50
51 pub async fn send(&mut self) {
52 let _ = self.send_with_status().await;
53 }
54
55 pub async fn send_with_status(&mut self) -> bool {
56 self.update().await
57 }
58
59 async fn update(&mut self) -> bool {
60 if self.modules.is_empty() {
61 log::info!("No Modules are present, aborting update.");
62 return true;
63 }
64
65 let mut modules = self.modules.clone();
66 if let Some(source) = &self.definition_source {
67 modules = modules
68 .into_iter()
69 .map(|module| apply_definition_source_to_module(module, source.clone()))
70 .collect::<Vec<_>>();
71 }
72
73 log::info!("Updating {} Modules.", self.modules.len());
74 let mut client = ModuleServiceClient::new(self.channel.clone());
75 let request = Request::from_parts(
76 get_authorization_metadata(&self.aquila_token),
77 Extensions::new(),
78 ModuleUpdateRequest { modules },
79 );
80
81 match client.update(request).await {
82 Ok(response) => {
83 let res = response.into_inner();
84
85 match res.success {
86 true => log::info!("Module definition update has been successful"),
87 false => log::warn!("Module definition update has been unsuccessful"),
88 };
89
90 res.success
91 }
92 Err(err) => {
93 log::error!("Module definition update failed. Reason: {:?}", err);
94 false
95 }
96 }
97 }
98}
99
100fn apply_definition_source_to_module(mut module: Module, source: String) -> Module {
101 for data_type in &mut module.definition_data_types {
102 data_type.definition_source = source.clone();
103 }
104 for flow_type in &mut module.flow_types {
105 flow_type.definition_source = Some(source.clone());
106 }
107 for runtime_flow_type in &mut module.runtime_flow_types {
108 runtime_flow_type.definition_source = Some(source.clone());
109 }
110 for function in &mut module.function_definitions {
111 function.definition_source = source.clone();
112 }
113 for runtime_function in &mut module.runtime_function_definitions {
114 runtime_function.definition_source = source.clone();
115 }
116
117 module
118}