reddb_server/runtime/ai/cdc_enrichment.rs
1//! CDC enrichment consumer (#1272, PRD #1267) — the first end-to-end AI
2//! modality wired over the existing change stream.
3//!
4//! A collection that declares an `EMBED (...)` policy (issue #1271) gets
5//! its declared fields auto-vectorised *asynchronously* after commit. The
6//! write path itself does no provider work: an INSERT/UPDATE simply emits
7//! its usual CDC event and returns. This consumer is the thing that, on a
8//! later pass, drains the LSN-ordered change stream, recomputes embeddings
9//! for committed rows via the policy's provider, and attaches the vectors
10//! into the collection (reusing the existing `create_vector` +
11//! local-embedding machinery).
12//!
13//! Because a row is only searchable once its vector exists, "pending"
14//! enrichment is naturally excluded from `VECTOR SEARCH` until the consumer
15//! attaches the vector — at which point the row is included like any other.
16//! The consumer additionally owns:
17//! - a `pending` work set (rows whose enrichment hasn't completed),
18//! - retry-with-backoff on provider failure,
19//! - a dead-letter list after a bounded number of failures, and
20//! - an ops re-drive path that moves dead-letters back to pending.
21//!
22//! The consumer is driven explicitly via [`CdcEnrichmentConsumer::tick`],
23//! which takes the current time so retry backoff is deterministic in tests
24//! and a production scheduler can drive it from a background thread without
25//! changing the semantics.
26
27use crate::application::entity::{CreateVectorInput, DeleteEntityInput, PatchEntityInput};
28use crate::application::ports::RuntimeEntityPort;
29use crate::catalog::{EmbedPolicy, ModerateDegradedMode, ModeratePolicy, VisionPolicy};
30use crate::replication::cdc::ChangeOperation;
31use crate::runtime::ai::moderation::{
32 ModerationOutcome, MODERATION_STATUS_FIELD, MODERATION_STATUS_PENDING,
33 MODERATION_STATUS_REJECTED,
34};
35use crate::runtime::mutation::MutationRow;
36use crate::storage::schema::Value;
37use crate::storage::{EntityData, EntityId};
38use crate::{RedDBError, RedDBResult, RedDBRuntime};
39
/// Name of the derived row field into which vision enrichment writes its
/// structured component detections, encoded as
/// `[{label, confidence, bbox:[x,y,w,h]}]`. Because the value lives in an
/// ordinary row field, RQL filters such as
/// `CONTAINS(vision_detections, 'person')` can match on it after the
/// consumer has attached it.
pub const VISION_DETECTIONS_FIELD: &str = "vision_detections";
45
/// The asynchronous enrichment modality a pending work item stands for.
///
/// One committed row may need several of these at once (for example a
/// collection that declares both EMBED and VISION); every kind is queued,
/// retried and dead-lettered on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnrichmentKind {
    /// Auto-embedding of the policy's declared text fields (#1272).
    Embed,
    /// Computer vision over the policy's declared image-reference field
    /// (#1275).
    Vision,
    /// Re-moderation of a quarantine-pending row's declared text fields
    /// (#1274). A pass lifts the quarantine so the row becomes visible; a
    /// reject either tombstones the row (hidden but kept for audit) or
    /// hard-deletes it.
    Moderate,
}
60
/// Knobs controlling how the enrichment consumer polls and retries.
#[derive(Debug, Clone)]
pub struct EnrichmentConfig {
    /// How many provider attempts a work item gets before it is moved to
    /// the dead-letter list. Must be `>= 1`.
    pub max_attempts: u32,
    /// Delay (in milliseconds) applied after the first failed attempt.
    /// Each further failure doubles it: `base * 2^(attempts-1)`.
    pub base_backoff_ms: u64,
    /// Upper bound on the number of CDC events pulled in by one `tick`.
    pub poll_max: usize,
}

impl Default for EnrichmentConfig {
    /// Three attempts, a 100 ms base backoff, and up to 1024 events per
    /// tick.
    fn default() -> Self {
        EnrichmentConfig {
            poll_max: 1024,
            base_backoff_ms: 100,
            max_attempts: 3,
        }
    }
}
83
84/// A row awaiting enrichment.
85#[derive(Debug, Clone)]
86struct PendingWork {
87 collection: String,
88 entity_id: u64,
89 kind: EnrichmentKind,
90 attempts: u32,
91 /// Earliest wall-clock (unix ms) at which the next attempt may run.
92 not_before_ms: u64,
93}
94
95/// A work item that exhausted its retry budget. Surfaced to operators and
96/// re-drivable via [`CdcEnrichmentConsumer::redrive`].
97#[derive(Debug, Clone)]
98pub struct DeadLetter {
99 pub collection: String,
100 pub entity_id: u64,
101 pub kind: EnrichmentKind,
102 pub attempts: u32,
103 pub last_error: String,
104}
105
/// Counters describing what a single `tick` did.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TickStats {
    /// Work items newly added to the pending set this tick. One CDC event
    /// can contribute more than one item (one per enrichment kind).
    pub ingested: usize,
    /// Attempts that returned success this tick, for any enrichment kind.
    /// This includes attempts that found nothing to do and returned early.
    pub attached: usize,
    /// Failed attempts that were put back on the queue with a backoff.
    pub retried: usize,
    /// Failed attempts that exhausted the retry budget and were moved to
    /// the dead-letter list.
    pub dead_lettered: usize,
}
118
119/// Drains the CDC stream and enriches embed-policy collections.
120///
121/// Holds its own cursor, pending set, and dead-letter list — one consumer
122/// instance owns the enrichment state for a runtime.
123pub struct CdcEnrichmentConsumer {
124 cursor: u64,
125 config: EnrichmentConfig,
126 pending: Vec<PendingWork>,
127 dead_letters: Vec<DeadLetter>,
128}
129
130impl CdcEnrichmentConsumer {
131 /// New consumer starting from the stream origin (LSN 0) with the given
132 /// config.
133 pub fn new(config: EnrichmentConfig) -> Self {
134 Self {
135 cursor: 0,
136 config,
137 pending: Vec::new(),
138 dead_letters: Vec::new(),
139 }
140 }
141
142 /// New consumer with default tunables.
143 pub fn with_defaults() -> Self {
144 Self::new(EnrichmentConfig::default())
145 }
146
147 /// Last CDC LSN this consumer has ingested.
148 pub fn cursor(&self) -> u64 {
149 self.cursor
150 }
151
152 /// Rows currently awaiting enrichment.
153 pub fn pending_len(&self) -> usize {
154 self.pending.len()
155 }
156
157 /// True while `(collection, entity_id)` is still awaiting any kind of
158 /// enrichment.
159 pub fn is_pending(&self, collection: &str, entity_id: u64) -> bool {
160 self.pending
161 .iter()
162 .any(|w| w.collection == collection && w.entity_id == entity_id)
163 }
164
165 /// True while `(collection, entity_id)` is awaiting the given kind of
166 /// enrichment specifically.
167 pub fn is_pending_kind(&self, collection: &str, entity_id: u64, kind: EnrichmentKind) -> bool {
168 self.pending
169 .iter()
170 .any(|w| w.collection == collection && w.entity_id == entity_id && w.kind == kind)
171 }
172
173 /// Dead-lettered work items (enrichment failed past the retry budget).
174 pub fn dead_letters(&self) -> &[DeadLetter] {
175 &self.dead_letters
176 }
177
178 /// Ops re-drive: move every dead-letter back into the pending set with a
179 /// fresh retry budget. Returns the number of items re-driven.
180 pub fn redrive(&mut self) -> usize {
181 let drained: Vec<DeadLetter> = self.dead_letters.drain(..).collect();
182 let count = drained.len();
183 for dl in drained {
184 self.enqueue(dl.collection, dl.entity_id, dl.kind);
185 }
186 count
187 }
188
189 /// Ingest newly committed CDC events and then attempt every pending item
190 /// whose backoff has elapsed. Embedding work happens here, never on the
191 /// write path — so write latency is independent of provider latency.
192 pub fn tick(&mut self, rt: &RedDBRuntime, now_ms: u64) -> RedDBResult<TickStats> {
193 let mut stats = TickStats::default();
194
195 // 1. Ingest committed change events for embed- and vision-policy
196 // collections. A row can require both; each modality is queued
197 // and retried independently.
198 let events = rt.cdc_poll(self.cursor, self.config.poll_max);
199 for event in &events {
200 if event.lsn > self.cursor {
201 self.cursor = event.lsn;
202 }
203 if let Some(policy) = rt.collection_embed_policy(&event.collection) {
204 if change_touches_embed_fields(event, &policy)
205 && self.enqueue(
206 event.collection.clone(),
207 event.entity_id,
208 EnrichmentKind::Embed,
209 )
210 {
211 stats.ingested += 1;
212 }
213 }
214 if let Some(policy) = rt.collection_vision_policy(&event.collection) {
215 if change_touches_vision_field(event, &policy)
216 && self.enqueue(
217 event.collection.clone(),
218 event.entity_id,
219 EnrichmentKind::Vision,
220 )
221 {
222 stats.ingested += 1;
223 }
224 }
225 // Re-moderation rides the same lane, but only quarantine-pending
226 // rows are eligible: the synchronous gate already screened
227 // everything that committed clean, so a normal insert/update
228 // must NOT be re-screened here. We gate on the row actually
229 // carrying the pending marker (see `row_is_moderation_pending`).
230 if let Some(policy) = rt.collection_moderate_policy(&event.collection) {
231 if change_touches_moderate_fields(event, &policy)
232 && rt.row_is_moderation_pending(&event.collection, event.entity_id)
233 && self.enqueue(
234 event.collection.clone(),
235 event.entity_id,
236 EnrichmentKind::Moderate,
237 )
238 {
239 stats.ingested += 1;
240 }
241 }
242 }
243
244 // 2. Attempt every ready pending item.
245 let drained: Vec<PendingWork> = std::mem::take(&mut self.pending);
246 let mut still_pending = Vec::with_capacity(drained.len());
247 for mut work in drained {
248 if work.not_before_ms > now_ms {
249 still_pending.push(work);
250 continue;
251 }
252 // The policy can disappear if the collection was dropped/altered
253 // between enqueue and drain — quietly forget such work.
254 let attempt = match work.kind {
255 EnrichmentKind::Embed => match rt.collection_embed_policy(&work.collection) {
256 Some(policy) => {
257 rt.enrich_row_embedding(&work.collection, work.entity_id, &policy)
258 }
259 None => continue,
260 },
261 EnrichmentKind::Vision => match rt.collection_vision_policy(&work.collection) {
262 Some(policy) => rt.enrich_row_vision(&work.collection, work.entity_id, &policy),
263 None => continue,
264 },
265 EnrichmentKind::Moderate => match rt.collection_moderate_policy(&work.collection) {
266 Some(policy) => {
267 rt.remoderate_pending_row(&work.collection, work.entity_id, &policy)
268 }
269 None => continue,
270 },
271 };
272 match attempt {
273 Ok(()) => stats.attached += 1,
274 Err(err) => {
275 work.attempts += 1;
276 if work.attempts >= self.config.max_attempts {
277 self.dead_letters.push(DeadLetter {
278 collection: work.collection,
279 entity_id: work.entity_id,
280 kind: work.kind,
281 attempts: work.attempts,
282 last_error: format!("{err:?}"),
283 });
284 stats.dead_lettered += 1;
285 } else {
286 let shift = work.attempts - 1;
287 let backoff = self
288 .config
289 .base_backoff_ms
290 .saturating_mul(1u64.checked_shl(shift).unwrap_or(u64::MAX));
291 work.not_before_ms = now_ms.saturating_add(backoff);
292 still_pending.push(work);
293 stats.retried += 1;
294 }
295 }
296 }
297 }
298 self.pending = still_pending;
299
300 Ok(stats)
301 }
302
303 /// Add a row to the pending set unless this `(collection, entity, kind)`
304 /// is already queued. Returns true when a new item was enqueued.
305 fn enqueue(&mut self, collection: String, entity_id: u64, kind: EnrichmentKind) -> bool {
306 if self
307 .pending
308 .iter()
309 .any(|w| w.entity_id == entity_id && w.kind == kind && w.collection == collection)
310 {
311 return false;
312 }
313 self.pending.push(PendingWork {
314 collection,
315 entity_id,
316 kind,
317 attempts: 0,
318 not_before_ms: 0,
319 });
320 true
321 }
322}
323
324/// Whether a change event should (re)enrich the row under `policy`.
325///
326/// Inserts always enrich. Updates enrich when the damage vector intersects
327/// the declared embed fields, or when no damage vector is available (the
328/// emitter didn't compute one — enrich conservatively). Deletes/refreshes
329/// never enrich.
330fn change_touches_embed_fields(
331 event: &crate::replication::cdc::ChangeEvent,
332 policy: &EmbedPolicy,
333) -> bool {
334 match event.operation {
335 ChangeOperation::Insert => true,
336 ChangeOperation::Update => match &event.changed_columns {
337 Some(columns) => columns
338 .iter()
339 .any(|column| policy.fields.iter().any(|field| field == column)),
340 None => true,
341 },
342 ChangeOperation::Delete | ChangeOperation::Refresh => false,
343 }
344}
345
346/// Whether a change event should (re)run vision over the row under
347/// `policy`. Inserts always run. Updates run only when the declared
348/// image-reference field changed (or no damage vector is available, so we
349/// run conservatively). Crucially, an update that only touched the derived
350/// detections field — the consumer's own write-back — does NOT match,
351/// because that field is not the image-reference field, so vision never
352/// re-triggers itself into a loop. Deletes/refreshes never run.
353fn change_touches_vision_field(
354 event: &crate::replication::cdc::ChangeEvent,
355 policy: &VisionPolicy,
356) -> bool {
357 match event.operation {
358 ChangeOperation::Insert => true,
359 ChangeOperation::Update => match &event.changed_columns {
360 Some(columns) => columns.iter().any(|column| column == &policy.image_field),
361 None => true,
362 },
363 ChangeOperation::Delete | ChangeOperation::Refresh => false,
364 }
365}
366
367/// Whether a change event should (re)moderate the row under `policy`.
368///
369/// Inserts always qualify; updates qualify when a declared moderated field
370/// changed (or no damage vector is available, so we screen conservatively).
371/// Crucially the consumer's own write-backs — clearing the pending marker
372/// or stamping a reject — touch only [`MODERATION_STATUS_FIELD`], never a
373/// declared field, so re-moderation never re-triggers itself into a loop.
374/// Deletes/refreshes never qualify.
375fn change_touches_moderate_fields(
376 event: &crate::replication::cdc::ChangeEvent,
377 policy: &ModeratePolicy,
378) -> bool {
379 match event.operation {
380 ChangeOperation::Insert => true,
381 ChangeOperation::Update => match &event.changed_columns {
382 Some(columns) => columns
383 .iter()
384 .any(|column| policy.fields.iter().any(|field| field == column)),
385 None => true,
386 },
387 ChangeOperation::Delete | ChangeOperation::Refresh => false,
388 }
389}
390
391/// Whether the policy's output kinds request structured component
392/// detections. Several spellings are accepted so DDL authors are not
393/// boxed into one keyword.
394fn vision_wants_detections(policy: &VisionPolicy) -> bool {
395 policy.output_kinds.iter().any(|kind| {
396 matches!(
397 kind.trim().to_ascii_lowercase().as_str(),
398 "detections" | "objects" | "components" | "detection"
399 )
400 })
401}
402
403/// Whether the policy's output kinds request an image-embedding output.
404fn vision_wants_embedding(policy: &VisionPolicy) -> bool {
405 policy.output_kinds.iter().any(|kind| {
406 matches!(
407 kind.trim().to_ascii_lowercase().as_str(),
408 "embedding" | "image_embedding" | "image-embedding"
409 )
410 })
411}
412
413impl RedDBRuntime {
414 /// The declared embed policy for `collection`, if any.
415 pub fn collection_embed_policy(&self, collection: &str) -> Option<EmbedPolicy> {
416 self.db()
417 .collection_contract_arc(collection)
418 .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.embed.clone()))
419 }
420
421 /// The declared vision policy for `collection`, if any.
422 pub fn collection_vision_policy(&self, collection: &str) -> Option<VisionPolicy> {
423 self.db()
424 .collection_contract_arc(collection)
425 .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.vision.clone()))
426 }
427
428 /// The declared moderation policy for `collection`, if any.
429 pub fn collection_moderate_policy(&self, collection: &str) -> Option<ModeratePolicy> {
430 self.db()
431 .collection_contract_arc(collection)
432 .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.moderate.clone()))
433 }
434
435 /// True when the live row carries the quarantine-pending moderation
436 /// marker. Resolves the row through its stable logical id (the same
437 /// path the enrichment methods use), so a superseded MVCC version
438 /// never reports a stale state.
439 pub(crate) fn row_is_moderation_pending(&self, collection: &str, entity_id: u64) -> bool {
440 self.db()
441 .store()
442 .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
443 .map(|entity| {
444 matches!(
445 row_text_field(&entity.data, MODERATION_STATUS_FIELD).as_deref(),
446 Some(MODERATION_STATUS_PENDING)
447 )
448 })
449 .unwrap_or(false)
450 }
451
452 /// Synchronous pre-commit moderation gate (#1274, ADR 0057).
453 ///
454 /// Runs only when `collection` declares a `MODERATE (... sync = true)`
455 /// policy. For every queued row it screens the concatenated text of the
456 /// declared moderated fields BEFORE the durable commit:
457 /// * **Allow** — the row commits unchanged.
458 /// * **Reject** — the whole write is refused (`Err`); no row persists.
459 /// * **ProviderDown** + degraded `Open` (default) — the row is
460 /// quarantined: the reserved [`MODERATION_STATUS_FIELD`] is set to
461 /// `pending`, so it commits but is hidden from normal reads and is
462 /// re-moderated asynchronously by the CDC consumer.
463 /// * **ProviderDown** + degraded `Closed` — the write is refused.
464 ///
465 /// A row whose declared fields are all empty has nothing to screen and
466 /// is allowed unchanged.
467 pub(crate) fn apply_sync_moderation_gate(
468 &self,
469 collection: &str,
470 rows: &mut [MutationRow],
471 ) -> RedDBResult<()> {
472 let Some(policy) = self.collection_moderate_policy(collection) else {
473 return Ok(());
474 };
475 if !policy.sync_gate || rows.is_empty() {
476 return Ok(());
477 }
478
479 for row in rows.iter_mut() {
480 let text = combine_moderate_text(&row.fields, &policy.fields);
481 if text.is_empty() {
482 continue;
483 }
484 let outcome =
485 crate::runtime::ai::moderation::moderate_local(&policy.model, text.clone())?;
486 match outcome {
487 ModerationOutcome::Allow => {}
488 ModerationOutcome::Reject { categories } => {
489 // Reject fails the write — the row never persists.
490 return Err(RedDBError::Query(format!(
491 "write rejected by moderation gate on collection '{collection}': \
492 flagged categories [{}]",
493 categories.join(", ")
494 )));
495 }
496 ModerationOutcome::ProviderDown { reason } => match policy.degraded_mode {
497 // Fail-closed: provider-down blocks the write.
498 ModerateDegradedMode::Closed => {
499 return Err(RedDBError::Query(format!(
500 "write blocked: moderation provider unavailable for collection \
501 '{collection}' (degraded = closed): {reason}"
502 )));
503 }
504 // Fail-open default: quarantine the row. It commits but
505 // is hidden from normal reads and re-moderated async.
506 ModerateDegradedMode::Open => {
507 set_row_moderation_marker(row, MODERATION_STATUS_PENDING);
508 }
509 },
510 }
511 }
512 Ok(())
513 }
514
515 /// Re-moderate one quarantine-pending row (CDC lane). A pass clears the
516 /// pending marker (the row becomes visible); a reject either tombstones
517 /// the row (default — hidden, retained for audit/appeal) or hard-deletes
518 /// it when the policy opts in via `hard_delete`. Provider-down here is a
519 /// retryable failure: the row stays pending and the consumer's
520 /// retry/dead-letter machinery handles it like any other failure.
521 pub(crate) fn remoderate_pending_row(
522 &self,
523 collection: &str,
524 entity_id: u64,
525 policy: &ModeratePolicy,
526 ) -> RedDBResult<()> {
527 let db = self.db();
528 let Some(entity) = db
529 .store()
530 .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
531 else {
532 return Ok(());
533 };
534
535 // Only act on rows still in the pending state. A row that was
536 // already cleared or tombstoned between enqueue and drain needs no
537 // further work.
538 if !matches!(
539 row_text_field(&entity.data, MODERATION_STATUS_FIELD).as_deref(),
540 Some(MODERATION_STATUS_PENDING)
541 ) {
542 return Ok(());
543 }
544
545 let text = combine_moderate_text_named(&entity.data, &policy.fields);
546 if text.is_empty() {
547 // Nothing to screen — clear the quarantine so the row surfaces.
548 return self.clear_row_moderation_marker(collection, entity.id);
549 }
550
551 let outcome = crate::runtime::ai::moderation::moderate_local(&policy.model, text)?;
552 match outcome {
553 ModerationOutcome::Allow => self.clear_row_moderation_marker(collection, entity.id),
554 ModerationOutcome::Reject { .. } => {
555 if policy.hard_delete_on_reject {
556 self.delete_entity(DeleteEntityInput {
557 collection: collection.to_string(),
558 id: entity.id,
559 })?;
560 Ok(())
561 } else {
562 self.set_row_moderation_status(
563 collection,
564 entity.id,
565 MODERATION_STATUS_REJECTED,
566 )
567 }
568 }
569 // Provider still down — surface as a retryable error so the
570 // row stays pending and is retried/dead-lettered.
571 ModerationOutcome::ProviderDown { reason } => Err(RedDBError::Query(format!(
572 "re-moderation provider unavailable for collection '{collection}': {reason}"
573 ))),
574 }
575 }
576
577 /// Clear the moderation marker so a previously-quarantined row becomes
578 /// visible to normal reads. Patches the field to an empty string and
579 /// relies on the visibility helper treating only a present text marker
580 /// as hidden — so an empty marker is, by design, still hidden. To make
581 /// the row visible we instead remove the field via a `fields` patch
582 /// that sets it to JSON null, which the storage merge drops.
583 fn clear_row_moderation_marker(&self, collection: &str, id: EntityId) -> RedDBResult<()> {
584 self.patch_entity(PatchEntityInput {
585 collection: collection.to_string(),
586 id,
587 payload: moderation_marker_clear_payload(),
588 operations: Vec::new(),
589 })?;
590 Ok(())
591 }
592
593 /// Stamp the row with a moderation status (`pending`/`rejected`).
594 fn set_row_moderation_status(
595 &self,
596 collection: &str,
597 id: EntityId,
598 status: &str,
599 ) -> RedDBResult<()> {
600 self.patch_entity(PatchEntityInput {
601 collection: collection.to_string(),
602 id,
603 payload: moderation_marker_set_payload(status),
604 operations: Vec::new(),
605 })?;
606 Ok(())
607 }
608
609 /// Compute the embedding for one committed row and attach it as a vector
610 /// in the same collection. Reuses the existing embedding + vector
611 /// storage path so `VECTOR SEARCH` surfaces the row exactly as a manual
612 /// `WITH AUTO EMBED` insert would.
613 ///
614 /// A row whose declared fields are all empty is treated as complete (no
615 /// vector attached) rather than failed — there is nothing to embed.
616 pub(crate) fn enrich_row_embedding(
617 &self,
618 collection: &str,
619 entity_id: u64,
620 policy: &EmbedPolicy,
621 ) -> RedDBResult<()> {
622 let db = self.db();
623 // The CDC event carries the row's stable *logical* id; resolve the
624 // live version through it so an update re-embeds the new field values
625 // rather than a superseded MVCC version. A `None` here means the
626 // event was not a live table row (e.g. the enrichment vector's own
627 // insert event, or a deleted row) — nothing to enrich.
628 let Some(entity) = db
629 .store()
630 .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
631 else {
632 return Ok(());
633 };
634
635 let Some(text) = combine_embed_text(&entity.data, &policy.fields) else {
636 return Ok(());
637 };
638
639 let dense = embed_one(self, policy, &text)?;
640 if dense.is_empty() {
641 return Ok(());
642 }
643
644 self.create_vector(CreateVectorInput {
645 collection: collection.to_string(),
646 dense,
647 content: Some(text),
648 metadata: Vec::new(),
649 link_row: None,
650 link_node: None,
651 })?;
652 Ok(())
653 }
654
655 /// Run computer vision over one committed row: fetch the image
656 /// referenced by the policy's `image_field`, call the vision provider,
657 /// write the structured component-detections to the derived
658 /// [`VISION_DETECTIONS_FIELD`] (RQL-filterable), and — when the policy
659 /// requests it — attach an image-embedding vector reusing the existing
660 /// vector pipeline.
661 ///
662 /// A row whose image reference is absent/empty is treated as complete
663 /// (nothing to analyze) rather than failed.
664 pub(crate) fn enrich_row_vision(
665 &self,
666 collection: &str,
667 entity_id: u64,
668 policy: &VisionPolicy,
669 ) -> RedDBResult<()> {
670 // This slice drives the in-process `local` provider (the path the
671 // mock vision backend exercises); other providers are rejected with
672 // a deterministic error that the retry/dead-letter machinery
673 // handles like any failure.
674 match crate::ai::parse_provider(&policy.provider)? {
675 crate::ai::AiProvider::Local => {}
676 other => {
677 return Err(RedDBError::Query(format!(
678 "CDC vision enrichment currently drives the 'local' provider; \
679 collection policy declares '{other:?}'"
680 )));
681 }
682 }
683
684 let db = self.db();
685 // Resolve the live row through its stable logical id (see
686 // `enrich_row_embedding`). `None` means the event was not a live
687 // table row — nothing to enrich.
688 let Some(entity) = db
689 .store()
690 .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
691 else {
692 return Ok(());
693 };
694
695 let Some(reference) = row_text_field(&entity.data, &policy.image_field) else {
696 return Ok(());
697 };
698 if reference.is_empty() {
699 return Ok(());
700 }
701
702 let want_detections = vision_wants_detections(policy);
703 let want_embedding = vision_wants_embedding(policy);
704 if !want_detections && !want_embedding {
705 return Ok(());
706 }
707
708 let image_bytes = crate::runtime::ai::vision::fetch_image_bytes(&reference)?;
709 let result = crate::runtime::ai::vision::analyze_local(
710 &policy.model,
711 image_bytes,
712 want_detections,
713 want_embedding,
714 )?;
715
716 if want_detections {
717 // Write the canonical detections array as a JSON row field. The
718 // damage vector for this update covers only the derived field,
719 // never `image_field`, so it cannot re-trigger vision.
720 let detections_json = detections_to_json(&result.detections);
721 self.patch_entity(PatchEntityInput {
722 collection: collection.to_string(),
723 id: entity.id,
724 payload: vision_detections_payload(detections_json),
725 operations: Vec::new(),
726 })?;
727 }
728
729 if want_embedding {
730 if let Some(embedding) = result.embedding {
731 if !embedding.is_empty() {
732 self.create_vector(CreateVectorInput {
733 collection: collection.to_string(),
734 dense: embedding,
735 content: Some(reference),
736 metadata: Vec::new(),
737 link_row: None,
738 link_node: None,
739 })?;
740 }
741 }
742 }
743
744 Ok(())
745 }
746}
747
748/// Read a row's text-valued field as an owned string. Returns `None` when
749/// the entity is not a row, the field is absent, or it is not text.
750fn row_text_field(data: &EntityData, field: &str) -> Option<String> {
751 let EntityData::Row(row) = data else {
752 return None;
753 };
754 let named = row.named.as_ref()?;
755 match named.get(field) {
756 Some(Value::Text(text)) => Some(text.to_string()),
757 Some(Value::Url(url)) => Some(url.clone()),
758 _ => None,
759 }
760}
761
762/// Encode the detections as a JSON array value
763/// (`[{label, confidence, bbox:[x,y,w,h]}]`).
764fn detections_to_json(
765 detections: &[crate::runtime::ai::vision::VisionDetection],
766) -> crate::serde_json::Value {
767 use crate::serde_json::{Map, Value as Sj};
768 let items = detections
769 .iter()
770 .map(|d| {
771 let mut obj = Map::new();
772 obj.insert("label".to_string(), Sj::String(d.label.clone()));
773 obj.insert("confidence".to_string(), Sj::Number(d.confidence as f64));
774 obj.insert(
775 "bbox".to_string(),
776 Sj::Array(d.bbox.iter().map(|v| Sj::Number(*v as f64)).collect()),
777 );
778 Sj::Object(obj)
779 })
780 .collect();
781 Sj::Array(items)
782}
783
784/// Build the JSON-patch payload that sets the derived detections field via
785/// `patch_entity`'s `fields` merge form.
786fn vision_detections_payload(
787 detections_json: crate::serde_json::Value,
788) -> crate::serde_json::Value {
789 use crate::serde_json::{Map, Value as Sj};
790 let mut fields = Map::new();
791 fields.insert(VISION_DETECTIONS_FIELD.to_string(), detections_json);
792 let mut root = Map::new();
793 root.insert("fields".to_string(), Sj::Object(fields));
794 Sj::Object(root)
795}
796
797/// Concatenate the declared moderated fields' text from a pre-commit
798/// `MutationRow`'s field list. Only text-valued declared fields contribute;
799/// the result is empty when there is nothing to screen.
800fn combine_moderate_text(fields: &[(String, Value)], declared: &[String]) -> String {
801 let mut parts: Vec<&str> = Vec::new();
802 for field in declared {
803 for (name, value) in fields {
804 if name == field {
805 if let Value::Text(text) = value {
806 if !text.is_empty() {
807 parts.push(text);
808 }
809 }
810 }
811 }
812 }
813 parts.join(" ")
814}
815
816/// Concatenate the declared moderated fields' text from a committed row's
817/// `EntityData`. Empty when the entity is not a row or no declared field
818/// holds non-empty text.
819fn combine_moderate_text_named(data: &EntityData, declared: &[String]) -> String {
820 let EntityData::Row(row) = data else {
821 return String::new();
822 };
823 let Some(named) = row.named.as_ref() else {
824 return String::new();
825 };
826 let parts: Vec<String> = declared
827 .iter()
828 .filter_map(|field| match named.get(field) {
829 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
830 _ => None,
831 })
832 .collect();
833 parts.join(" ")
834}
835
836/// Stamp the reserved moderation marker onto a pre-commit row's field list,
837/// replacing any prior value for the field.
838fn set_row_moderation_marker(row: &mut MutationRow, status: &str) {
839 row.fields
840 .retain(|(name, _)| name != MODERATION_STATUS_FIELD);
841 row.fields.push((
842 MODERATION_STATUS_FIELD.to_string(),
843 Value::Text(std::sync::Arc::from(status)),
844 ));
845}
846
847/// `fields`-merge patch payload that sets the moderation marker to `status`.
848fn moderation_marker_set_payload(status: &str) -> crate::serde_json::Value {
849 use crate::serde_json::{Map, Value as Sj};
850 let mut fields = Map::new();
851 fields.insert(
852 MODERATION_STATUS_FIELD.to_string(),
853 Sj::String(status.to_string()),
854 );
855 let mut root = Map::new();
856 root.insert("fields".to_string(), Sj::Object(fields));
857 Sj::Object(root)
858}
859
860/// `fields`-merge patch payload that clears the moderation marker (sets it
861/// to the empty string). The storage merge has no field-removal form, so
862/// the cleared row keeps an empty marker — which the visibility helper
863/// treats as visible, exactly like an absent marker.
864fn moderation_marker_clear_payload() -> crate::serde_json::Value {
865 moderation_marker_set_payload("")
866}
867
868/// Join the declared embed fields' text values, mirroring the manual
869/// `WITH AUTO EMBED` collector. Returns `None` when no non-empty text field
870/// is present (e.g. the entity is a vector/non-row, or all fields are empty).
871fn combine_embed_text(data: &EntityData, fields: &[String]) -> Option<String> {
872 let EntityData::Row(row) = data else {
873 return None;
874 };
875 let named = row.named.as_ref()?;
876 let texts: Vec<String> = fields
877 .iter()
878 .filter_map(|field| match named.get(field) {
879 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
880 _ => None,
881 })
882 .collect();
883 if texts.is_empty() {
884 None
885 } else {
886 Some(texts.join(" "))
887 }
888}
889
890/// Dispatch a single embedding through the policy's provider. This slice
891/// drives the in-process `local` backend (the path the issue's mock
892/// provider exercises); other providers are rejected with a deterministic
893/// error that the retry/dead-letter machinery handles like any failure.
894fn embed_one(rt: &RedDBRuntime, policy: &EmbedPolicy, text: &str) -> RedDBResult<Vec<f32>> {
895 let provider = crate::ai::parse_provider(&policy.provider)?;
896 match provider {
897 crate::ai::AiProvider::Local => {
898 let db = rt.db();
899 let response = crate::runtime::ai::local_embedding::embed_local_with_db(
900 &db,
901 &policy.model,
902 vec![text.to_string()],
903 )?;
904 response.embeddings.into_iter().next().ok_or_else(|| {
905 RedDBError::Query("local embedding backend returned no vector".to_string())
906 })
907 }
908 other => Err(RedDBError::Query(format!(
909 "CDC enrichment currently drives the 'local' provider; \
910 collection policy declares '{other:?}'"
911 ))),
912 }
913}