springql_foreign_service/source/source_input/
timed_stream.rs1pub mod file_type;
4
5mod file_parser;
6mod timer;
7
8use self::{file_parser::FileParser, file_type::FileType, timer::Timer};
9use anyhow::{Context, Result};
10use chrono::{DateTime, FixedOffset};
11use std::{path::Path, thread, time::Duration};
12
/// Length of each pause taken while polling the timer for an item's timestamp to be reached.
const SLEEP_DURATION: Duration = Duration::from_micros(10);
14
15#[derive(Debug)]
21pub struct TimedStream {
22 timestamp_field: String,
23 timer: Timer,
24 file_parser: FileParser,
25}
26
27impl TimedStream {
28 pub fn new<P: AsRef<Path>>(
29 file_type: FileType,
30 file_path: P,
31 timestamp_field: String,
32 virt_initial_datetime: DateTime<FixedOffset>,
33 ) -> Result<Self> {
34 let file_parser = FileParser::new(file_type, file_path)?;
35 let timer = Timer::new(virt_initial_datetime.into());
36 Ok(Self {
37 timestamp_field,
38 timer,
39 file_parser,
40 })
41 }
42}
43
44impl Iterator for TimedStream {
45 type Item = Result<serde_json::Value>;
46
47 fn next(&mut self) -> Option<Self::Item> {
48 self.file_parser.next().map(|res_json| {
49 let json = res_json?;
50
51 let timestamp_s = json.get(&self.timestamp_field).with_context(|| {
52 format!(
53 r#"timestamp field "{}" not found in line: {}"#,
54 self.timestamp_field, json
55 )
56 })?.as_str().with_context(|| {
57 format!(
58 r#"timestamp field "{}" is a string in line: {}"#,
59 self.timestamp_field, json
60 )
61 })?;
62 let timestamp =
63 DateTime::parse_from_rfc3339(timestamp_s)
64 .with_context(||
65 format!(
66 r#"timestamp field "{}" is not in RFC 3339 format. Correct example: "1996-12-19T16:39:57-08:00""#,
67 timestamp_s
68 )
69 )?;
70
71 while self.timer.virt_current_datetime() < timestamp {
72 thread::sleep(SLEEP_DURATION);
73 }
74
75 Ok(json)
76 })
77 }
78}