cala_server/server/
mod.rs

1mod config;
2
3use async_graphql::*;
4use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
5use axum::{routing::get, Extension, Router};
6use axum_extra::headers::HeaderMap;
7use cala_ledger::CalaLedger;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::instrument;
11
12use crate::{app::CalaApp, extension::*, graphql};
13
14pub use config::*;
15
16pub async fn run<Q: QueryExtensionMarker, M: MutationExtensionMarker>(
17    config: ServerConfig,
18    app: CalaApp,
19) -> anyhow::Result<()> {
20    let ledger = app.ledger().clone();
21    let schema = graphql::schema::<Q, M>(Some(app));
22
23    let app = Router::new()
24        .route(
25            "/graphql",
26            get(playground).post(axum::routing::post(graphql_handler::<Q, M>)),
27        )
28        .layer(Extension(schema))
29        .layer(Extension(ledger));
30
31    println!("Starting graphql server on port {}", config.port);
32    let listener =
33        tokio::net::TcpListener::bind(&std::net::SocketAddr::from(([0, 0, 0, 0], config.port)))
34            .await?;
35    axum::serve(listener, app.into_make_service()).await?;
36    Ok(())
37}
38
39#[instrument(name = "cala_server.graphql", skip_all, fields(error, error.level, error.message))]
40pub async fn graphql_handler<Q: QueryExtensionMarker, M: MutationExtensionMarker>(
41    headers: HeaderMap,
42    schema: Extension<Schema<graphql::CoreQuery<Q>, graphql::CoreMutation<M>, EmptySubscription>>,
43    Extension(ledger): Extension<CalaLedger>,
44    req: GraphQLRequest,
45) -> GraphQLResponse {
46    cala_tracing::http::extract_tracing(&headers);
47    let mut req = req.into_inner();
48    let op = match maybe_init_atomic_operation(&mut req, &ledger).await {
49        Err(e) => {
50            return async_graphql::Response::from_errors(vec![async_graphql::ServerError::new(
51                e.to_string(),
52                None,
53            )])
54            .into();
55        }
56        Ok(op) => op,
57    };
58    if let Some(ref op) = op {
59        req = req.data(Arc::clone(op));
60    }
61    let mut res = schema.execute(req).await;
62    if let Some(op) = op {
63        if res.errors.is_empty() {
64            if let Err(e) = Arc::into_inner(op)
65                .expect("Arc::into_inner")
66                .into_inner()
67                .commit()
68                .await
69            {
70                res.errors
71                    .push(async_graphql::ServerError::new(e.to_string(), None));
72                res.data = async_graphql::Value::Null;
73            }
74        } else {
75            // For atomic operations, if any mutation fails, clear the data
76            // since nothing was actually committed to the database
77            res.data = async_graphql::Value::Null;
78        }
79    }
80    res.into()
81}
82
83async fn playground() -> impl axum::response::IntoResponse {
84    axum::response::Html(async_graphql::http::playground_source(
85        async_graphql::http::GraphQLPlaygroundConfig::new("/graphql"),
86    ))
87}
88
89async fn maybe_init_atomic_operation(
90    req: &mut async_graphql::Request,
91    ledger: &CalaLedger,
92) -> Result<Option<Arc<Mutex<es_entity::DbOpWithTime<'static>>>>, cala_ledger::error::LedgerError> {
93    use async_graphql::parser::types::*;
94
95    let operation_name = req
96        .operation_name
97        .as_ref()
98        .map(|n| async_graphql::Name::new(n.clone()));
99    if let Ok(query) = req.parsed_query() {
100        let is_mutation = match (&query.operations, operation_name) {
101            (DocumentOperations::Single(op), _) => op.node.ty == OperationType::Mutation,
102            (DocumentOperations::Multiple(ops), _) if ops.len() == 1 => {
103                ops.values().next().expect("ops.next").node.ty == OperationType::Mutation
104            }
105            (DocumentOperations::Multiple(ops), Some(name)) if ops.get(&name).is_some() => {
106                ops.get(&name).expect("ops.get").node.ty == OperationType::Mutation
107            }
108            _ => false,
109        };
110        if is_mutation {
111            return Ok(Some(Arc::new(Mutex::new(ledger.begin_operation().await?))));
112        }
113    }
114    Ok(None)
115}