exc 0.7.3

The abstraction layer of exchanges.
Documentation
use std::sync::{Arc, RwLock};

use exc_core::{
    types::instrument::{FetchInstruments, InstrumentMeta, SubscribeInstruments},
    ExchangeError, Str,
};
use futures::{stream, StreamExt, TryStreamExt};
use rust_decimal::Decimal;
use tower::ServiceExt;

use crate::types::instrument::GetInstrument;

use super::{FetchInstrumentSvc, SubscribeInstrumentSvc};

mod inst;

#[derive(Default)]
pub(super) struct State {
    insts: RwLock<inst::InstState>,
}

impl State {
    pub(super) async fn init(
        self: Arc<Self>,
        mut fetch: FetchInstrumentSvc,
        tags: Vec<Str>,
    ) -> Result<(), ExchangeError> {
        let mut finished = false;
        while !finished {
            let mut stream = fetch
                .ready()
                .await?
                .call_all(stream::iter(tags.iter().cloned()).map(|tag| FetchInstruments { tag }))
                .boxed()
                .try_flatten();
            while let Some(meta) = stream.next().await {
                match meta {
                    Ok(meta) => {
                        self.insts.write().unwrap().insert(meta);
                    }
                    Err(err) => {
                        tracing::error!(%err, "init; fetch instruments stream error");
                        break;
                    }
                }
            }
            finished = true;
        }
        Ok(())
    }

    pub(super) fn get_instrument(
        &self,
        req: &GetInstrument,
    ) -> Option<Arc<InstrumentMeta<Decimal>>> {
        tracing::debug!(symbol=%req.symbol, "getting instrument");
        self.insts.read().unwrap().get(&req.symbol).cloned()
    }

    pub(super) async fn watch_instruments(
        self: Arc<Self>,
        mut svc: SubscribeInstrumentSvc,
        tags: Vec<Str>,
    ) -> Result<(), ExchangeError> {
        loop {
            let mut stream = svc
                .ready()
                .await?
                .call_all(stream::iter(
                    tags.iter().cloned().map(|tag| SubscribeInstruments { tag }),
                ))
                .boxed()
                .try_flatten();
            while let Some(meta) = stream.next().await {
                match meta {
                    Ok(meta) => {
                        self.insts.write().unwrap().insert(meta);
                    }
                    Err(err) => {
                        tracing::error!(%err, "watch instruments; stream error");
                        break;
                    }
                }
            }
        }
    }
}