#[cfg(any(feature = "async", feature = "blocking"))]
mod client;
mod constants;
mod convert;
pub mod decode;
mod payload;
mod wire;
use std::num::NonZeroU32;
use std::time::{Duration, Instant};
#[cfg(any(feature = "async", feature = "blocking"))]
use reqwest::{StatusCode, header::HeaderMap};
use thiserror::Error;
use crate::models::{
BoardId, EngineName, IndexId, MarketName, ParseBoardError, ParseCandleBorderError,
ParseCandleError, ParseEngineError, ParseEventError, ParseHistoryDatesError,
ParseHistoryRecordError, ParseIndexAnalyticsError, ParseIndexError, ParseMarketError,
ParseOrderbookError, ParseSecStatError, ParseSecurityBoardError, ParseSecurityError,
ParseSecuritySnapshotError, ParseSiteNewsError, ParseTradeError, ParseTurnoverError, SecId,
};
#[cfg(all(feature = "async", feature = "history"))]
pub use client::AsyncHistoryPages;
#[cfg(all(feature = "blocking", feature = "history"))]
pub use client::HistoryPages;
#[cfg(feature = "async")]
pub use client::{
AsyncCandlesPages, AsyncGlobalSecuritiesPages, AsyncIndexAnalyticsPages,
AsyncMarketSecuritiesPages, AsyncMarketTradesPages, AsyncMoexClient, AsyncMoexClientBuilder,
AsyncOwnedBoardScope, AsyncOwnedEngineScope, AsyncOwnedIndexScope, AsyncOwnedMarketScope,
AsyncOwnedMarketSecurityScope, AsyncOwnedSecurityResourceScope, AsyncOwnedSecurityScope,
AsyncRawIssRequestBuilder, AsyncSecStatsPages, AsyncSecuritiesPages, AsyncTradesPages,
};
#[cfg(all(feature = "async", feature = "news"))]
pub use client::{AsyncEventsPages, AsyncSiteNewsPages};
#[cfg(feature = "blocking")]
pub use client::{
CandlesPages, GlobalSecuritiesPages, IndexAnalyticsPages, MarketSecuritiesPages,
MarketTradesPages, OwnedBoardScope, OwnedEngineScope, OwnedIndexScope, OwnedMarketScope,
OwnedMarketSecurityScope, OwnedSecurityResourceScope, OwnedSecurityScope, RawIssRequestBuilder,
SecStatsPages, SecuritiesPages, TradesPages,
};
#[cfg(all(feature = "blocking", feature = "news"))]
pub use client::{EventsPages, SiteNewsPages};
/// Alias that exposes `client::BlockingMoexClient` under this module's path.
///
/// Compiled only when the `blocking` feature is enabled.
#[cfg(feature = "blocking")]
pub type BlockingMoexClient = client::BlockingMoexClient;
/// Alias that exposes `client::BlockingMoexClientBuilder` under this module's path.
///
/// Compiled only when the `blocking` feature is enabled.
#[cfg(feature = "blocking")]
pub type BlockingMoexClientBuilder = client::BlockingMoexClientBuilder;
/// Identifies one ISS endpoint together with the borrowed identifiers needed to build its path.
///
/// Every payload is a shared reference with lifetime `'a`, which is why the enum can be `Copy`.
/// [`IssEndpoint::path`] turns a variant into its request path and
/// [`IssEndpoint::default_table`] yields the table name(s) associated with it; the
/// per-variant notes below summarise both mappings.
#[derive(Debug, Clone, Copy)]
pub enum IssEndpoint<'a> {
/// Path: `constants::INDEXES_ENDPOINT`; default table: `indices`.
Indexes,
/// Path: `constants::index_analytics_endpoint(indexid)`; default table: `analytics`.
IndexAnalytics { indexid: &'a IndexId },
/// Path: `constants::TURNOVERS_ENDPOINT`; default table: `turnovers`.
Turnovers,
/// Path: `constants::engine_turnovers_endpoint(engine)`; default table: `turnovers`.
EngineTurnovers { engine: &'a EngineName },
/// Path: `constants::ENGINES_ENDPOINT`; default table: `engines`.
Engines,
/// Path: `constants::markets_endpoint(engine)`; default table: `markets`.
Markets { engine: &'a EngineName },
/// Path: `constants::boards_endpoint(engine, market)`; default table: `boards`.
Boards {
engine: &'a EngineName,
market: &'a MarketName,
},
/// Path: `constants::GLOBAL_SECURITIES_ENDPOINT`; default table: `securities`.
GlobalSecurities,
/// Path: `constants::security_endpoint(security)`; default table: `securities`.
SecurityInfo { security: &'a SecId },
/// Same path as [`IssEndpoint::SecurityInfo`], but the default table is `boards`.
SecurityBoards { security: &'a SecId },
/// Path: `constants::market_securities_endpoint(engine, market)`; default table: `securities`.
MarketSecurities {
engine: &'a EngineName,
market: &'a MarketName,
},
/// Path: `constants::market_security_endpoint(engine, market, security)`;
/// default table: `securities`.
MarketSecurityInfo {
engine: &'a EngineName,
market: &'a MarketName,
security: &'a SecId,
},
/// Path: `constants::market_orderbook_endpoint(engine, market)`; default table: `orderbook`.
MarketOrderbook {
engine: &'a EngineName,
market: &'a MarketName,
},
/// Path: `constants::market_trades_endpoint(engine, market)`; default table: `trades`.
MarketTrades {
engine: &'a EngineName,
market: &'a MarketName,
},
/// Path: `constants::secstats_endpoint(engine, market)`; default table: `secstats`.
SecStats {
engine: &'a EngineName,
market: &'a MarketName,
},
/// Path: `constants::securities_endpoint(engine, market, board)`; default table: `securities`.
Securities {
engine: &'a EngineName,
market: &'a MarketName,
board: &'a BoardId,
},
/// Same path as [`IssEndpoint::Securities`], but the default table list is
/// `securities,marketdata`.
BoardSecuritySnapshots {
engine: &'a EngineName,
market: &'a MarketName,
board: &'a BoardId,
},
/// Path: `constants::orderbook_endpoint(engine, market, board, security)`;
/// default table: `orderbook`.
Orderbook {
engine: &'a EngineName,
market: &'a MarketName,
board: &'a BoardId,
security: &'a SecId,
},
/// Path: `constants::trades_endpoint(engine, market, board, security)`;
/// default table: `trades`.
Trades {
engine: &'a EngineName,
market: &'a MarketName,
board: &'a BoardId,
security: &'a SecId,
},
/// Path: `constants::candles_endpoint(engine, market, board, security)`;
/// default table: `candles`.
Candles {
engine: &'a EngineName,
market: &'a MarketName,
board: &'a BoardId,
security: &'a SecId,
},
/// Path: `constants::candleborders_endpoint(engine, market, security)` (no board);
/// default table: `borders`.
CandleBorders {
engine: &'a EngineName,
market: &'a MarketName,
security: &'a SecId,
},
/// Path: `constants::SITENEWS_ENDPOINT`; default table: `sitenews`.
/// Available only with the `news` feature.
#[cfg(feature = "news")]
SiteNews,
/// Path: `constants::EVENTS_ENDPOINT`; default table: `events`.
/// Available only with the `news` feature.
#[cfg(feature = "news")]
Events,
/// Path: `constants::history_dates_endpoint(engine, market, board, security)`;
/// default table: `dates`. Available only with the `history` feature.
#[cfg(feature = "history")]
HistoryDates {
engine: &'a EngineName,
market: &'a MarketName,
board: &'a BoardId,
security: &'a SecId,
},
/// Path: `constants::history_endpoint(engine, market, board, security)`;
/// default table: `history`. Available only with the `history` feature.
#[cfg(feature = "history")]
History {
engine: &'a EngineName,
market: &'a MarketName,
board: &'a BoardId,
security: &'a SecId,
},
}
impl IssEndpoint<'_> {
    /// Builds the request path for this endpoint.
    ///
    /// Variants without identifiers copy a constant from the `constants` module; all
    /// other variants forward their identifiers to the matching `constants::*_endpoint`
    /// builder. `SecurityInfo`/`SecurityBoards` share one path, as do
    /// `Securities`/`BoardSecuritySnapshots`; those pairs differ only in
    /// [`Self::default_table`].
    pub fn path(self) -> String {
        match self {
            // Fixed paths: nothing to interpolate.
            Self::Indexes => String::from(constants::INDEXES_ENDPOINT),
            Self::Turnovers => String::from(constants::TURNOVERS_ENDPOINT),
            Self::Engines => String::from(constants::ENGINES_ENDPOINT),
            Self::GlobalSecurities => String::from(constants::GLOBAL_SECURITIES_ENDPOINT),
            #[cfg(feature = "news")]
            Self::SiteNews => String::from(constants::SITENEWS_ENDPOINT),
            #[cfg(feature = "news")]
            Self::Events => String::from(constants::EVENTS_ENDPOINT),

            // Paths keyed by a single identifier.
            Self::IndexAnalytics { indexid } => constants::index_analytics_endpoint(indexid),
            Self::EngineTurnovers { engine } => constants::engine_turnovers_endpoint(engine),
            Self::Markets { engine } => constants::markets_endpoint(engine),
            Self::SecurityInfo { security } | Self::SecurityBoards { security } => {
                constants::security_endpoint(security)
            }

            // Paths keyed by engine + market.
            Self::Boards { engine, market } => constants::boards_endpoint(engine, market),
            Self::MarketSecurities { engine, market } => {
                constants::market_securities_endpoint(engine, market)
            }
            Self::MarketOrderbook { engine, market } => {
                constants::market_orderbook_endpoint(engine, market)
            }
            Self::MarketTrades { engine, market } => {
                constants::market_trades_endpoint(engine, market)
            }
            Self::SecStats { engine, market } => constants::secstats_endpoint(engine, market),

            // Paths keyed by engine + market + one more identifier.
            Self::MarketSecurityInfo {
                engine,
                market,
                security,
            } => constants::market_security_endpoint(engine, market, security),
            Self::CandleBorders {
                engine,
                market,
                security,
            } => constants::candleborders_endpoint(engine, market, security),
            Self::Securities {
                engine,
                market,
                board,
            }
            | Self::BoardSecuritySnapshots {
                engine,
                market,
                board,
            } => constants::securities_endpoint(engine, market, board),

            // Paths keyed by engine + market + board + security.
            Self::Orderbook {
                engine,
                market,
                board,
                security,
            } => constants::orderbook_endpoint(engine, market, board, security),
            Self::Trades {
                engine,
                market,
                board,
                security,
            } => constants::trades_endpoint(engine, market, board, security),
            Self::Candles {
                engine,
                market,
                board,
                security,
            } => constants::candles_endpoint(engine, market, board, security),
            #[cfg(feature = "history")]
            Self::HistoryDates {
                engine,
                market,
                board,
                security,
            } => constants::history_dates_endpoint(engine, market, board, security),
            #[cfg(feature = "history")]
            Self::History {
                engine,
                market,
                board,
                security,
            } => constants::history_endpoint(engine, market, board, security),
        }
    }

    /// Returns the table name (or comma-separated table names) associated with this endpoint.
    ///
    /// Every variant currently maps to a name, so the result is always `Some`.
    pub fn default_table(self) -> Option<&'static str> {
        let table = match self {
            Self::Indexes => "indices",
            Self::IndexAnalytics { .. } => "analytics",
            Self::Turnovers | Self::EngineTurnovers { .. } => "turnovers",
            Self::Engines => "engines",
            Self::Markets { .. } => "markets",
            Self::Boards { .. } | Self::SecurityBoards { .. } => "boards",
            Self::GlobalSecurities
            | Self::SecurityInfo { .. }
            | Self::MarketSecurities { .. }
            | Self::MarketSecurityInfo { .. }
            | Self::Securities { .. } => "securities",
            // The snapshot endpoint reads two tables from the same response.
            Self::BoardSecuritySnapshots { .. } => "securities,marketdata",
            Self::MarketOrderbook { .. } | Self::Orderbook { .. } => "orderbook",
            Self::MarketTrades { .. } | Self::Trades { .. } => "trades",
            Self::Candles { .. } => "candles",
            Self::CandleBorders { .. } => "borders",
            Self::SecStats { .. } => "secstats",
            #[cfg(feature = "news")]
            Self::SiteNews => "sitenews",
            #[cfg(feature = "news")]
            Self::Events => "events",
            #[cfg(feature = "history")]
            Self::HistoryDates { .. } => "dates",
            #[cfg(feature = "history")]
            Self::History { .. } => "history",
        };
        Some(table)
    }
}
/// Retry configuration: how many times an action may run and how long to pause between runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Upper bound on the total number of attempts; the type rules out zero.
    max_attempts: NonZeroU32,
    /// Pause requested between consecutive attempts.
    delay: Duration,
}

impl RetryPolicy {
    /// Creates a policy allowing `max_attempts` attempts with a 400 ms delay.
    pub fn new(max_attempts: NonZeroU32) -> Self {
        let delay = Duration::from_millis(400);
        Self {
            max_attempts,
            delay,
        }
    }

    /// Returns a copy of this policy with `delay` as the pause between attempts.
    pub fn with_delay(self, delay: Duration) -> Self {
        Self { delay, ..self }
    }

    /// Total number of attempts permitted by this policy.
    pub fn max_attempts(self) -> NonZeroU32 {
        let Self { max_attempts, .. } = self;
        max_attempts
    }

    /// Pause requested between consecutive attempts.
    pub fn delay(self) -> Duration {
        let Self { delay, .. } = self;
        delay
    }
}

impl Default for RetryPolicy {
    /// Three attempts with the delay chosen by [`RetryPolicy::new`].
    fn default() -> Self {
        let attempts =
            NonZeroU32::new(3).expect("retry policy default attempts must be non-zero");
        Self::new(attempts)
    }
}
pub fn with_retry<T, F>(policy: RetryPolicy, mut action: F) -> Result<T, MoexError>
where
F: FnMut() -> Result<T, MoexError>,
{
let mut attempts_left = policy.max_attempts().get();
loop {
match action() {
Ok(value) => return Ok(value),
Err(error) if attempts_left > 1 && error.is_retryable() => {
attempts_left -= 1;
std::thread::sleep(policy.delay());
}
Err(error) => return Err(error),
}
}
}
/// Async counterpart of `with_retry`: awaits `action` until it succeeds, fails with a
/// non-retryable error, or the attempt budget of `policy` is spent.
///
/// The pause between attempts is delegated to the caller-supplied `sleep` function,
/// which receives `policy.delay()` and whose future is awaited before the next attempt.
///
/// # Errors
///
/// Returns the first error for which `MoexError::is_retryable` is `false`, or the
/// error of the final permitted attempt.
#[cfg(feature = "async")]
pub async fn with_retry_async<T, F, Fut, S, SleepFut>(
    policy: RetryPolicy,
    mut action: F,
    mut sleep: S,
) -> Result<T, MoexError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, MoexError>>,
    S: FnMut(Duration) -> SleepFut,
    SleepFut: std::future::Future<Output = ()>,
{
    let max_attempts = policy.max_attempts().get();
    let mut attempt: u32 = 1;
    loop {
        let error = match action().await {
            Ok(value) => return Ok(value),
            Err(error) => error,
        };
        // Retryability is only consulted while attempts remain.
        if attempt >= max_attempts || !error.is_retryable() {
            return Err(error);
        }
        attempt += 1;
        sleep(policy.delay()).await;
    }
}
/// Minimum spacing between two consecutive requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RateLimit {
    /// Smallest interval allowed between two reserved slots.
    min_interval: Duration,
}

impl RateLimit {
    /// Creates a limit of one request per `min_interval`.
    pub fn every(min_interval: Duration) -> Self {
        Self { min_interval }
    }

    /// Creates a limit of at most `requests_per_second` requests per second.
    ///
    /// The interval is one second divided by the request count, rounded up to a whole
    /// nanosecond so the resulting rate never exceeds the requested one.
    pub fn per_second(requests_per_second: NonZeroU32) -> Self {
        const NANOS_PER_SECOND: u64 = 1_000_000_000;
        let requests = u64::from(requests_per_second.get());
        // The quotient is at most one billion, so u64 arithmetic cannot overflow.
        let interval_nanos = NANOS_PER_SECOND.div_ceil(requests);
        Self::every(Duration::from_nanos(interval_nanos))
    }

    /// Smallest interval allowed between two requests.
    pub fn min_interval(self) -> Duration {
        let Self { min_interval } = self;
        min_interval
    }
}

/// Stateful scheduler that hands out request slots spaced by a [`RateLimit`].
#[derive(Debug, Clone)]
pub struct RateLimiter {
    /// Spacing enforced between slots.
    limit: RateLimit,
    /// Earliest instant of the next free slot; `None` until the first reservation.
    next_allowed_at: Option<Instant>,
}

impl RateLimiter {
    /// Creates a limiter with no reservation yet, so the first slot is immediate.
    pub fn new(limit: RateLimit) -> Self {
        let next_allowed_at = None;
        Self {
            limit,
            next_allowed_at,
        }
    }

    /// The limit this limiter enforces.
    pub fn limit(&self) -> RateLimit {
        self.limit
    }

    /// Reserves the next slot relative to the current time and returns how long the
    /// caller has to wait before using it (zero when the slot is already available).
    pub fn reserve_delay(&mut self) -> Duration {
        let now = Instant::now();
        self.reserve_delay_at(now)
    }

    /// Reserves the next slot as seen from `now`.
    ///
    /// The slot is the later of `now` and the previously computed next-free instant;
    /// the following slot is then pushed `min_interval` past it.
    ///
    /// Note: `Instant + Duration` panics when the result is not representable, so an
    /// extremely large `min_interval` can panic here.
    fn reserve_delay_at(&mut self, now: Instant) -> Duration {
        let slot = self
            .next_allowed_at
            .map_or(now, |pending| pending.max(now));
        self.next_allowed_at = Some(slot + self.limit.min_interval);
        slot.saturating_duration_since(now)
    }
}
/// Two-state switch rendered as the query values `"off"` / `"on"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IssToggle {
    /// Rendered as `"off"`; this is the default.
    #[default]
    Off,
    /// Rendered as `"on"`.
    On,
}

impl IssToggle {
    /// Returns the textual query value for this toggle.
    pub const fn as_query_value(self) -> &'static str {
        if matches!(self, Self::On) {
            "on"
        } else {
            "off"
        }
    }
}

impl From<bool> for IssToggle {
    /// Maps `true` to [`IssToggle::On`] and `false` to [`IssToggle::Off`].
    fn from(value: bool) -> Self {
        match value {
            true => Self::On,
            false => Self::Off,
        }
    }
}
/// Optional per-request settings assembled with a consuming builder.
///
/// Each field starts as `None` (never set) and is filled by the setter of the same name.
/// NOTE(review): presumably these map to query parameters of the same names; the
/// code that consumes them is outside this section.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IssRequestOptions {
    /// Value chosen via [`IssRequestOptions::metadata`], if any.
    metadata: Option<IssToggle>,
    /// Value chosen via [`IssRequestOptions::data`], if any.
    data: Option<IssToggle>,
    /// Value chosen via [`IssRequestOptions::version`], if any.
    version: Option<IssToggle>,
    /// Text chosen via [`IssRequestOptions::json`], if any.
    json: Option<Box<str>>,
}

impl IssRequestOptions {
    /// Creates an options value with nothing set (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the `metadata` toggle.
    pub fn metadata(self, metadata: IssToggle) -> Self {
        Self {
            metadata: Some(metadata),
            ..self
        }
    }

    /// Sets the `data` toggle.
    pub fn data(self, data: IssToggle) -> Self {
        Self {
            data: Some(data),
            ..self
        }
    }

    /// Sets the `version` toggle.
    pub fn version(self, version: IssToggle) -> Self {
        Self {
            version: Some(version),
            ..self
        }
    }

    /// Sets the `json` value; the text is stored as a boxed string slice.
    pub fn json(self, json: impl Into<String>) -> Self {
        let text: String = json.into();
        Self {
            json: Some(text.into_boxed_str()),
            ..self
        }
    }

    /// The `metadata` toggle, if it was set.
    pub fn metadata_value(&self) -> Option<IssToggle> {
        self.metadata
    }

    /// The `data` toggle, if it was set.
    pub fn data_value(&self) -> Option<IssToggle> {
        self.data
    }

    /// The `version` toggle, if it was set.
    pub fn version_value(&self) -> Option<IssToggle> {
        self.version
    }

    /// The `json` value, if it was set.
    pub fn json_value(&self) -> Option<&str> {
        self.json.as_deref()
    }
}
/// Response captured as-is: HTTP status, headers and the body as text.
///
/// Compiled only when the `async` or `blocking` feature is enabled.
#[cfg(any(feature = "async", feature = "blocking"))]
#[derive(Debug, Clone)]
pub struct RawIssResponse {
    /// HTTP status of the response.
    status: StatusCode,
    /// Response headers.
    headers: HeaderMap,
    /// Response body as text.
    body: String,
}

#[cfg(any(feature = "async", feature = "blocking"))]
impl RawIssResponse {
    /// Bundles the three parts of a response; crate-internal constructor.
    pub(crate) fn new(status: StatusCode, headers: HeaderMap, body: String) -> Self {
        Self {
            status,
            headers,
            body,
        }
    }

    /// HTTP status of the response.
    pub fn status(&self) -> StatusCode {
        self.status
    }

    /// Response headers.
    pub fn headers(&self) -> &HeaderMap {
        &self.headers
    }

    /// Response body as text.
    pub fn body(&self) -> &str {
        self.body.as_str()
    }

    /// Consumes the response and returns `(status, headers, body)`.
    pub fn into_parts(self) -> (StatusCode, HeaderMap, String) {
        let Self {
            status,
            headers,
            body,
        } = self;
        (status, headers, body)
    }
}
pub fn with_rate_limit<T, F>(limiter: &mut RateLimiter, action: F) -> T
where
F: FnOnce() -> T,
{
let delay = limiter.reserve_delay();
if !delay.is_zero() {
std::thread::sleep(delay);
}
action()
}
/// Async counterpart of `with_rate_limit`: reserves a slot on `limiter`, waits for it
/// through the caller-supplied `sleep` function, then awaits `action`.
///
/// `sleep` is not called when the reserved slot is already available.
#[cfg(feature = "async")]
pub async fn with_rate_limit_async<T, F, Fut, S, SleepFut>(
    limiter: &mut RateLimiter,
    action: F,
    mut sleep: S,
) -> T
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = T>,
    S: FnMut(Duration) -> SleepFut,
    SleepFut: std::future::Future<Output = ()>,
{
    let wait = limiter.reserve_delay();
    if wait > Duration::ZERO {
        sleep(wait).await;
    }
    action().await
}
/// Error type of this module.
///
/// Variants cover client construction, URL building, raw-request validation,
/// transport failures, payload decoding, per-row parsing of typed tables, and
/// pagination problems. Most variants carry the `endpoint` they relate to; row-level
/// variants additionally carry the `row` number and the underlying parse error as
/// their `source`.
#[derive(Debug, Error)]
pub enum MoexError {
/// The base URL was rejected; `reason` holds the explanation.
#[error("invalid base URL '{base_url}': {reason}")]
InvalidBaseUrl {
base_url: &'static str,
reason: String,
},
/// The underlying `reqwest` HTTP client could not be built.
#[cfg(any(feature = "async", feature = "blocking"))]
#[error("failed to build HTTP client: {source}")]
BuildHttpClient {
#[source]
source: reqwest::Error,
},
/// An async rate limit was configured without the sleep function it requires.
#[error(
"async rate limit requires sleep function; set AsyncMoexClientBuilder::rate_limit_sleep(...)"
)]
MissingAsyncRateLimitSleep,
/// A URL for `endpoint` could not be built; `reason` holds the explanation.
#[error("failed to build URL for endpoint '{endpoint}': {reason}")]
EndpointUrl {
endpoint: Box<str>,
reason: String,
},
/// A raw request was used without a path being set.
#[error("raw request path is not set")]
MissingRawPath,
/// The path given to a raw request was rejected; `reason` holds the explanation.
#[error("invalid raw request path '{path}': {reason}")]
InvalidRawPath {
path: Box<str>,
reason: Box<str>,
},
/// The raw response of `endpoint` does not contain the requested `table`.
#[error("raw endpoint '{endpoint}' does not contain table '{table}'")]
MissingRawTable {
endpoint: Box<str>,
table: Box<str>,
},
/// Row `row` of a raw table has width `actual` where `expected` was required.
#[error(
"raw table '{table}' from endpoint '{endpoint}' has invalid row width at row {row}: expected {expected}, got {actual}"
)]
InvalidRawTableRowWidth {
endpoint: Box<str>,
table: Box<str>,
row: usize,
expected: usize,
actual: usize,
},
/// Row `row` of a raw table could not be decoded; wraps the `serde_json` error.
#[error("failed to decode raw table '{table}' row {row} from endpoint '{endpoint}': {source}")]
InvalidRawTableRow {
endpoint: Box<str>,
table: Box<str>,
row: usize,
#[source]
source: serde_json::Error,
},
/// The request to `endpoint` failed at the `reqwest` level.
#[cfg(any(feature = "async", feature = "blocking"))]
#[error("request to endpoint '{endpoint}' failed: {source}")]
Request {
endpoint: Box<str>,
#[source]
source: reqwest::Error,
},
/// The endpoint answered with HTTP `status`, reported as an error; carries the
/// content type (if any) and a prefix of the response body for diagnostics.
#[cfg(any(feature = "async", feature = "blocking"))]
#[error(
"endpoint '{endpoint}' returned HTTP {status} (content-type={content_type:?}, prefix={body_prefix:?})"
)]
HttpStatus {
endpoint: Box<str>,
status: StatusCode,
content_type: Option<Box<str>>,
body_prefix: Box<str>,
},
/// The response body of `endpoint` could not be read.
#[cfg(any(feature = "async", feature = "blocking"))]
#[error("failed to read endpoint '{endpoint}' response body: {source}")]
ReadBody {
endpoint: Box<str>,
#[source]
source: reqwest::Error,
},
/// The JSON payload of `endpoint` could not be decoded; wraps the `serde_json` error.
#[error("failed to decode endpoint '{endpoint}' JSON payload: {source}")]
Decode {
endpoint: Box<str>,
#[source]
source: serde_json::Error,
},
/// The endpoint returned a payload that is not JSON; carries the content type
/// (if any) and a prefix of the response body for diagnostics.
#[error(
"endpoint '{endpoint}' returned non-JSON payload (content-type={content_type:?}, prefix={body_prefix:?})"
)]
NonJsonPayload {
endpoint: Box<str>,
content_type: Option<Box<str>>,
body_prefix: Box<str>,
},
/// The number of security rows returned by `endpoint` was not the expected one.
#[error("endpoint '{endpoint}' returned unexpected security rows count: {row_count}")]
UnexpectedSecurityRows {
endpoint: Box<str>,
row_count: usize,
},
/// The number of history-dates rows returned by `endpoint` was not the expected one.
#[error("endpoint '{endpoint}' returned unexpected history dates rows count: {row_count}")]
UnexpectedHistoryDatesRows {
endpoint: Box<str>,
row_count: usize,
},
/// Index row `row` returned by `endpoint` is invalid.
#[error("invalid index row {row} from endpoint '{endpoint}': {source}")]
InvalidIndex {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseIndexError,
},
/// History-dates row `row` returned by `endpoint` is invalid.
#[error("invalid history dates row {row} from endpoint '{endpoint}': {source}")]
InvalidHistoryDates {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseHistoryDatesError,
},
/// History row `row` returned by `endpoint` is invalid.
#[error("invalid history row {row} from endpoint '{endpoint}': {source}")]
InvalidHistory {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseHistoryRecordError,
},
/// Turnover row `row` returned by `endpoint` is invalid.
#[error("invalid turnover row {row} from endpoint '{endpoint}': {source}")]
InvalidTurnover {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseTurnoverError,
},
/// Site-news row `row` returned by `endpoint` is invalid.
#[error("invalid sitenews row {row} from endpoint '{endpoint}': {source}")]
InvalidSiteNews {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseSiteNewsError,
},
/// Events row `row` returned by `endpoint` is invalid.
#[error("invalid events row {row} from endpoint '{endpoint}': {source}")]
InvalidEvent {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseEventError,
},
/// Secstats row `row` returned by `endpoint` is invalid.
#[error("invalid secstats row {row} from endpoint '{endpoint}': {source}")]
InvalidSecStat {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseSecStatError,
},
/// Index-analytics row `row` returned by `endpoint` is invalid.
#[error("invalid index analytics row {row} from endpoint '{endpoint}': {source}")]
InvalidIndexAnalytics {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseIndexAnalyticsError,
},
/// Engine row `row` returned by `endpoint` is invalid.
#[error("invalid engine row {row} from endpoint '{endpoint}': {source}")]
InvalidEngine {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseEngineError,
},
/// Market row `row` returned by `endpoint` is invalid.
#[error("invalid market row {row} from endpoint '{endpoint}': {source}")]
InvalidMarket {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseMarketError,
},
/// Board row `row` returned by `endpoint` is invalid.
#[error("invalid board row {row} from endpoint '{endpoint}': {source}")]
InvalidBoard {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseBoardError,
},
/// Security-board row `row` returned by `endpoint` is invalid.
#[error("invalid security board row {row} from endpoint '{endpoint}': {source}")]
InvalidSecurityBoard {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseSecurityBoardError,
},
/// Security row `row` returned by `endpoint` is invalid.
#[error("invalid security row {row} from endpoint '{endpoint}': {source}")]
InvalidSecurity {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseSecurityError,
},
/// Security-snapshot row `row` of `table` returned by `endpoint` is invalid.
#[error("invalid security snapshot {table} row {row} from endpoint '{endpoint}': {source}")]
InvalidSecuritySnapshot {
endpoint: Box<str>,
table: &'static str,
row: usize,
#[source]
source: ParseSecuritySnapshotError,
},
/// Orderbook row `row` returned by `endpoint` is invalid.
#[error("invalid orderbook row {row} from endpoint '{endpoint}': {source}")]
InvalidOrderbook {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseOrderbookError,
},
/// Candle-border row `row` returned by `endpoint` is invalid.
#[error("invalid candle border row {row} from endpoint '{endpoint}': {source}")]
InvalidCandleBorder {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseCandleBorderError,
},
/// Candle row `row` returned by `endpoint` is invalid.
#[error("invalid candle row {row} from endpoint '{endpoint}': {source}")]
InvalidCandle {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseCandleError,
},
/// Trade row `row` returned by `endpoint` is invalid.
#[error("invalid trade row {row} from endpoint '{endpoint}': {source}")]
InvalidTrade {
endpoint: Box<str>,
row: usize,
#[source]
source: ParseTradeError,
},
/// Advancing pagination from `start` by `limit` would exceed the `u32` range.
#[error(
"pagination overflow for endpoint '{endpoint}': start={start}, limit={limit} exceeds u32"
)]
PaginationOverflow {
endpoint: Box<str>,
start: u32,
limit: u32,
},
/// Pagination made no progress: the page at `start`/`limit` was repeated.
#[error(
"pagination is stuck for endpoint '{endpoint}': repeated page at start={start}, limit={limit}"
)]
PaginationStuck {
endpoint: Box<str>,
start: u32,
limit: u32,
},
}
impl MoexError {
    /// Reports whether repeating the failed operation may succeed.
    ///
    /// Only transport-level variants (available with the `async` or `blocking`
    /// feature) can be retryable:
    /// - `HttpStatus` when the status is 429 or a server error,
    /// - `ReadBody` always,
    /// - `Request` on timeout, connection failure, or a retryable status.
    ///
    /// Every other variant, including `BuildHttpClient`, is not retryable.
    pub fn is_retryable(&self) -> bool {
        match self {
            #[cfg(any(feature = "async", feature = "blocking"))]
            Self::HttpStatus { status, .. } => is_retryable_status(*status),
            #[cfg(any(feature = "async", feature = "blocking"))]
            Self::ReadBody { .. } => true,
            #[cfg(any(feature = "async", feature = "blocking"))]
            Self::Request { source, .. } => {
                let transient_transport = source.is_timeout() || source.is_connect();
                transient_transport || source.status().is_some_and(is_retryable_status)
            }
            _ => false,
        }
    }

    /// Returns the HTTP status attached to this error, if any.
    ///
    /// `HttpStatus` always has one; `Request` has one only when the underlying
    /// `reqwest` error carries a status.
    #[cfg(any(feature = "async", feature = "blocking"))]
    pub fn status_code(&self) -> Option<StatusCode> {
        match self {
            Self::HttpStatus { status, .. } => Some(*status),
            Self::Request { source, .. } => source.status(),
            _ => None,
        }
    }

    /// Returns the captured prefix of the response body for the variants that store
    /// one (`HttpStatus`, when compiled in, and `NonJsonPayload`).
    pub fn response_body_prefix(&self) -> Option<&str> {
        match self {
            // `HttpStatus` only exists when a transport feature is enabled.
            #[cfg(any(feature = "async", feature = "blocking"))]
            Self::HttpStatus { body_prefix, .. } => Some(body_prefix),
            Self::NonJsonPayload { body_prefix, .. } => Some(body_prefix),
            _ => None,
        }
    }
}
/// Returns `true` for HTTP statuses worth retrying: any server error (5xx) or
/// 429 Too Many Requests.
#[cfg(any(feature = "async", feature = "blocking"))]
fn is_retryable_status(status: StatusCode) -> bool {
    let throttled = status == StatusCode::TOO_MANY_REQUESTS;
    throttled || status.is_server_error()
}
/// Module-private policy enum with a single option, `Error`.
///
/// NOTE(review): presumably selects how paginators react when the same page is
/// returned twice (cf. `MoexError::PaginationStuck`); its users are outside this
/// section — confirm against the `client` module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RepeatPagePolicy {
/// Treat a repeated page as an error.
Error,
}
#[cfg(test)]
mod tests;