tesser_portfolio/
lib.rs

1//! Portfolio accounting primitives.
2
3use std::cmp;
4use std::collections::HashMap;
5use std::fs;
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use rusqlite::{params, Connection, OptionalExtension};
11use rust_decimal::Decimal;
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use tesser_core::{
14    AccountBalance, AssetId, Cash, CashBook, ExchangeId, Fill, Instrument, InstrumentKind, Order,
15    Position, Price, Quantity, Side, Symbol,
16};
17use tesser_ledger::LedgerEntry;
18use tesser_markets::MarketRegistry;
19use thiserror::Error;
20
21/// Result alias for portfolio operations.
22pub type PortfolioResult<T> = Result<T, PortfolioError>;
23
24/// Portfolio-specific error type.
25#[derive(Debug, Error)]
26pub enum PortfolioError {
27    /// Raised when a fill references a symbol that is not being tracked yet.
28    #[error("unknown symbol: {0}")]
29    UnknownSymbol(Symbol),
30    /// Wraps any other issues surfaced by dependencies.
31    #[error("internal error: {0}")]
32    Internal(String),
33}
34
35/// Configuration used when instantiating a portfolio.
36#[derive(Clone, Debug, Deserialize, Serialize)]
37pub struct PortfolioConfig {
38    pub initial_balances: HashMap<AssetId, Price>,
39    pub reporting_currency: AssetId,
40    pub max_drawdown: Option<Decimal>,
41}
42
43impl Default for PortfolioConfig {
44    fn default() -> Self {
45        let mut balances = HashMap::new();
46        balances.insert(AssetId::from("USDT"), Decimal::from(10_000));
47        Self {
48            initial_balances: balances,
49            reporting_currency: AssetId::from("USDT"),
50            max_drawdown: None,
51        }
52    }
53}
54
55/// Aggregated ledger and position data for a specific venue.
56#[derive(Clone, Debug, Default, Deserialize, Serialize)]
57pub struct SubAccount {
58    pub exchange: ExchangeId,
59    pub balances: CashBook,
60    pub positions: HashMap<Symbol, Position>,
61}
62
63impl SubAccount {
64    fn new(exchange: ExchangeId) -> Self {
65        Self {
66            exchange,
67            balances: CashBook::new(),
68            positions: HashMap::new(),
69        }
70    }
71
72    fn ensure_currency(&mut self, reporting_currency: AssetId, currency: AssetId) {
73        self.balances.0.entry(currency).or_insert(Cash {
74            currency,
75            quantity: Decimal::ZERO,
76            conversion_rate: conversion_rate_for_currency(currency, reporting_currency),
77        });
78    }
79}
80
81/// Summary of the realized cash deltas generated by a fill.
82#[derive(Clone, Copy, Debug, Default)]
83pub struct FillLedgerImpact {
84    pub realized_pnl: Decimal,
85}
86
/// Multi-venue portfolio: per-exchange sub-accounts (cash and positions) plus
/// equity and drawdown tracking.
pub struct Portfolio {
    // One sub-account per exchange.
    sub_accounts: HashMap<ExchangeId, SubAccount>,
    // Asset in which cash and equity totals are expressed.
    reporting_currency: AssetId,
    // Cash value captured when the portfolio was created or restored.
    initial_equity: Price,
    // Fractional drawdown from the peak that forces liquidate-only mode; `None` disables it.
    drawdown_limit: Option<Decimal>,
    // Highest equity observed so far (high-water mark for the drawdown check).
    peak_equity: Price,
    // When true, only exposure-reducing orders should be allowed.
    liquidate_only: bool,
    // Instrument metadata lookup shared with the rest of the system.
    market_registry: Arc<MarketRegistry>,
}
97
98impl Portfolio {
99    /// Instantiate a new portfolio with default configuration.
100    pub fn new(config: PortfolioConfig, registry: Arc<MarketRegistry>) -> Self {
101        let limit = config.max_drawdown.filter(|value| *value > Decimal::ZERO);
102        let mut sub_accounts: HashMap<ExchangeId, SubAccount> = HashMap::new();
103        for (currency, amount) in &config.initial_balances {
104            let exchange = currency.exchange;
105            let account = sub_accounts
106                .entry(exchange)
107                .or_insert_with(|| SubAccount::new(exchange));
108            account.ensure_currency(config.reporting_currency, *currency);
109            account.balances.upsert(Cash {
110                currency: *currency,
111                quantity: *amount,
112                conversion_rate: conversion_rate_for_currency(*currency, config.reporting_currency),
113            });
114        }
115        let mut portfolio = Self {
116            sub_accounts,
117            reporting_currency: config.reporting_currency,
118            initial_equity: Decimal::ZERO,
119            drawdown_limit: limit,
120            peak_equity: Decimal::ZERO,
121            liquidate_only: false,
122            market_registry: registry,
123        };
124        portfolio.ensure_reporting_currency_entries();
125        portfolio.initial_equity = portfolio.cash_value();
126        portfolio.peak_equity = portfolio.initial_equity;
127        portfolio
128    }
129
130    /// Build a portfolio snapshot from live exchange balances and positions.
131    pub fn from_exchange_state(
132        positions: Vec<Position>,
133        balances: Vec<AccountBalance>,
134        config: PortfolioConfig,
135        registry: Arc<MarketRegistry>,
136        preserve_metrics_from: Option<&PortfolioState>,
137    ) -> Self {
138        let mut portfolio = Self::new(config, registry);
139        portfolio.sub_accounts.clear();
140        for position in positions.into_iter() {
141            if position.quantity.is_zero() {
142                continue;
143            }
144            let exchange = position.symbol.exchange;
145            let account = portfolio.account_mut(exchange);
146            account.positions.insert(position.symbol, position);
147        }
148        for balance in balances {
149            let exchange = if balance.exchange.is_specified() {
150                balance.exchange
151            } else {
152                balance.asset.exchange
153            };
154            let reporting = portfolio.reporting_currency;
155            let account = portfolio.account_mut(exchange);
156            account.ensure_currency(reporting, balance.asset);
157            account.balances.upsert(Cash {
158                currency: balance.asset,
159                quantity: balance.available,
160                conversion_rate: conversion_rate_for_currency(balance.asset, reporting),
161            });
162        }
163        portfolio.ensure_reporting_currency_entries();
164        portfolio.initial_equity = portfolio.cash_value();
165        portfolio.peak_equity = portfolio.initial_equity;
166        portfolio.update_drawdown_state();
167        if let Some(state) = preserve_metrics_from {
168            portfolio.peak_equity = cmp::max(state.peak_equity, portfolio.initial_equity);
169        }
170        portfolio
171    }
172
173    /// Update positions in response to an execution fill and return realized PnL.
174    pub fn apply_fill_positions(&mut self, fill: &Fill) -> PortfolioResult<FillLedgerImpact> {
175        let instrument = self
176            .market_registry
177            .get(fill.symbol)
178            .ok_or(PortfolioError::UnknownSymbol(fill.symbol))?;
179        let reporting = self.reporting_currency;
180        let account = self.account_mut(fill.symbol.exchange);
181        account.ensure_currency(reporting, instrument.settlement_currency);
182        account.ensure_currency(reporting, instrument.base);
183        account.ensure_currency(reporting, instrument.quote);
184        let mut realized_delta = Decimal::ZERO;
185        {
186            let entry = account.positions.entry(fill.symbol).or_insert(Position {
187                symbol: fill.symbol,
188                side: Some(fill.side),
189                quantity: Decimal::ZERO,
190                entry_price: Some(fill.fill_price),
191                unrealized_pnl: Decimal::ZERO,
192                updated_at: fill.timestamp,
193            });
194
195            if entry.side.is_none() {
196                entry.side = Some(fill.side);
197            }
198
199            match entry.side {
200                Some(side) if side == fill.side => {
201                    let total_qty = entry.quantity + fill.fill_quantity;
202                    let prev_cost = entry
203                        .entry_price
204                        .map(|price| price * entry.quantity)
205                        .unwrap_or_default();
206                    let new_cost = fill.fill_price * fill.fill_quantity;
207                    entry.entry_price = if total_qty.is_zero() {
208                        Some(fill.fill_price)
209                    } else {
210                        Some((prev_cost + new_cost) / total_qty)
211                    };
212                    entry.quantity = total_qty;
213                }
214                Some(_) => {
215                    if let Some(entry_price) = entry.entry_price {
216                        let closing_qty = entry.quantity.min(fill.fill_quantity);
217                        realized_delta = calculate_realized_pnl(
218                            &instrument.kind,
219                            entry_price,
220                            fill.fill_price,
221                            closing_qty,
222                            fill.side,
223                        );
224                    }
225                    let remaining = entry.quantity - fill.fill_quantity;
226                    if remaining > Decimal::ZERO {
227                        entry.quantity = remaining;
228                    } else if remaining < Decimal::ZERO {
229                        entry.quantity = remaining.abs();
230                        entry.side = Some(fill.side);
231                        entry.entry_price = Some(fill.fill_price);
232                    } else {
233                        entry.quantity = Decimal::ZERO;
234                        entry.side = None;
235                        entry.entry_price = None;
236                    }
237                }
238                None => {
239                    entry.side = Some(fill.side);
240                    entry.quantity = fill.fill_quantity;
241                    entry.entry_price = Some(fill.fill_price);
242                }
243            }
244
245            entry.updated_at = fill.timestamp;
246        }
247
248        self.update_drawdown_state();
249        Ok(FillLedgerImpact {
250            realized_pnl: realized_delta,
251        })
252    }
253
254    /// Apply externally generated ledger entries to the cash book.
255    pub fn apply_ledger_entries(&mut self, entries: &[LedgerEntry]) -> PortfolioResult<()> {
256        let reporting = self.reporting_currency;
257        for entry in entries {
258            let account = self.account_mut(entry.exchange);
259            account.ensure_currency(reporting, entry.asset);
260            account.balances.adjust(entry.asset, entry.amount);
261        }
262        self.update_drawdown_state();
263        Ok(())
264    }
265
266    /// Retrieve a position snapshot for a symbol.
267    #[must_use]
268    pub fn position(&self, symbol: impl Into<Symbol>) -> Option<&Position> {
269        let symbol = symbol.into();
270        self.sub_accounts
271            .get(&symbol.exchange)
272            .and_then(|account| account.positions.get(&symbol))
273    }
274
275    /// Total net asset value (cash + unrealized PnL).
276    #[must_use]
277    pub fn equity(&self) -> Price {
278        let unrealized: Price = self
279            .sub_accounts
280            .values()
281            .flat_map(|account| account.positions.values())
282            .map(|p| p.unrealized_pnl)
283            .sum();
284        self.cash_value() + unrealized
285    }
286
287    /// Cash on hand that is not locked in positions.
288    #[must_use]
289    pub fn cash(&self) -> Price {
290        let mut total = Decimal::ZERO;
291        for account in self.sub_accounts.values() {
292            for (currency, cash) in account.balances.iter() {
293                if matches_reporting_currency(*currency, self.reporting_currency) {
294                    total += cash.quantity;
295                }
296            }
297        }
298        total
299    }
300
301    /// Realized profit and loss across all closed positions.
302    #[must_use]
303    pub fn realized_pnl(&self) -> Price {
304        self.equity() - self.initial_equity - self.total_unrealized()
305    }
306
307    /// Initial capital provided to the portfolio.
308    #[must_use]
309    pub fn initial_equity(&self) -> Price {
310        self.initial_equity
311    }
312
313    /// Clone all tracked positions for external consumers (e.g., strategies).
314    #[must_use]
315    pub fn positions(&self) -> Vec<Position> {
316        self.sub_accounts
317            .values()
318            .flat_map(|account| account.positions.values().cloned())
319            .collect()
320    }
321
322    /// Return the available cash balance for a specific asset, if tracked.
323    #[must_use]
324    pub fn balance(&self, currency: impl Into<AssetId>) -> Option<Cash> {
325        let currency = currency.into();
326        self.sub_accounts
327            .get(&currency.exchange)
328            .and_then(|account| account.balances.get(currency).cloned())
329    }
330
331    /// Compute the aggregate equity for a single exchange.
332    #[must_use]
333    pub fn exchange_equity(&self, exchange: ExchangeId) -> Price {
334        self.sub_accounts
335            .get(&exchange)
336            .map(|account| {
337                let unrealized: Price = account
338                    .positions
339                    .values()
340                    .map(|position| position.unrealized_pnl)
341                    .sum();
342                account.balances.total_value() + unrealized
343            })
344            .unwrap_or_default()
345    }
346
347    /// Signed position quantity helper (long positive, short negative).
348    #[must_use]
349    pub fn signed_position_qty(&self, symbol: impl Into<Symbol>) -> Quantity {
350        let symbol = symbol.into();
351        self.sub_accounts
352            .get(&symbol.exchange)
353            .and_then(|account| account.positions.get(&symbol))
354            .map(|position| match position.side {
355                Some(Side::Buy) => position.quantity,
356                Some(Side::Sell) => -position.quantity,
357                None => Decimal::ZERO,
358            })
359            .unwrap_or(Decimal::ZERO)
360    }
361
362    /// Whether the portfolio currently allows only exposure-reducing orders.
363    #[must_use]
364    pub fn liquidate_only(&self) -> bool {
365        self.liquidate_only
366    }
367
368    /// Forcefully toggle liquidate-only mode. Returns true if the flag changed.
369    pub fn set_liquidate_only(&mut self, enabled: bool) -> bool {
370        if self.liquidate_only == enabled {
371            false
372        } else {
373            self.liquidate_only = enabled;
374            true
375        }
376    }
377
378    /// Snapshot the current state for persistence.
379    #[must_use]
380    pub fn snapshot(&self) -> PortfolioState {
381        PortfolioState {
382            positions: self.aggregate_positions_map(),
383            balances: self.aggregate_balances(),
384            reporting_currency: self.reporting_currency,
385            initial_equity: self.initial_equity,
386            drawdown_limit: self.drawdown_limit,
387            peak_equity: self.peak_equity,
388            liquidate_only: self.liquidate_only,
389            sub_accounts: self
390                .sub_accounts
391                .iter()
392                .map(|(exchange, account)| {
393                    (
394                        *exchange,
395                        SubAccountState {
396                            exchange: *exchange,
397                            balances: account.balances.clone(),
398                            positions: account.positions.clone(),
399                        },
400                    )
401                })
402                .collect(),
403        }
404    }
405
406    /// Rehydrate a portfolio from a persisted snapshot.
407    pub fn from_state(
408        state: PortfolioState,
409        config: PortfolioConfig,
410        registry: Arc<MarketRegistry>,
411    ) -> Self {
412        let drawdown_limit = config
413            .max_drawdown
414            .filter(|value| *value > Decimal::ZERO)
415            .or(state.drawdown_limit);
416        let mut portfolio = Self {
417            sub_accounts: HashMap::new(),
418            reporting_currency: state.reporting_currency,
419            initial_equity: state.initial_equity,
420            drawdown_limit,
421            peak_equity: cmp::max(state.peak_equity, state.initial_equity),
422            liquidate_only: state.liquidate_only,
423            market_registry: registry,
424        };
425        if state.sub_accounts.is_empty() {
426            for (symbol, position) in state.positions {
427                let account = portfolio.account_mut(symbol.exchange);
428                account.positions.insert(symbol, position);
429            }
430            for (asset, cash) in state.balances.iter() {
431                let account = portfolio.account_mut(asset.exchange);
432                account.balances.upsert(cash.clone());
433            }
434        } else {
435            for (exchange, snapshot) in state.sub_accounts {
436                let mut account = SubAccount::new(exchange);
437                account.positions = snapshot.positions;
438                account.balances = snapshot.balances;
439                portfolio.sub_accounts.insert(exchange, account);
440            }
441        }
442        portfolio.ensure_reporting_currency_entries();
443        portfolio.update_drawdown_state();
444        portfolio
445    }
446
447    /// Refresh mark-to-market pricing and conversion rates for a symbol.
448    pub fn update_market_data(
449        &mut self,
450        symbol: impl Into<Symbol>,
451        price: Price,
452    ) -> PortfolioResult<bool> {
453        let symbol = symbol.into();
454        let instrument = self
455            .market_registry
456            .get(symbol)
457            .ok_or(PortfolioError::UnknownSymbol(symbol))?;
458        let reporting = self.reporting_currency;
459        let account = self.account_mut(symbol.exchange);
460        let mut updated = false;
461        if let Some(position) = account.positions.get_mut(&symbol) {
462            update_unrealized(position, &instrument, price);
463            updated = true;
464        }
465        Self::update_conversion_rates(account, reporting, &instrument, price);
466        self.update_drawdown_state();
467        Ok(updated)
468    }
469
470    fn update_drawdown_state(&mut self) {
471        let equity = self.equity();
472        if equity > self.peak_equity {
473            self.peak_equity = equity;
474        }
475        if let Some(limit) = self.drawdown_limit {
476            if self.peak_equity > Decimal::ZERO {
477                let drawdown = (self.peak_equity - equity) / self.peak_equity;
478                if drawdown >= limit {
479                    self.liquidate_only = true;
480                }
481            }
482        }
483    }
484
485    fn update_conversion_rates(
486        account: &mut SubAccount,
487        reporting_currency: AssetId,
488        instrument: &Instrument,
489        price: Price,
490    ) {
491        if instrument.quote == reporting_currency {
492            account.ensure_currency(reporting_currency, instrument.base);
493            account.ensure_currency(reporting_currency, instrument.quote);
494            account
495                .balances
496                .update_conversion_rate(instrument.base, price);
497            account
498                .balances
499                .update_conversion_rate(instrument.quote, Decimal::ONE);
500            return;
501        }
502        if instrument.base == reporting_currency && !price.is_zero() {
503            account.ensure_currency(reporting_currency, instrument.quote);
504            account.ensure_currency(reporting_currency, instrument.base);
505            account
506                .balances
507                .update_conversion_rate(instrument.base, Decimal::ONE);
508            account
509                .balances
510                .update_conversion_rate(instrument.quote, Decimal::ONE / price);
511            return;
512        }
513        let quote_rate = account
514            .balances
515            .get(instrument.quote)
516            .map(|cash| cash.conversion_rate)
517            .unwrap_or(Decimal::ZERO);
518        if quote_rate > Decimal::ZERO {
519            account.ensure_currency(reporting_currency, instrument.base);
520            account
521                .balances
522                .update_conversion_rate(instrument.base, price * quote_rate);
523        }
524        let base_rate = account
525            .balances
526            .get(instrument.base)
527            .map(|cash| cash.conversion_rate)
528            .unwrap_or(Decimal::ZERO);
529        if base_rate > Decimal::ZERO && !price.is_zero() {
530            account.ensure_currency(reporting_currency, instrument.quote);
531            account
532                .balances
533                .update_conversion_rate(instrument.quote, base_rate / price);
534        }
535    }
536
537    fn account_mut(&mut self, exchange: ExchangeId) -> &mut SubAccount {
538        self.sub_accounts.entry(exchange).or_insert_with(|| {
539            let mut account = SubAccount::new(exchange);
540            account.ensure_currency(self.reporting_currency, self.reporting_currency);
541            account
542        })
543    }
544
545    fn ensure_reporting_currency_entries(&mut self) {
546        let reporting = self.reporting_currency;
547        for account in self.sub_accounts.values_mut() {
548            account.ensure_currency(reporting, reporting);
549        }
550    }
551
552    fn aggregate_positions_map(&self) -> HashMap<Symbol, Position> {
553        self.sub_accounts
554            .values()
555            .flat_map(|account| account.positions.iter())
556            .map(|(symbol, position)| (*symbol, position.clone()))
557            .collect()
558    }
559
560    fn aggregate_balances(&self) -> CashBook {
561        let mut combined = CashBook::new();
562        for account in self.sub_accounts.values() {
563            for (asset, cash) in account.balances.iter() {
564                let entry = combined.0.entry(*asset).or_insert(Cash {
565                    currency: *asset,
566                    quantity: Decimal::ZERO,
567                    conversion_rate: cash.conversion_rate,
568                });
569                entry.quantity += cash.quantity;
570                if cash.conversion_rate > Decimal::ZERO {
571                    entry.conversion_rate = cash.conversion_rate;
572                }
573            }
574        }
575        combined
576    }
577
578    fn total_unrealized(&self) -> Price {
579        self.sub_accounts
580            .values()
581            .flat_map(|account| account.positions.values())
582            .map(|position| position.unrealized_pnl)
583            .sum()
584    }
585
586    fn cash_value(&self) -> Price {
587        self.sub_accounts
588            .values()
589            .map(|account| account.balances.total_value())
590            .sum()
591    }
592}
593
594fn calculate_realized_pnl(
595    kind: &InstrumentKind,
596    entry_price: Price,
597    exit_price: Price,
598    quantity: Quantity,
599    exit_side: Side,
600) -> Price {
601    match kind {
602        InstrumentKind::Spot => Decimal::ZERO,
603        InstrumentKind::LinearPerpetual => {
604            let delta = match exit_side {
605                Side::Buy => entry_price - exit_price,
606                Side::Sell => exit_price - entry_price,
607            };
608            delta * quantity
609        }
610        InstrumentKind::InversePerpetual => {
611            if entry_price.is_zero() || exit_price.is_zero() {
612                return Decimal::ZERO;
613            }
614            let inv_entry = Decimal::ONE / entry_price;
615            let inv_exit = Decimal::ONE / exit_price;
616            let delta = match exit_side {
617                Side::Buy => inv_entry - inv_exit,
618                Side::Sell => inv_exit - inv_entry,
619            };
620            delta * quantity
621        }
622    }
623}
624
625fn update_unrealized(position: &mut Position, instrument: &Instrument, price: Price) {
626    position.unrealized_pnl = match (instrument.kind, position.entry_price, position.side) {
627        (InstrumentKind::Spot, _, _) => Decimal::ZERO,
628        (InstrumentKind::LinearPerpetual, Some(entry), Some(side)) => {
629            let delta = match side {
630                Side::Buy => price - entry,
631                Side::Sell => entry - price,
632            };
633            delta * position.quantity
634        }
635        (InstrumentKind::InversePerpetual, Some(entry), Some(side)) => {
636            if price.is_zero() || entry.is_zero() {
637                Decimal::ZERO
638            } else {
639                let inv_entry = Decimal::ONE / entry;
640                let inv_price = Decimal::ONE / price;
641                let delta = match side {
642                    Side::Buy => inv_price - inv_entry,
643                    Side::Sell => inv_entry - inv_price,
644                };
645                delta * position.quantity
646            }
647        }
648        _ => Decimal::ZERO,
649    };
650    position.updated_at = Utc::now();
651}
652
653fn matches_reporting_currency(currency: AssetId, reporting: AssetId) -> bool {
654    if currency == reporting {
655        return true;
656    }
657    reporting.exchange == ExchangeId::UNSPECIFIED && currency.code() == reporting.code()
658}
659
660fn conversion_rate_for_currency(currency: AssetId, reporting: AssetId) -> Price {
661    if matches_reporting_currency(currency, reporting) {
662        Decimal::ONE
663    } else {
664        Decimal::ZERO
665    }
666}
667
668/// Snapshot of a single venue's ledger/positions for persistence.
669#[derive(Clone, Debug, Default, Deserialize, Serialize)]
670pub struct SubAccountState {
671    pub exchange: ExchangeId,
672    pub positions: HashMap<Symbol, Position>,
673    pub balances: CashBook,
674}
675
676/// Serializable representation of a portfolio used for persistence.
677#[derive(Clone, Debug, Default, Deserialize, Serialize)]
678pub struct PortfolioState {
679    pub positions: HashMap<Symbol, Position>,
680    pub balances: CashBook,
681    pub reporting_currency: AssetId,
682    pub initial_equity: Price,
683    pub drawdown_limit: Option<Decimal>,
684    pub peak_equity: Price,
685    pub liquidate_only: bool,
686    #[serde(default)]
687    pub sub_accounts: HashMap<ExchangeId, SubAccountState>,
688}
689
690/// Tracks the last known execution timestamp and corresponding identifiers.
691#[derive(Clone, Debug, Default, Deserialize, Serialize)]
692pub struct ExecutionCheckpoint {
693    pub last_timestamp: Option<DateTime<Utc>>,
694    #[serde(default)]
695    pub exec_ids: Vec<String>,
696}
697
698/// Durable snapshot of the live trading runtime persisted on disk.
699#[derive(Clone, Debug, Default, Deserialize, Serialize)]
700pub struct LiveState {
701    pub portfolio: Option<PortfolioState>,
702    pub open_orders: Vec<Order>,
703    pub last_prices: HashMap<Symbol, Price>,
704    pub last_candle_ts: Option<DateTime<Utc>>,
705    pub strategy_state: Option<serde_json::Value>,
706    #[serde(default)]
707    pub execution_checkpoint: ExecutionCheckpoint,
708}
709
/// Abstraction over state persistence backends.
///
/// Implementations must be shareable across threads (`Send + Sync + 'static`).
pub trait StateRepository: Send + Sync + 'static {
    /// Structured snapshot stored in durable persistence.
    type Snapshot: Serialize + DeserializeOwned + Send + Sync + 'static;

    /// Load the most recent state from durable storage or defaults if none exists.
    ///
    /// # Errors
    /// Returns a [`PortfolioError`] when the backend cannot be read or the stored
    /// data cannot be decoded.
    fn load(&self) -> PortfolioResult<Self::Snapshot>;
    /// Atomically save the provided state snapshot.
    ///
    /// # Errors
    /// Returns a [`PortfolioError`] when the snapshot cannot be encoded or written.
    fn save(&self, state: &Self::Snapshot) -> PortfolioResult<()>;
}
720
/// DDL applied on every connection: a `state` table restricted to a single row
/// (`CHECK (id = 1)`) that holds the serialized payload and its last-update time.
const STATE_SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    payload TEXT NOT NULL,
    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"#;
728
/// [`StateRepository`] implementation backed by a SQLite database file.
///
/// Only the path is stored; a connection is opened for each load or save.
#[derive(Clone)]
pub struct SqliteStateRepository {
    // Location of the SQLite database file.
    path: PathBuf,
}
734
735impl SqliteStateRepository {
736    /// Create a new repository that stores state inside the provided file path.
737    pub fn new(path: PathBuf) -> Self {
738        Self { path }
739    }
740
741    fn connect(&self) -> PortfolioResult<Connection> {
742        if let Some(parent) = self.path.parent() {
743            fs::create_dir_all(parent).map_err(|err| {
744                PortfolioError::Internal(format!(
745                    "failed to create state directory {}: {err}",
746                    parent.display()
747                ))
748            })?;
749        }
750        let conn = Connection::open(&self.path).map_err(|err| {
751            PortfolioError::Internal(format!(
752                "failed to open state database {}: {err}",
753                self.path.display()
754            ))
755        })?;
756        conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")
757            .map_err(|err| {
758                PortfolioError::Internal(format!("failed to configure sqlite: {err}"))
759            })?;
760        conn.execute_batch(STATE_SCHEMA)
761            .map_err(|err| PortfolioError::Internal(format!("failed to apply schema: {err}")))?;
762        Ok(conn)
763    }
764}
765
766impl StateRepository for SqliteStateRepository {
767    type Snapshot = LiveState;
768
769    fn load(&self) -> PortfolioResult<LiveState> {
770        let conn = self.connect()?;
771        let payload: Option<String> = conn
772            .query_row("SELECT payload FROM state WHERE id = 1", [], |row| {
773                row.get(0)
774            })
775            .optional()
776            .map_err(|err| PortfolioError::Internal(format!("failed to read state: {err}")))?;
777        if let Some(json) = payload {
778            serde_json::from_str(&json).map_err(|err| {
779                PortfolioError::Internal(format!("failed to decode persisted state: {err}"))
780            })
781        } else {
782            Ok(LiveState::default())
783        }
784    }
785
786    fn save(&self, state: &LiveState) -> PortfolioResult<()> {
787        let mut conn = self.connect()?;
788        let payload = serde_json::to_string(state).map_err(|err| {
789            PortfolioError::Internal(format!("failed to serialize live state: {err}"))
790        })?;
791        let tx = conn.transaction().map_err(|err| {
792            PortfolioError::Internal(format!("failed to begin transaction: {err}"))
793        })?;
794        tx.execute(
795            "INSERT INTO state (id, payload, updated_at)
796             VALUES (1, ?, CURRENT_TIMESTAMP)
797             ON CONFLICT(id) DO UPDATE SET payload=excluded.payload, updated_at=CURRENT_TIMESTAMP",
798            params![payload],
799        )
800        .map_err(|err| PortfolioError::Internal(format!("failed to upsert state row: {err}")))?;
801        tx.commit()
802            .map_err(|err| PortfolioError::Internal(format!("failed to commit state: {err}")))?;
803        Ok(())
804    }
805}
806
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use std::sync::Arc;
    use tesser_core::{AssetId, ExchangeId, Instrument, InstrumentKind, Side, Symbol};
    use tesser_ledger::{entries_from_fill, FillLedgerContext};

    /// Registry containing a single linear BTCUSDT perpetual settled in USDT.
    fn sample_registry() -> Arc<MarketRegistry> {
        let btcusdt = Instrument {
            symbol: "BTCUSDT".into(),
            base: "BTC".into(),
            quote: "USDT".into(),
            kind: InstrumentKind::LinearPerpetual,
            settlement_currency: "USDT".into(),
            tick_size: Decimal::new(1, 0),
            lot_size: Decimal::new(1, 0),
        };
        Arc::new(MarketRegistry::from_instruments(vec![btcusdt]).unwrap())
    }

    /// Fee-less BTCUSDT fill with a random order id, stamped with the current time.
    fn sample_fill(side: Side, price: Price, qty: Quantity) -> Fill {
        Fill {
            order_id: uuid::Uuid::new_v4().to_string(),
            symbol: Symbol::from("BTCUSDT"),
            side,
            fill_price: price,
            fill_quantity: qty,
            fee: None,
            fee_asset: None,
            timestamp: Utc::now(),
        }
    }

    /// Apply a fill the way the runtime does: positions first, then the ledger
    /// entries derived from the fill and its realized PnL.
    fn apply_with_ledger(portfolio: &mut Portfolio, fill: &Fill) {
        let impact = portfolio.apply_fill_positions(fill).unwrap();
        let registry = sample_registry();
        let instrument = registry.get(fill.symbol).unwrap();
        let context = FillLedgerContext::new(fill, &instrument, impact.realized_pnl);
        let entries = entries_from_fill(context);
        portfolio.apply_ledger_entries(&entries).unwrap();
    }

    #[test]
    fn portfolio_updates_equity() {
        let mut portfolio = Portfolio::new(PortfolioConfig::default(), sample_registry());
        // Buy 0.1 BTC at 50,000.
        let buy = sample_fill(Side::Buy, Decimal::from(50_000), Decimal::new(1, 1));
        apply_with_ledger(&mut portfolio, &buy);
        assert!(portfolio.cash() < Decimal::from(10_000));
    }

    #[test]
    fn triggers_liquidate_only_on_drawdown() {
        let registry = sample_registry();
        let config = PortfolioConfig {
            max_drawdown: Some(Decimal::new(2, 2)), // 2%
            ..PortfolioConfig::default()
        };
        let mut portfolio = Portfolio::new(config, registry.clone());
        let buy = sample_fill(Side::Buy, Decimal::from(10), Decimal::from(10));
        apply_with_ledger(&mut portfolio, &buy);
        assert!(!portfolio.liquidate_only());
        // Price crash reduces equity by more than 2%
        portfolio
            .update_market_data(buy.symbol, Decimal::ZERO)
            .unwrap();
        assert!(portfolio.liquidate_only());
    }

    #[test]
    fn rebuild_preserves_peak_equity_from_previous_state() {
        let registry = sample_registry();
        let usdt_balance = AccountBalance {
            exchange: ExchangeId::UNSPECIFIED,
            asset: AssetId::from("USDT"),
            total: Decimal::from(10_000),
            available: Decimal::from(10_000),
            updated_at: Utc::now(),
        };
        let preserved = PortfolioState {
            reporting_currency: AssetId::from("USDT"),
            initial_equity: Decimal::from(10_000),
            peak_equity: Decimal::from(25_000),
            ..PortfolioState::default()
        };
        let config = PortfolioConfig {
            reporting_currency: AssetId::from("USDT"),
            ..PortfolioConfig::default()
        };
        let portfolio = Portfolio::from_exchange_state(
            Vec::new(),
            vec![usdt_balance],
            config,
            registry,
            Some(&preserved),
        );
        // The rebuilt equity is 10,000 but the earlier 25,000 peak must survive.
        assert_eq!(portfolio.snapshot().peak_equity, Decimal::from(25_000));
    }
}