Skip to main content

fathomdb_engine/admin/
operational.rs

1use std::fmt::Write as _;
2use std::io;
3use std::time::SystemTime;
4
5use rusqlite::{OptionalExtension, TransactionBehavior};
6use serde::Deserialize;
7
8use super::{
9    AdminService, DEFAULT_OPERATIONAL_READ_LIMIT, EngineError, MAX_OPERATIONAL_READ_LIMIT,
10    persist_simple_provenance_event, rebuild_operational_current_rows,
11};
12use crate::ids::new_id;
13use crate::operational::{
14    OperationalCollectionKind, OperationalCollectionRecord, OperationalCompactionReport,
15    OperationalCurrentRow, OperationalFilterClause, OperationalFilterField,
16    OperationalFilterFieldType, OperationalFilterMode, OperationalFilterValue,
17    OperationalHistoryValidationIssue, OperationalHistoryValidationReport, OperationalMutationRow,
18    OperationalPurgeReport, OperationalReadReport, OperationalReadRequest,
19    OperationalRegisterRequest, OperationalRepairReport, OperationalRetentionActionKind,
20    OperationalRetentionPlanItem, OperationalRetentionPlanReport, OperationalRetentionRunItem,
21    OperationalRetentionRunReport, OperationalSecondaryIndexDefinition,
22    OperationalSecondaryIndexRebuildReport, OperationalTraceReport,
23    extract_secondary_index_entries_for_current, extract_secondary_index_entries_for_mutation,
24    parse_operational_secondary_indexes_json, parse_operational_validation_contract,
25    validate_operational_payload_against_contract,
26};
27
/// Retention policy for an operational collection.
///
/// Deserialized (via serde) from a JSON object whose `"mode"` field selects
/// the variant in snake_case (`keep_all`, `purge_before_seconds`,
/// `keep_last`), with the remaining fields flattened alongside it.
/// NOTE(review): presumably parsed from a collection's `retention_json`;
/// the parsing site is not visible in this chunk — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
enum OperationalRetentionPolicy {
    // Retain everything (name-implied; enforcement lives in the retention
    // planner/runner helpers, not shown here).
    KeepAll,
    // Age-based retention: `max_age_seconds` is an age in seconds.
    PurgeBeforeSeconds { max_age_seconds: i64 },
    // Count-based retention: `max_rows` is a row budget.
    KeepLast { max_rows: usize },
}
35
/// A read-filter clause after compilation against the collection's declared
/// filter fields (see `compile_operational_read_filters`): the target field
/// name plus the concrete match condition to apply.
#[derive(Clone, Debug, PartialEq, Eq)]
struct CompiledOperationalReadFilter {
    // Declared filter-field name the condition applies to.
    field: String,
    condition: OperationalReadCondition,
}
41
/// A secondary-index fast path matched for an append-only read: borrows the
/// index to use, the value filter it can serve, and an optional companion
/// time-range filter. Lifetimes tie the borrows to the compiled filter list
/// and index definitions owned by the caller.
#[derive(Clone, Debug)]
struct MatchedAppendOnlySecondaryIndexRead<'a> {
    index_name: &'a str,
    // Equality/prefix filter answered by the index itself.
    value_filter: &'a CompiledOperationalReadFilter,
    // Optional extra time-bound filter applied alongside the index lookup.
    // NOTE(review): exact semantics live in the (not shown) index-read
    // executor — confirm there.
    time_range: Option<&'a CompiledOperationalReadFilter>,
}
48
/// Concrete match condition of a compiled read filter.
#[derive(Clone, Debug, PartialEq, Eq)]
enum OperationalReadCondition {
    // Exact match on a string-typed filter field.
    ExactString(String),
    // Exact match on an integer-typed filter field.
    ExactInteger(i64),
    // String prefix match.
    Prefix(String),
    // Inclusive/exclusive integer range; `None` on either side means
    // unbounded in that direction. NOTE(review): bound inclusivity is
    // decided by the SQL that consumes this — not visible here.
    Range {
        lower: Option<i64>,
        upper: Option<i64>,
    },
}
59
/// One filter value extracted from a mutation payload, mirroring the
/// (`field_name`, `string_value`, `integer_value`) columns written to the
/// `operational_filter_values` table during backfill.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ExtractedOperationalFilterValue {
    field_name: String,
    // At most one of the two value slots is expected to be populated,
    // matching the field's declared type — TODO confirm in the extractor.
    string_value: Option<String>,
    integer_value: Option<i64>,
}
66
impl AdminService {
    /// Registers a new operational collection: validates every metadata
    /// contract up front, inserts the collection row, records a provenance
    /// event, and returns the freshly persisted record.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the collection metadata is invalid or the insert fails.
    pub fn register_operational_collection(
        &self,
        request: &OperationalRegisterRequest,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        if request.name.trim().is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection name must not be empty".to_owned(),
            ));
        }
        if request.schema_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection schema_json must not be empty".to_owned(),
            ));
        }
        if request.retention_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection retention_json must not be empty".to_owned(),
            ));
        }
        if request.filter_fields_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection filter_fields_json must not be empty".to_owned(),
            ));
        }
        // Parse-only validation: the parsed results are discarded here and
        // re-parsed on demand by later operations.
        parse_operational_validation_contract(&request.validation_json)
            .map_err(EngineError::InvalidWrite)?;
        parse_operational_secondary_indexes_json(&request.secondary_indexes_json, request.kind)
            .map_err(EngineError::InvalidWrite)?;
        if request.format_version <= 0 {
            return Err(EngineError::InvalidWrite(
                "operational collection format_version must be positive".to_owned(),
            ));
        }
        parse_operational_filter_fields(&request.filter_fields_json)
            .map_err(EngineError::InvalidWrite)?;

        let mut conn = self.connect()?;
        // Immediate behavior takes the write lock up front so the insert and
        // the provenance event commit atomically.
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        tx.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json, secondary_indexes_json, format_version, created_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, unixepoch())",
            rusqlite::params![
                request.name.as_str(),
                request.kind.as_str(),
                request.schema_json.as_str(),
                request.retention_json.as_str(),
                request.filter_fields_json.as_str(),
                request.validation_json.as_str(),
                request.secondary_indexes_json.as_str(),
                request.format_version,
            ],
        )?;
        persist_simple_provenance_event(
            &tx,
            "operational_collection_registered",
            request.name.as_str(),
            Some(serde_json::json!({
                "kind": request.kind.as_str(),
                "format_version": request.format_version,
            })),
        )?;
        tx.commit()?;

        // Re-read after commit so the returned record carries the
        // database-assigned created_at.
        self.describe_operational_collection(&request.name)?
            .ok_or_else(|| {
                EngineError::Bridge("registered collection missing after commit".to_owned())
            })
    }

    /// Loads the metadata record for `name`, or `None` if it was never
    /// registered.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database query fails.
    pub fn describe_operational_collection(
        &self,
        name: &str,
    ) -> Result<Option<OperationalCollectionRecord>, EngineError> {
        let conn = self.connect()?;
        load_operational_collection_record(&conn, name)
    }

    /// Replaces the collection's filter-field contract and rebuilds the
    /// derived `operational_filter_values` rows by re-extracting values from
    /// every stored mutation, all in one transaction.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the collection is missing, the filter contract is invalid,
    /// or existing mutation backfill fails.
    pub fn update_operational_collection_filters(
        &self,
        name: &str,
        filter_fields_json: &str,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        if filter_fields_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection filter_fields_json must not be empty".to_owned(),
            ));
        }
        let declared_fields = parse_operational_filter_fields(filter_fields_json)
            .map_err(EngineError::InvalidWrite)?;

        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        // Existence check only; the loaded record is discarded.
        load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        tx.execute(
            "UPDATE operational_collections SET filter_fields_json = ?2 WHERE name = ?1",
            rusqlite::params![name, filter_fields_json],
        )?;
        // The old extracted values reflect the old contract; drop them all.
        tx.execute(
            "DELETE FROM operational_filter_values WHERE collection_name = ?1",
            [name],
        )?;

        // Materialize all mutations first so the SELECT statement is
        // finished (and dropped) before the insert loop writes.
        let mut mutation_stmt = tx.prepare(
            "SELECT id, payload_json FROM operational_mutations \
             WHERE collection_name = ?1 ORDER BY mutation_order",
        )?;
        let mutations = mutation_stmt
            .query_map([name], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        drop(mutation_stmt);

        let mut insert_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut inserted_values = 0usize;
        for (mutation_id, payload_json) in &mutations {
            // A single payload may yield zero or more filter values.
            for filter_value in
                extract_operational_filter_values(&declared_fields, payload_json.as_str())
            {
                insert_filter_value.execute(rusqlite::params![
                    mutation_id,
                    name,
                    filter_value.field_name,
                    filter_value.string_value,
                    filter_value.integer_value,
                ])?;
                inserted_values += 1;
            }
        }
        // Statements borrow the transaction; drop before commit.
        drop(insert_filter_value);

        persist_simple_provenance_event(
            &tx,
            "operational_collection_filter_fields_updated",
            name,
            Some(serde_json::json!({
                "field_count": declared_fields.len(),
                "mutations_backfilled": mutations.len(),
                "inserted_filter_values": inserted_values,
            })),
        )?;
        let updated = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::Bridge("operational collection missing after filter update".to_owned())
        })?;
        tx.commit()?;
        Ok(updated)
    }

    /// Replaces the collection's validation contract. Existing history is
    /// NOT re-validated here; use `validate_operational_collection_history`
    /// for that.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the collection is missing or the validation contract is invalid.
    pub fn update_operational_collection_validation(
        &self,
        name: &str,
        validation_json: &str,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        parse_operational_validation_contract(validation_json)
            .map_err(EngineError::InvalidWrite)?;

        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        // Existence check only; the loaded record is discarded.
        load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        tx.execute(
            "UPDATE operational_collections SET validation_json = ?2 WHERE name = ?1",
            rusqlite::params![name, validation_json],
        )?;
        persist_simple_provenance_event(
            &tx,
            "operational_collection_validation_updated",
            name,
            Some(serde_json::json!({
                "has_validation": !validation_json.is_empty(),
            })),
        )?;
        let updated = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::Bridge("operational collection missing after validation update".to_owned())
        })?;
        tx.commit()?;
        Ok(updated)
    }

    /// Replaces the collection's secondary-index definitions and immediately
    /// rebuilds all derived index entries in the same transaction.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the collection is missing, the contract is invalid,
    /// or derived index rebuild fails.
    pub fn update_operational_collection_secondary_indexes(
        &self,
        name: &str,
        secondary_indexes_json: &str,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        // The contract is validated against the collection's kind, so the
        // record must be loaded before parsing.
        let indexes = parse_operational_secondary_indexes_json(secondary_indexes_json, record.kind)
            .map_err(EngineError::InvalidWrite)?;
        tx.execute(
            "UPDATE operational_collections SET secondary_indexes_json = ?2 WHERE name = ?1",
            rusqlite::params![name, secondary_indexes_json],
        )?;
        let (mutation_entries_rebuilt, current_entries_rebuilt) =
            rebuild_operational_secondary_index_entries(&tx, &record.name, record.kind, &indexes)?;
        persist_simple_provenance_event(
            &tx,
            "operational_collection_secondary_indexes_updated",
            name,
            Some(serde_json::json!({
                "index_count": indexes.len(),
                "mutation_entries_rebuilt": mutation_entries_rebuilt,
                "current_entries_rebuilt": current_entries_rebuilt,
            })),
        )?;
        let updated = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::Bridge(
                "operational collection missing after secondary index update".to_owned(),
            )
        })?;
        tx.commit()?;
        Ok(updated)
    }

    /// Rebuilds the derived secondary-index entries from the collection's
    /// currently stored index definitions (no contract change).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the collection is missing or rebuild fails.
    pub fn rebuild_operational_secondary_indexes(
        &self,
        name: &str,
    ) -> Result<OperationalSecondaryIndexRebuildReport, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        let indexes =
            parse_operational_secondary_indexes_json(&record.secondary_indexes_json, record.kind)
                .map_err(EngineError::InvalidWrite)?;
        let (mutation_entries_rebuilt, current_entries_rebuilt) =
            rebuild_operational_secondary_index_entries(&tx, &record.name, record.kind, &indexes)?;
        persist_simple_provenance_event(
            &tx,
            "operational_secondary_indexes_rebuilt",
            name,
            Some(serde_json::json!({
                "index_count": indexes.len(),
                "mutation_entries_rebuilt": mutation_entries_rebuilt,
                "current_entries_rebuilt": current_entries_rebuilt,
            })),
        )?;
        tx.commit()?;
        Ok(OperationalSecondaryIndexRebuildReport {
            collection_name: name.to_owned(),
            mutation_entries_rebuilt,
            current_entries_rebuilt,
        })
    }

    /// Re-validates every stored non-delete mutation payload against the
    /// collection's validation contract and reports the offending rows.
    /// Read-only: nothing is modified or deleted.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the collection is missing or its validation contract is invalid.
    pub fn validate_operational_collection_history(
        &self,
        name: &str,
    ) -> Result<OperationalHistoryValidationReport, EngineError> {
        let conn = self.connect()?;
        let record = load_operational_collection_record(&conn, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        // A collection without a configured contract is an error, not an
        // empty report — the caller asked for validation explicitly.
        let Some(contract) = parse_operational_validation_contract(&record.validation_json)
            .map_err(EngineError::InvalidWrite)?
        else {
            return Err(EngineError::InvalidWrite(format!(
                "operational collection '{name}' has no validation_json configured"
            )));
        };

        let mut stmt = conn.prepare(
            "SELECT id, record_key, op_kind, payload_json FROM operational_mutations \
             WHERE collection_name = ?1 ORDER BY mutation_order",
        )?;
        let rows = stmt
            .query_map([name], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                    row.get::<_, String>(3)?,
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        drop(stmt);

        let mut checked_rows = 0usize;
        let mut issues = Vec::new();
        for (mutation_id, record_key, op_kind, payload_json) in rows {
            // Delete markers carry no meaningful payload to validate.
            if op_kind == "delete" {
                continue;
            }
            checked_rows += 1;
            if let Err(message) =
                validate_operational_payload_against_contract(&contract, payload_json.as_str())
            {
                issues.push(OperationalHistoryValidationIssue {
                    mutation_id,
                    record_key,
                    op_kind,
                    message,
                });
            }
        }

        Ok(OperationalHistoryValidationReport {
            collection_name: name.to_owned(),
            checked_rows,
            invalid_row_count: issues.len(),
            issues,
        })
    }

    /// Marks a collection as disabled by stamping `disabled_at`. Idempotent:
    /// an already-disabled collection is left untouched, but the provenance
    /// event is recorded either way (with a `changed` flag).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database query fails.
    pub fn disable_operational_collection(
        &self,
        name: &str,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        let changed = if record.disabled_at.is_none() {
            tx.execute(
                "UPDATE operational_collections SET disabled_at = unixepoch() WHERE name = ?1",
                [name],
            )?;
            true
        } else {
            false
        };
        // Re-load so the returned record (and the event payload) carry the
        // database-assigned disabled_at.
        let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::Bridge("operational collection missing after disable".to_owned())
        })?;
        persist_simple_provenance_event(
            &tx,
            "operational_collection_disabled",
            name,
            Some(serde_json::json!({
                "disabled_at": record.disabled_at,
                "changed": changed,
            })),
        )?;
        tx.commit()?;
        Ok(record)
    }

    /// Deletes mutations selected by the collection's retention policy
    /// (append-only collections only). With `dry_run` the candidate count is
    /// reported and the transaction is rolled back by dropping it.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database query fails.
    pub fn compact_operational_collection(
        &self,
        name: &str,
        dry_run: bool,
    ) -> Result<OperationalCompactionReport, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let collection = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        validate_append_only_operational_collection(&collection, "compact")?;
        let (mutation_ids, before_timestamp) =
            operational_compaction_candidates(&tx, &collection.retention_json, name)?;
        if dry_run {
            // Dropping an uncommitted rusqlite transaction rolls it back.
            drop(tx);
            return Ok(OperationalCompactionReport {
                collection_name: name.to_owned(),
                deleted_mutations: mutation_ids.len(),
                dry_run: true,
                before_timestamp,
            });
        }
        let mut delete_stmt =
            tx.prepare_cached("DELETE FROM operational_mutations WHERE id = ?1")?;
        for mutation_id in &mutation_ids {
            delete_stmt.execute([mutation_id.as_str()])?;
        }
        // Statement borrows the transaction; drop before commit.
        drop(delete_stmt);
        persist_simple_provenance_event(
            &tx,
            "operational_collection_compacted",
            name,
            Some(serde_json::json!({
                "deleted_mutations": mutation_ids.len(),
                "before_timestamp": before_timestamp,
            })),
        )?;
        tx.commit()?;
        Ok(OperationalCompactionReport {
            collection_name: name.to_owned(),
            deleted_mutations: mutation_ids.len(),
            dry_run: false,
            before_timestamp,
        })
    }

    /// Unconditionally deletes all mutations created before
    /// `before_timestamp` (append-only collections only), ignoring the
    /// retention policy.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database query fails.
    pub fn purge_operational_collection(
        &self,
        name: &str,
        before_timestamp: i64,
    ) -> Result<OperationalPurgeReport, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let collection = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        validate_append_only_operational_collection(&collection, "purge")?;
        // Strict '<' comparison: rows created exactly at before_timestamp
        // are kept.
        let deleted_mutations = tx.execute(
            "DELETE FROM operational_mutations WHERE collection_name = ?1 AND created_at < ?2",
            rusqlite::params![name, before_timestamp],
        )?;
        persist_simple_provenance_event(
            &tx,
            "operational_collection_purged",
            name,
            Some(serde_json::json!({
                "deleted_mutations": deleted_mutations,
                "before_timestamp": before_timestamp,
            })),
        )?;
        tx.commit()?;
        Ok(OperationalPurgeReport {
            collection_name: name.to_owned(),
            deleted_mutations,
            before_timestamp,
        })
    }

    /// Produces a read-only retention plan (one item per selected
    /// collection) without modifying any data.
    ///
    /// `collection_names` of `None` selects all retention-eligible
    /// collections; `max_collections` caps the selection.
    ///
    /// # Errors
    /// Returns [`EngineError`] if collection selection or policy parsing fails.
    pub fn plan_operational_retention(
        &self,
        now_timestamp: i64,
        collection_names: Option<&[String]>,
        max_collections: Option<usize>,
    ) -> Result<OperationalRetentionPlanReport, EngineError> {
        let conn = self.connect()?;
        let records = load_operational_retention_records(&conn, collection_names, max_collections)?;
        let mut items = Vec::with_capacity(records.len());
        for record in records {
            items.push(plan_operational_retention_item(
                &conn,
                &record,
                now_timestamp,
            )?);
        }
        Ok(OperationalRetentionPlanReport {
            planned_at: now_timestamp,
            collections_examined: items.len(),
            items,
        })
    }

    /// Executes retention for the selected collections, one transaction per
    /// collection so a failure in one does not roll back earlier ones.
    /// Dry-run and no-op items are rolled back by dropping the transaction.
    ///
    /// # Errors
    /// Returns [`EngineError`] if collection selection, policy parsing, or execution fails.
    pub fn run_operational_retention(
        &self,
        now_timestamp: i64,
        collection_names: Option<&[String]>,
        max_collections: Option<usize>,
        dry_run: bool,
    ) -> Result<OperationalRetentionRunReport, EngineError> {
        let mut conn = self.connect()?;
        let records = load_operational_retention_records(&conn, collection_names, max_collections)?;
        let mut items = Vec::with_capacity(records.len());
        let mut collections_acted_on = 0usize;

        for record in records {
            let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
            let item = run_operational_retention_item(&tx, &record, now_timestamp, dry_run)?;
            // Counted even under dry_run: this reflects collections that
            // reported (or would report) deletions.
            if item.deleted_mutations > 0 {
                collections_acted_on += 1;
            }
            if dry_run || item.action_kind == OperationalRetentionActionKind::Noop {
                // Roll back: dropping an uncommitted transaction discards it.
                drop(tx);
            } else {
                tx.commit()?;
            }
            items.push(item);
        }

        Ok(OperationalRetentionRunReport {
            executed_at: now_timestamp,
            collections_examined: items.len(),
            collections_acted_on,
            dry_run,
            items,
        })
    }

    /// Returns the full mutation history plus current-state rows for a
    /// collection, optionally narrowed to a single record key. Read-only.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database query fails.
    pub fn trace_operational_collection(
        &self,
        collection_name: &str,
        record_key: Option<&str>,
    ) -> Result<OperationalTraceReport, EngineError> {
        let conn = self.connect()?;
        ensure_operational_collection_registered(&conn, collection_name)?;
        // Two branches per query because rusqlite statements take a fixed
        // number of positional parameters.
        let mutations = if let Some(record_key) = record_key {
            let mut stmt = conn.prepare(
                "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
                 FROM operational_mutations \
                 WHERE collection_name = ?1 AND record_key = ?2 \
                 ORDER BY mutation_order",
            )?;
            stmt.query_map([collection_name, record_key], map_operational_mutation_row)?
                .collect::<Result<Vec<_>, _>>()?
        } else {
            let mut stmt = conn.prepare(
                "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
                 FROM operational_mutations \
                 WHERE collection_name = ?1 \
                 ORDER BY mutation_order",
            )?;
            stmt.query_map([collection_name], map_operational_mutation_row)?
                .collect::<Result<Vec<_>, _>>()?
        };
        let current_rows = if let Some(record_key) = record_key {
            let mut stmt = conn.prepare(
                "SELECT collection_name, record_key, payload_json, updated_at, last_mutation_id \
                 FROM operational_current \
                 WHERE collection_name = ?1 AND record_key = ?2 \
                 ORDER BY updated_at, record_key",
            )?;
            stmt.query_map([collection_name, record_key], map_operational_current_row)?
                .collect::<Result<Vec<_>, _>>()?
        } else {
            let mut stmt = conn.prepare(
                "SELECT collection_name, record_key, payload_json, updated_at, last_mutation_id \
                 FROM operational_current \
                 WHERE collection_name = ?1 \
                 ORDER BY updated_at, record_key",
            )?;
            stmt.query_map([collection_name], map_operational_current_row)?
                .collect::<Result<Vec<_>, _>>()?
        };

        Ok(OperationalTraceReport {
            collection_name: collection_name.to_owned(),
            record_key: record_key.map(str::to_owned),
            mutation_count: mutations.len(),
            current_count: current_rows.len(),
            mutations,
            current_rows,
        })
    }

    /// Executes a filtered read against an append-only collection: compiles
    /// the filter clauses against the declared fields, tries a
    /// secondary-index fast path first, and falls back to a filtered scan.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the collection contract is invalid or the filtered read fails.
    pub fn read_operational_collection(
        &self,
        request: &OperationalReadRequest,
    ) -> Result<OperationalReadReport, EngineError> {
        if request.collection_name.trim().is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational read collection_name must not be empty".to_owned(),
            ));
        }
        // Unfiltered full-collection reads are rejected by design.
        if request.filters.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational read requires at least one filter clause".to_owned(),
            ));
        }

        let conn = self.connect()?;
        let record = load_operational_collection_record(&conn, &request.collection_name)?
            .ok_or_else(|| {
                EngineError::InvalidWrite(format!(
                    "operational collection '{}' is not registered",
                    request.collection_name
                ))
            })?;
        validate_append_only_operational_collection(&record, "read")?;
        let declared_fields = parse_operational_filter_fields(&record.filter_fields_json)
            .map_err(EngineError::InvalidWrite)?;
        let secondary_indexes =
            parse_operational_secondary_indexes_json(&record.secondary_indexes_json, record.kind)
                .map_err(EngineError::InvalidWrite)?;
        // Clamps the caller's limit to the module's default/max constants.
        let applied_limit = operational_read_limit(request.limit)?;
        let filters = compile_operational_read_filters(&request.filters, &declared_fields)?;
        // Fast path: Some(report) means an index satisfied the read.
        if let Some(report) = execute_operational_secondary_index_read(
            &conn,
            &request.collection_name,
            &filters,
            &secondary_indexes,
            applied_limit,
        )? {
            return Ok(report);
        }
        execute_operational_filtered_read(&conn, &request.collection_name, &filters, applied_limit)
    }

    /// Rebuilds the `operational_current` projection (and any secondary
    /// indexes) for one named latest_state collection, or for all
    /// latest_state collections when `collection_name` is `None`.
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database query fails or collection validation fails.
    pub fn rebuild_operational_current(
        &self,
        collection_name: Option<&str>,
    ) -> Result<OperationalRepairReport, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let collections = if let Some(name) = collection_name {
            let maybe_kind: Option<String> = tx
                .query_row(
                    "SELECT kind FROM operational_collections WHERE name = ?1",
                    [name],
                    |row| row.get(0),
                )
                .optional()?;
            let Some(kind) = maybe_kind else {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{name}' is not registered"
                )));
            };
            // Only latest_state collections have a current-state projection.
            if kind != OperationalCollectionKind::LatestState.as_str() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{name}' is not latest_state"
                )));
            }
            vec![name.to_owned()]
        } else {
            let mut stmt = tx.prepare(
                "SELECT name FROM operational_collections WHERE kind = 'latest_state' ORDER BY name",
            )?;
            stmt.query_map([], |row| row.get::<_, String>(0))?
                .collect::<Result<Vec<_>, _>>()?
        };

        let rebuilt_rows = rebuild_operational_current_rows(&tx, &collections)?;
        // Current-derived secondary index entries must track the rebuilt
        // projection, so rebuild them for every collection that has indexes.
        for collection in &collections {
            let record = load_operational_collection_record(&tx, collection)?.ok_or_else(|| {
                EngineError::Bridge(format!(
                    "operational collection '{collection}' missing during current rebuild"
                ))
            })?;
            let indexes = parse_operational_secondary_indexes_json(
                &record.secondary_indexes_json,
                record.kind,
            )
            .map_err(EngineError::InvalidWrite)?;
            if !indexes.is_empty() {
                rebuild_operational_secondary_index_entries(
                    &tx,
                    &record.name,
                    record.kind,
                    &indexes,
                )?;
            }
        }

        persist_simple_provenance_event(
            &tx,
            "operational_current_rebuilt",
            // "*" marks an all-collections rebuild in the event log.
            collection_name.unwrap_or("*"),
            Some(serde_json::json!({
                "collections_rebuilt": collections.len(),
                "current_rows_rebuilt": rebuilt_rows,
            })),
        )?;
        tx.commit()?;

        Ok(OperationalRepairReport {
            collections_rebuilt: collections.len(),
            current_rows_rebuilt: rebuilt_rows,
        })
    }
}
756
757pub(super) fn load_operational_collection_record(
758    conn: &rusqlite::Connection,
759    name: &str,
760) -> Result<Option<OperationalCollectionRecord>, EngineError> {
761    conn.query_row(
762        "SELECT name, kind, schema_json, retention_json, filter_fields_json, validation_json, secondary_indexes_json, format_version, created_at, disabled_at \
763         FROM operational_collections WHERE name = ?1",
764        [name],
765        map_operational_collection_row,
766    )
767    .optional()
768    .map_err(EngineError::Sqlite)
769}
770
771pub(super) fn map_operational_mutation_row(
772    row: &rusqlite::Row<'_>,
773) -> Result<OperationalMutationRow, rusqlite::Error> {
774    Ok(OperationalMutationRow {
775        id: row.get(0)?,
776        collection_name: row.get(1)?,
777        record_key: row.get(2)?,
778        op_kind: row.get(3)?,
779        payload_json: row.get(4)?,
780        source_ref: row.get(5)?,
781        created_at: row.get(6)?,
782    })
783}
784
785pub(super) fn map_operational_current_row(
786    row: &rusqlite::Row<'_>,
787) -> Result<OperationalCurrentRow, rusqlite::Error> {
788    Ok(OperationalCurrentRow {
789        collection_name: row.get(0)?,
790        record_key: row.get(1)?,
791        payload_json: row.get(2)?,
792        updated_at: row.get(3)?,
793        last_mutation_id: row.get(4)?,
794    })
795}
796
797fn map_operational_collection_row(
798    row: &rusqlite::Row<'_>,
799) -> Result<OperationalCollectionRecord, rusqlite::Error> {
800    let kind_text: String = row.get(1)?;
801    let kind = OperationalCollectionKind::try_from(kind_text.as_str()).map_err(|message| {
802        rusqlite::Error::FromSqlConversionFailure(
803            1,
804            rusqlite::types::Type::Text,
805            Box::new(io::Error::new(io::ErrorKind::InvalidData, message)),
806        )
807    })?;
808    Ok(OperationalCollectionRecord {
809        name: row.get(0)?,
810        kind,
811        schema_json: row.get(2)?,
812        retention_json: row.get(3)?,
813        filter_fields_json: row.get(4)?,
814        validation_json: row.get(5)?,
815        secondary_indexes_json: row.get(6)?,
816        format_version: row.get(7)?,
817        created_at: row.get(8)?,
818        disabled_at: row.get(9)?,
819    })
820}
821
822fn validate_append_only_operational_collection(
823    record: &OperationalCollectionRecord,
824    operation: &str,
825) -> Result<(), EngineError> {
826    if record.kind != OperationalCollectionKind::AppendOnlyLog {
827        return Err(EngineError::InvalidWrite(format!(
828            "operational collection '{}' must be append_only_log to {operation}",
829            record.name
830        )));
831    }
832    Ok(())
833}
834
835fn ensure_operational_collection_registered(
836    conn: &rusqlite::Connection,
837    collection_name: &str,
838) -> Result<(), EngineError> {
839    if load_operational_collection_record(conn, collection_name)?.is_none() {
840        return Err(EngineError::InvalidWrite(format!(
841            "operational collection '{collection_name}' is not registered"
842        )));
843    }
844    Ok(())
845}
846
847fn clear_operational_secondary_index_entries(
848    tx: &rusqlite::Transaction<'_>,
849    collection_name: &str,
850) -> Result<(), EngineError> {
851    tx.execute(
852        "DELETE FROM operational_secondary_index_entries WHERE collection_name = ?1",
853        [collection_name],
854    )?;
855    Ok(())
856}
857
858fn insert_operational_secondary_index_entry(
859    tx: &rusqlite::Transaction<'_>,
860    collection_name: &str,
861    subject_kind: &str,
862    mutation_id: &str,
863    record_key: &str,
864    entry: &crate::operational::OperationalSecondaryIndexEntry,
865) -> Result<(), EngineError> {
866    tx.execute(
867        "INSERT INTO operational_secondary_index_entries \
868         (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
869          slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
870         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
871        rusqlite::params![
872            collection_name,
873            entry.index_name,
874            subject_kind,
875            mutation_id,
876            record_key,
877            entry.sort_timestamp,
878            entry.slot1_text,
879            entry.slot1_integer,
880            entry.slot2_text,
881            entry.slot2_integer,
882            entry.slot3_text,
883            entry.slot3_integer,
884        ],
885    )?;
886    Ok(())
887}
888
/// Drop and regenerate every secondary-index entry for one collection.
///
/// Entries are cleared first, then rebuilt from the authoritative rows:
/// append-only collections index each mutation (`subject_kind = 'mutation'`),
/// latest-state collections index the current rows (`subject_kind =
/// 'current'`, stamped with the row's `last_mutation_id`). Returns
/// `(mutation_entries_rebuilt, current_entries_rebuilt)`.
pub(super) fn rebuild_operational_secondary_index_entries(
    tx: &rusqlite::Transaction<'_>,
    collection_name: &str,
    collection_kind: OperationalCollectionKind,
    indexes: &[OperationalSecondaryIndexDefinition],
) -> Result<(usize, usize), EngineError> {
    // Start from a clean slate so stale entries cannot survive a rebuild.
    clear_operational_secondary_index_entries(tx, collection_name)?;

    let mut mutation_entries_rebuilt = 0usize;
    if collection_kind == OperationalCollectionKind::AppendOnlyLog {
        let mut stmt = tx.prepare(
            "SELECT id, record_key, payload_json FROM operational_mutations \
             WHERE collection_name = ?1 ORDER BY mutation_order",
        )?;
        let rows = stmt
            .query_map([collection_name], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        // Drop the SELECT statement before issuing INSERTs on the same
        // transaction handle.
        drop(stmt);
        for (mutation_id, record_key, payload_json) in rows {
            for entry in extract_secondary_index_entries_for_mutation(indexes, &payload_json) {
                insert_operational_secondary_index_entry(
                    tx,
                    collection_name,
                    "mutation",
                    &mutation_id,
                    &record_key,
                    &entry,
                )?;
                mutation_entries_rebuilt += 1;
            }
        }
    }

    let mut current_entries_rebuilt = 0usize;
    if collection_kind == OperationalCollectionKind::LatestState {
        let mut stmt = tx.prepare(
            "SELECT record_key, payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 ORDER BY updated_at DESC, record_key",
        )?;
        let rows = stmt
            .query_map([collection_name], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, i64>(2)?,
                    row.get::<_, String>(3)?,
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        // Same pattern as above: finish reading before writing.
        drop(stmt);
        for (record_key, payload_json, updated_at, last_mutation_id) in rows {
            for entry in
                extract_secondary_index_entries_for_current(indexes, &payload_json, updated_at)
            {
                insert_operational_secondary_index_entry(
                    tx,
                    collection_name,
                    "current",
                    &last_mutation_id,
                    &record_key,
                    &entry,
                )?;
                current_entries_rebuilt += 1;
            }
        }
    }

    Ok((mutation_entries_rebuilt, current_entries_rebuilt))
}
964
965fn operational_read_limit(limit: Option<usize>) -> Result<usize, EngineError> {
966    let applied_limit = limit.unwrap_or(DEFAULT_OPERATIONAL_READ_LIMIT);
967    if applied_limit == 0 {
968        return Err(EngineError::InvalidWrite(
969            "operational read limit must be greater than zero".to_owned(),
970        ));
971    }
972    Ok(applied_limit.min(MAX_OPERATIONAL_READ_LIMIT))
973}
974
975pub(super) fn parse_operational_filter_fields(
976    filter_fields_json: &str,
977) -> Result<Vec<OperationalFilterField>, String> {
978    let fields: Vec<OperationalFilterField> = serde_json::from_str(filter_fields_json)
979        .map_err(|error| format!("invalid filter_fields_json: {error}"))?;
980    let mut seen = std::collections::HashSet::new();
981    for field in &fields {
982        if field.name.trim().is_empty() {
983            return Err("filter_fields_json field names must not be empty".to_owned());
984        }
985        if !seen.insert(field.name.as_str()) {
986            return Err(format!(
987                "filter_fields_json contains duplicate field '{}'",
988                field.name
989            ));
990        }
991        if field.modes.is_empty() {
992            return Err(format!(
993                "filter_fields_json field '{}' must declare at least one mode",
994                field.name
995            ));
996        }
997        if field.modes.contains(&OperationalFilterMode::Prefix)
998            && field.field_type != OperationalFilterFieldType::String
999        {
1000            return Err(format!(
1001                "filter field '{}' only supports prefix for string types",
1002                field.name
1003            ));
1004        }
1005    }
1006    Ok(fields)
1007}
1008
/// Validate read filter clauses against the collection's declared filter
/// fields and compile them into executable conditions.
///
/// Every clause must (a) reference a declared field, (b) use a mode that
/// field allows, and (c) supply a value matching the declared field type.
/// Any violation is reported as `EngineError::InvalidWrite`.
fn compile_operational_read_filters(
    filters: &[OperationalFilterClause],
    declared_fields: &[OperationalFilterField],
) -> Result<Vec<CompiledOperationalReadFilter>, EngineError> {
    // Index declared fields by name for O(1) lookup per clause.
    let field_map = declared_fields
        .iter()
        .map(|field| (field.name.as_str(), field))
        .collect::<std::collections::HashMap<_, _>>();
    filters
        .iter()
        .map(|filter| match filter {
            OperationalFilterClause::Exact { field, value } => {
                let declared = field_map.get(field.as_str()).ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational read filter uses undeclared field '{field}'"
                    ))
                })?;
                if !declared.modes.contains(&OperationalFilterMode::Exact) {
                    return Err(EngineError::InvalidWrite(format!(
                        "operational read field '{field}' does not allow exact filters"
                    )));
                }
                // String fields take string values; integer/timestamp fields
                // take integer values. Anything else is a type mismatch.
                let condition = match (declared.field_type, value) {
                    (OperationalFilterFieldType::String, OperationalFilterValue::String(value)) => {
                        OperationalReadCondition::ExactString(value.clone())
                    }
                    (
                        OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp,
                        OperationalFilterValue::Integer(value),
                    ) => OperationalReadCondition::ExactInteger(*value),
                    _ => {
                        return Err(EngineError::InvalidWrite(format!(
                            "operational read field '{field}' received a value with the wrong type"
                        )));
                    }
                };
                Ok(CompiledOperationalReadFilter {
                    field: field.clone(),
                    condition,
                })
            }
            OperationalFilterClause::Prefix { field, value } => {
                let declared = field_map.get(field.as_str()).ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational read filter uses undeclared field '{field}'"
                    ))
                })?;
                if !declared.modes.contains(&OperationalFilterMode::Prefix) {
                    return Err(EngineError::InvalidWrite(format!(
                        "operational read field '{field}' does not allow prefix filters"
                    )));
                }
                // Prefix matching is only defined for string fields.
                if declared.field_type != OperationalFilterFieldType::String {
                    return Err(EngineError::InvalidWrite(format!(
                        "operational read field '{field}' only supports prefix filters for strings"
                    )));
                }
                Ok(CompiledOperationalReadFilter {
                    field: field.clone(),
                    condition: OperationalReadCondition::Prefix(value.clone()),
                })
            }
            OperationalFilterClause::Range {
                field,
                lower,
                upper,
            } => {
                let declared = field_map.get(field.as_str()).ok_or_else(|| {
                    EngineError::InvalidWrite(format!(
                        "operational read filter uses undeclared field '{field}'"
                    ))
                })?;
                if !declared.modes.contains(&OperationalFilterMode::Range) {
                    return Err(EngineError::InvalidWrite(format!(
                        "operational read field '{field}' does not allow range filters"
                    )));
                }
                if !matches!(
                    declared.field_type,
                    OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp
                ) {
                    return Err(EngineError::InvalidWrite(format!(
                        "operational read field '{field}' only supports range filters for integer/timestamp fields"
                    )));
                }
                // A range with neither bound would match everything; reject it.
                if lower.is_none() && upper.is_none() {
                    return Err(EngineError::InvalidWrite(format!(
                        "operational read range filter for '{field}' must specify a lower or upper bound"
                    )));
                }
                Ok(CompiledOperationalReadFilter {
                    field: field.clone(),
                    condition: OperationalReadCondition::Range {
                        lower: *lower,
                        upper: *upper,
                    },
                })
            }
        })
        .collect()
}
1110
/// Decide whether a compiled filter set can be served from an append-only
/// field+time secondary index.
///
/// A match requires one or two filters in total: a mandatory value filter on
/// the indexed field (exact string or prefix for string-typed indexes, exact
/// integer for integer/timestamp-typed indexes) plus an optional range filter
/// on the index's time field. Any other filter shape returns `None`, which
/// sends the read down the generic filtered path.
fn match_append_only_secondary_index_read<'a>(
    filters: &'a [CompiledOperationalReadFilter],
    indexes: &'a [OperationalSecondaryIndexDefinition],
) -> Option<MatchedAppendOnlySecondaryIndexRead<'a>> {
    indexes.iter().find_map(|index| {
        // Only the AppendOnlyFieldTime index shape is eligible here.
        let OperationalSecondaryIndexDefinition::AppendOnlyFieldTime {
            name,
            field,
            value_type,
            time_field,
        } = index
        else {
            return None;
        };
        if !(1..=2).contains(&filters.len()) {
            return None;
        }

        let mut value_filter = None;
        let mut time_range = None;
        for filter in filters {
            if filter.field == *field {
                // The condition kind must be compatible with the index's
                // declared value type.
                let supported = matches!(
                    (&filter.condition, value_type),
                    (
                        OperationalReadCondition::ExactString(_)
                            | OperationalReadCondition::Prefix(_),
                        crate::operational::OperationalSecondaryIndexValueType::String
                    ) | (
                        OperationalReadCondition::ExactInteger(_),
                        crate::operational::OperationalSecondaryIndexValueType::Integer
                            | crate::operational::OperationalSecondaryIndexValueType::Timestamp
                    )
                );
                // Reject unsupported conditions and duplicate value filters.
                if !supported || value_filter.is_some() {
                    return None;
                }
                value_filter = Some(filter);
                continue;
            }
            if filter.field == *time_field {
                // The time field only accepts a single range condition.
                if !matches!(filter.condition, OperationalReadCondition::Range { .. })
                    || time_range.is_some()
                {
                    return None;
                }
                time_range = Some(filter);
                continue;
            }
            // Filter on a field this index does not cover.
            return None;
        }

        // The value filter is mandatory; the time range is optional.
        value_filter.map(|value_filter| MatchedAppendOnlySecondaryIndexRead {
            index_name: name.as_str(),
            value_filter,
            time_range,
        })
    })
}
1170
/// Try to serve a filtered read through a matching secondary index.
///
/// Returns `Ok(None)` when no index matches the filter set, letting the
/// caller fall back to the generic filtered read. SQL is assembled with
/// positional parameters; each appended clause derives its `?N` index from
/// the current `params.len()`, so clause text and parameter pushes must stay
/// in lockstep.
fn execute_operational_secondary_index_read(
    conn: &rusqlite::Connection,
    collection_name: &str,
    filters: &[CompiledOperationalReadFilter],
    indexes: &[OperationalSecondaryIndexDefinition],
    applied_limit: usize,
) -> Result<Option<OperationalReadReport>, EngineError> {
    use rusqlite::types::Value;

    let Some(matched) = match_append_only_secondary_index_read(filters, indexes) else {
        return Ok(None);
    };

    let mut sql = String::from(
        "SELECT m.id, m.collection_name, m.record_key, m.op_kind, m.payload_json, m.source_ref, m.created_at \
         FROM operational_secondary_index_entries s \
         JOIN operational_mutations m ON m.id = s.mutation_id \
         WHERE s.collection_name = ?1 AND s.index_name = ?2 AND s.subject_kind = 'mutation' ",
    );
    let mut params = vec![
        Value::from(collection_name.to_owned()),
        Value::from(matched.index_name.to_owned()),
    ];

    // Constrain the indexed value (slot1 holds the index's primary value).
    match &matched.value_filter.condition {
        OperationalReadCondition::ExactString(value) => {
            let _ = write!(sql, "AND s.slot1_text = ?{} ", params.len() + 1);
            params.push(Value::from(value.clone()));
        }
        OperationalReadCondition::Prefix(value) => {
            // GLOB with an escaped pattern implements the prefix match.
            let _ = write!(sql, "AND s.slot1_text GLOB ?{} ", params.len() + 1);
            params.push(Value::from(glob_prefix_pattern(value)));
        }
        OperationalReadCondition::ExactInteger(value) => {
            let _ = write!(sql, "AND s.slot1_integer = ?{} ", params.len() + 1);
            params.push(Value::from(*value));
        }
        // A range value filter is never produced by the matcher; bail out.
        OperationalReadCondition::Range { .. } => return Ok(None),
    }

    // Optional time-window constraint on the index's sort timestamp.
    if let Some(time_range) = matched.time_range
        && let OperationalReadCondition::Range { lower, upper } = &time_range.condition
    {
        if let Some(lower) = lower {
            let _ = write!(sql, "AND s.sort_timestamp >= ?{} ", params.len() + 1);
            params.push(Value::from(*lower));
        }
        if let Some(upper) = upper {
            let _ = write!(sql, "AND s.sort_timestamp <= ?{} ", params.len() + 1);
            params.push(Value::from(*upper));
        }
    }

    // Fetch one extra row beyond the limit to detect truncation.
    let _ = write!(
        sql,
        "ORDER BY s.sort_timestamp DESC, m.mutation_order DESC LIMIT ?{}",
        params.len() + 1
    );
    params.push(Value::from(i64::try_from(applied_limit + 1).map_err(
        |_| EngineError::Bridge("operational read limit overflow".to_owned()),
    )?));

    let mut stmt = conn.prepare(&sql)?;
    let mut rows = stmt
        .query_map(
            rusqlite::params_from_iter(params),
            map_operational_mutation_row,
        )?
        .collect::<Result<Vec<_>, _>>()?;
    let was_limited = rows.len() > applied_limit;
    if was_limited {
        rows.truncate(applied_limit);
    }

    Ok(Some(OperationalReadReport {
        collection_name: collection_name.to_owned(),
        row_count: rows.len(),
        applied_limit,
        was_limited,
        rows,
    }))
}
1253
/// Generic filtered read over the mutation log via extracted filter values.
///
/// Each compiled filter becomes a self-join against
/// `operational_filter_values` (aliased `f0`, `f1`, ...). SQL uses positional
/// parameters: `?1` is the collection name (bound in the trailing WHERE),
/// and every appended clause derives its `?N` from the current
/// `params.len()`, so clause text and parameter pushes must stay in sync.
/// One extra row beyond `applied_limit` is fetched to detect truncation.
fn execute_operational_filtered_read(
    conn: &rusqlite::Connection,
    collection_name: &str,
    filters: &[CompiledOperationalReadFilter],
    applied_limit: usize,
) -> Result<OperationalReadReport, EngineError> {
    use rusqlite::types::Value;

    let mut sql = String::from(
        "SELECT m.id, m.collection_name, m.record_key, m.op_kind, m.payload_json, m.source_ref, m.created_at \
         FROM operational_mutations m ",
    );
    let mut params = vec![Value::from(collection_name.to_owned())];
    for (index, filter) in filters.iter().enumerate() {
        // One join per filter; all joined aliases must agree on the mutation.
        let _ = write!(
            sql,
            "JOIN operational_filter_values f{index} \
             ON f{index}.mutation_id = m.id \
            AND f{index}.collection_name = m.collection_name "
        );
        match &filter.condition {
            OperationalReadCondition::ExactString(value) => {
                let _ = write!(
                    sql,
                    "AND f{index}.field_name = ?{} AND f{index}.string_value = ?{} ",
                    params.len() + 1,
                    params.len() + 2
                );
                params.push(Value::from(filter.field.clone()));
                params.push(Value::from(value.clone()));
            }
            OperationalReadCondition::ExactInteger(value) => {
                let _ = write!(
                    sql,
                    "AND f{index}.field_name = ?{} AND f{index}.integer_value = ?{} ",
                    params.len() + 1,
                    params.len() + 2
                );
                params.push(Value::from(filter.field.clone()));
                params.push(Value::from(*value));
            }
            OperationalReadCondition::Prefix(value) => {
                // GLOB with an escaped pattern implements the prefix match.
                let _ = write!(
                    sql,
                    "AND f{index}.field_name = ?{} AND f{index}.string_value GLOB ?{} ",
                    params.len() + 1,
                    params.len() + 2
                );
                params.push(Value::from(filter.field.clone()));
                params.push(Value::from(glob_prefix_pattern(value)));
            }
            OperationalReadCondition::Range { lower, upper } => {
                let _ = write!(sql, "AND f{index}.field_name = ?{} ", params.len() + 1);
                params.push(Value::from(filter.field.clone()));
                if let Some(lower) = lower {
                    let _ = write!(sql, "AND f{index}.integer_value >= ?{} ", params.len() + 1);
                    params.push(Value::from(*lower));
                }
                if let Some(upper) = upper {
                    let _ = write!(sql, "AND f{index}.integer_value <= ?{} ", params.len() + 1);
                    params.push(Value::from(*upper));
                }
            }
        }
    }
    let _ = write!(
        sql,
        "WHERE m.collection_name = ?1 ORDER BY m.mutation_order DESC LIMIT ?{}",
        params.len() + 1
    );
    params.push(Value::from(i64::try_from(applied_limit + 1).map_err(
        |_| EngineError::Bridge("operational read limit overflow".to_owned()),
    )?));

    let mut stmt = conn.prepare(&sql)?;
    let mut rows = stmt
        .query_map(
            rusqlite::params_from_iter(params),
            map_operational_mutation_row,
        )?
        .collect::<Result<Vec<_>, _>>()?;
    let was_limited = rows.len() > applied_limit;
    if was_limited {
        rows.truncate(applied_limit);
    }
    Ok(OperationalReadReport {
        collection_name: collection_name.to_owned(),
        row_count: rows.len(),
        applied_limit,
        was_limited,
        rows,
    })
}
1347
/// Build a SQLite GLOB pattern that matches any string starting with `value`.
///
/// The GLOB metacharacters `*`, `?`, and `[` are escaped by wrapping each in
/// a single-character class (e.g. `*` becomes `[*]`) so they match
/// literally; a trailing `*` wildcard then provides the prefix match.
fn glob_prefix_pattern(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 1);
    for ch in value.chars() {
        if matches!(ch, '*' | '?' | '[') {
            pattern.push('[');
            pattern.push(ch);
            pattern.push(']');
        } else {
            pattern.push(ch);
        }
    }
    pattern.push('*');
    pattern
}
1361
1362fn extract_operational_filter_values(
1363    filter_fields: &[OperationalFilterField],
1364    payload_json: &str,
1365) -> Vec<ExtractedOperationalFilterValue> {
1366    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1367        return Vec::new();
1368    };
1369    let Some(object) = parsed.as_object() else {
1370        return Vec::new();
1371    };
1372
1373    filter_fields
1374        .iter()
1375        .filter_map(|field| {
1376            let value = object.get(&field.name)?;
1377            match field.field_type {
1378                OperationalFilterFieldType::String => {
1379                    value
1380                        .as_str()
1381                        .map(|string_value| ExtractedOperationalFilterValue {
1382                            field_name: field.name.clone(),
1383                            string_value: Some(string_value.to_owned()),
1384                            integer_value: None,
1385                        })
1386                }
1387                OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1388                    value
1389                        .as_i64()
1390                        .map(|integer_value| ExtractedOperationalFilterValue {
1391                            field_name: field.name.clone(),
1392                            string_value: None,
1393                            integer_value: Some(integer_value),
1394                        })
1395                }
1396            }
1397        })
1398        .collect()
1399}
1400
1401fn operational_compaction_candidates(
1402    conn: &rusqlite::Connection,
1403    retention_json: &str,
1404    collection_name: &str,
1405) -> Result<(Vec<String>, Option<i64>), EngineError> {
1406    operational_compaction_candidates_at(
1407        conn,
1408        retention_json,
1409        collection_name,
1410        current_unix_timestamp()?,
1411    )
1412}
1413
/// Compute which mutation ids are eligible for deletion under the
/// collection's retention policy, evaluated at `now_timestamp`.
///
/// Returns the candidate ids plus, for `purge_before_seconds`, the cutoff
/// timestamp that was applied (`None` for the other policies).
fn operational_compaction_candidates_at(
    conn: &rusqlite::Connection,
    retention_json: &str,
    collection_name: &str,
    now_timestamp: i64,
) -> Result<(Vec<String>, Option<i64>), EngineError> {
    let policy = parse_operational_retention_policy(retention_json)?;
    match policy {
        // keep_all never deletes anything.
        OperationalRetentionPolicy::KeepAll => Ok((Vec::new(), None)),
        OperationalRetentionPolicy::PurgeBeforeSeconds { max_age_seconds } => {
            // Anything created strictly before the cutoff is a candidate.
            let before_timestamp = now_timestamp - max_age_seconds;
            let mut stmt = conn.prepare(
                "SELECT id FROM operational_mutations \
                 WHERE collection_name = ?1 AND created_at < ?2 \
                 ORDER BY mutation_order",
            )?;
            let mutation_ids = stmt
                .query_map(
                    rusqlite::params![collection_name, before_timestamp],
                    |row| row.get::<_, String>(0),
                )?
                .collect::<Result<Vec<_>, _>>()?;
            Ok((mutation_ids, Some(before_timestamp)))
        }
        OperationalRetentionPolicy::KeepLast { max_rows } => {
            // Newest first: everything past the first `max_rows` ids is a
            // candidate for deletion.
            let mut stmt = conn.prepare(
                "SELECT id FROM operational_mutations \
                 WHERE collection_name = ?1 \
                 ORDER BY mutation_order DESC",
            )?;
            let ordered_ids = stmt
                .query_map([collection_name], |row| row.get::<_, String>(0))?
                .collect::<Result<Vec<_>, _>>()?;
            Ok((ordered_ids.into_iter().skip(max_rows).collect(), None))
        }
    }
}
1451
1452fn parse_operational_retention_policy(
1453    retention_json: &str,
1454) -> Result<OperationalRetentionPolicy, EngineError> {
1455    let policy: OperationalRetentionPolicy = serde_json::from_str(retention_json)
1456        .map_err(|error| EngineError::InvalidWrite(format!("invalid retention_json: {error}")))?;
1457    match policy {
1458        OperationalRetentionPolicy::KeepAll => Ok(policy),
1459        OperationalRetentionPolicy::PurgeBeforeSeconds { max_age_seconds } => {
1460            if max_age_seconds <= 0 {
1461                return Err(EngineError::InvalidWrite(
1462                    "retention_json max_age_seconds must be greater than zero".to_owned(),
1463                ));
1464            }
1465            Ok(policy)
1466        }
1467        OperationalRetentionPolicy::KeepLast { max_rows } => {
1468            if max_rows == 0 {
1469                return Err(EngineError::InvalidWrite(
1470                    "retention_json max_rows must be greater than zero".to_owned(),
1471                ));
1472            }
1473            Ok(policy)
1474        }
1475    }
1476}
1477
1478fn load_operational_retention_records(
1479    conn: &rusqlite::Connection,
1480    collection_names: Option<&[String]>,
1481    max_collections: Option<usize>,
1482) -> Result<Vec<OperationalCollectionRecord>, EngineError> {
1483    let limit = max_collections.unwrap_or(usize::MAX);
1484    if limit == 0 {
1485        return Err(EngineError::InvalidWrite(
1486            "max_collections must be greater than zero".to_owned(),
1487        ));
1488    }
1489
1490    let mut records = Vec::new();
1491    if let Some(collection_names) = collection_names {
1492        for name in collection_names.iter().take(limit) {
1493            let record = load_operational_collection_record(conn, name)?.ok_or_else(|| {
1494                EngineError::InvalidWrite(format!(
1495                    "operational collection '{name}' is not registered"
1496                ))
1497            })?;
1498            records.push(record);
1499        }
1500        return Ok(records);
1501    }
1502
1503    let mut stmt = conn.prepare(
1504        "SELECT name, kind, schema_json, retention_json, filter_fields_json, validation_json, secondary_indexes_json, format_version, created_at, disabled_at \
1505         FROM operational_collections ORDER BY name",
1506    )?;
1507    let rows = stmt
1508        .query_map([], map_operational_collection_row)?
1509        .take(limit)
1510        .collect::<Result<Vec<_>, _>>()?;
1511    Ok(rows)
1512}
1513
1514fn last_operational_retention_run_at(
1515    conn: &rusqlite::Connection,
1516    collection_name: &str,
1517) -> Result<Option<i64>, EngineError> {
1518    conn.query_row(
1519        "SELECT MAX(executed_at) FROM operational_retention_runs WHERE collection_name = ?1",
1520        [collection_name],
1521        |row| row.get(0),
1522    )
1523    .optional()
1524    .map_err(EngineError::Sqlite)
1525    .map(Option::flatten)
1526}
1527
1528fn count_operational_mutations_for_collection(
1529    conn: &rusqlite::Connection,
1530    collection_name: &str,
1531) -> Result<usize, EngineError> {
1532    let count: i64 = conn.query_row(
1533        "SELECT count(*) FROM operational_mutations WHERE collection_name = ?1",
1534        [collection_name],
1535        |row| row.get(0),
1536    )?;
1537    usize::try_from(count).map_err(|_| {
1538        EngineError::Bridge(format!("count overflow for collection {collection_name}"))
1539    })
1540}
1541
1542fn retention_action_kind_and_limit(
1543    policy: &OperationalRetentionPolicy,
1544) -> (OperationalRetentionActionKind, Option<usize>) {
1545    match policy {
1546        OperationalRetentionPolicy::KeepAll => (OperationalRetentionActionKind::Noop, None),
1547        OperationalRetentionPolicy::PurgeBeforeSeconds { .. } => {
1548            (OperationalRetentionActionKind::PurgeBeforeSeconds, None)
1549        }
1550        OperationalRetentionPolicy::KeepLast { max_rows } => {
1551            (OperationalRetentionActionKind::KeepLast, Some(*max_rows))
1552        }
1553    }
1554}
1555
1556fn plan_operational_retention_item(
1557    conn: &rusqlite::Connection,
1558    record: &OperationalCollectionRecord,
1559    now_timestamp: i64,
1560) -> Result<OperationalRetentionPlanItem, EngineError> {
1561    let last_run_at = last_operational_retention_run_at(conn, &record.name)?;
1562    if record.kind != OperationalCollectionKind::AppendOnlyLog {
1563        return Ok(OperationalRetentionPlanItem {
1564            collection_name: record.name.clone(),
1565            action_kind: OperationalRetentionActionKind::Noop,
1566            candidate_deletions: 0,
1567            before_timestamp: None,
1568            max_rows: None,
1569            last_run_at,
1570        });
1571    }
1572    let policy = parse_operational_retention_policy(&record.retention_json)?;
1573    let (action_kind, max_rows) = retention_action_kind_and_limit(&policy);
1574    let (candidate_ids, before_timestamp) = operational_compaction_candidates_at(
1575        conn,
1576        &record.retention_json,
1577        &record.name,
1578        now_timestamp,
1579    )?;
1580    Ok(OperationalRetentionPlanItem {
1581        collection_name: record.name.clone(),
1582        action_kind,
1583        candidate_deletions: candidate_ids.len(),
1584        before_timestamp,
1585        max_rows,
1586        last_run_at,
1587    })
1588}
1589
/// Executes (or, with `dry_run`, simulates) one retention pass for `record`
/// inside the caller's transaction: deletes compaction candidates, records a
/// provenance event, and appends an audit row to `operational_retention_runs`.
///
/// In `dry_run` mode nothing is written; the returned item reports what a
/// real run would have deleted and how many rows would remain.
fn run_operational_retention_item(
    tx: &rusqlite::Transaction<'_>,
    record: &OperationalCollectionRecord,
    now_timestamp: i64,
    dry_run: bool,
) -> Result<OperationalRetentionRunItem, EngineError> {
    let plan = plan_operational_retention_item(tx, record, now_timestamp)?;
    let mut deleted_mutations = 0usize;
    // Only act when there is real work: append-only collection, a non-noop
    // policy, at least one candidate, and not a dry run.
    if record.kind == OperationalCollectionKind::AppendOnlyLog
        && plan.action_kind != OperationalRetentionActionKind::Noop
        && plan.candidate_deletions > 0
        && !dry_run
    {
        // Re-derive the candidate ids here rather than carrying them in the
        // plan; both queries run on the same transaction, so they agree.
        let (candidate_ids, _) = operational_compaction_candidates_at(
            tx,
            &record.retention_json,
            &record.name,
            now_timestamp,
        )?;
        let mut delete_stmt =
            tx.prepare_cached("DELETE FROM operational_mutations WHERE id = ?1")?;
        for mutation_id in &candidate_ids {
            delete_stmt.execute([mutation_id.as_str()])?;
            deleted_mutations += 1;
        }
        // Release the cached statement's borrow of `tx` before further writes.
        drop(delete_stmt);

        persist_simple_provenance_event(
            tx,
            "operational_retention_run",
            &record.name,
            Some(serde_json::json!({
                "action_kind": plan.action_kind,
                "deleted_mutations": deleted_mutations,
                "before_timestamp": plan.before_timestamp,
                "max_rows": plan.max_rows,
                "executed_at": now_timestamp,
            })),
        )?;
    }

    // Recount after any deletions so the audit row reflects the final state.
    let live_rows_remaining = count_operational_mutations_for_collection(tx, &record.name)?;
    // A dry run reports the plan's candidate count as "deleted"; a real run
    // reports what was actually removed above.
    let effective_deleted_mutations = if dry_run {
        plan.candidate_deletions
    } else {
        deleted_mutations
    };
    // For a dry run the rows were not removed, so project the post-run count;
    // after a real run the live count is already post-deletion.
    let rows_remaining = if dry_run {
        live_rows_remaining.saturating_sub(effective_deleted_mutations)
    } else {
        live_rows_remaining
    };
    // Audit rows are only written for real, non-noop runs; the `dry_run`
    // column is therefore always recorded as 0 here.
    if !dry_run && plan.action_kind != OperationalRetentionActionKind::Noop {
        tx.execute(
            "INSERT INTO operational_retention_runs \
             (id, collection_name, executed_at, action_kind, dry_run, deleted_mutations, rows_remaining, metadata_json) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
            rusqlite::params![
                new_id(),
                record.name,
                now_timestamp,
                // Serialize the enum via serde to get its snake_case wire
                // name, then strip the surrounding JSON quotes to store the
                // bare string; falls back to "noop" on serialization failure.
                serde_json::to_string(&plan.action_kind)
                    .unwrap_or_else(|_| "\"noop\"".to_owned())
                    .trim_matches('"')
                    .to_owned(),
                i32::from(dry_run),
                deleted_mutations,
                rows_remaining,
                serde_json::json!({
                    "before_timestamp": plan.before_timestamp,
                    "max_rows": plan.max_rows,
                })
                .to_string(),
            ],
        )?;
    }

    Ok(OperationalRetentionRunItem {
        collection_name: plan.collection_name,
        action_kind: plan.action_kind,
        deleted_mutations: effective_deleted_mutations,
        before_timestamp: plan.before_timestamp,
        max_rows: plan.max_rows,
        rows_remaining,
    })
}
1676
1677fn current_unix_timestamp() -> Result<i64, EngineError> {
1678    let now = SystemTime::now()
1679        .duration_since(SystemTime::UNIX_EPOCH)
1680        .map_err(|error| EngineError::Bridge(format!("system clock error: {error}")))?;
1681    i64::try_from(now.as_secs())
1682        .map_err(|_| EngineError::Bridge("unix timestamp overflow".to_owned()))
1683}