1#![allow(refining_impl_trait)]
4
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use bytes::Bytes;
11use connectrpc::{Chain, ConnectError, ConnectRpcService, Limits, RequestContext as Context};
12use exoware_proto::common::KvEntry;
13use exoware_proto::compact::{
14 PruneResponse, Service as CompactApi, ServiceServer as CompactServiceServer,
15};
16use exoware_proto::google::rpc::{ErrorInfo, RetryInfo};
17use exoware_proto::ingest::{
18 PutResponse as ProtoPutResponse, Service as IngestApi, ServiceServer as IngestServiceServer,
19};
20use exoware_proto::query::{
21 Detail, GetManyEntry, GetManyFrame, GetResponse, RangeFrame, ReduceResponse,
22 Service as QueryApi, ServiceServer as QueryServiceServer,
23};
24use exoware_proto::store::stream::v1::{
25 GetRequestView, GetResponse as StreamGetResponse, Service as StreamApi,
26 ServiceServer as StreamServiceServer, SubscribeRequestView, SubscribeResponse,
27};
28use exoware_proto::stream_filter::{BytesFilter, StreamFilter};
29use exoware_proto::{
30 connect_compression_registry, parse_range_traversal_direction,
31 to_domain_reduce_request_from_view, to_proto_optional_reduced_value, to_proto_reduced_value,
32 with_error_info_detail, with_query_detail, with_retry_info_detail, RangeTraversalDirection,
33};
34use exoware_sdk as exoware_proto;
35use exoware_sdk::keys::Key;
36use exoware_sdk::match_key::MatchKey;
37use exoware_sdk::store::common::v1::bytes_filter::KindView as ProtoBytesFilterKindView;
38use futures::{stream as stream_util, Stream};
39use tokio::sync::Notify;
40
41use crate::reduce::RangeReducer;
42use crate::stream::{StreamHub, StreamNotifier};
43use crate::validate::{self, IngestLimits};
44use crate::{Ingest, Log, Prune, Query, QueryExtra, RangeScan, StoreEngine};
45
/// Byte cap (256 MiB) that `connect_limits` applies to both the request body
/// size and the per-message size of every service built in this file.
const MAX_CONNECTRPC_BODY_BYTES: usize = 256 * 1024 * 1024;
/// Upper bound on the client-requested `batch_size` for `get_many` and `range`
/// response frames.
const RANGE_STREAM_MAX_FRAME_ROWS: usize = 4096;
/// Number of rows requested per `next_batch` call while `reduce` folds a scan.
const REDUCE_SCAN_BATCH_SIZE: usize = 4096;
50
51fn query_detail(sequence_number: u64, extra: QueryExtra) -> Detail {
52 Detail {
53 sequence_number,
54 extra,
55 ..Default::default()
56 }
57}
58
/// Decoded parameters of a `range` RPC, bundled for `range_stream`.
struct RangeStreamRequest {
    /// First key argument forwarded to `Query::range_scan`.
    start_key: Key,
    /// Second key argument forwarded to `Query::range_scan`.
    end_key: Key,
    /// Row limit forwarded to `Query::range_scan` (`usize::MAX` when the
    /// client sent none).
    limit: usize,
    /// Maximum rows requested from the scan per `next_batch` call, i.e. per frame.
    batch_size: usize,
    /// `true` for forward traversal, `false` for reverse.
    forward: bool,
    /// Sequence number stamped into the `Detail` of every emitted frame.
    sequence_number: u64,
}
67
68async fn range_stream<Q>(
69 query: Arc<Q>,
70 request: RangeStreamRequest,
71) -> Result<Pin<Box<dyn Stream<Item = Result<RangeFrame, ConnectError>> + Send>>, ConnectError>
72where
73 Q: Query,
74{
75 let RangeStreamRequest {
76 start_key,
77 end_key,
78 limit,
79 batch_size,
80 forward,
81 sequence_number,
82 } = request;
83 let entries = query
84 .range_scan(start_key, end_key, limit, forward)
85 .await
86 .map_err(ConnectError::internal)?;
87
88 Ok(Box::pin(stream_util::unfold(
89 Some((entries, false)),
90 move |state| async move {
91 let (mut entries, emitted_frame) = state?;
92 let batch = match entries.next_batch(batch_size).await {
93 Ok(batch) => batch,
94 Err(e) => return Some((Err(ConnectError::internal(e)), None)),
95 };
96 let detail = query_detail(sequence_number, batch.extra);
97 if batch.rows.is_empty() {
98 if emitted_frame && detail.extra.is_empty() {
99 return None;
100 }
101 return Some((
102 Ok(RangeFrame {
103 detail: Some(detail).into(),
104 ..Default::default()
105 }),
106 None,
107 ));
108 }
109
110 let mut chunk = Vec::with_capacity(batch.rows.len());
111 for (key, value) in batch.rows {
112 chunk.push(KvEntry {
113 key: key.into(),
114 value,
115 ..Default::default()
116 });
117 }
118 Some((
119 Ok(RangeFrame {
120 results: chunk,
121 detail: Some(detail).into(),
122 ..Default::default()
123 }),
124 Some((entries, true)),
125 ))
126 },
127 )))
128}
129
/// Shared state for a single engine that backs all four services; converts
/// into the per-service states via the `From` impls in this file.
pub struct AppState<E> {
    /// Engine handle; becomes the ingest, query, prune and log handle of the
    /// per-service states.
    pub engine: Arc<E>,
    /// Limits handed to ingest request validation.
    pub ingest_limits: IngestLimits,
    /// Readiness flag; `put` answers `unavailable` while it is `false`.
    pub ready: Arc<AtomicBool>,
    /// Stream hub, created from the engine's current sequence in `new` and used
    /// as the notifier for both ingest and stream services.
    pub stream: Arc<StreamHub>,
}
143
144impl<E> Clone for AppState<E> {
145 fn clone(&self) -> Self {
146 Self {
147 engine: self.engine.clone(),
148 ingest_limits: self.ingest_limits,
149 ready: self.ready.clone(),
150 stream: self.stream.clone(),
151 }
152 }
153}
154
155impl<E> AppState<E>
156where
157 E: StoreEngine,
158{
159 pub fn new(engine: Arc<E>) -> Self {
160 let current_sequence = engine.current_sequence();
161 Self {
162 engine,
163 ingest_limits: IngestLimits::default(),
164 ready: Arc::new(AtomicBool::new(true)),
165 stream: Arc::new(StreamHub::new(current_sequence)),
166 }
167 }
168
169 pub fn with_ingest_limits(mut self, limits: IngestLimits) -> Self {
170 self.ingest_limits = limits;
171 self
172 }
173}
174
/// State required by the ingest service.
pub struct IngestState<I> {
    /// Backend that receives `put_batch` calls.
    pub ingest: Arc<I>,
    /// Limits passed to `validate_put_request`.
    pub limits: IngestLimits,
    /// Readiness flag; `put` answers `unavailable` while it is `false`.
    pub ready: Arc<AtomicBool>,
    /// Optional notifier advanced to the new sequence after each successful put.
    pub notifier: Option<Arc<dyn StreamNotifier>>,
}
186
187impl<I> Clone for IngestState<I> {
188 fn clone(&self) -> Self {
189 Self {
190 ingest: self.ingest.clone(),
191 limits: self.limits,
192 ready: self.ready.clone(),
193 notifier: self.notifier.clone(),
194 }
195 }
196}
197
198impl<I> IngestState<I>
199where
200 I: Ingest,
201{
202 pub fn new(ingest: Arc<I>) -> Self {
203 Self {
204 ingest,
205 limits: IngestLimits::default(),
206 ready: Arc::new(AtomicBool::new(true)),
207 notifier: None,
208 }
209 }
210
211 pub fn with_notifier(ingest: Arc<I>, notifier: Arc<dyn StreamNotifier>) -> Self {
212 Self {
213 ingest,
214 limits: IngestLimits::default(),
215 ready: Arc::new(AtomicBool::new(true)),
216 notifier: Some(notifier),
217 }
218 }
219
220 pub fn with_limits(mut self, limits: IngestLimits) -> Self {
221 self.limits = limits;
222 self
223 }
224}
225
226impl<E> From<AppState<E>> for IngestState<E> {
227 fn from(state: AppState<E>) -> Self {
228 Self {
229 ingest: state.engine,
230 limits: state.ingest_limits,
231 ready: state.ready,
232 notifier: Some(state.stream),
233 }
234 }
235}
236
/// State required by the query service.
pub struct QueryState<Q> {
    /// Backend that serves `get`, `get_many` and `range_scan`, and reports the
    /// current sequence.
    pub query: Arc<Q>,
}
242
243impl<Q> Clone for QueryState<Q> {
244 fn clone(&self) -> Self {
245 Self {
246 query: self.query.clone(),
247 }
248 }
249}
250
251impl<Q> QueryState<Q>
252where
253 Q: Query,
254{
255 pub fn new(query: Arc<Q>) -> Self {
256 Self { query }
257 }
258}
259
260impl<E> From<AppState<E>> for QueryState<E> {
261 fn from(state: AppState<E>) -> Self {
262 Self {
263 query: state.engine,
264 }
265 }
266}
267
/// State required by the compact service.
pub struct CompactState<P> {
    /// Backend that applies prune policy documents.
    pub prune: Arc<P>,
}
273
274impl<P> Clone for CompactState<P> {
275 fn clone(&self) -> Self {
276 Self {
277 prune: self.prune.clone(),
278 }
279 }
280}
281
282impl<P> CompactState<P>
283where
284 P: Prune,
285{
286 pub fn new(prune: Arc<P>) -> Self {
287 Self { prune }
288 }
289}
290
291impl<E> From<AppState<E>> for CompactState<E> {
292 fn from(state: AppState<E>) -> Self {
293 Self {
294 prune: state.engine,
295 }
296 }
297}
298
/// State required by the stream service.
pub struct StreamState<L> {
    /// Batch log queried with `get_batch` / `oldest_retained_batch`.
    pub log: Arc<L>,
    /// Source of the current sequence and of live-subscription notifications.
    pub notifier: Arc<dyn StreamNotifier>,
}
306
307impl<L> Clone for StreamState<L> {
308 fn clone(&self) -> Self {
309 Self {
310 log: self.log.clone(),
311 notifier: self.notifier.clone(),
312 }
313 }
314}
315
316impl<L> StreamState<L>
317where
318 L: Log,
319{
320 pub fn new(log: Arc<L>, notifier: Arc<dyn StreamNotifier>) -> Self {
321 Self { log, notifier }
322 }
323}
324
325impl<E> From<AppState<E>> for StreamState<E> {
326 fn from(state: AppState<E>) -> Self {
327 Self {
328 log: state.engine,
329 notifier: state.stream,
330 }
331 }
332}
333
/// ConnectRPC handler implementing the ingest service on top of [`IngestState`].
pub struct IngestConnect<I> {
    state: IngestState<I>,
}
337
338impl<I> Clone for IngestConnect<I> {
339 fn clone(&self) -> Self {
340 Self {
341 state: self.state.clone(),
342 }
343 }
344}
345
346impl<I> IngestConnect<I>
347where
348 I: Ingest,
349{
350 pub fn new(state: impl Into<IngestState<I>>) -> Self {
351 Self {
352 state: state.into(),
353 }
354 }
355}
356
357impl<I> IngestApi for IngestConnect<I>
358where
359 I: Ingest,
360{
361 async fn put(
362 &self,
363 _ctx: Context,
364 request: buffa::view::OwnedView<exoware_proto::store::ingest::v1::PutRequestView<'static>>,
365 ) -> connectrpc::ServiceResult<ProtoPutResponse> {
366 if !self.state.ready.load(Ordering::SeqCst) {
367 return Err(with_error_info_detail(
368 ConnectError::unavailable("ingest is not ready"),
369 ErrorInfo {
370 reason: "WORKER_NOT_READY".to_string(),
371 domain: "store.ingest".to_string(),
372 ..Default::default()
373 },
374 ));
375 }
376
377 validate::validate_put_request(&request, self.state.limits)?;
378
379 let wire = request.bytes();
380 let mut batch = Vec::with_capacity(request.kvs.len());
381 for kv in request.kvs.iter() {
382 let key: Key = wire.slice_ref(kv.key);
383 let value = wire.slice_ref(kv.value);
384 batch.push((key, value));
385 }
386
387 let seq = self
388 .state
389 .ingest
390 .put_batch(batch)
391 .await
392 .map_err(ConnectError::internal)?;
393
394 if let Some(notifier) = &self.state.notifier {
396 notifier.advance(seq);
397 }
398
399 connectrpc::Response::ok(ProtoPutResponse {
400 sequence_number: seq,
401 ..Default::default()
402 })
403 }
404}
405
/// ConnectRPC handler implementing the query service on top of [`QueryState`].
pub struct QueryConnect<Q> {
    state: QueryState<Q>,
}
409
410impl<Q> Clone for QueryConnect<Q> {
411 fn clone(&self) -> Self {
412 Self {
413 state: self.state.clone(),
414 }
415 }
416}
417
418impl<Q> QueryConnect<Q>
419where
420 Q: Query,
421{
422 pub fn new(state: impl Into<QueryState<Q>>) -> Self {
423 Self {
424 state: state.into(),
425 }
426 }
427
428 fn current_sequence_number(&self) -> u64 {
429 self.state.query.current_sequence()
430 }
431
432 fn error_detail(&self) -> Detail {
433 Detail {
434 sequence_number: self.current_sequence_number(),
435 ..Default::default()
436 }
437 }
438
439 fn consistency_not_ready_error(&self, required: u64, current: u64) -> ConnectError {
440 let err = with_retry_info_detail(
441 ConnectError::aborted("minimum consistency token is not yet visible"),
442 RetryInfo {
443 retry_delay: Some(buffa_types::google::protobuf::Duration::from(
444 std::time::Duration::from_secs(1),
445 ))
446 .into(),
447 ..Default::default()
448 },
449 );
450 with_query_detail(
451 with_error_info_detail(
452 err,
453 ErrorInfo {
454 reason: "CONSISTENCY_NOT_READY".to_string(),
455 domain: "store.query".to_string(),
456 metadata: [
457 ("required_sequence_number".to_string(), required.to_string()),
458 ("current_sequence_number".to_string(), current.to_string()),
459 ]
460 .into_iter()
461 .collect(),
462 ..Default::default()
463 },
464 ),
465 self.error_detail(),
466 )
467 }
468
469 fn ensure_min_sequence_number(&self, required: Option<u64>) -> Result<u64, ConnectError> {
470 let current = self.current_sequence_number();
471 if let Some(required) = required {
472 if current < required {
473 return Err(self.consistency_not_ready_error(required, current));
474 }
475 }
476 Ok(current)
477 }
478}
479
480impl<Q> QueryApi for QueryConnect<Q>
481where
482 Q: Query,
483{
484 async fn get(
485 &self,
486 _ctx: Context,
487 request: buffa::view::OwnedView<exoware_proto::store::query::v1::GetRequestView<'static>>,
488 ) -> connectrpc::ServiceResult<GetResponse> {
489 validate::validate_get_request(&request)?;
490 let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
491 let wire = request.bytes();
492 let key: Key = wire.slice_ref(request.key);
493 let (value, extra) = self
494 .state
495 .query
496 .get(key)
497 .await
498 .map_err(ConnectError::internal)?;
499 let detail = query_detail(token, extra);
500 connectrpc::Response::ok(GetResponse {
501 value,
502 detail: Some(detail).into(),
503 ..Default::default()
504 })
505 }
506
507 async fn get_many(
508 &self,
509 _ctx: Context,
510 request: buffa::view::OwnedView<
511 exoware_proto::store::query::v1::GetManyRequestView<'static>,
512 >,
513 ) -> connectrpc::ServiceResult<connectrpc::ServiceStream<GetManyFrame>> {
514 validate::validate_get_many_request(&request)?;
515 let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
516
517 let wire = request.bytes();
518 let keys: Vec<Key> = request.keys.iter().map(|key| wire.slice_ref(key)).collect();
519 let (entries, extra) = self
520 .state
521 .query
522 .get_many(keys)
523 .await
524 .map_err(ConnectError::internal)?;
525 let detail = query_detail(sequence_number, extra);
526 let batch_size = (request.batch_size as usize).min(RANGE_STREAM_MAX_FRAME_ROWS);
527 let mut frames = Vec::new();
528 let mut chunk = Vec::new();
529 for (key, value) in entries {
530 chunk.push(GetManyEntry {
531 key: key.to_vec(),
532 value,
533 ..Default::default()
534 });
535 if chunk.len() >= batch_size {
536 frames.push(Ok(GetManyFrame {
537 results: std::mem::take(&mut chunk),
538 detail: Some(detail.clone()).into(),
539 ..Default::default()
540 }));
541 }
542 }
543 if !chunk.is_empty() {
544 frames.push(Ok(GetManyFrame {
545 results: chunk,
546 detail: Some(detail).into(),
547 ..Default::default()
548 }));
549 } else if frames.is_empty() {
550 frames.push(Ok(GetManyFrame {
551 detail: Some(detail).into(),
552 ..Default::default()
553 }));
554 }
555
556 Ok(connectrpc::Response::stream(stream_util::iter(frames)))
557 }
558
559 async fn range(
560 &self,
561 _ctx: Context,
562 request: buffa::view::OwnedView<exoware_proto::store::query::v1::RangeRequestView<'static>>,
563 ) -> connectrpc::ServiceResult<connectrpc::ServiceStream<RangeFrame>> {
564 validate::validate_range_request(&request)?;
565 let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
566 let wire = request.bytes();
567 let start_key: Key = wire.slice_ref(request.start);
568 let end_key: Key = wire.slice_ref(request.end);
569 let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
570 let batch_size = (request.batch_size as usize).min(RANGE_STREAM_MAX_FRAME_ROWS);
571 let forward = match parse_range_traversal_direction(request.mode) {
572 Ok(RangeTraversalDirection::Forward) => true,
573 Ok(RangeTraversalDirection::Reverse) => false,
574 Err(e) => return Err(ConnectError::internal(format!("traversal mode: {e:?}"))),
575 };
576 Ok(connectrpc::Response::stream(
577 range_stream(
578 self.state.query.clone(),
579 RangeStreamRequest {
580 start_key,
581 end_key,
582 limit,
583 batch_size,
584 forward,
585 sequence_number,
586 },
587 )
588 .await?,
589 ))
590 }
591
592 async fn reduce(
593 &self,
594 _ctx: Context,
595 request: buffa::view::OwnedView<
596 exoware_proto::store::query::v1::ReduceRequestView<'static>,
597 >,
598 ) -> connectrpc::ServiceResult<ReduceResponse> {
599 validate::validate_reduce_request(&request)?;
600 let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
601 let wire = request.bytes();
602 let start_key: Key = wire.slice_ref(request.start);
603 let end_key: Key = wire.slice_ref(request.end);
604 let domain = to_domain_reduce_request_from_view(&request.params)
605 .map_err(validate::reduce_params_error)?;
606
607 let mut rows = self
608 .state
609 .query
610 .range_scan(start_key, end_key, usize::MAX, true)
611 .await
612 .map_err(ConnectError::internal)?;
613
614 let mut reducer = RangeReducer::new(&domain)
615 .map_err(|e: crate::RangeError| ConnectError::internal(e.to_string()))?;
616 let mut latest_extra = None;
617 let final_extra = loop {
618 let batch = rows
619 .next_batch(REDUCE_SCAN_BATCH_SIZE)
620 .await
621 .map_err(ConnectError::internal)?;
622 if batch.rows.is_empty() {
623 break if batch.extra.is_empty() {
624 latest_extra.unwrap_or_default()
625 } else {
626 batch.extra
627 };
628 }
629 latest_extra = Some(batch.extra);
630 for (key, value) in batch.rows {
631 reducer
632 .update(&key, &value)
633 .map_err(|e: crate::RangeError| ConnectError::internal(e.to_string()))?;
634 }
635 };
636 let response = reducer.finish();
637
638 let detail = query_detail(token, final_extra);
639
640 connectrpc::Response::ok(ReduceResponse {
641 results: response
642 .results
643 .into_iter()
644 .map(|result| exoware_proto::query::RangeReduceResult {
645 value: result.value.map(to_proto_reduced_value).into(),
646 ..Default::default()
647 })
648 .collect(),
649 groups: response
650 .groups
651 .into_iter()
652 .map(|group| {
653 let group_values_present =
654 group.group_values.iter().map(Option::is_some).collect();
655 exoware_proto::query::RangeReduceGroup {
656 group_values: group
657 .group_values
658 .into_iter()
659 .map(to_proto_optional_reduced_value)
660 .collect(),
661 group_values_present,
662 results: group
663 .results
664 .into_iter()
665 .map(|result| exoware_proto::query::RangeReduceResult {
666 value: result.value.map(to_proto_reduced_value).into(),
667 ..Default::default()
668 })
669 .collect(),
670 ..Default::default()
671 }
672 })
673 .collect(),
674 detail: Some(detail).into(),
675 ..Default::default()
676 })
677 }
678}
679
/// ConnectRPC handler implementing the compact service on top of [`CompactState`].
pub struct CompactConnect<P> {
    state: CompactState<P>,
}
683
684impl<P> Clone for CompactConnect<P> {
685 fn clone(&self) -> Self {
686 Self {
687 state: self.state.clone(),
688 }
689 }
690}
691
692impl<P> CompactConnect<P>
693where
694 P: Prune,
695{
696 pub fn new(state: impl Into<CompactState<P>>) -> Self {
697 Self {
698 state: state.into(),
699 }
700 }
701}
702
703impl<P> CompactApi for CompactConnect<P>
704where
705 P: Prune,
706{
707 async fn prune(
708 &self,
709 _ctx: Context,
710 request: buffa::view::OwnedView<
711 exoware_proto::store::compact::v1::PruneRequestView<'static>,
712 >,
713 ) -> connectrpc::ServiceResult<PruneResponse> {
714 validate::validate_prune_request(&request)?;
715 let document = exoware_proto::parse_and_validate_policy_document(&request)
716 .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
717
718 self.state
719 .prune
720 .apply_prune_policies(document)
721 .await
722 .map_err(ConnectError::internal)?;
723 connectrpc::Response::ok(PruneResponse::default())
724 }
725}
726
/// ConnectRPC handler implementing the stream service on top of [`StreamState`].
pub struct StreamConnect<B> {
    state: StreamState<B>,
}
730
731impl<B> Clone for StreamConnect<B> {
732 fn clone(&self) -> Self {
733 Self {
734 state: self.state.clone(),
735 }
736 }
737}
738
739impl<B> StreamConnect<B>
740where
741 B: Log,
742{
743 pub fn new(state: impl Into<StreamState<B>>) -> Self {
744 Self {
745 state: state.into(),
746 }
747 }
748
749 fn batch_evicted_connect_error(oldest_retained: Option<u64>) -> ConnectError {
750 let mut metadata = HashMap::new();
751 if let Some(v) = oldest_retained {
752 metadata.insert(
753 crate::stream::METADATA_OLDEST_RETAINED.to_string(),
754 v.to_string(),
755 );
756 }
757 with_error_info_detail(
758 ConnectError::out_of_range("batch has been evicted from the log"),
759 ErrorInfo {
760 reason: crate::stream::REASON_BATCH_EVICTED.to_string(),
761 domain: crate::stream::STREAM_ERROR_DOMAIN.to_string(),
762 metadata,
763 ..Default::default()
764 },
765 )
766 }
767
768 fn batch_evicted_error(&self, oldest_retained: Option<u64>) -> ConnectError {
769 Self::batch_evicted_connect_error(oldest_retained)
770 }
771
772 fn batch_not_found_error(&self) -> ConnectError {
773 with_error_info_detail(
774 ConnectError::not_found("batch not found"),
775 ErrorInfo {
776 reason: crate::stream::REASON_BATCH_NOT_FOUND.to_string(),
777 domain: crate::stream::STREAM_ERROR_DOMAIN.to_string(),
778 ..Default::default()
779 },
780 )
781 }
782}
783
784fn filtered_subscribe_response(
785 seq: u64,
786 kvs: &[(Bytes, Bytes)],
787 matchers: &crate::stream::CompiledMatchers,
788) -> Option<SubscribeResponse> {
789 let entries = crate::stream::apply_filter(matchers, kvs);
790 (!entries.is_empty()).then_some(SubscribeResponse {
791 sequence_number: seq,
792 entries,
793 ..Default::default()
794 })
795}
796
/// Cursor over the historical part of a subscription.
struct ReplayState {
    /// Sequence number of the next batch to replay.
    next_sequence: u64,
    /// Last sequence number (inclusive) covered by the replay.
    bound: u64,
    /// Batch already fetched while setting up the subscription; consumed by the
    /// first replay step instead of reading the log again.
    first_batch: Option<Vec<(Bytes, Bytes)>>,
}
802
/// Outcome of one replay step.
enum ReplayProgress {
    /// The batch produced a frame to send.
    Frame(SubscribeResponse),
    /// The batch was consumed but nothing matched the filter.
    Advanced,
    /// There is no replay state left.
    Done,
}
808
/// Outcome of one live-tail step.
enum LiveProgress {
    /// The batch produced a frame to send.
    Frame(SubscribeResponse),
    /// The batch was consumed but nothing matched the filter.
    Advanced,
    /// The next sequence has not been published yet.
    NeedWait,
}
814
/// State threaded through the stream that serves one `subscribe` call.
struct SubscriptionState<B> {
    /// Log and notifier handles.
    state: StreamState<B>,
    /// Compiled filter applied to every batch.
    matchers: crate::stream::CompiledMatchers,
    /// Remaining replay work; `None` once replay is finished or was not requested.
    replay: Option<ReplayState>,
    /// Sequence number of the next batch to deliver from the live tail.
    next_live_sequence: u64,
    /// Notification handle awaited while the live tail is caught up.
    live_notify: Arc<Notify>,
    /// Set after an error has been yielded so the stream ends on the next poll.
    terminated: bool,
}
823
824impl<B> SubscriptionState<B>
825where
826 B: Log,
827{
828 fn new(
829 state: StreamState<B>,
830 matchers: crate::stream::CompiledMatchers,
831 replay: Option<ReplayState>,
832 next_live_sequence: u64,
833 live_notify: Arc<Notify>,
834 ) -> Self {
835 Self {
836 state,
837 matchers,
838 replay,
839 next_live_sequence,
840 live_notify,
841 terminated: false,
842 }
843 }
844
845 fn into_stream(
846 self,
847 ) -> Pin<Box<dyn Stream<Item = Result<SubscribeResponse, ConnectError>> + Send>> {
848 Box::pin(stream_util::unfold(self, |mut state| async move {
849 loop {
850 if state.terminated {
851 return None;
852 }
853
854 if state.replay.is_some() {
855 match state.next_replay_frame().await {
856 Ok(ReplayProgress::Frame(frame)) => return Some((Ok(frame), state)),
857 Ok(ReplayProgress::Advanced) => continue,
858 Ok(ReplayProgress::Done) => {}
859 Err(err) => {
860 state.terminated = true;
861 return Some((Err(err), state));
862 }
863 }
864 }
865
866 match state.next_live_frame().await {
867 Ok(LiveProgress::Frame(frame)) => return Some((Ok(frame), state)),
868 Ok(LiveProgress::Advanced) => continue,
869 Ok(LiveProgress::NeedWait) => {
870 state.wait_for_live().await;
871 }
872 Err(err) => {
873 state.terminated = true;
874 return Some((Err(err), state));
875 }
876 }
877 }
878 }))
879 }
880
881 async fn next_replay_frame(&mut self) -> Result<ReplayProgress, ConnectError> {
882 let Some(replay) = &mut self.replay else {
883 return Ok(ReplayProgress::Done);
884 };
885 let seq = replay.next_sequence;
886 let kvs = if let Some(first_batch) = replay.first_batch.take() {
887 Some(first_batch)
888 } else {
889 self.state
890 .log
891 .get_batch(seq)
892 .await
893 .map_err(ConnectError::internal)?
894 };
895 replay.next_sequence += 1;
896 if replay.next_sequence > replay.bound {
897 self.replay = None;
898 }
899 let Some(kvs) = kvs else {
900 let oldest = self
901 .state
902 .log
903 .oldest_retained_batch()
904 .await
905 .map_err(ConnectError::internal)?;
906 return Err(StreamConnect::<B>::batch_evicted_connect_error(oldest));
907 };
908 Ok(
909 match filtered_subscribe_response(seq, &kvs, &self.matchers) {
910 Some(frame) => ReplayProgress::Frame(frame),
911 None => ReplayProgress::Advanced,
912 },
913 )
914 }
915
916 async fn next_live_frame(&mut self) -> Result<LiveProgress, ConnectError> {
917 let current = self.state.notifier.current_sequence();
918 if self.next_live_sequence > current {
919 return Ok(LiveProgress::NeedWait);
920 }
921 let seq = self.next_live_sequence;
922 self.next_live_sequence += 1;
923 let kvs = self
924 .state
925 .log
926 .get_batch(seq)
927 .await
928 .map_err(ConnectError::internal)?;
929 let Some(kvs) = kvs else {
930 let oldest = self
931 .state
932 .log
933 .oldest_retained_batch()
934 .await
935 .map_err(ConnectError::internal)?;
936 return Err(StreamConnect::<B>::batch_evicted_connect_error(oldest));
937 };
938 Ok(
939 match filtered_subscribe_response(seq, &kvs, &self.matchers) {
940 Some(frame) => LiveProgress::Frame(frame),
941 None => LiveProgress::Advanced,
942 },
943 )
944 }
945
946 async fn wait_for_live(&self) {
947 if self.next_live_sequence <= self.state.notifier.current_sequence() {
948 return;
949 }
950 let notified = self.live_notify.clone().notified_owned();
951 if self.next_live_sequence <= self.state.notifier.current_sequence() {
952 return;
953 }
954 notified.await;
955 }
956}
957
958fn domain_filter_from_subscribe_view(
959 req: &SubscribeRequestView<'_>,
960) -> Result<StreamFilter, ConnectError> {
961 let mut match_keys = Vec::with_capacity(req.match_keys.len());
962 for mk in req.match_keys.iter() {
963 let reserved_bits = u8::try_from(mk.reserved_bits).map_err(|_| {
964 ConnectError::invalid_argument(format!(
965 "match_key.reserved_bits {} does not fit in u8",
966 mk.reserved_bits
967 ))
968 })?;
969 let prefix = u16::try_from(mk.prefix).map_err(|_| {
970 ConnectError::invalid_argument(format!(
971 "match_key.prefix {} does not fit in u16",
972 mk.prefix
973 ))
974 })?;
975 match_keys.push(MatchKey {
976 reserved_bits,
977 prefix,
978 payload_regex: exoware_sdk::kv_codec::Utf8::from(mk.payload_regex),
979 });
980 }
981 let mut value_filters = Vec::with_capacity(req.value_filters.len());
982 for vf in req.value_filters.iter() {
983 value_filters.push(match vf.kind {
984 Some(ProtoBytesFilterKindView::Exact(bytes)) => {
985 BytesFilter::Exact(Bytes::copy_from_slice(bytes))
986 }
987 Some(ProtoBytesFilterKindView::Prefix(bytes)) => {
988 BytesFilter::Prefix(Bytes::copy_from_slice(bytes))
989 }
990 Some(ProtoBytesFilterKindView::Regex(pattern)) => {
991 BytesFilter::Regex(pattern.to_string())
992 }
993 None => {
994 return Err(ConnectError::invalid_argument(
995 "each value_filter must set exactly one of exact, prefix, or regex",
996 ))
997 }
998 });
999 }
1000 Ok(StreamFilter {
1001 match_keys,
1002 value_filters,
1003 })
1004}
1005
1006impl<B> StreamApi for StreamConnect<B>
1007where
1008 B: Log,
1009{
1010 async fn subscribe(
1011 &self,
1012 _ctx: Context,
1013 request: buffa::view::OwnedView<SubscribeRequestView<'static>>,
1014 ) -> connectrpc::ServiceResult<connectrpc::ServiceStream<SubscribeResponse>> {
1015 let filter = domain_filter_from_subscribe_view(&request)?;
1016 let since = request.since_sequence_number;
1017
1018 let matchers = crate::stream::compile_matchers(&filter)?;
1023 let subscription = self.state.notifier.subscribe();
1024 let replay_bound = subscription.current_sequence;
1025 let live_notify = subscription.notify;
1026
1027 let replay = match since {
1032 Some(s) if s <= replay_bound && s > 0 => {
1033 let first_batch = self
1034 .state
1035 .log
1036 .get_batch(s)
1037 .await
1038 .map_err(ConnectError::internal)?;
1039 let Some(first_batch) = first_batch else {
1040 let oldest = self
1041 .state
1042 .log
1043 .oldest_retained_batch()
1044 .await
1045 .map_err(ConnectError::internal)?;
1046 return Err(self.batch_evicted_error(oldest));
1047 };
1048 Some(ReplayState {
1049 next_sequence: s,
1050 bound: replay_bound,
1051 first_batch: Some(first_batch),
1052 })
1053 }
1054 _ => None,
1055 };
1056 let next_live_sequence = replay_bound.saturating_add(1);
1057
1058 Ok(connectrpc::Response::stream(
1059 SubscriptionState::new(
1060 self.state.clone(),
1061 matchers,
1062 replay,
1063 next_live_sequence,
1064 live_notify,
1065 )
1066 .into_stream(),
1067 ))
1068 }
1069
1070 async fn get(
1071 &self,
1072 _ctx: Context,
1073 request: buffa::view::OwnedView<GetRequestView<'static>>,
1074 ) -> connectrpc::ServiceResult<StreamGetResponse> {
1075 let seq = request.sequence_number;
1076 match self
1077 .state
1078 .log
1079 .get_batch(seq)
1080 .await
1081 .map_err(ConnectError::internal)?
1082 {
1083 Some(kvs) => {
1084 let entries = kvs
1085 .into_iter()
1086 .map(|(k, v)| KvEntry {
1087 key: k.to_vec(),
1088 value: v,
1089 ..Default::default()
1090 })
1091 .collect();
1092 connectrpc::Response::ok(StreamGetResponse {
1093 sequence_number: seq,
1094 entries,
1095 ..Default::default()
1096 })
1097 }
1098 None => {
1099 let current = self.state.log.current_sequence();
1100 if seq > current {
1102 Err(self.batch_not_found_error())
1103 } else {
1104 let oldest = self
1105 .state
1106 .log
1107 .oldest_retained_batch()
1108 .await
1109 .map_err(ConnectError::internal)?;
1110 Err(self.batch_evicted_error(oldest))
1111 }
1112 }
1113 }
1114 }
1115}
1116
1117fn connect_limits() -> Limits {
1118 Limits::default()
1119 .max_request_body_size(MAX_CONNECTRPC_BODY_BYTES)
1120 .max_message_size(MAX_CONNECTRPC_BODY_BYTES)
1121}
1122
/// Stand-alone ingest service.
pub(crate) type IngestService<I> = ConnectRpcService<IngestServiceServer<IngestConnect<I>>>;
/// Stand-alone query service.
pub(crate) type QueryService<Q> = ConnectRpcService<QueryServiceServer<QueryConnect<Q>>>;
/// Stand-alone compact service.
pub(crate) type CompactService<P> = ConnectRpcService<CompactServiceServer<CompactConnect<P>>>;
/// Stand-alone stream service.
pub(crate) type StreamService<B> = ConnectRpcService<StreamServiceServer<StreamConnect<B>>>;
/// Query and stream services chained into one service.
pub(crate) type QueryStack<Q, B> = ConnectRpcService<
    Chain<QueryServiceServer<QueryConnect<Q>>, StreamServiceServer<StreamConnect<B>>>,
>;
/// All four services chained in the order ingest, query, compact, stream.
pub(crate) type ConnectStack<I, Q, P, B> = ConnectRpcService<
    Chain<
        IngestServiceServer<IngestConnect<I>>,
        Chain<
            QueryServiceServer<QueryConnect<Q>>,
            Chain<CompactServiceServer<CompactConnect<P>>, StreamServiceServer<StreamConnect<B>>>,
        >,
    >,
>;
1139
1140fn ingest_server<I>(state: IngestState<I>) -> IngestServiceServer<IngestConnect<I>>
1141where
1142 I: Ingest,
1143{
1144 IngestServiceServer::new(IngestConnect::new(state))
1145}
1146
1147fn query_server<Q>(state: QueryState<Q>) -> QueryServiceServer<QueryConnect<Q>>
1148where
1149 Q: Query,
1150{
1151 QueryServiceServer::new(QueryConnect::new(state))
1152}
1153
1154fn compact_server<P>(state: CompactState<P>) -> CompactServiceServer<CompactConnect<P>>
1155where
1156 P: Prune,
1157{
1158 CompactServiceServer::new(CompactConnect::new(state))
1159}
1160
1161fn stream_server<B>(state: StreamState<B>) -> StreamServiceServer<StreamConnect<B>>
1162where
1163 B: Log,
1164{
1165 StreamServiceServer::new(StreamConnect::new(state))
1166}
1167
1168pub fn ingest_service<I>(state: IngestState<I>) -> IngestService<I>
1169where
1170 I: Ingest,
1171{
1172 ConnectRpcService::new(ingest_server(state))
1173 .with_limits(connect_limits())
1174 .with_compression(connect_compression_registry())
1175}
1176
1177pub fn query_service<Q>(state: QueryState<Q>) -> QueryService<Q>
1178where
1179 Q: Query,
1180{
1181 ConnectRpcService::new(query_server(state))
1182 .with_limits(connect_limits())
1183 .with_compression(connect_compression_registry())
1184}
1185
1186pub fn compact_service<P>(state: CompactState<P>) -> CompactService<P>
1187where
1188 P: Prune,
1189{
1190 ConnectRpcService::new(compact_server(state))
1191 .with_limits(connect_limits())
1192 .with_compression(connect_compression_registry())
1193}
1194
1195pub fn stream_service<B>(state: StreamState<B>) -> StreamService<B>
1196where
1197 B: Log,
1198{
1199 ConnectRpcService::new(stream_server(state))
1200 .with_limits(connect_limits())
1201 .with_compression(connect_compression_registry())
1202}
1203
1204pub fn query_stack<Q, B>(
1205 query_state: QueryState<Q>,
1206 stream_state: StreamState<B>,
1207) -> QueryStack<Q, B>
1208where
1209 Q: Query,
1210 B: Log,
1211{
1212 ConnectRpcService::new(Chain(
1213 query_server(query_state),
1214 stream_server(stream_state),
1215 ))
1216 .with_limits(connect_limits())
1217 .with_compression(connect_compression_registry())
1218}
1219
1220pub fn connect_stack<E>(state: AppState<E>) -> ConnectStack<E, E, E, E>
1221where
1222 E: StoreEngine,
1223{
1224 ConnectRpcService::new(Chain(
1225 ingest_server(state.clone().into()),
1226 Chain(
1227 query_server(state.clone().into()),
1228 Chain(
1229 compact_server(state.clone().into()),
1230 stream_server(state.into()),
1231 ),
1232 ),
1233 ))
1234 .with_limits(connect_limits())
1235 .with_compression(connect_compression_registry())
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240 use super::*;
1241 use std::collections::{BTreeMap, HashMap};
1242 use std::sync::atomic::AtomicU64;
1243 use std::sync::Mutex;
1244 use std::time::Duration;
1245
1246 use buffa::Message;
1247 use exoware_proto::store::common::v1::MatchKey as ProtoMatchKey;
1248 use exoware_proto::store::compact::v1::{
1249 policy, policy_retain, Policy as ProtoPolicy, PolicyRetain, PruneRequest, PruneRequestView,
1250 RetainKeepLatest,
1251 };
1252 use exoware_proto::store::stream::v1::{SubscribeRequest, SubscribeRequestView};
1253 use exoware_sdk::keys::KeyCodec;
1254 use exoware_sdk::kv_codec::KvReducedValue;
1255 use exoware_sdk::prune_policy::{PrunePolicyDocument, PRUNE_POLICY_DOCUMENT_VERSION};
1256 use exoware_sdk::{decode_connect_error, to_domain_reduce_response};
1257 use futures::StreamExt;
1258
1259 use crate::{
1260 Ingest, Log, Prune, Query, QueryExtra, RangeScan, RangeScanBatch, Sequence,
1261 StreamNotification, StreamNotifier,
1262 };
1263
    // Fixed values shared by the tests in this module. Their use sites are
    // outside this view; presumably the reserved-bit width and prefix of the
    // test keys (TODO confirm).
    const TEST_RESERVED_BITS: u8 = 4;
    const TEST_PREFIX: u16 = 1;
1266
    /// Test hook stored in `FakeEngineState::publish_on_get_batch`.
    ///
    /// NOTE(review): the code that consumes it is outside this view; from the
    /// names it presumably publishes `kvs` to `hub` while a `get_batch` call is
    /// in progress — confirm against the `Log` impl of `FakeEngine`.
    #[derive(Clone)]
    struct PublishDuringReplay {
        hub: Arc<StreamHub>,
        sequence_offset: u64,
        kvs: Vec<(Bytes, Bytes)>,
    }
1273
/// Mutable state behind `FakeEngine`; every trait method of the fake reads or
/// writes one of these fields.
#[derive(Default)]
struct FakeEngineState {
    /// Value reported by `Sequence::current_sequence`; bumped by `put_batch`
    /// and raised by `publish_live` and the `get_batch` publish hook.
    current_sequence: u64,
    /// Batches keyed by sequence number; `get_batch` returns `None` both for
    /// a missing key and for a key explicitly mapped to `None`.
    batches: BTreeMap<u64, Option<Vec<(Bytes, Bytes)>>>,
    /// Value returned by `Log::oldest_retained_batch`.
    oldest_retained: Option<u64>,
    /// Optional hook applied on every `get_batch` call.
    publish_on_get_batch: Option<PublishDuringReplay>,
    /// Rows cloned into each scan created by `Query::range_scan`.
    range_rows: Vec<(Bytes, Bytes)>,
    /// Extra handed to each range scan, which returns it with its first empty batch.
    range_eof_extra: QueryExtra,
    /// Number of rows pulled so far from range-scan iterators.
    range_next_count: usize,
    /// Extra returned by `Query::get` and `Query::get_many`.
    query_extra: QueryExtra,
    /// Policy count of each document passed to `Prune::apply_prune_policies`.
    prune_policy_counts: Vec<usize>,
}
1286
/// In-memory test engine; all behavior is driven by the shared
/// `FakeEngineState`, which range-scan iterators also hold a handle to.
#[derive(Default)]
struct FakeEngine {
    /// Shared, mutex-protected state.
    state: Arc<Mutex<FakeEngineState>>,
}
1291
1292 struct IteratorRangeScan {
1293 iter: Box<dyn Iterator<Item = Result<(Bytes, Bytes), String>> + Send + 'static>,
1294 eof_extra: Option<QueryExtra>,
1295 }
1296
1297 impl RangeScan for IteratorRangeScan {
1298 async fn next_batch(&mut self, max_items: usize) -> Result<RangeScanBatch, String> {
1299 let mut rows = Vec::new();
1300 for row in self.iter.by_ref().take(max_items) {
1301 rows.push(row?);
1302 }
1303 let extra = if rows.is_empty() {
1304 self.eof_extra.take().unwrap_or_default()
1305 } else {
1306 QueryExtra::default()
1307 };
1308 Ok(RangeScanBatch { rows, extra })
1309 }
1310 }
1311
1312 fn range_scan_from_iter<I>(iter: I) -> IteratorRangeScan
1313 where
1314 I: Iterator<Item = Result<(Bytes, Bytes), String>> + Send + 'static,
1315 {
1316 range_scan_from_iter_with_eof_extra(iter, QueryExtra::default())
1317 }
1318
1319 fn range_scan_from_iter_with_eof_extra<I>(iter: I, eof_extra: QueryExtra) -> IteratorRangeScan
1320 where
1321 I: Iterator<Item = Result<(Bytes, Bytes), String>> + Send + 'static,
1322 {
1323 IteratorRangeScan {
1324 iter: Box::new(iter),
1325 eof_extra: Some(eof_extra),
1326 }
1327 }
1328
1329 impl FakeEngine {
1330 fn set_current_sequence(&self, sequence_number: u64) {
1331 self.state.lock().expect("lock").current_sequence = sequence_number;
1332 }
1333
1334 fn set_batch(&self, sequence_number: u64, kvs: Option<Vec<(Bytes, Bytes)>>) {
1335 self.state
1336 .lock()
1337 .expect("lock")
1338 .batches
1339 .insert(sequence_number, kvs);
1340 }
1341
1342 fn set_oldest_retained(&self, oldest_retained: Option<u64>) {
1343 self.state.lock().expect("lock").oldest_retained = oldest_retained;
1344 }
1345
1346 fn publish_live(
1347 &self,
1348 hub: Arc<StreamHub>,
1349 sequence_number: u64,
1350 kvs: Vec<(Bytes, Bytes)>,
1351 ) {
1352 let mut state = self.state.lock().expect("lock");
1353 state.current_sequence = state.current_sequence.max(sequence_number);
1354 state.batches.insert(sequence_number, Some(kvs.clone()));
1355 drop(state);
1356 hub.publish(sequence_number);
1357 }
1358
1359 fn publish_on_every_get_batch(
1360 &self,
1361 hub: Arc<StreamHub>,
1362 sequence_offset: u64,
1363 kvs: Vec<(Bytes, Bytes)>,
1364 ) {
1365 self.state.lock().expect("lock").publish_on_get_batch = Some(PublishDuringReplay {
1366 hub,
1367 sequence_offset,
1368 kvs,
1369 });
1370 }
1371
1372 fn set_range_rows(&self, rows: Vec<(Bytes, Bytes)>) {
1373 self.state.lock().expect("lock").range_rows = rows;
1374 }
1375
1376 fn set_range_eof_extra(&self, extra: QueryExtra) {
1377 self.state.lock().expect("lock").range_eof_extra = extra;
1378 }
1379
1380 fn range_next_count(&self) -> usize {
1381 self.state.lock().expect("lock").range_next_count
1382 }
1383
1384 fn set_query_extra(&self, extra: QueryExtra) {
1385 self.state.lock().expect("lock").query_extra = extra;
1386 }
1387 }
1388
1389 impl Sequence for FakeEngine {
1390 fn current_sequence(&self) -> u64 {
1391 self.state.lock().expect("lock").current_sequence
1392 }
1393 }
1394
1395 impl Ingest for FakeEngine {
1396 async fn put_batch(&self, kvs: Vec<(Bytes, Bytes)>) -> Result<u64, String> {
1397 let mut state = self.state.lock().map_err(|e| e.to_string())?;
1398 state.current_sequence += 1;
1399 let seq = state.current_sequence;
1400 state.batches.insert(seq, Some(kvs));
1401 Ok(seq)
1402 }
1403 }
1404
1405 impl Query for FakeEngine {
1406 type RangeScan = IteratorRangeScan;
1407
1408 async fn get(&self, _key: Bytes) -> Result<(Option<Bytes>, QueryExtra), String> {
1409 self.state
1410 .lock()
1411 .map(|state| (None, state.query_extra.clone()))
1412 .map_err(|e| e.to_string())
1413 }
1414
1415 async fn get_many(
1416 &self,
1417 keys: Vec<Bytes>,
1418 ) -> Result<(Vec<(Bytes, Option<Bytes>)>, QueryExtra), String> {
1419 self.state
1420 .lock()
1421 .map(|state| {
1422 let entries = keys.into_iter().map(|key| (key, None)).collect();
1423 (entries, state.query_extra.clone())
1424 })
1425 .map_err(|e| e.to_string())
1426 }
1427
1428 async fn range_scan(
1429 &self,
1430 _start: Bytes,
1431 _end: Bytes,
1432 _limit: usize,
1433 _forward: bool,
1434 ) -> Result<Self::RangeScan, String> {
1435 let result = self
1436 .state
1437 .lock()
1438 .map(|state| (state.range_rows.clone(), state.range_eof_extra.clone()))
1439 .map_err(|e| e.to_string());
1440 let state = self.state.clone();
1441 let cursor = result.map(|(rows, eof_extra)| {
1442 range_scan_from_iter_with_eof_extra(
1443 rows.into_iter().map(move |row| {
1444 state.lock().expect("lock").range_next_count += 1;
1445 Ok(row)
1446 }),
1447 eof_extra,
1448 )
1449 });
1450 cursor
1451 }
1452 }
1453
1454 impl Prune for FakeEngine {
1455 async fn apply_prune_policies(&self, document: PrunePolicyDocument) -> Result<(), String> {
1456 self.state
1457 .lock()
1458 .map(|mut state| {
1459 state.prune_policy_counts.push(document.policies.len());
1460 })
1461 .map_err(|e| e.to_string())
1462 }
1463 }
1464
1465 impl Log for FakeEngine {
1466 async fn get_batch(
1467 &self,
1468 sequence_number: u64,
1469 ) -> Result<Option<Vec<(Bytes, Bytes)>>, String> {
1470 let result: Result<_, String> = (|| {
1471 let mut state = self.state.lock().map_err(|e| e.to_string())?;
1472 let publish = state.publish_on_get_batch.clone();
1473 if let Some(publish) = publish.as_ref() {
1474 let live_sequence = publish.sequence_offset + sequence_number;
1475 state.current_sequence = state.current_sequence.max(live_sequence);
1476 state
1477 .batches
1478 .entry(live_sequence)
1479 .or_insert_with(|| Some(publish.kvs.clone()));
1480 }
1481 Ok((
1482 publish,
1483 state.batches.get(&sequence_number).cloned().unwrap_or(None),
1484 ))
1485 })();
1486 let (publish, batch) = result?;
1487 if let Some(publish) = publish {
1488 publish
1489 .hub
1490 .publish(publish.sequence_offset + sequence_number);
1491 }
1492 Ok(batch)
1493 }
1494
1495 async fn oldest_retained_batch(&self) -> Result<Option<u64>, String> {
1496 self.state
1497 .lock()
1498 .map(|state| state.oldest_retained)
1499 .map_err(|e| e.to_string())
1500 }
1501 }
1502
1503 struct QueryOnlyEngine {
1504 sequence_number: u64,
1505 value: Option<Bytes>,
1506 }
1507
1508 impl Sequence for QueryOnlyEngine {
1509 fn current_sequence(&self) -> u64 {
1510 self.sequence_number
1511 }
1512 }
1513
1514 impl Query for QueryOnlyEngine {
1515 type RangeScan = IteratorRangeScan;
1516
1517 async fn get(&self, _key: Bytes) -> Result<(Option<Bytes>, QueryExtra), String> {
1518 Ok((self.value.clone(), QueryExtra::default()))
1519 }
1520
1521 async fn range_scan(
1522 &self,
1523 _start: Bytes,
1524 _end: Bytes,
1525 _limit: usize,
1526 _forward: bool,
1527 ) -> Result<Self::RangeScan, String> {
1528 Ok(range_scan_from_iter(std::iter::empty()))
1529 }
1530
1531 async fn get_many(
1532 &self,
1533 keys: Vec<Bytes>,
1534 ) -> Result<(Vec<(Bytes, Option<Bytes>)>, QueryExtra), String> {
1535 Ok((
1536 keys.into_iter().map(|key| (key, None)).collect(),
1537 QueryExtra::default(),
1538 ))
1539 }
1540 }
1541
1542 #[derive(Default)]
1543 struct PruneOnlyEngine {
1544 documents: Mutex<Vec<(u32, usize)>>,
1545 }
1546
1547 impl PruneOnlyEngine {
1548 fn applied_count(&self) -> usize {
1549 self.documents.lock().expect("lock").len()
1550 }
1551
1552 fn last_document(&self) -> Option<(u32, usize)> {
1553 self.documents.lock().expect("lock").last().copied()
1554 }
1555 }
1556
1557 impl Prune for PruneOnlyEngine {
1558 async fn apply_prune_policies(&self, document: PrunePolicyDocument) -> Result<(), String> {
1559 self.documents
1560 .lock()
1561 .map(|mut documents| {
1562 documents.push((document.version, document.policies.len()));
1563 })
1564 .map_err(|e| e.to_string())
1565 }
1566 }
1567
1568 struct ManualNotifier {
1569 current_sequence: AtomicU64,
1570 notify: Arc<Notify>,
1571 }
1572
1573 impl ManualNotifier {
1574 fn new(current_sequence: u64) -> Self {
1575 Self {
1576 current_sequence: AtomicU64::new(current_sequence),
1577 notify: Arc::new(Notify::new()),
1578 }
1579 }
1580 }
1581
1582 impl StreamNotifier for ManualNotifier {
1583 fn subscribe(&self) -> StreamNotification {
1584 StreamNotification {
1585 current_sequence: self.current_sequence.load(Ordering::Acquire),
1586 notify: self.notify.clone(),
1587 }
1588 }
1589
1590 fn current_sequence(&self) -> u64 {
1591 self.current_sequence.load(Ordering::Acquire)
1592 }
1593
1594 fn advance(&self, seq: u64) {
1595 self.current_sequence.fetch_max(seq, Ordering::SeqCst);
1596 self.notify.notify_waiters();
1597 }
1598 }
1599
1600 fn matching_kv(payload: &[u8], value: &[u8]) -> (Bytes, Bytes) {
1601 let codec = KeyCodec::new(TEST_RESERVED_BITS, TEST_PREFIX);
1602 let key = codec.encode(payload).expect("encode key");
1603 (
1604 Bytes::copy_from_slice(key.as_ref()),
1605 Bytes::copy_from_slice(value),
1606 )
1607 }
1608
1609 fn numeric_query_extra(name: &str, value: f64) -> QueryExtra {
1610 HashMap::from([(
1611 name.to_string(),
1612 buffa_types::google::protobuf::Value::from(value),
1613 )])
1614 }
1615
1616 fn subscribe_request_bytes(since_sequence_number: Option<u64>) -> Vec<u8> {
1617 SubscribeRequest {
1618 match_keys: vec![ProtoMatchKey {
1619 reserved_bits: u32::from(TEST_RESERVED_BITS),
1620 prefix: u32::from(TEST_PREFIX),
1621 payload_regex: "(?s).*".to_string(),
1622 ..Default::default()
1623 }],
1624 since_sequence_number,
1625 ..Default::default()
1626 }
1627 .encode_to_vec()
1628 }
1629
1630 fn put_request(
1631 value_len: usize,
1632 ) -> buffa::view::OwnedView<exoware_proto::store::ingest::v1::PutRequestView<'static>> {
1633 let bytes = exoware_proto::ingest::PutRequest {
1634 kvs: vec![exoware_proto::common::KvEntry {
1635 key: b"k".to_vec(),
1636 value: Bytes::from(vec![1u8; value_len]),
1637 ..Default::default()
1638 }],
1639 ..Default::default()
1640 }
1641 .encode_to_vec();
1642 buffa::view::OwnedView::<exoware_proto::store::ingest::v1::PutRequestView<'static>>::decode(
1643 bytes.into(),
1644 )
1645 .expect("decode put request")
1646 }
1647
1648 fn sequence_drop_all_policy() -> ProtoPolicy {
1649 ProtoPolicy {
1650 scope: Some(policy::Scope::Sequence(Box::default())),
1651 retain: Some(PolicyRetain {
1652 kind: Some(policy_retain::Kind::DropAll(Box::default())),
1653 ..Default::default()
1654 })
1655 .into(),
1656 ..Default::default()
1657 }
1658 }
1659
1660 fn sequence_keep_latest_policy(count: u64) -> ProtoPolicy {
1661 ProtoPolicy {
1662 scope: Some(policy::Scope::Sequence(Box::default())),
1663 retain: Some(PolicyRetain {
1664 kind: Some(policy_retain::Kind::KeepLatest(Box::new(
1665 RetainKeepLatest {
1666 count,
1667 ..Default::default()
1668 },
1669 ))),
1670 ..Default::default()
1671 })
1672 .into(),
1673 ..Default::default()
1674 }
1675 }
1676
1677 fn prune_request(
1678 policies: Vec<ProtoPolicy>,
1679 ) -> buffa::view::OwnedView<PruneRequestView<'static>> {
1680 let bytes = PruneRequest {
1681 policies,
1682 ..Default::default()
1683 }
1684 .encode_to_vec();
1685 buffa::view::OwnedView::<PruneRequestView<'static>>::decode(bytes.into())
1686 .expect("decode prune request")
1687 }
1688
1689 async fn subscribe_stream<B>(
1690 connect: &StreamConnect<B>,
1691 since_sequence_number: Option<u64>,
1692 ) -> Result<
1693 Pin<Box<dyn Stream<Item = Result<SubscribeResponse, ConnectError>> + Send>>,
1694 ConnectError,
1695 >
1696 where
1697 B: Log,
1698 {
1699 let bytes = subscribe_request_bytes(since_sequence_number);
1700 let request = buffa::view::OwnedView::<SubscribeRequestView<'static>>::decode(bytes.into())
1701 .expect("decode subscribe request");
1702 Ok(StreamApi::subscribe(connect, Context::default(), request)
1703 .await?
1704 .body)
1705 }
1706
1707 #[tokio::test]
1708 async fn compact_connect_accepts_prune_only_engine() {
1709 let prune = Arc::new(PruneOnlyEngine::default());
1710 let connect = CompactConnect::new(CompactState::new(prune.clone()));
1711 let request = prune_request(vec![sequence_drop_all_policy()]);
1712
1713 CompactApi::prune(&connect, Context::default(), request)
1714 .await
1715 .expect("prune");
1716
1717 assert_eq!(prune.applied_count(), 1);
1718 assert_eq!(
1719 prune.last_document(),
1720 Some((PRUNE_POLICY_DOCUMENT_VERSION, 1))
1721 );
1722 }
1723
1724 #[tokio::test]
1725 async fn compact_rejects_unparseable_policy_before_engine_prune() {
1726 let prune = Arc::new(PruneOnlyEngine::default());
1727 let connect = CompactConnect::new(CompactState::new(prune.clone()));
1728 let invalid_policy = ProtoPolicy {
1729 scope: Some(policy::Scope::Sequence(Box::default())),
1730 ..Default::default()
1731 };
1732 let request = prune_request(vec![invalid_policy]);
1733
1734 let err = CompactApi::prune(&connect, Context::default(), request)
1735 .await
1736 .expect_err("invalid prune");
1737
1738 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
1739 assert_eq!(prune.applied_count(), 0);
1740 }
1741
1742 #[tokio::test]
1743 async fn compact_rejects_invalid_policy_before_engine_prune() {
1744 let prune = Arc::new(PruneOnlyEngine::default());
1745 let connect = CompactConnect::new(CompactState::new(prune.clone()));
1746 let request = prune_request(vec![sequence_keep_latest_policy(0)]);
1747
1748 let err = CompactApi::prune(&connect, Context::default(), request)
1749 .await
1750 .expect_err("invalid prune");
1751
1752 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
1753 assert_eq!(prune.applied_count(), 0);
1754 }
1755
1756 #[tokio::test]
1757 async fn query_connect_accepts_query_only_engine() {
1758 let query = Arc::new(QueryOnlyEngine {
1759 sequence_number: 9,
1760 value: Some(Bytes::from_static(b"value")),
1761 });
1762 let connect = QueryConnect::new(QueryState { query });
1763 let bytes = exoware_proto::query::GetRequest {
1764 key: b"k".to_vec(),
1765 ..Default::default()
1766 }
1767 .encode_to_vec();
1768 let request = buffa::view::OwnedView::<
1769 exoware_proto::store::query::v1::GetRequestView<'static>,
1770 >::decode(bytes.into())
1771 .expect("decode get request");
1772
1773 let response = QueryApi::get(&connect, Context::default(), request)
1774 .await
1775 .expect("get")
1776 .body;
1777 let detail = response.detail.as_option().expect("query detail");
1778
1779 assert_eq!(response.value.as_deref(), Some(b"value".as_slice()));
1780 assert_eq!(detail.sequence_number, 9);
1781 }
1782
1783 #[tokio::test]
1784 async fn get_includes_engine_query_extra() {
1785 let engine = Arc::new(FakeEngine::default());
1786 engine.set_current_sequence(5);
1787 engine.set_query_extra(HashMap::from([(
1788 "scanned_bytes".to_string(),
1789 buffa_types::google::protobuf::Value::from(123.0),
1790 )]));
1791 let connect = QueryConnect::new(AppState::new(engine));
1792 let bytes = exoware_proto::query::GetRequest {
1793 key: b"k".to_vec(),
1794 ..Default::default()
1795 }
1796 .encode_to_vec();
1797 let request = buffa::view::OwnedView::<
1798 exoware_proto::store::query::v1::GetRequestView<'static>,
1799 >::decode(bytes.into())
1800 .expect("decode get request");
1801
1802 let response = QueryApi::get(&connect, Context::default(), request)
1803 .await
1804 .expect("get")
1805 .body;
1806 let detail = response.detail.as_option().expect("query detail");
1807
1808 assert_eq!(detail.sequence_number, 5);
1809 assert_eq!(
1810 detail
1811 .extra
1812 .get("scanned_bytes")
1813 .and_then(|v| v.as_number()),
1814 Some(123.0)
1815 );
1816 }
1817
/// Each per-service constructor, and the query stack, can be built from
/// states converted out of one `AppState`.
#[test]
fn split_service_constructors_build_independent_process_surfaces() {
    let state = AppState::new(Arc::new(FakeEngine::default()));

    let _ingest_surface = ingest_service(state.clone().into());
    let _query_surface = query_service(state.clone().into());
    let _compact_surface = compact_service(state.clone().into());
    let _stream_surface = stream_service(state.clone().into());

    let query_state = state.clone().into();
    let stream_state = state.into();
    let _query_stack_surface = query_stack(query_state, stream_state);
}
1829
1830 #[tokio::test]
1831 async fn ingest_uses_configured_value_limit() {
1832 let engine = Arc::new(FakeEngine::default());
1833 let state = IngestState::new(engine).with_limits(IngestLimits { max_value_len: 4 });
1834 let connect = IngestConnect::new(state);
1835
1836 let err = IngestApi::put(&connect, Context::default(), put_request(5))
1837 .await
1838 .expect_err("put should reject oversized value");
1839
1840 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
1841 }
1842
1843 #[tokio::test]
1844 async fn stream_can_be_advanced_by_external_notifier() {
1845 let engine = Arc::new(FakeEngine::default());
1846 let notifier = Arc::new(ManualNotifier::new(0));
1847 let connect = StreamConnect::new(StreamState::new(engine.clone(), notifier.clone()));
1848 let mut stream = subscribe_stream(&connect, None).await.expect("subscribe");
1849
1850 engine.set_current_sequence(1);
1851 engine.set_batch(1, Some(vec![matching_kv(b"hit", b"v1")]));
1852 notifier.advance(1);
1853
1854 let frame = tokio::time::timeout(Duration::from_secs(1), stream.next())
1855 .await
1856 .expect("stream should yield")
1857 .expect("frame should exist")
1858 .expect("frame should be ok");
1859 assert_eq!(frame.sequence_number, 1);
1860 assert_eq!(frame.entries.len(), 1);
1861 assert_eq!(frame.entries[0].value.as_ref(), b"v1");
1862 }
1863
1864 #[tokio::test]
1865 async fn reduce_consumes_range_iterator_and_returns_detail() {
1866 let engine = Arc::new(FakeEngine::default());
1867 engine.set_current_sequence(7);
1868 engine.set_range_rows(vec![
1869 (Bytes::from_static(b"a"), Bytes::from_static(b"xx")),
1870 (Bytes::from_static(b"bb"), Bytes::from_static(b"yyy")),
1871 ]);
1872 let connect = QueryConnect::new(AppState::new(engine.clone()));
1873 let bytes = exoware_proto::query::ReduceRequest {
1874 start: b"a".to_vec(),
1875 end: b"z".to_vec(),
1876 params: Some(exoware_proto::query::ReduceParams {
1877 reducers: vec![exoware_proto::query::RangeReducerSpec {
1878 op: exoware_proto::query::RangeReduceOp::RANGE_REDUCE_OP_COUNT_ALL.into(),
1879 ..Default::default()
1880 }],
1881 ..Default::default()
1882 })
1883 .into(),
1884 ..Default::default()
1885 }
1886 .encode_to_vec();
1887 let request = buffa::view::OwnedView::<
1888 exoware_proto::store::query::v1::ReduceRequestView<'static>,
1889 >::decode(bytes.into())
1890 .expect("decode reduce request");
1891
1892 let response = QueryApi::reduce(&connect, Context::default(), request)
1893 .await
1894 .expect("reduce")
1895 .body;
1896 let detail = response.detail.as_option().expect("query detail").clone();
1897 let response = to_domain_reduce_response(response).expect("decode reduce response");
1898
1899 assert_eq!(engine.range_next_count(), 2);
1900 assert_eq!(response.results.len(), 1);
1901 assert_eq!(response.results[0].value, Some(KvReducedValue::UInt64(2)));
1902 assert_eq!(detail.sequence_number, 7);
1903 assert!(detail.extra.is_empty());
1904 }
1905
1906 #[tokio::test]
1907 async fn reduce_uses_eof_query_extra() {
1908 let engine = Arc::new(FakeEngine::default());
1909 engine.set_current_sequence(8);
1910 engine.set_range_rows(vec![
1911 (Bytes::from_static(b"a"), Bytes::from_static(b"xx")),
1912 (Bytes::from_static(b"bb"), Bytes::from_static(b"yyy")),
1913 ]);
1914 engine.set_range_eof_extra(numeric_query_extra("final_rows", 2.0));
1915 let connect = QueryConnect::new(AppState::new(engine));
1916 let bytes = exoware_proto::query::ReduceRequest {
1917 start: b"a".to_vec(),
1918 end: b"z".to_vec(),
1919 params: Some(exoware_proto::query::ReduceParams {
1920 reducers: vec![exoware_proto::query::RangeReducerSpec {
1921 op: exoware_proto::query::RangeReduceOp::RANGE_REDUCE_OP_COUNT_ALL.into(),
1922 ..Default::default()
1923 }],
1924 ..Default::default()
1925 })
1926 .into(),
1927 ..Default::default()
1928 }
1929 .encode_to_vec();
1930 let request = buffa::view::OwnedView::<
1931 exoware_proto::store::query::v1::ReduceRequestView<'static>,
1932 >::decode(bytes.into())
1933 .expect("decode reduce request");
1934
1935 let response = QueryApi::reduce(&connect, Context::default(), request)
1936 .await
1937 .expect("reduce")
1938 .body;
1939 let detail = response.detail.as_option().expect("query detail");
1940
1941 assert_eq!(detail.sequence_number, 8);
1942 assert_eq!(
1943 detail.extra.get("final_rows").and_then(|v| v.as_number()),
1944 Some(2.0)
1945 );
1946 }
1947
1948 #[tokio::test]
1949 async fn get_many_populates_detail_on_each_frame() {
1950 let engine = Arc::new(FakeEngine::default());
1951 engine.set_current_sequence(11);
1952 let connect = QueryConnect::new(AppState::new(engine));
1953 let bytes = exoware_proto::query::GetManyRequest {
1954 keys: vec![b"a".to_vec(), b"bb".to_vec(), b"ccc".to_vec()],
1955 batch_size: 2,
1956 ..Default::default()
1957 }
1958 .encode_to_vec();
1959 let request = buffa::view::OwnedView::<
1960 exoware_proto::store::query::v1::GetManyRequestView<'static>,
1961 >::decode(bytes.into())
1962 .expect("decode get_many request");
1963
1964 let mut stream = QueryApi::get_many(&connect, Context::default(), request)
1965 .await
1966 .expect("get_many")
1967 .body;
1968 let mut frame_sizes = Vec::new();
1969 let mut detail_frames = 0usize;
1970 while let Some(frame) = stream.next().await {
1971 let frame = frame.expect("get_many frame");
1972 frame_sizes.push(frame.results.len());
1973 let detail = frame.detail.as_option().expect("query detail");
1974 assert_eq!(detail.sequence_number, 11);
1975 assert!(detail.extra.is_empty());
1976 detail_frames += 1;
1977 }
1978
1979 assert_eq!(frame_sizes, vec![2, 1]);
1980 assert_eq!(detail_frames, 2);
1981 }
1982
1983 #[tokio::test]
1984 async fn range_returns_without_materializing_full_iterator() {
1985 let engine = Arc::new(FakeEngine::default());
1986 engine.set_current_sequence(9);
1987 engine.set_range_rows(
1988 (0..1000)
1989 .map(|i| {
1990 (
1991 Bytes::from(format!("key-{i:04}")),
1992 Bytes::from_static(b"value"),
1993 )
1994 })
1995 .collect(),
1996 );
1997 let connect = QueryConnect::new(AppState::new(engine.clone()));
1998 let bytes = exoware_proto::query::RangeRequest {
1999 start: b"a".to_vec(),
2000 end: b"z".to_vec(),
2001 limit: Some(1000),
2002 batch_size: 1,
2003 ..Default::default()
2004 }
2005 .encode_to_vec();
2006 let request = buffa::view::OwnedView::<
2007 exoware_proto::store::query::v1::RangeRequestView<'static>,
2008 >::decode(bytes.into())
2009 .expect("decode range request");
2010
2011 let mut stream = QueryApi::range(&connect, Context::default(), request)
2012 .await
2013 .expect("range")
2014 .body;
2015
2016 tokio::time::sleep(Duration::from_millis(50)).await;
2017 let consumed = engine.range_next_count();
2018 assert!(
2019 consumed < 1000,
2020 "range should not consume the full iterator before the response stream is read; consumed {consumed}",
2021 );
2022
2023 let mut rows = 0;
2024 let mut latest_detail = None;
2025 let mut detail_frames = 0usize;
2026 while let Some(frame) = stream.next().await {
2027 let frame = frame.expect("range frame");
2028 rows += frame.results.len();
2029 if let Some(detail) = frame.detail.as_option() {
2030 detail_frames += 1;
2031 latest_detail = Some(detail.clone());
2032 }
2033 }
2034
2035 assert_eq!(rows, 1000);
2036 assert_eq!(detail_frames, 1000);
2037 let detail = latest_detail.expect("query detail");
2038 assert_eq!(detail.sequence_number, 9);
2039 assert!(detail.extra.is_empty());
2040 }
2041
2042 #[tokio::test]
2043 async fn range_emits_eof_query_extra_after_rows() {
2044 let engine = Arc::new(FakeEngine::default());
2045 engine.set_current_sequence(10);
2046 engine.set_range_rows(vec![
2047 (Bytes::from_static(b"a"), Bytes::from_static(b"1")),
2048 (Bytes::from_static(b"b"), Bytes::from_static(b"2")),
2049 ]);
2050 engine.set_range_eof_extra(numeric_query_extra("final_rows", 2.0));
2051 let connect = QueryConnect::new(AppState::new(engine));
2052 let bytes = exoware_proto::query::RangeRequest {
2053 start: b"a".to_vec(),
2054 end: b"z".to_vec(),
2055 limit: Some(2),
2056 batch_size: 2,
2057 ..Default::default()
2058 }
2059 .encode_to_vec();
2060 let request = buffa::view::OwnedView::<
2061 exoware_proto::store::query::v1::RangeRequestView<'static>,
2062 >::decode(bytes.into())
2063 .expect("decode range request");
2064
2065 let mut stream = QueryApi::range(&connect, Context::default(), request)
2066 .await
2067 .expect("range")
2068 .body;
2069 let mut frames = Vec::new();
2070 while let Some(frame) = stream.next().await {
2071 frames.push(frame.expect("range frame"));
2072 }
2073
2074 assert_eq!(frames.len(), 2);
2075 assert_eq!(frames[0].results.len(), 2);
2076 let row_detail = frames[0].detail.as_option().expect("row detail");
2077 assert!(row_detail.extra.is_empty());
2078
2079 assert!(frames[1].results.is_empty());
2080 let final_detail = frames[1].detail.as_option().expect("final detail");
2081 assert_eq!(final_detail.sequence_number, 10);
2082 assert_eq!(
2083 final_detail
2084 .extra
2085 .get("final_rows")
2086 .and_then(|v| v.as_number()),
2087 Some(2.0)
2088 );
2089 }
2090
2091 #[tokio::test]
2092 async fn subscribe_without_replay_reads_the_next_live_batch() {
2093 let engine = Arc::new(FakeEngine::default());
2094 let state = AppState::new(engine.clone());
2095 let connect = StreamConnect::new(state.clone());
2096 let mut stream = subscribe_stream(&connect, None).await.expect("subscribe");
2097 engine.publish_live(state.stream.clone(), 1, vec![matching_kv(b"hit", b"v1")]);
2098 let frame = tokio::time::timeout(Duration::from_secs(1), stream.next())
2099 .await
2100 .expect("stream should yield")
2101 .expect("frame should exist")
2102 .expect("frame should be ok");
2103 assert_eq!(frame.sequence_number, 1);
2104 assert_eq!(frame.entries.len(), 1);
2105 assert_eq!(frame.entries[0].value.as_ref(), b"v1");
2106 }
2107
2108 #[tokio::test]
2109 async fn subscribe_past_end_reads_only_future_live_batches() {
2110 let engine = Arc::new(FakeEngine::default());
2111 engine.set_current_sequence(5);
2112 for seq in 1..=5 {
2113 engine.set_batch(seq, Some(vec![matching_kv(b"seed", b"v")]));
2114 }
2115 let state = AppState::new(engine.clone());
2116 let connect = StreamConnect::new(state.clone());
2117 let mut stream = subscribe_stream(&connect, Some(15))
2118 .await
2119 .expect("subscribe");
2120
2121 assert!(
2122 tokio::time::timeout(Duration::from_millis(200), stream.next())
2123 .await
2124 .is_err(),
2125 "past-end cursor should not replay synthetic or historical frames",
2126 );
2127
2128 engine.publish_live(state.stream.clone(), 6, vec![matching_kv(b"live", b"n")]);
2129 let frame = tokio::time::timeout(Duration::from_secs(1), stream.next())
2130 .await
2131 .expect("stream should yield")
2132 .expect("frame should exist")
2133 .expect("frame should be ok");
2134 assert_eq!(frame.sequence_number, 6);
2135 assert_eq!(frame.entries.len(), 1);
2136 assert_eq!(frame.entries[0].value.as_ref(), b"n");
2137 }
2138
2139 #[tokio::test]
2140 async fn replay_hole_returns_batch_evicted_error_instead_of_empty_frame() {
2141 let engine = Arc::new(FakeEngine::default());
2142 engine.set_current_sequence(3);
2143 engine.set_oldest_retained(Some(2));
2144 engine.set_batch(2, Some(vec![matching_kv(b"replay", b"v2")]));
2145
2146 let state = AppState::new(engine);
2147 let connect = StreamConnect::new(state);
2148 let mut stream = subscribe_stream(&connect, Some(2))
2149 .await
2150 .expect("subscribe");
2151
2152 let first = tokio::time::timeout(Duration::from_secs(1), stream.next())
2153 .await
2154 .expect("stream should yield")
2155 .expect("first replay frame should exist")
2156 .expect("first replay frame should be ok");
2157 assert_eq!(first.sequence_number, 2);
2158 assert_eq!(first.entries.len(), 1);
2159
2160 let err = tokio::time::timeout(Duration::from_secs(1), stream.next())
2161 .await
2162 .expect("stream should yield error")
2163 .expect("error item should exist")
2164 .expect_err("replay hole must be surfaced as an error");
2165 let decoded = decode_connect_error(&err).expect("decode connect error");
2166 assert_eq!(
2167 decoded.error_info.expect("error info").reason,
2168 crate::stream::REASON_BATCH_EVICTED,
2169 );
2170 assert!(
2171 tokio::time::timeout(Duration::from_secs(1), stream.next())
2172 .await
2173 .expect("stream should terminate")
2174 .is_none(),
2175 "stream must terminate after surfacing the replay hole",
2176 );
2177 }
2178
2179 #[tokio::test]
2180 async fn replay_with_live_burst_under_capacity_still_delivers_in_order() {
2181 const REPLAY_BATCHES: u64 = 100;
2182
2183 let engine = Arc::new(FakeEngine::default());
2184 engine.set_current_sequence(REPLAY_BATCHES);
2185 engine.set_oldest_retained(Some(1));
2186 for seq in 1..=REPLAY_BATCHES {
2187 engine.set_batch(seq, Some(vec![matching_kv(b"replay", b"v")]));
2188 }
2189
2190 let state = AppState::new(engine.clone());
2191 engine.publish_on_every_get_batch(
2192 state.stream.clone(),
2193 REPLAY_BATCHES,
2194 vec![matching_kv(b"live", b"tail")],
2195 );
2196
2197 let connect = StreamConnect::new(state);
2198 let mut stream = subscribe_stream(&connect, Some(1))
2199 .await
2200 .expect("subscribe");
2201 let mut sequence_numbers = Vec::with_capacity((REPLAY_BATCHES * 2) as usize);
2202 while sequence_numbers.len() < (REPLAY_BATCHES * 2) as usize {
2203 let frame = tokio::time::timeout(Duration::from_secs(2), stream.next())
2204 .await
2205 .expect("stream should keep yielding")
2206 .expect("frame should exist")
2207 .expect("frame should be ok");
2208 sequence_numbers.push(frame.sequence_number);
2209 }
2210
2211 let expected: Vec<u64> = (1..=(REPLAY_BATCHES * 2)).collect();
2212 assert_eq!(sequence_numbers, expected);
2213 }
2214
2215 #[tokio::test]
2216 async fn replay_large_live_burst_is_paced_by_client_reads() {
2217 const REPLAY_BATCHES: u64 = 300;
2218
2219 let engine = Arc::new(FakeEngine::default());
2220 engine.set_current_sequence(REPLAY_BATCHES);
2221 engine.set_oldest_retained(Some(1));
2222 for seq in 1..=REPLAY_BATCHES {
2223 engine.set_batch(seq, Some(vec![matching_kv(b"replay", b"v")]));
2224 }
2225
2226 let state = AppState::new(engine.clone());
2227 engine.publish_on_every_get_batch(
2228 state.stream.clone(),
2229 REPLAY_BATCHES,
2230 vec![matching_kv(b"live", b"tail")],
2231 );
2232
2233 let connect = StreamConnect::new(state);
2234 let mut stream = subscribe_stream(&connect, Some(1))
2235 .await
2236 .expect("subscribe");
2237 let mut sequence_numbers = Vec::with_capacity((REPLAY_BATCHES * 2) as usize);
2238 while sequence_numbers.len() < (REPLAY_BATCHES * 2) as usize {
2239 let frame = tokio::time::timeout(Duration::from_secs(2), stream.next())
2240 .await
2241 .expect("stream should keep yielding")
2242 .expect("frame should exist")
2243 .expect("frame should be ok");
2244 sequence_numbers.push(frame.sequence_number);
2245 }
2246 let expected: Vec<u64> = (1..=(REPLAY_BATCHES * 2)).collect();
2247 assert_eq!(sequence_numbers, expected);
2248 }
2249}