1use std::borrow::Cow;
2use std::collections::HashMap;
3
4use serde::de::{
5 DeserializeSeed, IgnoredAny, IntoDeserializer, MapAccess, Visitor,
6 value::{MapDeserializer, StrDeserializer},
7};
8use serde_json::value::RawValue;
9
10use crate::models::{
11 Board, Candle, CandleBorder, Engine, Index, IndexAnalytics, Market, OrderbookLevel,
12 ParseSecuritySnapshotError, SecId, SecStat, Security, SecurityBoard, SecuritySnapshot, Trade,
13 Turnover,
14};
15#[cfg(feature = "news")]
16use crate::models::{Event, SiteNews};
17#[cfg(feature = "history")]
18use crate::models::{HistoryDates, HistoryRecord};
19
20use super::MoexError;
21use super::constants::*;
22use super::convert::*;
23use super::wire::*;
24
25#[derive(Debug, serde::Deserialize)]
26struct RawIssTableRowsPayload {
27 columns: Vec<Box<str>>,
28 data: Vec<Vec<serde_json::Value>>,
29}
30
31#[derive(Debug)]
32pub struct RawTables {
37 endpoint: Box<str>,
38 blocks: HashMap<Box<str>, Box<RawValue>>,
39}
40
41impl RawTables {
42 pub fn len(&self) -> usize {
44 self.blocks.len()
45 }
46
47 pub fn is_empty(&self) -> bool {
49 self.blocks.is_empty()
50 }
51
52 pub fn table_names(&self) -> impl Iterator<Item = &str> {
57 self.blocks.keys().map(Box::as_ref)
58 }
59
60 pub fn take_rows<T>(&mut self, table: impl Into<String>) -> Result<Vec<T>, MoexError>
65 where
66 T: serde::de::DeserializeOwned,
67 {
68 let table: Box<str> = table.into().into_boxed_str();
69 let raw_table =
70 self.blocks
71 .remove(table.as_ref())
72 .ok_or_else(|| MoexError::MissingRawTable {
73 endpoint: self.endpoint.clone(),
74 table: table.clone(),
75 })?;
76 let table_payload =
78 serde_json::from_str(raw_table.get()).map_err(|source| MoexError::Decode {
79 endpoint: format!("{} (table={})", self.endpoint, table).into_boxed_str(),
80 source,
81 })?;
82 decode_raw_table_rows_payload_with_context(
83 table_payload,
84 self.endpoint.as_ref(),
85 table.as_ref(),
86 )
87 }
88}
89
90#[derive(Debug)]
91pub struct RawTableView<'a> {
93 columns: Vec<Cow<'a, str>>,
94 data: Vec<Vec<&'a RawValue>>,
95}
96
97impl<'a> RawTableView<'a> {
98 pub fn columns(&self) -> &[Cow<'a, str>] {
100 &self.columns
101 }
102
103 pub fn rows(&self) -> &[Vec<&'a RawValue>] {
105 &self.data
106 }
107
108 pub fn len(&self) -> usize {
110 self.data.len()
111 }
112
113 pub fn is_empty(&self) -> bool {
115 self.data.is_empty()
116 }
117
118 pub fn column_index(&self, column: &str) -> Option<usize> {
120 self.columns.iter().position(|name| name.as_ref() == column)
121 }
122
123 pub fn raw_value(&self, row: usize, column: &str) -> Option<&'a RawValue> {
125 let column_index = self.column_index(column)?;
126 self.data
127 .get(row)
128 .and_then(|values| values.get(column_index))
129 .copied()
130 }
131
132 pub fn deserialize_value<T>(
136 &self,
137 row: usize,
138 column: &str,
139 ) -> Result<Option<T>, serde_json::Error>
140 where
141 T: serde::de::Deserialize<'a>,
142 {
143 self.raw_value(row, column)
144 .map(|raw| serde_json::from_str(raw.get()))
145 .transpose()
146 }
147}
148
149struct RawIssTableRowsSeed<'a> {
150 table: &'a str,
151}
152
153impl<'de> DeserializeSeed<'de> for RawIssTableRowsSeed<'_> {
154 type Value = Option<RawIssTableRowsPayload>;
155
156 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
157 where
158 D: serde::Deserializer<'de>,
159 {
160 deserializer.deserialize_map(RawIssTableRowsVisitor { table: self.table })
161 }
162}
163
164struct RawIssTableRowsVisitor<'a> {
165 table: &'a str,
166}
167
168impl<'de> Visitor<'de> for RawIssTableRowsVisitor<'_> {
169 type Value = Option<RawIssTableRowsPayload>;
170
171 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 formatter.write_str("ISS JSON object with table blocks")
173 }
174
175 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
176 where
177 A: MapAccess<'de>,
178 {
179 let mut selected = None;
180
181 while let Some(key) = map.next_key::<Cow<'de, str>>()? {
182 if key == self.table && selected.is_none() {
183 selected = Some(map.next_value::<RawIssTableRowsPayload>()?);
184 } else {
185 map.next_value::<IgnoredAny>()?;
186 }
187 }
188
189 Ok(selected)
190 }
191}
192
193fn decode_single_raw_table_payload(
194 payload: &str,
195 table: &str,
196) -> Result<Option<RawIssTableRowsPayload>, serde_json::Error> {
197 let mut deserializer = serde_json::Deserializer::from_str(payload);
198 RawIssTableRowsSeed { table }.deserialize(&mut deserializer)
199}
200
201fn decode_top_level_raw_blocks(
202 payload: &str,
203) -> Result<HashMap<Box<str>, Box<RawValue>>, serde_json::Error> {
204 serde_json::from_str(payload)
207}
208
209#[derive(Debug, serde::Deserialize)]
210struct BorrowedRawIssTableRowsPayload<'a> {
211 #[serde(borrow)]
212 columns: Vec<Cow<'a, str>>,
213 #[serde(borrow)]
214 data: Vec<Vec<&'a RawValue>>,
215}
216
217struct BorrowedRawIssTableRowsSeed<'a> {
218 table: &'a str,
219}
220
221impl<'de> DeserializeSeed<'de> for BorrowedRawIssTableRowsSeed<'_> {
222 type Value = Option<BorrowedRawIssTableRowsPayload<'de>>;
223
224 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
225 where
226 D: serde::Deserializer<'de>,
227 {
228 deserializer.deserialize_map(BorrowedRawIssTableRowsVisitor { table: self.table })
229 }
230}
231
232struct BorrowedRawIssTableRowsVisitor<'a> {
233 table: &'a str,
234}
235
236impl<'de> Visitor<'de> for BorrowedRawIssTableRowsVisitor<'_> {
237 type Value = Option<BorrowedRawIssTableRowsPayload<'de>>;
238
239 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 formatter.write_str("ISS JSON object with table blocks")
241 }
242
243 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
244 where
245 A: MapAccess<'de>,
246 {
247 let mut selected = None;
248
249 while let Some(key) = map.next_key::<Cow<'de, str>>()? {
250 if key == self.table && selected.is_none() {
251 selected = Some(map.next_value::<BorrowedRawIssTableRowsPayload<'de>>()?);
252 } else {
253 map.next_value::<IgnoredAny>()?;
254 }
255 }
256
257 Ok(selected)
258 }
259}
260
261fn decode_single_raw_table_payload_borrowed<'a>(
262 payload: &'a str,
263 table: &str,
264) -> Result<Option<BorrowedRawIssTableRowsPayload<'a>>, serde_json::Error> {
265 let mut deserializer = serde_json::Deserializer::from_str(payload);
266 BorrowedRawIssTableRowsSeed { table }.deserialize(&mut deserializer)
267}
268
269fn decode_raw_table_row<T>(
270 columns: &[Box<str>],
271 values: Vec<serde_json::Value>,
272) -> Result<T, serde_json::Error>
273where
274 T: serde::de::DeserializeOwned,
275{
276 let entries = columns
277 .iter()
278 .map(Box::as_ref)
279 .zip(values)
280 .map(|(column, value)| {
281 (
282 StrDeserializer::<serde_json::Error>::new(column),
283 value.into_deserializer(),
284 )
285 });
286 let deserializer = MapDeserializer::new(entries);
287 <T as serde::Deserialize>::deserialize(deserializer)
288}
289
290fn decode_raw_table_rows_payload_with_context<T>(
291 table_payload: RawIssTableRowsPayload,
292 endpoint: &str,
293 table: &str,
294) -> Result<Vec<T>, MoexError>
295where
296 T: serde::de::DeserializeOwned,
297{
298 let endpoint: Box<str> = endpoint.to_owned().into_boxed_str();
299 let table: Box<str> = table.to_owned().into_boxed_str();
300 let RawIssTableRowsPayload { columns, data } = table_payload;
301 let expected_width = columns.len();
302 let mut rows = Vec::with_capacity(data.len());
303
304 for (row, values) in data.into_iter().enumerate() {
305 let actual_width = values.len();
306 if actual_width != expected_width {
307 return Err(MoexError::InvalidRawTableRowWidth {
308 endpoint: endpoint.clone(),
309 table: table.clone(),
310 row,
311 expected: expected_width,
312 actual: actual_width,
313 });
314 }
315
316 let decoded = decode_raw_table_row::<T>(&columns, values).map_err(|source| {
317 MoexError::InvalidRawTableRow {
318 endpoint: endpoint.clone(),
319 table: table.clone(),
320 row,
321 source,
322 }
323 })?;
324 rows.push(decoded);
325 }
326
327 Ok(rows)
328}
329
330pub(super) fn decode_raw_table_rows_json_with_endpoint<T>(
331 payload: &str,
332 endpoint: &str,
333 table: &str,
334) -> Result<Vec<T>, MoexError>
335where
336 T: serde::de::DeserializeOwned,
337{
338 let endpoint = endpoint.to_owned().into_boxed_str();
339 let table = table.to_owned().into_boxed_str();
340
341 let table_payload = decode_single_raw_table_payload(payload, table.as_ref())
342 .map_err(|source| MoexError::Decode {
343 endpoint: endpoint.clone(),
344 source,
345 })?
346 .ok_or_else(|| MoexError::MissingRawTable {
347 endpoint: endpoint.clone(),
348 table: table.clone(),
349 })?;
350 decode_raw_table_rows_payload_with_context(table_payload, endpoint.as_ref(), table.as_ref())
351}
352
353pub(super) fn decode_raw_tables_json_with_endpoint(
354 payload: &str,
355 endpoint: &str,
356) -> Result<RawTables, MoexError> {
357 let endpoint = endpoint.to_owned().into_boxed_str();
358 let raw_blocks = decode_top_level_raw_blocks(payload).map_err(|source| MoexError::Decode {
359 endpoint: endpoint.clone(),
360 source,
361 })?;
362 Ok(RawTables {
363 endpoint,
364 blocks: raw_blocks,
365 })
366}
367
368pub(super) fn decode_raw_table_view_json_with_endpoint<'a>(
369 payload: &'a str,
370 endpoint: &str,
371 table: &str,
372) -> Result<RawTableView<'a>, MoexError> {
373 let endpoint = endpoint.to_owned().into_boxed_str();
374 let table = table.to_owned().into_boxed_str();
375
376 let table_payload = decode_single_raw_table_payload_borrowed(payload, table.as_ref())
377 .map_err(|source| MoexError::Decode {
378 endpoint: endpoint.clone(),
379 source,
380 })?
381 .ok_or_else(|| MoexError::MissingRawTable {
382 endpoint: endpoint.clone(),
383 table: table.clone(),
384 })?;
385
386 let expected_width = table_payload.columns.len();
387 for (row, values) in table_payload.data.iter().enumerate() {
388 let actual_width = values.len();
389 if actual_width != expected_width {
390 return Err(MoexError::InvalidRawTableRowWidth {
391 endpoint: endpoint.clone(),
392 table: table.clone(),
393 row,
394 expected: expected_width,
395 actual: actual_width,
396 });
397 }
398 }
399
400 Ok(RawTableView {
401 columns: table_payload.columns,
402 data: table_payload.data,
403 })
404}
405
406pub(super) fn decode_indexes_json_payload(payload: &str) -> Result<Vec<Index>, MoexError> {
407 let payload: IndexesResponse =
408 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
409 endpoint: INDEXES_ENDPOINT.to_owned().into_boxed_str(),
410 source,
411 })?;
412 convert_index_rows(payload.indices.data, INDEXES_ENDPOINT)
413}
414
415pub(super) fn decode_engines_json_payload(payload: &str) -> Result<Vec<Engine>, MoexError> {
416 let payload: EnginesResponse =
417 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
418 endpoint: ENGINES_ENDPOINT.to_owned().into_boxed_str(),
419 source,
420 })?;
421 convert_engine_rows(payload.engines.data, ENGINES_ENDPOINT)
422}
423
424pub(super) fn decode_markets_json_with_endpoint(
425 payload: &str,
426 endpoint: &str,
427) -> Result<Vec<Market>, MoexError> {
428 let payload: MarketsResponse =
429 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
430 endpoint: endpoint.to_owned().into_boxed_str(),
431 source,
432 })?;
433 convert_market_rows(payload.markets.data, endpoint)
434}
435
436pub(super) fn decode_boards_json_with_endpoint(
437 payload: &str,
438 endpoint: &str,
439) -> Result<Vec<Board>, MoexError> {
440 let payload: BoardsResponse =
441 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
442 endpoint: endpoint.to_owned().into_boxed_str(),
443 source,
444 })?;
445 convert_board_rows(payload.boards.data, endpoint)
446}
447
448pub(super) fn decode_security_boards_json_with_endpoint(
449 payload: &str,
450 endpoint: &str,
451) -> Result<Vec<SecurityBoard>, MoexError> {
452 let payload: SecurityBoardsResponse =
453 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
454 endpoint: endpoint.to_owned().into_boxed_str(),
455 source,
456 })?;
457 convert_security_board_rows(payload.boards.data, endpoint)
458}
459
460pub(super) fn decode_securities_json_with_endpoint(
461 payload: &str,
462 endpoint: &str,
463) -> Result<Vec<Security>, MoexError> {
464 let payload: SecuritiesResponse =
465 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
466 endpoint: endpoint.to_owned().into_boxed_str(),
467 source,
468 })?;
469 convert_security_rows(payload.securities.data, endpoint)
470}
471
472pub(super) fn decode_board_security_snapshots_json_with_endpoint(
473 payload: &str,
474 endpoint: &str,
475) -> Result<Vec<SecuritySnapshot>, MoexError> {
476 let payload: BoardSecuritySnapshotsResponse =
477 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
478 endpoint: endpoint.to_owned().into_boxed_str(),
479 source,
480 })?;
481
482 let mut last_by_secid: HashMap<SecId, (usize, Option<f64>)> =
483 HashMap::with_capacity(payload.marketdata.data.len());
484 for (row, BoardMarketDataRow(secid, last)) in payload.marketdata.data.into_iter().enumerate() {
485 if let Some(last) = last
486 && !last.is_finite()
487 {
488 return Err(MoexError::InvalidSecuritySnapshot {
489 endpoint: endpoint.to_owned().into_boxed_str(),
490 table: "marketdata",
491 row,
492 source: ParseSecuritySnapshotError::NonFiniteLast(last),
493 });
494 }
495 let secid = parse_snapshot_secid(endpoint, "marketdata", row, secid)?;
496 last_by_secid.insert(secid, (row, last));
497 }
498
499 let mut snapshots = Vec::with_capacity(payload.securities.data.len().max(last_by_secid.len()));
500 for (row, BoardSecurityRow(secid, lot_size_raw)) in
501 payload.securities.data.into_iter().enumerate()
502 {
503 let secid = parse_snapshot_secid(endpoint, "securities", row, secid)?;
504 let lot_size = parse_snapshot_lot_size(endpoint, "securities", row, lot_size_raw)?;
505 let last = last_by_secid.remove(&secid).and_then(|(_, last)| last);
506
507 let snapshot =
508 SecuritySnapshot::try_from_parts(secid, lot_size, last).map_err(|source| {
509 MoexError::InvalidSecuritySnapshot {
510 endpoint: endpoint.to_owned().into_boxed_str(),
511 table: "securities",
512 row,
513 source,
514 }
515 })?;
516 snapshots.push(snapshot);
517 }
518
519 for (secid, (row, last)) in last_by_secid {
520 let snapshot = SecuritySnapshot::try_from_parts(secid, None, last).map_err(|source| {
521 MoexError::InvalidSecuritySnapshot {
522 endpoint: endpoint.to_owned().into_boxed_str(),
523 table: "marketdata",
524 row,
525 source,
526 }
527 })?;
528 snapshots.push(snapshot);
529 }
530
531 Ok(snapshots)
532}
533
534pub(super) fn decode_orderbook_json_with_endpoint(
535 payload: &str,
536 endpoint: &str,
537) -> Result<Vec<OrderbookLevel>, MoexError> {
538 let payload: OrderbookResponse =
539 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
540 endpoint: endpoint.to_owned().into_boxed_str(),
541 source,
542 })?;
543 convert_orderbook_rows(payload.orderbook.data, endpoint)
544}
545
546pub(super) fn decode_candle_borders_json_with_endpoint(
547 payload: &str,
548 endpoint: &str,
549) -> Result<Vec<CandleBorder>, MoexError> {
550 let payload: CandleBordersResponse =
551 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
552 endpoint: endpoint.to_owned().into_boxed_str(),
553 source,
554 })?;
555 convert_candle_border_rows(payload.borders.data, endpoint)
556}
557
558pub(super) fn decode_candles_json_with_endpoint(
559 payload: &str,
560 endpoint: &str,
561) -> Result<Vec<Candle>, MoexError> {
562 let payload: CandlesResponse =
563 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
564 endpoint: endpoint.to_owned().into_boxed_str(),
565 source,
566 })?;
567 convert_candle_rows(payload.candles.data, endpoint)
568}
569
570pub(super) fn decode_trades_json_with_endpoint(
571 payload: &str,
572 endpoint: &str,
573) -> Result<Vec<Trade>, MoexError> {
574 let payload: TradesResponse =
575 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
576 endpoint: endpoint.to_owned().into_boxed_str(),
577 source,
578 })?;
579 convert_trade_rows(payload.trades.data, endpoint)
580}
581
582pub(super) fn decode_index_analytics_json_with_endpoint(
583 payload: &str,
584 endpoint: &str,
585) -> Result<Vec<IndexAnalytics>, MoexError> {
586 let payload: IndexAnalyticsResponse =
587 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
588 endpoint: endpoint.to_owned().into_boxed_str(),
589 source,
590 })?;
591 convert_index_analytics_rows(payload.analytics.data, endpoint)
592}
593
#[cfg(feature = "history")]
/// Decodes the `dates` block of `payload` into [`HistoryDates`] values.
///
/// # Errors
///
/// Returns [`MoexError::Decode`] labelled with `endpoint` when `payload`
/// cannot be deserialized as a `HistoryDatesResponse`; errors from
/// `convert_history_dates_rows` are returned unchanged.
pub(super) fn decode_history_dates_json_with_endpoint(
    payload: &str,
    endpoint: &str,
) -> Result<Vec<HistoryDates>, MoexError> {
    serde_json::from_str::<HistoryDatesResponse>(payload)
        .map_err(|source| MoexError::Decode {
            endpoint: Box::from(endpoint),
            source,
        })
        .and_then(|response| convert_history_dates_rows(response.dates.data, endpoint))
}
606
#[cfg(feature = "history")]
/// Decodes the `history` block of `payload` into [`HistoryRecord`] values.
///
/// # Errors
///
/// Returns [`MoexError::Decode`] labelled with `endpoint` when `payload`
/// cannot be deserialized as a `HistoryResponse`; errors from
/// `convert_history_rows` are returned unchanged.
pub(super) fn decode_history_json_with_endpoint(
    payload: &str,
    endpoint: &str,
) -> Result<Vec<HistoryRecord>, MoexError> {
    serde_json::from_str::<HistoryResponse>(payload)
        .map_err(|source| MoexError::Decode {
            endpoint: Box::from(endpoint),
            source,
        })
        .and_then(|response| convert_history_rows(response.history.data, endpoint))
}
619
620pub(super) fn decode_turnovers_json_with_endpoint(
621 payload: &str,
622 endpoint: &str,
623) -> Result<Vec<Turnover>, MoexError> {
624 let payload: TurnoversResponse =
625 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
626 endpoint: endpoint.to_owned().into_boxed_str(),
627 source,
628 })?;
629 convert_turnover_rows(payload.turnovers.data, endpoint)
630}
631
#[cfg(feature = "news")]
/// Decodes the `sitenews` block of `payload` into [`SiteNews`] values.
///
/// # Errors
///
/// Returns [`MoexError::Decode`] labelled with `endpoint` when `payload`
/// cannot be deserialized as a `SiteNewsResponse`; errors from
/// `convert_sitenews_rows` are returned unchanged.
pub(super) fn decode_sitenews_json_with_endpoint(
    payload: &str,
    endpoint: &str,
) -> Result<Vec<SiteNews>, MoexError> {
    serde_json::from_str::<SiteNewsResponse>(payload)
        .map_err(|source| MoexError::Decode {
            endpoint: Box::from(endpoint),
            source,
        })
        .and_then(|response| convert_sitenews_rows(response.sitenews.data, endpoint))
}
644
#[cfg(feature = "news")]
/// Decodes the `events` block of `payload` into [`Event`] values.
///
/// # Errors
///
/// Returns [`MoexError::Decode`] labelled with `endpoint` when `payload`
/// cannot be deserialized as an `EventsResponse`; errors from
/// `convert_event_rows` are returned unchanged.
pub(super) fn decode_events_json_with_endpoint(
    payload: &str,
    endpoint: &str,
) -> Result<Vec<Event>, MoexError> {
    serde_json::from_str::<EventsResponse>(payload)
        .map_err(|source| MoexError::Decode {
            endpoint: Box::from(endpoint),
            source,
        })
        .and_then(|response| convert_event_rows(response.events.data, endpoint))
}
657
658pub(super) fn decode_secstats_json_with_endpoint(
659 payload: &str,
660 endpoint: &str,
661) -> Result<Vec<SecStat>, MoexError> {
662 let payload: SecStatsResponse =
663 serde_json::from_str(payload).map_err(|source| MoexError::Decode {
664 endpoint: endpoint.to_owned().into_boxed_str(),
665 source,
666 })?;
667 convert_secstats_rows(payload.secstats.data, endpoint)
668}
669
670fn parse_snapshot_secid(
671 endpoint: &str,
672 table: &'static str,
673 row: usize,
674 secid: String,
675) -> Result<SecId, MoexError> {
676 SecId::try_from(secid).map_err(|source| MoexError::InvalidSecuritySnapshot {
677 endpoint: endpoint.to_owned().into_boxed_str(),
678 table,
679 row,
680 source: ParseSecuritySnapshotError::InvalidSecId(source),
681 })
682}
683
684fn parse_snapshot_lot_size(
685 endpoint: &str,
686 table: &'static str,
687 row: usize,
688 lot_size: Option<i64>,
689) -> Result<Option<u32>, MoexError> {
690 match lot_size {
691 None => Ok(None),
692 Some(raw) if raw < 0 => Err(MoexError::InvalidSecuritySnapshot {
693 endpoint: endpoint.to_owned().into_boxed_str(),
694 table,
695 row,
696 source: ParseSecuritySnapshotError::NegativeLotSize(raw),
697 }),
698 Some(raw) => u32::try_from(raw)
699 .map(Some)
700 .map_err(|_| MoexError::InvalidSecuritySnapshot {
701 endpoint: endpoint.to_owned().into_boxed_str(),
702 table,
703 row,
704 source: ParseSecuritySnapshotError::LotSizeOutOfRange(raw),
705 }),
706 }
707}