// re_grpc_server/lib.rs

//! Server for the legacy `StoreHub` API.

pub mod shutdown;

use std::collections::VecDeque;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use re_byte_size::SizeBytes;
use re_log_channel::{DataSourceMessage, DataSourceUiCommand};
use re_log_encoding::{ToApplication as _, ToTransport as _};
use re_log_types::TableMsg;
use re_protos::common::v1alpha1::{
    DataframePart as DataframePartProto, StoreKind as StoreKindProto, TableId as TableIdProto,
};
use re_protos::log_msg::v1alpha1::LogMsg as LogMsgProto;
use re_protos::sdk_comms::v1alpha1::{
    ReadMessagesRequest, ReadMessagesResponse, ReadTablesRequest, ReadTablesResponse,
    SaveScreenshotRequest, SaveScreenshotResponse, WriteMessagesRequest, WriteMessagesResponse,
    WriteTableRequest, WriteTableResponse, message_proxy_service_server,
};
use re_quota_channel::{async_broadcast_channel, async_mpsc_channel};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio_stream::{Stream, StreamExt as _};
use tonic::transport::Server;
use tonic::transport::server::TcpIncoming;
use tower_http::cors::CorsLayer;

use crate::priority_stream::PriorityMerge;

mod priority_stream;

pub use re_memory::MemoryLimit;

/// Default port of the OSS /proxy server.
pub const DEFAULT_SERVER_PORT: u16 = 9876;

pub const MAX_DECODING_MESSAGE_SIZE: usize = u32::MAX as usize;
pub const MAX_ENCODING_MESSAGE_SIZE: usize = MAX_DECODING_MESSAGE_SIZE;

/// Maximum number of messages in the input queue.
const CHANNEL_SIZE_MESSAGES: usize = 1024; // TODO(emilk): move into `ServerOptions` after the patch release.

/// Make sure we can handle a quick burst of messages without blocking,
/// even if the server has a [`ServerOptions::memory_limit`] of zero.
const CHANNEL_SIZE_BYTES: u64 = 128 * 1024 * 1024; // TODO(emilk): move into `ServerOptions` after the patch release.

/// Options for the gRPC Proxy Server
#[derive(Clone, Copy, Debug)]
pub struct ServerOptions {
    /// When a client connects, should it be sent the oldest data first, or the newest?
    pub playback_behavior: PlaybackBehavior,

    /// Limit on how much history the server saves.
    ///
    /// The server starts garbage-collecting the oldest data when this limit is reached.
    pub memory_limit: MemoryLimit, // TODO(emilk): rename `history_limit`
}

impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit: MemoryLimit::from_bytes(1024 * 1024 * 1024), // Be very conservative by default
        }
    }
}

/// What happens when a client connects to a gRPC server?
#[derive(Clone, Copy, Debug)]
pub enum PlaybackBehavior {
    /// Play back all the old data first,
    /// and only then send anything that has happened since.
    OldestFirst,

    /// Prioritize the newest arriving messages,
    /// replaying the history later, starting with the newest.
    NewestFirst,
}

impl PlaybackBehavior {
    pub fn from_newest_first(newest_first: bool) -> Self {
        if newest_first {
            Self::NewestFirst
        } else {
            Self::OldestFirst
        }
    }
}
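
// Example (sketch): a server that replays newest-first and keeps ~2 GiB of
// history. The fields are those of `ServerOptions` above; the values are
// purely illustrative.
//
// let options = ServerOptions {
//     playback_behavior: PlaybackBehavior::NewestFirst,
//     memory_limit: MemoryLimit::from_bytes(2 * 1024 * 1024 * 1024),
// };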

/// A wrapper around [`tonic::Status`] with a nicer error message.
#[derive(Debug)]
pub struct TonicStatusError(pub tonic::Status);

impl std::fmt::Display for TonicStatusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // TODO(emilk): duplicated in `re_grpc_client`
        let status = &self.0;

        write!(f, "gRPC error")?;

        if status.code() != tonic::Code::Unknown {
            write!(f, ", code: '{}'", status.code())?;
        }
        if !status.message().is_empty() {
            write!(f, ", message: {:?}", status.message())?;
        }
        // Binary data - not useful.
        // if !status.details().is_empty() {
        //     write!(f, ", details: {:?}", status.details())?;
        // }
        if !status.metadata().is_empty() {
            write!(f, ", metadata: {:?}", status.metadata().as_ref())?;
        }
        Ok(())
    }
}

impl From<tonic::Status> for TonicStatusError {
    fn from(value: tonic::Status) -> Self {
        Self(value)
    }
}

// TODO(jan): Refactor `serve`/`spawn` variants into a builder?

/// Start a Rerun server, listening on `addr`.
///
/// A Rerun server is an in-memory implementation of a Storage Node.
///
/// The returned future must be polled for the server to make progress.
///
/// The server implements the `WriteMessages` and `ReadMessages` RPCs for log data,
/// as well as `WriteTable`, `ReadTables`, and `SaveScreenshot` (see the trait implementation below).
///
/// Clients send data to the server via `WriteMessages`. Any sent messages will be stored
/// in the server's message queue. Messages are only removed if the server hits its configured
/// memory limit.
///
/// Clients receive data from the server via `ReadMessages`. Upon establishing the stream,
/// the server sends all messages stored in its message queue, and subscribes the client
/// to the queue. Any messages sent to the server through `WriteMessages` will be proxied
/// to the open `ReadMessages` stream.
pub async fn serve(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
) -> anyhow::Result<()> {
    serve_impl(addr, options, MessageProxy::new(options), shutdown).await
}
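
// Example (sketch): running the server on the default port from an async
// context. How you obtain the `shutdown::Shutdown` depends on the `shutdown`
// module; the `shutdown::never()` constructor used here is an assumption.
//
// ```no_run
// let addr = std::net::SocketAddr::from(([0, 0, 0, 0], DEFAULT_SERVER_PORT));
// serve(addr, ServerOptions::default(), shutdown::never()).await?;
// ```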

async fn serve_impl(
    addr: SocketAddr,
    options: ServerOptions,
    message_proxy: MessageProxy,
    shutdown: shutdown::Shutdown,
) -> anyhow::Result<()> {
    // TODO(rust-lang/rust#130668): When listening on `::` we want to listen to both ipv6 `::` and ipv4 `0.0.0.0`
    // On Mac & Linux this happens automatically since all sockets are dual-stack by default.
    // On Windows, the dual stack behavior is opt-in, but `TcpListener::bind` does not expose the option.
    // To work around this, we explicitly listen on both ipv4 & ipv6 if an unspecified ipv6 address is used.
    let dual_stack_windows = cfg!(target_os = "windows")
        && matches!(addr.ip(), std::net::IpAddr::V6(ipv6) if ipv6.is_unspecified());

    let incoming: Pin<Box<dyn Stream<Item = _> + Send>> = if dual_stack_windows {
        let ipv6_addr = addr;
        let ipv4_addr = SocketAddr::V4(std::net::SocketAddrV4::new(
            std::net::Ipv4Addr::UNSPECIFIED,
            addr.port(),
        ));

        let tcp_listener_ipv6 = TcpListener::bind(ipv6_addr).await?;
        let tcp_listener_ipv4 = TcpListener::bind(ipv4_addr).await?;

        let incoming_ipv6 = TcpIncoming::from(tcp_listener_ipv6).with_nodelay(Some(true));
        let incoming_ipv4 = TcpIncoming::from(tcp_listener_ipv4).with_nodelay(Some(true));

        // Merge both streams into a single stream
        let merged = tokio_stream::StreamExt::merge(incoming_ipv6, incoming_ipv4);

        let connect_addr = format!("rerun+http://127.0.0.1:{}/proxy", addr.port());

        re_log::info!(
            "Listening for gRPC connections on {ipv6_addr} and {ipv4_addr}. Connect by running `rerun --connect {connect_addr}`",
        );

        Box::pin(merged)
    } else {
        let tcp_listener = TcpListener::bind(addr).await?;
        let incoming = TcpIncoming::from(tcp_listener).with_nodelay(Some(true));

        let connect_addr = if addr.ip().is_loopback() || addr.ip().is_unspecified() {
            format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
        } else {
            format!("rerun+http://{addr}/proxy")
        };

        re_log::info!(
            "Listening for gRPC connections on {addr}. Connect by running `rerun --connect {connect_addr}`",
        );

        Box::pin(incoming)
    };

    re_log::debug!("Server memory limit set at {}", options.memory_limit);

    let cors = CorsLayer::very_permissive();
    let grpc_web = tonic_web::GrpcWebLayer::new();

    let routes = {
        let mut routes_builder = tonic::service::Routes::builder();
        routes_builder.add_service(
            re_protos::sdk_comms::v1alpha1::message_proxy_service_server::MessageProxyServiceServer::new(
                message_proxy,
            )
            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
            .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
        );
        routes_builder.routes()
    };

    Server::builder()
        .accept_http1(true) // Support `grpc-web` clients
        .layer(cors) // Allow CORS requests from web clients
        .layer(grpc_web) // Support `grpc-web` clients
        .add_routes(routes)
        .serve_with_incoming_shutdown(incoming, shutdown.wait())
        .await?;

    Ok(())
}

/// Start a Rerun server, listening on `addr`.
///
/// The returned future must be polled for the server to make progress.
///
/// This function additionally accepts a smart channel, through which messages
/// can be sent to the server directly. It is similar to creating a client
/// and sending messages through `WriteMessages`, but without the overhead
/// of a localhost connection.
///
/// See [`serve`] for more information about what a Rerun server is.
pub async fn serve_from_channel(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
    channel_rx: re_log_channel::LogReceiver,
) {
    let message_proxy = MessageProxy::new(options);
    let event_tx = message_proxy.event_tx.clone();

    tokio::task::spawn_blocking(move || {
        use re_log_channel::SmartMessagePayload;

        loop {
            let msg = if let Ok(msg) = channel_rx.recv() {
                match msg.payload {
                    SmartMessagePayload::Msg(msg) => msg,
                    SmartMessagePayload::Flush { on_flush_done } => {
                        on_flush_done(); // we don't buffer
                        continue;
                    }
                    SmartMessagePayload::Quit(err) => {
                        if let Some(err) = err {
                            re_log::debug!("smart channel sender quit: {err}");
                        } else {
                            re_log::debug!("smart channel sender quit");
                        }
                        break;
                    }
                }
            } else {
                re_log::debug!("smart channel sender closed, closing receiver");
                break;
            };

            match msg {
                DataSourceMessage::LogMsg(msg) => {
                    let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
                        Ok(msg) => msg,
                        Err(err) => {
                            re_log::error!("failed to encode message: {err}");
                            continue;
                        }
                    };

                    if event_tx
                        .blocking_send(Event::Message(LogOrTableMsgProto::LogMsg(msg.into())))
                        .is_err()
                    {
                        re_log::debug!("shut down, closing sender");
                        break;
                    }
                }
                unsupported => {
                    re_log::error_once!(
                        "Not implemented: re_grpc_server support for {}",
                        unsupported.variant_name()
                    );
                }
            }
        }
    });

    if let Err(err) = serve_impl(addr, options, message_proxy, shutdown).await {
        re_log::error!("message proxy server crashed: {err}");
    }
}
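
// Example (sketch): feeding the server from an in-process channel instead of
// a gRPC client. The `log_channel` constructor mirrors its use in
// `spawn_with_recv` below; the `LogSource::Sdk` variant is an assumption,
// pick whatever `LogSource` fits the sending side.
//
// ```no_run
// let (tx, rx) = re_log_channel::log_channel(re_log_channel::LogSource::Sdk);
// tokio::spawn(serve_from_channel(addr, ServerOptions::default(), shutdown, rx));
// // Messages sent through `tx` are now broadcast to every connected client.
// ```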

/// Start a Rerun server, listening on `addr`.
///
/// This function additionally accepts a [`re_log_channel::LogReceiverSet`], from which the
/// server will read all messages. It is similar to creating a client
/// and sending messages through `WriteMessages`, but without the overhead
/// of a localhost connection.
///
/// See [`serve`] for more information about what a Rerun server is.
pub fn spawn_from_rx_set(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
    rxs: re_log_channel::LogReceiverSet,
) {
    let message_proxy = MessageProxy::new(options);
    let event_tx = message_proxy.event_tx.clone();

    tokio::spawn(async move {
        if let Err(err) = serve_impl(addr, options, message_proxy, shutdown).await {
            re_log::error!("message proxy server crashed: {err}");
        }
    });

    tokio::task::spawn_blocking(move || {
        use re_log_channel::SmartMessagePayload;

        loop {
            let msg = if let Ok(msg) = rxs.recv() {
                match msg.payload {
                    SmartMessagePayload::Msg(msg) => msg,
                    SmartMessagePayload::Flush { on_flush_done } => {
                        on_flush_done(); // we don't buffer
                        continue;
                    }
                    SmartMessagePayload::Quit(err) => {
                        if let Some(err) = err {
                            re_log::debug!("smart channel sender quit: {err}");
                        } else {
                            re_log::debug!("smart channel sender quit");
                        }
                        if rxs.is_empty() {
                            // We won't ever receive more data:
                            break;
                        }
                        continue;
                    }
                }
            } else {
                if rxs.is_empty() {
                    // We won't ever receive more data:
                    break;
                }
                continue;
            };

            match msg {
                DataSourceMessage::LogMsg(msg) => {
                    let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
                        Ok(msg) => msg,
                        Err(err) => {
                            re_log::error!("failed to encode message: {err}");
                            continue;
                        }
                    };

                    if event_tx
                        .blocking_send(Event::Message(LogOrTableMsgProto::LogMsg(msg.into())))
                        .is_err()
                    {
                        re_log::debug!("shut down, closing sender");
                        break;
                    }
                }
                unsupported => {
                    re_log::error_once!(
                        "gRPC proxy server cannot forward {}",
                        unsupported.variant_name()
                    );
                }
            }
        }
    });
}

/// Start a Rerun server, listening on `addr`.
///
/// This function additionally creates a smart channel, and returns its receiving end.
/// Any messages received by the server are sent through the channel. This is similar
/// to creating a client and calling `ReadMessages`, but without the overhead of a
/// localhost connection.
///
/// The server is spawned as a task on a `tokio` runtime. This function panics if the
/// runtime is not available.
///
/// See [`serve`] for more information about what a Rerun server is.
pub fn spawn_with_recv(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
) -> re_log_channel::LogReceiver {
    let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
        re_uri::Scheme::RerunHttp,
        addr,
    ));

    let (channel_log_tx, channel_log_rx) =
        re_log_channel::log_channel(re_log_channel::LogSource::MessageProxy(uri));

    let (message_proxy, mut broadcast_log_rx) = MessageProxy::new_with_recv(options);

    tokio::spawn(async move {
        if let Err(err) = serve_impl(addr, options, message_proxy, shutdown).await {
            re_log::error!("message proxy server crashed: {err}");
        }
    });

    tokio::spawn(async move {
        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();

        loop {
            let msg: anyhow::Result<DataSourceMessage> = match broadcast_log_rx.recv().await {
                Ok(inner) => match inner {
                    LogOrTableMsgProto::LogMsg(msg) => match msg.msg {
                        Some(msg) => msg
                            .to_application((&mut app_id_cache, None))
                            .map(DataSourceMessage::LogMsg)
                            .map_err(|err| err.into()),
                        None => Err(re_protos::missing_field!(
                            re_protos::log_msg::v1alpha1::LogMsg,
                            "msg"
                        )
                        .into()),
                    },

                    LogOrTableMsgProto::Table(msg) => match msg.data.try_into() {
                        Ok(data) => Ok(DataSourceMessage::TableMsg(TableMsg {
                            id: msg.id.into(),
                            data,
                        })),
                        Err(err) => {
                            re_log::error!("Dropping LogMsg::Table due to failed decode: {err}");
                            continue;
                        }
                    },

                    LogOrTableMsgProto::UiCommand(cmd) => Ok(DataSourceMessage::UiCommand(cmd)),
                },

                Err(async_broadcast_channel::RecvError::Closed) => {
                    re_log::debug!("message proxy server shut down, closing receiver");
                    channel_log_tx.quit(None).ok();
                    break;
                }
            };
            match msg {
                Ok(mut log_msg) => {
                    if let Some(metadata_key) =
                        re_sorbet::TimestampLocation::IPCDecode.metadata_key()
                    {
                        // Insert the timestamp metadata into the Arrow message for accurate e2e latency measurements.
                        // Note that this function is only called by the viewer
                        // (that's what the message-receiver is connected to).
                        log_msg.insert_arrow_record_batch_metadata(
                            metadata_key.to_owned(),
                            re_sorbet::timestamp_metadata::now_timestamp(),
                        );
                    }

                    if channel_log_tx.send(log_msg).is_err() {
                        re_log::debug!(
                            "message proxy smart channel receiver closed, closing sender"
                        );
                        break;
                    }
                }
                Err(err) => {
                    re_log::error!("dropping LogMsg due to failed decode: {err}");
                }
            }
        }
    });

    channel_log_rx
}
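
// Example (sketch): spawning the server and draining everything it receives.
// The blocking `recv()` call follows the `LogReceiver` usage elsewhere in
// this file; the surrounding wiring is illustrative.
//
// ```no_run
// let rx = spawn_with_recv(addr, ServerOptions::default(), shutdown);
// std::thread::spawn(move || {
//     while let Ok(msg) = rx.recv() {
//         // Forward `msg` (log data, tables, or UI commands) to the viewer…
//     }
// });
// ```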

enum Event {
    /// New client connected, requesting full history and subscribing to new messages.
    NewClient(
        oneshot::Sender<(
            Vec<LogOrTableMsgProto>,
            async_broadcast_channel::Receiver<LogOrTableMsgProto>,
        )>,
    ),

    /// A client sent a message.
    Message(LogOrTableMsgProto),
}

#[derive(Clone)]
struct TableMsgProto {
    id: TableIdProto,
    data: DataframePartProto,
}

// -----------------------------------------------------------------------------------

#[derive(Clone)]
enum LogOrTableMsgProto {
    LogMsg(LogMsgProto),
    Table(TableMsgProto),
    UiCommand(DataSourceUiCommand),
}

impl SizeBytes for LogOrTableMsgProto {
    fn heap_size_bytes(&self) -> u64 {
        match self {
            Self::LogMsg(log_msg) => log_msg.heap_size_bytes(),
            Self::Table(table) => table.heap_size_bytes(),
            Self::UiCommand(cmd) => cmd.heap_size_bytes(),
        }
    }
}

impl From<LogMsgProto> for LogOrTableMsgProto {
    fn from(value: LogMsgProto) -> Self {
        Self::LogMsg(value)
    }
}

impl From<TableMsgProto> for LogOrTableMsgProto {
    fn from(value: TableMsgProto) -> Self {
        Self::Table(value)
    }
}

impl From<DataSourceUiCommand> for LogOrTableMsgProto {
    fn from(value: DataSourceUiCommand) -> Self {
        Self::UiCommand(value)
    }
}

// -----------------------------------------------------------------------------------

#[derive(Default)]
struct MsgQueue {
    /// Messages stored in order of arrival, and garbage collected if the server hits the memory limit.
    queue: VecDeque<LogOrTableMsgProto>,

    /// Total size of [`Self::queue`] in bytes.
    size_bytes: u64,
}

impl MsgQueue {
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &LogOrTableMsgProto> {
        self.queue.iter()
    }

    pub fn push_back(&mut self, msg: LogOrTableMsgProto) {
        self.size_bytes += msg.total_size_bytes();
        self.queue.push_back(msg);
    }

    pub fn pop_front(&mut self) -> Option<LogOrTableMsgProto> {
        if let Some(msg) = self.queue.pop_front() {
            self.size_bytes -= msg.total_size_bytes();
            Some(msg)
        } else {
            None
        }
    }
}

// -----------------------------------------------------------------------------------

/// Contains all messages received so far,
/// minus some that are garbage collected when needed.
#[derive(Default)]
struct MessageBuffer {
    /// Normal data messages.
    ///
    /// First to be garbage collected if we run into the memory limit.
    disposable: MsgQueue,

    /// "Static" (non-temporal) data messages.
    ///
    /// Our chunk-store already keeps static messages forever,
    /// and it makes sense: you usually log them once,
    /// and then expect them to stay around.
    ///
    /// We keep the static messages for as long as we can, but if [`Self::disposable`]
    /// is empty and we're still over our memory budget, we start throwing
    /// away the oldest messages from here too.
    /// This is because some users use static logging for camera images,
    /// which adds up very quickly.
    ///
    /// Ideally we would keep exactly one static message per entity/component stream
    /// (like the `ChunkStore` does), but we'll save that for:
    /// TODO(#5531): replace this with `ChunkStore`
    static_: MsgQueue,

    /// These are never garbage collected.
    persistent: MsgQueue,
}

impl MessageBuffer {
    fn size_bytes(&self) -> u64 {
        let Self {
            disposable,
            static_,
            persistent,
        } = self;
        disposable.size_bytes + static_.size_bytes + persistent.size_bytes
    }

    fn all(&self, playback_behavior: PlaybackBehavior) -> Vec<LogOrTableMsgProto> {
        re_tracing::profile_function!();

        let Self {
            disposable,
            static_,
            persistent,
        } = self;

        // Note: we ALWAYS send the persistent and static data before the disposable,
        // regardless of PlaybackBehavior!

        match playback_behavior {
            PlaybackBehavior::OldestFirst => {
                itertools::chain!(persistent.iter(), static_.iter(), disposable.iter())
                    .cloned()
                    .collect()
            }
            PlaybackBehavior::NewestFirst => itertools::chain!(
                persistent.iter().rev(),
                static_.iter().rev(),
                disposable.iter().rev()
            )
            .cloned()
            .collect(),
        }
    }

    fn add_msg(&mut self, msg: LogOrTableMsgProto) {
        match msg {
            LogOrTableMsgProto::LogMsg(msg) => self.add_log_msg(msg),
            LogOrTableMsgProto::Table(msg) => {
                self.disposable.push_back(msg.into());
            }
            LogOrTableMsgProto::UiCommand(msg) => {
                self.disposable.push_back(msg.into());
            }
        }
    }

    fn add_log_msg(&mut self, msg: LogMsgProto) {
        let Some(inner) = &msg.msg else {
            re_log::error!(
                "{}",
                re_protos::missing_field!(re_protos::log_msg::v1alpha1::LogMsg, "msg")
            );
            return;
        };

        // We put store info, blueprint data, and blueprint activation commands
        // in a separate queue that does *not* get garbage collected.
        use re_protos::log_msg::v1alpha1::log_msg::Msg;
        match inner {
            // Store info, blueprint activation commands
            Msg::SetStoreInfo(..) | Msg::BlueprintActivationCommand(..) => {
                self.persistent.push_back(msg.into());
            }

            Msg::ArrowMsg(inner) => {
                let is_blueprint = inner
                    .store_id
                    .as_ref()
                    .is_some_and(|id| id.kind() == StoreKindProto::Blueprint);

                if is_blueprint {
                    // Persist blueprint messages forever.
                    self.persistent.push_back(msg.into());
                } else if inner.is_static == Some(true) {
                    self.static_.push_back(msg.into());
                } else {
                    // Recording data
                    self.disposable.push_back(msg.into());
                }
            }
        }
    }

    pub fn gc(&mut self, max_bytes: u64) {
        if self.size_bytes() <= max_bytes {
            // We're not using too much memory.
            return;
        }

        re_tracing::profile_scope!("Drop messages");
        re_log::info_once!(
            "Exceeded gRPC proxy server memory limit ({}). Dropping the oldest log messages. Clients connecting after this will not see the full history.",
            re_format::format_bytes(max_bytes as _)
        );

        let start_size = self.size_bytes();
        let mut messages_dropped = 0;

        while self.disposable.pop_front().is_some() {
            messages_dropped += 1;
            if self.size_bytes() < max_bytes {
                break;
            }
        }

        if max_bytes < self.size_bytes() {
            re_log::info_once!(
                "Exceeded gRPC proxy server memory limit ({}). Dropping old *static* log messages as well. Clients connecting after this will no longer see the complete set of static data.",
                re_format::format_bytes(max_bytes as _)
            );
            while self.static_.pop_front().is_some() {
                messages_dropped += 1;
                if self.size_bytes() < max_bytes {
                    break;
                }
            }
        }

        let bytes_dropped = start_size - self.size_bytes();

        re_log::trace!(
739            "Dropped {} bytes in {messages_dropped} message(s)",
            re_format::format_bytes(bytes_dropped as _)
        );

        if max_bytes < self.size_bytes() {
            re_log::warn_once!(
                "The gRPC server is using more memory than the given memory limit ({}), despite having garbage-collected all non-persistent messages.",
                re_format::format_bytes(max_bytes as _)
            );
        }
    }
}
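
// GC order in practice (a sketch of the policy above): with a tiny memory
// limit, pushing [SetStoreInfo, static chunk, temporal chunk] and then
// collecting drops the temporal chunk first, then the static chunk, while
// the SetStoreInfo survives in `persistent`.
// See `memory_limit_drops_messages` in the tests below.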

// -----------------------------------------------------------------------------------

/// A wrapper that converts an `async_broadcast_channel::Receiver` into a `Stream`.
///
/// This uses `async_stream` internally to bridge the async `recv` method to a `Stream`.
/// The stream yields each received value as `Ok` and ends when the channel is closed.
struct BackPressureReceiverStream<T: Clone + SizeBytes + Send + Sync + 'static> {
    inner: Pin<Box<dyn Stream<Item = Result<T, async_broadcast_channel::RecvError>> + Send>>,
}

impl<T: Clone + SizeBytes + Send + Sync + 'static> BackPressureReceiverStream<T> {
    fn new(mut receiver: async_broadcast_channel::Receiver<T>) -> Self {
        let stream = async_stream::stream! {
            while let Ok(value) = receiver.recv().await {
                yield Ok(value);
            }
        };
        Self {
            inner: Box::pin(stream),
        }
    }
}

impl<T: Clone + SizeBytes + Send + Sync + 'static> Stream for BackPressureReceiverStream<T> {
    type Item = Result<T, async_broadcast_channel::RecvError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner.as_mut().poll_next(cx)
    }
}

// -----------------------------------------------------------------------------------

/// Main event loop for the server, which runs in its own task.
///
/// Handles message history, and broadcasts messages to clients.
struct EventLoop {
    options: ServerOptions,

    /// New log messages are broadcast to all clients.
    /// Uses a back-pressure channel that blocks senders when the byte limit is exceeded.
    broadcast_log_tx: async_broadcast_channel::Sender<LogOrTableMsgProto>,

    /// Channel for incoming events.
    event_rx: async_mpsc_channel::Receiver<Event>,

    /// All messages received so far, minus those that have been garbage collected.
    history: MessageBuffer,
}

impl EventLoop {
    fn new(
        options: ServerOptions,
        event_rx: async_mpsc_channel::Receiver<Event>,
        broadcast_log_tx: async_broadcast_channel::Sender<LogOrTableMsgProto>,
    ) -> Self {
        Self {
            options,
            broadcast_log_tx,
            event_rx,
            history: Default::default(),
        }
    }

    async fn run_in_place(mut self) {
        loop {
            let Some(event) = self.event_rx.recv().await else {
                break;
            };

            match event {
                Event::NewClient(channel) => {
                    channel
                        .send((
                            self.history.all(self.options.playback_behavior),
                            self.broadcast_log_tx.subscribe(),
                        ))
                        .ok();
                }
                Event::Message(msg) => self.handle_msg(msg).await,
            }
        }
    }

    async fn handle_msg(&mut self, msg: LogOrTableMsgProto) {
        // This will block if the broadcast channel is full, applying back-pressure
        self.broadcast_log_tx.send_async(msg.clone()).await.ok();

        if !self.is_history_enabled() {
            // no need to gc or maintain history
            return;
        }

        self.gc_if_using_too_much_ram();

        self.history.add_msg(msg);
    }

    fn is_history_enabled(&self) -> bool {
        self.options.memory_limit != MemoryLimit::ZERO
    }

    fn gc_if_using_too_much_ram(&mut self) {
        if self.options.memory_limit.is_limited() {
            self.history.gc(self.options.memory_limit.as_bytes());
        }
    }
}

impl SizeBytes for TableMsgProto {
    fn heap_size_bytes(&self) -> u64 {
        let Self { id, data } = self;
        id.heap_size_bytes() + data.heap_size_bytes()
    }
}

pub struct MessageProxy {
    options: ServerOptions,
    _queue_task_handle: tokio::task::JoinHandle<()>,
    event_tx: async_mpsc_channel::Sender<Event>,
}

impl MessageProxy {
    pub fn new(options: ServerOptions) -> Self {
        Self::new_with_recv(options).0
    }

    fn new_with_recv(
        options: ServerOptions,
    ) -> (Self, async_broadcast_channel::Receiver<LogOrTableMsgProto>) {
        let (broadcast_log_tx, broadcast_log_rx) = async_broadcast_channel::channel(
            "re_grpc_server broadcast",
            CHANNEL_SIZE_MESSAGES,
            CHANNEL_SIZE_BYTES,
        );

        let (event_tx, event_rx) = {
            // TODO(emilk): this could also use a size-based backpressure mechanism.
            let message_queue_capacity = 32; // Apply backpressure early
            async_mpsc_channel::channel("re_grpc_server events", message_queue_capacity)
        };

        let task_handle = tokio::spawn(async move {
            EventLoop::new(options, event_rx, broadcast_log_tx)
                .run_in_place()
                .await;
        });

        (
            Self {
                options,
                _queue_task_handle: task_handle,
                event_tx,
            },
            broadcast_log_rx,
        )
    }

    async fn push_message(&self, message: impl Into<LogOrTableMsgProto>) {
        let message = message.into();
        self.event_tx.send(Event::Message(message)).await.ok();
    }

    async fn new_client_message_stream(&self) -> ReadMsgStream {
        let (sender, receiver) = oneshot::channel();
        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
            re_log::error!("Error accepting new client: {err}");
            return Box::pin(tokio_stream::empty());
        }
        let (history, msg_channel) = match receiver.await {
            Ok(v) => v,
            Err(err) => {
                re_log::error!("Error accepting new client: {err}");
                return Box::pin(tokio_stream::empty());
            }
        };

        let history = tokio_stream::iter(
            history
                .into_iter()
                .map(ReadLogOrTableMsgResponse::from)
                .map(Ok),
        );

        // Convert our backpressure receiver into a Stream
        let channel = BackPressureReceiverStream::new(msg_channel).map(|result| {
            result.map(ReadLogOrTableMsgResponse::from).map_err(|err| {
                re_log::error!("Error reading message from broadcast channel: {err}");
                tonic::Status::internal(format!("internal channel error: {err}"))
            })
        });

        match self.options.playback_behavior {
            PlaybackBehavior::OldestFirst => Box::pin(history.chain(channel)),
            PlaybackBehavior::NewestFirst => Box::pin(PriorityMerge::new(channel, history)),
        }
    }

    async fn new_client_log_stream(&self) -> ReadLogStream {
        Box::pin(
            self.new_client_message_stream()
                .await
                .filter_map(|msg| match msg {
                    Ok(ReadLogOrTableMsgResponse::LogMsg(msg)) => Some(Ok(msg)),
                    Ok(ReadLogOrTableMsgResponse::TableMsg(_)) => {
                        re_log::warn_once!("A log stream got a TableMsg");
                        None
                    }
                    Ok(ReadLogOrTableMsgResponse::UiCommand) => {
                        re_log::warn_once!("A log stream got a UiCommandMsg");
                        None
                    }
                    Err(err) => Some(Err(err)),
                }),
        )
    }

    async fn new_client_table_stream(&self) -> ReadTablesStream {
        Box::pin(
            self.new_client_message_stream()
                .await
                .filter_map(|msg| match msg {
                    Ok(ReadLogOrTableMsgResponse::LogMsg(_)) => {
                        re_log::warn_once!("A table stream got a LogMsg");
                        None
                    }
                    Ok(ReadLogOrTableMsgResponse::TableMsg(msg)) => Some(Ok(msg)),
                    Ok(ReadLogOrTableMsgResponse::UiCommand) => {
                        re_log::warn_once!("A table stream got a UiCommandMsg");
                        None
                    }
                    Err(err) => Some(Err(err)),
                }),
        )
    }
}

enum ReadLogOrTableMsgResponse {
    LogMsg(ReadMessagesResponse),
    TableMsg(ReadTablesResponse),
    UiCommand,
}

impl From<LogOrTableMsgProto> for ReadLogOrTableMsgResponse {
    fn from(proto: LogOrTableMsgProto) -> Self {
        match proto {
            LogOrTableMsgProto::LogMsg(log_msg) => Self::LogMsg(ReadMessagesResponse {
                log_msg: Some(log_msg),
            }),
            LogOrTableMsgProto::Table(table_msg) => Self::TableMsg(ReadTablesResponse {
                id: Some(table_msg.id),
                data: Some(table_msg.data),
            }),
            LogOrTableMsgProto::UiCommand(_ui_command) => Self::UiCommand,
        }
    }
}

type ReadLogStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadMessagesResponse>> + Send>>;
type ReadTablesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadTablesResponse>> + Send>>;

type ReadMsgStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadLogOrTableMsgResponse>> + Send>>;

#[tonic::async_trait]
impl message_proxy_service_server::MessageProxyService for MessageProxy {
    async fn write_messages(
        &self,
        request: tonic::Request<tonic::Streaming<WriteMessagesRequest>>,
    ) -> tonic::Result<tonic::Response<WriteMessagesResponse>> {
        let mut stream = request.into_inner();
        loop {
            match stream.message().await {
                Ok(Some(WriteMessagesRequest {
                    log_msg: Some(log_msg),
                })) => {
                    self.push_message(log_msg).await;
                }

                Ok(Some(WriteMessagesRequest { log_msg: None })) => {
                    re_log::warn!("missing log_msg in WriteMessagesRequest");
                }

                Ok(None) => {
                    // Connection was closed
                    break;
                }

                Err(err) => {
                    re_log::error!("Error while receiving messages: {}", TonicStatusError(err));
                    break;
                }
            }
        }

        Ok(tonic::Response::new(WriteMessagesResponse {}))
    }

    type ReadMessagesStream = ReadLogStream;

    async fn read_messages(
        &self,
        _: tonic::Request<ReadMessagesRequest>,
    ) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
        Ok(tonic::Response::new(self.new_client_log_stream().await))
    }

    type ReadTablesStream = ReadTablesStream;

    async fn write_table(
        &self,
        request: tonic::Request<WriteTableRequest>,
    ) -> tonic::Result<tonic::Response<WriteTableResponse>> {
        if let WriteTableRequest {
            id: Some(id),
            data: Some(data),
        } = request.into_inner()
        {
            self.push_message(TableMsgProto { id, data }).await;
        } else {
            re_log::warn!("malformed `WriteTableRequest`");
        }

        Ok(tonic::Response::new(WriteTableResponse {}))
    }

    async fn read_tables(
        &self,
        _: tonic::Request<ReadTablesRequest>,
    ) -> tonic::Result<tonic::Response<Self::ReadTablesStream>> {
        Ok(tonic::Response::new(self.new_client_table_stream().await))
    }

    async fn save_screenshot(
        &self,
        request: tonic::Request<SaveScreenshotRequest>,
    ) -> tonic::Result<tonic::Response<SaveScreenshotResponse>> {
        let SaveScreenshotRequest { view_id, file_path } = request.into_inner();
        self.push_message(DataSourceUiCommand::SaveScreenshot {
            file_path: file_path.into(),
            view_id,
        })
        .await;

        Ok(tonic::Response::new(SaveScreenshotResponse {}))
    }
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::{Itertools as _, chain};
    use re_chunk::RowId;
    use re_log_encoding::rrd::Compression;
    use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
    use re_protos::sdk_comms::v1alpha1::message_proxy_service_client::MessageProxyServiceClient;
    use re_protos::sdk_comms::v1alpha1::message_proxy_service_server::MessageProxyServiceServer;
    use similar_asserts::assert_eq;
    use tokio::net::TcpListener;
    use tokio_util::sync::CancellationToken;
    use tonic::transport::server::TcpIncoming;
    use tonic::transport::{Channel, Endpoint};

    use super::*;

    #[derive(Clone)]
    struct Completion(Arc<CancellationToken>);

    impl Drop for Completion {
        fn drop(&mut self) {
            self.finish();
        }
    }

    impl Completion {
        fn new() -> Self {
            Self(Arc::new(CancellationToken::new()))
        }

        fn finish(&self) {
            self.0.cancel();
        }

        async fn wait(&self) {
            self.0.cancelled().await;
        }
    }

    fn set_store_info_msg(store_id: &StoreId) -> LogMsg {
        LogMsg::SetStoreInfo(SetStoreInfo {
            row_id: *RowId::new(),
            info: StoreInfo::new(
                store_id.clone(),
                StoreSource::RustSdk {
                    rustc_version: String::new(),
                    llvm_version: String::new(),
                },
            ),
        })
    }

    /// Generates `n` log messages wrapped in a `SetStoreInfo` at the start and `BlueprintActivationCommand` at the end,
    /// to exercise message ordering.
    fn fake_log_stream_blueprint(n: usize) -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Blueprint, "test_app");

        let mut messages = Vec::new();
        messages.push(set_store_info_msg(&store_id));
        for _ in 0..n {
            messages.push(LogMsg::ArrowMsg(
                store_id.clone(),
                re_chunk::Chunk::builder("test_entity")
                    .with_archetype(
                        re_chunk::RowId::new(),
                        re_log_types::TimePoint::default().with(
                            re_log_types::Timeline::new_sequence("blueprint"),
                            re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                        ),
                        &re_sdk_types::blueprint::archetypes::Background::new(
                            re_sdk_types::blueprint::components::BackgroundKind::SolidColor,
                        )
                        .with_color([255, 0, 0]),
                    )
                    .build()
                    .unwrap()
                    .to_arrow_msg()
                    .unwrap(),
            ));
        }
        messages.push(LogMsg::BlueprintActivationCommand(
            re_log_types::BlueprintActivationCommand {
                blueprint_id: store_id,
                make_active: true,
                make_default: true,
            },
        ));

        messages
    }

    #[derive(Clone, Copy)]
    enum Temporalness {
        Static,
        Temporal,
    }

    fn fake_log_stream_recording(n: usize) -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Recording, "test_app");

        chain!(
            [set_store_info_msg(&store_id)],
            generate_log_messages(&store_id, n, Temporalness::Temporal)
        )
        .collect()
    }

    fn generate_log_messages(
        store_id: &StoreId,
        n: usize,
        temporalness: Temporalness,
    ) -> Vec<LogMsg> {
        let mut messages = Vec::new();
        for _ in 0..n {
            let timepoint = match temporalness {
                Temporalness::Static => re_log_types::TimePoint::STATIC,
                Temporalness::Temporal => re_log_types::TimePoint::default().with(
                    re_log_types::Timeline::new_sequence("log_time"),
                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                ),
            };

            messages.push(LogMsg::ArrowMsg(
                store_id.clone(),
                re_chunk::Chunk::builder("test_entity")
                    .with_archetype(
                        re_chunk::RowId::new(),
                        timepoint,
                        &re_sdk_types::archetypes::Points2D::new([
                            (0.0, 0.0),
                            (1.0, 1.0),
                            (2.0, 2.0),
                        ]),
                    )
                    .build()
                    .unwrap()
                    .to_arrow_msg()
                    .unwrap(),
            ));
        }
        messages
    }

    async fn setup() -> (Completion, SocketAddr) {
        setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit: MemoryLimit::UNLIMITED,
        })
        .await
    }

    async fn setup_with_memory_limit(memory_limit: MemoryLimit) -> (Completion, SocketAddr) {
        setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::OldestFirst,
            memory_limit,
        })
        .await
    }

    async fn setup_opt(options: ServerOptions) -> (Completion, SocketAddr) {
        let completion = Completion::new();

        let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = tcp_listener.local_addr().unwrap();

        tokio::spawn({
            let completion = completion.clone();
            async move {
                tonic::transport::Server::builder()
                    // NOTE: This NODELAY very likely does nothing because of the call to
                    // `serve_with_incoming_shutdown` below, but we better be on the defensive here so
                    // we don't get surprised when things inevitably change.
                    .tcp_nodelay(true)
                    .accept_http1(true)
                    .http2_adaptive_window(Some(true)) // Optimize for throughput
                    .add_service(
                        MessageProxyServiceServer::new(super::MessageProxy::new(options))
                            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
                            .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
                    )
                    .serve_with_incoming_shutdown(
                        TcpIncoming::from(tcp_listener).with_nodelay(Some(true)),
                        completion.wait(),
                    )
                    .await
                    .unwrap();
            }
        });

        (completion, addr)
    }

    async fn make_client(addr: SocketAddr) -> MessageProxyServiceClient<Channel> {
        MessageProxyServiceClient::new(
            Endpoint::from_shared(format!("http://{addr}"))
                .unwrap()
                .connect()
                .await
                .unwrap(),
        )
        .max_decoding_message_size(crate::MAX_DECODING_MESSAGE_SIZE)
    }

    async fn write_messages(
        client: &mut MessageProxyServiceClient<Channel>,
        messages: Vec<LogMsg>,
    ) {
        client
            .write_messages(tokio_stream::iter(
                messages
                    .clone()
                    .into_iter()
                    .map(|msg| msg.to_transport(Compression::Off).unwrap())
                    .map(|msg| WriteMessagesRequest {
                        log_msg: Some(msg.into()),
                    }),
            ))
            .await
            .unwrap();
    }

    async fn read_log_stream(
        log_stream: &mut tonic::Response<tonic::Streaming<ReadMessagesResponse>>,
        n: usize,
    ) -> Vec<LogMsg> {
        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();

        let mut stream_ref = log_stream.get_mut().map(|result| {
            let msg = result.unwrap().log_msg.unwrap().msg.unwrap();
            msg.to_application((&mut app_id_cache, None)).unwrap()
        });

        let mut messages = Vec::new();
        for _ in 0..n {
            messages.push(stream_ref.next().await.unwrap());
        }
        messages
    }

    #[tokio::test]
    async fn pubsub_basic() {
        let (completion, addr) = setup().await;
        let mut client = make_client(addr).await; // We use the same client for both producing and consuming
        let messages = fake_log_stream_blueprint(3);

        // start reading
        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

        write_messages(&mut client, messages.clone()).await;

        // the messages should be echoed to us
        let actual = read_log_stream(&mut log_stream, messages.len()).await;

        assert_eq!(messages, actual);

        // While `SetStoreInfo` is sent first in `fake_log_stream_blueprint`,
        // we can observe that it's also received first,
        // even though it is actually stored out of order in the `persistent` queue.
        assert!(matches!(messages[0], LogMsg::SetStoreInfo(..)));
        assert!(matches!(actual[0], LogMsg::SetStoreInfo(..)));

        completion.finish();
    }

    #[tokio::test]
    async fn pubsub_history() {
        let (completion, addr) = setup().await;
        let mut client = make_client(addr).await; // We use the same client for both producing and consuming
        let messages = fake_log_stream_blueprint(3);

        // don't read anything yet - these messages should be sent to us as part of history when we call `read_messages` later

        write_messages(&mut client, messages.clone()).await;

        // Start reading now - we should receive full history at this point:
        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

        let actual = read_log_stream(&mut log_stream, messages.len()).await;
        assert_eq!(messages, actual);

        completion.finish();
    }

    #[tokio::test]
    async fn one_producer_many_consumers() {
        let (completion, addr) = setup().await;
        let mut producer = make_client(addr).await; // We use separate clients for producing and consuming
        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
        let messages = fake_log_stream_blueprint(3);

        // Initialize multiple read streams:
        let mut log_streams = vec![];
        for consumer in &mut consumers {
            log_streams.push(
                consumer
                    .read_messages(ReadMessagesRequest {})
                    .await
                    .unwrap(),
            );
        }

        write_messages(&mut producer, messages.clone()).await;

        // Each consumer should've received them:
        for log_stream in &mut log_streams {
            let actual = read_log_stream(log_stream, messages.len()).await;
            assert_eq!(messages, actual);
        }

        completion.finish();
    }

    #[tokio::test]
    async fn many_producers_many_consumers() {
        let (completion, addr) = setup().await;
        let mut producers = vec![make_client(addr).await, make_client(addr).await];
        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
        let messages = fake_log_stream_blueprint(3);

        // Initialize multiple read streams:
        let mut log_streams = vec![];
        for consumer in &mut consumers {
            log_streams.push(
                consumer
                    .read_messages(ReadMessagesRequest {})
                    .await
                    .unwrap(),
            );
        }

        // Write a few messages using each producer:
        for producer in &mut producers {
            write_messages(producer, messages.clone()).await;
        }

        let expected = [messages.clone(), messages.clone()].concat();

        // Each consumer should've received one set of messages from each producer.
        // Note that in this test we also guarantee the order of messages across producers,
        // due to the `write_messages` calls being sequential.

        for log_stream in &mut log_streams {
            let actual = read_log_stream(log_stream, expected.len()).await;
            assert_eq!(actual, expected);
        }

        completion.finish();
    }

    #[tokio::test]
    async fn memory_limit_drops_messages() {
        // Use an absurdly low memory limit to force all messages to be dropped immediately from history
        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_recording(3);

        write_messages(&mut client, messages.clone()).await;

        // Start reading
        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let mut actual = vec![];
        loop {
            let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
            tokio::pin!(timeout_stream);
            let timeout_result = timeout_stream.try_next().await;
            let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
            match timeout_result {
                Ok(Some(value)) => {
                    let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
                    actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
                }

                // Stream closed | Timed out
                Ok(None) | Err(_) => break,
            }
        }

        // The GC runs _before_ a message is stored, so we should see the persistent message, and the last message sent.
        assert_eq!(actual.len(), 2);
        assert_eq!(&actual[0], &messages[0]);
        assert_eq!(&actual[1], messages.last().unwrap());

        completion.finish();
    }

    #[tokio::test]
    async fn memory_limit_does_not_drop_blueprint() {
        // Use an absurdly low memory limit to force all messages to be dropped immediately from history
        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
        let mut client = make_client(addr).await;
        let messages = fake_log_stream_blueprint(3);

        // Write some messages
        write_messages(&mut client, messages.clone()).await;

        // Start reading
        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let mut actual = vec![];
        loop {
            let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
            tokio::pin!(timeout_stream);
            let timeout_result = timeout_stream.try_next().await;
            let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
            match timeout_result {
                Ok(Some(value)) => {
                    let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
                    actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
                }

                // Stream closed | Timed out
                Ok(None) | Err(_) => break,
            }
        }

        // The stream in this case only contains SetStoreInfo, ArrowMsg with StoreKind::Blueprint,
        // and BlueprintActivationCommand. None of these things should be GC'd:
        assert_eq!(messages, actual);

        completion.finish();
    }

    #[tokio::test]
    async fn memory_limit_does_not_interrupt_stream() {
        let memory_limits = [
            0, // Will actually disable the message buffer and GC logic. Good to test that!
            1, // An absurdly low memory limit to force all messages to be dropped immediately from history
        ];

        for memory_limit in memory_limits {
            let (completion, addr) =
                setup_with_memory_limit(MemoryLimit::from_bytes(memory_limit)).await;
            let mut client = make_client(addr).await; // We use the same client for both producing and consuming
            let messages = fake_log_stream_blueprint(3);

            // Start reading
            let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();

            write_messages(&mut client, messages.clone()).await;

            // The messages should be echoed to us, even though none of them will be stored in history
            let actual = read_log_stream(&mut log_stream, messages.len()).await;
            assert_eq!(messages, actual);

            completion.finish();
        }
    }

    #[tokio::test]
    async fn static_data_is_returned_first() {
        let (completion, addr) = setup_with_memory_limit(MemoryLimit::UNLIMITED).await;
        let mut client = make_client(addr).await;

        let store_id = StoreId::random(StoreKind::Recording, "test_app");

        let set_store_info = vec![set_store_info_msg(&store_id)];
        let first_static = generate_log_messages(&store_id, 3, Temporalness::Static);
        let first_temporal = generate_log_messages(&store_id, 3, Temporalness::Temporal);
        let second_static = generate_log_messages(&store_id, 3, Temporalness::Static);

        write_messages(&mut client, set_store_info.clone()).await;
        write_messages(&mut client, first_static.clone()).await;
        write_messages(&mut client, first_temporal.clone()).await;
        write_messages(&mut client, second_static.clone()).await;

        // All static data should always come before temporal data:
        let expected =
            itertools::chain!(set_store_info, first_static, second_static, first_temporal)
                .collect_vec();

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let actual = read_log_stream(&mut log_stream, expected.len()).await;

        assert_eq!(actual, expected);

        completion.finish();
    }

    #[tokio::test]
    async fn playback_newest_first() {
        let (completion, addr) = setup_opt(ServerOptions {
            playback_behavior: PlaybackBehavior::NewestFirst, // this is what we want to test
            memory_limit: MemoryLimit::UNLIMITED,
        })
        .await;
        let mut client = make_client(addr).await;

        let store_id = StoreId::random(StoreKind::Recording, "test_app");

        let set_store_info = vec![set_store_info_msg(&store_id)];
        let first_statics = generate_log_messages(&store_id, 3, Temporalness::Static);
        let temporals = generate_log_messages(&store_id, 3, Temporalness::Temporal);
        let second_statics = generate_log_messages(&store_id, 3, Temporalness::Static);

        write_messages(&mut client, set_store_info.clone()).await;
        write_messages(&mut client, first_statics.clone()).await;
        write_messages(&mut client, temporals.clone()).await;
        write_messages(&mut client, second_statics.clone()).await;

        // With `NewestFirst`, persistent and static data still come before temporal data,
        // and each queue is replayed newest-first:
        let expected = itertools::chain!(
            set_store_info.into_iter().rev(),
            second_statics.into_iter().rev(),
            first_statics.into_iter().rev(),
            temporals.into_iter().rev()
        )
        .collect_vec();

        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
        let actual = read_log_stream(&mut log_stream, expected.len()).await;

        assert_eq!(actual, expected);

        completion.finish();
    }
}