1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
use self::{consolidated_level_book::ConsolidatedLevelBook, level_book::LevelBook};
/// Subscribe to book data
use crate::{
    config::{Common, Cpty},
    protocol::book::{MessageHeader, Snapshot, Updates},
    symbology::TradableProduct,
};
use anyhow::{anyhow, bail, Result};
use futures::channel::mpsc;
use fxhash::FxHashMap;
use log::warn;
use netidx::{
    pack::Pack,
    pool::Pooled,
    subscriber::{Dval, Event, SubId, UpdatesFlags, Value},
};
use std::ops::Deref;

pub mod consolidated_level_book;
pub mod level_book;

/// A subscription to book data
pub struct BookClient {
    book: LevelBook,
    subscription: Dval,
    synced: bool,
}

impl Deref for BookClient {
    type Target = LevelBook;

    fn deref(&self) -> &Self::Target {
        &self.book
    }
}

impl BookClient {
    /// returns true if the tradable product is using the legacy book format
    pub async fn detect_legacy(common: &Common, tp: TradableProduct) -> bool {
        let (path, _) = tp.subscriber_paths();
        let base = common.paths.qf_rt(Some(Cpty { venue: tp.venue, route: tp.route }));
        let path = base.append(&path).append("buy/0/price"); // new qfs will not have this
        let resolver = common.subscriber.resolver();
        match resolver.resolve([path.clone()]).await {
            Ok((pb, _)) => pb.len() > 0,
            Err(e) => {
                warn!("failed to resolve {}, {:?}", path, e);
                false
            }
        }
    }

    /// Subscribe to book data for the specified tradable product. You
    /// must receive the output of the specified up channel and call
    /// `process_event` for each event received with an id that
    /// matches the id of this subscription.
    pub fn new(
        common: &Common,
        tp: TradableProduct,
        up: mpsc::Sender<Pooled<Vec<(SubId, Event)>>>,
    ) -> Self {
        let (path, _) = tp.subscriber_paths();
        let base = common.paths.qf_rt(Some(Cpty { venue: tp.venue, route: tp.route }));
        let path = base.append(&path).append("book");
        let already = common.subscriber.is_subscribed_or_pending(&path);
        let subscription =
            common.subscriber.subscribe_updates(path, [(UpdatesFlags::empty(), up)]);
        if already || subscription.strong_count() > 1 {
            // if we are already subscribed then we need to ask for a
            // snapshot manually
            subscription.write(Value::Null);
        }
        Self { book: LevelBook::default(), subscription, synced: false }
    }

    /// Return the id of this subscription
    pub fn id(&self) -> SubId {
        self.subscription.id()
    }

    pub fn synced(&self) -> bool {
        self.synced
    }

    /// Process the specified book event, updating the book with it's
    /// contents.
    pub fn process_event(&mut self, ev: Event) -> Result<()> {
        match ev {
            Event::Update(Value::Bytes(mut buf)) => {
                let typ: MessageHeader = Pack::decode(&mut buf)?;
                match typ {
                    MessageHeader::Updates => {
                        if self.synced {
                            let updates: Updates = Pack::decode(&mut buf)?;
                            self.book.update(updates)
                        }
                    }
                    MessageHeader::Snapshot => {
                        let snap: Snapshot = Pack::decode(&mut buf)?;
                        self.synced = true;
                        self.book.update_from_snapshot(snap)
                    }
                }
            }
            // this is the default value before the book subscribes on the qf side
            Event::Update(Value::Null) | Event::Unsubscribed => (),
            e => bail!("book protocol error, invalid event {:?}", e),
        }
        Ok(())
    }
}

/// Subscriptions to multiple books consolidated into one
pub struct ConsolidatedBookClient {
    consolidated_book: ConsolidatedLevelBook,
    books: FxHashMap<SubId, (TradableProduct, BookClient)>,
}

impl Deref for ConsolidatedBookClient {
    type Target = ConsolidatedLevelBook;

    fn deref(&self) -> &Self::Target {
        &self.consolidated_book
    }
}

impl ConsolidatedBookClient {
    /// Subscribe to book data for the specified tradable products. You
    /// must receive the output of the specified up channel and call
    /// `process_event` for each event received.
    pub fn new(
        common: &Common,
        tps: Vec<TradableProduct>,
        up: mpsc::Sender<Pooled<Vec<(SubId, Event)>>>,
    ) -> Self {
        let mut books: FxHashMap<SubId, (TradableProduct, BookClient)> =
            FxHashMap::default();
        tps.iter().for_each(|tp| {
            let client = BookClient::new(common, *tp, up.clone());
            books.insert(client.id(), (*tp, client));
        });
        Self { consolidated_book: ConsolidatedLevelBook::default(), books }
    }

    /// Process the specified book event, updating the indivudal book and
    /// consolidated book with it contents.
    pub fn process_event(&mut self, sub_id: SubId, ev: Event) -> Result<()> {
        let (tp, book_client) = self
            .books
            .get_mut(&sub_id)
            .ok_or_else(|| anyhow!("missing book for sub_id: {:?}", sub_id))?;
        book_client.process_event(ev.clone())?;
        match ev {
            Event::Update(Value::Bytes(mut buf)) => {
                let typ: MessageHeader = Pack::decode(&mut buf)?;
                match typ {
                    MessageHeader::Updates => {
                        if book_client.synced() {
                            let updates: Updates = Pack::decode(&mut buf)?;
                            self.consolidated_book.update(*tp, updates)
                        }
                    }
                    MessageHeader::Snapshot => {
                        let snap: Snapshot = Pack::decode(&mut buf)?;
                        self.consolidated_book.update_from_snapshot(*tp, snap)
                    }
                }
            }
            // this is the default value before the book subscribes on the qf side
            Event::Update(Value::Null) | Event::Unsubscribed => (),
            e => bail!("book protocol error, invalid event {:?}", e),
        }
        Ok(())
    }
}