// fathomdb_engine/operational.rs

1use std::collections::{HashMap, HashSet};
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5
/// Serde default for `filter_fields_json`: the text of an empty JSON array.
fn default_filter_fields_json() -> String {
    String::from("[]")
}
9
/// Serde default for `validation_json`: an empty string.
///
/// `parse_operational_validation_contract` maps an empty string to
/// `Ok(None)`, so an omitted field means "no validation contract".
fn default_validation_json() -> String {
    String::default()
}
13
/// Serde default for `secondary_indexes_json`: the text of an empty JSON
/// array, i.e. no secondary indexes.
fn default_secondary_indexes_json() -> String {
    String::from("[]")
}
17
18#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
19#[serde(rename_all = "snake_case")]
20pub enum OperationalCollectionKind {
21    AppendOnlyLog,
22    LatestState,
23}
24
25impl OperationalCollectionKind {
26    #[must_use]
27    pub fn as_str(self) -> &'static str {
28        match self {
29            Self::AppendOnlyLog => "append_only_log",
30            Self::LatestState => "latest_state",
31        }
32    }
33}
34
35impl TryFrom<&str> for OperationalCollectionKind {
36    type Error = String;
37
38    fn try_from(value: &str) -> Result<Self, Self::Error> {
39        match value {
40            "append_only_log" => Ok(Self::AppendOnlyLog),
41            "latest_state" => Ok(Self::LatestState),
42            other => Err(format!("unknown operational collection kind '{other}'")),
43        }
44    }
45}
46
47#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
48pub struct OperationalCollectionRecord {
49    pub name: String,
50    pub kind: OperationalCollectionKind,
51    pub schema_json: String,
52    pub retention_json: String,
53    #[serde(default = "default_filter_fields_json")]
54    pub filter_fields_json: String,
55    #[serde(default = "default_validation_json")]
56    pub validation_json: String,
57    #[serde(default = "default_secondary_indexes_json")]
58    pub secondary_indexes_json: String,
59    pub format_version: i64,
60    pub created_at: i64,
61    pub disabled_at: Option<i64>,
62}
63
64#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
65pub struct OperationalMutationRow {
66    pub id: String,
67    pub collection_name: String,
68    pub record_key: String,
69    pub op_kind: String,
70    pub payload_json: String,
71    pub source_ref: Option<String>,
72    pub created_at: i64,
73}
74
75#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
76pub struct OperationalCurrentRow {
77    pub collection_name: String,
78    pub record_key: String,
79    pub payload_json: String,
80    pub updated_at: i64,
81    pub last_mutation_id: String,
82}
83
84#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
85pub struct OperationalRegisterRequest {
86    pub name: String,
87    pub kind: OperationalCollectionKind,
88    pub schema_json: String,
89    pub retention_json: String,
90    #[serde(default = "default_filter_fields_json")]
91    pub filter_fields_json: String,
92    #[serde(default = "default_validation_json")]
93    pub validation_json: String,
94    #[serde(default = "default_secondary_indexes_json")]
95    pub secondary_indexes_json: String,
96    pub format_version: i64,
97}
98
99#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub enum OperationalValidationMode {
102    Disabled,
103    ReportOnly,
104    Enforce,
105}
106
107#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
108pub struct OperationalValidationContract {
109    pub format_version: i64,
110    pub mode: OperationalValidationMode,
111    #[serde(default = "default_true")]
112    pub additional_properties: bool,
113    #[serde(default)]
114    pub fields: Vec<OperationalValidationField>,
115}
116
/// Serde default for boolean fields that must be `true` when omitted; used by
/// `OperationalValidationContract::additional_properties`.
const fn default_true() -> bool {
    true
}
120
121#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub enum OperationalValidationFieldType {
123    #[serde(rename = "string")]
124    String,
125    #[serde(rename = "integer")]
126    Integer,
127    #[serde(rename = "float")]
128    Float,
129    #[serde(rename = "boolean")]
130    Boolean,
131    #[serde(rename = "timestamp")]
132    Timestamp,
133    #[serde(rename = "object")]
134    Object,
135    #[serde(rename = "array[string]")]
136    ArrayString,
137    #[serde(rename = "array[integer]")]
138    ArrayInteger,
139    #[serde(rename = "array[float]")]
140    ArrayFloat,
141    #[serde(rename = "array[boolean]")]
142    ArrayBoolean,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct OperationalValidationField {
147    pub name: String,
148    #[serde(rename = "type")]
149    pub field_type: OperationalValidationFieldType,
150    #[serde(default)]
151    pub required: bool,
152    #[serde(default)]
153    pub nullable: bool,
154    #[serde(default, rename = "enum")]
155    pub enum_values: Vec<serde_json::Value>,
156    pub minimum: Option<f64>,
157    pub maximum: Option<f64>,
158    pub max_length: Option<usize>,
159    pub max_items: Option<usize>,
160}
161
162#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
163pub struct OperationalHistoryValidationIssue {
164    pub mutation_id: String,
165    pub record_key: String,
166    pub op_kind: String,
167    pub message: String,
168}
169
170#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
171pub struct OperationalHistoryValidationReport {
172    pub collection_name: String,
173    pub checked_rows: usize,
174    pub invalid_row_count: usize,
175    pub issues: Vec<OperationalHistoryValidationIssue>,
176}
177
178#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
179#[serde(rename_all = "snake_case")]
180pub enum OperationalSecondaryIndexValueType {
181    String,
182    Integer,
183    Timestamp,
184}
185
186#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
187pub struct OperationalSecondaryIndexField {
188    pub name: String,
189    pub value_type: OperationalSecondaryIndexValueType,
190}
191
192#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193#[serde(tag = "kind", rename_all = "snake_case")]
194pub enum OperationalSecondaryIndexDefinition {
195    AppendOnlyFieldTime {
196        name: String,
197        field: String,
198        value_type: OperationalSecondaryIndexValueType,
199        time_field: String,
200    },
201    LatestStateField {
202        name: String,
203        field: String,
204        value_type: OperationalSecondaryIndexValueType,
205    },
206    LatestStateComposite {
207        name: String,
208        fields: Vec<OperationalSecondaryIndexField>,
209    },
210}
211
/// Values extracted from one payload for one secondary index.
///
/// Each of the three slots has a text half and an integer half. The
/// extraction helpers in this file fill at most one half per slot and leave
/// unused slots as `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct OperationalSecondaryIndexEntry {
    /// Name of the index this entry belongs to.
    pub index_name: String,
    /// Sort timestamp of the entry, if any.
    pub sort_timestamp: Option<i64>,
    /// Text value of slot 1.
    pub slot1_text: Option<String>,
    /// Integer value of slot 1.
    pub slot1_integer: Option<i64>,
    /// Text value of slot 2.
    pub slot2_text: Option<String>,
    /// Integer value of slot 2.
    pub slot2_integer: Option<i64>,
    /// Text value of slot 3.
    pub slot3_text: Option<String>,
    /// Integer value of slot 3.
    pub slot3_integer: Option<i64>,
}
223
224#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
225pub struct OperationalSecondaryIndexRebuildReport {
226    pub collection_name: String,
227    pub mutation_entries_rebuilt: usize,
228    pub current_entries_rebuilt: usize,
229}
230
231#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
232#[serde(rename_all = "snake_case")]
233pub enum OperationalRetentionActionKind {
234    Noop,
235    PurgeBeforeSeconds,
236    KeepLast,
237}
238
239#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
240pub struct OperationalRetentionPlanItem {
241    pub collection_name: String,
242    pub action_kind: OperationalRetentionActionKind,
243    pub candidate_deletions: usize,
244    pub before_timestamp: Option<i64>,
245    pub max_rows: Option<usize>,
246    pub last_run_at: Option<i64>,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
250pub struct OperationalRetentionPlanReport {
251    pub planned_at: i64,
252    pub collections_examined: usize,
253    pub items: Vec<OperationalRetentionPlanItem>,
254}
255
256#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
257pub struct OperationalRetentionRunItem {
258    pub collection_name: String,
259    pub action_kind: OperationalRetentionActionKind,
260    pub deleted_mutations: usize,
261    pub before_timestamp: Option<i64>,
262    pub max_rows: Option<usize>,
263    pub rows_remaining: usize,
264}
265
266#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
267pub struct OperationalRetentionRunReport {
268    pub executed_at: i64,
269    pub collections_examined: usize,
270    pub collections_acted_on: usize,
271    pub dry_run: bool,
272    pub items: Vec<OperationalRetentionRunItem>,
273}
274
275pub(crate) fn parse_operational_validation_contract(
276    validation_json: &str,
277) -> Result<Option<OperationalValidationContract>, String> {
278    if validation_json.is_empty() {
279        return Ok(None);
280    }
281    let contract: OperationalValidationContract = serde_json::from_str(validation_json)
282        .map_err(|error| format!("invalid validation_json: {error}"))?;
283    validate_operational_validation_contract(&contract)?;
284    Ok(Some(contract))
285}
286
287pub(crate) fn parse_operational_secondary_indexes_json(
288    secondary_indexes_json: &str,
289    collection_kind: OperationalCollectionKind,
290) -> Result<Vec<OperationalSecondaryIndexDefinition>, String> {
291    let secondary_indexes_json = if secondary_indexes_json.is_empty() {
292        "[]"
293    } else {
294        secondary_indexes_json
295    };
296    let indexes: Vec<OperationalSecondaryIndexDefinition> =
297        serde_json::from_str(secondary_indexes_json)
298            .map_err(|error| format!("invalid secondary_indexes_json: {error}"))?;
299    validate_operational_secondary_indexes(&indexes, collection_kind)?;
300    Ok(indexes)
301}
302
303pub(crate) fn validate_operational_payload_against_contract(
304    contract: &OperationalValidationContract,
305    payload_json: &str,
306) -> Result<(), String> {
307    let payload: Value = serde_json::from_str(payload_json)
308        .map_err(|error| format!("payload_json is not valid JSON: {error}"))?;
309    validate_operational_payload_value(contract, &payload)
310}
311
312fn validate_operational_validation_contract(
313    contract: &OperationalValidationContract,
314) -> Result<(), String> {
315    if contract.format_version != 1 {
316        return Err("validation_json format_version must be 1".to_owned());
317    }
318
319    let mut seen = HashSet::new();
320    for field in &contract.fields {
321        if field.name.trim().is_empty() {
322            return Err("validation_json field names must not be empty".to_owned());
323        }
324        if !seen.insert(field.name.as_str()) {
325            return Err(format!(
326                "validation_json contains duplicate field '{}'",
327                field.name
328            ));
329        }
330        validate_operational_validation_field(field)?;
331    }
332    Ok(())
333}
334
335fn validate_operational_secondary_indexes(
336    indexes: &[OperationalSecondaryIndexDefinition],
337    collection_kind: OperationalCollectionKind,
338) -> Result<(), String> {
339    let mut seen = HashSet::new();
340    for index in indexes {
341        let name = index.name();
342        if name.trim().is_empty() {
343            return Err("secondary_indexes_json index names must not be empty".to_owned());
344        }
345        if !seen.insert(name) {
346            return Err(format!(
347                "secondary_indexes_json contains duplicate index '{name}'"
348            ));
349        }
350        validate_operational_secondary_index(index, collection_kind)?;
351    }
352    Ok(())
353}
354
355fn validate_operational_secondary_index(
356    index: &OperationalSecondaryIndexDefinition,
357    collection_kind: OperationalCollectionKind,
358) -> Result<(), String> {
359    match index {
360        OperationalSecondaryIndexDefinition::AppendOnlyFieldTime {
361            field, time_field, ..
362        } => {
363            if collection_kind != OperationalCollectionKind::AppendOnlyLog {
364                return Err(format!(
365                    "secondary index '{}' only supports append_only_log collections",
366                    index.name()
367                ));
368            }
369            if field.trim().is_empty() || time_field.trim().is_empty() {
370                return Err(format!(
371                    "secondary index '{}' field names must not be empty",
372                    index.name()
373                ));
374            }
375        }
376        OperationalSecondaryIndexDefinition::LatestStateField { field, .. } => {
377            if collection_kind != OperationalCollectionKind::LatestState {
378                return Err(format!(
379                    "secondary index '{}' only supports latest_state collections",
380                    index.name()
381                ));
382            }
383            if field.trim().is_empty() {
384                return Err(format!(
385                    "secondary index '{}' field names must not be empty",
386                    index.name()
387                ));
388            }
389        }
390        OperationalSecondaryIndexDefinition::LatestStateComposite { fields, .. } => {
391            if collection_kind != OperationalCollectionKind::LatestState {
392                return Err(format!(
393                    "secondary index '{}' only supports latest_state collections",
394                    index.name()
395                ));
396            }
397            if fields.is_empty() || fields.len() > 3 {
398                return Err(format!(
399                    "secondary index '{}' must declare between 1 and 3 fields",
400                    index.name()
401                ));
402            }
403            let mut seen = HashSet::new();
404            for field in fields {
405                if field.name.trim().is_empty() {
406                    return Err(format!(
407                        "secondary index '{}' field names must not be empty",
408                        index.name()
409                    ));
410                }
411                if !seen.insert(field.name.as_str()) {
412                    return Err(format!(
413                        "secondary index '{}' contains duplicate field '{}'",
414                        index.name(),
415                        field.name
416                    ));
417                }
418            }
419        }
420    }
421    Ok(())
422}
423
424#[allow(clippy::too_many_lines)]
425fn validate_operational_validation_field(field: &OperationalValidationField) -> Result<(), String> {
426    if let (Some(minimum), Some(maximum)) = (field.minimum, field.maximum)
427        && minimum > maximum
428    {
429        return Err(format!(
430            "validation field '{}' minimum must be less than or equal to maximum",
431            field.name
432        ));
433    }
434
435    match field.field_type {
436        OperationalValidationFieldType::String => {
437            if field.minimum.is_some() || field.maximum.is_some() {
438                return Err(format!(
439                    "validation field '{}' only supports minimum/maximum for integer, float, and timestamp types",
440                    field.name
441                ));
442            }
443            if field.max_items.is_some() {
444                return Err(format!(
445                    "validation field '{}' only supports max_items for array types",
446                    field.name
447                ));
448            }
449        }
450        OperationalValidationFieldType::Integer | OperationalValidationFieldType::Timestamp => {
451            if field.max_length.is_some() {
452                return Err(format!(
453                    "validation field '{}' only supports max_length for string types",
454                    field.name
455                ));
456            }
457            if field.max_items.is_some() {
458                return Err(format!(
459                    "validation field '{}' only supports max_items for array types",
460                    field.name
461                ));
462            }
463            if let Some(minimum) = field.minimum
464                && minimum.fract() != 0.0
465            {
466                return Err(format!(
467                    "validation field '{}' minimum must be an integer for {}",
468                    field.name,
469                    field.field_type.as_str()
470                ));
471            }
472            if let Some(maximum) = field.maximum
473                && maximum.fract() != 0.0
474            {
475                return Err(format!(
476                    "validation field '{}' maximum must be an integer for {}",
477                    field.name,
478                    field.field_type.as_str()
479                ));
480            }
481        }
482        OperationalValidationFieldType::Float => {
483            if field.max_length.is_some() {
484                return Err(format!(
485                    "validation field '{}' only supports max_length for string types",
486                    field.name
487                ));
488            }
489            if field.max_items.is_some() {
490                return Err(format!(
491                    "validation field '{}' only supports max_items for array types",
492                    field.name
493                ));
494            }
495        }
496        OperationalValidationFieldType::Boolean | OperationalValidationFieldType::Object => {
497            if field.minimum.is_some() || field.maximum.is_some() {
498                return Err(format!(
499                    "validation field '{}' only supports minimum/maximum for integer, float, and timestamp types",
500                    field.name
501                ));
502            }
503            if field.max_length.is_some() {
504                return Err(format!(
505                    "validation field '{}' only supports max_length for string types",
506                    field.name
507                ));
508            }
509            if field.max_items.is_some() {
510                return Err(format!(
511                    "validation field '{}' only supports max_items for array types",
512                    field.name
513                ));
514            }
515        }
516        OperationalValidationFieldType::ArrayString
517        | OperationalValidationFieldType::ArrayInteger
518        | OperationalValidationFieldType::ArrayFloat
519        | OperationalValidationFieldType::ArrayBoolean => {
520            if field.minimum.is_some() || field.maximum.is_some() {
521                return Err(format!(
522                    "validation field '{}' only supports minimum/maximum for integer, float, and timestamp types",
523                    field.name
524                ));
525            }
526            if field.max_length.is_some() {
527                return Err(format!(
528                    "validation field '{}' only supports max_length for string types",
529                    field.name
530                ));
531            }
532        }
533    }
534
535    if !field.enum_values.is_empty() {
536        for value in &field.enum_values {
537            if !field.field_type.matches_enum_value(value) {
538                return Err(format!(
539                    "validation field '{}' has an enum value incompatible with type {}",
540                    field.name,
541                    field.field_type.as_str()
542                ));
543            }
544        }
545    }
546
547    Ok(())
548}
549
550fn validate_operational_payload_value(
551    contract: &OperationalValidationContract,
552    payload: &Value,
553) -> Result<(), String> {
554    let object = payload
555        .as_object()
556        .ok_or_else(|| "payload must be a JSON object".to_owned())?;
557    let field_map = contract
558        .fields
559        .iter()
560        .map(|field| (field.name.as_str(), field))
561        .collect::<HashMap<_, _>>();
562
563    if !contract.additional_properties {
564        for key in object.keys() {
565            if !field_map.contains_key(key.as_str()) {
566                return Err(format!("field '{key}' is not allowed"));
567            }
568        }
569    }
570
571    for field in &contract.fields {
572        let Some(value) = object.get(&field.name) else {
573            if field.required {
574                return Err(format!("field '{}' is required", field.name));
575            }
576            continue;
577        };
578        validate_operational_field_value(field, value)?;
579    }
580
581    Ok(())
582}
583
584#[allow(clippy::cast_precision_loss)]
585fn validate_operational_field_value(
586    field: &OperationalValidationField,
587    value: &Value,
588) -> Result<(), String> {
589    if value.is_null() {
590        if field.nullable {
591            return Ok(());
592        }
593        return Err(format!("field '{}' must not be null", field.name));
594    }
595
596    match field.field_type {
597        OperationalValidationFieldType::String => {
598            let string_value = value
599                .as_str()
600                .ok_or_else(|| format!("field '{}' must be a string", field.name))?;
601            if let Some(max_length) = field.max_length
602                && string_value.len() > max_length
603            {
604                return Err(format!(
605                    "field '{}' must have length <= {}",
606                    field.name, max_length
607                ));
608            }
609            validate_enum_membership(field, value)?;
610        }
611        OperationalValidationFieldType::Integer => {
612            let integer_value = value
613                .as_i64()
614                .ok_or_else(|| format!("field '{}' must be an integer", field.name))?;
615            validate_numeric_bounds(field, integer_value as f64)?;
616            validate_enum_membership(field, value)?;
617        }
618        OperationalValidationFieldType::Timestamp => {
619            let timestamp_value = value
620                .as_i64()
621                .ok_or_else(|| format!("field '{}' must be a timestamp integer", field.name))?;
622            validate_numeric_bounds(field, timestamp_value as f64)?;
623            validate_enum_membership(field, value)?;
624        }
625        OperationalValidationFieldType::Float => {
626            let float_value = value
627                .as_f64()
628                .ok_or_else(|| format!("field '{}' must be a float", field.name))?;
629            validate_numeric_bounds(field, float_value)?;
630            validate_enum_membership(field, value)?;
631        }
632        OperationalValidationFieldType::Boolean => {
633            value
634                .as_bool()
635                .ok_or_else(|| format!("field '{}' must be a boolean", field.name))?;
636            validate_enum_membership(field, value)?;
637        }
638        OperationalValidationFieldType::Object => {
639            value
640                .as_object()
641                .ok_or_else(|| format!("field '{}' must be an object", field.name))?;
642            validate_enum_membership(field, value)?;
643        }
644        OperationalValidationFieldType::ArrayString => {
645            validate_array(field, value, |item| item.as_str().is_some(), "string")?;
646        }
647        OperationalValidationFieldType::ArrayInteger => {
648            validate_array(field, value, |item| item.as_i64().is_some(), "integer")?;
649        }
650        OperationalValidationFieldType::ArrayFloat => {
651            validate_array(field, value, |item| item.as_f64().is_some(), "float")?;
652        }
653        OperationalValidationFieldType::ArrayBoolean => {
654            validate_array(field, value, |item| item.as_bool().is_some(), "boolean")?;
655        }
656    }
657    Ok(())
658}
659
660fn validate_array(
661    field: &OperationalValidationField,
662    value: &Value,
663    predicate: impl Fn(&Value) -> bool,
664    expected: &str,
665) -> Result<(), String> {
666    let items = value
667        .as_array()
668        .ok_or_else(|| format!("field '{}' must be an array", field.name))?;
669    if let Some(max_items) = field.max_items
670        && items.len() > max_items
671    {
672        return Err(format!(
673            "field '{}' must have at most {} items",
674            field.name, max_items
675        ));
676    }
677    for item in items {
678        if !predicate(item) {
679            return Err(format!(
680                "field '{}' must contain only {} values",
681                field.name, expected
682            ));
683        }
684    }
685    validate_enum_membership(field, value)?;
686    Ok(())
687}
688
689fn validate_numeric_bounds(
690    field: &OperationalValidationField,
691    numeric_value: f64,
692) -> Result<(), String> {
693    if let Some(minimum) = field.minimum
694        && numeric_value < minimum
695    {
696        return Err(format!("field '{}' must be >= {}", field.name, minimum));
697    }
698    if let Some(maximum) = field.maximum
699        && numeric_value > maximum
700    {
701        return Err(format!("field '{}' must be <= {}", field.name, maximum));
702    }
703    Ok(())
704}
705
706fn validate_enum_membership(
707    field: &OperationalValidationField,
708    value: &Value,
709) -> Result<(), String> {
710    if !field.enum_values.is_empty()
711        && !field.enum_values.iter().any(|candidate| candidate == value)
712    {
713        return Err(format!(
714            "field '{}' must be one of {}",
715            field.name,
716            serde_json::to_string(&field.enum_values).unwrap_or_else(|_| "[]".to_owned())
717        ));
718    }
719    Ok(())
720}
721
722impl OperationalValidationFieldType {
723    fn as_str(self) -> &'static str {
724        match self {
725            Self::String => "string",
726            Self::Integer => "integer",
727            Self::Float => "float",
728            Self::Boolean => "boolean",
729            Self::Timestamp => "timestamp",
730            Self::Object => "object",
731            Self::ArrayString => "array[string]",
732            Self::ArrayInteger => "array[integer]",
733            Self::ArrayFloat => "array[float]",
734            Self::ArrayBoolean => "array[boolean]",
735        }
736    }
737
738    fn matches_enum_value(self, value: &Value) -> bool {
739        match self {
740            Self::String => value.as_str().is_some(),
741            Self::Integer | Self::Timestamp => value.as_i64().is_some(),
742            Self::Float => value.as_f64().is_some(),
743            Self::Boolean => value.as_bool().is_some(),
744            Self::Object => value.as_object().is_some(),
745            Self::ArrayString => value
746                .as_array()
747                .is_some_and(|items| items.iter().all(|item| item.as_str().is_some())),
748            Self::ArrayInteger => value
749                .as_array()
750                .is_some_and(|items| items.iter().all(|item| item.as_i64().is_some())),
751            Self::ArrayFloat => value
752                .as_array()
753                .is_some_and(|items| items.iter().all(|item| item.as_f64().is_some())),
754            Self::ArrayBoolean => value
755                .as_array()
756                .is_some_and(|items| items.iter().all(|item| item.as_bool().is_some())),
757        }
758    }
759}
760
761impl OperationalSecondaryIndexDefinition {
762    #[must_use]
763    pub fn name(&self) -> &str {
764        match self {
765            Self::AppendOnlyFieldTime { name, .. }
766            | Self::LatestStateField { name, .. }
767            | Self::LatestStateComposite { name, .. } => name,
768        }
769    }
770}
771
772pub(crate) fn extract_secondary_index_entries_for_mutation(
773    indexes: &[OperationalSecondaryIndexDefinition],
774    payload_json: &str,
775) -> Vec<OperationalSecondaryIndexEntry> {
776    let Ok(parsed) = serde_json::from_str::<Value>(payload_json) else {
777        return Vec::new();
778    };
779    let Some(object) = parsed.as_object() else {
780        return Vec::new();
781    };
782
783    indexes
784        .iter()
785        .filter_map(|index| match index {
786            OperationalSecondaryIndexDefinition::AppendOnlyFieldTime {
787                name,
788                field,
789                value_type,
790                time_field,
791            } => {
792                let sort_timestamp = object.get(time_field)?.as_i64()?;
793                let slot1 = extract_secondary_index_slot(object, field, *value_type)?;
794                Some(OperationalSecondaryIndexEntry {
795                    index_name: name.clone(),
796                    sort_timestamp: Some(sort_timestamp),
797                    slot1_text: slot1.0,
798                    slot1_integer: slot1.1,
799                    slot2_text: None,
800                    slot2_integer: None,
801                    slot3_text: None,
802                    slot3_integer: None,
803                })
804            }
805            OperationalSecondaryIndexDefinition::LatestStateField { .. }
806            | OperationalSecondaryIndexDefinition::LatestStateComposite { .. } => None,
807        })
808        .collect()
809}
810
811pub(crate) fn extract_secondary_index_entries_for_current(
812    indexes: &[OperationalSecondaryIndexDefinition],
813    payload_json: &str,
814    updated_at: i64,
815) -> Vec<OperationalSecondaryIndexEntry> {
816    let Ok(parsed) = serde_json::from_str::<Value>(payload_json) else {
817        return Vec::new();
818    };
819    let Some(object) = parsed.as_object() else {
820        return Vec::new();
821    };
822
823    indexes
824        .iter()
825        .filter_map(|index| match index {
826            OperationalSecondaryIndexDefinition::AppendOnlyFieldTime { .. } => None,
827            OperationalSecondaryIndexDefinition::LatestStateField {
828                name,
829                field,
830                value_type,
831            } => {
832                let slot1 = extract_secondary_index_slot(object, field, *value_type)?;
833                Some(OperationalSecondaryIndexEntry {
834                    index_name: name.clone(),
835                    sort_timestamp: Some(updated_at),
836                    slot1_text: slot1.0,
837                    slot1_integer: slot1.1,
838                    slot2_text: None,
839                    slot2_integer: None,
840                    slot3_text: None,
841                    slot3_integer: None,
842                })
843            }
844            OperationalSecondaryIndexDefinition::LatestStateComposite { name, fields } => {
845                let slots = fields
846                    .iter()
847                    .map(|field| {
848                        extract_secondary_index_slot(object, &field.name, field.value_type)
849                    })
850                    .collect::<Option<Vec<_>>>()?;
851                Some(OperationalSecondaryIndexEntry {
852                    index_name: name.clone(),
853                    sort_timestamp: Some(updated_at),
854                    slot1_text: slots.first().and_then(|slot| slot.0.clone()),
855                    slot1_integer: slots.first().and_then(|slot| slot.1),
856                    slot2_text: slots.get(1).and_then(|slot| slot.0.clone()),
857                    slot2_integer: slots.get(1).and_then(|slot| slot.1),
858                    slot3_text: slots.get(2).and_then(|slot| slot.0.clone()),
859                    slot3_integer: slots.get(2).and_then(|slot| slot.1),
860                })
861            }
862        })
863        .collect()
864}
865
866fn extract_secondary_index_slot(
867    object: &serde_json::Map<String, Value>,
868    field_name: &str,
869    value_type: OperationalSecondaryIndexValueType,
870) -> Option<(Option<String>, Option<i64>)> {
871    let value = object.get(field_name)?;
872    match value_type {
873        OperationalSecondaryIndexValueType::String => {
874            Some((Some(value.as_str()?.to_owned()), None))
875        }
876        OperationalSecondaryIndexValueType::Integer
877        | OperationalSecondaryIndexValueType::Timestamp => Some((None, Some(value.as_i64()?))),
878    }
879}
880
881#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
882#[serde(rename_all = "snake_case")]
883pub enum OperationalFilterMode {
884    Exact,
885    Prefix,
886    Range,
887}
888
889#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
890#[serde(rename_all = "snake_case")]
891pub enum OperationalFilterFieldType {
892    String,
893    Integer,
894    Timestamp,
895}
896
897#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
898pub struct OperationalFilterField {
899    pub name: String,
900    #[serde(rename = "type")]
901    pub field_type: OperationalFilterFieldType,
902    pub modes: Vec<OperationalFilterMode>,
903}
904
905#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
906#[serde(untagged)]
907pub enum OperationalFilterValue {
908    String(String),
909    Integer(i64),
910}
911
912#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
913#[serde(tag = "mode", rename_all = "snake_case")]
914pub enum OperationalFilterClause {
915    Exact {
916        field: String,
917        value: OperationalFilterValue,
918    },
919    Prefix {
920        field: String,
921        value: String,
922    },
923    Range {
924        field: String,
925        lower: Option<i64>,
926        upper: Option<i64>,
927    },
928}
929
930#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
931pub struct OperationalReadRequest {
932    pub collection_name: String,
933    pub filters: Vec<OperationalFilterClause>,
934    pub limit: Option<usize>,
935}
936
937#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
938pub struct OperationalReadReport {
939    pub collection_name: String,
940    pub row_count: usize,
941    pub applied_limit: usize,
942    pub was_limited: bool,
943    pub rows: Vec<OperationalMutationRow>,
944}
945
946#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
947pub struct OperationalTraceReport {
948    pub collection_name: String,
949    pub record_key: Option<String>,
950    pub mutation_count: usize,
951    pub current_count: usize,
952    pub mutations: Vec<OperationalMutationRow>,
953    pub current_rows: Vec<OperationalCurrentRow>,
954}
955
956#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
957pub struct OperationalRepairReport {
958    pub collections_rebuilt: usize,
959    pub current_rows_rebuilt: usize,
960}
961
962#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
963pub struct OperationalCompactionReport {
964    pub collection_name: String,
965    pub deleted_mutations: usize,
966    pub dry_run: bool,
967    pub before_timestamp: Option<i64>,
968}
969
970#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
971pub struct OperationalPurgeReport {
972    pub collection_name: String,
973    pub deleted_mutations: usize,
974    pub before_timestamp: i64,
975}
976
#[cfg(test)]
// `expect` is the intended failure mechanism inside these tests.
#[allow(clippy::expect_used)]
mod tests {
    use super::{
        OperationalValidationContract, OperationalValidationMode,
        parse_operational_validation_contract, validate_operational_payload_against_contract,
    };

    /// An `array[string]` field whose enum members are themselves arrays is
    /// accepted by the contract parser.
    #[test]
    fn parse_validation_contract_accepts_array_enum_members() {
        let raw_contract = r#"{"format_version":1,"mode":"enforce","fields":[{"name":"tags","type":"array[string]","enum":[["a"],["b","c"]]}]}"#;

        let parsed = parse_operational_validation_contract(raw_contract)
            .expect("contract parses")
            .expect("contract present");

        assert_eq!(parsed.mode, OperationalValidationMode::Enforce);
        assert_eq!(parsed.fields.len(), 1);
    }

    /// A `report_only` contract parses, and validating a payload whose
    /// `status` is outside the declared enum still returns an error.
    #[test]
    fn report_only_contract_validates_payload_without_rejecting_contract() {
        let raw_contract = r#"{"format_version":1,"mode":"report_only","additional_properties":false,"fields":[{"name":"status","type":"string","required":true,"enum":["ok","failed"]}]}"#;

        let parsed = parse_operational_validation_contract(raw_contract)
            .expect("contract parses")
            .expect("contract present");
        assert_eq!(parsed.mode, OperationalValidationMode::ReportOnly);

        let outcome =
            validate_operational_payload_against_contract(&parsed, r#"{"status":"bogus"}"#);
        assert!(outcome.is_err());
    }

    /// Deserializing and re-serializing a `report_only` contract reproduces
    /// the input JSON exactly.
    #[test]
    fn report_only_contract_round_trips_via_serde() {
        let wire = r#"{"format_version":1,"mode":"report_only","additional_properties":true,"fields":[]}"#;

        let decoded =
            serde_json::from_str::<OperationalValidationContract>(wire).expect("deserialize");
        assert_eq!(decoded.mode, OperationalValidationMode::ReportOnly);

        let encoded = serde_json::to_string(&decoded).expect("serialize");
        assert_eq!(encoded, wire);
    }
}