Skip to main content

code0_flow/flow_service/
mod.rs

1use crate::{
2    flow_definition::Reader,
3    flow_service::{auth::get_authorization_metadata, retry::create_channel_with_retry},
4};
5use tonic::{Extensions, Request, transport::Channel};
6use tucana::{
7    aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient},
8    shared::Module,
9};
10
11pub mod auth;
12pub mod retry;
13
14pub struct FlowUpdateService {
15    modules: Vec<Module>,
16    channel: Channel,
17    aquila_token: String,
18    definition_source: Option<String>,
19}
20
21impl FlowUpdateService {
22    /// Create a new FlowUpdateService instance from an Aquila URL and a definition path.
23    ///
24    /// This reads the definition files from the given path as modules and initializes the
25    /// service with those module definitions.
26    pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
27        let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
28        let modules = match reader.read_modules() {
29            Ok(modules) => modules,
30            Err(error) => {
31                log::error!("Error occurred while reading definitions: {:?}", error);
32                panic!("Error occurred while reading definitions")
33            }
34        };
35
36        let channel = create_channel_with_retry("Aquila", aquila_url).await;
37
38        Self {
39            modules,
40            channel,
41            aquila_token,
42            definition_source: None,
43        }
44    }
45
46    pub fn with_definition_source(mut self, source: String) -> Self {
47        self.definition_source = Some(source);
48        self
49    }
50
51    pub async fn send(&mut self) {
52        let _ = self.send_with_status().await;
53    }
54
55    pub async fn send_with_status(&mut self) -> bool {
56        self.update().await
57    }
58
59    async fn update(&mut self) -> bool {
60        if self.modules.is_empty() {
61            log::info!("No Modules are present, aborting update.");
62            return true;
63        }
64
65        let mut modules = self.modules.clone();
66        if let Some(source) = &self.definition_source {
67            modules = modules
68                .into_iter()
69                .map(|module| apply_definition_source_to_module(module, source.clone()))
70                .collect::<Vec<_>>();
71        }
72
73        log::info!("Updating {} Modules.", self.modules.len());
74        let mut client = ModuleServiceClient::new(self.channel.clone());
75        let request = Request::from_parts(
76            get_authorization_metadata(&self.aquila_token),
77            Extensions::new(),
78            ModuleUpdateRequest { modules },
79        );
80
81        match client.update(request).await {
82            Ok(response) => {
83                let res = response.into_inner();
84
85                match res.success {
86                    true => log::info!("Module definition update has been successful"),
87                    false => log::warn!("Module definition update has been unsuccessful"),
88                };
89
90                res.success
91            }
92            Err(err) => {
93                log::error!("Module definition update failed. Reason: {:?}", err);
94                false
95            }
96        }
97    }
98}
99
100fn apply_definition_source_to_module(mut module: Module, source: String) -> Module {
101    for data_type in &mut module.definition_data_types {
102        data_type.definition_source = source.clone();
103    }
104    for flow_type in &mut module.flow_types {
105        flow_type.definition_source = Some(source.clone());
106    }
107    for runtime_flow_type in &mut module.runtime_flow_types {
108        runtime_flow_type.definition_source = Some(source.clone());
109    }
110    for function in &mut module.function_definitions {
111        function.definition_source = source.clone();
112    }
113    for runtime_function in &mut module.runtime_function_definitions {
114        runtime_function.definition_source = source.clone();
115    }
116
117    module
118}