mod helpers;
use helpers::{Command, make_table, table_schema};
use lightstream::models::protocol::LightstreamMessage;
use lightstream::models::protocol::connection::TcpLightstreamConnection;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Lightstream Protocol Example");
println!("============================\n");
println!("1. Raw Messages");
raw_messages().await?;
println!("\n2. Msgpack Messages");
msgpack_messages().await?;
println!("\n3. Arrow Tables");
arrow_tables().await?;
println!("\n4. Mixed Stream (raw + msgpack + tables interleaved)");
mixed_stream().await?;
println!("\nAll Lightstream protocol examples completed successfully!");
Ok(())
}
async fn raw_messages() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut conn = TcpLightstreamConnection::from_tcp(stream);
conn.register_message("ping");
conn.register_message("pong");
let msg = conn.recv().await.unwrap().unwrap();
assert!(msg.is_message());
let payload = msg.payload().unwrap();
println!(
" Server received: {:?}",
std::str::from_utf8(payload).unwrap()
);
conn.send("pong", b"pong-reply").await.unwrap();
conn.flush().await.unwrap();
conn.shutdown().await.unwrap();
});
let client_stream = tokio::net::TcpStream::connect(addr).await?;
let mut client = TcpLightstreamConnection::from_tcp(client_stream);
client.register_message("ping");
client.register_message("pong");
client.send("ping", b"hello-server").await?;
client.flush().await?;
let msg = client.recv().await.unwrap()?;
assert!(msg.is_message());
let payload = msg.payload().unwrap();
println!(
" Client received: {:?}",
std::str::from_utf8(payload).unwrap()
);
client.shutdown().await?;
server.await?;
println!(" Raw message round-trip complete.");
Ok(())
}
async fn msgpack_messages() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut conn = TcpLightstreamConnection::from_tcp(stream);
conn.register_message("command");
for _ in 0..2 {
let msg = conn.recv().await.unwrap().unwrap();
let cmd: Command = msg.decode_msgpack().unwrap();
println!(" Server decoded: {:?}", cmd);
}
});
let client_stream = tokio::net::TcpStream::connect(addr).await?;
let mut client = TcpLightstreamConnection::from_tcp(client_stream);
client.register_message("command");
let cmd1 = Command {
action: "start".into(),
timestamp_ms: 1_700_000_000_000,
params: vec!["--verbose".into()],
};
let cmd2 = Command {
action: "stop".into(),
timestamp_ms: 1_700_000_001_000,
params: vec![],
};
client.send_msgpack("command", &cmd1).await?;
client.send_msgpack("command", &cmd2).await?;
client.flush().await?;
client.shutdown().await?;
server.await?;
println!(" Msgpack round-trip complete.");
Ok(())
}
async fn arrow_tables() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut conn = TcpLightstreamConnection::from_tcp(stream);
conn.register_table("metrics", table_schema());
for i in 0..2 {
let msg = conn.recv().await.unwrap().unwrap();
let table = msg.table().unwrap();
println!(
" Server received table batch {}: {} rows, {} cols",
i + 1,
table.n_rows,
table.cols.len()
);
assert_eq!(table.cols[0].field.name, "id");
assert_eq!(table.cols[1].field.name, "value");
assert_eq!(table.cols[2].field.name, "label");
}
});
let client_stream = tokio::net::TcpStream::connect(addr).await?;
let mut client = TcpLightstreamConnection::from_tcp(client_stream);
client.register_table("metrics", table_schema());
client
.send_table("metrics", &make_table("batch1", 5))
.await?;
client
.send_table("metrics", &make_table("batch2", 3))
.await?;
client.flush().await?;
client.shutdown().await?;
server.await?;
println!(" Arrow table round-trip complete (schema persisted across batches).");
Ok(())
}
async fn mixed_stream() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut conn = TcpLightstreamConnection::from_tcp(stream);
conn.register_message("raw");
conn.register_message("command");
conn.register_table("metrics", table_schema());
let mut raw_count = 0u32;
let mut msgpack_count = 0u32;
let mut table_count = 0u32;
while let Some(result) = conn.recv().await {
let msg = result.unwrap();
match msg {
LightstreamMessage::Table { table, .. } => {
table_count += 1;
println!(" Server got table: {} rows", table.n_rows);
}
LightstreamMessage::Message { tag, ref payload } => {
if tag == 0 {
raw_count += 1;
println!(
" Server got raw: {:?}",
std::str::from_utf8(payload).unwrap()
);
} else {
msgpack_count += 1;
let cmd: Command = msg.decode_msgpack().unwrap();
println!(" Server got command: {:?}", cmd);
}
}
}
}
println!(
" Server totals: {} raw, {} msgpack, {} table",
raw_count, msgpack_count, table_count
);
});
let client_stream = tokio::net::TcpStream::connect(addr).await?;
let mut client = TcpLightstreamConnection::from_tcp(client_stream);
client.register_message("raw");
client.register_message("command");
client.register_table("metrics", table_schema());
client.send("raw", b"first-raw-message").await?;
client
.send_msgpack(
"command",
&Command {
action: "deploy".into(),
timestamp_ms: 1_700_000_002_000,
params: vec!["--region=us-east".into()],
},
)
.await?;
client
.send_table("metrics", &make_table("mixed", 4))
.await?;
client.send("raw", b"second-raw-message").await?;
client
.send_msgpack(
"command",
&Command {
action: "rollback".into(),
timestamp_ms: 1_700_000_003_000,
params: vec![],
},
)
.await?;
client
.send_table("metrics", &make_table("mixed2", 2))
.await?;
client.flush().await?;
client.shutdown().await?;
server.await?;
println!(" Mixed stream complete.");
Ok(())
}