subgraph/data_sources/http/
mod.rs1use async_graphql::dynamic::FieldValue;
2use bson::Document;
3use http::{header::HeaderName, HeaderMap, HeaderValue};
4use log::{debug, trace};
5use reqwest::Client;
6
7use crate::{
8 configuration::subgraph::{
9 data_sources::http::{DefaultHeader, HttpDataSourceConfig},
10 entities::ServiceEntityConfig,
11 SubGraphConfig,
12 },
13 graphql::entity::create_return_types::{ResolverResponse, ResolverResponseMeta},
14 resolver_type::ResolverType,
15};
16
17use super::DataSource;
18pub mod filter;
19pub mod services;
20
21#[derive(Debug, Clone)]
22pub struct HttpDataSource {
23 pub client: Client,
24 pub config: HttpDataSourceConfig,
25}
26
27impl HttpDataSource {
28 fn get_headers(default_headers: Option<&Vec<DefaultHeader>>) -> HeaderMap {
29 let mut headers = HeaderMap::new();
30
31 if default_headers.is_some() {
32 let cloned_default_headers = default_headers.unwrap().clone();
33
34 for default_header in cloned_default_headers {
35 let header_name = HeaderName::from_bytes(default_header.name.as_bytes()).unwrap();
36 let header_value =
37 HeaderValue::from_bytes(default_header.value.as_bytes()).unwrap();
38 headers.insert(header_name, header_value);
39 }
40 }
41
42 headers
43 }
44
45 pub async fn init(http_data_source_config: &HttpDataSourceConfig) -> DataSource {
46 let header_config = http_data_source_config.default_headers.as_ref();
47 let headers = HttpDataSource::get_headers(header_config.clone());
48 let client = Client::builder().default_headers(headers).build();
49
50 match client {
51 Ok(client) => DataSource::HTTP(HttpDataSource {
52 client,
53 config: http_data_source_config.clone(),
54 }),
55 Err(error) => {
56 log::error!("Failed to build HTTP Client.");
57 debug!("{:?}", error);
58 panic!()
59 }
60 }
61 }
62
63 pub async fn execute_operation<'a>(
64 data_source: &DataSource,
65 input: Document,
66 entity: ServiceEntityConfig,
67 resolver_type: ResolverType,
68 subgraph_config: &SubGraphConfig,
69 ) -> Result<Option<FieldValue<'a>>, async_graphql::Error> {
70 debug!("Executing HTTP Data Source Operation");
71
72 let data_source = match data_source {
73 DataSource::HTTP(ds) => ds,
74 _ => unreachable!(),
75 };
76
77 trace!("HTTP Data Source: {:?}", data_source);
78
79 let filter =
80 HttpDataSource::create_filter(data_source, input, &entity, resolver_type).await?;
81
82 trace!("Filter Created: {:?}", filter);
83
84 match resolver_type {
85 ResolverType::FindOne => {
86 let result =
87 services::Services::find_one(data_source.client.clone(), filter).await?;
88 let res = ResolverResponse {
89 data: vec![FieldValue::owned_any(result)],
90 meta: ResolverResponseMeta {
91 request_id: uuid::Uuid::new_v4().to_string(),
92 service_name: subgraph_config.service.name.clone(),
93 service_version: subgraph_config.service.version.clone(),
94 executed_at: chrono::Utc::now()
95 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
96 count: 1,
97 total_count: 1,
98 page: 1,
99 total_pages: 1,
100 user_uuid: None,
101 },
102 };
103 Ok(Some(FieldValue::owned_any(res)))
104 }
105 ResolverType::FindMany => {
106 let results =
107 services::Services::find_many(data_source.client.clone(), filter).await?;
108 let count = results.len();
109 let res = ResolverResponse {
110 data: results
111 .into_iter()
112 .map(|doc| FieldValue::owned_any(doc))
113 .collect(),
114 meta: ResolverResponseMeta {
115 request_id: uuid::Uuid::new_v4().to_string(),
116 service_name: subgraph_config.service.name.clone(),
117 service_version: subgraph_config.service.version.clone(),
118 executed_at: chrono::Utc::now()
119 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
120 count: count as i64,
121 total_count: count as i64,
122 page: 1,
123 total_pages: 1,
124 user_uuid: None,
125 },
126 };
127 Ok(Some(FieldValue::owned_any(res)))
128 }
129 ResolverType::CreateOne => {
130 let result =
131 services::Services::create_one(data_source.client.clone(), filter).await?;
132 let res = ResolverResponse {
133 data: vec![FieldValue::owned_any(result)],
134 meta: ResolverResponseMeta {
135 request_id: uuid::Uuid::new_v4().to_string(),
136 service_name: subgraph_config.service.name.clone(),
137 service_version: subgraph_config.service.version.clone(),
138 executed_at: chrono::Utc::now()
139 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
140 count: 1,
141 total_count: 1,
142 page: 1,
143 total_pages: 1,
144 user_uuid: None,
145 },
146 };
147 Ok(Some(FieldValue::owned_any(res)))
148 }
149 ResolverType::UpdateOne => {
150 let result =
151 services::Services::update_one(data_source.client.clone(), filter).await?;
152 let res = ResolverResponse {
153 data: vec![FieldValue::owned_any(result)],
154 meta: ResolverResponseMeta {
155 request_id: uuid::Uuid::new_v4().to_string(),
156 service_name: subgraph_config.service.name.clone(),
157 service_version: subgraph_config.service.version.clone(),
158 executed_at: chrono::Utc::now()
159 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
160 count: 1,
161 total_count: 1,
162 page: 1,
163 total_pages: 1,
164 user_uuid: None,
165 },
166 };
167 Ok(Some(FieldValue::owned_any(res)))
168 }
169 ResolverType::UpdateMany => {
170 let results =
171 services::Services::update_many(data_source.client.clone(), filter).await?;
172 let count = results.len();
173 let res = ResolverResponse {
174 data: results
175 .into_iter()
176 .map(|doc| FieldValue::owned_any(doc))
177 .collect(),
178 meta: ResolverResponseMeta {
179 request_id: uuid::Uuid::new_v4().to_string(),
180 service_name: subgraph_config.service.name.clone(),
181 service_version: subgraph_config.service.version.clone(),
182 executed_at: chrono::Utc::now()
183 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
184 count: count as i64,
185 total_count: count as i64,
186 page: 1,
187 total_pages: 1,
188 user_uuid: None,
189 },
190 };
191 Ok(Some(FieldValue::owned_any(res)))
192 }
193 _ => panic!("Invalid resolver type"),
194 }
195 }
196}