Skip to main content

shape_ast/parser/
stream.rs

1//! Parser for stream definitions
2
3use crate::error::{Result, ShapeError};
4use pest::iterators::Pair;
5
6use crate::ast::{
7    Span, Statement, StreamConfig, StreamDef, StreamOnError, StreamOnEvent, StreamOnWindow,
8    Timeframe, VariableDecl,
9};
10use crate::parser::string_literals::parse_string_literal;
11use crate::parser::{Rule, pair_span, parse_variable_decl, statements};
12
13/// Parse a stream definition
14pub fn parse_stream_def(pair: Pair<Rule>) -> Result<StreamDef> {
15    let mut name = String::new();
16    let mut name_span = Span::DUMMY;
17    let mut config = StreamConfig {
18        provider: String::new(),
19        symbols: vec![],
20        timeframes: vec![],
21        buffer_size: None,
22        reconnect: None,
23        reconnect_delay: None,
24    };
25    let mut state = vec![];
26    let mut on_connect = None;
27    let mut on_disconnect = None;
28    let mut on_event = None;
29    let mut on_window = None;
30    let mut on_error = None;
31
32    for inner in pair.into_inner() {
33        match inner.as_rule() {
34            Rule::ident => {
35                if name.is_empty() {
36                    name = inner.as_str().to_string();
37                    name_span = pair_span(&inner);
38                }
39            }
40            Rule::stream_body => {
41                for body_item in inner.into_inner() {
42                    match body_item.as_rule() {
43                        Rule::stream_config => {
44                            config = parse_stream_config(body_item)?;
45                        }
46                        Rule::stream_state => {
47                            state = parse_stream_state(body_item)?;
48                        }
49                        Rule::stream_on_connect => {
50                            on_connect = Some(parse_stream_on_connect(body_item)?);
51                        }
52                        Rule::stream_on_disconnect => {
53                            on_disconnect = Some(parse_stream_on_disconnect(body_item)?);
54                        }
55                        Rule::stream_on_event => {
56                            on_event = Some(parse_stream_on_event(body_item)?);
57                        }
58                        Rule::stream_on_window => {
59                            on_window = Some(parse_stream_on_window(body_item)?);
60                        }
61                        Rule::stream_on_error => {
62                            on_error = Some(parse_stream_on_error(body_item)?);
63                        }
64                        _ => {}
65                    }
66                }
67            }
68            _ => {}
69        }
70    }
71
72    Ok(StreamDef {
73        name,
74        name_span,
75        config,
76        state,
77        on_connect,
78        on_disconnect,
79        on_event,
80        on_window,
81        on_error,
82    })
83}
84
85/// Parse stream configuration
86fn parse_stream_config(pair: Pair<Rule>) -> Result<StreamConfig> {
87    let mut config = StreamConfig {
88        provider: String::new(),
89        symbols: vec![],
90        timeframes: vec![],
91        buffer_size: None,
92        reconnect: None,
93        reconnect_delay: None,
94    };
95
96    for inner in pair.into_inner() {
97        if inner.as_rule() == Rule::stream_config_list {
98            for config_item in inner.into_inner() {
99                if config_item.as_rule() == Rule::stream_config_item {
100                    // The stream_config_item already contains the key:value structure
101                    // We need to analyze its string content to determine which config it is
102                    let config_str = config_item.as_str();
103                    let inner_pairs = config_item.into_inner();
104
105                    // Check which config item this is by looking at the string content
106                    if config_str.starts_with("provider") {
107                        // Skip to the string value
108                        for pair in inner_pairs {
109                            if pair.as_rule() == Rule::string {
110                                config.provider = parse_string_literal(pair.as_str())?;
111                            }
112                        }
113                    } else if config_str.starts_with("symbols") {
114                        // Find the symbol_list
115                        for pair in inner_pairs {
116                            if pair.as_rule() == Rule::symbol_list {
117                                config.symbols = parse_symbol_list(pair)?;
118                            }
119                        }
120                    } else if config_str.starts_with("timeframes") {
121                        // Collect timeframes
122                        for pair in inner_pairs {
123                            if pair.as_rule() == Rule::timeframe {
124                                if let Some(tf) = Timeframe::parse(pair.as_str()) {
125                                    config.timeframes.push(tf);
126                                }
127                            }
128                        }
129                    } else if config_str.starts_with("buffer_size") {
130                        // Find the integer value
131                        for pair in inner_pairs {
132                            if pair.as_rule() == Rule::integer {
133                                config.buffer_size = Some(pair.as_str().parse().map_err(|e| {
134                                    ShapeError::ParseError {
135                                        message: format!("Invalid buffer_size: {}", e),
136                                        location: None,
137                                    }
138                                })?);
139                            }
140                        }
141                    } else if config_str.starts_with("reconnect:")
142                        && !config_str.starts_with("reconnect_delay")
143                    {
144                        // Find the boolean value
145                        for pair in inner_pairs {
146                            if pair.as_rule() == Rule::boolean {
147                                config.reconnect = Some(pair.as_str() == "true");
148                            }
149                        }
150                    } else if config_str.starts_with("reconnect_delay") {
151                        // Find the number value
152                        for pair in inner_pairs {
153                            if pair.as_rule() == Rule::number {
154                                config.reconnect_delay =
155                                    Some(pair.as_str().parse().map_err(|e| {
156                                        ShapeError::ParseError {
157                                            message: format!("Invalid reconnect_delay: {}", e),
158                                            location: None,
159                                        }
160                                    })?);
161                            }
162                        }
163                    }
164                }
165            }
166        }
167    }
168
169    Ok(config)
170}
171
172/// Parse a list of symbols
173fn parse_symbol_list(pair: Pair<Rule>) -> Result<Vec<String>> {
174    let mut symbols = vec![];
175
176    if pair.as_rule() == Rule::symbol_list {
177        for inner in pair.into_inner() {
178            if inner.as_rule() == Rule::ident {
179                symbols.push(inner.as_str().to_string());
180            }
181        }
182    }
183
184    Ok(symbols)
185}
186
187/// Parse stream state declarations
188fn parse_stream_state(pair: Pair<Rule>) -> Result<Vec<VariableDecl>> {
189    let mut state = vec![];
190
191    for inner in pair.into_inner() {
192        if inner.as_rule() == Rule::stream_state_list {
193            for decl in inner.into_inner() {
194                if decl.as_rule() == Rule::variable_decl {
195                    state.push(parse_variable_decl(decl)?);
196                }
197            }
198        }
199    }
200
201    Ok(state)
202}
203
204/// Parse on_connect handler
205fn parse_stream_on_connect(pair: Pair<Rule>) -> Result<Vec<Statement>> {
206    statements::parse_statements(pair.into_inner())
207}
208
209/// Parse on_disconnect handler
210fn parse_stream_on_disconnect(pair: Pair<Rule>) -> Result<Vec<Statement>> {
211    statements::parse_statements(pair.into_inner())
212}
213
214/// Parse on_event handler
215fn parse_stream_on_event(pair: Pair<Rule>) -> Result<StreamOnEvent> {
216    let mut inner_pairs = pair.into_inner();
217
218    // First should be the parameter
219    let event_param = inner_pairs
220        .next()
221        .map(|p| p.as_str().to_string())
222        .unwrap_or_default();
223
224    // Remaining pairs are the body statements
225    let body = statements::parse_statements(inner_pairs)?;
226
227    Ok(StreamOnEvent { event_param, body })
228}
229
230/// Parse on_window handler
231fn parse_stream_on_window(pair: Pair<Rule>) -> Result<StreamOnWindow> {
232    let mut inner_pairs = pair.into_inner();
233
234    // First parameter - key
235    let key_param = inner_pairs
236        .next()
237        .map(|p| p.as_str().to_string())
238        .unwrap_or_default();
239
240    // Second parameter - window
241    let window_param = inner_pairs
242        .next()
243        .map(|p| p.as_str().to_string())
244        .unwrap_or_default();
245
246    // Remaining pairs are the body statements
247    let body = statements::parse_statements(inner_pairs)?;
248
249    Ok(StreamOnWindow {
250        key_param,
251        window_param,
252        body,
253    })
254}
255
256/// Parse on_error handler
257fn parse_stream_on_error(pair: Pair<Rule>) -> Result<StreamOnError> {
258    let mut inner_pairs = pair.into_inner();
259
260    // First should be the parameter
261    let error_param = inner_pairs
262        .next()
263        .map(|p| p.as_str().to_string())
264        .unwrap_or_default();
265
266    // Remaining pairs are the body statements
267    let body = statements::parse_statements(inner_pairs)?;
268
269    Ok(StreamOnError { error_param, body })
270}