icydb-core 0.94.0

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
//! Module: relation::reverse_index
//! Responsibility: maintain reverse-index relation targets for strong relation consistency.
//! Does not own: planner query semantics or execution routing policies.
//! Boundary: applies relation reverse-index mutations during commit pathways.

use crate::{
    db::{
        Db,
        commit::{PreparedIndexDeltaKind, PreparedIndexMutation},
        data::{
            CanonicalSlotReader, DataKey, RawDataKey, RawRow, ScalarSlotValueRef, ScalarValueRef,
            StorageKey, StructuralSlotReader, decode_relation_target_storage_keys_bytes,
        },
        index::{
            IndexEntry, IndexId, IndexKeyKind, IndexStore, RawIndexEntry, RawIndexKey,
            StructuralIndexEntryReader, encode_canonical_index_component_from_storage_key,
            raw_keys_for_component_prefix_with_kind,
        },
        relation::{
            RelationTargetDecodeContext, RelationTargetMismatchPolicy,
            metadata::{StrongRelationInfo, strong_relations_for_model_iter},
        },
    },
    error::InternalError,
    model::{entity::EntityModel, field::FieldKind},
    traits::{CanisterKind, EntityKind},
    types::EntityTag,
};
use std::{cell::RefCell, thread::LocalKey};

///
/// ReverseRelationSourceInfo
///
/// Resolved authority used while preparing reverse-index mutations.
/// Carries only the source entity path and tag required for diagnostics and
/// reverse-index identity, so the heavy mutation loop does not need `S`.
///

#[derive(Clone, Copy)]
pub(crate) struct ReverseRelationSourceInfo {
    /// Static source entity path (`S::PATH`); passed as the source identifier
    /// to the `InternalError` constructors used in this module.
    path: &'static str,
    /// Source entity tag (`S::ENTITY_TAG`); combined with the relation field
    /// ordinal to form the reverse-index `IndexId`.
    entity_tag: EntityTag,
}

impl ReverseRelationSourceInfo {
    /// Capture the static path and entity tag of source entity type `S` so
    /// reverse-index preparation can run without the generic parameter.
    pub(crate) const fn for_type<S: EntityKind>() -> Self {
        let path = S::PATH;
        let entity_tag = S::ENTITY_TAG;

        Self { path, entity_tag }
    }

    /// Source entity tag; this is the tag handed to `IndexId::new` when a
    /// reverse-index id is built for one of the source's relations.
    #[must_use]
    pub(crate) const fn entity_tag(self) -> EntityTag {
        self.entity_tag
    }
}

///
/// ReverseRelationMutationTarget
///
/// Shared reverse-index mutation context for one touched target key.
/// This keeps the structural mutation helper narrow without dragging the
/// whole typed source shell through the per-target update path.
///

#[derive(Clone)]
struct ReverseRelationMutationTarget {
    /// Index store handle the prepared mutation is addressed to; copied into
    /// `PreparedIndexMutation::store`.
    target_store: &'static LocalKey<RefCell<IndexStore>>,
    /// Reverse-index key for the touched target; used both for decode
    /// diagnostics and as `PreparedIndexMutation::key`.
    reverse_key: RawIndexKey,
    /// Whether the old source row's relation targets included this target.
    old_contains: bool,
    /// Whether the new source row's relation targets include this target.
    new_contains: bool,
}

///
/// ReverseRelationSourceTransition
///
/// Shared old/new source-row views used during reverse-index preparation.
/// This lets commit preflight reuse already-decoded structural slot readers
/// while preserving the existing raw-row fallback for other call sites.
///

struct ReverseRelationSourceTransition<'row, 'slots> {
    /// Source entity model; drives strong-relation enumeration and is used to
    /// build a slot reader when only a raw row is available.
    source_model: &'static EntityModel,
    /// Raw old row; consulted only when `old_row_fields` is `None`.
    old_row: Option<&'row RawRow>,
    /// Raw new row; consulted only when `new_row_fields` is `None`.
    new_row: Option<&'row RawRow>,
    /// Already-decoded slot reader for the old row; preferred over `old_row`.
    /// When both are `None` the old side contributes no relation targets.
    old_row_fields: Option<&'slots StructuralSlotReader<'row>>,
    /// Already-decoded slot reader for the new row; preferred over `new_row`.
    /// When both are `None` the new side contributes no relation targets.
    new_row_fields: Option<&'slots StructuralSlotReader<'row>>,
}

// Map a relation-target decode context onto the fixed label attached to
// target-key decode failure diagnostics.
const fn relation_target_key_decode_context_label(
    context: RelationTargetDecodeContext,
) -> &'static str {
    match context {
        RelationTargetDecodeContext::ReverseIndexPrepare => {
            "relation target key decode failed while preparing reverse index"
        }
        RelationTargetDecodeContext::DeleteValidation => "delete relation target key decode failed",
    }
}

// Map a relation-target decode context onto the fixed label attached to
// target entity-tag mismatch diagnostics.
const fn relation_target_entity_mismatch_context_label(
    context: RelationTargetDecodeContext,
) -> &'static str {
    match context {
        RelationTargetDecodeContext::ReverseIndexPrepare => {
            "relation target entity mismatch while preparing reverse index"
        }
        RelationTargetDecodeContext::DeleteValidation => {
            "relation target entity mismatch during delete validation"
        }
    }
}

/// Build the canonical reverse-index id for a `(source entity, relation field)` pair.
fn reverse_index_id_for_relation(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<IndexId, InternalError> {
    let ordinal = u16::try_from(relation.field_index).map_err(|err| {
        InternalError::reverse_index_ordinal_overflow(
            source.path,
            relation.field_name,
            relation.target_path,
            err,
        )
    })?;

    Ok(IndexId::new(source.entity_tag, ordinal))
}

/// Build the reverse-index key addressing one target storage key.
///
/// Returns `Ok(None)` when the target key cannot be encoded as a canonical
/// index component; an ordinal overflow while building the index id is
/// propagated as an error.
pub(super) fn reverse_index_key_for_target_storage_key(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    target_key_value: StorageKey,
) -> Result<Option<RawIndexKey>, InternalError> {
    // Encoding is attempted first so an un-encodable target short-circuits
    // before the index id is derived.
    let encoded_value = match encode_canonical_index_component_from_storage_key(target_key_value) {
        Ok(component) => component,
        Err(_) => return Ok(None),
    };

    let index_id = reverse_index_id_for_relation(source, relation)?;
    let components = std::slice::from_ref(&encoded_value);

    // Only the first key of the returned pair is needed here.
    let key =
        raw_keys_for_component_prefix_with_kind(&index_id, IndexKeyKind::System, 1, components).0;

    Ok(Some(key))
}

/// Extract the relation-target raw keys of one strong relation from a
/// persisted source row.
///
/// The row is opened through a structural slot reader only, so delete
/// validation can inspect the relation field payload without decoding the
/// full typed entity inside its blocked-delete proof loop.
pub(super) fn relation_target_raw_keys_for_source_row(
    raw_row: &RawRow,
    source_model: &'static EntityModel,
    source_info: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<Vec<RawDataKey>, InternalError> {
    let slots = StructuralSlotReader::from_raw_row(raw_row, source_model)?;
    let slots_ref = &slots;

    relation_target_raw_keys_for_source_slots(slots_ref, source_info, relation)
}

// Extract relation-target raw keys from an already-decoded structural source
// row, letting commit preflight reuse the slot readers it validated for
// forward-index planning. The result is canonicalized (sorted, deduplicated)
// by the storage-key conversion step.
fn relation_target_raw_keys_for_source_slots(
    row_fields: &StructuralSlotReader<'_>,
    source_info: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<Vec<RawDataKey>, InternalError> {
    relation_target_storage_keys_for_source_slots(row_fields, source_info, relation).and_then(
        |storage_keys| {
            relation_target_raw_keys_from_storage_keys(source_info, relation, storage_keys)
        },
    )
}

/// Report whether a persisted source row still points at `target_key` through
/// the given strong relation.
///
/// Delete validation only needs membership of one candidate target key in its
/// blocked-delete proof loop, so this avoids building the full canonicalized
/// target-key set.
pub(in crate::db::relation) fn source_row_references_relation_target(
    raw_row: &RawRow,
    source_model: &'static EntityModel,
    source_info: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    target_key: StorageKey,
) -> Result<bool, InternalError> {
    let slots = StructuralSlotReader::from_raw_row(raw_row, source_model)?;
    let slots_ref = &slots;

    source_slots_reference_relation_target(slots_ref, source_info, relation, target_key)
}

// Membership test of one target key against the relation targets of an
// already-decoded structural source row; skips the raw-key conversion and
// canonicalization that the full target-key vector would need.
fn source_slots_reference_relation_target(
    row_fields: &StructuralSlotReader<'_>,
    source_info: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    target_key: StorageKey,
) -> Result<bool, InternalError> {
    relation_target_storage_keys_for_source_slots(row_fields, source_info, relation)
        .map(|keys| keys.contains(&target_key))
}

// Bring reverse-index target keys into canonical form: ascending order with
// adjacent duplicates collapsed, so the result is sorted and unique.
fn canonicalize_relation_target_keys(keys: &mut Vec<RawDataKey>) {
    keys.sort_unstable();
    // After sorting, equal keys are adjacent, so pairwise comparison removes
    // every duplicate.
    keys.dedup_by(|later, earlier| *later == *earlier);
}

/// Decode one raw reverse-index entry into its `IndexEntry` membership form.
///
/// A decode failure is reported as a reverse-index corruption error carrying
/// the source path, relation field, target path, and the offending index key.
pub(super) fn decode_reverse_entry(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    index_key: &RawIndexKey,
    raw_entry: &RawIndexEntry,
) -> Result<IndexEntry, InternalError> {
    match raw_entry.try_decode() {
        Ok(entry) => Ok(entry),
        Err(err) => Err(InternalError::reverse_index_entry_corrupted(
            source.path,
            relation.field_name,
            relation.target_path,
            index_key,
            err,
        )),
    }
}

/// Encode one `IndexEntry` into its raw reverse-index form.
///
/// An encode failure is mapped onto
/// `InternalError::reverse_index_entry_encode_failed` with the source path,
/// relation field, and target path attached.
fn encode_reverse_entry(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    entry: &IndexEntry,
) -> Result<RawIndexEntry, InternalError> {
    match RawIndexEntry::try_from_entry(entry) {
        Ok(raw) => Ok(raw),
        Err(err) => Err(InternalError::reverse_index_entry_encode_failed(
            source.path,
            relation.field_name,
            relation.target_path,
            err,
        )),
    }
}

/// Look up the index store that backs the target side of one relation.
///
/// The store is resolved by `relation.target_store_path` through the database
/// store registry; a failed lookup is reported as
/// `InternalError::relation_target_store_missing`.
pub(super) fn relation_target_store<C>(
    db: &Db<C>,
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<&'static LocalKey<RefCell<IndexStore>>, InternalError>
where
    C: CanisterKind,
{
    let lookup = db.with_store_registry(|reg| reg.try_get_store(relation.target_store_path));

    match lookup {
        Ok(store) => Ok(store.index_store()),
        Err(err) => Err(InternalError::relation_target_store_missing(
            source.path,
            relation.field_name,
            relation.target_path,
            relation.target_store_path,
            err,
        )),
    }
}

/// Decode one raw relation target key and check that it belongs to the
/// relation's declared target entity.
///
/// Returns `Ok(Some(key))` when the decoded entity tag matches the relation
/// target. On a tag mismatch the outcome follows `mismatch_policy`: `Skip`
/// yields `Ok(None)`, anything else yields an entity-mismatch error. A raw key
/// that fails to decode is always an error; `context` selects the diagnostic
/// label in both error cases.
pub(in crate::db::relation) fn decode_relation_target_data_key(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    target_raw_key: &RawDataKey,
    context: RelationTargetDecodeContext,
    mismatch_policy: RelationTargetMismatchPolicy,
) -> Result<Option<DataKey>, InternalError> {
    let decoded = DataKey::try_from_raw(target_raw_key).map_err(|err| {
        InternalError::relation_target_key_decode_failed(
            relation_target_key_decode_context_label(context),
            source.path,
            relation.field_name,
            relation.target_path,
            err,
        )
    })?;

    // Matching entity tags: the key is accepted as-is.
    if decoded.entity_tag() == relation.target_entity_tag {
        return Ok(Some(decoded));
    }

    // Mismatch: either tolerated silently or surfaced, per the caller's policy.
    if matches!(mismatch_policy, RelationTargetMismatchPolicy::Skip) {
        Ok(None)
    } else {
        Err(InternalError::relation_target_entity_mismatch(
            relation_target_entity_mismatch_context_label(context),
            source.path,
            relation.field_name,
            relation.target_path,
            relation.target_entity_name,
            relation.target_entity_tag.value(),
            decoded.entity_tag().value(),
        ))
    }
}

// Turn decoded relation target storage keys into raw data keys, then put the
// result into canonical sorted-unique order. The first key that fails to
// convert aborts the whole conversion with its error.
fn relation_target_raw_keys_from_storage_keys(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    keys: Vec<StorageKey>,
) -> Result<Vec<RawDataKey>, InternalError> {
    let mut raw_keys = Vec::with_capacity(keys.len());
    for storage_key in keys {
        let raw_key = raw_relation_target_key_from_storage_key(source, relation, storage_key)?;
        raw_keys.push(raw_key);
    }

    canonicalize_relation_target_keys(&mut raw_keys);

    Ok(raw_keys)
}

// Decode one relation field of a structural source row into target storage
// keys. Shared by delete validation and reverse-index mutation preparation.
//
// The scalar slot path is tried first; when it declines (returns `None`), the
// encoded field bytes are decoded directly into storage keys without
// rebuilding a runtime `Value` container.
fn relation_target_storage_keys_for_source_slots(
    row_fields: &StructuralSlotReader<'_>,
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<Vec<StorageKey>, InternalError> {
    match relation_target_storage_keys_from_scalar_slot(row_fields, source, relation)? {
        // Scalar fast path handled the field.
        Some(keys) => Ok(keys),
        // Fall back to decoding the declared relation field payload bytes.
        None => relation_target_storage_keys_from_field_bytes(row_fields, source, relation),
    }
}

// Decode one strong-relation field payload straight from its encoded field
// bytes into relation target storage keys. The relation field kind is
// validated before any bytes are read; a payload decode failure is reported
// as a relation source-row decode error.
fn relation_target_storage_keys_from_field_bytes(
    row_fields: &StructuralSlotReader<'_>,
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<Vec<StorageKey>, InternalError> {
    validate_relation_field_kind(relation.field_kind)?;

    let payload = row_fields.required_field_bytes(relation.field_index, relation.field_name)?;

    decode_relation_target_storage_keys_bytes(payload, relation.field_kind).map_err(|err| {
        InternalError::relation_source_row_decode_failed(
            source.path,
            relation.field_name,
            relation.target_path,
            err,
        )
    })
}

// Decode a singular strong relation directly from its scalar slot.
//
// Returns `Ok(None)` when the field kind is not a plain `Relation` (the caller
// then uses the field-bytes path). For a plain relation, a null slot yields an
// empty key list and a scalar value yields exactly one storage key; a scalar
// that has no storage-key representation is an error.
fn relation_target_storage_keys_from_scalar_slot(
    row_fields: &StructuralSlotReader<'_>,
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<Option<Vec<StorageKey>>, InternalError> {
    if !matches!(relation.field_kind, FieldKind::Relation { .. }) {
        return Ok(None);
    }

    let scalar = match row_fields.required_scalar(relation.field_index)? {
        ScalarSlotValueRef::Null => return Ok(Some(Vec::new())),
        ScalarSlotValueRef::Value(scalar) => scalar,
    };

    let storage_key = storage_key_from_relation_scalar(scalar).ok_or_else(|| {
        InternalError::relation_source_row_unsupported_scalar_relation_key(
            source.path,
            relation.field_name,
            relation.target_path,
        )
    })?;

    Ok(Some(vec![storage_key]))
}

// Translate one scalar relation payload into the matching `StorageKey`
// variant used for reverse-index and target-row identities. Scalar kinds
// without a storage-key counterpart produce `None`.
const fn storage_key_from_relation_scalar(value: ScalarValueRef<'_>) -> Option<StorageKey> {
    let key = match value {
        // No storage-key representation for these scalar kinds.
        ScalarValueRef::Blob(_)
        | ScalarValueRef::Bool(_)
        | ScalarValueRef::Date(_)
        | ScalarValueRef::Duration(_)
        | ScalarValueRef::Float32(_)
        | ScalarValueRef::Float64(_)
        | ScalarValueRef::Text(_) => return None,
        ScalarValueRef::Int(inner) => StorageKey::Int(inner),
        ScalarValueRef::Principal(inner) => StorageKey::Principal(inner),
        ScalarValueRef::Subaccount(inner) => StorageKey::Subaccount(inner),
        ScalarValueRef::Timestamp(inner) => StorageKey::Timestamp(inner),
        ScalarValueRef::Uint(inner) => StorageKey::Uint(inner),
        ScalarValueRef::Ulid(inner) => StorageKey::Ulid(inner),
        ScalarValueRef::Unit => StorageKey::Unit,
    };

    Some(key)
}

// Build the raw data key of a relation target from the relation's target
// entity tag and one decoded storage key, with no intermediate runtime
// `Value`. A failure is reported as a relation source-row decode error.
fn raw_relation_target_key_from_storage_key(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    value: StorageKey,
) -> Result<RawDataKey, InternalError> {
    match DataKey::raw_from_parts(relation.target_entity_tag, value) {
        Ok(raw_key) => Ok(raw_key),
        Err(err) => Err(InternalError::relation_source_row_decode_failed(
            source.path,
            relation.field_name,
            relation.target_path,
            err,
        )),
    }
}

// Accept only the relation-field shapes that structural strong-relation
// decode supports: a plain relation, or a list/set of relations. The key kind
// is pulled out of whichever shape matched and validated in one place; any
// other field kind is rejected.
fn validate_relation_field_kind(kind: FieldKind) -> Result<(), InternalError> {
    let key_kind = match kind {
        FieldKind::Relation { key_kind, .. } => *key_kind,
        FieldKind::List(FieldKind::Relation { key_kind, .. })
        | FieldKind::Set(FieldKind::Relation { key_kind, .. }) => **key_kind,
        other => return Err(InternalError::relation_source_row_invalid_field_kind(other)),
    };

    validate_relation_key_kind(key_kind)
}

// Accept only the relation key kinds that the raw relation target-key builder
// supports; every other kind is reported as an unsupported key kind.
fn validate_relation_key_kind(key_kind: FieldKind) -> Result<(), InternalError> {
    let supported = matches!(
        key_kind,
        FieldKind::Account
            | FieldKind::Int
            | FieldKind::Principal
            | FieldKind::Subaccount
            | FieldKind::Timestamp
            | FieldKind::Uint
            | FieldKind::Ulid
            | FieldKind::Unit
    );

    if supported {
        Ok(())
    } else {
        Err(InternalError::relation_source_row_unsupported_key_kind(
            key_kind,
        ))
    }
}

/// Build one reverse-index mutation for one touched target key.
fn prepare_reverse_relation_index_mutation_for_target(
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
    target: ReverseRelationMutationTarget,
    existing: Option<&RawIndexEntry>,
    source_storage_key: StorageKey,
) -> Result<Option<PreparedIndexMutation>, InternalError> {
    if target.old_contains == target.new_contains {
        return Ok(None);
    }

    let mut entry = existing
        .map(|raw| decode_reverse_entry(source, relation, &target.reverse_key, raw))
        .transpose()?;

    let delta_kind = PreparedIndexDeltaKind::from_reverse_index_membership(
        target.old_contains,
        target.new_contains,
    );

    // Phase 1: mutate the stored reverse-index membership directly from the
    // old/new target-membership booleans. The authoritative source key is the
    // already-validated commit key, so the old/new lanes do not need separate
    // optional key plumbing here.
    if target.old_contains
        && let Some(current) = entry.as_mut()
    {
        current.remove(source_storage_key);
    }

    if target.new_contains {
        if let Some(current) = entry.as_mut() {
            current.insert(source_storage_key);
        } else {
            entry = Some(IndexEntry::new(source_storage_key));
        }
    }

    let next_value = if let Some(next_entry) = entry {
        if next_entry.is_empty() {
            None
        } else {
            Some(encode_reverse_entry(source, relation, &next_entry)?)
        }
    } else {
        None
    };

    Ok(Some(PreparedIndexMutation {
        store: target.target_store,
        key: target.reverse_key,
        value: next_value,
        delta_kind,
    }))
}

/// Prepare reverse-index mutations for one source entity transition using
/// already-decoded structural slot readers from commit preflight.
pub(crate) fn prepare_reverse_relation_index_mutations_for_source_slot_readers<C>(
    db: &Db<C>,
    index_reader: &dyn StructuralIndexEntryReader,
    source: ReverseRelationSourceInfo,
    source_model: &'static EntityModel,
    source_storage_key: StorageKey,
    old_row_fields: Option<&StructuralSlotReader<'_>>,
    new_row_fields: Option<&StructuralSlotReader<'_>>,
) -> Result<Vec<PreparedIndexMutation>, InternalError>
where
    C: CanisterKind,
{
    let mut target_store = |relation| relation_target_store(db, source, relation);
    let source_rows = ReverseRelationSourceTransition {
        source_model,
        old_row: None,
        new_row: None,
        old_row_fields,
        new_row_fields,
    };

    prepare_reverse_relation_index_mutations_for_source_rows_impl(
        &mut target_store,
        index_reader,
        source,
        source_storage_key,
        source_rows,
    )
}

// Keep the reverse-index mutation loop nongeneric once the source entity has
// already been lowered onto one structural target-store lookup callback.
//
// For every strong relation of the source model this computes the old and new
// relation-target key lists, walks their sorted union, and collects one
// prepared index mutation per target whose membership changed. Any decode,
// store-lookup, index-read, or encode failure aborts the whole preparation.
fn prepare_reverse_relation_index_mutations_for_source_rows_impl(
    target_store_for_relation: &mut dyn FnMut(
        StrongRelationInfo,
    ) -> Result<
        &'static LocalKey<RefCell<IndexStore>>,
        InternalError,
    >,
    index_reader: &dyn StructuralIndexEntryReader,
    source: ReverseRelationSourceInfo,
    source_storage_key: StorageKey,
    source_rows: ReverseRelationSourceTransition<'_, '_>,
) -> Result<Vec<PreparedIndexMutation>, InternalError> {
    // Phase 1: the source storage key arrives as a parameter, so nothing is
    // derived here; only the output buffer is set up.
    let mut ops = Vec::new();

    // Phase 2: evaluate each strong relation independently and derive index deltas
    // directly from persisted row payloads.
    for relation in strong_relations_for_model_iter(source_rows.source_model, None) {
        let old_targets = relation_target_keys_for_transition_side(
            &source_rows,
            source_rows.old_row_fields,
            source_rows.old_row,
            source,
            relation,
        )?;
        let new_targets = relation_target_keys_for_transition_side(
            &source_rows,
            source_rows.new_row_fields,
            source_rows.new_row,
            source,
            relation,
        )?;
        let target_store = target_store_for_relation(relation)?;
        let mut old_index = 0usize;
        let mut new_index = 0usize;

        // Phase 3: walk the canonical union of old/new targets directly
        // instead of cloning, re-sorting, and then binary-searching both
        // source vectors again for each touched target.
        // This two-cursor merge relies on both lists being sorted and
        // deduplicated, which the raw-key conversion helpers in this module
        // do before returning them.
        while old_index < old_targets.len() || new_index < new_targets.len() {
            let (target_raw_key, old_contains, new_contains) =
                match (old_targets.get(old_index), new_targets.get(new_index)) {
                    (Some(old_key), Some(new_key)) => match old_key.cmp(new_key) {
                        // Smaller key exists only on the old side: removal.
                        std::cmp::Ordering::Less => {
                            old_index += 1;
                            (*old_key, true, false)
                        }
                        // Smaller key exists only on the new side: insertion.
                        std::cmp::Ordering::Greater => {
                            new_index += 1;
                            (*new_key, false, true)
                        }
                        // Present on both sides: membership is unchanged.
                        // NOTE(review): this target still goes through the
                        // decode and index read below before the mutation
                        // helper returns `None` for it — confirm whether that
                        // extra validation is intended or can be skipped.
                        std::cmp::Ordering::Equal => {
                            old_index += 1;
                            new_index += 1;
                            (*old_key, true, true)
                        }
                    },
                    // New side exhausted: the rest of the old side is removals.
                    (Some(old_key), None) => {
                        old_index += 1;
                        (*old_key, true, false)
                    }
                    // Old side exhausted: the rest of the new side is insertions.
                    (None, Some(new_key)) => {
                        new_index += 1;
                        (*new_key, false, true)
                    }
                    (None, None) => break,
                };

            // With the `Reject` policy a tag mismatch is returned as an error,
            // so a `None` result is not expected here and is reported as an
            // invariant violation.
            let Some(target_data_key) = decode_relation_target_data_key(
                source,
                relation,
                &target_raw_key,
                RelationTargetDecodeContext::ReverseIndexPrepare,
                RelationTargetMismatchPolicy::Reject,
            )?
            else {
                return Err(
                    InternalError::reverse_index_relation_target_decode_invariant_violated(
                        source.path,
                        relation.field_name,
                        relation.target_path,
                    ),
                );
            };

            // A target key that cannot be encoded as an index component has
            // no reverse-index key; such targets are skipped without error.
            let Some(reverse_key) = reverse_index_key_for_target_storage_key(
                source,
                relation,
                target_data_key.storage_key(),
            )?
            else {
                continue;
            };

            let existing = index_reader.read_index_entry_structural(target_store, &reverse_key)?;
            let target = ReverseRelationMutationTarget {
                target_store,
                reverse_key,
                old_contains,
                new_contains,
            };
            // `None` means membership did not change for this target, so no
            // mutation is recorded.
            let Some(op) = prepare_reverse_relation_index_mutation_for_target(
                source,
                relation,
                target,
                existing.as_ref(),
                source_storage_key,
            )?
            else {
                continue;
            };

            ops.push(op);
        }
    }

    Ok(ops)
}

// Compute the relation-target raw keys for one side (old or new) of a source
// row transition. An already-decoded slot reader wins when present; otherwise
// the raw row is decoded against the source model; with neither available the
// side contributes no targets.
fn relation_target_keys_for_transition_side(
    source_rows: &ReverseRelationSourceTransition<'_, '_>,
    row_fields: Option<&StructuralSlotReader<'_>>,
    row: Option<&RawRow>,
    source: ReverseRelationSourceInfo,
    relation: StrongRelationInfo,
) -> Result<Vec<RawDataKey>, InternalError> {
    match (row_fields, row) {
        (Some(slots), _) => relation_target_raw_keys_for_source_slots(slots, source, relation),
        (None, Some(raw_row)) => relation_target_raw_keys_for_source_row(
            raw_row,
            source_rows.source_model,
            source,
            relation,
        ),
        (None, None) => Ok(Vec::new()),
    }
}