jetstream_oxide/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
pub mod error;
pub mod events;
pub mod exports;

use std::io::{
    Cursor,
    Read,
};

use chrono::Utc;
use futures_util::stream::StreamExt;
use tokio::{
    net::TcpStream,
    task::JoinHandle,
};
use tokio_tungstenite::{
    connect_async,
    tungstenite::Message,
    MaybeTlsStream,
    WebSocketStream,
};
use url::Url;
use zstd::dict::DecoderDictionary;

use crate::{
    error::{
        ConfigValidationError,
        ConnectionError,
        JetstreamEventError,
    },
    events::JetstreamEvent,
};

/// The Jetstream endpoints officially provided by Bluesky themselves.
///
/// There are no guarantees that these endpoints will always be available, but you are free
/// to run your own Jetstream instance in any case.
pub enum DefaultJetstreamEndpoints {
    /// `jetstream1.us-east.bsky.network`
    USEastOne,
    /// `jetstream2.us-east.bsky.network`
    USEastTwo,
    /// `jetstream1.us-west.bsky.network`
    USWestOne,
    /// `jetstream2.us-west.bsky.network`
    USWestTwo,
}

impl From<DefaultJetstreamEndpoints> for String {
    fn from(endpoint: DefaultJetstreamEndpoints) -> Self {
        match endpoint {
            DefaultJetstreamEndpoints::USEastOne => {
                "wss://jetstream1.us-east.bsky.network/subscribe".to_owned()
            }
            DefaultJetstreamEndpoints::USEastTwo => {
                "wss://jetstream2.us-east.bsky.network/subscribe".to_owned()
            }
            DefaultJetstreamEndpoints::USWestOne => {
                "wss://jetstream1.us-west.bsky.network/subscribe".to_owned()
            }
            DefaultJetstreamEndpoints::USWestTwo => {
                "wss://jetstream2.us-west.bsky.network/subscribe".to_owned()
            }
        }
    }
}

/// The maximum number of wanted collections that can be requested on a single Jetstream connection.
const MAX_WANTED_COLLECTIONS: usize = 100;
/// The maximum number of wanted DIDs that can be requested on a single Jetstream connection.
const MAX_WANTED_DIDS: usize = 10_000;

/// The custom `zstd` dictionary used for decoding compressed Jetstream messages.
///
/// Sourced from the [official Bluesky Jetstream repo.](https://github.com/bluesky-social/jetstream/tree/main/pkg/models)
const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd/dictionary");

/// A receiver channel for consuming Jetstream events.
pub type JetstreamReceiver = flume::Receiver<JetstreamEvent>;

/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
type JetstreamSender = flume::Sender<JetstreamEvent>;

/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
/// receive and consume events. See [JetstreamConnector::connect] for more info.
pub struct JetstreamConnector {
    /// The configuration for the Jetstream connection.
    config: JetstreamConfig,
}

pub enum JetstreamCompression {
    /// No compression, just raw plaintext JSON.
    None,
    /// Use the `zstd` compression algorithm, which can result in a ~56% smaller messages on
    /// average. See [here](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#compression) for more info.
    Zstd,
}

impl From<JetstreamCompression> for bool {
    fn from(compression: JetstreamCompression) -> Self {
        match compression {
            JetstreamCompression::None => false,
            JetstreamCompression::Zstd => true,
        }
    }
}

pub struct JetstreamConfig {
    /// A Jetstream endpoint to connect to with a WebSocket Scheme i.e.
    /// `wss://jetstream1.us-east.bsky.network/subscribe`.
    pub endpoint: String,
    /// A list of collection [NSIDs](https://atproto.com/specs/nsid) to filter events for.
    ///
    /// An empty list will receive events for *all* collections.
    ///
    /// Regardless of desired collections, all subscribers receive
    /// [AccountEvent](events::account::AccountEvent) and
    /// [IdentityEvent](events::identity::Identity) events.
    pub wanted_collections: Vec<exports::Nsid>,
    /// A list of repo [DIDs](https://atproto.com/specs/did) to filter events for.
    ///
    /// An empty list will receive events for *all* repos, which is a lot of events!
    pub wanted_dids: Vec<exports::Did>,
    /// The compression algorithm to request and use for the WebSocket connection (if any).
    pub compression: JetstreamCompression,
    /// An optional timestamp to begin playback from.
    ///
    /// An absent cursor or a cursor from the future will result in live-tail operation.
    ///
    /// When reconnecting, use the time_us from your most recently processed event and maybe
    /// provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback.
    pub cursor: Option<chrono::DateTime<Utc>>,
}

impl Default for JetstreamConfig {
    fn default() -> Self {
        JetstreamConfig {
            endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
            wanted_collections: Vec::new(),
            wanted_dids: Vec::new(),
            compression: JetstreamCompression::None,
            cursor: None,
        }
    }
}

impl JetstreamConfig {
    /// Constructs a new endpoint URL with the given [JetstreamConfig] applied.
    pub fn construct_endpoint(&self, endpoint: &String) -> Result<Url, url::ParseError> {
        let did_search_query = self
            .wanted_dids
            .iter()
            .map(|s| ("wantedDids", s.to_string()));

        let collection_search_query = self
            .wanted_collections
            .iter()
            .map(|s| ("wantedCollections", s.to_string()));

        let compression = (
            "compress",
            match self.compression {
                JetstreamCompression::None => "false".to_owned(),
                JetstreamCompression::Zstd => "true".to_owned(),
            },
        );

        let cursor = self
            .cursor
            .map(|c| ("cursor", c.timestamp_micros().to_string()));

        let params = did_search_query
            .chain(collection_search_query)
            .chain(std::iter::once(compression))
            .chain(cursor.into_iter())
            .collect::<Vec<(&str, String)>>();

        Url::parse_with_params(&endpoint, params)
    }

    /// Validates the configuration to make sure it is within the limits of the Jetstream API.
    ///
    /// # Constants
    /// The following constants are used to validate the configuration and should only be changed
    /// if the Jetstream API has itself changed.
    /// - [MAX_WANTED_COLLECTIONS]
    /// - [MAX_WANTED_DIDS]
    pub fn validate(&self) -> Result<(), ConfigValidationError> {
        let collections = self.wanted_collections.len();
        let dids = self.wanted_dids.len();

        if collections > MAX_WANTED_COLLECTIONS {
            return Err(ConfigValidationError::TooManyWantedCollections(collections));
        }

        if dids > MAX_WANTED_DIDS {
            return Err(ConfigValidationError::TooManyDids(dids));
        }

        Ok(())
    }
}

impl JetstreamConnector {
    /// Create a Jetstream connector with a valid [JetstreamConfig].
    ///
    /// After creation, you can call [connect] to connect to the provided Jetstream instance.
    pub fn new(config: JetstreamConfig) -> Result<Self, ConfigValidationError> {
        // We validate the configuration here so any issues are caught early.
        config.validate()?;
        Ok(JetstreamConnector { config })
    }

    /// Connects to a Jetstream instance as defined in the [JetstreamConfig].
    ///
    /// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
    /// of this receiver are dropped, the connection and task are automatically closed.
    pub async fn connect(
        &self,
    ) -> Result<
        (
            JetstreamReceiver,
            JoinHandle<Result<(), JetstreamEventError>>,
        ),
        ConnectionError,
    > {
        // We validate the config again for good measure. Probably not necessary but it can't hurt.
        self.config
            .validate()
            .map_err(ConnectionError::InvalidConfig)?;

        // TODO: Run some benchmarks and look into using a bounded channel instead.
        let (send_channel, receive_channel) = flume::unbounded();

        let configured_endpoint = self
            .config
            .construct_endpoint(&self.config.endpoint)
            .map_err(ConnectionError::InvalidEndpoint)?;

        let (ws_stream, _) = connect_async(&configured_endpoint)
            .await
            .map_err(ConnectionError::WebSocketFailure)?;

        let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);

        // TODO: Internally creating and returning a tokio task might not be the best idea(?)
        let handle = tokio::task::spawn(websocket_task(dict, ws_stream, send_channel));

        Ok((receive_channel, handle))
    }
}

/// The main task that handles the WebSocket connection and sends [JetstreamEvent]'s to any
/// receivers that are listening for them.
async fn websocket_task(
    dictionary: DecoderDictionary<'_>,
    ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
    send_channel: JetstreamSender,
) -> Result<(), JetstreamEventError> {
    // TODO: Use the write half to allow the user to change configuration settings on the fly.
    let (_, mut read) = ws.split();
    loop {
        if let Some(Ok(message)) = read.next().await {
            match message {
                Message::Text(json) => {
                    let event = serde_json::from_str::<JetstreamEvent>(&json)
                        .map_err(JetstreamEventError::ReceivedMalformedJSON)?;

                    if let Err(_) = send_channel.send(event) {
                        // We can assume that all receivers have been dropped, so we can close the
                        // connection and exit the task.
                        log::info!(
                            "All receivers for the Jetstream connection have been dropped, closing connection."
                        );
                        return Ok(());
                    }
                }
                Message::Binary(zstd_json) => {
                    let mut cursor = Cursor::new(zstd_json);
                    let mut decoder =
                        zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
                            .map_err(JetstreamEventError::CompressionDictionaryError)?;

                    let mut json = String::new();
                    decoder
                        .read_to_string(&mut json)
                        .map_err(JetstreamEventError::CompressionDecoderError)?;

                    let event = serde_json::from_str::<JetstreamEvent>(&json)
                        .map_err(JetstreamEventError::ReceivedMalformedJSON)?;

                    if let Err(_) = send_channel.send(event) {
                        // We can assume that all receivers have been dropped, so we can close the
                        // connection and exit the task.
                        log::info!(
                            "All receivers for the Jetstream connection have been dropped, closing connection..."
                        );
                        return Ok(());
                    }
                }
                _ => {}
            }
        }
    }
}