cala_server/server/
mod.rs1mod config;
2
3use async_graphql::*;
4use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
5use axum::{routing::get, Extension, Router};
6use axum_extra::headers::HeaderMap;
7use cala_ledger::CalaLedger;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::instrument;
11
12use crate::{app::CalaApp, extension::*, graphql};
13
14pub use config::*;
15
16pub async fn run<Q: QueryExtensionMarker, M: MutationExtensionMarker>(
17 config: ServerConfig,
18 app: CalaApp,
19) -> anyhow::Result<()> {
20 let ledger = app.ledger().clone();
21 let schema = graphql::schema::<Q, M>(Some(app));
22
23 let app = Router::new()
24 .route(
25 "/graphql",
26 get(playground).post(axum::routing::post(graphql_handler::<Q, M>)),
27 )
28 .layer(Extension(schema))
29 .layer(Extension(ledger));
30
31 println!("Starting graphql server on port {}", config.port);
32 let listener =
33 tokio::net::TcpListener::bind(&std::net::SocketAddr::from(([0, 0, 0, 0], config.port)))
34 .await?;
35 axum::serve(listener, app.into_make_service()).await?;
36 Ok(())
37}
38
39#[instrument(name = "cala_server.graphql", skip_all, fields(error, error.level, error.message))]
40pub async fn graphql_handler<Q: QueryExtensionMarker, M: MutationExtensionMarker>(
41 headers: HeaderMap,
42 schema: Extension<Schema<graphql::CoreQuery<Q>, graphql::CoreMutation<M>, EmptySubscription>>,
43 Extension(ledger): Extension<CalaLedger>,
44 req: GraphQLRequest,
45) -> GraphQLResponse {
46 cala_tracing::http::extract_tracing(&headers);
47 let mut req = req.into_inner();
48 let op = match maybe_init_atomic_operation(&mut req, &ledger).await {
49 Err(e) => {
50 return async_graphql::Response::from_errors(vec![async_graphql::ServerError::new(
51 e.to_string(),
52 None,
53 )])
54 .into();
55 }
56 Ok(op) => op,
57 };
58 if let Some(ref op) = op {
59 req = req.data(Arc::clone(op));
60 }
61 let mut res = schema.execute(req).await;
62 if let Some(op) = op {
63 if res.errors.is_empty() {
64 if let Err(e) = Arc::into_inner(op)
65 .expect("Arc::into_inner")
66 .into_inner()
67 .commit()
68 .await
69 {
70 res.errors
71 .push(async_graphql::ServerError::new(e.to_string(), None));
72 res.data = async_graphql::Value::Null;
73 }
74 } else {
75 res.data = async_graphql::Value::Null;
78 }
79 }
80 res.into()
81}
82
83async fn playground() -> impl axum::response::IntoResponse {
84 axum::response::Html(async_graphql::http::playground_source(
85 async_graphql::http::GraphQLPlaygroundConfig::new("/graphql"),
86 ))
87}
88
89async fn maybe_init_atomic_operation(
90 req: &mut async_graphql::Request,
91 ledger: &CalaLedger,
92) -> Result<Option<Arc<Mutex<es_entity::DbOpWithTime<'static>>>>, cala_ledger::error::LedgerError> {
93 use async_graphql::parser::types::*;
94
95 let operation_name = req
96 .operation_name
97 .as_ref()
98 .map(|n| async_graphql::Name::new(n.clone()));
99 if let Ok(query) = req.parsed_query() {
100 let is_mutation = match (&query.operations, operation_name) {
101 (DocumentOperations::Single(op), _) => op.node.ty == OperationType::Mutation,
102 (DocumentOperations::Multiple(ops), _) if ops.len() == 1 => {
103 ops.values().next().expect("ops.next").node.ty == OperationType::Mutation
104 }
105 (DocumentOperations::Multiple(ops), Some(name)) if ops.get(&name).is_some() => {
106 ops.get(&name).expect("ops.get").node.ty == OperationType::Mutation
107 }
108 _ => false,
109 };
110 if is_mutation {
111 return Ok(Some(Arc::new(Mutex::new(ledger.begin_operation().await?))));
112 }
113 }
114 Ok(None)
115}