subgraph/data_sources/
mod.rs

1use async_graphql::dynamic::FieldValue;
2use bson::Document;
3use log::debug;
4
5use crate::{
6    cli_args::CliArgs,
7    configuration::subgraph::{
8        data_sources::ServiceDataSourceConfig, entities::ServiceEntityConfig, SubGraphConfig,
9    },
10    graphql::schema::create_auth_service::TokenData,
11    resolver_type::ResolverType,
12};
13
14pub mod http;
15pub mod mongo;
16pub mod sql;
17
18#[derive(Debug, Clone)]
19pub enum DataSource {
20    Mongo(mongo::MongoDataSource),
21    HTTP(http::HttpDataSource),
22    SQL(sql::SqlDataSource),
23}
24
25#[derive(Debug, Clone)]
26pub struct DataSources {
27    sources: Vec<DataSource>,
28}
29
/// Newtype wrapper around an `i64` count.
///
/// NOTE(review): nothing in this module reads or constructs it; presumably it
/// carries the total number of records matched by a query — confirm against
/// the data source submodules.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TotalCount(i64);
31
32impl DataSources {
33    /// Initialize Data Sources
34    pub async fn init(
35        service_data_source_configs: Vec<ServiceDataSourceConfig>,
36        args: &CliArgs,
37        subgraph_config: &SubGraphConfig,
38    ) -> DataSources {
39        debug!("Initializing Data Sources");
40        let mut data_sources = vec![];
41        for service_data_source_config in service_data_source_configs {
42            match service_data_source_config {
43                ServiceDataSourceConfig::Mongo(conf) => {
44                    data_sources.push(mongo::MongoDataSource::init(&conf).await);
45                }
46                ServiceDataSourceConfig::HTTP(conf) => {
47                    data_sources.push(http::HttpDataSource::init(&conf).await);
48                }
49                ServiceDataSourceConfig::SQL(conf) => {
50                    data_sources
51                        .push(sql::SqlDataSource::init(&conf, args, subgraph_config.clone()).await);
52                }
53            };
54        }
55
56        DataSources {
57            sources: data_sources,
58        }
59    }
60
61    /// Provide entity and all data sources to get the data source for the entity.
62    pub fn get_entity_data_soruce<'a>(
63        data_sources: &'a DataSources,
64        entity: &ServiceEntityConfig,
65    ) -> &'a DataSource {
66        debug!("Getting Data Source for Entity");
67        if entity.data_source.is_some() {
68            let data_source = match entity.data_source.as_ref().unwrap().from.as_ref() {
69                Some(ds_name) => {
70                    let data_source = data_sources
71                        .sources
72                        .iter()
73                        .find(|data_source| match data_source {
74                            DataSource::Mongo(ds) => &ds.config.name == ds_name,
75                            DataSource::HTTP(ds) => &ds.config.name == ds_name,
76                            DataSource::SQL(ds) => &ds.config.name == ds_name,
77                        })
78                        .unwrap(); //TODO: Unwrap here needs error handling
79                    data_source
80                }
81                _ => panic!("Data source specified for entity but not found."),
82            };
83            data_source
84        } else {
85            data_sources.sources.first().unwrap()
86        }
87    }
88
89    pub fn get_data_source_by_name(data_soruces: &DataSources, name: &str) -> DataSource {
90        debug!("Getting Data Source by Name");
91        let data_source = data_soruces
92            .sources
93            .iter()
94            .find(|data_source| match data_source {
95                DataSource::Mongo(ds) => &ds.config.name == name,
96                DataSource::HTTP(ds) => &ds.config.name == name,
97                DataSource::SQL(ds) => &ds.config.name == name,
98            })
99            .unwrap();
100        data_source.clone()
101    }
102
103    /// Execute a data source operation.
104    pub async fn execute<'a>(
105        data_sources: &DataSources,
106        input: Document,
107        entity: ServiceEntityConfig,
108        resolver_type: ResolverType,
109        subgraph_config: &SubGraphConfig,
110        token_data: &Option<TokenData>,
111        has_selection_set: bool,
112    ) -> Result<Option<FieldValue<'a>>, async_graphql::Error> {
113        debug!("Executing Datasource Operation");
114
115        let cloned_entity = entity.clone();
116
117        let data_source = DataSources::get_entity_data_soruce(data_sources, &entity);
118
119        match data_source {
120            DataSource::Mongo(_ds) => Ok(mongo::MongoDataSource::execute_operation(
121                data_source,
122                input,
123                cloned_entity,
124                resolver_type,
125                subgraph_config,
126            )
127            .await?),
128            DataSource::HTTP(_ds) => Ok(http::HttpDataSource::execute_operation(
129                data_source,
130                input,
131                cloned_entity,
132                resolver_type,
133                subgraph_config,
134            )
135            .await?),
136            DataSource::SQL(_ds) => Ok(sql::SqlDataSource::execute_operation(
137                data_source,
138                input,
139                cloned_entity,
140                resolver_type,
141                subgraph_config,
142                token_data,
143                has_selection_set,
144            )
145            .await?),
146        }
147    }
148}