1use anyhow::{anyhow, Result};
2use chrono::{DateTime, Duration, Utc};
3use rust_decimal::Decimal;
4use serde::{Deserialize, Serialize};
5use tesser_core::{
6 Order, OrderId, OrderRequest, OrderStatus, OrderType, OrderUpdateRequest, Quantity, Signal,
7 Tick, TimeInForce,
8};
9use uuid::Uuid;
10
11use super::{AlgoStatus, ChildOrderAction, ChildOrderRequest, ExecutionAlgorithm};
12
13#[derive(Debug, Deserialize, Serialize)]
14struct PeggedState {
15 id: Uuid,
16 parent_signal: Signal,
17 status: String,
18 total_quantity: Quantity,
19 filled_quantity: Quantity,
20 clip_size: Quantity,
21 offset_bps: Decimal,
22 refresh: Duration,
23 last_order_time: Option<DateTime<Utc>>,
24 last_peg_price: Option<Decimal>,
25 next_child_seq: u32,
26 #[serde(default)]
27 min_chase_distance: Decimal,
28 #[serde(default)]
29 active_child: Option<ActiveChildOrder>,
30}
31
32#[derive(Clone, Debug, Deserialize, Serialize)]
33struct ActiveChildOrder {
34 order_id: OrderId,
35 client_order_id: Option<String>,
36 total_quantity: Quantity,
37 filled_quantity: Quantity,
38 working_price: Option<Decimal>,
39}
40
41impl ActiveChildOrder {
42 fn remaining(&self) -> Quantity {
43 (self.total_quantity - self.filled_quantity).max(Decimal::ZERO)
44 }
45}
46
47pub struct PeggedBestAlgorithm {
49 state: PeggedState,
50}
51
52impl PeggedBestAlgorithm {
53 pub fn new(
54 signal: Signal,
55 total_quantity: Quantity,
56 offset_bps: Decimal,
57 clip_size: Option<Quantity>,
58 refresh: Duration,
59 min_chase_distance: Option<Decimal>,
60 ) -> Result<Self> {
61 if total_quantity <= Decimal::ZERO {
62 return Err(anyhow!("pegged algorithm requires positive quantity"));
63 }
64 if offset_bps < Decimal::ZERO {
65 return Err(anyhow!("offset must be non-negative"));
66 }
67 if refresh <= Duration::zero() {
68 return Err(anyhow!("refresh interval must be positive"));
69 }
70 let clip = clip_size
71 .unwrap_or(total_quantity)
72 .max(Decimal::ZERO)
73 .min(total_quantity);
74 Ok(Self {
75 state: PeggedState {
76 id: Uuid::new_v4(),
77 parent_signal: signal,
78 status: "Working".into(),
79 total_quantity,
80 filled_quantity: Decimal::ZERO,
81 clip_size: if clip <= Decimal::ZERO {
82 total_quantity
83 } else {
84 clip
85 },
86 offset_bps,
87 refresh,
88 last_order_time: None,
89 last_peg_price: None,
90 next_child_seq: 0,
91 min_chase_distance: min_chase_distance
92 .unwrap_or(Decimal::ZERO)
93 .max(Decimal::ZERO),
94 active_child: None,
95 },
96 })
97 }
98
99 fn remaining(&self) -> Quantity {
100 (self.state.total_quantity - self.state.filled_quantity).max(Decimal::ZERO)
101 }
102
103 fn peg_price(&self, tick_price: Decimal) -> Decimal {
104 let offset = self.state.offset_bps / Decimal::from(10_000);
105 match self.state.parent_signal.kind.side() {
106 tesser_core::Side::Buy => tick_price * (Decimal::ONE - offset),
107 tesser_core::Side::Sell => tick_price * (Decimal::ONE + offset),
108 }
109 }
110
111 fn should_refresh(&self, price: Decimal, now: DateTime<Utc>) -> bool {
112 if self.last_emit_elapsed(now) >= self.state.refresh {
113 return true;
114 }
115 self.state
116 .last_peg_price
117 .map(|prev| (prev - price).abs() > self.state.min_chase_distance)
118 .unwrap_or(true)
119 }
120
121 fn should_chase(&self, price: Decimal, now: DateTime<Utc>) -> bool {
122 if self.last_emit_elapsed(now) < self.state.refresh {
123 return false;
124 }
125 if let Some(prev) = self.state.last_peg_price {
126 if (prev - price).abs() < self.state.min_chase_distance {
127 return false;
128 }
129 }
130 true
131 }
132
133 fn last_emit_elapsed(&self, now: DateTime<Utc>) -> Duration {
134 self.state
135 .last_order_time
136 .map(|ts| now.signed_duration_since(ts))
137 .unwrap_or(self.state.refresh)
138 }
139
140 fn build_child(&mut self, price: Decimal, qty: Quantity) -> ChildOrderRequest {
141 self.state.next_child_seq += 1;
142 ChildOrderRequest {
143 parent_algo_id: self.state.id,
144 action: ChildOrderAction::Place(OrderRequest {
145 symbol: self.state.parent_signal.symbol,
146 side: self.state.parent_signal.kind.side(),
147 order_type: OrderType::Limit,
148 quantity: qty,
149 price: Some(price),
150 trigger_price: None,
151 time_in_force: Some(TimeInForce::GoodTilCanceled),
152 client_order_id: Some(format!(
153 "peg-{}-{}",
154 self.state.id, self.state.next_child_seq
155 )),
156 take_profit: None,
157 stop_loss: None,
158 display_quantity: None,
159 }),
160 }
161 }
162
163 fn mark_completed(&mut self) {
164 if self.remaining() <= Decimal::ZERO {
165 self.state.status = "Completed".into();
166 }
167 }
168
169 fn sequence_from_client_id(&self, client_id: &str) -> Option<u32> {
170 let rest = client_id.strip_prefix("peg-")?;
171 let (id_part, seq_part) = rest.rsplit_once('-')?;
172 if id_part != self.state.id.to_string() {
173 return None;
174 }
175 seq_part.parse().ok()
176 }
177
178 fn is_active_status(status: OrderStatus) -> bool {
179 matches!(
180 status,
181 OrderStatus::PendingNew | OrderStatus::Accepted | OrderStatus::PartiallyFilled
182 )
183 }
184}
185
186impl ExecutionAlgorithm for PeggedBestAlgorithm {
187 fn kind(&self) -> &'static str {
188 "PEGGED_BEST"
189 }
190
191 fn id(&self) -> &Uuid {
192 &self.state.id
193 }
194
195 fn status(&self) -> AlgoStatus {
196 match self.state.status.as_str() {
197 "Working" => AlgoStatus::Working,
198 "Completed" => AlgoStatus::Completed,
199 "Cancelled" => AlgoStatus::Cancelled,
200 other => AlgoStatus::Failed(other.to_string()),
201 }
202 }
203
204 fn start(&mut self) -> Result<Vec<ChildOrderRequest>> {
205 Ok(Vec::new())
206 }
207
208 fn on_child_order_placed(&mut self, order: &Order) {
209 self.state.last_order_time = Some(Utc::now());
210 if let Some(price) = order.request.price {
211 self.state.last_peg_price = Some(price);
212 }
213 self.state.active_child = Some(ActiveChildOrder {
214 order_id: order.id.clone(),
215 client_order_id: order.request.client_order_id.clone(),
216 total_quantity: order.request.quantity,
217 filled_quantity: order.filled_quantity,
218 working_price: order.request.price,
219 });
220 }
221
222 fn on_fill(&mut self, fill: &tesser_core::Fill) -> Result<Vec<ChildOrderRequest>> {
223 self.state.filled_quantity += fill.fill_quantity;
224 if let Some(active) = self.state.active_child.as_mut() {
225 if active.order_id == fill.order_id {
226 active.filled_quantity += fill.fill_quantity;
227 if active.remaining() <= Decimal::ZERO {
228 self.state.active_child = None;
229 }
230 }
231 }
232 self.mark_completed();
233 Ok(Vec::new())
234 }
235
236 fn on_tick(&mut self, tick: &Tick) -> Result<Vec<ChildOrderRequest>> {
237 if !matches!(self.status(), AlgoStatus::Working) {
238 return Ok(Vec::new());
239 }
240 let remaining = self.remaining();
241 if remaining <= Decimal::ZERO {
242 self.state.status = "Completed".into();
243 return Ok(Vec::new());
244 }
245 let now = Utc::now();
246 let price = self.peg_price(tick.price.max(Decimal::ZERO));
247 if let Some(active) = &self.state.active_child {
248 if !self.should_chase(price, now) {
249 return Ok(Vec::new());
250 }
251 if active.remaining() <= Decimal::ZERO {
252 return Ok(Vec::new());
253 }
254 self.state.last_peg_price = Some(price);
255 self.state.last_order_time = Some(now);
256 let request = OrderUpdateRequest {
257 order_id: active.order_id.clone(),
258 symbol: self.state.parent_signal.symbol,
259 side: self.state.parent_signal.kind.side(),
260 new_price: Some(price),
261 new_quantity: Some(active.total_quantity),
262 };
263 return Ok(vec![ChildOrderRequest {
264 parent_algo_id: self.state.id,
265 action: ChildOrderAction::Amend(request),
266 }]);
267 }
268 if !self.should_refresh(price, now) {
269 return Ok(Vec::new());
270 }
271 self.state.last_peg_price = Some(price);
272 let qty = remaining.min(self.state.clip_size);
273 if qty <= Decimal::ZERO {
274 return Ok(Vec::new());
275 }
276 Ok(vec![self.build_child(price, qty)])
277 }
278
279 fn on_timer(&mut self) -> Result<Vec<ChildOrderRequest>> {
280 self.mark_completed();
281 Ok(Vec::new())
282 }
283
284 fn cancel(&mut self) -> Result<()> {
285 self.state.status = "Cancelled".into();
286 Ok(())
287 }
288
289 fn bind_child_order(&mut self, order: Order) -> Result<()> {
290 if !Self::is_active_status(order.status) {
291 return Ok(());
292 }
293 let Some(client_id) = order.request.client_order_id.as_deref() else {
294 return Ok(());
295 };
296 if let Some(seq) = self.sequence_from_client_id(client_id) {
297 self.state.next_child_seq = self.state.next_child_seq.max(seq);
298 }
299 self.state.last_order_time = Some(order.updated_at);
300 if let Some(price) = order.request.price {
301 self.state.last_peg_price = Some(price);
302 }
303 self.state.active_child = Some(ActiveChildOrder {
304 order_id: order.id.clone(),
305 client_order_id: order.request.client_order_id.clone(),
306 total_quantity: order.request.quantity,
307 filled_quantity: order.filled_quantity,
308 working_price: order.request.price,
309 });
310 if !matches!(self.status(), AlgoStatus::Working) {
311 self.state.status = "Working".into();
312 }
313 Ok(())
314 }
315
316 fn state(&self) -> serde_json::Value {
317 serde_json::to_value(&self.state).expect("pegged state serialization failed")
318 }
319
320 fn from_state(state: serde_json::Value) -> Result<Self>
321 where
322 Self: Sized,
323 {
324 let state: PeggedState = serde_json::from_value(state)?;
325 Ok(Self { state })
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use tesser_core::{Order, OrderStatus, Signal, SignalKind, Tick};

    /// Long-entry signal on BTCUSDT used by every test.
    fn long_signal() -> Signal {
        Signal::new("BTCUSDT", SignalKind::EnterLong, 0.9)
    }

    /// Buy-side BTCUSDT tick of size one at the given whole-number price.
    fn buy_tick(price: i64) -> Tick {
        Tick {
            symbol: "BTCUSDT".into(),
            price: Decimal::from(price),
            size: Decimal::ONE,
            side: tesser_core::Side::Buy,
            exchange_timestamp: Utc::now(),
            received_at: Utc::now(),
        }
    }

    /// The first tick with no active child must produce exactly one placement
    /// with a positive quantity.
    #[test]
    fn emits_child_after_tick() {
        let mut algo = PeggedBestAlgorithm::new(
            long_signal(),
            Decimal::from(5),
            Decimal::new(5, 1),
            None,
            Duration::seconds(1),
            Some(Decimal::new(1, 2)),
        )
        .unwrap();

        let orders = algo.on_tick(&buy_tick(100)).unwrap();

        assert_eq!(orders.len(), 1);
        match &orders[0].action {
            ChildOrderAction::Place(request) => assert!(request.quantity > Decimal::ZERO),
            other => panic!("unexpected action: {other:?}"),
        }
    }

    /// Once a child is working and the refresh interval has passed, a tick far
    /// enough from the last peg must amend that child to a higher price.
    #[test]
    fn chases_active_order_with_amend() {
        let mut algo = PeggedBestAlgorithm::new(
            long_signal(),
            Decimal::from(2),
            Decimal::new(1, 1),
            None,
            Duration::seconds(1),
            Some(Decimal::from(5)),
        )
        .unwrap();

        // Place the initial child and acknowledge it as accepted.
        let first_tick = buy_tick(30_000);
        let child = algo.on_tick(&first_tick).unwrap();
        let request = match &child[0].action {
            ChildOrderAction::Place(req) => req.clone(),
            other => panic!("expected placement, got {other:?}"),
        };
        let order = Order {
            id: "child-order".into(),
            request: request.clone(),
            status: OrderStatus::Accepted,
            filled_quantity: Decimal::ZERO,
            avg_fill_price: None,
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        algo.on_child_order_placed(&order);
        // Backdate the last activity so the refresh interval counts as elapsed.
        algo.state.last_order_time = Some(Utc::now() - Duration::seconds(5));

        let chase_tick = Tick {
            price: Decimal::from(30_020),
            ..first_tick
        };
        let chase = algo.on_tick(&chase_tick).unwrap();

        assert_eq!(chase.len(), 1);
        match &chase[0].action {
            ChildOrderAction::Amend(update) => {
                assert_eq!(update.order_id, order.id);
                assert_eq!(update.symbol, order.request.symbol);
                assert_eq!(update.new_quantity, Some(order.request.quantity));
                assert!(update.new_price.unwrap() > request.price.unwrap());
            }
            other => panic!("expected amend action, got {other:?}"),
        }
    }
}