1use std::fmt::Write as _;
2use std::io;
3use std::time::SystemTime;
4
5use rusqlite::{OptionalExtension, TransactionBehavior};
6use serde::Deserialize;
7
8use super::{
9 AdminService, DEFAULT_OPERATIONAL_READ_LIMIT, EngineError, MAX_OPERATIONAL_READ_LIMIT,
10 persist_simple_provenance_event, rebuild_operational_current_rows,
11};
12use crate::ids::new_id;
13use crate::operational::{
14 OperationalCollectionKind, OperationalCollectionRecord, OperationalCompactionReport,
15 OperationalCurrentRow, OperationalFilterClause, OperationalFilterField,
16 OperationalFilterFieldType, OperationalFilterMode, OperationalFilterValue,
17 OperationalHistoryValidationIssue, OperationalHistoryValidationReport, OperationalMutationRow,
18 OperationalPurgeReport, OperationalReadReport, OperationalReadRequest,
19 OperationalRegisterRequest, OperationalRepairReport, OperationalRetentionActionKind,
20 OperationalRetentionPlanItem, OperationalRetentionPlanReport, OperationalRetentionRunItem,
21 OperationalRetentionRunReport, OperationalSecondaryIndexDefinition,
22 OperationalSecondaryIndexRebuildReport, OperationalTraceReport,
23 extract_secondary_index_entries_for_current, extract_secondary_index_entries_for_mutation,
24 parse_operational_secondary_indexes_json, parse_operational_validation_contract,
25 validate_operational_payload_against_contract,
26};
27
/// Retention policy for an operational collection, deserialized from the
/// collection's stored `retention_json` using serde's internally-tagged
/// representation (`{"mode": "keep_all" | "purge_before_seconds" | "keep_last", ...}`).
///
/// NOTE(review): the policy is applied by retention/compaction helpers
/// defined outside this chunk — confirm exact variant semantics there.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
enum OperationalRetentionPolicy {
    /// Never delete mutations.
    KeepAll,
    /// Presumably deletes mutations older than `max_age_seconds` — TODO confirm.
    PurgeBeforeSeconds { max_age_seconds: i64 },
    /// Presumably keeps only the newest `max_rows` mutations — TODO confirm.
    KeepLast { max_rows: usize },
}
35
/// One read-filter clause after validation against the collection's declared
/// filter fields: the target field name plus a typed, executable condition.
#[derive(Clone, Debug, PartialEq, Eq)]
struct CompiledOperationalReadFilter {
    // Name of the declared filter field the condition applies to.
    field: String,
    condition: OperationalReadCondition,
}
41
/// Borrowed view describing a compiled filter set that a secondary index can
/// serve: the chosen index plus the filters it covers.
///
/// NOTE(review): consumed by the secondary-index read path defined outside
/// this chunk — confirm field semantics against that code.
#[derive(Clone, Debug)]
struct MatchedAppendOnlySecondaryIndexRead<'a> {
    index_name: &'a str,
    // Equality/prefix filter served by the index's value slot(s).
    value_filter: &'a CompiledOperationalReadFilter,
    // Optional additional time-range filter, when the read includes one.
    time_range: Option<&'a CompiledOperationalReadFilter>,
}
48
/// Executable form of a single read condition, produced by
/// `compile_operational_read_filters` from a validated filter clause.
#[derive(Clone, Debug, PartialEq, Eq)]
enum OperationalReadCondition {
    /// Exact match on a string-typed field.
    ExactString(String),
    /// Exact match on an integer- or timestamp-typed field.
    ExactInteger(i64),
    /// String prefix match (declared for string fields only).
    Prefix(String),
    /// Integer range; `None` on either side means unbounded. Bound
    /// inclusivity is enforced by the read executor — TODO confirm there.
    Range {
        lower: Option<i64>,
        upper: Option<i64>,
    },
}
59
/// One (field, value) pair extracted from a mutation payload for insertion
/// into the `operational_filter_values` table.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ExtractedOperationalFilterValue {
    field_name: String,
    // Presumably exactly one of the two value slots is populated depending on
    // the declared field type — confirm in extract_operational_filter_values.
    string_value: Option<String>,
    integer_value: Option<i64>,
}
66
67impl AdminService {
    /// Registers a new operational collection and returns its stored record.
    ///
    /// Validates the request shape up front (non-empty name/schema/retention/
    /// filter fields, parseable validation contract and secondary indexes,
    /// positive `format_version`), then inserts the registration row and a
    /// provenance event atomically.
    ///
    /// # Errors
    /// `InvalidWrite` for any validation failure; `Bridge` if the freshly
    /// inserted row cannot be read back after commit.
    pub fn register_operational_collection(
        &self,
        request: &OperationalRegisterRequest,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        if request.name.trim().is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection name must not be empty".to_owned(),
            ));
        }
        if request.schema_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection schema_json must not be empty".to_owned(),
            ));
        }
        if request.retention_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection retention_json must not be empty".to_owned(),
            ));
        }
        if request.filter_fields_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection filter_fields_json must not be empty".to_owned(),
            ));
        }
        // Parse-only checks: the parsed results are discarded here; we only
        // reject malformed JSON before touching the database.
        parse_operational_validation_contract(&request.validation_json)
            .map_err(EngineError::InvalidWrite)?;
        parse_operational_secondary_indexes_json(&request.secondary_indexes_json, request.kind)
            .map_err(EngineError::InvalidWrite)?;
        if request.format_version <= 0 {
            return Err(EngineError::InvalidWrite(
                "operational collection format_version must be positive".to_owned(),
            ));
        }
        parse_operational_filter_fields(&request.filter_fields_json)
            .map_err(EngineError::InvalidWrite)?;

        let mut conn = self.connect()?;
        // Immediate transaction: the insert and its provenance event commit
        // atomically.
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        tx.execute(
            "INSERT INTO operational_collections \
             (name, kind, schema_json, retention_json, filter_fields_json, validation_json, secondary_indexes_json, format_version, created_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, unixepoch())",
            rusqlite::params![
                request.name.as_str(),
                request.kind.as_str(),
                request.schema_json.as_str(),
                request.retention_json.as_str(),
                request.filter_fields_json.as_str(),
                request.validation_json.as_str(),
                request.secondary_indexes_json.as_str(),
                request.format_version,
            ],
        )?;
        persist_simple_provenance_event(
            &tx,
            "operational_collection_registered",
            request.name.as_str(),
            Some(serde_json::json!({
                "kind": request.kind.as_str(),
                "format_version": request.format_version,
            })),
        )?;
        tx.commit()?;

        // Read the row back through the public lookup so the caller receives
        // exactly what a later describe call would return.
        self.describe_operational_collection(&request.name)?
            .ok_or_else(|| {
                EngineError::Bridge("registered collection missing after commit".to_owned())
            })
    }
139
140 pub fn describe_operational_collection(
143 &self,
144 name: &str,
145 ) -> Result<Option<OperationalCollectionRecord>, EngineError> {
146 let conn = self.connect()?;
147 load_operational_collection_record(&conn, name)
148 }
149
    /// Replaces a collection's declared filter fields and rebuilds the
    /// derived `operational_filter_values` rows for its entire mutation
    /// history inside a single write transaction.
    ///
    /// # Errors
    /// `InvalidWrite` for empty/invalid filter definitions or an unknown
    /// collection; `Bridge` if the row vanishes mid-transaction.
    pub fn update_operational_collection_filters(
        &self,
        name: &str,
        filter_fields_json: &str,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        if filter_fields_json.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational collection filter_fields_json must not be empty".to_owned(),
            ));
        }
        let declared_fields = parse_operational_filter_fields(filter_fields_json)
            .map_err(EngineError::InvalidWrite)?;

        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        // Existence check only; the loaded record itself is discarded.
        load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        tx.execute(
            "UPDATE operational_collections SET filter_fields_json = ?2 WHERE name = ?1",
            rusqlite::params![name, filter_fields_json],
        )?;
        // Old derived values are dropped wholesale and rebuilt below.
        tx.execute(
            "DELETE FROM operational_filter_values WHERE collection_name = ?1",
            [name],
        )?;

        let mut mutation_stmt = tx.prepare(
            "SELECT id, payload_json FROM operational_mutations \
             WHERE collection_name = ?1 ORDER BY mutation_order",
        )?;
        let mutations = mutation_stmt
            .query_map([name], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        drop(mutation_stmt);

        let mut insert_filter_value = tx.prepare_cached(
            "INSERT INTO operational_filter_values \
             (mutation_id, collection_name, field_name, string_value, integer_value) \
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )?;
        let mut inserted_values = 0usize;
        // NOTE(review): the backfill walks every mutation, including deletes
        // — confirm delete payloads are meant to contribute filter values
        // (history validation elsewhere skips deletes).
        for (mutation_id, payload_json) in &mutations {
            for filter_value in
                extract_operational_filter_values(&declared_fields, payload_json.as_str())
            {
                insert_filter_value.execute(rusqlite::params![
                    mutation_id,
                    name,
                    filter_value.field_name,
                    filter_value.string_value,
                    filter_value.integer_value,
                ])?;
                inserted_values += 1;
            }
        }
        drop(insert_filter_value);

        persist_simple_provenance_event(
            &tx,
            "operational_collection_filter_fields_updated",
            name,
            Some(serde_json::json!({
                "field_count": declared_fields.len(),
                "mutations_backfilled": mutations.len(),
                "inserted_filter_values": inserted_values,
            })),
        )?;
        let updated = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::Bridge("operational collection missing after filter update".to_owned())
        })?;
        tx.commit()?;
        Ok(updated)
    }
229
230 pub fn update_operational_collection_validation(
233 &self,
234 name: &str,
235 validation_json: &str,
236 ) -> Result<OperationalCollectionRecord, EngineError> {
237 parse_operational_validation_contract(validation_json)
238 .map_err(EngineError::InvalidWrite)?;
239
240 let mut conn = self.connect()?;
241 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
242 load_operational_collection_record(&tx, name)?.ok_or_else(|| {
243 EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
244 })?;
245 tx.execute(
246 "UPDATE operational_collections SET validation_json = ?2 WHERE name = ?1",
247 rusqlite::params![name, validation_json],
248 )?;
249 persist_simple_provenance_event(
250 &tx,
251 "operational_collection_validation_updated",
252 name,
253 Some(serde_json::json!({
254 "has_validation": !validation_json.is_empty(),
255 })),
256 )?;
257 let updated = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
258 EngineError::Bridge("operational collection missing after validation update".to_owned())
259 })?;
260 tx.commit()?;
261 Ok(updated)
262 }
263
    /// Replaces a collection's secondary-index definitions and immediately
    /// rebuilds all derived index entries under the new definitions, all in
    /// one write transaction.
    pub fn update_operational_collection_secondary_indexes(
        &self,
        name: &str,
        secondary_indexes_json: &str,
    ) -> Result<OperationalCollectionRecord, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        // Definitions are validated against the collection's kind.
        let indexes = parse_operational_secondary_indexes_json(secondary_indexes_json, record.kind)
            .map_err(EngineError::InvalidWrite)?;
        tx.execute(
            "UPDATE operational_collections SET secondary_indexes_json = ?2 WHERE name = ?1",
            rusqlite::params![name, secondary_indexes_json],
        )?;
        let (mutation_entries_rebuilt, current_entries_rebuilt) =
            rebuild_operational_secondary_index_entries(&tx, &record.name, record.kind, &indexes)?;
        persist_simple_provenance_event(
            &tx,
            "operational_collection_secondary_indexes_updated",
            name,
            Some(serde_json::json!({
                "index_count": indexes.len(),
                "mutation_entries_rebuilt": mutation_entries_rebuilt,
                "current_entries_rebuilt": current_entries_rebuilt,
            })),
        )?;
        let updated = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::Bridge(
                "operational collection missing after secondary index update".to_owned(),
            )
        })?;
        tx.commit()?;
        Ok(updated)
    }
303
    /// Rebuilds all secondary-index entries for a collection from its stored
    /// definitions, returning per-subject rebuild counts.
    pub fn rebuild_operational_secondary_indexes(
        &self,
        name: &str,
    ) -> Result<OperationalSecondaryIndexRebuildReport, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        // Re-parse the persisted definitions rather than trusting any cache.
        let indexes =
            parse_operational_secondary_indexes_json(&record.secondary_indexes_json, record.kind)
                .map_err(EngineError::InvalidWrite)?;
        let (mutation_entries_rebuilt, current_entries_rebuilt) =
            rebuild_operational_secondary_index_entries(&tx, &record.name, record.kind, &indexes)?;
        persist_simple_provenance_event(
            &tx,
            "operational_secondary_indexes_rebuilt",
            name,
            Some(serde_json::json!({
                "index_count": indexes.len(),
                "mutation_entries_rebuilt": mutation_entries_rebuilt,
                "current_entries_rebuilt": current_entries_rebuilt,
            })),
        )?;
        tx.commit()?;
        Ok(OperationalSecondaryIndexRebuildReport {
            collection_name: name.to_owned(),
            mutation_entries_rebuilt,
            current_entries_rebuilt,
        })
    }
337
338 pub fn validate_operational_collection_history(
341 &self,
342 name: &str,
343 ) -> Result<OperationalHistoryValidationReport, EngineError> {
344 let conn = self.connect()?;
345 let record = load_operational_collection_record(&conn, name)?.ok_or_else(|| {
346 EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
347 })?;
348 let Some(contract) = parse_operational_validation_contract(&record.validation_json)
349 .map_err(EngineError::InvalidWrite)?
350 else {
351 return Err(EngineError::InvalidWrite(format!(
352 "operational collection '{name}' has no validation_json configured"
353 )));
354 };
355
356 let mut stmt = conn.prepare(
357 "SELECT id, record_key, op_kind, payload_json FROM operational_mutations \
358 WHERE collection_name = ?1 ORDER BY mutation_order",
359 )?;
360 let rows = stmt
361 .query_map([name], |row| {
362 Ok((
363 row.get::<_, String>(0)?,
364 row.get::<_, String>(1)?,
365 row.get::<_, String>(2)?,
366 row.get::<_, String>(3)?,
367 ))
368 })?
369 .collect::<Result<Vec<_>, _>>()?;
370 drop(stmt);
371
372 let mut checked_rows = 0usize;
373 let mut issues = Vec::new();
374 for (mutation_id, record_key, op_kind, payload_json) in rows {
375 if op_kind == "delete" {
376 continue;
377 }
378 checked_rows += 1;
379 if let Err(message) =
380 validate_operational_payload_against_contract(&contract, payload_json.as_str())
381 {
382 issues.push(OperationalHistoryValidationIssue {
383 mutation_id,
384 record_key,
385 op_kind,
386 message,
387 });
388 }
389 }
390
391 Ok(OperationalHistoryValidationReport {
392 collection_name: name.to_owned(),
393 checked_rows,
394 invalid_row_count: issues.len(),
395 issues,
396 })
397 }
398
399 pub fn disable_operational_collection(
402 &self,
403 name: &str,
404 ) -> Result<OperationalCollectionRecord, EngineError> {
405 let mut conn = self.connect()?;
406 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
407 let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
408 EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
409 })?;
410 let changed = if record.disabled_at.is_none() {
411 tx.execute(
412 "UPDATE operational_collections SET disabled_at = unixepoch() WHERE name = ?1",
413 [name],
414 )?;
415 true
416 } else {
417 false
418 };
419 let record = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
420 EngineError::Bridge("operational collection missing after disable".to_owned())
421 })?;
422 persist_simple_provenance_event(
423 &tx,
424 "operational_collection_disabled",
425 name,
426 Some(serde_json::json!({
427 "disabled_at": record.disabled_at,
428 "changed": changed,
429 })),
430 )?;
431 tx.commit()?;
432 Ok(record)
433 }
434
    /// Applies the collection's retention policy to its append-only log,
    /// deleting the mutations the policy marks as expired.
    ///
    /// With `dry_run` the candidate set is computed and reported, but the
    /// transaction is dropped (rolled back) without deleting anything or
    /// writing a provenance event.
    pub fn compact_operational_collection(
        &self,
        name: &str,
        dry_run: bool,
    ) -> Result<OperationalCompactionReport, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let collection = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
            EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
        })?;
        validate_append_only_operational_collection(&collection, "compact")?;
        // Candidate ids and the policy cutoff derive from the stored
        // retention_json (helper defined elsewhere in this module).
        let (mutation_ids, before_timestamp) =
            operational_compaction_candidates(&tx, &collection.retention_json, name)?;
        if dry_run {
            drop(tx);
            return Ok(OperationalCompactionReport {
                collection_name: name.to_owned(),
                deleted_mutations: mutation_ids.len(),
                dry_run: true,
                before_timestamp,
            });
        }
        let mut delete_stmt =
            tx.prepare_cached("DELETE FROM operational_mutations WHERE id = ?1")?;
        for mutation_id in &mutation_ids {
            delete_stmt.execute([mutation_id.as_str()])?;
        }
        drop(delete_stmt);
        persist_simple_provenance_event(
            &tx,
            "operational_collection_compacted",
            name,
            Some(serde_json::json!({
                "deleted_mutations": mutation_ids.len(),
                "before_timestamp": before_timestamp,
            })),
        )?;
        tx.commit()?;
        Ok(OperationalCompactionReport {
            collection_name: name.to_owned(),
            deleted_mutations: mutation_ids.len(),
            dry_run: false,
            before_timestamp,
        })
    }
482
483 pub fn purge_operational_collection(
486 &self,
487 name: &str,
488 before_timestamp: i64,
489 ) -> Result<OperationalPurgeReport, EngineError> {
490 let mut conn = self.connect()?;
491 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
492 let collection = load_operational_collection_record(&tx, name)?.ok_or_else(|| {
493 EngineError::InvalidWrite(format!("operational collection '{name}' is not registered"))
494 })?;
495 validate_append_only_operational_collection(&collection, "purge")?;
496 let deleted_mutations = tx.execute(
497 "DELETE FROM operational_mutations WHERE collection_name = ?1 AND created_at < ?2",
498 rusqlite::params![name, before_timestamp],
499 )?;
500 persist_simple_provenance_event(
501 &tx,
502 "operational_collection_purged",
503 name,
504 Some(serde_json::json!({
505 "deleted_mutations": deleted_mutations,
506 "before_timestamp": before_timestamp,
507 })),
508 )?;
509 tx.commit()?;
510 Ok(OperationalPurgeReport {
511 collection_name: name.to_owned(),
512 deleted_mutations,
513 before_timestamp,
514 })
515 }
516
517 pub fn plan_operational_retention(
520 &self,
521 now_timestamp: i64,
522 collection_names: Option<&[String]>,
523 max_collections: Option<usize>,
524 ) -> Result<OperationalRetentionPlanReport, EngineError> {
525 let conn = self.connect()?;
526 let records = load_operational_retention_records(&conn, collection_names, max_collections)?;
527 let mut items = Vec::with_capacity(records.len());
528 for record in records {
529 items.push(plan_operational_retention_item(
530 &conn,
531 &record,
532 now_timestamp,
533 )?);
534 }
535 Ok(OperationalRetentionPlanReport {
536 planned_at: now_timestamp,
537 collections_examined: items.len(),
538 items,
539 })
540 }
541
    /// Executes (or previews, with `dry_run`) retention for each selected
    /// collection, using one write transaction per collection so a failure
    /// cannot roll back work already committed for earlier collections.
    pub fn run_operational_retention(
        &self,
        now_timestamp: i64,
        collection_names: Option<&[String]>,
        max_collections: Option<usize>,
        dry_run: bool,
    ) -> Result<OperationalRetentionRunReport, EngineError> {
        let mut conn = self.connect()?;
        let records = load_operational_retention_records(&conn, collection_names, max_collections)?;
        let mut items = Vec::with_capacity(records.len());
        let mut collections_acted_on = 0usize;

        for record in records {
            let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
            let item = run_operational_retention_item(&tx, &record, now_timestamp, dry_run)?;
            // Counted even under dry_run: "acted on" means the policy had
            // something to delete, not that a commit happened.
            if item.deleted_mutations > 0 {
                collections_acted_on += 1;
            }
            // Dropping the transaction rolls it back; only real, non-noop
            // runs are committed.
            if dry_run || item.action_kind == OperationalRetentionActionKind::Noop {
                drop(tx);
            } else {
                tx.commit()?;
            }
            items.push(item);
        }

        Ok(OperationalRetentionRunReport {
            executed_at: now_timestamp,
            collections_examined: items.len(),
            collections_acted_on,
            dry_run,
            items,
        })
    }
578
    /// Returns the full mutation history and current-state rows for a
    /// collection, optionally narrowed to a single `record_key`.
    ///
    /// The keyed and unkeyed variants differ only in their WHERE clause; the
    /// duplication exists because rusqlite positional parameter arrays have
    /// different arities per branch.
    pub fn trace_operational_collection(
        &self,
        collection_name: &str,
        record_key: Option<&str>,
    ) -> Result<OperationalTraceReport, EngineError> {
        let conn = self.connect()?;
        ensure_operational_collection_registered(&conn, collection_name)?;
        let mutations = if let Some(record_key) = record_key {
            let mut stmt = conn.prepare(
                "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
                 FROM operational_mutations \
                 WHERE collection_name = ?1 AND record_key = ?2 \
                 ORDER BY mutation_order",
            )?;
            stmt.query_map([collection_name, record_key], map_operational_mutation_row)?
                .collect::<Result<Vec<_>, _>>()?
        } else {
            let mut stmt = conn.prepare(
                "SELECT id, collection_name, record_key, op_kind, payload_json, source_ref, created_at \
                 FROM operational_mutations \
                 WHERE collection_name = ?1 \
                 ORDER BY mutation_order",
            )?;
            stmt.query_map([collection_name], map_operational_mutation_row)?
                .collect::<Result<Vec<_>, _>>()?
        };
        let current_rows = if let Some(record_key) = record_key {
            let mut stmt = conn.prepare(
                "SELECT collection_name, record_key, payload_json, updated_at, last_mutation_id \
                 FROM operational_current \
                 WHERE collection_name = ?1 AND record_key = ?2 \
                 ORDER BY updated_at, record_key",
            )?;
            stmt.query_map([collection_name, record_key], map_operational_current_row)?
                .collect::<Result<Vec<_>, _>>()?
        } else {
            let mut stmt = conn.prepare(
                "SELECT collection_name, record_key, payload_json, updated_at, last_mutation_id \
                 FROM operational_current \
                 WHERE collection_name = ?1 \
                 ORDER BY updated_at, record_key",
            )?;
            stmt.query_map([collection_name], map_operational_current_row)?
                .collect::<Result<Vec<_>, _>>()?
        };

        Ok(OperationalTraceReport {
            collection_name: collection_name.to_owned(),
            record_key: record_key.map(str::to_owned),
            mutation_count: mutations.len(),
            current_count: current_rows.len(),
            mutations,
            current_rows,
        })
    }
636
    /// Filtered read over an append-only-log collection.
    ///
    /// Filters are compiled against the collection's declared filter fields.
    /// If a secondary index covers the compiled filters, the read is served
    /// from the index; otherwise it falls back to the generic filter-value
    /// scan. The limit is defaulted and clamped by `operational_read_limit`.
    pub fn read_operational_collection(
        &self,
        request: &OperationalReadRequest,
    ) -> Result<OperationalReadReport, EngineError> {
        if request.collection_name.trim().is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational read collection_name must not be empty".to_owned(),
            ));
        }
        if request.filters.is_empty() {
            return Err(EngineError::InvalidWrite(
                "operational read requires at least one filter clause".to_owned(),
            ));
        }

        let conn = self.connect()?;
        let record = load_operational_collection_record(&conn, &request.collection_name)?
            .ok_or_else(|| {
                EngineError::InvalidWrite(format!(
                    "operational collection '{}' is not registered",
                    request.collection_name
                ))
            })?;
        validate_append_only_operational_collection(&record, "read")?;
        let declared_fields = parse_operational_filter_fields(&record.filter_fields_json)
            .map_err(EngineError::InvalidWrite)?;
        let secondary_indexes =
            parse_operational_secondary_indexes_json(&record.secondary_indexes_json, record.kind)
                .map_err(EngineError::InvalidWrite)?;
        let applied_limit = operational_read_limit(request.limit)?;
        let filters = compile_operational_read_filters(&request.filters, &declared_fields)?;
        // Fast path: returns Some(report) only when an index can serve the
        // compiled filter set.
        if let Some(report) = execute_operational_secondary_index_read(
            &conn,
            &request.collection_name,
            &filters,
            &secondary_indexes,
            applied_limit,
        )? {
            return Ok(report);
        }
        execute_operational_filtered_read(&conn, &request.collection_name, &filters, applied_limit)
    }
681
    /// Rebuilds the `operational_current` projection for one latest_state
    /// collection (or every latest_state collection when `None`), then
    /// regenerates any secondary-index entries derived from the rebuilt rows.
    ///
    /// # Errors
    /// `InvalidWrite` when a named collection is unknown or not latest_state;
    /// `Bridge` if a collection disappears mid-transaction.
    pub fn rebuild_operational_current(
        &self,
        collection_name: Option<&str>,
    ) -> Result<OperationalRepairReport, EngineError> {
        let mut conn = self.connect()?;
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let collections = if let Some(name) = collection_name {
            let maybe_kind: Option<String> = tx
                .query_row(
                    "SELECT kind FROM operational_collections WHERE name = ?1",
                    [name],
                    |row| row.get(0),
                )
                .optional()?;
            let Some(kind) = maybe_kind else {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{name}' is not registered"
                )));
            };
            // Only latest_state collections carry a current projection.
            if kind != OperationalCollectionKind::LatestState.as_str() {
                return Err(EngineError::InvalidWrite(format!(
                    "operational collection '{name}' is not latest_state"
                )));
            }
            vec![name.to_owned()]
        } else {
            let mut stmt = tx.prepare(
                "SELECT name FROM operational_collections WHERE kind = 'latest_state' ORDER BY name",
            )?;
            stmt.query_map([], |row| row.get::<_, String>(0))?
                .collect::<Result<Vec<_>, _>>()?
        };

        let rebuilt_rows = rebuild_operational_current_rows(&tx, &collections)?;
        // Secondary-index entries over current rows are stale after the
        // rebuild, so regenerate them for each affected collection.
        for collection in &collections {
            let record = load_operational_collection_record(&tx, collection)?.ok_or_else(|| {
                EngineError::Bridge(format!(
                    "operational collection '{collection}' missing during current rebuild"
                ))
            })?;
            let indexes = parse_operational_secondary_indexes_json(
                &record.secondary_indexes_json,
                record.kind,
            )
            .map_err(EngineError::InvalidWrite)?;
            if !indexes.is_empty() {
                rebuild_operational_secondary_index_entries(
                    &tx,
                    &record.name,
                    record.kind,
                    &indexes,
                )?;
            }
        }

        persist_simple_provenance_event(
            &tx,
            "operational_current_rebuilt",
            collection_name.unwrap_or("*"),
            Some(serde_json::json!({
                "collections_rebuilt": collections.len(),
                "current_rows_rebuilt": rebuilt_rows,
            })),
        )?;
        tx.commit()?;

        Ok(OperationalRepairReport {
            collections_rebuilt: collections.len(),
            current_rows_rebuilt: rebuilt_rows,
        })
    }
755}
756
757pub(super) fn load_operational_collection_record(
758 conn: &rusqlite::Connection,
759 name: &str,
760) -> Result<Option<OperationalCollectionRecord>, EngineError> {
761 conn.query_row(
762 "SELECT name, kind, schema_json, retention_json, filter_fields_json, validation_json, secondary_indexes_json, format_version, created_at, disabled_at \
763 FROM operational_collections WHERE name = ?1",
764 [name],
765 map_operational_collection_row,
766 )
767 .optional()
768 .map_err(EngineError::Sqlite)
769}
770
771pub(super) fn map_operational_mutation_row(
772 row: &rusqlite::Row<'_>,
773) -> Result<OperationalMutationRow, rusqlite::Error> {
774 Ok(OperationalMutationRow {
775 id: row.get(0)?,
776 collection_name: row.get(1)?,
777 record_key: row.get(2)?,
778 op_kind: row.get(3)?,
779 payload_json: row.get(4)?,
780 source_ref: row.get(5)?,
781 created_at: row.get(6)?,
782 })
783}
784
785pub(super) fn map_operational_current_row(
786 row: &rusqlite::Row<'_>,
787) -> Result<OperationalCurrentRow, rusqlite::Error> {
788 Ok(OperationalCurrentRow {
789 collection_name: row.get(0)?,
790 record_key: row.get(1)?,
791 payload_json: row.get(2)?,
792 updated_at: row.get(3)?,
793 last_mutation_id: row.get(4)?,
794 })
795}
796
/// Maps a full `operational_collections` row, converting the stored `kind`
/// text back into an `OperationalCollectionKind`.
///
/// An unrecognized kind string is surfaced as a `FromSqlConversionFailure`
/// on column index 1 so rusqlite reports it like any other decode error.
fn map_operational_collection_row(
    row: &rusqlite::Row<'_>,
) -> Result<OperationalCollectionRecord, rusqlite::Error> {
    let kind_text: String = row.get(1)?;
    let kind = OperationalCollectionKind::try_from(kind_text.as_str()).map_err(|message| {
        rusqlite::Error::FromSqlConversionFailure(
            1,
            rusqlite::types::Type::Text,
            // io::Error is used as a convenient boxed error carrier for the
            // conversion message.
            Box::new(io::Error::new(io::ErrorKind::InvalidData, message)),
        )
    })?;
    Ok(OperationalCollectionRecord {
        name: row.get(0)?,
        kind,
        schema_json: row.get(2)?,
        retention_json: row.get(3)?,
        filter_fields_json: row.get(4)?,
        validation_json: row.get(5)?,
        secondary_indexes_json: row.get(6)?,
        format_version: row.get(7)?,
        created_at: row.get(8)?,
        disabled_at: row.get(9)?,
    })
}
821
822fn validate_append_only_operational_collection(
823 record: &OperationalCollectionRecord,
824 operation: &str,
825) -> Result<(), EngineError> {
826 if record.kind != OperationalCollectionKind::AppendOnlyLog {
827 return Err(EngineError::InvalidWrite(format!(
828 "operational collection '{}' must be append_only_log to {operation}",
829 record.name
830 )));
831 }
832 Ok(())
833}
834
835fn ensure_operational_collection_registered(
836 conn: &rusqlite::Connection,
837 collection_name: &str,
838) -> Result<(), EngineError> {
839 if load_operational_collection_record(conn, collection_name)?.is_none() {
840 return Err(EngineError::InvalidWrite(format!(
841 "operational collection '{collection_name}' is not registered"
842 )));
843 }
844 Ok(())
845}
846
847fn clear_operational_secondary_index_entries(
848 tx: &rusqlite::Transaction<'_>,
849 collection_name: &str,
850) -> Result<(), EngineError> {
851 tx.execute(
852 "DELETE FROM operational_secondary_index_entries WHERE collection_name = ?1",
853 [collection_name],
854 )?;
855 Ok(())
856}
857
858fn insert_operational_secondary_index_entry(
859 tx: &rusqlite::Transaction<'_>,
860 collection_name: &str,
861 subject_kind: &str,
862 mutation_id: &str,
863 record_key: &str,
864 entry: &crate::operational::OperationalSecondaryIndexEntry,
865) -> Result<(), EngineError> {
866 tx.execute(
867 "INSERT INTO operational_secondary_index_entries \
868 (collection_name, index_name, subject_kind, mutation_id, record_key, sort_timestamp, \
869 slot1_text, slot1_integer, slot2_text, slot2_integer, slot3_text, slot3_integer) \
870 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
871 rusqlite::params![
872 collection_name,
873 entry.index_name,
874 subject_kind,
875 mutation_id,
876 record_key,
877 entry.sort_timestamp,
878 entry.slot1_text,
879 entry.slot1_integer,
880 entry.slot2_text,
881 entry.slot2_integer,
882 entry.slot3_text,
883 entry.slot3_integer,
884 ],
885 )?;
886 Ok(())
887}
888
/// Drops and regenerates every secondary-index entry for one collection.
///
/// Append-only logs are indexed per mutation (subject kind "mutation");
/// latest_state collections are indexed per current row (subject kind
/// "current"). Returns `(mutation_entries_rebuilt, current_entries_rebuilt)`
/// — at most one of the two is non-zero for a given collection kind.
pub(super) fn rebuild_operational_secondary_index_entries(
    tx: &rusqlite::Transaction<'_>,
    collection_name: &str,
    collection_kind: OperationalCollectionKind,
    indexes: &[OperationalSecondaryIndexDefinition],
) -> Result<(usize, usize), EngineError> {
    clear_operational_secondary_index_entries(tx, collection_name)?;

    let mut mutation_entries_rebuilt = 0usize;
    if collection_kind == OperationalCollectionKind::AppendOnlyLog {
        let mut stmt = tx.prepare(
            "SELECT id, record_key, payload_json FROM operational_mutations \
             WHERE collection_name = ?1 ORDER BY mutation_order",
        )?;
        let rows = stmt
            .query_map([collection_name], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        // The prepared statement is no longer needed once rows are collected.
        drop(stmt);
        for (mutation_id, record_key, payload_json) in rows {
            for entry in extract_secondary_index_entries_for_mutation(indexes, &payload_json) {
                insert_operational_secondary_index_entry(
                    tx,
                    collection_name,
                    "mutation",
                    &mutation_id,
                    &record_key,
                    &entry,
                )?;
                mutation_entries_rebuilt += 1;
            }
        }
    }

    let mut current_entries_rebuilt = 0usize;
    if collection_kind == OperationalCollectionKind::LatestState {
        let mut stmt = tx.prepare(
            "SELECT record_key, payload_json, updated_at, last_mutation_id FROM operational_current \
             WHERE collection_name = ?1 ORDER BY updated_at DESC, record_key",
        )?;
        let rows = stmt
            .query_map([collection_name], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, i64>(2)?,
                    row.get::<_, String>(3)?,
                ))
            })?
            .collect::<Result<Vec<_>, _>>()?;
        drop(stmt);
        for (record_key, payload_json, updated_at, last_mutation_id) in rows {
            for entry in
                extract_secondary_index_entries_for_current(indexes, &payload_json, updated_at)
            {
                insert_operational_secondary_index_entry(
                    tx,
                    collection_name,
                    "current",
                    &last_mutation_id,
                    &record_key,
                    &entry,
                )?;
                current_entries_rebuilt += 1;
            }
        }
    }

    Ok((mutation_entries_rebuilt, current_entries_rebuilt))
}
964
965fn operational_read_limit(limit: Option<usize>) -> Result<usize, EngineError> {
966 let applied_limit = limit.unwrap_or(DEFAULT_OPERATIONAL_READ_LIMIT);
967 if applied_limit == 0 {
968 return Err(EngineError::InvalidWrite(
969 "operational read limit must be greater than zero".to_owned(),
970 ));
971 }
972 Ok(applied_limit.min(MAX_OPERATIONAL_READ_LIMIT))
973}
974
975pub(super) fn parse_operational_filter_fields(
976 filter_fields_json: &str,
977) -> Result<Vec<OperationalFilterField>, String> {
978 let fields: Vec<OperationalFilterField> = serde_json::from_str(filter_fields_json)
979 .map_err(|error| format!("invalid filter_fields_json: {error}"))?;
980 let mut seen = std::collections::HashSet::new();
981 for field in &fields {
982 if field.name.trim().is_empty() {
983 return Err("filter_fields_json field names must not be empty".to_owned());
984 }
985 if !seen.insert(field.name.as_str()) {
986 return Err(format!(
987 "filter_fields_json contains duplicate field '{}'",
988 field.name
989 ));
990 }
991 if field.modes.is_empty() {
992 return Err(format!(
993 "filter_fields_json field '{}' must declare at least one mode",
994 field.name
995 ));
996 }
997 if field.modes.contains(&OperationalFilterMode::Prefix)
998 && field.field_type != OperationalFilterFieldType::String
999 {
1000 return Err(format!(
1001 "filter field '{}' only supports prefix for string types",
1002 field.name
1003 ));
1004 }
1005 }
1006 Ok(fields)
1007}
1008
1009fn compile_operational_read_filters(
1010 filters: &[OperationalFilterClause],
1011 declared_fields: &[OperationalFilterField],
1012) -> Result<Vec<CompiledOperationalReadFilter>, EngineError> {
1013 let field_map = declared_fields
1014 .iter()
1015 .map(|field| (field.name.as_str(), field))
1016 .collect::<std::collections::HashMap<_, _>>();
1017 filters
1018 .iter()
1019 .map(|filter| match filter {
1020 OperationalFilterClause::Exact { field, value } => {
1021 let declared = field_map.get(field.as_str()).ok_or_else(|| {
1022 EngineError::InvalidWrite(format!(
1023 "operational read filter uses undeclared field '{field}'"
1024 ))
1025 })?;
1026 if !declared.modes.contains(&OperationalFilterMode::Exact) {
1027 return Err(EngineError::InvalidWrite(format!(
1028 "operational read field '{field}' does not allow exact filters"
1029 )));
1030 }
1031 let condition = match (declared.field_type, value) {
1032 (OperationalFilterFieldType::String, OperationalFilterValue::String(value)) => {
1033 OperationalReadCondition::ExactString(value.clone())
1034 }
1035 (
1036 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp,
1037 OperationalFilterValue::Integer(value),
1038 ) => OperationalReadCondition::ExactInteger(*value),
1039 _ => {
1040 return Err(EngineError::InvalidWrite(format!(
1041 "operational read field '{field}' received a value with the wrong type"
1042 )));
1043 }
1044 };
1045 Ok(CompiledOperationalReadFilter {
1046 field: field.clone(),
1047 condition,
1048 })
1049 }
1050 OperationalFilterClause::Prefix { field, value } => {
1051 let declared = field_map.get(field.as_str()).ok_or_else(|| {
1052 EngineError::InvalidWrite(format!(
1053 "operational read filter uses undeclared field '{field}'"
1054 ))
1055 })?;
1056 if !declared.modes.contains(&OperationalFilterMode::Prefix) {
1057 return Err(EngineError::InvalidWrite(format!(
1058 "operational read field '{field}' does not allow prefix filters"
1059 )));
1060 }
1061 if declared.field_type != OperationalFilterFieldType::String {
1062 return Err(EngineError::InvalidWrite(format!(
1063 "operational read field '{field}' only supports prefix filters for strings"
1064 )));
1065 }
1066 Ok(CompiledOperationalReadFilter {
1067 field: field.clone(),
1068 condition: OperationalReadCondition::Prefix(value.clone()),
1069 })
1070 }
1071 OperationalFilterClause::Range {
1072 field,
1073 lower,
1074 upper,
1075 } => {
1076 let declared = field_map.get(field.as_str()).ok_or_else(|| {
1077 EngineError::InvalidWrite(format!(
1078 "operational read filter uses undeclared field '{field}'"
1079 ))
1080 })?;
1081 if !declared.modes.contains(&OperationalFilterMode::Range) {
1082 return Err(EngineError::InvalidWrite(format!(
1083 "operational read field '{field}' does not allow range filters"
1084 )));
1085 }
1086 if !matches!(
1087 declared.field_type,
1088 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp
1089 ) {
1090 return Err(EngineError::InvalidWrite(format!(
1091 "operational read field '{field}' only supports range filters for integer/timestamp fields"
1092 )));
1093 }
1094 if lower.is_none() && upper.is_none() {
1095 return Err(EngineError::InvalidWrite(format!(
1096 "operational read range filter for '{field}' must specify a lower or upper bound"
1097 )));
1098 }
1099 Ok(CompiledOperationalReadFilter {
1100 field: field.clone(),
1101 condition: OperationalReadCondition::Range {
1102 lower: *lower,
1103 upper: *upper,
1104 },
1105 })
1106 }
1107 })
1108 .collect()
1109}
1110
1111fn match_append_only_secondary_index_read<'a>(
1112 filters: &'a [CompiledOperationalReadFilter],
1113 indexes: &'a [OperationalSecondaryIndexDefinition],
1114) -> Option<MatchedAppendOnlySecondaryIndexRead<'a>> {
1115 indexes.iter().find_map(|index| {
1116 let OperationalSecondaryIndexDefinition::AppendOnlyFieldTime {
1117 name,
1118 field,
1119 value_type,
1120 time_field,
1121 } = index
1122 else {
1123 return None;
1124 };
1125 if !(1..=2).contains(&filters.len()) {
1126 return None;
1127 }
1128
1129 let mut value_filter = None;
1130 let mut time_range = None;
1131 for filter in filters {
1132 if filter.field == *field {
1133 let supported = matches!(
1134 (&filter.condition, value_type),
1135 (
1136 OperationalReadCondition::ExactString(_)
1137 | OperationalReadCondition::Prefix(_),
1138 crate::operational::OperationalSecondaryIndexValueType::String
1139 ) | (
1140 OperationalReadCondition::ExactInteger(_),
1141 crate::operational::OperationalSecondaryIndexValueType::Integer
1142 | crate::operational::OperationalSecondaryIndexValueType::Timestamp
1143 )
1144 );
1145 if !supported || value_filter.is_some() {
1146 return None;
1147 }
1148 value_filter = Some(filter);
1149 continue;
1150 }
1151 if filter.field == *time_field {
1152 if !matches!(filter.condition, OperationalReadCondition::Range { .. })
1153 || time_range.is_some()
1154 {
1155 return None;
1156 }
1157 time_range = Some(filter);
1158 continue;
1159 }
1160 return None;
1161 }
1162
1163 value_filter.map(|value_filter| MatchedAppendOnlySecondaryIndexRead {
1164 index_name: name.as_str(),
1165 value_filter,
1166 time_range,
1167 })
1168 })
1169}
1170
/// Attempt to serve a filtered operational read via an append-only
/// secondary index.
///
/// Returns `Ok(None)` when no registered index matches the compiled
/// filters (or, defensively, when the matched value filter is a range,
/// which these indexes cannot serve), signalling the caller to fall back
/// to the generic filtered read. On a match, queries index entries joined
/// to their mutation rows, newest `sort_timestamp` first.
fn execute_operational_secondary_index_read(
    conn: &rusqlite::Connection,
    collection_name: &str,
    filters: &[CompiledOperationalReadFilter],
    indexes: &[OperationalSecondaryIndexDefinition],
    applied_limit: usize,
) -> Result<Option<OperationalReadReport>, EngineError> {
    use rusqlite::types::Value;

    let Some(matched) = match_append_only_secondary_index_read(filters, indexes) else {
        return Ok(None);
    };

    // Base query: index entries for this collection/index restricted to
    // mutation-subject rows, joined back to the mutation payloads.
    let mut sql = String::from(
        "SELECT m.id, m.collection_name, m.record_key, m.op_kind, m.payload_json, m.source_ref, m.created_at \
         FROM operational_secondary_index_entries s \
         JOIN operational_mutations m ON m.id = s.mutation_id \
         WHERE s.collection_name = ?1 AND s.index_name = ?2 AND s.subject_kind = 'mutation' ",
    );
    let mut params = vec![
        Value::from(collection_name.to_owned()),
        Value::from(matched.index_name.to_owned()),
    ];

    // Placeholders are numbered positionally: `params.len() + 1` is always
    // the index of the next `?N` appended to the SQL text.
    match &matched.value_filter.condition {
        OperationalReadCondition::ExactString(value) => {
            let _ = write!(sql, "AND s.slot1_text = ?{} ", params.len() + 1);
            params.push(Value::from(value.clone()));
        }
        OperationalReadCondition::Prefix(value) => {
            // GLOB with an escaped literal prefix implements starts-with.
            let _ = write!(sql, "AND s.slot1_text GLOB ?{} ", params.len() + 1);
            params.push(Value::from(glob_prefix_pattern(value)));
        }
        OperationalReadCondition::ExactInteger(value) => {
            let _ = write!(sql, "AND s.slot1_integer = ?{} ", params.len() + 1);
            params.push(Value::from(*value));
        }
        // The index matcher never produces a range value filter; bail out
        // to the fallback path defensively rather than build bad SQL.
        OperationalReadCondition::Range { .. } => return Ok(None),
    }

    // Optional time window over the index's sort timestamp; either bound
    // may be absent independently.
    if let Some(time_range) = matched.time_range
        && let OperationalReadCondition::Range { lower, upper } = &time_range.condition
    {
        if let Some(lower) = lower {
            let _ = write!(sql, "AND s.sort_timestamp >= ?{} ", params.len() + 1);
            params.push(Value::from(*lower));
        }
        if let Some(upper) = upper {
            let _ = write!(sql, "AND s.sort_timestamp <= ?{} ", params.len() + 1);
            params.push(Value::from(*upper));
        }
    }

    let _ = write!(
        sql,
        "ORDER BY s.sort_timestamp DESC, m.mutation_order DESC LIMIT ?{}",
        params.len() + 1
    );
    // Fetch one row past the limit so truncation can be detected below.
    params.push(Value::from(i64::try_from(applied_limit + 1).map_err(
        |_| EngineError::Bridge("operational read limit overflow".to_owned()),
    )?));

    let mut stmt = conn.prepare(&sql)?;
    let mut rows = stmt
        .query_map(
            rusqlite::params_from_iter(params),
            map_operational_mutation_row,
        )?
        .collect::<Result<Vec<_>, _>>()?;
    let was_limited = rows.len() > applied_limit;
    if was_limited {
        rows.truncate(applied_limit);
    }

    Ok(Some(OperationalReadReport {
        collection_name: collection_name.to_owned(),
        row_count: rows.len(),
        applied_limit,
        was_limited,
        rows,
    }))
}
1253
/// Execute a filtered operational read against the generic filter-value
/// table (the fallback when no secondary index applies).
///
/// Each compiled filter becomes one aliased self-join (`f0`, `f1`, ...)
/// on `operational_filter_values`, so a mutation matches only when every
/// filter's (field, value) condition holds. Results come back newest
/// mutation first; one extra row beyond `applied_limit` is fetched to
/// detect truncation.
fn execute_operational_filtered_read(
    conn: &rusqlite::Connection,
    collection_name: &str,
    filters: &[CompiledOperationalReadFilter],
    applied_limit: usize,
) -> Result<OperationalReadReport, EngineError> {
    use rusqlite::types::Value;

    let mut sql = String::from(
        "SELECT m.id, m.collection_name, m.record_key, m.op_kind, m.payload_json, m.source_ref, m.created_at \
         FROM operational_mutations m ",
    );
    // ?1 is reserved for the collection name referenced by the WHERE
    // clause appended after all joins.
    let mut params = vec![Value::from(collection_name.to_owned())];
    for (index, filter) in filters.iter().enumerate() {
        // One join per filter; `params.len() + 1` is always the next
        // positional placeholder number.
        let _ = write!(
            sql,
            "JOIN operational_filter_values f{index} \
             ON f{index}.mutation_id = m.id \
             AND f{index}.collection_name = m.collection_name "
        );
        match &filter.condition {
            OperationalReadCondition::ExactString(value) => {
                let _ = write!(
                    sql,
                    "AND f{index}.field_name = ?{} AND f{index}.string_value = ?{} ",
                    params.len() + 1,
                    params.len() + 2
                );
                params.push(Value::from(filter.field.clone()));
                params.push(Value::from(value.clone()));
            }
            OperationalReadCondition::ExactInteger(value) => {
                let _ = write!(
                    sql,
                    "AND f{index}.field_name = ?{} AND f{index}.integer_value = ?{} ",
                    params.len() + 1,
                    params.len() + 2
                );
                params.push(Value::from(filter.field.clone()));
                params.push(Value::from(*value));
            }
            OperationalReadCondition::Prefix(value) => {
                // GLOB with an escaped literal prefix implements starts-with.
                let _ = write!(
                    sql,
                    "AND f{index}.field_name = ?{} AND f{index}.string_value GLOB ?{} ",
                    params.len() + 1,
                    params.len() + 2
                );
                params.push(Value::from(filter.field.clone()));
                params.push(Value::from(glob_prefix_pattern(value)));
            }
            OperationalReadCondition::Range { lower, upper } => {
                let _ = write!(sql, "AND f{index}.field_name = ?{} ", params.len() + 1);
                params.push(Value::from(filter.field.clone()));
                // Either bound may be absent independently; compilation
                // guarantees at least one is present.
                if let Some(lower) = lower {
                    let _ = write!(sql, "AND f{index}.integer_value >= ?{} ", params.len() + 1);
                    params.push(Value::from(*lower));
                }
                if let Some(upper) = upper {
                    let _ = write!(sql, "AND f{index}.integer_value <= ?{} ", params.len() + 1);
                    params.push(Value::from(*upper));
                }
            }
        }
    }
    let _ = write!(
        sql,
        "WHERE m.collection_name = ?1 ORDER BY m.mutation_order DESC LIMIT ?{}",
        params.len() + 1
    );
    // Fetch one row past the limit so truncation can be detected below.
    params.push(Value::from(i64::try_from(applied_limit + 1).map_err(
        |_| EngineError::Bridge("operational read limit overflow".to_owned()),
    )?));

    let mut stmt = conn.prepare(&sql)?;
    let mut rows = stmt
        .query_map(
            rusqlite::params_from_iter(params),
            map_operational_mutation_row,
        )?
        .collect::<Result<Vec<_>, _>>()?;
    let was_limited = rows.len() > applied_limit;
    if was_limited {
        rows.truncate(applied_limit);
    }
    Ok(OperationalReadReport {
        collection_name: collection_name.to_owned(),
        row_count: rows.len(),
        applied_limit,
        was_limited,
        rows,
    })
}
1347
/// Build a SQLite GLOB pattern matching strings that start with `value`.
///
/// GLOB metacharacters (`*`, `?`, `[`) in the literal prefix are
/// neutralized by wrapping each in a single-character class; a trailing
/// `*` then matches any remainder.
fn glob_prefix_pattern(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 1);
    value.chars().for_each(|ch| match ch {
        '*' | '?' | '[' => {
            pattern.push('[');
            pattern.push(ch);
            pattern.push(']');
        }
        other => pattern.push(other),
    });
    pattern.push('*');
    pattern
}
1361
1362fn extract_operational_filter_values(
1363 filter_fields: &[OperationalFilterField],
1364 payload_json: &str,
1365) -> Vec<ExtractedOperationalFilterValue> {
1366 let Ok(parsed) = serde_json::from_str::<serde_json::Value>(payload_json) else {
1367 return Vec::new();
1368 };
1369 let Some(object) = parsed.as_object() else {
1370 return Vec::new();
1371 };
1372
1373 filter_fields
1374 .iter()
1375 .filter_map(|field| {
1376 let value = object.get(&field.name)?;
1377 match field.field_type {
1378 OperationalFilterFieldType::String => {
1379 value
1380 .as_str()
1381 .map(|string_value| ExtractedOperationalFilterValue {
1382 field_name: field.name.clone(),
1383 string_value: Some(string_value.to_owned()),
1384 integer_value: None,
1385 })
1386 }
1387 OperationalFilterFieldType::Integer | OperationalFilterFieldType::Timestamp => {
1388 value
1389 .as_i64()
1390 .map(|integer_value| ExtractedOperationalFilterValue {
1391 field_name: field.name.clone(),
1392 string_value: None,
1393 integer_value: Some(integer_value),
1394 })
1395 }
1396 }
1397 })
1398 .collect()
1399}
1400
1401fn operational_compaction_candidates(
1402 conn: &rusqlite::Connection,
1403 retention_json: &str,
1404 collection_name: &str,
1405) -> Result<(Vec<String>, Option<i64>), EngineError> {
1406 operational_compaction_candidates_at(
1407 conn,
1408 retention_json,
1409 collection_name,
1410 current_unix_timestamp()?,
1411 )
1412}
1413
1414fn operational_compaction_candidates_at(
1415 conn: &rusqlite::Connection,
1416 retention_json: &str,
1417 collection_name: &str,
1418 now_timestamp: i64,
1419) -> Result<(Vec<String>, Option<i64>), EngineError> {
1420 let policy = parse_operational_retention_policy(retention_json)?;
1421 match policy {
1422 OperationalRetentionPolicy::KeepAll => Ok((Vec::new(), None)),
1423 OperationalRetentionPolicy::PurgeBeforeSeconds { max_age_seconds } => {
1424 let before_timestamp = now_timestamp - max_age_seconds;
1425 let mut stmt = conn.prepare(
1426 "SELECT id FROM operational_mutations \
1427 WHERE collection_name = ?1 AND created_at < ?2 \
1428 ORDER BY mutation_order",
1429 )?;
1430 let mutation_ids = stmt
1431 .query_map(
1432 rusqlite::params![collection_name, before_timestamp],
1433 |row| row.get::<_, String>(0),
1434 )?
1435 .collect::<Result<Vec<_>, _>>()?;
1436 Ok((mutation_ids, Some(before_timestamp)))
1437 }
1438 OperationalRetentionPolicy::KeepLast { max_rows } => {
1439 let mut stmt = conn.prepare(
1440 "SELECT id FROM operational_mutations \
1441 WHERE collection_name = ?1 \
1442 ORDER BY mutation_order DESC",
1443 )?;
1444 let ordered_ids = stmt
1445 .query_map([collection_name], |row| row.get::<_, String>(0))?
1446 .collect::<Result<Vec<_>, _>>()?;
1447 Ok((ordered_ids.into_iter().skip(max_rows).collect(), None))
1448 }
1449 }
1450}
1451
1452fn parse_operational_retention_policy(
1453 retention_json: &str,
1454) -> Result<OperationalRetentionPolicy, EngineError> {
1455 let policy: OperationalRetentionPolicy = serde_json::from_str(retention_json)
1456 .map_err(|error| EngineError::InvalidWrite(format!("invalid retention_json: {error}")))?;
1457 match policy {
1458 OperationalRetentionPolicy::KeepAll => Ok(policy),
1459 OperationalRetentionPolicy::PurgeBeforeSeconds { max_age_seconds } => {
1460 if max_age_seconds <= 0 {
1461 return Err(EngineError::InvalidWrite(
1462 "retention_json max_age_seconds must be greater than zero".to_owned(),
1463 ));
1464 }
1465 Ok(policy)
1466 }
1467 OperationalRetentionPolicy::KeepLast { max_rows } => {
1468 if max_rows == 0 {
1469 return Err(EngineError::InvalidWrite(
1470 "retention_json max_rows must be greater than zero".to_owned(),
1471 ));
1472 }
1473 Ok(policy)
1474 }
1475 }
1476}
1477
1478fn load_operational_retention_records(
1479 conn: &rusqlite::Connection,
1480 collection_names: Option<&[String]>,
1481 max_collections: Option<usize>,
1482) -> Result<Vec<OperationalCollectionRecord>, EngineError> {
1483 let limit = max_collections.unwrap_or(usize::MAX);
1484 if limit == 0 {
1485 return Err(EngineError::InvalidWrite(
1486 "max_collections must be greater than zero".to_owned(),
1487 ));
1488 }
1489
1490 let mut records = Vec::new();
1491 if let Some(collection_names) = collection_names {
1492 for name in collection_names.iter().take(limit) {
1493 let record = load_operational_collection_record(conn, name)?.ok_or_else(|| {
1494 EngineError::InvalidWrite(format!(
1495 "operational collection '{name}' is not registered"
1496 ))
1497 })?;
1498 records.push(record);
1499 }
1500 return Ok(records);
1501 }
1502
1503 let mut stmt = conn.prepare(
1504 "SELECT name, kind, schema_json, retention_json, filter_fields_json, validation_json, secondary_indexes_json, format_version, created_at, disabled_at \
1505 FROM operational_collections ORDER BY name",
1506 )?;
1507 let rows = stmt
1508 .query_map([], map_operational_collection_row)?
1509 .take(limit)
1510 .collect::<Result<Vec<_>, _>>()?;
1511 Ok(rows)
1512}
1513
1514fn last_operational_retention_run_at(
1515 conn: &rusqlite::Connection,
1516 collection_name: &str,
1517) -> Result<Option<i64>, EngineError> {
1518 conn.query_row(
1519 "SELECT MAX(executed_at) FROM operational_retention_runs WHERE collection_name = ?1",
1520 [collection_name],
1521 |row| row.get(0),
1522 )
1523 .optional()
1524 .map_err(EngineError::Sqlite)
1525 .map(Option::flatten)
1526}
1527
1528fn count_operational_mutations_for_collection(
1529 conn: &rusqlite::Connection,
1530 collection_name: &str,
1531) -> Result<usize, EngineError> {
1532 let count: i64 = conn.query_row(
1533 "SELECT count(*) FROM operational_mutations WHERE collection_name = ?1",
1534 [collection_name],
1535 |row| row.get(0),
1536 )?;
1537 usize::try_from(count).map_err(|_| {
1538 EngineError::Bridge(format!("count overflow for collection {collection_name}"))
1539 })
1540}
1541
1542fn retention_action_kind_and_limit(
1543 policy: &OperationalRetentionPolicy,
1544) -> (OperationalRetentionActionKind, Option<usize>) {
1545 match policy {
1546 OperationalRetentionPolicy::KeepAll => (OperationalRetentionActionKind::Noop, None),
1547 OperationalRetentionPolicy::PurgeBeforeSeconds { .. } => {
1548 (OperationalRetentionActionKind::PurgeBeforeSeconds, None)
1549 }
1550 OperationalRetentionPolicy::KeepLast { max_rows } => {
1551 (OperationalRetentionActionKind::KeepLast, Some(*max_rows))
1552 }
1553 }
1554}
1555
1556fn plan_operational_retention_item(
1557 conn: &rusqlite::Connection,
1558 record: &OperationalCollectionRecord,
1559 now_timestamp: i64,
1560) -> Result<OperationalRetentionPlanItem, EngineError> {
1561 let last_run_at = last_operational_retention_run_at(conn, &record.name)?;
1562 if record.kind != OperationalCollectionKind::AppendOnlyLog {
1563 return Ok(OperationalRetentionPlanItem {
1564 collection_name: record.name.clone(),
1565 action_kind: OperationalRetentionActionKind::Noop,
1566 candidate_deletions: 0,
1567 before_timestamp: None,
1568 max_rows: None,
1569 last_run_at,
1570 });
1571 }
1572 let policy = parse_operational_retention_policy(&record.retention_json)?;
1573 let (action_kind, max_rows) = retention_action_kind_and_limit(&policy);
1574 let (candidate_ids, before_timestamp) = operational_compaction_candidates_at(
1575 conn,
1576 &record.retention_json,
1577 &record.name,
1578 now_timestamp,
1579 )?;
1580 Ok(OperationalRetentionPlanItem {
1581 collection_name: record.name.clone(),
1582 action_kind,
1583 candidate_deletions: candidate_ids.len(),
1584 before_timestamp,
1585 max_rows,
1586 last_run_at,
1587 })
1588}
1589
/// Apply (or simulate) one collection's retention policy inside `tx`.
///
/// First builds the plan, then — for a real run on an append-only log
/// with work to do — deletes the candidate mutations and records a
/// provenance event. Dry runs delete nothing and report projected
/// numbers instead. A non-noop real run always inserts an audit row in
/// `operational_retention_runs`, even when zero rows matched.
fn run_operational_retention_item(
    tx: &rusqlite::Transaction<'_>,
    record: &OperationalCollectionRecord,
    now_timestamp: i64,
    dry_run: bool,
) -> Result<OperationalRetentionRunItem, EngineError> {
    let plan = plan_operational_retention_item(tx, record, now_timestamp)?;
    let mut deleted_mutations = 0usize;
    if record.kind == OperationalCollectionKind::AppendOnlyLog
        && plan.action_kind != OperationalRetentionActionKind::Noop
        && plan.candidate_deletions > 0
        && !dry_run
    {
        // Re-resolve the candidate ids at the same `now_timestamp` so the
        // deletions match exactly what the plan counted.
        let (candidate_ids, _) = operational_compaction_candidates_at(
            tx,
            &record.retention_json,
            &record.name,
            now_timestamp,
        )?;
        let mut delete_stmt =
            tx.prepare_cached("DELETE FROM operational_mutations WHERE id = ?1")?;
        for mutation_id in &candidate_ids {
            delete_stmt.execute([mutation_id.as_str()])?;
            deleted_mutations += 1;
        }
        drop(delete_stmt);

        persist_simple_provenance_event(
            tx,
            "operational_retention_run",
            &record.name,
            Some(serde_json::json!({
                "action_kind": plan.action_kind,
                "deleted_mutations": deleted_mutations,
                "before_timestamp": plan.before_timestamp,
                "max_rows": plan.max_rows,
                "executed_at": now_timestamp,
            })),
        )?;
    }

    // Count AFTER any deletions so real runs report the live remainder.
    let live_rows_remaining = count_operational_mutations_for_collection(tx, &record.name)?;
    // Dry runs report the planned deletion count and project the
    // remainder; real runs report what was actually deleted.
    let effective_deleted_mutations = if dry_run {
        plan.candidate_deletions
    } else {
        deleted_mutations
    };
    let rows_remaining = if dry_run {
        live_rows_remaining.saturating_sub(effective_deleted_mutations)
    } else {
        live_rows_remaining
    };
    if !dry_run && plan.action_kind != OperationalRetentionActionKind::Noop {
        tx.execute(
            "INSERT INTO operational_retention_runs \
             (id, collection_name, executed_at, action_kind, dry_run, deleted_mutations, rows_remaining, metadata_json) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
            rusqlite::params![
                new_id(),
                record.name,
                now_timestamp,
                // serde serializes the enum as a quoted snake_case string;
                // strip the quotes to store the bare token.
                serde_json::to_string(&plan.action_kind)
                    .unwrap_or_else(|_| "\"noop\"".to_owned())
                    .trim_matches('"')
                    .to_owned(),
                i32::from(dry_run),
                deleted_mutations,
                rows_remaining,
                serde_json::json!({
                    "before_timestamp": plan.before_timestamp,
                    "max_rows": plan.max_rows,
                })
                .to_string(),
            ],
        )?;
    }

    Ok(OperationalRetentionRunItem {
        collection_name: plan.collection_name,
        action_kind: plan.action_kind,
        deleted_mutations: effective_deleted_mutations,
        before_timestamp: plan.before_timestamp,
        max_rows: plan.max_rows,
        rows_remaining,
    })
}
1676
1677fn current_unix_timestamp() -> Result<i64, EngineError> {
1678 let now = SystemTime::now()
1679 .duration_since(SystemTime::UNIX_EPOCH)
1680 .map_err(|error| EngineError::Bridge(format!("system clock error: {error}")))?;
1681 i64::try_from(now.as_secs())
1682 .map_err(|_| EngineError::Bridge("unix timestamp overflow".to_owned()))
1683}