use anyhow::{Context, Result};
use async_trait::async_trait;
use crate::memory::config::MemoryConfig;
use crate::memory::queue::ops::set_backfill_in_progress;
use crate::memory::queue::store;
use crate::memory::queue::types::{
AppendBufferPayload, AppendTarget, ExtractChunkPayload, FlushStalePayload, Job, JobKind,
JobOutcome, NewJob, NodeRef, ReembedBackfillPayload, SealDocumentPayload, SealPayload,
};
pub const L0_DEFAULT_FLUSH_AGE_SECS: i64 = 60 * 60;
pub const REEMBED_BACKFILL_REVISIT_MS: i64 = 750;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExtractDecision {
pub kept: bool,
pub uses_document_subtree: bool,
pub tree_scope: String,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppendDecision {
pub tree_id: String,
pub should_seal: bool,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StaleBuffer {
pub tree_id: String,
pub level: u32,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ReembedProgress {
Wrote { more_pending: bool },
Covered,
NoProvider,
StaleSignature,
}
#[async_trait]
pub trait QueueDelegates: Send + Sync {
async fn extract_chunk(
&self,
config: &MemoryConfig,
chunk_id: &str,
) -> Result<Option<ExtractDecision>>;
async fn append_node(
&self,
config: &MemoryConfig,
node: &NodeRef,
target: &AppendTarget,
) -> Result<Option<AppendDecision>>;
async fn seal_level(
&self,
config: &MemoryConfig,
payload: &SealPayload,
) -> Result<Option<SealPayload>>;
async fn list_stale_buffers(
&self,
config: &MemoryConfig,
max_age_secs: i64,
) -> Result<Vec<StaleBuffer>>;
async fn seal_document(
&self,
config: &MemoryConfig,
payload: &SealDocumentPayload,
) -> Result<()>;
async fn reembed_batch(
&self,
config: &MemoryConfig,
signature: &str,
) -> Result<ReembedProgress>;
fn active_signature(&self, config: &MemoryConfig) -> String;
fn has_uncovered_reembed_work(&self, config: &MemoryConfig, signature: &str) -> Result<bool>;
}
pub async fn handle_job(
config: &MemoryConfig,
job: &Job,
delegates: &dyn QueueDelegates,
) -> Result<JobOutcome> {
match job.kind {
JobKind::ExtractChunk => handle_extract(config, job, delegates).await,
JobKind::AppendBuffer => handle_append_buffer(config, job, delegates).await,
JobKind::Seal => handle_seal(config, job, delegates).await,
JobKind::FlushStale => handle_flush_stale(config, job, delegates).await,
JobKind::ReembedBackfill => handle_reembed_backfill(config, job, delegates).await,
JobKind::SealDocument => handle_seal_document(config, job, delegates).await,
}
}
async fn handle_extract(
config: &MemoryConfig,
job: &Job,
delegates: &dyn QueueDelegates,
) -> Result<JobOutcome> {
let payload: ExtractChunkPayload =
serde_json::from_str(&job.payload_json).context("parse ExtractChunk payload")?;
let Some(decision) = delegates.extract_chunk(config, &payload.chunk_id).await? else {
return Ok(JobOutcome::Done);
};
if decision.kept && !decision.uses_document_subtree {
let follow_up = NewJob::append_buffer(&AppendBufferPayload {
node: NodeRef::Leaf {
chunk_id: payload.chunk_id.clone(),
},
target: AppendTarget::Source {
source_id: decision.tree_scope.clone(),
},
})?;
store::enqueue(config, &follow_up)?;
}
if decision.kept {
crate::memory::queue::ops::ensure_reembed_backfill(config, delegates)?;
}
Ok(JobOutcome::Done)
}
async fn handle_append_buffer(
config: &MemoryConfig,
job: &Job,
delegates: &dyn QueueDelegates,
) -> Result<JobOutcome> {
let payload: AppendBufferPayload =
serde_json::from_str(&job.payload_json).context("parse AppendBuffer payload")?;
let Some(decision) = delegates
.append_node(config, &payload.node, &payload.target)
.await?
else {
return Ok(JobOutcome::Done);
};
if decision.should_seal {
let seal = SealPayload {
tree_id: decision.tree_id,
level: 0,
force_now_ms: None,
};
store::enqueue(config, &NewJob::seal(&seal)?)?;
}
Ok(JobOutcome::Done)
}
async fn handle_seal(
config: &MemoryConfig,
job: &Job,
delegates: &dyn QueueDelegates,
) -> Result<JobOutcome> {
let payload: SealPayload =
serde_json::from_str(&job.payload_json).context("parse Seal payload")?;
if let Some(parent) = delegates.seal_level(config, &payload).await? {
store::enqueue(config, &NewJob::seal(&parent)?)?;
}
Ok(JobOutcome::Done)
}
async fn handle_flush_stale(
config: &MemoryConfig,
job: &Job,
delegates: &dyn QueueDelegates,
) -> Result<JobOutcome> {
let payload: FlushStalePayload =
serde_json::from_str(&job.payload_json).context("parse FlushStale payload")?;
let age_secs = payload.max_age_secs.unwrap_or(L0_DEFAULT_FLUSH_AGE_SECS);
let now_ms = chrono::Utc::now().timestamp_millis();
for buf in delegates.list_stale_buffers(config, age_secs).await? {
let seal = SealPayload {
tree_id: buf.tree_id,
level: buf.level,
force_now_ms: Some(now_ms),
};
store::enqueue(config, &NewJob::seal(&seal)?)?;
}
Ok(JobOutcome::Done)
}
async fn handle_seal_document(
config: &MemoryConfig,
job: &Job,
delegates: &dyn QueueDelegates,
) -> Result<JobOutcome> {
let payload: SealDocumentPayload =
serde_json::from_str(&job.payload_json).context("parse SealDocument payload")?;
if payload.chunk_ids.is_empty() {
return Ok(JobOutcome::Done);
}
delegates.seal_document(config, &payload).await?;
Ok(JobOutcome::Done)
}
async fn handle_reembed_backfill(
config: &MemoryConfig,
job: &Job,
delegates: &dyn QueueDelegates,
) -> Result<JobOutcome> {
let payload: ReembedBackfillPayload =
serde_json::from_str(&job.payload_json).context("parse ReembedBackfill payload")?;
match delegates.reembed_batch(config, &payload.signature).await? {
ReembedProgress::Wrote {
more_pending: true, ..
} => {
set_backfill_in_progress(true);
Ok(JobOutcome::Defer {
until_ms: chrono::Utc::now().timestamp_millis() + REEMBED_BACKFILL_REVISIT_MS,
reason: "re-embed backfill: batch done, more pending".to_string(),
})
}
ReembedProgress::Wrote {
more_pending: false,
}
| ReembedProgress::Covered
| ReembedProgress::NoProvider
| ReembedProgress::StaleSignature => {
set_backfill_in_progress(false);
Ok(JobOutcome::Done)
}
}
}
#[cfg(test)]
#[path = "handlers_tests.rs"]
mod tests;