Skip to main content

exoware_server/
connect.rs

1//! Ingest, query, compact, and stream services; storage is provided by capability traits.
2
3#![allow(refining_impl_trait)]
4
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use bytes::Bytes;
11use connectrpc::{Chain, ConnectError, ConnectRpcService, Limits, RequestContext as Context};
12use exoware_proto::common::KvEntry;
13use exoware_proto::compact::{
14    PruneResponse, Service as CompactApi, ServiceServer as CompactServiceServer,
15};
16use exoware_proto::google::rpc::{ErrorInfo, RetryInfo};
17use exoware_proto::ingest::{
18    PutResponse as ProtoPutResponse, Service as IngestApi, ServiceServer as IngestServiceServer,
19};
20use exoware_proto::query::{
21    Detail, GetManyEntry, GetManyFrame, GetResponse, RangeFrame, ReduceResponse,
22    Service as QueryApi, ServiceServer as QueryServiceServer,
23};
24use exoware_proto::store::stream::v1::{
25    GetRequestView, GetResponse as StreamGetResponse, Service as StreamApi,
26    ServiceServer as StreamServiceServer, SubscribeRequestView, SubscribeResponse,
27};
28use exoware_proto::stream_filter::{BytesFilter, StreamFilter};
29use exoware_proto::{
30    connect_compression_registry, parse_range_traversal_direction,
31    to_domain_reduce_request_from_view, to_proto_optional_reduced_value, to_proto_reduced_value,
32    with_error_info_detail, with_query_detail, with_retry_info_detail, RangeTraversalDirection,
33};
34use exoware_sdk as exoware_proto;
35use exoware_sdk::keys::Key;
36use exoware_sdk::match_key::MatchKey;
37use exoware_sdk::store::common::v1::bytes_filter::KindView as ProtoBytesFilterKindView;
38use futures::{stream as stream_util, Stream};
39use tokio::sync::Notify;
40
41use crate::reduce::RangeReducer;
42use crate::stream::{StreamHub, StreamNotifier};
43use crate::validate::{self, IngestLimits};
44use crate::{Ingest, Log, Prune, Query, QueryExtra, RangeScan, StoreEngine};
45
// TODO (#57): Make limits configurable.
/// Byte cap of 256 MiB.
/// NOTE(review): presumably the maximum ConnectRPC message body size; the code that applies
/// it is not visible in this part of the file — confirm.
const MAX_CONNECTRPC_BODY_BYTES: usize = 256 * 1024 * 1024;
/// Upper bound applied to the client-requested `batch_size` of `GetMany` and `Range` frames.
const RANGE_STREAM_MAX_FRAME_ROWS: usize = 4096;
/// Number of rows requested per `next_batch` call while scanning for `Reduce`.
const REDUCE_SCAN_BATCH_SIZE: usize = 4096;
50
51fn query_detail(sequence_number: u64, extra: QueryExtra) -> Detail {
52    Detail {
53        sequence_number,
54        extra,
55        ..Default::default()
56    }
57}
58
/// Parameters consumed by [`range_stream`].
struct RangeStreamRequest {
    /// First key bound handed to `range_scan`.
    start_key: Key,
    /// Second key bound handed to `range_scan`.
    end_key: Key,
    /// Row limit handed to `range_scan`; the `Range` handler passes `usize::MAX` when the
    /// client sets no limit.
    limit: usize,
    /// Number of rows requested from the scan cursor per emitted frame.
    batch_size: usize,
    /// `true` for forward traversal, `false` for reverse.
    forward: bool,
    /// Sequence number stamped on the detail of every emitted frame.
    sequence_number: u64,
}
67
/// Opens a range scan on `query` and adapts it into a stream of [`RangeFrame`]s.
///
/// The scan is opened eagerly, so a `range_scan` failure is returned from this function
/// (as an internal error) rather than through the stream. Each stream item is produced by
/// one `next_batch(batch_size)` call on the scan cursor, and every frame carries a detail
/// built from `sequence_number` and that batch's `extra`.
///
/// The stream ends after the first empty batch or the first `next_batch` error. An empty
/// batch still yields one final frame without rows unless a frame was already emitted and
/// the batch's `extra` is empty, so a stream that hits no error yields at least one frame.
async fn range_stream<Q>(
    query: Arc<Q>,
    request: RangeStreamRequest,
) -> Result<Pin<Box<dyn Stream<Item = Result<RangeFrame, ConnectError>> + Send>>, ConnectError>
where
    Q: Query,
{
    let RangeStreamRequest {
        start_key,
        end_key,
        limit,
        batch_size,
        forward,
        sequence_number,
    } = request;
    let entries = query
        .range_scan(start_key, end_key, limit, forward)
        .await
        .map_err(ConnectError::internal)?;

    // Unfold state: `Some((cursor, emitted_frame))` while the scan is live, `None` once the
    // stream must end on the next poll.
    Ok(Box::pin(stream_util::unfold(
        Some((entries, false)),
        move |state| async move {
            // A `None` state ends the stream.
            let (mut entries, emitted_frame) = state?;
            let batch = match entries.next_batch(batch_size).await {
                Ok(batch) => batch,
                // Surface the error as the last item; the `None` state stops the stream.
                Err(e) => return Some((Err(ConnectError::internal(e)), None)),
            };
            let detail = query_detail(sequence_number, batch.extra);
            if batch.rows.is_empty() {
                // Nothing new to report: a frame already went out and there is no extra.
                if emitted_frame && detail.extra.is_empty() {
                    return None;
                }
                // Emit a final detail-only frame, then stop.
                return Some((
                    Ok(RangeFrame {
                        detail: Some(detail).into(),
                        ..Default::default()
                    }),
                    None,
                ));
            }

            let mut chunk = Vec::with_capacity(batch.rows.len());
            for (key, value) in batch.rows {
                chunk.push(KvEntry {
                    key: key.into(),
                    value,
                    ..Default::default()
                });
            }
            // Keep the cursor and remember that at least one frame has been emitted.
            Some((
                Ok(RangeFrame {
                    results: chunk,
                    detail: Some(detail).into(),
                    ..Default::default()
                }),
                Some((entries, true)),
            ))
        },
    )))
}
129
/// All-in-one single-process composition for a backend that serves every store capability.
/// Split deployments construct the narrower capability states directly.
///
/// The `From<AppState<E>>` impls in this file derive the ingest, query, compact, and stream
/// states from it.
pub struct AppState<E> {
    /// Backend that implements every store capability.
    pub engine: Arc<E>,
    /// Limits enforced by the ingest service before writing.
    pub ingest_limits: IngestLimits,
    /// Gates ingest (writes) only. Query and compact remain available during drains so that
    /// in-flight reads can complete while the worker sheds write traffic.
    pub ready: Arc<AtomicBool>,
    /// Shared fan-out hub for `store.stream.v1.Subscribe`.
    pub stream: Arc<StreamHub>,
}
143
144impl<E> Clone for AppState<E> {
145    fn clone(&self) -> Self {
146        Self {
147            engine: self.engine.clone(),
148            ingest_limits: self.ingest_limits,
149            ready: self.ready.clone(),
150            stream: self.stream.clone(),
151        }
152    }
153}
154
155impl<E> AppState<E>
156where
157    E: StoreEngine,
158{
159    pub fn new(engine: Arc<E>) -> Self {
160        let current_sequence = engine.current_sequence();
161        Self {
162            engine,
163            ingest_limits: IngestLimits::default(),
164            ready: Arc::new(AtomicBool::new(true)),
165            stream: Arc::new(StreamHub::new(current_sequence)),
166        }
167    }
168
169    pub fn with_ingest_limits(mut self, limits: IngestLimits) -> Self {
170        self.ingest_limits = limits;
171        self
172    }
173}
174
/// State for an ingest-only service.
pub struct IngestState<I> {
    /// Backend used for writes.
    pub ingest: Arc<I>,
    /// Limits enforced before writes reach the backend.
    pub limits: IngestLimits,
    /// Gates ingest writes only: `put` fails with an unavailable error while this is `false`.
    pub ready: Arc<AtomicBool>,
    /// Optional live-stream notifier. When set, `put` calls `advance` on it with the
    /// sequence number returned by the backend write.
    pub notifier: Option<Arc<dyn StreamNotifier>>,
}
186
187impl<I> Clone for IngestState<I> {
188    fn clone(&self) -> Self {
189        Self {
190            ingest: self.ingest.clone(),
191            limits: self.limits,
192            ready: self.ready.clone(),
193            notifier: self.notifier.clone(),
194        }
195    }
196}
197
198impl<I> IngestState<I>
199where
200    I: Ingest,
201{
202    pub fn new(ingest: Arc<I>) -> Self {
203        Self {
204            ingest,
205            limits: IngestLimits::default(),
206            ready: Arc::new(AtomicBool::new(true)),
207            notifier: None,
208        }
209    }
210
211    pub fn with_notifier(ingest: Arc<I>, notifier: Arc<dyn StreamNotifier>) -> Self {
212        Self {
213            ingest,
214            limits: IngestLimits::default(),
215            ready: Arc::new(AtomicBool::new(true)),
216            notifier: Some(notifier),
217        }
218    }
219
220    pub fn with_limits(mut self, limits: IngestLimits) -> Self {
221        self.limits = limits;
222        self
223    }
224}
225
226impl<E> From<AppState<E>> for IngestState<E> {
227    fn from(state: AppState<E>) -> Self {
228        Self {
229            ingest: state.engine,
230            limits: state.ingest_limits,
231            ready: state.ready,
232            notifier: Some(state.stream),
233        }
234    }
235}
236
/// State for a query-only service.
pub struct QueryState<Q> {
    /// Backend used for point and range reads; it also supplies the current sequence number
    /// that the query handlers report.
    pub query: Arc<Q>,
}
242
243impl<Q> Clone for QueryState<Q> {
244    fn clone(&self) -> Self {
245        Self {
246            query: self.query.clone(),
247        }
248    }
249}
250
251impl<Q> QueryState<Q>
252where
253    Q: Query,
254{
255    pub fn new(query: Arc<Q>) -> Self {
256        Self { query }
257    }
258}
259
260impl<E> From<AppState<E>> for QueryState<E> {
261    fn from(state: AppState<E>) -> Self {
262        Self {
263            query: state.engine,
264        }
265    }
266}
267
/// State for a compact-only service.
pub struct CompactState<P> {
    /// Backend used for prune requests; the `Prune` handler calls `apply_prune_policies`
    /// on it.
    pub prune: Arc<P>,
}
273
274impl<P> Clone for CompactState<P> {
275    fn clone(&self) -> Self {
276        Self {
277            prune: self.prune.clone(),
278        }
279    }
280}
281
282impl<P> CompactState<P>
283where
284    P: Prune,
285{
286    pub fn new(prune: Arc<P>) -> Self {
287        Self { prune }
288    }
289}
290
291impl<E> From<AppState<E>> for CompactState<E> {
292    fn from(state: AppState<E>) -> Self {
293        Self {
294            prune: state.engine,
295        }
296    }
297}
298
/// State for a stream-only service.
pub struct StreamState<L> {
    /// Backend used to load committed batches (`get_batch`) and to look up the oldest
    /// retained batch when a requested one is missing.
    pub log: Arc<L>,
    /// In-process notifier used to wake subscribers after new batches commit. Subscriptions
    /// also read its `current_sequence` to decide whether a live batch can be fetched.
    pub notifier: Arc<dyn StreamNotifier>,
}
306
307impl<L> Clone for StreamState<L> {
308    fn clone(&self) -> Self {
309        Self {
310            log: self.log.clone(),
311            notifier: self.notifier.clone(),
312        }
313    }
314}
315
316impl<L> StreamState<L>
317where
318    L: Log,
319{
320    pub fn new(log: Arc<L>, notifier: Arc<dyn StreamNotifier>) -> Self {
321        Self { log, notifier }
322    }
323}
324
325impl<E> From<AppState<E>> for StreamState<E> {
326    fn from(state: AppState<E>) -> Self {
327        Self {
328            log: state.engine,
329            notifier: state.stream,
330        }
331    }
332}
333
/// Ingest service handler: implements the ingest API (`put`) on top of an [`IngestState`].
pub struct IngestConnect<I> {
    /// Backend, limits, ready flag, and optional notifier used by `put`.
    state: IngestState<I>,
}
337
338impl<I> Clone for IngestConnect<I> {
339    fn clone(&self) -> Self {
340        Self {
341            state: self.state.clone(),
342        }
343    }
344}
345
346impl<I> IngestConnect<I>
347where
348    I: Ingest,
349{
350    pub fn new(state: impl Into<IngestState<I>>) -> Self {
351        Self {
352            state: state.into(),
353        }
354    }
355}
356
357impl<I> IngestApi for IngestConnect<I>
358where
359    I: Ingest,
360{
361    async fn put(
362        &self,
363        _ctx: Context,
364        request: buffa::view::OwnedView<exoware_proto::store::ingest::v1::PutRequestView<'static>>,
365    ) -> connectrpc::ServiceResult<ProtoPutResponse> {
366        if !self.state.ready.load(Ordering::SeqCst) {
367            return Err(with_error_info_detail(
368                ConnectError::unavailable("ingest is not ready"),
369                ErrorInfo {
370                    reason: "WORKER_NOT_READY".to_string(),
371                    domain: "store.ingest".to_string(),
372                    ..Default::default()
373                },
374            ));
375        }
376
377        validate::validate_put_request(&request, self.state.limits)?;
378
379        let wire = request.bytes();
380        let mut batch = Vec::with_capacity(request.kvs.len());
381        for kv in request.kvs.iter() {
382            let key: Key = wire.slice_ref(kv.key);
383            let value = wire.slice_ref(kv.value);
384            batch.push((key, value));
385        }
386
387        let seq = self
388            .state
389            .ingest
390            .put_batch(batch)
391            .await
392            .map_err(ConnectError::internal)?;
393
394        // Advance any attached stream frontier after the write is committed.
395        if let Some(notifier) = &self.state.notifier {
396            notifier.advance(seq);
397        }
398
399        connectrpc::Response::ok(ProtoPutResponse {
400            sequence_number: seq,
401            ..Default::default()
402        })
403    }
404}
405
/// Query service handler: implements the query API (`get`, `get_many`, `range`, `reduce`)
/// on top of a [`QueryState`].
pub struct QueryConnect<Q> {
    /// Backend used to serve reads and to report the current sequence number.
    state: QueryState<Q>,
}
409
410impl<Q> Clone for QueryConnect<Q> {
411    fn clone(&self) -> Self {
412        Self {
413            state: self.state.clone(),
414        }
415    }
416}
417
418impl<Q> QueryConnect<Q>
419where
420    Q: Query,
421{
422    pub fn new(state: impl Into<QueryState<Q>>) -> Self {
423        Self {
424            state: state.into(),
425        }
426    }
427
428    fn current_sequence_number(&self) -> u64 {
429        self.state.query.current_sequence()
430    }
431
432    fn error_detail(&self) -> Detail {
433        Detail {
434            sequence_number: self.current_sequence_number(),
435            ..Default::default()
436        }
437    }
438
439    fn consistency_not_ready_error(&self, required: u64, current: u64) -> ConnectError {
440        let err = with_retry_info_detail(
441            ConnectError::aborted("minimum consistency token is not yet visible"),
442            RetryInfo {
443                retry_delay: Some(buffa_types::google::protobuf::Duration::from(
444                    std::time::Duration::from_secs(1),
445                ))
446                .into(),
447                ..Default::default()
448            },
449        );
450        with_query_detail(
451            with_error_info_detail(
452                err,
453                ErrorInfo {
454                    reason: "CONSISTENCY_NOT_READY".to_string(),
455                    domain: "store.query".to_string(),
456                    metadata: [
457                        ("required_sequence_number".to_string(), required.to_string()),
458                        ("current_sequence_number".to_string(), current.to_string()),
459                    ]
460                    .into_iter()
461                    .collect(),
462                    ..Default::default()
463                },
464            ),
465            self.error_detail(),
466        )
467    }
468
469    fn ensure_min_sequence_number(&self, required: Option<u64>) -> Result<u64, ConnectError> {
470        let current = self.current_sequence_number();
471        if let Some(required) = required {
472            if current < required {
473                return Err(self.consistency_not_ready_error(required, current));
474            }
475        }
476        Ok(current)
477    }
478}
479
480impl<Q> QueryApi for QueryConnect<Q>
481where
482    Q: Query,
483{
484    async fn get(
485        &self,
486        _ctx: Context,
487        request: buffa::view::OwnedView<exoware_proto::store::query::v1::GetRequestView<'static>>,
488    ) -> connectrpc::ServiceResult<GetResponse> {
489        validate::validate_get_request(&request)?;
490        let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
491        let wire = request.bytes();
492        let key: Key = wire.slice_ref(request.key);
493        let (value, extra) = self
494            .state
495            .query
496            .get(key)
497            .await
498            .map_err(ConnectError::internal)?;
499        let detail = query_detail(token, extra);
500        connectrpc::Response::ok(GetResponse {
501            value,
502            detail: Some(detail).into(),
503            ..Default::default()
504        })
505    }
506
507    async fn get_many(
508        &self,
509        _ctx: Context,
510        request: buffa::view::OwnedView<
511            exoware_proto::store::query::v1::GetManyRequestView<'static>,
512        >,
513    ) -> connectrpc::ServiceResult<connectrpc::ServiceStream<GetManyFrame>> {
514        validate::validate_get_many_request(&request)?;
515        let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
516
517        let wire = request.bytes();
518        let keys: Vec<Key> = request.keys.iter().map(|key| wire.slice_ref(key)).collect();
519        let (entries, extra) = self
520            .state
521            .query
522            .get_many(keys)
523            .await
524            .map_err(ConnectError::internal)?;
525        let detail = query_detail(sequence_number, extra);
526        let batch_size = (request.batch_size as usize).min(RANGE_STREAM_MAX_FRAME_ROWS);
527        let mut frames = Vec::new();
528        let mut chunk = Vec::new();
529        for (key, value) in entries {
530            chunk.push(GetManyEntry {
531                key: key.to_vec(),
532                value,
533                ..Default::default()
534            });
535            if chunk.len() >= batch_size {
536                frames.push(Ok(GetManyFrame {
537                    results: std::mem::take(&mut chunk),
538                    detail: Some(detail.clone()).into(),
539                    ..Default::default()
540                }));
541            }
542        }
543        if !chunk.is_empty() {
544            frames.push(Ok(GetManyFrame {
545                results: chunk,
546                detail: Some(detail).into(),
547                ..Default::default()
548            }));
549        } else if frames.is_empty() {
550            frames.push(Ok(GetManyFrame {
551                detail: Some(detail).into(),
552                ..Default::default()
553            }));
554        }
555
556        Ok(connectrpc::Response::stream(stream_util::iter(frames)))
557    }
558
559    async fn range(
560        &self,
561        _ctx: Context,
562        request: buffa::view::OwnedView<exoware_proto::store::query::v1::RangeRequestView<'static>>,
563    ) -> connectrpc::ServiceResult<connectrpc::ServiceStream<RangeFrame>> {
564        validate::validate_range_request(&request)?;
565        let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
566        let wire = request.bytes();
567        let start_key: Key = wire.slice_ref(request.start);
568        let end_key: Key = wire.slice_ref(request.end);
569        let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
570        let batch_size = (request.batch_size as usize).min(RANGE_STREAM_MAX_FRAME_ROWS);
571        let forward = match parse_range_traversal_direction(request.mode) {
572            Ok(RangeTraversalDirection::Forward) => true,
573            Ok(RangeTraversalDirection::Reverse) => false,
574            Err(e) => return Err(ConnectError::internal(format!("traversal mode: {e:?}"))),
575        };
576        Ok(connectrpc::Response::stream(
577            range_stream(
578                self.state.query.clone(),
579                RangeStreamRequest {
580                    start_key,
581                    end_key,
582                    limit,
583                    batch_size,
584                    forward,
585                    sequence_number,
586                },
587            )
588            .await?,
589        ))
590    }
591
    /// Reduces a key range on the server and returns the aggregated results.
    ///
    /// Validates the request, enforces the optional minimum sequence number, converts the
    /// reduce parameters to their domain form, and then scans the whole range forward in
    /// batches of `REDUCE_SCAN_BATCH_SIZE` rows, feeding every row to a `RangeReducer`.
    /// The response holds the reducer's ungrouped results, its groups, and a detail built
    /// from the sequence number observed at the consistency check and the scan's final
    /// `extra`. Parameter conversion failures go through `validate::reduce_params_error`;
    /// scan and reducer failures are reported as internal errors.
    async fn reduce(
        &self,
        _ctx: Context,
        request: buffa::view::OwnedView<
            exoware_proto::store::query::v1::ReduceRequestView<'static>,
        >,
    ) -> connectrpc::ServiceResult<ReduceResponse> {
        validate::validate_reduce_request(&request)?;
        let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
        let wire = request.bytes();
        let start_key: Key = wire.slice_ref(request.start);
        let end_key: Key = wire.slice_ref(request.end);
        let domain = to_domain_reduce_request_from_view(&request.params)
            .map_err(validate::reduce_params_error)?;

        // Unbounded forward scan: the reducer has to see every row in the range.
        let mut rows = self
            .state
            .query
            .range_scan(start_key, end_key, usize::MAX, true)
            .await
            .map_err(ConnectError::internal)?;

        let mut reducer = RangeReducer::new(&domain)
            .map_err(|e: crate::RangeError| ConnectError::internal(e.to_string()))?;
        // `extra` of the most recent non-empty batch, used if the terminating empty batch
        // carries none of its own.
        let mut latest_extra = None;
        let final_extra = loop {
            let batch = rows
                .next_batch(REDUCE_SCAN_BATCH_SIZE)
                .await
                .map_err(ConnectError::internal)?;
            if batch.rows.is_empty() {
                // An empty batch ends the scan. Prefer its own extra; otherwise fall back to
                // the last non-empty batch's extra, or the default if there was none.
                break if batch.extra.is_empty() {
                    latest_extra.unwrap_or_default()
                } else {
                    batch.extra
                };
            }
            latest_extra = Some(batch.extra);
            for (key, value) in batch.rows {
                reducer
                    .update(&key, &value)
                    .map_err(|e: crate::RangeError| ConnectError::internal(e.to_string()))?;
            }
        };
        let response = reducer.finish();

        let detail = query_detail(token, final_extra);

        connectrpc::Response::ok(ReduceResponse {
            results: response
                .results
                .into_iter()
                .map(|result| exoware_proto::query::RangeReduceResult {
                    value: result.value.map(to_proto_reduced_value).into(),
                    ..Default::default()
                })
                .collect(),
            groups: response
                .groups
                .into_iter()
                .map(|group| {
                    // One presence flag per group value, computed before the values are
                    // moved into their proto form below.
                    let group_values_present =
                        group.group_values.iter().map(Option::is_some).collect();
                    exoware_proto::query::RangeReduceGroup {
                        group_values: group
                            .group_values
                            .into_iter()
                            .map(to_proto_optional_reduced_value)
                            .collect(),
                        group_values_present,
                        results: group
                            .results
                            .into_iter()
                            .map(|result| exoware_proto::query::RangeReduceResult {
                                value: result.value.map(to_proto_reduced_value).into(),
                                ..Default::default()
                            })
                            .collect(),
                        ..Default::default()
                    }
                })
                .collect(),
            detail: Some(detail).into(),
            ..Default::default()
        })
    }
678}
679
/// Compact service handler: implements the compact API (`prune`) on top of a
/// [`CompactState`].
pub struct CompactConnect<P> {
    /// Backend that applies prune policies.
    state: CompactState<P>,
}
683
684impl<P> Clone for CompactConnect<P> {
685    fn clone(&self) -> Self {
686        Self {
687            state: self.state.clone(),
688        }
689    }
690}
691
692impl<P> CompactConnect<P>
693where
694    P: Prune,
695{
696    pub fn new(state: impl Into<CompactState<P>>) -> Self {
697        Self {
698            state: state.into(),
699        }
700    }
701}
702
703impl<P> CompactApi for CompactConnect<P>
704where
705    P: Prune,
706{
707    async fn prune(
708        &self,
709        _ctx: Context,
710        request: buffa::view::OwnedView<
711            exoware_proto::store::compact::v1::PruneRequestView<'static>,
712        >,
713    ) -> connectrpc::ServiceResult<PruneResponse> {
714        validate::validate_prune_request(&request)?;
715        let document = exoware_proto::parse_and_validate_policy_document(&request)
716            .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
717
718        self.state
719            .prune
720            .apply_prune_policies(document)
721            .await
722            .map_err(ConnectError::internal)?;
723        connectrpc::Response::ok(PruneResponse::default())
724    }
725}
726
/// Stream service handler: implements the stream API on top of a [`StreamState`].
pub struct StreamConnect<B> {
    /// Batch log and notifier backing the stream service.
    state: StreamState<B>,
}
730
731impl<B> Clone for StreamConnect<B> {
732    fn clone(&self) -> Self {
733        Self {
734            state: self.state.clone(),
735        }
736    }
737}
738
739impl<B> StreamConnect<B>
740where
741    B: Log,
742{
743    pub fn new(state: impl Into<StreamState<B>>) -> Self {
744        Self {
745            state: state.into(),
746        }
747    }
748
749    fn batch_evicted_connect_error(oldest_retained: Option<u64>) -> ConnectError {
750        let mut metadata = HashMap::new();
751        if let Some(v) = oldest_retained {
752            metadata.insert(
753                crate::stream::METADATA_OLDEST_RETAINED.to_string(),
754                v.to_string(),
755            );
756        }
757        with_error_info_detail(
758            ConnectError::out_of_range("batch has been evicted from the log"),
759            ErrorInfo {
760                reason: crate::stream::REASON_BATCH_EVICTED.to_string(),
761                domain: crate::stream::STREAM_ERROR_DOMAIN.to_string(),
762                metadata,
763                ..Default::default()
764            },
765        )
766    }
767
768    fn batch_evicted_error(&self, oldest_retained: Option<u64>) -> ConnectError {
769        Self::batch_evicted_connect_error(oldest_retained)
770    }
771
772    fn batch_not_found_error(&self) -> ConnectError {
773        with_error_info_detail(
774            ConnectError::not_found("batch not found"),
775            ErrorInfo {
776                reason: crate::stream::REASON_BATCH_NOT_FOUND.to_string(),
777                domain: crate::stream::STREAM_ERROR_DOMAIN.to_string(),
778                ..Default::default()
779            },
780        )
781    }
782}
783
784fn filtered_subscribe_response(
785    seq: u64,
786    kvs: &[(Bytes, Bytes)],
787    matchers: &crate::stream::CompiledMatchers,
788) -> Option<SubscribeResponse> {
789    let entries = crate::stream::apply_filter(matchers, kvs);
790    (!entries.is_empty()).then_some(SubscribeResponse {
791        sequence_number: seq,
792        entries,
793        ..Default::default()
794    })
795}
796
/// Cursor over the historical batches a subscription replays before going live.
struct ReplayState {
    /// Sequence number of the next batch to replay.
    next_sequence: u64,
    /// Last sequence number to replay (inclusive); replay ends once `next_sequence`
    /// exceeds it.
    bound: u64,
    /// Already-loaded contents of the first replay batch. It is taken on the first replay
    /// step, which then skips the `get_batch` call for that sequence number.
    first_batch: Option<Vec<(Bytes, Bytes)>>,
}
802
/// Outcome of one replay step of a subscription.
enum ReplayProgress {
    /// The replayed batch had entries matching the filter; deliver this frame.
    Frame(SubscribeResponse),
    /// The batch was consumed but nothing matched; take another step.
    Advanced,
    /// There is no replay state left; continue with live delivery.
    Done,
}
808
/// Outcome of one live-delivery step of a subscription.
enum LiveProgress {
    /// The batch had entries matching the filter; deliver this frame.
    Frame(SubscribeResponse),
    /// The batch was consumed but nothing matched; take another step.
    Advanced,
    /// The next sequence number is beyond the notifier's current one; wait for a wake-up.
    NeedWait,
}
814
/// Per-subscription cursor that is unfolded into the `Subscribe` response stream.
struct SubscriptionState<B> {
    /// Batch log and notifier the subscription reads from.
    state: StreamState<B>,
    /// Compiled filter applied to every batch.
    matchers: crate::stream::CompiledMatchers,
    /// Pending replay of historical batches; `None` when there is none or once it finished.
    replay: Option<ReplayState>,
    /// Sequence number of the next batch to deliver in the live phase.
    next_live_sequence: u64,
    /// Notification handle awaited while the subscription is caught up.
    live_notify: Arc<Notify>,
    /// Set after an error has been yielded so that the stream ends on the next poll.
    terminated: bool,
}
823
824impl<B> SubscriptionState<B>
825where
826    B: Log,
827{
828    fn new(
829        state: StreamState<B>,
830        matchers: crate::stream::CompiledMatchers,
831        replay: Option<ReplayState>,
832        next_live_sequence: u64,
833        live_notify: Arc<Notify>,
834    ) -> Self {
835        Self {
836            state,
837            matchers,
838            replay,
839            next_live_sequence,
840            live_notify,
841            terminated: false,
842        }
843    }
844
    /// Turns the cursor into the response stream.
    ///
    /// Each poll loops until there is something to yield: replay steps run first while
    /// replay state exists, then live steps. Batches that match nothing are skipped without
    /// yielding. The first error is yielded as an item and the stream ends on the following
    /// poll; otherwise the stream keeps waiting for new batches and does not end by itself.
    fn into_stream(
        self,
    ) -> Pin<Box<dyn Stream<Item = Result<SubscribeResponse, ConnectError>> + Send>> {
        Box::pin(stream_util::unfold(self, |mut state| async move {
            loop {
                // An error was yielded on the previous poll; end the stream.
                if state.terminated {
                    return None;
                }

                if state.replay.is_some() {
                    match state.next_replay_frame().await {
                        Ok(ReplayProgress::Frame(frame)) => return Some((Ok(frame), state)),
                        Ok(ReplayProgress::Advanced) => continue,
                        // Replay finished; fall through to live delivery below.
                        Ok(ReplayProgress::Done) => {}
                        Err(err) => {
                            state.terminated = true;
                            return Some((Err(err), state));
                        }
                    }
                }

                match state.next_live_frame().await {
                    Ok(LiveProgress::Frame(frame)) => return Some((Ok(frame), state)),
                    Ok(LiveProgress::Advanced) => continue,
                    Ok(LiveProgress::NeedWait) => {
                        // Caught up: wait, then go around the loop and try again.
                        state.wait_for_live().await;
                    }
                    Err(err) => {
                        state.terminated = true;
                        return Some((Err(err), state));
                    }
                }
            }
        }))
    }
880
    /// Performs one replay step.
    ///
    /// Loads the batch at the replay cursor (using the pre-loaded first batch when it is
    /// still present), advances the cursor, and drops the replay state once the cursor has
    /// passed `bound`. Returns `Done` when there is no replay state, `Frame` when entries
    /// matched the filter, and `Advanced` when none did.
    ///
    /// # Errors
    ///
    /// Log failures are reported as internal errors. A batch missing from the log yields
    /// the batch-evicted error, annotated with the oldest retained batch when known.
    async fn next_replay_frame(&mut self) -> Result<ReplayProgress, ConnectError> {
        let Some(replay) = &mut self.replay else {
            return Ok(ReplayProgress::Done);
        };
        let seq = replay.next_sequence;
        let kvs = if let Some(first_batch) = replay.first_batch.take() {
            Some(first_batch)
        } else {
            self.state
                .log
                .get_batch(seq)
                .await
                .map_err(ConnectError::internal)?
        };
        // The cursor moves on whether or not the batch was found; a `get_batch` error has
        // already returned above without moving it.
        replay.next_sequence += 1;
        if replay.next_sequence > replay.bound {
            self.replay = None;
        }
        let Some(kvs) = kvs else {
            let oldest = self
                .state
                .log
                .oldest_retained_batch()
                .await
                .map_err(ConnectError::internal)?;
            return Err(StreamConnect::<B>::batch_evicted_connect_error(oldest));
        };
        Ok(
            match filtered_subscribe_response(seq, &kvs, &self.matchers) {
                Some(frame) => ReplayProgress::Frame(frame),
                None => ReplayProgress::Advanced,
            },
        )
    }
915
916    async fn next_live_frame(&mut self) -> Result<LiveProgress, ConnectError> {
917        let current = self.state.notifier.current_sequence();
918        if self.next_live_sequence > current {
919            return Ok(LiveProgress::NeedWait);
920        }
921        let seq = self.next_live_sequence;
922        self.next_live_sequence += 1;
923        let kvs = self
924            .state
925            .log
926            .get_batch(seq)
927            .await
928            .map_err(ConnectError::internal)?;
929        let Some(kvs) = kvs else {
930            let oldest = self
931                .state
932                .log
933                .oldest_retained_batch()
934                .await
935                .map_err(ConnectError::internal)?;
936            return Err(StreamConnect::<B>::batch_evicted_connect_error(oldest));
937        };
938        Ok(
939            match filtered_subscribe_response(seq, &kvs, &self.matchers) {
940                Some(frame) => LiveProgress::Frame(frame),
941                None => LiveProgress::Advanced,
942            },
943        )
944    }
945
    /// Waits until the batch at `next_live_sequence` may be available.
    ///
    /// Returns immediately when the notifier's current sequence number already covers the
    /// cursor; otherwise awaits `live_notify`. A return does not guarantee that the batch
    /// is available — the caller loops and checks again.
    async fn wait_for_live(&self) {
        // Fast path: nothing to wait for.
        if self.next_live_sequence <= self.state.notifier.current_sequence() {
            return;
        }
        // The notification future is created before the frontier is checked again,
        // presumably so that an advance landing between the check and the await is not
        // missed. NOTE(review): this relies on tokio `Notify` semantics and on how the
        // notifier signals `live_notify`, which is not visible here — confirm.
        let notified = self.live_notify.clone().notified_owned();
        if self.next_live_sequence <= self.state.notifier.current_sequence() {
            return;
        }
        notified.await;
    }
956}
957
958fn domain_filter_from_subscribe_view(
959    req: &SubscribeRequestView<'_>,
960) -> Result<StreamFilter, ConnectError> {
961    let mut match_keys = Vec::with_capacity(req.match_keys.len());
962    for mk in req.match_keys.iter() {
963        let reserved_bits = u8::try_from(mk.reserved_bits).map_err(|_| {
964            ConnectError::invalid_argument(format!(
965                "match_key.reserved_bits {} does not fit in u8",
966                mk.reserved_bits
967            ))
968        })?;
969        let prefix = u16::try_from(mk.prefix).map_err(|_| {
970            ConnectError::invalid_argument(format!(
971                "match_key.prefix {} does not fit in u16",
972                mk.prefix
973            ))
974        })?;
975        match_keys.push(MatchKey {
976            reserved_bits,
977            prefix,
978            payload_regex: exoware_sdk::kv_codec::Utf8::from(mk.payload_regex),
979        });
980    }
981    let mut value_filters = Vec::with_capacity(req.value_filters.len());
982    for vf in req.value_filters.iter() {
983        value_filters.push(match vf.kind {
984            Some(ProtoBytesFilterKindView::Exact(bytes)) => {
985                BytesFilter::Exact(Bytes::copy_from_slice(bytes))
986            }
987            Some(ProtoBytesFilterKindView::Prefix(bytes)) => {
988                BytesFilter::Prefix(Bytes::copy_from_slice(bytes))
989            }
990            Some(ProtoBytesFilterKindView::Regex(pattern)) => {
991                BytesFilter::Regex(pattern.to_string())
992            }
993            None => {
994                return Err(ConnectError::invalid_argument(
995                    "each value_filter must set exactly one of exact, prefix, or regex",
996                ))
997            }
998        });
999    }
1000    Ok(StreamFilter {
1001        match_keys,
1002        value_filters,
1003    })
1004}
1005
1006impl<B> StreamApi for StreamConnect<B>
1007where
1008    B: Log,
1009{
1010    async fn subscribe(
1011        &self,
1012        _ctx: Context,
1013        request: buffa::view::OwnedView<SubscribeRequestView<'static>>,
1014    ) -> connectrpc::ServiceResult<connectrpc::ServiceStream<SubscribeResponse>> {
1015        let filter = domain_filter_from_subscribe_view(&request)?;
1016        let since = request.since_sequence_number;
1017
1018        // Snapshot the current published frontier and subscribe for future
1019        // wakeups. The stream then walks the log by sequence cursor, so
1020        // live delivery is paced by client reads instead of server-side
1021        // buffering.
1022        let matchers = crate::stream::compile_matchers(&filter)?;
1023        let subscription = self.state.notifier.subscribe();
1024        let replay_bound = subscription.current_sequence;
1025        let live_notify = subscription.notify;
1026
1027        // Optional replay. Validate the starting batch eagerly so an
1028        // already-evicted cursor fails the RPC immediately; later replay holes
1029        // are surfaced on the stream itself so callers reconnect from a safe
1030        // point instead of silently continuing.
1031        let replay = match since {
1032            Some(s) if s <= replay_bound && s > 0 => {
1033                let first_batch = self
1034                    .state
1035                    .log
1036                    .get_batch(s)
1037                    .await
1038                    .map_err(ConnectError::internal)?;
1039                let Some(first_batch) = first_batch else {
1040                    let oldest = self
1041                        .state
1042                        .log
1043                        .oldest_retained_batch()
1044                        .await
1045                        .map_err(ConnectError::internal)?;
1046                    return Err(self.batch_evicted_error(oldest));
1047                };
1048                Some(ReplayState {
1049                    next_sequence: s,
1050                    bound: replay_bound,
1051                    first_batch: Some(first_batch),
1052                })
1053            }
1054            _ => None,
1055        };
1056        let next_live_sequence = replay_bound.saturating_add(1);
1057
1058        Ok(connectrpc::Response::stream(
1059            SubscriptionState::new(
1060                self.state.clone(),
1061                matchers,
1062                replay,
1063                next_live_sequence,
1064                live_notify,
1065            )
1066            .into_stream(),
1067        ))
1068    }
1069
1070    async fn get(
1071        &self,
1072        _ctx: Context,
1073        request: buffa::view::OwnedView<GetRequestView<'static>>,
1074    ) -> connectrpc::ServiceResult<StreamGetResponse> {
1075        let seq = request.sequence_number;
1076        match self
1077            .state
1078            .log
1079            .get_batch(seq)
1080            .await
1081            .map_err(ConnectError::internal)?
1082        {
1083            Some(kvs) => {
1084                let entries = kvs
1085                    .into_iter()
1086                    .map(|(k, v)| KvEntry {
1087                        key: k.to_vec(),
1088                        value: v,
1089                        ..Default::default()
1090                    })
1091                    .collect();
1092                connectrpc::Response::ok(StreamGetResponse {
1093                    sequence_number: seq,
1094                    entries,
1095                    ..Default::default()
1096                })
1097            }
1098            None => {
1099                let current = self.state.log.current_sequence();
1100                // Distinguish "never existed" (seq > current) vs "evicted".
1101                if seq > current {
1102                    Err(self.batch_not_found_error())
1103                } else {
1104                    let oldest = self
1105                        .state
1106                        .log
1107                        .oldest_retained_batch()
1108                        .await
1109                        .map_err(ConnectError::internal)?;
1110                    Err(self.batch_evicted_error(oldest))
1111                }
1112            }
1113        }
1114    }
1115}
1116
1117fn connect_limits() -> Limits {
1118    Limits::default()
1119        .max_request_body_size(MAX_CONNECTRPC_BODY_BYTES)
1120        .max_message_size(MAX_CONNECTRPC_BODY_BYTES)
1121}
1122
1123pub(crate) type IngestService<I> = ConnectRpcService<IngestServiceServer<IngestConnect<I>>>;
1124pub(crate) type QueryService<Q> = ConnectRpcService<QueryServiceServer<QueryConnect<Q>>>;
1125pub(crate) type CompactService<P> = ConnectRpcService<CompactServiceServer<CompactConnect<P>>>;
1126pub(crate) type StreamService<B> = ConnectRpcService<StreamServiceServer<StreamConnect<B>>>;
1127pub(crate) type QueryStack<Q, B> = ConnectRpcService<
1128    Chain<QueryServiceServer<QueryConnect<Q>>, StreamServiceServer<StreamConnect<B>>>,
1129>;
1130pub(crate) type ConnectStack<I, Q, P, B> = ConnectRpcService<
1131    Chain<
1132        IngestServiceServer<IngestConnect<I>>,
1133        Chain<
1134            QueryServiceServer<QueryConnect<Q>>,
1135            Chain<CompactServiceServer<CompactConnect<P>>, StreamServiceServer<StreamConnect<B>>>,
1136        >,
1137    >,
1138>;
1139
1140fn ingest_server<I>(state: IngestState<I>) -> IngestServiceServer<IngestConnect<I>>
1141where
1142    I: Ingest,
1143{
1144    IngestServiceServer::new(IngestConnect::new(state))
1145}
1146
1147fn query_server<Q>(state: QueryState<Q>) -> QueryServiceServer<QueryConnect<Q>>
1148where
1149    Q: Query,
1150{
1151    QueryServiceServer::new(QueryConnect::new(state))
1152}
1153
1154fn compact_server<P>(state: CompactState<P>) -> CompactServiceServer<CompactConnect<P>>
1155where
1156    P: Prune,
1157{
1158    CompactServiceServer::new(CompactConnect::new(state))
1159}
1160
1161fn stream_server<B>(state: StreamState<B>) -> StreamServiceServer<StreamConnect<B>>
1162where
1163    B: Log,
1164{
1165    StreamServiceServer::new(StreamConnect::new(state))
1166}
1167
1168pub fn ingest_service<I>(state: IngestState<I>) -> IngestService<I>
1169where
1170    I: Ingest,
1171{
1172    ConnectRpcService::new(ingest_server(state))
1173        .with_limits(connect_limits())
1174        .with_compression(connect_compression_registry())
1175}
1176
1177pub fn query_service<Q>(state: QueryState<Q>) -> QueryService<Q>
1178where
1179    Q: Query,
1180{
1181    ConnectRpcService::new(query_server(state))
1182        .with_limits(connect_limits())
1183        .with_compression(connect_compression_registry())
1184}
1185
1186pub fn compact_service<P>(state: CompactState<P>) -> CompactService<P>
1187where
1188    P: Prune,
1189{
1190    ConnectRpcService::new(compact_server(state))
1191        .with_limits(connect_limits())
1192        .with_compression(connect_compression_registry())
1193}
1194
1195pub fn stream_service<B>(state: StreamState<B>) -> StreamService<B>
1196where
1197    B: Log,
1198{
1199    ConnectRpcService::new(stream_server(state))
1200        .with_limits(connect_limits())
1201        .with_compression(connect_compression_registry())
1202}
1203
1204pub fn query_stack<Q, B>(
1205    query_state: QueryState<Q>,
1206    stream_state: StreamState<B>,
1207) -> QueryStack<Q, B>
1208where
1209    Q: Query,
1210    B: Log,
1211{
1212    ConnectRpcService::new(Chain(
1213        query_server(query_state),
1214        stream_server(stream_state),
1215    ))
1216    .with_limits(connect_limits())
1217    .with_compression(connect_compression_registry())
1218}
1219
1220pub fn connect_stack<E>(state: AppState<E>) -> ConnectStack<E, E, E, E>
1221where
1222    E: StoreEngine,
1223{
1224    ConnectRpcService::new(Chain(
1225        ingest_server(state.clone().into()),
1226        Chain(
1227            query_server(state.clone().into()),
1228            Chain(
1229                compact_server(state.clone().into()),
1230                stream_server(state.into()),
1231            ),
1232        ),
1233    ))
1234    .with_limits(connect_limits())
1235    .with_compression(connect_compression_registry())
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240    use super::*;
1241    use std::collections::{BTreeMap, HashMap};
1242    use std::sync::atomic::AtomicU64;
1243    use std::sync::Mutex;
1244    use std::time::Duration;
1245
1246    use buffa::Message;
1247    use exoware_proto::store::common::v1::MatchKey as ProtoMatchKey;
1248    use exoware_proto::store::compact::v1::{
1249        policy, policy_retain, Policy as ProtoPolicy, PolicyRetain, PruneRequest, PruneRequestView,
1250        RetainKeepLatest,
1251    };
1252    use exoware_proto::store::stream::v1::{SubscribeRequest, SubscribeRequestView};
1253    use exoware_sdk::keys::KeyCodec;
1254    use exoware_sdk::kv_codec::KvReducedValue;
1255    use exoware_sdk::prune_policy::{PrunePolicyDocument, PRUNE_POLICY_DOCUMENT_VERSION};
1256    use exoware_sdk::{decode_connect_error, to_domain_reduce_response};
1257    use futures::StreamExt;
1258
1259    use crate::{
1260        Ingest, Log, Prune, Query, QueryExtra, RangeScan, RangeScanBatch, Sequence,
1261        StreamNotification, StreamNotifier,
1262    };
1263
1264    const TEST_RESERVED_BITS: u8 = 4;
1265    const TEST_PREFIX: u16 = 1;
1266
1267    #[derive(Clone)]
1268    struct PublishDuringReplay {
1269        hub: Arc<StreamHub>,
1270        sequence_offset: u64,
1271        kvs: Vec<(Bytes, Bytes)>,
1272    }
1273
1274    #[derive(Default)]
1275    struct FakeEngineState {
1276        current_sequence: u64,
1277        batches: BTreeMap<u64, Option<Vec<(Bytes, Bytes)>>>,
1278        oldest_retained: Option<u64>,
1279        publish_on_get_batch: Option<PublishDuringReplay>,
1280        range_rows: Vec<(Bytes, Bytes)>,
1281        range_eof_extra: QueryExtra,
1282        range_next_count: usize,
1283        query_extra: QueryExtra,
1284        prune_policy_counts: Vec<usize>,
1285    }
1286
1287    #[derive(Default)]
1288    struct FakeEngine {
1289        state: Arc<Mutex<FakeEngineState>>,
1290    }
1291
1292    struct IteratorRangeScan {
1293        iter: Box<dyn Iterator<Item = Result<(Bytes, Bytes), String>> + Send + 'static>,
1294        eof_extra: Option<QueryExtra>,
1295    }
1296
1297    impl RangeScan for IteratorRangeScan {
1298        async fn next_batch(&mut self, max_items: usize) -> Result<RangeScanBatch, String> {
1299            let mut rows = Vec::new();
1300            for row in self.iter.by_ref().take(max_items) {
1301                rows.push(row?);
1302            }
1303            let extra = if rows.is_empty() {
1304                self.eof_extra.take().unwrap_or_default()
1305            } else {
1306                QueryExtra::default()
1307            };
1308            Ok(RangeScanBatch { rows, extra })
1309        }
1310    }
1311
1312    fn range_scan_from_iter<I>(iter: I) -> IteratorRangeScan
1313    where
1314        I: Iterator<Item = Result<(Bytes, Bytes), String>> + Send + 'static,
1315    {
1316        range_scan_from_iter_with_eof_extra(iter, QueryExtra::default())
1317    }
1318
1319    fn range_scan_from_iter_with_eof_extra<I>(iter: I, eof_extra: QueryExtra) -> IteratorRangeScan
1320    where
1321        I: Iterator<Item = Result<(Bytes, Bytes), String>> + Send + 'static,
1322    {
1323        IteratorRangeScan {
1324            iter: Box::new(iter),
1325            eof_extra: Some(eof_extra),
1326        }
1327    }
1328
1329    impl FakeEngine {
1330        fn set_current_sequence(&self, sequence_number: u64) {
1331            self.state.lock().expect("lock").current_sequence = sequence_number;
1332        }
1333
1334        fn set_batch(&self, sequence_number: u64, kvs: Option<Vec<(Bytes, Bytes)>>) {
1335            self.state
1336                .lock()
1337                .expect("lock")
1338                .batches
1339                .insert(sequence_number, kvs);
1340        }
1341
1342        fn set_oldest_retained(&self, oldest_retained: Option<u64>) {
1343            self.state.lock().expect("lock").oldest_retained = oldest_retained;
1344        }
1345
1346        fn publish_live(
1347            &self,
1348            hub: Arc<StreamHub>,
1349            sequence_number: u64,
1350            kvs: Vec<(Bytes, Bytes)>,
1351        ) {
1352            let mut state = self.state.lock().expect("lock");
1353            state.current_sequence = state.current_sequence.max(sequence_number);
1354            state.batches.insert(sequence_number, Some(kvs.clone()));
1355            drop(state);
1356            hub.publish(sequence_number);
1357        }
1358
1359        fn publish_on_every_get_batch(
1360            &self,
1361            hub: Arc<StreamHub>,
1362            sequence_offset: u64,
1363            kvs: Vec<(Bytes, Bytes)>,
1364        ) {
1365            self.state.lock().expect("lock").publish_on_get_batch = Some(PublishDuringReplay {
1366                hub,
1367                sequence_offset,
1368                kvs,
1369            });
1370        }
1371
1372        fn set_range_rows(&self, rows: Vec<(Bytes, Bytes)>) {
1373            self.state.lock().expect("lock").range_rows = rows;
1374        }
1375
1376        fn set_range_eof_extra(&self, extra: QueryExtra) {
1377            self.state.lock().expect("lock").range_eof_extra = extra;
1378        }
1379
1380        fn range_next_count(&self) -> usize {
1381            self.state.lock().expect("lock").range_next_count
1382        }
1383
1384        fn set_query_extra(&self, extra: QueryExtra) {
1385            self.state.lock().expect("lock").query_extra = extra;
1386        }
1387    }
1388
1389    impl Sequence for FakeEngine {
1390        fn current_sequence(&self) -> u64 {
1391            self.state.lock().expect("lock").current_sequence
1392        }
1393    }
1394
1395    impl Ingest for FakeEngine {
1396        async fn put_batch(&self, kvs: Vec<(Bytes, Bytes)>) -> Result<u64, String> {
1397            let mut state = self.state.lock().map_err(|e| e.to_string())?;
1398            state.current_sequence += 1;
1399            let seq = state.current_sequence;
1400            state.batches.insert(seq, Some(kvs));
1401            Ok(seq)
1402        }
1403    }
1404
1405    impl Query for FakeEngine {
1406        type RangeScan = IteratorRangeScan;
1407
1408        async fn get(&self, _key: Bytes) -> Result<(Option<Bytes>, QueryExtra), String> {
1409            self.state
1410                .lock()
1411                .map(|state| (None, state.query_extra.clone()))
1412                .map_err(|e| e.to_string())
1413        }
1414
1415        async fn get_many(
1416            &self,
1417            keys: Vec<Bytes>,
1418        ) -> Result<(Vec<(Bytes, Option<Bytes>)>, QueryExtra), String> {
1419            self.state
1420                .lock()
1421                .map(|state| {
1422                    let entries = keys.into_iter().map(|key| (key, None)).collect();
1423                    (entries, state.query_extra.clone())
1424                })
1425                .map_err(|e| e.to_string())
1426        }
1427
1428        async fn range_scan(
1429            &self,
1430            _start: Bytes,
1431            _end: Bytes,
1432            _limit: usize,
1433            _forward: bool,
1434        ) -> Result<Self::RangeScan, String> {
1435            let result = self
1436                .state
1437                .lock()
1438                .map(|state| (state.range_rows.clone(), state.range_eof_extra.clone()))
1439                .map_err(|e| e.to_string());
1440            let state = self.state.clone();
1441            let cursor = result.map(|(rows, eof_extra)| {
1442                range_scan_from_iter_with_eof_extra(
1443                    rows.into_iter().map(move |row| {
1444                        state.lock().expect("lock").range_next_count += 1;
1445                        Ok(row)
1446                    }),
1447                    eof_extra,
1448                )
1449            });
1450            cursor
1451        }
1452    }
1453
1454    impl Prune for FakeEngine {
1455        async fn apply_prune_policies(&self, document: PrunePolicyDocument) -> Result<(), String> {
1456            self.state
1457                .lock()
1458                .map(|mut state| {
1459                    state.prune_policy_counts.push(document.policies.len());
1460                })
1461                .map_err(|e| e.to_string())
1462        }
1463    }
1464
1465    impl Log for FakeEngine {
1466        async fn get_batch(
1467            &self,
1468            sequence_number: u64,
1469        ) -> Result<Option<Vec<(Bytes, Bytes)>>, String> {
1470            let result: Result<_, String> = (|| {
1471                let mut state = self.state.lock().map_err(|e| e.to_string())?;
1472                let publish = state.publish_on_get_batch.clone();
1473                if let Some(publish) = publish.as_ref() {
1474                    let live_sequence = publish.sequence_offset + sequence_number;
1475                    state.current_sequence = state.current_sequence.max(live_sequence);
1476                    state
1477                        .batches
1478                        .entry(live_sequence)
1479                        .or_insert_with(|| Some(publish.kvs.clone()));
1480                }
1481                Ok((
1482                    publish,
1483                    state.batches.get(&sequence_number).cloned().unwrap_or(None),
1484                ))
1485            })();
1486            let (publish, batch) = result?;
1487            if let Some(publish) = publish {
1488                publish
1489                    .hub
1490                    .publish(publish.sequence_offset + sequence_number);
1491            }
1492            Ok(batch)
1493        }
1494
1495        async fn oldest_retained_batch(&self) -> Result<Option<u64>, String> {
1496            self.state
1497                .lock()
1498                .map(|state| state.oldest_retained)
1499                .map_err(|e| e.to_string())
1500        }
1501    }
1502
1503    struct QueryOnlyEngine {
1504        sequence_number: u64,
1505        value: Option<Bytes>,
1506    }
1507
1508    impl Sequence for QueryOnlyEngine {
1509        fn current_sequence(&self) -> u64 {
1510            self.sequence_number
1511        }
1512    }
1513
1514    impl Query for QueryOnlyEngine {
1515        type RangeScan = IteratorRangeScan;
1516
1517        async fn get(&self, _key: Bytes) -> Result<(Option<Bytes>, QueryExtra), String> {
1518            Ok((self.value.clone(), QueryExtra::default()))
1519        }
1520
1521        async fn range_scan(
1522            &self,
1523            _start: Bytes,
1524            _end: Bytes,
1525            _limit: usize,
1526            _forward: bool,
1527        ) -> Result<Self::RangeScan, String> {
1528            Ok(range_scan_from_iter(std::iter::empty()))
1529        }
1530
1531        async fn get_many(
1532            &self,
1533            keys: Vec<Bytes>,
1534        ) -> Result<(Vec<(Bytes, Option<Bytes>)>, QueryExtra), String> {
1535            Ok((
1536                keys.into_iter().map(|key| (key, None)).collect(),
1537                QueryExtra::default(),
1538            ))
1539        }
1540    }
1541
1542    #[derive(Default)]
1543    struct PruneOnlyEngine {
1544        documents: Mutex<Vec<(u32, usize)>>,
1545    }
1546
1547    impl PruneOnlyEngine {
1548        fn applied_count(&self) -> usize {
1549            self.documents.lock().expect("lock").len()
1550        }
1551
1552        fn last_document(&self) -> Option<(u32, usize)> {
1553            self.documents.lock().expect("lock").last().copied()
1554        }
1555    }
1556
1557    impl Prune for PruneOnlyEngine {
1558        async fn apply_prune_policies(&self, document: PrunePolicyDocument) -> Result<(), String> {
1559            self.documents
1560                .lock()
1561                .map(|mut documents| {
1562                    documents.push((document.version, document.policies.len()));
1563                })
1564                .map_err(|e| e.to_string())
1565        }
1566    }
1567
1568    struct ManualNotifier {
1569        current_sequence: AtomicU64,
1570        notify: Arc<Notify>,
1571    }
1572
1573    impl ManualNotifier {
1574        fn new(current_sequence: u64) -> Self {
1575            Self {
1576                current_sequence: AtomicU64::new(current_sequence),
1577                notify: Arc::new(Notify::new()),
1578            }
1579        }
1580    }
1581
1582    impl StreamNotifier for ManualNotifier {
1583        fn subscribe(&self) -> StreamNotification {
1584            StreamNotification {
1585                current_sequence: self.current_sequence.load(Ordering::Acquire),
1586                notify: self.notify.clone(),
1587            }
1588        }
1589
1590        fn current_sequence(&self) -> u64 {
1591            self.current_sequence.load(Ordering::Acquire)
1592        }
1593
1594        fn advance(&self, seq: u64) {
1595            self.current_sequence.fetch_max(seq, Ordering::SeqCst);
1596            self.notify.notify_waiters();
1597        }
1598    }
1599
1600    fn matching_kv(payload: &[u8], value: &[u8]) -> (Bytes, Bytes) {
1601        let codec = KeyCodec::new(TEST_RESERVED_BITS, TEST_PREFIX);
1602        let key = codec.encode(payload).expect("encode key");
1603        (
1604            Bytes::copy_from_slice(key.as_ref()),
1605            Bytes::copy_from_slice(value),
1606        )
1607    }
1608
1609    fn numeric_query_extra(name: &str, value: f64) -> QueryExtra {
1610        HashMap::from([(
1611            name.to_string(),
1612            buffa_types::google::protobuf::Value::from(value),
1613        )])
1614    }
1615
1616    fn subscribe_request_bytes(since_sequence_number: Option<u64>) -> Vec<u8> {
1617        SubscribeRequest {
1618            match_keys: vec![ProtoMatchKey {
1619                reserved_bits: u32::from(TEST_RESERVED_BITS),
1620                prefix: u32::from(TEST_PREFIX),
1621                payload_regex: "(?s).*".to_string(),
1622                ..Default::default()
1623            }],
1624            since_sequence_number,
1625            ..Default::default()
1626        }
1627        .encode_to_vec()
1628    }
1629
1630    fn put_request(
1631        value_len: usize,
1632    ) -> buffa::view::OwnedView<exoware_proto::store::ingest::v1::PutRequestView<'static>> {
1633        let bytes = exoware_proto::ingest::PutRequest {
1634            kvs: vec![exoware_proto::common::KvEntry {
1635                key: b"k".to_vec(),
1636                value: Bytes::from(vec![1u8; value_len]),
1637                ..Default::default()
1638            }],
1639            ..Default::default()
1640        }
1641        .encode_to_vec();
1642        buffa::view::OwnedView::<exoware_proto::store::ingest::v1::PutRequestView<'static>>::decode(
1643            bytes.into(),
1644        )
1645        .expect("decode put request")
1646    }
1647
1648    fn sequence_drop_all_policy() -> ProtoPolicy {
1649        ProtoPolicy {
1650            scope: Some(policy::Scope::Sequence(Box::default())),
1651            retain: Some(PolicyRetain {
1652                kind: Some(policy_retain::Kind::DropAll(Box::default())),
1653                ..Default::default()
1654            })
1655            .into(),
1656            ..Default::default()
1657        }
1658    }
1659
1660    fn sequence_keep_latest_policy(count: u64) -> ProtoPolicy {
1661        ProtoPolicy {
1662            scope: Some(policy::Scope::Sequence(Box::default())),
1663            retain: Some(PolicyRetain {
1664                kind: Some(policy_retain::Kind::KeepLatest(Box::new(
1665                    RetainKeepLatest {
1666                        count,
1667                        ..Default::default()
1668                    },
1669                ))),
1670                ..Default::default()
1671            })
1672            .into(),
1673            ..Default::default()
1674        }
1675    }
1676
1677    fn prune_request(
1678        policies: Vec<ProtoPolicy>,
1679    ) -> buffa::view::OwnedView<PruneRequestView<'static>> {
1680        let bytes = PruneRequest {
1681            policies,
1682            ..Default::default()
1683        }
1684        .encode_to_vec();
1685        buffa::view::OwnedView::<PruneRequestView<'static>>::decode(bytes.into())
1686            .expect("decode prune request")
1687    }
1688
1689    async fn subscribe_stream<B>(
1690        connect: &StreamConnect<B>,
1691        since_sequence_number: Option<u64>,
1692    ) -> Result<
1693        Pin<Box<dyn Stream<Item = Result<SubscribeResponse, ConnectError>> + Send>>,
1694        ConnectError,
1695    >
1696    where
1697        B: Log,
1698    {
1699        let bytes = subscribe_request_bytes(since_sequence_number);
1700        let request = buffa::view::OwnedView::<SubscribeRequestView<'static>>::decode(bytes.into())
1701            .expect("decode subscribe request");
1702        Ok(StreamApi::subscribe(connect, Context::default(), request)
1703            .await?
1704            .body)
1705    }
1706
1707    #[tokio::test]
1708    async fn compact_connect_accepts_prune_only_engine() {
1709        let prune = Arc::new(PruneOnlyEngine::default());
1710        let connect = CompactConnect::new(CompactState::new(prune.clone()));
1711        let request = prune_request(vec![sequence_drop_all_policy()]);
1712
1713        CompactApi::prune(&connect, Context::default(), request)
1714            .await
1715            .expect("prune");
1716
1717        assert_eq!(prune.applied_count(), 1);
1718        assert_eq!(
1719            prune.last_document(),
1720            Some((PRUNE_POLICY_DOCUMENT_VERSION, 1))
1721        );
1722    }
1723
1724    #[tokio::test]
1725    async fn compact_rejects_unparseable_policy_before_engine_prune() {
1726        let prune = Arc::new(PruneOnlyEngine::default());
1727        let connect = CompactConnect::new(CompactState::new(prune.clone()));
1728        let invalid_policy = ProtoPolicy {
1729            scope: Some(policy::Scope::Sequence(Box::default())),
1730            ..Default::default()
1731        };
1732        let request = prune_request(vec![invalid_policy]);
1733
1734        let err = CompactApi::prune(&connect, Context::default(), request)
1735            .await
1736            .expect_err("invalid prune");
1737
1738        assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
1739        assert_eq!(prune.applied_count(), 0);
1740    }
1741
1742    #[tokio::test]
1743    async fn compact_rejects_invalid_policy_before_engine_prune() {
1744        let prune = Arc::new(PruneOnlyEngine::default());
1745        let connect = CompactConnect::new(CompactState::new(prune.clone()));
1746        let request = prune_request(vec![sequence_keep_latest_policy(0)]);
1747
1748        let err = CompactApi::prune(&connect, Context::default(), request)
1749            .await
1750            .expect_err("invalid prune");
1751
1752        assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
1753        assert_eq!(prune.applied_count(), 0);
1754    }
1755
1756    #[tokio::test]
1757    async fn query_connect_accepts_query_only_engine() {
1758        let query = Arc::new(QueryOnlyEngine {
1759            sequence_number: 9,
1760            value: Some(Bytes::from_static(b"value")),
1761        });
1762        let connect = QueryConnect::new(QueryState { query });
1763        let bytes = exoware_proto::query::GetRequest {
1764            key: b"k".to_vec(),
1765            ..Default::default()
1766        }
1767        .encode_to_vec();
1768        let request = buffa::view::OwnedView::<
1769            exoware_proto::store::query::v1::GetRequestView<'static>,
1770        >::decode(bytes.into())
1771        .expect("decode get request");
1772
1773        let response = QueryApi::get(&connect, Context::default(), request)
1774            .await
1775            .expect("get")
1776            .body;
1777        let detail = response.detail.as_option().expect("query detail");
1778
1779        assert_eq!(response.value.as_deref(), Some(b"value".as_slice()));
1780        assert_eq!(detail.sequence_number, 9);
1781    }
1782
1783    #[tokio::test]
1784    async fn get_includes_engine_query_extra() {
1785        let engine = Arc::new(FakeEngine::default());
1786        engine.set_current_sequence(5);
1787        engine.set_query_extra(HashMap::from([(
1788            "scanned_bytes".to_string(),
1789            buffa_types::google::protobuf::Value::from(123.0),
1790        )]));
1791        let connect = QueryConnect::new(AppState::new(engine));
1792        let bytes = exoware_proto::query::GetRequest {
1793            key: b"k".to_vec(),
1794            ..Default::default()
1795        }
1796        .encode_to_vec();
1797        let request = buffa::view::OwnedView::<
1798            exoware_proto::store::query::v1::GetRequestView<'static>,
1799        >::decode(bytes.into())
1800        .expect("decode get request");
1801
1802        let response = QueryApi::get(&connect, Context::default(), request)
1803            .await
1804            .expect("get")
1805            .body;
1806        let detail = response.detail.as_option().expect("query detail");
1807
1808        assert_eq!(detail.sequence_number, 5);
1809        assert_eq!(
1810            detail
1811                .extra
1812                .get("scanned_bytes")
1813                .and_then(|v| v.as_number()),
1814            Some(123.0)
1815        );
1816    }
1817
1818    #[test]
1819    fn split_service_constructors_build_independent_process_surfaces() {
1820        let engine = Arc::new(FakeEngine::default());
1821        let state = AppState::new(engine);
1822
1823        let _ingest = ingest_service(state.clone().into());
1824        let _query = query_service(state.clone().into());
1825        let _compact = compact_service(state.clone().into());
1826        let _stream = stream_service(state.clone().into());
1827        let _query_stack = query_stack(state.clone().into(), state.into());
1828    }
1829
1830    #[tokio::test]
1831    async fn ingest_uses_configured_value_limit() {
1832        let engine = Arc::new(FakeEngine::default());
1833        let state = IngestState::new(engine).with_limits(IngestLimits { max_value_len: 4 });
1834        let connect = IngestConnect::new(state);
1835
1836        let err = IngestApi::put(&connect, Context::default(), put_request(5))
1837            .await
1838            .expect_err("put should reject oversized value");
1839
1840        assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
1841    }
1842
1843    #[tokio::test]
1844    async fn stream_can_be_advanced_by_external_notifier() {
1845        let engine = Arc::new(FakeEngine::default());
1846        let notifier = Arc::new(ManualNotifier::new(0));
1847        let connect = StreamConnect::new(StreamState::new(engine.clone(), notifier.clone()));
1848        let mut stream = subscribe_stream(&connect, None).await.expect("subscribe");
1849
1850        engine.set_current_sequence(1);
1851        engine.set_batch(1, Some(vec![matching_kv(b"hit", b"v1")]));
1852        notifier.advance(1);
1853
1854        let frame = tokio::time::timeout(Duration::from_secs(1), stream.next())
1855            .await
1856            .expect("stream should yield")
1857            .expect("frame should exist")
1858            .expect("frame should be ok");
1859        assert_eq!(frame.sequence_number, 1);
1860        assert_eq!(frame.entries.len(), 1);
1861        assert_eq!(frame.entries[0].value.as_ref(), b"v1");
1862    }
1863
1864    #[tokio::test]
1865    async fn reduce_consumes_range_iterator_and_returns_detail() {
1866        let engine = Arc::new(FakeEngine::default());
1867        engine.set_current_sequence(7);
1868        engine.set_range_rows(vec![
1869            (Bytes::from_static(b"a"), Bytes::from_static(b"xx")),
1870            (Bytes::from_static(b"bb"), Bytes::from_static(b"yyy")),
1871        ]);
1872        let connect = QueryConnect::new(AppState::new(engine.clone()));
1873        let bytes = exoware_proto::query::ReduceRequest {
1874            start: b"a".to_vec(),
1875            end: b"z".to_vec(),
1876            params: Some(exoware_proto::query::ReduceParams {
1877                reducers: vec![exoware_proto::query::RangeReducerSpec {
1878                    op: exoware_proto::query::RangeReduceOp::RANGE_REDUCE_OP_COUNT_ALL.into(),
1879                    ..Default::default()
1880                }],
1881                ..Default::default()
1882            })
1883            .into(),
1884            ..Default::default()
1885        }
1886        .encode_to_vec();
1887        let request = buffa::view::OwnedView::<
1888            exoware_proto::store::query::v1::ReduceRequestView<'static>,
1889        >::decode(bytes.into())
1890        .expect("decode reduce request");
1891
1892        let response = QueryApi::reduce(&connect, Context::default(), request)
1893            .await
1894            .expect("reduce")
1895            .body;
1896        let detail = response.detail.as_option().expect("query detail").clone();
1897        let response = to_domain_reduce_response(response).expect("decode reduce response");
1898
1899        assert_eq!(engine.range_next_count(), 2);
1900        assert_eq!(response.results.len(), 1);
1901        assert_eq!(response.results[0].value, Some(KvReducedValue::UInt64(2)));
1902        assert_eq!(detail.sequence_number, 7);
1903        assert!(detail.extra.is_empty());
1904    }
1905
1906    #[tokio::test]
1907    async fn reduce_uses_eof_query_extra() {
1908        let engine = Arc::new(FakeEngine::default());
1909        engine.set_current_sequence(8);
1910        engine.set_range_rows(vec![
1911            (Bytes::from_static(b"a"), Bytes::from_static(b"xx")),
1912            (Bytes::from_static(b"bb"), Bytes::from_static(b"yyy")),
1913        ]);
1914        engine.set_range_eof_extra(numeric_query_extra("final_rows", 2.0));
1915        let connect = QueryConnect::new(AppState::new(engine));
1916        let bytes = exoware_proto::query::ReduceRequest {
1917            start: b"a".to_vec(),
1918            end: b"z".to_vec(),
1919            params: Some(exoware_proto::query::ReduceParams {
1920                reducers: vec![exoware_proto::query::RangeReducerSpec {
1921                    op: exoware_proto::query::RangeReduceOp::RANGE_REDUCE_OP_COUNT_ALL.into(),
1922                    ..Default::default()
1923                }],
1924                ..Default::default()
1925            })
1926            .into(),
1927            ..Default::default()
1928        }
1929        .encode_to_vec();
1930        let request = buffa::view::OwnedView::<
1931            exoware_proto::store::query::v1::ReduceRequestView<'static>,
1932        >::decode(bytes.into())
1933        .expect("decode reduce request");
1934
1935        let response = QueryApi::reduce(&connect, Context::default(), request)
1936            .await
1937            .expect("reduce")
1938            .body;
1939        let detail = response.detail.as_option().expect("query detail");
1940
1941        assert_eq!(detail.sequence_number, 8);
1942        assert_eq!(
1943            detail.extra.get("final_rows").and_then(|v| v.as_number()),
1944            Some(2.0)
1945        );
1946    }
1947
1948    #[tokio::test]
1949    async fn get_many_populates_detail_on_each_frame() {
1950        let engine = Arc::new(FakeEngine::default());
1951        engine.set_current_sequence(11);
1952        let connect = QueryConnect::new(AppState::new(engine));
1953        let bytes = exoware_proto::query::GetManyRequest {
1954            keys: vec![b"a".to_vec(), b"bb".to_vec(), b"ccc".to_vec()],
1955            batch_size: 2,
1956            ..Default::default()
1957        }
1958        .encode_to_vec();
1959        let request = buffa::view::OwnedView::<
1960            exoware_proto::store::query::v1::GetManyRequestView<'static>,
1961        >::decode(bytes.into())
1962        .expect("decode get_many request");
1963
1964        let mut stream = QueryApi::get_many(&connect, Context::default(), request)
1965            .await
1966            .expect("get_many")
1967            .body;
1968        let mut frame_sizes = Vec::new();
1969        let mut detail_frames = 0usize;
1970        while let Some(frame) = stream.next().await {
1971            let frame = frame.expect("get_many frame");
1972            frame_sizes.push(frame.results.len());
1973            let detail = frame.detail.as_option().expect("query detail");
1974            assert_eq!(detail.sequence_number, 11);
1975            assert!(detail.extra.is_empty());
1976            detail_frames += 1;
1977        }
1978
1979        assert_eq!(frame_sizes, vec![2, 1]);
1980        assert_eq!(detail_frames, 2);
1981    }
1982
1983    #[tokio::test]
1984    async fn range_returns_without_materializing_full_iterator() {
1985        let engine = Arc::new(FakeEngine::default());
1986        engine.set_current_sequence(9);
1987        engine.set_range_rows(
1988            (0..1000)
1989                .map(|i| {
1990                    (
1991                        Bytes::from(format!("key-{i:04}")),
1992                        Bytes::from_static(b"value"),
1993                    )
1994                })
1995                .collect(),
1996        );
1997        let connect = QueryConnect::new(AppState::new(engine.clone()));
1998        let bytes = exoware_proto::query::RangeRequest {
1999            start: b"a".to_vec(),
2000            end: b"z".to_vec(),
2001            limit: Some(1000),
2002            batch_size: 1,
2003            ..Default::default()
2004        }
2005        .encode_to_vec();
2006        let request = buffa::view::OwnedView::<
2007            exoware_proto::store::query::v1::RangeRequestView<'static>,
2008        >::decode(bytes.into())
2009        .expect("decode range request");
2010
2011        let mut stream = QueryApi::range(&connect, Context::default(), request)
2012            .await
2013            .expect("range")
2014            .body;
2015
2016        tokio::time::sleep(Duration::from_millis(50)).await;
2017        let consumed = engine.range_next_count();
2018        assert!(
2019            consumed < 1000,
2020            "range should not consume the full iterator before the response stream is read; consumed {consumed}",
2021        );
2022
2023        let mut rows = 0;
2024        let mut latest_detail = None;
2025        let mut detail_frames = 0usize;
2026        while let Some(frame) = stream.next().await {
2027            let frame = frame.expect("range frame");
2028            rows += frame.results.len();
2029            if let Some(detail) = frame.detail.as_option() {
2030                detail_frames += 1;
2031                latest_detail = Some(detail.clone());
2032            }
2033        }
2034
2035        assert_eq!(rows, 1000);
2036        assert_eq!(detail_frames, 1000);
2037        let detail = latest_detail.expect("query detail");
2038        assert_eq!(detail.sequence_number, 9);
2039        assert!(detail.extra.is_empty());
2040    }
2041
2042    #[tokio::test]
2043    async fn range_emits_eof_query_extra_after_rows() {
2044        let engine = Arc::new(FakeEngine::default());
2045        engine.set_current_sequence(10);
2046        engine.set_range_rows(vec![
2047            (Bytes::from_static(b"a"), Bytes::from_static(b"1")),
2048            (Bytes::from_static(b"b"), Bytes::from_static(b"2")),
2049        ]);
2050        engine.set_range_eof_extra(numeric_query_extra("final_rows", 2.0));
2051        let connect = QueryConnect::new(AppState::new(engine));
2052        let bytes = exoware_proto::query::RangeRequest {
2053            start: b"a".to_vec(),
2054            end: b"z".to_vec(),
2055            limit: Some(2),
2056            batch_size: 2,
2057            ..Default::default()
2058        }
2059        .encode_to_vec();
2060        let request = buffa::view::OwnedView::<
2061            exoware_proto::store::query::v1::RangeRequestView<'static>,
2062        >::decode(bytes.into())
2063        .expect("decode range request");
2064
2065        let mut stream = QueryApi::range(&connect, Context::default(), request)
2066            .await
2067            .expect("range")
2068            .body;
2069        let mut frames = Vec::new();
2070        while let Some(frame) = stream.next().await {
2071            frames.push(frame.expect("range frame"));
2072        }
2073
2074        assert_eq!(frames.len(), 2);
2075        assert_eq!(frames[0].results.len(), 2);
2076        let row_detail = frames[0].detail.as_option().expect("row detail");
2077        assert!(row_detail.extra.is_empty());
2078
2079        assert!(frames[1].results.is_empty());
2080        let final_detail = frames[1].detail.as_option().expect("final detail");
2081        assert_eq!(final_detail.sequence_number, 10);
2082        assert_eq!(
2083            final_detail
2084                .extra
2085                .get("final_rows")
2086                .and_then(|v| v.as_number()),
2087            Some(2.0)
2088        );
2089    }
2090
2091    #[tokio::test]
2092    async fn subscribe_without_replay_reads_the_next_live_batch() {
2093        let engine = Arc::new(FakeEngine::default());
2094        let state = AppState::new(engine.clone());
2095        let connect = StreamConnect::new(state.clone());
2096        let mut stream = subscribe_stream(&connect, None).await.expect("subscribe");
2097        engine.publish_live(state.stream.clone(), 1, vec![matching_kv(b"hit", b"v1")]);
2098        let frame = tokio::time::timeout(Duration::from_secs(1), stream.next())
2099            .await
2100            .expect("stream should yield")
2101            .expect("frame should exist")
2102            .expect("frame should be ok");
2103        assert_eq!(frame.sequence_number, 1);
2104        assert_eq!(frame.entries.len(), 1);
2105        assert_eq!(frame.entries[0].value.as_ref(), b"v1");
2106    }
2107
2108    #[tokio::test]
2109    async fn subscribe_past_end_reads_only_future_live_batches() {
2110        let engine = Arc::new(FakeEngine::default());
2111        engine.set_current_sequence(5);
2112        for seq in 1..=5 {
2113            engine.set_batch(seq, Some(vec![matching_kv(b"seed", b"v")]));
2114        }
2115        let state = AppState::new(engine.clone());
2116        let connect = StreamConnect::new(state.clone());
2117        let mut stream = subscribe_stream(&connect, Some(15))
2118            .await
2119            .expect("subscribe");
2120
2121        assert!(
2122            tokio::time::timeout(Duration::from_millis(200), stream.next())
2123                .await
2124                .is_err(),
2125            "past-end cursor should not replay synthetic or historical frames",
2126        );
2127
2128        engine.publish_live(state.stream.clone(), 6, vec![matching_kv(b"live", b"n")]);
2129        let frame = tokio::time::timeout(Duration::from_secs(1), stream.next())
2130            .await
2131            .expect("stream should yield")
2132            .expect("frame should exist")
2133            .expect("frame should be ok");
2134        assert_eq!(frame.sequence_number, 6);
2135        assert_eq!(frame.entries.len(), 1);
2136        assert_eq!(frame.entries[0].value.as_ref(), b"n");
2137    }
2138
2139    #[tokio::test]
2140    async fn replay_hole_returns_batch_evicted_error_instead_of_empty_frame() {
2141        let engine = Arc::new(FakeEngine::default());
2142        engine.set_current_sequence(3);
2143        engine.set_oldest_retained(Some(2));
2144        engine.set_batch(2, Some(vec![matching_kv(b"replay", b"v2")]));
2145
2146        let state = AppState::new(engine);
2147        let connect = StreamConnect::new(state);
2148        let mut stream = subscribe_stream(&connect, Some(2))
2149            .await
2150            .expect("subscribe");
2151
2152        let first = tokio::time::timeout(Duration::from_secs(1), stream.next())
2153            .await
2154            .expect("stream should yield")
2155            .expect("first replay frame should exist")
2156            .expect("first replay frame should be ok");
2157        assert_eq!(first.sequence_number, 2);
2158        assert_eq!(first.entries.len(), 1);
2159
2160        let err = tokio::time::timeout(Duration::from_secs(1), stream.next())
2161            .await
2162            .expect("stream should yield error")
2163            .expect("error item should exist")
2164            .expect_err("replay hole must be surfaced as an error");
2165        let decoded = decode_connect_error(&err).expect("decode connect error");
2166        assert_eq!(
2167            decoded.error_info.expect("error info").reason,
2168            crate::stream::REASON_BATCH_EVICTED,
2169        );
2170        assert!(
2171            tokio::time::timeout(Duration::from_secs(1), stream.next())
2172                .await
2173                .expect("stream should terminate")
2174                .is_none(),
2175            "stream must terminate after surfacing the replay hole",
2176        );
2177    }
2178
2179    #[tokio::test]
2180    async fn replay_with_live_burst_under_capacity_still_delivers_in_order() {
2181        const REPLAY_BATCHES: u64 = 100;
2182
2183        let engine = Arc::new(FakeEngine::default());
2184        engine.set_current_sequence(REPLAY_BATCHES);
2185        engine.set_oldest_retained(Some(1));
2186        for seq in 1..=REPLAY_BATCHES {
2187            engine.set_batch(seq, Some(vec![matching_kv(b"replay", b"v")]));
2188        }
2189
2190        let state = AppState::new(engine.clone());
2191        engine.publish_on_every_get_batch(
2192            state.stream.clone(),
2193            REPLAY_BATCHES,
2194            vec![matching_kv(b"live", b"tail")],
2195        );
2196
2197        let connect = StreamConnect::new(state);
2198        let mut stream = subscribe_stream(&connect, Some(1))
2199            .await
2200            .expect("subscribe");
2201        let mut sequence_numbers = Vec::with_capacity((REPLAY_BATCHES * 2) as usize);
2202        while sequence_numbers.len() < (REPLAY_BATCHES * 2) as usize {
2203            let frame = tokio::time::timeout(Duration::from_secs(2), stream.next())
2204                .await
2205                .expect("stream should keep yielding")
2206                .expect("frame should exist")
2207                .expect("frame should be ok");
2208            sequence_numbers.push(frame.sequence_number);
2209        }
2210
2211        let expected: Vec<u64> = (1..=(REPLAY_BATCHES * 2)).collect();
2212        assert_eq!(sequence_numbers, expected);
2213    }
2214
2215    #[tokio::test]
2216    async fn replay_large_live_burst_is_paced_by_client_reads() {
2217        const REPLAY_BATCHES: u64 = 300;
2218
2219        let engine = Arc::new(FakeEngine::default());
2220        engine.set_current_sequence(REPLAY_BATCHES);
2221        engine.set_oldest_retained(Some(1));
2222        for seq in 1..=REPLAY_BATCHES {
2223            engine.set_batch(seq, Some(vec![matching_kv(b"replay", b"v")]));
2224        }
2225
2226        let state = AppState::new(engine.clone());
2227        engine.publish_on_every_get_batch(
2228            state.stream.clone(),
2229            REPLAY_BATCHES,
2230            vec![matching_kv(b"live", b"tail")],
2231        );
2232
2233        let connect = StreamConnect::new(state);
2234        let mut stream = subscribe_stream(&connect, Some(1))
2235            .await
2236            .expect("subscribe");
2237        let mut sequence_numbers = Vec::with_capacity((REPLAY_BATCHES * 2) as usize);
2238        while sequence_numbers.len() < (REPLAY_BATCHES * 2) as usize {
2239            let frame = tokio::time::timeout(Duration::from_secs(2), stream.next())
2240                .await
2241                .expect("stream should keep yielding")
2242                .expect("frame should exist")
2243                .expect("frame should be ok");
2244            sequence_numbers.push(frame.sequence_number);
2245        }
2246        let expected: Vec<u64> = (1..=(REPLAY_BATCHES * 2)).collect();
2247        assert_eq!(sequence_numbers, expected);
2248    }
2249}