axiomsync 1.0.1

Local retrieval runtime and CLI for AxiomSync.
Documentation
use std::collections::HashSet;

use chrono::{DateTime, Utc};
use uuid::Uuid;

#[cfg(test)]
use super::super::scope_binding;
use super::super::{
    OmObservationChunk, OmObserverMessageCandidate, OmOriginType, OmRecord, OmScopeBinding,
    combine_observations_for_buffering, estimate_text_tokens,
};

pub(in crate::session::om) fn new_om_record(
    scope: &OmScopeBinding,
    now: DateTime<Utc>,
) -> OmRecord {
    OmRecord {
        id: Uuid::new_v4().to_string(),
        scope: scope.scope,
        scope_key: scope.scope_key.clone(),
        session_id: scope.session_id.clone(),
        thread_id: scope.thread_id.clone(),
        resource_id: scope.resource_id.clone(),
        generation_count: 0,
        last_applied_outbox_event_id: None,
        origin_type: OmOriginType::Initial,
        active_observations: String::new(),
        observation_token_count: 0,
        pending_message_tokens: 0,
        last_observed_at: None,
        current_task: None,
        suggested_response: None,
        last_activated_message_ids: Vec::new(),
        observer_trigger_count_total: 0,
        reflector_trigger_count_total: 0,
        is_observing: false,
        is_reflecting: false,
        is_buffering_observation: false,
        is_buffering_reflection: false,
        last_buffered_at_tokens: 0,
        last_buffered_at_time: None,
        buffered_reflection: None,
        buffered_reflection_tokens: None,
        buffered_reflection_input_tokens: None,
        created_at: now,
        updated_at: now,
    }
}

#[cfg(test)]
/// Test helper: builds an empty [`OmRecord`] for a session-scoped binding.
///
/// The binding uses `OmScope::Session` with the given `scope_key` and
/// `session_id`, and leaves `thread_id` and `resource_id` unset.
pub(in crate::session::om) fn new_session_om_record(
    session_id: &str,
    scope_key: &str,
    now: DateTime<Utc>,
) -> OmRecord {
    new_om_record(
        &OmScopeBinding {
            scope: super::super::OmScope::Session,
            scope_key: scope_key.to_owned(),
            session_id: Some(session_id.to_owned()),
            thread_id: None,
            resource_id: None,
        },
        now,
    )
}

/// Collects every message id already covered by observation.
///
/// The result is the union of `activated_ids` and the `message_ids` of every
/// buffered chunk. Ids that are empty or whitespace-only are skipped.
pub(in crate::session::om) fn observed_message_ids_set(
    activated_ids: &[String],
    buffered_chunks: &[OmObservationChunk],
) -> HashSet<String> {
    let chunk_ids = buffered_chunks
        .iter()
        .flat_map(|chunk| chunk.message_ids.iter());

    activated_ids
        .iter()
        .chain(chunk_ids)
        .filter(|candidate| !candidate.trim().is_empty())
        .cloned()
        .collect()
}

/// Concatenates the observations of all buffered chunks into one string.
///
/// Each chunk's text is trimmed, and chunks that end up empty are skipped. The
/// remaining texts are separated by a blank line (`"\n\n"`), in slice order.
pub(in crate::session::om) fn buffered_observations_text(
    buffered_chunks: &[OmObservationChunk],
) -> String {
    let mut joined = String::new();
    for piece in buffered_chunks.iter().map(|chunk| chunk.observations.trim()) {
        if piece.is_empty() {
            continue;
        }
        // Only pieces that were actually appended make `joined` non-empty, so this
        // separates consecutive kept pieces exactly like `join("\n\n")`.
        if !joined.is_empty() {
            joined.push_str("\n\n");
        }
        joined.push_str(piece);
    }
    joined
}

pub(in crate::session::om) fn record_with_buffered_observation_context(
    record: &OmRecord,
    buffered_chunks: &[OmObservationChunk],
    active_observations_max_chars: usize,
) -> OmRecord {
    let buffered_text = buffered_observations_text(buffered_chunks);
    let Some(combined) =
        combine_observations_for_buffering(&record.active_observations, &buffered_text)
    else {
        return record.clone();
    };
    let mut out = record.clone();
    out.active_observations =
        truncate_chars(&combined, active_observations_max_chars.saturating_mul(2));
    out
}

/// Builds the next buffered [`OmObservationChunk`] for `record_id` from the
/// observer's output for the `selected` messages.
///
/// Construction steps:
/// - `observations_text` is normalized (lines trimmed, blank lines dropped) and
///   truncated to `observation_max_chars` characters.
/// - `message_tokens` is the saturating sum of the estimated tokens of every
///   selected message's text.
/// - The cycle id and `last_observed_at` come from the newest selected message
///   (latest `created_at`, ties broken by the larger id).
/// - `seq` is one past the highest `seq` among `buffered_chunks`, or `1` when
///   there are none.
///
/// Returns `None` when any of these holds:
/// - `selected` is empty;
/// - the normalized observation text is empty;
/// - the selected messages estimate to zero tokens.
pub(in crate::session::om) fn build_observation_chunk(
    record_id: &str,
    selected: &[OmObserverMessageCandidate],
    buffered_chunks: &[OmObservationChunk],
    now: DateTime<Utc>,
    observations_text: &str,
    observation_max_chars: usize,
) -> Option<OmObservationChunk> {
    if selected.is_empty() {
        return None;
    }

    let observation = truncate_chars(
        &normalize_observation_text(observations_text),
        observation_max_chars,
    );
    if observation.trim().is_empty() {
        return None;
    }

    let message_tokens = selected.iter().fold(0u32, |sum, item| {
        sum.saturating_add(estimate_text_tokens(&item.text))
    });
    if message_tokens == 0 {
        return None;
    }

    let message_ids = selected
        .iter()
        .map(|item| item.id.clone())
        .collect::<Vec<_>>();
    let (cycle_anchor_id, last_observed_at) = selected
        .iter()
        .max_by(|a, b| {
            a.created_at
                .cmp(&b.created_at)
                .then_with(|| a.id.cmp(&b.id))
        })
        .map(|item| (item.id.clone(), item.created_at))?;

    // Take the highest existing seq rather than the last element's. This gives the
    // same result when chunks are ordered by seq, and it cannot hand out a duplicate
    // seq if a caller passes an unordered slice.
    let next_seq = buffered_chunks
        .iter()
        .map(|chunk| chunk.seq)
        .max()
        .map_or(1, |seq| seq.saturating_add(1));

    // Estimate before moving `observation` into the chunk, so no clone is needed.
    let token_count = estimate_text_tokens(&observation);

    Some(OmObservationChunk {
        id: Uuid::new_v4().to_string(),
        record_id: record_id.to_string(),
        seq: next_seq,
        cycle_id: format!("observer_sync:{cycle_anchor_id}"),
        observations: observation,
        token_count,
        message_tokens,
        message_ids,
        last_observed_at,
        created_at: now,
    })
}

/// Collapses every run of whitespace in `text` to a single space.
///
/// Leading and trailing whitespace is dropped, so whitespace-only input yields `""`.
pub(in crate::session::om) fn normalize_text(text: &str) -> String {
    let mut collapsed = String::with_capacity(text.len());
    for word in text.split_whitespace() {
        // `split_whitespace` never yields empty words, so a non-empty buffer
        // means this is not the first word.
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }
    collapsed
}

/// Trims every line of `text`, drops lines that are then empty, and re-joins
/// the survivors with `\n`.
///
/// Whitespace inside a line is kept as-is. Line endings are recognized the way
/// `str::lines` recognizes them: `\n` and `\r\n`.
pub(in crate::session::om) fn normalize_observation_text(text: &str) -> String {
    let mut normalized = String::with_capacity(text.len());
    for line in text.lines() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        if !normalized.is_empty() {
            normalized.push('\n');
        }
        normalized.push_str(line);
    }
    normalized
}

/// Returns at most the first `max_chars` Unicode scalar values of `text`.
///
/// Truncation happens on `char` boundaries, so multi-byte characters are never split.
pub(in crate::session::om) fn truncate_chars(text: &str, max_chars: usize) -> String {
    // The byte offset of the char at index `max_chars` is exactly where the kept
    // prefix ends; if there is no such char, the whole string fits.
    match text.char_indices().nth(max_chars) {
        Some((cut, _)) => text[..cut].to_string(),
        None => text.to_string(),
    }
}

#[cfg(test)]
/// Test-only pass-through to `scope_binding::parse_env_enabled_default_true`.
///
/// It exposes that parser at this module path for tests. Parsing rules are
/// defined in `scope_binding`; presumably an absent value counts as enabled,
/// per the name. TODO confirm there.
pub(in crate::session::om) fn parse_env_enabled_default_true(raw: Option<&str>) -> bool {
    scope_binding::parse_env_enabled_default_true(raw)
}