opensea_stream/
lib.rs

1#![deny(missing_docs)]
2
3//! Crate for receiving updates from the [OpenSea Stream API](https://docs.opensea.io/reference/stream-api-overview).
4//! This crate is a thin wrapper over [`phyllo`] with a few convenience functions and struct definitions for the event schema.
5//! It is recommended that you also read the documentation of [`phyllo`] to understand the Phoenix protocol which delivers these messages.
6//!
7//! Events that happen on Solana (and thus carry Solana addresses) are not supported for now.
8//!
9//! # Example
10//! The following example prints all listings of items in the `wandernauts` collection as they are created.
11//! ```no_run
12//! # use opensea_stream::{client, schema, subscribe_to, Collection, Network};
13//! # use phyllo::message::Payload;
14//! #[tokio::main]
15//! async fn main() -> anyhow::Result<()> {
16//!     let mut client = client(Network::Mainnet, "YOUR_API_KEY_HERE").await;
17//!
18//!     // Subscribe to a collection. Note that you must subscribe to all events
19//!     // in the collection; filtering is your responsibility (see below).
20//!     let (handler, mut subscription) = subscribe_to(
21//!         &mut client,
22//!         Collection::Collection("wandernauts".to_string()),
23//!     )
24//!     .await?;
25//!
26//!     // To unsubscribe:
27//!     // handler.close().await?;
28//!
29//!     loop {
30//!         // The message received from the channel is a raw message of the Phoenix protocol.
31//!         // It may or may not contain a payload.
32//!         let event = match subscription.recv().await?.into_custom_payload() {
33//!             Some(v) => v,
34//!             None => {
35//!                 eprintln!("unexpected message");
36//!                 continue;
37//!             }
38//!         };
39//!
40//!         // Only print item listing events.
41//!         if let schema::Payload::ItemListed(listing) = event.payload {
42//!             println!("{:?}", listing);
43//!         }
44//!     }
45//! }
46//! ```
47//! # Features
48//! `rustls-tls-native-roots` (which uses [`rustls-native-certs`](https://crates.io/crates/rustls-native-certs)
49//! for root certificates) is enabled by default. To use `rustls-tls-webpki-roots` ([`webpki-roots`](https://crates.io/crates/webpki-roots))
50//! instead, include this in your `Cargo.toml`:
51//! ```toml
52//! opensea-stream = { version = "0.1", default-features = false, features = ["rustls-tls-webpki-roots"] }
53//! ```
54
55use phyllo::{
56    channel::{ChannelBuilder, ChannelHandler},
57    error::RegisterChannelError,
58    message::Message,
59    socket::{SocketBuilder, SocketHandler},
60};
61use schema::StreamEvent;
62use serde_json::Value;
63use tokio::sync::broadcast;
64use url::Url;
65
66pub use phyllo;
67
68mod protocol;
69/// Payload schema for messages received from the websocket.
70pub mod schema;
71
72pub use protocol::*;
73
74/// Creates a client.
75pub async fn client(network: Network, token: &str) -> SocketHandler<Collection> {
76    let mut network: Url = Url::from(network);
77    network.query_pairs_mut().append_pair("token", token);
78    SocketBuilder::new(network).build().await
79}
80
81/// Subscribes to all the events of a particular [`Collection`].
82pub async fn subscribe_to(
83    socket: &mut SocketHandler<Collection>,
84    collection: Collection,
85) -> Result<
86    (
87        ChannelHandler<Collection, Event, Value, StreamEvent>,
88        broadcast::Receiver<Message<Collection, Event, Value, StreamEvent>>,
89    ),
90    RegisterChannelError,
91> {
92    socket.channel(ChannelBuilder::new(collection)).await
93}
94
95/// Subscribes to all the events of a particular [`Collection`] using
96/// a custom configuration.
97pub async fn subscribe_to_with_config(
98    socket: &mut SocketHandler<Collection>,
99    channel_builder: ChannelBuilder<Collection>,
100) -> Result<
101    (
102        ChannelHandler<Collection, Event, Value, StreamEvent>,
103        broadcast::Receiver<Message<Collection, Event, Value, StreamEvent>>,
104    ),
105    RegisterChannelError,
106> {
107    socket.channel(channel_builder).await
108}