axiomsync 1.0.0

Core data-processing engine for the AxiomSync local retrieval runtime.
Documentation
use std::collections::HashMap;

use crate::om::{
    OmContinuationSourceKind, OmContinuationStateV2, OmObservationChunk, OmObservationEntryV2,
    OmObservationOriginKind, OmObservationPriority,
};
use crate::state::OmActiveEntry;

use super::{OM_HINT_SNAPSHOT_BUFFERED_TAIL_LIMIT, non_empty_trimmed};

/// Entry ids chosen for display after collapsing entries that share a source.
///
/// Populated by `snapshot_visible_entry_selection`; `Default` yields three
/// empty lists.
#[derive(Debug, Clone, Default)]
pub(super) struct SnapshotVisibleEntrySelection {
    /// Every selected entry id, one per distinct source key, in the order the
    /// source keys were first encountered.
    pub(super) selected_entry_ids: Vec<String>,
    /// The selected ids that do not start with `buffered:`.
    pub(super) activated_visible_entry_ids: Vec<String>,
    /// The selected ids that start with `buffered:`.
    pub(super) buffered_visible_entry_ids: Vec<String>,
}

/// Converts active entries into snapshot observation entries for `scope_key`.
///
/// Input order is preserved. An entry is skipped when `non_empty_trimmed`
/// returns `None` for its text (presumably blank text; see that helper).
/// Priority and origin are parsed from their string forms, while
/// `source_message_ids` is always empty and `superseded_by` is always `None`.
pub(super) fn build_snapshot_activated_entries(
    scope_key: &str,
    active_entries: &[OmActiveEntry],
) -> Vec<OmObservationEntryV2> {
    let mut activated = Vec::new();
    for active in active_entries {
        let Some(text) = non_empty_trimmed(&active.text) else {
            continue;
        };
        activated.push(OmObservationEntryV2 {
            entry_id: active.entry_id.clone(),
            scope_key: scope_key.to_string(),
            thread_id: active.canonical_thread_id.clone(),
            priority: parse_observation_priority(active.priority.as_str()),
            text: text.to_string(),
            source_message_ids: Vec::new(),
            origin_kind: parse_observation_origin(active.origin_kind.as_str()),
            created_at_rfc3339: active.created_at.to_rfc3339(),
            superseded_by: None,
        });
    }
    activated
}

/// Builds snapshot entries from the tail of `buffered_chunks`.
///
/// Chunks are walked from last to first; chunks whose observations are blank
/// after trimming are skipped, and at most
/// `OM_HINT_SNAPSHOT_BUFFERED_TAIL_LIMIT` of the remaining ones are used.
/// Both returned vectors therefore list the newest chunk first.
///
/// Returns `(entries, chunk_ids)`, where `chunk_ids[i]` is the id of the chunk
/// that produced `entries[i]`. Each entry id has the form
/// `buffered:{scope_key}:{chunk_id}`, every entry is attributed to
/// `fallback_thread_id`, and its priority is inferred from the chunk text.
pub(super) fn build_snapshot_buffered_entries(
    scope_key: &str,
    fallback_thread_id: &str,
    buffered_chunks: &[OmObservationChunk],
) -> (Vec<OmObservationEntryV2>, Vec<String>) {
    let mut buffered_entries = Vec::new();
    let mut buffered_chunk_ids = Vec::new();

    let newest_non_blank = buffered_chunks
        .iter()
        .rev()
        .filter(|chunk| !chunk.observations.trim().is_empty())
        .take(OM_HINT_SNAPSHOT_BUFFERED_TAIL_LIMIT);

    for chunk in newest_non_blank {
        let chunk_id = chunk.id.clone();
        let text = chunk.observations.trim().to_string();
        let entry = OmObservationEntryV2 {
            entry_id: format!("buffered:{scope_key}:{chunk_id}"),
            scope_key: scope_key.to_string(),
            thread_id: fallback_thread_id.to_string(),
            priority: infer_buffered_entry_priority(&text),
            text,
            source_message_ids: chunk.message_ids.clone(),
            origin_kind: OmObservationOriginKind::Chunk,
            created_at_rfc3339: chunk.created_at.to_rfc3339(),
            superseded_by: None,
        };
        buffered_chunk_ids.push(chunk_id);
        buffered_entries.push(entry);
    }

    (buffered_entries, buffered_chunk_ids)
}

/// Guesses the priority of buffered observation text from inline markers.
///
/// Returns `High` when the text contains the `🔴` character, or when its
/// ASCII-lowercased form either starts with one of the high-priority markers
/// or contains one of them after a newline (or, for `priority:high`, after a
/// space). Everything else is `Medium`; this function never returns `Low`.
pub(super) fn infer_buffered_entry_priority(text: &str) -> OmObservationPriority {
    // Markers recognised at the very start of the text.
    const LEADING_MARKERS: [&str; 3] = ["priority:high", "high:", "[high]"];
    // Markers recognised anywhere, each carrying its required separator.
    const EMBEDDED_MARKERS: [&str; 4] = [
        " priority:high",
        "\npriority:high",
        "\nhigh:",
        "\n[high]",
    ];

    let lowered = text.to_ascii_lowercase();
    let flagged_high = text.contains('🔴')
        || LEADING_MARKERS
            .iter()
            .any(|&marker| lowered.starts_with(marker))
        || EMBEDDED_MARKERS
            .iter()
            .any(|&marker| lowered.contains(marker));

    if flagged_high {
        OmObservationPriority::High
    } else {
        OmObservationPriority::Medium
    }
}

/// Builds the continuation state stored with a snapshot, if there is one.
///
/// Returns `None` when both `current_task` and `suggested_response` are
/// `None`. Otherwise the state copies the scope key from `record`, uses
/// `fallback_thread_id` as the thread id, and is stamped with
/// `updated_at_rfc3339`. The remaining fields are fixed: `confidence_milli`
/// is 1000, the source kind is `ObserverDeterministic`, there are no source
/// message ids, and `staleness_budget_ms` is 0.
pub(super) fn build_snapshot_continuation_state(
    record: &crate::om::OmRecord,
    fallback_thread_id: &str,
    current_task: &Option<String>,
    suggested_response: &Option<String>,
    updated_at_rfc3339: &str,
) -> Option<OmContinuationStateV2> {
    let has_continuation = current_task.is_some() || suggested_response.is_some();
    has_continuation.then(|| OmContinuationStateV2 {
        scope_key: record.scope_key.clone(),
        thread_id: fallback_thread_id.to_string(),
        current_task: current_task.clone(),
        suggested_response: suggested_response.clone(),
        confidence_milli: 1000,
        source_kind: OmContinuationSourceKind::ObserverDeterministic,
        source_message_ids: Vec::new(),
        updated_at_rfc3339: updated_at_rfc3339.to_string(),
        staleness_budget_ms: 0,
    })
}

/// Joins the text of `entries` into one string, separated by blank lines.
///
/// Entries are kept in order; any entry for which `non_empty_trimmed` returns
/// `None` is left out. The result is empty when no entry contributes text.
pub(super) fn snapshot_visible_observation_text(entries: &[OmObservationEntryV2]) -> String {
    let mut paragraphs: Vec<String> = Vec::new();
    for entry in entries {
        if let Some(text) = non_empty_trimmed(&entry.text) {
            paragraphs.push(text.to_string());
        }
    }
    paragraphs.join("\n\n")
}

/// Picks one entry id per source from `entries`.
///
/// Entries are grouped by `snapshot_visible_entry_source_key`. Within a group
/// the first id seen wins, except that a `buffered:` id is replaced by the
/// first later id that is not buffered. Entries whose id is rejected by
/// `non_empty_trimmed` are ignored.
///
/// The returned lists follow the order in which each source key first
/// appeared; every selected id is also filed under either the buffered or the
/// activated list.
pub(super) fn snapshot_visible_entry_selection(
    entries: &[OmObservationEntryV2],
) -> SnapshotVisibleEntrySelection {
    // `winners` holds `(entry_id, is_buffered)` in first-seen source order;
    // `slot_by_source` maps a source key to its position in `winners`.
    let mut winners: Vec<(String, bool)> = Vec::new();
    let mut slot_by_source: HashMap<String, usize> = HashMap::new();

    for entry in entries {
        let Some(entry_id) = non_empty_trimmed(&entry.entry_id) else {
            continue;
        };
        let is_buffered = entry_id.starts_with("buffered:");
        let source_key = snapshot_visible_entry_source_key(entry_id);

        match slot_by_source.get(&source_key).copied() {
            Some(slot) => {
                // Only upgrade buffered -> non-buffered; never the reverse.
                if let Some(winner) = winners.get_mut(slot) {
                    if winner.1 && !is_buffered {
                        *winner = (entry_id.to_string(), false);
                    }
                }
            }
            None => {
                slot_by_source.insert(source_key, winners.len());
                winners.push((entry_id.to_string(), is_buffered));
            }
        }
    }

    let mut selection = SnapshotVisibleEntrySelection::default();
    for (entry_id, is_buffered) in winners {
        let bucket = if is_buffered {
            &mut selection.buffered_visible_entry_ids
        } else {
            &mut selection.activated_visible_entry_ids
        };
        bucket.push(entry_id.clone());
        selection.selected_entry_ids.push(entry_id);
    }
    selection
}

pub(super) fn snapshot_visible_entry_source_key(entry_id: &str) -> String {
    if let Some(chunk_id) = entry_id.strip_prefix("observation:") {
        return format!("chunk:{}", chunk_id.trim());
    }
    if let Some(buffered_tail) = entry_id.strip_prefix("buffered:")
        && let Some(chunk_id) = buffered_tail.rsplit(':').next()
    {
        return format!("chunk:{}", chunk_id.trim());
    }
    format!("entry:{}", entry_id.trim())
}

/// Parses a stored priority label, ignoring surrounding whitespace and ASCII
/// case.
///
/// `"high"` and `"low"` map to their variants; any other value, including an
/// empty string, maps to `Medium`.
fn parse_observation_priority(value: &str) -> OmObservationPriority {
    let label = value.trim();
    if label.eq_ignore_ascii_case("high") {
        OmObservationPriority::High
    } else if label.eq_ignore_ascii_case("low") {
        OmObservationPriority::Low
    } else {
        OmObservationPriority::Medium
    }
}

/// Parses a stored origin label, ignoring surrounding whitespace and ASCII
/// case.
///
/// Recognises `"reflection"`, `"chunk"` and `"summary"`; any other value,
/// including an empty string, maps to `Observation`.
fn parse_observation_origin(value: &str) -> OmObservationOriginKind {
    let label = value.trim();
    if label.eq_ignore_ascii_case("reflection") {
        OmObservationOriginKind::Reflection
    } else if label.eq_ignore_ascii_case("chunk") {
        OmObservationOriginKind::Chunk
    } else if label.eq_ignore_ascii_case("summary") {
        OmObservationOriginKind::Summary
    } else {
        OmObservationOriginKind::Observation
    }
}