1use std::collections::{HashMap, HashSet};
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5
6fn default_filter_fields_json() -> String {
7 "[]".to_owned()
8}
9
10fn default_validation_json() -> String {
11 String::new()
12}
13
14fn default_secondary_indexes_json() -> String {
15 "[]".to_owned()
16}
17
18#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
19#[serde(rename_all = "snake_case")]
20pub enum OperationalCollectionKind {
21 AppendOnlyLog,
22 LatestState,
23}
24
25impl OperationalCollectionKind {
26 #[must_use]
27 pub fn as_str(self) -> &'static str {
28 match self {
29 Self::AppendOnlyLog => "append_only_log",
30 Self::LatestState => "latest_state",
31 }
32 }
33}
34
35impl TryFrom<&str> for OperationalCollectionKind {
36 type Error = String;
37
38 fn try_from(value: &str) -> Result<Self, Self::Error> {
39 match value {
40 "append_only_log" => Ok(Self::AppendOnlyLog),
41 "latest_state" => Ok(Self::LatestState),
42 other => Err(format!("unknown operational collection kind '{other}'")),
43 }
44 }
45}
46
47#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
48pub struct OperationalCollectionRecord {
49 pub name: String,
50 pub kind: OperationalCollectionKind,
51 pub schema_json: String,
52 pub retention_json: String,
53 #[serde(default = "default_filter_fields_json")]
54 pub filter_fields_json: String,
55 #[serde(default = "default_validation_json")]
56 pub validation_json: String,
57 #[serde(default = "default_secondary_indexes_json")]
58 pub secondary_indexes_json: String,
59 pub format_version: i64,
60 pub created_at: i64,
61 pub disabled_at: Option<i64>,
62}
63
64#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
65pub struct OperationalMutationRow {
66 pub id: String,
67 pub collection_name: String,
68 pub record_key: String,
69 pub op_kind: String,
70 pub payload_json: String,
71 pub source_ref: Option<String>,
72 pub created_at: i64,
73}
74
75#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
76pub struct OperationalCurrentRow {
77 pub collection_name: String,
78 pub record_key: String,
79 pub payload_json: String,
80 pub updated_at: i64,
81 pub last_mutation_id: String,
82}
83
84#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
85pub struct OperationalRegisterRequest {
86 pub name: String,
87 pub kind: OperationalCollectionKind,
88 pub schema_json: String,
89 pub retention_json: String,
90 #[serde(default = "default_filter_fields_json")]
91 pub filter_fields_json: String,
92 #[serde(default = "default_validation_json")]
93 pub validation_json: String,
94 #[serde(default = "default_secondary_indexes_json")]
95 pub secondary_indexes_json: String,
96 pub format_version: i64,
97}
98
99#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub enum OperationalValidationMode {
102 Disabled,
103 ReportOnly,
104 Enforce,
105}
106
107#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
108pub struct OperationalValidationContract {
109 pub format_version: i64,
110 pub mode: OperationalValidationMode,
111 #[serde(default = "default_true")]
112 pub additional_properties: bool,
113 #[serde(default)]
114 pub fields: Vec<OperationalValidationField>,
115}
116
117const fn default_true() -> bool {
118 true
119}
120
121#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub enum OperationalValidationFieldType {
123 #[serde(rename = "string")]
124 String,
125 #[serde(rename = "integer")]
126 Integer,
127 #[serde(rename = "float")]
128 Float,
129 #[serde(rename = "boolean")]
130 Boolean,
131 #[serde(rename = "timestamp")]
132 Timestamp,
133 #[serde(rename = "object")]
134 Object,
135 #[serde(rename = "array[string]")]
136 ArrayString,
137 #[serde(rename = "array[integer]")]
138 ArrayInteger,
139 #[serde(rename = "array[float]")]
140 ArrayFloat,
141 #[serde(rename = "array[boolean]")]
142 ArrayBoolean,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct OperationalValidationField {
147 pub name: String,
148 #[serde(rename = "type")]
149 pub field_type: OperationalValidationFieldType,
150 #[serde(default)]
151 pub required: bool,
152 #[serde(default)]
153 pub nullable: bool,
154 #[serde(default, rename = "enum")]
155 pub enum_values: Vec<serde_json::Value>,
156 pub minimum: Option<f64>,
157 pub maximum: Option<f64>,
158 pub max_length: Option<usize>,
159 pub max_items: Option<usize>,
160}
161
162#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
163pub struct OperationalHistoryValidationIssue {
164 pub mutation_id: String,
165 pub record_key: String,
166 pub op_kind: String,
167 pub message: String,
168}
169
170#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
171pub struct OperationalHistoryValidationReport {
172 pub collection_name: String,
173 pub checked_rows: usize,
174 pub invalid_row_count: usize,
175 pub issues: Vec<OperationalHistoryValidationIssue>,
176}
177
178#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
179#[serde(rename_all = "snake_case")]
180pub enum OperationalSecondaryIndexValueType {
181 String,
182 Integer,
183 Timestamp,
184}
185
186#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
187pub struct OperationalSecondaryIndexField {
188 pub name: String,
189 pub value_type: OperationalSecondaryIndexValueType,
190}
191
192#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193#[serde(tag = "kind", rename_all = "snake_case")]
194pub enum OperationalSecondaryIndexDefinition {
195 AppendOnlyFieldTime {
196 name: String,
197 field: String,
198 value_type: OperationalSecondaryIndexValueType,
199 time_field: String,
200 },
201 LatestStateField {
202 name: String,
203 field: String,
204 value_type: OperationalSecondaryIndexValueType,
205 },
206 LatestStateComposite {
207 name: String,
208 fields: Vec<OperationalSecondaryIndexField>,
209 },
210}
211
212#[derive(Clone, Debug, PartialEq, Eq)]
213pub(crate) struct OperationalSecondaryIndexEntry {
214 pub index_name: String,
215 pub sort_timestamp: Option<i64>,
216 pub slot1_text: Option<String>,
217 pub slot1_integer: Option<i64>,
218 pub slot2_text: Option<String>,
219 pub slot2_integer: Option<i64>,
220 pub slot3_text: Option<String>,
221 pub slot3_integer: Option<i64>,
222}
223
224#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
225pub struct OperationalSecondaryIndexRebuildReport {
226 pub collection_name: String,
227 pub mutation_entries_rebuilt: usize,
228 pub current_entries_rebuilt: usize,
229}
230
231#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
232#[serde(rename_all = "snake_case")]
233pub enum OperationalRetentionActionKind {
234 Noop,
235 PurgeBeforeSeconds,
236 KeepLast,
237}
238
239#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
240pub struct OperationalRetentionPlanItem {
241 pub collection_name: String,
242 pub action_kind: OperationalRetentionActionKind,
243 pub candidate_deletions: usize,
244 pub before_timestamp: Option<i64>,
245 pub max_rows: Option<usize>,
246 pub last_run_at: Option<i64>,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
250pub struct OperationalRetentionPlanReport {
251 pub planned_at: i64,
252 pub collections_examined: usize,
253 pub items: Vec<OperationalRetentionPlanItem>,
254}
255
256#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
257pub struct OperationalRetentionRunItem {
258 pub collection_name: String,
259 pub action_kind: OperationalRetentionActionKind,
260 pub deleted_mutations: usize,
261 pub before_timestamp: Option<i64>,
262 pub max_rows: Option<usize>,
263 pub rows_remaining: usize,
264}
265
266#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
267pub struct OperationalRetentionRunReport {
268 pub executed_at: i64,
269 pub collections_examined: usize,
270 pub collections_acted_on: usize,
271 pub dry_run: bool,
272 pub items: Vec<OperationalRetentionRunItem>,
273}
274
275pub(crate) fn parse_operational_validation_contract(
276 validation_json: &str,
277) -> Result<Option<OperationalValidationContract>, String> {
278 if validation_json.is_empty() {
279 return Ok(None);
280 }
281 let contract: OperationalValidationContract = serde_json::from_str(validation_json)
282 .map_err(|error| format!("invalid validation_json: {error}"))?;
283 validate_operational_validation_contract(&contract)?;
284 Ok(Some(contract))
285}
286
287pub(crate) fn parse_operational_secondary_indexes_json(
288 secondary_indexes_json: &str,
289 collection_kind: OperationalCollectionKind,
290) -> Result<Vec<OperationalSecondaryIndexDefinition>, String> {
291 let secondary_indexes_json = if secondary_indexes_json.is_empty() {
292 "[]"
293 } else {
294 secondary_indexes_json
295 };
296 let indexes: Vec<OperationalSecondaryIndexDefinition> =
297 serde_json::from_str(secondary_indexes_json)
298 .map_err(|error| format!("invalid secondary_indexes_json: {error}"))?;
299 validate_operational_secondary_indexes(&indexes, collection_kind)?;
300 Ok(indexes)
301}
302
303pub(crate) fn validate_operational_payload_against_contract(
304 contract: &OperationalValidationContract,
305 payload_json: &str,
306) -> Result<(), String> {
307 let payload: Value = serde_json::from_str(payload_json)
308 .map_err(|error| format!("payload_json is not valid JSON: {error}"))?;
309 validate_operational_payload_value(contract, &payload)
310}
311
312fn validate_operational_validation_contract(
313 contract: &OperationalValidationContract,
314) -> Result<(), String> {
315 if contract.format_version != 1 {
316 return Err("validation_json format_version must be 1".to_owned());
317 }
318
319 let mut seen = HashSet::new();
320 for field in &contract.fields {
321 if field.name.trim().is_empty() {
322 return Err("validation_json field names must not be empty".to_owned());
323 }
324 if !seen.insert(field.name.as_str()) {
325 return Err(format!(
326 "validation_json contains duplicate field '{}'",
327 field.name
328 ));
329 }
330 validate_operational_validation_field(field)?;
331 }
332 Ok(())
333}
334
335fn validate_operational_secondary_indexes(
336 indexes: &[OperationalSecondaryIndexDefinition],
337 collection_kind: OperationalCollectionKind,
338) -> Result<(), String> {
339 let mut seen = HashSet::new();
340 for index in indexes {
341 let name = index.name();
342 if name.trim().is_empty() {
343 return Err("secondary_indexes_json index names must not be empty".to_owned());
344 }
345 if !seen.insert(name) {
346 return Err(format!(
347 "secondary_indexes_json contains duplicate index '{name}'"
348 ));
349 }
350 validate_operational_secondary_index(index, collection_kind)?;
351 }
352 Ok(())
353}
354
355fn validate_operational_secondary_index(
356 index: &OperationalSecondaryIndexDefinition,
357 collection_kind: OperationalCollectionKind,
358) -> Result<(), String> {
359 match index {
360 OperationalSecondaryIndexDefinition::AppendOnlyFieldTime {
361 field, time_field, ..
362 } => {
363 if collection_kind != OperationalCollectionKind::AppendOnlyLog {
364 return Err(format!(
365 "secondary index '{}' only supports append_only_log collections",
366 index.name()
367 ));
368 }
369 if field.trim().is_empty() || time_field.trim().is_empty() {
370 return Err(format!(
371 "secondary index '{}' field names must not be empty",
372 index.name()
373 ));
374 }
375 }
376 OperationalSecondaryIndexDefinition::LatestStateField { field, .. } => {
377 if collection_kind != OperationalCollectionKind::LatestState {
378 return Err(format!(
379 "secondary index '{}' only supports latest_state collections",
380 index.name()
381 ));
382 }
383 if field.trim().is_empty() {
384 return Err(format!(
385 "secondary index '{}' field names must not be empty",
386 index.name()
387 ));
388 }
389 }
390 OperationalSecondaryIndexDefinition::LatestStateComposite { fields, .. } => {
391 if collection_kind != OperationalCollectionKind::LatestState {
392 return Err(format!(
393 "secondary index '{}' only supports latest_state collections",
394 index.name()
395 ));
396 }
397 if fields.is_empty() || fields.len() > 3 {
398 return Err(format!(
399 "secondary index '{}' must declare between 1 and 3 fields",
400 index.name()
401 ));
402 }
403 let mut seen = HashSet::new();
404 for field in fields {
405 if field.name.trim().is_empty() {
406 return Err(format!(
407 "secondary index '{}' field names must not be empty",
408 index.name()
409 ));
410 }
411 if !seen.insert(field.name.as_str()) {
412 return Err(format!(
413 "secondary index '{}' contains duplicate field '{}'",
414 index.name(),
415 field.name
416 ));
417 }
418 }
419 }
420 }
421 Ok(())
422}
423
424#[allow(clippy::too_many_lines)]
425fn validate_operational_validation_field(field: &OperationalValidationField) -> Result<(), String> {
426 if let (Some(minimum), Some(maximum)) = (field.minimum, field.maximum)
427 && minimum > maximum
428 {
429 return Err(format!(
430 "validation field '{}' minimum must be less than or equal to maximum",
431 field.name
432 ));
433 }
434
435 match field.field_type {
436 OperationalValidationFieldType::String => {
437 if field.minimum.is_some() || field.maximum.is_some() {
438 return Err(format!(
439 "validation field '{}' only supports minimum/maximum for integer, float, and timestamp types",
440 field.name
441 ));
442 }
443 if field.max_items.is_some() {
444 return Err(format!(
445 "validation field '{}' only supports max_items for array types",
446 field.name
447 ));
448 }
449 }
450 OperationalValidationFieldType::Integer | OperationalValidationFieldType::Timestamp => {
451 if field.max_length.is_some() {
452 return Err(format!(
453 "validation field '{}' only supports max_length for string types",
454 field.name
455 ));
456 }
457 if field.max_items.is_some() {
458 return Err(format!(
459 "validation field '{}' only supports max_items for array types",
460 field.name
461 ));
462 }
463 if let Some(minimum) = field.minimum
464 && minimum.fract() != 0.0
465 {
466 return Err(format!(
467 "validation field '{}' minimum must be an integer for {}",
468 field.name,
469 field.field_type.as_str()
470 ));
471 }
472 if let Some(maximum) = field.maximum
473 && maximum.fract() != 0.0
474 {
475 return Err(format!(
476 "validation field '{}' maximum must be an integer for {}",
477 field.name,
478 field.field_type.as_str()
479 ));
480 }
481 }
482 OperationalValidationFieldType::Float => {
483 if field.max_length.is_some() {
484 return Err(format!(
485 "validation field '{}' only supports max_length for string types",
486 field.name
487 ));
488 }
489 if field.max_items.is_some() {
490 return Err(format!(
491 "validation field '{}' only supports max_items for array types",
492 field.name
493 ));
494 }
495 }
496 OperationalValidationFieldType::Boolean | OperationalValidationFieldType::Object => {
497 if field.minimum.is_some() || field.maximum.is_some() {
498 return Err(format!(
499 "validation field '{}' only supports minimum/maximum for integer, float, and timestamp types",
500 field.name
501 ));
502 }
503 if field.max_length.is_some() {
504 return Err(format!(
505 "validation field '{}' only supports max_length for string types",
506 field.name
507 ));
508 }
509 if field.max_items.is_some() {
510 return Err(format!(
511 "validation field '{}' only supports max_items for array types",
512 field.name
513 ));
514 }
515 }
516 OperationalValidationFieldType::ArrayString
517 | OperationalValidationFieldType::ArrayInteger
518 | OperationalValidationFieldType::ArrayFloat
519 | OperationalValidationFieldType::ArrayBoolean => {
520 if field.minimum.is_some() || field.maximum.is_some() {
521 return Err(format!(
522 "validation field '{}' only supports minimum/maximum for integer, float, and timestamp types",
523 field.name
524 ));
525 }
526 if field.max_length.is_some() {
527 return Err(format!(
528 "validation field '{}' only supports max_length for string types",
529 field.name
530 ));
531 }
532 }
533 }
534
535 if !field.enum_values.is_empty() {
536 for value in &field.enum_values {
537 if !field.field_type.matches_enum_value(value) {
538 return Err(format!(
539 "validation field '{}' has an enum value incompatible with type {}",
540 field.name,
541 field.field_type.as_str()
542 ));
543 }
544 }
545 }
546
547 Ok(())
548}
549
550fn validate_operational_payload_value(
551 contract: &OperationalValidationContract,
552 payload: &Value,
553) -> Result<(), String> {
554 let object = payload
555 .as_object()
556 .ok_or_else(|| "payload must be a JSON object".to_owned())?;
557 let field_map = contract
558 .fields
559 .iter()
560 .map(|field| (field.name.as_str(), field))
561 .collect::<HashMap<_, _>>();
562
563 if !contract.additional_properties {
564 for key in object.keys() {
565 if !field_map.contains_key(key.as_str()) {
566 return Err(format!("field '{key}' is not allowed"));
567 }
568 }
569 }
570
571 for field in &contract.fields {
572 let Some(value) = object.get(&field.name) else {
573 if field.required {
574 return Err(format!("field '{}' is required", field.name));
575 }
576 continue;
577 };
578 validate_operational_field_value(field, value)?;
579 }
580
581 Ok(())
582}
583
584#[allow(clippy::cast_precision_loss)]
585fn validate_operational_field_value(
586 field: &OperationalValidationField,
587 value: &Value,
588) -> Result<(), String> {
589 if value.is_null() {
590 if field.nullable {
591 return Ok(());
592 }
593 return Err(format!("field '{}' must not be null", field.name));
594 }
595
596 match field.field_type {
597 OperationalValidationFieldType::String => {
598 let string_value = value
599 .as_str()
600 .ok_or_else(|| format!("field '{}' must be a string", field.name))?;
601 if let Some(max_length) = field.max_length
602 && string_value.len() > max_length
603 {
604 return Err(format!(
605 "field '{}' must have length <= {}",
606 field.name, max_length
607 ));
608 }
609 validate_enum_membership(field, value)?;
610 }
611 OperationalValidationFieldType::Integer => {
612 let integer_value = value
613 .as_i64()
614 .ok_or_else(|| format!("field '{}' must be an integer", field.name))?;
615 validate_numeric_bounds(field, integer_value as f64)?;
616 validate_enum_membership(field, value)?;
617 }
618 OperationalValidationFieldType::Timestamp => {
619 let timestamp_value = value
620 .as_i64()
621 .ok_or_else(|| format!("field '{}' must be a timestamp integer", field.name))?;
622 validate_numeric_bounds(field, timestamp_value as f64)?;
623 validate_enum_membership(field, value)?;
624 }
625 OperationalValidationFieldType::Float => {
626 let float_value = value
627 .as_f64()
628 .ok_or_else(|| format!("field '{}' must be a float", field.name))?;
629 validate_numeric_bounds(field, float_value)?;
630 validate_enum_membership(field, value)?;
631 }
632 OperationalValidationFieldType::Boolean => {
633 value
634 .as_bool()
635 .ok_or_else(|| format!("field '{}' must be a boolean", field.name))?;
636 validate_enum_membership(field, value)?;
637 }
638 OperationalValidationFieldType::Object => {
639 value
640 .as_object()
641 .ok_or_else(|| format!("field '{}' must be an object", field.name))?;
642 validate_enum_membership(field, value)?;
643 }
644 OperationalValidationFieldType::ArrayString => {
645 validate_array(field, value, |item| item.as_str().is_some(), "string")?;
646 }
647 OperationalValidationFieldType::ArrayInteger => {
648 validate_array(field, value, |item| item.as_i64().is_some(), "integer")?;
649 }
650 OperationalValidationFieldType::ArrayFloat => {
651 validate_array(field, value, |item| item.as_f64().is_some(), "float")?;
652 }
653 OperationalValidationFieldType::ArrayBoolean => {
654 validate_array(field, value, |item| item.as_bool().is_some(), "boolean")?;
655 }
656 }
657 Ok(())
658}
659
660fn validate_array(
661 field: &OperationalValidationField,
662 value: &Value,
663 predicate: impl Fn(&Value) -> bool,
664 expected: &str,
665) -> Result<(), String> {
666 let items = value
667 .as_array()
668 .ok_or_else(|| format!("field '{}' must be an array", field.name))?;
669 if let Some(max_items) = field.max_items
670 && items.len() > max_items
671 {
672 return Err(format!(
673 "field '{}' must have at most {} items",
674 field.name, max_items
675 ));
676 }
677 for item in items {
678 if !predicate(item) {
679 return Err(format!(
680 "field '{}' must contain only {} values",
681 field.name, expected
682 ));
683 }
684 }
685 validate_enum_membership(field, value)?;
686 Ok(())
687}
688
689fn validate_numeric_bounds(
690 field: &OperationalValidationField,
691 numeric_value: f64,
692) -> Result<(), String> {
693 if let Some(minimum) = field.minimum
694 && numeric_value < minimum
695 {
696 return Err(format!("field '{}' must be >= {}", field.name, minimum));
697 }
698 if let Some(maximum) = field.maximum
699 && numeric_value > maximum
700 {
701 return Err(format!("field '{}' must be <= {}", field.name, maximum));
702 }
703 Ok(())
704}
705
706fn validate_enum_membership(
707 field: &OperationalValidationField,
708 value: &Value,
709) -> Result<(), String> {
710 if !field.enum_values.is_empty()
711 && !field.enum_values.iter().any(|candidate| candidate == value)
712 {
713 return Err(format!(
714 "field '{}' must be one of {}",
715 field.name,
716 serde_json::to_string(&field.enum_values).unwrap_or_else(|_| "[]".to_owned())
717 ));
718 }
719 Ok(())
720}
721
722impl OperationalValidationFieldType {
723 fn as_str(self) -> &'static str {
724 match self {
725 Self::String => "string",
726 Self::Integer => "integer",
727 Self::Float => "float",
728 Self::Boolean => "boolean",
729 Self::Timestamp => "timestamp",
730 Self::Object => "object",
731 Self::ArrayString => "array[string]",
732 Self::ArrayInteger => "array[integer]",
733 Self::ArrayFloat => "array[float]",
734 Self::ArrayBoolean => "array[boolean]",
735 }
736 }
737
738 fn matches_enum_value(self, value: &Value) -> bool {
739 match self {
740 Self::String => value.as_str().is_some(),
741 Self::Integer | Self::Timestamp => value.as_i64().is_some(),
742 Self::Float => value.as_f64().is_some(),
743 Self::Boolean => value.as_bool().is_some(),
744 Self::Object => value.as_object().is_some(),
745 Self::ArrayString => value
746 .as_array()
747 .is_some_and(|items| items.iter().all(|item| item.as_str().is_some())),
748 Self::ArrayInteger => value
749 .as_array()
750 .is_some_and(|items| items.iter().all(|item| item.as_i64().is_some())),
751 Self::ArrayFloat => value
752 .as_array()
753 .is_some_and(|items| items.iter().all(|item| item.as_f64().is_some())),
754 Self::ArrayBoolean => value
755 .as_array()
756 .is_some_and(|items| items.iter().all(|item| item.as_bool().is_some())),
757 }
758 }
759}
760
761impl OperationalSecondaryIndexDefinition {
762 #[must_use]
763 pub fn name(&self) -> &str {
764 match self {
765 Self::AppendOnlyFieldTime { name, .. }
766 | Self::LatestStateField { name, .. }
767 | Self::LatestStateComposite { name, .. } => name,
768 }
769 }
770}
771
772pub(crate) fn extract_secondary_index_entries_for_mutation(
773 indexes: &[OperationalSecondaryIndexDefinition],
774 payload_json: &str,
775) -> Vec<OperationalSecondaryIndexEntry> {
776 let Ok(parsed) = serde_json::from_str::<Value>(payload_json) else {
777 return Vec::new();
778 };
779 let Some(object) = parsed.as_object() else {
780 return Vec::new();
781 };
782
783 indexes
784 .iter()
785 .filter_map(|index| match index {
786 OperationalSecondaryIndexDefinition::AppendOnlyFieldTime {
787 name,
788 field,
789 value_type,
790 time_field,
791 } => {
792 let sort_timestamp = object.get(time_field)?.as_i64()?;
793 let slot1 = extract_secondary_index_slot(object, field, *value_type)?;
794 Some(OperationalSecondaryIndexEntry {
795 index_name: name.clone(),
796 sort_timestamp: Some(sort_timestamp),
797 slot1_text: slot1.0,
798 slot1_integer: slot1.1,
799 slot2_text: None,
800 slot2_integer: None,
801 slot3_text: None,
802 slot3_integer: None,
803 })
804 }
805 OperationalSecondaryIndexDefinition::LatestStateField { .. }
806 | OperationalSecondaryIndexDefinition::LatestStateComposite { .. } => None,
807 })
808 .collect()
809}
810
811pub(crate) fn extract_secondary_index_entries_for_current(
812 indexes: &[OperationalSecondaryIndexDefinition],
813 payload_json: &str,
814 updated_at: i64,
815) -> Vec<OperationalSecondaryIndexEntry> {
816 let Ok(parsed) = serde_json::from_str::<Value>(payload_json) else {
817 return Vec::new();
818 };
819 let Some(object) = parsed.as_object() else {
820 return Vec::new();
821 };
822
823 indexes
824 .iter()
825 .filter_map(|index| match index {
826 OperationalSecondaryIndexDefinition::AppendOnlyFieldTime { .. } => None,
827 OperationalSecondaryIndexDefinition::LatestStateField {
828 name,
829 field,
830 value_type,
831 } => {
832 let slot1 = extract_secondary_index_slot(object, field, *value_type)?;
833 Some(OperationalSecondaryIndexEntry {
834 index_name: name.clone(),
835 sort_timestamp: Some(updated_at),
836 slot1_text: slot1.0,
837 slot1_integer: slot1.1,
838 slot2_text: None,
839 slot2_integer: None,
840 slot3_text: None,
841 slot3_integer: None,
842 })
843 }
844 OperationalSecondaryIndexDefinition::LatestStateComposite { name, fields } => {
845 let slots = fields
846 .iter()
847 .map(|field| {
848 extract_secondary_index_slot(object, &field.name, field.value_type)
849 })
850 .collect::<Option<Vec<_>>>()?;
851 Some(OperationalSecondaryIndexEntry {
852 index_name: name.clone(),
853 sort_timestamp: Some(updated_at),
854 slot1_text: slots.first().and_then(|slot| slot.0.clone()),
855 slot1_integer: slots.first().and_then(|slot| slot.1),
856 slot2_text: slots.get(1).and_then(|slot| slot.0.clone()),
857 slot2_integer: slots.get(1).and_then(|slot| slot.1),
858 slot3_text: slots.get(2).and_then(|slot| slot.0.clone()),
859 slot3_integer: slots.get(2).and_then(|slot| slot.1),
860 })
861 }
862 })
863 .collect()
864}
865
866fn extract_secondary_index_slot(
867 object: &serde_json::Map<String, Value>,
868 field_name: &str,
869 value_type: OperationalSecondaryIndexValueType,
870) -> Option<(Option<String>, Option<i64>)> {
871 let value = object.get(field_name)?;
872 match value_type {
873 OperationalSecondaryIndexValueType::String => {
874 Some((Some(value.as_str()?.to_owned()), None))
875 }
876 OperationalSecondaryIndexValueType::Integer
877 | OperationalSecondaryIndexValueType::Timestamp => Some((None, Some(value.as_i64()?))),
878 }
879}
880
881#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
882#[serde(rename_all = "snake_case")]
883pub enum OperationalFilterMode {
884 Exact,
885 Prefix,
886 Range,
887}
888
889#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
890#[serde(rename_all = "snake_case")]
891pub enum OperationalFilterFieldType {
892 String,
893 Integer,
894 Timestamp,
895}
896
897#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
898pub struct OperationalFilterField {
899 pub name: String,
900 #[serde(rename = "type")]
901 pub field_type: OperationalFilterFieldType,
902 pub modes: Vec<OperationalFilterMode>,
903}
904
905#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
906#[serde(untagged)]
907pub enum OperationalFilterValue {
908 String(String),
909 Integer(i64),
910}
911
912#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
913#[serde(tag = "mode", rename_all = "snake_case")]
914pub enum OperationalFilterClause {
915 Exact {
916 field: String,
917 value: OperationalFilterValue,
918 },
919 Prefix {
920 field: String,
921 value: String,
922 },
923 Range {
924 field: String,
925 lower: Option<i64>,
926 upper: Option<i64>,
927 },
928}
929
930#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
931pub struct OperationalReadRequest {
932 pub collection_name: String,
933 pub filters: Vec<OperationalFilterClause>,
934 pub limit: Option<usize>,
935}
936
937#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
938pub struct OperationalReadReport {
939 pub collection_name: String,
940 pub row_count: usize,
941 pub applied_limit: usize,
942 pub was_limited: bool,
943 pub rows: Vec<OperationalMutationRow>,
944}
945
946#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
947pub struct OperationalTraceReport {
948 pub collection_name: String,
949 pub record_key: Option<String>,
950 pub mutation_count: usize,
951 pub current_count: usize,
952 pub mutations: Vec<OperationalMutationRow>,
953 pub current_rows: Vec<OperationalCurrentRow>,
954}
955
956#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
957pub struct OperationalRepairReport {
958 pub collections_rebuilt: usize,
959 pub current_rows_rebuilt: usize,
960}
961
962#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
963pub struct OperationalCompactionReport {
964 pub collection_name: String,
965 pub deleted_mutations: usize,
966 pub dry_run: bool,
967 pub before_timestamp: Option<i64>,
968}
969
970#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
971pub struct OperationalPurgeReport {
972 pub collection_name: String,
973 pub deleted_mutations: usize,
974 pub before_timestamp: i64,
975}
976
977#[cfg(test)]
978#[allow(clippy::expect_used)]
979mod tests {
980 use super::{
981 OperationalValidationContract, OperationalValidationMode,
982 parse_operational_validation_contract, validate_operational_payload_against_contract,
983 };
984
985 #[test]
986 fn parse_validation_contract_accepts_array_enum_members() {
987 let contract = parse_operational_validation_contract(
988 r#"{"format_version":1,"mode":"enforce","fields":[{"name":"tags","type":"array[string]","enum":[["a"],["b","c"]]}]}"#,
989 )
990 .expect("contract parses")
991 .expect("contract present");
992
993 assert_eq!(contract.mode, OperationalValidationMode::Enforce);
994 assert_eq!(contract.fields.len(), 1);
995 }
996
997 #[test]
998 fn report_only_contract_validates_payload_without_rejecting_contract() {
999 let contract = parse_operational_validation_contract(
1000 r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#,
1001 )
1002 .expect("contract parses")
1003 .expect("contract present");
1004
1005 assert!(matches!(
1006 contract.mode,
1007 OperationalValidationMode::ReportOnly
1008 ));
1009 assert!(
1010 validate_operational_payload_against_contract(&contract, r#"{"status":"bogus"}"#)
1011 .is_err()
1012 );
1013 }
1014
1015 #[test]
1016 fn report_only_contract_round_trips_via_serde() {
1017 let contract: OperationalValidationContract = serde_json::from_str(
1018 r#"{"format_version":1,"mode":"report_only","additional_properties":true,"fields":[]}"#,
1019 )
1020 .expect("deserialize");
1021
1022 assert!(matches!(
1023 contract.mode,
1024 OperationalValidationMode::ReportOnly
1025 ));
1026 assert_eq!(
1027 serde_json::to_string(&contract).expect("serialize"),
1028 r#"{"format_version":1,"mode":"report_only","additional_properties":true,"fields":[]}"#
1029 );
1030 }
1031}