1use std::collections::HashMap;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7
8use connectrpc::{Chain, ConnectError, ConnectRpcService, Context, Limits};
9use exoware_proto::compact::{
10 PruneResponse, Service as CompactApi, ServiceServer as CompactServiceServer,
11};
12use exoware_proto::google::rpc::{ErrorInfo, RetryInfo};
13use exoware_proto::ingest::{
14 PutResponse as ProtoPutResponse, Service as IngestApi, ServiceServer as IngestServiceServer,
15};
16use exoware_proto::query::{
17 Detail, GetManyEntry, GetManyFrame, GetResponse, RangeEntry, RangeFrame, ReduceResponse,
18 Service as QueryApi, ServiceServer as QueryServiceServer,
19};
20use exoware_proto::{
21 connect_compression_registry, encode_query_detail_header_value,
22 parse_range_traversal_direction, to_domain_reduce_request_from_view,
23 to_proto_optional_reduced_value, to_proto_reduced_value, with_error_info_detail,
24 with_query_detail, with_retry_info_detail, RangeTraversalDirection,
25 QUERY_DETAIL_RESPONSE_HEADER,
26};
27use exoware_sdk_rs as exoware_proto;
28use exoware_sdk_rs::keys::Key;
29use futures::{stream, Stream};
30use http::header::HeaderValue;
31use http::HeaderName;
32
33use crate::reduce::reduce_over_rows;
34use crate::validate;
35use crate::StoreEngine;
36
/// Size cap in bytes (256 * 1024 * 1024) that `connect_limits` applies to both
/// the request body size and the message size.
const MAX_CONNECTRPC_BODY_BYTES: usize = 256 * 1024 * 1024;
38
/// Returns the combined byte length of every key and every value in `entries`.
///
/// An empty slice yields `0`.
fn read_bytes_for_kv_rows<K: AsRef<[u8]>, V: AsRef<[u8]>>(entries: &[(K, V)]) -> u64 {
    let mut total: u64 = 0;
    for (key, value) in entries {
        total += key.as_ref().len() as u64 + value.as_ref().len() as u64;
    }
    total
}
46
47fn read_stats_read_bytes<K: AsRef<[u8]>, V: AsRef<[u8]>>(
48 entries: &[(K, V)],
49) -> HashMap<String, u64> {
50 [("read_bytes".to_string(), read_bytes_for_kv_rows(entries))]
51 .into_iter()
52 .collect()
53}
54
/// State shared by the Connect service handlers defined in this file.
///
/// Both fields are `Arc`s, so cloning the state shares the same engine and
/// the same readiness flag.
#[derive(Clone)]
pub struct AppState {
    /// Storage engine the handlers call for reads, writes, and pruning.
    pub engine: Arc<dyn StoreEngine>,
    /// Readiness flag; the ingest `put` handler rejects requests while it is `false`.
    pub ready: Arc<AtomicBool>,
}
62
63impl AppState {
64 pub fn new(engine: Arc<dyn StoreEngine>) -> Self {
65 Self {
66 engine,
67 ready: Arc::new(AtomicBool::new(true)),
68 }
69 }
70}
71
/// Handler type for the ingest service; holds the shared [`AppState`].
#[derive(Clone)]
pub struct IngestConnect {
    state: AppState,
}
76
77impl IngestConnect {
78 pub fn new(state: AppState) -> Self {
79 Self { state }
80 }
81}
82
83impl IngestApi for IngestConnect {
84 async fn put(
85 &self,
86 _ctx: Context,
87 request: buffa::view::OwnedView<exoware_proto::store::ingest::v1::PutRequestView<'static>>,
88 ) -> Result<(ProtoPutResponse, Context), ConnectError> {
89 if !self.state.ready.load(Ordering::SeqCst) {
90 return Err(with_error_info_detail(
91 ConnectError::unavailable("ingest is not ready"),
92 ErrorInfo {
93 reason: "WORKER_NOT_READY".to_string(),
94 domain: "store.ingest".to_string(),
95 ..Default::default()
96 },
97 ));
98 }
99
100 validate::validate_put_request(&request)?;
101
102 let wire = request.bytes();
103 let mut batch = Vec::new();
104 for kv in request.kvs.iter() {
105 let key: Key = wire.slice_ref(kv.key);
106 let value = wire.slice_ref(kv.value);
107 batch.push((key, value));
108 }
109
110 let seq = self
111 .state
112 .engine
113 .put_batch(&batch)
114 .map_err(ConnectError::internal)?;
115
116 Ok((
117 ProtoPutResponse {
118 sequence_number: seq,
119 ..Default::default()
120 },
121 Context::default(),
122 ))
123 }
124}
125
/// Handler type for the query service; holds the shared [`AppState`].
#[derive(Clone)]
pub struct QueryConnect {
    state: AppState,
}
130
131impl QueryConnect {
132 pub fn new(state: AppState) -> Self {
133 Self { state }
134 }
135
136 fn current_sequence_number(&self) -> u64 {
137 self.state.engine.current_sequence()
138 }
139
140 fn error_detail(&self) -> Detail {
141 Detail {
142 sequence_number: self.current_sequence_number(),
143 read_stats: HashMap::new(),
144 ..Default::default()
145 }
146 }
147
148 fn consistency_not_ready_error(&self, required: u64, current: u64) -> ConnectError {
149 let err = with_retry_info_detail(
150 ConnectError::aborted("minimum consistency token is not yet visible"),
151 RetryInfo {
152 retry_delay: Some(buffa_types::google::protobuf::Duration::from(
153 std::time::Duration::from_secs(1),
154 ))
155 .into(),
156 ..Default::default()
157 },
158 );
159 with_query_detail(
160 with_error_info_detail(
161 err,
162 ErrorInfo {
163 reason: "CONSISTENCY_NOT_READY".to_string(),
164 domain: "store.query".to_string(),
165 metadata: [
166 ("required_sequence_number".to_string(), required.to_string()),
167 ("current_sequence_number".to_string(), current.to_string()),
168 ]
169 .into_iter()
170 .collect(),
171 ..Default::default()
172 },
173 ),
174 self.error_detail(),
175 )
176 }
177
178 fn ensure_min_sequence_number(&self, required: Option<u64>) -> Result<u64, ConnectError> {
179 let current = self.current_sequence_number();
180 if let Some(required) = required {
181 if current < required {
182 return Err(self.consistency_not_ready_error(required, current));
183 }
184 }
185 Ok(current)
186 }
187
188 fn apply_query_detail_header(ctx: &mut Context, detail: &Detail) {
189 if let Ok(value) = HeaderValue::from_str(&encode_query_detail_header_value(detail)) {
190 if let Ok(name) = HeaderName::from_bytes(QUERY_DETAIL_RESPONSE_HEADER.as_bytes()) {
191 ctx.response_headers.insert(name, value);
192 }
193 }
194 }
195
196 fn apply_query_detail_trailer(ctx: &mut Context, detail: &Detail) {
197 if let Ok(value) = HeaderValue::from_str(&encode_query_detail_header_value(detail)) {
198 if let Ok(name) = HeaderName::from_bytes(QUERY_DETAIL_RESPONSE_HEADER.as_bytes()) {
199 ctx.set_trailer(name, value);
200 }
201 }
202 }
203}
204
205impl QueryApi for QueryConnect {
206 async fn get(
207 &self,
208 mut ctx: Context,
209 request: buffa::view::OwnedView<exoware_proto::store::query::v1::GetRequestView<'static>>,
210 ) -> Result<(GetResponse, Context), ConnectError> {
211 validate::validate_get_request(&request)?;
212 let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
213 let wire = request.bytes();
214 let key: Key = wire.slice_ref(request.key);
215 let value = self
216 .state
217 .engine
218 .get(key.as_ref())
219 .map_err(ConnectError::internal)?;
220 let read_bytes =
221 key.as_ref().len() as u64 + value.as_ref().map_or(0u64, |v| v.len() as u64);
222 let detail = Detail {
223 sequence_number: token,
224 read_stats: [("read_bytes".to_string(), read_bytes)]
225 .into_iter()
226 .collect(),
227 ..Default::default()
228 };
229 Self::apply_query_detail_header(&mut ctx, &detail);
230 Ok((
231 GetResponse {
232 value,
233 ..Default::default()
234 },
235 ctx,
236 ))
237 }
238
239 async fn get_many(
240 &self,
241 mut ctx: Context,
242 request: buffa::view::OwnedView<
243 exoware_proto::store::query::v1::GetManyRequestView<'static>,
244 >,
245 ) -> Result<
246 (
247 Pin<Box<dyn Stream<Item = Result<GetManyFrame, ConnectError>> + Send>>,
248 Context,
249 ),
250 ConnectError,
251 > {
252 validate::validate_get_many_request(&request)?;
253 let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
254
255 let key_refs: Vec<&[u8]> = request.keys.iter().copied().collect();
256 let entries = self
257 .state
258 .engine
259 .get_many(&key_refs)
260 .map_err(ConnectError::internal)?;
261 let read_bytes: u64 = entries
262 .iter()
263 .map(|(k, v)| k.len() as u64 + v.as_ref().map_or(0u64, |v| v.len() as u64))
264 .sum();
265 let detail = Detail {
266 sequence_number,
267 read_stats: [("read_bytes".to_string(), read_bytes)]
268 .into_iter()
269 .collect(),
270 ..Default::default()
271 };
272 Self::apply_query_detail_trailer(&mut ctx, &detail);
273
274 let batch_size = request.batch_size as usize;
275 let mut frames = Vec::new();
276 let mut chunk = Vec::new();
277 for (key, value) in entries {
278 chunk.push(GetManyEntry {
279 key,
280 value,
281 ..Default::default()
282 });
283 if chunk.len() >= batch_size {
284 frames.push(Ok(GetManyFrame {
285 results: std::mem::take(&mut chunk),
286 ..Default::default()
287 }));
288 }
289 }
290 if !chunk.is_empty() {
291 frames.push(Ok(GetManyFrame {
292 results: chunk,
293 ..Default::default()
294 }));
295 }
296
297 Ok((Box::pin(stream::iter(frames)), ctx))
298 }
299
300 async fn range(
301 &self,
302 mut ctx: Context,
303 request: buffa::view::OwnedView<exoware_proto::store::query::v1::RangeRequestView<'static>>,
304 ) -> Result<
305 (
306 Pin<Box<dyn Stream<Item = Result<RangeFrame, ConnectError>> + Send>>,
307 Context,
308 ),
309 ConnectError,
310 > {
311 validate::validate_range_request(&request)?;
312 let sequence_number = self.ensure_min_sequence_number(request.min_sequence_number)?;
313 let wire = request.bytes();
314 let start_key: Key = wire.slice_ref(request.start);
315 let end_key: Key = wire.slice_ref(request.end);
316 let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
317 let batch_size = request.batch_size as usize;
318 let forward = match parse_range_traversal_direction(request.mode) {
319 Ok(RangeTraversalDirection::Forward) => true,
320 Ok(RangeTraversalDirection::Reverse) => false,
321 Err(e) => return Err(ConnectError::internal(format!("traversal mode: {e:?}"))),
322 };
323
324 let entries = self
325 .state
326 .engine
327 .range_scan(start_key.as_ref(), end_key.as_ref(), limit, forward)
328 .map_err(ConnectError::internal)?;
329 let detail = Detail {
330 sequence_number,
331 read_stats: read_stats_read_bytes(&entries),
332 ..Default::default()
333 };
334 Self::apply_query_detail_trailer(&mut ctx, &detail);
335
336 let mut frames = Vec::new();
337 let mut chunk = Vec::new();
338 for (key, value) in entries {
339 chunk.push((key, value));
340 if chunk.len() >= batch_size {
341 frames.push(Ok(RangeFrame {
342 results: chunk
343 .drain(..)
344 .map(|(k, v)| RangeEntry {
345 key: k.into(),
346 value: v.into(),
347 ..Default::default()
348 })
349 .collect(),
350 ..Default::default()
351 }));
352 }
353 }
354 if !chunk.is_empty() {
355 frames.push(Ok(RangeFrame {
356 results: chunk
357 .into_iter()
358 .map(|(k, v)| RangeEntry {
359 key: k.into(),
360 value: v.into(),
361 ..Default::default()
362 })
363 .collect(),
364 ..Default::default()
365 }));
366 }
367
368 Ok((Box::pin(stream::iter(frames)), ctx))
369 }
370
371 async fn reduce(
372 &self,
373 mut ctx: Context,
374 request: buffa::view::OwnedView<
375 exoware_proto::store::query::v1::ReduceRequestView<'static>,
376 >,
377 ) -> Result<(ReduceResponse, Context), ConnectError> {
378 validate::validate_reduce_request(&request)?; let token = self.ensure_min_sequence_number(request.min_sequence_number)?;
380 let wire = request.bytes();
381 let start_key: Key = wire.slice_ref(request.start);
382 let end_key: Key = wire.slice_ref(request.end);
383 let domain = to_domain_reduce_request_from_view(&request.params)
384 .map_err(validate::reduce_params_error)?;
385
386 let rows = self
387 .state
388 .engine
389 .range_scan(start_key.as_ref(), end_key.as_ref(), usize::MAX, true)
390 .map_err(ConnectError::internal)?;
391
392 let response = reduce_over_rows(&rows, &domain)
393 .map_err(|e: crate::RangeError| ConnectError::internal(e.to_string()))?;
394 let detail = Detail {
395 sequence_number: token,
396 read_stats: read_stats_read_bytes(&rows),
397 ..Default::default()
398 };
399 Self::apply_query_detail_header(&mut ctx, &detail);
400
401 Ok((
402 ReduceResponse {
403 results: response
404 .results
405 .into_iter()
406 .map(|result| exoware_proto::query::RangeReduceResult {
407 value: result.value.map(to_proto_reduced_value).into(),
408 ..Default::default()
409 })
410 .collect(),
411 groups: response
412 .groups
413 .into_iter()
414 .map(|group| {
415 let group_values_present =
416 group.group_values.iter().map(Option::is_some).collect();
417 exoware_proto::query::RangeReduceGroup {
418 group_values: group
419 .group_values
420 .into_iter()
421 .map(to_proto_optional_reduced_value)
422 .collect(),
423 group_values_present,
424 results: group
425 .results
426 .into_iter()
427 .map(|result| exoware_proto::query::RangeReduceResult {
428 value: result.value.map(to_proto_reduced_value).into(),
429 ..Default::default()
430 })
431 .collect(),
432 ..Default::default()
433 }
434 })
435 .collect(),
436 ..Default::default()
437 },
438 ctx,
439 ))
440 }
441}
442
/// Handler type for the compact service; holds the shared [`AppState`].
#[derive(Clone)]
pub struct CompactConnect {
    state: AppState,
}
447
448impl CompactConnect {
449 pub fn new(state: AppState) -> Self {
450 Self { state }
451 }
452}
453
454impl CompactApi for CompactConnect {
455 async fn prune(
456 &self,
457 ctx: Context,
458 request: buffa::view::OwnedView<
459 exoware_proto::store::compact::v1::PruneRequestView<'static>,
460 >,
461 ) -> Result<(PruneResponse, Context), ConnectError> {
462 validate::validate_prune_request(&request)?;
463 let document = exoware_proto::prune_policy_document_from_prune_request_view(&request)
464 .map_err(|e| ConnectError::invalid_argument(e.to_string()))?;
465 crate::prune::execute_prune(&self.state.engine, &document)
466 .map_err(|e| ConnectError::internal(e.to_string()))?;
467 Ok((PruneResponse::default(), ctx))
468 }
469}
470
471fn connect_limits() -> Limits {
472 Limits::default()
473 .max_request_body_size(MAX_CONNECTRPC_BODY_BYTES)
474 .max_message_size(MAX_CONNECTRPC_BODY_BYTES)
475}
476
477pub fn connect_stack(
478 state: AppState,
479) -> ConnectRpcService<
480 Chain<
481 IngestServiceServer<IngestConnect>,
482 Chain<QueryServiceServer<QueryConnect>, CompactServiceServer<CompactConnect>>,
483 >,
484> {
485 ConnectRpcService::new(Chain(
486 IngestServiceServer::new(IngestConnect::new(state.clone())),
487 Chain(
488 QueryServiceServer::new(QueryConnect::new(state.clone())),
489 CompactServiceServer::new(CompactConnect::new(state)),
490 ),
491 ))
492 .with_limits(connect_limits())
493 .with_compression(connect_compression_registry())
494}