use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use serde::Serialize;
use crate::AdminService;
use crate::EngineError;
use crate::embedder::BatchEmbedder;
use crate::writer::{
VectorProjectionApplyRequest, VectorProjectionClaimRequest, VectorProjectionDiscard,
VectorProjectionSuccess, VectorWorkClaim, WriterActor,
};
/// Maximum number of high-priority (incremental) work items claimed per tick.
pub(crate) const INCREMENTAL_BATCH: usize = 64;
/// Maximum number of low-priority (backfill) work items claimed per tick,
/// used only when no incremental work is pending.
pub(crate) const BACKFILL_SLICE: usize = 32;
/// How long the projection thread waits on its control channel before
/// re-entering the loop when no signal arrives.
const IDLE_POLL: Duration = Duration::from_millis(250);
/// Control messages delivered to the vector-projection worker thread.
#[derive(Debug)]
pub(crate) enum VectorWorkSignal {
    /// Nudge the worker out of its idle wait immediately.
    /// Currently has no sender (hence the allow), kept for future use.
    #[allow(dead_code)]
    Wakeup,
    /// Ask the worker loop to exit; sent from `Drop`.
    Shutdown,
}
/// Aggregated projection outcome counters, serializable for reporting.
#[derive(Clone, Debug, Default, Serialize)]
pub struct DrainReport {
    /// Items successfully embedded from the incremental (high-priority) queue.
    pub incremental_processed: u64,
    /// Items successfully embedded from the backfill (low-priority) queue.
    pub backfill_processed: u64,
    /// Items discarded because the embedder returned an invalid vector.
    pub failed: u64,
    /// Items discarded because they went stale before embedding
    /// (chunk gone, hash mismatch, or profile change).
    pub discarded_stale: u64,
    /// Number of ticks during which the embedder was unavailable.
    pub embedder_unavailable_ticks: u64,
}
/// Owns the background vector-projection worker thread and its control channel.
#[derive(Debug)]
pub struct VectorProjectionActor {
    // Joined on drop; `Option` so `Drop` can take ownership of the handle.
    thread_handle: Option<thread::JoinHandle<()>>,
    // Dropped first during shutdown so the worker also observes channel
    // disconnection; `Option` so `Drop` can take ownership.
    sender: Option<mpsc::SyncSender<VectorWorkSignal>>,
}
impl VectorProjectionActor {
    /// Spawns the background projection worker thread and returns a handle
    /// that signals and joins the thread when dropped.
    ///
    /// # Errors
    /// Returns [`EngineError::Io`] if the OS refuses to spawn the thread.
    pub fn start(_writer: &WriterActor) -> Result<Self, EngineError> {
        // Bounded control channel: signals are best-effort nudges, so a
        // small queue suffices.
        let (tx, rx) = mpsc::sync_channel::<VectorWorkSignal>(16);
        let builder = thread::Builder::new().name("fathomdb-vector-projection".to_owned());
        let worker = builder
            .spawn(move || vector_projection_loop(&rx))
            .map_err(EngineError::Io)?;
        Ok(Self {
            thread_handle: Some(worker),
            sender: Some(tx),
        })
    }
}
impl Drop for VectorProjectionActor {
    /// Signals the worker to stop, then joins it. A panic payload from the
    /// worker thread is re-raised here unless we are already unwinding, in
    /// which case it is only logged (a double panic would abort).
    fn drop(&mut self) {
        // Best-effort shutdown signal; dropping the sender afterwards also
        // disconnects the channel, which the worker treats as shutdown.
        if let Some(tx) = self.sender.take() {
            let _ = tx.try_send(VectorWorkSignal::Shutdown);
            drop(tx);
        }
        let Some(handle) = self.thread_handle.take() else {
            return;
        };
        if let Err(payload) = handle.join() {
            if std::thread::panicking() {
                trace_warn!(
                    "vector projection thread panicked during shutdown (suppressed: already panicking)"
                );
            } else {
                std::panic::resume_unwind(payload);
            }
        }
    }
}
/// Body of the background worker thread: waits on the control channel,
/// waking every `IDLE_POLL`, until a shutdown signal arrives or every
/// sender has been dropped.
fn vector_projection_loop(receiver: &mpsc::Receiver<VectorWorkSignal>) {
    trace_info!("vector projection thread started");
    let mut running = true;
    while running {
        running = match receiver.recv_timeout(IDLE_POLL) {
            // Explicit shutdown, or all senders gone: leave the loop.
            Ok(VectorWorkSignal::Shutdown) => false,
            Err(mpsc::RecvTimeoutError::Disconnected) => false,
            // A nudge or an idle timeout both just re-enter the wait.
            Ok(VectorWorkSignal::Wakeup) | Err(mpsc::RecvTimeoutError::Timeout) => true,
        };
    }
    trace_info!("vector projection thread exiting");
}
/// Runs one projection tick: claim pending work, validate it against the
/// current chunk/profile state, embed what survives, and hand the outcome
/// back to the writer as a single apply request.
///
/// Incremental (high-priority) work is always drained before any backfill
/// is claimed; a tick processes work from only one of the two queues.
///
/// # Errors
/// Propagates failures from the writer claim/apply calls and from the
/// admin profile lookup.
#[allow(clippy::too_many_lines)]
pub(crate) fn run_tick(
    admin: &AdminService,
    writer: &WriterActor,
    embedder: &dyn BatchEmbedder,
) -> Result<TickReport, EngineError> {
    // Prefer incremental work; fall back to a smaller backfill slice.
    let mut batch = writer.claim_vector_projection(VectorProjectionClaimRequest {
        min_priority: 1000,
        limit: INCREMENTAL_BATCH,
    })?;
    let mut incremental = true;
    if batch.is_empty() {
        incremental = false;
        batch = writer.claim_vector_projection(VectorProjectionClaimRequest {
            min_priority: i64::MIN,
            limit: BACKFILL_SLICE,
        })?;
    }
    if batch.is_empty() {
        // Nothing claimed at either priority: report an idle tick.
        return Ok(TickReport {
            processed_incremental: 0,
            processed_backfill: 0,
            failed: 0,
            discarded_stale: 0,
            embedder_unavailable: false,
            idle: true,
        });
    }
    let profile_id: Option<i64> = admin.active_embedding_profile_id()?;
    let mut discards: Vec<VectorProjectionDiscard> = Vec::new();
    let mut pending: Vec<VectorWorkClaim> = Vec::new();
    for claim in batch {
        // Work can go stale between enqueue and claim; classify each item
        // with the first staleness reason that applies, if any.
        let stale: Option<&str> = if claim.chunk_missing {
            Some("chunk no longer exists")
        } else if crate::admin::canonical_chunk_hash(&claim.chunk_id, &claim.text_content)
            != claim.canonical_hash
        {
            Some("canonical_hash mismatch")
        } else if profile_id.is_some_and(|pid| claim.embedding_profile_id != pid) {
            Some("embedding profile changed")
        } else {
            None
        };
        match stale {
            Some(reason) => discards.push(VectorProjectionDiscard {
                work_id: claim.work_id,
                reason: Some(reason.to_owned()),
            }),
            None => pending.push(claim),
        }
    }
    let mut successes: Vec<VectorProjectionSuccess> = Vec::new();
    let mut unavailable = false;
    let mut invalid_vectors: u64 = 0;
    if !pending.is_empty() {
        let texts: Vec<String> = pending.iter().map(|c| c.text_content.clone()).collect();
        match embedder.batch_embed(&texts) {
            // A response of the wrong length is treated the same as an
            // outright error: the whole batch is reverted for retry.
            Ok(vectors) if vectors.len() == pending.len() => {
                let identity = embedder.identity();
                for (claim, vector) in pending.iter().zip(vectors) {
                    let wrong_dim = vector.len() != identity.dimension;
                    if wrong_dim || vector.iter().any(|v| !v.is_finite()) {
                        // Per-item garbage output is discarded, not retried.
                        invalid_vectors += 1;
                        discards.push(VectorProjectionDiscard {
                            work_id: claim.work_id,
                            reason: Some("embedder returned invalid vector".to_owned()),
                        });
                    } else {
                        successes.push(VectorProjectionSuccess {
                            work_id: claim.work_id,
                            kind: claim.kind.clone(),
                            chunk_id: claim.chunk_id.clone(),
                            embedding: vector,
                        });
                    }
                }
            }
            Ok(_) | Err(_) => unavailable = true,
        }
    }
    // When the embedder is down, every embeddable claim is reverted so it
    // can be retried on a later tick.
    let (reverts, revert_error): (Vec<i64>, Option<String>) = if unavailable {
        (
            pending.iter().map(|c| c.work_id).collect(),
            Some("embedder unavailable".to_owned()),
        )
    } else {
        (Vec::new(), None)
    };
    let apply = VectorProjectionApplyRequest {
        successes,
        discards,
        reverts,
        revert_error,
    };
    let ok_count = u64::try_from(apply.successes.len()).unwrap_or(0);
    let discard_count = u64::try_from(apply.discards.len()).unwrap_or(0);
    writer.apply_vector_projection(apply)?;
    Ok(TickReport {
        processed_incremental: if incremental { ok_count } else { 0 },
        processed_backfill: if incremental { 0 } else { ok_count },
        failed: invalid_vectors,
        // `discards` holds both stale drops and invalid-vector drops;
        // subtract the latter to report only the stale ones. Cannot
        // underflow: every invalid vector also pushed a discard.
        discarded_stale: discard_count - invalid_vectors,
        embedder_unavailable: unavailable,
        idle: false,
    })
}
/// Outcome of a single `run_tick` call.
#[derive(Clone, Debug, Default)]
pub(crate) struct TickReport {
    /// Successful embeddings when this tick drained the incremental queue.
    pub processed_incremental: u64,
    /// Successful embeddings when this tick drained the backfill queue.
    pub processed_backfill: u64,
    /// Claims discarded because the embedder returned an invalid vector.
    pub failed: u64,
    /// Claims discarded as stale (chunk gone, hash mismatch, profile change).
    pub discarded_stale: u64,
    /// True when the embedder errored or returned a wrong-length batch;
    /// the affected claims were reverted for retry.
    pub embedder_unavailable: bool,
    /// True when no work could be claimed at either priority.
    pub idle: bool,
}