tensor_trade_stream/
lib.rs

1use async_tungstenite::tungstenite::{
2    client::IntoClientRequest,
3    http::{header, HeaderValue},
4    Message,
5};
6use eyre::Result;
7use futures::StreamExt;
8use graphql_client::GraphQLQuery;
9use graphql_ws_client::{
10    graphql::{GraphQLClient, StreamingOperation},
11    AsyncWebsocketClient, GraphQLClientClientBuilder, SubscriptionStream,
12};
13
14pub mod queries;
15mod tokio_spawner;
16
17use tokio_spawner::TokioSpawner;
18
19pub type TransactionQuery = queries::NewTransactionTV2;
20pub type TensorswapOrderUpdateQuery = queries::TswapOrderUpdate;
21pub type TensorswapOrderUpdateAllQuery = queries::TswapOrderUpdateAll;
22
23pub type TransactionVariables = queries::new_transaction_tv2::Variables;
24pub type TensorswapOrderUpdateVariables = queries::tswap_order_update::Variables;
25pub type TensorswapOrderUpdateAllVariables = queries::tswap_order_update_all::Variables;
26
27pub type TransactionResponse = queries::new_transaction_tv2::NewTransactionTv2NewTransactionTv2;
28pub type TensorswapOrderUpdateResponse =
29    queries::tswap_order_update::TswapOrderUpdateTswapOrderUpdate;
30pub type TensorswapOrderUpdateAllResponse =
31    queries::tswap_order_update_all::TswapOrderUpdateAllTswapOrderUpdateAll;
32
33pub async fn subscribe<T: GraphQLQuery + Send + Sync + Unpin + 'static>(
34    api_key: &str,
35    variables: T::Variables,
36) -> Result<(
37    AsyncWebsocketClient<GraphQLClient, Message>,
38    SubscriptionStream<GraphQLClient, StreamingOperation<T>>,
39)>
40where
41    <T as GraphQLQuery>::Variables: Send + Sync + Unpin,
42    <T as GraphQLQuery>::ResponseData: std::fmt::Debug,
43{
44    let mut request = "wss://api.tensor.so/graphql".into_client_request()?;
45    request.headers_mut().insert(
46        header::SEC_WEBSOCKET_PROTOCOL,
47        HeaderValue::from_str("graphql-transport-ws")?,
48    );
49    request
50        .headers_mut()
51        .insert("X-TENSOR-API-KEY", HeaderValue::from_str(api_key)?);
52
53    let (connection, _) = async_tungstenite::tokio::connect_async(request).await?;
54
55    let (sink, stream) = connection.split::<Message>();
56
57    let mut client = GraphQLClientClientBuilder::new()
58        .build(stream, sink, TokioSpawner::current())
59        .await?;
60
61    let stream = client
62        .streaming_operation(StreamingOperation::<T>::new(variables))
63        .await?;
64
65    Ok((client, stream))
66}