1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// This module defines a simple publish-subscribe structure, though one designed to run across the web
// The publishing and subscribing are done on different servers/functions

use async_stream::stream;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::broadcast::{channel as create_channel, Sender};
use tokio_stream::Stream;

use crate::errors::*;

const MESSAGES_TO_BE_RETAINED: usize = 5;

#[derive(Serialize)]
struct GQLQueryBody<T: Serialize> {
    query: String,
    variables: T,
}

#[derive(Deserialize)]
struct GQLPublishResponse {
    data: PublishResponse,
}
#[derive(Deserialize)]
struct PublishResponse {
    publish: bool,
}

/// The system that publishes data from the queries/mutations system to the subscriptions server.
/// These communications are secured by a JWT specified in [`Options`](crate::Options).
/// This is automatically created from the [`Options`](crate::Options) and passed to all resolvers. You should never need to manually create it.
pub struct Publisher {
    client: Client,
    address: String,
    token: String,
}
impl Publisher {
    /// Creates a new publisher. This is done for you when you create the queries/mutations system, so you should never need to call this.
    pub fn new(hostname: String, port: String, endpoint: String, token: String) -> Result<Self> {
        let address = format!(
            "{hostname}:{port}{endpoint}", // The endpoint should start with '/'
            hostname = hostname,
            port = port,
            endpoint = endpoint
        );

        let client = Client::new();

        Ok(Self {
            client,
            address,
            token,
        })
    }

    /// Sends the given data to the subscriptions server on the given channel. In-depth information about this process is available in the book.
    /// You should use [serde] to serialize anything sent here as a string (this won't be done for you). It should then be deserialized in the
    /// appropriate subscription (which will listen for messages from here indirectly).
    /// This function will return an error if the subscriptions server was unavailable or didn't correctly acknowledge the request.
    /// # Example
    /// ```
    /// let publisher = ctx.data::<Publisher>()?; // Where `ctx` is the `async_graphql` context to a resolver.
    /// publisher.publish("new_user", user_json.to_string()).await?;
    /// ```
    pub async fn publish(&self, channel: &str, data: String) -> Result<()> {
        let client = &self.client;

        // Create the query body with a HashMap of variables
        let mut variables = HashMap::new();
        variables.insert("channel", channel.to_string());
        variables.insert("data", data);

        let body = GQLQueryBody {
            query: "
                mutation PublishData($channel: String!, $data: String!) {
                    publish(
                        channel: $channel,
                        data: $data
                    )
                }
            "
            .to_string(),
            variables,
        };

        let res = client
            .post(&self.address)
            .json(&body)
            .header("Authorization", "Bearer ".to_string() + &self.token)
            .send()
            .await?;

        // Handle if the request wasn't successful on an HTTP level
        if res.status().to_string() != "200 OK" {
            bail!(ErrorKind::SubscriptionDataPublishFailed)
        }

        // Get the body out (data still stringified though, that's handled by resolvers)
        let body: GQLPublishResponse = serde_json::from_str(&res.text().await?)?;

        // Confirm nothing's gone wrong on a GraphQL level (basically only if we got `false` instead of `true`)
        match body.data.publish {
            true => Ok(()),
            _ => bail!(ErrorKind::SubscriptionDataPublishFailed),
        }
    }
}

// Everything from here down operates solely on the subscriptions server, and is stateful!
// Do NOT import these mechanisms in the serverless system!

// This is a traditional PubSub implementation using Tokio's broadcast system
// This doesn't need to be made available because it's entirely internal
pub struct PubSub {
    // A hash map of channels to their Tokio broadcasters
    channels: HashMap<String, Sender<String>>,
}
impl Default for PubSub {
    fn default() -> Self {
        Self {
            channels: HashMap::new(),
        }
    }
}
impl PubSub {
    // Gets a channel or creates a new one if needed
    fn get_channel(&mut self, channel: &str) -> Sender<String> {
        let channel_sender = self.channels.get(channel);
        let channel_sender = match channel_sender {
            Some(sub) => sub,
            None => {
                let (channel_sender, _receiver) = create_channel(MESSAGES_TO_BE_RETAINED);
                self.channels.insert(channel.to_string(), channel_sender); // We put a clone into the HashMap because broadcast can be multi-producer
                self.channels.get(channel).unwrap() // We just added it, we know more than the compiler
            }
        };

        channel_sender.clone()
    }

    pub fn subscribe(&mut self, channel: &str) -> impl Stream<Item = String> {
        let channel_sender = self.get_channel(channel);
        let mut receiver = channel_sender.subscribe();

        stream! {
            loop {
                let message = receiver.recv().await;
                match message {
                    Ok(message) => yield message,
                    _ => continue
                }
            }
        }
    }

    // Creates a new sender for a given channel name if one doesn't exist and then sends a message using it
    pub fn publish(&mut self, channel: &str, data: String) {
        let channel_sender = self.get_channel(channel);
        // This will fail only if there are now receivers, but we don't care if that's the case
        let _ = channel_sender.send(data);
    }

    // Drops the handle to a sender for the given channel
    // All receiver calls after this point will result in a closed channel error
    // This doesn't need to be explicitly called normally
    pub fn close_channel(&mut self, channel: &str) {
        self.channels.remove(channel);
    }
}