1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
//! Simple orderflow client suitable for connecting to an Oms or directly
//! to a Cpty.  It handles connecting to an OrderflowAuthority, requesting
//! an order id range, and passing orderflow messages.

use crate::{AtomicOrderIdAllocator, ChannelDriver, Common, OrderIdAllocator};
use anyhow::{anyhow, Result};
use api::{orderflow::*, ComponentId, TypedMessage};
use log::info;
use std::sync::Arc;
use tokio::{sync::watch, task};

pub struct OrderflowClient {
    driver: Arc<ChannelDriver>,
    _order_ids_tx: Arc<watch::Sender<Option<AtomicOrderIdAllocator>>>,
    order_ids_rx: watch::Receiver<Option<AtomicOrderIdAllocator>>,
    target: ComponentId,
}

impl OrderflowClient {
    /// Connect to a component that implements an orderflow interface.  If no target is specified,
    /// search for an "Oms" component in the config.  If no order authority is specified, search
    /// for a "OrderAuthority" component in the config.
    ///
    /// If no order id range is specified, default to 2^20.
    pub fn new(
        common: &Common,
        driver: Arc<ChannelDriver>,
        order_authority: Option<ComponentId>,
        order_id_range: Option<u64>,
        target: Option<ComponentId>,
    ) -> Result<Self> {
        let (order_ids_tx, order_ids_rx) = watch::channel(None);
        let order_ids_tx = Arc::new(order_ids_tx);
        {
            let common = common.clone();
            let driver = driver.clone();
            let order_ids_tx = order_ids_tx.clone();
            task::spawn(async move {
                driver.wait_connected().await?;
                let order_ids = OrderIdAllocator::get_allocation(
                    &common,
                    Some(&driver),
                    order_authority,
                    order_id_range,
                )
                .await?;
                info!("order id range allocated: {:?}", order_ids);
                order_ids_tx.send(Some(order_ids.into()))?;
                Ok::<_, anyhow::Error>(())
            });
        }
        let target = target
            .or_else(|| {
                info!("no target specified; searching for an Oms in config...");
                common.get_component_of_kind("Oms")
            })
            .ok_or_else(|| anyhow!("no target found"))?;
        Ok(Self { driver, _order_ids_tx: order_ids_tx, order_ids_rx, target })
    }

    pub async fn wait_allocated(&mut self) -> Result<()> {
        let _ = self.order_ids_rx.wait_for(|a| a.is_some()).await?;
        Ok(())
    }

    pub fn next_order_id(&self) -> Result<OrderId> {
        let order_ids = self.order_ids_rx.borrow();
        match order_ids.as_ref() {
            Some(order_ids) => order_ids.next(),
            None => Err(anyhow!("no order ids")),
        }
    }

    /// Send a message to the configured target.
    pub fn send<M>(&self, msg: M) -> Result<()>
    where
        M: Into<TypedMessage>,
    {
        self.driver.send_to(self.target, msg)
    }

    pub fn driver(&self) -> &ChannelDriver {
        &self.driver
    }
}