pub fn make_ws_filter<Query, Mutation, Subscription, CtxT, S, I>(
    schema: impl Into<Arc<RootNode<'static, Query, Mutation, Subscription, S>>>,
    init: I
) -> BoxedFilter<(impl Reply,)>
where Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static, Query::TypeInfo: Send + Sync, Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static, Mutation::TypeInfo: Send + Sync, Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static, Subscription::TypeInfo: Send + Sync, CtxT: Unpin + Send + Sync + 'static, S: ScalarValue + Send + Sync + 'static, I: Init<S, CtxT> + Clone + Send + Sync,
Expand description

Makes a filter for GraphQL subscriptions.

This filter auto-selects between the legacy `graphql-ws` GraphQL over WebSocket Protocol and the new `graphql-transport-ws` GraphQL over WebSocket Protocol, based on the `Sec-WebSocket-Protocol` HTTP header value.

The schema argument is your juniper schema.

The `init` argument provides the custom `juniper::Context` and additional configuration for connections. It can be a `juniper_graphql_ws::ConnectionConfig` if the context and configuration are already known, or a closure that is executed asynchronously whenever a client sends the subscription initialization message. Using a closure makes it possible to authenticate the client based on the parameters it provides.

§Example

/// Identifier of the user a request is attributed to.
type UserId = String;

/// Application state that is handed to every context through an `Arc`.
///
/// `Debug` is derived because the example resolvers print this value with
/// `{:?}`; without it the `format!` calls do not compile.
#[derive(Debug)]
struct AppState(Vec<i64>);

/// Per-connection context: the shared application state (field `0`) and the
/// identifier of the requesting user (field `1`).
#[derive(Clone)]
struct ExampleContext(Arc<AppState>, UserId);

struct QueryRoot;

#[graphql_object(context = ExampleContext)]
impl QueryRoot {
    fn say_hello(context: &ExampleContext) -> String {
        format!(
            "good morning {}, the app state is {:?}",
            context.1,
            context.0,
        )
    }
}

type StringsStream = Pin<Box<dyn Stream<Item = String> + Send>>;

struct SubscriptionRoot;

#[graphql_subscription(context = ExampleContext)]
impl SubscriptionRoot {
    async fn say_hellos(context: &ExampleContext) -> StringsStream {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        let context = context.clone();
        Box::pin(async_stream::stream! {
            let mut counter = 0;
            while counter < 5 {
                counter += 1;
                interval.tick().await;
                yield format!(
                    "{counter}: good morning {}, the app state is {:?}",
                     context.1,
                     context.0,
                )
            }
        })
    }
}

// Schema built from the query and subscription roots, with an empty mutation root.
let schema = Arc::new(RootNode::new(QueryRoot, EmptyMutation::new(), SubscriptionRoot));
let app_state = Arc::new(AppState(vec![3, 4, 5]));
// Second handle to the same state, moved into the WebSocket init closure below.
let app_state_for_ws = Arc::clone(&app_state);

// Builds the context for plain HTTP requests from the `authorization` header.
let context_extractor = warp::any()
    .and(warp::header::<String>("authorization"))
    .and(warp::any().map(move || Arc::clone(&app_state)))
    // The header value is used as the user id as-is: we believe them.
    .map(|user_id: String, state: Arc<AppState>| ExampleContext(state, user_id))
    .boxed();

// Builds the connection configuration from the variables a client sends.
let ws_init = move |variables: juniper::Variables| {
    // The `authorization` variable is used as the user id as-is: we believe them.
    let user_id = match variables.get("authorization") {
        Some(value) => value.to_string(),
        None => String::new(),
    };
    async move {
        Result::<_, Infallible>::Ok(ConnectionConfig::new(ExampleContext(
            app_state_for_ws.clone(),
            user_id,
        )))
    }
};

// `POST /graphql` serves queries and mutations.
let http_route = warp::path("graphql")
    .and(warp::post())
    .and(make_graphql_filter(schema.clone(), context_extractor));
// `/subscriptions` serves subscriptions over WebSocket.
let ws_route = warp::path("subscriptions")
    .and(juniper_warp::subscriptions::make_ws_filter(schema, ws_init));

let graphql_endpoint = http_route.or(ws_route);