1use std::cmp;
4use std::collections::HashMap;
5use std::fs;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use rusqlite::{params, Connection, OptionalExtension};
11use rust_decimal::Decimal;
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use tesser_core::{
14 AccountBalance, AssetId, Cash, CashBook, ExchangeId, Fill, Instrument, InstrumentKind, Order,
15 Position, Price, Quantity, Side, Symbol,
16};
17use tesser_ledger::LedgerEntry;
18use tesser_markets::MarketRegistry;
19use thiserror::Error;
20
/// Convenience alias for results whose error type is [`PortfolioError`].
pub type PortfolioResult<T> = Result<T, PortfolioError>;
23
/// Errors returned by portfolio accounting and state persistence in this file.
#[derive(Debug, Error)]
pub enum PortfolioError {
    /// The symbol could not be resolved through the market registry.
    #[error("unknown symbol: {0}")]
    UnknownSymbol(Symbol),
    /// Any other failure, carried as a human-readable message
    /// (used by the SQLite repository for I/O, SQL and (de)serialization errors).
    #[error("internal error: {0}")]
    Internal(String),
}
34
/// User-supplied settings used to construct a [`Portfolio`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PortfolioConfig {
    /// Starting cash per asset; each balance is booked on the exchange carried by its asset id.
    pub initial_balances: HashMap<AssetId, Price>,
    /// Currency in which equity and conversion rates are expressed.
    pub reporting_currency: AssetId,
    /// Drawdown threshold, compared against `(peak - equity) / peak`.
    /// `None` or a non-positive value is treated as "no limit" by the constructors.
    pub max_drawdown: Option<Decimal>,
}
42
43impl Default for PortfolioConfig {
44 fn default() -> Self {
45 let mut balances = HashMap::new();
46 balances.insert(AssetId::from("USDT"), Decimal::from(10_000));
47 Self {
48 initial_balances: balances,
49 reporting_currency: AssetId::from("USDT"),
50 max_drawdown: None,
51 }
52 }
53}
54
/// Cash balances and open positions held on a single exchange.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct SubAccount {
    /// Exchange this sub-account belongs to.
    pub exchange: ExchangeId,
    /// Cash entries keyed by asset.
    pub balances: CashBook,
    /// Positions keyed by symbol.
    pub positions: HashMap<Symbol, Position>,
}
62
63impl SubAccount {
64 fn new(exchange: ExchangeId) -> Self {
65 Self {
66 exchange,
67 balances: CashBook::new(),
68 positions: HashMap::new(),
69 }
70 }
71
72 fn ensure_currency(&mut self, reporting_currency: AssetId, currency: AssetId) {
73 self.balances.0.entry(currency).or_insert(Cash {
74 currency,
75 quantity: Decimal::ZERO,
76 conversion_rate: conversion_rate_for_currency(currency, reporting_currency),
77 });
78 }
79}
80
/// Result of applying a fill to the position book.
#[derive(Clone, Copy, Debug, Default)]
pub struct FillLedgerImpact {
    /// PnL realized by the portion of the fill that reduced an existing position
    /// (zero when the fill only opened or increased a position).
    pub realized_pnl: Decimal,
}
86
/// Multi-exchange portfolio: per-exchange sub-accounts plus drawdown tracking.
pub struct Portfolio {
    /// One sub-account per exchange, created on first use.
    sub_accounts: HashMap<ExchangeId, SubAccount>,
    /// Currency in which equity is reported.
    reporting_currency: AssetId,
    /// Cash value captured when the portfolio was constructed (or restored from state).
    initial_equity: Price,
    /// Drawdown fraction that switches the portfolio to liquidate-only; `None` disables the check.
    drawdown_limit: Option<Decimal>,
    /// Highest equity observed so far; only ever raised by the drawdown check.
    peak_equity: Price,
    /// Set once the drawdown limit is reached, or explicitly via `set_liquidate_only`.
    liquidate_only: bool,
    /// Instrument metadata lookup used to resolve symbols.
    market_registry: Arc<MarketRegistry>,
}
97
98impl Portfolio {
99 pub fn new(config: PortfolioConfig, registry: Arc<MarketRegistry>) -> Self {
101 let limit = config.max_drawdown.filter(|value| *value > Decimal::ZERO);
102 let mut sub_accounts: HashMap<ExchangeId, SubAccount> = HashMap::new();
103 for (currency, amount) in &config.initial_balances {
104 let exchange = currency.exchange;
105 let account = sub_accounts
106 .entry(exchange)
107 .or_insert_with(|| SubAccount::new(exchange));
108 account.ensure_currency(config.reporting_currency, *currency);
109 account.balances.upsert(Cash {
110 currency: *currency,
111 quantity: *amount,
112 conversion_rate: conversion_rate_for_currency(*currency, config.reporting_currency),
113 });
114 }
115 let mut portfolio = Self {
116 sub_accounts,
117 reporting_currency: config.reporting_currency,
118 initial_equity: Decimal::ZERO,
119 drawdown_limit: limit,
120 peak_equity: Decimal::ZERO,
121 liquidate_only: false,
122 market_registry: registry,
123 };
124 portfolio.ensure_reporting_currency_entries();
125 portfolio.initial_equity = portfolio.cash_value();
126 portfolio.peak_equity = portfolio.initial_equity;
127 portfolio
128 }
129
130 pub fn from_exchange_state(
132 positions: Vec<Position>,
133 balances: Vec<AccountBalance>,
134 config: PortfolioConfig,
135 registry: Arc<MarketRegistry>,
136 preserve_metrics_from: Option<&PortfolioState>,
137 ) -> Self {
138 let mut portfolio = Self::new(config, registry);
139 portfolio.sub_accounts.clear();
140 for position in positions.into_iter() {
141 if position.quantity.is_zero() {
142 continue;
143 }
144 let exchange = position.symbol.exchange;
145 let account = portfolio.account_mut(exchange);
146 account.positions.insert(position.symbol, position);
147 }
148 for balance in balances {
149 let exchange = if balance.exchange.is_specified() {
150 balance.exchange
151 } else {
152 balance.asset.exchange
153 };
154 let reporting = portfolio.reporting_currency;
155 let account = portfolio.account_mut(exchange);
156 account.ensure_currency(reporting, balance.asset);
157 account.balances.upsert(Cash {
158 currency: balance.asset,
159 quantity: balance.available,
160 conversion_rate: conversion_rate_for_currency(balance.asset, reporting),
161 });
162 }
163 portfolio.ensure_reporting_currency_entries();
164 portfolio.initial_equity = portfolio.cash_value();
165 portfolio.peak_equity = portfolio.initial_equity;
166 portfolio.update_drawdown_state();
167 if let Some(state) = preserve_metrics_from {
168 portfolio.peak_equity = cmp::max(state.peak_equity, portfolio.initial_equity);
169 }
170 portfolio
171 }
172
173 pub fn apply_fill_positions(&mut self, fill: &Fill) -> PortfolioResult<FillLedgerImpact> {
175 let instrument = self
176 .market_registry
177 .get(fill.symbol)
178 .ok_or(PortfolioError::UnknownSymbol(fill.symbol))?;
179 let reporting = self.reporting_currency;
180 let account = self.account_mut(fill.symbol.exchange);
181 account.ensure_currency(reporting, instrument.settlement_currency);
182 account.ensure_currency(reporting, instrument.base);
183 account.ensure_currency(reporting, instrument.quote);
184 let mut realized_delta = Decimal::ZERO;
185 {
186 let entry = account.positions.entry(fill.symbol).or_insert(Position {
187 symbol: fill.symbol,
188 side: Some(fill.side),
189 quantity: Decimal::ZERO,
190 entry_price: Some(fill.fill_price),
191 unrealized_pnl: Decimal::ZERO,
192 updated_at: fill.timestamp,
193 });
194
195 if entry.side.is_none() {
196 entry.side = Some(fill.side);
197 }
198
199 match entry.side {
200 Some(side) if side == fill.side => {
201 let total_qty = entry.quantity + fill.fill_quantity;
202 let prev_cost = entry
203 .entry_price
204 .map(|price| price * entry.quantity)
205 .unwrap_or_default();
206 let new_cost = fill.fill_price * fill.fill_quantity;
207 entry.entry_price = if total_qty.is_zero() {
208 Some(fill.fill_price)
209 } else {
210 Some((prev_cost + new_cost) / total_qty)
211 };
212 entry.quantity = total_qty;
213 }
214 Some(_) => {
215 if let Some(entry_price) = entry.entry_price {
216 let closing_qty = entry.quantity.min(fill.fill_quantity);
217 realized_delta = calculate_realized_pnl(
218 &instrument.kind,
219 entry_price,
220 fill.fill_price,
221 closing_qty,
222 fill.side,
223 );
224 }
225 let remaining = entry.quantity - fill.fill_quantity;
226 if remaining > Decimal::ZERO {
227 entry.quantity = remaining;
228 } else if remaining < Decimal::ZERO {
229 entry.quantity = remaining.abs();
230 entry.side = Some(fill.side);
231 entry.entry_price = Some(fill.fill_price);
232 } else {
233 entry.quantity = Decimal::ZERO;
234 entry.side = None;
235 entry.entry_price = None;
236 }
237 }
238 None => {
239 entry.side = Some(fill.side);
240 entry.quantity = fill.fill_quantity;
241 entry.entry_price = Some(fill.fill_price);
242 }
243 }
244
245 entry.updated_at = fill.timestamp;
246 }
247
248 self.update_drawdown_state();
249 Ok(FillLedgerImpact {
250 realized_pnl: realized_delta,
251 })
252 }
253
254 pub fn apply_ledger_entries(&mut self, entries: &[LedgerEntry]) -> PortfolioResult<()> {
256 let reporting = self.reporting_currency;
257 for entry in entries {
258 let account = self.account_mut(entry.exchange);
259 account.ensure_currency(reporting, entry.asset);
260 account.balances.adjust(entry.asset, entry.amount);
261 }
262 self.update_drawdown_state();
263 Ok(())
264 }
265
266 #[must_use]
268 pub fn position(&self, symbol: impl Into<Symbol>) -> Option<&Position> {
269 let symbol = symbol.into();
270 self.sub_accounts
271 .get(&symbol.exchange)
272 .and_then(|account| account.positions.get(&symbol))
273 }
274
275 #[must_use]
277 pub fn equity(&self) -> Price {
278 let unrealized: Price = self
279 .sub_accounts
280 .values()
281 .flat_map(|account| account.positions.values())
282 .map(|p| p.unrealized_pnl)
283 .sum();
284 self.cash_value() + unrealized
285 }
286
287 #[must_use]
289 pub fn cash(&self) -> Price {
290 let mut total = Decimal::ZERO;
291 for account in self.sub_accounts.values() {
292 for (currency, cash) in account.balances.iter() {
293 if matches_reporting_currency(*currency, self.reporting_currency) {
294 total += cash.quantity;
295 }
296 }
297 }
298 total
299 }
300
301 #[must_use]
303 pub fn realized_pnl(&self) -> Price {
304 self.equity() - self.initial_equity - self.total_unrealized()
305 }
306
307 #[must_use]
309 pub fn initial_equity(&self) -> Price {
310 self.initial_equity
311 }
312
313 #[must_use]
315 pub fn positions(&self) -> Vec<Position> {
316 self.sub_accounts
317 .values()
318 .flat_map(|account| account.positions.values().cloned())
319 .collect()
320 }
321
322 #[must_use]
324 pub fn balance(&self, currency: impl Into<AssetId>) -> Option<Cash> {
325 let currency = currency.into();
326 self.sub_accounts
327 .get(¤cy.exchange)
328 .and_then(|account| account.balances.get(currency).cloned())
329 }
330
331 #[must_use]
333 pub fn exchange_equity(&self, exchange: ExchangeId) -> Price {
334 self.sub_accounts
335 .get(&exchange)
336 .map(|account| {
337 let unrealized: Price = account
338 .positions
339 .values()
340 .map(|position| position.unrealized_pnl)
341 .sum();
342 account.balances.total_value() + unrealized
343 })
344 .unwrap_or_default()
345 }
346
347 #[must_use]
349 pub fn signed_position_qty(&self, symbol: impl Into<Symbol>) -> Quantity {
350 let symbol = symbol.into();
351 self.sub_accounts
352 .get(&symbol.exchange)
353 .and_then(|account| account.positions.get(&symbol))
354 .map(|position| match position.side {
355 Some(Side::Buy) => position.quantity,
356 Some(Side::Sell) => -position.quantity,
357 None => Decimal::ZERO,
358 })
359 .unwrap_or(Decimal::ZERO)
360 }
361
362 #[must_use]
364 pub fn liquidate_only(&self) -> bool {
365 self.liquidate_only
366 }
367
368 pub fn set_liquidate_only(&mut self, enabled: bool) -> bool {
370 if self.liquidate_only == enabled {
371 false
372 } else {
373 self.liquidate_only = enabled;
374 true
375 }
376 }
377
378 #[must_use]
380 pub fn snapshot(&self) -> PortfolioState {
381 PortfolioState {
382 positions: self.aggregate_positions_map(),
383 balances: self.aggregate_balances(),
384 reporting_currency: self.reporting_currency,
385 initial_equity: self.initial_equity,
386 drawdown_limit: self.drawdown_limit,
387 peak_equity: self.peak_equity,
388 liquidate_only: self.liquidate_only,
389 sub_accounts: self
390 .sub_accounts
391 .iter()
392 .map(|(exchange, account)| {
393 (
394 *exchange,
395 SubAccountState {
396 exchange: *exchange,
397 balances: account.balances.clone(),
398 positions: account.positions.clone(),
399 },
400 )
401 })
402 .collect(),
403 }
404 }
405
406 pub fn from_state(
408 state: PortfolioState,
409 config: PortfolioConfig,
410 registry: Arc<MarketRegistry>,
411 ) -> Self {
412 let drawdown_limit = config
413 .max_drawdown
414 .filter(|value| *value > Decimal::ZERO)
415 .or(state.drawdown_limit);
416 let mut portfolio = Self {
417 sub_accounts: HashMap::new(),
418 reporting_currency: state.reporting_currency,
419 initial_equity: state.initial_equity,
420 drawdown_limit,
421 peak_equity: cmp::max(state.peak_equity, state.initial_equity),
422 liquidate_only: state.liquidate_only,
423 market_registry: registry,
424 };
425 if state.sub_accounts.is_empty() {
426 for (symbol, position) in state.positions {
427 let account = portfolio.account_mut(symbol.exchange);
428 account.positions.insert(symbol, position);
429 }
430 for (asset, cash) in state.balances.iter() {
431 let account = portfolio.account_mut(asset.exchange);
432 account.balances.upsert(cash.clone());
433 }
434 } else {
435 for (exchange, snapshot) in state.sub_accounts {
436 let mut account = SubAccount::new(exchange);
437 account.positions = snapshot.positions;
438 account.balances = snapshot.balances;
439 portfolio.sub_accounts.insert(exchange, account);
440 }
441 }
442 portfolio.ensure_reporting_currency_entries();
443 portfolio.update_drawdown_state();
444 portfolio
445 }
446
447 pub fn update_market_data(
449 &mut self,
450 symbol: impl Into<Symbol>,
451 price: Price,
452 ) -> PortfolioResult<bool> {
453 let symbol = symbol.into();
454 let instrument = self
455 .market_registry
456 .get(symbol)
457 .ok_or(PortfolioError::UnknownSymbol(symbol))?;
458 let reporting = self.reporting_currency;
459 let account = self.account_mut(symbol.exchange);
460 let mut updated = false;
461 if let Some(position) = account.positions.get_mut(&symbol) {
462 update_unrealized(position, &instrument, price);
463 updated = true;
464 }
465 Self::update_conversion_rates(account, reporting, &instrument, price);
466 self.update_drawdown_state();
467 Ok(updated)
468 }
469
470 fn update_drawdown_state(&mut self) {
471 let equity = self.equity();
472 if equity > self.peak_equity {
473 self.peak_equity = equity;
474 }
475 if let Some(limit) = self.drawdown_limit {
476 if self.peak_equity > Decimal::ZERO {
477 let drawdown = (self.peak_equity - equity) / self.peak_equity;
478 if drawdown >= limit {
479 self.liquidate_only = true;
480 }
481 }
482 }
483 }
484
485 fn update_conversion_rates(
486 account: &mut SubAccount,
487 reporting_currency: AssetId,
488 instrument: &Instrument,
489 price: Price,
490 ) {
491 if instrument.quote == reporting_currency {
492 account.ensure_currency(reporting_currency, instrument.base);
493 account.ensure_currency(reporting_currency, instrument.quote);
494 account
495 .balances
496 .update_conversion_rate(instrument.base, price);
497 account
498 .balances
499 .update_conversion_rate(instrument.quote, Decimal::ONE);
500 return;
501 }
502 if instrument.base == reporting_currency && !price.is_zero() {
503 account.ensure_currency(reporting_currency, instrument.quote);
504 account.ensure_currency(reporting_currency, instrument.base);
505 account
506 .balances
507 .update_conversion_rate(instrument.base, Decimal::ONE);
508 account
509 .balances
510 .update_conversion_rate(instrument.quote, Decimal::ONE / price);
511 return;
512 }
513 let quote_rate = account
514 .balances
515 .get(instrument.quote)
516 .map(|cash| cash.conversion_rate)
517 .unwrap_or(Decimal::ZERO);
518 if quote_rate > Decimal::ZERO {
519 account.ensure_currency(reporting_currency, instrument.base);
520 account
521 .balances
522 .update_conversion_rate(instrument.base, price * quote_rate);
523 }
524 let base_rate = account
525 .balances
526 .get(instrument.base)
527 .map(|cash| cash.conversion_rate)
528 .unwrap_or(Decimal::ZERO);
529 if base_rate > Decimal::ZERO && !price.is_zero() {
530 account.ensure_currency(reporting_currency, instrument.quote);
531 account
532 .balances
533 .update_conversion_rate(instrument.quote, base_rate / price);
534 }
535 }
536
537 fn account_mut(&mut self, exchange: ExchangeId) -> &mut SubAccount {
538 self.sub_accounts.entry(exchange).or_insert_with(|| {
539 let mut account = SubAccount::new(exchange);
540 account.ensure_currency(self.reporting_currency, self.reporting_currency);
541 account
542 })
543 }
544
545 fn ensure_reporting_currency_entries(&mut self) {
546 let reporting = self.reporting_currency;
547 for account in self.sub_accounts.values_mut() {
548 account.ensure_currency(reporting, reporting);
549 }
550 }
551
552 fn aggregate_positions_map(&self) -> HashMap<Symbol, Position> {
553 self.sub_accounts
554 .values()
555 .flat_map(|account| account.positions.iter())
556 .map(|(symbol, position)| (*symbol, position.clone()))
557 .collect()
558 }
559
560 fn aggregate_balances(&self) -> CashBook {
561 let mut combined = CashBook::new();
562 for account in self.sub_accounts.values() {
563 for (asset, cash) in account.balances.iter() {
564 let entry = combined.0.entry(*asset).or_insert(Cash {
565 currency: *asset,
566 quantity: Decimal::ZERO,
567 conversion_rate: cash.conversion_rate,
568 });
569 entry.quantity += cash.quantity;
570 if cash.conversion_rate > Decimal::ZERO {
571 entry.conversion_rate = cash.conversion_rate;
572 }
573 }
574 }
575 combined
576 }
577
578 fn total_unrealized(&self) -> Price {
579 self.sub_accounts
580 .values()
581 .flat_map(|account| account.positions.values())
582 .map(|position| position.unrealized_pnl)
583 .sum()
584 }
585
586 fn cash_value(&self) -> Price {
587 self.sub_accounts
588 .values()
589 .map(|account| account.balances.total_value())
590 .sum()
591 }
592}
593
594fn calculate_realized_pnl(
595 kind: &InstrumentKind,
596 entry_price: Price,
597 exit_price: Price,
598 quantity: Quantity,
599 exit_side: Side,
600) -> Price {
601 match kind {
602 InstrumentKind::Spot => Decimal::ZERO,
603 InstrumentKind::LinearPerpetual => {
604 let delta = match exit_side {
605 Side::Buy => entry_price - exit_price,
606 Side::Sell => exit_price - entry_price,
607 };
608 delta * quantity
609 }
610 InstrumentKind::InversePerpetual => {
611 if entry_price.is_zero() || exit_price.is_zero() {
612 return Decimal::ZERO;
613 }
614 let inv_entry = Decimal::ONE / entry_price;
615 let inv_exit = Decimal::ONE / exit_price;
616 let delta = match exit_side {
617 Side::Buy => inv_entry - inv_exit,
618 Side::Sell => inv_exit - inv_entry,
619 };
620 delta * quantity
621 }
622 }
623}
624
625fn update_unrealized(position: &mut Position, instrument: &Instrument, price: Price) {
626 position.unrealized_pnl = match (instrument.kind, position.entry_price, position.side) {
627 (InstrumentKind::Spot, _, _) => Decimal::ZERO,
628 (InstrumentKind::LinearPerpetual, Some(entry), Some(side)) => {
629 let delta = match side {
630 Side::Buy => price - entry,
631 Side::Sell => entry - price,
632 };
633 delta * position.quantity
634 }
635 (InstrumentKind::InversePerpetual, Some(entry), Some(side)) => {
636 if price.is_zero() || entry.is_zero() {
637 Decimal::ZERO
638 } else {
639 let inv_entry = Decimal::ONE / entry;
640 let inv_price = Decimal::ONE / price;
641 let delta = match side {
642 Side::Buy => inv_price - inv_entry,
643 Side::Sell => inv_entry - inv_price,
644 };
645 delta * position.quantity
646 }
647 }
648 _ => Decimal::ZERO,
649 };
650 position.updated_at = Utc::now();
651}
652
653fn matches_reporting_currency(currency: AssetId, reporting: AssetId) -> bool {
654 if currency == reporting {
655 return true;
656 }
657 reporting.exchange == ExchangeId::UNSPECIFIED && currency.code() == reporting.code()
658}
659
660fn conversion_rate_for_currency(currency: AssetId, reporting: AssetId) -> Price {
661 if matches_reporting_currency(currency, reporting) {
662 Decimal::ONE
663 } else {
664 Decimal::ZERO
665 }
666}
667
/// Serializable snapshot of one exchange sub-account.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct SubAccountState {
    /// Exchange the snapshot belongs to.
    pub exchange: ExchangeId,
    /// Positions keyed by symbol.
    pub positions: HashMap<Symbol, Position>,
    /// Cash entries of the sub-account.
    pub balances: CashBook,
}
675
/// Serializable snapshot of a whole [`Portfolio`].
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct PortfolioState {
    /// Positions of all sub-accounts flattened into one map.
    pub positions: HashMap<Symbol, Position>,
    /// Cash of all sub-accounts merged per asset.
    pub balances: CashBook,
    /// Currency in which equity is reported.
    pub reporting_currency: AssetId,
    /// Equity baseline recorded at construction time.
    pub initial_equity: Price,
    /// Drawdown limit that was active when the snapshot was taken.
    pub drawdown_limit: Option<Decimal>,
    /// Highest equity observed before the snapshot.
    pub peak_equity: Price,
    /// Whether the portfolio was flagged liquidate-only.
    pub liquidate_only: bool,
    /// Per-exchange detail; defaults to empty when absent from the serialized payload, in
    /// which case `Portfolio::from_state` rebuilds sub-accounts from the flattened fields.
    #[serde(default)]
    pub sub_accounts: HashMap<ExchangeId, SubAccountState>,
}
689
/// Bookkeeping about already-seen executions, persisted as part of [`LiveState`].
// NOTE(review): the consumers of this type are not visible in this file; the field
// descriptions below are inferred from names only — confirm against the callers.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ExecutionCheckpoint {
    /// Presumably the timestamp of the last processed execution.
    pub last_timestamp: Option<DateTime<Utc>>,
    /// Presumably the ids of executions already processed; defaults to empty when absent.
    #[serde(default)]
    pub exec_ids: Vec<String>,
}
697
/// Top-level state persisted by [`SqliteStateRepository`] as a single JSON document.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct LiveState {
    /// Last portfolio snapshot, if one has been captured.
    pub portfolio: Option<PortfolioState>,
    /// Orders stored alongside the portfolio snapshot.
    pub open_orders: Vec<Order>,
    /// Last price recorded per symbol.
    pub last_prices: HashMap<Symbol, Price>,
    /// Timestamp of the last candle recorded, if any.
    pub last_candle_ts: Option<DateTime<Utc>>,
    /// Opaque, strategy-defined JSON blob.
    pub strategy_state: Option<serde_json::Value>,
    /// Execution bookkeeping; defaults to empty when absent from the serialized payload.
    #[serde(default)]
    pub execution_checkpoint: ExecutionCheckpoint,
}
709
/// Storage backend able to load and save a serializable snapshot.
pub trait StateRepository: Send + Sync + 'static {
    /// Type of the persisted snapshot.
    type Snapshot: Serialize + DeserializeOwned + Send + Sync + 'static;

    /// Loads the most recently persisted snapshot.
    fn load(&self) -> PortfolioResult<Self::Snapshot>;
    /// Persists `state`.
    fn save(&self, state: &Self::Snapshot) -> PortfolioResult<()>;
}
720
/// Schema of the single-row `state` table: the `CHECK (id = 1)` constraint allows only
/// one row, whose `payload` column holds the serialized state.
const STATE_SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    payload TEXT NOT NULL,
    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"#;
728
/// [`StateRepository`] that stores [`LiveState`] as JSON in a SQLite database file.
///
/// Only the path is kept; a new connection is opened for every load and save.
#[derive(Clone)]
pub struct SqliteStateRepository {
    /// Location of the SQLite database file.
    path: PathBuf,
}
734
735impl SqliteStateRepository {
736 pub fn new(path: PathBuf) -> Self {
738 Self { path }
739 }
740
741 fn connect(&self) -> PortfolioResult<Connection> {
742 if let Some(parent) = self.path.parent() {
743 fs::create_dir_all(parent).map_err(|err| {
744 PortfolioError::Internal(format!(
745 "failed to create state directory {}: {err}",
746 parent.display()
747 ))
748 })?;
749 }
750 let conn = Connection::open(&self.path).map_err(|err| {
751 PortfolioError::Internal(format!(
752 "failed to open state database {}: {err}",
753 self.path.display()
754 ))
755 })?;
756 conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")
757 .map_err(|err| {
758 PortfolioError::Internal(format!("failed to configure sqlite: {err}"))
759 })?;
760 conn.execute_batch(STATE_SCHEMA)
761 .map_err(|err| PortfolioError::Internal(format!("failed to apply schema: {err}")))?;
762 Ok(conn)
763 }
764}
765
766impl StateRepository for SqliteStateRepository {
767 type Snapshot = LiveState;
768
769 fn load(&self) -> PortfolioResult<LiveState> {
770 let conn = self.connect()?;
771 let payload: Option<String> = conn
772 .query_row("SELECT payload FROM state WHERE id = 1", [], |row| {
773 row.get(0)
774 })
775 .optional()
776 .map_err(|err| PortfolioError::Internal(format!("failed to read state: {err}")))?;
777 if let Some(json) = payload {
778 serde_json::from_str(&json).map_err(|err| {
779 PortfolioError::Internal(format!("failed to decode persisted state: {err}"))
780 })
781 } else {
782 Ok(LiveState::default())
783 }
784 }
785
786 fn save(&self, state: &LiveState) -> PortfolioResult<()> {
787 let mut conn = self.connect()?;
788 let payload = serde_json::to_string(state).map_err(|err| {
789 PortfolioError::Internal(format!("failed to serialize live state: {err}"))
790 })?;
791 let tx = conn.transaction().map_err(|err| {
792 PortfolioError::Internal(format!("failed to begin transaction: {err}"))
793 })?;
794 tx.execute(
795 "INSERT INTO state (id, payload, updated_at)
796 VALUES (1, ?, CURRENT_TIMESTAMP)
797 ON CONFLICT(id) DO UPDATE SET payload=excluded.payload, updated_at=CURRENT_TIMESTAMP",
798 params![payload],
799 )
800 .map_err(|err| PortfolioError::Internal(format!("failed to upsert state row: {err}")))?;
801 tx.commit()
802 .map_err(|err| PortfolioError::Internal(format!("failed to commit state: {err}")))?;
803 Ok(())
804 }
805}
806
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use std::sync::Arc;
    use tesser_core::{AssetId, ExchangeId, Instrument, InstrumentKind, Side, Symbol};
    use tesser_ledger::{entries_from_fill, FillLedgerContext};

    /// Registry holding a single linear perpetual, BTCUSDT, settled in USDT.
    fn sample_registry() -> Arc<MarketRegistry> {
        let btcusdt = Instrument {
            symbol: "BTCUSDT".into(),
            base: "BTC".into(),
            quote: "USDT".into(),
            kind: InstrumentKind::LinearPerpetual,
            settlement_currency: "USDT".into(),
            tick_size: Decimal::new(1, 0),
            lot_size: Decimal::new(1, 0),
        };
        let registry = MarketRegistry::from_instruments(vec![btcusdt]).unwrap();
        Arc::new(registry)
    }

    /// Fee-less BTCUSDT fill with a random order id, stamped with the current time.
    fn sample_fill(side: Side, price: Price, qty: Quantity) -> Fill {
        let order_id = uuid::Uuid::new_v4().to_string();
        let timestamp = Utc::now();
        Fill {
            order_id,
            symbol: Symbol::from("BTCUSDT"),
            side,
            fill_price: price,
            fill_quantity: qty,
            fee: None,
            fee_asset: None,
            timestamp,
        }
    }

    /// Applies a fill to the positions, then books the ledger entries derived from it.
    fn apply_with_ledger(portfolio: &mut Portfolio, fill: &Fill) {
        let impact = portfolio.apply_fill_positions(fill).unwrap();
        let registry = sample_registry();
        let instrument = registry.get(fill.symbol).unwrap();
        let context = FillLedgerContext::new(fill, &instrument, impact.realized_pnl);
        let entries = entries_from_fill(context);
        portfolio.apply_ledger_entries(&entries).unwrap();
    }

    #[test]
    fn portfolio_updates_equity() {
        let mut portfolio = Portfolio::new(PortfolioConfig::default(), sample_registry());
        // Buy 0.1 BTC at 50 000; the ledger entries are expected to reduce reporting cash.
        let fill = sample_fill(Side::Buy, Decimal::from(50_000), Decimal::new(1, 1));
        apply_with_ledger(&mut portfolio, &fill);
        assert!(portfolio.cash() < Decimal::from(10_000));
    }

    #[test]
    fn triggers_liquidate_only_on_drawdown() {
        let registry = sample_registry();
        // 2% drawdown limit.
        let config = PortfolioConfig {
            max_drawdown: Some(Decimal::new(2, 2)),
            ..PortfolioConfig::default()
        };
        let mut portfolio = Portfolio::new(config, Arc::clone(&registry));
        let fill = sample_fill(Side::Buy, Decimal::from(10), Decimal::from(10));
        apply_with_ledger(&mut portfolio, &fill);
        assert!(!portfolio.liquidate_only());
        // Marking the position at zero must push the drawdown to the limit.
        portfolio
            .update_market_data(fill.symbol, Decimal::ZERO)
            .unwrap();
        assert!(portfolio.liquidate_only());
    }

    #[test]
    fn rebuild_preserves_peak_equity_from_previous_state() {
        let registry = sample_registry();
        let usdt = AssetId::from("USDT");
        let previous = PortfolioState {
            reporting_currency: AssetId::from("USDT"),
            initial_equity: Decimal::from(10_000),
            peak_equity: Decimal::from(25_000),
            ..PortfolioState::default()
        };
        let exchange_balances = vec![AccountBalance {
            exchange: ExchangeId::UNSPECIFIED,
            asset: usdt,
            total: Decimal::from(10_000),
            available: Decimal::from(10_000),
            updated_at: Utc::now(),
        }];
        let config = PortfolioConfig {
            reporting_currency: AssetId::from("USDT"),
            ..PortfolioConfig::default()
        };
        let rebuilt = Portfolio::from_exchange_state(
            Vec::new(),
            exchange_balances,
            config,
            registry,
            Some(&previous),
        );
        // The stored peak (25 000) exceeds the rebuilt equity (10 000) and must survive.
        assert_eq!(rebuilt.snapshot().peak_equity, Decimal::from(25_000));
    }
}