use minarrow::{FieldArray, Table, Vec64, arr_f64, arr_i32, arr_str32};
#[cfg(feature = "msgpack")]
use futures_core::Stream;
#[cfg(feature = "msgpack")]
use lightstream::models::protocol::LightstreamMessage;
#[cfg(feature = "msgpack")]
use lightstream::models::protocol::connection::LightstreamConnection;
#[cfg(feature = "msgpack")]
use tokio::io::AsyncWrite;
#[cfg(feature = "msgpack")]
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct Command {
pub action: String,
pub timestamp_ms: u64,
pub params: Vec<String>,
}
pub fn make_table(label: &str, n_rows: usize) -> Table {
let ids: Vec64<i32> = (0..n_rows as i32).collect();
let values: Vec64<f64> = (0..n_rows).map(|i| i as f64 * 0.5).collect();
let labels: Vec64<String> = (0..n_rows).map(|i| format!("{}_{}", label, i)).collect();
let label_refs: Vec64<&str> = labels.iter().map(String::as_str).collect();
let id_col = FieldArray::from_arr("id", arr_i32!(ids));
let value_col = FieldArray::from_arr("value", arr_f64!(values));
let label_col = FieldArray::from_arr("label", arr_str32!(label_refs));
Table::new(label.to_string(), Some(vec![id_col, value_col, label_col]))
}
pub fn table_schema() -> Vec<minarrow::Field> {
let sample = make_table("_schema", 0);
sample.schema().iter().map(|f| (**f).clone()).collect()
}
#[cfg(feature = "msgpack")]
pub fn register_demo_types<S, W>(conn: &mut LightstreamConnection<S, W>)
where
S: Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin + Send,
W: AsyncWrite + Unpin + Send,
{
conn.register_message("raw");
conn.register_message("command");
conn.register_table("metrics", table_schema());
}
#[cfg(feature = "msgpack")]
pub async fn send_demo_messages<S, W>(
conn: &mut LightstreamConnection<S, W>,
label: &str,
) -> std::io::Result<()>
where
S: Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin + Send,
W: AsyncWrite + Unpin + Send,
{
conn.send("raw", format!("hello-{}", label).as_bytes())
.await?;
conn.send_msgpack(
"command",
&Command {
action: "ingest".into(),
timestamp_ms: 1_700_000_000_000,
params: vec![format!("--source={}", label)],
},
)
.await?;
conn.send_table("metrics", &make_table(label, 4)).await?;
conn.send("raw", format!("goodbye-{}", label).as_bytes())
.await?;
conn.flush().await?;
conn.shutdown().await?;
Ok(())
}
#[cfg(feature = "msgpack")]
pub async fn recv_and_print_all<S, W>(conn: &mut LightstreamConnection<S, W>)
where
S: Stream<Item = Result<Vec<u8>, std::io::Error>> + Unpin + Send,
W: AsyncWrite + Unpin + Send,
{
while let Some(result) = conn.recv().await {
let msg = result.unwrap();
match &msg {
LightstreamMessage::Message { tag, payload } => {
if *tag == 0 {
println!(" [raw] {:?}", std::str::from_utf8(payload).unwrap());
} else {
let cmd: Command = msg.decode_msgpack().unwrap();
println!(" [command] {:?}", cmd);
}
}
LightstreamMessage::Table { table, .. } => {
println!(" [table] {} rows, {} cols", table.n_rows, table.cols.len());
}
}
}
}