use super::read::compute_time_slices;
use super::{KdbConnection, KdbDeserialize, KdbExt, SymbolInterner};
use crate::adapters::cache::{CacheConfig, CacheKey, FileCache};
use crate::nodes::produce_async;
use crate::types::*;
use anyhow::bail;
use kdb_plus_fixed::ipc::{ConnectionMethod, K, QStream};
use log::info;
use std::rc::Rc;
/// Reads rows of `T` from KDB in time slices, backed by a file cache.
///
/// The run window `[ctx.start_time, end_time)` is split into `period`-sized
/// slices by `compute_time_slices`; for each slice, `query_fn` produces the
/// query text, which serves both as the KDB query and (hashed via
/// `CacheKey::from_parts`) as the cache key. Cache hits are replayed
/// directly; misses are fetched over a lazily-opened KDB socket, validated
/// for ascending time order, written back to the cache, then yielded.
///
/// Errors: bails before the stream starts when `start_time` is
/// `NanoTime::ZERO` or when no finite end time is available (an unbounded
/// window would produce infinitely many slices). Connection, query, row
/// parse, and sort-order failures are yielded into the stream as `Err` and
/// terminate it.
#[must_use]
pub fn kdb_read_cached<T, F>(
    connection: KdbConnection,
    // Width of each time slice handed to `query_fn`.
    period: std::time::Duration,
    cache_config: CacheConfig,
    // Builds the KDB query text for (slice bounds, date, iteration index).
    query_fn: F,
) -> Rc<dyn Stream<Burst<T>>>
where
    T: Element
        + Send
        + Sync
        + KdbDeserialize
        + serde::Serialize
        + for<'de> serde::Deserialize<'de>
        + 'static,
    F: FnMut((NanoTime, NanoTime), i32, usize) -> String + Send + 'static,
{
    produce_async(move |ctx| {
        // Capture the window up front so the async block below owns plain
        // values instead of borrowing `ctx` across awaits.
        let start_time = ctx.start_time;
        let end_time_result = ctx.end_time();
        async move {
            // A zero start time means the caller never pinned the window.
            if start_time == NanoTime::ZERO {
                bail!(
                    "kdb_read_cached: start_time is NanoTime::ZERO; \
                     use RunMode::HistoricalFrom with an explicit start time"
                );
            }
            // Only a finite, known end time is usable: `NanoTime::MAX`
            // (RunFor::Forever) or an absent end time (RunFor::Cycles —
            // presumably what `Err` signals here; confirm against
            // `ctx.end_time()`'s contract) would make slicing unbounded.
            let end_time = match end_time_result {
                Ok(t) if t == NanoTime::MAX => bail!(
                    "kdb_read_cached requires RunFor::Duration; \
                     RunFor::Forever would generate an unbounded number of slices"
                ),
                Ok(t) => t,
                Err(_) => bail!(
                    "kdb_read_cached requires RunFor::Duration; \
                     RunFor::Cycles does not provide an end time"
                ),
            };
            // Ensure the cache directory exists before handing the config
            // to the cache.
            tokio::fs::create_dir_all(&cache_config.folder).await?;
            let cache = FileCache::<T>::new(cache_config);
            let slices = compute_time_slices(start_time, end_time, period);
            Ok(async_stream::stream! {
                // The socket is opened lazily: if every slice is a cache
                // hit we never connect to KDB at all.
                let mut socket: Option<QStream> = None;
                let mut interner = SymbolInterner::default();
                // Rebind so the generator owns a mutable copy of the
                // FnMut closure.
                let mut query_fn = query_fn;
                'slices: for (within, date, iteration) in slices {
                    let query = query_fn(within, date, iteration);
                    // The query text itself is the cache key, so any change
                    // to the generated query invalidates the cached slice.
                    let key = CacheKey::from_parts(&[&query]);
                    let cached = match cache.get(&key).await {
                        Ok(Some(rows)) => Some(rows),
                        Ok(None) => None,
                        Err(e) => {
                            // Cache read failures are non-fatal: log and
                            // fall through to a fresh KDB fetch.
                            log::warn!("KDB cache read error (falling back to KDB): {e}");
                            None
                        }
                    };
                    // Cache hit: replay stored rows and skip the fetch.
                    if let Some(rows) = cached {
                        for (time, record) in rows {
                            yield Ok((time, record));
                        }
                        continue;
                    }
                    // Cache miss: connect on first use. Any failure below
                    // (connect, query, parse, ordering) ends the whole
                    // stream via `break 'slices` after yielding the error.
                    if socket.is_none() {
                        let creds = connection.credentials_string();
                        match QStream::connect(
                            ConnectionMethod::TCP,
                            &connection.host,
                            connection.port,
                            &creds,
                        )
                        .await
                        {
                            Ok(s) => socket = Some(s),
                            Err(e) => {
                                yield Err(e.into());
                                break 'slices;
                            }
                        }
                    }
                    // Safe: the branch above guarantees `socket` is Some.
                    let sock = socket.as_mut().unwrap();
                    info!("KDB query: {}", query);
                    let fetch_start = std::time::Instant::now();
                    let result: K = match sock.send_sync_message(&query.as_str()).await {
                        Ok(r) => r,
                        Err(e) => {
                            yield Err(e.into());
                            break 'slices;
                        }
                    };
                    // Treat the response as a table: column names plus rows.
                    let (columns, rows) = match (result.column_names(), result.rows()) {
                        (Ok(cols), Ok(rows)) => (cols, rows),
                        (Err(e), _) | (_, Err(e)) => {
                            yield Err(e);
                            break 'slices;
                        }
                    };
                    let row_count = rows.len();
                    info!("KDB query: {} rows in {:?}", row_count, fetch_start.elapsed());
                    // Parse all rows before yielding any, so a slice is
                    // cached only when it parsed completely and in order.
                    let mut parsed: Vec<(NanoTime, T)> = Vec::with_capacity(row_count);
                    let mut prev_time: Option<NanoTime> = None;
                    for row in &rows {
                        let (time, record) = match T::from_kdb_row(row, &columns, &mut interner) {
                            Ok(r) => r,
                            Err(e) => {
                                yield Err(e.into());
                                break 'slices;
                            }
                        };
                        // Enforce non-decreasing time within this slice.
                        // NOTE(review): ordering is not checked across slice
                        // boundaries — presumed guaranteed by slicing; confirm.
                        if let Some(prev) = prev_time
                            && time < prev
                        {
                            yield Err(anyhow::anyhow!(
                                "KDB data is not sorted by time: got {:?} after {:?}. \
                                 Add `xasc` to your query to sort the data.",
                                time,
                                prev
                            ));
                            break 'slices;
                        }
                        prev_time = Some(time);
                        parsed.push((time, record));
                    }
                    // Best-effort cache write: a failure is logged but does
                    // not stop the stream.
                    if let Err(e) = cache.put(&key, &query, &parsed).await {
                        log::warn!("KDB cache write error: {e}");
                    }
                    for (time, record) in parsed {
                        yield Ok((time, record));
                    }
                }
            })
        }
    })
}