use std::error::Error as StdError;
use rlmesh_proto::env::v1::JoinResponse;
use tokio::sync::mpsc;
use tonic::Status;
pub(super) fn spawn_response_pump(
mut response_stream: tonic::Streaming<JoinResponse>,
) -> mpsc::Receiver<Result<JoinResponse, Status>> {
let (resp_tx, resp_rx) = mpsc::channel::<Result<JoinResponse, Status>>(32);
tokio::spawn(async move {
loop {
match response_stream.message().await {
Ok(Some(msg)) => {
if resp_tx.send(Ok(msg)).await.is_err() {
tracing::warn!("join stream receiver dropped; stopping response pump");
break;
}
}
Ok(None) => {
tracing::debug!("env join stream ended");
break;
}
Err(error) => {
tracing::error!(
code = ?error.code(),
message = %error.message(),
source = ?error.source(),
"join stream error from env server"
);
let _ = resp_tx.send(Err(error)).await;
break;
}
}
}
});
resp_rx
}