mod test_support;
use pretty_assertions::assert_eq;
use serde_json::json;
use springql::*;
use springql_foreign_service::{
sink::ForeignSink,
source::{ForeignSource, ForeignSourceInput},
};
use springql_test_logger::setup_test_logger;
use crate::test_support::{apply_ddls, drain_from_sink};
/// Checks `FLOOR_TIME(ts, DURATION_SECS(1))` end to end through a source -> pump -> sink pipeline.
///
/// One trade row stamped `2020-01-01 23:59:59.999999999` is handed to the foreign source, and
/// the first row drained from the foreign sink is expected to carry
/// `2020-01-01 23:59:59.000000000` in its `ts` field.
#[test]
fn test_feat_floor_time() {
    setup_test_logger();

    let test_source = ForeignSource::new().unwrap();
    let test_sink = ForeignSink::start().unwrap();

    // The SQL text below is kept flush-left on purpose so the literal contents stay exactly
    // as they were.
    let create_source_stream = String::from(
        "
CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
",
    );

    let create_sink_stream = String::from(
        "
CREATE SINK STREAM sink_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
",
    );

    // The pump under test: only `ts` is transformed, `ticker` and `amount` pass through.
    let create_pump = String::from(
        "
CREATE PUMP pu_floor_time AS
INSERT INTO sink_trade (ts, ticker, amount)
SELECT STREAM FLOOR_TIME(source_trade.ts, DURATION_SECS(1)), source_trade.ticker, source_trade.amount FROM source_trade;
",
    );

    // Sink writer pointed at the foreign sink's host/port.
    let create_sink_writer = format!(
        "
CREATE SINK WRITER tcp_sink_trade FOR sink_trade
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
        remote_host = test_sink.host_ip(),
        remote_port = test_sink.port()
    );

    // Source reader pointed at the foreign source's host/port.
    let create_source_reader = format!(
        "
CREATE SOURCE READER tcp_trade FOR source_trade
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
        remote_host = test_source.host_ip(),
        remote_port = test_source.port()
    );

    // Statement order is preserved: streams, pump, sink writer, then source reader.
    let ddls = vec![
        create_source_stream,
        create_sink_stream,
        create_pump,
        create_sink_writer,
        create_source_reader,
    ];

    // Bound to a named variable (not `_`) so the returned value lives until the test ends.
    let _pipeline = apply_ddls(&ddls, SpringConfig::default());

    // Single input row whose timestamp sits one nanosecond before the next second.
    let oracle_trade = json!({
        "ts": "2020-01-01 23:59:59.999999999",
        "ticker": "ORCL",
        "amount": 20,
    });
    test_source.start(ForeignSourceInput::new_fifo_batch(vec![oracle_trade]));

    let received_rows = drain_from_sink(&test_sink);
    let first_row = received_rows.get(0).unwrap();

    // The sub-second part must have been floored away.
    assert_eq!(first_row["ts"], "2020-01-01 23:59:59.000000000");
}