1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use std::sync::Arc;

use crate::{
    error::Result,
    pipeline::stream_model::StreamModel,
    stream_engine::autonomous_executor::row::{column::stream_column::StreamColumns, Row},
};

use super::format::json::JsonObject;

/// A row as received from a foreign source (handed over by a SourceReader),
/// held as the JSON object it was read as.
///
/// It is converted into a `Row` as soon as it crosses the stream-engine boundary.
#[derive(Debug, PartialEq, Eq)]
pub(in crate::stream_engine) struct SourceRow(JsonObject);

impl SourceRow {
    pub(in crate::stream_engine) fn from_json(json: JsonObject) -> Self {
        Self(json)
    }

    /// # Failure
    ///
    /// - [SpringError::InvalidFormat](crate::error::SpringError::InvalidFormat) when:
    ///   - This input row cannot be converted into row.
    pub(in crate::stream_engine::autonomous_executor) fn into_row(
        self,
        stream_model: Arc<StreamModel>,
    ) -> Result<Row> {
        // SourceRow -> JsonObject -> HashMap<ColumnName, SqlValue> -> StreamColumns -> Row

        let column_values = self.0.into_column_values()?;
        let stream_columns = StreamColumns::new(stream_model, column_values)?;
        Ok(Row::new(stream_columns))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Converting the Tokyo city-temperature source-row fixture against the
    /// city-temperature stream model fixture must yield the matching row fixture.
    #[test]
    fn test_json_into_row() {
        // Arrange
        let model = Arc::new(StreamModel::fx_city_temperature());
        let source_row = SourceRow::fx_city_temperature_tokyo();
        let expected = Row::fx_city_temperature_tokyo();

        // Act
        let actual = source_row.into_row(model).unwrap();

        // Assert
        assert_eq!(actual, expected);
    }
}