lash-protocol-rlm 0.1.0-alpha.38

RLM protocol (persistent Lashlang execution) for the lash agent runtime.
Documentation
use std::sync::Arc;

use lash_core::{SessionAppendNode, ToolArgumentProjectionPolicy};
use lash_rlm_types::{PROJECTED_JSON_TAG, PROJECTION_REF_JSON_TAG};
use lashlang::{
    ImageValue, ProjectedFuture, ProjectedValue, Record as FlowRecord, State as FlowState,
    Value as FlowValue,
};
use serde_json::Value;

use super::bindings::{ProjectionRef, ProjectionResolver, RlmProjectedSeedError};

/// Seed values parsed from a tool call's `seed` argument.
///
/// Entries for which `lash_rlm_types::projection_inner` yields a payload are
/// collected in `projected`; every other entry is kept verbatim in `globals`
/// (see `RlmSeed::from_seed_value`).
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct RlmSeed {
    /// Projected seed entries, pushed by name together with their inner JSON payload.
    pub projected: lash_rlm_types::RlmProjectedSeedSnapshot,
    /// Plain (non-projected) seed entries, keyed by name.
    pub globals: serde_json::Map<String, Value>,
}

impl RlmSeed {
    /// Builds a seed from a tool-call argument value.
    ///
    /// Looks up the `"seed"` key in `args`. When the key is absent an empty
    /// seed is returned; otherwise the value is handed to
    /// [`RlmSeed::from_seed_value`].
    ///
    /// # Errors
    ///
    /// Propagates the error produced by [`RlmSeed::from_seed_value`].
    pub fn from_tool_args(args: &Value) -> Result<Self, String> {
        args.get("seed")
            .map_or_else(|| Ok(Self::default()), Self::from_seed_value)
    }

    /// Parses a raw `seed` JSON value.
    ///
    /// `null` yields an empty seed. For an object, each entry is routed to
    /// `projected` (when `lash_rlm_types::projection_inner` returns a payload)
    /// or cloned into `globals` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error message when `seed` is neither `null` nor an object.
    pub fn from_seed_value(seed: &Value) -> Result<Self, String> {
        if seed.is_null() {
            return Ok(Self::default());
        }
        let Some(entries) = seed.as_object() else {
            return Err("`seed` must be a record/dict".to_string());
        };

        let mut parsed = Self::default();
        for (name, value) in entries {
            match lash_rlm_types::projection_inner(value) {
                Some(inner) => {
                    parsed.projected.push(name.clone(), inner.clone());
                }
                None => {
                    parsed.globals.insert(name.clone(), value.clone());
                }
            }
        }
        Ok(parsed)
    }

    /// Returns `true` when the seed carries neither globals nor projected entries.
    pub fn is_empty(&self) -> bool {
        let no_globals = self.globals.is_empty();
        no_globals && self.projected.is_empty()
    }

    /// Consumes the seed and moves its two parts into an `RlmSeedPluginBody`.
    pub fn into_event_body(self) -> lash_rlm_types::RlmSeedPluginBody {
        let Self { projected, globals } = self;
        lash_rlm_types::RlmSeedPluginBody { globals, projected }
    }
}

/// Produces the session nodes that record `seed`.
///
/// An empty seed produces no nodes. Otherwise the seed is wrapped in an
/// `RlmProtocolEvent::RlmSeed` event and returned as a single protocol-event
/// node.
pub fn rlm_seed_initial_nodes(seed: RlmSeed) -> Vec<SessionAppendNode> {
    if seed.is_empty() {
        return Vec::new();
    }
    let seed_event = lash_rlm_types::RlmProtocolEvent::RlmSeed(seed.into_event_body());
    let protocol_event = super::context::rlm_protocol_event(seed_event);
    vec![SessionAppendNode::protocol_event(protocol_event)]
}

pub(crate) fn normalize_tool_args_for_projection(
    args: Value,
    policy: &ToolArgumentProjectionPolicy,
) -> Value {
    match policy {
        ToolArgumentProjectionPolicy::MaterializeProjectedValues => {
            materialize_projected_json(args)
        }
        ToolArgumentProjectionPolicy::PreserveProjectedRefsInField { field } => {
            normalize_seed_preserving_tool_args(args, field)
        }
    }
}

/// Test-only helper: converts a Lashlang record to JSON and then normalizes
/// it with `policy`, as tool arguments would be.
#[cfg(test)]
pub(crate) async fn flow_record_to_tool_args(
    record: &FlowRecord,
    policy: &ToolArgumentProjectionPolicy,
) -> Value {
    let raw_args = flow_record_to_json_value(record).await;
    normalize_tool_args_for_projection(raw_args, policy)
}

/// Materializes projected values in `args` while preserving projected
/// wrappers directly inside the entry named `field`.
///
/// Non-object `args` are fully materialized. For an object, the `field` entry
/// goes through `normalize_projected_seed` and every other entry through
/// `materialize_projected_json`.
fn normalize_seed_preserving_tool_args(args: Value, field: &str) -> Value {
    match args {
        Value::Object(entries) => {
            let normalize_entry = |(key, value): (String, Value)| {
                let normalized = if key == field {
                    normalize_projected_seed(value)
                } else {
                    materialize_projected_json(value)
                };
                (key, normalized)
            };
            Value::Object(entries.into_iter().map(normalize_entry).collect())
        }
        other => materialize_projected_json(other),
    }
}

/// Normalizes a seed value, keeping its top-level projected entries intact.
///
/// A non-object seed is fully materialized. For an object, entries that
/// `lash_rlm_types::projection_inner` recognizes are left untouched and every
/// other entry is materialized.
fn normalize_projected_seed(seed: Value) -> Value {
    let Value::Object(mut entries) = seed else {
        return materialize_projected_json(seed);
    };
    for entry in entries.values_mut() {
        if lash_rlm_types::projection_inner(&*entry).is_none() {
            // `take` moves the value out (leaving `null` behind) so it can be
            // rewritten by value and stored back into the same slot.
            *entry = materialize_projected_json(entry.take());
        }
    }
    Value::Object(entries)
}

/// Recursively replaces projected wrappers in `value` with their payloads.
///
/// When `lash_rlm_types::projection_inner` recognizes `value`, the result is
/// the materialized form of a clone of that inner payload. Otherwise arrays
/// and objects are rewritten element by element, and scalars are returned
/// unchanged.
fn materialize_projected_json(value: Value) -> Value {
    if let Some(inner) = lash_rlm_types::projection_inner(&value) {
        return materialize_projected_json(inner.clone());
    }
    match value {
        Value::Array(mut items) => {
            for item in items.iter_mut() {
                *item = materialize_projected_json(item.take());
            }
            Value::Array(items)
        }
        Value::Object(mut fields) => {
            for field in fields.values_mut() {
                *field = materialize_projected_json(field.take());
            }
            Value::Object(fields)
        }
        scalar => scalar,
    }
}

pub(crate) fn flow_to_json_value<'a>(value: &'a FlowValue) -> ProjectedFuture<'a, Value> {
    Box::pin(async move {
        match value {
            FlowValue::Null => Value::Null,
            FlowValue::Bool(value) => Value::Bool(*value),
            FlowValue::Number(value) => json_number(*value),
            FlowValue::String(value) => Value::String(value.to_string()),
            FlowValue::Image(image) => serde_json::to_value(image)
                .unwrap_or_else(|_| Value::Object(serde_json::Map::new())),
            FlowValue::Resource(resource) => serde_json::to_value(resource)
                .unwrap_or_else(|_| Value::Object(serde_json::Map::new())),
            FlowValue::List(values) => {
                let mut out = Vec::with_capacity(values.len());
                for value in values.iter() {
                    out.push(flow_to_json_value(value).await);
                }
                Value::Array(out)
            }
            FlowValue::Record(record) => flow_record_to_json_value(record).await,
            FlowValue::Projected(value) => {
                if let Some(reference) = value.projection_ref() {
                    let mut ref_obj = serde_json::Map::with_capacity(1);
                    ref_obj.insert(PROJECTION_REF_JSON_TAG.to_string(), reference.clone());
                    let mut obj = serde_json::Map::with_capacity(1);
                    obj.insert(PROJECTED_JSON_TAG.to_string(), Value::Object(ref_obj));
                    return Value::Object(obj);
                }
                let inner = flow_to_json_value(&value.materialize_async().await).await;
                let mut obj = serde_json::Map::with_capacity(1);
                obj.insert(PROJECTED_JSON_TAG.to_string(), inner);
                Value::Object(obj)
            }
        }
    })
}

/// Converts a Lashlang record into a JSON object, converting each field with
/// `flow_to_json_value`.
pub(crate) async fn flow_record_to_json_value(record: &FlowRecord) -> Value {
    let mut fields = serde_json::Map::with_capacity(record.len());
    for (name, field) in record.iter() {
        let json = flow_to_json_value(field).await;
        fields.insert(name.to_string(), json);
    }
    Value::Object(fields)
}

/// Converts an `f64` into a JSON value, preferring integer encodings.
///
/// A finite value with no fractional part is emitted as an `i64` when it lies
/// in `[-2^63, 2^63)` and as a `u64` when it lies in `[2^63, 2^64)`. Every
/// other finite value is emitted as a JSON float. Values that
/// `serde_json::Number::from_f64` rejects (NaN and infinities) become `null`.
///
/// The range checks are explicit because float-to-integer `as` casts
/// saturate: `2^63 as i64` is `i64::MAX`, which converts back to exactly
/// `2^63` as an `f64`, so a cast round-trip comparison would wrongly accept
/// it and emit a value that is off by one (and likewise `2^64` as `u64::MAX`).
fn json_number(value: f64) -> Value {
    // Both bounds are powers of two and therefore exactly representable.
    const I64_EXCLUSIVE_UPPER: f64 = 9_223_372_036_854_775_808.0; // 2^63
    const U64_EXCLUSIVE_UPPER: f64 = 18_446_744_073_709_551_616.0; // 2^64

    if value.is_finite() && value.fract() == 0.0 {
        if (-I64_EXCLUSIVE_UPPER..I64_EXCLUSIVE_UPPER).contains(&value) {
            // In range and integral, so this cast is exact.
            return Value::Number(serde_json::Number::from(value as i64));
        }
        if (0.0..U64_EXCLUSIVE_UPPER).contains(&value) {
            // In range and integral, so this cast is exact.
            return Value::Number(serde_json::Number::from(value as u64));
        }
    }
    serde_json::Number::from_f64(value)
        .map(Value::Number)
        .unwrap_or(Value::Null)
}

/// Converts a JSON value into a Lashlang value.
///
/// Scalars map directly; a number with no `f64` representation becomes `0.0`.
/// Arrays become lists. An object that `json_map_to_image` recognizes becomes
/// an image value; any other object becomes a record whose fields are
/// converted recursively.
pub(crate) fn json_to_flow_value(value: Value) -> FlowValue {
    match value {
        Value::Null => FlowValue::Null,
        Value::Bool(flag) => FlowValue::Bool(flag),
        Value::Number(number) => FlowValue::Number(number.as_f64().unwrap_or(0.0)),
        Value::String(text) => FlowValue::String(text.into()),
        Value::Array(items) => {
            let converted = items.into_iter().map(json_to_flow_value);
            FlowValue::List(converted.collect())
        }
        Value::Object(fields) => {
            if let Some(image) = json_map_to_image(&fields) {
                return FlowValue::Image(image);
            }
            let record: FlowRecord = fields
                .into_iter()
                .map(|(name, field)| (name, json_to_flow_value(field)))
                .collect();
            FlowValue::Record(Arc::new(record))
        }
    }
}

/// Re-resolves projected values stored in the globals of `rlm`.
///
/// Takes a snapshot of the state, runs `rehydrate_projected_value` over every
/// global, and rebuilds the state from the snapshot only when at least one
/// value was changed.
///
/// # Errors
///
/// Returns the first error reported by `rehydrate_projected_value`; in that
/// case `rlm` is left unmodified.
pub(crate) async fn rehydrate_projected_globals(
    rlm: &mut FlowState,
    projection_resolver: Arc<dyn ProjectionResolver>,
) -> Result<(), String> {
    let mut snapshot = rlm.snapshot();
    // Names are copied out first so the map is not borrowed while its entries
    // are mutated below.
    let global_names: Vec<String> = snapshot.globals.keys().map(str::to_string).collect();

    let mut any_rehydrated = false;
    for name in global_names {
        let Some(slot) = snapshot.globals.get_mut(&name) else {
            continue;
        };
        if rehydrate_projected_value(slot, Arc::clone(&projection_resolver)).await? {
            any_rehydrated = true;
        }
    }

    if any_rehydrated {
        *rlm = FlowState::from_snapshot(snapshot);
    }
    Ok(())
}

/// Recursively re-resolves projected values inside `value`, in place.
///
/// Returns `Ok(true)` when at least one projected value was replaced and
/// `Ok(false)` when nothing changed. The future is boxed (`Box::pin`) because
/// the function awaits itself for list items and record fields.
///
/// # Errors
///
/// Returns a message when a stored projection ref cannot be deserialized into
/// a `ProjectionRef`, or when `projection_resolver` fails to resolve it.
fn rehydrate_projected_value<'a>(
    value: &'a mut FlowValue,
    projection_resolver: Arc<dyn ProjectionResolver>,
) -> ProjectedFuture<'a, Result<bool, String>> {
    Box::pin(async move {
        match value {
            FlowValue::Projected(projected) => {
                // Projected values without a projection ref are left as they are.
                let Some(ref_json) = projected.projection_ref().cloned() else {
                    return Ok(false);
                };
                let name = projected.name().to_string();
                let reference = serde_json::from_value::<ProjectionRef>(ref_json.clone())
                    .map_err(|err| format!("invalid projection ref for `{name}`: {err}"))?;
                let resolved = projection_resolver
                    .resolve_projection(&reference)
                    .await
                    .map_err(|err| err.to_string())?;
                // Rebuild the projected value under the same name, keeping the
                // original ref JSON alongside the freshly resolved content.
                *value = FlowValue::Projected(ProjectedValue::custom_with_projection_ref(
                    name, resolved, ref_json,
                ));
                Ok(true)
            }
            FlowValue::List(values) => {
                let mut changed = false;
                // Work on a copy of the items; the list is only replaced when
                // one of them actually changed.
                let mut restored = values.iter().cloned().collect::<Vec<_>>();
                for value in restored.iter_mut() {
                    changed |=
                        rehydrate_projected_value(value, Arc::clone(&projection_resolver)).await?;
                }
                if changed {
                    *value = FlowValue::List(restored.into());
                }
                Ok(changed)
            }
            FlowValue::Record(record) => {
                let mut changed = false;
                // `Arc::make_mut` yields mutable access to the record, cloning
                // it first if the `Arc` is shared.
                let record = Arc::make_mut(record);
                // Keys are copied out so the record is not borrowed while its
                // fields are mutated.
                let keys = record.keys().map(str::to_string).collect::<Vec<_>>();
                for key in keys {
                    if let Some(value) = record.get_mut(&key) {
                        changed |=
                            rehydrate_projected_value(value, Arc::clone(&projection_resolver))
                                .await?;
                    }
                }
                Ok(changed)
            }
            // Remaining variants cannot contain projected values.
            FlowValue::Null
            | FlowValue::Bool(_)
            | FlowValue::Number(_)
            | FlowValue::String(_)
            | FlowValue::Resource(_)
            | FlowValue::Image(_) => Ok(false),
        }
    })
}

fn json_map_to_image(map: &serde_json::Map<String, Value>) -> Option<ImageValue> {
    if map.get("type")?.as_str()? != "image" {
        return None;
    }
    Some(ImageValue::new(
        map.get("id")?.as_str()?.to_string(),
        map.get("label")?.as_str()?.to_string(),
        map.get("size")?.as_u64()?,
        optional_json_u32(map.get("width")?)?,
        optional_json_u32(map.get("height")?)?,
    ))
}

fn optional_json_u32(value: &Value) -> Option<Option<u32>> {
    match value {
        Value::Null => Some(None),
        Value::Number(number) => number
            .as_u64()
            .and_then(|value| u32::try_from(value).ok())
            .map(Some),
        _ => None,
    }
}

/// Renders a Lashlang value as output text.
///
/// `null` renders as `"null"`, strings render verbatim, and booleans and
/// numbers use their `to_string` form. Every other variant is converted with
/// `flow_to_json_value` and serialized as JSON text; if serialization fails
/// the value's own `to_string` output is used instead.
pub(crate) async fn format_output_value(value: &FlowValue) -> String {
    let scalar_text = match value {
        FlowValue::Null => Some("null".to_string()),
        FlowValue::String(text) => Some(text.to_string()),
        FlowValue::Bool(flag) => Some(flag.to_string()),
        FlowValue::Number(number) => Some(number.to_string()),
        FlowValue::Image(_)
        | FlowValue::Resource(_)
        | FlowValue::List(_)
        | FlowValue::Record(_)
        | FlowValue::Projected(_) => None,
    };
    if let Some(text) = scalar_text {
        return text;
    }
    let json = flow_to_json_value(value).await;
    serde_json::to_string(&json).unwrap_or_else(|_| value.to_string())
}

/// Extracts a `ProjectionRef` from a seed value, if it carries one.
///
/// Returns `Ok(None)` when `lash_rlm_types::projection_ref_inner` finds no
/// reference payload in `value`.
///
/// # Errors
///
/// Returns `RlmProjectedSeedError::invalid_projection_ref` (tagged with
/// `name`) when the payload does not deserialize into a `ProjectionRef`.
pub(crate) fn projection_ref_from_seed_value(
    name: &str,
    value: &Value,
) -> Result<Option<ProjectionRef>, RlmProjectedSeedError> {
    match lash_rlm_types::projection_ref_inner(value) {
        None => Ok(None),
        Some(inner) => match serde_json::from_value::<ProjectionRef>(inner.clone()) {
            Ok(reference) => Ok(Some(reference)),
            Err(err) => Err(RlmProjectedSeedError::invalid_projection_ref(name, err)),
        },
    }
}