//! axiomsync 1.0.1
//!
//! Local retrieval runtime and CLI for AxiomSync.
//! Documentation
use reqwest::Url;
use reqwest::blocking::Client;
use serde_json::Value;

use super::super::{
    ObserverThreadStateUpdate, OmInferenceFailureKind, OmInferenceModelConfig, OmObserverConfig,
    OmObserverMessageCandidate, OmObserverPromptInput, OmObserverRequest, OmObserverResponse,
    OmPendingMessage, OmRecord, Result, build_observer_prompt_contract_v2,
    build_observer_system_prompt, build_observer_user_prompt, build_other_conversation_blocks,
    estimate_text_tokens, format_observer_messages_for_prompt, om_observer_error, om_status_kind,
    parse_local_loopback_endpoint,
};
use super::parsing::parse_llm_observer_response;
use super::record::{normalize_observation_text, normalize_text, truncate_chars};

/// Parses the configured observer endpoint, requiring a local loopback host.
///
/// Any parse/validation failure is surfaced as a fatal observer error.
pub(in crate::session::om) fn build_observer_endpoint(config: &OmObserverConfig) -> Result<Url> {
    match parse_local_loopback_endpoint(&config.llm.endpoint, "om observer endpoint", "local host") {
        Ok(url) => Ok(url),
        Err(err) => Err(om_observer_error(
            OmInferenceFailureKind::Fatal,
            format!("invalid endpoint: {err}"),
        )),
    }
}

/// Builds a blocking HTTP client with the configured request timeout.
///
/// A builder failure is treated as fatal: the observer cannot run without a client.
pub(in crate::session::om) fn build_observer_client(config: &OmObserverConfig) -> Result<Client> {
    let timeout = std::time::Duration::from_millis(config.llm.timeout_ms);
    match Client::builder().timeout(timeout).build() {
        Ok(client) => Ok(client),
        Err(err) => Err(om_observer_error(
            OmInferenceFailureKind::Fatal,
            format!("client build failed: {err}"),
        )),
    }
}

/// Assembles the observer inference request from the record, the config, and
/// the pre-selected message candidates.
///
/// Text fields are normalized and bounded by the configured text budget before
/// they enter the request.
pub(in crate::session::om) fn build_observer_llm_request(
    record: &OmRecord,
    scope_key: &str,
    config: &OmObserverConfig,
    pending_candidates: &[OmObserverMessageCandidate],
    other_conversation_candidates: &[OmObserverMessageCandidate],
) -> OmObserverRequest {
    // Model settings are forwarded verbatim from config; provider is always local-http here.
    let model = OmInferenceModelConfig {
        provider: "local-http".to_string(),
        model: config.llm.model.clone(),
        max_output_tokens: config.llm.max_output_tokens,
        temperature_milli: config.llm.temperature_milli,
    };

    // Active observations: normalize first, then clamp to the character budget.
    let active_observations = truncate_chars(
        &normalize_observation_text(&record.active_observations),
        config.text_budget.active_observations_max_chars,
    );

    let other_conversations = build_other_conversation_blocks(
        other_conversation_candidates,
        None,
        config.text_budget.other_conversation_max_part_chars,
    );

    // Each candidate becomes a prompt message with normalized text and an RFC 3339 timestamp.
    let mut pending_messages = Vec::with_capacity(pending_candidates.len());
    for candidate in pending_candidates {
        pending_messages.push(OmPendingMessage {
            id: candidate.id.clone(),
            role: candidate.role.clone(),
            text: normalize_text(&candidate.text),
            created_at_rfc3339: Some(candidate.created_at.to_rfc3339()),
        });
    }

    OmObserverRequest {
        scope: record.scope,
        scope_key: scope_key.to_string(),
        model,
        active_observations,
        other_conversations,
        pending_messages,
    }
}
/// Runs one observer inference round for a single thread: builds the prompts,
/// sends the request, and parses the model's JSON reply.
///
/// Single-thread mode never yields per-thread state updates, so the second
/// tuple element is always empty.
pub(in crate::session::om) fn run_single_thread_observer_response(
    client: &Client,
    endpoint: &Url,
    config: &OmObserverConfig,
    request: &OmObserverRequest,
    pending_candidates: &[OmObserverMessageCandidate],
    skip_continuation_hints: bool,
) -> Result<(OmObserverResponse, Vec<ObserverThreadStateUpdate>)> {
    // Ids of every candidate message; the response parser rejects references to unknown ids.
    let known_ids = pending_candidates
        .iter()
        .map(|candidate| candidate.id.clone())
        .collect::<Vec<_>>();

    let request_json = observer_prompt_contract_json(
        request,
        pending_candidates,
        skip_continuation_hints,
        config.text_budget.observation_max_chars,
    )?;

    let message_history = format_observer_messages_for_prompt(&request.pending_messages);
    let system_prompt = build_observer_system_prompt();
    let user_prompt = build_observer_user_prompt(OmObserverPromptInput {
        request_json: Some(request_json.as_str()),
        existing_observations: Some(&request.active_observations),
        message_history: &message_history,
        other_conversation_context: request.other_conversations.as_deref(),
        skip_continuation_hints,
    });

    let raw = send_observer_llm_request(client, endpoint, config, &system_prompt, &user_prompt)?;
    let parsed =
        parse_llm_observer_response(&raw, &known_ids, config.text_budget.observation_max_chars)?;
    Ok((parsed, Vec::new()))
}

/// Encodes the v2 observer prompt contract as pretty-printed JSON.
///
/// The preferred thread id is the first non-blank `source_thread_id` among the
/// candidates, falling back to the first non-blank `source_session_id`.
fn observer_prompt_contract_json(
    request: &OmObserverRequest,
    pending_candidates: &[OmObserverMessageCandidate],
    skip_continuation_hints: bool,
    observation_max_chars: usize,
) -> Result<String> {
    let known_ids = pending_candidates
        .iter()
        .map(|candidate| candidate.id.clone())
        .collect::<Vec<_>>();

    // Prefer a thread id; only consult session ids when no candidate carries one.
    let mut preferred_thread_id: Option<&str> = None;
    for candidate in pending_candidates {
        if let Some(value) = candidate.source_thread_id.as_deref() {
            if !value.trim().is_empty() {
                preferred_thread_id = Some(value);
                break;
            }
        }
    }
    if preferred_thread_id.is_none() {
        for candidate in pending_candidates {
            if let Some(value) = candidate.source_session_id.as_deref() {
                if !value.trim().is_empty() {
                    preferred_thread_id = Some(value);
                    break;
                }
            }
        }
    }

    let request_contract = build_observer_prompt_contract_v2(
        request,
        &known_ids,
        skip_continuation_hints,
        preferred_thread_id,
        observation_max_chars,
    );
    match serde_json::to_string_pretty(&request_contract) {
        Ok(encoded) => Ok(encoded),
        Err(err) => Err(om_observer_error(
            OmInferenceFailureKind::Schema,
            format!("failed to encode observer prompt contract json: {err}"),
        )),
    }
}

/// Sends a chat-completion style request to the local LLM endpoint and returns
/// the raw JSON response body.
///
/// Failure classification: transport errors are transient, non-success HTTP
/// statuses are classified via `om_status_kind`, and an unparseable body is a
/// schema failure.
pub(in crate::session::om) fn send_observer_llm_request(
    client: &Client,
    endpoint: &Url,
    config: &OmObserverConfig,
    system_prompt: &str,
    user_prompt: &str,
) -> Result<Value> {
    // Config stores temperature in milli-units; the wire format wants a float.
    let temperature = f64::from(config.llm.temperature_milli) / 1000.0;
    let payload = serde_json::json!({
        "model": config.llm.model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "stream": false,
        "options": {
            "temperature": temperature,
            "num_predict": config.llm.max_output_tokens
        }
    });

    let response = match client.post(endpoint.clone()).json(&payload).send() {
        Ok(response) => response,
        Err(err) => {
            return Err(om_observer_error(
                OmInferenceFailureKind::Transient,
                format!("request failed: {err}"),
            ));
        }
    };

    let status = response.status();
    if !status.is_success() {
        return Err(om_observer_error(
            om_status_kind(status),
            format!("non-success status: {status}"),
        ));
    }

    match response.json::<Value>() {
        Ok(value) => Ok(value),
        Err(err) => Err(om_observer_error(
            OmInferenceFailureKind::Schema,
            format!("invalid json response: {err}"),
        )),
    }
}
/// Selects the newest messages that fit inside the token budget, returning
/// them in chronological order.
///
/// Walks `selected` newest-to-oldest, bounding each message's text to
/// `max_chars_per_message` and charging an estimated token cost against
/// `max_input_tokens`. The first non-empty message is always kept, even when
/// it alone exceeds the budget, so the result is never empty if any candidate
/// has text.
pub(in crate::session::om) fn select_messages_for_observer_llm(
    selected: &[OmObserverMessageCandidate],
    max_chars_per_message: usize,
    max_input_tokens: u32,
) -> Vec<OmObserverMessageCandidate> {
    let mut newest_first = Vec::<OmObserverMessageCandidate>::new();
    let mut used_tokens = 0u32;

    for candidate in selected.iter().rev() {
        let text = truncate_chars(&normalize_text(&candidate.text), max_chars_per_message);
        if text.is_empty() {
            // Nothing survived normalization/truncation; skip without charging the budget.
            continue;
        }
        let bounded = OmObserverMessageCandidate {
            id: candidate.id.clone(),
            role: candidate.role.clone(),
            text,
            created_at: candidate.created_at,
            source_thread_id: candidate.source_thread_id.clone(),
            source_session_id: candidate.source_session_id.clone(),
        };
        // Estimated cost covers id + role + text plus a fixed per-message overhead of 8.
        let cost = estimate_text_tokens(&bounded.id)
            .saturating_add(estimate_text_tokens(&bounded.role))
            .saturating_add(estimate_text_tokens(&bounded.text))
            .saturating_add(8);

        // Budget check is skipped for the very first kept message (see doc comment).
        if !newest_first.is_empty() && used_tokens.saturating_add(cost) > max_input_tokens {
            break;
        }

        newest_first.push(bounded);
        // Clamp so the accumulator never exceeds the budget after the exempt first message.
        used_tokens = used_tokens.saturating_add(cost).min(max_input_tokens);
    }

    // Restore chronological (oldest-first) order.
    newest_first.reverse();
    newest_first
}

#[cfg(test)]
mod tests {
    use super::*;
    // NOTE(review): this imports from `crate::om`, while the functions above are scoped
    // to `crate::session::om` — presumably `crate::om` re-exports these types; confirm.
    use crate::om::{OmInferenceModelConfig, OmPendingMessage, OmScope};

    /// The encoded prompt contract must carry the v2 header fields and the
    /// preferred thread id taken from the candidate's source thread.
    #[test]
    fn observer_prompt_contract_json_contains_v2_contract_fields() {
        let pending = OmPendingMessage {
            id: "m1".to_string(),
            role: "user".to_string(),
            text: "hello".to_string(),
            created_at_rfc3339: None,
        };
        let observer_request = OmObserverRequest {
            scope: OmScope::Session,
            scope_key: "session:s-contract".to_string(),
            model: OmInferenceModelConfig {
                provider: "local-http".to_string(),
                model: "qwen2.5:7b".to_string(),
                max_output_tokens: 512,
                temperature_milli: 0,
            },
            active_observations: "obs".to_string(),
            other_conversations: None,
            pending_messages: vec![pending],
        };
        let candidate = OmObserverMessageCandidate {
            id: "m1".to_string(),
            role: "user".to_string(),
            text: "hello".to_string(),
            created_at: chrono::Utc::now(),
            source_thread_id: Some("thread-a".to_string()),
            source_session_id: Some("s-contract".to_string()),
        };

        let json_text = observer_prompt_contract_json(&observer_request, &[candidate], false, 4096)
            .expect("json");
        let parsed: serde_json::Value = serde_json::from_str(&json_text).expect("parse json");

        assert_eq!(parsed["header"]["contract_name"], "axiomsync.om.prompt");
        assert_eq!(parsed["header"]["contract_version"], "2.0.0");
        assert_eq!(parsed["header"]["protocol_version"], "om-v2");
        assert_eq!(parsed["header"]["request_kind"], "observer_single");
        assert_eq!(parsed["preferred_thread_id"], "thread-a");
    }
}