use std::time::Duration;
use re_chunk::ChunkBatcherConfig;
use re_log_types::LogMsg;
use crate::sink::SinkFlushError;
pub struct GrpcServerSink {
uri: re_uri::ProxyUri,
sender: re_smart_channel::Sender<LogMsg>,
_server_handle: std::thread::JoinHandle<()>,
server_shutdown_signal: re_grpc_server::shutdown::Signal,
}
impl GrpcServerSink {
pub fn new(
bind_ip: &str,
grpc_port: u16,
server_options: re_grpc_server::ServerOptions,
) -> Result<Self, std::net::AddrParseError> {
let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();
let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
grpc_server_addr,
));
let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
re_smart_channel::SmartChannelSource::Sdk,
);
let server_handle = std::thread::Builder::new()
.name("message_proxy_server".to_owned())
.spawn(move || {
let mut builder = tokio::runtime::Builder::new_current_thread();
builder.enable_all();
let rt = builder.build().expect("failed to build tokio runtime");
rt.block_on(re_grpc_server::serve_from_channel(
grpc_server_addr,
server_options,
shutdown,
channel_rx,
));
})
.expect("failed to spawn thread for message proxy server");
Ok(Self {
uri,
sender: channel_tx,
_server_handle: server_handle,
server_shutdown_signal,
})
}
pub fn uri(&self) -> re_uri::ProxyUri {
self.uri.clone()
}
}
impl crate::sink::LogSink for GrpcServerSink {
fn send(&self, msg: LogMsg) {
if let Err(err) = self.sender.send(msg) {
re_log::error_once!("Failed to send log message to gRPC server: {err}");
}
}
#[inline]
fn flush_blocking(&self, timeout: Duration) -> Result<(), SinkFlushError> {
self.sender
.flush_blocking(timeout)
.map_err(|err| match err {
re_smart_channel::FlushError::Closed => {
SinkFlushError::failed("gRPC server thread shut down")
}
re_smart_channel::FlushError::Timeout => SinkFlushError::Timeout,
})
}
fn default_batcher_config(&self) -> ChunkBatcherConfig {
ChunkBatcherConfig::LOW_LATENCY
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
impl Drop for GrpcServerSink {
fn drop(&mut self) {
if let Err(err) = self.sender.flush_blocking(Duration::MAX) {
re_log::error!("Failed to flush gRPC queue: {err}");
}
self.server_shutdown_signal.stop();
}
}