springql_foreign_service/source/source_input/
timed_stream.rs

// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2
3pub mod file_type;
4
5mod file_parser;
6mod timer;
7
8use self::{file_parser::FileParser, file_type::FileType, timer::Timer};
9use anyhow::{Context, Result};
10use chrono::{DateTime, FixedOffset};
11use std::{path::Path, thread, time::Duration};
12
/// Pause between two consecutive polls of the virtual clock while the pending
/// record is not yet due (10 microseconds).
const SLEEP_DURATION: Duration = Duration::from_nanos(10_000);
14
15/// Generates lines with older timestamp from "now".
16///
17/// If current line's timestamp is newer than "now", `Iterator::next()` blocks (with sleep).
18///
19/// If lines in a file is not ordered by timestamp, Timed Stream just generates `line_with_newer_timestamp -> line_with_older_timestamp` in consecutive iterations.
20#[derive(Debug)]
21pub struct TimedStream {
22    timestamp_field: String,
23    timer: Timer,
24    file_parser: FileParser,
25}
26
27impl TimedStream {
28    pub fn new<P: AsRef<Path>>(
29        file_type: FileType,
30        file_path: P,
31        timestamp_field: String,
32        virt_initial_datetime: DateTime<FixedOffset>,
33    ) -> Result<Self> {
34        let file_parser = FileParser::new(file_type, file_path)?;
35        let timer = Timer::new(virt_initial_datetime.into());
36        Ok(Self {
37            timestamp_field,
38            timer,
39            file_parser,
40        })
41    }
42}
43
44impl Iterator for TimedStream {
45    type Item = Result<serde_json::Value>;
46
47    fn next(&mut self) -> Option<Self::Item> {
48        self.file_parser.next().map(|res_json| {
49            let json = res_json?;
50
51            let timestamp_s = json.get(&self.timestamp_field).with_context(|| {
52                format!(
53                    r#"timestamp field "{}" not found in line: {}"#,
54                    self.timestamp_field, json
55                )
56            })?.as_str().with_context(|| {
57                format!(
58                    r#"timestamp field "{}" is a string in line: {}"#,
59                    self.timestamp_field, json
60                )
61            })?;
62            let timestamp =
63                DateTime::parse_from_rfc3339(timestamp_s)
64                    .with_context(||
65                        format!(
66                            r#"timestamp field "{}" is not in RFC 3339 format. Correct example: "1996-12-19T16:39:57-08:00""#,
67                            timestamp_s
68                        )
69                    )?;
70
71            while self.timer.virt_current_datetime() < timestamp {
72                thread::sleep(SLEEP_DURATION);
73            }
74
75            Ok(json)
76        })
77    }
78}