use super::KdbConnection;
use crate::nodes::{FutStream, StreamOperators};
use crate::types::*;
use chrono::NaiveDateTime;
use futures::StreamExt;
use kdb_plus_fixed::ipc::{ConnectionMethod, K, QStream};
use std::pin::Pin;
use std::rc::Rc;
pub trait KdbSerialize: Sized {
fn to_kdb_row(&self) -> K;
fn table_schema() -> &'static str {
"" }
}
#[must_use]
pub fn kdb_write<T>(
connection: KdbConnection,
table_name: impl Into<String>,
upstream: &Rc<dyn Stream<Burst<T>>>,
) -> Rc<dyn Node>
where
T: Element + Send + KdbSerialize + 'static,
{
let table_name = table_name.into();
let consumer = Box::new(move |source: Pin<Box<dyn FutStream<Burst<T>>>>| {
kdb_write_consumer(connection, table_name, source)
});
upstream.consume_async(consumer)
}
async fn kdb_write_consumer<T>(
connection: KdbConnection,
table_name: String,
mut source: Pin<Box<dyn FutStream<Burst<T>>>>,
) -> anyhow::Result<()>
where
T: Element + Send + KdbSerialize + 'static,
{
let creds = connection.credentials_string();
let mut socket = QStream::connect(
ConnectionMethod::TCP,
&connection.host,
connection.port,
&creds,
)
.await?;
while let Some((time, batch)) = source.next().await {
for record in batch {
let row = record.to_kdb_row();
let naive: NaiveDateTime = time.into();
let mut row_values = vec![K::new_timestamp(naive.and_utc())];
if let Ok(list) = row.as_vec::<K>() {
row_values.extend(list.iter().cloned());
}
let full_row = K::new_compound_list(row_values);
let query = K::new_compound_list(vec![
K::new_string("insert".to_string(), kdb_plus_fixed::qattribute::NONE),
K::new_symbol(table_name.clone()),
full_row,
]);
socket.send_sync_message(&query).await?;
}
}
Ok(())
}
pub trait KdbWriteOperators<T: Element> {
#[must_use]
fn kdb_write(self: &Rc<Self>, conn: KdbConnection, table: &str) -> Rc<dyn Node>;
}
impl<T: Element + Send + KdbSerialize + 'static> KdbWriteOperators<T> for dyn Stream<Burst<T>> {
fn kdb_write(self: &Rc<Self>, conn: KdbConnection, table: &str) -> Rc<dyn Node> {
kdb_write(conn, table, self)
}
}
#[cfg(test)]
mod tests {
use super::*;
use kdb_plus_fixed::qtype;
#[test]
fn test_kdb_serialize_trait() {
#[derive(Debug, Clone, Default)]
struct TestRecord {
sym: String,
price: f64,
size: i64,
}
impl KdbSerialize for TestRecord {
fn to_kdb_row(&self) -> K {
K::new_compound_list(vec![
K::new_symbol(self.sym.clone()),
K::new_float(self.price),
K::new_long(self.size),
])
}
}
let record = TestRecord {
sym: "AAPL".to_string(),
price: 185.50,
size: 100,
};
let row = record.to_kdb_row();
assert_eq!(row.get_type(), qtype::COMPOUND_LIST);
}
#[test]
fn test_kdb_write_node_creation() {
use crate::nodes::constant;
#[derive(Debug, Clone, Default)]
struct TestTrade {
sym: String,
price: f64,
}
impl KdbSerialize for TestTrade {
fn to_kdb_row(&self) -> K {
K::new_compound_list(vec![
K::new_symbol(self.sym.clone()),
K::new_float(self.price),
])
}
}
let conn = KdbConnection::new("localhost", 5000);
let trade = TestTrade {
sym: "TEST".to_string(),
price: 100.0,
};
let mut batch: Burst<TestTrade> = Burst::new();
batch.push(trade);
let stream = constant(batch);
let _node = kdb_write(conn, "test_table", &stream);
}
}