use super::integration_tests::{TABLE_NAME, slice_query, with_test_data};
use super::*;
use crate::adapters::cache::CacheConfig;
use crate::{RunFor, RunMode, nodes::*, types::*};
use anyhow::Result;
/// Period argument handed to `kdb_read_cached` by `run_cached`: 24 hours.
const PERIOD: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60);
/// Trade row used by the cached-read tests in this file.
///
/// Instances are built from KDB rows by the `KdbDeserialize` impl below.
/// NOTE(review): the serde derives are presumably what lets the cache layer
/// persist rows to disk — confirm against `kdb_read_cached`'s bounds.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
struct TestTradeCached {
    /// Symbol, decoded through the symbol interner.
    sym: Sym,
    /// Price, read as a KDB float.
    price: f64,
    /// Quantity, read as a KDB long.
    qty: i64,
}
impl KdbDeserialize for TestTradeCached {
    /// Decodes one KDB row into a `(timestamp, trade)` pair.
    ///
    /// Columns are addressed by position: index 1 is the timestamp, 2 the
    /// symbol, 3 the price and 4 the quantity. The column-name slice is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// Propagates the first error raised while reading any of the four
    /// columns.
    fn from_kdb_row(
        row: Row<'_>,
        _columns: &[String],
        interner: &mut SymbolInterner,
    ) -> Result<(NanoTime, Self), KdbError> {
        // Read in column order so the first failing column is the one reported.
        let timestamp = row.get_timestamp(1)?;
        let sym = row.get_sym(2, interner)?;
        let price = row.get(3)?.get_float()?;
        let qty = row.get(4)?.get_long()?;
        Ok((timestamp, TestTradeCached { sym, price, qty }))
    }
}
/// Runs a cached KDB read of `TestTradeCached` rows and returns how many
/// values were collected.
///
/// The stream is built with [`PERIOD`] and a cache rooted at `cache_dir` with
/// a `u64::MAX` capacity argument, then run historically from KDB timestamp 0
/// for 86 400 seconds.
///
/// # Errors
///
/// Returns any error produced while running the graph.
fn run_cached(conn: KdbConnection, cache_dir: &std::path::Path) -> Result<usize> {
    let cache = CacheConfig::new(cache_dir, u64::MAX);
    let rows = kdb_read_cached::<TestTradeCached, _>(conn, PERIOD, cache, |within, date, _| {
        slice_query(date, within.0, within.1)
    })
    .collapse()
    .collect();

    let start = RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0));
    let span = RunFor::Duration(std::time::Duration::from_secs(86400));
    // `run` is invoked on a clone so `rows` can still be inspected afterwards.
    rows.clone().run(start, span)?;

    Ok(rows.peek_value().len())
}
/// Counts the entries directly inside `cache_dir` whose extension is `cache`.
///
/// Returns 0 when the directory cannot be read (for example, when it does not
/// exist); entries that fail to be read are skipped.
fn count_cache_files(cache_dir: &std::path::Path) -> usize {
    let entries = match std::fs::read_dir(cache_dir) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };
    let wanted = std::ffi::OsStr::new("cache");
    entries
        .flatten()
        .filter(|entry| entry.path().extension() == Some(wanted))
        .count()
}
/// First run against live test data must populate the cache; a second run
/// against a connection that is not expected to be reachable must return the
/// same rows, which shows they were served from the cache.
#[test]
fn test_kdb_read_cached_populates_and_hits() -> Result<()> {
    /// Removes the test's cache directory on drop, so it is cleaned up on
    /// every exit path: success, early `?` return, or assertion panic.
    struct CacheDirCleanup(std::path::PathBuf);
    impl Drop for CacheDirCleanup {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    let _ = env_logger::try_init();
    let cache_dir =
        std::env::temp_dir().join(format!("wingfoil_cache_integ_{}", std::process::id()));
    // Start from a clean slate: files left behind by an earlier aborted run
    // under a recycled PID would otherwise turn the "first run" into a cache hit.
    std::fs::remove_dir_all(&cache_dir).ok();
    let _cleanup = CacheDirCleanup(cache_dir.clone());

    with_test_data(3, 1, true, |_n, conn| {
        let n = run_cached(conn, &cache_dir)?;
        assert_eq!(n, 3, "First run should read 3 rows from KDB");
        assert!(
            count_cache_files(&cache_dir) > 0,
            "Cache directory should contain .cache files after first run"
        );
        Ok(())
    })?;

    // No KDB server is expected on this port, so success implies a cache hit.
    let closed = KdbConnection::new("localhost", 59999);
    let n = run_cached(closed, &cache_dir)?;
    assert_eq!(n, 3, "Second run should return same 3 rows from cache");
    Ok(())
}
/// A cache file overwritten with undecodable content must not break the read:
/// the next run still returns all rows, and a later run against a connection
/// that is not expected to be reachable succeeds from the cache.
#[test]
fn test_kdb_read_cached_corrupt_fallback() -> Result<()> {
    /// Removes the test's cache directory on drop, so it is cleaned up on
    /// every exit path: success, early `?` return, or assertion panic.
    struct CacheDirCleanup(std::path::PathBuf);
    impl Drop for CacheDirCleanup {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    let _ = env_logger::try_init();
    let cache_dir =
        std::env::temp_dir().join(format!("wingfoil_cache_corrupt_{}", std::process::id()));
    // Start from a clean slate so a stale file from an earlier aborted run
    // cannot be the one picked for corruption or served as a hit.
    std::fs::remove_dir_all(&cache_dir).ok();
    let _cleanup = CacheDirCleanup(cache_dir.clone());

    with_test_data(3, 1, true, |_n, conn| {
        // Populate the cache.
        let n = run_cached(conn.clone(), &cache_dir)?;
        assert_eq!(n, 3);

        // Overwrite the first cache file found with a payload that cannot be decoded.
        let corrupt_path = std::fs::read_dir(&cache_dir)?
            .filter_map(|e| e.ok())
            .find(|e| e.path().extension().map(|x| x == "cache").unwrap_or(false))
            .expect("should have a cache file")
            .path();
        std::fs::write(
            &corrupt_path,
            format!("select from {}\ngarbage not valid bincode", TABLE_NAME),
        )?;

        // The live connection is still available, so the read must recover.
        let n = run_cached(conn, &cache_dir)?;
        assert_eq!(n, 3, "Fallback run should still return 3 rows");
        Ok(())
    })?;

    // No KDB server is expected on this port, so success implies the corrupt
    // file was replaced with a usable one.
    let closed = KdbConnection::new("localhost", 59999);
    let n = run_cached(closed, &cache_dir)?;
    assert_eq!(n, 3, "After corrupt-file overwrite, cache hit should work");
    Ok(())
}
/// With one cache file deleted, a run against live data must still return
/// every row, and a subsequent run against a connection that is not expected
/// to be reachable must return that same number of rows from the cache.
#[test]
fn test_kdb_read_cached_partial_cache() -> Result<()> {
    /// Removes the test's cache directory on drop, so it is cleaned up on
    /// every exit path: success, early `?` return, or assertion panic.
    struct CacheDirCleanup(std::path::PathBuf);
    impl Drop for CacheDirCleanup {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    let _ = env_logger::try_init();
    let cache_dir =
        std::env::temp_dir().join(format!("wingfoil_cache_partial_{}", std::process::id()));
    // Start from a clean slate so stale files from an earlier aborted run
    // cannot mask a missing slice.
    std::fs::remove_dir_all(&cache_dir).ok();
    let _cleanup = CacheDirCleanup(cache_dir.clone());

    let half_day = std::time::Duration::from_secs(12 * 3600);
    // Same pipeline as `run_cached`, but with a 12-hour period and a two-day run.
    let run = |conn: KdbConnection| -> Result<usize> {
        let stream = kdb_read_cached::<TestTradeCached, _>(
            conn,
            half_day,
            CacheConfig::new(&cache_dir, u64::MAX),
            |within, date, _| slice_query(date, within.0, within.1),
        );
        let collected = stream.collapse().collect();
        collected.clone().run(
            RunMode::HistoricalFrom(NanoTime::from_kdb_timestamp(0)),
            RunFor::Duration(std::time::Duration::from_secs(2 * 86400)),
        )?;
        Ok(collected.peek_value().len())
    };

    // Carries the row count out of the closure so the cache-only run below can
    // be checked for the exact count rather than just "non-empty".
    let expected_rows = std::sync::atomic::AtomicUsize::new(0);

    with_test_data(4, 2, true, |n, conn| {
        expected_rows.store(n, std::sync::atomic::Ordering::SeqCst);

        let count = run(conn.clone())?;
        assert_eq!(count, n, "First run should read all rows from KDB");

        // Drop one cache file so the next run sees a partially filled cache.
        let victim = std::fs::read_dir(&cache_dir)?
            .filter_map(|e| e.ok())
            .find(|e| e.path().extension().map(|x| x == "cache").unwrap_or(false))
            .expect("should have cache files")
            .path();
        std::fs::remove_file(&victim)?;

        let count2 = run(conn)?;
        assert_eq!(count2, n, "Partial-cache run should still return all rows");
        Ok(())
    })?;

    // No KDB server is expected on this port, so success implies every slice
    // was served from the cache.
    let closed = KdbConnection::new("localhost", 59999);
    let n = run(closed)?;
    assert_eq!(
        n,
        expected_rows.load(std::sync::atomic::Ordering::SeqCst),
        "All slices cached: closed port should not be dialled"
    );
    Ok(())
}