use crate::application::entity::{CreateVectorInput, DeleteEntityInput, PatchEntityInput};
use crate::application::ports::RuntimeEntityPort;
use crate::catalog::{EmbedPolicy, ModerateDegradedMode, ModeratePolicy, VisionPolicy};
use crate::replication::cdc::ChangeOperation;
use crate::runtime::ai::moderation::{
ModerationOutcome, MODERATION_STATUS_FIELD, MODERATION_STATUS_PENDING,
MODERATION_STATUS_REJECTED,
};
use crate::runtime::mutation::MutationRow;
use crate::storage::schema::Value;
use crate::storage::{EntityData, EntityId};
use crate::{RedDBError, RedDBResult, RedDBRuntime};
/// Row field that CDC vision enrichment patches with a JSON array of detections
/// (`label`, `confidence`, `bbox` per entry).
pub const VISION_DETECTIONS_FIELD: &'static str = "vision_detections";
/// The kind of AI enrichment a queued work item performs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EnrichmentKind {
    /// Text embedding of the policy's declared fields (`enrich_row_embedding`).
    Embed,
    /// Image analysis of the policy's image field (`enrich_row_vision`).
    Vision,
    /// Asynchronous re-moderation of a row marked pending (`remoderate_pending_row`).
    Moderate,
}
/// Tuning knobs for the CDC enrichment consumer.
#[derive(Clone, Debug)]
pub struct EnrichmentConfig {
    /// Number of failed attempts after which a work item is moved to the
    /// dead-letter list.
    pub max_attempts: u32,
    /// Delay before the first retry in milliseconds; each subsequent retry
    /// doubles it (saturating).
    pub base_backoff_ms: u64,
    /// Limit passed to `cdc_poll` on every tick.
    pub poll_max: usize,
}

impl Default for EnrichmentConfig {
    /// Three attempts, a 100 ms initial backoff, and up to 1024 events per poll.
    fn default() -> Self {
        EnrichmentConfig {
            poll_max: 1024,
            base_backoff_ms: 100,
            max_attempts: 3,
        }
    }
}
/// One queued enrichment job for a single row.
#[derive(Clone, Debug)]
struct PendingWork {
    /// Collection that owns the row.
    collection: String,
    /// Logical id of the row within `collection`.
    entity_id: u64,
    /// Which enrichment to run.
    kind: EnrichmentKind,
    /// Failed attempts so far; 0 for freshly queued work.
    attempts: u32,
    /// Earliest tick time (`now_ms`) at which the job may run; 0 means immediately.
    not_before_ms: u64,
}
/// A work item that failed `max_attempts` times and was parked for inspection
/// or a later `redrive`.
#[derive(Clone, Debug)]
pub struct DeadLetter {
    /// Collection that owns the row.
    pub collection: String,
    /// Logical id of the row within `collection`.
    pub entity_id: u64,
    /// Which enrichment kept failing.
    pub kind: EnrichmentKind,
    /// Total failed attempts when the item was parked.
    pub attempts: u32,
    /// `Debug` rendering of the error from the final attempt.
    pub last_error: String,
}
/// Counters describing what a single consumer tick did.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TickStats {
    /// New work items queued from CDC events (duplicates of queued work are not counted).
    pub ingested: usize,
    /// Work items whose enrichment call returned `Ok`.
    pub attached: usize,
    /// Failed work items rescheduled with a backoff.
    pub retried: usize,
    /// Failed work items moved to the dead-letter list.
    pub dead_lettered: usize,
}
/// Polls the CDC log and drives embedding, vision, and moderation enrichment
/// for rows in collections whose contracts declare the matching AI policy.
pub struct CdcEnrichmentConsumer {
    /// Highest LSN observed; the next poll starts from here.
    cursor: u64,
    /// Retry, backoff, and poll-size settings.
    config: EnrichmentConfig,
    /// Work awaiting its first run or a retry, unique per `(collection, entity_id, kind)`.
    pending: Vec<PendingWork>,
    /// Work that exhausted `config.max_attempts`.
    dead_letters: Vec<DeadLetter>,
}
impl CdcEnrichmentConsumer {
pub fn new(config: EnrichmentConfig) -> Self {
Self {
cursor: 0,
config,
pending: Vec::new(),
dead_letters: Vec::new(),
}
}
pub fn with_defaults() -> Self {
Self::new(EnrichmentConfig::default())
}
pub fn cursor(&self) -> u64 {
self.cursor
}
pub fn pending_len(&self) -> usize {
self.pending.len()
}
pub fn is_pending(&self, collection: &str, entity_id: u64) -> bool {
self.pending
.iter()
.any(|w| w.collection == collection && w.entity_id == entity_id)
}
pub fn is_pending_kind(&self, collection: &str, entity_id: u64, kind: EnrichmentKind) -> bool {
self.pending
.iter()
.any(|w| w.collection == collection && w.entity_id == entity_id && w.kind == kind)
}
pub fn dead_letters(&self) -> &[DeadLetter] {
&self.dead_letters
}
pub fn redrive(&mut self) -> usize {
let drained: Vec<DeadLetter> = self.dead_letters.drain(..).collect();
let count = drained.len();
for dl in drained {
self.enqueue(dl.collection, dl.entity_id, dl.kind);
}
count
}
pub fn tick(&mut self, rt: &RedDBRuntime, now_ms: u64) -> RedDBResult<TickStats> {
let mut stats = TickStats::default();
let events = rt.cdc_poll(self.cursor, self.config.poll_max);
for event in &events {
if event.lsn > self.cursor {
self.cursor = event.lsn;
}
if let Some(policy) = rt.collection_embed_policy(&event.collection) {
if change_touches_embed_fields(event, &policy)
&& self.enqueue(
event.collection.clone(),
event.entity_id,
EnrichmentKind::Embed,
)
{
stats.ingested += 1;
}
}
if let Some(policy) = rt.collection_vision_policy(&event.collection) {
if change_touches_vision_field(event, &policy)
&& self.enqueue(
event.collection.clone(),
event.entity_id,
EnrichmentKind::Vision,
)
{
stats.ingested += 1;
}
}
if let Some(policy) = rt.collection_moderate_policy(&event.collection) {
if change_touches_moderate_fields(event, &policy)
&& rt.row_is_moderation_pending(&event.collection, event.entity_id)
&& self.enqueue(
event.collection.clone(),
event.entity_id,
EnrichmentKind::Moderate,
)
{
stats.ingested += 1;
}
}
}
let drained: Vec<PendingWork> = std::mem::take(&mut self.pending);
let mut still_pending = Vec::with_capacity(drained.len());
for mut work in drained {
if work.not_before_ms > now_ms {
still_pending.push(work);
continue;
}
let attempt = match work.kind {
EnrichmentKind::Embed => match rt.collection_embed_policy(&work.collection) {
Some(policy) => {
rt.enrich_row_embedding(&work.collection, work.entity_id, &policy)
}
None => continue,
},
EnrichmentKind::Vision => match rt.collection_vision_policy(&work.collection) {
Some(policy) => rt.enrich_row_vision(&work.collection, work.entity_id, &policy),
None => continue,
},
EnrichmentKind::Moderate => match rt.collection_moderate_policy(&work.collection) {
Some(policy) => {
rt.remoderate_pending_row(&work.collection, work.entity_id, &policy)
}
None => continue,
},
};
match attempt {
Ok(()) => stats.attached += 1,
Err(err) => {
work.attempts += 1;
if work.attempts >= self.config.max_attempts {
self.dead_letters.push(DeadLetter {
collection: work.collection,
entity_id: work.entity_id,
kind: work.kind,
attempts: work.attempts,
last_error: format!("{err:?}"),
});
stats.dead_lettered += 1;
} else {
let shift = work.attempts - 1;
let backoff = self
.config
.base_backoff_ms
.saturating_mul(1u64.checked_shl(shift).unwrap_or(u64::MAX));
work.not_before_ms = now_ms.saturating_add(backoff);
still_pending.push(work);
stats.retried += 1;
}
}
}
}
self.pending = still_pending;
Ok(stats)
}
fn enqueue(&mut self, collection: String, entity_id: u64, kind: EnrichmentKind) -> bool {
if self
.pending
.iter()
.any(|w| w.entity_id == entity_id && w.kind == kind && w.collection == collection)
{
return false;
}
self.pending.push(PendingWork {
collection,
entity_id,
kind,
attempts: 0,
not_before_ms: 0,
});
true
}
}
/// Decides whether a CDC event should (re)trigger embedding enrichment.
///
/// Inserts always qualify, while deletes and refreshes never do. An update
/// qualifies when any changed column is one of the policy's embed fields. It
/// also qualifies when the event does not report changed columns.
fn change_touches_embed_fields(
    event: &crate::replication::cdc::ChangeEvent,
    policy: &EmbedPolicy,
) -> bool {
    match event.operation {
        ChangeOperation::Delete | ChangeOperation::Refresh => false,
        ChangeOperation::Insert => true,
        ChangeOperation::Update => event.changed_columns.as_ref().map_or(true, |columns| {
            columns
                .iter()
                .any(|column| policy.fields.iter().any(|field| field == column))
        }),
    }
}
/// Decides whether a CDC event should (re)trigger vision enrichment.
///
/// Inserts always qualify, while deletes and refreshes never do. An update
/// qualifies when the policy's image field is among the changed columns. It
/// also qualifies when the event does not report changed columns.
fn change_touches_vision_field(
    event: &crate::replication::cdc::ChangeEvent,
    policy: &VisionPolicy,
) -> bool {
    if matches!(event.operation, ChangeOperation::Insert) {
        return true;
    }
    if !matches!(event.operation, ChangeOperation::Update) {
        // Delete / Refresh.
        return false;
    }
    let Some(columns) = &event.changed_columns else {
        // Unknown column set: assume the image may have changed.
        return true;
    };
    columns.iter().any(|column| column == &policy.image_field)
}
/// Decides whether a CDC event should (re)trigger moderation.
///
/// Inserts always qualify, while deletes and refreshes never do. An update
/// qualifies when any changed column is one of the policy's moderated fields.
/// It also qualifies when the event does not report changed columns.
fn change_touches_moderate_fields(
    event: &crate::replication::cdc::ChangeEvent,
    policy: &ModeratePolicy,
) -> bool {
    let is_watched = |name: &str| policy.fields.iter().any(|field| field == name);
    match (&event.operation, &event.changed_columns) {
        (ChangeOperation::Insert, _) => true,
        (ChangeOperation::Update, None) => true,
        (ChangeOperation::Update, Some(columns)) => columns.iter().any(|column| {
            policy.fields.iter().any(|field| field == column)
        }),
        (ChangeOperation::Delete | ChangeOperation::Refresh, _) => {
            let _ = is_watched;
            false
        }
    }
}
/// Returns `true` if the policy asks for object detections.
///
/// Matching is whitespace-trimmed and ASCII case-insensitive. The accepted
/// spellings are "detections", "detection", "objects" and "components".
fn vision_wants_detections(policy: &VisionPolicy) -> bool {
    const ALIASES: [&str; 4] = ["detections", "objects", "components", "detection"];
    policy.output_kinds.iter().any(|kind| {
        let kind = kind.trim();
        ALIASES.iter().any(|alias| kind.eq_ignore_ascii_case(alias))
    })
}
/// Returns `true` if the policy asks for an image embedding.
///
/// Matching is whitespace-trimmed and ASCII case-insensitive. The accepted
/// spellings are "embedding", "image_embedding" and "image-embedding".
fn vision_wants_embedding(policy: &VisionPolicy) -> bool {
    const ALIASES: [&str; 3] = ["embedding", "image_embedding", "image-embedding"];
    policy
        .output_kinds
        .iter()
        .map(|kind| kind.trim())
        .any(|kind| ALIASES.iter().any(|alias| kind.eq_ignore_ascii_case(alias)))
}
impl RedDBRuntime {
    /// Returns the embedding policy declared in `collection`'s contract.
    ///
    /// `None` when the collection has no contract, no AI policy, or no embed section.
    pub fn collection_embed_policy(&self, collection: &str) -> Option<EmbedPolicy> {
        self.db()
            .collection_contract_arc(collection)
            .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.embed.clone()))
    }

    /// Returns the vision policy declared in `collection`'s contract.
    ///
    /// `None` when the collection has no contract, no AI policy, or no vision section.
    pub fn collection_vision_policy(&self, collection: &str) -> Option<VisionPolicy> {
        self.db()
            .collection_contract_arc(collection)
            .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.vision.clone()))
    }

    /// Returns the moderation policy declared in `collection`'s contract.
    ///
    /// `None` when the collection has no contract, no AI policy, or no moderate section.
    pub fn collection_moderate_policy(&self, collection: &str) -> Option<ModeratePolicy> {
        self.db()
            .collection_contract_arc(collection)
            .and_then(|contract| contract.ai_policy.as_ref().and_then(|p| p.moderate.clone()))
    }

    /// Returns `true` when the row with logical id `entity_id` exists in
    /// `collection` and its moderation status field holds the "pending" marker.
    pub(crate) fn row_is_moderation_pending(&self, collection: &str, entity_id: u64) -> bool {
        self.db()
            .store()
            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
            .map(|entity| {
                matches!(
                    row_text_field(&entity.data, MODERATION_STATUS_FIELD).as_deref(),
                    Some(MODERATION_STATUS_PENDING)
                )
            })
            .unwrap_or(false)
    }

    /// Synchronous moderation gate applied to `rows` before they are written.
    ///
    /// Does nothing unless `collection` declares a moderation policy with
    /// `sync_gate` enabled. For each row, the declared text fields are joined
    /// and checked with `moderate_local` using `policy.model`. Rows without
    /// such text are skipped.
    /// - `Allow`: the row is left untouched.
    /// - `Reject`: the whole write fails, listing the flagged categories.
    /// - `ProviderDown`: degraded mode `Closed` fails the write. Degraded mode
    ///   `Open` stamps the row with the "pending" status so the CDC consumer
    ///   can re-moderate it later.
    ///
    /// # Errors
    /// Returns `RedDBError::Query` on rejection or a closed-mode provider outage.
    /// Errors from `moderate_local` are propagated. Rows earlier in the slice
    /// may already have been stamped when an error is returned.
    pub(crate) fn apply_sync_moderation_gate(
        &self,
        collection: &str,
        rows: &mut [MutationRow],
    ) -> RedDBResult<()> {
        let Some(policy) = self.collection_moderate_policy(collection) else {
            return Ok(());
        };
        if !policy.sync_gate || rows.is_empty() {
            return Ok(());
        }
        for row in rows.iter_mut() {
            let text = combine_moderate_text(&row.fields, &policy.fields);
            if text.is_empty() {
                continue;
            }
            // `text` is not needed after this call, so it is moved rather than cloned.
            let outcome = crate::runtime::ai::moderation::moderate_local(&policy.model, text)?;
            match outcome {
                ModerationOutcome::Allow => {}
                ModerationOutcome::Reject { categories } => {
                    return Err(RedDBError::Query(format!(
                        "write rejected by moderation gate on collection '{collection}': \
                         flagged categories [{}]",
                        categories.join(", ")
                    )));
                }
                ModerationOutcome::ProviderDown { reason } => match policy.degraded_mode {
                    ModerateDegradedMode::Closed => {
                        return Err(RedDBError::Query(format!(
                            "write blocked: moderation provider unavailable for collection \
                             '{collection}' (degraded = closed): {reason}"
                        )));
                    }
                    ModerateDegradedMode::Open => {
                        set_row_moderation_marker(row, MODERATION_STATUS_PENDING);
                    }
                },
            }
        }
        Ok(())
    }

    /// Re-moderates a row that the sync gate admitted while the provider was down.
    ///
    /// No-op when the row is gone or no longer marked pending. When the declared
    /// text is empty, or the model allows it, the marker is cleared. A rejected
    /// row is deleted when `hard_delete_on_reject` is set; otherwise it is marked
    /// rejected.
    ///
    /// # Errors
    /// Returns `RedDBError::Query` if the provider is still down; the CDC consumer
    /// treats that as a retryable failure. Errors from moderation, patching, or
    /// deletion are propagated.
    pub(crate) fn remoderate_pending_row(
        &self,
        collection: &str,
        entity_id: u64,
        policy: &ModeratePolicy,
    ) -> RedDBResult<()> {
        let db = self.db();
        let Some(entity) = db
            .store()
            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
        else {
            return Ok(());
        };
        if !matches!(
            row_text_field(&entity.data, MODERATION_STATUS_FIELD).as_deref(),
            Some(MODERATION_STATUS_PENDING)
        ) {
            return Ok(());
        }
        let text = combine_moderate_text_named(&entity.data, &policy.fields);
        if text.is_empty() {
            return self.clear_row_moderation_marker(collection, entity.id);
        }
        let outcome = crate::runtime::ai::moderation::moderate_local(&policy.model, text)?;
        match outcome {
            ModerationOutcome::Allow => self.clear_row_moderation_marker(collection, entity.id),
            ModerationOutcome::Reject { .. } => {
                if policy.hard_delete_on_reject {
                    self.delete_entity(DeleteEntityInput {
                        collection: collection.to_string(),
                        id: entity.id,
                    })?;
                    Ok(())
                } else {
                    self.set_row_moderation_status(
                        collection,
                        entity.id,
                        MODERATION_STATUS_REJECTED,
                    )
                }
            }
            ModerationOutcome::ProviderDown { reason } => Err(RedDBError::Query(format!(
                "re-moderation provider unavailable for collection '{collection}': {reason}"
            ))),
        }
    }

    /// Clears the moderation marker by patching the status field to an empty string.
    fn clear_row_moderation_marker(&self, collection: &str, id: EntityId) -> RedDBResult<()> {
        self.patch_entity(PatchEntityInput {
            collection: collection.to_string(),
            id,
            payload: moderation_marker_clear_payload(),
            operations: Vec::new(),
        })?;
        Ok(())
    }

    /// Patches the row's moderation status field to `status`.
    fn set_row_moderation_status(
        &self,
        collection: &str,
        id: EntityId,
        status: &str,
    ) -> RedDBResult<()> {
        self.patch_entity(PatchEntityInput {
            collection: collection.to_string(),
            id,
            payload: moderation_marker_set_payload(status),
            operations: Vec::new(),
        })?;
        Ok(())
    }

    /// Embeds the row's declared text fields and stores the result as a vector
    /// in the same collection, with the combined text as its content.
    ///
    /// No-op when the row is gone, has no non-empty declared text, or the
    /// backend returns an empty vector.
    ///
    /// NOTE(review): the vector is created with `link_row: None`, and each call
    /// creates a new vector without removing earlier ones, so repeated updates
    /// may accumulate unlinked vectors. Confirm whether that is intended.
    pub(crate) fn enrich_row_embedding(
        &self,
        collection: &str,
        entity_id: u64,
        policy: &EmbedPolicy,
    ) -> RedDBResult<()> {
        let db = self.db();
        let Some(entity) = db
            .store()
            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
        else {
            return Ok(());
        };
        let Some(text) = combine_embed_text(&entity.data, &policy.fields) else {
            return Ok(());
        };
        let dense = embed_one(self, policy, &text)?;
        if dense.is_empty() {
            return Ok(());
        }
        self.create_vector(CreateVectorInput {
            collection: collection.to_string(),
            dense,
            content: Some(text),
            metadata: Vec::new(),
            link_row: None,
            link_node: None,
        })?;
        Ok(())
    }

    /// Runs local vision analysis on the image referenced by the row's
    /// `policy.image_field`, which may be a text or URL value.
    ///
    /// When the policy asks for detections, they are written as JSON into the
    /// `VISION_DETECTIONS_FIELD` column. When it asks for an embedding and the
    /// model returns a non-empty one, it is stored as a vector in the same
    /// collection with the image reference as content. No-op when the row is
    /// gone, the reference is empty, or neither output is requested.
    ///
    /// # Errors
    /// Fails for any provider other than `local`, and propagates fetch,
    /// analysis, patch, and vector-creation errors.
    pub(crate) fn enrich_row_vision(
        &self,
        collection: &str,
        entity_id: u64,
        policy: &VisionPolicy,
    ) -> RedDBResult<()> {
        match crate::ai::parse_provider(&policy.provider)? {
            crate::ai::AiProvider::Local => {}
            other => {
                return Err(RedDBError::Query(format!(
                    "CDC vision enrichment currently drives the 'local' provider; \
                     collection policy declares '{other:?}'"
                )));
            }
        }
        let db = self.db();
        let Some(entity) = db
            .store()
            .get_table_row_by_logical_id(collection, EntityId::new(entity_id))
        else {
            return Ok(());
        };
        let Some(reference) = row_text_field(&entity.data, &policy.image_field) else {
            return Ok(());
        };
        if reference.is_empty() {
            return Ok(());
        }
        let want_detections = vision_wants_detections(policy);
        let want_embedding = vision_wants_embedding(policy);
        if !want_detections && !want_embedding {
            return Ok(());
        }
        let image_bytes = crate::runtime::ai::vision::fetch_image_bytes(&reference)?;
        let result = crate::runtime::ai::vision::analyze_local(
            &policy.model,
            image_bytes,
            want_detections,
            want_embedding,
        )?;
        if want_detections {
            let detections_json = detections_to_json(&result.detections);
            self.patch_entity(PatchEntityInput {
                collection: collection.to_string(),
                id: entity.id,
                payload: vision_detections_payload(detections_json),
                operations: Vec::new(),
            })?;
        }
        if want_embedding {
            if let Some(embedding) = result.embedding {
                if !embedding.is_empty() {
                    self.create_vector(CreateVectorInput {
                        collection: collection.to_string(),
                        dense: embedding,
                        content: Some(reference),
                        metadata: Vec::new(),
                        link_row: None,
                        link_node: None,
                    })?;
                }
            }
        }
        Ok(())
    }
}
/// Reads `field` from a row's named columns as an owned string.
///
/// Only `Text` and `Url` values are returned. Other value types, missing
/// fields, rows without named columns, and non-row entities all yield `None`.
fn row_text_field(data: &EntityData, field: &str) -> Option<String> {
    match data {
        EntityData::Row(row) => row.named.as_ref()?.get(field).and_then(|value| match value {
            Value::Text(text) => Some(text.to_string()),
            Value::Url(url) => Some(url.clone()),
            _ => None,
        }),
        _ => None,
    }
}
/// Encodes vision detections as a JSON array of
/// `{ "label": str, "confidence": number, "bbox": [number, ...] }` objects,
/// preserving input order.
fn detections_to_json(
    detections: &[crate::runtime::ai::vision::VisionDetection],
) -> crate::serde_json::Value {
    use crate::serde_json::{Map, Value as Sj};
    let encode = |detection: &crate::runtime::ai::vision::VisionDetection| -> Sj {
        let bbox = detection
            .bbox
            .iter()
            .map(|&coord| Sj::Number(coord as f64))
            .collect();
        let mut entry = Map::new();
        entry.insert("label".to_string(), Sj::String(detection.label.clone()));
        entry.insert("confidence".to_string(), Sj::Number(detection.confidence as f64));
        entry.insert("bbox".to_string(), Sj::Array(bbox));
        Sj::Object(entry)
    };
    Sj::Array(detections.iter().map(encode).collect())
}
/// Wraps `detections_json` into a patch payload of the form
/// `{ "fields": { VISION_DETECTIONS_FIELD: detections_json } }`.
fn vision_detections_payload(
    detections_json: crate::serde_json::Value,
) -> crate::serde_json::Value {
    use crate::serde_json::{Map, Value as Sj};
    let mut patched = Map::new();
    patched.insert(String::from(VISION_DETECTIONS_FIELD), detections_json);
    let mut envelope = Map::new();
    envelope.insert(String::from("fields"), Sj::Object(patched));
    Sj::Object(envelope)
}
/// Joins, with single spaces, the non-empty `Text` values from a pending
/// mutation's `fields` whose names appear in `declared`.
///
/// Output follows the order of `declared`. Within one declared name, values
/// follow their order in `fields`. Non-text values are ignored.
fn combine_moderate_text(fields: &[(String, Value)], declared: &[String]) -> String {
    let pieces: Vec<&str> = declared
        .iter()
        .flat_map(move |wanted| {
            fields.iter().filter_map(move |(name, value)| match value {
                Value::Text(text) if name == wanted && !text.is_empty() => Some(&**text),
                _ => None,
            })
        })
        .collect();
    pieces.join(" ")
}
/// Joins, with single spaces and in `declared` order, the non-empty `Text`
/// values of a stored row's named columns.
///
/// Returns an empty string for non-row entities, rows without named columns,
/// or rows with no matching non-empty text.
fn combine_moderate_text_named(data: &EntityData, declared: &[String]) -> String {
    let named = match data {
        EntityData::Row(row) => match row.named.as_ref() {
            Some(named) => named,
            None => return String::new(),
        },
        _ => return String::new(),
    };
    let mut combined = String::new();
    for value in declared.iter().filter_map(|field| named.get(field)) {
        if let Value::Text(text) = value {
            if text.is_empty() {
                continue;
            }
            if !combined.is_empty() {
                combined.push(' ');
            }
            combined.push_str(text);
        }
    }
    combined
}
/// Sets the moderation status column on a pending mutation row to `status`.
///
/// Any existing status entries are removed first, and the new entry is
/// appended at the end, so the row carries exactly one marker.
fn set_row_moderation_marker(row: &mut MutationRow, status: &str) {
    let marker = Value::Text(std::sync::Arc::from(status));
    row.fields
        .retain(|(name, _)| name.as_str() != MODERATION_STATUS_FIELD);
    row.fields
        .push((String::from(MODERATION_STATUS_FIELD), marker));
}
/// Builds a patch payload of the form `{ "fields": { MODERATION_STATUS_FIELD: status } }`.
fn moderation_marker_set_payload(status: &str) -> crate::serde_json::Value {
    use crate::serde_json::{Map, Value as Sj};
    let mut patched = Map::new();
    patched.insert(
        String::from(MODERATION_STATUS_FIELD),
        Sj::String(String::from(status)),
    );
    let mut envelope = Map::new();
    envelope.insert(String::from("fields"), Sj::Object(patched));
    Sj::Object(envelope)
}
/// Builds a patch payload that clears the moderation marker.
///
/// The field is not removed; it is set to an empty string.
fn moderation_marker_clear_payload() -> crate::serde_json::Value {
    let cleared_status = "";
    moderation_marker_set_payload(cleared_status)
}
/// Joins, with single spaces and in `fields` order, the non-empty `Text`
/// values of a stored row's named columns.
///
/// Returns `None` for non-row entities, rows without named columns, or when
/// no declared field holds non-empty text.
fn combine_embed_text(data: &EntityData, fields: &[String]) -> Option<String> {
    let named = match data {
        EntityData::Row(row) => row.named.as_ref()?,
        _ => return None,
    };
    let texts: Vec<String> = fields
        .iter()
        .filter_map(|field| match named.get(field)? {
            Value::Text(text) if !text.is_empty() => Some(text.to_string()),
            _ => None,
        })
        .collect();
    (!texts.is_empty()).then(|| texts.join(" "))
}
/// Embeds a single `text` with the policy's model and returns the dense vector.
///
/// Only the `local` provider is supported; any other provider is an error.
///
/// # Errors
/// Propagates provider-parse and embedding-backend errors. Returns
/// `RedDBError::Query` for a non-local provider or when the backend returns
/// no vector.
fn embed_one(rt: &RedDBRuntime, policy: &EmbedPolicy, text: &str) -> RedDBResult<Vec<f32>> {
    match crate::ai::parse_provider(&policy.provider)? {
        crate::ai::AiProvider::Local => {}
        other => {
            return Err(RedDBError::Query(format!(
                "CDC enrichment currently drives the 'local' provider; \
                 collection policy declares '{other:?}'"
            )));
        }
    }
    let db = rt.db();
    let batch = vec![text.to_owned()];
    let response =
        crate::runtime::ai::local_embedding::embed_local_with_db(&db, &policy.model, batch)?;
    let mut vectors = response.embeddings.into_iter();
    vectors.next().ok_or_else(|| {
        RedDBError::Query("local embedding backend returned no vector".to_string())
    })
}