architect_sdk/marketdata/
l2_client.rs1use super::LevelBook;
2use crate::synced::{Synced, SyncedHandle};
3use anyhow::{anyhow, bail, Result};
4use architect_api::{
5 grpc::service::marketdata_client::MarketdataClient, marketdata::*,
6 symbology::MarketdataVenue, utils::sequence::SequenceIdAndNumber,
7};
8use arcstr::ArcStr;
9use futures::StreamExt;
10use log::{debug, error};
11use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
12use std::{
13 sync::{Arc, Weak},
14 time::Duration,
15};
16use tonic::{transport::Channel, Streaming};
17
18pub struct L2Client {
28 venue: Arc<Option<MarketdataVenue>>,
29 symbol: ArcStr,
30 updates: Streaming<L2BookUpdate>,
31 state: Arc<Mutex<L2ClientState>>,
32 ready: Synced<bool>,
33 alive: Arc<()>,
34}
35
36impl L2Client {
37 pub async fn new(
38 channel: Channel,
39 symbol: String,
40 venue: Option<MarketdataVenue>,
41 ) -> Result<Self> {
42 let mut client = MarketdataClient::new(channel);
43 let mut updates = client
44 .subscribe_l2_book_updates(SubscribeL2BookUpdatesRequest {
45 venue: venue.clone(),
46 symbol: symbol.clone(),
47 })
48 .await?
49 .into_inner();
50 let first_update = updates.next().await.ok_or(anyhow!("no first update"))??;
54 let state = match first_update {
55 L2BookUpdate::Snapshot(snap) => {
56 debug!("subscribed to stream, first update: {:?}", snap);
57 L2ClientState {
58 sequence: snap.sequence,
59 book: LevelBook::of_l2_book_snapshot(snap)?,
60 }
61 }
62 L2BookUpdate::Diff(..) => {
63 bail!("received diff before snapshot on L2 book update stream");
64 }
65 };
66 Ok(Self {
67 venue: Arc::new(venue),
68 symbol: symbol.into(),
69 updates,
70 state: Arc::new(Mutex::new(state)),
71 ready: Synced::new(true), alive: Arc::new(()),
73 })
74 }
75
76 pub async fn connect<D>(
77 endpoint: D,
78 symbol: String,
79 venue: Option<MarketdataVenue>,
80 ) -> Result<Self>
81 where
82 D: TryInto<tonic::transport::Endpoint>,
83 D::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
84 {
85 let mut client = MarketdataClient::connect(endpoint).await?;
86 let mut updates = client
87 .subscribe_l2_book_updates(SubscribeL2BookUpdatesRequest {
88 venue: venue.clone(),
89 symbol: symbol.clone(),
90 })
91 .await?
92 .into_inner();
93 let first_update = updates.next().await.ok_or(anyhow!("no first update"))??;
97 let state = match first_update {
98 L2BookUpdate::Snapshot(snap) => {
99 debug!("subscribed to stream, first update: {:?}", snap);
100 L2ClientState {
101 sequence: snap.sequence,
102 book: LevelBook::of_l2_book_snapshot(snap)?,
103 }
104 }
105 L2BookUpdate::Diff(..) => {
106 bail!("received diff before snapshot on L2 book update stream");
107 }
108 };
109 Ok(Self {
110 venue: Arc::new(venue),
111 symbol: symbol.into(),
112 updates,
113 state: Arc::new(Mutex::new(state)),
114 ready: Synced::new(true), alive: Arc::new(()),
116 })
117 }
118
119 pub async fn next(&mut self) -> Option<SequenceIdAndNumber> {
120 let update = match self.updates.next().await? {
121 Ok(update) => update,
122 Err(e) => {
123 error!("error on L2 book update stream for {}: {:?}", self.symbol, e);
124 return None;
125 }
126 };
127 match self.apply_update(update) {
128 Ok(sin) => Some(sin),
129 Err(e) => {
130 error!("error applying L2 book update for {}: {:?}", self.symbol, e);
131 None
132 }
133 }
134 }
135
136 pub fn handle(&self) -> L2ClientHandle {
137 L2ClientHandle {
138 venue: self.venue.clone(),
139 symbol: self.symbol.clone(),
140 state: self.state.clone(),
141 ready: self.ready.handle(),
142 alive: Arc::downgrade(&self.alive),
143 }
144 }
145
146 pub fn sequence(&self) -> SequenceIdAndNumber {
147 self.state.lock().sequence
148 }
149
150 pub fn book(&self) -> MappedMutexGuard<'_, LevelBook> {
151 let guard = self.state.lock();
152 MutexGuard::map(guard, |state| &mut state.book)
153 }
154
155 pub(super) fn apply_update(
156 &mut self,
157 update: L2BookUpdate,
158 ) -> Result<SequenceIdAndNumber> {
159 let mut state = self.state.lock();
160 state.apply_update(update)
161 }
162}
163
164#[derive(Clone)]
171pub struct L2ClientHandle {
172 pub venue: Arc<Option<MarketdataVenue>>,
173 pub symbol: ArcStr,
174 pub(super) state: Arc<Mutex<L2ClientState>>,
175 pub(super) ready: SyncedHandle<bool>,
176 pub(super) alive: Weak<()>,
177}
178
179impl L2ClientHandle {
180 pub fn is_ready(&self) -> bool {
181 self.ready.is_synced()
182 }
183
184 pub async fn wait_ready(&mut self, timeout: Option<Duration>) -> Result<()> {
185 self.ready.wait_synced(timeout).await
186 }
187
188 pub fn is_alive(&self) -> bool {
189 self.alive.upgrade().is_some()
190 }
191
192 pub fn sequence(&self) -> Option<SequenceIdAndNumber> {
193 if self.is_ready() && self.is_alive() {
194 Some(self.state.lock().sequence)
195 } else {
196 None
197 }
198 }
199
200 pub fn book(&self) -> Option<MappedMutexGuard<'_, LevelBook>> {
201 if self.is_ready() && self.is_alive() {
202 let guard = self.state.lock();
203 Some(MutexGuard::map(guard, |state| &mut state.book))
204 } else {
205 None
206 }
207 }
208}
209
210#[derive(Default)]
211pub(super) struct L2ClientState {
212 pub sequence: SequenceIdAndNumber,
213 pub book: LevelBook,
214}
215
216impl L2ClientState {
217 pub(super) fn apply_update(
218 &mut self,
219 update: L2BookUpdate,
220 ) -> Result<SequenceIdAndNumber> {
221 match update {
222 L2BookUpdate::Snapshot(snap) => {
223 debug!("processing new snapshot: {:?}", snap);
224 self.sequence = snap.sequence;
225 self.book = LevelBook::of_l2_book_snapshot(snap)?;
226 Ok(self.sequence)
227 }
228 L2BookUpdate::Diff(diff) => {
229 let L2BookDiff { sequence, ref bids, ref asks, .. } = diff;
230 if !sequence.is_next_in_sequence(&self.sequence) {
231 bail!(
232 "feed sequence numbers out of sync: expected {}, got {}",
233 self.sequence,
234 sequence
235 );
236 }
237 self.sequence = sequence;
238 self.book.timestamp = diff
239 .timestamp()
240 .ok_or_else(|| anyhow!("invalid timestamp on book update"))?;
241 for (price, size) in bids {
242 if size.is_zero() {
243 let _ = self.book.buy.remove(price);
244 } else {
245 let _ = self.book.buy.insert(*price, *size);
246 }
247 }
248 for (price, size) in asks {
249 if size.is_zero() {
250 let _ = self.book.sell.remove(price);
251 } else {
252 let _ = self.book.sell.insert(*price, *size);
253 }
254 }
255 Ok(self.sequence)
256 }
257 }
258 }
259}