Skip to main content

reddb_server/runtime/ai/
moderation.rs

1//! Local content-moderation backend (#1274, PRD #1267, ADR 0057).
2//!
3//! Moderation is the third AI modality on the write/CDC lane (after
4//! embeddings #1272 and vision #1275). A collection that declares a
5//! `MODERATE (...)` policy names text/image-reference fields that are
6//! screened by a moderation provider. Unlike embed/vision, moderation has
7//! a **synchronous pre-commit gate**: when `sync = true`, the write path
8//! screens the declared fields *before the durable commit* and refuses the
9//! write on a reject (ADR 0057). Provider-down behaviour and re-moderation
10//! of quarantined rows ride the existing CDC enrichment lane.
11//!
12//! Mirroring [`super::vision`] and [`super::local_embedding`], the engine
13//! is a swappable, process-global [`LocalModerationBackend`]. There is no
14//! built-in default — a real engine or a mock is installed via
15//! [`install_local_moderation_backend`]. A backend may also report itself
16//! *down* (returning [`ModerationOutcome::ProviderDown`]) so tests can
17//! exercise the fail-open-quarantine and fail-closed paths deterministically.
18
19use std::sync::{Arc, OnceLock, RwLock};
20
21use crate::storage::schema::Value;
22use crate::storage::unified::entity::{EntityData, UnifiedEntity};
23use crate::{RedDBError, RedDBResult};
24
/// Reserved row field that carries a row's moderation visibility state.
///
/// A row that committed but is hidden from normal reads carries this
/// field set to one of two text values:
///   * [`MODERATION_STATUS_PENDING`] — quarantine-pending (provider was
///     down at write time under fail-open; awaiting async re-moderation),
///   * [`MODERATION_STATUS_REJECTED`] — re-moderated to a reject; the row
///     is tombstoned-and-retained for audit/appeal.
///
/// A fully-cleared (allowed) row either lacks the field or carries some
/// other value (e.g. an empty marker). Clearing is a plain field overwrite
/// because the storage merge has no field-removal form. Only the two values
/// above hide a row.
///
/// It is a reserved system field (see `crate::reserved_fields`) so users
/// cannot declare or set it. The read-path visibility helpers hide a row
/// from normal SELECT/scan reads when this field holds one of the two
/// hidden states (ADR 0057).
pub const MODERATION_STATUS_FIELD: &str = "__moderation_status";

/// Value of [`MODERATION_STATUS_FIELD`] for a quarantine-pending row.
/// Matched exactly (case-sensitive) by the read-path visibility check.
pub const MODERATION_STATUS_PENDING: &str = "pending";

/// Value of [`MODERATION_STATUS_FIELD`] for a rejected-tombstone row.
/// Matched exactly (case-sensitive) by the read-path visibility check.
pub const MODERATION_STATUS_REJECTED: &str = "rejected";
44
45/// Whether `entity` carries a moderation status that must hide it from
46/// normal reads (quarantine-pending or rejected-tombstone).
47///
48/// This is consulted on the hot read path for *every* table-row candidate,
49/// so it is a single, allocation-free field probe. Non-row entities and
50/// rows without the marker are never hidden by moderation.
51#[inline]
52pub fn entity_moderation_hidden(entity: &UnifiedEntity) -> bool {
53    let EntityData::Row(row) = &entity.data else {
54        return false;
55    };
56    // Hidden only for the two active hidden states. A cleared (allowed)
57    // row carries an empty/absent marker and stays visible; this keeps the
58    // clear path a simple field overwrite (the storage merge has no
59    // field-removal form) without leaking a stale "hidden" decision.
60    matches!(
61        row.get_field(MODERATION_STATUS_FIELD),
62        Some(Value::Text(status))
63            if status.as_ref() == MODERATION_STATUS_PENDING
64                || status.as_ref() == MODERATION_STATUS_REJECTED
65    )
66}
67
/// Result of screening a row's declared moderated fields once.
///
/// Exactly one of three verdicts: the content is fine, the content is
/// refused, or no verdict could be obtained because the provider was
/// unreachable.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ModerationOutcome {
    /// The content passed screening; the write/row may become visible.
    Allow,
    /// The content failed screening and the write is rejected. The flagged
    /// reasons are kept in `categories` for audit/appeal.
    Reject { categories: Vec<String> },
    /// No verdict: the provider could not be reached. The caller picks the
    /// policy's degraded-mode behaviour (fail-open + quarantine, or
    /// fail-closed).
    ProviderDown { reason: String },
}

impl ModerationOutcome {
    /// Returns `true` only for [`ModerationOutcome::Reject`].
    pub fn is_reject(&self) -> bool {
        match self {
            Self::Reject { .. } => true,
            Self::Allow | Self::ProviderDown { .. } => false,
        }
    }

    /// Returns `true` only for [`ModerationOutcome::ProviderDown`].
    pub fn is_provider_down(&self) -> bool {
        match self {
            Self::ProviderDown { .. } => true,
            Self::Allow | Self::Reject { .. } => false,
        }
    }
}
92
/// A materialised moderation request handed to a backend. The declared
/// text fields are concatenated by the caller into `text`; an empty
/// `text` means there was nothing to screen.
///
/// [`moderate_local`] builds one per call and passes it by reference to the
/// installed [`LocalModerationBackend`].
#[derive(Debug, Clone)]
pub struct ModerationRequest {
    /// Model name as written in the collection's MODERATE policy.
    pub model: String,
    /// The concatenated text of the row's declared moderated fields.
    pub text: String,
}
103
/// Backend abstraction so the gate/enrichment lanes do not depend on a
/// specific moderation engine. Tests install a mock; production installs a
/// real engine via [`install_local_moderation_backend`].
///
/// The `Send + Sync` bound is required because the installed backend is
/// stored in a process-global slot behind an `Arc` and may be invoked from
/// any thread.
pub trait LocalModerationBackend: Send + Sync {
    /// Screen `request` and return a verdict.
    ///
    /// An unreachable provider should be reported as
    /// `Ok(ModerationOutcome::ProviderDown { .. })` rather than `Err`. That
    /// lets the caller apply the policy's degraded-mode behaviour. Any `Err`
    /// returned here is passed through unchanged by [`moderate_local`].
    fn moderate(&self, request: &ModerationRequest) -> RedDBResult<ModerationOutcome>;
}
110
/// Message carried by the `RedDBError::FeatureNotEnabled` that
/// [`moderate_local`] returns when no backend has been installed.
///
/// Each trailing `\` line continuation also strips the next line's leading
/// whitespace, so the message reads as a single line.
const LOCAL_MODERATION_DISABLED_MESSAGE: &str =
    "local moderation requires a backend installed via \
     runtime::ai::moderation::install_local_moderation_backend. Alternatively, \
     declare a moderation-capable remote provider in the collection's MODERATE \
     policy.";
116
117type BackendSlot = Arc<dyn LocalModerationBackend>;
118
119fn backend_slot() -> &'static RwLock<Option<BackendSlot>> {
120    static SLOT: OnceLock<RwLock<Option<BackendSlot>>> = OnceLock::new();
121    SLOT.get_or_init(|| RwLock::new(None))
122}
123
124/// Install (or replace) the process-global local moderation backend.
125///
126/// Production servers call this once at boot with their real engine. Tests
127/// use it to swap in a mock moderation provider. Safe to call from any
128/// thread; the most recent install wins.
129pub fn install_local_moderation_backend(backend: Arc<dyn LocalModerationBackend>) {
130    let mut guard = backend_slot()
131        .write()
132        .expect("moderation backend slot poisoned");
133    *guard = Some(backend);
134}
135
136/// Test-only: clear the installed backend so a subsequent call exercises
137/// the feature-disabled path again.
138#[doc(hidden)]
139pub fn clear_local_moderation_backend_for_tests() {
140    let mut guard = backend_slot()
141        .write()
142        .expect("moderation backend slot poisoned");
143    *guard = None;
144}
145
146fn current_backend() -> Option<BackendSlot> {
147    backend_slot()
148        .read()
149        .expect("moderation backend slot poisoned")
150        .as_ref()
151        .map(Arc::clone)
152}
153
154/// Resolve and run a local moderation request end-to-end. Errors with a
155/// clear message when no backend is installed; a *down provider* is NOT an
156/// error — the backend signals it via [`ModerationOutcome::ProviderDown`]
157/// so the caller can apply the policy's degraded-mode behaviour.
158pub fn moderate_local(model: &str, text: String) -> RedDBResult<ModerationOutcome> {
159    let backend = current_backend().ok_or_else(|| {
160        RedDBError::FeatureNotEnabled(LOCAL_MODERATION_DISABLED_MESSAGE.to_string())
161    })?;
162    backend.moderate(&ModerationRequest {
163        model: model.to_string(),
164        text,
165    })
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serialises the tests in this module that read or mutate the
    /// process-global backend slot.
    ///
    /// The libtest harness runs `#[test]` functions on parallel threads.
    /// Without this lock, one test's install/clear can land between another
    /// test's setup and its `moderate_local` call, which makes the suite
    /// flaky. NOTE(review): this only serialises tests in this module; other
    /// test modules in the same binary that install a moderation backend are
    /// not covered by it.
    static BACKEND_SLOT_TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Acquire [`BACKEND_SLOT_TEST_LOCK`], tolerating poisoning. A failing
    /// test panics while holding the lock, but the `()` payload cannot be
    /// corrupted, so later tests should still run rather than all fail.
    fn lock_backend_slot() -> MutexGuard<'static, ()> {
        BACKEND_SLOT_TEST_LOCK
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    struct AllowAll;
    impl LocalModerationBackend for AllowAll {
        fn moderate(&self, _request: &ModerationRequest) -> RedDBResult<ModerationOutcome> {
            Ok(ModerationOutcome::Allow)
        }
    }

    /// Simulates an unreachable moderation provider.
    struct AlwaysDown;
    impl LocalModerationBackend for AlwaysDown {
        fn moderate(&self, _request: &ModerationRequest) -> RedDBResult<ModerationOutcome> {
            Ok(ModerationOutcome::ProviderDown {
                reason: "timeout".to_string(),
            })
        }
    }

    /// Echoes the request's model and text back as reject categories, so a
    /// test can inspect exactly what the backend received.
    struct EchoRequest;
    impl LocalModerationBackend for EchoRequest {
        fn moderate(&self, request: &ModerationRequest) -> RedDBResult<ModerationOutcome> {
            Ok(ModerationOutcome::Reject {
                categories: vec![request.model.clone(), request.text.clone()],
            })
        }
    }

    #[test]
    fn missing_backend_is_feature_disabled_error() {
        let _slot = lock_backend_slot();
        clear_local_moderation_backend_for_tests();
        let err = moderate_local("m", "hello".to_string()).expect_err("no backend");
        assert!(matches!(err, RedDBError::FeatureNotEnabled(_)));
    }

    #[test]
    fn installed_backend_drives_outcome() {
        let _slot = lock_backend_slot();
        install_local_moderation_backend(Arc::new(AllowAll));
        let result = moderate_local("m", "hello".to_string());
        // Clear before asserting so a failed assertion cannot leak the
        // backend into later tests.
        clear_local_moderation_backend_for_tests();
        assert_eq!(result.expect("allowed"), ModerationOutcome::Allow);
    }

    #[test]
    fn provider_down_is_an_outcome_not_an_error() {
        let _slot = lock_backend_slot();
        install_local_moderation_backend(Arc::new(AlwaysDown));
        let result = moderate_local("m", "hello".to_string());
        clear_local_moderation_backend_for_tests();
        assert!(result.expect("provider down is Ok").is_provider_down());
    }

    #[test]
    fn backend_receives_model_and_text() {
        let _slot = lock_backend_slot();
        install_local_moderation_backend(Arc::new(EchoRequest));
        let result = moderate_local("mod-v1", "some text".to_string());
        clear_local_moderation_backend_for_tests();
        assert_eq!(
            result.expect("echo backend"),
            ModerationOutcome::Reject {
                categories: vec!["mod-v1".to_string(), "some text".to_string()],
            }
        );
    }

    #[test]
    fn latest_install_wins() {
        let _slot = lock_backend_slot();
        install_local_moderation_backend(Arc::new(AllowAll));
        install_local_moderation_backend(Arc::new(AlwaysDown));
        let result = moderate_local("m", "hello".to_string());
        clear_local_moderation_backend_for_tests();
        assert!(result.expect("latest backend").is_provider_down());
    }

    #[test]
    fn outcome_predicates() {
        let reject = ModerationOutcome::Reject {
            categories: vec!["hate".to_string()],
        };
        let down = ModerationOutcome::ProviderDown {
            reason: "timeout".to_string(),
        };
        assert!(reject.is_reject());
        assert!(!reject.is_provider_down());
        assert!(down.is_provider_down());
        assert!(!down.is_reject());
        assert!(!ModerationOutcome::Allow.is_reject());
        assert!(!ModerationOutcome::Allow.is_provider_down());
    }
}