1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/* Copyright 2023 Architect Financial Technologies LLC. This is free
 * software released under the GNU Affero Public License version 3. */
//! simplifed order flow client

use crate::{
    config::Common,
    protocol::{
        limits::{LimitScope, LimitSet},
        orderflow::{
            AberrantFill, ChanId, FillId, NormalFill, Order, OrderId, OrderState,
        },
        Account, Dir,
    },
    symbology::TradableProduct,
};
use anyhow::{bail, Result};
use enumflags2::BitFlags;
use netidx::{
    chars::Chars, pack::Pack, pool::Pooled, protocol::value::Value, utils::pack,
};
use netidx_protocols::{call_rpc, rpc::client::Proc};
use rust_decimal::Decimal;
use std::result;

/// This provides a simplified, but still pretty fast, api to query
/// the OMS, send, and cancel orders.
pub struct OmsQueryApi {
    get_order_state: Proc,
    get_channel_id: Proc,
    list_open: Proc,
    get_reject_reason: Proc,
    get_order_details: Proc,
    list_fills: Proc,
    get_fill_details: Proc,
    send_limit_order: Proc,
    cancel_order: Proc,
}

impl OmsQueryApi {
    /// Create a new simple order client
    pub async fn new(common: &Common) -> Result<Self> {
        let base = common.paths.core_flow();
        let subscriber = &common.subscriber;
        Ok(Self {
            get_order_state: Proc::new(subscriber, base.append("get-order-state"))
                .await?,
            list_open: Proc::new(subscriber, base.append("list-open")).await?,
            get_reject_reason: Proc::new(subscriber, base.append("get-reject-reason"))
                .await?,
            get_order_details: Proc::new(subscriber, base.append("get-order-details"))
                .await?,
            list_fills: Proc::new(subscriber, base.append("list-fills")).await?,
            get_fill_details: Proc::new(subscriber, base.append("get-fill-details"))
                .await?,
            send_limit_order: Proc::new(subscriber, base.append("send-limit-order"))
                .await?,
            get_channel_id: Proc::new(subscriber, base.append("get-channel-id")).await?,
            cancel_order: Proc::new(subscriber, base.append("cancel-order")).await?,
        })
    }

    /// Get the state of the specified order ids. This will return a
    /// state for every order id indicated. If an order is not found
    /// then it's state will be BitFlags::empty. len of the returned
    /// vector will always equal the len of the specified vector.
    pub async fn get_order_state(
        &self,
        ids: &Pooled<Vec<OrderId>>,
    ) -> Result<Pooled<Vec<BitFlags<OrderState>>>> {
        let ids = Value::Bytes(pack(ids)?.freeze());
        let res = call_rpc!(&self.get_order_state, ids: ids).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Bytes(b) => Ok(Pack::decode(&mut &*b)?),
            _ => bail!("unexpected response"),
        }
    }

    /// Return a list of open orders for the specified set and scope
    pub async fn list_open(
        &self,
        set: LimitSet,
        scope: LimitScope,
    ) -> Result<Pooled<Vec<OrderId>>> {
        let res = call_rpc!(&self.list_open, set: set, scope: scope).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Bytes(b) => Ok(Pack::decode(&mut &*b)?),
            _ => bail!("unexpected reponse"),
        }
    }

    /// Return the reason why the specified set of orders was
    /// rejected. This will be None if the order was not rejected.
    pub async fn get_reject_reason(
        &self,
        ids: &Pooled<Vec<OrderId>>,
    ) -> Result<Pooled<Vec<Option<Chars>>>> {
        let ids = Value::Bytes(pack(ids)?.freeze());
        let res = call_rpc!(&self.get_reject_reason, ids: ids).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Bytes(b) => Ok(Pack::decode(&mut &*b)?),
            _ => bail!("unexpected response"),
        }
    }

    /// Get the full details of the specified set of orders. If an
    /// order is not found for a given id, then no order state will be
    /// returned. len of the returned vector will always be <= len of
    /// the specified vector.
    pub async fn get_order_details(
        &self,
        ids: &Pooled<Vec<OrderId>>,
    ) -> Result<Pooled<Vec<Order>>> {
        let ids = Value::Bytes(pack(ids)?.freeze());
        let res = call_rpc!(&self.get_order_details, ids: ids).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Bytes(b) => Ok(Pack::decode(&mut &*b)?),
            _ => bail!("unexpected response"),
        }
    }

    /// List fills in the specified set and scope
    pub async fn list_fills(
        &self,
        set: LimitSet,
        scope: LimitScope,
    ) -> Result<Pooled<Vec<(OrderId, FillId)>>> {
        let res = call_rpc!(&self.list_fills, set: set, scope: scope).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Bytes(b) => Ok(Pack::decode(&mut &*b)?),
            _ => bail!("unexpected response"),
        }
    }

    /// Get the full details for a fill. If no fill is found for the
    /// specified id then no details will be returned for that id. len
    /// of the returned vector <= len of the specified vector.
    pub async fn get_fill_details(
        &self,
        ids: &Pooled<Vec<FillId>>,
    ) -> Result<Pooled<Vec<result::Result<NormalFill, AberrantFill>>>> {
        let ids = Value::Bytes(pack(ids)?.freeze());
        let res = call_rpc!(&self.get_fill_details, ids: ids).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Bytes(b) => Ok(Pack::decode(&mut &*b)?),
            _ => bail!("unexpected response"),
        }
    }

    /// Send a limit order, returning the order id if the order was
    /// sent successfully.
    pub async fn send_limit_order(
        &self,
        target: TradableProduct,
        account: Account,
        dir: Dir,
        price: Decimal,
        quantity: Decimal,
    ) -> Result<OrderId> {
        let res = call_rpc!(
            &self.send_limit_order,
            target: target,
            account: account,
            dir: dir,
            price: price,
            quantity: quantity
        )
        .await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            v => Ok(v.cast_to()?),
        }
    }

    /// Cancel the specified order. A successful return does not mean
    /// the order is out, it just means the cancel has been received
    /// by the OMS and will be sent on to the counterparty.
    pub async fn cancel_order(&self, id: OrderId) -> Result<()> {
        let res = call_rpc!(&self.cancel_order, id: id).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            Value::Ok => Ok(()),
            _ => bail!("unexpected response"),
        }
    }

    /// Get the channel id of this orderflow channel. This is
    /// necessary if you want to allocate your own orderid for some
    /// reason. This is a very low level operation which isn't
    /// recommended.
    pub async fn get_channel_id(&self) -> Result<ChanId> {
        let args: [(&'static str, Value); 0] = [];
        let res = self.get_channel_id.call(args).await?;
        match res {
            Value::Error(e) => bail!(e.to_string()),
            v => Ok(v.cast_to()?),
        }
    }
}