fathomdb_engine/
vector_projection_actor.rs1use std::sync::mpsc;
17use std::thread;
18use std::time::Duration;
19
20use serde::Serialize;
21
22use crate::AdminService;
23use crate::EngineError;
24use crate::embedder::BatchEmbedder;
25use crate::writer::{
26 VectorProjectionApplyRequest, VectorProjectionClaimRequest, VectorProjectionDiscard,
27 VectorProjectionSuccess, VectorWorkClaim, WriterActor,
28};
29
/// Upper bound on the number of work items requested per tick by the first
/// (`min_priority: 1000`) claim in [`run_tick`].
pub(crate) const INCREMENTAL_BATCH: usize = 64;

/// Upper bound on the number of work items requested per tick by the fallback
/// (`min_priority: i64::MIN`) claim in [`run_tick`], which is only issued when
/// the first claim returned nothing.
pub(crate) const BACKFILL_SLICE: usize = 32;

/// How long the projection thread blocks waiting for a signal before it loops
/// around again.
const IDLE_POLL: Duration = Duration::from_millis(250);
36
/// Control messages delivered to the projection thread over its channel.
#[derive(Debug)]
pub(crate) enum VectorWorkSignal {
    /// Nudges the thread out of its idle wait.
    // Nothing visible in this file sends `Wakeup` yet, hence the lint allowance.
    #[allow(dead_code)]
    Wakeup,
    /// Asks the thread to leave its loop and exit.
    Shutdown,
}
46
47#[derive(Clone, Debug, Default, Serialize)]
49pub struct DrainReport {
50 pub incremental_processed: u64,
53 pub backfill_processed: u64,
56 pub failed: u64,
59 pub discarded_stale: u64,
62 pub embedder_unavailable_ticks: u64,
65}
66
67#[derive(Debug)]
69pub struct VectorProjectionActor {
70 thread_handle: Option<thread::JoinHandle<()>>,
71 sender: Option<mpsc::SyncSender<VectorWorkSignal>>,
72}
73
74impl VectorProjectionActor {
75 pub fn start(_writer: &WriterActor) -> Result<Self, EngineError> {
86 let (sender, receiver) = mpsc::sync_channel::<VectorWorkSignal>(16);
87 let handle = thread::Builder::new()
93 .name("fathomdb-vector-projection".to_owned())
94 .spawn(move || {
95 vector_projection_loop(&receiver);
96 })
97 .map_err(EngineError::Io)?;
98 Ok(Self {
99 thread_handle: Some(handle),
100 sender: Some(sender),
101 })
102 }
103}
104
105impl Drop for VectorProjectionActor {
106 fn drop(&mut self) {
107 if let Some(sender) = self.sender.take() {
108 let _ = sender.try_send(VectorWorkSignal::Shutdown);
111 drop(sender);
112 }
113 if let Some(handle) = self.thread_handle.take() {
114 match handle.join() {
115 Ok(()) => {}
116 Err(payload) => {
117 if std::thread::panicking() {
118 trace_warn!(
119 "vector projection thread panicked during shutdown (suppressed: already panicking)"
120 );
121 } else {
122 std::panic::resume_unwind(payload);
123 }
124 }
125 }
126 }
127 }
128}
129
130fn vector_projection_loop(receiver: &mpsc::Receiver<VectorWorkSignal>) {
131 trace_info!("vector projection thread started");
132 loop {
133 match receiver.recv_timeout(IDLE_POLL) {
134 Ok(VectorWorkSignal::Shutdown) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
135 Ok(VectorWorkSignal::Wakeup) | Err(mpsc::RecvTimeoutError::Timeout) => {}
139 }
140 }
141 trace_info!("vector projection thread exiting");
142}
143
144#[allow(clippy::too_many_lines)]
153pub(crate) fn run_tick(
154 admin: &AdminService,
155 writer: &WriterActor,
156 embedder: &dyn BatchEmbedder,
157) -> Result<TickReport, EngineError> {
158 let mut claims = writer.claim_vector_projection(VectorProjectionClaimRequest {
160 min_priority: 1000,
161 limit: INCREMENTAL_BATCH,
162 })?;
163 let mut is_incremental = true;
164
165 if claims.is_empty() {
166 claims = writer.claim_vector_projection(VectorProjectionClaimRequest {
167 min_priority: i64::MIN,
168 limit: BACKFILL_SLICE,
169 })?;
170 is_incremental = false;
171 }
172
173 if claims.is_empty() {
174 return Ok(TickReport {
175 processed_incremental: 0,
176 processed_backfill: 0,
177 failed: 0,
178 discarded_stale: 0,
179 embedder_unavailable: false,
180 idle: true,
181 });
182 }
183
184 let active_profile_id: Option<i64> = admin.active_embedding_profile_id()?;
187
188 let mut successes: Vec<VectorProjectionSuccess> = Vec::new();
189 let mut discards: Vec<VectorProjectionDiscard> = Vec::new();
190 let mut embeddable: Vec<VectorWorkClaim> = Vec::new();
191
192 for claim in claims {
193 if claim.chunk_missing {
194 discards.push(VectorProjectionDiscard {
195 work_id: claim.work_id,
196 reason: Some("chunk no longer exists".to_owned()),
197 });
198 continue;
199 }
200 let current_hash = crate::admin::canonical_chunk_hash(&claim.chunk_id, &claim.text_content);
201 if current_hash != claim.canonical_hash {
202 discards.push(VectorProjectionDiscard {
203 work_id: claim.work_id,
204 reason: Some("canonical_hash mismatch".to_owned()),
205 });
206 continue;
207 }
208 if let Some(pid) = active_profile_id
209 && claim.embedding_profile_id != pid
210 {
211 discards.push(VectorProjectionDiscard {
212 work_id: claim.work_id,
213 reason: Some("embedding profile changed".to_owned()),
214 });
215 continue;
216 }
217 embeddable.push(claim);
218 }
219
220 let mut embedder_unavailable = false;
222 let mut failed_count: u64 = 0;
223 if !embeddable.is_empty() {
224 let texts: Vec<String> = embeddable.iter().map(|c| c.text_content.clone()).collect();
225 match embedder.batch_embed(&texts) {
226 Ok(vectors) if vectors.len() == embeddable.len() => {
227 let identity = embedder.identity();
228 for (claim, vector) in embeddable.iter().zip(vectors) {
229 if vector.len() != identity.dimension || vector.iter().any(|v| !v.is_finite()) {
230 discards.push(VectorProjectionDiscard {
231 work_id: claim.work_id,
232 reason: Some("embedder returned invalid vector".to_owned()),
233 });
234 failed_count += 1;
235 continue;
236 }
237 successes.push(VectorProjectionSuccess {
238 work_id: claim.work_id,
239 kind: claim.kind.clone(),
240 chunk_id: claim.chunk_id.clone(),
241 embedding: vector,
242 });
243 }
244 }
245 Ok(_) | Err(_) => {
248 embedder_unavailable = true;
249 }
250 }
251 }
252
253 let reverts: Vec<i64> = if embedder_unavailable {
256 embeddable.iter().map(|c| c.work_id).collect()
257 } else {
258 Vec::new()
259 };
260 let revert_error = if embedder_unavailable {
261 Some("embedder unavailable".to_owned())
262 } else {
263 None
264 };
265
266 let apply = VectorProjectionApplyRequest {
267 successes,
268 discards,
269 reverts,
270 revert_error,
271 };
272
273 let processed_successes = u64::try_from(apply.successes.len()).unwrap_or(0);
274 let discarded = u64::try_from(apply.discards.len()).unwrap_or(0);
275
276 writer.apply_vector_projection(apply)?;
277
278 Ok(TickReport {
279 processed_incremental: if is_incremental {
280 processed_successes
281 } else {
282 0
283 },
284 processed_backfill: if is_incremental {
285 0
286 } else {
287 processed_successes
288 },
289 failed: failed_count,
290 discarded_stale: discarded - failed_count,
291 embedder_unavailable,
292 idle: false,
293 })
294}
295
/// Outcome of one `run_tick` call.
#[derive(Debug, Clone, Default)]
pub(crate) struct TickReport {
    /// Items successfully embedded when the tick served the
    /// `min_priority: 1000` claim; zero on a backfill tick.
    pub processed_incremental: u64,
    /// Items successfully embedded when the tick served the fallback claim;
    /// zero on an incremental tick.
    pub processed_backfill: u64,
    /// Items discarded because the embedder returned a vector of the wrong
    /// dimension or with a non-finite component.
    pub failed: u64,
    /// Items discarded before embedding: chunk missing, canonical hash
    /// mismatch, or embedding profile changed.
    pub discarded_stale: u64,
    /// `true` when the embedder errored or returned the wrong number of
    /// vectors, in which case the embeddable items were reverted.
    pub embedder_unavailable: bool,
    /// `true` when neither claim produced any work.
    pub idle: bool,
}