Skip to main content

exoware_server/
validate.rs

1//! Runtime enforcement of `buf.validate` proto annotations.
2//!
3//! Each function validates one request message type and returns a structured
4//! `ConnectError` (INVALID_ARGUMENT with `BadRequest` + `ErrorInfo` details) on
5//! the first constraint violation.
6
7use connectrpc::ConnectError;
8use exoware_proto::google::rpc::{bad_request::FieldViolation, BadRequest, ErrorInfo};
9use exoware_proto::{
10    parse_range_traversal_direction, with_bad_request_detail, with_error_info_detail,
11    RangeTraversalModeError,
12};
13use exoware_sdk as exoware_proto;
14use exoware_sdk::keys::{validate_key_size, MAX_KEY_LEN};
15
/// Value-size cap, in bytes, that ingest validation uses by default (10 MiB).
pub const DEFAULT_MAX_VALUE_LEN: usize = 10 * (1 << 20);

/// Size limits applied to ingest requests before they reach the backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IngestLimits {
    /// Largest size, in bytes, accepted for any single value field.
    pub max_value_len: usize,
}

impl Default for IngestLimits {
    /// Returns limits whose value cap is [`DEFAULT_MAX_VALUE_LEN`].
    fn default() -> Self {
        let max_value_len = DEFAULT_MAX_VALUE_LEN;
        IngestLimits { max_value_len }
    }
}
33
34fn field_error(
35    domain: &str,
36    field: impl Into<String>,
37    description: impl Into<String>,
38    reason: &str,
39    message: impl Into<String>,
40    metadata: impl IntoIterator<Item = (String, String)>,
41) -> ConnectError {
42    let description = description.into();
43    let err = with_bad_request_detail(
44        ConnectError::invalid_argument(message),
45        BadRequest {
46            field_violations: vec![FieldViolation {
47                field: field.into(),
48                description: description.clone(),
49                ..Default::default()
50            }],
51            ..Default::default()
52        },
53    );
54    with_error_info_detail(
55        err,
56        ErrorInfo {
57            reason: reason.to_string(),
58            domain: domain.to_string(),
59            metadata: metadata
60                .into_iter()
61                .chain(std::iter::once(("description".to_string(), description)))
62                .collect(),
63            ..Default::default()
64        },
65    )
66}
67
68fn validate_key_field(domain: &str, field: &str, key: &[u8]) -> Result<(), ConnectError> {
69    validate_key_size(key.len()).map_err(|e| {
70        field_error(
71            domain,
72            field,
73            e.to_string(),
74            "INVALID_KEY_LENGTH",
75            format!("{field} key length is outside store limits"),
76            [("max_key_len".to_string(), MAX_KEY_LEN.to_string())],
77        )
78    })
79}
80
81fn validate_value_field(
82    domain: &str,
83    field: &str,
84    value: &[u8],
85    limits: IngestLimits,
86) -> Result<(), ConnectError> {
87    if value.len() > limits.max_value_len {
88        return Err(field_error(
89            domain,
90            field,
91            format!(
92                "value length {} exceeds maximum {}",
93                value.len(),
94                limits.max_value_len
95            ),
96            "INVALID_VALUE_LENGTH",
97            format!("{field} value length is outside store limits"),
98            [(
99                "max_value_len".to_string(),
100                limits.max_value_len.to_string(),
101            )],
102        ));
103    }
104    Ok(())
105}
106
107// -- ingest --
108
109pub fn validate_put_request(
110    request: &exoware_proto::store::ingest::v1::PutRequestView<'_>,
111    limits: IngestLimits,
112) -> Result<(), ConnectError> {
113    // buf.validate: repeated.min_items = 1
114    if request.kvs.is_empty() {
115        return Err(field_error(
116            "store.ingest",
117            "kvs",
118            "at least one key-value pair is required",
119            "INVALID_BATCH",
120            "put request must contain at least one key-value pair",
121            [],
122        ));
123    }
124    // buf.validate: KvEntry.key bytes.max_len = 254
125    for (index, kv) in request.kvs.iter().enumerate() {
126        validate_key_field("store.ingest", &format!("kvs[{index}].key"), kv.key)?;
127        validate_value_field(
128            "store.ingest",
129            &format!("kvs[{index}].value"),
130            kv.value,
131            limits,
132        )?;
133    }
134    Ok(())
135}
136
137// -- query --
138
139pub fn validate_get_request(
140    request: &exoware_proto::store::query::v1::GetRequestView<'_>,
141) -> Result<(), ConnectError> {
142    // buf.validate: bytes.max_len = 254
143    validate_key_field("store.query", "key", request.key)
144}
145
146pub fn validate_range_request(
147    request: &exoware_proto::store::query::v1::RangeRequestView<'_>,
148) -> Result<(), ConnectError> {
149    // buf.validate: bytes.max_len = 254
150    validate_key_field("store.query", "start", request.start)?;
151    validate_key_field("store.query", "end", request.end)?;
152    // buf.validate: uint32.gt = 0
153    if request.batch_size == 0 {
154        return Err(field_error(
155            "store.query",
156            "batch_size",
157            "batch_size must be greater than 0",
158            "INVALID_BATCH_SIZE",
159            "range batch_size must be positive",
160            [],
161        ));
162    }
163    // buf.validate: enum.defined_only = true
164    if let Err(RangeTraversalModeError::UnknownWireValue(v)) =
165        parse_range_traversal_direction(request.mode)
166    {
167        return Err(field_error(
168            "store.query",
169            "mode",
170            format!("unknown TraversalMode enum value {v}"),
171            "INVALID_TRAVERSAL_MODE",
172            "range mode must be TRAVERSAL_MODE_FORWARD (0) or TRAVERSAL_MODE_REVERSE (1)",
173            [],
174        ));
175    }
176    Ok(())
177}
178
179pub fn validate_get_many_request(
180    request: &exoware_proto::store::query::v1::GetManyRequestView<'_>,
181) -> Result<(), ConnectError> {
182    if request.keys.is_empty() {
183        return Err(field_error(
184            "store.query",
185            "keys",
186            "at least one key is required",
187            "INVALID_BATCH",
188            "get_many request must contain at least one key",
189            [],
190        ));
191    }
192    for (index, key) in request.keys.iter().enumerate() {
193        validate_key_field("store.query", &format!("keys[{index}]"), key)?;
194    }
195    if request.batch_size == 0 {
196        return Err(field_error(
197            "store.query",
198            "batch_size",
199            "batch_size must be greater than 0",
200            "INVALID_BATCH_SIZE",
201            "get_many batch_size must be positive",
202            [],
203        ));
204    }
205    Ok(())
206}
207
208pub fn validate_reduce_request(
209    request: &exoware_proto::store::query::v1::ReduceRequestView<'_>,
210) -> Result<(), ConnectError> {
211    // buf.validate: bytes.max_len = 254
212    validate_key_field("store.query", "start", request.start)?;
213    validate_key_field("store.query", "end", request.end)?;
214    if request.params.reducers.is_empty() && request.params.group_by.is_empty() {
215        return Err(field_error(
216            "store.query",
217            "params",
218            "at least one reducer or group_by field is required",
219            "INVALID_REDUCE_PARAMS",
220            "reduce request must specify at least one reducer or group_by",
221            [],
222        ));
223    }
224    Ok(())
225}
226
227pub fn reduce_params_error(description: impl Into<String>) -> ConnectError {
228    field_error(
229        "store.query",
230        "params",
231        description,
232        "INVALID_REDUCE_PARAMS",
233        "reduce params are invalid",
234        [],
235    )
236}
237
238// -- compact --
239
240pub fn validate_prune_request(
241    request: &exoware_proto::store::compact::v1::PruneRequestView<'_>,
242) -> Result<(), ConnectError> {
243    if request.policies.is_empty() {
244        return Err(field_error(
245            "store.compact",
246            "policies",
247            "at least one policy is required",
248            "INVALID_PRUNE_REQUEST",
249            "prune request must contain at least one policy",
250            [],
251        ));
252    }
253    Ok(())
254}
255
#[cfg(test)]
mod tests {
    //! Each test encodes an owned request message, decodes it as a view, and
    //! runs the matching validator on that view.

    use super::*;
    use buffa::view::MessageView as _;
    use buffa::Message as _;
    use bytes::Bytes;

    // -- shared helpers --

    /// Asserts that a validation failure carries the `InvalidArgument` code.
    fn assert_invalid_argument(err: &ConnectError) {
        assert_eq!(err.code, connectrpc::ErrorCode::InvalidArgument);
    }

    /// Decodes `bytes` as a `PutRequestView` and validates it with `limits`.
    fn check_put(bytes: &[u8], limits: IngestLimits) -> Result<(), ConnectError> {
        let view =
            exoware_proto::store::ingest::v1::PutRequestView::decode_view(bytes).expect("parse");
        validate_put_request(&view, limits)
    }

    /// Decodes `bytes` as a `GetRequestView` and validates it.
    fn check_get(bytes: &[u8]) -> Result<(), ConnectError> {
        let view =
            exoware_proto::store::query::v1::GetRequestView::decode_view(bytes).expect("parse");
        validate_get_request(&view)
    }

    /// Decodes `bytes` as a `RangeRequestView` and validates it.
    fn check_range(bytes: &[u8]) -> Result<(), ConnectError> {
        let view =
            exoware_proto::store::query::v1::RangeRequestView::decode_view(bytes).expect("parse");
        validate_range_request(&view)
    }

    /// Decodes `bytes` as a `GetManyRequestView` and validates it.
    fn check_get_many(bytes: &[u8]) -> Result<(), ConnectError> {
        let view = exoware_proto::store::query::v1::GetManyRequestView::decode_view(bytes)
            .expect("parse");
        validate_get_many_request(&view)
    }

    /// Decodes `bytes` as a `ReduceRequestView` and validates it.
    fn check_reduce(bytes: &[u8]) -> Result<(), ConnectError> {
        let view =
            exoware_proto::store::query::v1::ReduceRequestView::decode_view(bytes).expect("parse");
        validate_reduce_request(&view)
    }

    /// Decodes `bytes` as a `PruneRequestView` and validates it.
    fn check_prune(bytes: &[u8]) -> Result<(), ConnectError> {
        let view = exoware_proto::store::compact::v1::PruneRequestView::decode_view(bytes)
            .expect("parse");
        validate_prune_request(&view)
    }

    // -- put --

    /// Encodes a put request with one entry: a zero-filled key of `key_len`
    /// bytes and a value of `value_len` bytes, each byte set to 1.
    fn put_request_bytes(key_len: usize, value_len: usize) -> Vec<u8> {
        let entry = exoware_proto::common::KvEntry {
            key: vec![0u8; key_len],
            value: Bytes::from(vec![1u8; value_len]),
            ..Default::default()
        };
        exoware_proto::ingest::PutRequest {
            kvs: vec![entry],
            ..Default::default()
        }
        .encode_to_vec()
    }

    #[test]
    fn put_rejects_empty_batch() {
        // An empty buffer decodes to a request with no entries.
        let err = check_put(&[], IngestLimits::default()).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn put_rejects_oversized_key() {
        let bytes = put_request_bytes(255, 1);
        let err = check_put(&bytes, IngestLimits::default()).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn put_rejects_oversized_value() {
        let bytes = put_request_bytes(10, DEFAULT_MAX_VALUE_LEN + 1);
        let err = check_put(&bytes, IngestLimits::default()).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn put_rejects_value_over_custom_limit() {
        let bytes = put_request_bytes(10, 5);
        let limits = IngestLimits { max_value_len: 4 };
        let err = check_put(&bytes, limits).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn put_accepts_valid_request() {
        let bytes = put_request_bytes(10, 1);
        check_put(&bytes, IngestLimits::default()).expect("should be valid");
    }

    // -- get --

    fn get_request_bytes(key: &[u8]) -> Vec<u8> {
        let request = exoware_proto::query::GetRequest {
            key: key.to_vec(),
            ..Default::default()
        };
        request.encode_to_vec()
    }

    #[test]
    fn get_rejects_oversized_key() {
        let bytes = get_request_bytes(&[0u8; 255]);
        assert!(check_get(&bytes).is_err());
    }

    #[test]
    fn get_accepts_max_key() {
        let bytes = get_request_bytes(&[0u8; 254]);
        check_get(&bytes).expect("should be valid");
    }

    // -- range --

    fn range_request_bytes(
        batch_size: u32,
        mode: impl Into<buffa::EnumValue<exoware_proto::query::TraversalMode>>,
    ) -> Vec<u8> {
        let request = exoware_proto::query::RangeRequest {
            start: vec![0],
            batch_size,
            mode: mode.into(),
            ..Default::default()
        };
        request.encode_to_vec()
    }

    #[test]
    fn range_rejects_zero_batch_size() {
        let forward = exoware_proto::query::TraversalMode::TRAVERSAL_MODE_FORWARD;
        let bytes = range_request_bytes(0, forward);
        let err = check_range(&bytes).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn range_rejects_unknown_traversal_mode() {
        let unknown = buffa::EnumValue::<exoware_proto::query::TraversalMode>::Unknown(99);
        let bytes = range_request_bytes(1, unknown);
        let err = check_range(&bytes).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn range_accepts_valid_request() {
        let forward = exoware_proto::query::TraversalMode::TRAVERSAL_MODE_FORWARD;
        let bytes = range_request_bytes(100, forward);
        check_range(&bytes).expect("should be valid");
    }

    // -- prune --

    fn prune_request_bytes(n_policies: usize) -> Vec<u8> {
        let request = exoware_proto::compact::PruneRequest {
            policies: std::iter::repeat_with(exoware_proto::compact::Policy::default)
                .take(n_policies)
                .collect(),
            ..Default::default()
        };
        request.encode_to_vec()
    }

    #[test]
    fn prune_rejects_empty_policies() {
        let bytes = prune_request_bytes(0);
        assert!(check_prune(&bytes).is_err());
    }

    #[test]
    fn prune_accepts_one_policy() {
        let bytes = prune_request_bytes(1);
        check_prune(&bytes).expect("should be valid");
    }

    #[test]
    fn prune_shape_accepts_keep_latest_count_zero() {
        let keep_latest = exoware_proto::compact::RetainKeepLatest {
            count: 0,
            ..Default::default()
        };
        let retain = exoware_proto::compact::PolicyRetain {
            kind: Some(exoware_proto::compact::policy_retain::Kind::KeepLatest(
                Box::new(keep_latest),
            )),
            ..Default::default()
        };
        let bytes = exoware_proto::compact::PruneRequest {
            policies: vec![exoware_proto::compact::Policy {
                retain: Some(retain).into(),
                ..Default::default()
            }],
            ..Default::default()
        }
        .encode_to_vec();
        check_prune(&bytes).expect("shape should be valid");
    }

    // -- get_many --

    fn get_many_request_bytes(keys: &[&[u8]], batch_size: u32) -> Vec<u8> {
        let request = exoware_proto::query::GetManyRequest {
            keys: keys.iter().map(|&k| k.to_vec()).collect(),
            batch_size,
            ..Default::default()
        };
        request.encode_to_vec()
    }

    #[test]
    fn get_many_rejects_empty_keys() {
        let bytes = get_many_request_bytes(&[], 10);
        let err = check_get_many(&bytes).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn get_many_rejects_zero_batch_size() {
        let bytes = get_many_request_bytes(&[b"a"], 0);
        let err = check_get_many(&bytes).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn get_many_rejects_oversized_key() {
        let big = vec![0u8; 255];
        let bytes = get_many_request_bytes(&[&big], 10);
        let err = check_get_many(&bytes).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn get_many_accepts_valid_request() {
        let bytes = get_many_request_bytes(&[b"a", b"b"], 10);
        check_get_many(&bytes).expect("should be valid");
    }

    // -- reduce --

    /// Encodes a reduce request with the given `start` key, an `end` key of
    /// `[0]`, and `n_reducers` identical `RANGE_REDUCE_OP_COUNT_ALL` reducers.
    fn reduce_request_bytes_from(start: Vec<u8>, n_reducers: usize) -> Vec<u8> {
        let reducers = (0..n_reducers)
            .map(|_| exoware_proto::query::RangeReducerSpec {
                op: exoware_proto::query::RangeReduceOp::RANGE_REDUCE_OP_COUNT_ALL.into(),
                ..Default::default()
            })
            .collect();
        let params = exoware_proto::query::ReduceParams {
            reducers,
            ..Default::default()
        };
        exoware_proto::query::ReduceRequest {
            start,
            end: vec![0],
            params: Some(params).into(),
            ..Default::default()
        }
        .encode_to_vec()
    }

    fn reduce_request_bytes(n_reducers: usize) -> Vec<u8> {
        reduce_request_bytes_from(vec![0], n_reducers)
    }

    #[test]
    fn reduce_rejects_empty_params() {
        let bytes = reduce_request_bytes(0);
        let err = check_reduce(&bytes).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn reduce_rejects_oversized_key() {
        let bytes = reduce_request_bytes_from(vec![0u8; 255], 1);
        let err = check_reduce(&bytes).unwrap_err();
        assert_invalid_argument(&err);
    }

    #[test]
    fn reduce_accepts_valid_request() {
        let bytes = reduce_request_bytes(1);
        check_reduce(&bytes).expect("should be valid");
    }
}