pub mod shutdown;

use std::collections::VecDeque;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use re_byte_size::SizeBytes;
use re_log_channel::{DataSourceMessage, DataSourceUiCommand};
use re_log_encoding::{ToApplication as _, ToTransport as _};
use re_log_types::TableMsg;
use re_protos::common::v1alpha1::{
    DataframePart as DataframePartProto, StoreKind as StoreKindProto, TableId as TableIdProto,
};
use re_protos::log_msg::v1alpha1::LogMsg as LogMsgProto;
use re_protos::sdk_comms::v1alpha1::{
    ReadMessagesRequest, ReadMessagesResponse, ReadTablesRequest, ReadTablesResponse,
    SaveScreenshotRequest, SaveScreenshotResponse, WriteMessagesRequest, WriteMessagesResponse,
    WriteTableRequest, WriteTableResponse, message_proxy_service_server,
};
use re_quota_channel::{async_broadcast_channel, async_mpsc_channel};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio_stream::{Stream, StreamExt as _};
use tonic::transport::Server;
use tonic::transport::server::TcpIncoming;
use tower_http::cors::CorsLayer;

use crate::priority_stream::PriorityMerge;

mod priority_stream;

pub use re_memory::MemoryLimit;

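/// The default port of the Rerun gRPC proxy server.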
pub const DEFAULT_SERVER_PORT: u16 = 9876;

/// The largest message the server will accept from a client, in bytes.
pub const MAX_DECODING_MESSAGE_SIZE: usize = u32::MAX as usize;
pub const MAX_ENCODING_MESSAGE_SIZE: usize = MAX_DECODING_MESSAGE_SIZE;

/// Capacity, in messages, of the broadcast channel used to fan messages out to clients.
const CHANNEL_SIZE_MESSAGES: usize = 1024;

/// Capacity, in bytes, of the broadcast channel used to fan messages out to clients.
const CHANNEL_SIZE_BYTES: u64 = 128 * 1024 * 1024; // 128 MiB

/// Configuration for the gRPC proxy server.
#[derive(Clone, Copy, Debug)]
pub struct ServerOptions {
    /// The order in which buffered messages are replayed to newly-connected clients.
    pub playback_behavior: PlaybackBehavior,

    /// How much memory the server may use to buffer messages for late-joining clients.
    ///
    /// Set to [`MemoryLimit::ZERO`] to disable buffering entirely.
    pub memory_limit: MemoryLimit,
}

impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit: MemoryLimit::from_bytes(1024 * 1024 * 1024), // 1 GiB
        }
    }
}

/// In which order should buffered messages be replayed to a newly-connected client?
#[derive(Clone, Copy, Debug)]
pub enum PlaybackBehavior {
    /// Replay the history in the order it arrived, then switch to live messages.
    OldestFirst,

    /// Deliver the newest data first: live messages take priority, and the buffered
    /// history is replayed newest-first whenever the live stream is idle.
    NewestFirst,
}

impl PlaybackBehavior {
    pub fn from_newest_first(newest_first: bool) -> Self {
        if newest_first {
            Self::NewestFirst
        } else {
            Self::OldestFirst
        }
    }
}

/// Wrapper around [`tonic::Status`] that provides a human-readable `Display` implementation.
#[derive(Debug)]
pub struct TonicStatusError(pub tonic::Status);

impl std::fmt::Display for TonicStatusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let status = &self.0;

        write!(f, "gRPC error")?;

        if status.code() != tonic::Code::Unknown {
            write!(f, ", code: '{}'", status.code())?;
        }
        if !status.message().is_empty() {
            write!(f, ", message: {:?}", status.message())?;
        }
        if !status.metadata().is_empty() {
            write!(f, ", metadata: {:?}", status.metadata().as_ref())?;
        }
        Ok(())
    }
}

impl From<tonic::Status> for TonicStatusError {
    fn from(value: tonic::Status) -> Self {
        Self(value)
    }
}

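/// Start a gRPC server on `addr` and serve the message proxy until `shutdown` resolves.
///
/// A minimal usage sketch (assumes a Tokio runtime and an already-constructed
/// `shutdown` handle; error handling elided):
///
/// ```ignore
/// let addr: std::net::SocketAddr = format!("0.0.0.0:{DEFAULT_SERVER_PORT}").parse()?;
/// serve(addr, ServerOptions::default(), shutdown).await?;
/// ```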
pub async fn serve(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
) -> anyhow::Result<()> {
    serve_impl(addr, options, MessageProxy::new(options), shutdown).await
}

async fn serve_impl(
    addr: SocketAddr,
    options: ServerOptions,
    message_proxy: MessageProxy,
    shutdown: shutdown::Shutdown,
) -> anyhow::Result<()> {
    // On Windows, binding the unspecified IPv6 address does not also accept IPv4
    // connections, so we bind both stacks explicitly and merge the two listeners.
    let dual_stack_windows = cfg!(target_os = "windows")
        && matches!(addr.ip(), std::net::IpAddr::V6(ipv6) if ipv6.is_unspecified());

    let incoming: Pin<Box<dyn Stream<Item = _> + Send>> = if dual_stack_windows {
        let ipv6_addr = addr;
        let ipv4_addr = SocketAddr::V4(std::net::SocketAddrV4::new(
            std::net::Ipv4Addr::UNSPECIFIED,
            addr.port(),
        ));

        let tcp_listener_ipv6 = TcpListener::bind(ipv6_addr).await?;
        let tcp_listener_ipv4 = TcpListener::bind(ipv4_addr).await?;

        let incoming_ipv6 = TcpIncoming::from(tcp_listener_ipv6).with_nodelay(Some(true));
        let incoming_ipv4 = TcpIncoming::from(tcp_listener_ipv4).with_nodelay(Some(true));

        let merged = tokio_stream::StreamExt::merge(incoming_ipv6, incoming_ipv4);

        let connect_addr = format!("rerun+http://127.0.0.1:{}/proxy", addr.port());

        re_log::info!(
            "Listening for gRPC connections on {ipv6_addr} and {ipv4_addr}. Connect by running `rerun --connect {connect_addr}`",
        );

        Box::pin(merged)
    } else {
        let tcp_listener = TcpListener::bind(addr).await?;
        let incoming = TcpIncoming::from(tcp_listener).with_nodelay(Some(true));

        let connect_addr = if addr.ip().is_loopback() || addr.ip().is_unspecified() {
            format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
        } else {
            format!("rerun+http://{addr}/proxy")
        };

        re_log::info!(
            "Listening for gRPC connections on {addr}. Connect by running `rerun --connect {connect_addr}`",
        );

        Box::pin(incoming)
    };

    re_log::debug!("Server memory limit set at {}", options.memory_limit);

    let cors = CorsLayer::very_permissive();
    let grpc_web = tonic_web::GrpcWebLayer::new();

    let routes = {
        let mut routes_builder = tonic::service::Routes::builder();
        routes_builder.add_service(
            re_protos::sdk_comms::v1alpha1::message_proxy_service_server::MessageProxyServiceServer::new(
                message_proxy,
            )
            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
            .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
        );
        routes_builder.routes()
    };

    Server::builder()
        .accept_http1(true) // Needed to serve gRPC-Web clients.
        .layer(cors)
        .layer(grpc_web)
        .add_routes(routes)
        .serve_with_incoming_shutdown(incoming, shutdown.wait())
        .await?;

    Ok(())
}

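/// Start a gRPC server and feed it everything from `channel_rx`, alongside whatever
/// connected clients write to it. The channel is drained on a blocking task, while
/// the server itself runs on the current task until `shutdown` resolves.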
pub async fn serve_from_channel(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
    channel_rx: re_log_channel::LogReceiver,
) {
    let message_proxy = MessageProxy::new(options);
    let event_tx = message_proxy.event_tx.clone();

    tokio::task::spawn_blocking(move || {
        use re_log_channel::SmartMessagePayload;

        loop {
            let msg = if let Ok(msg) = channel_rx.recv() {
                match msg.payload {
                    SmartMessagePayload::Msg(msg) => msg,
                    SmartMessagePayload::Flush { on_flush_done } => {
                        // Nothing to flush here; acknowledge immediately.
                        on_flush_done();
                        continue;
                    }
                    SmartMessagePayload::Quit(err) => {
                        if let Some(err) = err {
                            re_log::debug!("smart channel sender quit: {err}");
                        } else {
                            re_log::debug!("smart channel sender quit");
                        }
                        break;
                    }
                }
            } else {
                re_log::debug!("smart channel sender closed, closing receiver");
                break;
            };

            match msg {
                DataSourceMessage::LogMsg(msg) => {
                    let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
                        Ok(msg) => msg,
                        Err(err) => {
                            re_log::error!("failed to encode message: {err}");
                            continue;
                        }
                    };

                    if event_tx
                        .blocking_send(Event::Message(LogOrTableMsgProto::LogMsg(msg.into())))
                        .is_err()
                    {
                        re_log::debug!("shut down, closing sender");
                        break;
                    }
                }
                unsupported => {
                    re_log::error_once!(
                        "Not implemented: re_grpc_server support for {}",
                        unsupported.variant_name()
                    );
                }
            }
        }
    });

    if let Err(err) = serve_impl(addr, options, message_proxy, shutdown).await {
        re_log::error!("message proxy server crashed: {err}");
    }
}

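/// Spawn a gRPC server (plus a blocking drain task) on the current Tokio runtime and
/// forward every message received on `rxs` to it, alongside whatever connected
/// clients write. Returns immediately; the server stops when `shutdown` resolves.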
pub fn spawn_from_rx_set(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
    rxs: re_log_channel::LogReceiverSet,
) {
    let message_proxy = MessageProxy::new(options);
    let event_tx = message_proxy.event_tx.clone();

    tokio::spawn(async move {
        if let Err(err) = serve_impl(addr, options, message_proxy, shutdown).await {
            re_log::error!("message proxy server crashed: {err}");
        }
    });

    tokio::task::spawn_blocking(move || {
        use re_log_channel::SmartMessagePayload;

        loop {
            let msg = if let Ok(msg) = rxs.recv() {
                match msg.payload {
                    SmartMessagePayload::Msg(msg) => msg,
                    SmartMessagePayload::Flush { on_flush_done } => {
                        // Nothing to flush here; acknowledge immediately.
                        on_flush_done();
                        continue;
                    }
                    SmartMessagePayload::Quit(err) => {
                        if let Some(err) = err {
                            re_log::debug!("smart channel sender quit: {err}");
                        } else {
                            re_log::debug!("smart channel sender quit");
                        }
                        if rxs.is_empty() {
                            // The last receiver is gone; we're done.
                            break;
                        }
                        continue;
                    }
                }
            } else {
                if rxs.is_empty() {
                    // The last receiver is gone; we're done.
                    break;
                }
                continue;
            };

            match msg {
                DataSourceMessage::LogMsg(msg) => {
                    let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
                        Ok(msg) => msg,
                        Err(err) => {
                            re_log::error!("failed to encode message: {err}");
                            continue;
                        }
                    };

                    if event_tx
                        .blocking_send(Event::Message(LogOrTableMsgProto::LogMsg(msg.into())))
                        .is_err()
                    {
                        re_log::debug!("shut down, closing sender");
                        break;
                    }
                }
                unsupported => {
                    re_log::error_once!(
                        "gRPC proxy server cannot forward {}",
                        unsupported.variant_name()
                    );
                }
            }
        }
    });
}

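/// Spawn a gRPC server on the current Tokio runtime and return a channel on which
/// everything written to the server by clients can be received.
///
/// A minimal usage sketch (assumes a Tokio runtime and an already-constructed
/// `shutdown` handle; the receive loop is illustrative):
///
/// ```ignore
/// let rx = spawn_with_recv(addr, ServerOptions::default(), shutdown);
/// while let Ok(msg) = rx.recv() {
///     // Forward `msg` to a viewer, a file sink, etc.
/// }
/// ```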
pub fn spawn_with_recv(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
) -> re_log_channel::LogReceiver {
    let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
        re_uri::Scheme::RerunHttp,
        addr,
    ));

    let (channel_log_tx, channel_log_rx) =
        re_log_channel::log_channel(re_log_channel::LogSource::MessageProxy(uri));

    let (message_proxy, mut broadcast_log_rx) = MessageProxy::new_with_recv(options);

    tokio::spawn(async move {
        if let Err(err) = serve_impl(addr, options, message_proxy, shutdown).await {
            re_log::error!("message proxy server crashed: {err}");
        }
    });

    tokio::spawn(async move {
        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();

        loop {
            let msg: anyhow::Result<DataSourceMessage> = match broadcast_log_rx.recv().await {
                Ok(inner) => match inner {
                    LogOrTableMsgProto::LogMsg(msg) => match msg.msg {
                        Some(msg) => msg
                            .to_application((&mut app_id_cache, None))
                            .map(DataSourceMessage::LogMsg)
                            .map_err(|err| err.into()),
                        None => Err(re_protos::missing_field!(
                            re_protos::log_msg::v1alpha1::LogMsg,
                            "msg"
                        )
                        .into()),
                    },

                    LogOrTableMsgProto::Table(msg) => match msg.data.try_into() {
                        Ok(data) => Ok(DataSourceMessage::TableMsg(TableMsg {
                            id: msg.id.into(),
                            data,
                        })),
                        Err(err) => {
                            re_log::error!("Dropping LogMsg::Table due to failed decode: {err}");
                            continue;
                        }
                    },

                    LogOrTableMsgProto::UiCommand(cmd) => Ok(DataSourceMessage::UiCommand(cmd)),
                },

                Err(async_broadcast_channel::RecvError::Closed) => {
                    re_log::debug!("message proxy server shut down, closing receiver");
                    channel_log_tx.quit(None).ok();
                    break;
                }
            };
            match msg {
                Ok(mut log_msg) => {
                    // Record when the message was decoded, e.g. for latency measurements.
                    if let Some(metadata_key) =
                        re_sorbet::TimestampLocation::IPCDecode.metadata_key()
                    {
                        log_msg.insert_arrow_record_batch_metadata(
                            metadata_key.to_owned(),
                            re_sorbet::timestamp_metadata::now_timestamp(),
                        );
                    }

                    if channel_log_tx.send(log_msg).is_err() {
                        re_log::debug!(
                            "message proxy smart channel receiver closed, closing sender"
                        );
                        break;
                    }
                }
                Err(err) => {
                    re_log::error!("dropping LogMsg due to failed decode: {err}");
                }
            }
        }
    });

    channel_log_rx
}

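/// Events handled by the [`EventLoop`].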
enum Event {
    /// A new client connected and wants the buffered history
    /// plus a subscription to all future messages.
    NewClient(
        oneshot::Sender<(
            Vec<LogOrTableMsgProto>,
            async_broadcast_channel::Receiver<LogOrTableMsgProto>,
        )>,
    ),

    /// A message arrived and should be broadcast and buffered.
    Message(LogOrTableMsgProto),
}

/// A table message, in transport format.
#[derive(Clone)]
struct TableMsgProto {
    id: TableIdProto,
    data: DataframePartProto,
}

/// Either a log message, a table message, or a UI command, in transport format.
#[derive(Clone)]
enum LogOrTableMsgProto {
    LogMsg(LogMsgProto),
    Table(TableMsgProto),
    UiCommand(DataSourceUiCommand),
}

impl SizeBytes for LogOrTableMsgProto {
    fn heap_size_bytes(&self) -> u64 {
        match self {
            Self::LogMsg(log_msg) => log_msg.heap_size_bytes(),
            Self::Table(table) => table.heap_size_bytes(),
            Self::UiCommand(cmd) => cmd.heap_size_bytes(),
        }
    }
}

impl From<LogMsgProto> for LogOrTableMsgProto {
    fn from(value: LogMsgProto) -> Self {
        Self::LogMsg(value)
    }
}

impl From<TableMsgProto> for LogOrTableMsgProto {
    fn from(value: TableMsgProto) -> Self {
        Self::Table(value)
    }
}

impl From<DataSourceUiCommand> for LogOrTableMsgProto {
    fn from(value: DataSourceUiCommand) -> Self {
        Self::UiCommand(value)
    }
}

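/// A FIFO queue of messages that tracks the total size of its contents.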
#[derive(Default)]
struct MsgQueue {
    queue: VecDeque<LogOrTableMsgProto>,

    /// Total size of all messages currently in `queue`.
    size_bytes: u64,
}

impl MsgQueue {
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &LogOrTableMsgProto> {
        self.queue.iter()
    }

    pub fn push_back(&mut self, msg: LogOrTableMsgProto) {
        self.size_bytes += msg.total_size_bytes();
        self.queue.push_back(msg);
    }

    pub fn pop_front(&mut self) -> Option<LogOrTableMsgProto> {
        if let Some(msg) = self.queue.pop_front() {
            self.size_bytes -= msg.total_size_bytes();
            Some(msg)
        } else {
            None
        }
    }
}

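/// The buffered message history, split into three queues with different
/// garbage-collection policies (see [`MessageBuffer::gc`]).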
#[derive(Default)]
struct MessageBuffer {
    /// Regular, temporal messages. Dropped first when over the memory limit.
    disposable: MsgQueue,

    /// Static data. Only dropped once all disposable messages are gone.
    static_: MsgQueue,

    /// Messages that are never dropped, such as `SetStoreInfo`,
    /// blueprint data, and blueprint activation commands.
    persistent: MsgQueue,
}

impl MessageBuffer {
    fn size_bytes(&self) -> u64 {
        let Self {
            disposable,
            static_,
            persistent,
        } = self;
        disposable.size_bytes + static_.size_bytes + persistent.size_bytes
    }

    /// All buffered messages, in the order dictated by `playback_behavior`.
    fn all(&self, playback_behavior: PlaybackBehavior) -> Vec<LogOrTableMsgProto> {
        re_tracing::profile_function!();

        let Self {
            disposable,
            static_,
            persistent,
        } = self;

        // Persistent messages (e.g. `SetStoreInfo`) always come first,
        // followed by static data, then everything else.
        match playback_behavior {
            PlaybackBehavior::OldestFirst => {
                itertools::chain!(persistent.iter(), static_.iter(), disposable.iter())
                    .cloned()
                    .collect()
            }
            PlaybackBehavior::NewestFirst => itertools::chain!(
                persistent.iter().rev(),
                static_.iter().rev(),
                disposable.iter().rev()
            )
            .cloned()
            .collect(),
        }
    }

    fn add_msg(&mut self, msg: LogOrTableMsgProto) {
        match msg {
            LogOrTableMsgProto::LogMsg(msg) => self.add_log_msg(msg),
            LogOrTableMsgProto::Table(msg) => {
                self.disposable.push_back(msg.into());
            }
            LogOrTableMsgProto::UiCommand(msg) => {
                self.disposable.push_back(msg.into());
            }
        }
    }

    fn add_log_msg(&mut self, msg: LogMsgProto) {
        let Some(inner) = &msg.msg else {
            re_log::error!(
                "{}",
                re_protos::missing_field!(re_protos::log_msg::v1alpha1::LogMsg, "msg")
            );
            return;
        };

        // Sort the message into the queue matching its importance:
        use re_protos::log_msg::v1alpha1::log_msg::Msg;
        match inner {
            Msg::SetStoreInfo(..) | Msg::BlueprintActivationCommand(..) => {
                self.persistent.push_back(msg.into());
            }

            Msg::ArrowMsg(inner) => {
                let is_blueprint = inner
                    .store_id
                    .as_ref()
                    .is_some_and(|id| id.kind() == StoreKindProto::Blueprint);

                if is_blueprint {
                    // Blueprint data is always kept.
                    self.persistent.push_back(msg.into());
                } else if inner.is_static == Some(true) {
                    self.static_.push_back(msg.into());
                } else {
                    // Regular temporal data.
                    self.disposable.push_back(msg.into());
                }
            }
        }
    }

    /// Drop messages until the buffer is below `max_bytes`, starting with disposable
    /// messages, then static ones. Persistent messages are never dropped.
    pub fn gc(&mut self, max_bytes: u64) {
        if self.size_bytes() <= max_bytes {
            return;
        }

        re_tracing::profile_scope!("Drop messages");
        re_log::info_once!(
            "Exceeded gRPC proxy server memory limit ({}). Dropping the oldest log messages. Clients connecting after this will not see the full history.",
            re_format::format_bytes(max_bytes as _)
        );

        let start_size = self.size_bytes();
        let mut messages_dropped = 0;

        while self.disposable.pop_front().is_some() {
            messages_dropped += 1;
            if self.size_bytes() < max_bytes {
                break;
            }
        }

        if max_bytes < self.size_bytes() {
            re_log::info_once!(
                "Exceeded gRPC proxy server memory limit ({}). Dropping old *static* log messages as well. Clients connecting after this will no longer see the complete set of static data.",
                re_format::format_bytes(max_bytes as _)
            );
            while self.static_.pop_front().is_some() {
                messages_dropped += 1;
                if self.size_bytes() < max_bytes {
                    break;
                }
            }
        }

        let bytes_dropped = start_size - self.size_bytes();

        re_log::trace!(
            "Dropped {} in {messages_dropped} message(s)",
            re_format::format_bytes(bytes_dropped as _)
        );

        if max_bytes < self.size_bytes() {
            re_log::warn_once!(
                "The gRPC server is using more memory than the given memory limit ({}), despite having garbage-collected all non-persistent messages.",
                re_format::format_bytes(max_bytes as _)
            );
        }
    }
}

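/// Adapter exposing an [`async_broadcast_channel::Receiver`] as a [`Stream`].
///
/// Messages are only pulled from the channel as the stream is polled, which is what
/// lets a slow client exert back-pressure on the shared broadcast channel.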
struct BackPressureReceiverStream<T: Clone + SizeBytes + Send + Sync + 'static> {
    inner: Pin<Box<dyn Stream<Item = Result<T, async_broadcast_channel::RecvError>> + Send>>,
}

impl<T: Clone + SizeBytes + Send + Sync + 'static> BackPressureReceiverStream<T> {
    fn new(mut receiver: async_broadcast_channel::Receiver<T>) -> Self {
        let stream = async_stream::stream! {
            while let Ok(value) = receiver.recv().await {
                yield Ok(value);
            }
        };
        Self {
            inner: Box::pin(stream),
        }
    }
}

impl<T: Clone + SizeBytes + Send + Sync + 'static> Stream for BackPressureReceiverStream<T> {
    type Item = Result<T, async_broadcast_channel::RecvError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner.as_mut().poll_next(cx)
    }
}

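/// The actor behind [`MessageProxy`]: it owns the message history, broadcasts
/// incoming messages to all subscribed clients, and hands the buffered history
/// (plus a fresh subscription) to each new client.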
struct EventLoop {
    options: ServerOptions,

    /// Used to broadcast messages to all connected clients.
    broadcast_log_tx: async_broadcast_channel::Sender<LogOrTableMsgProto>,

    /// Incoming events from [`MessageProxy`].
    event_rx: async_mpsc_channel::Receiver<Event>,

    /// The buffered message history, replayed to new clients.
    history: MessageBuffer,
}

impl EventLoop {
    fn new(
        options: ServerOptions,
        event_rx: async_mpsc_channel::Receiver<Event>,
        broadcast_log_tx: async_broadcast_channel::Sender<LogOrTableMsgProto>,
    ) -> Self {
        Self {
            options,
            broadcast_log_tx,
            event_rx,
            history: Default::default(),
        }
    }

    async fn run_in_place(mut self) {
        loop {
            let Some(event) = self.event_rx.recv().await else {
                break;
            };

            match event {
                Event::NewClient(channel) => {
                    channel
                        .send((
                            self.history.all(self.options.playback_behavior),
                            self.broadcast_log_tx.subscribe(),
                        ))
                        .ok();
                }
                Event::Message(msg) => self.handle_msg(msg).await,
            }
        }
    }

    async fn handle_msg(&mut self, msg: LogOrTableMsgProto) {
        // Forward the message to everyone who is already connected…
        self.broadcast_log_tx.send_async(msg.clone()).await.ok();

        // …and maybe record it for those who connect later.
        if !self.is_history_enabled() {
            return;
        }

        self.gc_if_using_too_much_ram();

        self.history.add_msg(msg);
    }

    fn is_history_enabled(&self) -> bool {
        self.options.memory_limit != MemoryLimit::ZERO
    }

    fn gc_if_using_too_much_ram(&mut self) {
        if self.options.memory_limit.is_limited() {
            self.history.gc(self.options.memory_limit.as_bytes());
        }
    }
}

impl SizeBytes for TableMsgProto {
    fn heap_size_bytes(&self) -> u64 {
        let Self { id, data } = self;
        id.heap_size_bytes() + data.heap_size_bytes()
    }
}

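/// Implementation of the `MessageProxyService`: everything written to the server is
/// forwarded to the [`EventLoop`], which fans it out to all connected readers.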
pub struct MessageProxy {
    options: ServerOptions,
    _queue_task_handle: tokio::task::JoinHandle<()>,
    event_tx: async_mpsc_channel::Sender<Event>,
}

impl MessageProxy {
    pub fn new(options: ServerOptions) -> Self {
        Self::new_with_recv(options).0
    }

    fn new_with_recv(
        options: ServerOptions,
    ) -> (Self, async_broadcast_channel::Receiver<LogOrTableMsgProto>) {
        let (broadcast_log_tx, broadcast_log_rx) = async_broadcast_channel::channel(
            "re_grpc_server broadcast",
            CHANNEL_SIZE_MESSAGES,
            CHANNEL_SIZE_BYTES,
        );

        let (event_tx, event_rx) = {
            // Keep this small: the event loop is expected to drain it quickly,
            // and senders get back-pressured when it fills up.
            let message_queue_capacity = 32;
            async_mpsc_channel::channel("re_grpc_server events", message_queue_capacity)
        };

        let task_handle = tokio::spawn(async move {
            EventLoop::new(options, event_rx, broadcast_log_tx)
                .run_in_place()
                .await;
        });

        (
            Self {
                options,
                _queue_task_handle: task_handle,
                event_tx,
            },
            broadcast_log_rx,
        )
    }

    async fn push_message(&self, message: impl Into<LogOrTableMsgProto>) {
        let message = message.into();
        self.event_tx.send(Event::Message(message)).await.ok();
    }

    async fn new_client_message_stream(&self) -> ReadMsgStream {
        let (sender, receiver) = oneshot::channel();
        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
            re_log::error!("Error accepting new client: {err}");
            return Box::pin(tokio_stream::empty());
        }
        let (history, msg_channel) = match receiver.await {
            Ok(v) => v,
            Err(err) => {
                re_log::error!("Error accepting new client: {err}");
                return Box::pin(tokio_stream::empty());
            }
        };

        let history = tokio_stream::iter(
            history
                .into_iter()
                .map(ReadLogOrTableMsgResponse::from)
                .map(Ok),
        );

        let channel = BackPressureReceiverStream::new(msg_channel).map(|result| {
            result.map(ReadLogOrTableMsgResponse::from).map_err(|err| {
                re_log::error!("Error reading message from broadcast channel: {err}");
                tonic::Status::internal(format!("internal channel error: {err}"))
            })
        });

        match self.options.playback_behavior {
            // Replay the entire history before any live messages.
            PlaybackBehavior::OldestFirst => Box::pin(history.chain(channel)),

            // Prefer live messages, interleaving the history whenever the live stream is idle.
            PlaybackBehavior::NewestFirst => Box::pin(PriorityMerge::new(channel, history)),
        }
    }

    async fn new_client_log_stream(&self) -> ReadLogStream {
        Box::pin(
            self.new_client_message_stream()
                .await
                .filter_map(|msg| match msg {
                    Ok(ReadLogOrTableMsgResponse::LogMsg(msg)) => Some(Ok(msg)),
                    Ok(ReadLogOrTableMsgResponse::TableMsg(_)) => {
                        re_log::warn_once!("A log stream got a TableMsg");
                        None
                    }
                    Ok(ReadLogOrTableMsgResponse::UiCommand) => {
                        re_log::warn_once!("A log stream got a UiCommandMsg");
                        None
                    }
                    Err(err) => Some(Err(err)),
                }),
        )
    }

    async fn new_client_table_stream(&self) -> ReadTablesStream {
        Box::pin(
            self.new_client_message_stream()
                .await
                .filter_map(|msg| match msg {
                    Ok(ReadLogOrTableMsgResponse::LogMsg(_)) => {
                        re_log::warn_once!("A table stream got a LogMsg");
                        None
                    }
                    Ok(ReadLogOrTableMsgResponse::TableMsg(msg)) => Some(Ok(msg)),
                    Ok(ReadLogOrTableMsgResponse::UiCommand) => {
                        re_log::warn_once!("A table stream got a UiCommandMsg");
                        None
                    }
                    Err(err) => Some(Err(err)),
                }),
        )
    }
}

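/// A response ready to be sent on either a log stream or a table stream.
///
/// UI commands are consumed internally and filtered out of both client-facing streams.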
enum ReadLogOrTableMsgResponse {
    LogMsg(ReadMessagesResponse),
    TableMsg(ReadTablesResponse),
    UiCommand,
}

impl From<LogOrTableMsgProto> for ReadLogOrTableMsgResponse {
    fn from(proto: LogOrTableMsgProto) -> Self {
        match proto {
            LogOrTableMsgProto::LogMsg(log_msg) => Self::LogMsg(ReadMessagesResponse {
                log_msg: Some(log_msg),
            }),
            LogOrTableMsgProto::Table(table_msg) => Self::TableMsg(ReadTablesResponse {
                id: Some(table_msg.id),
                data: Some(table_msg.data),
            }),
            LogOrTableMsgProto::UiCommand(_ui_command) => Self::UiCommand,
        }
    }
}

type ReadLogStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadMessagesResponse>> + Send>>;
type ReadTablesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadTablesResponse>> + Send>>;

type ReadMsgStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadLogOrTableMsgResponse>> + Send>>;

#[tonic::async_trait]
impl message_proxy_service_server::MessageProxyService for MessageProxy {
    async fn write_messages(
        &self,
        request: tonic::Request<tonic::Streaming<WriteMessagesRequest>>,
    ) -> tonic::Result<tonic::Response<WriteMessagesResponse>> {
        let mut stream = request.into_inner();
        loop {
            match stream.message().await {
                Ok(Some(WriteMessagesRequest {
                    log_msg: Some(log_msg),
                })) => {
                    self.push_message(log_msg).await;
                }

                Ok(Some(WriteMessagesRequest { log_msg: None })) => {
                    re_log::warn!("missing log_msg in WriteMessagesRequest");
                }

                Ok(None) => {
                    // The client closed the stream.
                    break;
                }

                Err(err) => {
                    re_log::error!("Error while receiving messages: {}", TonicStatusError(err));
                    break;
                }
            }
        }

        Ok(tonic::Response::new(WriteMessagesResponse {}))
    }

    type ReadMessagesStream = ReadLogStream;

    async fn read_messages(
        &self,
        _: tonic::Request<ReadMessagesRequest>,
    ) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
        Ok(tonic::Response::new(self.new_client_log_stream().await))
    }

    type ReadTablesStream = ReadTablesStream;

    async fn write_table(
        &self,
        request: tonic::Request<WriteTableRequest>,
    ) -> tonic::Result<tonic::Response<WriteTableResponse>> {
        if let WriteTableRequest {
            id: Some(id),
            data: Some(data),
        } = request.into_inner()
        {
            self.push_message(TableMsgProto { id, data }).await;
        } else {
            re_log::warn!("malformed `WriteTableRequest`");
        }

        Ok(tonic::Response::new(WriteTableResponse {}))
    }

    async fn read_tables(
        &self,
        _: tonic::Request<ReadTablesRequest>,
    ) -> tonic::Result<tonic::Response<Self::ReadTablesStream>> {
        Ok(tonic::Response::new(self.new_client_table_stream().await))
    }

    async fn save_screenshot(
        &self,
        request: tonic::Request<SaveScreenshotRequest>,
    ) -> tonic::Result<tonic::Response<SaveScreenshotResponse>> {
        let SaveScreenshotRequest { view_id, file_path } = request.into_inner();
        self.push_message(DataSourceUiCommand::SaveScreenshot {
            file_path: file_path.into(),
            view_id,
        })
        .await;

        Ok(tonic::Response::new(SaveScreenshotResponse {}))
    }
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::{Itertools as _, chain};
    use re_chunk::RowId;
    use re_log_encoding::rrd::Compression;
    use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
    use re_protos::sdk_comms::v1alpha1::message_proxy_service_client::MessageProxyServiceClient;
    use re_protos::sdk_comms::v1alpha1::message_proxy_service_server::MessageProxyServiceServer;
    use similar_asserts::assert_eq;
    use tokio::net::TcpListener;
    use tokio_util::sync::CancellationToken;
    use tonic::transport::server::TcpIncoming;
    use tonic::transport::{Channel, Endpoint};

    use super::*;

    /// Cancellation token that also fires when dropped,
    /// so a panicking test still shuts its server down.
    #[derive(Clone)]
    struct Completion(Arc<CancellationToken>);

    impl Drop for Completion {
        fn drop(&mut self) {
            self.finish();
        }
    }

    impl Completion {
        fn new() -> Self {
            Self(Arc::new(CancellationToken::new()))
        }

        fn finish(&self) {
            self.0.cancel();
        }

        async fn wait(&self) {
            self.0.cancelled().await;
        }
    }

    fn set_store_info_msg(store_id: &StoreId) -> LogMsg {
        LogMsg::SetStoreInfo(SetStoreInfo {
            row_id: *RowId::new(),
            info: StoreInfo::new(
                store_id.clone(),
                StoreSource::RustSdk {
                    rustc_version: String::new(),
                    llvm_version: String::new(),
                },
            ),
        })
    }

    /// A blueprint stream: `SetStoreInfo`, some blueprint Arrow messages, and an
    /// activation command. All of these land in the server's `persistent` queue.
    fn fake_log_stream_blueprint(n: usize) -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Blueprint, "test_app");

        let mut messages = Vec::new();
        messages.push(set_store_info_msg(&store_id));
        for _ in 0..n {
            messages.push(LogMsg::ArrowMsg(
                store_id.clone(),
                re_chunk::Chunk::builder("test_entity")
                    .with_archetype(
                        re_chunk::RowId::new(),
                        re_log_types::TimePoint::default().with(
                            re_log_types::Timeline::new_sequence("blueprint"),
                            re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                        ),
                        &re_sdk_types::blueprint::archetypes::Background::new(
                            re_sdk_types::blueprint::components::BackgroundKind::SolidColor,
                        )
                        .with_color([255, 0, 0]),
                    )
                    .build()
                    .unwrap()
                    .to_arrow_msg()
                    .unwrap(),
            ));
        }
        messages.push(LogMsg::BlueprintActivationCommand(
            re_log_types::BlueprintActivationCommand {
                blueprint_id: store_id,
                make_active: true,
                make_default: true,
            },
        ));

        messages
    }

    #[derive(Clone, Copy)]
    enum Temporalness {
        Static,
        Temporal,
    }

    fn fake_log_stream_recording(n: usize) -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Recording, "test_app");

        chain!(
            [set_store_info_msg(&store_id)],
            generate_log_messages(&store_id, n, Temporalness::Temporal)
        )
        .collect()
    }

    fn generate_log_messages(
        store_id: &StoreId,
        n: usize,
        temporalness: Temporalness,
    ) -> Vec<LogMsg> {
        let mut messages = Vec::new();
        for _ in 0..n {
            let timepoint = match temporalness {
                Temporalness::Static => re_log_types::TimePoint::STATIC,
                Temporalness::Temporal => re_log_types::TimePoint::default().with(
                    re_log_types::Timeline::new_sequence("log_time"),
                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                ),
            };

            messages.push(LogMsg::ArrowMsg(
                store_id.clone(),
                re_chunk::Chunk::builder("test_entity")
                    .with_archetype(
                        re_chunk::RowId::new(),
                        timepoint,
                        &re_sdk_types::archetypes::Points2D::new([
                            (0.0, 0.0),
                            (1.0, 1.0),
                            (2.0, 2.0),
                        ]),
                    )
                    .build()
                    .unwrap()
                    .to_arrow_msg()
                    .unwrap(),
            ));
        }
        messages
    }

    async fn setup() -> (Completion, SocketAddr) {
        setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit: MemoryLimit::UNLIMITED,
        })
        .await
    }

    async fn setup_with_memory_limit(memory_limit: MemoryLimit) -> (Completion, SocketAddr) {
        setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit,
        })
        .await
    }

    async fn setup_opt(options: ServerOptions) -> (Completion, SocketAddr) {
        let completion = Completion::new();

        // Bind to port 0 so the OS picks a free port.
        let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = tcp_listener.local_addr().unwrap();

        tokio::spawn({
            let completion = completion.clone();
            async move {
                tonic::transport::Server::builder()
                    .tcp_nodelay(true) // Keep test latency low.
                    .accept_http1(true)
                    .http2_adaptive_window(Some(true))
                    .add_service(
                        MessageProxyServiceServer::new(super::MessageProxy::new(options))
                            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
                            .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
                    )
                    .serve_with_incoming_shutdown(
                        TcpIncoming::from(tcp_listener).with_nodelay(Some(true)),
                        completion.wait(),
                    )
                    .await
                    .unwrap();
            }
        });

        (completion, addr)
    }

    async fn make_client(addr: SocketAddr) -> MessageProxyServiceClient<Channel> {
        MessageProxyServiceClient::new(
            Endpoint::from_shared(format!("http://{addr}"))
                .unwrap()
                .connect()
                .await
                .unwrap(),
        )
        .max_decoding_message_size(crate::MAX_DECODING_MESSAGE_SIZE)
    }

    async fn write_messages(
        client: &mut MessageProxyServiceClient<Channel>,
        messages: Vec<LogMsg>,
    ) {
        client
            .write_messages(tokio_stream::iter(
                messages
                    .clone()
                    .into_iter()
                    .map(|msg| msg.to_transport(Compression::Off).unwrap())
                    .map(|msg| WriteMessagesRequest {
                        log_msg: Some(msg.into()),
                    }),
            ))
            .await
            .unwrap();
    }

    async fn read_log_stream(
        log_stream: &mut tonic::Response<tonic::Streaming<ReadMessagesResponse>>,
        n: usize,
    ) -> Vec<LogMsg> {
        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();

        let mut stream_ref = log_stream.get_mut().map(|result| {
            let msg = result.unwrap().log_msg.unwrap().msg.unwrap();
            msg.to_application((&mut app_id_cache, None)).unwrap()
        });

        let mut messages = Vec::new();
        for _ in 0..n {
            messages.push(stream_ref.next().await.unwrap());
        }
        messages
    }

    #[tokio::test]
    async fn pubsub_basic() {
        let (completion, addr) = setup().await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);

        // Subscribe before writing, so everything arrives via the live broadcast.
        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

        write_messages(&mut client, messages.clone()).await;

        let actual = read_log_stream(&mut log_stream, messages.len()).await;

        assert_eq!(messages, actual);

        // Sanity check: the stream starts with `SetStoreInfo`.
        assert!(matches!(messages[0], LogMsg::SetStoreInfo(..)));
        assert!(matches!(actual[0], LogMsg::SetStoreInfo(..)));

        completion.finish();
    }

    #[tokio::test]
    async fn pubsub_history() {
        let (completion, addr) = setup().await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);

        // Write before subscribing, so everything must come from the buffered history.
        write_messages(&mut client, messages.clone()).await;

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

        let actual = read_log_stream(&mut log_stream, messages.len()).await;
        assert_eq!(messages, actual);

        completion.finish();
    }

    #[tokio::test]
    async fn one_producer_many_consumers() {
        let (completion, addr) = setup().await;
        let mut producer = make_client(addr).await;
        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
        let messages = fake_log_stream_blueprint(3);

        let mut log_streams = vec![];
        for consumer in &mut consumers {
            log_streams.push(
                consumer
                    .read_messages(ReadMessagesRequest {})
                    .await
                    .unwrap(),
            );
        }

        write_messages(&mut producer, messages.clone()).await;

        for log_stream in &mut log_streams {
            let actual = read_log_stream(log_stream, messages.len()).await;
            assert_eq!(messages, actual);
        }

        completion.finish();
    }

    #[tokio::test]
    async fn many_producers_many_consumers() {
        let (completion, addr) = setup().await;
        let mut producers = vec![make_client(addr).await, make_client(addr).await];
        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
        let messages = fake_log_stream_blueprint(3);

        let mut log_streams = vec![];
        for consumer in &mut consumers {
            log_streams.push(
                consumer
                    .read_messages(ReadMessagesRequest {})
                    .await
                    .unwrap(),
            );
        }

        for producer in &mut producers {
            write_messages(producer, messages.clone()).await;
        }

        // The writes are sequential, so each consumer sees the stream twice, in order.
        let expected = [messages.clone(), messages.clone()].concat();

        for log_stream in &mut log_streams {
            let actual = read_log_stream(log_stream, expected.len()).await;
            assert_eq!(actual, expected);
        }

        completion.finish();
    }

    #[tokio::test]
    async fn memory_limit_drops_messages() {
        // A 1-byte limit forces a garbage collection before every insertion.
        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_recording(3);

        write_messages(&mut client, messages.clone()).await;

        // Drain the stream until it stalls; a timeout stands in for end-of-stream,
        // since the server keeps the stream open for future messages.
        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let mut actual = vec![];
        loop {
            let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
            tokio::pin!(timeout_stream);
            let timeout_result = timeout_stream.try_next().await;
            let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
            match timeout_result {
                Ok(Some(value)) => {
                    let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
                    actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
                }

                Ok(None) | Err(_) => break,
            }
        }

        // Only the persistent `SetStoreInfo` and the most recently added message survive.
        assert_eq!(actual.len(), 2);
        assert_eq!(&actual[0], &messages[0]);
        assert_eq!(&actual[1], messages.last().unwrap());

        completion.finish();
    }

    #[tokio::test]
    async fn memory_limit_does_not_drop_blueprint() {
        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);

        write_messages(&mut client, messages.clone()).await;

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let mut actual = vec![];
        loop {
            let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
            tokio::pin!(timeout_stream);
            let timeout_result = timeout_stream.try_next().await;
            let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
            match timeout_result {
                Ok(Some(value)) => {
                    let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
                    actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
                }

                Ok(None) | Err(_) => break,
            }
        }

        // Blueprint messages are persistent, so nothing is dropped
        // despite the 1-byte memory limit.
        assert_eq!(messages, actual);

        completion.finish();
    }

    #[tokio::test]
    async fn memory_limit_does_not_interrupt_stream() {
        let memory_limits = [
            0, // With a zero limit, history is disabled entirely.
            1, // With a 1-byte limit, (almost) everything is garbage-collected.
        ];

        for memory_limit in memory_limits {
            let (completion, addr) =
                setup_with_memory_limit(MemoryLimit::from_bytes(memory_limit)).await;
            let mut client = make_client(addr).await;
            let messages = fake_log_stream_blueprint(3);

            // The subscriber connects before the writes, so it must receive
            // everything regardless of the memory limit.
            let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

            write_messages(&mut client, messages.clone()).await;

            let actual = read_log_stream(&mut log_stream, messages.len()).await;
            assert_eq!(messages, actual);

            completion.finish();
        }
    }

    #[tokio::test]
    async fn static_data_is_returned_first() {
        let (completion, addr) = setup_with_memory_limit(MemoryLimit::UNLIMITED).await;
        let mut client = make_client(addr).await;

        let store_id = StoreId::random(StoreKind::Recording, "test_app");

        let set_store_info = vec![set_store_info_msg(&store_id)];
        let first_static = generate_log_messages(&store_id, 3, Temporalness::Static);
        let first_temporal = generate_log_messages(&store_id, 3, Temporalness::Temporal);
        let second_static = generate_log_messages(&store_id, 3, Temporalness::Static);

        write_messages(&mut client, set_store_info.clone()).await;
        write_messages(&mut client, first_static.clone()).await;
        write_messages(&mut client, first_temporal.clone()).await;
        write_messages(&mut client, second_static.clone()).await;

        // When replaying the history, static messages come before temporal ones,
        // regardless of arrival order.
        let expected =
            itertools::chain!(set_store_info, first_static, second_static, first_temporal)
                .collect_vec();

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let actual = read_log_stream(&mut log_stream, expected.len()).await;

        assert_eq!(actual, expected);

        completion.finish();
    }

    #[tokio::test]
    async fn playback_newest_first() {
        let (completion, addr) = setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::NewestFirst,
            memory_limit: MemoryLimit::UNLIMITED,
        })
        .await;
        let mut client = make_client(addr).await;

        let store_id = StoreId::random(StoreKind::Recording, "test_app");

        let set_store_info = vec![set_store_info_msg(&store_id)];
        let first_statics = generate_log_messages(&store_id, 3, Temporalness::Static);
        let temporals = generate_log_messages(&store_id, 3, Temporalness::Temporal);
        let second_statics = generate_log_messages(&store_id, 3, Temporalness::Static);

        write_messages(&mut client, set_store_info.clone()).await;
        write_messages(&mut client, first_statics.clone()).await;
        write_messages(&mut client, temporals.clone()).await;
        write_messages(&mut client, second_statics.clone()).await;

        // With `NewestFirst`, each queue is replayed in reverse:
        // persistent first, then static, then temporal.
        let expected = itertools::chain!(
            set_store_info.into_iter().rev(),
            second_statics.into_iter().rev(),
            first_statics.into_iter().rev(),
            temporals.into_iter().rev()
        )
        .collect_vec();

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let actual = read_log_stream(&mut log_stream, expected.len()).await;

        assert_eq!(actual, expected);

        completion.finish();
    }
}