Function juniper_warp::subscriptions::make_ws_filter
source · pub fn make_ws_filter<Query, Mutation, Subscription, CtxT, S, I>(
schema: impl Into<Arc<RootNode<'static, Query, Mutation, Subscription, S>>>,
init: I
) -> BoxedFilter<(impl Reply,)>where
Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Query::TypeInfo: Send + Sync,
Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
Mutation::TypeInfo: Send + Sync,
Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
Subscription::TypeInfo: Send + Sync,
CtxT: Unpin + Send + Sync + 'static,
S: ScalarValue + Send + Sync + 'static,
I: Init<S, CtxT> + Clone + Send + Sync,
Expand description
Makes a filter for GraphQL subscriptions.
This filter auto-selects between the
legacy graphql-ws
GraphQL over WebSocket Protocol and the
new graphql-transport-ws
GraphQL over WebSocket Protocol, based on the
Sec-Websocket-Protocol
HTTP header value.
The schema
argument is your juniper
schema.
The init
argument is used to provide the custom juniper::Context
and additional
configuration for connections. This can be a juniper_graphql_ws::ConnectionConfig
if the
context and configuration are already known, or it can be a closure that gets executed
asynchronously whenever a client sends the subscription initialization message. Using a
closure allows to perform an authentication based on the parameters provided by a client.
§Example
type UserId = String;
struct AppState(Vec<i64>);
#[derive(Clone)]
struct ExampleContext(Arc<AppState>, UserId);
struct QueryRoot;
#[graphql_object(context = ExampleContext)]
impl QueryRoot {
fn say_hello(context: &ExampleContext) -> String {
format!(
"good morning {}, the app state is {:?}",
context.1,
context.0,
)
}
}
type StringsStream = Pin<Box<dyn Stream<Item = String> + Send>>;
struct SubscriptionRoot;
#[graphql_subscription(context = ExampleContext)]
impl SubscriptionRoot {
async fn say_hellos(context: &ExampleContext) -> StringsStream {
let mut interval = tokio::time::interval(Duration::from_secs(1));
let context = context.clone();
Box::pin(async_stream::stream! {
let mut counter = 0;
while counter < 5 {
counter += 1;
interval.tick().await;
yield format!(
"{counter}: good morning {}, the app state is {:?}",
context.1,
context.0,
)
}
})
}
}
let schema = Arc::new(RootNode::new(QueryRoot, EmptyMutation::new(), SubscriptionRoot));
let app_state = Arc::new(AppState(vec![3, 4, 5]));
let app_state_for_ws = app_state.clone();
let context_extractor = warp::any()
.and(warp::header::<String>("authorization"))
.and(warp::any().map(move || app_state.clone()))
.map(|auth_header: String, app_state: Arc<AppState>| {
let user_id = auth_header; // we believe them
ExampleContext(app_state, user_id)
})
.boxed();
let graphql_endpoint = (warp::path("graphql")
.and(warp::post())
.and(make_graphql_filter(schema.clone(), context_extractor)))
.or(warp::path("subscriptions")
.and(juniper_warp::subscriptions::make_ws_filter(
schema,
move |variables: juniper::Variables| {
let user_id = variables
.get("authorization")
.map(ToString::to_string)
.unwrap_or_default(); // we believe them
async move {
Ok::<_, Infallible>(ConnectionConfig::new(
ExampleContext(app_state_for_ws.clone(), user_id),
))
}
},
)));