architect_sdk/
client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//! General purpose client for Architect

#[cfg(feature = "grpc")]
use anyhow::{anyhow, Result};
#[cfg(feature = "grpc")]
use api::{
    external::{marketdata::*, symbology::*},
    grpc::json_service::{marketdata_client::*, symbology_client::*},
};
#[cfg(feature = "grpc")]
use hickory_resolver::{config::*, TokioAsyncResolver};
#[cfg(feature = "grpc")]
use tonic::codec::Streaming;

/// General-purpose client for Architect.
///
/// The struct carries no fields; obtain an instance with
/// `ArchitectClient::default()`.
#[derive(Debug, Default)]
pub struct ArchitectClient {}

impl ArchitectClient {
    /// Resolve the endpoint of a service from the DNS SRV records of `domain_name`.
    ///
    /// A resolver is built from the default resolver configuration and options,
    /// the SRV records for `domain_name` are looked up, and the record with the
    /// lowest priority value is chosen (RFC 2782 designates the lowest-numbered
    /// priority as the preferred target). The result is formatted as
    /// `dns://{target}:{port}`.
    ///
    /// # Errors
    ///
    /// Returns an error if the SRV lookup fails or yields no records.
    #[cfg(feature = "grpc")]
    pub async fn resolve_service(&self, domain_name: &str) -> Result<String> {
        let resolver =
            TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default());
        let records = resolver.srv_lookup(domain_name).await?;
        // Records arrive in no guaranteed order, so select by SRV priority
        // rather than by position. `min_by_key` keeps the first record among
        // equal priorities; weighted selection between ties is not attempted.
        let preferred = records
            .iter()
            .min_by_key(|srv| srv.priority())
            .ok_or_else(|| anyhow!("no SRV records found for domain: {domain_name}"))?;
        Ok(format!("dns://{}:{}", preferred.target(), preferred.port()))
    }

    /// Load symbology from the given endpoint into global memory.
    ///
    /// Connects a symbology client to `endpoint`, requests a symbology
    /// snapshot, and adds its routes, venues, products and markets (in that
    /// order) to a single transaction, which is committed at the end.
    ///
    /// # Errors
    ///
    /// Returns an error if connecting fails, the snapshot request fails, any
    /// individual addition is rejected, or the commit fails. The commit is only
    /// reached when every addition has succeeded.
    #[cfg(feature = "grpc")]
    pub async fn load_symbology_from(&self, endpoint: impl AsRef<str>) -> Result<()> {
        use crate::symbology::Txn;
        let mut client = SymbologyClient::connect(endpoint.as_ref().to_owned()).await?;
        let snap =
            client.symbology_snapshot(SymbologySnapshotRequest {}).await?.into_inner();
        let mut txn = Txn::begin();
        for route in snap.routes {
            txn.add_route(route)?;
        }
        for venue in snap.venues {
            txn.add_venue(venue)?;
        }
        for product in snap.products {
            txn.add_product(product)?;
        }
        for market in snap.markets {
            txn.add_market(market)?;
        }
        txn.commit()?;
        Ok(())
    }

    /// Subscribe to a stream of L1 book snapshots served by `endpoint`.
    ///
    /// Connects a marketdata client to `endpoint` and issues the subscription
    /// request, returning the response stream. When `market_ids` is `None`, the
    /// subscription covers all L1 books for all markets available.
    ///
    /// # Errors
    ///
    /// Returns an error if connecting fails or the subscription request is
    /// rejected.
    #[cfg(feature = "grpc")]
    pub async fn subscribe_l1_book_snapshots_from(
        // NB alee: keeping this mut for now in case we mux clients
        &mut self,
        endpoint: impl AsRef<str>,
        // if None, subscribe to all L1 books for all markets available
        market_ids: Option<Vec<MarketId>>,
    ) -> Result<Streaming<L1BookSnapshot>> {
        let mut client = MarketdataClient::connect(endpoint.as_ref().to_owned()).await?;
        let stream = client
            .subscribe_l1_book_snapshots(SubscribeL1BookSnapshotsRequest { market_ids })
            .await?
            .into_inner();
        Ok(stream)
    }
}