use crate::error::Result;
use crate::models::OutboxEvent;
use crate::state::{OmReflectionApplyContext, OmReflectionBufferPayload};
use crate::uri::AxiomUri;
use super::AxiomSync;
mod reflector;
use reflector::{
OmReflectorCallOptions, buffered_or_resolved_reflector_response, parse_observe_session_id,
parse_om_observe_buffer_requested_payload, parse_om_reflect_buffer_requested_payload,
parse_om_reflect_requested_payload, resolve_reflection_cover_entry_ids,
resolve_reflector_response,
};
impl AxiomSync {
pub(super) fn handle_outbox_event(&self, event: &OutboxEvent) -> Result<bool> {
match event.event_type.as_str() {
"semantic_scan" => {
let target = AxiomUri::parse(&event.uri)?;
self.prune_index_prefix_from_memory(&target)?;
self.state
.remove_search_documents_with_prefix(&target.to_string())?;
self.state
.remove_index_state_with_prefix(&target.to_string())?;
self.ensure_tiers_recursive(&target)?;
self.reindex_uri_tree(&target)?;
Ok(true)
}
"upsert" | "reindex" | "delete" => Ok(true),
"om_reflect_buffer_requested" => {
if !self.config.om.enabled {
return Ok(true);
}
self.handle_om_reflect_buffer_requested(event)
}
"om_observe_buffer_requested" => {
if !self.config.om.enabled {
return Ok(true);
}
self.handle_om_observe_buffer_requested(event)
}
"om_reflect_requested" => {
if !self.config.om.enabled {
return Ok(true);
}
self.handle_om_reflect_requested(event)
}
_ => Ok(false),
}
}
fn handle_om_reflect_buffer_requested(&self, event: &OutboxEvent) -> Result<bool> {
let payload = parse_om_reflect_buffer_requested_payload(&event.payload_json)?;
let scope_key = payload.scope_key.as_str();
let expected_generation = payload.expected_generation;
let Some(record) = self.state.get_om_record_by_scope_key(scope_key)? else {
return Ok(true);
};
if record.generation_count != expected_generation {
return Ok(true);
}
if record
.buffered_reflection
.as_deref()
.is_some_and(|value| !value.trim().is_empty())
{
return Ok(true);
}
let active_entries = self.state.list_om_active_entries(scope_key)?;
let reflection = resolve_reflector_response(
&record,
scope_key,
expected_generation,
OmReflectorCallOptions::BUFFERED,
&self.config.om.reflector,
&active_entries,
)?;
let _buffered = self.state.buffer_om_reflection_with_cas(
scope_key,
expected_generation,
OmReflectionBufferPayload {
reflection: &reflection.reflection,
reflection_token_count: reflection.reflection_token_count,
reflection_input_tokens: reflection.usage.input_tokens,
},
)?;
Ok(true)
}
fn handle_om_observe_buffer_requested(&self, event: &OutboxEvent) -> Result<bool> {
let payload = parse_om_observe_buffer_requested_payload(&event.payload_json)?;
let scope_key = payload.scope_key.as_str();
let expected_generation = payload.expected_generation;
let session_id = parse_observe_session_id(payload.session_id.as_deref(), &event.uri)?;
let session = self.session(Some(&session_id));
let _ = session.process_om_observe_buffer_requested(
scope_key,
expected_generation,
event.id,
)?;
Ok(true)
}
fn handle_om_reflect_requested(&self, event: &OutboxEvent) -> Result<bool> {
let payload = parse_om_reflect_requested_payload(&event.payload_json)?;
let scope_key = payload.scope_key.as_str();
let expected_generation = payload.expected_generation;
let Some(record) = self.state.get_om_record_by_scope_key(scope_key)? else {
return Ok(true);
};
if record.generation_count != expected_generation {
return Ok(true);
}
let active_entries = self.state.list_om_active_entries(scope_key)?;
let reflection = buffered_or_resolved_reflector_response(
&record,
scope_key,
expected_generation,
OmReflectorCallOptions::DEFAULT,
&self.config.om.reflector,
&active_entries,
)?;
let covers_entry_ids = resolve_reflection_cover_entry_ids(
&record,
OmReflectorCallOptions::DEFAULT,
&self.config.om.reflector,
&active_entries,
);
let _applied = self.state.apply_om_reflection_with_cas(
scope_key,
expected_generation,
event.id,
&reflection.reflection,
&covers_entry_ids,
OmReflectionApplyContext {
current_task: reflection.current_task.as_deref(),
suggested_response: reflection.suggested_response.as_deref(),
},
)?;
Ok(true)
}
}