Skip to main content

exoware_server/
connect.rs

1//! Ingest, query, and compact services; storage is provided by [`crate::StoreEngine`].
2
3use std::collections::HashMap;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7
8use connectrpc::{Chain, ConnectError, ConnectRpcService, Context, Limits};
9use exoware_proto::compact::{
10    PruneResponse, Service as CompactApi, ServiceServer as CompactServiceServer,
11};
12use exoware_proto::google::rpc::{ErrorInfo, RetryInfo};
13use exoware_proto::ingest::{
14    PutResponse as ProtoPutResponse, Service as IngestApi, ServiceServer as IngestServiceServer,
15};
16use exoware_proto::query::{
17    Detail, GetManyEntry, GetManyFrame, GetResponse, RangeEntry, RangeFrame, ReduceResponse,
18    Service as QueryApi, ServiceServer as QueryServiceServer,
19};
20use exoware_proto::{
21    connect_compression_registry, encode_query_detail_header_value,
22    parse_range_traversal_direction, to_domain_reduce_request_from_view,
23    to_proto_optional_reduced_value, to_proto_reduced_value, with_error_info_detail,
24    with_query_detail, with_retry_info_detail, RangeTraversalDirection,
25    QUERY_DETAIL_RESPONSE_HEADER,
26};
27use exoware_sdk_rs as exoware_proto;
28use exoware_sdk_rs::keys::Key;
29use futures::{stream, Stream};
30use http::header::HeaderValue;
31use http::HeaderName;
32
33use crate::reduce::reduce_over_rows;
34use crate::validate;
35use crate::StoreEngine;
36
/// Byte cap (256 MiB) that `connect_limits` applies to both the request body size and the
/// message size of the Connect stack.
const MAX_CONNECTRPC_BODY_BYTES: usize = 256 << 20;
38
/// Sums the byte lengths of every key and every value in `entries`.
///
/// This is the figure reported as `read_bytes` for rows read from the store
/// (reference RocksDB engine). An empty slice yields `0`.
fn read_bytes_for_kv_rows<K: AsRef<[u8]>, V: AsRef<[u8]>>(entries: &[(K, V)]) -> u64 {
    let mut total = 0u64;
    for (key, value) in entries {
        total += key.as_ref().len() as u64 + value.as_ref().len() as u64;
    }
    total
}
46
47fn read_stats_read_bytes<K: AsRef<[u8]>, V: AsRef<[u8]>>(
48    entries: &[(K, V)],
49) -> HashMap<String, u64> {
50    [("read_bytes".to_string(), read_bytes_for_kv_rows(entries))]
51        .into_iter()
52        .collect()
53}
54
55#[derive(Clone)]
56pub struct AppState {
57    pub engine: Arc<dyn StoreEngine>,
58    /// Gates ingest (writes) only. Query and compact remain available during drains so that
59    /// in-flight reads can complete while the worker sheds write traffic.
60    pub ready: Arc<AtomicBool>,
61}
62
63impl AppState {
64    pub fn new(engine: Arc<dyn StoreEngine>) -> Self {
65        Self {
66            engine,
67            ready: Arc::new(AtomicBool::new(true)),
68        }
69    }
70}
71
72#[derive(Clone)]
73pub struct IngestConnect {
74    state: AppState,
75}
76
77impl IngestConnect {
78    pub fn new(state: AppState) -> Self {
79        Self { state }
80    }
81}
82
83impl IngestApi for IngestConnect {
84    async fn put(
85        &self,
86        _ctx: Context,
87        request: buffa::view::OwnedView<exoware_proto::store::ingest::v1::PutRequestView<'static>>,
88    ) -> Result<(ProtoPutResponse, Context), ConnectError> {
89        if !self.state.ready.load(Ordering::SeqCst) {
90            return Err(with_error_info_detail(
91                ConnectError::unavailable("ingest is not ready"),
92                ErrorInfo {
93                    reason: "WORKER_NOT_READY".to_string(),
94                    domain: "store.ingest".to_string(),
95                    ..Default::default()
96                },
97            ));
98        }
99
100        validate::validate_put_request(&request)?;
101
102        let wire = request.bytes();
103        let mut batch = Vec::new();
104        for kv in request.kvs.iter() {
105            let key: Key = wire.slice_ref(kv.key);
106            let value = wire.slice_ref(kv.value);
107            batch.push((key, value));
108        }
109
110        let seq = self
111            .state
112            .engine
113            .put_batch(&batch)
114            .map_err(ConnectError::internal)?;
115
116        Ok((
117            ProtoPutResponse {
118                sequence_number: seq,
119                ..Default::default()
120            },
121            Context::default(),
122        ))
123    }
124}
125
126#[derive(Clone)]
127pub struct QueryConnect {
128    state: AppState,
129}
130
131impl QueryConnect {
132    pub fn new(state: AppState) -> Self {
133        Self { state }
134    }
135
136    fn current_sequence_number(&self) -> u64 {
137        self.state.engine.current_sequence()
138    }
139
140    fn error_detail(&self) -> Detail {
141        Detail {
142            sequence_number: self.current_sequence_number(),
143            read_stats: HashMap::new(),
144            ..Default::default()
145        }
146    }
147
148    fn consistency_not_ready_error(&self, required: u64, current: u64) -> ConnectError {
149        let err = with_retry_info_detail(
150            ConnectError::aborted("minimum consistency token is not yet visible"),
151            RetryInfo {
152                retry_delay: Some(buffa_types::google::protobuf::Duration::from(
153                    std::time::Duration::from_secs(1),
154                ))
155                .into(),
156                ..Default::default()
157            },
158        );
159        with_query_detail(
160            with_error_info_detail(
161                err,
162                ErrorInfo {
163                    reason: "CONSISTENCY_NOT_READY".to_string(),
164                    domain: "store.query".to_string(),
165                    metadata: [
166                        ("required_sequence_number".to_string(), required.to_string()),
167                        ("current_sequence_number".to_string(), current.to_string()),
168                    ]
169                    .into_iter()
170                    .collect(),
171                    ..Default::default()
172                },
173            ),
174            self.error_detail(),
175        )
176    }
177
178    fn ensure_min_sequence_number(&self, required: Option<u64>) -> Result<u64, ConnectError> {
179        let current = self.current_sequence_number();
180        if let Some(required) = required {
181            if current < required {
182                return Err(self.consistency_not_ready_error(required, current));
183            }
184        }
185        Ok(current)
186    }
187
188    fn apply_query_detail_header(ctx: &mut Context, detail: &Detail) {
189        if let Ok(value) = HeaderValue::from_str(&encode_query_detail_header_value(detail)) {
190            if let Ok(name) = HeaderName::from_bytes(QUERY_DETAIL_RESPONSE_HEADER.as_bytes()) {
191                ctx.response_headers.insert(name, value);
192            }
193        }
194    }
195
196    fn apply_query_detail_trailer(ctx: &mut Context, detail: &Detail) {
197        if let Ok(value) = HeaderValue::from_str(&encode_query_detail_header_value(detail)) {
198            if let Ok(name) = HeaderName::from_bytes(QUERY_DETAIL_RESPONSE_HEADER.as_bytes()) {
199                ctx.set_trailer(name, value);
200            }
201        }
202    }
203}
204
205impl QueryApi for QueryConnect {
206    async fn get(
207        &self,
208        mut ctx: Context,
209        request: buffa::view::OwnedView<exoware_proto::store::query::v1::GetRequestView<'static>>,
210    ) -> Result<(GetResponse, Context), ConnectError> {
211        validate::validate_get_request(&request)?;
212        let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
213        let wire = request.bytes();
214        let key: Key = wire.slice_ref(request.key);
215        let value = self
216            .state
217            .engine
218            .get(key.as_ref())
219            .map_err(ConnectError::internal)?;
220        let read_bytes =
221            key.as_ref().len() as u64 + value.as_ref().map_or(0u64, |v| v.len() as u64);
222        let detail = Detail {
223            sequence_number: token,
224            read_stats: [("read_bytes".to_string(), read_bytes)]
225                .into_iter()
226                .collect(),
227            ..Default::default()
228        };
229        Self::apply_query_detail_header(&mut ctx, &detail);
230        Ok((
231            GetResponse {
232                value,
233                ..Default::default()
234            },
235            ctx,
236        ))
237    }
238
239    async fn get_many(
240        &self,
241        mut ctx: Context,
242        request: buffa::view::OwnedView<
243            exoware_proto::store::query::v1::GetManyRequestView<'static>,
244        >,
245    ) -> Result<
246        (
247            Pin<Box<dyn Stream<Item = Result<GetManyFrame, ConnectError>> + Send>>,
248            Context,
249        ),
250        ConnectError,
251    > {
252        validate::validate_get_many_request(&request)?;
253        let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
254
255        let key_refs: Vec<&[u8]> = request.keys.iter().copied().collect();
256        let entries = self
257            .state
258            .engine
259            .get_many(&key_refs)
260            .map_err(ConnectError::internal)?;
261        let read_bytes: u64 = entries
262            .iter()
263            .map(|(k, v)| k.len() as u64 + v.as_ref().map_or(0u64, |v| v.len() as u64))
264            .sum();
265        let detail = Detail {
266            sequence_number,
267            read_stats: [("read_bytes".to_string(), read_bytes)]
268                .into_iter()
269                .collect(),
270            ..Default::default()
271        };
272        Self::apply_query_detail_trailer(&mut ctx, &detail);
273
274        let batch_size = request.batch_size as usize;
275        let mut frames = Vec::new();
276        let mut chunk = Vec::new();
277        for (key, value) in entries {
278            chunk.push(GetManyEntry {
279                key,
280                value,
281                ..Default::default()
282            });
283            if chunk.len() >= batch_size {
284                frames.push(Ok(GetManyFrame {
285                    results: std::mem::take(&mut chunk),
286                    ..Default::default()
287                }));
288            }
289        }
290        if !chunk.is_empty() {
291            frames.push(Ok(GetManyFrame {
292                results: chunk,
293                ..Default::default()
294            }));
295        }
296
297        Ok((Box::pin(stream::iter(frames)), ctx))
298    }
299
300    async fn range(
301        &self,
302        mut ctx: Context,
303        request: buffa::view::OwnedView<exoware_proto::store::query::v1::RangeRequestView<'static>>,
304    ) -> Result<
305        (
306            Pin<Box<dyn Stream<Item = Result<RangeFrame, ConnectError>> + Send>>,
307            Context,
308        ),
309        ConnectError,
310    > {
311        validate::validate_range_request(&request)?;
312        let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
313        let wire = request.bytes();
314        let start_key: Key = wire.slice_ref(request.start);
315        let end_key: Key = wire.slice_ref(request.end);
316        let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
317        let batch_size = request.batch_size as usize;
318        let forward = match parse_range_traversal_direction(request.mode) {
319            Ok(RangeTraversalDirection::Forward) => true,
320            Ok(RangeTraversalDirection::Reverse) => false,
321            Err(e) => return Err(ConnectError::internal(format!("traversal mode: {e:?}"))),
322        };
323
324        let entries = self
325            .state
326            .engine
327            .range_scan(start_key.as_ref(), end_key.as_ref(), limit, forward)
328            .map_err(ConnectError::internal)?;
329        let detail = Detail {
330            sequence_number,
331            read_stats: read_stats_read_bytes(&entries),
332            ..Default::default()
333        };
334        Self::apply_query_detail_trailer(&mut ctx, &detail);
335
336        let mut frames = Vec::new();
337        let mut chunk = Vec::new();
338        for (key, value) in entries {
339            chunk.push((key, value));
340            if chunk.len() >= batch_size {
341                frames.push(Ok(RangeFrame {
342                    results: chunk
343                        .drain(..)
344                        .map(|(k, v)| RangeEntry {
345                            key: k.into(),
346                            value: v.into(),
347                            ..Default::default()
348                        })
349                        .collect(),
350                    ..Default::default()
351                }));
352            }
353        }
354        if !chunk.is_empty() {
355            frames.push(Ok(RangeFrame {
356                results: chunk
357                    .into_iter()
358                    .map(|(k, v)| RangeEntry {
359                        key: k.into(),
360                        value: v.into(),
361                        ..Default::default()
362                    })
363                    .collect(),
364                ..Default::default()
365            }));
366        }
367
368        Ok((Box::pin(stream::iter(frames)), ctx))
369    }
370
371    async fn reduce(
372        &self,
373        mut ctx: Context,
374        request: buffa::view::OwnedView<
375            exoware_proto::store::query::v1::ReduceRequestView<'static>,
376        >,
377    ) -> Result<(ReduceResponse, Context), ConnectError> {
378        validate::validate_reduce_request(&request)?; // proto-level; reduce_over_rows re-validates per-reducer constraints
379        let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
380        let wire = request.bytes();
381        let start_key: Key = wire.slice_ref(request.start);
382        let end_key: Key = wire.slice_ref(request.end);
383        let domain = to_domain_reduce_request_from_view(&request.params)
384            .map_err(validate::reduce_params_error)?;
385
386        let rows = self
387            .state
388            .engine
389            .range_scan(start_key.as_ref(), end_key.as_ref(), usize::MAX, true)
390            .map_err(ConnectError::internal)?;
391
392        let response = reduce_over_rows(&rows, &domain)
393            .map_err(|e: crate::RangeError| ConnectError::internal(e.to_string()))?;
394        let detail = Detail {
395            sequence_number: token,
396            read_stats: read_stats_read_bytes(&rows),
397            ..Default::default()
398        };
399        Self::apply_query_detail_header(&mut ctx, &detail);
400
401        Ok((
402            ReduceResponse {
403                results: response
404                    .results
405                    .into_iter()
406                    .map(|result| exoware_proto::query::RangeReduceResult {
407                        value: result.value.map(to_proto_reduced_value).into(),
408                        ..Default::default()
409                    })
410                    .collect(),
411                groups: response
412                    .groups
413                    .into_iter()
414                    .map(|group| {
415                        let group_values_present =
416                            group.group_values.iter().map(Option::is_some).collect();
417                        exoware_proto::query::RangeReduceGroup {
418                            group_values: group
419                                .group_values
420                                .into_iter()
421                                .map(to_proto_optional_reduced_value)
422                                .collect(),
423                            group_values_present,
424                            results: group
425                                .results
426                                .into_iter()
427                                .map(|result| exoware_proto::query::RangeReduceResult {
428                                    value: result.value.map(to_proto_reduced_value).into(),
429                                    ..Default::default()
430                                })
431                                .collect(),
432                            ..Default::default()
433                        }
434                    })
435                    .collect(),
436                ..Default::default()
437            },
438            ctx,
439        ))
440    }
441}
442
443#[derive(Clone)]
444pub struct CompactConnect {
445    state: AppState,
446}
447
448impl CompactConnect {
449    pub fn new(state: AppState) -> Self {
450        Self { state }
451    }
452}
453
454impl CompactApi for CompactConnect {
455    async fn prune(
456        &self,
457        ctx: Context,
458        request: buffa::view::OwnedView<
459            exoware_proto::store::compact::v1::PruneRequestView<'static>,
460        >,
461    ) -> Result<(PruneResponse, Context), ConnectError> {
462        validate::validate_prune_request(&request)?;
463        let document = exoware_proto::prune_policy_document_from_prune_request_view(&request)
464            .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
465        crate::prune::execute_prune(&self.state.engine, &document)
466            .map_err(|e| ConnectError::internal(e.to_string()))?;
467        Ok((PruneResponse::default(), ctx))
468    }
469}
470
471fn connect_limits() -> Limits {
472    Limits::default()
473        .max_request_body_size(MAX_CONNECTRPC_BODY_BYTES)
474        .max_message_size(MAX_CONNECTRPC_BODY_BYTES)
475}
476
477pub fn connect_stack(
478    state: AppState,
479) -> ConnectRpcService<
480    Chain<
481        IngestServiceServer<IngestConnect>,
482        Chain<QueryServiceServer<QueryConnect>, CompactServiceServer<CompactConnect>>,
483    >,
484> {
485    ConnectRpcService::new(Chain(
486        IngestServiceServer::new(IngestConnect::new(state.clone())),
487        Chain(
488            QueryServiceServer::new(QueryConnect::new(state.clone())),
489            CompactServiceServer::new(CompactConnect::new(state)),
490        ),
491    ))
492    .with_limits(connect_limits())
493    .with_compression(connect_compression_registry())
494}