architect_sdk/marketdata/
l2_client.rs

1use super::LevelBook;
2use crate::synced::{Synced, SyncedHandle};
3use anyhow::{anyhow, bail, Result};
4use architect_api::{
5    grpc::service::marketdata_client::MarketdataClient, marketdata::*,
6    symbology::MarketdataVenue, utils::sequence::SequenceIdAndNumber,
7};
8use arcstr::ArcStr;
9use futures::StreamExt;
10use log::{debug, error};
11use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
12use std::{
13    sync::{Arc, Weak},
14    time::Duration,
15};
16use tonic::{transport::Channel, Streaming};
17
18/// L2 book client for a single symbol.  No retries/reconnection logic; it just
19/// subscribes to and maintains the book.
20///
21/// To use this, call `L2Client::connect` to construct a client, drive its
22/// `next` method to apply updates, and use the accessors on `L2Client` to access
23/// the current state of the book.
24///
25/// If you need multiple readers/consumers of this client, use `L2Client::handle()`
26/// to create a cheaply cloneable handle to pass around.
27pub struct L2Client {
28    venue: Arc<Option<MarketdataVenue>>,
29    symbol: ArcStr,
30    updates: Streaming<L2BookUpdate>,
31    state: Arc<Mutex<L2ClientState>>,
32    ready: Synced<bool>,
33    alive: Arc<()>,
34}
35
36impl L2Client {
37    pub async fn new(
38        channel: Channel,
39        symbol: String,
40        venue: Option<MarketdataVenue>,
41    ) -> Result<Self> {
42        let mut client = MarketdataClient::new(channel);
43        let mut updates = client
44            .subscribe_l2_book_updates(SubscribeL2BookUpdatesRequest {
45                venue: venue.clone(),
46                symbol: symbol.clone(),
47            })
48            .await?
49            .into_inner();
50        // Simple, non-buffering version of the client; we trust the server to send
51        // us the snapshot first, and then the diffs in sequence order.  If that's
52        // not the case, bail.
53        let first_update = updates.next().await.ok_or(anyhow!("no first update"))??;
54        let state = match first_update {
55            L2BookUpdate::Snapshot(snap) => {
56                debug!("subscribed to stream, first update: {:?}", snap);
57                L2ClientState {
58                    sequence: snap.sequence,
59                    book: LevelBook::of_l2_book_snapshot(snap)?,
60                }
61            }
62            L2BookUpdate::Diff(..) => {
63                bail!("received diff before snapshot on L2 book update stream");
64            }
65        };
66        Ok(Self {
67            venue: Arc::new(venue),
68            symbol: symbol.into(),
69            updates,
70            state: Arc::new(Mutex::new(state)),
71            ready: Synced::new(true), // already got snapshot
72            alive: Arc::new(()),
73        })
74    }
75
76    pub async fn connect<D>(
77        endpoint: D,
78        symbol: String,
79        venue: Option<MarketdataVenue>,
80    ) -> Result<Self>
81    where
82        D: TryInto<tonic::transport::Endpoint>,
83        D::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
84    {
85        let mut client = MarketdataClient::connect(endpoint).await?;
86        let mut updates = client
87            .subscribe_l2_book_updates(SubscribeL2BookUpdatesRequest {
88                venue: venue.clone(),
89                symbol: symbol.clone(),
90            })
91            .await?
92            .into_inner();
93        // Simple, non-buffering version of the client; we trust the server to send
94        // us the snapshot first, and then the diffs in sequence order.  If that's
95        // not the case, bail.
96        let first_update = updates.next().await.ok_or(anyhow!("no first update"))??;
97        let state = match first_update {
98            L2BookUpdate::Snapshot(snap) => {
99                debug!("subscribed to stream, first update: {:?}", snap);
100                L2ClientState {
101                    sequence: snap.sequence,
102                    book: LevelBook::of_l2_book_snapshot(snap)?,
103                }
104            }
105            L2BookUpdate::Diff(..) => {
106                bail!("received diff before snapshot on L2 book update stream");
107            }
108        };
109        Ok(Self {
110            venue: Arc::new(venue),
111            symbol: symbol.into(),
112            updates,
113            state: Arc::new(Mutex::new(state)),
114            ready: Synced::new(true), // already got snapshot
115            alive: Arc::new(()),
116        })
117    }
118
119    pub async fn next(&mut self) -> Option<SequenceIdAndNumber> {
120        let update = match self.updates.next().await? {
121            Ok(update) => update,
122            Err(e) => {
123                error!("error on L2 book update stream for {}: {:?}", self.symbol, e);
124                return None;
125            }
126        };
127        match self.apply_update(update) {
128            Ok(sin) => Some(sin),
129            Err(e) => {
130                error!("error applying L2 book update for {}: {:?}", self.symbol, e);
131                None
132            }
133        }
134    }
135
136    pub fn handle(&self) -> L2ClientHandle {
137        L2ClientHandle {
138            venue: self.venue.clone(),
139            symbol: self.symbol.clone(),
140            state: self.state.clone(),
141            ready: self.ready.handle(),
142            alive: Arc::downgrade(&self.alive),
143        }
144    }
145
146    pub fn sequence(&self) -> SequenceIdAndNumber {
147        self.state.lock().sequence
148    }
149
150    pub fn book(&self) -> MappedMutexGuard<'_, LevelBook> {
151        let guard = self.state.lock();
152        MutexGuard::map(guard, |state| &mut state.book)
153    }
154
155    pub(super) fn apply_update(
156        &mut self,
157        update: L2BookUpdate,
158    ) -> Result<SequenceIdAndNumber> {
159        let mut state = self.state.lock();
160        state.apply_update(update)
161    }
162}
163
164/// Handle to an `L2Client` that can be cheaply cloned.
165///
166/// If still waiting on the first snapshot, `is_ready` will return false.
167/// If the driving `L2Client` is dropped, `is_alive` will return false.
168///
169/// In either case, accessor functions will return `None`.
170#[derive(Clone)]
171pub struct L2ClientHandle {
172    pub venue: Arc<Option<MarketdataVenue>>,
173    pub symbol: ArcStr,
174    pub(super) state: Arc<Mutex<L2ClientState>>,
175    pub(super) ready: SyncedHandle<bool>,
176    pub(super) alive: Weak<()>,
177}
178
179impl L2ClientHandle {
180    pub fn is_ready(&self) -> bool {
181        self.ready.is_synced()
182    }
183
184    pub async fn wait_ready(&mut self, timeout: Option<Duration>) -> Result<()> {
185        self.ready.wait_synced(timeout).await
186    }
187
188    pub fn is_alive(&self) -> bool {
189        self.alive.upgrade().is_some()
190    }
191
192    pub fn sequence(&self) -> Option<SequenceIdAndNumber> {
193        if self.is_ready() && self.is_alive() {
194            Some(self.state.lock().sequence)
195        } else {
196            None
197        }
198    }
199
200    pub fn book(&self) -> Option<MappedMutexGuard<'_, LevelBook>> {
201        if self.is_ready() && self.is_alive() {
202            let guard = self.state.lock();
203            Some(MutexGuard::map(guard, |state| &mut state.book))
204        } else {
205            None
206        }
207    }
208}
209
210#[derive(Default)]
211pub(super) struct L2ClientState {
212    pub sequence: SequenceIdAndNumber,
213    pub book: LevelBook,
214}
215
216impl L2ClientState {
217    pub(super) fn apply_update(
218        &mut self,
219        update: L2BookUpdate,
220    ) -> Result<SequenceIdAndNumber> {
221        match update {
222            L2BookUpdate::Snapshot(snap) => {
223                debug!("processing new snapshot: {:?}", snap);
224                self.sequence = snap.sequence;
225                self.book = LevelBook::of_l2_book_snapshot(snap)?;
226                Ok(self.sequence)
227            }
228            L2BookUpdate::Diff(diff) => {
229                let L2BookDiff { sequence, ref bids, ref asks, .. } = diff;
230                if !sequence.is_next_in_sequence(&self.sequence) {
231                    bail!(
232                        "feed sequence numbers out of sync: expected {}, got {}",
233                        self.sequence,
234                        sequence
235                    );
236                }
237                self.sequence = sequence;
238                self.book.timestamp = diff
239                    .timestamp()
240                    .ok_or_else(|| anyhow!("invalid timestamp on book update"))?;
241                for (price, size) in bids {
242                    if size.is_zero() {
243                        let _ = self.book.buy.remove(price);
244                    } else {
245                        let _ = self.book.buy.insert(*price, *size);
246                    }
247                }
248                for (price, size) in asks {
249                    if size.is_zero() {
250                        let _ = self.book.sell.remove(price);
251                    } else {
252                        let _ = self.book.sell.insert(*price, *size);
253                    }
254                }
255                Ok(self.sequence)
256            }
257        }
258    }
259}