pyth-lazer-client 21.0.0

A Rust client for Pyth Lazer
Documentation
use std::time::Duration;

use crate::{
    CHANNEL_CAPACITY,
    backoff::{PythLazerExponentialBackoff, PythLazerExponentialBackoffBuilder},
    merkle_ws_connection::cache_key,
    resilient_merkle_ws_connection::PythLazerResilientMerkleWSConnection,
};
use anyhow::{Result, bail};
use backoff::ExponentialBackoff;
use pyth_lazer_protocol::api::SignedMerkleRoot;
use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::{error, warn};
use ttl_cache::TtlCache;
use url::Url;

// Capacity handed to the `TtlCache` that `start` uses to drop merkle roots it
// has already forwarded.
const DEDUP_CACHE_SIZE: usize = 100_000;
// TTL given to every entry inserted into that de-duplication cache.
const DEDUP_TTL: Duration = Duration::from_secs(10);

// Endpoints the builder starts with when the caller does not supply its own.
const DEFAULT_ENDPOINTS: [&str; 2] = [
    "wss://pyth-lazer-0.dourolabs.app/v1/merkle/root/stream",
    "wss://pyth-lazer-1.dourolabs.app/v1/merkle/root/stream",
];
// Builder default for the number of connections created by `start`.
const DEFAULT_NUM_CONNECTIONS: usize = 4;
// Builder default for the timeout passed to every connection.
// NOTE(review): what exactly this timeout bounds is decided inside
// `PythLazerResilientMerkleWSConnection` — confirm there.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Client that fans in signed merkle roots from several websocket
/// connections and exposes them as a single de-duplicated channel.
///
/// Construct it with [`PythLazerMerkleStreamClient::new`] or through
/// [`PythLazerMerkleStreamClientBuilder`], then call `start` to obtain the
/// receiver.
pub struct PythLazerMerkleStreamClient {
    /// Endpoints the connections are distributed over, round-robin.
    endpoints: Vec<Url>,
    /// Token handed to every connection.
    access_token: String,
    /// Number of connections `start` creates.
    num_connections: usize,
    /// Connections created by `start`. This file only pushes into the vector
    /// and never reads it back.
    // NOTE(review): presumably held so the connections live as long as the
    // client does — confirm against the connection type's drop behaviour.
    _ws_connections: Vec<PythLazerResilientMerkleWSConnection>,
    /// Backoff configuration cloned into every connection.
    backoff: ExponentialBackoff,
    /// Timeout passed to every connection.
    timeout: Duration,
    /// Capacity of the channel returned to the caller by `start`.
    channel_capacity: usize,
}

impl PythLazerMerkleStreamClient {
    /// Creates a configured client without opening any connection.
    ///
    /// Connections are only created by [`start`](Self::start).
    ///
    /// # Arguments
    /// * `endpoints` - websocket endpoints; connections are assigned to them
    ///   round-robin.
    /// * `access_token` - token handed to every connection.
    /// * `num_connections` - how many connections `start` creates.
    /// * `backoff` - backoff configuration, converted once and cloned into
    ///   every connection.
    /// * `timeout` - timeout passed to every connection.
    /// * `channel_capacity` - capacity of the channel returned by `start`.
    ///
    /// # Errors
    /// Returns an error if `endpoints` is empty or if `channel_capacity` is
    /// zero.
    pub fn new(
        endpoints: Vec<Url>,
        access_token: String,
        num_connections: usize,
        backoff: PythLazerExponentialBackoff,
        timeout: Duration,
        channel_capacity: usize,
    ) -> Result<Self> {
        if endpoints.is_empty() {
            bail!("At least one endpoint must be provided");
        }
        // A bounded tokio mpsc channel panics when created with capacity 0;
        // report the misconfiguration here instead of panicking in `start`.
        if channel_capacity == 0 {
            bail!("Channel capacity must be greater than zero");
        }
        Ok(Self {
            endpoints,
            access_token,
            num_connections,
            _ws_connections: Vec::with_capacity(num_connections),
            backoff: backoff.into(),
            timeout,
            channel_capacity,
        })
    }

    /// Creates the connections and returns the receiving end of the
    /// de-duplicated stream of signed merkle roots.
    ///
    /// `num_connections` connections are created, the i-th one using endpoint
    /// `i % endpoints.len()`. All of them feed one internal channel. A spawned
    /// task reads that channel, skips every root whose `cache_key` is still
    /// present in a TTL cache (`DEDUP_TTL`), and forwards the rest to the
    /// returned receiver.
    ///
    /// When the returned channel is full the task logs a warning and waits
    /// for space, so a slow consumer delays delivery rather than losing
    /// roots. Once the returned receiver has been dropped the task logs an
    /// error and exits, which also drops its end of the internal channel.
    ///
    /// Every call appends a fresh set of connections to the ones stored by
    /// earlier calls.
    ///
    /// # Errors
    /// The current implementation always returns `Ok`.
    pub async fn start(&mut self) -> Result<mpsc::Receiver<SignedMerkleRoot>> {
        let (sender, receiver) = mpsc::channel::<SignedMerkleRoot>(self.channel_capacity);
        let (ws_connection_sender, mut ws_connection_receiver) =
            mpsc::channel::<SignedMerkleRoot>(CHANNEL_CAPACITY);

        // Round-robin over the endpoints: connection i gets endpoint
        // i % endpoints.len().
        let assigned_endpoints = self.endpoints.iter().cycle().take(self.num_connections);
        for endpoint in assigned_endpoints {
            let connection = PythLazerResilientMerkleWSConnection::new(
                endpoint.clone(),
                self.access_token.clone(),
                self.backoff.clone(),
                self.timeout,
                ws_connection_sender.clone(),
            );
            self._ws_connections.push(connection);
        }

        let mut seen_updates = TtlCache::new(DEDUP_CACHE_SIZE);

        tokio::spawn(async move {
            while let Some(root) = ws_connection_receiver.recv().await {
                // Several connections deliver the same root; forward only the
                // first copy seen within the TTL window.
                let key = cache_key(&root);
                if seen_updates.contains_key(&key) {
                    continue;
                }
                seen_updates.insert(key, true, DEDUP_TTL);

                let receiver_dropped = match sender.try_send(root) {
                    Ok(()) => false,
                    Err(TrySendError::Full(root)) => {
                        warn!("Sender channel is full, responses will be delayed");
                        // Fall back to a waiting send so the root is not lost.
                        sender.send(root).await.is_err()
                    }
                    Err(TrySendError::Closed(_)) => true,
                };

                if receiver_dropped {
                    error!("Sender channel is closed, stopping merkle client");
                    // Nobody can receive anymore: actually stop instead of
                    // draining the connections and logging on every root.
                    break;
                }
            }
        });

        Ok(receiver)
    }
}

/// Builder for [`PythLazerMerkleStreamClient`].
///
/// Every field mirrors the constructor argument of the same name; `new`
/// fills in defaults for everything except the access token.
pub struct PythLazerMerkleStreamClientBuilder {
    /// Endpoints to connect to; defaults to `DEFAULT_ENDPOINTS`.
    endpoints: Vec<Url>,
    /// Access token supplied by the caller; has no default.
    access_token: String,
    /// Number of connections; defaults to `DEFAULT_NUM_CONNECTIONS`.
    num_connections: usize,
    /// Backoff configuration; defaults to the default-built
    /// `PythLazerExponentialBackoffBuilder`.
    backoff: PythLazerExponentialBackoff,
    /// Timeout passed to the connections; defaults to `DEFAULT_TIMEOUT`.
    timeout: Duration,
    /// Capacity of the output channel; defaults to `CHANNEL_CAPACITY`.
    channel_capacity: usize,
}

impl PythLazerMerkleStreamClientBuilder {
    /// Starts a builder with the given access token and default settings.
    ///
    /// Defaults: the endpoints in `DEFAULT_ENDPOINTS`,
    /// `DEFAULT_NUM_CONNECTIONS` connections, the default exponential
    /// backoff, `DEFAULT_TIMEOUT`, and an output channel of
    /// `CHANNEL_CAPACITY`.
    ///
    /// # Panics
    /// Panics if one of the built-in default endpoint strings fails to parse
    /// as a URL.
    pub fn new(access_token: String) -> Self {
        let endpoints: Vec<Url> = DEFAULT_ENDPOINTS
            .iter()
            .map(|raw| raw.parse::<Url>().unwrap())
            .collect();
        let backoff = PythLazerExponentialBackoffBuilder::default().build();

        Self {
            endpoints,
            access_token,
            num_connections: DEFAULT_NUM_CONNECTIONS,
            backoff,
            timeout: DEFAULT_TIMEOUT,
            channel_capacity: CHANNEL_CAPACITY,
        }
    }

    /// Replaces the endpoint list.
    pub fn with_endpoints(self, endpoints: Vec<Url>) -> Self {
        Self { endpoints, ..self }
    }

    /// Sets how many connections the client creates.
    pub fn with_num_connections(self, num_connections: usize) -> Self {
        Self {
            num_connections,
            ..self
        }
    }

    /// Sets the backoff configuration handed to the connections.
    pub fn with_backoff(self, backoff: PythLazerExponentialBackoff) -> Self {
        Self { backoff, ..self }
    }

    /// Sets the timeout handed to the connections.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Sets the capacity of the channel returned by the client's `start`.
    pub fn with_channel_capacity(self, channel_capacity: usize) -> Self {
        Self {
            channel_capacity,
            ..self
        }
    }

    /// Consumes the builder and constructs the client.
    ///
    /// # Errors
    /// Returns whatever error [`PythLazerMerkleStreamClient::new`] reports,
    /// e.g. for an empty endpoint list.
    pub fn build(self) -> Result<PythLazerMerkleStreamClient> {
        let Self {
            endpoints,
            access_token,
            num_connections,
            backoff,
            timeout,
            channel_capacity,
        } = self;

        PythLazerMerkleStreamClient::new(
            endpoints,
            access_token,
            num_connections,
            backoff,
            timeout,
            channel_capacity,
        )
    }
}