1pub mod shutdown;
4
5use std::{collections::VecDeque, net::SocketAddr, pin::Pin};
6
7use tokio::{
8 net::TcpListener,
9 sync::{broadcast, mpsc, oneshot},
10};
11use tokio_stream::{Stream, StreamExt as _, wrappers::BroadcastStream};
12use tonic::transport::{Server, server::TcpIncoming};
13use tower_http::cors::CorsLayer;
14
15use re_byte_size::SizeBytes;
16use re_log_encoding::{ToApplication as _, ToTransport as _};
17use re_log_types::{DataSourceMessage, TableMsg};
18use re_protos::sdk_comms::v1alpha1::{
19 ReadTablesRequest, ReadTablesResponse, WriteMessagesRequest, WriteTableRequest,
20 WriteTableResponse,
21};
22
23use re_protos::{
24 common::v1alpha1::{
25 DataframePart as DataframePartProto, StoreKind as StoreKindProto, TableId as TableIdProto,
26 },
27 log_msg::v1alpha1::LogMsg as LogMsgProto,
28 sdk_comms::v1alpha1::{
29 ReadMessagesRequest, ReadMessagesResponse, WriteMessagesResponse,
30 message_proxy_service_server,
31 },
32};
33
34use crate::priority_stream::PriorityMerge;
35
36mod priority_stream;
37
38pub use re_memory::MemoryLimit;
39
/// Default port the gRPC message proxy server listens on.
pub const DEFAULT_SERVER_PORT: u16 = 9876;

/// Maximum size of a single incoming gRPC message (effectively unlimited).
pub const MAX_DECODING_MESSAGE_SIZE: usize = u32::MAX as usize;
/// Maximum size of a single outgoing gRPC message (same limit as decoding).
pub const MAX_ENCODING_MESSAGE_SIZE: usize = MAX_DECODING_MESSAGE_SIZE;

/// Capacity (in number of messages) of the internal event and broadcast channels:
/// roughly 16 MiB worth of `LogOrTableMsgProto` entries, rounded up to a power of two.
const MESSAGE_QUEUE_CAPACITY: usize =
    (16 * 1024 * 1024 / std::mem::size_of::<LogOrTableMsgProto>()).next_power_of_two();
50
/// Configuration for the message proxy server.
#[derive(Clone, Copy, Debug)]
pub struct ServerOptions {
    /// In which order buffered messages are played back to newly connected clients.
    pub playback_behavior: PlaybackBehavior,

    /// Memory budget for the in-memory message buffer; when exceeded,
    /// old messages are garbage-collected (see `MessageBuffer::gc`).
    pub memory_limit: MemoryLimit,
}
64
65impl Default for ServerOptions {
66 fn default() -> Self {
67 Self {
68 playback_behavior: PlaybackBehavior::OldestFirst,
69 memory_limit: MemoryLimit::UNLIMITED,
70 }
71 }
72}
73
/// Order in which buffered history is sent to a newly connected client.
#[derive(Clone, Copy, Debug)]
pub enum PlaybackBehavior {
    /// Send the oldest buffered messages first (full history in arrival order).
    OldestFirst,

    /// Send the newest messages first.
    NewestFirst,
}
85
86impl PlaybackBehavior {
87 pub fn from_newest_first(newest_first: bool) -> Self {
88 if newest_first {
89 Self::NewestFirst
90 } else {
91 Self::OldestFirst
92 }
93 }
94}
95
/// Newtype around [`tonic::Status`] providing a compact, human-readable `Display`.
#[derive(Debug)]
pub struct TonicStatusError(pub tonic::Status);
99
100impl std::fmt::Display for TonicStatusError {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 let status = &self.0;
104
105 write!(f, "gRPC error")?;
106
107 if status.code() != tonic::Code::Unknown {
108 write!(f, ", code: '{}'", status.code())?;
109 }
110 if !status.message().is_empty() {
111 write!(f, ", message: {:?}", status.message())?;
112 }
113 if !status.metadata().is_empty() {
118 write!(f, ", metadata: {:?}", status.metadata().as_ref())?;
119 }
120 Ok(())
121 }
122}
123
/// Allows `?`-style conversion from raw [`tonic::Status`] values.
impl From<tonic::Status> for TonicStatusError {
    fn from(value: tonic::Status) -> Self {
        Self(value)
    }
}
129
/// Starts a message proxy gRPC server at `addr` and runs it until `shutdown` fires.
///
/// Errors if the address cannot be bound or the server fails while running.
pub async fn serve(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
) -> anyhow::Result<()> {
    serve_impl(addr, MessageProxy::new(options), shutdown).await
}
155
156async fn serve_impl(
157 addr: SocketAddr,
158 message_proxy: MessageProxy,
159 shutdown: shutdown::Shutdown,
160) -> anyhow::Result<()> {
161 let tcp_listener = TcpListener::bind(addr).await?;
162 let incoming = TcpIncoming::from(tcp_listener).with_nodelay(Some(true));
163
164 let connect_addr = if addr.ip().is_loopback() || addr.ip().is_unspecified() {
165 format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
166 } else {
167 format!("rerun+http://{addr}/proxy")
168 };
169 re_log::info!(
170 "Listening for gRPC connections on {addr}. Connect by running `rerun --connect {connect_addr}`"
171 );
172
173 let cors = CorsLayer::very_permissive();
174 let grpc_web = tonic_web::GrpcWebLayer::new();
175
176 let routes = {
177 let mut routes_builder = tonic::service::Routes::builder();
178 routes_builder.add_service(
179 re_protos::sdk_comms::v1alpha1::message_proxy_service_server::MessageProxyServiceServer::new(
180 message_proxy,
181 )
182 .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
183 .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
184 );
185 routes_builder.routes()
186 };
187
188 Server::builder()
189 .accept_http1(true) .layer(cors) .layer(grpc_web) .add_routes(routes)
193 .serve_with_incoming_shutdown(incoming, shutdown.wait())
194 .await?;
195
196 Ok(())
197}
198
/// Serves a message proxy at `addr`, forwarding every log message received on
/// `channel_rx` into the proxy as if a client had written it.
///
/// Runs until the server shuts down; server errors are logged, not returned.
pub async fn serve_from_channel(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
    channel_rx: re_smart_channel::Receiver<re_log_types::LogMsg>,
) {
    let message_proxy = MessageProxy::new(options);
    let event_tx = message_proxy.event_tx.clone();

    // `channel_rx.recv()` blocks, so drain it on a dedicated blocking task.
    tokio::task::spawn_blocking(move || {
        use re_smart_channel::SmartMessagePayload;

        loop {
            let msg = if let Ok(msg) = channel_rx.recv() {
                match msg.payload {
                    SmartMessagePayload::Msg(msg) => msg,
                    SmartMessagePayload::Flush { on_flush_done } => {
                        // Nothing is buffered on this side; acknowledge immediately.
                        on_flush_done();
                        continue;
                    }
                    SmartMessagePayload::Quit(err) => {
                        if let Some(err) = err {
                            re_log::debug!("smart channel sender quit: {err}");
                        } else {
                            re_log::debug!("smart channel sender quit");
                        }
                        break;
                    }
                }
            } else {
                re_log::debug!("smart channel sender closed, closing receiver");
                break;
            };

            // Encode to the transport (protobuf) representation before queuing.
            let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
                Ok(msg) => msg,
                Err(err) => {
                    re_log::error!("failed to encode message: {err}");
                    continue;
                }
            };

            // A closed event channel means the server's event loop is gone.
            if event_tx.blocking_send(Event::Message(msg.into())).is_err() {
                re_log::debug!("shut down, closing sender");
                break;
            }
        }
    });

    if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
        re_log::error!("message proxy server crashed: {err}");
    }
}
262
/// Spawns a message proxy server at `addr` plus a blocking task that forwards
/// every log message received on the `rxs` receive set into the proxy.
///
/// Returns immediately; both tasks run in the background.
pub fn spawn_from_rx_set(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
    rxs: re_smart_channel::ReceiveSet<re_log_types::DataSourceMessage>,
) {
    let message_proxy = MessageProxy::new(options);
    let event_tx = message_proxy.event_tx.clone();

    tokio::spawn(async move {
        if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
            re_log::error!("message proxy server crashed: {err}");
        }
    });

    // `rxs.recv()` blocks, so drain the receive set on a dedicated blocking task.
    tokio::task::spawn_blocking(move || {
        use re_smart_channel::SmartMessagePayload;

        loop {
            let msg = if let Ok(msg) = rxs.recv() {
                match msg.payload {
                    SmartMessagePayload::Msg(msg) => msg,
                    SmartMessagePayload::Flush { on_flush_done } => {
                        // Nothing is buffered on this side; acknowledge immediately.
                        on_flush_done();
                        continue;
                    }
                    SmartMessagePayload::Quit(err) => {
                        if let Some(err) = err {
                            re_log::debug!("smart channel sender quit: {err}");
                        } else {
                            re_log::debug!("smart channel sender quit");
                        }
                        // One source quit; keep draining while others remain.
                        if rxs.is_empty() {
                            break;
                        }
                        continue;
                    }
                }
            } else {
                // Receive error; stop only once the whole set has drained.
                if rxs.is_empty() {
                    break;
                }
                continue;
            };

            let msg = match msg {
                DataSourceMessage::LogMsg(log_msg) => log_msg,
                DataSourceMessage::UiCommand(ui_command) => {
                    re_log::warn!(
                        "Received a UI command, grpc server can't forward these yet: {ui_command:?}"
                    );
                    continue;
                }
            };

            // Encode to the transport (protobuf) representation before queuing.
            let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
                Ok(msg) => msg,
                Err(err) => {
                    re_log::error!("failed to encode message: {err}");
                    continue;
                }
            };

            if event_tx.blocking_send(Event::Message(msg.into())).is_err() {
                re_log::debug!("shut down, closing sender");
                break;
            }
        }
    });
}
343
/// Spawns a message proxy server at `addr` and returns local receivers that
/// yield everything clients write to it: log messages (decoded back to
/// application form) and table messages.
///
/// Spawns three background tasks: the server itself, a log-message pump,
/// and a table pump.
pub fn spawn_with_recv(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
) -> (
    re_smart_channel::Receiver<re_log_types::DataSourceMessage>,
    crossbeam::channel::Receiver<re_log_types::TableMsg>,
) {
    let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
        re_uri::Scheme::RerunHttp,
        addr,
    ));
    let (channel_log_tx, channel_log_rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
        re_smart_channel::SmartChannelSource::MessageProxy(uri),
    );
    let (channel_table_tx, channel_table_rx) = crossbeam::channel::unbounded();
    let (message_proxy, mut broadcast_log_rx, mut broadcast_table_rx) =
        MessageProxy::new_with_recv(options);
    tokio::spawn(async move {
        if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
            re_log::error!("message proxy server crashed: {err}");
        }
    });
    // Pump: broadcast log messages -> decoded LogMsg -> smart channel.
    tokio::spawn(async move {
        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();

        loop {
            let msg = match broadcast_log_rx.recv().await {
                // Decode transport proto back into an application-level LogMsg.
                Ok(msg) => match msg.msg {
                    Some(msg) => msg.to_application((&mut app_id_cache, None)),
                    None => Err(re_protos::missing_field!(
                        re_protos::log_msg::v1alpha1::LogMsg,
                        "msg"
                    )
                    .into()),
                },
                Err(broadcast::error::RecvError::Closed) => {
                    re_log::debug!("message proxy server shut down, closing receiver");
                    channel_log_tx.quit(None).ok();
                    break;
                }
                // Broadcast channels drop old entries for slow receivers; keep going.
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    re_log::warn!(
                        "message proxy receiver dropped {n} messages due to backpressure"
                    );
                    continue;
                }
            };
            match msg {
                Ok(mut log_msg) => {
                    // Stamp the time this process decoded the message (for latency stats).
                    log_msg.insert_arrow_record_batch_metadata(
                        re_sorbet::timestamp_metadata::KEY_TIMESTAMP_VIEWER_IPC_DECODED.to_owned(),
                        re_sorbet::timestamp_metadata::now_timestamp(),
                    );

                    if channel_log_tx.send(log_msg.into()).is_err() {
                        re_log::debug!(
                            "message proxy smart channel receiver closed, closing sender"
                        );
                        break;
                    }
                }
                Err(err) => {
                    re_log::error!("dropping LogMsg due to failed decode: {err}");
                }
            }
        }
    });
    // Pump: broadcast table messages -> decoded TableMsg -> crossbeam channel.
    tokio::spawn(async move {
        loop {
            let msg = match broadcast_table_rx.recv().await {
                Ok(msg) => msg.data.try_into().map(|data| TableMsg {
                    id: msg.id.into(),
                    data,
                }),
                Err(broadcast::error::RecvError::Closed) => {
                    re_log::debug!("message proxy server shut down, closing receiver");
                    break;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    re_log::warn!(
                        "message proxy receiver dropped {n} messages due to backpressure"
                    );
                    continue;
                }
            };
            match msg {
                Ok(msg) => {
                    if channel_table_tx.send(msg).is_err() {
                        re_log::debug!(
                            "message proxy smart channel receiver closed, closing sender"
                        );
                        break;
                    }
                }
                Err(err) => {
                    re_log::error!("dropping table due to failed decode: {err}");
                }
            }
        }
    });
    (channel_log_rx, channel_table_rx)
}
463
/// Commands processed by the [`EventLoop`] task.
enum Event {
    /// A new read client connected. The event loop replies with a snapshot of
    /// buffered history plus fresh broadcast subscriptions for live messages
    /// and tables.
    NewClient(
        oneshot::Sender<(
            Vec<LogOrTableMsgProto>,
            broadcast::Receiver<LogMsgProto>,
            broadcast::Receiver<TableMsgProto>,
        )>,
    ),

    /// A log message written by a client.
    Message(LogMsgProto),

    /// A table written by a client.
    Table(TableMsgProto),
}
480
/// Transport-level table message: a table id plus its encoded dataframe payload.
#[derive(Clone)]
struct TableMsgProto {
    id: TableIdProto,
    data: DataframePartProto,
}
486
/// Either kind of message the server buffers; used for unified history storage.
#[derive(Clone)]
enum LogOrTableMsgProto {
    LogMsg(LogMsgProto),
    Table(TableMsgProto),
}
492
493impl LogOrTableMsgProto {
494 fn total_size_bytes(&self) -> u64 {
495 match self {
496 Self::LogMsg(log_msg) => log_msg.total_size_bytes(),
497 Self::Table(table) => table.total_size_bytes(),
498 }
499 }
500}
501
/// Wraps a log message for unified buffering.
impl From<LogMsgProto> for LogOrTableMsgProto {
    fn from(value: LogMsgProto) -> Self {
        Self::LogMsg(value)
    }
}
507
/// Wraps a table message for unified buffering.
impl From<TableMsgProto> for LogOrTableMsgProto {
    fn from(value: TableMsgProto) -> Self {
        Self::Table(value)
    }
}
513
/// A FIFO of buffered messages together with a running total of their size.
#[derive(Default)]
struct MsgQueue {
    // Oldest message at the front.
    queue: VecDeque<LogOrTableMsgProto>,

    // Invariant: sum of `total_size_bytes()` over everything currently in `queue`.
    size_bytes: u64,
}
524
525impl MsgQueue {
526 pub fn iter(&self) -> impl DoubleEndedIterator<Item = &LogOrTableMsgProto> {
527 self.queue.iter()
528 }
529
530 pub fn push_back(&mut self, msg: LogOrTableMsgProto) {
531 self.size_bytes += msg.total_size_bytes();
532 self.queue.push_back(msg);
533 }
534
535 pub fn pop_front(&mut self) -> Option<LogOrTableMsgProto> {
536 if let Some(msg) = self.queue.pop_front() {
537 self.size_bytes -= msg.total_size_bytes();
538 Some(msg)
539 } else {
540 None
541 }
542 }
543}
544
/// Buffered history, split by retention priority.
///
/// GC drops `disposable` first, then `static_`; `persistent` is never dropped
/// (see [`MessageBuffer::gc`]).
#[derive(Default)]
struct MessageBuffer {
    // Temporal arrow data and tables; first to be garbage-collected.
    disposable: MsgQueue,

    // Static arrow data; dropped only after all disposable data is gone.
    static_: MsgQueue,

    // Store info, blueprint data, and activation commands; never dropped.
    persistent: MsgQueue,
}
576
impl MessageBuffer {
    /// Total tracked size across all three queues.
    fn size_bytes(&self) -> u64 {
        let Self {
            disposable,
            static_,
            persistent,
        } = self;
        disposable.size_bytes + static_.size_bytes + persistent.size_bytes
    }

    /// Returns a cloned snapshot of everything buffered, ordered for playback.
    fn all(&self, playback_behavior: PlaybackBehavior) -> Vec<LogOrTableMsgProto> {
        re_tracing::profile_function!();

        let Self {
            disposable,
            static_,
            persistent,
        } = self;

        match playback_behavior {
            PlaybackBehavior::OldestFirst => {
                itertools::chain!(persistent.iter(), static_.iter(), disposable.iter())
                    .cloned()
                    .collect()
            }
            // NOTE: each queue is reversed individually; the queues themselves
            // are still concatenated persistent -> static -> disposable.
            PlaybackBehavior::NewestFirst => itertools::chain!(
                persistent.iter().rev(),
                static_.iter().rev(),
                disposable.iter().rev()
            )
            .cloned()
            .collect(),
        }
    }

    /// Tables are always disposable: dropped first under memory pressure.
    fn add_table(&mut self, table: TableMsgProto) {
        self.disposable.push_back(table.into());
    }

    /// Routes a log message into the queue matching its retention priority.
    fn add_log_msg(&mut self, msg: LogMsgProto) {
        let Some(inner) = &msg.msg else {
            re_log::error!(
                "{}",
                re_protos::missing_field!(re_protos::log_msg::v1alpha1::LogMsg, "msg")
            );
            return;
        };

        use re_protos::log_msg::v1alpha1::log_msg::Msg;
        match inner {
            // Store metadata and blueprint activation must survive GC.
            Msg::SetStoreInfo(..) | Msg::BlueprintActivationCommand(..) => {
                self.persistent.push_back(msg.into());
            }

            Msg::ArrowMsg(inner) => {
                let is_blueprint = inner
                    .store_id
                    .as_ref()
                    .is_some_and(|id| id.kind() == StoreKindProto::Blueprint);

                if is_blueprint {
                    // Blueprint data is kept forever, like store info.
                    self.persistent.push_back(msg.into());
                } else if inner.is_static == Some(true) {
                    self.static_.push_back(msg.into());
                } else {
                    self.disposable.push_back(msg.into());
                }
            }
        }
    }

    /// Drops buffered messages — disposable first, then static — until the
    /// total size is below `max_bytes`. Persistent messages are never dropped,
    /// so the buffer may stay above the limit (warned about below).
    pub fn gc(&mut self, max_bytes: u64) {
        if self.size_bytes() <= max_bytes {
            return;
        }

        re_tracing::profile_scope!("Drop messages");
        re_log::info_once!(
            "Memory limit ({}) exceeded. Dropping old log messages from the gRPC proxy server. Clients connecting after this will not see the full history.",
            re_format::format_bytes(max_bytes as _)
        );

        let start_size = self.size_bytes();
        let mut messages_dropped = 0;

        // Drop oldest disposable messages until under the limit (or empty).
        while self.disposable.pop_front().is_some() {
            messages_dropped += 1;
            if self.size_bytes() < max_bytes {
                break;
            }
        }

        // Still over the limit: start sacrificing static data too.
        if max_bytes < self.size_bytes() {
            re_log::info_once!(
                "Memory limit ({}) exceeded. Dropping old *static* log messages as well. Clients connecting after this will no longer see the complete set of static data.",
                re_format::format_bytes(max_bytes as _)
            );
            while self.static_.pop_front().is_some() {
                messages_dropped += 1;
                if self.size_bytes() < max_bytes {
                    break;
                }
            }
        }

        let bytes_dropped = start_size - self.size_bytes();

        re_log::trace!(
            "Dropped {} bytes in {messages_dropped} message(s)",
            re_format::format_bytes(bytes_dropped as _)
        );

        // Only persistent messages remain; nothing more we can legally drop.
        if max_bytes < self.size_bytes() {
            re_log::warn_once!(
                "The gRPC server is using more memory than the given memory limit ({}), despite having garbage-collected all non-persistent messages.",
                re_format::format_bytes(max_bytes as _)
            );
        }
    }
}
706
/// The single task that owns the message buffer and serializes all access to it.
struct EventLoop {
    options: ServerOptions,

    // Live fan-out of log messages to connected read clients.
    broadcast_log_tx: broadcast::Sender<LogMsgProto>,

    // Live fan-out of table messages to connected read clients.
    broadcast_table_tx: broadcast::Sender<TableMsgProto>,

    // Command input; closes when all `MessageProxy` senders are dropped.
    event_rx: mpsc::Receiver<Event>,

    // Buffered history replayed to newly connecting clients.
    messages: MessageBuffer,
}
726
727impl EventLoop {
728 fn new(
729 options: ServerOptions,
730 event_rx: mpsc::Receiver<Event>,
731 broadcast_log_tx: broadcast::Sender<LogMsgProto>,
732 broadcast_table_tx: broadcast::Sender<TableMsgProto>,
733 ) -> Self {
734 Self {
735 options,
736 broadcast_log_tx,
737 broadcast_table_tx,
738 event_rx,
739 messages: Default::default(),
740 }
741 }
742
743 async fn run_in_place(mut self) {
744 loop {
745 let Some(event) = self.event_rx.recv().await else {
746 break;
747 };
748
749 match event {
750 Event::NewClient(channel) => {
751 channel
752 .send((
753 self.messages.all(self.options.playback_behavior),
754 self.broadcast_log_tx.subscribe(),
755 self.broadcast_table_tx.subscribe(),
756 ))
757 .ok();
758 }
759 Event::Message(msg) => self.handle_msg(msg),
760 Event::Table(table) => self.handle_table(table),
761 }
762 }
763 }
764
765 fn handle_msg(&mut self, msg: LogMsgProto) {
766 self.broadcast_log_tx.send(msg.clone()).ok();
767
768 if self.is_history_disabled() {
769 return;
771 }
772
773 self.gc_if_using_too_much_ram();
774
775 self.messages.add_log_msg(msg);
776 }
777
778 fn handle_table(&mut self, table: TableMsgProto) {
779 self.broadcast_table_tx.send(table.clone()).ok();
780
781 if self.is_history_disabled() {
782 return;
784 }
785
786 self.gc_if_using_too_much_ram();
787
788 self.messages.add_table(table);
789 }
790
791 fn is_history_disabled(&self) -> bool {
792 self.options.memory_limit.max_bytes.is_some_and(|b| b == 0)
793 }
794
795 fn gc_if_using_too_much_ram(&mut self) {
796 let Some(max_bytes) = self.options.memory_limit.max_bytes else {
797 return;
799 };
800
801 self.messages.gc(max_bytes as _);
802 }
803}
804
/// Enables memory-limit accounting for buffered table messages.
impl SizeBytes for TableMsgProto {
    fn heap_size_bytes(&self) -> u64 {
        let Self { id, data } = self;
        id.heap_size_bytes() + data.heap_size_bytes()
    }
}
811
/// The gRPC service implementation; forwards all writes to the [`EventLoop`] task.
pub struct MessageProxy {
    options: ServerOptions,
    // Held so the event-loop task lives as long as the proxy.
    _queue_task_handle: tokio::task::JoinHandle<()>,
    event_tx: mpsc::Sender<Event>,
}
817
impl MessageProxy {
    /// Creates the proxy and spawns its event-loop task.
    pub fn new(options: ServerOptions) -> Self {
        Self::new_with_recv(options).0
    }

    /// Like [`Self::new`], but also returns broadcast receivers so the local
    /// process can observe everything clients write.
    fn new_with_recv(
        options: ServerOptions,
    ) -> (
        Self,
        broadcast::Receiver<LogMsgProto>,
        broadcast::Receiver<TableMsgProto>,
    ) {
        let (event_tx, event_rx) = mpsc::channel(MESSAGE_QUEUE_CAPACITY);
        let (broadcast_log_tx, broadcast_log_rx) = broadcast::channel(MESSAGE_QUEUE_CAPACITY);
        let (broadcast_table_tx, broadcast_table_rx) = broadcast::channel(MESSAGE_QUEUE_CAPACITY);

        // The event loop runs until the last `event_tx` clone is dropped.
        let task_handle = tokio::spawn(async move {
            EventLoop::new(options, event_rx, broadcast_log_tx, broadcast_table_tx)
                .run_in_place()
                .await;
        });

        (
            Self {
                options,
                _queue_task_handle: task_handle,
                event_tx,
            },
            broadcast_log_rx,
            broadcast_table_rx,
        )
    }

    /// Queues a log message; silently dropped if the event loop has shut down.
    async fn push_msg(&self, msg: LogMsgProto) {
        self.event_tx.send(Event::Message(msg)).await.ok();
    }

    /// Queues a table message; silently dropped if the event loop has shut down.
    async fn push_table(&self, table: TableMsgProto) {
        self.event_tx.send(Event::Table(table)).await.ok();
    }

    /// Builds the response stream for a new `ReadMessages` client:
    /// buffered history combined with the live broadcast feed.
    async fn new_client_message_stream(&self) -> ReadMessagesStream {
        let (sender, receiver) = oneshot::channel();
        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
            re_log::error!("Error accepting new client: {err}");
            return Box::pin(tokio_stream::empty());
        }
        let (history, log_channel, _) = match receiver.await {
            Ok(v) => v,
            Err(err) => {
                re_log::error!("Error accepting new client: {err}");
                return Box::pin(tokio_stream::empty());
            }
        };

        // History snapshot, keeping only log messages (tables are a separate stream).
        let history = tokio_stream::iter(
            history
                .into_iter()
                .filter_map(|log_msg| {
                    if let LogOrTableMsgProto::LogMsg(log_msg) = log_msg {
                        Some(ReadMessagesResponse {
                            log_msg: Some(log_msg),
                        })
                    } else {
                        None
                    }
                })
                .map(Ok),
        );
        // Live feed from the broadcast channel.
        let channel = BroadcastStream::new(log_channel).map(|result| {
            result
                .map(|log_msg| ReadMessagesResponse {
                    log_msg: Some(log_msg),
                })
                .map_err(|err| {
                    re_log::error!("Error reading message from broadcast channel: {err}");
                    tonic::Status::internal("internal channel error")
                })
        });

        match self.options.playback_behavior {
            // History first, then live messages.
            PlaybackBehavior::OldestFirst => Box::pin(history.chain(channel)),
            // Live messages take priority; history fills the gaps.
            PlaybackBehavior::NewestFirst => Box::pin(PriorityMerge::new(channel, history)),
        }
    }

    /// Builds the response stream for a new `ReadTables` client.
    ///
    /// NOTE(review): unlike the log-message stream above, this always plays
    /// history first and ignores `playback_behavior` — presumably intentional,
    /// but worth confirming.
    async fn new_client_table_stream(&self) -> ReadTablesStream {
        let (sender, receiver) = oneshot::channel();
        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
            re_log::error!("Error accepting new client: {err}");
            return Box::pin(tokio_stream::empty());
        }
        let (history, _, table_channel) = match receiver.await {
            Ok(v) => v,
            Err(err) => {
                re_log::error!("Error accepting new client: {err}");
                return Box::pin(tokio_stream::empty());
            }
        };

        // History snapshot, keeping only table messages.
        let history = tokio_stream::iter(
            history
                .into_iter()
                .filter_map(|table| {
                    if let LogOrTableMsgProto::Table(table) = table {
                        Some(ReadTablesResponse {
                            id: Some(table.id),
                            data: Some(table.data),
                        })
                    } else {
                        None
                    }
                })
                .map(Ok),
        );
        // Live feed from the broadcast channel.
        let channel = BroadcastStream::new(table_channel).map(|result| {
            result
                .map(|table| ReadTablesResponse {
                    id: Some(table.id),
                    data: Some(table.data),
                })
                .map_err(|err| {
                    re_log::error!("Error reading message from broadcast channel: {err}");
                    tonic::Status::internal("internal channel error")
                })
        });

        Box::pin(history.chain(channel))
    }
}
948
/// Boxed response stream type for the `ReadMessages` RPC.
type ReadMessagesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadMessagesResponse>> + Send>>;
/// Boxed response stream type for the `ReadTables` RPC.
type ReadTablesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadTablesResponse>> + Send>>;
951
#[tonic::async_trait]
impl message_proxy_service_server::MessageProxyService for MessageProxy {
    /// Accepts a client-streamed sequence of log messages and queues each one.
    async fn write_messages(
        &self,
        request: tonic::Request<tonic::Streaming<WriteMessagesRequest>>,
    ) -> tonic::Result<tonic::Response<WriteMessagesResponse>> {
        let mut stream = request.into_inner();
        loop {
            match stream.message().await {
                Ok(Some(WriteMessagesRequest {
                    log_msg: Some(log_msg),
                })) => {
                    self.push_msg(log_msg).await;
                }

                // Malformed request: tolerated, just logged.
                Ok(Some(WriteMessagesRequest { log_msg: None })) => {
                    re_log::warn!("missing log_msg in WriteMessagesRequest");
                }

                // Client closed its end of the stream; we're done.
                Ok(None) => {
                    break;
                }

                Err(err) => {
                    re_log::error!("Error while receiving messages: {}", TonicStatusError(err));
                    break;
                }
            }
        }

        Ok(tonic::Response::new(WriteMessagesResponse {}))
    }

    type ReadMessagesStream = ReadMessagesStream;

    /// Streams buffered history followed by (or merged with) live log messages.
    async fn read_messages(
        &self,
        _: tonic::Request<ReadMessagesRequest>,
    ) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
        Ok(tonic::Response::new(self.new_client_message_stream().await))
    }

    type ReadTablesStream = ReadTablesStream;

    /// Queues a single table message.
    async fn write_table(
        &self,
        request: tonic::Request<WriteTableRequest>,
    ) -> tonic::Result<tonic::Response<WriteTableResponse>> {
        if let WriteTableRequest {
            id: Some(id),
            data: Some(data),
        } = request.into_inner()
        {
            self.push_table(TableMsgProto { id, data }).await;
        } else {
            re_log::warn!("malformed `WriteTableRequest`");
        }

        Ok(tonic::Response::new(WriteTableResponse {}))
    }

    /// Streams buffered table history followed by live tables.
    async fn read_tables(
        &self,
        _: tonic::Request<ReadTablesRequest>,
    ) -> tonic::Result<tonic::Response<Self::ReadTablesStream>> {
        Ok(tonic::Response::new(self.new_client_table_stream().await))
    }
}
1021
1022#[cfg(test)]
1023mod tests {
1024 use std::net::SocketAddr;
1025 use std::sync::Arc;
1026 use std::time::Duration;
1027
1028 use itertools::{Itertools as _, chain};
1029 use similar_asserts::assert_eq;
1030 use tokio::net::TcpListener;
1031 use tokio_util::sync::CancellationToken;
1032 use tonic::transport::Channel;
1033 use tonic::transport::Endpoint;
1034 use tonic::transport::server::TcpIncoming;
1035
1036 use re_chunk::RowId;
1037 use re_log_encoding::rrd::Compression;
1038 use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
1039 use re_protos::sdk_comms::v1alpha1::{
1040 message_proxy_service_client::MessageProxyServiceClient,
1041 message_proxy_service_server::MessageProxyServiceServer,
1042 };
1043
1044 use super::*;
1045
    /// Test helper: a clonable token used to signal server shutdown.
    #[derive(Clone)]
    struct Completion(Arc<CancellationToken>);
1048
    impl Drop for Completion {
        // Ensures the test server shuts down even if a test panics before `finish()`.
        fn drop(&mut self) {
            self.finish();
        }
    }
1054
    impl Completion {
        fn new() -> Self {
            Self(Arc::new(CancellationToken::new()))
        }

        /// Signals completion; safe to call multiple times.
        fn finish(&self) {
            self.0.cancel();
        }

        /// Waits until `finish` is called (possibly on another clone).
        async fn wait(&self) {
            self.0.cancelled().await;
        }
    }
1068
    /// Builds a `SetStoreInfo` message for the given store id.
    fn set_store_info_msg(store_id: &StoreId) -> LogMsg {
        LogMsg::SetStoreInfo(SetStoreInfo {
            row_id: *RowId::new(),
            info: StoreInfo::new(
                store_id.clone(),
                StoreSource::RustSdk {
                    rustc_version: String::new(),
                    llvm_version: String::new(),
                },
            ),
        })
    }
1081
    /// Builds a blueprint-store log stream: store info, `n` Arrow messages,
    /// and a trailing activation command. All of these are "persistent" for
    /// GC purposes, so this stream should survive any memory limit.
    fn fake_log_stream_blueprint(n: usize) -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Blueprint, "test_app");

        let mut messages = Vec::new();
        messages.push(set_store_info_msg(&store_id));
        for _ in 0..n {
            messages.push(LogMsg::ArrowMsg(
                store_id.clone(),
                re_chunk::Chunk::builder("test_entity")
                    .with_archetype(
                        re_chunk::RowId::new(),
                        re_log_types::TimePoint::default().with(
                            re_log_types::Timeline::new_sequence("blueprint"),
                            re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                        ),
                        &re_types::blueprint::archetypes::Background::new(
                            re_types::blueprint::components::BackgroundKind::SolidColor,
                        )
                        .with_color([255, 0, 0]),
                    )
                    .build()
                    .unwrap()
                    .to_arrow_msg()
                    .unwrap(),
            ));
        }
        messages.push(LogMsg::BlueprintActivationCommand(
            re_log_types::BlueprintActivationCommand {
                blueprint_id: store_id,
                make_active: true,
                make_default: true,
            },
        ));

        messages
    }
1120
    /// Whether generated test data is logged statically or on a timeline.
    #[derive(Clone, Copy)]
    enum Temporalness {
        Static,
        Temporal,
    }
1126
    /// Builds a recording-store log stream: store info plus `n` temporal
    /// Arrow messages (i.e. "disposable" data that GC may drop).
    fn fake_log_stream_recording(n: usize) -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Recording, "test_app");

        chain!(
            [set_store_info_msg(&store_id)],
            generate_log_messages(&store_id, n, Temporalness::Temporal)
        )
        .collect()
    }
1136
    /// Generates `n` Arrow log messages for `store_id`, either static or temporal.
    fn generate_log_messages(
        store_id: &StoreId,
        n: usize,
        temporalness: Temporalness,
    ) -> Vec<LogMsg> {
        let mut messages = Vec::new();
        for _ in 0..n {
            let timepoint = match temporalness {
                Temporalness::Static => re_log_types::TimePoint::STATIC,
                Temporalness::Temporal => re_log_types::TimePoint::default().with(
                    re_log_types::Timeline::new_sequence("log_time"),
                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                ),
            };

            messages.push(LogMsg::ArrowMsg(
                store_id.clone(),
                re_chunk::Chunk::builder("test_entity")
                    .with_archetype(
                        re_chunk::RowId::new(),
                        timepoint,
                        &re_types::archetypes::Points2D::new([(0.0, 0.0), (1.0, 1.0), (2.0, 2.0)]),
                    )
                    .build()
                    .unwrap()
                    .to_arrow_msg()
                    .unwrap(),
            ));
        }
        messages
    }
1168
    /// Starts a test server with default options (unlimited memory, oldest-first).
    async fn setup() -> (Completion, SocketAddr) {
        setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit: MemoryLimit::UNLIMITED,
        })
        .await
    }
1176
    /// Starts a test server with the given memory limit (oldest-first playback).
    async fn setup_with_memory_limit(memory_limit: MemoryLimit) -> (Completion, SocketAddr) {
        setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit,
        })
        .await
    }
1184
    /// Starts a test server on an OS-assigned loopback port; it runs until the
    /// returned `Completion` is finished (or dropped).
    async fn setup_opt(options: ServerOptions) -> (Completion, SocketAddr) {
        let completion = Completion::new();

        // Port 0: let the OS choose a free port, then read it back.
        let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = tcp_listener.local_addr().unwrap();

        tokio::spawn({
            let completion = completion.clone();
            async move {
                tonic::transport::Server::builder()
                    .add_service(
                        MessageProxyServiceServer::new(super::MessageProxy::new(options))
                            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
                            .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
                    )
                    .serve_with_incoming_shutdown(
                        TcpIncoming::from(tcp_listener).with_nodelay(Some(true)),
                        completion.wait(),
                    )
                    .await
                    .unwrap();
            }
        });

        (completion, addr)
    }
1211
    /// Connects a gRPC client to the test server at `addr`.
    async fn make_client(addr: SocketAddr) -> MessageProxyServiceClient<Channel> {
        MessageProxyServiceClient::new(
            Endpoint::from_shared(format!("http://{addr}"))
                .unwrap()
                .connect()
                .await
                .unwrap(),
        )
        .max_decoding_message_size(crate::MAX_DECODING_MESSAGE_SIZE)
    }
1222
    /// Encodes `messages` (uncompressed) and writes them via the streaming RPC.
    async fn write_messages(
        client: &mut MessageProxyServiceClient<Channel>,
        messages: Vec<LogMsg>,
    ) {
        client
            .write_messages(tokio_stream::iter(
                messages
                    .clone()
                    .into_iter()
                    .map(|msg| msg.to_transport(Compression::Off).unwrap())
                    .map(|msg| WriteMessagesRequest {
                        log_msg: Some(msg.into()),
                    }),
            ))
            .await
            .unwrap();
    }
1240
    /// Reads exactly `n` messages from a `ReadMessages` response stream,
    /// decoding each one back into an application-level `LogMsg`.
    async fn read_log_stream(
        log_stream: &mut tonic::Response<tonic::Streaming<ReadMessagesResponse>>,
        n: usize,
    ) -> Vec<LogMsg> {
        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();

        let mut stream_ref = log_stream.get_mut().map(|result| {
            let msg = result.unwrap().log_msg.unwrap().msg.unwrap();
            msg.to_application((&mut app_id_cache, None)).unwrap()
        });

        let mut messages = Vec::new();
        for _ in 0..n {
            messages.push(stream_ref.next().await.unwrap());
        }
        messages
    }
1258
    /// Subscribe first, then write: messages arrive via the live broadcast path.
    #[tokio::test]
    async fn pubsub_basic() {
        let (completion, addr) = setup().await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

        write_messages(&mut client, messages.clone()).await;

        let actual = read_log_stream(&mut log_stream, messages.len()).await;

        assert_eq!(messages, actual);

        // Sanity check: both streams start with the store info message.
        assert!(matches!(messages[0], LogMsg::SetStoreInfo(..)));
        assert!(matches!(actual[0], LogMsg::SetStoreInfo(..)));

        completion.finish();
    }
1283
    /// Write first, then subscribe: messages arrive via the buffered-history path.
    #[tokio::test]
    async fn pubsub_history() {
        let (completion, addr) = setup().await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);

        write_messages(&mut client, messages.clone()).await;

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

        let actual = read_log_stream(&mut log_stream, messages.len()).await;
        assert_eq!(messages, actual);

        completion.finish();
    }
1302
    /// One writer, two readers: each reader sees the full stream independently.
    #[tokio::test]
    async fn one_producer_many_consumers() {
        let (completion, addr) = setup().await;
        let mut producer = make_client(addr).await;
        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
        let messages = fake_log_stream_blueprint(3);

        // Subscribe everyone before writing so all messages arrive live.
        let mut log_streams = vec![];
        for consumer in &mut consumers {
            log_streams.push(
                consumer
                    .read_messages(ReadMessagesRequest {})
                    .await
                    .unwrap(),
            );
        }

        write_messages(&mut producer, messages.clone()).await;

        for log_stream in &mut log_streams {
            let actual = read_log_stream(log_stream, messages.len()).await;
            assert_eq!(messages, actual);
        }

        completion.finish();
    }
1331
    /// Two writers, two readers: writes are serialized, so each reader sees
    /// both copies of the stream back to back.
    #[tokio::test]
    async fn many_producers_many_consumers() {
        let (completion, addr) = setup().await;
        let mut producers = vec![make_client(addr).await, make_client(addr).await];
        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
        let messages = fake_log_stream_blueprint(3);

        let mut log_streams = vec![];
        for consumer in &mut consumers {
            log_streams.push(
                consumer
                    .read_messages(ReadMessagesRequest {})
                    .await
                    .unwrap(),
            );
        }

        // Sequential writes — so the expected order is one full stream, then the other.
        for producer in &mut producers {
            write_messages(producer, messages.clone()).await;
        }

        let expected = [messages.clone(), messages.clone()].concat();

        for log_stream in &mut log_streams {
            let actual = read_log_stream(log_stream, expected.len()).await;
            assert_eq!(actual, expected);
        }

        completion.finish();
    }
1368
1369 #[tokio::test]
1370 async fn memory_limit_drops_messages() {
1371 let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
1373 let mut client = make_client(addr).await;
1374 let messages = fake_log_stream_recording(3);
1375
1376 write_messages(&mut client, messages.clone()).await;
1377
1378 let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1380 let mut actual = vec![];
1381 loop {
1382 let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
1383 tokio::pin!(timeout_stream);
1384 let timeout_result = timeout_stream.try_next().await;
1385 let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
1386 match timeout_result {
1387 Ok(Some(value)) => {
1388 let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
1389 actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
1390 }
1391
1392 Ok(None) | Err(_) => break,
1394 }
1395 }
1396
1397 assert_eq!(actual.len(), 2);
1399 assert_eq!(&actual[0], &messages[0]);
1400 assert_eq!(&actual[1], messages.last().unwrap());
1401
1402 completion.finish();
1403 }
1404
1405 #[tokio::test]
1406 async fn memory_limit_does_not_drop_blueprint() {
1407 let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
1409 let mut client = make_client(addr).await;
1410 let messages = fake_log_stream_blueprint(3);
1411
1412 write_messages(&mut client, messages.clone()).await;
1414
1415 let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1417 let mut actual = vec![];
1418 loop {
1419 let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
1420 tokio::pin!(timeout_stream);
1421 let timeout_result = timeout_stream.try_next().await;
1422 let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
1423 match timeout_result {
1424 Ok(Some(value)) => {
1425 let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
1426 actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
1427 }
1428
1429 Ok(None) | Err(_) => break,
1431 }
1432 }
1433
1434 assert_eq!(messages, actual);
1437
1438 completion.finish();
1439 }
1440
1441 #[tokio::test]
1442 async fn memory_limit_does_not_interrupt_stream() {
1443 let memory_limits = [
1444 0, 1, ];
1447
1448 for memory_limit in memory_limits {
1449 let (completion, addr) =
1450 setup_with_memory_limit(MemoryLimit::from_bytes(memory_limit)).await;
1451 let mut client = make_client(addr).await; let messages = fake_log_stream_blueprint(3);
1453
1454 let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1456
1457 write_messages(&mut client, messages.clone()).await;
1458
1459 let actual = read_log_stream(&mut log_stream, messages.len()).await;
1461 assert_eq!(messages, actual);
1462
1463 completion.finish();
1464 }
1465 }
1466
1467 #[tokio::test]
1468 async fn static_data_is_returned_first() {
1469 let (completion, addr) = setup_with_memory_limit(MemoryLimit::UNLIMITED).await;
1470 let mut client = make_client(addr).await;
1471
1472 let store_id = StoreId::random(StoreKind::Recording, "test_app");
1473
1474 let set_store_info = vec![set_store_info_msg(&store_id)];
1475 let first_static = generate_log_messages(&store_id, 3, Temporalness::Static);
1476 let first_temporal = generate_log_messages(&store_id, 3, Temporalness::Temporal);
1477 let second_static = generate_log_messages(&store_id, 3, Temporalness::Static);
1478
1479 write_messages(&mut client, set_store_info.clone()).await;
1480 write_messages(&mut client, first_static.clone()).await;
1481 write_messages(&mut client, first_temporal.clone()).await;
1482 write_messages(&mut client, second_static.clone()).await;
1483
1484 let expected =
1486 itertools::chain!(set_store_info, first_static, second_static, first_temporal)
1487 .collect_vec();
1488
1489 let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1490 let actual = read_log_stream(&mut log_stream, expected.len()).await;
1491
1492 assert_eq!(actual, expected);
1493
1494 completion.finish();
1495 }
1496
1497 #[tokio::test]
1498 async fn playback_newest_first() {
1499 let (completion, addr) = setup_opt(ServerOptions {
1500 playback_behavior: PlaybackBehavior::NewestFirst, memory_limit: MemoryLimit::UNLIMITED,
1502 })
1503 .await;
1504 let mut client = make_client(addr).await;
1505
1506 let store_id = StoreId::random(StoreKind::Recording, "test_app");
1507
1508 let set_store_info = vec![set_store_info_msg(&store_id)];
1509 let first_statics = generate_log_messages(&store_id, 3, Temporalness::Static);
1510 let temporals = generate_log_messages(&store_id, 3, Temporalness::Temporal);
1511 let second_statics = generate_log_messages(&store_id, 3, Temporalness::Static);
1512
1513 write_messages(&mut client, set_store_info.clone()).await;
1514 write_messages(&mut client, first_statics.clone()).await;
1515 write_messages(&mut client, temporals.clone()).await;
1516 write_messages(&mut client, second_statics.clone()).await;
1517
1518 let expected = itertools::chain!(
1520 set_store_info.into_iter().rev(),
1521 second_statics.into_iter().rev(),
1522 first_statics.into_iter().rev(),
1523 temporals.into_iter().rev()
1524 )
1525 .collect_vec();
1526
1527 let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1528 let actual = read_log_stream(&mut log_stream, expected.len()).await;
1529
1530 assert_eq!(actual, expected);
1531
1532 completion.finish();
1533 }
1534}