Function async_graphql_warp::graphql_subscription_upgrade[][src]

pub async fn graphql_subscription_upgrade<Query, Mutation, Subscription>(
    websocket: WebSocket,
    protocol: WebSocketProtocols,
    schema: Schema<Query, Mutation, Subscription>
) where
    Query: ObjectType + 'static,
    Mutation: ObjectType + 'static,
    Subscription: SubscriptionType + 'static, 

Handle the WebSocket subscription.

If you want to control the WebSocket subscription more finely, you can use this function, otherwise it is more convenient to use graphql_subscription.

Examples

use async_graphql::*;
use async_graphql_warp::*;
use warp::Filter;
use futures_util::stream::{Stream, StreamExt};
use std::time::Duration;

struct QueryRoot;

#[Object]
impl QueryRoot {
    async fn value(&self) -> i32 {
        // A GraphQL Object type must define one or more fields.
        100
    }
}

struct SubscriptionRoot;

#[Subscription]
impl SubscriptionRoot {
    async fn tick(&self) -> impl Stream<Item = String> {
        async_stream::stream! {
            let mut interval = tokio::time::interval(Duration::from_secs(1));
            loop {
                let n = interval.tick().await;
                yield format!("{}", n.elapsed().as_secs_f32());
            }
        }
    }
}

tokio::runtime::Runtime::new().unwrap().block_on(async {
    let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
    let filter = warp::ws()
        .and(graphql_protocol())
        .map(move |ws: warp::ws::Ws, protocol| {
            let schema = schema.clone();
            let reply = ws.on_upgrade( move |websocket| {
                graphql_subscription_upgrade(websocket, protocol, schema)
            });
            warp::reply::with_header(
               reply,
               "Sec-WebSocket-Protocol",
               protocol.sec_websocket_protocol(),
           )
        });
    warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
});