re_grpc_server/
lib.rs

1//! Server for the legacy `StoreHub` API.
2
3pub mod shutdown;
4
5use std::{collections::VecDeque, net::SocketAddr, pin::Pin};
6
7use tokio::{
8    net::TcpListener,
9    sync::{broadcast, mpsc, oneshot},
10};
11use tokio_stream::{Stream, StreamExt as _, wrappers::BroadcastStream};
12use tonic::transport::{Server, server::TcpIncoming};
13use tower_http::cors::CorsLayer;
14
15use re_byte_size::SizeBytes;
16use re_log_encoding::{ToApplication as _, ToTransport as _};
17use re_log_types::{DataSourceMessage, TableMsg};
18use re_protos::sdk_comms::v1alpha1::{
19    ReadTablesRequest, ReadTablesResponse, WriteMessagesRequest, WriteTableRequest,
20    WriteTableResponse,
21};
22
23use re_protos::{
24    common::v1alpha1::{
25        DataframePart as DataframePartProto, StoreKind as StoreKindProto, TableId as TableIdProto,
26    },
27    log_msg::v1alpha1::LogMsg as LogMsgProto,
28    sdk_comms::v1alpha1::{
29        ReadMessagesRequest, ReadMessagesResponse, WriteMessagesResponse,
30        message_proxy_service_server,
31    },
32};
33
34use crate::priority_stream::PriorityMerge;
35
36mod priority_stream;
37
38pub use re_memory::MemoryLimit;
39
/// Default port of the OSS /proxy server.
pub const DEFAULT_SERVER_PORT: u16 = 9876;

/// Largest message size, in bytes, that the gRPC service is configured to decode (`u32::MAX`).
pub const MAX_DECODING_MESSAGE_SIZE: usize = u32::MAX as usize;
/// Largest message size, in bytes, that the gRPC service is configured to encode.
/// Identical to [`MAX_DECODING_MESSAGE_SIZE`].
pub const MAX_ENCODING_MESSAGE_SIZE: usize = MAX_DECODING_MESSAGE_SIZE;
45
// Channel capacity is completely arbitrary, we just want something large enough
// to handle bursts of messages. This is roughly 16 MiB of `Msg` (excluding their contents).
48const MESSAGE_QUEUE_CAPACITY: usize =
49    (16 * 1024 * 1024 / std::mem::size_of::<LogOrTableMsgProto>()).next_power_of_two();
50
51/// Options for the gRPC Proxy Server
52#[derive(Clone, Copy, Debug)]
53pub struct ServerOptions {
54    /// When a client connect, should they be sent the oldest data first, or the newest?
55    pub playback_behavior: PlaybackBehavior,
56
57    /// Start garbage collecting old data when we reach this.
58    ///
59    /// If server & client are running on the same machine and all clients are expected to connect before
60    /// any data is sent, it is highly recommended that you set the memory limit to `0B`,
61    /// otherwise you're potentially doubling your memory usage!
62    pub memory_limit: MemoryLimit,
63}
64
65impl Default for ServerOptions {
66    fn default() -> Self {
67        Self {
68            playback_behavior: PlaybackBehavior::OldestFirst,
69            memory_limit: MemoryLimit::UNLIMITED,
70        }
71    }
72}
73
/// The order in which a newly connected client receives data from a gRPC server.
#[derive(Clone, Copy, Debug)]
pub enum PlaybackBehavior {
    /// Replay all of the old data first,
    /// and only then continue with whatever has happened since.
    OldestFirst,

    /// Give priority to messages as they arrive,
    /// and replay the history later, starting with its newest entries.
    NewestFirst,
}

impl PlaybackBehavior {
    /// Converts a flag into a variant:
    /// `true` gives [`Self::NewestFirst`], `false` gives [`Self::OldestFirst`].
    pub fn from_newest_first(newest_first: bool) -> Self {
        if newest_first {
            return Self::NewestFirst;
        }
        Self::OldestFirst
    }
}
95
96/// Wrapper with a nicer error message
97#[derive(Debug)]
98pub struct TonicStatusError(pub tonic::Status);
99
100impl std::fmt::Display for TonicStatusError {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        // TODO(emilk): duplicated in `re_grpc_client`
103        let status = &self.0;
104
105        write!(f, "gRPC error")?;
106
107        if status.code() != tonic::Code::Unknown {
108            write!(f, ", code: '{}'", status.code())?;
109        }
110        if !status.message().is_empty() {
111            write!(f, ", message: {:?}", status.message())?;
112        }
113        // Binary data - not useful.
114        // if !status.details().is_empty() {
115        //     write!(f, ", details: {:?}", status.details())?;
116        // }
117        if !status.metadata().is_empty() {
118            write!(f, ", metadata: {:?}", status.metadata().as_ref())?;
119        }
120        Ok(())
121    }
122}
123
124impl From<tonic::Status> for TonicStatusError {
125    fn from(value: tonic::Status) -> Self {
126        Self(value)
127    }
128}
129
130// TODO(jan): Refactor `serve`/`spawn` variants into a builder?
131
132/// Start a Rerun server, listening on `addr`.
133///
134/// A Rerun server is an in-memory implementation of a Storage Node.
135///
136/// The returned future must be polled for the server to make progress.
137///
138/// Currently, the only RPCs supported by the server are `WriteMessages` and `ReadMessages`.
139///
140/// Clients send data to the server via `WriteMessages`. Any sent messages will be stored
141/// in the server's message queue. Messages are only removed if the server hits its configured
142/// memory limit.
143///
144/// Clients receive data from the server via `ReadMessages`. Upon establishing the stream,
145/// the server sends all messages stored in its message queue, and subscribes the client
146/// to the queue. Any messages sent to the server through `WriteMessages` will be proxied
147/// to the open `ReadMessages` stream.
148pub async fn serve(
149    addr: SocketAddr,
150    options: ServerOptions,
151    shutdown: shutdown::Shutdown,
152) -> anyhow::Result<()> {
153    serve_impl(addr, MessageProxy::new(options), shutdown).await
154}
155
156async fn serve_impl(
157    addr: SocketAddr,
158    message_proxy: MessageProxy,
159    shutdown: shutdown::Shutdown,
160) -> anyhow::Result<()> {
161    let tcp_listener = TcpListener::bind(addr).await?;
162    let incoming = TcpIncoming::from(tcp_listener).with_nodelay(Some(true));
163
164    let connect_addr = if addr.ip().is_loopback() || addr.ip().is_unspecified() {
165        format!("rerun+http://127.0.0.1:{}/proxy", addr.port())
166    } else {
167        format!("rerun+http://{addr}/proxy")
168    };
169    re_log::info!(
170        "Listening for gRPC connections on {addr}. Connect by running `rerun --connect {connect_addr}`"
171    );
172
173    let cors = CorsLayer::very_permissive();
174    let grpc_web = tonic_web::GrpcWebLayer::new();
175
176    let routes = {
177        let mut routes_builder = tonic::service::Routes::builder();
178        routes_builder.add_service(
179            re_protos::sdk_comms::v1alpha1::message_proxy_service_server::MessageProxyServiceServer::new(
180                message_proxy,
181            )
182            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
183            .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
184        );
185        routes_builder.routes()
186    };
187
188    Server::builder()
189        .accept_http1(true) // Support `grpc-web` clients
190        .layer(cors) // Allow CORS requests from web clients
191        .layer(grpc_web) // Support `grpc-web` clients
192        .add_routes(routes)
193        .serve_with_incoming_shutdown(incoming, shutdown.wait())
194        .await?;
195
196    Ok(())
197}
198
199/// Start a Rerun server, listening on `addr`.
200///
201/// The returned future must be polled for the server to make progress.
202///
203/// This function additionally accepts a smart channel, through which messages
204/// can be sent to the server directly. It is similar to creating a client
205/// and sending messages through `WriteMessages`, but without the overhead
206/// of a localhost connection.
207///
208/// See [`serve`] for more information about what a Rerun server is.
209pub async fn serve_from_channel(
210    addr: SocketAddr,
211    options: ServerOptions,
212    shutdown: shutdown::Shutdown,
213    channel_rx: re_smart_channel::Receiver<re_log_types::LogMsg>,
214) {
215    let message_proxy = MessageProxy::new(options);
216    let event_tx = message_proxy.event_tx.clone();
217
218    tokio::task::spawn_blocking(move || {
219        use re_smart_channel::SmartMessagePayload;
220
221        loop {
222            let msg = if let Ok(msg) = channel_rx.recv() {
223                match msg.payload {
224                    SmartMessagePayload::Msg(msg) => msg,
225                    SmartMessagePayload::Flush { on_flush_done } => {
226                        on_flush_done(); // we don't buffer
227                        continue;
228                    }
229                    SmartMessagePayload::Quit(err) => {
230                        if let Some(err) = err {
231                            re_log::debug!("smart channel sender quit: {err}");
232                        } else {
233                            re_log::debug!("smart channel sender quit");
234                        }
235                        break;
236                    }
237                }
238            } else {
239                re_log::debug!("smart channel sender closed, closing receiver");
240                break;
241            };
242
243            let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
244                Ok(msg) => msg,
245                Err(err) => {
246                    re_log::error!("failed to encode message: {err}");
247                    continue;
248                }
249            };
250
251            if event_tx.blocking_send(Event::Message(msg.into())).is_err() {
252                re_log::debug!("shut down, closing sender");
253                break;
254            }
255        }
256    });
257
258    if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
259        re_log::error!("message proxy server crashed: {err}");
260    }
261}
262
263/// Start a Rerun server, listening on `addr`.
264///
265/// This function additionally accepts a `ReceiveSet`, from which the
266/// server will read all messages. It is similar to creating a client
267/// and sending messages through `WriteMessages`, but without the overhead
268/// of a localhost connection.
269///
270/// See [`serve`] for more information about what a Rerun server is.
271pub fn spawn_from_rx_set(
272    addr: SocketAddr,
273    options: ServerOptions,
274    shutdown: shutdown::Shutdown,
275    rxs: re_smart_channel::ReceiveSet<re_log_types::DataSourceMessage>,
276) {
277    let message_proxy = MessageProxy::new(options);
278    let event_tx = message_proxy.event_tx.clone();
279
280    tokio::spawn(async move {
281        if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
282            re_log::error!("message proxy server crashed: {err}");
283        }
284    });
285
286    tokio::task::spawn_blocking(move || {
287        use re_smart_channel::SmartMessagePayload;
288
289        loop {
290            let msg = if let Ok(msg) = rxs.recv() {
291                match msg.payload {
292                    SmartMessagePayload::Msg(msg) => msg,
293                    SmartMessagePayload::Flush { on_flush_done } => {
294                        on_flush_done(); // we don't buffer
295                        continue;
296                    }
297                    SmartMessagePayload::Quit(err) => {
298                        if let Some(err) = err {
299                            re_log::debug!("smart channel sender quit: {err}");
300                        } else {
301                            re_log::debug!("smart channel sender quit");
302                        }
303                        if rxs.is_empty() {
304                            // We won't ever receive more data:
305                            break;
306                        }
307                        continue;
308                    }
309                }
310            } else {
311                if rxs.is_empty() {
312                    // We won't ever receive more data:
313                    break;
314                }
315                continue;
316            };
317
318            let msg = match msg {
319                DataSourceMessage::LogMsg(log_msg) => log_msg,
320                DataSourceMessage::UiCommand(ui_command) => {
321                    re_log::warn!(
322                        "Received a UI command, grpc server can't forward these yet: {ui_command:?}"
323                    );
324                    continue;
325                }
326            };
327
328            let msg = match msg.to_transport(re_log_encoding::rrd::Compression::LZ4) {
329                Ok(msg) => msg,
330                Err(err) => {
331                    re_log::error!("failed to encode message: {err}");
332                    continue;
333                }
334            };
335
336            if event_tx.blocking_send(Event::Message(msg.into())).is_err() {
337                re_log::debug!("shut down, closing sender");
338                break;
339            }
340        }
341    });
342}
343
/// Start a Rerun server, listening on `addr`.
///
/// This function additionally creates a smart channel for log messages and a `crossbeam`
/// channel for tables, and returns the receiving end of each (log messages first, tables second).
/// Any messages received by the server are sent through these channels. This is similar
/// to creating a client and calling `ReadMessages`, but without the overhead of a
/// localhost connection.
///
/// The server is spawned as a task on a `tokio` runtime. This function panics if the
/// runtime is not available.
///
/// See [`serve`] for more information about what a Rerun server is.
pub fn spawn_with_recv(
    addr: SocketAddr,
    options: ServerOptions,
    shutdown: shutdown::Shutdown,
) -> (
    re_smart_channel::Receiver<re_log_types::DataSourceMessage>,
    crossbeam::channel::Receiver<re_log_types::TableMsg>,
) {
    // The smart channel is tagged with the proxy URI of this server as its source.
    let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
        re_uri::Scheme::RerunHttp,
        addr,
    ));
    let (channel_log_tx, channel_log_rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
        re_smart_channel::SmartChannelSource::MessageProxy(uri),
    );
    let (channel_table_tx, channel_table_rx) = crossbeam::channel::unbounded();
    let (message_proxy, mut broadcast_log_rx, mut broadcast_table_rx) =
        MessageProxy::new_with_recv(options);
    // Task 1: the gRPC server itself.
    tokio::spawn(async move {
        if let Err(err) = serve_impl(addr, message_proxy, shutdown).await {
            re_log::error!("message proxy server crashed: {err}");
        }
    });
    // Task 2: decode broadcast log messages and forward them into the smart channel.
    tokio::spawn(async move {
        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();

        loop {
            let msg = match broadcast_log_rx.recv().await {
                Ok(msg) => match msg.msg {
                    Some(msg) => msg.to_application((&mut app_id_cache, None)),
                    None => Err(re_protos::missing_field!(
                        re_protos::log_msg::v1alpha1::LogMsg,
                        "msg"
                    )
                    .into()),
                },
                Err(broadcast::error::RecvError::Closed) => {
                    re_log::debug!("message proxy server shut down, closing receiver");
                    channel_log_tx.quit(None).ok();
                    break;
                }
                // Falling behind only loses messages; warn and keep receiving.
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    re_log::warn!(
                        "message proxy receiver dropped {n} messages due to backpressure"
                    );
                    continue;
                }
            };
            match msg {
                Ok(mut log_msg) => {
                    // Insert the timestamp metadata into the Arrow message for accurate e2e latency measurements.
                    // Note that this function is only called by the viewer
                    // (that's what the message-receiver is connected to).
                    log_msg.insert_arrow_record_batch_metadata(
                        re_sorbet::timestamp_metadata::KEY_TIMESTAMP_VIEWER_IPC_DECODED.to_owned(),
                        re_sorbet::timestamp_metadata::now_timestamp(),
                    );

                    if channel_log_tx.send(log_msg.into()).is_err() {
                        re_log::debug!(
                            "message proxy smart channel receiver closed, closing sender"
                        );
                        break;
                    }
                }
                // A message that fails to decode is reported and skipped; the loop keeps running.
                Err(err) => {
                    re_log::error!("dropping LogMsg due to failed decode: {err}");
                }
            }
        }
    });
    // Task 3: convert broadcast tables and forward them into the `crossbeam` channel.
    tokio::spawn(async move {
        loop {
            let msg = match broadcast_table_rx.recv().await {
                Ok(msg) => msg.data.try_into().map(|data| TableMsg {
                    id: msg.id.into(),
                    data,
                }),
                Err(broadcast::error::RecvError::Closed) => {
                    re_log::debug!("message proxy server shut down, closing receiver");
                    // `crossbeam` does not have a `quit` method, so we're done here.
                    break;
                }
                // Falling behind only loses messages; warn and keep receiving.
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    re_log::warn!(
                        "message proxy receiver dropped {n} messages due to backpressure"
                    );
                    continue;
                }
            };
            match msg {
                Ok(msg) => {
                    if channel_table_tx.send(msg).is_err() {
                        re_log::debug!(
                            "message proxy smart channel receiver closed, closing sender"
                        );
                        break;
                    }
                }
                // A table that fails to convert is reported and skipped; the loop keeps running.
                Err(err) => {
                    re_log::error!("dropping table due to failed decode: {err}");
                }
            }
        }
    });
    (channel_log_rx, channel_table_rx)
}
463
/// Events consumed by the server's event loop.
enum Event {
    /// New client connected, requesting full history and subscribing to new messages.
    ///
    /// The event loop replies through the sender with, in order: the stored history,
    /// a receiver for newly broadcast log messages, and a receiver for newly broadcast tables.
    NewClient(
        oneshot::Sender<(
            Vec<LogOrTableMsgProto>,
            broadcast::Receiver<LogMsgProto>,
            broadcast::Receiver<TableMsgProto>,
        )>,
    ),

    /// A client sent a message.
    Message(LogMsgProto),

    /// A client sent a table.
    Table(TableMsgProto),
}
480
/// A table in its protobuf form: an id together with the dataframe part holding its data.
#[derive(Clone)]
struct TableMsgProto {
    /// Id of the table.
    id: TableIdProto,
    /// Contents of the table.
    data: DataframePartProto,
}
486
487#[derive(Clone)]
488enum LogOrTableMsgProto {
489    LogMsg(LogMsgProto),
490    Table(TableMsgProto),
491}
492
493impl LogOrTableMsgProto {
494    fn total_size_bytes(&self) -> u64 {
495        match self {
496            Self::LogMsg(log_msg) => log_msg.total_size_bytes(),
497            Self::Table(table) => table.total_size_bytes(),
498        }
499    }
500}
501
502impl From<LogMsgProto> for LogOrTableMsgProto {
503    fn from(value: LogMsgProto) -> Self {
504        Self::LogMsg(value)
505    }
506}
507
508impl From<TableMsgProto> for LogOrTableMsgProto {
509    fn from(value: TableMsgProto) -> Self {
510        Self::Table(value)
511    }
512}
513
514// -----------------------------------------------------------------------------------
515
516#[derive(Default)]
517struct MsgQueue {
518    /// Messages stored in order of arrival, and garbage collected if the server hits the memory limit.
519    queue: VecDeque<LogOrTableMsgProto>,
520
521    /// Total size of [`Self::queue`] in bytes.
522    size_bytes: u64,
523}
524
525impl MsgQueue {
526    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &LogOrTableMsgProto> {
527        self.queue.iter()
528    }
529
530    pub fn push_back(&mut self, msg: LogOrTableMsgProto) {
531        self.size_bytes += msg.total_size_bytes();
532        self.queue.push_back(msg);
533    }
534
535    pub fn pop_front(&mut self) -> Option<LogOrTableMsgProto> {
536        if let Some(msg) = self.queue.pop_front() {
537            self.size_bytes -= msg.total_size_bytes();
538            Some(msg)
539        } else {
540            None
541        }
542    }
543}
544
545// -----------------------------------------------------------------------------------
546
/// Contains all messages received so far,
/// minus some that are garbage collected when needed.
///
/// Messages are sorted into three queues according to how readily they may be dropped.
#[derive(Default)]
struct MessageBuffer {
    /// Normal data messages (this is also where tables are stored).
    ///
    /// First to be garbage collected if we run into the memory limit.
    disposable: MsgQueue,

    /// "Static" (non-temporal) data messages.
    ///
    /// Our chunk-store already keeps static messages forever,
    /// and it makes sense: you usually log them once,
    /// and then expect them to stay around.
    ///
    /// We keep the static messages for as long as we can, but if [`Self::disposable`]
    /// is empty and we're still over our memory budget, we start throwing
    /// away the oldest messages from here too.
    /// This is because some users use static logging for camera images,
    /// which adds up very quickly.
    ///
    /// Ideally we would keep exactly one static message per entity/component stream
    /// (like the `ChunkStore` does), but we'll save that for:
    /// TODO(#5531): replace this with `ChunkStore`
    static_: MsgQueue,

    /// Store info, blueprint activation commands and blueprint data.
    ///
    /// These are never garbage collected.
    persistent: MsgQueue,
}
576
577impl MessageBuffer {
578    fn size_bytes(&self) -> u64 {
579        let Self {
580            disposable,
581            static_,
582            persistent,
583        } = self;
584        disposable.size_bytes + static_.size_bytes + persistent.size_bytes
585    }
586
587    fn all(&self, playback_behavior: PlaybackBehavior) -> Vec<LogOrTableMsgProto> {
588        re_tracing::profile_function!();
589
590        let Self {
591            disposable,
592            static_,
593            persistent,
594        } = self;
595
596        // Note: we ALWAYS send the persistent and static data before the disposable,
597        // regardless of PlaybackBehavior!
598
599        match playback_behavior {
600            PlaybackBehavior::OldestFirst => {
601                itertools::chain!(persistent.iter(), static_.iter(), disposable.iter())
602                    .cloned()
603                    .collect()
604            }
605            PlaybackBehavior::NewestFirst => itertools::chain!(
606                persistent.iter().rev(),
607                static_.iter().rev(),
608                disposable.iter().rev()
609            )
610            .cloned()
611            .collect(),
612        }
613    }
614
615    fn add_table(&mut self, table: TableMsgProto) {
616        self.disposable.push_back(table.into());
617    }
618
619    fn add_log_msg(&mut self, msg: LogMsgProto) {
620        let Some(inner) = &msg.msg else {
621            re_log::error!(
622                "{}",
623                re_protos::missing_field!(re_protos::log_msg::v1alpha1::LogMsg, "msg")
624            );
625            return;
626        };
627
628        // We put store info, blueprint data, and blueprint activation commands
629        // in a separate queue that does *not* get garbage collected.
630        use re_protos::log_msg::v1alpha1::log_msg::Msg;
631        match inner {
632            // Store info, blueprint activation commands
633            Msg::SetStoreInfo(..) | Msg::BlueprintActivationCommand(..) => {
634                self.persistent.push_back(msg.into());
635            }
636
637            Msg::ArrowMsg(inner) => {
638                let is_blueprint = inner
639                    .store_id
640                    .as_ref()
641                    .is_some_and(|id| id.kind() == StoreKindProto::Blueprint);
642
643                if is_blueprint {
644                    // Persist blueprint messages forever.
645                    self.persistent.push_back(msg.into());
646                } else if inner.is_static == Some(true) {
647                    self.static_.push_back(msg.into());
648                } else {
649                    // Recording data
650                    self.disposable.push_back(msg.into());
651                }
652            }
653        }
654    }
655
656    pub fn gc(&mut self, max_bytes: u64) {
657        if self.size_bytes() <= max_bytes {
658            // We're not using too much memory.
659            return;
660        }
661
662        re_tracing::profile_scope!("Drop messages");
663        re_log::info_once!(
664            "Memory limit ({}) exceeded. Dropping old log messages from the gRPC proxy server. Clients connecting after this will not see the full history.",
665            re_format::format_bytes(max_bytes as _)
666        );
667
668        let start_size = self.size_bytes();
669        let mut messages_dropped = 0;
670
671        while self.disposable.pop_front().is_some() {
672            messages_dropped += 1;
673            if self.size_bytes() < max_bytes {
674                break;
675            }
676        }
677
678        if max_bytes < self.size_bytes() {
679            re_log::info_once!(
680                "Memory limit ({}) exceeded. Dropping old *static* log messages as well. Clients connecting after this will no longer see the complete set of static data.",
681                re_format::format_bytes(max_bytes as _)
682            );
683            while self.static_.pop_front().is_some() {
684                messages_dropped += 1;
685                if self.size_bytes() < max_bytes {
686                    break;
687                }
688            }
689        }
690
691        let bytes_dropped = start_size - self.size_bytes();
692
693        re_log::trace!(
694            "Dropped {} bytes in {messages_dropped} message(s)",
695            re_format::format_bytes(bytes_dropped as _)
696        );
697
698        if max_bytes < self.size_bytes() {
699            re_log::warn_once!(
700                "The gRPC server is using more memory than the given memory limit ({}), despite having garbage-collected all non-persistent messages.",
701                re_format::format_bytes(max_bytes as _)
702            );
703        }
704    }
705}
706
707// -----------------------------------------------------------------------------------
708
709/// Main event loop for the server, which runs in its own task.
710///
711/// Handles message history, and broadcasts messages to clients.
712struct EventLoop {
713    options: ServerOptions,
714
715    /// New log messages are broadcast to all clients.
716    broadcast_log_tx: broadcast::Sender<LogMsgProto>,
717
718    /// New table messages are broadcast to all clients.
719    broadcast_table_tx: broadcast::Sender<TableMsgProto>,
720
721    /// Channel for incoming events.
722    event_rx: mpsc::Receiver<Event>,
723
724    messages: MessageBuffer,
725}
726
727impl EventLoop {
728    fn new(
729        options: ServerOptions,
730        event_rx: mpsc::Receiver<Event>,
731        broadcast_log_tx: broadcast::Sender<LogMsgProto>,
732        broadcast_table_tx: broadcast::Sender<TableMsgProto>,
733    ) -> Self {
734        Self {
735            options,
736            broadcast_log_tx,
737            broadcast_table_tx,
738            event_rx,
739            messages: Default::default(),
740        }
741    }
742
743    async fn run_in_place(mut self) {
744        loop {
745            let Some(event) = self.event_rx.recv().await else {
746                break;
747            };
748
749            match event {
750                Event::NewClient(channel) => {
751                    channel
752                        .send((
753                            self.messages.all(self.options.playback_behavior),
754                            self.broadcast_log_tx.subscribe(),
755                            self.broadcast_table_tx.subscribe(),
756                        ))
757                        .ok();
758                }
759                Event::Message(msg) => self.handle_msg(msg),
760                Event::Table(table) => self.handle_table(table),
761            }
762        }
763    }
764
765    fn handle_msg(&mut self, msg: LogMsgProto) {
766        self.broadcast_log_tx.send(msg.clone()).ok();
767
768        if self.is_history_disabled() {
769            // no need to gc or maintain history
770            return;
771        }
772
773        self.gc_if_using_too_much_ram();
774
775        self.messages.add_log_msg(msg);
776    }
777
778    fn handle_table(&mut self, table: TableMsgProto) {
779        self.broadcast_table_tx.send(table.clone()).ok();
780
781        if self.is_history_disabled() {
782            // no need to gc or maintain history
783            return;
784        }
785
786        self.gc_if_using_too_much_ram();
787
788        self.messages.add_table(table);
789    }
790
791    fn is_history_disabled(&self) -> bool {
792        self.options.memory_limit.max_bytes.is_some_and(|b| b == 0)
793    }
794
795    fn gc_if_using_too_much_ram(&mut self) {
796        let Some(max_bytes) = self.options.memory_limit.max_bytes else {
797            // Unlimited memory!
798            return;
799        };
800
801        self.messages.gc(max_bytes as _);
802    }
803}
804
805impl SizeBytes for TableMsgProto {
806    fn heap_size_bytes(&self) -> u64 {
807        let Self { id, data } = self;
808        id.heap_size_bytes() + data.heap_size_bytes()
809    }
810}
811
/// Front end of the message proxy: forwards messages, tables and new-client requests
/// to the event loop task through `event_tx`.
pub struct MessageProxy {
    /// Options the proxy was created with.
    options: ServerOptions,
    /// Handle of the spawned event loop task.
    _queue_task_handle: tokio::task::JoinHandle<()>,
    /// Sends events to the event loop.
    event_tx: mpsc::Sender<Event>,
}
817
818impl MessageProxy {
819    pub fn new(options: ServerOptions) -> Self {
820        Self::new_with_recv(options).0
821    }
822
823    fn new_with_recv(
824        options: ServerOptions,
825    ) -> (
826        Self,
827        broadcast::Receiver<LogMsgProto>,
828        broadcast::Receiver<TableMsgProto>,
829    ) {
830        let (event_tx, event_rx) = mpsc::channel(MESSAGE_QUEUE_CAPACITY);
831        let (broadcast_log_tx, broadcast_log_rx) = broadcast::channel(MESSAGE_QUEUE_CAPACITY);
832        let (broadcast_table_tx, broadcast_table_rx) = broadcast::channel(MESSAGE_QUEUE_CAPACITY);
833
834        let task_handle = tokio::spawn(async move {
835            EventLoop::new(options, event_rx, broadcast_log_tx, broadcast_table_tx)
836                .run_in_place()
837                .await;
838        });
839
840        (
841            Self {
842                options,
843                _queue_task_handle: task_handle,
844                event_tx,
845            },
846            broadcast_log_rx,
847            broadcast_table_rx,
848        )
849    }
850
851    async fn push_msg(&self, msg: LogMsgProto) {
852        self.event_tx.send(Event::Message(msg)).await.ok();
853    }
854
855    async fn push_table(&self, table: TableMsgProto) {
856        self.event_tx.send(Event::Table(table)).await.ok();
857    }
858
859    async fn new_client_message_stream(&self) -> ReadMessagesStream {
860        let (sender, receiver) = oneshot::channel();
861        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
862            re_log::error!("Error accepting new client: {err}");
863            return Box::pin(tokio_stream::empty());
864        }
865        let (history, log_channel, _) = match receiver.await {
866            Ok(v) => v,
867            Err(err) => {
868                re_log::error!("Error accepting new client: {err}");
869                return Box::pin(tokio_stream::empty());
870            }
871        };
872
873        let history = tokio_stream::iter(
874            history
875                .into_iter()
876                .filter_map(|log_msg| {
877                    if let LogOrTableMsgProto::LogMsg(log_msg) = log_msg {
878                        Some(ReadMessagesResponse {
879                            log_msg: Some(log_msg),
880                        })
881                    } else {
882                        None
883                    }
884                })
885                .map(Ok),
886        );
887        let channel = BroadcastStream::new(log_channel).map(|result| {
888            result
889                .map(|log_msg| ReadMessagesResponse {
890                    log_msg: Some(log_msg),
891                })
892                .map_err(|err| {
893                    re_log::error!("Error reading message from broadcast channel: {err}");
894                    tonic::Status::internal("internal channel error")
895                })
896        });
897
898        match self.options.playback_behavior {
899            PlaybackBehavior::OldestFirst => Box::pin(history.chain(channel)),
900            PlaybackBehavior::NewestFirst => Box::pin(PriorityMerge::new(channel, history)),
901        }
902    }
903
904    async fn new_client_table_stream(&self) -> ReadTablesStream {
905        let (sender, receiver) = oneshot::channel();
906        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
907            re_log::error!("Error accepting new client: {err}");
908            return Box::pin(tokio_stream::empty());
909        }
910        let (history, _, table_channel) = match receiver.await {
911            Ok(v) => v,
912            Err(err) => {
913                re_log::error!("Error accepting new client: {err}");
914                return Box::pin(tokio_stream::empty());
915            }
916        };
917
918        let history = tokio_stream::iter(
919            history
920                .into_iter()
921                .filter_map(|table| {
922                    if let LogOrTableMsgProto::Table(table) = table {
923                        Some(ReadTablesResponse {
924                            id: Some(table.id),
925                            data: Some(table.data),
926                        })
927                    } else {
928                        None
929                    }
930                })
931                .map(Ok),
932        );
933        let channel = BroadcastStream::new(table_channel).map(|result| {
934            result
935                .map(|table| ReadTablesResponse {
936                    id: Some(table.id),
937                    data: Some(table.data),
938                })
939                .map_err(|err| {
940                    re_log::error!("Error reading message from broadcast channel: {err}");
941                    tonic::Status::internal("internal channel error")
942                })
943        });
944
945        Box::pin(history.chain(channel))
946    }
947}
948
949type ReadMessagesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadMessagesResponse>> + Send>>;
950type ReadTablesStream = Pin<Box<dyn Stream<Item = tonic::Result<ReadTablesResponse>> + Send>>;
951
952#[tonic::async_trait]
953impl message_proxy_service_server::MessageProxyService for MessageProxy {
954    async fn write_messages(
955        &self,
956        request: tonic::Request<tonic::Streaming<WriteMessagesRequest>>,
957    ) -> tonic::Result<tonic::Response<WriteMessagesResponse>> {
958        let mut stream = request.into_inner();
959        loop {
960            match stream.message().await {
961                Ok(Some(WriteMessagesRequest {
962                    log_msg: Some(log_msg),
963                })) => {
964                    self.push_msg(log_msg).await;
965                }
966
967                Ok(Some(WriteMessagesRequest { log_msg: None })) => {
968                    re_log::warn!("missing log_msg in WriteMessagesRequest");
969                }
970
971                Ok(None) => {
972                    // Connection was closed
973                    break;
974                }
975
976                Err(err) => {
977                    re_log::error!("Error while receiving messages: {}", TonicStatusError(err));
978                    break;
979                }
980            }
981        }
982
983        Ok(tonic::Response::new(WriteMessagesResponse {}))
984    }
985
986    type ReadMessagesStream = ReadMessagesStream;
987
988    async fn read_messages(
989        &self,
990        _: tonic::Request<ReadMessagesRequest>,
991    ) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
992        Ok(tonic::Response::new(self.new_client_message_stream().await))
993    }
994
995    type ReadTablesStream = ReadTablesStream;
996
997    async fn write_table(
998        &self,
999        request: tonic::Request<WriteTableRequest>,
1000    ) -> tonic::Result<tonic::Response<WriteTableResponse>> {
1001        if let WriteTableRequest {
1002            id: Some(id),
1003            data: Some(data),
1004        } = request.into_inner()
1005        {
1006            self.push_table(TableMsgProto { id, data }).await;
1007        } else {
1008            re_log::warn!("malformed `WriteTableRequest`");
1009        }
1010
1011        Ok(tonic::Response::new(WriteTableResponse {}))
1012    }
1013
1014    async fn read_tables(
1015        &self,
1016        _: tonic::Request<ReadTablesRequest>,
1017    ) -> tonic::Result<tonic::Response<Self::ReadTablesStream>> {
1018        Ok(tonic::Response::new(self.new_client_table_stream().await))
1019    }
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024    use std::net::SocketAddr;
1025    use std::sync::Arc;
1026    use std::time::Duration;
1027
1028    use itertools::{Itertools as _, chain};
1029    use similar_asserts::assert_eq;
1030    use tokio::net::TcpListener;
1031    use tokio_util::sync::CancellationToken;
1032    use tonic::transport::Channel;
1033    use tonic::transport::Endpoint;
1034    use tonic::transport::server::TcpIncoming;
1035
1036    use re_chunk::RowId;
1037    use re_log_encoding::rrd::Compression;
1038    use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
1039    use re_protos::sdk_comms::v1alpha1::{
1040        message_proxy_service_client::MessageProxyServiceClient,
1041        message_proxy_service_server::MessageProxyServiceServer,
1042    };
1043
1044    use super::*;
1045
1046    #[derive(Clone)]
1047    struct Completion(Arc<CancellationToken>);
1048
1049    impl Drop for Completion {
1050        fn drop(&mut self) {
1051            self.finish();
1052        }
1053    }
1054
1055    impl Completion {
1056        fn new() -> Self {
1057            Self(Arc::new(CancellationToken::new()))
1058        }
1059
1060        fn finish(&self) {
1061            self.0.cancel();
1062        }
1063
1064        async fn wait(&self) {
1065            self.0.cancelled().await;
1066        }
1067    }
1068
1069    fn set_store_info_msg(store_id: &StoreId) -> LogMsg {
1070        LogMsg::SetStoreInfo(SetStoreInfo {
1071            row_id: *RowId::new(),
1072            info: StoreInfo::new(
1073                store_id.clone(),
1074                StoreSource::RustSdk {
1075                    rustc_version: String::new(),
1076                    llvm_version: String::new(),
1077                },
1078            ),
1079        })
1080    }
1081
1082    /// Generates `n` log messages wrapped in a `SetStoreInfo` at the start and `BlueprintActivationCommand` at the end,
1083    /// to exercise message ordering.
1084    fn fake_log_stream_blueprint(n: usize) -> Vec<LogMsg> {
1085        let store_id = StoreId::random(StoreKind::Blueprint, "test_app");
1086
1087        let mut messages = Vec::new();
1088        messages.push(set_store_info_msg(&store_id));
1089        for _ in 0..n {
1090            messages.push(LogMsg::ArrowMsg(
1091                store_id.clone(),
1092                re_chunk::Chunk::builder("test_entity")
1093                    .with_archetype(
1094                        re_chunk::RowId::new(),
1095                        re_log_types::TimePoint::default().with(
1096                            re_log_types::Timeline::new_sequence("blueprint"),
1097                            re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
1098                        ),
1099                        &re_types::blueprint::archetypes::Background::new(
1100                            re_types::blueprint::components::BackgroundKind::SolidColor,
1101                        )
1102                        .with_color([255, 0, 0]),
1103                    )
1104                    .build()
1105                    .unwrap()
1106                    .to_arrow_msg()
1107                    .unwrap(),
1108            ));
1109        }
1110        messages.push(LogMsg::BlueprintActivationCommand(
1111            re_log_types::BlueprintActivationCommand {
1112                blueprint_id: store_id,
1113                make_active: true,
1114                make_default: true,
1115            },
1116        ));
1117
1118        messages
1119    }
1120
1121    #[derive(Clone, Copy)]
1122    enum Temporalness {
1123        Static,
1124        Temporal,
1125    }
1126
1127    fn fake_log_stream_recording(n: usize) -> Vec<LogMsg> {
1128        let store_id = StoreId::random(StoreKind::Recording, "test_app");
1129
1130        chain!(
1131            [set_store_info_msg(&store_id)],
1132            generate_log_messages(&store_id, n, Temporalness::Temporal)
1133        )
1134        .collect()
1135    }
1136
1137    fn generate_log_messages(
1138        store_id: &StoreId,
1139        n: usize,
1140        temporalness: Temporalness,
1141    ) -> Vec<LogMsg> {
1142        let mut messages = Vec::new();
1143        for _ in 0..n {
1144            let timepoint = match temporalness {
1145                Temporalness::Static => re_log_types::TimePoint::STATIC,
1146                Temporalness::Temporal => re_log_types::TimePoint::default().with(
1147                    re_log_types::Timeline::new_sequence("log_time"),
1148                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
1149                ),
1150            };
1151
1152            messages.push(LogMsg::ArrowMsg(
1153                store_id.clone(),
1154                re_chunk::Chunk::builder("test_entity")
1155                    .with_archetype(
1156                        re_chunk::RowId::new(),
1157                        timepoint,
1158                        &re_types::archetypes::Points2D::new([(0.0, 0.0), (1.0, 1.0), (2.0, 2.0)]),
1159                    )
1160                    .build()
1161                    .unwrap()
1162                    .to_arrow_msg()
1163                    .unwrap(),
1164            ));
1165        }
1166        messages
1167    }
1168
1169    async fn setup() -> (Completion, SocketAddr) {
1170        setup_opt(ServerOptions {
1171            playback_behavior: PlaybackBehavior::OldestFirst,
1172            memory_limit: MemoryLimit::UNLIMITED,
1173        })
1174        .await
1175    }
1176
1177    async fn setup_with_memory_limit(memory_limit: MemoryLimit) -> (Completion, SocketAddr) {
1178        setup_opt(ServerOptions {
1179            playback_behavior: PlaybackBehavior::OldestFirst,
1180            memory_limit,
1181        })
1182        .await
1183    }
1184
1185    async fn setup_opt(options: ServerOptions) -> (Completion, SocketAddr) {
1186        let completion = Completion::new();
1187
1188        let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1189        let addr = tcp_listener.local_addr().unwrap();
1190
1191        tokio::spawn({
1192            let completion = completion.clone();
1193            async move {
1194                tonic::transport::Server::builder()
1195                    .add_service(
1196                        MessageProxyServiceServer::new(super::MessageProxy::new(options))
1197                            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)
1198                            .max_encoding_message_size(MAX_ENCODING_MESSAGE_SIZE),
1199                    )
1200                    .serve_with_incoming_shutdown(
1201                        TcpIncoming::from(tcp_listener).with_nodelay(Some(true)),
1202                        completion.wait(),
1203                    )
1204                    .await
1205                    .unwrap();
1206            }
1207        });
1208
1209        (completion, addr)
1210    }
1211
1212    async fn make_client(addr: SocketAddr) -> MessageProxyServiceClient<Channel> {
1213        MessageProxyServiceClient::new(
1214            Endpoint::from_shared(format!("http://{addr}"))
1215                .unwrap()
1216                .connect()
1217                .await
1218                .unwrap(),
1219        )
1220        .max_decoding_message_size(crate::MAX_DECODING_MESSAGE_SIZE)
1221    }
1222
1223    async fn write_messages(
1224        client: &mut MessageProxyServiceClient<Channel>,
1225        messages: Vec<LogMsg>,
1226    ) {
1227        client
1228            .write_messages(tokio_stream::iter(
1229                messages
1230                    .clone()
1231                    .into_iter()
1232                    .map(|msg| msg.to_transport(Compression::Off).unwrap())
1233                    .map(|msg| WriteMessagesRequest {
1234                        log_msg: Some(msg.into()),
1235                    }),
1236            ))
1237            .await
1238            .unwrap();
1239    }
1240
1241    async fn read_log_stream(
1242        log_stream: &mut tonic::Response<tonic::Streaming<ReadMessagesResponse>>,
1243        n: usize,
1244    ) -> Vec<LogMsg> {
1245        let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
1246
1247        let mut stream_ref = log_stream.get_mut().map(|result| {
1248            let msg = result.unwrap().log_msg.unwrap().msg.unwrap();
1249            msg.to_application((&mut app_id_cache, None)).unwrap()
1250        });
1251
1252        let mut messages = Vec::new();
1253        for _ in 0..n {
1254            messages.push(stream_ref.next().await.unwrap());
1255        }
1256        messages
1257    }
1258
1259    #[tokio::test]
1260    async fn pubsub_basic() {
1261        let (completion, addr) = setup().await;
1262        let mut client = make_client(addr).await; // We use the same client for both producing and consuming
1263        let messages = fake_log_stream_blueprint(3);
1264
1265        // start reading
1266        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1267
1268        write_messages(&mut client, messages.clone()).await;
1269
1270        // the messages should be echoed to us
1271        let actual = read_log_stream(&mut log_stream, messages.len()).await;
1272
1273        assert_eq!(messages, actual);
1274
1275        // While `SetStoreInfo` is sent first in `fake_log_stream`,
1276        // we can observe that it's also received first,
1277        // even though it is actually stored out of order in `persistent_message_queue`.
1278        assert!(matches!(messages[0], LogMsg::SetStoreInfo(..)));
1279        assert!(matches!(actual[0], LogMsg::SetStoreInfo(..)));
1280
1281        completion.finish();
1282    }
1283
1284    #[tokio::test]
1285    async fn pubsub_history() {
1286        let (completion, addr) = setup().await;
1287        let mut client = make_client(addr).await; // We use the same client for both producing and consuming
1288        let messages = fake_log_stream_blueprint(3);
1289
1290        // don't read anything yet - these messages should be sent to us as part of history when we call `read_messages` later
1291
1292        write_messages(&mut client, messages.clone()).await;
1293
1294        // Start reading now - we should receive full history at this point:
1295        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1296
1297        let actual = read_log_stream(&mut log_stream, messages.len()).await;
1298        assert_eq!(messages, actual);
1299
1300        completion.finish();
1301    }
1302
1303    #[tokio::test]
1304    async fn one_producer_many_consumers() {
1305        let (completion, addr) = setup().await;
1306        let mut producer = make_client(addr).await; // We use separate clients for producing and consuming
1307        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
1308        let messages = fake_log_stream_blueprint(3);
1309
1310        // Initialize multiple read streams:
1311        let mut log_streams = vec![];
1312        for consumer in &mut consumers {
1313            log_streams.push(
1314                consumer
1315                    .read_messages(ReadMessagesRequest {})
1316                    .await
1317                    .unwrap(),
1318            );
1319        }
1320
1321        write_messages(&mut producer, messages.clone()).await;
1322
1323        // Each consumer should've received them:
1324        for log_stream in &mut log_streams {
1325            let actual = read_log_stream(log_stream, messages.len()).await;
1326            assert_eq!(messages, actual);
1327        }
1328
1329        completion.finish();
1330    }
1331
1332    #[tokio::test]
1333    async fn many_producers_many_consumers() {
1334        let (completion, addr) = setup().await;
1335        let mut producers = vec![make_client(addr).await, make_client(addr).await];
1336        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
1337        let messages = fake_log_stream_blueprint(3);
1338
1339        // Initialize multiple read streams:
1340        let mut log_streams = vec![];
1341        for consumer in &mut consumers {
1342            log_streams.push(
1343                consumer
1344                    .read_messages(ReadMessagesRequest {})
1345                    .await
1346                    .unwrap(),
1347            );
1348        }
1349
1350        // Write a few messages using each producer:
1351        for producer in &mut producers {
1352            write_messages(producer, messages.clone()).await;
1353        }
1354
1355        let expected = [messages.clone(), messages.clone()].concat();
1356
1357        // Each consumer should've received one set of messages from each producer.
1358        // Note that in this test we also guarantee the order of messages across producers,
1359        // due to the `write_messages` calls being sequential.
1360
1361        for log_stream in &mut log_streams {
1362            let actual = read_log_stream(log_stream, expected.len()).await;
1363            assert_eq!(actual, expected);
1364        }
1365
1366        completion.finish();
1367    }
1368
1369    #[tokio::test]
1370    async fn memory_limit_drops_messages() {
1371        // Use an absurdly low memory limit to force all messages to be dropped immediately from history
1372        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
1373        let mut client = make_client(addr).await;
1374        let messages = fake_log_stream_recording(3);
1375
1376        write_messages(&mut client, messages.clone()).await;
1377
1378        // Start reading
1379        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1380        let mut actual = vec![];
1381        loop {
1382            let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
1383            tokio::pin!(timeout_stream);
1384            let timeout_result = timeout_stream.try_next().await;
1385            let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
1386            match timeout_result {
1387                Ok(Some(value)) => {
1388                    let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
1389                    actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
1390                }
1391
1392                // Stream closed | Timed out
1393                Ok(None) | Err(_) => break,
1394            }
1395        }
1396
1397        // The GC runs _before_ a message is stored, so we should see the persistent message, and the last message sent.
1398        assert_eq!(actual.len(), 2);
1399        assert_eq!(&actual[0], &messages[0]);
1400        assert_eq!(&actual[1], messages.last().unwrap());
1401
1402        completion.finish();
1403    }
1404
1405    #[tokio::test]
1406    async fn memory_limit_does_not_drop_blueprint() {
1407        // Use an absurdly low memory limit to force all messages to be dropped immediately from history
1408        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
1409        let mut client = make_client(addr).await;
1410        let messages = fake_log_stream_blueprint(3);
1411
1412        // Write some messages
1413        write_messages(&mut client, messages.clone()).await;
1414
1415        // Start reading
1416        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1417        let mut actual = vec![];
1418        loop {
1419            let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
1420            tokio::pin!(timeout_stream);
1421            let timeout_result = timeout_stream.try_next().await;
1422            let mut app_id_cache = re_log_encoding::CachingApplicationIdInjector::default();
1423            match timeout_result {
1424                Ok(Some(value)) => {
1425                    let msg = value.unwrap().log_msg.unwrap().msg.unwrap();
1426                    actual.push(msg.to_application((&mut app_id_cache, None)).unwrap());
1427                }
1428
1429                // Stream closed | Timed out
1430                Ok(None) | Err(_) => break,
1431            }
1432        }
1433
1434        // The stream in this case only contains SetStoreInfo, ArrowMsg with StoreKind::Blueprint,
1435        // and BlueprintActivationCommand. None of these things should be GC'd:
1436        assert_eq!(messages, actual);
1437
1438        completion.finish();
1439    }
1440
1441    #[tokio::test]
1442    async fn memory_limit_does_not_interrupt_stream() {
1443        let memory_limits = [
1444            0, // Will actually disable the message buffer and GC logic. Good to test that!
1445            1, // An absurdly low memory limit to force all messages to be dropped immediately from history
1446        ];
1447
1448        for memory_limit in memory_limits {
1449            let (completion, addr) =
1450                setup_with_memory_limit(MemoryLimit::from_bytes(memory_limit)).await;
1451            let mut client = make_client(addr).await; // We use the same client for both producing and consuming
1452            let messages = fake_log_stream_blueprint(3);
1453
1454            // Start reading
1455            let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1456
1457            write_messages(&mut client, messages.clone()).await;
1458
1459            // The messages should be echoed to us, even though none of them will be stored in history
1460            let actual = read_log_stream(&mut log_stream, messages.len()).await;
1461            assert_eq!(messages, actual);
1462
1463            completion.finish();
1464        }
1465    }
1466
1467    #[tokio::test]
1468    async fn static_data_is_returned_first() {
1469        let (completion, addr) = setup_with_memory_limit(MemoryLimit::UNLIMITED).await;
1470        let mut client = make_client(addr).await;
1471
1472        let store_id = StoreId::random(StoreKind::Recording, "test_app");
1473
1474        let set_store_info = vec![set_store_info_msg(&store_id)];
1475        let first_static = generate_log_messages(&store_id, 3, Temporalness::Static);
1476        let first_temporal = generate_log_messages(&store_id, 3, Temporalness::Temporal);
1477        let second_static = generate_log_messages(&store_id, 3, Temporalness::Static);
1478
1479        write_messages(&mut client, set_store_info.clone()).await;
1480        write_messages(&mut client, first_static.clone()).await;
1481        write_messages(&mut client, first_temporal.clone()).await;
1482        write_messages(&mut client, second_static.clone()).await;
1483
1484        // All static data should always come before temporal data:
1485        let expected =
1486            itertools::chain!(set_store_info, first_static, second_static, first_temporal)
1487                .collect_vec();
1488
1489        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1490        let actual = read_log_stream(&mut log_stream, expected.len()).await;
1491
1492        assert_eq!(actual, expected);
1493
1494        completion.finish();
1495    }
1496
1497    #[tokio::test]
1498    async fn playback_newest_first() {
1499        let (completion, addr) = setup_opt(ServerOptions {
1500            playback_behavior: PlaybackBehavior::NewestFirst, // this is what we want to test
1501            memory_limit: MemoryLimit::UNLIMITED,
1502        })
1503        .await;
1504        let mut client = make_client(addr).await;
1505
1506        let store_id = StoreId::random(StoreKind::Recording, "test_app");
1507
1508        let set_store_info = vec![set_store_info_msg(&store_id)];
1509        let first_statics = generate_log_messages(&store_id, 3, Temporalness::Static);
1510        let temporals = generate_log_messages(&store_id, 3, Temporalness::Temporal);
1511        let second_statics = generate_log_messages(&store_id, 3, Temporalness::Static);
1512
1513        write_messages(&mut client, set_store_info.clone()).await;
1514        write_messages(&mut client, first_statics.clone()).await;
1515        write_messages(&mut client, temporals.clone()).await;
1516        write_messages(&mut client, second_statics.clone()).await;
1517
1518        // All static data should always come before temporal data:
1519        let expected = itertools::chain!(
1520            set_store_info.into_iter().rev(),
1521            second_statics.into_iter().rev(),
1522            first_statics.into_iter().rev(),
1523            temporals.into_iter().rev()
1524        )
1525        .collect_vec();
1526
1527        let mut log_stream = client.read_messages(ReadMessagesRequest {}).await.unwrap();
1528        let actual = read_log_stream(&mut log_stream, expected.len()).await;
1529
1530        assert_eq!(actual, expected);
1531
1532        completion.finish();
1533    }
1534}