mod test_support;
use std::thread;
use std::time::Duration;
use log::LevelFilter;
use serde_json::json;
use springql_core::api::*;
use springql_foreign_service::{
sink::ForeignSink,
source::{ForeignSource, ForeignSourceInput},
};
use springql_test_logger::setup_test_logger_with_level;
use crate::test_support::*;
fn _gen_source_input(n: u64) -> impl Iterator<Item = serde_json::Value> {
(0..n).map(move |_| {
let ts = "2020-01-01 00:00:00.000000000".to_string();
let ticker = String::from_iter(std::iter::repeat('X').take(1000));
let amount = 100;
json!({
"ts": ts,
"ticker": ticker,
"amount": amount,
})
})
}
fn t(n_in_rows: u64, upper_limit_bytes: u64) {
setup_test_logger_with_level(LevelFilter::Warn);
let source_trade = _gen_source_input(n_in_rows).collect();
let test_source_trade = ForeignSource::new().unwrap();
let test_source_city_temperature = ForeignSource::new().unwrap();
let test_sink = ForeignSink::start().unwrap();
let ddls = vec![
"
CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE SOURCE STREAM source_city_temperature (
ts TIMESTAMP NOT NULL ROWTIME,
city TEXT NOT NULL,
temperature INTEGER NOT NULL
);
"
.to_string(),
"
CREATE SINK STREAM sink_joined (
ts TIMESTAMP NOT NULL ROWTIME,
amount INTEGER NOT NULL,
temperature INTEGER
);
"
.to_string(),
"
CREATE PUMP pu_join AS
INSERT INTO sink_joined (ts, amount, temperature)
SELECT STREAM
source_trade.ts,
source_trade.amount,
source_city_temperature.temperature
FROM source_trade
LEFT OUTER JOIN source_city_temperature
ON source_trade.ts = source_city_temperature.ts
FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0);
"
.to_string(),
format!(
"
CREATE SINK WRITER tcp_sink_joined FOR sink_joined
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_sink.host_ip(),
remote_port = test_sink.port()
),
format!(
"
CREATE SOURCE READER tcp_source_trade FOR source_trade
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_source_trade.host_ip(),
remote_port = test_source_trade.port()
),
format!(
"
CREATE SOURCE READER tcp_source_city_temperature FOR source_city_temperature
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_source_city_temperature.host_ip(),
remote_port = test_source_city_temperature.port()
),
];
let mut config = SpringConfig::default();
config.memory.upper_limit_bytes = upper_limit_bytes;
config.memory.severe_to_critical_percent = 60;
config.memory.moderate_to_severe_percent = 30;
config.memory.critical_to_severe_percent = 50;
config.memory.severe_to_moderate_percent = 20;
let _pipeline = apply_ddls(&ddls, config);
test_source_trade.start(ForeignSourceInput::new_fifo_batch(source_trade));
test_source_city_temperature.start(ForeignSourceInput::new_fifo_batch(vec![]));
thread::sleep(Duration::from_secs(10));
}
#[test]
fn test_feat_purger() {
t(10000, 100000)
}