// fathomdb_engine/vector_projection_actor.rs
//! Background actor that drives the `vector_projection_work` queue.
//!
//! Modeled on [`crate::rebuild_actor::RebuildActor`]: one OS thread,
//! `std::sync::mpsc` wakeups, `JoinHandle` for shutdown.  All writes to
//! `vec_<kind>` and to `vector_projection_work` state are issued through
//! [`crate::writer::WriterActor`] — the actor never opens a second write
//! connection.
//!
//! Drop order invariant (enforced by field order in
//! [`crate::runtime::EngineRuntime`]):
//! readers (`coordinator`) → `writer` → `vector_actor` → `rebuild` → `lock`.
//! The vector actor drops BEFORE the rebuild actor so that any in-flight
//! writer submissions from its thread are already rejected by the time the
//! rebuild thread's connection is torn down.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use serde::Serialize;

use crate::AdminService;
use crate::EngineError;
use crate::embedder::BatchEmbedder;
use crate::writer::{
    VectorProjectionApplyRequest, VectorProjectionClaimRequest, VectorProjectionDiscard,
    VectorProjectionSuccess, VectorWorkClaim, WriterActor,
};

/// Target batch size for incremental (priority >= 1000) work rows.
pub(crate) const INCREMENTAL_BATCH: usize = 64;
/// Target batch size for backfill (priority < 1000) work rows.
pub(crate) const BACKFILL_SLICE: usize = 32;
/// Idle-loop polling interval: how long the actor thread blocks on its
/// channel before taking a no-op timeout tick.
const IDLE_POLL: Duration = Duration::from_millis(250);
36
/// Signals sent to the projection actor's channel.
#[derive(Debug)]
pub(crate) enum VectorWorkSignal {
    /// Best-effort wakeup notification.  Reserved for Pack F/G scheduler.
    // No production sender constructs this variant yet; kept so the channel
    // type is stable for the future scheduler.
    #[allow(dead_code)]
    Wakeup,
    /// Terminate the actor loop.
    Shutdown,
}
46
/// Report returned from [`AdminService::drain_vector_projection`](crate::AdminService::drain_vector_projection).
///
/// Aggregated from per-tick [`TickReport`]s; serialized for admin callers.
#[derive(Clone, Debug, Default, Serialize)]
pub struct DrainReport {
    /// Number of incremental (priority >= 1000) work rows that produced a
    /// vec row in this drain.
    pub incremental_processed: u64,
    /// Number of backfill (priority < 1000) work rows that produced a vec
    /// row in this drain.
    pub backfill_processed: u64,
    /// Number of rows that produced a hard failure (e.g. embedder output
    /// wrong dimension).
    pub failed: u64,
    /// Number of rows whose `canonical_hash` mismatched the current chunk
    /// and were marked `discarded`.
    pub discarded_stale: u64,
    /// Number of ticks that were aborted because the embedder was
    /// unavailable.
    pub embedder_unavailable_ticks: u64,
}
66
/// Background actor that serializes projection work ticks.
#[derive(Debug)]
pub struct VectorProjectionActor {
    // `Option` so `Drop` can `take()` the handle and join exactly once.
    thread_handle: Option<thread::JoinHandle<()>>,
    // `Option` so `Drop` can `take()` and drop the sender, disconnecting the
    // channel and waking the loop even if the Shutdown send itself fails.
    sender: Option<mpsc::SyncSender<VectorWorkSignal>>,
}
73
74impl VectorProjectionActor {
75    /// Start the actor thread.
76    ///
77    /// The actor holds a clone of the writer-actor handle so it can submit
78    /// claim/apply transactions.  It does NOT receive an embedder at
79    /// construction time — production flows (Pack G) wire one via
80    /// `AdminService::drain_vector_projection` or future hooks.  Until an
81    /// embedder is provided, the actor loop idles.
82    ///
83    /// # Errors
84    /// Returns [`EngineError::Io`] if the thread cannot be spawned.
85    pub fn start(_writer: &WriterActor) -> Result<Self, EngineError> {
86        let (sender, receiver) = mpsc::sync_channel::<VectorWorkSignal>(16);
87        // The production loop currently only reacts to shutdown and tick
88        // wakeups — actual drain work is driven on-demand by
89        // `AdminService::drain_vector_projection` because embedders are
90        // supplied at call-time.  This keeps the actor alive for drop-order
91        // discipline and future scheduler work (Pack F/G).
92        let handle = thread::Builder::new()
93            .name("fathomdb-vector-projection".to_owned())
94            .spawn(move || {
95                vector_projection_loop(&receiver);
96            })
97            .map_err(EngineError::Io)?;
98        Ok(Self {
99            thread_handle: Some(handle),
100            sender: Some(sender),
101        })
102    }
103}
104
105impl Drop for VectorProjectionActor {
106    fn drop(&mut self) {
107        if let Some(sender) = self.sender.take() {
108            // Best-effort shutdown signal; ignore send failures (thread may
109            // already be exiting).
110            let _ = sender.try_send(VectorWorkSignal::Shutdown);
111            drop(sender);
112        }
113        if let Some(handle) = self.thread_handle.take() {
114            match handle.join() {
115                Ok(()) => {}
116                Err(payload) => {
117                    if std::thread::panicking() {
118                        trace_warn!(
119                            "vector projection thread panicked during shutdown (suppressed: already panicking)"
120                        );
121                    } else {
122                        std::panic::resume_unwind(payload);
123                    }
124                }
125            }
126        }
127    }
128}
129
130fn vector_projection_loop(receiver: &mpsc::Receiver<VectorWorkSignal>) {
131    trace_info!("vector projection thread started");
132    loop {
133        match receiver.recv_timeout(IDLE_POLL) {
134            Ok(VectorWorkSignal::Shutdown) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
135            // Wakeup/timeout tick: no-op today. Production scheduling is
136            // currently driven by admin drain calls; future commits will
137            // invoke `run_tick` here once an embedder resolver is wired.
138            Ok(VectorWorkSignal::Wakeup) | Err(mpsc::RecvTimeoutError::Timeout) => {}
139        }
140    }
141    trace_info!("vector projection thread exiting");
142}
143
/// Run a single scheduling tick: claim up to `INCREMENTAL_BATCH` incremental
/// rows; if none found, claim up to `BACKFILL_SLICE` backfill rows.  Embed
/// them via `embedder`, apply the results through `writer`.
///
/// Returns the per-tick accounting used to build [`DrainReport`].
///
/// # Errors
/// Returns [`EngineError`] if the writer claim/apply steps fail.
// Deliberately a single sequential pipeline (claim → classify → embed →
// apply); suppress the length lint rather than split the transaction flow.
#[allow(clippy::too_many_lines)]
pub(crate) fn run_tick(
    admin: &AdminService,
    writer: &WriterActor,
    embedder: &dyn BatchEmbedder,
) -> Result<TickReport, EngineError> {
    // Step 1: claim incremental first.
    let mut claims = writer.claim_vector_projection(VectorProjectionClaimRequest {
        min_priority: 1000,
        limit: INCREMENTAL_BATCH,
    })?;
    let mut is_incremental = true;

    if claims.is_empty() {
        // No incremental work: fall back to a backfill slice at any priority.
        claims = writer.claim_vector_projection(VectorProjectionClaimRequest {
            min_priority: i64::MIN,
            limit: BACKFILL_SLICE,
        })?;
        is_incremental = false;
    }

    if claims.is_empty() {
        // Queue fully drained: report an idle tick; no writer apply is issued.
        return Ok(TickReport {
            processed_incremental: 0,
            processed_backfill: 0,
            failed: 0,
            discarded_stale: 0,
            embedder_unavailable: false,
            idle: true,
        });
    }

    // Step 2: determine which claims are immediately discardable
    // (hash mismatch, chunk missing, profile mismatch).
    let active_profile_id: Option<i64> = admin.active_embedding_profile_id()?;

    let mut successes: Vec<VectorProjectionSuccess> = Vec::new();
    let mut discards: Vec<VectorProjectionDiscard> = Vec::new();
    let mut embeddable: Vec<VectorWorkClaim> = Vec::new();

    for claim in claims {
        if claim.chunk_missing {
            discards.push(VectorProjectionDiscard {
                work_id: claim.work_id,
                reason: Some("chunk no longer exists".to_owned()),
            });
            continue;
        }
        // Recompute the canonical hash from the live chunk text; a mismatch
        // means the chunk changed after this work row was enqueued.
        let current_hash = crate::admin::canonical_chunk_hash(&claim.chunk_id, &claim.text_content);
        if current_hash != claim.canonical_hash {
            discards.push(VectorProjectionDiscard {
                work_id: claim.work_id,
                reason: Some("canonical_hash mismatch".to_owned()),
            });
            continue;
        }
        // NOTE(review): when no profile is active (`None`), claims pass this
        // check regardless of their profile id — confirm that is intended.
        if let Some(pid) = active_profile_id
            && claim.embedding_profile_id != pid
        {
            discards.push(VectorProjectionDiscard {
                work_id: claim.work_id,
                reason: Some("embedding profile changed".to_owned()),
            });
            continue;
        }
        embeddable.push(claim);
    }

    // Step 3: embed (if any embeddable rows).
    let mut embedder_unavailable = false;
    let mut failed_count: u64 = 0;
    if !embeddable.is_empty() {
        let texts: Vec<String> = embeddable.iter().map(|c| c.text_content.clone()).collect();
        match embedder.batch_embed(&texts) {
            Ok(vectors) if vectors.len() == embeddable.len() => {
                let identity = embedder.identity();
                for (claim, vector) in embeddable.iter().zip(vectors) {
                    // Hard per-row failure: wrong dimension or non-finite
                    // component.  Discarded AND counted toward `failed`.
                    if vector.len() != identity.dimension || vector.iter().any(|v| !v.is_finite()) {
                        discards.push(VectorProjectionDiscard {
                            work_id: claim.work_id,
                            reason: Some("embedder returned invalid vector".to_owned()),
                        });
                        failed_count += 1;
                        continue;
                    }
                    successes.push(VectorProjectionSuccess {
                        work_id: claim.work_id,
                        kind: claim.kind.clone(),
                        chunk_id: claim.chunk_id.clone(),
                        embedding: vector,
                    });
                }
            }
            // Size-mismatch OR explicit error: treat as embedder failure and
            // revert claimed rows to pending.
            Ok(_) | Err(_) => {
                embedder_unavailable = true;
            }
        }
    }

    // Step 4: build the apply request (reverts = all embeddable rows if
    // embedder unavailable).
    let reverts: Vec<i64> = if embedder_unavailable {
        embeddable.iter().map(|c| c.work_id).collect()
    } else {
        Vec::new()
    };
    let revert_error = if embedder_unavailable {
        Some("embedder unavailable".to_owned())
    } else {
        None
    };

    let apply = VectorProjectionApplyRequest {
        successes,
        discards,
        reverts,
        revert_error,
    };

    // Counters are captured before `apply` is moved into the writer call.
    let processed_successes = u64::try_from(apply.successes.len()).unwrap_or(0);
    let discarded = u64::try_from(apply.discards.len()).unwrap_or(0);

    writer.apply_vector_projection(apply)?;

    Ok(TickReport {
        processed_incremental: if is_incremental {
            processed_successes
        } else {
            0
        },
        processed_backfill: if is_incremental {
            0
        } else {
            processed_successes
        },
        failed: failed_count,
        // `discards` includes every `failed_count` invalid-vector row, so this
        // subtraction cannot underflow: stale = total discards − hard failures.
        discarded_stale: discarded - failed_count,
        embedder_unavailable,
        idle: false,
    })
}
295
/// Outcome of a single tick.
#[derive(Clone, Debug, Default)]
pub(crate) struct TickReport {
    /// Incremental (priority >= 1000) rows that produced a vec row.
    pub processed_incremental: u64,
    /// Backfill (priority < 1000) rows that produced a vec row.
    pub processed_backfill: u64,
    /// Rows discarded because the embedder returned an invalid vector.
    pub failed: u64,
    /// Rows discarded for staleness (chunk missing, hash or profile mismatch).
    pub discarded_stale: u64,
    /// True when the embed call errored or returned a mismatched batch size;
    /// claimed rows were reverted to pending.
    pub embedder_unavailable: bool,
    /// True when no work rows were claimed at all (nothing applied).
    pub idle: bool,
}