Function async_graphql_warp::graphql_subscription_upgrade [−][src]
pub async fn graphql_subscription_upgrade<Query, Mutation, Subscription, S>(
websocket: S,
protocol: WebSocketProtocols,
schema: Schema<Query, Mutation, Subscription>
) where
Query: ObjectType + 'static,
Mutation: ObjectType + 'static,
Subscription: SubscriptionType + 'static,
S: Stream<Item = Result<Message, Error>> + Sink<Message>,
Expand description
Handle the WebSocket subscription.
Use this function if you need finer control over the WebSocket subscription; otherwise, it is more convenient to use `graphql_subscription`.
Examples
use async_graphql::*;
use async_graphql_warp::*;
use warp::Filter;
use futures_util::stream::{Stream, StreamExt};
use std::time::Duration;
// Query root of the example schema; it exposes a single `value` field.
struct QueryRoot;

#[Object]
impl QueryRoot {
    // A GraphQL Object type must define one or more fields, so this
    // root provides one that always resolves to the constant 100.
    async fn value(&self) -> i32 {
        100
    }
}
// Subscription root of the example schema; it exposes a single `tick` stream.
struct SubscriptionRoot;

#[Subscription]
impl SubscriptionRoot {
    // Endless stream that emits one string per one-second interval tick.
    async fn tick(&self) -> impl Stream<Item = String> {
        async_stream::stream! {
            let mut ticker = tokio::time::interval(Duration::from_secs(1));
            loop {
                // Wait for the next tick, then report the time elapsed since
                // the value it returned, in seconds (f32) rendered as text.
                let fired_at = ticker.tick().await;
                yield fired_at.elapsed().as_secs_f32().to_string();
            }
        }
    }
}
// Drive the example server to completion on a freshly created Tokio runtime.
tokio::runtime::Runtime::new().unwrap().block_on(async {
    let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);

    // Combine the WebSocket filter with `graphql_protocol()` so the handler
    // receives both the pending upgrade and the extracted protocol value.
    let routes = warp::ws()
        .and(graphql_protocol())
        .map(move |ws: warp::ws::Ws, protocol| {
            // The handler may run once per request, so each connection gets
            // its own clone of the schema to move into the upgrade callback.
            let connection_schema = schema.clone();
            let upgraded = ws.on_upgrade(move |socket| {
                graphql_subscription_upgrade(socket, protocol, connection_schema)
            });

            // Attach the value of `protocol.sec_websocket_protocol()` to the
            // reply as the `Sec-WebSocket-Protocol` header.
            warp::reply::with_header(
                upgraded,
                "Sec-WebSocket-Protocol",
                protocol.sec_websocket_protocol(),
            )
        });

    // Serve on 0.0.0.0:8000.
    warp::serve(routes).run(([0, 0, 0, 0], 8000)).await;
});