1use connectrpc::ConnectError;
8use exoware_proto::google::rpc::{bad_request::FieldViolation, BadRequest, ErrorInfo};
9use exoware_proto::{
10 parse_range_traversal_direction, with_bad_request_detail, with_error_info_detail,
11 RangeTraversalModeError,
12};
13use exoware_sdk as exoware_proto;
14use exoware_sdk::keys::{validate_key_size, MAX_KEY_LEN};
15
16pub const DEFAULT_MAX_VALUE_LEN: usize = 10 * 1024 * 1024;
18
19#[derive(Clone, Copy, Debug, Eq, PartialEq)]
21pub struct IngestLimits {
22 pub max_value_len: usize,
24}
25
26impl Default for IngestLimits {
27 fn default() -> Self {
28 Self {
29 max_value_len: DEFAULT_MAX_VALUE_LEN,
30 }
31 }
32}
33
34fn field_error(
35 domain: &str,
36 field: impl Into<String>,
37 description: impl Into<String>,
38 reason: &str,
39 message: impl Into<String>,
40 metadata: impl IntoIterator<Item = (String, String)>,
41) -> ConnectError {
42 let description = description.into();
43 let err = with_bad_request_detail(
44 ConnectError::invalid_argument(message),
45 BadRequest {
46 field_violations: vec![FieldViolation {
47 field: field.into(),
48 description: description.clone(),
49 ..Default::default()
50 }],
51 ..Default::default()
52 },
53 );
54 with_error_info_detail(
55 err,
56 ErrorInfo {
57 reason: reason.to_string(),
58 domain: domain.to_string(),
59 metadata: metadata
60 .into_iter()
61 .chain(std::iter::once(("description".to_string(), description)))
62 .collect(),
63 ..Default::default()
64 },
65 )
66}
67
68fn validate_key_field(domain: &str, field: &str, key: &[u8]) -> Result<(), ConnectError> {
69 validate_key_size(key.len()).map_err(|e| {
70 field_error(
71 domain,
72 field,
73 e.to_string(),
74 "INVALID_KEY_LENGTH",
75 format!("{field} key length is outside store limits"),
76 [("max_key_len".to_string(), MAX_KEY_LEN.to_string())],
77 )
78 })
79}
80
81fn validate_value_field(
82 domain: &str,
83 field: &str,
84 value: &[u8],
85 limits: IngestLimits,
86) -> Result<(), ConnectError> {
87 if value.len() > limits.max_value_len {
88 return Err(field_error(
89 domain,
90 field,
91 format!(
92 "value length {} exceeds maximum {}",
93 value.len(),
94 limits.max_value_len
95 ),
96 "INVALID_VALUE_LENGTH",
97 format!("{field} value length is outside store limits"),
98 [(
99 "max_value_len".to_string(),
100 limits.max_value_len.to_string(),
101 )],
102 ));
103 }
104 Ok(())
105}
106
107pub fn validate_put_request(
110 request: &exoware_proto::store::ingest::v1::PutRequestView<'_>,
111 limits: IngestLimits,
112) -> Result<(), ConnectError> {
113 if request.kvs.is_empty() {
115 return Err(field_error(
116 "store.ingest",
117 "kvs",
118 "at least one key-value pair is required",
119 "INVALID_BATCH",
120 "put request must contain at least one key-value pair",
121 [],
122 ));
123 }
124 for (index, kv) in request.kvs.iter().enumerate() {
126 validate_key_field("store.ingest", &format!("kvs[{index}].key"), kv.key)?;
127 validate_value_field(
128 "store.ingest",
129 &format!("kvs[{index}].value"),
130 kv.value,
131 limits,
132 )?;
133 }
134 Ok(())
135}
136
137pub fn validate_get_request(
140 request: &exoware_proto::store::query::v1::GetRequestView<'_>,
141) -> Result<(), ConnectError> {
142 validate_key_field("store.query", "key", request.key)
144}
145
146pub fn validate_range_request(
147 request: &exoware_proto::store::query::v1::RangeRequestView<'_>,
148) -> Result<(), ConnectError> {
149 validate_key_field("store.query", "start", request.start)?;
151 validate_key_field("store.query", "end", request.end)?;
152 if request.batch_size == 0 {
154 return Err(field_error(
155 "store.query",
156 "batch_size",
157 "batch_size must be greater than 0",
158 "INVALID_BATCH_SIZE",
159 "range batch_size must be positive",
160 [],
161 ));
162 }
163 if let Err(RangeTraversalModeError::UnknownWireValue(v)) =
165 parse_range_traversal_direction(request.mode)
166 {
167 return Err(field_error(
168 "store.query",
169 "mode",
170 format!("unknown TraversalMode enum value {v}"),
171 "INVALID_TRAVERSAL_MODE",
172 "range mode must be TRAVERSAL_MODE_FORWARD (0) or TRAVERSAL_MODE_REVERSE (1)",
173 [],
174 ));
175 }
176 Ok(())
177}
178
179pub fn validate_get_many_request(
180 request: &exoware_proto::store::query::v1::GetManyRequestView<'_>,
181) -> Result<(), ConnectError> {
182 if request.keys.is_empty() {
183 return Err(field_error(
184 "store.query",
185 "keys",
186 "at least one key is required",
187 "INVALID_BATCH",
188 "get_many request must contain at least one key",
189 [],
190 ));
191 }
192 for (index, key) in request.keys.iter().enumerate() {
193 validate_key_field("store.query", &format!("keys[{index}]"), key)?;
194 }
195 if request.batch_size == 0 {
196 return Err(field_error(
197 "store.query",
198 "batch_size",
199 "batch_size must be greater than 0",
200 "INVALID_BATCH_SIZE",
201 "get_many batch_size must be positive",
202 [],
203 ));
204 }
205 Ok(())
206}
207
208pub fn validate_reduce_request(
209 request: &exoware_proto::store::query::v1::ReduceRequestView<'_>,
210) -> Result<(), ConnectError> {
211 validate_key_field("store.query", "start", request.start)?;
213 validate_key_field("store.query", "end", request.end)?;
214 if request.params.reducers.is_empty() && request.params.group_by.is_empty() {
215 return Err(field_error(
216 "store.query",
217 "params",
218 "at least one reducer or group_by field is required",
219 "INVALID_REDUCE_PARAMS",
220 "reduce request must specify at least one reducer or group_by",
221 [],
222 ));
223 }
224 Ok(())
225}
226
227pub fn reduce_params_error(description: impl Into<String>) -> ConnectError {
228 field_error(
229 "store.query",
230 "params",
231 description,
232 "INVALID_REDUCE_PARAMS",
233 "reduce params are invalid",
234 [],
235 )
236}
237
238pub fn validate_prune_request(
241 request: &exoware_proto::store::compact::v1::PruneRequestView<'_>,
242) -> Result<(), ConnectError> {
243 if request.policies.is_empty() {
244 return Err(field_error(
245 "store.compact",
246 "policies",
247 "at least one policy is required",
248 "INVALID_PRUNE_REQUEST",
249 "prune request must contain at least one policy",
250 [],
251 ));
252 }
253 Ok(())
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259 use buffa::view::MessageView as _;
260 use bytes::Bytes;
261
262 fn empty_put_request_bytes() -> Vec<u8> {
263 Vec::new()
264 }
265
266 fn put_request_with_oversized_key() -> Vec<u8> {
267 use buffa::Message;
268 let req = exoware_proto::ingest::PutRequest {
269 kvs: vec![exoware_proto::common::KvEntry {
270 key: vec![0u8; 255],
271 value: Bytes::from_static(&[1]),
272 ..Default::default()
273 }],
274 ..Default::default()
275 };
276 req.encode_to_vec()
277 }
278
279 fn put_request_with_oversized_value() -> Vec<u8> {
280 use buffa::Message;
281 let req = exoware_proto::ingest::PutRequest {
282 kvs: vec![exoware_proto::common::KvEntry {
283 key: vec![0u8; 10],
284 value: Bytes::from(vec![1u8; DEFAULT_MAX_VALUE_LEN + 1]),
285 ..Default::default()
286 }],
287 ..Default::default()
288 };
289 req.encode_to_vec()
290 }
291
292 fn valid_put_request_with_value_len(value_len: usize) -> Vec<u8> {
293 use buffa::Message;
294 let req = exoware_proto::ingest::PutRequest {
295 kvs: vec![exoware_proto::common::KvEntry {
296 key: vec![0u8; 10],
297 value: Bytes::from(vec![1u8; value_len]),
298 ..Default::default()
299 }],
300 ..Default::default()
301 };
302 req.encode_to_vec()
303 }
304
305 #[test]
306 fn put_rejects_empty_batch() {
307 let bytes = empty_put_request_bytes();
308 let view =
309 exoware_proto::store::ingest::v1::PutRequestView::decode_view(&bytes).expect("parse");
310 let err = validate_put_request(&view, IngestLimits::default()).unwrap_err();
311 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
312 }
313
314 #[test]
315 fn put_rejects_oversized_key() {
316 let bytes = put_request_with_oversized_key();
317 let view =
318 exoware_proto::store::ingest::v1::PutRequestView::decode_view(&bytes).expect("parse");
319 let err = validate_put_request(&view, IngestLimits::default()).unwrap_err();
320 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
321 }
322
323 #[test]
324 fn put_rejects_oversized_value() {
325 let bytes = put_request_with_oversized_value();
326 let view =
327 exoware_proto::store::ingest::v1::PutRequestView::decode_view(&bytes).expect("parse");
328 let err = validate_put_request(&view, IngestLimits::default()).unwrap_err();
329 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
330 }
331
332 #[test]
333 fn put_rejects_value_over_custom_limit() {
334 let bytes = valid_put_request_with_value_len(5);
335 let view =
336 exoware_proto::store::ingest::v1::PutRequestView::decode_view(&bytes).expect("parse");
337 let limits = IngestLimits { max_value_len: 4 };
338 let err = validate_put_request(&view, limits).unwrap_err();
339
340 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
341 }
342
343 #[test]
344 fn put_accepts_valid_request() {
345 let bytes = valid_put_request_with_value_len(1);
346 let view =
347 exoware_proto::store::ingest::v1::PutRequestView::decode_view(&bytes).expect("parse");
348 validate_put_request(&view, IngestLimits::default()).expect("should be valid");
349 }
350
351 fn get_request_bytes(key: &[u8]) -> Vec<u8> {
352 use buffa::Message;
353 exoware_proto::query::GetRequest {
354 key: key.to_vec(),
355 ..Default::default()
356 }
357 .encode_to_vec()
358 }
359
360 #[test]
361 fn get_rejects_oversized_key() {
362 let bytes = get_request_bytes(&[0u8; 255]);
363 let view =
364 exoware_proto::store::query::v1::GetRequestView::decode_view(&bytes).expect("parse");
365 assert!(validate_get_request(&view).is_err());
366 }
367
368 #[test]
369 fn get_accepts_max_key() {
370 let bytes = get_request_bytes(&[0u8; 254]);
371 let view =
372 exoware_proto::store::query::v1::GetRequestView::decode_view(&bytes).expect("parse");
373 validate_get_request(&view).expect("should be valid");
374 }
375
376 fn range_request_bytes(
377 batch_size: u32,
378 mode: impl Into<buffa::EnumValue<exoware_proto::query::TraversalMode>>,
379 ) -> Vec<u8> {
380 use buffa::Message;
381 exoware_proto::query::RangeRequest {
382 start: vec![0],
383 batch_size,
384 mode: mode.into(),
385 ..Default::default()
386 }
387 .encode_to_vec()
388 }
389
390 #[test]
391 fn range_rejects_zero_batch_size() {
392 use exoware_proto::query::TraversalMode;
393 let bytes = range_request_bytes(0, TraversalMode::TRAVERSAL_MODE_FORWARD);
394 let view =
395 exoware_proto::store::query::v1::RangeRequestView::decode_view(&bytes).expect("parse");
396 let err = validate_range_request(&view).unwrap_err();
397 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
398 }
399
400 #[test]
401 fn range_rejects_unknown_traversal_mode() {
402 let bytes = range_request_bytes(
403 1,
404 buffa::EnumValue::<exoware_proto::query::TraversalMode>::Unknown(99),
405 );
406 let view =
407 exoware_proto::store::query::v1::RangeRequestView::decode_view(&bytes).expect("parse");
408 let err = validate_range_request(&view).unwrap_err();
409 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
410 }
411
412 #[test]
413 fn range_accepts_valid_request() {
414 use exoware_proto::query::TraversalMode;
415 let bytes = range_request_bytes(100, TraversalMode::TRAVERSAL_MODE_FORWARD);
416 let view =
417 exoware_proto::store::query::v1::RangeRequestView::decode_view(&bytes).expect("parse");
418 validate_range_request(&view).expect("should be valid");
419 }
420
421 fn prune_request_bytes(n_policies: usize) -> Vec<u8> {
422 use buffa::Message;
423 exoware_proto::compact::PruneRequest {
424 policies: (0..n_policies)
425 .map(|_| exoware_proto::compact::Policy::default())
426 .collect(),
427 ..Default::default()
428 }
429 .encode_to_vec()
430 }
431
432 #[test]
433 fn prune_rejects_empty_policies() {
434 let bytes = prune_request_bytes(0);
435 let view = exoware_proto::store::compact::v1::PruneRequestView::decode_view(&bytes)
436 .expect("parse");
437 assert!(validate_prune_request(&view).is_err());
438 }
439
440 #[test]
441 fn prune_accepts_one_policy() {
442 let bytes = prune_request_bytes(1);
443 let view = exoware_proto::store::compact::v1::PruneRequestView::decode_view(&bytes)
444 .expect("parse");
445 validate_prune_request(&view).expect("should be valid");
446 }
447
448 #[test]
449 fn prune_shape_accepts_keep_latest_count_zero() {
450 use buffa::Message;
451 let bytes = exoware_proto::compact::PruneRequest {
452 policies: vec![exoware_proto::compact::Policy {
453 retain: Some(exoware_proto::compact::PolicyRetain {
454 kind: Some(exoware_proto::compact::policy_retain::Kind::KeepLatest(
455 Box::new(exoware_proto::compact::RetainKeepLatest {
456 count: 0,
457 ..Default::default()
458 }),
459 )),
460 ..Default::default()
461 })
462 .into(),
463 ..Default::default()
464 }],
465 ..Default::default()
466 }
467 .encode_to_vec();
468 let view = exoware_proto::store::compact::v1::PruneRequestView::decode_view(&bytes)
469 .expect("parse");
470 validate_prune_request(&view).expect("shape should be valid");
471 }
472
473 fn get_many_request_bytes(keys: &[&[u8]], batch_size: u32) -> Vec<u8> {
476 use buffa::Message;
477 exoware_proto::query::GetManyRequest {
478 keys: keys.iter().map(|k| (*k).to_vec()).collect(),
479 batch_size,
480 ..Default::default()
481 }
482 .encode_to_vec()
483 }
484
485 #[test]
486 fn get_many_rejects_empty_keys() {
487 let bytes = get_many_request_bytes(&[], 10);
488 let view = exoware_proto::store::query::v1::GetManyRequestView::decode_view(&bytes)
489 .expect("parse");
490 let err = validate_get_many_request(&view).unwrap_err();
491 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
492 }
493
494 #[test]
495 fn get_many_rejects_zero_batch_size() {
496 let bytes = get_many_request_bytes(&[b"a"], 0);
497 let view = exoware_proto::store::query::v1::GetManyRequestView::decode_view(&bytes)
498 .expect("parse");
499 let err = validate_get_many_request(&view).unwrap_err();
500 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
501 }
502
503 #[test]
504 fn get_many_rejects_oversized_key() {
505 let big = vec![0u8; 255];
506 let bytes = get_many_request_bytes(&[&big], 10);
507 let view = exoware_proto::store::query::v1::GetManyRequestView::decode_view(&bytes)
508 .expect("parse");
509 let err = validate_get_many_request(&view).unwrap_err();
510 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
511 }
512
513 #[test]
514 fn get_many_accepts_valid_request() {
515 let bytes = get_many_request_bytes(&[b"a", b"b"], 10);
516 let view = exoware_proto::store::query::v1::GetManyRequestView::decode_view(&bytes)
517 .expect("parse");
518 validate_get_many_request(&view).expect("should be valid");
519 }
520
521 fn reduce_request_bytes(n_reducers: usize) -> Vec<u8> {
524 use buffa::Message;
525 exoware_proto::query::ReduceRequest {
526 start: vec![0],
527 end: vec![0],
528 params: Some(exoware_proto::query::ReduceParams {
529 reducers: (0..n_reducers)
530 .map(|_| exoware_proto::query::RangeReducerSpec {
531 op: exoware_proto::query::RangeReduceOp::RANGE_REDUCE_OP_COUNT_ALL.into(),
532 ..Default::default()
533 })
534 .collect(),
535 ..Default::default()
536 })
537 .into(),
538 ..Default::default()
539 }
540 .encode_to_vec()
541 }
542
543 #[test]
544 fn reduce_rejects_empty_params() {
545 let bytes = reduce_request_bytes(0);
546 let view =
547 exoware_proto::store::query::v1::ReduceRequestView::decode_view(&bytes).expect("parse");
548 let err = validate_reduce_request(&view).unwrap_err();
549 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
550 }
551
552 #[test]
553 fn reduce_rejects_oversized_key() {
554 use buffa::Message;
555 let bytes = exoware_proto::query::ReduceRequest {
556 start: vec![0u8; 255],
557 end: vec![0],
558 params: Some(exoware_proto::query::ReduceParams {
559 reducers: vec![exoware_proto::query::RangeReducerSpec {
560 op: exoware_proto::query::RangeReduceOp::RANGE_REDUCE_OP_COUNT_ALL.into(),
561 ..Default::default()
562 }],
563 ..Default::default()
564 })
565 .into(),
566 ..Default::default()
567 }
568 .encode_to_vec();
569 let view =
570 exoware_proto::store::query::v1::ReduceRequestView::decode_view(&bytes).expect("parse");
571 let err = validate_reduce_request(&view).unwrap_err();
572 assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
573 }
574
575 #[test]
576 fn reduce_accepts_valid_request() {
577 let bytes = reduce_request_bytes(1);
578 let view =
579 exoware_proto::store::query::v1::ReduceRequestView::decode_view(&bytes).expect("parse");
580 validate_reduce_request(&view).expect("should be valid");
581 }
582}