Skip to main content

lash_protocol_rlm/projection/
transport.rs

1use std::sync::Arc;
2
3use lash_core::{SessionAppendNode, ToolArgumentProjectionPolicy};
4use lash_rlm_types::{PROJECTED_JSON_TAG, PROJECTION_REF_JSON_TAG};
5use lashlang::{
6    ImageValue, ProjectedFuture, ProjectedValue, Record as FlowRecord, State as FlowState,
7    Value as FlowValue,
8};
9use serde_json::Value;
10
11use super::bindings::{ProjectionRef, ProjectionResolver, RlmProjectedSeedError};
12
13#[derive(Default, Debug, Clone, PartialEq, Eq)]
14pub struct RlmSeed {
15    pub projected: lash_rlm_types::RlmProjectedSeedSnapshot,
16    pub globals: serde_json::Map<String, Value>,
17}
18
19impl RlmSeed {
20    pub fn from_tool_args(args: &Value) -> Result<Self, String> {
21        match args.get("seed") {
22            None => Ok(Self::default()),
23            Some(seed) => Self::from_seed_value(seed),
24        }
25    }
26
27    pub fn from_seed_value(seed: &Value) -> Result<Self, String> {
28        let raw = match seed {
29            Value::Null => return Ok(Self::default()),
30            Value::Object(map) => map,
31            _ => return Err("`seed` must be a record/dict".to_string()),
32        };
33        let mut out = Self::default();
34        for (name, value) in raw.iter() {
35            if let Some(inner) = lash_rlm_types::projection_inner(value) {
36                out.projected.push(name.clone(), inner.clone());
37            } else {
38                out.globals.insert(name.clone(), value.clone());
39            }
40        }
41        Ok(out)
42    }
43
44    pub fn is_empty(&self) -> bool {
45        self.globals.is_empty() && self.projected.is_empty()
46    }
47
48    pub fn into_event_body(self) -> lash_rlm_types::RlmSeedPluginBody {
49        lash_rlm_types::RlmSeedPluginBody {
50            globals: self.globals,
51            projected: self.projected,
52        }
53    }
54}
55
56pub fn rlm_seed_initial_nodes(seed: RlmSeed) -> Vec<SessionAppendNode> {
57    if seed.is_empty() {
58        return Vec::new();
59    }
60    vec![SessionAppendNode::protocol_event(
61        super::context::rlm_protocol_event(lash_rlm_types::RlmProtocolEvent::RlmSeed(
62            seed.into_event_body(),
63        )),
64    )]
65}
66
67pub(crate) fn normalize_tool_args_for_projection(
68    args: Value,
69    policy: &ToolArgumentProjectionPolicy,
70) -> Value {
71    match policy {
72        ToolArgumentProjectionPolicy::MaterializeProjectedValues => {
73            materialize_projected_json(args)
74        }
75        ToolArgumentProjectionPolicy::PreserveProjectedRefsInField { field } => {
76            normalize_seed_preserving_tool_args(args, field)
77        }
78    }
79}
80
81#[cfg(test)]
82pub(crate) async fn flow_record_to_tool_args(
83    record: &FlowRecord,
84    policy: &ToolArgumentProjectionPolicy,
85) -> Value {
86    normalize_tool_args_for_projection(flow_record_to_json_value(record).await, policy)
87}
88
89fn normalize_seed_preserving_tool_args(args: Value, field: &str) -> Value {
90    let Value::Object(args) = args else {
91        return materialize_projected_json(args);
92    };
93    Value::Object(
94        args.into_iter()
95            .map(|(key, value)| {
96                let value = if key == field {
97                    normalize_projected_seed(value)
98                } else {
99                    materialize_projected_json(value)
100                };
101                (key, value)
102            })
103            .collect(),
104    )
105}
106
107fn normalize_projected_seed(seed: Value) -> Value {
108    let Value::Object(seed) = seed else {
109        return materialize_projected_json(seed);
110    };
111    Value::Object(
112        seed.into_iter()
113            .map(|(key, value)| {
114                let value = if lash_rlm_types::projection_inner(&value).is_some() {
115                    value
116                } else {
117                    materialize_projected_json(value)
118                };
119                (key, value)
120            })
121            .collect(),
122    )
123}
124
125fn materialize_projected_json(value: Value) -> Value {
126    if let Some(inner) = lash_rlm_types::projection_inner(&value) {
127        return materialize_projected_json(inner.clone());
128    }
129    match value {
130        Value::Array(items) => {
131            Value::Array(items.into_iter().map(materialize_projected_json).collect())
132        }
133        Value::Object(map) => Value::Object(
134            map.into_iter()
135                .map(|(key, value)| (key, materialize_projected_json(value)))
136                .collect(),
137        ),
138        value => value,
139    }
140}
141
142pub(crate) fn flow_to_json_value<'a>(value: &'a FlowValue) -> ProjectedFuture<'a, Value> {
143    Box::pin(async move {
144        match value {
145            FlowValue::Null => Value::Null,
146            FlowValue::Bool(value) => Value::Bool(*value),
147            FlowValue::Number(value) => json_number(*value),
148            FlowValue::String(value) => Value::String(value.to_string()),
149            FlowValue::Image(image) => serde_json::to_value(image)
150                .unwrap_or_else(|_| Value::Object(serde_json::Map::new())),
151            FlowValue::Resource(resource) => serde_json::to_value(resource)
152                .unwrap_or_else(|_| Value::Object(serde_json::Map::new())),
153            FlowValue::List(values) => {
154                let mut out = Vec::with_capacity(values.len());
155                for value in values.iter() {
156                    out.push(flow_to_json_value(value).await);
157                }
158                Value::Array(out)
159            }
160            FlowValue::Record(record) => flow_record_to_json_value(record).await,
161            FlowValue::Projected(value) => {
162                if let Some(reference) = value.projection_ref() {
163                    let mut ref_obj = serde_json::Map::with_capacity(1);
164                    ref_obj.insert(PROJECTION_REF_JSON_TAG.to_string(), reference.clone());
165                    let mut obj = serde_json::Map::with_capacity(1);
166                    obj.insert(PROJECTED_JSON_TAG.to_string(), Value::Object(ref_obj));
167                    return Value::Object(obj);
168                }
169                let inner = flow_to_json_value(&value.materialize_async().await).await;
170                let mut obj = serde_json::Map::with_capacity(1);
171                obj.insert(PROJECTED_JSON_TAG.to_string(), inner);
172                Value::Object(obj)
173            }
174        }
175    })
176}
177
178pub(crate) async fn flow_record_to_json_value(record: &FlowRecord) -> Value {
179    let mut object = serde_json::Map::with_capacity(record.len());
180    for (key, value) in record.iter() {
181        object.insert(key.to_string(), flow_to_json_value(value).await);
182    }
183    Value::Object(object)
184}
185
186fn json_number(value: f64) -> Value {
187    if value.is_finite() && value.fract() == 0.0 {
188        let as_i64 = value as i64 as f64;
189        if as_i64 == value {
190            return Value::Number(serde_json::Number::from(value as i64));
191        }
192        let as_u64 = value as u64 as f64;
193        if as_u64 == value {
194            return Value::Number(serde_json::Number::from(value as u64));
195        }
196    }
197    serde_json::Number::from_f64(value)
198        .map(Value::Number)
199        .unwrap_or(Value::Null)
200}
201
202pub(crate) fn json_to_flow_value(value: Value) -> FlowValue {
203    match value {
204        Value::Null => FlowValue::Null,
205        Value::Bool(value) => FlowValue::Bool(value),
206        Value::Number(value) => FlowValue::Number(value.as_f64().unwrap_or_default()),
207        Value::String(value) => FlowValue::String(value.into()),
208        Value::Array(values) => {
209            FlowValue::List(values.into_iter().map(json_to_flow_value).collect())
210        }
211        Value::Object(map) => json_map_to_image(&map)
212            .map(FlowValue::Image)
213            .unwrap_or_else(|| {
214                FlowValue::Record(Arc::new(
215                    map.into_iter()
216                        .map(|(key, value)| (key, json_to_flow_value(value)))
217                        .collect::<FlowRecord>(),
218                ))
219            }),
220    }
221}
222
223pub(crate) async fn rehydrate_projected_globals(
224    rlm: &mut FlowState,
225    projection_resolver: Arc<dyn ProjectionResolver>,
226) -> Result<(), String> {
227    let mut snapshot = rlm.snapshot();
228    let mut changed = false;
229    let keys = snapshot
230        .globals
231        .keys()
232        .map(str::to_string)
233        .collect::<Vec<_>>();
234    for key in keys {
235        if let Some(value) = snapshot.globals.get_mut(&key) {
236            changed |= rehydrate_projected_value(value, Arc::clone(&projection_resolver)).await?;
237        }
238    }
239    if changed {
240        *rlm = FlowState::from_snapshot(snapshot);
241    }
242    Ok(())
243}
244
245fn rehydrate_projected_value<'a>(
246    value: &'a mut FlowValue,
247    projection_resolver: Arc<dyn ProjectionResolver>,
248) -> ProjectedFuture<'a, Result<bool, String>> {
249    Box::pin(async move {
250        match value {
251            FlowValue::Projected(projected) => {
252                let Some(ref_json) = projected.projection_ref().cloned() else {
253                    return Ok(false);
254                };
255                let name = projected.name().to_string();
256                let reference = serde_json::from_value::<ProjectionRef>(ref_json.clone())
257                    .map_err(|err| format!("invalid projection ref for `{name}`: {err}"))?;
258                let resolved = projection_resolver
259                    .resolve_projection(&reference)
260                    .await
261                    .map_err(|err| err.to_string())?;
262                *value = FlowValue::Projected(ProjectedValue::custom_with_projection_ref(
263                    name, resolved, ref_json,
264                ));
265                Ok(true)
266            }
267            FlowValue::List(values) => {
268                let mut changed = false;
269                let mut restored = values.iter().cloned().collect::<Vec<_>>();
270                for value in restored.iter_mut() {
271                    changed |=
272                        rehydrate_projected_value(value, Arc::clone(&projection_resolver)).await?;
273                }
274                if changed {
275                    *value = FlowValue::List(restored.into());
276                }
277                Ok(changed)
278            }
279            FlowValue::Record(record) => {
280                let mut changed = false;
281                let record = Arc::make_mut(record);
282                let keys = record.keys().map(str::to_string).collect::<Vec<_>>();
283                for key in keys {
284                    if let Some(value) = record.get_mut(&key) {
285                        changed |=
286                            rehydrate_projected_value(value, Arc::clone(&projection_resolver))
287                                .await?;
288                    }
289                }
290                Ok(changed)
291            }
292            FlowValue::Null
293            | FlowValue::Bool(_)
294            | FlowValue::Number(_)
295            | FlowValue::String(_)
296            | FlowValue::Resource(_)
297            | FlowValue::Image(_) => Ok(false),
298        }
299    })
300}
301
302fn json_map_to_image(map: &serde_json::Map<String, Value>) -> Option<ImageValue> {
303    if map.get("type")?.as_str()? != "image" {
304        return None;
305    }
306    Some(ImageValue::new(
307        map.get("id")?.as_str()?.to_string(),
308        map.get("label")?.as_str()?.to_string(),
309        map.get("size")?.as_u64()?,
310        optional_json_u32(map.get("width")?)?,
311        optional_json_u32(map.get("height")?)?,
312    ))
313}
314
315fn optional_json_u32(value: &Value) -> Option<Option<u32>> {
316    match value {
317        Value::Null => Some(None),
318        Value::Number(number) => number
319            .as_u64()
320            .and_then(|value| u32::try_from(value).ok())
321            .map(Some),
322        _ => None,
323    }
324}
325
326pub(crate) async fn format_output_value(value: &FlowValue) -> String {
327    match value {
328        FlowValue::Null => "null".to_string(),
329        FlowValue::String(text) => text.to_string(),
330        FlowValue::Bool(value) => value.to_string(),
331        FlowValue::Number(value) => value.to_string(),
332        FlowValue::Image(_)
333        | FlowValue::Resource(_)
334        | FlowValue::List(_)
335        | FlowValue::Record(_)
336        | FlowValue::Projected(_) => serde_json::to_string(&flow_to_json_value(value).await)
337            .unwrap_or_else(|_| value.to_string()),
338    }
339}
340
341pub(crate) fn projection_ref_from_seed_value(
342    name: &str,
343    value: &Value,
344) -> Result<Option<ProjectionRef>, RlmProjectedSeedError> {
345    let Some(inner) = lash_rlm_types::projection_ref_inner(value) else {
346        return Ok(None);
347    };
348    serde_json::from_value::<ProjectionRef>(inner.clone())
349        .map(Some)
350        .map_err(|err| RlmProjectedSeedError::invalid_projection_ref(name, err))
351}