use crate::{
AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot, UnindexedAccountEvent,
UnindexedAccountSnapshot,
balance::{AssetBalance, Balance},
client::{BracketOrderClient, ExecutionClient},
error::{ApiError, ConnectivityError, OrderError, UnindexedClientError, UnindexedOrderError},
order::{
Order, OrderKey, OrderKind, TimeInForce, TrailingOffsetType,
bracket::{
BracketOrderRequest as UnifiedBracketOrderRequest,
BracketOrderResult as UnifiedBracketOrderResult,
},
id::{ClientOrderId, OrderId, StrategyId},
request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
state::{Cancelled, Filled, Open, OrderState, UnindexedOrderState},
},
trade::{AssetFees, Trade, TradeId},
};
use chrono::{DateTime, Utc};
use fnv::FnvHashMap;
use futures::{SinkExt as _, StreamExt as _, stream::BoxStream};
use indexmap::IndexMap;
use itertools::Itertools as _;
use lru::LruCache;
use rust_decimal::Decimal;
use rustrade_instrument::{
Side, asset::name::AssetNameExchange, exchange::ExchangeId,
instrument::name::InstrumentNameExchange,
};
use rustrade_integration::protocol::websocket::{WebSocket, WsMessage};
use serde::{Deserialize, Serialize};
use smol_str::{SmolStr, format_smolstr};
use std::{num::NonZeroUsize, pin::Pin, str::FromStr, sync::Arc, time::Duration};
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
// First reconnect delay in milliseconds; doubled per attempt by `ExponentialBackoff::wait`.
const INITIAL_BACKOFF_MS: u64 = 1_000;
// Upper bound, in milliseconds, on a single reconnect delay.
const MAX_BACKOFF_MS: u64 = 30_000;
// Number of backoff waits allowed before `ExponentialBackoff::wait` returns `false`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
// NOTE(review): not referenced in the visible part of this file; presumably the
// WebSocket heartbeat timeout in seconds — confirm at the use site.
const HEARTBEAT_TIMEOUT_SECS: u64 = 35;
// NOTE(review): not referenced in the visible part of this file; presumably a
// time limit (seconds) for recovering fills after a reconnect — confirm.
const FILL_RECOVERY_TIMEOUT_SECS: u64 = 30;
// NOTE(review): not referenced in the visible part of this file; presumably a
// look-back window (milliseconds) applied during recovery — confirm.
const SIGNAL_RECOVERY_LOOKBACK_MS: i64 = 1_500;
// Page size used when paginating account activities; a page shorter than this
// ends pagination. Must stay in sync with `PAGE_SIZE_STR`.
const ALPACA_MAX_ACTIVITIES: usize = 100;
// Cooldown, in seconds, applied after a 429 when no usable reset header is present.
const DEFAULT_RATE_LIMIT_DELAY_SECS: u64 = 60;
// Total number of tries (including the first) for a REST call that keeps returning 429.
const MAX_RATE_LIMIT_ATTEMPTS: u32 = 4;
// Capacity of the LRU cache built by `new_dedup_cache`.
const DEDUP_CACHE_SIZE: usize = 2_000;
// NOTE(review): not referenced in the visible part of this file; presumably the
// WebSocket connect/auth timeout in seconds — confirm.
const WS_HANDSHAKE_TIMEOUT_SECS: u64 = 15;
// NOTE(review): not referenced in the visible part of this file; presumably the
// time allowed for a WebSocket close handshake in seconds — confirm.
const WS_CLOSE_TIMEOUT_SECS: u64 = 5;
/// Stream adapter that forwards items from `inner` while owning the
/// [`tokio::task::JoinHandle`] of an associated background task.
struct GracefulShutdownStream<S> {
// The wrapped stream; all polling is delegated to it.
inner: S,
// Handle of the background task; stored but never read in the visible code.
_handle: tokio::task::JoinHandle<()>,
}
impl<S> GracefulShutdownStream<S> {
    /// Wraps `inner`, taking ownership of the background task's `handle`.
    fn new(inner: S, handle: tokio::task::JoinHandle<()>) -> Self {
        let _handle = handle;
        Self { inner, _handle }
    }
}
impl<S: futures::Stream + Unpin> futures::Stream for GracefulShutdownStream<S> {
    type Item = S::Item;

    /// Polls the wrapped stream; the wrapper adds no buffering or filtering.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `S: Unpin` makes the whole wrapper `Unpin`, so projecting to the
        // field through `get_mut` is safe.
        let inner = Pin::new(&mut self.get_mut().inner);
        S::poll_next(inner, cx)
    }

    /// Reports the wrapped stream's size hint unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let Self { inner, .. } = self;
        inner.size_hint()
    }
}
// The destructor body is empty: dropping the wrapper only drops its fields.
// Dropping a tokio `JoinHandle` detaches the task instead of cancelling it, so
// the background task is NOT aborted here.
// NOTE(review): presumably the task is expected to notice the closed channel
// and shut down on its own — confirm against the task implementation.
impl<S> Drop for GracefulShutdownStream<S> {
fn drop(&mut self) {
}
}
/// Shared record of the REST rate-limit cooldown.
struct RateLimitTracker {
// Instant until which requests should be held back; `None` when no cooldown is recorded.
blocked_until: parking_lot::Mutex<Option<tokio::time::Instant>>,
}
impl RateLimitTracker {
    /// Creates a tracker with no cooldown recorded.
    fn new() -> Self {
        Self {
            blocked_until: parking_lot::Mutex::new(None),
        }
    }

    /// Sleeps until any recorded cooldown has passed, then returns.
    ///
    /// The deadline is re-read after every sleep, so a cooldown that was
    /// extended while this task slept is honoured as well. An expired deadline
    /// is cleared as a side effect.
    async fn wait_if_blocked(&self) {
        loop {
            let now = tokio::time::Instant::now();
            // The guard lives only inside this block, so the lock is never
            // held across the `.await` below.
            let pending = {
                let mut guard = self.blocked_until.lock();
                match *guard {
                    Some(until) if until > now => Some(until),
                    Some(_) => {
                        *guard = None;
                        None
                    }
                    None => None,
                }
            };
            let Some(until) = pending else {
                return;
            };
            #[allow(clippy::cast_possible_truncation)]
            let delay_ms = (until - now).as_millis() as u64;
            debug!(delay_ms, "Alpaca REST rate-limited, waiting before request");
            tokio::time::sleep_until(until).await;
        }
    }

    /// Records a cooldown after a rate-limit response.
    ///
    /// `retry_after` is the server-suggested delay; when absent,
    /// `DEFAULT_RATE_LIMIT_DELAY_SECS` is used. An existing later deadline is
    /// never shortened.
    fn on_rate_limited(&self, retry_after: Option<Duration>) {
        let delay = retry_after.unwrap_or(Duration::from_secs(DEFAULT_RATE_LIMIT_DELAY_SECS));
        let now = tokio::time::Instant::now();
        let new_deadline = now + delay;
        let mut guard = self.blocked_until.lock();
        // Only a deadline that is still in the future counts as an active
        // cooldown. An expired one that nobody cleared yet must not demote the
        // "entering degradation mode" warning to a debug-level "extended".
        let active: Option<tokio::time::Instant> = (*guard).filter(|existing| *existing > now);
        let was_blocked = active.is_some();
        *guard = Some(active.map_or(new_deadline, |existing| existing.max(new_deadline)));
        if was_blocked {
            debug!(
                delay_secs = delay.as_secs(),
                "Alpaca rate-limit cooldown extended"
            );
        } else {
            warn!(
                delay_secs = delay.as_secs(),
                "Alpaca entering rate-limit degradation mode"
            );
        }
    }
}
/// State for a capped exponential reconnect backoff.
struct ExponentialBackoff {
// Number of waits performed since creation or the last `reset`.
attempt: u32,
// Once `attempt` reaches this value, `wait` returns `false` without sleeping.
max_attempts: u32,
// Delay of the first wait, in milliseconds.
initial_ms: u64,
// Upper bound on any single delay, in milliseconds.
max_ms: u64,
}
impl ExponentialBackoff {
    /// Builds a backoff configured from the module-level reconnect constants.
    fn new() -> Self {
        Self {
            attempt: 0,
            max_attempts: MAX_RECONNECT_ATTEMPTS,
            initial_ms: INITIAL_BACKOFF_MS,
            max_ms: MAX_BACKOFF_MS,
        }
    }

    /// Restarts the sequence from the first (shortest) delay.
    fn reset(&mut self) {
        self.attempt = 0;
    }

    /// Sleeps for the next delay in the sequence.
    ///
    /// Returns `false` immediately, without sleeping, once the attempt budget
    /// is used up; otherwise sleeps and returns `true`.
    async fn wait(&mut self) -> bool {
        if self.attempt >= self.max_attempts {
            return false;
        }
        // initial * 2^attempt, saturating and capped at `max_ms`.
        let multiplier = 2u64.saturating_pow(self.attempt);
        let uncapped_ms = self.initial_ms.saturating_mul(multiplier);
        let delay_ms = uncapped_ms.min(self.max_ms);
        self.attempt += 1;
        debug!(
            attempt = self.attempt,
            max = self.max_attempts,
            delay_ms,
            "Alpaca reconnect backoff"
        );
        let delay = Duration::from_millis(delay_ms);
        tokio::time::sleep(delay).await;
        true
    }
}
/// LRU set of already-seen keys, shareable across tasks.
type SharedDedupCache = Arc<parking_lot::Mutex<LruCache<SmolStr, ()>>>;

/// Creates an empty dedup cache holding at most `DEDUP_CACHE_SIZE` keys.
fn new_dedup_cache() -> SharedDedupCache {
    // `DEDUP_CACHE_SIZE` is a non-zero constant, so this cannot fail.
    #[allow(clippy::unwrap_used)]
    let capacity = NonZeroUsize::new(DEDUP_CACHE_SIZE).unwrap();
    let cache = LruCache::new(capacity);
    Arc::new(parking_lot::Mutex::new(cache))
}
/// Returns `true` when `key` is already in `cache`; otherwise inserts it and
/// returns `false`.
///
/// The lookup uses `peek`, so a hit does not refresh the key's LRU position.
fn is_duplicate(cache: &SharedDedupCache, key: &SmolStr) -> bool {
    let mut seen = cache.lock();
    let already_seen = seen.peek(key).is_some();
    if !already_seen {
        seen.put(key.clone(), ());
    }
    already_seen
}
/// Credentials and environment selection for the Alpaca client.
///
/// Deserialization rejects unknown fields. The credential fields are private
/// and are masked by the hand-written `Debug` impl.
#[derive(Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct AlpacaConfig {
// API key id; sent as the `apca-api-key-id` header by `AlpacaClient::build_http`.
api_key: String,
// API secret; sent as the `apca-api-secret-key` header by `AlpacaClient::build_http`.
secret_key: String,
// Selects the paper-trading endpoints when `true`, the live endpoints otherwise.
pub paper: bool,
// Test-only override for the REST base URL returned by `rest_base_url`.
#[cfg(test)]
pub base_url_override: Option<String>,
}
impl std::fmt::Debug for AlpacaConfig {
    /// Formats the config with both credentials replaced by `***`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MASK: &str = "***";
        let mut out = f.debug_struct("AlpacaConfig");
        out.field("api_key", &MASK);
        out.field("secret_key", &MASK);
        out.field("paper", &self.paper);
        out.finish()
    }
}
impl AlpacaConfig {
    /// Creates a config from credentials and the paper/live switch.
    pub fn new(api_key: String, secret_key: String, paper: bool) -> Self {
        Self {
            paper,
            api_key,
            secret_key,
            #[cfg(test)]
            base_url_override: None,
        }
    }

    /// Test-only constructor: a paper config whose REST base URL is `base_url`.
    #[cfg(test)]
    pub fn with_base_url(api_key: String, secret_key: String, base_url: String) -> Self {
        let base_url_override = Some(base_url);
        Self {
            api_key,
            secret_key,
            paper: true,
            base_url_override,
        }
    }

    /// Returns the API key id.
    pub fn api_key(&self) -> &str {
        self.api_key.as_str()
    }

    /// Returns the REST base URL for the selected environment.
    ///
    /// In test builds a configured override takes precedence.
    pub fn rest_base_url(&self) -> &str {
        #[cfg(test)]
        if let Some(url) = self.base_url_override.as_deref() {
            return url;
        }
        let (paper_url, live_url) = (
            "https://paper-api.alpaca.markets",
            "https://api.alpaca.markets",
        );
        if self.paper { paper_url } else { live_url }
    }

    /// Returns the WebSocket URL for the selected environment.
    pub fn ws_url(&self) -> &'static str {
        let (paper_url, live_url) = (
            "wss://paper-api.alpaca.markets/stream",
            "wss://api.alpaca.markets/stream",
        );
        if self.paper { paper_url } else { live_url }
    }
}
/// Subset of the JSON returned by `GET {base}/v2/account`.
///
/// Values are kept as the strings received; any numeric parsing happens in the
/// conversion helpers.
#[derive(Debug, Deserialize)]
struct AlpacaAccount {
equity: String,
buying_power: String,
// Optional in the response.
options_buying_power: Option<String>,
// Deserialized but currently unused (hence the `dead_code` allowance).
#[allow(dead_code)]
crypto_buying_power: Option<String>,
}
/// One element of the JSON array returned by `GET {base}/v2/positions`.
///
/// Quantities are kept as the strings received.
#[derive(Debug, Deserialize)]
struct AlpacaPosition {
symbol: String,
asset_class: String,
qty: String,
qty_available: String,
}
/// Order object as returned by the REST orders endpoint (order submission and
/// open-order listing).
///
/// Prices, quantities and timestamps are kept as strings; callers parse them
/// as needed (for example `filled_qty` and `created_at` in the order-opening
/// paths).
#[derive(Debug, Deserialize)]
struct AlpacaOrderResponse {
// Exchange-assigned order id.
id: String,
client_order_id: Option<String>,
symbol: String,
qty: Option<String>,
filled_qty: String,
side: String,
// JSON field name is `type`, which is a Rust keyword.
#[serde(rename = "type")]
order_type: String,
time_in_force: String,
limit_price: Option<String>,
stop_price: Option<String>,
trail_percent: Option<String>,
trail_price: Option<String>,
created_at: String,
}
/// One fill activity from `GET {base}/v2/account/activities` (requested with
/// `activity_type=FILL`).
#[derive(Debug, Deserialize)]
struct AlpacaActivity {
// Activity id; the last id of a full page is reused as the next `page_token`.
id: String,
order_id: String,
symbol: String,
side: String,
price: String,
qty: String,
transaction_time: String,
}
/// Error body shape tried first when a REST call fails; only `message` is read.
/// Bodies that do not match are used verbatim as the error text instead.
#[derive(Debug, Deserialize)]
struct AlpacaApiError {
message: String,
}
/// Value of the `position_intent` field on an order request.
///
/// Serialized in `snake_case` (for example `buy_to_open`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AlpacaPositionIntent {
BuyToOpen,
BuyToClose,
SellToOpen,
SellToClose,
}
/// `take_profit` object of a bracket order request.
#[derive(Debug, Serialize)]
struct TakeProfitParams {
limit_price: String,
}
/// `stop_loss` object of a bracket order request.
#[derive(Debug, Serialize)]
struct StopLossParams {
stop_price: String,
// Omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
limit_price: Option<String>,
}
/// Parameters of a bracket order: a limit entry plus take-profit and stop-loss legs.
///
/// `open_bracket_order` rejects the request locally unless the prices are
/// ordered `stop_loss < entry < take_profit` for a buy, or the reverse for a sell.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlpacaBracketOrderRequest {
pub instrument: InstrumentNameExchange,
pub strategy: StrategyId,
// Sent to the exchange as `client_order_id`.
pub cid: ClientOrderId,
pub side: Side,
pub quantity: Decimal,
// Limit price of the entry order.
pub entry_price: Decimal,
pub take_profit_price: Decimal,
pub stop_loss_price: Decimal,
// Optional limit price for the stop-loss leg; `None` omits it from the request.
pub stop_loss_limit_price: Option<Decimal>,
// Only `GoodUntilEndOfDay` and non-post-only `GoodUntilCancelled` are accepted.
pub time_in_force: TimeInForce,
}
impl AlpacaBracketOrderRequest {
    /// Creates a bracket request without a stop-loss limit price.
    ///
    /// Use [`Self::with_stop_loss_limit_price`] to add one.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instrument: InstrumentNameExchange,
        strategy: StrategyId,
        cid: ClientOrderId,
        side: Side,
        quantity: Decimal,
        entry_price: Decimal,
        take_profit_price: Decimal,
        stop_loss_price: Decimal,
        time_in_force: TimeInForce,
    ) -> Self {
        Self {
            stop_loss_limit_price: None,
            instrument,
            strategy,
            cid,
            side,
            quantity,
            entry_price,
            take_profit_price,
            stop_loss_price,
            time_in_force,
        }
    }

    /// Returns the request with the stop-loss leg's limit price set to `price`.
    #[must_use]
    pub fn with_stop_loss_limit_price(self, price: Decimal) -> Self {
        Self {
            stop_loss_limit_price: Some(price),
            ..self
        }
    }
}
/// Outcome of `open_bracket_order`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlpacaBracketOrderResult {
// The entry (parent) order; its state carries either the accepted order or the rejection.
pub parent: Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>,
}
/// JSON body posted to the orders endpoint.
///
/// Every `Option` field is left out of the serialized body when `None`.
#[derive(Debug, Serialize)]
struct AlpacaOrderRequest<'a> {
symbol: &'a str,
qty: String,
side: &'static str,
// JSON field name is `type`, which is a Rust keyword.
#[serde(rename = "type")]
order_type: &'static str,
time_in_force: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
limit_price: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
stop_price: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
trail_percent: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
trail_price: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
client_order_id: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
position_intent: Option<AlpacaPositionIntent>,
// Set to `"bracket"` by `open_bracket_order`; `None` for plain orders.
#[serde(skip_serializing_if = "Option::is_none")]
order_class: Option<&'static str>,
#[serde(skip_serializing_if = "Option::is_none")]
take_profit: Option<TakeProfitParams>,
#[serde(skip_serializing_if = "Option::is_none")]
stop_loss: Option<StopLossParams>,
}
/// Envelope of a streamed message: a stream name plus its payload.
///
/// The payload is kept as raw, unparsed JSON borrowed from the input buffer.
/// NOTE(review): presumably the consumer parses `data` according to `stream` —
/// the consumer is outside the visible part of this file.
#[derive(Debug, Deserialize)]
struct AlpacaStreamMessage<'a> {
stream: SmolStr,
#[serde(borrow)]
data: &'a serde_json::value::RawValue,
}
/// A streamed order event: the event name, the order it concerns, and optional
/// event-level price, quantity and timestamp.
///
/// String fields marked `borrow` point into the input buffer instead of being copied.
#[derive(Debug, Deserialize)]
struct AlpacaTradeUpdate<'a> {
event: SmolStr,
#[serde(borrow)]
order: AlpacaOrderWs<'a>,
#[serde(borrow)]
price: Option<&'a str>,
#[serde(borrow)]
qty: Option<&'a str>,
#[serde(borrow)]
timestamp: Option<&'a str>,
}
/// Order object embedded in an `AlpacaTradeUpdate`.
///
/// Identifier-like fields are owned `SmolStr`s; price and quantity fields are
/// borrowed from the input buffer and left unparsed.
#[derive(Debug, Deserialize)]
struct AlpacaOrderWs<'a> {
id: SmolStr,
client_order_id: Option<SmolStr>,
symbol: SmolStr,
#[serde(borrow)]
qty: Option<&'a str>,
#[serde(borrow)]
filled_qty: Option<&'a str>,
side: SmolStr,
// JSON field name is `type`, which is a Rust keyword.
#[serde(rename = "type")]
order_type: SmolStr,
time_in_force: SmolStr,
#[serde(borrow)]
limit_price: Option<&'a str>,
#[serde(borrow)]
stop_price: Option<&'a str>,
#[serde(borrow)]
trail_percent: Option<&'a str>,
#[serde(borrow)]
trail_price: Option<&'a str>,
status: SmolStr,
}
/// Alpaca execution client.
///
/// Cloning is cheap with respect to the config and the rate-limit state: both
/// are behind `Arc`, so clones share them.
#[derive(Clone)]
pub struct AlpacaClient {
config: Arc<AlpacaConfig>,
// HTTP client pre-configured with the authentication headers (see `build_http`).
http: reqwest::Client,
// Rate-limit cooldown shared by all REST calls made through this client and its clones.
rate_limiter: Arc<RateLimitTracker>,
// `{rest_base_url}/v2/orders`, computed once in `new`.
orders_url: String,
}
impl std::fmt::Debug for AlpacaClient {
    /// Prints only the paper/live flag; all other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("AlpacaClient");
        out.field("paper", &self.config.paper);
        out.finish_non_exhaustive()
    }
}
impl AlpacaClient {
    /// Builds the HTTP client with both credential headers installed as defaults.
    ///
    /// # Panics
    ///
    /// Panics if either credential is not a valid header value, or if the
    /// underlying client cannot be constructed.
    #[allow(clippy::expect_used)]
    fn build_http(config: &AlpacaConfig) -> reqwest::Client {
        use reqwest::header::{HeaderMap, HeaderName, HeaderValue};

        // (header name, header value, panic message for an invalid value)
        let credentials: [(&'static str, &str, &str); 2] = [
            (
                "apca-api-key-id",
                config.api_key.as_str(),
                "Alpaca API key contains invalid header characters",
            ),
            (
                "apca-api-secret-key",
                config.secret_key.as_str(),
                "Alpaca secret key contains invalid header characters",
            ),
        ];

        let mut headers = HeaderMap::new();
        for (name, value, invalid_msg) in credentials {
            headers.insert(
                HeaderName::from_static(name),
                HeaderValue::from_str(value).expect(invalid_msg),
            );
        }

        reqwest::Client::builder()
            .default_headers(headers)
            .build()
            .expect("failed to build reqwest client for Alpaca")
    }

    /// REST base URL for the configured environment.
    fn base_url(&self) -> &str {
        let config: &AlpacaConfig = &self.config;
        config.rest_base_url()
    }
}
/// Derives a wait duration from the `x-ratelimit-reset` header.
///
/// The header is read as a Unix timestamp in seconds. Returns `None` when the
/// header is missing or not an unsigned integer; otherwise the time remaining
/// until that timestamp, never less than one second.
fn parse_rate_limit_delay(headers: &reqwest::header::HeaderMap) -> Option<Duration> {
    let raw = headers.get("x-ratelimit-reset")?;
    let reset_ts: u64 = raw.to_str().ok()?.parse().ok()?;
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let remaining_secs = reset_ts.saturating_sub(now_secs).max(1);
    Some(Duration::from_secs(remaining_secs))
}
/// Sends a REST request and deserializes a successful JSON body into `T`,
/// retrying on HTTP 429.
///
/// `build_request` is invoked once per attempt. Before each attempt the shared
/// cooldown in `rate_limiter` is awaited; a 429 records a new cooldown and
/// retries, up to `MAX_RATE_LIMIT_ATTEMPTS` attempts in total.
///
/// # Errors
///
/// * connectivity error — the request could not be sent, the body could not be
///   read or parsed, the server answered 204, or the status is a non-4xx failure;
/// * `ApiError::RateLimit` — every attempt was answered with 429;
/// * other API error — any remaining 4xx status, mapped by `parse_api_error`.
async fn rest_with_retry<T>(
    rate_limiter: &RateLimitTracker,
    mut build_request: impl FnMut() -> reqwest::RequestBuilder,
) -> Result<T, UnindexedClientError>
where
    T: for<'de> Deserialize<'de>,
{
    for attempt in 0..MAX_RATE_LIMIT_ATTEMPTS {
        rate_limiter.wait_if_blocked().await;

        let response = match build_request().send().await {
            Ok(response) => response,
            Err(e) => {
                return Err(connectivity_err(format!("Alpaca REST request failed: {e}")));
            }
        };

        // Purely informational: the bucket is empty but this request still went through.
        let remaining = response
            .headers()
            .get("x-ratelimit-remaining")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<u32>().ok());
        if remaining == Some(0) {
            debug!("Alpaca REST rate-limit bucket exhausted (X-Ratelimit-Remaining: 0)");
        }

        let status = response.status();

        if status == reqwest::StatusCode::TOO_MANY_REQUESTS {
            let reset_delay = parse_rate_limit_delay(response.headers());
            let out_of_attempts = attempt + 1 >= MAX_RATE_LIMIT_ATTEMPTS;
            if out_of_attempts {
                warn!(
                    max_attempts = MAX_RATE_LIMIT_ATTEMPTS,
                    "Alpaca REST rate-limit retries exhausted"
                );
                return Err(UnindexedClientError::Api(ApiError::RateLimit));
            }
            warn!(
                attempt = attempt + 1,
                max_attempts = MAX_RATE_LIMIT_ATTEMPTS,
                "Alpaca REST rate-limited (429), retrying"
            );
            rate_limiter.on_rate_limited(reset_delay);
            continue;
        }

        // A body-less success cannot be deserialized into `T`.
        if status == reqwest::StatusCode::NO_CONTENT {
            return Err(connectivity_err(
                "Alpaca REST returned 204 No Content — use rest_delete_with_retry for DELETE endpoints"
                    .to_string(),
            ));
        }

        let bytes = match response.bytes().await {
            Ok(bytes) => bytes,
            Err(e) => {
                return Err(connectivity_err(format!(
                    "Alpaca REST read body failed: {e}"
                )));
            }
        };

        if status.is_success() {
            return match serde_json::from_slice::<T>(&bytes) {
                Ok(parsed) => Ok(parsed),
                Err(e) => {
                    // Only the first 200 characters of the body go into the error text.
                    let snippet: String = String::from_utf8_lossy(&bytes).chars().take(200).collect();
                    Err(connectivity_err(format!(
                        "Alpaca REST JSON parse error ({status}): {e} | body: {}",
                        snippet
                    )))
                }
            };
        }

        // Prefer the structured `message`; fall back to the raw body text.
        let api_err = match serde_json::from_slice::<AlpacaApiError>(&bytes) {
            Ok(parsed) => parsed.message,
            Err(_) => String::from_utf8_lossy(&bytes).into_owned(),
        };
        return if status.is_client_error() {
            Err(UnindexedClientError::Api(parse_api_error(status, &api_err)))
        } else {
            Err(connectivity_err(format!(
                "Alpaca REST error {status}: {api_err}"
            )))
        };
    }
    unreachable!("Alpaca REST retry loop exited without returning")
}
/// Sends a body-less REST request (used for order cancellation), retrying on
/// HTTP 429.
///
/// Any 2xx status, including 204, counts as success and the response body is
/// ignored. A 429 records a cooldown in `rate_limiter` and retries, up to
/// `MAX_RATE_LIMIT_ATTEMPTS` attempts in total.
///
/// # Errors
///
/// * connectivity error — the request could not be sent;
/// * `Rejected(ApiError::RateLimit)` — every attempt was answered with 429;
/// * otherwise whatever `parse_order_error` derives from status and message.
async fn rest_delete_with_retry(
    rate_limiter: &RateLimitTracker,
    mut build_request: impl FnMut() -> reqwest::RequestBuilder,
) -> Result<(), UnindexedOrderError> {
    for attempt in 0..MAX_RATE_LIMIT_ATTEMPTS {
        rate_limiter.wait_if_blocked().await;

        let response = match build_request().send().await {
            Ok(response) => response,
            Err(e) => {
                return Err(UnindexedOrderError::Connectivity(
                    ConnectivityError::Socket(format!("Alpaca cancel request failed: {e}")),
                ));
            }
        };
        let status = response.status();

        if status == reqwest::StatusCode::TOO_MANY_REQUESTS {
            let reset_delay = parse_rate_limit_delay(response.headers());
            let out_of_attempts = attempt + 1 >= MAX_RATE_LIMIT_ATTEMPTS;
            if out_of_attempts {
                warn!(
                    max_attempts = MAX_RATE_LIMIT_ATTEMPTS,
                    "Alpaca cancel rate-limit retries exhausted"
                );
                return Err(UnindexedOrderError::Rejected(ApiError::RateLimit));
            }
            warn!(
                attempt = attempt + 1,
                max_attempts = MAX_RATE_LIMIT_ATTEMPTS,
                "Alpaca cancel rate-limited (429), retrying"
            );
            rate_limiter.on_rate_limited(reset_delay);
            continue;
        }

        // 204 No Content is itself a 2xx status, so this covers it.
        if status.is_success() {
            return Ok(());
        }

        // Best effort: an unreadable error body degrades to an empty message.
        let bytes = response
            .bytes()
            .await
            .inspect_err(
                |e| warn!(%e, %status, "Alpaca cancel_order: failed to read error response body"),
            )
            .unwrap_or_default();
        let msg = match serde_json::from_slice::<AlpacaApiError>(&bytes) {
            Ok(parsed) => parsed.message,
            Err(_) => String::from_utf8_lossy(&bytes).into_owned(),
        };
        return Err(parse_order_error(status, &msg));
    }
    unreachable!("Alpaca cancel retry loop exited without returning")
}
impl ExecutionClient for AlpacaClient {
// Exchange identifier reported for this client.
const EXCHANGE: ExchangeId = ExchangeId::AlpacaBroker;
// Configuration type consumed by `new`.
type Config = AlpacaConfig;
// Boxed stream of unindexed account events, as returned by `account_stream`.
type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
/// Builds a client from `config`.
///
/// Panics under the same conditions as `build_http` (credentials that are
/// not valid header values).
fn new(config: Self::Config) -> Self {
    let orders_url = format!("{}/v2/orders", config.rest_base_url());
    let http = Self::build_http(&config);
    let rate_limiter = Arc::new(RateLimitTracker::new());
    Self {
        config: Arc::new(config),
        http,
        rate_limiter,
        orders_url,
    }
}
/// Fetches balances and open orders and combines them into one snapshot.
///
/// An empty `assets` slice means "everything". Otherwise the account
/// endpoint is queried only when a USD asset is requested (name compared
/// case-insensitively) and the positions endpoint only when a non-USD asset
/// is requested. When both are needed they are fetched concurrently.
///
/// # Errors
///
/// Propagates any error from the underlying REST calls, including the
/// truncation error raised by `fetch_raw_open_orders`.
async fn account_snapshot(
    &self,
    assets: &[AssetNameExchange],
    instruments: &[InstrumentNameExchange],
) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
    let base = self.base_url();
    let http = self.http.clone();
    let rl = &self.rate_limiter;

    let is_usd = |asset: &AssetNameExchange| asset.name().as_str().eq_ignore_ascii_case("usd");
    let unfiltered = assets.is_empty();
    let wants_usd = unfiltered || assets.iter().any(|a| is_usd(a));
    let wants_non_usd = unfiltered || assets.iter().any(|a| !is_usd(a));

    let account_url = format!("{base}/v2/account");
    let positions_url = format!("{base}/v2/positions");

    let balances = if wants_usd && wants_non_usd {
        let (account, positions): (AlpacaAccount, Vec<AlpacaPosition>) = tokio::try_join!(
            rest_with_retry(rl, || http.get(&account_url)),
            rest_with_retry(rl, || http.get(&positions_url)),
        )?;
        let mut combined = convert_account_to_balances(&account, assets);
        combined.extend(convert_positions_to_balances(&positions, assets));
        combined
    } else if wants_usd {
        let account: AlpacaAccount = rest_with_retry(rl, || http.get(&account_url)).await?;
        convert_account_to_balances(&account, assets)
    } else if wants_non_usd {
        let positions: Vec<AlpacaPosition> =
            rest_with_retry(rl, || http.get(&positions_url)).await?;
        convert_positions_to_balances(&positions, assets)
    } else {
        Vec::new()
    };

    let open_orders = fetch_raw_open_orders(&http, rl, base, instruments).await?;
    let instrument_snapshots = build_instrument_snapshots(open_orders, instruments);

    Ok(AccountSnapshot::new(
        ExchangeId::AlpacaBroker,
        balances,
        instrument_snapshots,
    ))
}
/// Opens the account event stream.
///
/// The first WebSocket connection is established before returning, so
/// connection failures surface as an error here. After that a spawned
/// `connection_manager` task feeds events into an unbounded channel whose
/// receiving end is returned as the stream.
async fn account_stream(
    &self,
    _assets: &[AssetNameExchange],
    instruments: &[InstrumentNameExchange],
) -> Result<Self::AccountStream, UnindexedClientError> {
    let initial_ws = connect_and_subscribe(&self.config).await?;

    let (tx, rx) = mpsc::unbounded_channel::<UnindexedAccountEvent>();
    let manager = connection_manager(
        tx,
        new_dedup_cache(),
        self.config.clone(),
        self.http.clone(),
        self.rate_limiter.clone(),
        instruments.to_vec(),
        Some(initial_ws),
    );
    let cm_handle = tokio::spawn(manager);

    let events = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
    let guarded = GracefulShutdownStream::new(events, cm_handle);
    Ok(futures::StreamExt::boxed(guarded))
}
/// Cancels an order by its exchange order id.
///
/// Always returns `Some`. A request without an exchange order id is rejected
/// locally without contacting the exchange. On success the cancel state
/// carries the local current time and a zero quantity.
async fn cancel_order(
    &self,
    request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
) -> Option<UnindexedOrderResponseCancel> {
    let key = crate::order::OrderKey {
        exchange: request.key.exchange,
        instrument: request.key.instrument.clone(),
        strategy: request.key.strategy.clone(),
        cid: request.key.cid.clone(),
    };

    let Some(exchange_id) = &request.state.id else {
        warn!(
            instrument = %key.instrument,
            "Alpaca cancel_order: no exchange order ID available (clientOrderId-only cancel not supported)"
        );
        let rejection = UnindexedOrderError::Rejected(ApiError::OrderRejected(
            "exchange order ID required for cancel (fetch_open_orders to resolve)".into(),
        ));
        return Some(crate::order::request::OrderResponseCancel {
            key,
            state: Err(rejection),
        });
    };
    let order_id: SmolStr = exchange_id.0.clone();

    let base = self.base_url();
    let http = self.http.clone();
    let url = format!("{base}/v2/orders/{order_id}");

    let state = rest_delete_with_retry(&self.rate_limiter, || http.delete(&url))
        .await
        .map(|()| Cancelled::new(OrderId(order_id), Utc::now(), Decimal::ZERO));

    Some(crate::order::request::OrderResponseCancel { key, state })
}
/// Opens an order, deriving the position intent from the request's side and
/// `reduce_only` flag.
async fn open_order(
    &self,
    request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
) -> Option<Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>> {
    let intent = map_position_intent(request.state.side, request.state.reduce_only);
    self.open_order_inner(request, intent).await
}
/// Fetches balances for `assets` (all balances when `assets` is empty).
///
/// The account endpoint is queried only when a USD asset is requested and
/// the positions endpoint only when a non-USD asset is requested; the two
/// requests run one after the other, account first.
async fn fetch_balances(
    &self,
    assets: &[AssetNameExchange],
) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
    let base = self.base_url();
    let http = self.http.clone();

    let is_usd = |asset: &AssetNameExchange| asset.name().as_str().eq_ignore_ascii_case("usd");
    let unfiltered = assets.is_empty();
    let wants_usd = unfiltered || assets.iter().any(|a| is_usd(a));
    let wants_non_usd = unfiltered || assets.iter().any(|a| !is_usd(a));

    let mut result = Vec::new();

    if wants_usd {
        let account_url = format!("{base}/v2/account");
        let account: AlpacaAccount =
            rest_with_retry(&self.rate_limiter, || http.get(&account_url)).await?;
        result.extend(convert_account_to_balances(&account, assets));
    }

    if wants_non_usd {
        let positions_url = format!("{base}/v2/positions");
        let positions: Vec<AlpacaPosition> =
            rest_with_retry(&self.rate_limiter, || http.get(&positions_url)).await?;
        result.extend(convert_positions_to_balances(&positions, assets));
    }

    Ok(result)
}
/// Fetches open orders, optionally restricted to `instruments`.
///
/// Orders for which `convert_open_order` returns `None` are left out of the
/// result.
async fn fetch_open_orders(
    &self,
    instruments: &[InstrumentNameExchange],
) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
    let raw_orders =
        fetch_raw_open_orders(&self.http, &self.rate_limiter, self.base_url(), instruments)
            .await?;

    let mut converted = Vec::new();
    for raw in &raw_orders {
        if let Some(order) = convert_open_order(raw) {
            converted.push(order);
        }
    }
    Ok(converted)
}
/// Fetches fills after `time_since`, optionally restricted to `instruments`.
///
/// # Errors
///
/// Returns `Truncated` when pagination hit the `MAX_ACTIVITY_PAGES` cap, so a
/// partial history is never reported as complete. REST errors are propagated.
async fn fetch_trades(
    &self,
    time_since: DateTime<Utc>,
    instruments: &[InstrumentNameExchange],
) -> Result<Vec<Trade<AssetNameExchange, InstrumentNameExchange>>, UnindexedClientError> {
    let after_str = time_since.to_rfc3339();
    let base = self.base_url();
    let http = self.http.clone();

    let page = paginate_activities(&http, &self.rate_limiter, base, &after_str).await?;
    if page.truncated {
        return Err(UnindexedClientError::Truncated {
            limit: MAX_ACTIVITY_PAGES,
        });
    }

    // `None` means "no instrument filter".
    let wanted: Option<fnv::FnvHashSet<&str>> = (!instruments.is_empty())
        .then(|| instruments.iter().map(|i| i.name().as_str()).collect());

    let trades = page
        .activities
        .into_iter()
        .filter(|a| {
            wanted
                .as_ref()
                .map_or(true, |set| set.contains(a.symbol.as_str()))
        })
        .filter_map(|a| convert_activity_to_trade(&a))
        .collect();
    Ok(trades)
}
}
impl AlpacaClient {
/// Opens an order with a caller-chosen position intent instead of the one
/// `open_order` derives from the request.
pub async fn open_order_with_intent(
    &self,
    request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
    intent: AlpacaPositionIntent,
) -> Option<Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>> {
    let submission = self.open_order_inner(request, intent);
    submission.await
}
pub async fn open_bracket_order(
&self,
request: AlpacaBracketOrderRequest,
) -> AlpacaBracketOrderResult {
let order_key = crate::order::OrderKey::new(
ExchangeId::AlpacaBroker,
request.instrument.clone(),
request.strategy.clone(),
request.cid.clone(),
);
let tif_str = match request.time_in_force {
TimeInForce::GoodUntilEndOfDay => "day",
TimeInForce::GoodUntilCancelled { post_only } => {
if post_only {
return AlpacaBracketOrderResult {
parent: Order {
key: order_key,
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::inactive(OrderError::Rejected(
ApiError::OrderRejected(
"Alpaca does not support post_only for bracket orders"
.to_string(),
),
)),
},
};
}
"gtc"
}
other => {
return AlpacaBracketOrderResult {
parent: Order {
key: order_key,
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
format!(
"Alpaca bracket orders only support Day or GTC time_in_force, got {:?}",
other
),
))),
},
};
}
};
let price_ordering_ok = match request.side {
Side::Buy => {
request.stop_loss_price < request.entry_price
&& request.entry_price < request.take_profit_price
}
Side::Sell => {
request.take_profit_price < request.entry_price
&& request.entry_price < request.stop_loss_price
}
};
if !price_ordering_ok {
return AlpacaBracketOrderResult {
parent: Order {
key: order_key,
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
format!(
"Invalid bracket price ordering for {:?} side: entry={}, take_profit={}, stop_loss={}",
request.side,
request.entry_price,
request.take_profit_price,
request.stop_loss_price,
),
))),
},
};
}
if let Some(sl_limit) = request.stop_loss_limit_price {
let sl_limit_ok = match request.side {
Side::Buy => sl_limit <= request.stop_loss_price,
Side::Sell => sl_limit >= request.stop_loss_price,
};
if !sl_limit_ok {
return AlpacaBracketOrderResult {
parent: Order {
key: order_key,
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
format!(
"Invalid stop-loss limit price for {:?} bracket: \
stop_loss_price={}, stop_loss_limit_price={}",
request.side, request.stop_loss_price, sl_limit,
),
))),
},
};
}
}
let body = AlpacaOrderRequest {
symbol: request.instrument.name().as_str(),
qty: request.quantity.to_string(),
side: map_side(request.side),
order_type: "limit",
time_in_force: tif_str,
limit_price: Some(request.entry_price.to_string()),
stop_price: None,
trail_percent: None,
trail_price: None,
client_order_id: Some(request.cid.0.as_str()),
position_intent: if is_options_or_equity_symbol(request.instrument.name().as_str()) {
Some(map_position_intent(request.side, false))
} else {
None
},
order_class: Some("bracket"),
take_profit: Some(TakeProfitParams {
limit_price: request.take_profit_price.to_string(),
}),
stop_loss: Some(StopLossParams {
stop_price: request.stop_loss_price.to_string(),
limit_price: request.stop_loss_limit_price.map(|p| p.to_string()),
}),
};
let http = self.http.clone();
let rl = &self.rate_limiter;
let result: Result<AlpacaOrderResponse, UnindexedClientError> =
rest_with_retry(rl, || http.post(&self.orders_url).json(&body)).await;
match result {
Ok(resp) => {
let exchange_order_id = OrderId(SmolStr::new(&resp.id));
let time_exchange = parse_timestamp(&resp.created_at).unwrap_or_else(Utc::now);
let filled_qty = Decimal::from_str(&resp.filled_qty).unwrap_or(Decimal::ZERO);
let state = if filled_qty >= request.quantity {
OrderState::fully_filled(Filled::new(
exchange_order_id,
time_exchange,
filled_qty,
None,
))
} else {
OrderState::active(Open::new(exchange_order_id, time_exchange, filled_qty))
};
AlpacaBracketOrderResult {
parent: Order {
key: order_key,
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state,
},
}
}
Err(e) => {
let order_err = match e {
UnindexedClientError::Connectivity(ce) => OrderError::Connectivity(ce),
UnindexedClientError::Api(ae) => OrderError::Rejected(ae),
UnindexedClientError::TaskFailed(_)
| UnindexedClientError::Internal(_)
| UnindexedClientError::Truncated { .. }
| UnindexedClientError::TruncatedSnapshot { .. } => {
unreachable!("rest_with_retry (order path) does not produce these variants")
}
};
AlpacaBracketOrderResult {
parent: Order {
key: order_key,
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::inactive(order_err),
},
}
}
}
}
/// Shared implementation behind `open_order` and `open_order_with_intent`.
///
/// Always returns `Some`. The request is validated locally first and answered
/// with an inactive order, without contacting the exchange, when:
/// * the time in force cannot be mapped;
/// * the order kind is unsupported;
/// * a trailing stop uses a basis-point offset;
/// * a stop-limit order carries no limit price.
///
/// `intent` is sent only for symbols accepted by `is_options_or_equity_symbol`.
async fn open_order_inner(
    &self,
    request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
    intent: AlpacaPositionIntent,
) -> Option<Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>> {
    let instrument = request.key.instrument.clone();
    let cid = request.key.cid.clone();
    let side = request.state.side;
    let price = request.state.price;
    let quantity = request.state.quantity;
    let kind = request.state.kind;
    let time_in_force = request.state.time_in_force;

    let order_key = crate::order::OrderKey::new(
        ExchangeId::AlpacaBroker,
        instrument.clone(),
        request.key.strategy.clone(),
        cid.clone(),
    );

    // Every exit path echoes the request and differs only in the state. The
    // closure consumes `order_key`, so it runs once per path.
    let finish = |state: UnindexedOrderState| Order {
        key: order_key,
        side,
        price,
        quantity,
        kind,
        time_in_force,
        state,
    };

    let tif_str = match map_time_in_force(time_in_force) {
        Ok(mapped) => mapped,
        Err(msg) => {
            return Some(finish(OrderState::inactive(OrderError::Rejected(
                ApiError::OrderRejected(msg.to_string()),
            ))));
        }
    };

    let Some(order_type_str) = map_order_kind(kind) else {
        return Some(finish(OrderState::inactive(
            OrderError::UnsupportedOrderType(format!(
                "Alpaca connector does not yet support OrderKind::{kind:?}"
            )),
        )));
    };

    let trails_in_basis_points = matches!(
        kind,
        OrderKind::TrailingStop {
            offset_type: TrailingOffsetType::BasisPoints,
            ..
        }
    );
    if trails_in_basis_points {
        return Some(finish(OrderState::inactive(
            OrderError::UnsupportedOrderType(
                "Alpaca does not support TrailingOffsetType::BasisPoints; \
                 use Percentage or Absolute"
                    .to_string(),
            ),
        )));
    }

    if price.is_none() && matches!(kind, OrderKind::StopLimit { .. }) {
        return Some(finish(OrderState::inactive(OrderError::Rejected(
            ApiError::OrderRejected(
                "StopLimit order requires Order.price (the limit price) to be set".to_string(),
            ),
        ))));
    }

    // At most one of the three trigger parameters is populated.
    let (stop_price, trail_percent, trail_price) = match kind {
        OrderKind::Stop { trigger_price } | OrderKind::StopLimit { trigger_price } => {
            (Some(trigger_price.to_string()), None, None)
        }
        OrderKind::TrailingStop {
            offset,
            offset_type,
        } => match offset_type {
            TrailingOffsetType::Percentage => (None, Some(offset.to_string()), None),
            TrailingOffsetType::Absolute => (None, None, Some(offset.to_string())),
            TrailingOffsetType::BasisPoints => unreachable!("validated above"),
        },
        OrderKind::Market | OrderKind::Limit | OrderKind::TrailingStopLimit { .. } => {
            (None, None, None)
        }
    };

    // Only limit-style orders carry the request's price as `limit_price`.
    let limit_price = if matches!(kind, OrderKind::Limit | OrderKind::StopLimit { .. }) {
        price.map(|p| p.to_string())
    } else {
        None
    };

    let symbol = instrument.name().as_str();
    let position_intent = if is_options_or_equity_symbol(symbol) {
        Some(intent)
    } else {
        None
    };

    let body = AlpacaOrderRequest {
        symbol,
        qty: quantity.to_string(),
        side: map_side(side),
        order_type: order_type_str,
        time_in_force: tif_str,
        limit_price,
        stop_price,
        trail_percent,
        trail_price,
        client_order_id: Some(cid.0.as_str()),
        position_intent,
        order_class: None,
        take_profit: None,
        stop_loss: None,
    };

    let http = self.http.clone();
    let rl = &self.rate_limiter;
    let result: Result<AlpacaOrderResponse, UnindexedClientError> =
        rest_with_retry(rl, || http.post(&self.orders_url).json(&body)).await;

    match result {
        Ok(resp) => {
            let exchange_order_id = OrderId(SmolStr::new(&resp.id));
            // Unparseable response values fall back to "now" and zero filled.
            let time_exchange = parse_timestamp(&resp.created_at).unwrap_or_else(Utc::now);
            let filled_qty = Decimal::from_str(&resp.filled_qty).unwrap_or(Decimal::ZERO);
            let fully_filled = filled_qty >= quantity;
            if fully_filled {
                Some(finish(OrderState::fully_filled(Filled::new(
                    exchange_order_id,
                    time_exchange,
                    filled_qty,
                    None,
                ))))
            } else {
                Some(finish(OrderState::active(Open::new(
                    exchange_order_id,
                    time_exchange,
                    filled_qty,
                ))))
            }
        }
        Err(e) => {
            let order_err = match e {
                UnindexedClientError::Connectivity(ce) => OrderError::Connectivity(ce),
                UnindexedClientError::Api(ae) => OrderError::Rejected(ae),
                UnindexedClientError::TaskFailed(_)
                | UnindexedClientError::Internal(_)
                | UnindexedClientError::Truncated { .. }
                | UnindexedClientError::TruncatedSnapshot { .. } => {
                    unreachable!(
                        "rest_with_retry (order path) does not produce TaskFailed/Internal/Truncated/TruncatedSnapshot variants"
                    )
                }
            };
            Some(finish(OrderState::inactive(order_err)))
        }
    }
}
}
/// Result count at which an open-orders response is treated as truncated.
/// Matches the `limit=500` query parameter sent below.
const MAX_OPEN_ORDERS: usize = 500;

/// Fetches raw open orders, optionally restricted to `instruments`.
///
/// A single request with `limit=500` is made; no pagination is attempted.
///
/// # Errors
///
/// Returns `TruncatedSnapshot` when exactly `MAX_OPEN_ORDERS` orders come
/// back, since more may exist beyond the limit. REST errors are propagated.
async fn fetch_raw_open_orders(
    http: &reqwest::Client,
    rate_limiter: &RateLimitTracker,
    base: &str,
    instruments: &[InstrumentNameExchange],
) -> Result<Vec<AlpacaOrderResponse>, UnindexedClientError> {
    // The `symbols` filter is only sent when instruments were requested.
    let symbols = instruments.iter().map(|i| i.name().as_str()).join(",");
    let mut query: Vec<(&str, &str)> = vec![("status", "open"), ("limit", "500")];
    if !instruments.is_empty() {
        query.push(("symbols", symbols.as_str()));
    }

    let orders: Vec<AlpacaOrderResponse> = rest_with_retry(rate_limiter, || {
        http.get(format!("{base}/v2/orders")).query(&query)
    })
    .await?;

    if orders.len() != MAX_OPEN_ORDERS {
        return Ok(orders);
    }

    warn!(
        limit = MAX_OPEN_ORDERS,
        "Alpaca fetch_raw_open_orders: received exactly {MAX_OPEN_ORDERS} results — \
         response is likely truncated"
    );
    Err(UnindexedClientError::TruncatedSnapshot {
        limit: MAX_OPEN_ORDERS,
    })
}
/// Upper bound on the number of pages `paginate_activities` requests in a single call.
const MAX_ACTIVITY_PAGES: usize = 50;
/// Outcome of a paginated account-activities fetch.
struct ActivityPage {
/// Activities accumulated across every page that was fetched, in response order.
activities: Vec<AlpacaActivity>,
/// `true` when pagination stopped because `MAX_ACTIVITY_PAGES` was reached.
truncated: bool,
}
/// `page_size` query value sent with each activities request.
///
/// It is a string literal, so it cannot be derived from `ALPACA_MAX_ACTIVITIES`;
/// the compile-time assertion that follows fails the build if that constant stops
/// being 100, as a reminder to update this literal too.
const PAGE_SIZE_STR: &str = "100"; const _: () = assert!(
ALPACA_MAX_ACTIVITIES == 100,
"PAGE_SIZE_STR must be updated to match ALPACA_MAX_ACTIVITIES",
);
/// Collects `FILL` account activities newer than `after` from
/// `{base}/v2/account/activities`, following pages in ascending order.
///
/// The id of the last activity on a full page is used as the `page_token` for the
/// next request. Pagination stops when a page is shorter than
/// `ALPACA_MAX_ACTIVITIES`, when no usable token is available, or after
/// `MAX_ACTIVITY_PAGES` requests; only the last case sets `truncated`.
///
/// # Errors
///
/// Propagates the first error returned by `rest_with_retry`; activities gathered
/// from earlier pages are discarded in that case.
async fn paginate_activities(
    http: &reqwest::Client,
    rate_limiter: &RateLimitTracker,
    base: &str,
    after: &str,
) -> Result<ActivityPage, UnindexedClientError> {
    let mut collected = Vec::with_capacity(ALPACA_MAX_ACTIVITIES);
    let mut cursor: Option<String> = None;
    // Remains `true` only when the page budget is used up without reaching the end.
    let mut truncated = true;

    for _ in 0..MAX_ACTIVITY_PAGES {
        let cursor_ref = cursor.as_deref();
        let batch: Vec<AlpacaActivity> = rest_with_retry(rate_limiter, || {
            let request = http.get(format!("{base}/v2/account/activities")).query(&[
                ("activity_type", "FILL"),
                ("after", after),
                ("page_size", PAGE_SIZE_STR),
                ("direction", "asc"),
            ]);
            match cursor_ref {
                Some(token) => request.query(&[("page_token", token)]),
                None => request,
            }
        })
        .await?;

        let page_len = batch.len();
        let next_cursor = batch.last().map(|activity| activity.id.clone());
        collected.extend(batch);

        // A short page means the server has nothing further to return.
        let page_was_full = page_len >= ALPACA_MAX_ACTIVITIES;
        match next_cursor {
            Some(token) if page_was_full && !token.is_empty() => {
                debug!("Alpaca paginate_activities: fetching next page ({page_len} results)");
                cursor = Some(token);
            }
            _ => {
                truncated = false;
                break;
            }
        }
    }

    Ok(ActivityPage {
        activities: collected,
        truncated,
    })
}
/// Long-running task that keeps the Alpaca account WebSocket alive and forwards
/// converted events to `tx`.
///
/// Each pass of the outer loop:
/// 1. obtains a socket — the caller-supplied `initial_ws` on the first pass,
///    otherwise a fresh `connect_and_subscribe`;
/// 2. if a previous connection was lost, replays fills via `recover_fills`
///    (bounded by `FILL_RECOVERY_TIMEOUT_SECS`);
/// 3. pumps frames until the socket errors/closes, no frame resets the heartbeat
///    timer for `HEARTBEAT_TIMEOUT_SECS`, or the receiving side of `tx` is dropped.
///
/// The task returns when the consumer drops the receiver, or when
/// `backoff.wait()` returns `false` (logged as reconnect attempts exhausted).
#[allow(clippy::cognitive_complexity)] async fn connection_manager(
tx: mpsc::UnboundedSender<UnindexedAccountEvent>,
dedup: SharedDedupCache,
config: Arc<AlpacaConfig>,
http: reqwest::Client,
rate_limiter: Arc<RateLimitTracker>,
instruments: Vec<InstrumentNameExchange>,
initial_ws: Option<WebSocket>,
) {
let mut backoff = ExponentialBackoff::new();
// Set after a connection is lost; consumed at the start of the next connection
// to decide where fill recovery should start.
let mut disconnect_time: Option<DateTime<Utc>> = None;
let mut current_ws = initial_ws;
'outer: loop {
// Use the pre-established socket once; afterwards always reconnect.
let mut ws = match current_ws.take() {
Some(ws) => ws,
None => match connect_and_subscribe(&config).await {
Ok(ws) => ws,
Err(e) => {
error!(%e, "Alpaca WS connect/subscribe failed");
if !backoff.wait().await {
error!("Alpaca max reconnect attempts exhausted");
break;
}
continue;
}
},
};
info!("Alpaca account_stream connected and subscribed");
backoff.reset();
// Only runs on reconnects: `disconnect_time` is `None` for the first connection.
if let Some(dt) = disconnect_time.take() {
let base = config.rest_base_url();
let after_str = dt.to_rfc3339();
match tokio::time::timeout(
Duration::from_secs(FILL_RECOVERY_TIMEOUT_SECS),
recover_fills(
&http,
&rate_limiter,
&instruments,
base,
&after_str,
&tx,
&dedup,
),
)
.await
{
Ok(()) => {}
Err(_) => warn!(
timeout_secs = FILL_RECOVERY_TIMEOUT_SECS,
"Alpaca fill recovery timed out — some fills may be missing"
),
}
}
let mut last_message_time = Utc::now();
// Single sleep future that is pushed forward whenever a frame is accepted;
// if it ever completes the connection is considered dead.
let heartbeat = tokio::time::sleep(Duration::from_secs(HEARTBEAT_TIMEOUT_SECS));
tokio::pin!(heartbeat);
// Re-arms the heartbeat deadline and records when the last frame was seen.
macro_rules! reset_heartbeat {
() => {
heartbeat.as_mut().reset(
tokio::time::Instant::now() + Duration::from_secs(HEARTBEAT_TIMEOUT_SECS),
);
last_message_time = Utc::now();
};
}
loop {
tokio::select! {
msg = ws.next() => {
match msg {
Some(Ok(WsMessage::Ping(_))) => {
reset_heartbeat!();
}
Some(Ok(WsMessage::Text(text))) => {
process_ws_text(text.as_str(), &tx, &dedup, &mut backoff);
reset_heartbeat!();
}
Some(Ok(WsMessage::Binary(bytes))) => {
// Binary frames are handled like text when they are valid UTF-8;
// an invalid frame is logged and does NOT reset the heartbeat.
match std::str::from_utf8(&bytes) {
Ok(text) => {
process_ws_text(text, &tx, &dedup, &mut backoff);
reset_heartbeat!();
}
Err(e) => warn!(%e, "Alpaca WS binary frame: not valid UTF-8"),
}
}
Some(Ok(WsMessage::Close(frame))) => {
warn!(frame = ?frame, "Alpaca WS closed by server");
break;
}
// Any other successfully received frame type is ignored (no heartbeat reset);
// a transport error leaves the inner loop so the outer loop reconnects.
Some(Ok(_)) => {} Some(Err(e)) => {
warn!(%e, "Alpaca WS error, reconnecting");
break;
}
None => {
warn!("Alpaca WS stream ended, reconnecting");
break;
}
}
}
_ = &mut heartbeat => {
warn!(
timeout_secs = HEARTBEAT_TIMEOUT_SECS,
"Alpaca heartbeat timeout, reconnecting"
);
break;
}
_ = tx.closed() => {
// Nobody is listening any more: close the socket (bounded) and stop for good.
debug!("Alpaca account_stream consumer dropped, terminating");
let _ = tokio::time::timeout(
Duration::from_secs(WS_CLOSE_TIMEOUT_SECS),
ws.close(None),
).await;
break 'outer;
}
}
}
// Start the next recovery window SIGNAL_RECOVERY_LOOKBACK_MS before the last frame
// that reset the heartbeat. NOTE(review): presumably this overlap guards against
// events in flight at disconnect; replayed fills pass through the dedup cache in
// `recover_fills`.
disconnect_time =
Some(last_message_time - chrono::Duration::milliseconds(SIGNAL_RECOVERY_LOOKBACK_MS));
// Best-effort close of the dead socket; the result is intentionally ignored.
let _ =
tokio::time::timeout(Duration::from_secs(WS_CLOSE_TIMEOUT_SECS), ws.close(None)).await;
if tx.is_closed() {
break;
}
if !backoff.wait().await {
error!("Alpaca max reconnect attempts exhausted, stream terminating");
break;
}
}
}
/// Failure modes of the WebSocket handshake performed by `ws_handshake`.
#[derive(Debug)]
enum HandshakeError {
/// Socket-level failure: a send failed, a read returned an error, or the stream
/// ended before the expected response arrived. Carries a description.
Transport(String),
/// The server answered the auth request with a status other than `authorized`.
Auth(String),
}
async fn connect_and_subscribe(config: &AlpacaConfig) -> Result<WebSocket, UnindexedClientError> {
let url = config.ws_url();
debug!(%url, "Alpaca: connecting to WebSocket");
let mut ws = rustrade_integration::protocol::websocket::connect(url)
.await
.map_err(|e| {
UnindexedClientError::Connectivity(ConnectivityError::Socket(format!(
"WS connect: {e}"
)))
})?;
let result = tokio::time::timeout(
Duration::from_secs(WS_HANDSHAKE_TIMEOUT_SECS),
ws_handshake(&mut ws, config),
)
.await;
match result {
Ok(Ok(())) => Ok(ws),
Ok(Err(HandshakeError::Transport(e))) => {
let _ = ws.close(None).await;
Err(UnindexedClientError::Connectivity(
ConnectivityError::Socket(e),
))
}
Ok(Err(HandshakeError::Auth(e))) => {
let _ = ws.close(None).await;
Err(UnindexedClientError::Api(ApiError::Unauthenticated(e)))
}
Err(_) => {
let _ = ws.close(None).await;
Err(UnindexedClientError::Connectivity(
ConnectivityError::Timeout,
))
}
}
}
/// Performs the two-step handshake on a freshly connected socket: authenticate,
/// then subscribe to the `trade_updates` stream.
///
/// This function has no timeout of its own; `connect_and_subscribe` wraps the call
/// in one.
///
/// # Errors
///
/// * `HandshakeError::Auth` — the authorization response carried a status other
///   than `authorized` (as decided by `check_auth_response`).
/// * `HandshakeError::Transport` — a send failed, a read returned an error, or the
///   stream ended before the awaited response arrived.
async fn ws_handshake(ws: &mut WebSocket, config: &AlpacaConfig) -> Result<(), HandshakeError> {
// Step 1: send credentials.
let auth = serde_json::json!({
"action": "auth",
"key": config.api_key(),
"secret": config.secret_key,
})
.to_string();
ws.send(WsMessage::Text(auth.into()))
.await
.map_err(|e| HandshakeError::Transport(format!("WS auth send: {e}")))?;
// Read frames until one is recognised as the authorization response. Frames that
// `check_auth_response` does not recognise (it returns `None`) are skipped.
loop {
match ws.next().await {
Some(Ok(WsMessage::Text(text))) => {
if let Some(result) = check_auth_response(text.as_str()) {
result?;
break;
}
}
Some(Ok(WsMessage::Binary(bytes))) => {
// Binary frames are accepted only when they decode as UTF-8.
if let Ok(text) = std::str::from_utf8(&bytes)
&& let Some(result) = check_auth_response(text)
{
result?;
break;
}
}
Some(Err(e)) => {
return Err(HandshakeError::Transport(format!(
"WS error during auth: {e}"
)));
}
None => {
return Err(HandshakeError::Transport(
"WS closed before auth response".into(),
));
}
// Every other frame type is ignored while waiting for the auth response.
_ => {} }
}
// Step 2: request the trade_updates stream.
let sub = serde_json::json!({
"action": "listen",
"data": { "streams": ["trade_updates"] }
})
.to_string();
ws.send(WsMessage::Text(sub.into()))
.await
.map_err(|e| HandshakeError::Transport(format!("WS subscribe send: {e}")))?;
// Read frames until `check_listen_ack` confirms the subscription. Anything else
// that arrives in the meantime is logged and dropped, not forwarded.
loop {
match ws.next().await {
Some(Ok(WsMessage::Text(text))) => {
if check_listen_ack(text.as_str()) {
break;
}
// Not the ack: classify the dropped message purely for logging.
if let Ok(msg) = serde_json::from_str::<AlpacaStreamMessage<'_>>(text.as_str()) {
if msg.stream == "trade_updates" {
warn!(stream = %msg.stream, "WS trade_updates event dropped during listen-ack handshake — will be recovered via REST for fills, but lifecycle events (new/canceled) are lost");
} else {
trace!(stream = %msg.stream, "WS message dropped during listen-ack handshake");
}
} else {
trace!(
bytes = text.len(),
"WS non-stream message dropped during listen-ack handshake"
);
}
}
Some(Ok(WsMessage::Binary(bytes))) => {
if let Ok(text) = std::str::from_utf8(&bytes)
&& check_listen_ack(text)
{
break;
}
trace!(
bytes = bytes.len(),
"WS binary message dropped during listen-ack handshake"
);
}
Some(Err(e)) => {
return Err(HandshakeError::Transport(format!(
"WS error during subscribe: {e}"
)));
}
None => {
return Err(HandshakeError::Transport(
"WS closed before subscribe ack".into(),
));
}
// Every other frame type is ignored while waiting for the ack.
_ => {}
}
}
info!("Alpaca WS authenticated and subscribed to trade_updates");
Ok(())
}
/// Interprets `text` as a possible reply to the WebSocket auth request.
///
/// Returns `None` when `text` is not a parseable stream message, belongs to a
/// stream other than `authorization`, or its payload has no readable `status`.
/// Otherwise returns `Some(Ok(()))` for status `authorized` and
/// `Some(Err(HandshakeError::Auth))` for any other status.
fn check_auth_response(text: &str) -> Option<Result<(), HandshakeError>> {
    #[derive(Deserialize)]
    struct AuthData<'a> {
        status: &'a str,
    }

    let envelope = serde_json::from_str::<AlpacaStreamMessage<'_>>(text).ok()?;
    if envelope.stream != "authorization" {
        return None;
    }

    let payload = serde_json::from_str::<AuthData<'_>>(envelope.data.get()).ok()?;
    let verdict = match payload.status {
        "authorized" => Ok(()),
        rejected => Err(HandshakeError::Auth(format!(
            "Alpaca WS auth failed: status={}",
            rejected
        ))),
    };
    Some(verdict)
}
/// Reports whether `text` is a `listening` stream message whose `streams` list
/// includes `trade_updates`.
///
/// Anything that fails to parse, or that belongs to a different stream, yields
/// `false`.
fn check_listen_ack(text: &str) -> bool {
    #[derive(Deserialize)]
    struct ListenData<'a> {
        #[serde(borrow)]
        streams: Vec<&'a str>,
    }

    match serde_json::from_str::<AlpacaStreamMessage<'_>>(text) {
        Ok(envelope) if envelope.stream == "listening" => {
            serde_json::from_str::<ListenData<'_>>(envelope.data.get())
                .is_ok_and(|ack| ack.streams.contains(&"trade_updates"))
        }
        _ => false,
    }
}
/// Handles one text payload received on the account WebSocket.
///
/// * Payloads that are not a valid stream message are logged at trace level and
///   skipped.
/// * Any valid stream message resets `backoff`.
/// * `trade_updates` messages are deserialized; fill events already recorded in
///   `dedup` are dropped, everything else is converted with `convert_trade_update`
///   and sent on `tx` (a send failure is ignored).
/// * `authorization` / `listening` and unknown streams are only traced.
///
/// Raw payload excerpts placed in log fields are cut to at most 200 bytes on a
/// UTF-8 character boundary, so logging can never panic on multi-byte input.
fn process_ws_text(
    text: &str,
    tx: &mpsc::UnboundedSender<UnindexedAccountEvent>,
    dedup: &SharedDedupCache,
    backoff: &mut ExponentialBackoff,
) {
    /// Longest prefix of `raw` that is at most 200 bytes long and ends on a
    /// character boundary.
    fn log_preview(raw: &str) -> &str {
        const MAX_PREVIEW_BYTES: usize = 200;
        let mut end = raw.len().min(MAX_PREVIEW_BYTES);
        // Index 0 is always a boundary, so this loop terminates.
        while !raw.is_char_boundary(end) {
            end -= 1;
        }
        &raw[..end]
    }

    let msg: AlpacaStreamMessage<'_> = match serde_json::from_str(text) {
        Ok(m) => m,
        Err(e) => {
            trace!(
                %e,
                raw = ?log_preview(text),
                "Alpaca WS: skipped non-JSON message"
            );
            return;
        }
    };
    backoff.reset();
    match msg.stream.as_str() {
        "trade_updates" => {
            let update: AlpacaTradeUpdate<'_> = match serde_json::from_str(msg.data.get()) {
                Ok(u) => u,
                Err(e) => {
                    warn!(
                        %e,
                        raw = ?log_preview(msg.data.get()),
                        "Alpaca WS trade_updates: failed to deserialize event — event dropped"
                    );
                    return;
                }
            };
            // Check the dedup cache before doing the full conversion.
            if is_fill_event(&update) {
                let key = early_dedup_key(&update);
                if is_duplicate(dedup, &key) {
                    trace!("Alpaca WS: skipping duplicate fill event (early check)");
                    return;
                }
            }
            if let Some(event) = convert_trade_update(update) {
                // A closed receiver is detected by the caller; nothing to do here.
                let _ = tx.send(event);
            }
        }
        "authorization" | "listening" => {
            trace!(stream = %msg.stream, "Alpaca WS: auth/listen ack received during stream");
        }
        other => {
            trace!(%other, "Alpaca WS: ignoring unknown stream type");
        }
    }
}
/// Returns the deduplication key of `event` — the trade id — when the event is a
/// trade, and `None` for every other event kind.
fn fill_dedup_key_from_event(event: &UnindexedAccountEvent) -> Option<&SmolStr> {
    if let AccountEventKind::Trade(fill) = &event.kind {
        Some(&fill.id.0)
    } else {
        None
    }
}
/// `true` when the trade update's event type is `fill` or `partial_fill`.
#[inline]
fn is_fill_event(update: &AlpacaTradeUpdate<'_>) -> bool {
    let event_type = update.event.as_str();
    event_type == "fill" || event_type == "partial_fill"
}
/// Builds the dedup key `"<order id>:<cumulative filled qty>"` for a fill update.
///
/// A missing or unparseable `filled_qty` is treated as zero; the quantity is
/// normalized before formatting.
fn early_dedup_key(update: &AlpacaTradeUpdate<'_>) -> SmolStr {
    let order = &update.order;
    let cumulative = order
        .filled_qty
        .and_then(|raw| Decimal::from_str(raw).ok())
        .unwrap_or(Decimal::ZERO);
    format_smolstr!("{}:{}", order.id, cumulative.normalize())
}
async fn recover_fills(
http: &reqwest::Client,
rate_limiter: &RateLimitTracker,
instruments: &[InstrumentNameExchange],
base: &str,
after: &str,
tx: &mpsc::UnboundedSender<UnindexedAccountEvent>,
dedup: &SharedDedupCache,
) {
info!(%after, instruments = instruments.len(), "Alpaca recovering fills after reconnect");
let instrument_set: fnv::FnvHashSet<&str> = if instruments.is_empty() {
fnv::FnvHashSet::default()
} else {
instruments.iter().map(|i| i.name().as_str()).collect()
};
let page = match paginate_activities(http, rate_limiter, base, after).await {
Ok(p) => p,
Err(e) => {
error!(%e, "Alpaca fill recovery: REST request failed");
return;
}
};
if page.truncated {
error!(
max_pages = MAX_ACTIVITY_PAGES,
"Alpaca fill recovery: max page limit reached, truncating — \
fills from this outage are permanently lost. Manual reconciliation required."
);
}
let activities = page.activities;
let mut recovered = 0u32;
let mut duplicates = 0u32;
let mut cumulative_qty: FnvHashMap<&str, Decimal> = FnvHashMap::default();
for activity in &activities {
if !instrument_set.is_empty() && !instrument_set.contains(activity.symbol.as_str()) {
continue;
}
let exec_qty = Decimal::from_str(&activity.qty).unwrap_or(Decimal::ZERO);
let cum = cumulative_qty
.entry(activity.order_id.as_str())
.or_default();
*cum += exec_qty;
let cumulative = *cum;
let mut trade = match convert_activity_to_trade(activity) {
Some(t) => t,
None => {
warn!(id = %activity.id, symbol = %activity.symbol, "Alpaca: skipping activity with unparseable fields");
continue; }
};
trade.id = TradeId(format_smolstr!(
"{}:{}",
activity.order_id,
cumulative.normalize()
));
let event =
UnindexedAccountEvent::new(ExchangeId::AlpacaBroker, AccountEventKind::Trade(trade));
if fill_dedup_key_from_event(&event).is_some_and(|k| is_duplicate(dedup, k)) {
duplicates += 1;
continue;
}
if tx.send(event).is_err() {
debug!("Alpaca fill recovery: consumer dropped during recovery");
return;
}
recovered += 1;
}
info!(recovered, duplicates, "Alpaca fill recovery complete");
}
fn convert_account_to_balances(
account: &AlpacaAccount,
assets: &[AssetNameExchange],
) -> Vec<AssetBalance<AssetNameExchange>> {
let usd_entry = assets
.iter()
.find(|a| a.name().as_str().eq_ignore_ascii_case("usd"));
if !assets.is_empty() && usd_entry.is_none() {
return Vec::new();
}
let usd_name = usd_entry
.cloned()
.unwrap_or_else(|| AssetNameExchange::new("usd"));
let total = Decimal::from_str(&account.equity).unwrap_or(Decimal::ZERO);
let free = account
.options_buying_power
.as_deref()
.and_then(|s| Decimal::from_str(s).ok())
.filter(|d| !d.is_zero())
.unwrap_or_else(|| Decimal::from_str(&account.buying_power).unwrap_or(Decimal::ZERO));
vec![AssetBalance::new(
usd_name,
Balance::new(total, free),
Utc::now(),
)]
}
/// Turns crypto positions into per-asset balances.
///
/// Only positions whose `asset_class` is `crypto` (ASCII case-insensitive) are
/// considered. The asset name is the lower-cased part of the symbol before the
/// first `/` (the whole symbol when there is no `/`). When `assets` is non-empty,
/// positions whose asset is not listed are skipped. `qty` becomes the total and
/// `qty_available` the free amount; unparseable numbers count as zero. All
/// balances share one timestamp taken at the start of the call.
fn convert_positions_to_balances(
    positions: &[AlpacaPosition],
    assets: &[AssetNameExchange],
) -> Vec<AssetBalance<AssetNameExchange>> {
    let now = Utc::now();
    let mut balances = Vec::new();

    for position in positions {
        if !position.asset_class.eq_ignore_ascii_case("crypto") {
            continue;
        }

        let symbol: &str = &position.symbol;
        let base = match symbol.split_once('/') {
            Some((head, _quote)) => head,
            None => symbol,
        }
        .to_ascii_lowercase();

        let tracked = assets.is_empty()
            || assets
                .iter()
                .any(|a| a.name().as_str().eq_ignore_ascii_case(&base));
        if !tracked {
            continue;
        }

        let total = Decimal::from_str(&position.qty).unwrap_or(Decimal::ZERO);
        let free = Decimal::from_str(&position.qty_available).unwrap_or(Decimal::ZERO);
        balances.push(AssetBalance::new(
            AssetNameExchange::new(base),
            Balance::new(total, free),
            now,
        ));
    }

    balances
}
fn build_instrument_snapshots(
orders: Vec<AlpacaOrderResponse>,
instruments: &[InstrumentNameExchange],
) -> Vec<InstrumentAccountSnapshot<ExchangeId, AssetNameExchange, InstrumentNameExchange>> {
let mut by_symbol: IndexMap<SmolStr, Vec<_>> = IndexMap::new();
for order in orders {
let sym = SmolStr::new(&order.symbol);
if let Some(converted) = convert_open_order(&order) {
let wrapped = crate::order::Order {
key: converted.key,
side: converted.side,
price: converted.price,
quantity: converted.quantity,
kind: converted.kind,
time_in_force: converted.time_in_force,
state: OrderState::active(converted.state),
};
by_symbol.entry(sym).or_default().push(wrapped);
}
}
if instruments.is_empty() {
by_symbol
.into_iter()
.map(|(sym, orders)| {
InstrumentAccountSnapshot::new(InstrumentNameExchange::new(sym), orders, None)
})
.collect()
} else {
instruments
.iter()
.map(|inst| {
let orders = by_symbol
.swap_remove(inst.name().as_str())
.unwrap_or_default();
InstrumentAccountSnapshot::new(inst.clone(), orders, None)
})
.collect()
}
}
/// Converts a REST order record into an open [`Order`].
///
/// Returns `None` when the side is unknown, the quantity is unparseable or zero
/// (a missing quantity counts as zero), or the order type cannot be mapped by
/// `parse_order_kind`.
///
/// Fallbacks: the exchange order id doubles as client order id when none is
/// present; an unparseable `filled_qty` counts as zero; an unparseable
/// `created_at` is replaced by the current time. The strategy id is always
/// `StrategyId::unknown()`.
fn convert_open_order(
    o: &AlpacaOrderResponse,
) -> Option<Order<ExchangeId, InstrumentNameExchange, Open>> {
    let side = parse_side(&o.side)?;
    let quantity = Decimal::from_str(o.qty.as_deref().unwrap_or("0"))
        .ok()
        .filter(|qty| !qty.is_zero())?;
    let price = match o.limit_price.as_deref() {
        Some(raw) => Decimal::from_str(raw).ok(),
        None => None,
    };
    let filled_qty = Decimal::from_str(&o.filled_qty).unwrap_or(Decimal::ZERO);
    let kind = parse_order_kind(
        &o.order_type,
        o.stop_price.as_deref(),
        o.trail_percent.as_deref(),
        o.trail_price.as_deref(),
    )?;
    let time_in_force = parse_time_in_force(&o.time_in_force);
    let time_exchange = match parse_timestamp(&o.created_at) {
        Some(created) => created,
        None => Utc::now(),
    };

    let client_id = ClientOrderId::new(o.client_order_id.as_deref().unwrap_or(o.id.as_str()));
    let key = OrderKey::new(
        ExchangeId::AlpacaBroker,
        InstrumentNameExchange::new(&o.symbol),
        StrategyId::unknown(),
        client_id,
    );
    let state = Open::new(OrderId(SmolStr::new(&o.id)), time_exchange, filled_qty);

    Some(Order {
        key,
        side,
        price,
        quantity,
        kind,
        time_in_force,
        state,
    })
}
/// Converts a `FILL` account activity into a [`Trade`].
///
/// Returns `None` when the side is unknown or when price or quantity cannot be
/// parsed. An unparseable `transaction_time` is logged and replaced by the
/// current time. The trade id is the activity id, the strategy id is
/// `StrategyId::unknown()`, and fees are reported as zero `USD`.
fn convert_activity_to_trade(
    a: &AlpacaActivity,
) -> Option<Trade<AssetNameExchange, InstrumentNameExchange>> {
    let side = parse_side(&a.side)?;
    let price = Decimal::from_str(&a.price).ok()?;
    let quantity = Decimal::from_str(&a.qty).ok()?;
    let time_exchange = match parse_timestamp(&a.transaction_time) {
        Some(executed_at) => executed_at,
        None => {
            warn!(id = %a.id, "Alpaca activity: unparseable transaction_time, using now");
            Utc::now()
        }
    };
    let zero_fees = AssetFees::new(
        AssetNameExchange::from("USD"),
        Decimal::ZERO,
        Some(Decimal::ZERO),
    );

    Some(Trade::new(
        TradeId::new(&a.id),
        OrderId(SmolStr::new(&a.order_id)),
        InstrumentNameExchange::new(&a.symbol),
        StrategyId::unknown(),
        time_exchange,
        side,
        price,
        quantity,
        zero_fees,
    ))
}
/// Converts one `trade_updates` WebSocket event into an account event.
///
/// Mapping by event type:
/// * `fill` / `partial_fill` — `AccountEventKind::Trade`
/// * `new` / `accepted` / `pending_new` — `AccountEventKind::OrderSnapshot` with an open order
/// * `canceled` / `expired` / `replaced` / `done_for_day` — `AccountEventKind::OrderCancelled`
///   carrying `Ok(Cancelled)`
/// * `rejected` — `AccountEventKind::OrderCancelled` carrying `Err(Rejected(..))`
///
/// Returns `None` for any other event type, and whenever a field required by the
/// chosen branch is missing or unparseable (see the `?` early returns below).
/// The strategy id is always `StrategyId::unknown()`; when the order carries no
/// client order id, the exchange order id is used in its place.
fn convert_trade_update(update: AlpacaTradeUpdate<'_>) -> Option<UnindexedAccountEvent> {
let event_str = update.event.as_str();
// Allow-list of handled event types; it must stay in sync with the match arms
// further down, whose catch-all arm is `unreachable!`.
if !matches!(
event_str,
"fill"
| "partial_fill"
| "new"
| "accepted"
| "pending_new"
| "canceled"
| "expired"
| "replaced"
| "done_for_day"
| "rejected"
) {
trace!(event = %event_str, "Alpaca WS: ignoring trade_updates event type");
return None;
}
let order = &update.order;
let instrument = InstrumentNameExchange::new(&*order.symbol);
let order_id = OrderId(order.id.clone());
let cid = order
.client_order_id
.as_deref()
.map(ClientOrderId::new)
.unwrap_or_else(|| ClientOrderId::new(order.id.as_str()));
match event_str {
"fill" | "partial_fill" => {
// Price and quantity come from the update itself; without them no trade is produced.
let price = update.price.and_then(|s| Decimal::from_str(s).ok())?;
let quantity = update.qty.and_then(|s| Decimal::from_str(s).ok())?;
let side = parse_side(&order.side)?;
let time_exchange = update
.timestamp
.and_then(parse_timestamp)
.unwrap_or_else(Utc::now);
// The order's cumulative filled quantity forms part of the trade id below.
let cum_qty = Decimal::from_str(order.filled_qty.unwrap_or("0"))
.inspect_err(|e| {
warn!(
order_id = %order.id,
filled_qty = ?order.filled_qty,
%e,
"Alpaca WS: failed to parse filled_qty — dedup key will use 0, \
a second malformed fill on the same order would be deduplicated away"
);
})
.unwrap_or(Decimal::ZERO);
// Same `"<order id>:<normalized cumulative qty>"` shape that `early_dedup_key` builds.
let trade_id = TradeId(format_smolstr!("{}:{}", order.id, cum_qty.normalize()));
let trade = Trade::new(
trade_id,
order_id,
instrument,
StrategyId::unknown(),
time_exchange,
side,
price,
quantity,
// Fees are always reported as zero USD here.
AssetFees::new(
AssetNameExchange::from("USD"),
Decimal::ZERO,
Some(Decimal::ZERO),
),
);
Some(UnindexedAccountEvent::new(
ExchangeId::AlpacaBroker,
AccountEventKind::Trade(trade),
))
}
"new" | "accepted" | "pending_new" => {
let side = parse_side(&order.side)?;
// A missing or unparseable quantity is treated as zero, and zero-quantity orders are skipped.
let quantity = Decimal::from_str(order.qty.unwrap_or("0")).unwrap_or(Decimal::ZERO);
if quantity.is_zero() {
trace!(order_id = %order.id, "Alpaca WS: skipping notional order snapshot (qty=None)");
return None;
}
let price = order.limit_price.and_then(|s| Decimal::from_str(s).ok());
let filled_qty =
Decimal::from_str(order.filled_qty.unwrap_or("0")).unwrap_or(Decimal::ZERO);
let kind = parse_order_kind(
&order.order_type,
order.stop_price,
order.trail_percent,
order.trail_price,
)?;
let time_in_force = parse_time_in_force(&order.time_in_force);
let time_exchange = update
.timestamp
.and_then(parse_timestamp)
.unwrap_or_else(Utc::now);
let open_state = Open::new(order_id, time_exchange, filled_qty);
let order_snapshot = crate::order::Order {
key: OrderKey::new(
ExchangeId::AlpacaBroker,
instrument,
StrategyId::unknown(),
cid,
),
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::active(open_state),
};
Some(UnindexedAccountEvent::new(
ExchangeId::AlpacaBroker,
AccountEventKind::OrderSnapshot(
rustrade_integration::collection::snapshot::Snapshot(order_snapshot),
),
))
}
// All four terminal, non-fill outcomes are reported as a successful cancellation.
"canceled" | "expired" | "replaced" | "done_for_day" => {
let time_exchange = update
.timestamp
.and_then(parse_timestamp)
.unwrap_or_else(Utc::now);
let filled_qty =
Decimal::from_str(order.filled_qty.unwrap_or("0")).unwrap_or(Decimal::ZERO);
let cancelled = Cancelled::new(order_id, time_exchange, filled_qty);
let response = crate::order::request::OrderResponseCancel {
key: OrderKey::new(
ExchangeId::AlpacaBroker,
instrument,
StrategyId::unknown(),
cid,
),
state: Ok(cancelled),
};
Some(UnindexedAccountEvent::new(
ExchangeId::AlpacaBroker,
AccountEventKind::OrderCancelled(response),
))
}
// A rejection is also delivered through `OrderCancelled`, but with an error state.
"rejected" => {
let response = crate::order::request::OrderResponseCancel {
key: OrderKey::new(
ExchangeId::AlpacaBroker,
instrument,
StrategyId::unknown(),
cid,
),
state: Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
format!("order rejected: status={}", order.status),
))),
};
Some(UnindexedAccountEvent::new(
ExchangeId::AlpacaBroker,
AccountEventKind::OrderCancelled(response),
))
}
// Guarded by the allow-list at the top of the function.
_ => unreachable!("convert_trade_update: unrecognised event passed early-return guard"),
}
}
/// Parses an Alpaca order side.
///
/// Accepts the lower-case, capitalised and upper-case spellings of `buy` and
/// `sell`; any other input is traced and yields `None`.
fn parse_side(s: &str) -> Option<Side> {
    if matches!(s, "buy" | "Buy" | "BUY") {
        return Some(Side::Buy);
    }
    if matches!(s, "sell" | "Sell" | "SELL") {
        return Some(Side::Sell);
    }
    let other = s;
    trace!(%other, "Alpaca: unknown order side");
    None
}
fn parse_order_kind(
order_type: &str,
stop_price: Option<&str>,
trail_percent: Option<&str>,
trail_price: Option<&str>,
) -> Option<OrderKind> {
match order_type {
"market" | "Market" => Some(OrderKind::Market),
"limit" | "Limit" => Some(OrderKind::Limit),
"stop" | "Stop" => {
let trigger_price = stop_price.and_then(|s| Decimal::from_str(s).ok())?;
Some(OrderKind::Stop { trigger_price })
}
"stop_limit" | "Stop_limit" => {
let trigger_price = stop_price.and_then(|s| Decimal::from_str(s).ok())?;
Some(OrderKind::StopLimit { trigger_price })
}
"trailing_stop" | "Trailing_stop" => {
if let Some(pct) = trail_percent.and_then(|s| Decimal::from_str(s).ok()) {
Some(OrderKind::TrailingStop {
offset: pct,
offset_type: TrailingOffsetType::Percentage,
})
} else if let Some(price) = trail_price.and_then(|s| Decimal::from_str(s).ok()) {
Some(OrderKind::TrailingStop {
offset: price,
offset_type: TrailingOffsetType::Absolute,
})
} else {
trace!("Alpaca: trailing_stop missing trail_percent and trail_price");
None
}
}
other => {
trace!(%other, "Alpaca: unsupported order type, skipping");
None
}
}
}
/// Parses an Alpaca `time_in_force` value.
///
/// Recognises `gtc`, `day`, `fok`, `ioc`, `opg` and `cls` in lower or upper case.
/// `opg` and `cls` are the strings `map_time_in_force` emits for
/// [`TimeInForce::AtOpen`] and [`TimeInForce::AtClose`], so orders submitted with
/// those settings are read back with the same setting.
///
/// Any other value is logged at warn level and mapped to
/// [`TimeInForce::GoodUntilEndOfDay`].
fn parse_time_in_force(s: &str) -> TimeInForce {
    match s {
        "gtc" | "GTC" => TimeInForce::GoodUntilCancelled { post_only: false },
        "day" | "DAY" => TimeInForce::GoodUntilEndOfDay,
        "fok" | "FOK" => TimeInForce::FillOrKill,
        "ioc" | "IOC" => TimeInForce::ImmediateOrCancel,
        "opg" | "OPG" => TimeInForce::AtOpen,
        "cls" | "CLS" => TimeInForce::AtClose,
        other => {
            warn!(%other, "Alpaca: unknown time_in_force, defaulting to GoodUntilEndOfDay");
            TimeInForce::GoodUntilEndOfDay
        }
    }
}
/// Parses an RFC 3339 timestamp and converts it to UTC; `None` when `s` is not
/// valid RFC 3339.
fn parse_timestamp(s: &str) -> Option<DateTime<Utc>> {
    match DateTime::parse_from_rfc3339(s) {
        Ok(parsed) => Some(parsed.with_timezone(&Utc)),
        Err(_) => None,
    }
}
/// Returns the lower-case string this client uses for `side` in Alpaca requests.
fn map_side(side: Side) -> &'static str {
    match side {
        Side::Sell => "sell",
        Side::Buy => "buy",
    }
}
/// Returns the Alpaca order-type string for `kind`, or `None` for
/// [`OrderKind::TrailingStopLimit`], which has no mapping here.
fn map_order_kind(kind: OrderKind) -> Option<&'static str> {
    let wire_name = match kind {
        OrderKind::TrailingStopLimit { .. } => return None,
        OrderKind::Market => "market",
        OrderKind::Limit => "limit",
        OrderKind::Stop { .. } => "stop",
        OrderKind::StopLimit { .. } => "stop_limit",
        OrderKind::TrailingStop { .. } => "trailing_stop",
    };
    Some(wire_name)
}
/// Returns the Alpaca `time_in_force` string for `tif`.
///
/// # Errors
///
/// Returns a static description when `tif` is a post-only
/// `GoodUntilCancelled` or a `GoodTillDate`; this client maps neither.
fn map_time_in_force(tif: TimeInForce) -> Result<&'static str, &'static str> {
    let wire_name = match tif {
        TimeInForce::GoodUntilCancelled { post_only: true } => {
            return Err("Alpaca does not support post_only orders");
        }
        TimeInForce::GoodTillDate { .. } => {
            return Err("Alpaca GoodTillDate is not yet wired through this client");
        }
        TimeInForce::GoodUntilCancelled { post_only: false } => "gtc",
        TimeInForce::GoodUntilEndOfDay => "day",
        TimeInForce::FillOrKill => "fok",
        TimeInForce::ImmediateOrCancel => "ioc",
        TimeInForce::AtOpen => "opg",
        TimeInForce::AtClose => "cls",
    };
    Ok(wire_name)
}
/// `true` when `symbol` contains no `/` character.
///
/// NOTE(review): the name suggests that slash-separated symbols are crypto pairs
/// and everything else is an equity or option — confirm against the symbol
/// formats in use.
fn is_options_or_equity_symbol(symbol: &str) -> bool {
    symbol.find('/').is_none()
}
/// Chooses the Alpaca position intent for an order: `reduce_only` selects the
/// `…ToClose` variants, otherwise the `…ToOpen` variants, on the given `side`.
fn map_position_intent(side: Side, reduce_only: bool) -> AlpacaPositionIntent {
    if reduce_only {
        match side {
            Side::Buy => AlpacaPositionIntent::BuyToClose,
            Side::Sell => AlpacaPositionIntent::SellToClose,
        }
    } else {
        match side {
            Side::Buy => AlpacaPositionIntent::BuyToOpen,
            Side::Sell => AlpacaPositionIntent::SellToOpen,
        }
    }
}
/// Classifies an HTTP error response from Alpaca.
///
/// * 429 — `RateLimit`
/// * 422 with a message containing `already` (case-insensitive) — `OrderAlreadyCancelled`
/// * 422 with a message containing `insufficient` — `BalanceInsufficient` for `usd`
/// * 401 / 403 — `Unauthenticated`
/// * 404 — `OrderRejected` with an "order not found" prefix
/// * anything else — `OrderRejected` carrying the raw message
fn parse_api_error(status: reqwest::StatusCode, message: &str) -> crate::error::UnindexedApiError {
    if status == reqwest::StatusCode::TOO_MANY_REQUESTS {
        return ApiError::RateLimit;
    }

    let code = status.as_u16();
    if code == 422 {
        // Only 422 responses are classified by message content.
        let lower = message.to_ascii_lowercase();
        if lower.contains("already") {
            return ApiError::OrderAlreadyCancelled;
        }
        if lower.contains("insufficient") {
            return ApiError::BalanceInsufficient(
                AssetNameExchange::new("usd"),
                message.to_owned(),
            );
        }
    }

    match code {
        401 => ApiError::Unauthenticated(format!("unauthorized: {message}")),
        403 => ApiError::Unauthenticated(format!("forbidden: {message}")),
        404 => ApiError::OrderRejected(format!("order not found: {message}")),
        _ => ApiError::OrderRejected(message.to_owned()),
    }
}
/// Classifies an HTTP error with `parse_api_error` and wraps the result as an
/// order rejection.
fn parse_order_error(status: reqwest::StatusCode, message: &str) -> UnindexedOrderError {
    let api_error = parse_api_error(status, message);
    UnindexedOrderError::Rejected(api_error)
}
/// Builds a socket-level connectivity error carrying `msg`.
fn connectivity_err(msg: impl Into<String>) -> UnindexedClientError {
    let socket_error = ConnectivityError::Socket(msg.into());
    UnindexedClientError::Connectivity(socket_error)
}
/// Exposes Alpaca bracket orders through the exchange-agnostic
/// [`BracketOrderClient`] trait.
impl BracketOrderClient for AlpacaClient {
    /// Translates the unified bracket request field-by-field into an
    /// `AlpacaBracketOrderRequest`, submits it, and reports only the parent order
    /// of the result.
    async fn open_bracket_order(
        &self,
        request: UnifiedBracketOrderRequest<ExchangeId, &InstrumentNameExchange>,
    ) -> UnifiedBracketOrderResult {
        let key = &request.key;
        let params = request.state;

        let native_request = AlpacaBracketOrderRequest {
            instrument: key.instrument.clone(),
            strategy: key.strategy.clone(),
            cid: key.cid.clone(),
            side: params.side,
            quantity: params.quantity,
            entry_price: params.entry_price,
            take_profit_price: params.take_profit_price,
            stop_loss_price: params.stop_loss_price,
            stop_loss_limit_price: params.stop_loss_limit_price,
            time_in_force: params.time_in_force,
        };

        // NOTE(review): presumably this resolves to the Alpaca-specific method on
        // `AlpacaClient` that accepts `AlpacaBracketOrderRequest`, not to this
        // trait method — confirm.
        let submitted = self.open_bracket_order(native_request).await;
        UnifiedBracketOrderResult::parent_only(submitted.parent)
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)] mod tests {
use super::*;
#[test]
fn test_alpaca_config_debug_redacts_credentials() {
let cfg = AlpacaConfig::new("my_key".into(), "my_secret".into(), true);
let debug = format!("{cfg:?}");
assert!(!debug.contains("my_key"), "api_key should be redacted");
assert!(
!debug.contains("my_secret"),
"secret_key should be redacted"
);
assert!(debug.contains("paper: true"));
}
#[test]
fn test_alpaca_config_urls() {
let paper = AlpacaConfig::new("k".into(), "s".into(), true);
assert!(paper.rest_base_url().contains("paper-api"));
assert!(paper.ws_url().contains("paper-api"));
let live = AlpacaConfig::new("k".into(), "s".into(), false);
assert!(!live.rest_base_url().contains("paper-api"));
assert!(!live.ws_url().contains("paper-api"));
}
#[test]
fn test_parse_side() {
assert_eq!(parse_side("buy"), Some(Side::Buy));
assert_eq!(parse_side("sell"), Some(Side::Sell));
assert_eq!(parse_side("Buy"), Some(Side::Buy));
assert_eq!(parse_side("BUY"), Some(Side::Buy));
assert_eq!(parse_side("unknown"), None);
}
#[test]
fn test_parse_order_kind() {
assert_eq!(
parse_order_kind("market", None, None, None),
Some(OrderKind::Market)
);
assert_eq!(
parse_order_kind("limit", None, None, None),
Some(OrderKind::Limit)
);
assert_eq!(
parse_order_kind("stop", Some("150.00"), None, None),
Some(OrderKind::Stop {
trigger_price: Decimal::from_str("150.00").unwrap()
})
);
assert_eq!(parse_order_kind("stop", None, None, None), None);
assert_eq!(
parse_order_kind("stop_limit", Some("145.00"), None, None),
Some(OrderKind::StopLimit {
trigger_price: Decimal::from_str("145.00").unwrap()
})
);
assert_eq!(
parse_order_kind("trailing_stop", None, Some("5.0"), None),
Some(OrderKind::TrailingStop {
offset: Decimal::from_str("5.0").unwrap(),
offset_type: TrailingOffsetType::Percentage,
})
);
assert_eq!(
parse_order_kind("trailing_stop", None, None, Some("2.50")),
Some(OrderKind::TrailingStop {
offset: Decimal::from_str("2.50").unwrap(),
offset_type: TrailingOffsetType::Absolute,
})
);
assert_eq!(parse_order_kind("trailing_stop", None, None, None), None);
assert_eq!(parse_order_kind("unknown", None, None, None), None);
}
#[test]
fn test_map_order_kind() {
assert_eq!(map_order_kind(OrderKind::Market), Some("market"));
assert_eq!(map_order_kind(OrderKind::Limit), Some("limit"));
assert_eq!(
map_order_kind(OrderKind::Stop {
trigger_price: Decimal::from_str("150.00").unwrap()
}),
Some("stop")
);
assert_eq!(
map_order_kind(OrderKind::StopLimit {
trigger_price: Decimal::from_str("145.00").unwrap()
}),
Some("stop_limit")
);
assert_eq!(
map_order_kind(OrderKind::TrailingStop {
offset: Decimal::from_str("5.0").unwrap(),
offset_type: TrailingOffsetType::Percentage,
}),
Some("trailing_stop")
);
assert_eq!(
map_order_kind(OrderKind::TrailingStop {
offset: Decimal::from_str("2.50").unwrap(),
offset_type: TrailingOffsetType::Absolute,
}),
Some("trailing_stop")
);
assert_eq!(
map_order_kind(OrderKind::TrailingStopLimit {
offset: Decimal::from_str("5.0").unwrap(),
offset_type: TrailingOffsetType::Percentage,
limit_offset: Decimal::from_str("1.0").unwrap(),
}),
None
);
}
#[test]
fn test_map_time_in_force_roundtrip() {
assert_eq!(
map_time_in_force(TimeInForce::GoodUntilCancelled { post_only: false }),
Ok("gtc")
);
assert_eq!(map_time_in_force(TimeInForce::GoodUntilEndOfDay), Ok("day"));
assert_eq!(map_time_in_force(TimeInForce::FillOrKill), Ok("fok"));
assert_eq!(map_time_in_force(TimeInForce::ImmediateOrCancel), Ok("ioc"));
}
#[test]
fn test_map_time_in_force_rejects_post_only() {
let result = map_time_in_force(TimeInForce::GoodUntilCancelled { post_only: true });
assert!(result.is_err(), "post_only must be rejected");
assert!(result.unwrap_err().contains("post_only"));
}
#[test]
fn test_bracket_order_serializes_with_stop_loss_stop_order() {
let body = AlpacaOrderRequest {
symbol: "AAPL",
qty: "10".to_string(),
side: "buy",
order_type: "limit",
time_in_force: "gtc",
limit_price: Some("150.00".to_string()),
stop_price: None,
trail_percent: None,
trail_price: None,
client_order_id: Some("bracket-001"),
position_intent: Some(AlpacaPositionIntent::BuyToOpen),
order_class: Some("bracket"),
take_profit: Some(TakeProfitParams {
limit_price: "160.00".to_string(),
}),
stop_loss: Some(StopLossParams {
stop_price: "145.00".to_string(),
limit_price: None,
}),
};
let json = serde_json::to_value(&body).unwrap();
assert_eq!(json["symbol"], "AAPL");
assert_eq!(json["qty"], "10");
assert_eq!(json["side"], "buy");
assert_eq!(json["type"], "limit");
assert_eq!(json["time_in_force"], "gtc");
assert_eq!(json["limit_price"], "150.00");
assert_eq!(json["order_class"], "bracket");
assert_eq!(json["take_profit"]["limit_price"], "160.00");
assert_eq!(json["stop_loss"]["stop_price"], "145.00");
assert!(
json["stop_loss"].get("limit_price").is_none(),
"stop_loss.limit_price should be omitted when None"
);
}
#[test]
fn test_bracket_order_serializes_with_stop_loss_stop_limit_order() {
let body = AlpacaOrderRequest {
symbol: "SPY",
qty: "5".to_string(),
side: "sell",
order_type: "limit",
time_in_force: "day",
limit_price: Some("450.00".to_string()),
stop_price: None,
trail_percent: None,
trail_price: None,
client_order_id: Some("bracket-002"),
position_intent: Some(AlpacaPositionIntent::SellToClose),
order_class: Some("bracket"),
take_profit: Some(TakeProfitParams {
limit_price: "440.00".to_string(),
}),
stop_loss: Some(StopLossParams {
stop_price: "455.00".to_string(),
limit_price: Some("456.00".to_string()),
}),
};
let json = serde_json::to_value(&body).unwrap();
assert_eq!(json["symbol"], "SPY");
assert_eq!(json["side"], "sell");
assert_eq!(json["time_in_force"], "day");
assert_eq!(json["order_class"], "bracket");
assert_eq!(json["take_profit"]["limit_price"], "440.00");
assert_eq!(json["stop_loss"]["stop_price"], "455.00");
assert_eq!(
json["stop_loss"]["limit_price"], "456.00",
"stop_loss.limit_price should be present for stop-limit orders"
);
}
/// A bracket order submitted with an immediate-or-cancel time-in-force must
/// come back with a failed parent order state.
#[tokio::test]
async fn test_open_bracket_order_rejects_invalid_tif() {
    use rust_decimal_macros::dec;
    use rustrade_instrument::instrument::name::InstrumentNameExchange;

    let client = AlpacaClient::new(AlpacaConfig::new(
        "dummy_key".into(),
        "dummy_secret".into(),
        true,
    ));

    let instrument = InstrumentNameExchange::new("SPY");
    let strategy = crate::order::id::StrategyId::new("test");
    let cid = crate::order::id::ClientOrderId::new("test-tif");
    // IOC is the time-in-force under test; every other argument is a plain value.
    let bracket = AlpacaBracketOrderRequest::new(
        instrument,
        strategy,
        cid,
        Side::Buy,
        dec!(1),
        dec!(100.00),
        dec!(120.00),
        dec!(90.00),
        TimeInForce::ImmediateOrCancel,
    );

    let outcome = client.open_bracket_order(bracket).await;
    let parent_failed = outcome.parent.state.is_failed();
    assert!(
        parent_failed,
        "Bracket order with IOC TIF should be rejected locally"
    );
}
/// A buy-side bracket order whose prices are in an invalid order must come
/// back with a failed parent order state.
#[tokio::test]
async fn test_open_bracket_order_rejects_invalid_price_ordering() {
    use rust_decimal_macros::dec;
    use rustrade_instrument::instrument::name::InstrumentNameExchange;

    let client = AlpacaClient::new(AlpacaConfig::new(
        "dummy_key".into(),
        "dummy_secret".into(),
        true,
    ));

    let instrument = InstrumentNameExchange::new("SPY");
    let strategy = crate::order::id::StrategyId::new("test");
    let cid = crate::order::id::ClientOrderId::new("test-price");
    // NOTE(review): presumably the three prices are entry / take-profit /
    // stop-loss, making 105.00 a stop-loss above the 100.00 entry of a buy —
    // confirm against `AlpacaBracketOrderRequest::new`.
    let bracket = AlpacaBracketOrderRequest::new(
        instrument,
        strategy,
        cid,
        Side::Buy,
        dec!(1),
        dec!(100.00),
        dec!(120.00),
        dec!(105.00),
        TimeInForce::GoodUntilCancelled { post_only: false },
    );

    let outcome = client.open_bracket_order(bracket).await;
    let parent_failed = outcome.parent.state.is_failed();
    assert!(
        parent_failed,
        "Bracket order with invalid price ordering should be rejected locally"
    );
}
/// A bracket order with an invalid stop-loss limit price must come back with
/// a failed parent order state.
#[tokio::test]
async fn test_open_bracket_order_rejects_invalid_sl_limit_price() {
    use rust_decimal_macros::dec;
    use rustrade_instrument::instrument::name::InstrumentNameExchange;

    let client = AlpacaClient::new(AlpacaConfig::new(
        "dummy_key".into(),
        "dummy_secret".into(),
        true,
    ));

    let instrument = InstrumentNameExchange::new("SPY");
    let strategy = crate::order::id::StrategyId::new("test");
    let cid = crate::order::id::ClientOrderId::new("test-sl-limit");
    let base_request = AlpacaBracketOrderRequest::new(
        instrument,
        strategy,
        cid,
        Side::Buy,
        dec!(1),
        dec!(100.00),
        dec!(120.00),
        dec!(90.00),
        TimeInForce::GoodUntilCancelled { post_only: false },
    );
    // NOTE(review): presumably 95.00 is invalid because it sits above the
    // 90.00 stop price of a buy-side bracket — confirm against the validator.
    let bracket = base_request.with_stop_loss_limit_price(dec!(95.00));

    let outcome = client.open_bracket_order(bracket).await;
    let parent_failed = outcome.parent.state.is_failed();
    assert!(
        parent_failed,
        "Bracket order with invalid SL limit price should be rejected locally"
    );
}
/// A plain (non-bracket) order must not emit any of the bracket-only keys in
/// its serialized JSON.
#[test]
fn test_non_bracket_order_omits_bracket_fields() {
    let request = AlpacaOrderRequest {
        symbol: "AAPL",
        qty: String::from("1"),
        side: "buy",
        order_type: "limit",
        time_in_force: "gtc",
        limit_price: Some(String::from("150.00")),
        stop_price: None,
        trail_percent: None,
        trail_price: None,
        client_order_id: Some("regular-001"),
        position_intent: Some(AlpacaPositionIntent::BuyToOpen),
        order_class: None,
        take_profit: None,
        stop_loss: None,
    };

    let serialized = serde_json::to_value(&request).unwrap();
    assert_eq!(serialized["symbol"], "AAPL");

    // Each bracket-only key must be absent, not merely null.
    for field in ["order_class", "take_profit", "stop_loss"] {
        assert!(
            serialized.get(field).is_none(),
            "{field} should be omitted for non-bracket orders"
        );
    }
}
/// `parse_timestamp` yields `Some` for timestamps with and without fractional
/// seconds, and `None` for text that is not a timestamp.
#[test]
fn test_parse_timestamp_valid() {
    // Whole-second timestamp.
    assert!(parse_timestamp("2025-04-18T14:30:00Z").is_some());
    // Timestamp with fractional seconds.
    assert!(parse_timestamp("2025-04-18T14:30:00.123456Z").is_some());
    // Garbage input.
    assert_eq!(parse_timestamp("not-a-timestamp"), None);
}
/// An `authorization` frame with status `authorized` is reported as `Some(Ok(()))`.
#[test]
fn test_check_auth_response_authorized() {
    const AUTHORIZED_FRAME: &str =
        r#"{"stream":"authorization","data":{"status":"authorized","action":"authenticate"}}"#;
    let outcome = check_auth_response(AUTHORIZED_FRAME);
    assert!(matches!(outcome, Some(Ok(()))));
}
/// An `authorization` frame with status `unauthorized` is reported as an
/// authentication handshake error.
#[test]
fn test_check_auth_response_unauthorized() {
    const UNAUTHORIZED_FRAME: &str = r#"{"stream":"authorization","data":{"status":"unauthorized"}}"#;
    let outcome = check_auth_response(UNAUTHORIZED_FRAME);
    assert!(matches!(outcome, Some(Err(HandshakeError::Auth(_)))));
}
/// A frame from a stream other than `authorization` yields no auth verdict.
#[test]
fn test_check_auth_response_non_auth_message() {
    const TRADE_UPDATES_FRAME: &str = r#"{"stream":"trade_updates","data":{}}"#;
    let outcome = check_auth_response(TRADE_UPDATES_FRAME);
    assert!(outcome.is_none());
}
/// Only a `listening` frame counts as a listen acknowledgement.
#[test]
fn test_check_listen_ack() {
    const LISTENING_FRAME: &str = r#"{"stream":"listening","data":{"streams":["trade_updates"]}}"#;
    const AUTHORIZATION_FRAME: &str = r#"{"stream":"authorization","data":{}}"#;

    let acked = check_listen_ack(LISTENING_FRAME);
    assert!(acked);

    let acked = check_listen_ack(AUTHORIZATION_FRAME);
    assert!(!acked);
}
/// The first sighting of a key is not a duplicate; the second sighting of the
/// same key is.
#[test]
fn test_dedup_cache() {
    let dedup = new_dedup_cache();
    let fill_key = SmolStr::from("order-1:1");

    let first_sighting = is_duplicate(&dedup, &fill_key);
    assert!(!first_sighting, "first time should not be duplicate");

    let second_sighting = is_duplicate(&dedup, &fill_key);
    assert!(second_sighting, "second time should be duplicate");
}
/// Walks an `ExponentialBackoff` through its first wait, exhaustion at
/// `MAX_RECONNECT_ATTEMPTS`, and recovery after `reset`.
#[tokio::test]
async fn test_exponential_backoff_progression_and_exhaustion() {
    // NOTE(review): presumably pauses Tokio's clock so the backoff sleeps do
    // not consume wall-clock time — confirm `wait` sleeps via tokio::time.
    tokio::time::pause();
    let mut backoff = ExponentialBackoff::new();

    // First attempt succeeds and bumps the counter.
    let first = backoff.wait().await;
    assert!(first, "first wait should return true");
    assert_eq!(backoff.attempt, 1);

    // Drain the remaining attempts until `wait` reports exhaustion.
    loop {
        if !backoff.wait().await {
            break;
        }
    }
    assert_eq!(backoff.attempt, MAX_RECONNECT_ATTEMPTS);

    // Once exhausted, further waits keep failing.
    let after_exhaustion = backoff.wait().await;
    assert!(!after_exhaustion, "exhausted backoff should return false");

    // Reset restores the initial state.
    backoff.reset();
    assert_eq!(backoff.attempt, 0);
    let after_reset = backoff.wait().await;
    assert!(after_reset, "wait should succeed after reset");
    assert_eq!(backoff.attempt, 1);
}
/// With an empty asset filter a single balance is returned: total equals the
/// account equity and free equals the (non-zero) options buying power.
#[test]
fn test_convert_account_to_balances_empty_assets() {
    let account = AlpacaAccount {
        equity: "12000.00".into(),
        buying_power: "10000.00".into(),
        options_buying_power: Some("8000.00".into()),
        crypto_buying_power: None,
    };

    let balances = convert_account_to_balances(&account, &[]);
    assert_eq!(balances.len(), 1);

    let only = &balances[0];
    let expected_total: Decimal = "12000.00".parse().unwrap();
    let expected_free: Decimal = "8000.00".parse().unwrap();
    assert_eq!(only.balance.total, expected_total);
    assert_eq!(only.balance.free, expected_free);
}
/// A `USD` filter keeps the account balance; a filter without `USD` drops it.
#[test]
fn test_convert_account_to_balances_usd_filter() {
    let account = AlpacaAccount {
        equity: "12000.00".into(),
        buying_power: "10000.00".into(),
        options_buying_power: None,
        crypto_buying_power: None,
    };

    // Filter that includes USD.
    let usd_filter = vec![AssetNameExchange::new("USD")];
    let with_usd = convert_account_to_balances(&account, &usd_filter);
    assert_eq!(with_usd.len(), 1);

    // Filter that excludes USD.
    let btc_filter = vec![AssetNameExchange::new("BTC")];
    let without_usd = convert_account_to_balances(&account, &btc_filter);
    assert!(without_usd.is_empty());
}
/// Slash-separated crypto pairs are rejected; plain equity tickers and
/// OCC-style option symbols are accepted.
#[test]
fn test_is_options_or_equity_symbol() {
    for crypto_pair in ["BTC/USD", "ETH/USD", "SOL/USD"] {
        assert!(!is_options_or_equity_symbol(crypto_pair));
    }
    for equity in ["AAPL", "SPY", "MSFT"] {
        assert!(is_options_or_equity_symbol(equity));
    }
    for option in ["SPY250418C00450000", "AAPL250418P00145000"] {
        assert!(is_options_or_equity_symbol(option));
    }
}
/// A 422 whose body says the order is already cancelled maps to
/// `ApiError::OrderAlreadyCancelled`.
#[test]
fn test_parse_order_error_already_cancelled() {
    let status = reqwest::StatusCode::UNPROCESSABLE_ENTITY;
    let parsed = parse_order_error(status, "order is already cancelled");
    assert!(matches!(
        parsed,
        UnindexedOrderError::Rejected(ApiError::OrderAlreadyCancelled)
    ));
}
/// When a 422 body mentions both "already cancelled" and "insufficient", the
/// already-cancelled classification takes precedence.
#[test]
fn test_parse_order_error_already_wins_over_insufficient_on_422() {
    let status = reqwest::StatusCode::UNPROCESSABLE_ENTITY;
    let parsed = parse_order_error(status, "order already cancelled due to insufficient margin");
    assert!(matches!(
        parsed,
        UnindexedOrderError::Rejected(ApiError::OrderAlreadyCancelled)
    ));
}
/// Builds a partially-filled day limit order fixture (qty 2 @ 100.00) for the
/// websocket conversion tests.
///
/// Only the id, symbol, side and cumulative `filled_qty` vary per test; the
/// returned value borrows `filled_qty` for `'a`.
fn make_order_ws<'a>(
    id: &str,
    symbol: &str,
    side: &str,
    filled_qty: &'a str,
) -> AlpacaOrderWs<'a> {
    // Caller-supplied identity fields.
    let order_id = SmolStr::from(id);
    let order_symbol = SmolStr::from(symbol);
    let order_side = SmolStr::from(side);

    AlpacaOrderWs {
        id: order_id,
        client_order_id: None,
        symbol: order_symbol,
        qty: Some("2"),
        filled_qty: Some(filled_qty),
        side: order_side,
        order_type: SmolStr::from("limit"),
        time_in_force: SmolStr::from("day"),
        limit_price: Some("100.00"),
        stop_price: None,
        trail_percent: None,
        trail_price: None,
        status: SmolStr::from("partially_filled"),
    }
}
/// A `fill` update converts to a `Trade` whose id is `<order id>:<filled qty>`
/// and whose price/quantity come from the update.
#[test]
fn test_convert_trade_update_fill_produces_trade_with_dedup_key() {
    let fill = AlpacaTradeUpdate {
        event: SmolStr::from("fill"),
        order: make_order_ws("ord-1", "SPY", "buy", "1"),
        price: Some("150.00"),
        qty: Some("1"),
        timestamp: Some("2025-04-18T14:30:00Z"),
    };

    let event = convert_trade_update(fill).expect("fill should produce an event");
    let trade = match event.kind {
        AccountEventKind::Trade(trade) => trade,
        other => panic!("expected Trade, got {:?}", other),
    };

    assert_eq!(trade.id.0.as_str(), "ord-1:1");
    let expected_price: Decimal = "150.00".parse().unwrap();
    let expected_quantity: Decimal = "1".parse().unwrap();
    assert_eq!(trade.price, expected_price);
    assert_eq!(trade.quantity, expected_quantity);
}
/// A `partial_fill` update (even without a timestamp) converts to a `Trade`.
#[test]
fn test_convert_trade_update_partial_fill() {
    let partial = AlpacaTradeUpdate {
        event: SmolStr::from("partial_fill"),
        order: make_order_ws("ord-2", "AAPL", "sell", "0.5"),
        price: Some("200.00"),
        qty: Some("0.5"),
        timestamp: None,
    };

    let event = convert_trade_update(partial).expect("partial_fill should produce an event");
    let is_trade = matches!(event.kind, AccountEventKind::Trade(_));
    assert!(is_trade);
}
/// A `new` update converts to an `OrderSnapshot` event.
#[test]
fn test_convert_trade_update_new_order_produces_snapshot() {
    // Freshly accepted, unfilled limit order.
    let new_order = AlpacaOrderWs {
        id: SmolStr::from("ord-new"),
        client_order_id: Some(SmolStr::from("cid-1")),
        symbol: SmolStr::from("AAPL"),
        qty: Some("10"),
        filled_qty: Some("0"),
        side: SmolStr::from("buy"),
        order_type: SmolStr::from("limit"),
        time_in_force: SmolStr::from("day"),
        limit_price: Some("150.00"),
        stop_price: None,
        trail_percent: None,
        trail_price: None,
        status: SmolStr::from("new"),
    };
    let update = AlpacaTradeUpdate {
        event: SmolStr::from("new"),
        order: new_order,
        price: None,
        qty: None,
        timestamp: Some("2025-04-18T14:30:00Z"),
    };

    let event = convert_trade_update(update).expect("new event should produce an event");
    let is_snapshot = matches!(event.kind, AccountEventKind::OrderSnapshot(_));
    assert!(is_snapshot);
}
/// A `canceled` update converts to `OrderCancelled` carrying an `Ok` state.
#[test]
fn test_convert_trade_update_canceled_produces_cancel() {
    let canceled = AlpacaTradeUpdate {
        event: SmolStr::from("canceled"),
        order: make_order_ws("ord-3", "AAPL", "sell", "0"),
        price: None,
        qty: None,
        timestamp: Some("2025-04-18T14:30:00Z"),
    };

    let event = convert_trade_update(canceled).expect("canceled should produce an event");
    let response = match event.kind {
        AccountEventKind::OrderCancelled(response) => response,
        other => panic!("expected OrderCancelled, got {:?}", other),
    };
    assert!(response.state.is_ok());
}
/// A `rejected` update converts to `OrderCancelled` carrying an `Err` state.
#[test]
fn test_convert_trade_update_rejected_produces_error() {
    let rejected = AlpacaTradeUpdate {
        event: SmolStr::from("rejected"),
        order: make_order_ws("ord-4", "SPY", "buy", "0"),
        price: None,
        qty: None,
        timestamp: None,
    };

    let event = convert_trade_update(rejected).expect("rejected should produce an event");
    let response = match event.kind {
        AccountEventKind::OrderCancelled(response) => response,
        other => panic!("expected OrderCancelled, got {:?}", other),
    };
    assert!(response.state.is_err());
}
/// An open order with no `qty` yields `None` from `convert_open_order`.
#[test]
fn test_convert_open_order_notional_qty_none_is_skipped() {
    let notional_order = AlpacaOrderResponse {
        id: String::from("ord-notional"),
        client_order_id: None,
        symbol: String::from("SPY"),
        // The missing quantity is the condition under test.
        qty: None,
        filled_qty: String::from("0"),
        side: String::from("buy"),
        order_type: String::from("market"),
        time_in_force: String::from("day"),
        limit_price: None,
        stop_price: None,
        trail_percent: None,
        trail_price: None,
        created_at: String::from("2025-04-18T14:30:00Z"),
    };

    let converted = convert_open_order(&notional_order);
    assert!(converted.is_none());
}
/// An activity whose price cannot be parsed yields `None` instead of a trade.
#[test]
fn test_convert_activity_to_trade_bad_price_returns_none() {
    let malformed = AlpacaActivity {
        id: String::from("act-1"),
        order_id: String::from("ord-1"),
        symbol: String::from("SPY250418C00450000"),
        side: String::from("buy"),
        // Deliberately non-numeric.
        price: String::from("not-a-number"),
        qty: String::from("1"),
        transaction_time: String::from("2025-04-18T14:30:00Z"),
    };

    let converted = convert_activity_to_trade(&malformed);
    assert!(converted.is_none());
}
/// Only crypto positions become balances (keyed by the lower-cased base
/// asset), and an asset filter narrows the result further.
#[test]
fn test_convert_positions_to_balances_crypto() {
    let positions = vec![
        AlpacaPosition {
            symbol: "BTC/USD".into(),
            asset_class: "crypto".into(),
            qty: "0.5".into(),
            qty_available: "0.4".into(),
        },
        AlpacaPosition {
            symbol: "ETH/USD".into(),
            asset_class: "crypto".into(),
            qty: "2.0".into(),
            qty_available: "2.0".into(),
        },
        // Equity position: expected to be excluded.
        AlpacaPosition {
            symbol: "AAPL".into(),
            asset_class: "us_equity".into(),
            qty: "10".into(),
            qty_available: "10".into(),
        },
    ];

    // No filter: both crypto positions, BTC first.
    let unfiltered = convert_positions_to_balances(&positions, &[]);
    assert_eq!(unfiltered.len(), 2, "only crypto positions returned");
    let btc = &unfiltered[0];
    assert_eq!(btc.asset.name().as_str(), "btc");
    let expected_total: Decimal = "0.5".parse().unwrap();
    let expected_free: Decimal = "0.4".parse().unwrap();
    assert_eq!(btc.balance.total, expected_total);
    assert_eq!(btc.balance.free, expected_free);

    // BTC-only filter.
    let btc_filter = vec![AssetNameExchange::new("BTC")];
    let filtered = convert_positions_to_balances(&positions, &btc_filter);
    assert_eq!(filtered.len(), 1);
    assert_eq!(filtered[0].asset.name().as_str(), "btc");
}
/// Builds an unfilled buy-side day limit order fixture (qty 1 @ 100.00) with
/// the given id and symbol, for the snapshot-building tests.
fn make_order_response(id: &str, symbol: &str) -> AlpacaOrderResponse {
    let order_id = String::from(id);
    let order_symbol = String::from(symbol);

    AlpacaOrderResponse {
        id: order_id,
        client_order_id: None,
        symbol: order_symbol,
        qty: Some(String::from("1")),
        filled_qty: String::from("0"),
        side: String::from("buy"),
        order_type: String::from("limit"),
        time_in_force: String::from("day"),
        limit_price: Some(String::from("100.00")),
        stop_price: None,
        trail_percent: None,
        trail_price: None,
        created_at: String::from("2025-04-18T14:30:00Z"),
    }
}
/// With no instrument filter, one snapshot is produced per instrument that
/// has at least one order.
#[test]
fn test_build_instrument_snapshots_empty_instruments_returns_only_with_orders() {
    let open_orders = vec![
        make_order_response("o1", "AAPL"),
        make_order_response("o2", "SPY"),
    ];

    let snapshots = build_instrument_snapshots(open_orders, &[]);
    assert_eq!(snapshots.len(), 2);

    for expected in ["AAPL", "SPY"] {
        let present = snapshots
            .iter()
            .any(|snapshot| snapshot.instrument.name().as_str() == expected);
        assert!(present);
    }
}
/// A requested instrument with no open orders still receives a snapshot, with
/// an empty order list.
#[test]
fn test_build_instrument_snapshots_requested_instrument_no_orders_gets_empty_snapshot() {
    let open_orders = vec![make_order_response("o1", "AAPL")];
    let requested = vec![
        InstrumentNameExchange::new("AAPL"),
        InstrumentNameExchange::new("SPY"),
    ];

    let snapshots = build_instrument_snapshots(open_orders, &requested);
    assert_eq!(snapshots.len(), 2);

    let spy_snapshot = snapshots
        .iter()
        .find(|snapshot| snapshot.instrument.name().as_str() == "SPY");
    let spy_snapshot = spy_snapshot.expect("SPY snapshot must be present even with no orders");
    assert!(spy_snapshot.orders.is_empty());
}
/// Orders for instruments outside the requested set are dropped from the
/// snapshots.
#[test]
fn test_build_instrument_snapshots_non_requested_instrument_excluded() {
    let open_orders = vec![
        make_order_response("o1", "AAPL"),
        // MSFT is not requested below.
        make_order_response("o2", "MSFT"),
    ];
    let requested = vec![InstrumentNameExchange::new("AAPL")];

    let snapshots = build_instrument_snapshots(open_orders, &requested);

    assert_eq!(snapshots.len(), 1);
    let only = &snapshots[0];
    assert_eq!(only.instrument.name().as_str(), "AAPL");
}
/// The dedup keys derived from websocket partial fills must equal the keys
/// built from per-execution quantities accumulated as
/// `<order id>:<normalized cumulative qty>`.
#[test]
fn test_recover_fills_dedup_key_matches_ws_path() {
    let order_id = "ord-1";

    // Websocket path: each update carries the cumulative filled quantity.
    let mut ws_keys: Vec<SmolStr> = Vec::new();
    for filled_qty in ["1", "2"] {
        let update = AlpacaTradeUpdate {
            event: SmolStr::new("partial_fill"),
            order: make_order_ws(order_id, "SPY", "buy", filled_qty),
            price: Some("150.00"),
            qty: Some("1"),
            timestamp: None,
        };
        let Some(event) = convert_trade_update(update) else {
            continue;
        };
        if let Some(key) = fill_dedup_key_from_event(&event) {
            ws_keys.push(key.clone());
        }
    }

    // REST-style path: per-execution quantities are summed into a running total.
    let mut running_total = Decimal::ZERO;
    let mut rest_keys: Vec<SmolStr> = Vec::new();
    for exec_qty in ["1", "1"] {
        running_total += Decimal::from_str(exec_qty).unwrap();
        rest_keys.push(format_smolstr!(
            "{}:{}",
            order_id,
            running_total.normalize()
        ));
    }

    assert_eq!(
        ws_keys, rest_keys,
        "REST recovery dedup keys must match WS path keys for cross-source dedup to work"
    );
    assert_eq!(ws_keys[0].as_str(), "ord-1:1");
    assert_eq!(ws_keys[1].as_str(), "ord-1:2");
}
/// The key computed directly from a raw update must equal the key extracted
/// from the fully converted event.
#[test]
fn early_dedup_key_matches_full_event_path() {
    let fill = AlpacaTradeUpdate {
        event: SmolStr::from("fill"),
        order: make_order_ws("ord-abc", "SPY", "buy", "5"),
        price: Some("150.00"),
        qty: Some("5"),
        timestamp: None,
    };

    // The early key must be taken first: conversion consumes the update.
    let from_raw_update = early_dedup_key(&fill);

    let event = convert_trade_update(fill).expect("fill should produce an event");
    let from_event =
        fill_dedup_key_from_event(&event).expect("fill event should have a dedup key");

    assert_eq!(
        from_raw_update.as_str(),
        from_event.as_str(),
        "early_dedup_key must produce the same key as the full event path"
    );
    assert_eq!(from_raw_update.as_str(), "ord-abc:5");
}
/// Different textual spellings of the same filled quantity ("1.00", "1",
/// "1.0") must all produce the identical dedup key `ord-x:1`.
#[test]
fn early_dedup_key_normalizes_decimal_strings() {
    for filled_qty in ["1.00", "1", "1.0"] {
        // Identical fill each time; only the spelling of `filled_qty` changes.
        let order = AlpacaOrderWs {
            id: SmolStr::new("ord-x"),
            client_order_id: Some(SmolStr::new("cid")),
            symbol: SmolStr::new("AAPL"),
            qty: Some("10"),
            filled_qty: Some(filled_qty),
            side: SmolStr::new("buy"),
            order_type: SmolStr::new("market"),
            time_in_force: SmolStr::new("day"),
            limit_price: None,
            stop_price: None,
            trail_percent: None,
            trail_price: None,
            status: SmolStr::new("filled"),
        };
        let update = AlpacaTradeUpdate {
            event: SmolStr::new("fill"),
            order,
            price: Some("100.00"),
            qty: Some("10"),
            timestamp: None,
        };

        assert_eq!(early_dedup_key(&update).as_str(), "ord-x:1");
    }
}
/// A `rejected` trade update whose order payload has no `filled_qty` must
/// still be forwarded on the channel as an `OrderCancelled` event.
#[test]
fn process_ws_text_rejected_event_without_filled_qty_is_not_dropped() {
    // Note: the order object below intentionally has no `filled_qty` key.
    let frame = r#"{"stream":"trade_updates","data":{"event":"rejected","order":{"id":"test-rej-id","client_order_id":"test-cid","symbol":"AAPL","qty":"10","side":"buy","type":"limit","time_in_force":"day","limit_price":"100.00","status":"rejected"}}}"#;

    let (sender, mut receiver) = mpsc::unbounded_channel();
    let dedup_cache = new_dedup_cache();
    let mut reconnect_backoff = ExponentialBackoff::new();

    process_ws_text(frame, &sender, &dedup_cache, &mut reconnect_backoff);

    let forwarded = receiver.try_recv()
        .expect("rejected event without filled_qty must produce an AccountEvent, not be silently dropped");
    let is_cancel = matches!(forwarded.kind, AccountEventKind::OrderCancelled(_));
    assert!(
        is_cancel,
        "rejected event must map to OrderCancelled, got: {:?}",
        forwarded.kind
    );
}
/// Pins the textual form of a normalized zero `Decimal` to `"0"`.
#[test]
fn decimal_zero_normalize_is_zero_str() {
    let normalized_zero = Decimal::ZERO.normalize();
    let rendered = normalized_zero.to_string();
    assert_eq!(rendered, "0");
}
/// Pins `Decimal::normalize` behaviour: trailing fractional zeros are removed,
/// so differently spelled equal values render identically.
#[test]
fn decimal_normalize_strips_trailing_zeros() {
    // Parse, normalize and render a decimal literal.
    let render = |raw: &str| Decimal::from_str(raw).unwrap().normalize().to_string();

    // "1.00" and "1" render the same after normalization.
    let from_rest = render("1.00");
    let from_ws = render("1");
    assert_eq!(from_rest, from_ws);
    assert_eq!(from_rest, "1");

    for (raw, expected) in [("100.000", "100"), ("0.10", "0.1"), ("0.100", "0.1")] {
        assert_eq!(render(raw), expected);
    }
}
/// When `options_buying_power` is present but zero, the free balance must be
/// taken from `buying_power` instead.
#[test]
fn convert_account_to_balances_zero_options_buying_power_falls_back_to_buying_power() {
    let account = AlpacaAccount {
        equity: "12000.00".into(),
        buying_power: "10000.00".into(),
        // Present but zero: the case under test.
        options_buying_power: Some("0.00".into()),
        crypto_buying_power: None,
    };

    let balances = convert_account_to_balances(&account, &[]);
    assert_eq!(balances.len(), 1);

    let expected_free: Decimal = "10000.00".parse().unwrap();
    assert_eq!(
        balances[0].balance.free, expected_free,
        "zero options_buying_power must fall back to buying_power, not report free=0"
    );
}
/// Buy side with the flag unset maps to `BuyToOpen`.
#[test]
fn map_position_intent_open_buy_maps_to_buy_to_open() {
    let intent = map_position_intent(Side::Buy, false);
    assert_eq!(intent, AlpacaPositionIntent::BuyToOpen);
}
/// Sell side with the flag unset maps to `SellToOpen`.
#[test]
fn map_position_intent_open_sell_maps_to_sell_to_open() {
    let intent = map_position_intent(Side::Sell, false);
    assert_eq!(intent, AlpacaPositionIntent::SellToOpen);
}
/// Buy side with the flag set maps to `BuyToClose`.
#[test]
fn map_position_intent_reduce_buy_maps_to_buy_to_close() {
    let intent = map_position_intent(Side::Buy, true);
    assert_eq!(intent, AlpacaPositionIntent::BuyToClose);
}
/// Sell side with the flag set maps to `SellToClose`.
#[test]
fn map_position_intent_reduce_sell_maps_to_sell_to_close() {
    let intent = map_position_intent(Side::Sell, true);
    assert_eq!(intent, AlpacaPositionIntent::SellToClose);
}
/// HTTP 401 is classified as `ApiError::Unauthenticated`.
#[test]
fn parse_order_error_401_maps_to_unauthenticated() {
    let parsed = parse_order_error(reqwest::StatusCode::UNAUTHORIZED, "bad credentials");
    assert!(matches!(
        parsed,
        UnindexedOrderError::Rejected(ApiError::Unauthenticated(_))
    ));
}
/// HTTP 403 is classified as `ApiError::Unauthenticated`.
#[test]
fn parse_order_error_403_maps_to_unauthenticated() {
    let parsed = parse_order_error(reqwest::StatusCode::FORBIDDEN, "account suspended");
    assert!(matches!(
        parsed,
        UnindexedOrderError::Rejected(ApiError::Unauthenticated(_))
    ));
}
/// HTTP 404 is classified as `ApiError::OrderRejected`, and the rejection
/// message contains the original "order not found" body text.
#[test]
fn parse_order_error_404_maps_to_order_rejected_with_not_found_prefix() {
    let msg = match parse_order_error(reqwest::StatusCode::NOT_FOUND, "order not found") {
        UnindexedOrderError::Rejected(ApiError::OrderRejected(msg)) => msg,
        err => panic!("expected OrderRejected, got {err:?}"),
    };

    let mentions_body = msg.contains("order not found");
    assert!(
        mentions_body,
        "message should contain 'order not found': {msg}"
    );
}
/// A 422 whose body mentions only "insufficient" is classified as
/// `ApiError::BalanceInsufficient`.
#[test]
fn parse_order_error_422_insufficient_only_maps_to_balance_insufficient() {
    let status = reqwest::StatusCode::UNPROCESSABLE_ENTITY;
    let parsed = parse_order_error(status, "insufficient funds for this order");
    assert!(matches!(
        parsed,
        UnindexedOrderError::Rejected(ApiError::BalanceInsufficient(_, _))
    ));
}
/// HTTP 429 is classified as `ApiError::RateLimit`.
#[test]
fn parse_order_error_429_maps_to_rate_limit() {
    let parsed = parse_order_error(reqwest::StatusCode::TOO_MANY_REQUESTS, "rate limited");
    assert!(matches!(
        parsed,
        UnindexedOrderError::Rejected(ApiError::RateLimit)
    ));
}
/// A time-in-force string the parser does not map ("opg" here) falls back to
/// `TimeInForce::GoodUntilEndOfDay`.
#[test]
fn parse_time_in_force_unknown_value_falls_back_to_good_until_end_of_day() {
    let parsed = parse_time_in_force("opg");
    assert_eq!(
        parsed,
        TimeInForce::GoodUntilEndOfDay,
        "unknown TIF must fall back to GoodUntilEndOfDay (with a warn! in production)"
    );
}
/// Snapshots come back in the order of the requested instruments slice, not
/// in the order the orders were supplied.
#[test]
fn build_instrument_snapshots_output_order_matches_instruments_slice() {
    // Orders arrive as SPY, AAPL, MSFT ...
    let open_orders = vec![
        make_order_response("o1", "SPY"),
        make_order_response("o2", "AAPL"),
        make_order_response("o3", "MSFT"),
    ];
    // ... but the request asks for MSFT first, then AAPL.
    let requested = vec![
        InstrumentNameExchange::new("MSFT"),
        InstrumentNameExchange::new("AAPL"),
    ];

    let snapshots = build_instrument_snapshots(open_orders, &requested);
    assert_eq!(snapshots.len(), 2);

    let first = snapshots[0].instrument.name().as_str();
    assert_eq!(
        first, "MSFT",
        "first snapshot must be MSFT (first in instruments slice)"
    );
    let second = snapshots[1].instrument.name().as_str();
    assert_eq!(
        second, "AAPL",
        "second snapshot must be AAPL (second in instruments slice)"
    );
}
mod http_tests {
use super::super::*;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};
/// Mock responder that serves a fixed list of JSON bodies, one per request,
/// in order.
struct Sequential {
    /// Number of requests answered so far; used as the index of the next page.
    call: std::sync::atomic::AtomicU32,
    /// JSON bodies to return, in request order.
    pages: Vec<serde_json::Value>,
}

impl Sequential {
    /// Creates a responder that will serve `pages` starting from the first.
    fn new(pages: Vec<serde_json::Value>) -> Self {
        let call = std::sync::atomic::AtomicU32::new(0);
        Self { call, pages }
    }
}

impl Respond for Sequential {
    /// Returns HTTP 200 with the next configured page.
    ///
    /// # Panics
    /// Panics when more requests arrive than pages were configured.
    fn respond(&self, _: &Request) -> ResponseTemplate {
        // Claim this request's slot and advance the counter in one step.
        let i = self.call.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as usize;
        match self.pages.get(i) {
            Some(page) => ResponseTemplate::new(200).set_body_json(page),
            None => panic!(
                "Sequential: request #{i} has no configured response \
                 (only {} page(s) supplied)",
                self.pages.len()
            ),
        }
    }
}
/// Builds a JSON array of `count` identical SPY buy activities whose ids are
/// `<id_prefix>-NNNNN` (zero-padded index).
fn make_activities_json(count: usize, id_prefix: &str) -> serde_json::Value {
    let mut activities = Vec::with_capacity(count);
    for i in 0..count {
        activities.push(serde_json::json!({
            "id": format!("{id_prefix}-{i:05}"),
            "order_id": "ord-1",
            "symbol": "SPY",
            "side": "buy",
            "price": "100.00",
            "qty": "1",
            "transaction_time": "2025-04-18T14:30:00Z"
        }));
    }
    serde_json::Value::Array(activities)
}
/// Builds a JSON array of `count` identical unfilled SPY limit orders whose
/// ids are `order-NNNNN` (zero-padded index).
fn make_orders_json(count: usize) -> serde_json::Value {
    let mut orders = Vec::with_capacity(count);
    for i in 0..count {
        orders.push(serde_json::json!({
            "id": format!("order-{i:05}"),
            "client_order_id": null,
            "symbol": "SPY",
            "qty": "1",
            "filled_qty": "0",
            "side": "buy",
            "type": "limit",
            "time_in_force": "day",
            "limit_price": "100.00",
            "created_at": "2025-04-18T14:30:00Z"
        }));
    }
    serde_json::Value::Array(orders)
}
/// A single short page (5 items) is returned whole, is not flagged as
/// truncated, and costs exactly one request.
#[tokio::test]
async fn paginate_activities_single_page_below_max_returns_all_not_truncated() {
    let server = MockServer::start().await;
    let short_page = ResponseTemplate::new(200).set_body_json(make_activities_json(5, "act"));
    Mock::given(method("GET"))
        .and(path("/v2/account/activities"))
        .respond_with(short_page)
        .mount(&server)
        .await;

    let http = reqwest::Client::new();
    let rate_limits = RateLimitTracker::new();
    let base_url = server.uri();
    let page = paginate_activities(&http, &rate_limits, &base_url, "2025-01-01T00:00:00Z")
        .await
        .unwrap();

    assert_eq!(page.activities.len(), 5);
    assert!(!page.truncated);

    let request_count = server.received_requests().await.unwrap().len();
    assert_eq!(request_count, 1);
}
/// A first page of exactly `ALPACA_MAX_ACTIVITIES` items forces a second
/// request; an empty second page ends pagination without truncation.
#[tokio::test]
async fn paginate_activities_exactly_page_size_items_fetches_second_page() {
    let server = MockServer::start().await;
    let pages = vec![
        make_activities_json(ALPACA_MAX_ACTIVITIES, "act"),
        // Empty follow-up page.
        serde_json::json!([]),
    ];
    Mock::given(method("GET"))
        .and(path("/v2/account/activities"))
        .respond_with(Sequential::new(pages))
        .mount(&server)
        .await;

    let http = reqwest::Client::new();
    let rate_limits = RateLimitTracker::new();
    let base_url = server.uri();
    let page = paginate_activities(&http, &rate_limits, &base_url, "2025-01-01T00:00:00Z")
        .await
        .unwrap();

    assert_eq!(page.activities.len(), ALPACA_MAX_ACTIVITIES);
    assert!(!page.truncated);

    let request_count = server.received_requests().await.unwrap().len();
    assert_eq!(
        request_count, 2,
        "exactly 2 requests: first full page + second empty page"
    );
}
/// A full page followed by a short page (37 items) yields the combined
/// activities, no truncation flag, and exactly two requests.
#[tokio::test]
async fn paginate_activities_two_pages_returns_combined_activities_not_truncated() {
    let server = MockServer::start().await;
    let pages = vec![
        make_activities_json(ALPACA_MAX_ACTIVITIES, "p1"),
        make_activities_json(37, "p2"),
    ];
    Mock::given(method("GET"))
        .and(path("/v2/account/activities"))
        .respond_with(Sequential::new(pages))
        .mount(&server)
        .await;

    let http = reqwest::Client::new();
    let rate_limits = RateLimitTracker::new();
    let base_url = server.uri();
    let page = paginate_activities(&http, &rate_limits, &base_url, "2025-01-01T00:00:00Z")
        .await
        .unwrap();

    assert_eq!(page.activities.len(), ALPACA_MAX_ACTIVITIES + 37);
    assert!(!page.truncated);

    let request_count = server.received_requests().await.unwrap().len();
    assert_eq!(request_count, 2);
}
/// When every page is full, pagination stops after `MAX_ACTIVITY_PAGES`
/// requests, keeps everything fetched so far, and flags the result truncated.
#[tokio::test]
async fn paginate_activities_at_max_pages_sets_truncated_true() {
    let server = MockServer::start().await;
    // The same full page is served for every request, so the data never runs out.
    let full_page =
        ResponseTemplate::new(200).set_body_json(make_activities_json(ALPACA_MAX_ACTIVITIES, "act"));
    Mock::given(method("GET"))
        .and(path("/v2/account/activities"))
        .respond_with(full_page)
        .mount(&server)
        .await;

    let http = reqwest::Client::new();
    let rate_limits = RateLimitTracker::new();
    let base_url = server.uri();
    let page = paginate_activities(&http, &rate_limits, &base_url, "2025-01-01T00:00:00Z")
        .await
        .unwrap();

    assert!(
        page.truncated,
        "must be truncated after MAX_ACTIVITY_PAGES pages"
    );
    assert_eq!(
        page.activities.len(),
        MAX_ACTIVITY_PAGES * ALPACA_MAX_ACTIVITIES,
        "must accumulate exactly MAX_ACTIVITY_PAGES * page_size activities"
    );

    let request_count = server.received_requests().await.unwrap().len();
    assert_eq!(
        request_count, MAX_ACTIVITY_PAGES,
        "loop must issue exactly MAX_ACTIVITY_PAGES requests then stop"
    );
}
/// One order fewer than `MAX_OPEN_ORDERS` is returned successfully and in full.
#[tokio::test]
async fn fetch_raw_open_orders_499_results_returns_ok() {
    let expected_count = MAX_OPEN_ORDERS - 1;

    let server = MockServer::start().await;
    let below_limit = ResponseTemplate::new(200).set_body_json(make_orders_json(expected_count));
    Mock::given(method("GET"))
        .and(path("/v2/orders"))
        .respond_with(below_limit)
        .mount(&server)
        .await;

    let http = reqwest::Client::new();
    let rate_limits = RateLimitTracker::new();
    let base_url = server.uri();
    let result = fetch_raw_open_orders(&http, &rate_limits, &base_url, &[]).await;

    assert!(
        result.is_ok(),
        "499 orders must not trigger truncation: {result:?}"
    );
    assert_eq!(result.unwrap().len(), expected_count);
}
/// A response with exactly `MAX_OPEN_ORDERS` orders is reported as
/// `TruncatedSnapshot` carrying that limit.
#[tokio::test]
async fn fetch_raw_open_orders_500_results_returns_truncated_snapshot_error() {
    let server = MockServer::start().await;
    let at_limit = ResponseTemplate::new(200).set_body_json(make_orders_json(MAX_OPEN_ORDERS));
    Mock::given(method("GET"))
        .and(path("/v2/orders"))
        .respond_with(at_limit)
        .mount(&server)
        .await;

    let http = reqwest::Client::new();
    let rate_limits = RateLimitTracker::new();
    let base_url = server.uri();
    let result = fetch_raw_open_orders(&http, &rate_limits, &base_url, &[]).await;

    let is_truncated = matches!(
        result,
        Err(UnindexedClientError::TruncatedSnapshot { limit }) if limit == MAX_OPEN_ORDERS
    );
    assert!(
        is_truncated,
        "500 orders must return TruncatedSnapshot, got: {result:?}"
    );
}
#[tokio::test]
async fn open_order_reduce_only_sell_sends_sell_to_close_intent() {
use crate::client::ExecutionClient;
use crate::order::request::{OrderRequestOpen, RequestOpen};
use crate::order::{
OrderKey, OrderKind, TimeInForce,
id::{ClientOrderId, StrategyId},
};
use rust_decimal::Decimal;
use rustrade_instrument::Side;
use rustrade_instrument::exchange::ExchangeId;
use rustrade_instrument::instrument::name::InstrumentNameExchange;
use wiremock::matchers::{method, path};
let server = MockServer::start().await;
let captured_body = std::sync::Arc::new(parking_lot::Mutex::new(None));
let captured_clone = captured_body.clone();
Mock::given(method("POST"))
.and(path("/v2/orders"))
.respond_with(move |req: &Request| {
let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap();
*captured_clone.lock() = Some(body);
ResponseTemplate::new(200).set_body_json(serde_json::json!({
"id": "test-order-id",
"client_order_id": "test-cid",
"symbol": "AAPL",
"qty": "10",
"filled_qty": "0",
"side": "sell",
"type": "market",
"time_in_force": "ioc",
"limit_price": null,
"created_at": "2025-04-18T14:30:00Z"
}))
})
.mount(&server)
.await;
let config =
AlpacaConfig::with_base_url("test-key".into(), "test-secret".into(), server.uri());
let client = AlpacaClient::new(config);
let request = OrderRequestOpen {
key: OrderKey {
exchange: ExchangeId::AlpacaBroker,
instrument: InstrumentNameExchange::new("AAPL"),
strategy: StrategyId::new("test-strategy"),
cid: ClientOrderId::new("test-cid"),
},
state: RequestOpen {
side: Side::Sell,
price: None,
quantity: Decimal::new(10, 0),
kind: OrderKind::Market,
time_in_force: TimeInForce::ImmediateOrCancel,
position_id: None,
reduce_only: true, },
};
let result = client
.open_order(OrderRequestOpen {
key: OrderKey {
exchange: request.key.exchange,
instrument: &request.key.instrument,
strategy: request.key.strategy.clone(),
cid: request.key.cid.clone(),
},
state: request.state.clone(),
})
.await;
assert!(result.is_some(), "open_order should return a result");
let order = result.unwrap();
assert!(
order.state.is_accepted(),
"order should be accepted: {:?}",
order.state
);
let body = captured_body
.lock()
.take()
.expect("request body should be captured");
assert_eq!(
body.get("position_intent").and_then(|v| v.as_str()),
Some("sell_to_close"),
"reduce_only=true + Side::Sell should produce position_intent=sell_to_close, got: {body}"
);
}
/// A non-reduce-only market buy must be posted to `/v2/orders` with
/// `position_intent = "buy_to_open"`.
///
/// The mock server captures the JSON body of the POST so the test can inspect
/// what the client actually sent.
#[tokio::test]
async fn open_order_not_reduce_only_buy_sends_buy_to_open_intent() {
    use crate::client::ExecutionClient;
    use crate::order::request::{OrderRequestOpen, RequestOpen};
    use crate::order::{
        OrderKey, OrderKind, TimeInForce,
        id::{ClientOrderId, StrategyId},
    };
    use rust_decimal::Decimal;
    use rustrade_instrument::Side;
    use rustrade_instrument::exchange::ExchangeId;
    use rustrade_instrument::instrument::name::InstrumentNameExchange;

    let server = MockServer::start().await;

    // Shared slot the responder writes the request body into.
    let captured_body = std::sync::Arc::new(parking_lot::Mutex::new(None));
    let captured_clone = captured_body.clone();
    Mock::given(method("POST"))
        .and(path("/v2/orders"))
        .respond_with(move |req: &Request| {
            let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap();
            *captured_clone.lock() = Some(body);
            ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": "test-order-id",
                "client_order_id": "test-cid",
                "symbol": "AAPL",
                "qty": "10",
                "filled_qty": "0",
                "side": "buy",
                "type": "market",
                "time_in_force": "ioc",
                "limit_price": null,
                "created_at": "2025-04-18T14:30:00Z"
            }))
        })
        .mount(&server)
        .await;

    let config =
        AlpacaConfig::with_base_url("test-key".into(), "test-secret".into(), server.uri());
    let client = AlpacaClient::new(config);

    let instrument = InstrumentNameExchange::new("AAPL");
    let request = OrderRequestOpen {
        key: OrderKey {
            exchange: ExchangeId::AlpacaBroker,
            instrument: &instrument,
            strategy: StrategyId::new("test-strategy"),
            cid: ClientOrderId::new("test-cid"),
        },
        state: RequestOpen {
            side: Side::Buy,
            price: None,
            quantity: Decimal::new(10, 0),
            kind: OrderKind::Market,
            time_in_force: TimeInForce::ImmediateOrCancel,
            position_id: None,
            // The flag under test.
            reduce_only: false,
        },
    };

    // `expect` replaces the separate `is_some()` assertion + `unwrap()`.
    let order = client
        .open_order(request)
        .await
        .expect("open_order should return a result");
    assert!(
        order.state.is_accepted(),
        "order should be accepted: {:?}",
        order.state
    );

    let body = captured_body
        .lock()
        .take()
        .expect("request body should be captured");
    assert_eq!(
        body.get("position_intent").and_then(|v| v.as_str()),
        Some("buy_to_open"),
        "reduce_only=false + Side::Buy should produce position_intent=buy_to_open, got: {body}"
    );
}
}
}