subgraph/data_sources/sql/
mod.rs

1use std::{path::Path, str::FromStr};
2
3use async_graphql::dynamic::FieldValue;
4use bson::{to_document, Document};
5use log::{debug, error, info, trace};
6use sqlx::{sqlite::SqliteConnectOptions, MySql, Pool, Postgres, Sqlite};
7
8use crate::{
9    cli_args::CliArgs,
10    configuration::subgraph::{
11        data_sources::sql::{DialectEnum, SqlDataSourceConfig},
12        entities::ServiceEntityConfig,
13        SubGraphConfig,
14    },
15    graphql::{
16        entity::create_return_types::{ResolverResponse, ResolverResponseMeta},
17        schema::create_auth_service::TokenData,
18    },
19    resolver_type::ResolverType,
20    sql_value::SqlValue,
21};
22
23use super::DataSource;
24
25pub mod create_query;
26pub mod services;
27
28#[derive(Debug, Clone)]
29pub struct SqlDataSource {
30    pub pool: PoolEnum,
31    pub config: SqlDataSourceConfig,
32    pub subgraph_config: SubGraphConfig,
33}
34
35#[derive(Debug, Clone)]
36pub struct TestEnum {
37    pub pool: PoolEnum,
38}
39
40#[derive(Debug, Clone)]
41pub enum PoolEnum {
42    MySql(Pool<MySql>),
43    Postgres(Pool<Postgres>),
44    SqLite(Pool<Sqlite>),
45}
46
47#[derive(Debug, Clone)]
48pub struct SqlQuery {
49    query: String,
50    count_query: Option<String>,
51    identifier_query: Option<String>,
52    values: Vec<SqlValue>,
53    where_values: Vec<SqlValue>,
54    table: String,
55}
56
57impl SqlDataSource {
58    pub async fn init(
59        sql_data_source_config: &SqlDataSourceConfig,
60        args: &CliArgs,
61        subgraph_config: SubGraphConfig,
62    ) -> DataSource {
63        debug!("Initializing SQL Data Source");
64
65        // Create the pool
66        let pool: PoolEnum = match sql_data_source_config.dialect {
67            DialectEnum::SQLITE => {
68                trace!("Creating SQLite Pool: {:?}", &sql_data_source_config.uri);
69
70                let options = if let Some(extensions) = &sql_data_source_config.sqlite_extensions {
71                    trace!("Creating SQLite Pool with Extensions: {:?}", &extensions);
72                    let mut options = SqliteConnectOptions::from_str(&sql_data_source_config.uri)
73                        .expect("Failed to create SqliteConnectOptions with extensions");
74                    for extension in extensions {
75                        options = options.extension(extension.clone());
76                    }
77                    options
78                } else {
79                    SqliteConnectOptions::from_str(&sql_data_source_config.uri)
80                        .expect("Failed to create SqliteConnectOptions")
81                };
82
83                let pool = sqlx::sqlite::SqlitePoolOptions::new()
84                    .max_connections(5)
85                    .connect_with(options)
86                    .await
87                    .unwrap();
88
89                PoolEnum::SqLite(pool)
90            }
91            DialectEnum::POSTGRES => {
92                trace!("Creating Postgres Pool: {:?}", &sql_data_source_config.uri);
93                let pool = sqlx::postgres::PgPoolOptions::new()
94                    .max_connections(5)
95                    .connect(&sql_data_source_config.uri)
96                    .await
97                    .unwrap();
98                PoolEnum::Postgres(pool)
99            }
100            DialectEnum::MYSQL => {
101                trace!("Creating MySql Pool: {:?}", &sql_data_source_config.uri);
102                let pool = sqlx::mysql::MySqlPoolOptions::new()
103                    .max_connections(5)
104                    .connect(&sql_data_source_config.uri)
105                    .await
106                    .unwrap();
107                PoolEnum::MySql(pool)
108            }
109        };
110
111        if let Some(migrate) = &args.migrate {
112            if migrate == "run" {
113                let path = sql_data_source_config.migrations_path.clone();
114                if path.is_some() {
115                    let path = path.unwrap();
116                    info!("Running Migrations: {:?}", &path);
117
118                    let migration = sqlx::migrate::Migrator::new(Path::new(&path)).await;
119                    match migration {
120                        Ok(migration) => match &pool {
121                            PoolEnum::MySql(pool) => {
122                                let migration_completed = migration.run(pool).await;
123                                match migration_completed {
124                                    Ok(_) => {
125                                        info!("Migration Complete");
126                                    }
127                                    Err(e) => {
128                                        error!("MySQL Migration Error: {:?}", e);
129                                    }
130                                }
131                            }
132                            PoolEnum::Postgres(pool) => {
133                                let completed = migration.run(pool).await;
134                                match completed {
135                                    Ok(_) => {
136                                        info!("Migration Complete");
137                                    }
138                                    Err(e) => {
139                                        error!("Postgres Migration Error: {:?}", e);
140                                    }
141                                }
142                            }
143                            PoolEnum::SqLite(pool) => {
144                                let completed = migration.run(pool).await;
145                                match completed {
146                                    Ok(_) => {
147                                        info!("Migration Complete");
148                                    }
149                                    Err(e) => {
150                                        error!("SQLITE Migration Error: {:?}", e);
151                                    }
152                                }
153                            }
154                        },
155                        Err(e) => {
156                            error!("Migrations Failed: {:?}", e);
157                        }
158                    }
159                }
160            } else if migrate == "revert" {
161                //TODO:
162            }
163        }
164
165        DataSource::SQL(SqlDataSource {
166            pool,
167            config: sql_data_source_config.clone(),
168            subgraph_config,
169        })
170    }
171
172    pub async fn execute_operation<'a>(
173        data_source: &DataSource,
174        input: Document,
175        entity: ServiceEntityConfig,
176        resolver_type: ResolverType,
177        subgraph_config: &SubGraphConfig,
178        token_data: &Option<TokenData>,
179        has_selection_set: bool,
180    ) -> Result<Option<FieldValue<'a>>, async_graphql::Error> {
181        debug!("Executing SQL Operation");
182
183        let data_source = match data_source {
184            DataSource::SQL(ds) => ds,
185            _ => unreachable!(),
186        };
187
188        let entity_data_source = ServiceEntityConfig::get_entity_data_source(&entity);
189
190        let table;
191
192        // If the entity has a data source, use that table name
193        if entity_data_source.is_some() {
194            if entity_data_source.clone().unwrap().table.is_some() {
195                table = entity_data_source.unwrap().table.unwrap();
196            } else {
197                table = entity.name.to_string();
198            }
199        } else {
200            table = entity.name.to_string();
201        }
202
203        let query = SqlDataSource::create_query(
204            input.clone(),
205            resolver_type,
206            &table,
207            data_source.config.dialect.clone(),
208            &entity,
209            &subgraph_config,
210        )?;
211
212        let user_uuid = if token_data.is_some() {
213            Some(token_data.as_ref().unwrap().user_uuid.to_string())
214        } else {
215            None
216        };
217
218        // Return the result from the database as a FieldValue
219        match resolver_type {
220            ResolverType::FindOne => {
221                let result = services::Services::find_one(&data_source.pool, &query).await?;
222                let res = ResolverResponse {
223                    data: vec![FieldValue::owned_any(result)],
224                    meta: ResolverResponseMeta {
225                        request_id: uuid::Uuid::new_v4().to_string(),
226                        service_name: subgraph_config.service.name.clone(),
227                        service_version: subgraph_config.service.version.clone(),
228                        executed_at: chrono::Utc::now()
229                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
230                        count: 1,
231                        total_count: 1,
232                        page: 1,
233                        total_pages: 1,
234                        user_uuid,
235                    },
236                };
237                Ok(Some(FieldValue::owned_any(res)))
238            }
239            ResolverType::FindMany => {
240                let (entities, total_count) =
241                    services::Services::find_many(&data_source.pool, &query, &has_selection_set)
242                        .await?;
243                let count = entities.len();
244                let opts_doc = if input.clone().get("opts").is_some() {
245                    trace!("opts: {:?}", input.get("opts").unwrap());
246                    to_document(input.get("opts").unwrap()).unwrap()
247                } else {
248                    let mut d = Document::new();
249                    d.insert("per_page", 10);
250                    d.insert("page", 1);
251                    trace!("created opts: {:?}", d);
252                    d
253                };
254                trace!("opts_doc: {:?}", opts_doc);
255                let page = opts_doc
256                    .get_i64("page")
257                    .unwrap_or(opts_doc.get_i32("page").unwrap_or(1) as i64);
258                let per_page = opts_doc
259                    .get_i64("per_page")
260                    .unwrap_or(opts_doc.get_i32("per_page").unwrap_or(10) as i64);
261                trace!("per_page: {:?}", per_page);
262                let res = ResolverResponse {
263                    data: entities
264                        .into_iter()
265                        .map(|row| FieldValue::owned_any(row))
266                        .collect(),
267                    meta: ResolverResponseMeta {
268                        request_id: uuid::Uuid::new_v4().to_string(),
269                        service_name: subgraph_config.service.name.clone(),
270                        service_version: subgraph_config.service.version.clone(),
271                        executed_at: chrono::Utc::now()
272                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
273                        count: count as i64,
274                        total_count: total_count.0,
275                        page,
276                        total_pages: if per_page == -1 {
277                            1
278                        } else {
279                            (total_count.0 / per_page) + 1
280                        },
281                        user_uuid,
282                    },
283                };
284
285                Ok(Some(FieldValue::owned_any(res)))
286            }
287            ResolverType::CreateOne => {
288                let result = services::Services::create_one(
289                    &entity,
290                    &data_source.pool,
291                    &query,
292                    data_source.config.dialect.clone(),
293                    &subgraph_config,
294                )
295                .await?;
296
297                let res = ResolverResponse {
298                    data: vec![FieldValue::owned_any(result)],
299                    meta: ResolverResponseMeta {
300                        request_id: uuid::Uuid::new_v4().to_string(),
301                        service_name: subgraph_config.service.name.clone(),
302                        service_version: subgraph_config.service.version.clone(),
303                        executed_at: chrono::Utc::now()
304                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
305                        count: 1,
306                        total_count: 1,
307                        page: 1,
308                        total_pages: 1,
309                        user_uuid,
310                    },
311                };
312
313                Ok(Some(FieldValue::owned_any(res)))
314            }
315            ResolverType::UpdateOne => {
316                let result =
317                    services::Services::update_one(&entity, &data_source.pool, &query).await?;
318                let res = ResolverResponse {
319                    data: vec![FieldValue::owned_any(result)],
320                    meta: ResolverResponseMeta {
321                        request_id: uuid::Uuid::new_v4().to_string(),
322                        service_name: subgraph_config.service.name.clone(),
323                        service_version: subgraph_config.service.version.clone(),
324                        executed_at: chrono::Utc::now()
325                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
326                        count: 1,
327                        total_count: 1,
328                        page: 1,
329                        total_pages: 1,
330                        user_uuid,
331                    },
332                };
333                Ok(Some(FieldValue::owned_any(res)))
334            }
335            ResolverType::UpdateMany => {
336                let results =
337                    services::Services::update_many(&entity, &data_source.pool, &query).await?;
338                let count = results.len();
339                let res = ResolverResponse {
340                    data: results
341                        .into_iter()
342                        .map(|row| FieldValue::owned_any(row))
343                        .collect(),
344                    meta: ResolverResponseMeta {
345                        request_id: uuid::Uuid::new_v4().to_string(),
346                        service_name: subgraph_config.service.name.clone(),
347                        service_version: subgraph_config.service.version.clone(),
348                        executed_at: chrono::Utc::now()
349                            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
350                        count: count as i64,
351                        total_count: count as i64,
352                        page: 1,
353                        total_pages: 1,
354                        user_uuid,
355                    },
356                };
357                Ok(Some(FieldValue::owned_any(res)))
358            }
359            _ => panic!("Invalid resolver type"),
360        }
361    }
362}