shape_ast/parser/
stream.rs1use crate::error::{Result, ShapeError};
4use pest::iterators::Pair;
5
6use crate::ast::{
7 Span, Statement, StreamConfig, StreamDef, StreamOnError, StreamOnEvent, StreamOnWindow,
8 Timeframe, VariableDecl,
9};
10use crate::parser::string_literals::parse_string_literal;
11use crate::parser::{Rule, pair_span, parse_variable_decl, statements};
12
13pub fn parse_stream_def(pair: Pair<Rule>) -> Result<StreamDef> {
15 let mut name = String::new();
16 let mut name_span = Span::DUMMY;
17 let mut config = StreamConfig {
18 provider: String::new(),
19 symbols: vec![],
20 timeframes: vec![],
21 buffer_size: None,
22 reconnect: None,
23 reconnect_delay: None,
24 };
25 let mut state = vec![];
26 let mut on_connect = None;
27 let mut on_disconnect = None;
28 let mut on_event = None;
29 let mut on_window = None;
30 let mut on_error = None;
31
32 for inner in pair.into_inner() {
33 match inner.as_rule() {
34 Rule::ident => {
35 if name.is_empty() {
36 name = inner.as_str().to_string();
37 name_span = pair_span(&inner);
38 }
39 }
40 Rule::stream_body => {
41 for body_item in inner.into_inner() {
42 match body_item.as_rule() {
43 Rule::stream_config => {
44 config = parse_stream_config(body_item)?;
45 }
46 Rule::stream_state => {
47 state = parse_stream_state(body_item)?;
48 }
49 Rule::stream_on_connect => {
50 on_connect = Some(parse_stream_on_connect(body_item)?);
51 }
52 Rule::stream_on_disconnect => {
53 on_disconnect = Some(parse_stream_on_disconnect(body_item)?);
54 }
55 Rule::stream_on_event => {
56 on_event = Some(parse_stream_on_event(body_item)?);
57 }
58 Rule::stream_on_window => {
59 on_window = Some(parse_stream_on_window(body_item)?);
60 }
61 Rule::stream_on_error => {
62 on_error = Some(parse_stream_on_error(body_item)?);
63 }
64 _ => {}
65 }
66 }
67 }
68 _ => {}
69 }
70 }
71
72 Ok(StreamDef {
73 name,
74 name_span,
75 config,
76 state,
77 on_connect,
78 on_disconnect,
79 on_event,
80 on_window,
81 on_error,
82 })
83}
84
85fn parse_stream_config(pair: Pair<Rule>) -> Result<StreamConfig> {
87 let mut config = StreamConfig {
88 provider: String::new(),
89 symbols: vec![],
90 timeframes: vec![],
91 buffer_size: None,
92 reconnect: None,
93 reconnect_delay: None,
94 };
95
96 for inner in pair.into_inner() {
97 if inner.as_rule() == Rule::stream_config_list {
98 for config_item in inner.into_inner() {
99 if config_item.as_rule() == Rule::stream_config_item {
100 let config_str = config_item.as_str();
103 let inner_pairs = config_item.into_inner();
104
105 if config_str.starts_with("provider") {
107 for pair in inner_pairs {
109 if pair.as_rule() == Rule::string {
110 config.provider = parse_string_literal(pair.as_str())?;
111 }
112 }
113 } else if config_str.starts_with("symbols") {
114 for pair in inner_pairs {
116 if pair.as_rule() == Rule::symbol_list {
117 config.symbols = parse_symbol_list(pair)?;
118 }
119 }
120 } else if config_str.starts_with("timeframes") {
121 for pair in inner_pairs {
123 if pair.as_rule() == Rule::timeframe {
124 if let Some(tf) = Timeframe::parse(pair.as_str()) {
125 config.timeframes.push(tf);
126 }
127 }
128 }
129 } else if config_str.starts_with("buffer_size") {
130 for pair in inner_pairs {
132 if pair.as_rule() == Rule::integer {
133 config.buffer_size = Some(pair.as_str().parse().map_err(|e| {
134 ShapeError::ParseError {
135 message: format!("Invalid buffer_size: {}", e),
136 location: None,
137 }
138 })?);
139 }
140 }
141 } else if config_str.starts_with("reconnect:")
142 && !config_str.starts_with("reconnect_delay")
143 {
144 for pair in inner_pairs {
146 if pair.as_rule() == Rule::boolean {
147 config.reconnect = Some(pair.as_str() == "true");
148 }
149 }
150 } else if config_str.starts_with("reconnect_delay") {
151 for pair in inner_pairs {
153 if pair.as_rule() == Rule::number {
154 config.reconnect_delay =
155 Some(pair.as_str().parse().map_err(|e| {
156 ShapeError::ParseError {
157 message: format!("Invalid reconnect_delay: {}", e),
158 location: None,
159 }
160 })?);
161 }
162 }
163 }
164 }
165 }
166 }
167 }
168
169 Ok(config)
170}
171
172fn parse_symbol_list(pair: Pair<Rule>) -> Result<Vec<String>> {
174 let mut symbols = vec![];
175
176 if pair.as_rule() == Rule::symbol_list {
177 for inner in pair.into_inner() {
178 if inner.as_rule() == Rule::ident {
179 symbols.push(inner.as_str().to_string());
180 }
181 }
182 }
183
184 Ok(symbols)
185}
186
187fn parse_stream_state(pair: Pair<Rule>) -> Result<Vec<VariableDecl>> {
189 let mut state = vec![];
190
191 for inner in pair.into_inner() {
192 if inner.as_rule() == Rule::stream_state_list {
193 for decl in inner.into_inner() {
194 if decl.as_rule() == Rule::variable_decl {
195 state.push(parse_variable_decl(decl)?);
196 }
197 }
198 }
199 }
200
201 Ok(state)
202}
203
204fn parse_stream_on_connect(pair: Pair<Rule>) -> Result<Vec<Statement>> {
206 statements::parse_statements(pair.into_inner())
207}
208
209fn parse_stream_on_disconnect(pair: Pair<Rule>) -> Result<Vec<Statement>> {
211 statements::parse_statements(pair.into_inner())
212}
213
214fn parse_stream_on_event(pair: Pair<Rule>) -> Result<StreamOnEvent> {
216 let mut inner_pairs = pair.into_inner();
217
218 let event_param = inner_pairs
220 .next()
221 .map(|p| p.as_str().to_string())
222 .unwrap_or_default();
223
224 let body = statements::parse_statements(inner_pairs)?;
226
227 Ok(StreamOnEvent { event_param, body })
228}
229
230fn parse_stream_on_window(pair: Pair<Rule>) -> Result<StreamOnWindow> {
232 let mut inner_pairs = pair.into_inner();
233
234 let key_param = inner_pairs
236 .next()
237 .map(|p| p.as_str().to_string())
238 .unwrap_or_default();
239
240 let window_param = inner_pairs
242 .next()
243 .map(|p| p.as_str().to_string())
244 .unwrap_or_default();
245
246 let body = statements::parse_statements(inner_pairs)?;
248
249 Ok(StreamOnWindow {
250 key_param,
251 window_param,
252 body,
253 })
254}
255
256fn parse_stream_on_error(pair: Pair<Rule>) -> Result<StreamOnError> {
258 let mut inner_pairs = pair.into_inner();
259
260 let error_param = inner_pairs
262 .next()
263 .map(|p| p.as_str().to_string())
264 .unwrap_or_default();
265
266 let body = statements::parse_statements(inner_pairs)?;
268
269 Ok(StreamOnError { error_param, body })
270}