/* Copyright 2023 Architect Financial Technologies LLC. This is free
* software released under the GNU Affero Public License version 3. */
//! Low-level, high-performance orderflow client.
use crate::{
config::Common,
protocol::orderflow::{pool_from_oms, ChanId, FromOms, Halt, OrderId, ToOms},
};
use anyhow::{bail, Result};
use netidx::{pool::Pooled, subscriber::Value};
use netidx_core::pack::Pack;
use netidx_protocols::{
pack_channel::client::{self, Connection},
rpc::client::Proc,
};
/// A batch of queued [`ToOms`] messages, created by
/// `Client::start_batch` and transmitted with `Client::send`.
pub struct Batch(client::Batch);

impl Batch {
    /// Queue a message into the batch. Nothing is transmitted until
    /// the whole batch is handed to `Client::send`.
    pub fn queue(&mut self, m: &ToOms) -> Result<()> {
        let Batch(inner) = self;
        inner.queue(m)
    }
}
/// This is the low level orderflow channel. This is the fastest way
/// to interact with the core short of being in its address space.
///
/// Dropping the client disconnects the channel; per the docs on
/// `new`, disconnecting causes all open orders sent by the channel
/// to be canceled.
pub struct Client {
    // Channel id for this session: either caller supplied or
    // assigned by the core at connect time (see `new`).
    chan: ChanId,
    // The underlying netidx pack channel to the core.
    con: Connection,
    // RPC handle for the core's "get-halts" procedure.
    get_halts: Proc,
}
impl Client {
    /// Connect to the specified order flow channel. If you drop the
    /// channel it will disconnect. Disconnecting causes all open
    /// orders sent by the channel to be canceled.
    ///
    /// If `chanid` is `Some`, that id is used for this session;
    /// if `None`, an id is read back from the core after the
    /// handshake (presumably the id the core assigned — confirm
    /// against the core's channel protocol).
    ///
    /// # Errors
    /// Fails if the connection or the "get-halts" RPC handle cannot
    /// be established, or if the core reports a protocol version
    /// other than 1.
    pub async fn new(common: &Common, chanid: Option<ChanId>) -> Result<Self> {
        let path = common.paths.core_flow().append("channel");
        let con = Connection::connect(&common.subscriber, path).await?;
        // Version handshake: send the version we speak, then require
        // the core to answer with the same. Only version 1 is
        // supported here.
        con.send_one(&1u64)?;
        let version = con.recv_one::<u64>().await?;
        if version != 1 {
            bail!("incompatible protocol version")
        }
        // Send our requested channel id (or None). A ChanId is only
        // read back when we did not supply one ourselves.
        con.send_one(&chanid)?;
        let chan = match chanid {
            Some(id) => id,
            None => con.recv_one::<ChanId>().await?,
        };
        let get_halts =
            Proc::new(&common.subscriber, common.paths.core_halts().append("get-halts"))
                .await?;
        Ok(Self { con, chan, get_halts })
    }
    /// get the channel id of this channel, should you wish to use
    /// OrderId::new directly.
    pub fn id(&self) -> ChanId {
        self.chan
    }
    /// Generate a new order id valid only on this client session.
    pub fn orderid(&self) -> OrderId {
        OrderId::new(self.chan)
    }
    /// Send one message. It's recommended to send batches of messages
    /// using start_batch and send, but if you have just one, you can
    /// use this. If you wish to wait for the message to be flushed to
    /// the OS call flush.
    pub fn send_one(&self, m: &ToOms) -> Result<()> {
        self.con.send_one(m)
    }
    /// Start a new batch of messages. Once you've queued all the
    /// messages you'd like to send you can call send to send the
    /// batch.
    pub fn start_batch(&self) -> Batch {
        Batch(self.con.start_batch())
    }
    /// Send a batch of messages. If you wish to wait for the batch to
    /// be flushed to the OS you may call flush.
    pub fn send(&self, batch: Batch) -> Result<()> {
        self.con.send(batch.0)
    }
    /// Wait for previously sent message to be flushed to OS
    /// channels. This is not required, it is only necessary if you
    /// have a high message rate and you want pushback.
    pub async fn flush(&self) -> Result<()> {
        self.con.flush().await
    }
    /// Receive one message from the core. If no message is currently
    /// available, wait for one to arrive. It is recommended to use
    /// recv instead for better efficiency.
    pub async fn recv_one(&self) -> Result<FromOms> {
        self.con.recv_one().await
    }
    /// Receive one message from the core, but if no messages are
    /// available, don't wait, instead return None. This will only
    /// block if another receive is in progress concurrently.
    pub async fn try_recv_one(&self) -> Result<Option<FromOms>> {
        self.con.try_recv_one().await
    }
    /// Receive all available messages from the core. Wait for at
    /// least one message to be available.
    ///
    /// The returned `Vec` comes from a shared pool (see
    /// `pool_from_oms`); dropping it returns the buffer to the pool.
    pub async fn recv(&self) -> Result<Pooled<Vec<FromOms>>> {
        let mut batch = pool_from_oms().take();
        self.con
            .recv(|m| {
                // Returning true tells the connection to keep
                // delivering messages into this batch.
                batch.push(m);
                true
            })
            .await?;
        Ok(batch)
    }
    /// Receive all available messages from the core, but if none are
    /// available do not wait for one to arrive. This will only block
    /// if another receive is in progress concurrently.
    pub async fn try_recv(&self) -> Result<Pooled<Vec<FromOms>>> {
        let mut batch = pool_from_oms().take();
        self.con
            .try_recv(|m| {
                batch.push(m);
                true
            })
            .await?;
        Ok(batch)
    }
    /// Return the list of halts
    ///
    /// Calls the core's "get-halts" RPC with no arguments. A
    /// `Value::Bytes` reply is decoded as a packed `Vec<Halt>`;
    /// `Value::Error` is surfaced as an error, and any other reply
    /// shape is rejected.
    pub async fn get_halts(&self) -> Result<Pooled<Vec<Halt>>> {
        let res = self.get_halts.call::<_, &str>([]).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Bytes(b) => Ok(Pack::decode(&mut &*b)?),
            _ => bail!("unexpected response"),
        }
    }
}