use hyperdb_api::{HyperProcess, ListenMode, Parameters, Result};
/// Launches a `hyperd` process configured to listen over gRPC and returns the
/// process handle together with the gRPC URL it reports.
///
/// The process is started with `log_dir` set to `test_results` and a gRPC
/// listen mode whose port is `0` (presumably "pick any free port" — confirm
/// against the `ListenMode` documentation).
///
/// The handle is returned alongside the URL so the caller decides how long the
/// process value stays alive.
///
/// # Errors
/// Propagates any error returned by `HyperProcess::new`.
///
/// # Panics
/// Panics if `grpc_url()` does not yield a URL for the started process.
fn grpc_hyperd() -> Result<(HyperProcess, String)> {
    let mut params = Parameters::new();
    params.set("log_dir", "test_results");
    params.set_listen_mode(ListenMode::Grpc { port: 0 });

    // Pass the parameters by reference; `params` stays owned by this function.
    let hyper = HyperProcess::new(None, Some(&params))?;

    let url = hyper
        .grpc_url()
        .expect("hyperd should expose a gRPC URL when listen_mode=Grpc");

    Ok((hyper, url))
}
/// Streams a 50,000-row `generate_series` query over gRPC, feeds the raw
/// chunks through `ArrowRowset::from_stream`, and checks that the decoded
/// chunks add up to exactly 50,000 rows.
#[test]
fn test_grpc_stream_chunks_decode_correctly() -> Result<()> {
    use hyperdb_api_core::client::grpc::{GrpcChunkStreamSync, GrpcClientSync, GrpcConfig};

    // Pass-through chunk source that tallies how many chunks it has handed
    // out. The tally is only written here; nothing in this test reads it back.
    struct Counter {
        inner: GrpcChunkStreamSync,
        seen: usize,
    }

    impl hyperdb_api::ChunkSource for Counter {
        fn next_chunk(&mut self) -> hyperdb_api::Result<Option<bytes::Bytes>> {
            let chunk = self.inner.next_chunk()?;
            if chunk.is_some() {
                self.seen += 1;
            }
            Ok(chunk)
        }
    }

    // Bound to `_hyper` rather than `_` so the process handle is held until
    // the end of the test instead of being dropped immediately.
    let (_hyper, url) = grpc_hyperd()?;

    let mut client = GrpcClientSync::connect(GrpcConfig::new(&url))?;
    let stream = client.execute_query_stream("SELECT i FROM generate_series(1, 50000) AS s(i)")?;

    let counting_source = Box::new(Counter {
        inner: stream,
        seen: 0,
    });
    let mut rowset = hyperdb_api::ArrowRowset::from_stream(counting_source)?;

    // Drain every decoded chunk, accumulating the number of rows seen.
    let mut row_count = 0usize;
    loop {
        match rowset.next_chunk()? {
            Some(chunk) => row_count += chunk.len(),
            None => break,
        }
    }

    assert_eq!(row_count, 50_000, "streamed row count must match query");
    Ok(())
}