subgraph/data_sources/sql/
mod.rs1use std::{path::Path, str::FromStr};
2
3use async_graphql::dynamic::FieldValue;
4use bson::{to_document, Document};
5use log::{debug, error, info, trace};
6use sqlx::{sqlite::SqliteConnectOptions, MySql, Pool, Postgres, Sqlite};
7
8use crate::{
9 cli_args::CliArgs,
10 configuration::subgraph::{
11 data_sources::sql::{DialectEnum, SqlDataSourceConfig},
12 entities::ServiceEntityConfig,
13 SubGraphConfig,
14 },
15 graphql::{
16 entity::create_return_types::{ResolverResponse, ResolverResponseMeta},
17 schema::create_auth_service::TokenData,
18 },
19 resolver_type::ResolverType,
20 sql_value::SqlValue,
21};
22
23use super::DataSource;
24
25pub mod create_query;
26pub mod services;
27
28#[derive(Debug, Clone)]
29pub struct SqlDataSource {
30 pub pool: PoolEnum,
31 pub config: SqlDataSourceConfig,
32 pub subgraph_config: SubGraphConfig,
33}
34
35#[derive(Debug, Clone)]
36pub struct TestEnum {
37 pub pool: PoolEnum,
38}
39
40#[derive(Debug, Clone)]
41pub enum PoolEnum {
42 MySql(Pool<MySql>),
43 Postgres(Pool<Postgres>),
44 SqLite(Pool<Sqlite>),
45}
46
47#[derive(Debug, Clone)]
48pub struct SqlQuery {
49 query: String,
50 count_query: Option<String>,
51 identifier_query: Option<String>,
52 values: Vec<SqlValue>,
53 where_values: Vec<SqlValue>,
54 table: String,
55}
56
57impl SqlDataSource {
58 pub async fn init(
59 sql_data_source_config: &SqlDataSourceConfig,
60 args: &CliArgs,
61 subgraph_config: SubGraphConfig,
62 ) -> DataSource {
63 debug!("Initializing SQL Data Source");
64
65 let pool: PoolEnum = match sql_data_source_config.dialect {
67 DialectEnum::SQLITE => {
68 trace!("Creating SQLite Pool: {:?}", &sql_data_source_config.uri);
69
70 let options = if let Some(extensions) = &sql_data_source_config.sqlite_extensions {
71 trace!("Creating SQLite Pool with Extensions: {:?}", &extensions);
72 let mut options = SqliteConnectOptions::from_str(&sql_data_source_config.uri)
73 .expect("Failed to create SqliteConnectOptions with extensions");
74 for extension in extensions {
75 options = options.extension(extension.clone());
76 }
77 options
78 } else {
79 SqliteConnectOptions::from_str(&sql_data_source_config.uri)
80 .expect("Failed to create SqliteConnectOptions")
81 };
82
83 let pool = sqlx::sqlite::SqlitePoolOptions::new()
84 .max_connections(5)
85 .connect_with(options)
86 .await
87 .unwrap();
88
89 PoolEnum::SqLite(pool)
90 }
91 DialectEnum::POSTGRES => {
92 trace!("Creating Postgres Pool: {:?}", &sql_data_source_config.uri);
93 let pool = sqlx::postgres::PgPoolOptions::new()
94 .max_connections(5)
95 .connect(&sql_data_source_config.uri)
96 .await
97 .unwrap();
98 PoolEnum::Postgres(pool)
99 }
100 DialectEnum::MYSQL => {
101 trace!("Creating MySql Pool: {:?}", &sql_data_source_config.uri);
102 let pool = sqlx::mysql::MySqlPoolOptions::new()
103 .max_connections(5)
104 .connect(&sql_data_source_config.uri)
105 .await
106 .unwrap();
107 PoolEnum::MySql(pool)
108 }
109 };
110
111 if let Some(migrate) = &args.migrate {
112 if migrate == "run" {
113 let path = sql_data_source_config.migrations_path.clone();
114 if path.is_some() {
115 let path = path.unwrap();
116 info!("Running Migrations: {:?}", &path);
117
118 let migration = sqlx::migrate::Migrator::new(Path::new(&path)).await;
119 match migration {
120 Ok(migration) => match &pool {
121 PoolEnum::MySql(pool) => {
122 let migration_completed = migration.run(pool).await;
123 match migration_completed {
124 Ok(_) => {
125 info!("Migration Complete");
126 }
127 Err(e) => {
128 error!("MySQL Migration Error: {:?}", e);
129 }
130 }
131 }
132 PoolEnum::Postgres(pool) => {
133 let completed = migration.run(pool).await;
134 match completed {
135 Ok(_) => {
136 info!("Migration Complete");
137 }
138 Err(e) => {
139 error!("Postgres Migration Error: {:?}", e);
140 }
141 }
142 }
143 PoolEnum::SqLite(pool) => {
144 let completed = migration.run(pool).await;
145 match completed {
146 Ok(_) => {
147 info!("Migration Complete");
148 }
149 Err(e) => {
150 error!("SQLITE Migration Error: {:?}", e);
151 }
152 }
153 }
154 },
155 Err(e) => {
156 error!("Migrations Failed: {:?}", e);
157 }
158 }
159 }
160 } else if migrate == "revert" {
161 }
163 }
164
165 DataSource::SQL(SqlDataSource {
166 pool,
167 config: sql_data_source_config.clone(),
168 subgraph_config,
169 })
170 }
171
172 pub async fn execute_operation<'a>(
173 data_source: &DataSource,
174 input: Document,
175 entity: ServiceEntityConfig,
176 resolver_type: ResolverType,
177 subgraph_config: &SubGraphConfig,
178 token_data: &Option<TokenData>,
179 has_selection_set: bool,
180 ) -> Result<Option<FieldValue<'a>>, async_graphql::Error> {
181 debug!("Executing SQL Operation");
182
183 let data_source = match data_source {
184 DataSource::SQL(ds) => ds,
185 _ => unreachable!(),
186 };
187
188 let entity_data_source = ServiceEntityConfig::get_entity_data_source(&entity);
189
190 let table;
191
192 if entity_data_source.is_some() {
194 if entity_data_source.clone().unwrap().table.is_some() {
195 table = entity_data_source.unwrap().table.unwrap();
196 } else {
197 table = entity.name.to_string();
198 }
199 } else {
200 table = entity.name.to_string();
201 }
202
203 let query = SqlDataSource::create_query(
204 input.clone(),
205 resolver_type,
206 &table,
207 data_source.config.dialect.clone(),
208 &entity,
209 &subgraph_config,
210 )?;
211
212 let user_uuid = if token_data.is_some() {
213 Some(token_data.as_ref().unwrap().user_uuid.to_string())
214 } else {
215 None
216 };
217
218 match resolver_type {
220 ResolverType::FindOne => {
221 let result = services::Services::find_one(&data_source.pool, &query).await?;
222 let res = ResolverResponse {
223 data: vec![FieldValue::owned_any(result)],
224 meta: ResolverResponseMeta {
225 request_id: uuid::Uuid::new_v4().to_string(),
226 service_name: subgraph_config.service.name.clone(),
227 service_version: subgraph_config.service.version.clone(),
228 executed_at: chrono::Utc::now()
229 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
230 count: 1,
231 total_count: 1,
232 page: 1,
233 total_pages: 1,
234 user_uuid,
235 },
236 };
237 Ok(Some(FieldValue::owned_any(res)))
238 }
239 ResolverType::FindMany => {
240 let (entities, total_count) =
241 services::Services::find_many(&data_source.pool, &query, &has_selection_set)
242 .await?;
243 let count = entities.len();
244 let opts_doc = if input.clone().get("opts").is_some() {
245 trace!("opts: {:?}", input.get("opts").unwrap());
246 to_document(input.get("opts").unwrap()).unwrap()
247 } else {
248 let mut d = Document::new();
249 d.insert("per_page", 10);
250 d.insert("page", 1);
251 trace!("created opts: {:?}", d);
252 d
253 };
254 trace!("opts_doc: {:?}", opts_doc);
255 let page = opts_doc
256 .get_i64("page")
257 .unwrap_or(opts_doc.get_i32("page").unwrap_or(1) as i64);
258 let per_page = opts_doc
259 .get_i64("per_page")
260 .unwrap_or(opts_doc.get_i32("per_page").unwrap_or(10) as i64);
261 trace!("per_page: {:?}", per_page);
262 let res = ResolverResponse {
263 data: entities
264 .into_iter()
265 .map(|row| FieldValue::owned_any(row))
266 .collect(),
267 meta: ResolverResponseMeta {
268 request_id: uuid::Uuid::new_v4().to_string(),
269 service_name: subgraph_config.service.name.clone(),
270 service_version: subgraph_config.service.version.clone(),
271 executed_at: chrono::Utc::now()
272 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
273 count: count as i64,
274 total_count: total_count.0,
275 page,
276 total_pages: if per_page == -1 {
277 1
278 } else {
279 (total_count.0 / per_page) + 1
280 },
281 user_uuid,
282 },
283 };
284
285 Ok(Some(FieldValue::owned_any(res)))
286 }
287 ResolverType::CreateOne => {
288 let result = services::Services::create_one(
289 &entity,
290 &data_source.pool,
291 &query,
292 data_source.config.dialect.clone(),
293 &subgraph_config,
294 )
295 .await?;
296
297 let res = ResolverResponse {
298 data: vec![FieldValue::owned_any(result)],
299 meta: ResolverResponseMeta {
300 request_id: uuid::Uuid::new_v4().to_string(),
301 service_name: subgraph_config.service.name.clone(),
302 service_version: subgraph_config.service.version.clone(),
303 executed_at: chrono::Utc::now()
304 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
305 count: 1,
306 total_count: 1,
307 page: 1,
308 total_pages: 1,
309 user_uuid,
310 },
311 };
312
313 Ok(Some(FieldValue::owned_any(res)))
314 }
315 ResolverType::UpdateOne => {
316 let result =
317 services::Services::update_one(&entity, &data_source.pool, &query).await?;
318 let res = ResolverResponse {
319 data: vec![FieldValue::owned_any(result)],
320 meta: ResolverResponseMeta {
321 request_id: uuid::Uuid::new_v4().to_string(),
322 service_name: subgraph_config.service.name.clone(),
323 service_version: subgraph_config.service.version.clone(),
324 executed_at: chrono::Utc::now()
325 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
326 count: 1,
327 total_count: 1,
328 page: 1,
329 total_pages: 1,
330 user_uuid,
331 },
332 };
333 Ok(Some(FieldValue::owned_any(res)))
334 }
335 ResolverType::UpdateMany => {
336 let results =
337 services::Services::update_many(&entity, &data_source.pool, &query).await?;
338 let count = results.len();
339 let res = ResolverResponse {
340 data: results
341 .into_iter()
342 .map(|row| FieldValue::owned_any(row))
343 .collect(),
344 meta: ResolverResponseMeta {
345 request_id: uuid::Uuid::new_v4().to_string(),
346 service_name: subgraph_config.service.name.clone(),
347 service_version: subgraph_config.service.version.clone(),
348 executed_at: chrono::Utc::now()
349 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
350 count: count as i64,
351 total_count: count as i64,
352 page: 1,
353 total_pages: 1,
354 user_uuid,
355 },
356 };
357 Ok(Some(FieldValue::owned_any(res)))
358 }
359 _ => panic!("Invalid resolver type"),
360 }
361 }
362}