use crate::error::{Result, ShapeError};
use pest::iterators::Pair;
use crate::ast::{
Span, Statement, StreamConfig, StreamDef, StreamOnError, StreamOnEvent, StreamOnWindow,
Timeframe, VariableDecl,
};
use crate::parser::string_literals::parse_string_literal;
use crate::parser::{Rule, pair_span, parse_variable_decl, statements};
pub fn parse_stream_def(pair: Pair<Rule>) -> Result<StreamDef> {
let mut name = String::new();
let mut name_span = Span::DUMMY;
let mut config = StreamConfig {
provider: String::new(),
symbols: vec![],
timeframes: vec![],
buffer_size: None,
reconnect: None,
reconnect_delay: None,
};
let mut state = vec![];
let mut on_connect = None;
let mut on_disconnect = None;
let mut on_event = None;
let mut on_window = None;
let mut on_error = None;
for inner in pair.into_inner() {
match inner.as_rule() {
Rule::ident => {
if name.is_empty() {
name = inner.as_str().to_string();
name_span = pair_span(&inner);
}
}
Rule::stream_body => {
for body_item in inner.into_inner() {
match body_item.as_rule() {
Rule::stream_config => {
config = parse_stream_config(body_item)?;
}
Rule::stream_state => {
state = parse_stream_state(body_item)?;
}
Rule::stream_on_connect => {
on_connect = Some(parse_stream_on_connect(body_item)?);
}
Rule::stream_on_disconnect => {
on_disconnect = Some(parse_stream_on_disconnect(body_item)?);
}
Rule::stream_on_event => {
on_event = Some(parse_stream_on_event(body_item)?);
}
Rule::stream_on_window => {
on_window = Some(parse_stream_on_window(body_item)?);
}
Rule::stream_on_error => {
on_error = Some(parse_stream_on_error(body_item)?);
}
_ => {}
}
}
}
_ => {}
}
}
Ok(StreamDef {
name,
name_span,
config,
state,
on_connect,
on_disconnect,
on_event,
on_window,
on_error,
})
}
fn parse_stream_config(pair: Pair<Rule>) -> Result<StreamConfig> {
let mut config = StreamConfig {
provider: String::new(),
symbols: vec![],
timeframes: vec![],
buffer_size: None,
reconnect: None,
reconnect_delay: None,
};
for inner in pair.into_inner() {
if inner.as_rule() == Rule::stream_config_list {
for config_item in inner.into_inner() {
if config_item.as_rule() == Rule::stream_config_item {
let config_str = config_item.as_str();
let inner_pairs = config_item.into_inner();
if config_str.starts_with("provider") {
for pair in inner_pairs {
if pair.as_rule() == Rule::string {
config.provider = parse_string_literal(pair.as_str())?;
}
}
} else if config_str.starts_with("symbols") {
for pair in inner_pairs {
if pair.as_rule() == Rule::symbol_list {
config.symbols = parse_symbol_list(pair)?;
}
}
} else if config_str.starts_with("timeframes") {
for pair in inner_pairs {
if pair.as_rule() == Rule::timeframe {
if let Some(tf) = Timeframe::parse(pair.as_str()) {
config.timeframes.push(tf);
}
}
}
} else if config_str.starts_with("buffer_size") {
for pair in inner_pairs {
if pair.as_rule() == Rule::integer {
config.buffer_size = Some(pair.as_str().parse().map_err(|e| {
ShapeError::ParseError {
message: format!("Invalid buffer_size: {}", e),
location: None,
}
})?);
}
}
} else if config_str.starts_with("reconnect:")
&& !config_str.starts_with("reconnect_delay")
{
for pair in inner_pairs {
if pair.as_rule() == Rule::boolean {
config.reconnect = Some(pair.as_str() == "true");
}
}
} else if config_str.starts_with("reconnect_delay") {
for pair in inner_pairs {
if pair.as_rule() == Rule::number {
config.reconnect_delay =
Some(pair.as_str().parse().map_err(|e| {
ShapeError::ParseError {
message: format!("Invalid reconnect_delay: {}", e),
location: None,
}
})?);
}
}
}
}
}
}
}
Ok(config)
}
fn parse_symbol_list(pair: Pair<Rule>) -> Result<Vec<String>> {
let mut symbols = vec![];
if pair.as_rule() == Rule::symbol_list {
for inner in pair.into_inner() {
if inner.as_rule() == Rule::ident {
symbols.push(inner.as_str().to_string());
}
}
}
Ok(symbols)
}
fn parse_stream_state(pair: Pair<Rule>) -> Result<Vec<VariableDecl>> {
let mut state = vec![];
for inner in pair.into_inner() {
if inner.as_rule() == Rule::stream_state_list {
for decl in inner.into_inner() {
if decl.as_rule() == Rule::variable_decl {
state.push(parse_variable_decl(decl)?);
}
}
}
}
Ok(state)
}
fn parse_stream_on_connect(pair: Pair<Rule>) -> Result<Vec<Statement>> {
statements::parse_statements(pair.into_inner())
}
fn parse_stream_on_disconnect(pair: Pair<Rule>) -> Result<Vec<Statement>> {
statements::parse_statements(pair.into_inner())
}
fn parse_stream_on_event(pair: Pair<Rule>) -> Result<StreamOnEvent> {
let mut inner_pairs = pair.into_inner();
let event_param = inner_pairs
.next()
.map(|p| p.as_str().to_string())
.unwrap_or_default();
let body = statements::parse_statements(inner_pairs)?;
Ok(StreamOnEvent { event_param, body })
}
fn parse_stream_on_window(pair: Pair<Rule>) -> Result<StreamOnWindow> {
let mut inner_pairs = pair.into_inner();
let key_param = inner_pairs
.next()
.map(|p| p.as_str().to_string())
.unwrap_or_default();
let window_param = inner_pairs
.next()
.map(|p| p.as_str().to_string())
.unwrap_or_default();
let body = statements::parse_statements(inner_pairs)?;
Ok(StreamOnWindow {
key_param,
window_param,
body,
})
}
fn parse_stream_on_error(pair: Pair<Rule>) -> Result<StreamOnError> {
let mut inner_pairs = pair.into_inner();
let error_param = inner_pairs
.next()
.map(|p| p.as_str().to_string())
.unwrap_or_default();
let body = statements::parse_statements(inner_pairs)?;
Ok(StreamOnError { error_param, body })
}