1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/* Copyright 2023 Architect Financial Technologies LLC. This is free
 * software released under the GNU Affero Public License version 3. */
//! low level high performance orderflow client

use crate::{
    config::Common,
    protocol::orderflow::{pool_from_oms, ChanId, FromOms, Halt, OrderId, ToOms},
};
use anyhow::{bail, Result};
use netidx::{pool::Pooled, subscriber::Value};
use netidx_core::pack::Pack;
use netidx_protocols::{
    pack_channel::client::{self, Connection},
    rpc::client::Proc,
};

/// A batch of outgoing orderflow messages; a thin wrapper around the
/// pack channel `client::Batch`. `Client::start_batch` creates one,
/// `queue` adds messages to it, and `Client::send` consumes it.
pub struct Batch(client::Batch);

impl Batch {
    pub fn queue(&mut self, m: &ToOms) -> Result<()> {
        self.0.queue(m)
    }
}

/// This is the low level orderflow channel. This is the fastest way
/// to interact with the core short of being in its address space.
pub struct Client {
    /// Id of this channel; returned by `id` and used by `orderid`
    /// to create order ids.
    chan: ChanId,
    /// Pack channel connection over which all messages are sent and
    /// received.
    con: Connection,
    /// Rpc handle invoked by the `get_halts` method.
    get_halts: Proc,
}

impl Client {
    /// Connect to the specified order flow channel. If you drop the
    /// channel it will disconnect. Disconnecting causes all open
    /// orders sent by the channel to be canceled.
    ///
    /// The handshake sends our protocol version, reads the peer's
    /// version back, and then sends `chanid`. If `chanid` is `Some`
    /// that id is used as is; if it is `None` the id is read from
    /// the connection.
    ///
    /// # Errors
    ///
    /// Fails if the connection or the `get-halts` rpc cannot be set
    /// up, if any handshake message cannot be sent or received, or if
    /// the peer answers with a protocol version other than ours.
    pub async fn new(common: &Common, chanid: Option<ChanId>) -> Result<Self> {
        // The only handshake version this client implements; it is
        // both what we announce and what we require in the reply.
        const PROTOCOL_VERSION: u64 = 1;
        let path = common.paths.core_flow().append("channel");
        let con = Connection::connect(&common.subscriber, path).await?;
        con.send_one(&PROTOCOL_VERSION)?;
        let version = con.recv_one::<u64>().await?;
        if version != PROTOCOL_VERSION {
            // Report both sides so a mismatch can be diagnosed from
            // the error alone.
            bail!(
                "incompatible protocol version: expected {}, peer sent {}",
                PROTOCOL_VERSION,
                version
            )
        }
        con.send_one(&chanid)?;
        let chan = match chanid {
            Some(id) => id,
            // We did not pick an id, so one is read back from the peer.
            None => con.recv_one::<ChanId>().await?,
        };
        let get_halts =
            Proc::new(&common.subscriber, common.paths.core_halts().append("get-halts"))
                .await?;
        Ok(Self { con, chan, get_halts })
    }

    /// get the channel id of this channel, should you wish to use
    /// OrderId::new directly.
    pub fn id(&self) -> ChanId {
        self.chan
    }

    /// Create a new order id from this channel's id. The id is valid
    /// only on this client session.
    pub fn orderid(&self) -> OrderId {
        let chan = self.chan;
        OrderId::new(chan)
    }

    /// Send a single message. Sending batches with start_batch and
    /// send is the recommended path, but this is available when
    /// there is just one message. Call flush if you wish to wait for
    /// the message to be flushed to the OS.
    pub fn send_one(&self, m: &ToOms) -> Result<()> {
        let con = &self.con;
        con.send_one(m)
    }

    /// Begin a new batch of messages. Queue every message you'd like
    /// to send on the returned batch, then pass it to send.
    pub fn start_batch(&self) -> Batch {
        let inner = self.con.start_batch();
        Batch(inner)
    }

    /// Send a batch of messages. If you wish to wait for the batch to
    /// be flushed to the OS you may call flush.
    pub fn send(&self, batch: Batch) -> Result<()> {
        self.con.send(batch.0)
    }

    /// Wait until previously sent messages have been flushed to OS
    /// channels. Calling this is optional; it is only necessary if
    /// you have a high message rate and you want pushback.
    pub async fn flush(&self) -> Result<()> {
        let flushed = self.con.flush();
        flushed.await
    }

    /// Receive a single message from the core, waiting for one to
    /// arrive if none is currently available. Using recv instead is
    /// recommended for better efficiency.
    pub async fn recv_one(&self) -> Result<FromOms> {
        let msg: FromOms = self.con.recv_one().await?;
        Ok(msg)
    }

    /// Receive a single message from the core without waiting:
    /// if no message is available, return None. This will only
    /// block if another receive is in progress concurrently.
    pub async fn try_recv_one(&self) -> Result<Option<FromOms>> {
        let maybe_msg: Option<FromOms> = self.con.try_recv_one().await?;
        Ok(maybe_msg)
    }

    /// Receive every available message from the core into a pooled
    /// vector, waiting until at least one message is available.
    pub async fn recv(&self) -> Result<Pooled<Vec<FromOms>>> {
        let mut msgs = pool_from_oms().take();
        // Keep every message; the callback always answers `true`.
        let collect = |m: FromOms| {
            msgs.push(m);
            true
        };
        self.con.recv(collect).await?;
        Ok(msgs)
    }

    /// Receive every available message from the core into a pooled
    /// vector, without waiting for one to arrive if none are
    /// available. This will only block if another receive is in
    /// progress concurrently.
    pub async fn try_recv(&self) -> Result<Pooled<Vec<FromOms>>> {
        let mut msgs = pool_from_oms().take();
        // Keep every message; the callback always answers `true`.
        let collect = |m: FromOms| {
            msgs.push(m);
            true
        };
        self.con.try_recv(collect).await?;
        Ok(msgs)
    }

    /// Return the list of halts by calling the halts rpc with no
    /// arguments and decoding the byte payload of the reply.
    ///
    /// Fails if the call itself fails, if the reply is an error
    /// value (its text becomes the error), if the reply is anything
    /// other than bytes, or if the bytes do not decode.
    pub async fn get_halts(&self) -> Result<Pooled<Vec<Halt>>> {
        let reply = self.get_halts.call::<_, &str>([]).await?;
        let bytes = match reply {
            Value::Bytes(bytes) => bytes,
            Value::Error(e) => bail!(e.to_string()),
            _ => bail!("unexpected response"),
        };
        let halts: Pooled<Vec<Halt>> = Pack::decode(&mut &*bytes)?;
        Ok(halts)
    }
}