lash_protocol_rlm/projection/
transport.rs1use std::sync::Arc;
2
3use lash_core::{SessionAppendNode, ToolArgumentProjectionPolicy};
4use lash_rlm_types::{PROJECTED_JSON_TAG, PROJECTION_REF_JSON_TAG};
5use lashlang::{
6 ImageValue, ProjectedFuture, ProjectedValue, Record as FlowRecord, State as FlowState,
7 Value as FlowValue,
8};
9use serde_json::Value;
10
11use super::bindings::{ProjectionRef, ProjectionResolver, RlmProjectedSeedError};
12
13#[derive(Default, Debug, Clone, PartialEq, Eq)]
14pub struct RlmSeed {
15 pub projected: lash_rlm_types::RlmProjectedSeedSnapshot,
16 pub globals: serde_json::Map<String, Value>,
17}
18
19impl RlmSeed {
20 pub fn from_tool_args(args: &Value) -> Result<Self, String> {
21 match args.get("seed") {
22 None => Ok(Self::default()),
23 Some(seed) => Self::from_seed_value(seed),
24 }
25 }
26
27 pub fn from_seed_value(seed: &Value) -> Result<Self, String> {
28 let raw = match seed {
29 Value::Null => return Ok(Self::default()),
30 Value::Object(map) => map,
31 _ => return Err("`seed` must be a record/dict".to_string()),
32 };
33 let mut out = Self::default();
34 for (name, value) in raw.iter() {
35 if let Some(inner) = lash_rlm_types::projection_inner(value) {
36 out.projected.push(name.clone(), inner.clone());
37 } else {
38 out.globals.insert(name.clone(), value.clone());
39 }
40 }
41 Ok(out)
42 }
43
44 pub fn is_empty(&self) -> bool {
45 self.globals.is_empty() && self.projected.is_empty()
46 }
47
48 pub fn into_event_body(self) -> lash_rlm_types::RlmSeedPluginBody {
49 lash_rlm_types::RlmSeedPluginBody {
50 globals: self.globals,
51 projected: self.projected,
52 }
53 }
54}
55
56pub fn rlm_seed_initial_nodes(seed: RlmSeed) -> Vec<SessionAppendNode> {
57 if seed.is_empty() {
58 return Vec::new();
59 }
60 vec![SessionAppendNode::protocol_event(
61 super::context::rlm_protocol_event(lash_rlm_types::RlmProtocolEvent::RlmSeed(
62 seed.into_event_body(),
63 )),
64 )]
65}
66
67pub(crate) fn normalize_tool_args_for_projection(
68 args: Value,
69 policy: &ToolArgumentProjectionPolicy,
70) -> Value {
71 match policy {
72 ToolArgumentProjectionPolicy::MaterializeProjectedValues => {
73 materialize_projected_json(args)
74 }
75 ToolArgumentProjectionPolicy::PreserveProjectedRefsInField { field } => {
76 normalize_seed_preserving_tool_args(args, field)
77 }
78 }
79}
80
/// Test-only helper: converts a flow record to JSON and then normalises it as
/// tool arguments under `policy`.
#[cfg(test)]
pub(crate) async fn flow_record_to_tool_args(
    record: &FlowRecord,
    policy: &ToolArgumentProjectionPolicy,
) -> Value {
    let raw_args = flow_record_to_json_value(record).await;
    normalize_tool_args_for_projection(raw_args, policy)
}
88
89fn normalize_seed_preserving_tool_args(args: Value, field: &str) -> Value {
90 let Value::Object(args) = args else {
91 return materialize_projected_json(args);
92 };
93 Value::Object(
94 args.into_iter()
95 .map(|(key, value)| {
96 let value = if key == field {
97 normalize_projected_seed(value)
98 } else {
99 materialize_projected_json(value)
100 };
101 (key, value)
102 })
103 .collect(),
104 )
105}
106
107fn normalize_projected_seed(seed: Value) -> Value {
108 let Value::Object(seed) = seed else {
109 return materialize_projected_json(seed);
110 };
111 Value::Object(
112 seed.into_iter()
113 .map(|(key, value)| {
114 let value = if lash_rlm_types::projection_inner(&value).is_some() {
115 value
116 } else {
117 materialize_projected_json(value)
118 };
119 (key, value)
120 })
121 .collect(),
122 )
123}
124
125fn materialize_projected_json(value: Value) -> Value {
126 if let Some(inner) = lash_rlm_types::projection_inner(&value) {
127 return materialize_projected_json(inner.clone());
128 }
129 match value {
130 Value::Array(items) => {
131 Value::Array(items.into_iter().map(materialize_projected_json).collect())
132 }
133 Value::Object(map) => Value::Object(
134 map.into_iter()
135 .map(|(key, value)| (key, materialize_projected_json(value)))
136 .collect(),
137 ),
138 value => value,
139 }
140}
141
142pub(crate) fn flow_to_json_value<'a>(value: &'a FlowValue) -> ProjectedFuture<'a, Value> {
143 Box::pin(async move {
144 match value {
145 FlowValue::Null => Value::Null,
146 FlowValue::Bool(value) => Value::Bool(*value),
147 FlowValue::Number(value) => json_number(*value),
148 FlowValue::String(value) => Value::String(value.to_string()),
149 FlowValue::Image(image) => serde_json::to_value(image)
150 .unwrap_or_else(|_| Value::Object(serde_json::Map::new())),
151 FlowValue::Resource(resource) => serde_json::to_value(resource)
152 .unwrap_or_else(|_| Value::Object(serde_json::Map::new())),
153 FlowValue::List(values) => {
154 let mut out = Vec::with_capacity(values.len());
155 for value in values.iter() {
156 out.push(flow_to_json_value(value).await);
157 }
158 Value::Array(out)
159 }
160 FlowValue::Record(record) => flow_record_to_json_value(record).await,
161 FlowValue::Projected(value) => {
162 if let Some(reference) = value.projection_ref() {
163 let mut ref_obj = serde_json::Map::with_capacity(1);
164 ref_obj.insert(PROJECTION_REF_JSON_TAG.to_string(), reference.clone());
165 let mut obj = serde_json::Map::with_capacity(1);
166 obj.insert(PROJECTED_JSON_TAG.to_string(), Value::Object(ref_obj));
167 return Value::Object(obj);
168 }
169 let inner = flow_to_json_value(&value.materialize_async().await).await;
170 let mut obj = serde_json::Map::with_capacity(1);
171 obj.insert(PROJECTED_JSON_TAG.to_string(), inner);
172 Value::Object(obj)
173 }
174 }
175 })
176}
177
178pub(crate) async fn flow_record_to_json_value(record: &FlowRecord) -> Value {
179 let mut object = serde_json::Map::with_capacity(record.len());
180 for (key, value) in record.iter() {
181 object.insert(key.to_string(), flow_to_json_value(value).await);
182 }
183 Value::Object(object)
184}
185
186fn json_number(value: f64) -> Value {
187 if value.is_finite() && value.fract() == 0.0 {
188 let as_i64 = value as i64 as f64;
189 if as_i64 == value {
190 return Value::Number(serde_json::Number::from(value as i64));
191 }
192 let as_u64 = value as u64 as f64;
193 if as_u64 == value {
194 return Value::Number(serde_json::Number::from(value as u64));
195 }
196 }
197 serde_json::Number::from_f64(value)
198 .map(Value::Number)
199 .unwrap_or(Value::Null)
200}
201
202pub(crate) fn json_to_flow_value(value: Value) -> FlowValue {
203 match value {
204 Value::Null => FlowValue::Null,
205 Value::Bool(value) => FlowValue::Bool(value),
206 Value::Number(value) => FlowValue::Number(value.as_f64().unwrap_or_default()),
207 Value::String(value) => FlowValue::String(value.into()),
208 Value::Array(values) => {
209 FlowValue::List(values.into_iter().map(json_to_flow_value).collect())
210 }
211 Value::Object(map) => json_map_to_image(&map)
212 .map(FlowValue::Image)
213 .unwrap_or_else(|| {
214 FlowValue::Record(Arc::new(
215 map.into_iter()
216 .map(|(key, value)| (key, json_to_flow_value(value)))
217 .collect::<FlowRecord>(),
218 ))
219 }),
220 }
221}
222
223pub(crate) async fn rehydrate_projected_globals(
224 rlm: &mut FlowState,
225 projection_resolver: Arc<dyn ProjectionResolver>,
226) -> Result<(), String> {
227 let mut snapshot = rlm.snapshot();
228 let mut changed = false;
229 let keys = snapshot
230 .globals
231 .keys()
232 .map(str::to_string)
233 .collect::<Vec<_>>();
234 for key in keys {
235 if let Some(value) = snapshot.globals.get_mut(&key) {
236 changed |= rehydrate_projected_value(value, Arc::clone(&projection_resolver)).await?;
237 }
238 }
239 if changed {
240 *rlm = FlowState::from_snapshot(snapshot);
241 }
242 Ok(())
243}
244
245fn rehydrate_projected_value<'a>(
246 value: &'a mut FlowValue,
247 projection_resolver: Arc<dyn ProjectionResolver>,
248) -> ProjectedFuture<'a, Result<bool, String>> {
249 Box::pin(async move {
250 match value {
251 FlowValue::Projected(projected) => {
252 let Some(ref_json) = projected.projection_ref().cloned() else {
253 return Ok(false);
254 };
255 let name = projected.name().to_string();
256 let reference = serde_json::from_value::<ProjectionRef>(ref_json.clone())
257 .map_err(|err| format!("invalid projection ref for `{name}`: {err}"))?;
258 let resolved = projection_resolver
259 .resolve_projection(&reference)
260 .await
261 .map_err(|err| err.to_string())?;
262 *value = FlowValue::Projected(ProjectedValue::custom_with_projection_ref(
263 name, resolved, ref_json,
264 ));
265 Ok(true)
266 }
267 FlowValue::List(values) => {
268 let mut changed = false;
269 let mut restored = values.iter().cloned().collect::<Vec<_>>();
270 for value in restored.iter_mut() {
271 changed |=
272 rehydrate_projected_value(value, Arc::clone(&projection_resolver)).await?;
273 }
274 if changed {
275 *value = FlowValue::List(restored.into());
276 }
277 Ok(changed)
278 }
279 FlowValue::Record(record) => {
280 let mut changed = false;
281 let record = Arc::make_mut(record);
282 let keys = record.keys().map(str::to_string).collect::<Vec<_>>();
283 for key in keys {
284 if let Some(value) = record.get_mut(&key) {
285 changed |=
286 rehydrate_projected_value(value, Arc::clone(&projection_resolver))
287 .await?;
288 }
289 }
290 Ok(changed)
291 }
292 FlowValue::Null
293 | FlowValue::Bool(_)
294 | FlowValue::Number(_)
295 | FlowValue::String(_)
296 | FlowValue::Resource(_)
297 | FlowValue::Image(_) => Ok(false),
298 }
299 })
300}
301
302fn json_map_to_image(map: &serde_json::Map<String, Value>) -> Option<ImageValue> {
303 if map.get("type")?.as_str()? != "image" {
304 return None;
305 }
306 Some(ImageValue::new(
307 map.get("id")?.as_str()?.to_string(),
308 map.get("label")?.as_str()?.to_string(),
309 map.get("size")?.as_u64()?,
310 optional_json_u32(map.get("width")?)?,
311 optional_json_u32(map.get("height")?)?,
312 ))
313}
314
315fn optional_json_u32(value: &Value) -> Option<Option<u32>> {
316 match value {
317 Value::Null => Some(None),
318 Value::Number(number) => number
319 .as_u64()
320 .and_then(|value| u32::try_from(value).ok())
321 .map(Some),
322 _ => None,
323 }
324}
325
326pub(crate) async fn format_output_value(value: &FlowValue) -> String {
327 match value {
328 FlowValue::Null => "null".to_string(),
329 FlowValue::String(text) => text.to_string(),
330 FlowValue::Bool(value) => value.to_string(),
331 FlowValue::Number(value) => value.to_string(),
332 FlowValue::Image(_)
333 | FlowValue::Resource(_)
334 | FlowValue::List(_)
335 | FlowValue::Record(_)
336 | FlowValue::Projected(_) => serde_json::to_string(&flow_to_json_value(value).await)
337 .unwrap_or_else(|_| value.to_string()),
338 }
339}
340
341pub(crate) fn projection_ref_from_seed_value(
342 name: &str,
343 value: &Value,
344) -> Result<Option<ProjectionRef>, RlmProjectedSeedError> {
345 let Some(inner) = lash_rlm_types::projection_ref_inner(value) else {
346 return Ok(None);
347 };
348 serde_json::from_value::<ProjectionRef>(inner.clone())
349 .map(Some)
350 .map_err(|err| RlmProjectedSeedError::invalid_projection_ref(name, err))
351}