tesser_execution/algorithm/
pegged.rs

1use anyhow::{anyhow, Result};
2use chrono::{DateTime, Duration, Utc};
3use rust_decimal::Decimal;
4use serde::{Deserialize, Serialize};
5use tesser_core::{
6    Order, OrderId, OrderRequest, OrderStatus, OrderType, OrderUpdateRequest, Quantity, Signal,
7    Tick, TimeInForce,
8};
9use uuid::Uuid;
10
11use super::{AlgoStatus, ChildOrderAction, ChildOrderRequest, ExecutionAlgorithm};
12
/// Serializable snapshot of a pegged algorithm instance.
///
/// This is the value produced by `state()` and consumed by `from_state()`, so field names
/// are part of the persisted format.
#[derive(Debug, Deserialize, Serialize)]
struct PeggedState {
    /// Identifier of this algorithm instance; stamped on every child request and embedded
    /// in generated client order ids (`peg-<id>-<seq>`).
    id: Uuid,
    /// Signal being executed; supplies the symbol and the side of every child order.
    parent_signal: Signal,
    /// Lifecycle label. `"Working"`, `"Completed"` and `"Cancelled"` are recognised by
    /// `status()`; any other text is reported as a failure carrying that text.
    status: String,
    /// Total quantity the algorithm has to execute.
    total_quantity: Quantity,
    /// Cumulative quantity reported through `on_fill`.
    filled_quantity: Quantity,
    /// Upper bound on the quantity of a single child order.
    clip_size: Quantity,
    /// Offset from the tick price in basis points (divided by 10 000 when applied):
    /// subtracted for buys, added for sells.
    offset_bps: Decimal,
    /// Interval compared against the time since the last order activity when deciding
    /// whether to place or amend.
    refresh: Duration,
    /// Time of the most recent placement, amend, or bound order update, if any.
    last_order_time: Option<DateTime<Utc>>,
    /// Most recent peg price used for a placement or amend, if any.
    last_peg_price: Option<Decimal>,
    /// Counter used as the numeric suffix of generated client order ids.
    next_child_seq: u32,
    /// Price-move threshold used by the re-peg checks; falls back to the type's default
    /// when missing from serialized state.
    #[serde(default)]
    min_chase_distance: Decimal,
    /// Child order currently being tracked, if any; falls back to `None` when missing
    /// from serialized state.
    #[serde(default)]
    active_child: Option<ActiveChildOrder>,
}
31
/// Bookkeeping for the single child order the algorithm is currently tracking.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ActiveChildOrder {
    /// Id of the order; matched against incoming fills and used as the amend target.
    order_id: OrderId,
    /// Client order id carried by the order request, if it had one.
    client_order_id: Option<String>,
    /// Quantity of the order request.
    total_quantity: Quantity,
    /// Quantity filled so far: the order's own figure when tracking started plus fills
    /// seen afterwards.
    filled_quantity: Quantity,
    /// Limit price of the order request at the time tracking started.
    working_price: Option<Decimal>,
}
40
41impl ActiveChildOrder {
42    fn remaining(&self) -> Quantity {
43        (self.total_quantity - self.filled_quantity).max(Decimal::ZERO)
44    }
45}
46
/// Execution algorithm that works a parent signal's quantity through limit child orders
/// priced off the latest tick.
///
/// It places one good-til-canceled clip at a time and, while that order is being tracked,
/// amends its price as the market moves instead of submitting a new order.
pub struct PeggedBestAlgorithm {
    // All mutable algorithm data lives here so it can be serialized and restored as a unit.
    state: PeggedState,
}
51
52impl PeggedBestAlgorithm {
53    pub fn new(
54        signal: Signal,
55        total_quantity: Quantity,
56        offset_bps: Decimal,
57        clip_size: Option<Quantity>,
58        refresh: Duration,
59        min_chase_distance: Option<Decimal>,
60    ) -> Result<Self> {
61        if total_quantity <= Decimal::ZERO {
62            return Err(anyhow!("pegged algorithm requires positive quantity"));
63        }
64        if offset_bps < Decimal::ZERO {
65            return Err(anyhow!("offset must be non-negative"));
66        }
67        if refresh <= Duration::zero() {
68            return Err(anyhow!("refresh interval must be positive"));
69        }
70        let clip = clip_size
71            .unwrap_or(total_quantity)
72            .max(Decimal::ZERO)
73            .min(total_quantity);
74        Ok(Self {
75            state: PeggedState {
76                id: Uuid::new_v4(),
77                parent_signal: signal,
78                status: "Working".into(),
79                total_quantity,
80                filled_quantity: Decimal::ZERO,
81                clip_size: if clip <= Decimal::ZERO {
82                    total_quantity
83                } else {
84                    clip
85                },
86                offset_bps,
87                refresh,
88                last_order_time: None,
89                last_peg_price: None,
90                next_child_seq: 0,
91                min_chase_distance: min_chase_distance
92                    .unwrap_or(Decimal::ZERO)
93                    .max(Decimal::ZERO),
94                active_child: None,
95            },
96        })
97    }
98
99    fn remaining(&self) -> Quantity {
100        (self.state.total_quantity - self.state.filled_quantity).max(Decimal::ZERO)
101    }
102
103    fn peg_price(&self, tick_price: Decimal) -> Decimal {
104        let offset = self.state.offset_bps / Decimal::from(10_000);
105        match self.state.parent_signal.kind.side() {
106            tesser_core::Side::Buy => tick_price * (Decimal::ONE - offset),
107            tesser_core::Side::Sell => tick_price * (Decimal::ONE + offset),
108        }
109    }
110
111    fn should_refresh(&self, price: Decimal, now: DateTime<Utc>) -> bool {
112        if self.last_emit_elapsed(now) >= self.state.refresh {
113            return true;
114        }
115        self.state
116            .last_peg_price
117            .map(|prev| (prev - price).abs() > self.state.min_chase_distance)
118            .unwrap_or(true)
119    }
120
121    fn should_chase(&self, price: Decimal, now: DateTime<Utc>) -> bool {
122        if self.last_emit_elapsed(now) < self.state.refresh {
123            return false;
124        }
125        if let Some(prev) = self.state.last_peg_price {
126            if (prev - price).abs() < self.state.min_chase_distance {
127                return false;
128            }
129        }
130        true
131    }
132
133    fn last_emit_elapsed(&self, now: DateTime<Utc>) -> Duration {
134        self.state
135            .last_order_time
136            .map(|ts| now.signed_duration_since(ts))
137            .unwrap_or(self.state.refresh)
138    }
139
140    fn build_child(&mut self, price: Decimal, qty: Quantity) -> ChildOrderRequest {
141        self.state.next_child_seq += 1;
142        ChildOrderRequest {
143            parent_algo_id: self.state.id,
144            action: ChildOrderAction::Place(OrderRequest {
145                symbol: self.state.parent_signal.symbol,
146                side: self.state.parent_signal.kind.side(),
147                order_type: OrderType::Limit,
148                quantity: qty,
149                price: Some(price),
150                trigger_price: None,
151                time_in_force: Some(TimeInForce::GoodTilCanceled),
152                client_order_id: Some(format!(
153                    "peg-{}-{}",
154                    self.state.id, self.state.next_child_seq
155                )),
156                take_profit: None,
157                stop_loss: None,
158                display_quantity: None,
159            }),
160        }
161    }
162
163    fn mark_completed(&mut self) {
164        if self.remaining() <= Decimal::ZERO {
165            self.state.status = "Completed".into();
166        }
167    }
168
169    fn sequence_from_client_id(&self, client_id: &str) -> Option<u32> {
170        let rest = client_id.strip_prefix("peg-")?;
171        let (id_part, seq_part) = rest.rsplit_once('-')?;
172        if id_part != self.state.id.to_string() {
173            return None;
174        }
175        seq_part.parse().ok()
176    }
177
178    fn is_active_status(status: OrderStatus) -> bool {
179        matches!(
180            status,
181            OrderStatus::PendingNew | OrderStatus::Accepted | OrderStatus::PartiallyFilled
182        )
183    }
184}
185
186impl ExecutionAlgorithm for PeggedBestAlgorithm {
187    fn kind(&self) -> &'static str {
188        "PEGGED_BEST"
189    }
190
191    fn id(&self) -> &Uuid {
192        &self.state.id
193    }
194
195    fn status(&self) -> AlgoStatus {
196        match self.state.status.as_str() {
197            "Working" => AlgoStatus::Working,
198            "Completed" => AlgoStatus::Completed,
199            "Cancelled" => AlgoStatus::Cancelled,
200            other => AlgoStatus::Failed(other.to_string()),
201        }
202    }
203
204    fn start(&mut self) -> Result<Vec<ChildOrderRequest>> {
205        Ok(Vec::new())
206    }
207
208    fn on_child_order_placed(&mut self, order: &Order) {
209        self.state.last_order_time = Some(Utc::now());
210        if let Some(price) = order.request.price {
211            self.state.last_peg_price = Some(price);
212        }
213        self.state.active_child = Some(ActiveChildOrder {
214            order_id: order.id.clone(),
215            client_order_id: order.request.client_order_id.clone(),
216            total_quantity: order.request.quantity,
217            filled_quantity: order.filled_quantity,
218            working_price: order.request.price,
219        });
220    }
221
222    fn on_fill(&mut self, fill: &tesser_core::Fill) -> Result<Vec<ChildOrderRequest>> {
223        self.state.filled_quantity += fill.fill_quantity;
224        if let Some(active) = self.state.active_child.as_mut() {
225            if active.order_id == fill.order_id {
226                active.filled_quantity += fill.fill_quantity;
227                if active.remaining() <= Decimal::ZERO {
228                    self.state.active_child = None;
229                }
230            }
231        }
232        self.mark_completed();
233        Ok(Vec::new())
234    }
235
236    fn on_tick(&mut self, tick: &Tick) -> Result<Vec<ChildOrderRequest>> {
237        if !matches!(self.status(), AlgoStatus::Working) {
238            return Ok(Vec::new());
239        }
240        let remaining = self.remaining();
241        if remaining <= Decimal::ZERO {
242            self.state.status = "Completed".into();
243            return Ok(Vec::new());
244        }
245        let now = Utc::now();
246        let price = self.peg_price(tick.price.max(Decimal::ZERO));
247        if let Some(active) = &self.state.active_child {
248            if !self.should_chase(price, now) {
249                return Ok(Vec::new());
250            }
251            if active.remaining() <= Decimal::ZERO {
252                return Ok(Vec::new());
253            }
254            self.state.last_peg_price = Some(price);
255            self.state.last_order_time = Some(now);
256            let request = OrderUpdateRequest {
257                order_id: active.order_id.clone(),
258                symbol: self.state.parent_signal.symbol,
259                side: self.state.parent_signal.kind.side(),
260                new_price: Some(price),
261                new_quantity: Some(active.total_quantity),
262            };
263            return Ok(vec![ChildOrderRequest {
264                parent_algo_id: self.state.id,
265                action: ChildOrderAction::Amend(request),
266            }]);
267        }
268        if !self.should_refresh(price, now) {
269            return Ok(Vec::new());
270        }
271        self.state.last_peg_price = Some(price);
272        let qty = remaining.min(self.state.clip_size);
273        if qty <= Decimal::ZERO {
274            return Ok(Vec::new());
275        }
276        Ok(vec![self.build_child(price, qty)])
277    }
278
279    fn on_timer(&mut self) -> Result<Vec<ChildOrderRequest>> {
280        self.mark_completed();
281        Ok(Vec::new())
282    }
283
284    fn cancel(&mut self) -> Result<()> {
285        self.state.status = "Cancelled".into();
286        Ok(())
287    }
288
289    fn bind_child_order(&mut self, order: Order) -> Result<()> {
290        if !Self::is_active_status(order.status) {
291            return Ok(());
292        }
293        let Some(client_id) = order.request.client_order_id.as_deref() else {
294            return Ok(());
295        };
296        if let Some(seq) = self.sequence_from_client_id(client_id) {
297            self.state.next_child_seq = self.state.next_child_seq.max(seq);
298        }
299        self.state.last_order_time = Some(order.updated_at);
300        if let Some(price) = order.request.price {
301            self.state.last_peg_price = Some(price);
302        }
303        self.state.active_child = Some(ActiveChildOrder {
304            order_id: order.id.clone(),
305            client_order_id: order.request.client_order_id.clone(),
306            total_quantity: order.request.quantity,
307            filled_quantity: order.filled_quantity,
308            working_price: order.request.price,
309        });
310        if !matches!(self.status(), AlgoStatus::Working) {
311            self.state.status = "Working".into();
312        }
313        Ok(())
314    }
315
316    fn state(&self) -> serde_json::Value {
317        serde_json::to_value(&self.state).expect("pegged state serialization failed")
318    }
319
320    fn from_state(state: serde_json::Value) -> Result<Self>
321    where
322        Self: Sized,
323    {
324        let state: PeggedState = serde_json::from_value(state)?;
325        Ok(Self { state })
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use tesser_core::{Order, OrderStatus, Signal, SignalKind, Tick};

    /// Buy-side BTCUSDT tick of unit size at a whole-number price.
    fn btc_tick(price: i64) -> Tick {
        Tick {
            symbol: "BTCUSDT".into(),
            price: Decimal::from(price),
            size: Decimal::ONE,
            side: tesser_core::Side::Buy,
            exchange_timestamp: Utc::now(),
            received_at: Utc::now(),
        }
    }

    /// The first tick on a fresh algorithm yields exactly one placement with positive size.
    #[test]
    fn emits_child_after_tick() {
        let mut algo = PeggedBestAlgorithm::new(
            Signal::new("BTCUSDT", SignalKind::EnterLong, 0.9),
            Decimal::from(5),
            Decimal::new(5, 1),
            None,
            Duration::seconds(1),
            Some(Decimal::new(1, 2)),
        )
        .unwrap();

        let orders = algo.on_tick(&btc_tick(100)).unwrap();

        assert_eq!(orders.len(), 1);
        match &orders[0].action {
            ChildOrderAction::Place(request) => assert!(request.quantity > Decimal::ZERO),
            other => panic!("unexpected action: {other:?}"),
        }
    }

    /// Once a child is live and the refresh interval has passed, a sufficiently large price
    /// move produces an amend of that same order instead of a new placement.
    #[test]
    fn chases_active_order_with_amend() {
        let mut algo = PeggedBestAlgorithm::new(
            Signal::new("BTCUSDT", SignalKind::EnterLong, 0.9),
            Decimal::from(2),
            Decimal::new(1, 1),
            None,
            Duration::seconds(1),
            Some(Decimal::from(5)),
        )
        .unwrap();

        let first_tick = btc_tick(30_000);
        let child = algo.on_tick(&first_tick).unwrap();
        let request = match &child[0].action {
            ChildOrderAction::Place(req) => req.clone(),
            other => panic!("expected placement, got {other:?}"),
        };

        let order = Order {
            id: "child-order".into(),
            request: request.clone(),
            status: OrderStatus::Accepted,
            filled_quantity: Decimal::ZERO,
            avg_fill_price: None,
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        algo.on_child_order_placed(&order);
        // Backdate the placement so the refresh interval counts as elapsed.
        algo.state.last_order_time = Some(Utc::now() - Duration::seconds(5));

        let chase_tick = Tick {
            price: Decimal::from(30_020),
            ..first_tick
        };
        let chase = algo.on_tick(&chase_tick).unwrap();

        assert_eq!(chase.len(), 1);
        match &chase[0].action {
            ChildOrderAction::Amend(update) => {
                assert_eq!(update.order_id, order.id);
                assert_eq!(update.symbol, order.request.symbol);
                assert_eq!(update.new_quantity, Some(order.request.quantity));
                assert!(update.new_price.unwrap() > request.price.unwrap());
            }
            other => panic!("expected amend action, got {other:?}"),
        }
    }
}