Skip to main content

reddb_server/runtime/ai/
vision.rs

1//! Local computer-vision backend + image fetch (#1275, PRD #1267).
2//!
3//! Vision is the second async AI modality wired over the CDC enrichment
4//! lane (the first being embeddings, #1272). A collection that declares a
5//! `VISION (...)` policy names an *image-reference field* whose value is a
6//! URL/URI. After commit, the CDC enrichment consumer fetches the
7//! referenced image and runs the policy's vision provider, producing:
8//!
9//!   * a structured **component-detections** array
10//!     (`[{label, confidence, bbox:[x,y,w,h]}]`) written to a derived
11//!     field that RQL can filter, and
12//!   * an optional **image-embedding** vector reusing the existing vector
13//!     pipeline for image similarity search.
14//!
15//! Mirroring [`super::local_embedding`], the engine is a swappable,
16//! process-global [`LocalVisionBackend`]. The default
17//! [`DeterministicFakeVisionBackend`] derives stable detections from
18//! `SHA-256(model || image-bytes)` so the end-to-end contract (fetch →
19//! analyze → attach) can be exercised without a real model. Tests install
20//! their own mock provider via [`install_local_vision_backend`]; a real
21//! candle/onnx engine slots in the same way at server boot.
22
23use std::sync::{Arc, OnceLock, RwLock};
24use std::time::Duration;
25
26use crate::crypto::sha256::Sha256;
27use crate::{RedDBError, RedDBResult};
28
/// A single labelled, scored bounding box reported by a vision backend.
///
/// `bbox` holds `[x, y, w, h]` expressed in image-relative units — the
/// canonical output shape recorded in the issue.
#[derive(Clone, Debug, PartialEq)]
pub struct VisionDetection {
    /// Class label of the detected component (e.g. `"person"`).
    pub label: String,
    /// Score the backend assigned to this detection.
    pub confidence: f32,
    /// Bounding box as `[x, y, w, h]`.
    pub bbox: [f32; 4],
}
38
39/// Output of a single vision analysis pass.
40#[derive(Debug, Clone, Default, PartialEq)]
41pub struct VisionResult {
42    /// Structured component detections (present when detections were
43    /// requested by the policy).
44    pub detections: Vec<VisionDetection>,
45    /// Image embedding (present only when the policy requested an
46    /// image-embedding output kind).
47    pub embedding: Option<Vec<f32>>,
48}
49
/// A fully materialised vision request handed to a [`LocalVisionBackend`].
///
/// `Debug` is implemented by hand so that logging or panicking with a
/// request prints the image size instead of dumping every image byte.
#[derive(Clone)]
pub struct VisionRequest {
    /// Model name as written in the collection's VISION policy.
    pub model: String,
    /// Fetched image bytes (already resolved from the row's reference).
    pub image_bytes: Vec<u8>,
    /// Whether the policy asked for structured detections.
    pub want_detections: bool,
    /// Whether the policy asked for an image-embedding output.
    pub want_embedding: bool,
}

impl std::fmt::Debug for VisionRequest {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VisionRequest")
            .field("model", &self.model)
            // Summarise the payload: images can be megabytes long.
            .field(
                "image_bytes",
                &format_args!("<{} bytes>", self.image_bytes.len()),
            )
            .field("want_detections", &self.want_detections)
            .field("want_embedding", &self.want_embedding)
            .finish()
    }
}
62
63/// Backend abstraction so the enrichment lane does not depend on a
64/// specific vision engine. Tests install a mock; production installs a
65/// real engine via [`install_local_vision_backend`].
66pub trait LocalVisionBackend: Send + Sync {
67    fn analyze(&self, request: &VisionRequest) -> RedDBResult<VisionResult>;
68}
69
/// Error text returned when no local vision engine can be resolved.
const LOCAL_VISION_DISABLED_MESSAGE: &str = concat!(
    "local vision requires the `local-models` feature flag at engine build time, ",
    "or a backend installed via ",
    "runtime::ai::vision::install_local_vision_backend. ",
    "Alternatively, declare a ",
    "vision-capable remote provider in the collection's VISION policy."
);

/// Number of f32 lanes in the deterministic fake image embedding.
const FAKE_EMBEDDING_DIM: usize = 16;
78
/// Dependency-free stand-in vision engine.
///
/// It exists to prove the fetch → analyze → attach contract end-to-end
/// without a real model. Its output depends only on the request's model
/// name and image bytes (no I/O, clock, or randomness is consulted), so
/// repeated runs yield byte-identical results.
#[derive(Clone, Copy, Debug, Default)]
pub struct DeterministicFakeVisionBackend;
85
86impl LocalVisionBackend for DeterministicFakeVisionBackend {
87    fn analyze(&self, request: &VisionRequest) -> RedDBResult<VisionResult> {
88        let digest = {
89            let mut hasher = Sha256::new();
90            hasher.update(request.model.as_bytes());
91            hasher.update(&[0u8]);
92            hasher.update(&request.image_bytes);
93            hasher.finalize()
94        };
95
96        let detections = if request.want_detections {
97            // One detection per non-overlapping label, chosen
98            // deterministically from a small fixed vocabulary by the
99            // digest. Two distinct, stable detections so containment
100            // filters have something to match.
101            const VOCAB: [&str; 4] = ["person", "car", "dog", "bicycle"];
102            let pick = |byte: u8| VOCAB[(byte as usize) % VOCAB.len()].to_string();
103            let conf = |byte: u8| (byte as f32) / 255.0;
104            let coord = |byte: u8| (byte as f32) / 255.0;
105            vec![
106                VisionDetection {
107                    label: pick(digest[0]),
108                    confidence: conf(digest[1]),
109                    bbox: [
110                        coord(digest[2]),
111                        coord(digest[3]),
112                        coord(digest[4]),
113                        coord(digest[5]),
114                    ],
115                },
116                VisionDetection {
117                    label: pick(digest[6]),
118                    confidence: conf(digest[7]),
119                    bbox: [
120                        coord(digest[8]),
121                        coord(digest[9]),
122                        coord(digest[10]),
123                        coord(digest[11]),
124                    ],
125                },
126            ]
127        } else {
128            Vec::new()
129        };
130
131        let embedding = if request.want_embedding {
132            let mut out = Vec::with_capacity(FAKE_EMBEDDING_DIM);
133            let mut counter: u32 = 0;
134            while out.len() < FAKE_EMBEDDING_DIM {
135                let mut hasher = Sha256::new();
136                hasher.update(&digest);
137                hasher.update(&counter.to_le_bytes());
138                let chunk_digest = hasher.finalize();
139                for chunk in chunk_digest.chunks(4) {
140                    if out.len() >= FAKE_EMBEDDING_DIM {
141                        break;
142                    }
143                    let mut bytes = [0u8; 4];
144                    bytes.copy_from_slice(chunk);
145                    let raw = u32::from_le_bytes(bytes) as f32 / u32::MAX as f32;
146                    out.push(raw * 2.0 - 1.0);
147                }
148                counter = counter.wrapping_add(1);
149            }
150            Some(out)
151        } else {
152            None
153        };
154
155        Ok(VisionResult {
156            detections,
157            embedding,
158        })
159    }
160}
161
162type BackendSlot = Arc<dyn LocalVisionBackend>;
163
164fn backend_slot() -> &'static RwLock<Option<BackendSlot>> {
165    static SLOT: OnceLock<RwLock<Option<BackendSlot>>> = OnceLock::new();
166    SLOT.get_or_init(|| RwLock::new(None))
167}
168
169/// Install (or replace) the process-global local vision backend.
170///
171/// Production servers built with `--features local-models` call this once
172/// at boot with their real engine. Tests use it to swap in a mock vision
173/// provider. Safe to call from any thread; the most recent install wins.
174pub fn install_local_vision_backend(backend: Arc<dyn LocalVisionBackend>) {
175    let mut guard = backend_slot()
176        .write()
177        .expect("vision backend slot poisoned");
178    *guard = Some(backend);
179}
180
181/// Test-only: clear the installed backend so a subsequent call exercises
182/// the feature-disabled path again.
183#[doc(hidden)]
184pub fn clear_local_vision_backend_for_tests() {
185    let mut guard = backend_slot()
186        .write()
187        .expect("vision backend slot poisoned");
188    *guard = None;
189}
190
191fn current_backend() -> Option<BackendSlot> {
192    backend_slot()
193        .read()
194        .expect("vision backend slot poisoned")
195        .as_ref()
196        .map(Arc::clone)
197}
198
199/// Resolve and run a local vision request end-to-end. Falls back to the
200/// deterministic fake when the `local-models` feature is on but no engine
201/// was installed; errors with a clear message when neither is available.
202pub fn analyze_local(
203    model: &str,
204    image_bytes: Vec<u8>,
205    want_detections: bool,
206    want_embedding: bool,
207) -> RedDBResult<VisionResult> {
208    let backend = match current_backend() {
209        Some(b) => b,
210        None => {
211            if cfg!(feature = "local-models") {
212                let fake: Arc<dyn LocalVisionBackend> = Arc::new(DeterministicFakeVisionBackend);
213                install_local_vision_backend(Arc::clone(&fake));
214                fake
215            } else {
216                return Err(RedDBError::FeatureNotEnabled(
217                    LOCAL_VISION_DISABLED_MESSAGE.to_string(),
218                ));
219            }
220        }
221    };
222
223    backend.analyze(&VisionRequest {
224        model: model.to_string(),
225        image_bytes,
226        want_detections,
227        want_embedding,
228    })
229}
230
231/// Fetch the bytes of an image referenced by a row field.
232///
233/// The reference is a URL/URI (ADR 0057 stores the reference, never the
234/// bytes). Supported schemes:
235///   * `file://<path>` and bare filesystem paths — read from disk;
236///   * `http://` / `https://` — fetched via `ureq` (rustls).
237///
238/// Network/IO failures surface as [`RedDBError`] so the enrichment lane's
239/// retry-with-backoff and dead-letter machinery handles them like any
240/// other provider failure.
241pub fn fetch_image_bytes(reference: &str) -> RedDBResult<Vec<u8>> {
242    let reference = reference.trim();
243    if reference.is_empty() {
244        return Err(RedDBError::Query(
245            "vision image reference is empty".to_string(),
246        ));
247    }
248
249    if let Some(rest) = reference.strip_prefix("file://") {
250        // Tolerate the `file://localhost/path` authority form.
251        let path = rest.strip_prefix("localhost").unwrap_or(rest);
252        return std::fs::read(path)
253            .map_err(|err| RedDBError::Internal(format!("read image '{path}': {err}")));
254    }
255
256    if reference.starts_with("http://") || reference.starts_with("https://") {
257        return fetch_http_image(reference);
258    }
259
260    // Bare filesystem path.
261    std::fs::read(reference)
262        .map_err(|err| RedDBError::Internal(format!("read image '{reference}': {err}")))
263}
264
265fn fetch_http_image(url: &str) -> RedDBResult<Vec<u8>> {
266    let agent: ureq::Agent = ureq::Agent::config_builder()
267        .timeout_connect(Some(Duration::from_secs(15)))
268        .timeout_send_request(Some(Duration::from_secs(30)))
269        .timeout_recv_response(Some(Duration::from_secs(30)))
270        .timeout_recv_body(Some(Duration::from_secs(120)))
271        .build()
272        .into();
273
274    let mut resp = agent
275        .get(url)
276        .call()
277        .map_err(|err| RedDBError::Internal(format!("HTTP GET image '{url}': {err}")))?;
278
279    let status = resp.status().as_u16();
280    if status != 200 {
281        return Err(RedDBError::Internal(format!(
282            "HTTP GET image '{url}': status {status}"
283        )));
284    }
285
286    resp.body_mut()
287        .read_to_vec()
288        .map_err(|err| RedDBError::Internal(format!("read image body from '{url}': {err}")))
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    const FIXTURE_BYTES: &[u8] = b"\x89PNG fixture";

    /// Per-process fixture path, so concurrent test runs sharing the temp
    /// directory cannot clobber each other's fixture file.
    fn fixture_path(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("reddb_vision_{}_{name}", std::process::id()))
    }

    fn request(want_detections: bool, want_embedding: bool) -> VisionRequest {
        VisionRequest {
            model: "fake-vision".to_string(),
            image_bytes: b"some image bytes".to_vec(),
            want_detections,
            want_embedding,
        }
    }

    #[test]
    fn deterministic_fake_is_pure() {
        let req = request(true, true);
        let a = DeterministicFakeVisionBackend.analyze(&req).expect("a");
        let b = DeterministicFakeVisionBackend.analyze(&req).expect("b");
        assert_eq!(a, b, "fake vision backend must be pure");
        assert_eq!(a.detections.len(), 2);
        assert_eq!(a.embedding.as_ref().map(Vec::len), Some(FAKE_EMBEDDING_DIM));
    }

    #[test]
    fn deterministic_fake_depends_on_model_and_bytes() {
        let base = request(true, true);
        let baseline = DeterministicFakeVisionBackend.analyze(&base).expect("base");
        let other_model = DeterministicFakeVisionBackend
            .analyze(&VisionRequest {
                model: "other-model".to_string(),
                ..base.clone()
            })
            .expect("other model");
        let other_bytes = DeterministicFakeVisionBackend
            .analyze(&VisionRequest {
                image_bytes: b"other image bytes".to_vec(),
                ..base
            })
            .expect("other bytes");
        assert_ne!(baseline.embedding, other_model.embedding);
        assert_ne!(baseline.embedding, other_bytes.embedding);
    }

    #[test]
    fn deterministic_fake_outputs_are_in_range() {
        let result = DeterministicFakeVisionBackend
            .analyze(&request(true, true))
            .expect("analyze");
        for detection in &result.detections {
            assert!(["person", "car", "dog", "bicycle"].contains(&detection.label.as_str()));
            assert!((0.0_f32..=1.0).contains(&detection.confidence));
            assert!(detection.bbox.iter().all(|v| (0.0_f32..=1.0).contains(v)));
        }
        let embedding = result.embedding.expect("embedding requested");
        assert!(embedding.iter().all(|v| (-1.0_f32..=1.0).contains(v)));
    }

    #[test]
    fn detections_and_embedding_are_gated_by_request() {
        let base = VisionRequest {
            model: "m".to_string(),
            image_bytes: b"img".to_vec(),
            want_detections: false,
            want_embedding: false,
        };
        let none = DeterministicFakeVisionBackend.analyze(&base).expect("none");
        assert!(none.detections.is_empty());
        assert!(none.embedding.is_none());

        let detect_only = DeterministicFakeVisionBackend
            .analyze(&VisionRequest {
                want_detections: true,
                ..base.clone()
            })
            .expect("detect");
        assert!(!detect_only.detections.is_empty());
        assert!(detect_only.embedding.is_none());

        let embed_only = DeterministicFakeVisionBackend
            .analyze(&VisionRequest {
                want_embedding: true,
                ..base
            })
            .expect("embed");
        assert!(embed_only.detections.is_empty());
        assert_eq!(embed_only.embedding.map(|e| e.len()), Some(FAKE_EMBEDDING_DIM));
    }

    #[test]
    fn fetch_reads_file_uri_and_bare_path() {
        let path = fixture_path("fetch_fixture.bin");
        std::fs::write(&path, FIXTURE_BYTES).expect("write fixture");
        let path_str = path.to_str().expect("utf8 path").to_string();

        let via_bare = fetch_image_bytes(&path_str);
        let via_padded = fetch_image_bytes(&format!("  {path_str}\n"));
        let via_uri = fetch_image_bytes(&format!("file://{path_str}"));
        // `file://localhost/<abs path>` only makes sense for `/`-rooted paths.
        let via_localhost = path_str
            .starts_with('/')
            .then(|| fetch_image_bytes(&format!("file://localhost{path_str}")));

        // Remove the fixture before asserting so a failure does not leak it.
        let _ = std::fs::remove_file(&path);

        assert_eq!(via_bare.expect("bare"), FIXTURE_BYTES);
        assert_eq!(via_padded.expect("padded"), FIXTURE_BYTES);
        assert_eq!(via_uri.expect("file uri"), FIXTURE_BYTES);
        if let Some(result) = via_localhost {
            assert_eq!(result.expect("file://localhost uri"), FIXTURE_BYTES);
        }
    }

    #[test]
    fn fetch_reports_missing_file() {
        let path = fixture_path("does_not_exist.bin");
        let _ = std::fs::remove_file(&path);
        assert!(fetch_image_bytes(path.to_str().expect("utf8 path")).is_err());
    }

    #[test]
    fn fetch_rejects_empty_reference() {
        assert!(fetch_image_bytes("").is_err());
        assert!(fetch_image_bytes("   ").is_err());
    }
}