Skip to main content

reddb_server/runtime/ai/
cdc_enrichment.rs

1//! CDC enrichment consumer (#1272, PRD #1267) — the first end-to-end AI
2//! modality wired over the existing change stream.
3//!
4//! A collection that declares an `EMBED (...)` policy (issue #1271) gets
5//! its declared fields auto-vectorised *asynchronously* after commit. The
6//! write path itself does no provider work: an INSERT/UPDATE simply emits
7//! its usual CDC event and returns. This consumer is the thing that, on a
8//! later pass, drains the LSN-ordered change stream, recomputes embeddings
9//! for committed rows via the policy's provider, and attaches the vectors
10//! into the collection (reusing the existing `create_vector` +
11//! local-embedding machinery).
12//!
13//! Because a row is only searchable once its vector exists, "pending"
14//! enrichment is naturally excluded from `VECTOR SEARCH` until the consumer
15//! attaches the vector — at which point the row is included like any other.
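//! Beyond embeddings, the same consumer drives VISION enrichment (#1275)
//! and asynchronous re-moderation of quarantine-pending rows (#1274), each
//! queued and retried independently. The consumer additionally owns: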
17//!   - a `pending` work set (rows whose enrichment hasn't completed),
18//!   - retry-with-backoff on provider failure,
19//!   - a dead-letter list after a bounded number of failures, and
20//!   - an ops re-drive path that moves dead-letters back to pending.
21//!
22//! The consumer is driven explicitly via [`CdcEnrichmentConsumer::tick`],
23//! which takes the current time so retry backoff is deterministic in tests
24//! and a production scheduler can drive it from a background thread without
25//! changing the semantics.
26
27use crate::application::entity::{CreateVectorInput, DeleteEntityInput, PatchEntityInput};
28use crate::application::ports::RuntimeEntityPort;
29use crate::catalog::{EmbedPolicy, ModerateDegradedMode, ModeratePolicy, VisionPolicy};
30use crate::replication::cdc::ChangeOperation;
31use crate::runtime::ai::moderation::{
32    ModerationOutcome, MODERATION_STATUS_FIELD, MODERATION_STATUS_PENDING,
33    MODERATION_STATUS_REJECTED,
34};
35use crate::runtime::mutation::MutationRow;
36use crate::storage::schema::Value;
37use crate::storage::{EntityData, EntityId};
38use crate::{RedDBError, RedDBResult, RedDBRuntime};
39
/// Name of the derived row field that the vision lane writes its structured
/// component detections into, encoded as a JSON array of
/// `{label, confidence, bbox: [x, y, w, h]}` objects.
///
/// Because it is an ordinary row field, RQL predicates such as
/// `CONTAINS(vision_detections, 'person')` can filter on it once the
/// consumer has attached it.
pub const VISION_DETECTIONS_FIELD: &str = "vision_detections";
45
/// The flavour of asynchronous enrichment a pending work item performs.
///
/// One committed row may need several kinds at once (for example a
/// collection declaring both EMBED and VISION); each kind is queued,
/// retried, and dead-lettered on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnrichmentKind {
    /// Auto-embed the policy's declared text fields (#1272).
    Embed,
    /// Run computer vision over the policy's image-reference field (#1275).
    Vision,
    /// Re-screen a quarantine-pending row's declared text fields (#1274).
    /// Passing clears the quarantine so the row becomes visible; rejection
    /// either tombstones the row (hidden, retained for audit) or
    /// hard-deletes it, depending on the policy.
    Moderate,
}
60
/// Retry and polling knobs for the CDC enrichment consumer.
#[derive(Debug, Clone)]
pub struct EnrichmentConfig {
    /// Provider attempts allowed per work item. The failure that brings an
    /// item's attempt count up to this value dead-letters it. Intended to be
    /// `>= 1`; `0` is not rejected and behaves like `1`.
    pub max_attempts: u32,
    /// Delay in milliseconds applied after the first failure. Each later
    /// failure doubles it: the wait after the `n`-th failure is
    /// `base_backoff_ms * 2^(n-1)`.
    pub base_backoff_ms: u64,
    /// Upper bound on CDC events pulled from the change stream per `tick`.
    pub poll_max: usize,
}
73
74impl Default for EnrichmentConfig {
75    fn default() -> Self {
76        Self {
77            max_attempts: 3,
78            base_backoff_ms: 100,
79            poll_max: 1024,
80        }
81    }
82}
83
84/// A row awaiting enrichment.
85#[derive(Debug, Clone)]
86struct PendingWork {
87    collection: String,
88    entity_id: u64,
89    kind: EnrichmentKind,
90    attempts: u32,
91    /// Earliest wall-clock (unix ms) at which the next attempt may run.
92    not_before_ms: u64,
93}
94
95/// A work item that exhausted its retry budget. Surfaced to operators and
96/// re-drivable via [`CdcEnrichmentConsumer::redrive`].
97#[derive(Debug, Clone)]
98pub struct DeadLetter {
99    pub collection: String,
100    pub entity_id: u64,
101    pub kind: EnrichmentKind,
102    pub attempts: u32,
103    pub last_error: String,
104}
105
/// Per-`tick` outcome counters.
///
/// Every counter is per work item, where a work item is one
/// `(collection, entity, kind)` triple. A single CDC event can queue up to
/// one item per [`EnrichmentKind`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TickStats {
    /// New work items queued from CDC events this tick. An event whose
    /// `(collection, entity, kind)` is already pending is not counted.
    pub ingested: usize,
    /// Work items whose attempt succeeded this tick, for every kind
    /// (embedding attached, vision output written, moderation resolved).
    /// An attempt that found nothing to do also counts as a success, e.g.
    /// when the row no longer resolves or has no content to process.
    pub attached: usize,
    /// Failed attempts re-scheduled with backoff this tick.
    pub retried: usize,
    /// Work items that exhausted their retry budget and were dead-lettered
    /// this tick.
    pub dead_lettered: usize,
}
118
119/// Drains the CDC stream and enriches embed-policy collections.
120///
121/// Holds its own cursor, pending set, and dead-letter list — one consumer
122/// instance owns the enrichment state for a runtime.
123pub struct CdcEnrichmentConsumer {
124    cursor: u64,
125    config: EnrichmentConfig,
126    pending: Vec<PendingWork>,
127    dead_letters: Vec<DeadLetter>,
128}
129
130impl CdcEnrichmentConsumer {
131    /// New consumer starting from the stream origin (LSN 0) with the given
132    /// config.
133    pub fn new(config: EnrichmentConfig) -> Self {
134        Self {
135            cursor: 0,
136            config,
137            pending: Vec::new(),
138            dead_letters: Vec::new(),
139        }
140    }
141
142    /// New consumer with default tunables.
143    pub fn with_defaults() -> Self {
144        Self::new(EnrichmentConfig::default())
145    }
146
147    /// Last CDC LSN this consumer has ingested.
148    pub fn cursor(&self) -> u64 {
149        self.cursor
150    }
151
152    /// Rows currently awaiting enrichment.
153    pub fn pending_len(&self) -> usize {
154        self.pending.len()
155    }
156
157    /// True while `(collection, entity_id)` is still awaiting any kind of
158    /// enrichment.
159    pub fn is_pending(&self, collection: &str, entity_id: u64) -> bool {
160        self.pending
161            .iter()
162            .any(|w| w.collection == collection && w.entity_id == entity_id)
163    }
164
165    /// True while `(collection, entity_id)` is awaiting the given kind of
166    /// enrichment specifically.
167    pub fn is_pending_kind(&self, collection: &str, entity_id: u64, kind: EnrichmentKind) -> bool {
168        self.pending
169            .iter()
170            .any(|w| w.collection == collection && w.entity_id == entity_id && w.kind == kind)
171    }
172
173    /// Dead-lettered work items (enrichment failed past the retry budget).
174    pub fn dead_letters(&self) -> &[DeadLetter] {
175        &self.dead_letters
176    }
177
178    /// Ops re-drive: move every dead-letter back into the pending set with a
179    /// fresh retry budget. Returns the number of items re-driven.
180    pub fn redrive(&mut self) -> usize {
181        let drained: Vec<DeadLetter> = self.dead_letters.drain(..).collect();
182        let count = drained.len();
183        for dl in drained {
184            self.enqueue(dl.collection, dl.entity_id, dl.kind);
185        }
186        count
187    }
188
189    /// Ingest newly committed CDC events and then attempt every pending item
190    /// whose backoff has elapsed. Embedding work happens here, never on the
191    /// write path — so write latency is independent of provider latency.
192    pub fn tick(&mut self, rt: &RedDBRuntime, now_ms: u64) -> RedDBResult<TickStats> {
193        let mut stats = TickStats::default();
194
195        // 1. Ingest committed change events for embed- and vision-policy
196        //    collections. A row can require both; each modality is queued
197        //    and retried independently.
198        let events = rt.cdc_poll(self.cursor, self.config.poll_max);
199        for event in &events {
200            if event.lsn > self.cursor {
201                self.cursor = event.lsn;
202            }
203            if let Some(policy) = rt.collection_embed_policy(&event.collection) {
204                if change_touches_embed_fields(event, &policy)
205                    && self.enqueue(
206                        event.collection.clone(),
207                        event.entity_id,
208                        EnrichmentKind::Embed,
209                    )
210                {
211                    stats.ingested += 1;
212                }
213            }
214            if let Some(policy) = rt.collection_vision_policy(&event.collection) {
215                if change_touches_vision_field(event, &policy)
216                    && self.enqueue(
217                        event.collection.clone(),
218                        event.entity_id,
219                        EnrichmentKind::Vision,
220                    )
221                {
222                    stats.ingested += 1;
223                }
224            }
225            // Re-moderation rides the same lane, but only quarantine-pending
226            // rows are eligible: the synchronous gate already screened
227            // everything that committed clean, so a normal insert/update
228            // must NOT be re-screened here. We gate on the row actually
229            // carrying the pending marker (see `row_is_moderation_pending`).
230            if let Some(policy) = rt.collection_moderate_policy(&event.collection) {
231                if change_touches_moderate_fields(event, &policy)
232                    && rt.row_is_moderation_pending(&event.collection, event.entity_id)
233                    && self.enqueue(
234                        event.collection.clone(),
235                        event.entity_id,
236                        EnrichmentKind::Moderate,
237                    )
238                {
239                    stats.ingested += 1;
240                }
241            }
242        }
243
244        // 2. Attempt every ready pending item.
245        let drained: Vec<PendingWork> = std::mem::take(&mut self.pending);
246        let mut still_pending = Vec::with_capacity(drained.len());
247        for mut work in drained {
248            if work.not_before_ms > now_ms {
249                still_pending.push(work);
250                continue;
251            }
252            // The policy can disappear if the collection was dropped/altered
253            // between enqueue and drain — quietly forget such work.
254            let attempt = match work.kind {
255                EnrichmentKind::Embed => match rt.collection_embed_policy(&work.collection) {
256                    Some(policy) => {
257                        rt.enrich_row_embedding(&work.collection, work.entity_id, &policy)
258                    }
259                    None => continue,
260                },
261                EnrichmentKind::Vision => match rt.collection_vision_policy(&work.collection) {
262                    Some(policy) => rt.enrich_row_vision(&work.collection, work.entity_id, &policy),
263                    None => continue,
264                },
265                EnrichmentKind::Moderate => match rt.collection_moderate_policy(&work.collection) {
266                    Some(policy) => {
267                        rt.remoderate_pending_row(&work.collection, work.entity_id, &policy)
268                    }
269                    None => continue,
270                },
271            };
272            match attempt {
273                Ok(()) => stats.attached += 1,
274                Err(err) => {
275                    work.attempts += 1;
276                    if work.attempts >= self.config.max_attempts {
277                        self.dead_letters.push(DeadLetter {
278                            collection: work.collection,
279                            entity_id: work.entity_id,
280                            kind: work.kind,
281                            attempts: work.attempts,
282                            last_error: format!("{err:?}"),
283                        });
284                        stats.dead_lettered += 1;
285                    } else {
286                        let shift = work.attempts - 1;
287                        let backoff = self
288                            .config
289                            .base_backoff_ms
290                            .saturating_mul(1u64.checked_shl(shift).unwrap_or(u64::MAX));
291                        work.not_before_ms = now_ms.saturating_add(backoff);
292                        still_pending.push(work);
293                        stats.retried += 1;
294                    }
295                }
296            }
297        }
298        self.pending = still_pending;
299
300        Ok(stats)
301    }
302
303    /// Add a row to the pending set unless this `(collection, entity, kind)`
304    /// is already queued. Returns true when a new item was enqueued.
305    fn enqueue(&mut self, collection: String, entity_id: u64, kind: EnrichmentKind) -> bool {
306        if self
307            .pending
308            .iter()
309            .any(|w| w.entity_id == entity_id && w.kind == kind && w.collection == collection)
310        {
311            return false;
312        }
313        self.pending.push(PendingWork {
314            collection,
315            entity_id,
316            kind,
317            attempts: 0,
318            not_before_ms: 0,
319        });
320        true
321    }
322}
323
324/// Whether a change event should (re)enrich the row under `policy`.
325///
326/// Inserts always enrich. Updates enrich when the damage vector intersects
327/// the declared embed fields, or when no damage vector is available (the
328/// emitter didn't compute one — enrich conservatively). Deletes/refreshes
329/// never enrich.
330fn change_touches_embed_fields(
331    event: &crate::replication::cdc::ChangeEvent,
332    policy: &EmbedPolicy,
333) -> bool {
334    match event.operation {
335        ChangeOperation::Insert => true,
336        ChangeOperation::Update => match &event.changed_columns {
337            Some(columns) => columns
338                .iter()
339                .any(|column| policy.fields.iter().any(|field| field == column)),
340            None => true,
341        },
342        ChangeOperation::Delete | ChangeOperation::Refresh => false,
343    }
344}
345
346/// Whether a change event should (re)run vision over the row under
347/// `policy`. Inserts always run. Updates run only when the declared
348/// image-reference field changed (or no damage vector is available, so we
349/// run conservatively). Crucially, an update that only touched the derived
350/// detections field — the consumer's own write-back — does NOT match,
351/// because that field is not the image-reference field, so vision never
352/// re-triggers itself into a loop. Deletes/refreshes never run.
353fn change_touches_vision_field(
354    event: &crate::replication::cdc::ChangeEvent,
355    policy: &VisionPolicy,
356) -> bool {
357    match event.operation {
358        ChangeOperation::Insert => true,
359        ChangeOperation::Update => match &event.changed_columns {
360            Some(columns) => columns.iter().any(|column| column == &policy.image_field),
361            None => true,
362        },
363        ChangeOperation::Delete | ChangeOperation::Refresh => false,
364    }
365}
366
367/// Whether a change event should (re)moderate the row under `policy`.
368///
369/// Inserts always qualify; updates qualify when a declared moderated field
370/// changed (or no damage vector is available, so we screen conservatively).
371/// Crucially the consumer's own write-backs — clearing the pending marker
372/// or stamping a reject — touch only [`MODERATION_STATUS_FIELD`], never a
373/// declared field, so re-moderation never re-triggers itself into a loop.
374/// Deletes/refreshes never qualify.
375fn change_touches_moderate_fields(
376    event: &crate::replication::cdc::ChangeEvent,
377    policy: &ModeratePolicy,
378) -> bool {
379    match event.operation {
380        ChangeOperation::Insert => true,
381        ChangeOperation::Update => match &event.changed_columns {
382            Some(columns) => columns
383                .iter()
384                .any(|column| policy.fields.iter().any(|field| field == column)),
385            None => true,
386        },
387        ChangeOperation::Delete | ChangeOperation::Refresh => false,
388    }
389}
390
391/// Whether the policy's output kinds request structured component
392/// detections. Several spellings are accepted so DDL authors are not
393/// boxed into one keyword.
394fn vision_wants_detections(policy: &VisionPolicy) -> bool {
395    policy.output_kinds.iter().any(|kind| {
396        matches!(
397            kind.trim().to_ascii_lowercase().as_str(),
398            "detections" | "objects" | "components" | "detection"
399        )
400    })
401}
402
403/// Whether the policy's output kinds request an image-embedding output.
404fn vision_wants_embedding(policy: &VisionPolicy) -> bool {
405    policy.output_kinds.iter().any(|kind| {
406        matches!(
407            kind.trim().to_ascii_lowercase().as_str(),
408            "embedding" | "image_embedding" | "image-embedding"
409        )
410    })
411}
412
413impl RedDBRuntime {
414    /// The declared embed policy for `collection`, if any.
415    pub fn collection_embed_policy(&self, collection: &str) -> Option<EmbedPolicy> {
416        self.db()
417            .collection_contract_arc(collection)
418            .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.embed.clone()))
419    }
420
421    /// The declared vision policy for `collection`, if any.
422    pub fn collection_vision_policy(&self, collection: &str) -> Option<VisionPolicy> {
423        self.db()
424            .collection_contract_arc(collection)
425            .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.vision.clone()))
426    }
427
428    /// The declared moderation policy for `collection`, if any.
429    pub fn collection_moderate_policy(&self, collection: &str) -> Option<ModeratePolicy> {
430        self.db()
431            .collection_contract_arc(collection)
432            .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.moderate.clone()))
433    }
434
435    /// True when the live row carries the quarantine-pending moderation
436    /// marker. Resolves the row through its stable logical id (the same
437    /// path the enrichment methods use), so a superseded MVCC version
438    /// never reports a stale state.
439    pub(crate) fn row_is_moderation_pending(&self, collection: &str, entity_id: u64) -> bool {
440        self.db()
441            .store()
442            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
443            .map(|entity| {
444                matches!(
445                    row_text_field(&entity.data, MODERATION_STATUS_FIELD).as_deref(),
446                    Some(MODERATION_STATUS_PENDING)
447                )
448            })
449            .unwrap_or(false)
450    }
451
452    /// Synchronous pre-commit moderation gate (#1274, ADR 0057).
453    ///
454    /// Runs only when `collection` declares a `MODERATE (... sync = true)`
455    /// policy. For every queued row it screens the concatenated text of the
456    /// declared moderated fields BEFORE the durable commit:
457    ///   * **Allow** — the row commits unchanged.
458    ///   * **Reject** — the whole write is refused (`Err`); no row persists.
459    ///   * **ProviderDown** + degraded `Open` (default) — the row is
460    ///     quarantined: the reserved [`MODERATION_STATUS_FIELD`] is set to
461    ///     `pending`, so it commits but is hidden from normal reads and is
462    ///     re-moderated asynchronously by the CDC consumer.
463    ///   * **ProviderDown** + degraded `Closed` — the write is refused.
464    ///
465    /// A row whose declared fields are all empty has nothing to screen and
466    /// is allowed unchanged.
467    pub(crate) fn apply_sync_moderation_gate(
468        &self,
469        collection: &str,
470        rows: &mut [MutationRow],
471    ) -> RedDBResult<()> {
472        let Some(policy) = self.collection_moderate_policy(collection) else {
473            return Ok(());
474        };
475        if !policy.sync_gate || rows.is_empty() {
476            return Ok(());
477        }
478
479        for row in rows.iter_mut() {
480            let text = combine_moderate_text(&row.fields, &policy.fields);
481            if text.is_empty() {
482                continue;
483            }
484            let outcome =
485                crate::runtime::ai::moderation::moderate_local(&policy.model, text.clone())?;
486            match outcome {
487                ModerationOutcome::Allow => {}
488                ModerationOutcome::Reject { categories } => {
489                    // Reject fails the write — the row never persists.
490                    return Err(RedDBError::Query(format!(
491                        "write rejected by moderation gate on collection '{collection}': \
492                         flagged categories [{}]",
493                        categories.join(", ")
494                    )));
495                }
496                ModerationOutcome::ProviderDown { reason } => match policy.degraded_mode {
497                    // Fail-closed: provider-down blocks the write.
498                    ModerateDegradedMode::Closed => {
499                        return Err(RedDBError::Query(format!(
500                            "write blocked: moderation provider unavailable for collection \
501                             '{collection}' (degraded = closed): {reason}"
502                        )));
503                    }
504                    // Fail-open default: quarantine the row. It commits but
505                    // is hidden from normal reads and re-moderated async.
506                    ModerateDegradedMode::Open => {
507                        set_row_moderation_marker(row, MODERATION_STATUS_PENDING);
508                    }
509                },
510            }
511        }
512        Ok(())
513    }
514
515    /// Re-moderate one quarantine-pending row (CDC lane). A pass clears the
516    /// pending marker (the row becomes visible); a reject either tombstones
517    /// the row (default — hidden, retained for audit/appeal) or hard-deletes
518    /// it when the policy opts in via `hard_delete`. Provider-down here is a
519    /// retryable failure: the row stays pending and the consumer's
520    /// retry/dead-letter machinery handles it like any other failure.
521    pub(crate) fn remoderate_pending_row(
522        &self,
523        collection: &str,
524        entity_id: u64,
525        policy: &ModeratePolicy,
526    ) -> RedDBResult<()> {
527        let db = self.db();
528        let Some(entity) = db
529            .store()
530            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
531        else {
532            return Ok(());
533        };
534
535        // Only act on rows still in the pending state. A row that was
536        // already cleared or tombstoned between enqueue and drain needs no
537        // further work.
538        if !matches!(
539            row_text_field(&entity.data, MODERATION_STATUS_FIELD).as_deref(),
540            Some(MODERATION_STATUS_PENDING)
541        ) {
542            return Ok(());
543        }
544
545        let text = combine_moderate_text_named(&entity.data, &policy.fields);
546        if text.is_empty() {
547            // Nothing to screen — clear the quarantine so the row surfaces.
548            return self.clear_row_moderation_marker(collection, entity.id);
549        }
550
551        let outcome = crate::runtime::ai::moderation::moderate_local(&policy.model, text)?;
552        match outcome {
553            ModerationOutcome::Allow => self.clear_row_moderation_marker(collection, entity.id),
554            ModerationOutcome::Reject { .. } => {
555                if policy.hard_delete_on_reject {
556                    self.delete_entity(DeleteEntityInput {
557                        collection: collection.to_string(),
558                        id: entity.id,
559                    })?;
560                    Ok(())
561                } else {
562                    self.set_row_moderation_status(
563                        collection,
564                        entity.id,
565                        MODERATION_STATUS_REJECTED,
566                    )
567                }
568            }
569            // Provider still down — surface as a retryable error so the
570            // row stays pending and is retried/dead-lettered.
571            ModerationOutcome::ProviderDown { reason } => Err(RedDBError::Query(format!(
572                "re-moderation provider unavailable for collection '{collection}': {reason}"
573            ))),
574        }
575    }
576
577    /// Clear the moderation marker so a previously-quarantined row becomes
578    /// visible to normal reads. Patches the field to an empty string and
579    /// relies on the visibility helper treating only a present text marker
580    /// as hidden — so an empty marker is, by design, still hidden. To make
581    /// the row visible we instead remove the field via a `fields` patch
582    /// that sets it to JSON null, which the storage merge drops.
583    fn clear_row_moderation_marker(&self, collection: &str, id: EntityId) -> RedDBResult<()> {
584        self.patch_entity(PatchEntityInput {
585            collection: collection.to_string(),
586            id,
587            payload: moderation_marker_clear_payload(),
588            operations: Vec::new(),
589        })?;
590        Ok(())
591    }
592
593    /// Stamp the row with a moderation status (`pending`/`rejected`).
594    fn set_row_moderation_status(
595        &self,
596        collection: &str,
597        id: EntityId,
598        status: &str,
599    ) -> RedDBResult<()> {
600        self.patch_entity(PatchEntityInput {
601            collection: collection.to_string(),
602            id,
603            payload: moderation_marker_set_payload(status),
604            operations: Vec::new(),
605        })?;
606        Ok(())
607    }
608
609    /// Compute the embedding for one committed row and attach it as a vector
610    /// in the same collection. Reuses the existing embedding + vector
611    /// storage path so `VECTOR SEARCH` surfaces the row exactly as a manual
612    /// `WITH AUTO EMBED` insert would.
613    ///
614    /// A row whose declared fields are all empty is treated as complete (no
615    /// vector attached) rather than failed — there is nothing to embed.
616    pub(crate) fn enrich_row_embedding(
617        &self,
618        collection: &str,
619        entity_id: u64,
620        policy: &EmbedPolicy,
621    ) -> RedDBResult<()> {
622        let db = self.db();
623        // The CDC event carries the row's stable *logical* id; resolve the
624        // live version through it so an update re-embeds the new field values
625        // rather than a superseded MVCC version. A `None` here means the
626        // event was not a live table row (e.g. the enrichment vector's own
627        // insert event, or a deleted row) — nothing to enrich.
628        let Some(entity) = db
629            .store()
630            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
631        else {
632            return Ok(());
633        };
634
635        let Some(text) = combine_embed_text(&entity.data, &policy.fields) else {
636            return Ok(());
637        };
638
639        let dense = embed_one(self, policy, &text)?;
640        if dense.is_empty() {
641            return Ok(());
642        }
643
644        self.create_vector(CreateVectorInput {
645            collection: collection.to_string(),
646            dense,
647            content: Some(text),
648            metadata: Vec::new(),
649            link_row: None,
650            link_node: None,
651        })?;
652        Ok(())
653    }
654
655    /// Run computer vision over one committed row: fetch the image
656    /// referenced by the policy's `image_field`, call the vision provider,
657    /// write the structured component-detections to the derived
658    /// [`VISION_DETECTIONS_FIELD`] (RQL-filterable), and — when the policy
659    /// requests it — attach an image-embedding vector reusing the existing
660    /// vector pipeline.
661    ///
662    /// A row whose image reference is absent/empty is treated as complete
663    /// (nothing to analyze) rather than failed.
664    pub(crate) fn enrich_row_vision(
665        &self,
666        collection: &str,
667        entity_id: u64,
668        policy: &VisionPolicy,
669    ) -> RedDBResult<()> {
670        // This slice drives the in-process `local` provider (the path the
671        // mock vision backend exercises); other providers are rejected with
672        // a deterministic error that the retry/dead-letter machinery
673        // handles like any failure.
674        match crate::ai::parse_provider(&policy.provider)? {
675            crate::ai::AiProvider::Local => {}
676            other => {
677                return Err(RedDBError::Query(format!(
678                    "CDC vision enrichment currently drives the 'local' provider; \
679                     collection policy declares '{other:?}'"
680                )));
681            }
682        }
683
684        let db = self.db();
685        // Resolve the live row through its stable logical id (see
686        // `enrich_row_embedding`). `None` means the event was not a live
687        // table row — nothing to enrich.
688        let Some(entity) = db
689            .store()
690            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
691        else {
692            return Ok(());
693        };
694
695        let Some(reference) = row_text_field(&entity.data, &policy.image_field) else {
696            return Ok(());
697        };
698        if reference.is_empty() {
699            return Ok(());
700        }
701
702        let want_detections = vision_wants_detections(policy);
703        let want_embedding = vision_wants_embedding(policy);
704        if !want_detections && !want_embedding {
705            return Ok(());
706        }
707
708        let image_bytes = crate::runtime::ai::vision::fetch_image_bytes(&reference)?;
709        let result = crate::runtime::ai::vision::analyze_local(
710            &policy.model,
711            image_bytes,
712            want_detections,
713            want_embedding,
714        )?;
715
716        if want_detections {
717            // Write the canonical detections array as a JSON row field. The
718            // damage vector for this update covers only the derived field,
719            // never `image_field`, so it cannot re-trigger vision.
720            let detections_json = detections_to_json(&result.detections);
721            self.patch_entity(PatchEntityInput {
722                collection: collection.to_string(),
723                id: entity.id,
724                payload: vision_detections_payload(detections_json),
725                operations: Vec::new(),
726            })?;
727        }
728
729        if want_embedding {
730            if let Some(embedding) = result.embedding {
731                if !embedding.is_empty() {
732                    self.create_vector(CreateVectorInput {
733                        collection: collection.to_string(),
734                        dense: embedding,
735                        content: Some(reference),
736                        metadata: Vec::new(),
737                        link_row: None,
738                        link_node: None,
739                    })?;
740                }
741            }
742        }
743
744        Ok(())
745    }
746}
747
748/// Read a row's text-valued field as an owned string. Returns `None` when
749/// the entity is not a row, the field is absent, or it is not text.
750fn row_text_field(data: &EntityData, field: &str) -> Option<String> {
751    let EntityData::Row(row) = data else {
752        return None;
753    };
754    let named = row.named.as_ref()?;
755    match named.get(field) {
756        Some(Value::Text(text)) => Some(text.to_string()),
757        Some(Value::Url(url)) => Some(url.clone()),
758        _ => None,
759    }
760}
761
762/// Encode the detections as a JSON array value
763/// (`[{label, confidence, bbox:[x,y,w,h]}]`).
764fn detections_to_json(
765    detections: &[crate::runtime::ai::vision::VisionDetection],
766) -> crate::serde_json::Value {
767    use crate::serde_json::{Map, Value as Sj};
768    let items = detections
769        .iter()
770        .map(|d| {
771            let mut obj = Map::new();
772            obj.insert("label".to_string(), Sj::String(d.label.clone()));
773            obj.insert("confidence".to_string(), Sj::Number(d.confidence as f64));
774            obj.insert(
775                "bbox".to_string(),
776                Sj::Array(d.bbox.iter().map(|v| Sj::Number(*v as f64)).collect()),
777            );
778            Sj::Object(obj)
779        })
780        .collect();
781    Sj::Array(items)
782}
783
784/// Build the JSON-patch payload that sets the derived detections field via
785/// `patch_entity`'s `fields` merge form.
786fn vision_detections_payload(
787    detections_json: crate::serde_json::Value,
788) -> crate::serde_json::Value {
789    use crate::serde_json::{Map, Value as Sj};
790    let mut fields = Map::new();
791    fields.insert(VISION_DETECTIONS_FIELD.to_string(), detections_json);
792    let mut root = Map::new();
793    root.insert("fields".to_string(), Sj::Object(fields));
794    Sj::Object(root)
795}
796
797/// Concatenate the declared moderated fields' text from a pre-commit
798/// `MutationRow`'s field list. Only text-valued declared fields contribute;
799/// the result is empty when there is nothing to screen.
800fn combine_moderate_text(fields: &[(String, Value)], declared: &[String]) -> String {
801    let mut parts: Vec<&str> = Vec::new();
802    for field in declared {
803        for (name, value) in fields {
804            if name == field {
805                if let Value::Text(text) = value {
806                    if !text.is_empty() {
807                        parts.push(text);
808                    }
809                }
810            }
811        }
812    }
813    parts.join(" ")
814}
815
816/// Concatenate the declared moderated fields' text from a committed row's
817/// `EntityData`. Empty when the entity is not a row or no declared field
818/// holds non-empty text.
819fn combine_moderate_text_named(data: &EntityData, declared: &[String]) -> String {
820    let EntityData::Row(row) = data else {
821        return String::new();
822    };
823    let Some(named) = row.named.as_ref() else {
824        return String::new();
825    };
826    let parts: Vec<String> = declared
827        .iter()
828        .filter_map(|field| match named.get(field) {
829            Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
830            _ => None,
831        })
832        .collect();
833    parts.join(" ")
834}
835
836/// Stamp the reserved moderation marker onto a pre-commit row's field list,
837/// replacing any prior value for the field.
838fn set_row_moderation_marker(row: &mut MutationRow, status: &str) {
839    row.fields
840        .retain(|(name, _)| name != MODERATION_STATUS_FIELD);
841    row.fields.push((
842        MODERATION_STATUS_FIELD.to_string(),
843        Value::Text(std::sync::Arc::from(status)),
844    ));
845}
846
847/// `fields`-merge patch payload that sets the moderation marker to `status`.
848fn moderation_marker_set_payload(status: &str) -> crate::serde_json::Value {
849    use crate::serde_json::{Map, Value as Sj};
850    let mut fields = Map::new();
851    fields.insert(
852        MODERATION_STATUS_FIELD.to_string(),
853        Sj::String(status.to_string()),
854    );
855    let mut root = Map::new();
856    root.insert("fields".to_string(), Sj::Object(fields));
857    Sj::Object(root)
858}
859
860/// `fields`-merge patch payload that clears the moderation marker (sets it
861/// to the empty string). The storage merge has no field-removal form, so
862/// the cleared row keeps an empty marker — which the visibility helper
863/// treats as visible, exactly like an absent marker.
864fn moderation_marker_clear_payload() -> crate::serde_json::Value {
865    moderation_marker_set_payload("")
866}
867
868/// Join the declared embed fields' text values, mirroring the manual
869/// `WITH AUTO EMBED` collector. Returns `None` when no non-empty text field
870/// is present (e.g. the entity is a vector/non-row, or all fields are empty).
871fn combine_embed_text(data: &EntityData, fields: &[String]) -> Option<String> {
872    let EntityData::Row(row) = data else {
873        return None;
874    };
875    let named = row.named.as_ref()?;
876    let texts: Vec<String> = fields
877        .iter()
878        .filter_map(|field| match named.get(field) {
879            Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
880            _ => None,
881        })
882        .collect();
883    if texts.is_empty() {
884        None
885    } else {
886        Some(texts.join(" "))
887    }
888}
889
890/// Dispatch a single embedding through the policy's provider. This slice
891/// drives the in-process `local` backend (the path the issue's mock
892/// provider exercises); other providers are rejected with a deterministic
893/// error that the retry/dead-letter machinery handles like any failure.
894fn embed_one(rt: &RedDBRuntime, policy: &EmbedPolicy, text: &str) -> RedDBResult<Vec<f32>> {
895    let provider = crate::ai::parse_provider(&policy.provider)?;
896    match provider {
897        crate::ai::AiProvider::Local => {
898            let db = rt.db();
899            let response = crate::runtime::ai::local_embedding::embed_local_with_db(
900                &db,
901                &policy.model,
902                vec![text.to_string()],
903            )?;
904            response.embeddings.into_iter().next().ok_or_else(|| {
905                RedDBError::Query("local embedding backend returned no vector".to_string())
906            })
907        }
908        other => Err(RedDBError::Query(format!(
909            "CDC enrichment currently drives the 'local' provider; \
910             collection policy declares '{other:?}'"
911        ))),
912    }
913}